1use std::collections::BTreeMap;
6use std::env;
7use std::path::{Path, PathBuf};
8use std::process::Stdio;
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::sync::Arc;
11use std::time::Duration;
12
13use anyhow::{bail, Context as _};
14use async_nats::Client;
15use base64::engine::general_purpose::STANDARD;
16use base64::Engine;
17use bytes::Bytes;
18use futures::{stream, Future, StreamExt};
19use nkeys::XKey;
20use tokio::io::AsyncWriteExt;
21use tokio::process;
22use tokio::sync::RwLock;
23use tokio::task::JoinSet;
24use tracing::{error, instrument, trace, warn};
25use uuid::Uuid;
26use wascap::jwt::{CapabilityProvider, Token};
27use wasmcloud_core::{
28 health_subject, provider_config_update_subject, HealthCheckResponse, HostData, OtelConfig,
29};
30use wasmcloud_runtime::capability::secrets::store::SecretValue;
31use wasmcloud_tracing::context::TraceContextInjector;
32
33use crate::event::EventPublisher;
34use crate::jwt;
35use crate::wasmbus::injector_to_headers;
36use crate::wasmbus::{config::ConfigBundle, Annotations};
37
38use super::Host;
39
40mod http_client;
42mod http_server;
43mod messaging_nats;
44
/// Async hooks for adding and removing links on a provider.
///
/// Implementors must be `Send + Sync`. Both methods are fallible and report failure
/// through `anyhow::Result`.
///
/// NOTE(review): no implementor is visible in this file; presumably the `http_client`,
/// `http_server` and `messaging_nats` submodules provide them — confirm.
#[async_trait::async_trait]
pub trait ProviderManager: Send + Sync {
    /// Called to add `link` to the provider.
    ///
    /// NOTE(review): `target` presumably identifies the entity on the other end of the
    /// link — confirm against the implementors.
    async fn put_link(
        &self,
        link: &wasmcloud_core::InterfaceLinkDefinition,
        target: &str,
    ) -> anyhow::Result<()>;

