wasmcloud_host/wasmbus/providers/mod.rs

//! Provider module
//!
//! The root of this module includes functionality for running and managing provider binaries. The
//! submodules contain built-in implementations of wasmCloud capability providers.
use std::collections::BTreeMap;
use std::env;
use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

use anyhow::{bail, Context as _};
use async_nats::Client;
use base64::engine::general_purpose::STANDARD;
use base64::Engine;
use bytes::Bytes;
use futures::{stream, Future, StreamExt};
use nkeys::XKey;
use tokio::io::AsyncWriteExt;
use tokio::process;
use tokio::sync::RwLock;
use tokio::task::JoinSet;
use tracing::{error, instrument, trace, warn};
use uuid::Uuid;
use wascap::jwt::{CapabilityProvider, Token};
use wasmcloud_core::{
    health_subject, provider_config_update_subject, HealthCheckResponse, HostData, OtelConfig,
};
use wasmcloud_runtime::capability::secrets::store::SecretValue;
use wasmcloud_tracing::context::TraceContextInjector;

use crate::event::EventPublisher;
use crate::jwt;
use crate::wasmbus::injector_to_headers;
use crate::wasmbus::{config::ConfigBundle, Annotations};

use super::Host;

// Add internal provider modules to the host
mod http_client;
mod http_server;
mod messaging_nats;

/// A trait for sending and receiving messages to/from a provider
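///
/// # Example
///
/// A minimal sketch of driving a provider through this trait; the `manager`
/// value and the `link`/`target_id` bindings are hypothetical, not taken from
/// a real call site:
///
/// ```ignore
/// let manager: Arc<dyn ProviderManager> = get_manager();
/// // Tell the provider about a new link it participates in
/// manager.put_link(&link, target_id).await?;
/// // ...and later remove that link
/// manager.delete_link(&link, target_id).await?;
/// ```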
#[async_trait::async_trait]
pub trait ProviderManager: Send + Sync {
    /// Put a link to the provider
    async fn put_link(
        &self,
        link: &wasmcloud_core::InterfaceLinkDefinition,
        target: &str,
    ) -> anyhow::Result<()>;

    /// Delete a link from the provider
    async fn delete_link(
        &self,
        link: &wasmcloud_core::InterfaceLinkDefinition,
        target: &str,
    ) -> anyhow::Result<()>;
}

/// A provider instance
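///
/// An illustrative teardown sketch (assumes a mutable `provider` value of this
/// type; the surrounding host logic is omitted): set the shutdown flag so the
/// supervision loop stops restarting the binary, then abort the tasks.
///
/// ```ignore
/// provider.shutdown.store(true, Ordering::Relaxed);
/// provider.tasks.abort_all();
/// ```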
#[derive(Debug)]
pub(crate) struct Provider {
    pub(crate) image_ref: String,
    pub(crate) claims_token: Option<jwt::Token<jwt::CapabilityProvider>>,
    pub(crate) xkey: XKey,
    pub(crate) annotations: Annotations,
    /// Shutdown signal for the provider, set to `false` initially. When set to `true`, the
    /// tasks running the provider, health check, and config watcher will stop.
    pub(crate) shutdown: Arc<AtomicBool>,
    /// Tasks running the provider, health check, and config watcher
    pub(crate) tasks: JoinSet<()>,
}

impl Host {
    /// Fetch configuration and secrets for a capability provider, forming the host configuration
    /// with links, config and secrets to pass to that provider. Also returns the config bundle
    /// which is used to watch for changes to the configuration, or can be discarded if
    /// configuration updates aren't necessary.
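    ///
    /// An illustrative call shape (the bindings here are assumptions, not an
    /// actual call site):
    ///
    /// ```ignore
    /// let (host_data, config_bundle) = host
    ///     .prepare_provider_config(&config_names, claims.as_ref(), &provider_id, &xkey, &annotations)
    ///     .await?;
    /// // `host_data` is handed to the provider binary on startup;
    /// // `config_bundle` can be watched for live configuration updates
    /// ```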
    pub(crate) async fn prepare_provider_config(
        &self,
        config: &[String],
        claims_token: Option<&Token<CapabilityProvider>>,
        provider_id: &str,
        provider_xkey: &XKey,
        annotations: &BTreeMap<String, String>,
    ) -> anyhow::Result<(HostData, ConfigBundle)> {
        let (config, secrets) = self
            .fetch_config_and_secrets(
                config,
                claims_token.as_ref().map(|t| &t.jwt),
                annotations.get("wasmcloud.dev/appspec"),
            )
            .await?;
        // We only need to store the public key of the provider xkey, as the private key is only needed by the provider
        let xkey = XKey::from_public_key(&provider_xkey.public_key())
            .context("failed to create XKey from provider public key xkey")?;

        // Prepare startup links by generating the source and target configs. Note that because the provider may be the source
        // or target of a link, we need to iterate over all links to find the ones that involve the provider.
        let all_links = self.links.read().await;
        let provider_links = all_links
            .values()
            .flatten()
            .filter(|link| link.source_id() == provider_id || link.target() == provider_id);
        let link_definitions = stream::iter(provider_links)
            .filter_map(|link| async {
                // The filter above guarantees this link involves the provider; resolve
                // its config, skipping the link if resolution fails
                match self
                    .resolve_link_config(
                        link.clone(),
                        claims_token.as_ref().map(|t| &t.jwt),
                        annotations.get("wasmcloud.dev/appspec"),
                        &xkey,
                    )
                    .await
                {
                    Ok(provider_link) => Some(provider_link),
                    Err(e) => {
                        error!(
                            error = ?e,
                            provider_id,
                            source_id = link.source_id(),
                            target = link.target(),
                            "failed to resolve link config, skipping link"
                        );
                        None
                    }
                }
            })
            .collect::<Vec<wasmcloud_core::InterfaceLinkDefinition>>()
            .await;

        let secrets = {
            // NOTE(brooksmtownsend): This trait import is used here to ensure we're only exposing secret
            // values when we need them.
            use secrecy::ExposeSecret;
            secrets
                .iter()
                .map(|(k, v)| match v.expose_secret() {
                    SecretValue::String(s) => (
                        k.clone(),
                        wasmcloud_core::secrets::SecretValue::String(s.to_owned()),
                    ),
                    SecretValue::Bytes(b) => (
                        k.clone(),
                        wasmcloud_core::secrets::SecretValue::Bytes(b.to_owned()),
                    ),
                })
                .collect()
        };
        let host_config = config.get_config().await.clone();
        let lattice_rpc_user_seed = self
            .host_config
            .rpc_key
            .as_ref()
            .map(|key| key.seed())
            .transpose()
            .context("private key missing for provider RPC key")?;
        let default_rpc_timeout_ms = Some(
            self.host_config
                .rpc_timeout
                .as_millis()
                .try_into()
                .context("failed to convert rpc_timeout to u64")?,
        );
        let otel_config = OtelConfig {
            enable_observability: self.host_config.otel_config.enable_observability,
            enable_traces: self.host_config.otel_config.enable_traces,
            enable_metrics: self.host_config.otel_config.enable_metrics,
            enable_logs: self.host_config.otel_config.enable_logs,
            observability_endpoint: self.host_config.otel_config.observability_endpoint.clone(),
            traces_endpoint: self.host_config.otel_config.traces_endpoint.clone(),
            metrics_endpoint: self.host_config.otel_config.metrics_endpoint.clone(),
            logs_endpoint: self.host_config.otel_config.logs_endpoint.clone(),
            protocol: self.host_config.otel_config.protocol,
            additional_ca_paths: self.host_config.otel_config.additional_ca_paths.clone(),
            trace_level: self.host_config.otel_config.trace_level.clone(),
            ..Default::default()
        };

        // The provider itself needs to know its private key
        let provider_xkey_private_key = if let Ok(seed) = provider_xkey.seed() {
            seed
        } else {
            // This should never happen, since `seed()` only errors when an XKey was
            // created from a public key; but if we can't obtain the seed for whatever
            // reason, we should bail.
            bail!("failed to generate seed for provider xkey")
        };
        let host_data = HostData {
            host_id: self.host_key.public_key(),
            lattice_rpc_prefix: self.host_config.lattice.to_string(),
            link_name: "default".to_string(),
            lattice_rpc_user_jwt: self.host_config.rpc_jwt.clone().unwrap_or_default(),
            lattice_rpc_user_seed: lattice_rpc_user_seed.unwrap_or_default(),
            lattice_rpc_url: self.host_config.rpc_nats_url.to_string(),
            env_values: vec![],
            instance_id: Uuid::new_v4().to_string(),
            provider_key: provider_id.to_string(),
            link_definitions,
            config: host_config,
            secrets,
            provider_xkey_private_key,
            host_xkey_public_key: self.secrets_xkey.public_key(),
            cluster_issuers: vec![],
            default_rpc_timeout_ms,
            log_level: Some(self.host_config.log_level.clone()),
            structured_logging: self.host_config.enable_structured_logging,
            otel_config,
        };
        Ok((host_data, config))
    }

    /// Start a binary provider
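    ///
    /// Spawns two tasks on the returned [`JoinSet`]: one that runs (and restarts) the
    /// provider binary via `run_provider`, and one that checks provider health every
    /// 30 seconds. The caller is expected to keep the `JoinSet` alive, e.g. in the
    /// [`Provider`] struct's `tasks` field.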
    #[allow(clippy::too_many_arguments)]
    #[instrument(level = "debug", skip_all)]
    pub(crate) async fn start_binary_provider(
        self: Arc<Self>,
        path: PathBuf,
        host_data: HostData,
        config: Arc<RwLock<ConfigBundle>>,
        provider_xkey: XKey,
        provider_id: &str,
        config_names: Vec<String>,
        claims_token: Option<Token<CapabilityProvider>>,
        annotations: BTreeMap<String, String>,
        shutdown: Arc<AtomicBool>,
    ) -> anyhow::Result<JoinSet<()>> {
        trace!("spawn provider process");

        let mut tasks = JoinSet::new();

        // Spawn a task to ensure the provider is restarted if it exits prematurely,
        // updating the configuration as needed
        tasks.spawn(
            Arc::clone(&self)
                .run_provider(
                    path,
                    host_data,
                    Arc::clone(&config),
                    provider_xkey,
                    provider_id.to_string(),
                    config_names,
                    claims_token,
                    annotations,
                    shutdown.clone(),
                )
                .await?,
        );

        // Spawn a task to check the health of the provider every 30 seconds
        tasks.spawn(check_health(
            Arc::clone(&self.rpc_nats),
            self.event_publisher.clone(),
            Arc::clone(&self.host_config.lattice),
            self.host_key.public_key(),
            provider_id.to_string(),
        ));

        Ok(tasks)
    }

    /// Run and supervise a binary provider, restarting it if it exits prematurely.
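    ///
    /// If the provider exits unexpectedly, host data and configuration are re-prepared
    /// and the binary is re-executed after a 5-second backoff; if the shutdown flag is
    /// set, the exit is ignored and the process is not restarted.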
    #[allow(clippy::too_many_arguments)]
    async fn run_provider(
        self: Arc<Self>,
        path: PathBuf,
        host_data: HostData,
        config_bundle: Arc<RwLock<ConfigBundle>>,
        provider_xkey: XKey,
        provider_id: String,
        config_names: Vec<String>,
        claims_token: Option<Token<CapabilityProvider>>,
        annotations: BTreeMap<String, String>,
        shutdown: Arc<AtomicBool>,
    ) -> anyhow::Result<impl Future<Output = ()>> {
        let host_data =
            serde_json::to_vec(&host_data).context("failed to serialize provider data")?;

        // If there are any issues starting the provider, we want to exit immediately
        let child = Arc::new(RwLock::new(
            provider_command(&path, host_data)
                .await
                .context("failed to configure binary provider command")?,
        ));
        let lattice = Arc::clone(&self.host_config.lattice);
        Ok(async move {
            // Use a JoinSet to manage the config watcher task so that
            // it can be cancelled on drop and replaced with new config
            // when a provider restarts
            let mut config_task = JoinSet::new();
            config_task.spawn(watch_config(
                Arc::clone(&self.rpc_nats),
                Arc::clone(&config_bundle),
                Arc::clone(&lattice),
                provider_id.clone(),
            ));
            loop {
                let mut child = child.write().await;
                match child.wait().await {
                    Ok(status) => {
                        // When the provider is shutting down, don't restart it
                        if shutdown.load(Ordering::Relaxed) {
                            trace!(
                                path = ?path.display(),
                                status = ?status,
                                "provider exited but will not be restarted since it's shutting down",
                            );
                            // Avoid a hot loop by waiting 1s before checking the status again
                            tokio::time::sleep(Duration::from_secs(1)).await;
                            continue;
                        }

                        warn!(
                            path = ?path.display(),
                            status = ?status,
                            "restarting provider that exited while being supervised",
                        );

                        let (host_data, new_config_bundle) = match self
                            .prepare_provider_config(
                                &config_names,
                                claims_token.as_ref(),
                                &provider_id,
                                &provider_xkey,
                                &annotations,
                            )
                            .await
                            .map(|(host_data, config)| {
                                (
                                    serde_json::to_vec(&host_data)
                                        .context("failed to serialize provider data"),
                                    Arc::new(RwLock::new(config)),
                                )
                            }) {
                            Ok((Ok(host_data), new_config_bundle)) => {
                                (host_data, new_config_bundle)
                            }
                            Ok((Err(e), _)) => {
                                error!(err = ?e, "failed to serialize provider host data while restarting");
                                shutdown.store(true, Ordering::Relaxed);
                                return;
                            }
                            Err(e) => {
                                error!(err = ?e, "failed to prepare provider host data while restarting");
                                shutdown.store(true, Ordering::Relaxed);
                                return;
                            }
                        };

                        // Stop the config watcher and start a new one with the new config bundle
                        config_task.abort_all();
                        config_task.spawn(watch_config(
                            Arc::clone(&self.rpc_nats),
                            new_config_bundle,
                            Arc::clone(&lattice),
                            provider_id.clone(),
                        ));

                        // Restart the provider by re-executing the binary with the freshly
                        // prepared host data
                        let Ok(child_cmd) = provider_command(&path, host_data).await else {
                            error!(path = ?path.display(), "failed to restart provider");
                            shutdown.store(true, Ordering::Relaxed);
                            return;
                        };
                        *child = child_cmd;

                        // To avoid a tight loop, we wait 5 seconds after restarting. In the worst case,
                        // the provider will continually execute and exit every 5 seconds.
                        tokio::time::sleep(Duration::from_secs(5)).await;
                    }
                    Err(e) => {
                        error!(
                            path = ?path.display(),
                            err = ?e,
                            "failed to wait for provider to execute",
                        );

                        shutdown.store(true, Ordering::Relaxed);
                        return;
                    }
                }
            }
        })
    }
}

/// Using the provided path as the provider binary, start the provider process and
/// pass the host data to it over stdin. Returns the already-spawned child process
/// handle.
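///
/// The host writes the base64-encoded, JSON-serialized `HostData` to the child's
/// stdin, terminated by `\r\n`. A provider-side sketch of decoding that handshake
/// (assuming the same `base64` and `serde_json` crates; not taken from an actual
/// provider implementation):
///
/// ```ignore
/// let mut line = String::new();
/// std::io::stdin().read_line(&mut line)?;
/// let bytes = STANDARD.decode(line.trim_end())?;
/// let host_data: HostData = serde_json::from_slice(&bytes)?;
/// ```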
async fn provider_command(path: &Path, host_data: Vec<u8>) -> anyhow::Result<process::Child> {
    let mut child_cmd = process::Command::new(path);
    // Prevent the provider from inheriting the host's environment, with the exception of
    // the following variables we manually add back
    child_cmd.env_clear();

    if cfg!(windows) {
        // Proxy SYSTEMROOT to providers. Without this, providers on Windows won't be able to start
        child_cmd.env(
            "SYSTEMROOT",
            env::var("SYSTEMROOT").context("SYSTEMROOT is not set. Providers cannot be started")?,
        );
    }

    // Proxy RUST_LOG to (Rust) providers, so they can use the same module-level directives
    if let Ok(rust_log) = env::var("RUST_LOG") {
        let _ = child_cmd.env("RUST_LOG", rust_log);
    }

    // Pass through any OTEL configuration options to the provider as well
    for (k, v) in env::vars() {
        if k.starts_with("OTEL_") {
            let _ = child_cmd.env(k, v);
        }
    }

    let mut child = child_cmd
        .stdin(Stdio::piped())
        .kill_on_drop(true)
        .spawn()
        .context("failed to spawn provider process")?;
    let mut stdin = child.stdin.take().context("failed to take stdin")?;
    stdin
        .write_all(STANDARD.encode(host_data).as_bytes())
        .await
        .context("failed to write provider data")?;
    stdin
        .write_all(b"\r\n")
        .await
        .context("failed to write newline")?;
    stdin.shutdown().await.context("failed to close stdin")?;

    Ok(child)
}

/// Periodically request a health check from the provider and publish the
/// corresponding lattice events
///
/// Returns a future that should be polled to continually check provider health
/// every 30 seconds; it runs until the owning task is aborted (e.g., on provider shutdown)
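///
/// The provider is expected to reply on the health subject with a JSON body that
/// deserializes into [`HealthCheckResponse`], e.g. (illustrative payload; the exact
/// field set is defined in `wasmcloud_core`):
///
/// ```ignore
/// serde_json::json!({ "healthy": true, "message": "provider ready" })
/// ```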
fn check_health(
    rpc_nats: Arc<Client>,
    event_publisher: Arc<dyn EventPublisher + Send + Sync>,
    lattice: Arc<str>,
    host_id: String,
    provider_id: String,
) -> impl Future<Output = ()> {
    let health_subject = async_nats::Subject::from(health_subject(&lattice, &provider_id));

    // Check the health of the provider every 30 seconds
    let mut health_check = tokio::time::interval(Duration::from_secs(30));
    let mut previous_healthy = false;
    // Allow the provider 5 seconds to initialize
    health_check.reset_after(Duration::from_secs(5));
    async move {
        loop {
            let _ = health_check.tick().await;
            trace!(?provider_id, "performing provider health check");
            let request =
                async_nats::Request::new()
                    .payload(Bytes::new())
                    .headers(injector_to_headers(
                        &TraceContextInjector::default_with_span(),
                    ));
            if let Ok(async_nats::Message { payload, .. }) =
                rpc_nats.send_request(health_subject.clone(), request).await
            {
                match (
                    serde_json::from_slice::<HealthCheckResponse>(&payload),
                    previous_healthy,
                ) {
                    // The provider transitioned from unhealthy to healthy
                    (Ok(HealthCheckResponse { healthy: true, .. }), false) => {
                        trace!(?provider_id, "provider health check succeeded");
                        previous_healthy = true;
                        if let Err(e) = event_publisher
                            .publish_event(
                                "health_check_passed",
                                crate::event::provider_health_check(&host_id, &provider_id),
                            )
                            .await
                        {
                            warn!(
                                ?e,
                                ?provider_id,
                                "failed to publish provider health check succeeded event",
                            );
                        }
                    }
                    // The provider transitioned from healthy to unhealthy
                    (Ok(HealthCheckResponse { healthy: false, .. }), true) => {
                        trace!(?provider_id, "provider health check failed");
                        previous_healthy = false;
                        if let Err(e) = event_publisher
                            .publish_event(
                                "health_check_failed",
                                crate::event::provider_health_check(&host_id, &provider_id),
                            )
                            .await
                        {
                            warn!(
                                ?e,
                                ?provider_id,
                                "failed to publish provider health check failed event",
                            );
                        }
                    }
                    // If the provider health status didn't change, we simply publish a health check status event
                    (Ok(_), _) => {
                        if let Err(e) = event_publisher
                            .publish_event(
                                "health_check_status",
                                crate::event::provider_health_check(&host_id, &provider_id),
                            )
                            .await
                        {
                            warn!(
                                ?e,
                                ?provider_id,
                                "failed to publish provider health check status event",
                            );
                        }
                    }
                    _ => warn!(
                        ?provider_id,
                        "failed to deserialize provider health check response"
                    ),
                }
            } else {
                warn!(
                    ?provider_id,
                    "failed to request provider health, retrying in 30 seconds"
                );
            }
        }
    }
}

/// Watch for config updates and send them to the provider
///
/// Returns a future that forwards configuration changes to the provider until
/// the watched [`ConfigBundle`] is closed (i.e. `changed()` returns an error)
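///
/// A provider-side sketch of consuming these updates (the subscription is
/// hypothetical; the payload shape mirrors the config map this function
/// serializes):
///
/// ```ignore
/// let mut sub = nats.subscribe(subject).await?;
/// while let Some(msg) = sub.next().await {
///     let update: HashMap<String, String> = serde_json::from_slice(&msg.payload)?;
///     // apply `update` as the provider's new merged configuration
/// }
/// ```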
fn watch_config(
    rpc_nats: Arc<Client>,
    config: Arc<RwLock<ConfigBundle>>,
    lattice: Arc<str>,
    provider_id: String,
) -> impl Future<Output = ()> {
    let subject = provider_config_update_subject(&lattice, &provider_id);
    trace!(?provider_id, "starting config update listener");
    async move {
        loop {
            let mut config = config.write().await;
            if let Ok(update) = config.changed().await {
                trace!(?provider_id, "provider config bundle changed");
                let bytes = match serde_json::to_vec(&*update) {
                    Ok(bytes) => bytes,
                    Err(err) => {
                        error!(%err, ?provider_id, ?lattice, "failed to serialize configuration update");
                        continue;
                    }
                };
                trace!(?provider_id, subject, "publishing config bundle bytes");
                if let Err(err) = rpc_nats.publish(subject.clone(), Bytes::from(bytes)).await {
                    error!(%err, ?provider_id, ?lattice, "failed to publish configuration update bytes to provider");
                }
            } else {
                break;
            }
        }
    }
}