wasmcloud_host/nats/
mod.rs1use std::{sync::Arc, time::Duration};
4
5use anyhow::{bail, Context as _};
6use async_nats::jetstream::kv::Store;
7use nkeys::KeyPair;
8use tracing::{info, instrument};
9
10use crate::workload_identity::{
11 setup_workload_identity_nats_connect_options, WorkloadIdentityConfig,
12};
13
14pub mod builder;
16
17pub mod ctl;
19
20pub mod event;
23
24pub mod policy;
26
27pub mod provider;
30
31pub mod secrets;
34
35pub mod store;
38
39pub async fn connect_nats(
51 addr: impl async_nats::ToServerAddrs,
52 jwt: Option<&String>,
53 key: Option<Arc<KeyPair>>,
54 require_tls: bool,
55 request_timeout: Option<Duration>,
56 workload_identity_config: Option<WorkloadIdentityConfig>,
57) -> anyhow::Result<async_nats::Client> {
58 let opts = match (jwt, key, workload_identity_config) {
59 (Some(jwt), Some(key), None) => {
60 async_nats::ConnectOptions::with_jwt(jwt.to_string(), move |nonce| {
61 let key = key.clone();
62 async move { key.sign(&nonce).map_err(async_nats::AuthError::new) }
63 })
64 .name("wasmbus")
65 }
66 (Some(_), None, _) | (None, Some(_), _) => {
67 bail!("cannot authenticate if only one of jwt or seed is specified")
68 }
69 (jwt, key, Some(wid_cfg)) => {
70 setup_workload_identity_nats_connect_options(jwt, key, wid_cfg).await?
71 }
72 _ => async_nats::ConnectOptions::new().name("wasmbus"),
73 };
74 let opts = if let Some(timeout) = request_timeout {
75 opts.request_timeout(Some(timeout))
76 } else {
77 opts
78 };
79 let opts = opts.require_tls(require_tls);
80 opts.connect(addr)
81 .await
82 .context("failed to connect to NATS")
83}
84
85#[instrument(level = "debug", skip_all)]
86pub(crate) async fn create_bucket(
87 jetstream: &async_nats::jetstream::Context,
88 bucket: &str,
89) -> anyhow::Result<Store> {
90 if let Ok(store) = jetstream.get_key_value(bucket).await {
92 info!(%bucket, "bucket already exists. Skipping creation.");
93 return Ok(store);
94 }
95
96 match jetstream
97 .create_key_value(async_nats::jetstream::kv::Config {
98 bucket: bucket.to_string(),
99 ..Default::default()
100 })
101 .await
102 {
103 Ok(store) => {
104 info!(%bucket, "created bucket with 1 replica");
105 Ok(store)
106 }
107 Err(err) => {
108 Err(anyhow::anyhow!(err).context(format!("failed to create bucket '{bucket}'")))
109 }
110 }
111}