    /// Called to remove `link` from the provider.
    ///
    /// NOTE(review): `target` presumably has the same meaning as in `put_link` — confirm.
    async fn delete_link(
        &self,
        link: &wasmcloud_core::InterfaceLinkDefinition,
        target: &str,
    ) -> anyhow::Result<()>;
}
62
/// Host-side record of a capability provider.
///
/// NOTE(review): nothing in this file constructs a `Provider`; the field notes below are
/// inferred from how values of the same kind are used by the `Host` methods here — confirm.
#[derive(Debug)]
pub(crate) struct Provider {
    /// Image reference of the provider.
    pub(crate) image_ref: String,
    /// Claims token of the provider, if it has one.
    pub(crate) claims_token: Option<jwt::Token<jwt::CapabilityProvider>>,
    /// XKey of the provider. `prepare_provider_config` takes a provider XKey to resolve
    /// link configuration and to hand the provider its seed; presumably this is that key.
    pub(crate) xkey: XKey,
    /// Annotations attached to the provider.
    pub(crate) annotations: Annotations,
    /// Shutdown flag. Presumably the same flag that is passed to `start_binary_provider`:
    /// the supervising task stops restarting the process once it reads `true`, and sets
    /// it to `true` itself when it gives up on the provider.
    pub(crate) shutdown: Arc<AtomicBool>,
    /// Background tasks belonging to the provider; presumably the `JoinSet` returned by
    /// `start_binary_provider` — confirm.
    pub(crate) tasks: JoinSet<()>,
}
76
77impl Host {
78 pub(crate) async fn prepare_provider_config(
83 &self,
84 config: &[String],
85 claims_token: Option<&Token<CapabilityProvider>>,
86 provider_id: &str,
87 provider_xkey: &XKey,
88 annotations: &BTreeMap<String, String>,
89 ) -> anyhow::Result<(HostData, ConfigBundle)> {
90 let (config, secrets) = self
91 .fetch_config_and_secrets(
92 config,
93 claims_token.as_ref().map(|t| &t.jwt),
94 annotations.get("wasmcloud.dev/appspec"),
95 )
96 .await?;
97 let xkey = XKey::from_public_key(&provider_xkey.public_key())
99 .context("failed to create XKey from provider public key xkey")?;
100
101 let all_links = self.links.read().await;
104 let provider_links = all_links
105 .values()
106 .flatten()
107 .filter(|link| link.source_id() == provider_id || link.target() == provider_id);
108 let link_definitions = stream::iter(provider_links)
109 .filter_map(|link| async {
110 if link.source_id() == provider_id || link.target() == provider_id {
111 match self
112 .resolve_link_config(
113 link.clone(),
114 claims_token.as_ref().map(|t| &t.jwt),
115 annotations.get("wasmcloud.dev/appspec"),
116 &xkey,
117 )
118 .await
119 {
120 Ok(provider_link) => Some(provider_link),
121 Err(e) => {
122 error!(
123 error = ?e,
124 provider_id,
125 source_id = link.source_id(),
126 target = link.target(),
127 "failed to resolve link config, skipping link"
128 );
129 None
130 }
131 }
132 } else {
133 None
134 }
135 })
136 .collect::<Vec<wasmcloud_core::InterfaceLinkDefinition>>()
137 .await;
138
139 let secrets = {
140 use secrecy::ExposeSecret;
143 secrets
144 .iter()
145 .map(|(k, v)| match v.expose_secret() {
146 SecretValue::String(s) => (
147 k.clone(),
148 wasmcloud_core::secrets::SecretValue::String(s.to_owned()),
149 ),
150 SecretValue::Bytes(b) => (
151 k.clone(),
152 wasmcloud_core::secrets::SecretValue::Bytes(b.to_owned()),
153 ),
154 })
155 .collect()
156 };
157 let host_config = config.get_config().await.clone();
158 let lattice_rpc_user_seed = self
159 .host_config
160 .rpc_key
161 .as_ref()
162 .map(|key| key.seed())
163 .transpose()
164 .context("private key missing for provider RPC key")?;
165 let default_rpc_timeout_ms = Some(
166 self.host_config
167 .rpc_timeout
168 .as_millis()
169 .try_into()
170 .context("failed to convert rpc_timeout to u64")?,
171 );
172 let otel_config = OtelConfig {
173 enable_observability: self.host_config.otel_config.enable_observability,
174 enable_traces: self.host_config.otel_config.enable_traces,
175 enable_metrics: self.host_config.otel_config.enable_metrics,
176 enable_logs: self.host_config.otel_config.enable_logs,
177 observability_endpoint: self.host_config.otel_config.observability_endpoint.clone(),
178 traces_endpoint: self.host_config.otel_config.traces_endpoint.clone(),
179 metrics_endpoint: self.host_config.otel_config.metrics_endpoint.clone(),
180 logs_endpoint: self.host_config.otel_config.logs_endpoint.clone(),
181 protocol: self.host_config.otel_config.protocol,
182 additional_ca_paths: self.host_config.otel_config.additional_ca_paths.clone(),
183 trace_level: self.host_config.otel_config.trace_level.clone(),
184 ..Default::default()
185 };
186
187 let provider_xkey_private_key = if let Ok(seed) = provider_xkey.seed() {
189 seed
190 } else {
191 bail!("failed to generate seed for provider xkey")
195 };
196 let host_data = HostData {
197 host_id: self.host_key.public_key(),
198 lattice_rpc_prefix: self.host_config.lattice.to_string(),
199 link_name: "default".to_string(),
200 lattice_rpc_user_jwt: self.host_config.rpc_jwt.clone().unwrap_or_default(),
201 lattice_rpc_user_seed: lattice_rpc_user_seed.unwrap_or_default(),
202 lattice_rpc_url: self.host_config.rpc_nats_url.to_string(),
203 env_values: vec![],
204 instance_id: Uuid::new_v4().to_string(),
205 provider_key: provider_id.to_string(),
206 link_definitions,
207 config: host_config,
208 secrets,
209 provider_xkey_private_key,
210 host_xkey_public_key: self.secrets_xkey.public_key(),
211 cluster_issuers: vec![],
212 default_rpc_timeout_ms,
213 log_level: Some(self.host_config.log_level.clone()),
214 structured_logging: self.host_config.enable_structured_logging,
215 otel_config,
216 };
217 Ok((host_data, config))
218 }
219
220 #[allow(clippy::too_many_arguments)]
222 #[instrument(level = "debug", skip_all)]
223 pub(crate) async fn start_binary_provider(
224 self: Arc<Self>,
225 path: PathBuf,
226 host_data: HostData,
227 config: Arc<RwLock<ConfigBundle>>,
228 provider_xkey: XKey,
229 provider_id: &str,
230 config_names: Vec<String>,
231 claims_token: Option<Token<CapabilityProvider>>,
232 annotations: BTreeMap<String, String>,
233 shutdown: Arc<AtomicBool>,
234 ) -> anyhow::Result<JoinSet<()>> {
235 trace!("spawn provider process");
236
237 let mut tasks = JoinSet::new();
238
239 tasks.spawn(
242 Arc::clone(&self)
243 .run_provider(
244 path,
245 host_data,
246 Arc::clone(&config),
247 provider_xkey,
248 provider_id.to_string(),
249 config_names,
250 claims_token,
251 annotations,
252 shutdown.clone(),
253 )
254 .await?,
255 );
256
257 tasks.spawn(check_health(
259 Arc::clone(&self.rpc_nats),
260 self.event_publisher.clone(),
261 Arc::clone(&self.host_config.lattice),
262 self.host_key.public_key(),
263 provider_id.to_string(),
264 ));
265
266 Ok(tasks)
267 }
268
269 #[allow(clippy::too_many_arguments)]
271 async fn run_provider(
272 self: Arc<Self>,
273 path: PathBuf,
274 host_data: HostData,
275 config_bundle: Arc<RwLock<ConfigBundle>>,
276 provider_xkey: XKey,
277 provider_id: String,
278 config_names: Vec<String>,
279 claims_token: Option<Token<CapabilityProvider>>,
280 annotations: BTreeMap<String, String>,
281 shutdown: Arc<AtomicBool>,
282 ) -> anyhow::Result<impl Future<Output = ()>> {
283 let host_data =
284 serde_json::to_vec(&host_data).context("failed to serialize provider data")?;
285
286 let child = Arc::new(RwLock::new(
288 provider_command(&path, host_data)
289 .await
290 .context("failed to configure binary provider command")?,
291 ));
292 let lattice = Arc::clone(&self.host_config.lattice);
293 Ok(async move {
294 let mut config_task = JoinSet::new();
298 config_task.spawn(watch_config(
299 Arc::clone(&self.rpc_nats),
300 Arc::clone(&config_bundle),
301 Arc::clone(&lattice),
302 provider_id.clone(),
303 ));
304 loop {
305 let mut child = child.write().await;
306 match child.wait().await {
307 Ok(status) => {
308 if shutdown.load(Ordering::Relaxed) {
310 trace!(
311 path = ?path.display(),
312 status = ?status,
313 "provider exited but will not be restarted since it's shutting down",
314 );
315 tokio::time::sleep(Duration::from_secs(1)).await;
317 continue;
318 }
319
320 warn!(
321 path = ?path.display(),
322 status = ?status,
323 "restarting provider that exited while being supervised",
324 );
325
326 let (host_data, new_config_bundle) = match self
327 .prepare_provider_config(
328 &config_names,
329 claims_token.as_ref(),
330 &provider_id,
331 &provider_xkey,
332 &annotations,
333 )
334 .await
335 .map(|(host_data, config)| {
336 (
337 serde_json::to_vec(&host_data)
338 .context("failed to serialize provider data"),
339 Arc::new(RwLock::new(config)),
340 )
341 }) {
342 Ok((Ok(host_data), new_config_bundle)) => {
343 (host_data, new_config_bundle)
344 }
345 Err(e) => {
346 error!(err = ?e, "failed to prepare provider host data while restarting");
347 shutdown.store(true, Ordering::Relaxed);
348 return;
349 }
350 Ok((Err(e), _)) => {
351 error!(err = ?e, "failed to serialize provider host data while restarting");
352 shutdown.store(true, Ordering::Relaxed);
353 return;
354 }
355 };
356
357 config_task.abort_all();
359 config_task.spawn(watch_config(
360 Arc::clone(&self.rpc_nats),
361 new_config_bundle,
362 Arc::clone(&lattice),
363 provider_id.clone(),
364 ));
365
366 let Ok(child_cmd) = provider_command(&path, host_data).await else {
369 error!(path = ?path.display(), "failed to restart provider");
370 shutdown.store(true, Ordering::Relaxed);
371 return;
372 };
373 *child = child_cmd;
374
375 tokio::time::sleep(Duration::from_secs(5)).await;
378 }
379 Err(e) => {
380 error!(
381 path = ?path.display(),
382 err = ?e,
383 "failed to wait for provider to execute",
384 );
385
386 shutdown.store(true, Ordering::Relaxed);
387 return;
388 }
389 }
390 }
391 })
392 }
393}
394
395async fn provider_command(path: &Path, host_data: Vec<u8>) -> anyhow::Result<process::Child> {
399 let mut child_cmd = process::Command::new(path);
400 child_cmd.env_clear();
403
404 if cfg!(windows) {
405 child_cmd.env(
407 "SYSTEMROOT",
408 env::var("SYSTEMROOT").context("SYSTEMROOT is not set. Providers cannot be started")?,
409 );
410 }
411
412 if let Ok(rust_log) = env::var("RUST_LOG") {
414 let _ = child_cmd.env("RUST_LOG", rust_log);
415 }
416
417 for (k, v) in env::vars() {
419 if k.starts_with("OTEL_") {
420 let _ = child_cmd.env(k, v);
421 }
422 }
423
424 let mut child = child_cmd
425 .stdin(Stdio::piped())
426 .kill_on_drop(true)
427 .spawn()
428 .context("failed to spawn provider process")?;
429 let mut stdin = child.stdin.take().context("failed to take stdin")?;
430 stdin
431 .write_all(STANDARD.encode(host_data).as_bytes())
432 .await
433 .context("failed to write provider data")?;
434 stdin
435 .write_all(b"\r\n")
436 .await
437 .context("failed to write newline")?;
438 stdin.shutdown().await.context("failed to close stdin")?;
439
440 Ok(child)
441}
442
443fn check_health(
448 rpc_nats: Arc<Client>,
449 event_publisher: Arc<dyn EventPublisher + Send + Sync>,
450 lattice: Arc<str>,
451 host_id: String,
452 provider_id: String,
453) -> impl Future<Output = ()> {
454 let health_subject = async_nats::Subject::from(health_subject(&lattice, &provider_id));
455
456 let mut health_check = tokio::time::interval(Duration::from_secs(30));
458 let mut previous_healthy = false;
459 health_check.reset_after(Duration::from_secs(5));
461 async move {
462 loop {
463 let _ = health_check.tick().await;
464 trace!(?provider_id, "performing provider health check");
465 let request =
466 async_nats::Request::new()
467 .payload(Bytes::new())
468 .headers(injector_to_headers(
469 &TraceContextInjector::default_with_span(),
470 ));
471 if let Ok(async_nats::Message { payload, .. }) =
472 rpc_nats.send_request(health_subject.clone(), request).await
473 {
474 match (
475 serde_json::from_slice::<HealthCheckResponse>(&payload),
476 previous_healthy,
477 ) {
478 (Ok(HealthCheckResponse { healthy: true, .. }), false) => {
479 trace!(?provider_id, "provider health check succeeded");
480 previous_healthy = true;
481 if let Err(e) = event_publisher
482 .publish_event(
483 "health_check_passed",
484 crate::event::provider_health_check(&host_id, &provider_id),
485 )
486 .await
487 {
488 warn!(
489 ?e,
490 ?provider_id,
491 "failed to publish provider health check succeeded event",
492 );
493 }
494 }
495 (Ok(HealthCheckResponse { healthy: false, .. }), true) => {
496 trace!(?provider_id, "provider health check failed");
497 previous_healthy = false;
498 if let Err(e) = event_publisher
499 .publish_event(
500 "health_check_failed",
501 crate::event::provider_health_check(&host_id, &provider_id),
502 )
503 .await
504 {
505 warn!(
506 ?e,
507 ?provider_id,
508 "failed to publish provider health check failed event",
509 );
510 }
511 }
512 (Ok(_), _) => {
514 if let Err(e) = event_publisher
515 .publish_event(
516 "health_check_status",
517 crate::event::provider_health_check(&host_id, &provider_id),
518 )
519 .await
520 {
521 warn!(
522 ?e,
523 ?provider_id,
524 "failed to publish provider health check status event",
525 );
526 }
527 }
528 _ => warn!(
529 ?provider_id,
530 "failed to deserialize provider health check response"
531 ),
532 }
533 } else {
534 warn!(
535 ?provider_id,
536 "failed to request provider health, retrying in 30 seconds"
537 );
538 }
539 }
540 }
541}
542
543fn watch_config(
548 rpc_nats: Arc<Client>,
549 config: Arc<RwLock<ConfigBundle>>,
550 lattice: Arc<str>,
551 provider_id: String,
552) -> impl Future<Output = ()> {
553 let subject = provider_config_update_subject(&lattice, &provider_id);
554 trace!(?provider_id, "starting config update listener");
555 async move {
556 loop {
557 let mut config = config.write().await;
558 if let Ok(update) = config.changed().await {
559 trace!(?provider_id, "provider config bundle changed");
560 let bytes = match serde_json::to_vec(&*update) {
561 Ok(bytes) => bytes,
562 Err(err) => {
563 error!(%err, ?provider_id, ?lattice, "failed to serialize configuration update ");
564 continue;
565 }
566 };
567 trace!(?provider_id, subject, "publishing config bundle bytes");
568 if let Err(err) = rpc_nats.publish(subject.clone(), Bytes::from(bytes)).await {
569 error!(%err, ?provider_id, ?lattice, "failed to publish configuration update bytes to component");
570 }
571 } else {
572 break;
573 };
574 }
575 }
576}