1use std::{
4 collections::{BTreeMap, HashMap},
5 sync::Arc,
6 time::Duration,
7};
8
9use anyhow::{ensure, Context as _};
10use async_nats::{jetstream::kv::Store, Client};
11use nkeys::KeyPair;
12use serde::Deserialize;
13use serde_json::json;
14use tracing::{debug, error, instrument};
15use wasmcloud_control_interface::RegistryCredential;
16use wasmcloud_core::RegistryConfig;
17
18use crate::{
19 event::EventPublisher,
20 nats::{event::NatsEventPublisher, policy::NatsPolicyManager, secrets::NatsSecretsManager},
21 oci,
22 registry::{merge_registry_config, RegistryCredentialExt as _, SupplementalConfig},
23 secrets::SecretsManager,
24 store::StoreManager,
25 wasmbus::{config::BundleGenerator, HostBuilder},
26 PolicyHostInfo, PolicyManager, WasmbusHostConfig,
27};
28
/// Control-interface topic prefix used when [`NatsHostBuilder::new`] is called
/// without an explicit `ctl_topic_prefix`.
const DEFAULT_CTL_TOPIC_PREFIX: &str = "wasmbus.ctl";
30
31use super::{create_bucket, ctl::NatsControlInterfaceServer};
32
/// Builder that gathers the NATS-backed pieces of a host and, through
/// [`NatsHostBuilder::build`], turns them into a [`HostBuilder`] plus a
/// [`NatsControlInterfaceServer`].
pub struct NatsHostBuilder {
    /// NATS client handed to the control interface server and cloned/borrowed
    /// for the policy manager, secrets manager and event publisher.
    ctl_nats: Client,
    /// Topic prefix handed to the control interface server; falls back to
    /// `DEFAULT_CTL_TOPIC_PREFIX` when none was supplied to `new`.
    ctl_topic_prefix: String,
    /// Lattice name; used to derive bucket names and passed to the policy
    /// manager and event publisher.
    lattice: String,
    /// Bundle generator constructed over the `CONFIGDATA_{lattice}` bucket.
    config_generator: BundleGenerator,
    /// Registry configuration assembled from supplemental config and any OCI
    /// options given to `new`. Keys are presumably registry URLs — confirm
    /// against `merge_registry_config`.
    registry_config: HashMap<String, RegistryConfig>,
    /// Forwarded unchanged to `NatsControlInterfaceServer::new`.
    enable_component_auction: bool,
    /// Forwarded unchanged to `NatsControlInterfaceServer::new`.
    enable_provider_auction: bool,

