wasmcloud_host/nats/
builder.rs

1//! An opinionated [crate::wasmbus::HostBuilder] that uses NATS as the primary transport.
2
3use std::{
4    collections::{BTreeMap, HashMap},
5    sync::Arc,
6    time::Duration,
7};
8
9use anyhow::{ensure, Context as _};
10use async_nats::{jetstream::kv::Store, Client};
11use nkeys::KeyPair;
12use serde::Deserialize;
13use serde_json::json;
14use tracing::{debug, error, instrument};
15use wasmcloud_control_interface::RegistryCredential;
16use wasmcloud_core::RegistryConfig;
17
18use crate::{
19    event::EventPublisher,
20    nats::{event::NatsEventPublisher, policy::NatsPolicyManager, secrets::NatsSecretsManager},
21    oci,
22    registry::{merge_registry_config, RegistryCredentialExt as _, SupplementalConfig},
23    secrets::SecretsManager,
24    store::StoreManager,
25    wasmbus::{config::BundleGenerator, HostBuilder},
26    PolicyHostInfo, PolicyManager, WasmbusHostConfig,
27};
28
29const DEFAULT_CTL_TOPIC_PREFIX: &str = "wasmbus.ctl";
30
31use super::{create_bucket, ctl::NatsControlInterfaceServer};
32
33/// Opinionated [crate::wasmbus::HostBuilder] that uses NATS as the primary transport and implementations
34/// for the [crate::wasmbus::Host] extension traits.
35///
36/// This builder is used to create a [crate::wasmbus::HostBuilder] and a [NatsControlInterfaceServer] for
37/// listening for control messages on the NATS message bus. Incoming messages will use the
38/// [crate::wasmbus::ctl::ControlInterfaceServer] trait to handle the messages and
39/// send them to the host.
40pub struct NatsHostBuilder {
41    // Required fields
42    ctl_nats: Client,
43    ctl_topic_prefix: String,
44    lattice: String,
45    config_generator: BundleGenerator,
46    registry_config: HashMap<String, RegistryConfig>,
47    enable_component_auction: bool,
48    enable_provider_auction: bool,
49
50    // Trait implementations for NATS
51    config_store: Arc<dyn StoreManager>,
52    data_store: Store,
53    policy_manager: Option<Arc<dyn PolicyManager>>,
54    secrets_manager: Option<Arc<dyn SecretsManager>>,
55    event_publisher: Option<Arc<dyn EventPublisher>>,
56}
57
58impl NatsHostBuilder {
59    /// Initialize the host with the NATS control interface connection
60    #[allow(clippy::too_many_arguments)]
61    pub async fn new(
62        ctl_nats: Client,
63        ctl_topic_prefix: Option<String>,
64        lattice: String,
65        js_domain: Option<String>,
66        oci_opts: Option<oci::Config>,
67        labels: BTreeMap<String, String>,
68        config_service_enabled: bool,
69        enable_component_auction: bool,
70        enable_provider_auction: bool,
71    ) -> anyhow::Result<Self> {
72        let ctl_jetstream = if let Some(domain) = js_domain.as_ref() {
73            async_nats::jetstream::with_domain(ctl_nats.clone(), domain)
74        } else {
75            async_nats::jetstream::new(ctl_nats.clone())
76        };
77        let bucket = format!("LATTICEDATA_{lattice}");
78        let data_store = create_bucket(&ctl_jetstream, &bucket).await?;
79
80        let config_bucket = format!("CONFIGDATA_{lattice}");
81        let config_data = create_bucket(&ctl_jetstream, &config_bucket).await?;
82
83        let supplemental_config = if config_service_enabled {
84            load_supplemental_config(&ctl_nats, &lattice, &labels).await?
85        } else {
86            SupplementalConfig::default()
87        };
88
89        let mut registry_config = supplemental_config.registry_config.unwrap_or_default();
90        if let Some(oci_opts) = oci_opts {
91            debug!("supplementing OCI config with OCI options");
92            merge_registry_config(&mut registry_config, oci_opts).await;
93        }
94
95        let config_generator = BundleGenerator::new(Arc::new(config_data.clone()));
96
97        Ok(Self {
98            ctl_nats,
99            ctl_topic_prefix: ctl_topic_prefix
100                .unwrap_or_else(|| DEFAULT_CTL_TOPIC_PREFIX.to_string()),
101            lattice,
102            config_generator,
103            registry_config,
104            config_store: Arc::new(config_data),
105            data_store,
106            policy_manager: None,
107            secrets_manager: None,
108            event_publisher: None,
109            enable_component_auction,
110            enable_provider_auction,
111        })
112    }
113
114    /// Setup the NATS policy manager for the host
115    pub async fn with_policy_manager(
116        self,
117        host_key: Arc<KeyPair>,
118        labels: HashMap<String, String>,
119        policy_topic: Option<String>,
120        policy_timeout: Option<Duration>,
121        policy_changes_topic: Option<String>,
122    ) -> anyhow::Result<Self> {
123        let policy_manager = NatsPolicyManager::new(
124            self.ctl_nats.clone(),
125            PolicyHostInfo {
126                public_key: host_key.public_key(),
127                lattice: self.lattice.clone(),
128                labels,
129            },
130            policy_topic,
131            policy_timeout,
132            policy_changes_topic,
133        )
134        .await?;
135
136        Ok(NatsHostBuilder {
137            policy_manager: Some(Arc::new(policy_manager)),
138            ..self
139        })
140    }
141
142    /// Setup the NATS secrets manager for the host
143    pub fn with_secrets_manager(self, secrets_topic_prefix: String) -> anyhow::Result<Self> {
144        ensure!(
145            !secrets_topic_prefix.is_empty(),
146            "secrets topic prefix must be non-empty"
147        );
148        let secrets_manager = NatsSecretsManager::new(
149            Arc::clone(&self.config_store),
150            Some(&secrets_topic_prefix),
151            &self.ctl_nats,
152        );
153
154        Ok(NatsHostBuilder {
155            secrets_manager: Some(Arc::new(secrets_manager)),
156            ..self
157        })
158    }
159
160    /// Setup the NATS event publisher for the host
161    ///
162    /// This will create a new NATS event publisher with the provided source. It's strongly
163    /// recommended to use the host's public key as the source, as this will allow tracing
164    /// events back to the host that published them.
165    pub fn with_event_publisher(self, source: String) -> Self {
166        let event_publisher =
167            NatsEventPublisher::new(source, self.lattice.clone(), self.ctl_nats.clone());
168
169        NatsHostBuilder {
170            event_publisher: Some(Arc::new(event_publisher)),
171            ..self
172        }
173    }
174
175    /// Build the [`HostBuilder`] with the NATS extension traits and the provided [`WasmbusHostConfig`].
176    pub async fn build(
177        self,
178        config: WasmbusHostConfig,
179    ) -> anyhow::Result<(HostBuilder, NatsControlInterfaceServer)> {
180        Ok((
181            HostBuilder::from(config)
182                .with_registry_config(self.registry_config)
183                .with_event_publisher(self.event_publisher)
184                .with_policy_manager(self.policy_manager)
185                .with_secrets_manager(self.secrets_manager)
186                .with_bundle_generator(Some(self.config_generator))
187                .with_config_store(Some(self.config_store))
188                .with_data_store(Some(Arc::new(self.data_store.clone()))),
189            NatsControlInterfaceServer::new(
190                self.ctl_nats,
191                self.data_store,
192                self.ctl_topic_prefix,
193                self.enable_component_auction,
194                self.enable_provider_auction,
195            ),
196        ))
197    }
198}
199
200#[instrument(level = "debug", skip_all)]
201async fn load_supplemental_config(
202    ctl_nats: &async_nats::Client,
203    lattice: &str,
204    labels: &BTreeMap<String, String>,
205) -> anyhow::Result<SupplementalConfig> {
206    #[derive(Deserialize, Default)]
207    struct SerializedSupplementalConfig {
208        #[serde(default, rename = "registryCredentials")]
209        registry_credentials: Option<HashMap<String, RegistryCredential>>,
210    }
211
212    let cfg_topic = format!("wasmbus.cfg.{lattice}.req");
213    let cfg_payload = serde_json::to_vec(&json!({
214        "labels": labels,
215    }))
216    .context("failed to serialize config payload")?;
217
218    debug!("requesting supplemental config");
219    match ctl_nats.request(cfg_topic, cfg_payload.into()).await {
220        Ok(resp) => {
221            match serde_json::from_slice::<SerializedSupplementalConfig>(resp.payload.as_ref()) {
222                Ok(ser_cfg) => Ok(SupplementalConfig {
223                    registry_config: ser_cfg.registry_credentials.and_then(|creds| {
224                        creds
225                            .into_iter()
226                            .map(|(k, v)| {
227                                debug!(registry_url = %k, "set registry config");
228                                v.into_registry_config().map(|v| (k, v))
229                            })
230                            .collect::<anyhow::Result<_>>()
231                            .ok()
232                    }),
233                }),
234                Err(e) => {
235                    error!(
236                        ?e,
237                        "failed to deserialize supplemental config. Defaulting to empty config"
238                    );
239                    Ok(SupplementalConfig::default())
240                }
241            }
242        }
243        Err(e) => {
244            error!(
245                ?e,
246                "failed to request supplemental config. Defaulting to empty config"
247            );
248            Ok(SupplementalConfig::default())
249        }
250    }
251}