    /// The `CONFIGDATA_{lattice}` bucket, exposed as a [`StoreManager`].
    config_store: Arc<dyn StoreManager>,
    /// The `LATTICEDATA_{lattice}` bucket.
    data_store: Store,
    /// `None` until `with_policy_manager` is called.
    policy_manager: Option<Arc<dyn PolicyManager>>,
    /// `None` until `with_secrets_manager` is called.
    secrets_manager: Option<Arc<dyn SecretsManager>>,
    /// `None` until `with_event_publisher` is called.
    event_publisher: Option<Arc<dyn EventPublisher>>,
}
57
58impl NatsHostBuilder {
59 #[allow(clippy::too_many_arguments)]
61 pub async fn new(
62 ctl_nats: Client,
63 ctl_topic_prefix: Option<String>,
64 lattice: String,
65 js_domain: Option<String>,
66 oci_opts: Option<oci::Config>,
67 labels: BTreeMap<String, String>,
68 config_service_enabled: bool,
69 enable_component_auction: bool,
70 enable_provider_auction: bool,
71 ) -> anyhow::Result<Self> {
72 let ctl_jetstream = if let Some(domain) = js_domain.as_ref() {
73 async_nats::jetstream::with_domain(ctl_nats.clone(), domain)
74 } else {
75 async_nats::jetstream::new(ctl_nats.clone())
76 };
77 let bucket = format!("LATTICEDATA_{lattice}");
78 let data_store = create_bucket(&ctl_jetstream, &bucket).await?;
79
80 let config_bucket = format!("CONFIGDATA_{lattice}");
81 let config_data = create_bucket(&ctl_jetstream, &config_bucket).await?;
82
83 let supplemental_config = if config_service_enabled {
84 load_supplemental_config(&ctl_nats, &lattice, &labels).await?
85 } else {
86 SupplementalConfig::default()
87 };
88
89 let mut registry_config = supplemental_config.registry_config.unwrap_or_default();
90 if let Some(oci_opts) = oci_opts {
91 debug!("supplementing OCI config with OCI options");
92 merge_registry_config(&mut registry_config, oci_opts).await;
93 }
94
95 let config_generator = BundleGenerator::new(Arc::new(config_data.clone()));
96
97 Ok(Self {
98 ctl_nats,
99 ctl_topic_prefix: ctl_topic_prefix
100 .unwrap_or_else(|| DEFAULT_CTL_TOPIC_PREFIX.to_string()),
101 lattice,
102 config_generator,
103 registry_config,
104 config_store: Arc::new(config_data),
105 data_store,
106 policy_manager: None,
107 secrets_manager: None,
108 event_publisher: None,
109 enable_component_auction,
110 enable_provider_auction,
111 })
112 }
113
114 pub async fn with_policy_manager(
116 self,
117 host_key: Arc<KeyPair>,
118 labels: HashMap<String, String>,
119 policy_topic: Option<String>,
120 policy_timeout: Option<Duration>,
121 policy_changes_topic: Option<String>,
122 ) -> anyhow::Result<Self> {
123 let policy_manager = NatsPolicyManager::new(
124 self.ctl_nats.clone(),
125 PolicyHostInfo {
126 public_key: host_key.public_key(),
127 lattice: self.lattice.clone(),
128 labels,
129 },
130 policy_topic,
131 policy_timeout,
132 policy_changes_topic,
133 )
134 .await?;
135
136 Ok(NatsHostBuilder {
137 policy_manager: Some(Arc::new(policy_manager)),
138 ..self
139 })
140 }
141
142 pub fn with_secrets_manager(self, secrets_topic_prefix: String) -> anyhow::Result<Self> {
144 ensure!(
145 !secrets_topic_prefix.is_empty(),
146 "secrets topic prefix must be non-empty"
147 );
148 let secrets_manager = NatsSecretsManager::new(
149 Arc::clone(&self.config_store),
150 Some(&secrets_topic_prefix),
151 &self.ctl_nats,
152 );
153
154 Ok(NatsHostBuilder {
155 secrets_manager: Some(Arc::new(secrets_manager)),
156 ..self
157 })
158 }
159
160 pub fn with_event_publisher(self, source: String) -> Self {
166 let event_publisher =
167 NatsEventPublisher::new(source, self.lattice.clone(), self.ctl_nats.clone());
168
169 NatsHostBuilder {
170 event_publisher: Some(Arc::new(event_publisher)),
171 ..self
172 }
173 }
174
175 pub async fn build(
177 self,
178 config: WasmbusHostConfig,
179 ) -> anyhow::Result<(HostBuilder, NatsControlInterfaceServer)> {
180 Ok((
181 HostBuilder::from(config)
182 .with_registry_config(self.registry_config)
183 .with_event_publisher(self.event_publisher)
184 .with_policy_manager(self.policy_manager)
185 .with_secrets_manager(self.secrets_manager)
186 .with_bundle_generator(Some(self.config_generator))
187 .with_config_store(Some(self.config_store))
188 .with_data_store(Some(Arc::new(self.data_store.clone()))),
189 NatsControlInterfaceServer::new(
190 self.ctl_nats,
191 self.data_store,
192 self.ctl_topic_prefix,
193 self.enable_component_auction,
194 self.enable_provider_auction,
195 ),
196 ))
197 }
198}
199
200#[instrument(level = "debug", skip_all)]
201async fn load_supplemental_config(
202 ctl_nats: &async_nats::Client,
203 lattice: &str,
204 labels: &BTreeMap<String, String>,
205) -> anyhow::Result<SupplementalConfig> {
206 #[derive(Deserialize, Default)]
207 struct SerializedSupplementalConfig {
208 #[serde(default, rename = "registryCredentials")]
209 registry_credentials: Option<HashMap<String, RegistryCredential>>,
210 }
211
212 let cfg_topic = format!("wasmbus.cfg.{lattice}.req");
213 let cfg_payload = serde_json::to_vec(&json!({
214 "labels": labels,
215 }))
216 .context("failed to serialize config payload")?;
217
218 debug!("requesting supplemental config");
219 match ctl_nats.request(cfg_topic, cfg_payload.into()).await {
220 Ok(resp) => {
221 match serde_json::from_slice::<SerializedSupplementalConfig>(resp.payload.as_ref()) {
222 Ok(ser_cfg) => Ok(SupplementalConfig {
223 registry_config: ser_cfg.registry_credentials.and_then(|creds| {
224 creds
225 .into_iter()
226 .map(|(k, v)| {
227 debug!(registry_url = %k, "set registry config");
228 v.into_registry_config().map(|v| (k, v))
229 })
230 .collect::<anyhow::Result<_>>()
231 .ok()
232 }),
233 }),
234 Err(e) => {
235 error!(
236 ?e,
237 "failed to deserialize supplemental config. Defaulting to empty config"
238 );
239 Ok(SupplementalConfig::default())
240 }
241 }
242 }
243 Err(e) => {
244 error!(
245 ?e,
246 "failed to request supplemental config. Defaulting to empty config"
247 );
248 Ok(SupplementalConfig::default())
249 }
250 }
251}