wasmcloud_host/wasmbus/mod.rs

1#![allow(clippy::type_complexity)]
2
3use core::sync::atomic::Ordering;
4
5use std::collections::{hash_map, BTreeMap, HashMap};
6use std::env::consts::{ARCH, FAMILY, OS};
7use std::future::Future;
8use std::num::NonZeroUsize;
9use std::ops::Deref;
10use std::pin::Pin;
11use std::str::FromStr;
12use std::sync::atomic::AtomicBool;
13use std::sync::Arc;
14use std::task::{Context, Poll};
15use std::time::Duration;
16
17use anyhow::{anyhow, bail, ensure, Context as _};
18use bytes::{BufMut, Bytes, BytesMut};
19use claims::{Claims, StoredClaims};
20use futures::stream::{AbortHandle, Abortable};
21use futures::{join, stream, Stream, StreamExt, TryStreamExt};
22use hyper_util::rt::{TokioExecutor, TokioIo};
23use nkeys::{KeyPair, KeyPairType, XKey};
24use providers::Provider;
25use secrecy::SecretBox;
26use serde_json::json;
27use sysinfo::System;
28use tokio::io::AsyncWrite;
29use tokio::net::TcpListener;
30use tokio::spawn;
31use tokio::sync::{mpsc, watch, RwLock, Semaphore};
32use tokio::task::{JoinHandle, JoinSet};
33use tokio::time::{interval_at, timeout, Instant};
34use tokio_stream::wrappers::IntervalStream;
35use tracing::{debug, debug_span, error, info, instrument, trace, warn, Instrument as _};
36use tracing_opentelemetry::OpenTelemetrySpanExt;
37use wascap::jwt;
38use wasmcloud_control_interface::{
39    ComponentAuctionAck, ComponentAuctionRequest, ComponentDescription, CtlResponse,
40    DeleteInterfaceLinkDefinitionRequest, HostInventory, HostLabel, HostLabelIdentifier, Link,
41    ProviderAuctionAck, ProviderAuctionRequest, ProviderDescription, RegistryCredential,
42    ScaleComponentCommand, StartProviderCommand, StopHostCommand, StopProviderCommand,
43    UpdateComponentCommand,
44};
45use wasmcloud_core::ComponentId;
46use wasmcloud_runtime::capability::secrets::store::SecretValue;
47use wasmcloud_runtime::component::{from_string_map, Limits, WrpcServeEvent};
48use wasmcloud_runtime::Runtime;
49use wasmcloud_secrets_types::SECRET_PREFIX;
50use wasmcloud_tracing::context::TraceContextInjector;
51use wasmcloud_tracing::{global, InstrumentationScope, KeyValue};
52
53use crate::event::{DefaultEventPublisher, EventPublisher};
54use crate::metrics::HostMetrics;
55use crate::nats::connect_nats;
56use crate::nats::provider::NatsProviderManager;
57use crate::policy::DefaultPolicyManager;
58use crate::secrets::{DefaultSecretsManager, SecretsManager};
59use crate::store::{DefaultStore, StoreManager};
60use crate::wasmbus::ctl::ControlInterfaceServer;
61use crate::workload_identity::WorkloadIdentityConfig;
62use crate::{fetch_component, PolicyManager, PolicyResponse, RegistryConfig, ResourceRef};
63
64mod component_spec;
65mod experimental;
66mod handler;
67
68pub(crate) mod claims;
69pub(crate) mod providers;
70
71/// Control interface implementation
72pub mod ctl;
73
74/// Configuration service
75pub mod config;
76
77/// wasmCloud host configuration
78pub mod host_config;
79
80pub use self::experimental::Features;
81pub use self::host_config::Host as HostConfig;
82pub use component_spec::ComponentSpecification;
83pub use providers::ProviderManager;
84
85use self::config::{BundleGenerator, ConfigBundle};
86use self::handler::Handler;
87
/// Upper bound, presumably on the capacity of an invocation channel (not referenced in the
/// visible part of this file — TODO confirm where it is applied).
const MAX_INVOCATION_CHANNEL_SIZE: usize = 5_000;
/// Lower bound counterpart of [`MAX_INVOCATION_CHANNEL_SIZE`].
const MIN_INVOCATION_CHANNEL_SIZE: usize = 256;
90
/// In-memory [`AsyncWrite`] sink backed by a growable byte buffer.
///
/// The buffer sits behind an `Arc`, so every clone shares it: bytes written through one clone
/// are visible through the others (e.g. when converting into a `Vec<u8>`).
#[derive(Clone, Default)]
struct AsyncBytesMut(Arc<std::sync::Mutex<BytesMut>>);
93
94impl AsyncWrite for AsyncBytesMut {
95    fn poll_write(
96        self: Pin<&mut Self>,
97        _cx: &mut Context<'_>,
98        buf: &[u8],
99    ) -> Poll<Result<usize, std::io::Error>> {
100        Poll::Ready({
101            self.0
102                .lock()
103                .map_err(|e| std::io::Error::other(e.to_string()))?
104                .put_slice(buf);
105            Ok(buf.len())
106        })
107    }
108
109    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
110        Poll::Ready(Ok(()))
111    }
112
113    fn poll_shutdown(
114        self: Pin<&mut Self>,
115        _cx: &mut Context<'_>,
116    ) -> Poll<Result<(), std::io::Error>> {
117        Poll::Ready(Ok(()))
118    }
119}
120
121impl TryFrom<AsyncBytesMut> for Vec<u8> {
122    type Error = anyhow::Error;
123
124    fn try_from(buf: AsyncBytesMut) -> Result<Self, Self::Error> {
125        buf.0
126            .lock()
127            .map(|buf| buf.clone().into())
128            .map_err(|e| anyhow!(e.to_string()).context("failed to lock"))
129    }
130}
131
/// Component annotations as string key/value pairs, kept ordered by key.
type Annotations = std::collections::BTreeMap<String, String>;
133
/// A component running on this host, together with the state the host keeps for it.
///
/// Dereferences to the underlying [`wasmcloud_runtime::Component`].
#[derive(Debug)]
struct Component {
    /// The underlying runtime component (the `Deref` target of this struct).
    component: wasmcloud_runtime::Component<Handler>,
    /// Unique component identifier for this component
    id: Arc<str>,
    /// Host-side [`Handler`] for this component — NOTE(review): presumably serves the
    /// component's imports; confirm.
    handler: Handler,
    /// Handle to a background task — presumably the one serving this component's exports;
    /// TODO confirm.
    exports: JoinHandle<()>,
    /// Annotations attached to this component.
    annotations: Annotations,
    /// Maximum number of instances of this component that can be running at once
    max_instances: NonZeroUsize,
    /// Optional runtime [`Limits`] for this component (presumably resource limits; confirm).
    limits: Option<Limits>,
    /// Reference the component was loaded from (presumably an OCI image reference).
    image_reference: Arc<str>,
    /// Sender for wRPC serve events whose context is an [`InvocationContext`].
    events: mpsc::Sender<WrpcServeEvent<<WrpcServer as wrpc_transport::Serve>::Context>>,
    /// Semaphore shared by this component — presumably limits concurrent instances to
    /// `max_instances`; TODO confirm.
    permits: Arc<Semaphore>,
}
149
150impl Deref for Component {
151    type Target = wasmcloud_runtime::Component<Handler>;
152
153    fn deref(&self) -> &Self::Target {
154        &self.component
155    }
156}
157
/// wRPC server that serves invocations of a single component over NATS, checking each
/// invocation against the host's policy manager before it is handled.
#[derive(Clone)]
struct WrpcServer {
    /// NATS wRPC transport client that invocations are received through.
    nats: wrpc_transport_nats::Client,
    /// The component's JWT claims, if any; passed to policy evaluation.
    claims: Option<Arc<jwt::Claims<jwt::Component>>>,
    /// ID of the component being served.
    id: Arc<str>,
    /// Reference of the served component; reported as `component.ref` in metric attributes.
    image_reference: Arc<str>,
    /// The component's annotations; passed to policy evaluation.
    annotations: Arc<Annotations>,
    /// Policy manager consulted for every invocation.
    policy_manager: Arc<dyn PolicyManager>,
    /// Host metrics; supplies the lattice and host IDs for invocation metric attributes.
    metrics: Arc<HostMetrics>,
}
168
/// Per-invocation context yielded by [`WrpcServer`]'s serve stream.
struct InvocationContext {
    /// When the invocation was accepted (taken after the policy check passed).
    start_at: Instant,
    /// Metric attributes describing the invocation (component ref, lattice, host, operation).
    attributes: Vec<KeyValue>,
    /// Tracing span covering this invocation; also the `Deref` target of this struct.
    span: tracing::Span,
}
174
175impl Deref for InvocationContext {
176    type Target = tracing::Span;
177
178    fn deref(&self) -> &Self::Target {
179        &self.span
180    }
181}
182
183impl wrpc_transport::Serve for WrpcServer {
184    type Context = InvocationContext;
185    type Outgoing = <wrpc_transport_nats::Client as wrpc_transport::Serve>::Outgoing;
186    type Incoming = <wrpc_transport_nats::Client as wrpc_transport::Serve>::Incoming;
187
188    #[instrument(
189        level = "info",
190        skip(self, paths),
191        fields(
192            component_id = ?self.id,
193            component_ref = ?self.image_reference)
194    )]
195    async fn serve(
196        &self,
197        instance: &str,
198        func: &str,
199        paths: impl Into<Arc<[Box<[Option<usize>]>]>> + Send,
200    ) -> anyhow::Result<
201        impl Stream<Item = anyhow::Result<(Self::Context, Self::Outgoing, Self::Incoming)>>
202            + Send
203            + 'static,
204    > {
205        debug!("serving invocations");
206        let invocations = self.nats.serve(instance, func, paths).await?;
207
208        let func: Arc<str> = Arc::from(func);
209        let instance: Arc<str> = Arc::from(instance);
210        let annotations = Arc::clone(&self.annotations);
211        let id = Arc::clone(&self.id);
212        let image_reference = Arc::clone(&self.image_reference);
213        let metrics = Arc::clone(&self.metrics);
214        let policy_manager = Arc::clone(&self.policy_manager);
215        let claims = self.claims.clone();
216        Ok(invocations.and_then(move |(cx, tx, rx)| {
217            let annotations = Arc::clone(&annotations);
218            let claims = claims.clone();
219            let func = Arc::clone(&func);
220            let id = Arc::clone(&id);
221            let image_reference = Arc::clone(&image_reference);
222            let instance = Arc::clone(&instance);
223            let metrics = Arc::clone(&metrics);
224            let policy_manager = Arc::clone(&policy_manager);
225            let span = tracing::info_span!("component_invocation", func = %func, id = %id, instance = %instance);
226            async move {
227                if let Some(ref cx) = cx {
228                    // Coerce the HashMap<String, Vec<String>> into a Vec<(String, String)> by
229                    // flattening the values
230                    let trace_context = cx
231                        .iter()
232                        .flat_map(|(key, value)| {
233                            value
234                                .iter()
235                                .map(|v| (key.to_string(), v.to_string()))
236                                .collect::<Vec<_>>()
237                        })
238                        .collect::<Vec<(String, String)>>();
239                    span.set_parent(wasmcloud_tracing::context::get_span_context(&trace_context));
240                }
241
242                    let PolicyResponse {
243                        request_id,
244                        permitted,
245                        message,
246                    } = policy_manager
247                        .evaluate_perform_invocation(
248                            &id,
249                            &image_reference,
250                            &annotations,
251                            claims.as_deref(),
252                            instance.to_string(),
253                            func.to_string(),
254                        )
255                        .instrument(debug_span!(parent: &span, "policy_check"))
256                        .await?;
257                    ensure!(
258                        permitted,
259                        "policy denied request to invoke component `{request_id}`: `{message:?}`",
260                    );
261
262                Ok((
263                    InvocationContext{
264                        start_at: Instant::now(),
265                        // TODO(metrics): insert information about the source once we have concrete context data
266                        attributes: vec![
267                            KeyValue::new("component.ref", image_reference),
268                            KeyValue::new("lattice", metrics.lattice_id.clone()),
269                            KeyValue::new("host", metrics.host_id.clone()),
270                            KeyValue::new("operation", format!("{instance}/{func}")),
271                        ],
272                        span,
273                    },
274                    tx,
275                    rx,
276                ))
277            }
278        }))
279    }
280}
281
/// wasmCloud Host
///
/// Represents the core host structure for managing components, providers, links, and the
/// pluggable services (event publishing, policy, secrets, configuration and data stores) of a
/// wasmCloud host. Instances are created with [`HostBuilder::build`].
pub struct Host {
    /// A user-friendly name for the host, generated when the host is built.
    friendly_name: String,

    /// Handle to abort the periodic heartbeat task; aborting it ends the heartbeat stream.
    heartbeat: AbortHandle,

    /// Configuration settings the host was built with.
    host_config: HostConfig,

    /// The host's server key pair; its public key is used as the host ID.
    host_key: Arc<KeyPair>,

    /// The self-signed JWT token representing the host's identity.
    host_token: Arc<jwt::Token<jwt::Host>>,

    /// Labels associated with the host (including the built-in `hostcore.*` labels); included
    /// in the host started/stopped events.
    labels: Arc<RwLock<BTreeMap<String, String>>>,

    /// The maximum execution time that was configured on the runtime.
    max_execution_time: Duration,

    /// The timestamp indicating when the host started.
    start_at: Instant,

    /// Sending half of the host's stop signal. The value is an optional [`Instant`]
    /// (presumably a shutdown deadline — confirm with the code that sends it).
    pub stop_tx: watch::Sender<Option<Instant>>,

    /// Receiving half of the host's stop signal (see `stop_tx`).
    pub stop_rx: watch::Receiver<Option<Instant>>,

    /// Experimental features enabled in the host for gated functionality.
    experimental_features: Features,

    /// Whether the host is ready to process requests. Reported by the `/readyz` HTTP admin
    /// endpoint and cleared when the host shuts down.
    ready: Arc<AtomicBool>,

    /// The encryption key used to secure secrets when transmitting over NATS.
    secrets_xkey: Arc<XKey>,

    /// A map of components managed by the host, keyed by their component IDs.
    components: Arc<RwLock<HashMap<ComponentId, Arc<Component>>>>,

    /// A map of claims associated with components, keyed by their component IDs.
    component_claims: Arc<RwLock<HashMap<ComponentId, jwt::Claims<jwt::Component>>>>,

    /// A map of component IDs to their associated links.
    links: RwLock<HashMap<String, Vec<Link>>>,

    /// A map of messaging links for NATS clients, organized by component and link names.
    messaging_links:
        Arc<RwLock<HashMap<Arc<str>, Arc<RwLock<HashMap<Box<str>, async_nats::Client>>>>>>,

    /// A map of providers managed by the host, keyed by their identifiers.
    providers: RwLock<HashMap<String, Provider>>,

    /// A map of claims associated with capability providers, keyed by their identifiers.
    provider_claims: Arc<RwLock<HashMap<String, jwt::Claims<jwt::CapabilityProvider>>>>,

    /// The Wasm runtime used by the host to run components.
    runtime: Runtime,

    /// Optional overrides for registry configuration settings.
    registry_config: RwLock<HashMap<String, RegistryConfig>>,

    /// The NATS client used for making RPC calls; flushed when the host shuts down.
    rpc_nats: Arc<async_nats::Client>,

    /// The manager for communicating with capability providers.
    provider_manager: Arc<dyn ProviderManager>,

    /// Configured OpenTelemetry metrics for monitoring the host.
    metrics: Arc<HostMetrics>,

    /// The event publisher used for emitting events from the host.
    pub(crate) event_publisher: Arc<dyn EventPublisher>,

    /// The policy manager used for evaluating policy decisions.
    policy_manager: Arc<dyn PolicyManager>,

    /// The secrets manager used for managing and retrieving secrets.
    secrets_manager: Arc<dyn SecretsManager>,

    /// The configuration manager for managing component, provider, link, and secret configurations.
    config_store: Arc<dyn StoreManager>,

    /// The data store for managing links, claims, and component specifications.
    data_store: Arc<dyn StoreManager>,

    /// The generator for creating configuration bundles.
    config_generator: BundleGenerator,

    /// Background tasks owned by the host (e.g. the HTTP administration endpoint server).
    ///
    /// Never read, hence `allow(unused)`: it is held only so that dropping the host drops the
    /// `JoinSet`, which aborts every task still in it.
    #[allow(unused)]
    tasks: JoinSet<()>,
}
381
/// HostBuilder is used to construct a new Host instance.
///
/// Each pluggable extension (bundle generator, stores, event publisher, policy manager,
/// secrets manager) that is left as `None` falls back to a default implementation when
/// [`HostBuilder::build`] is called.
#[derive(Default)]
pub struct HostBuilder {
    /// Host configuration. Its `host_key` must be a server key pair; `build` rejects others.
    config: HostConfig,

    /// Additional configuration of OCI registries
    registry_config: HashMap<String, RegistryConfig>,

    // Host trait extensions
    /// The generator to use for creating configuration bundles
    bundle_generator: Option<BundleGenerator>,
    /// The configuration store to use for managing configuration data
    config_store: Option<Arc<dyn StoreManager>>,
    /// The data store to use for managing data
    data_store: Option<Arc<dyn StoreManager>>,
    /// The event publisher to use for sending events
    event_publisher: Option<Arc<dyn EventPublisher>>,
    /// The policy manager to use for evaluating policy decisions
    policy_manager: Option<Arc<dyn PolicyManager>>,
    /// The secrets manager to use for managing secrets
    secrets_manager: Option<Arc<dyn SecretsManager>>,
}
403
404impl HostBuilder {
405    const DEFAULT_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(30);
406
    /// Whitespace-separated adjectives that `generate_friendly_name` draws from when building
    /// a host's friendly name. The explicit `'static` is kept on purpose: elided lifetimes in
    /// associated constants trigger a compiler lint.
    const NAME_ADJECTIVES: &'static str = "
    autumn hidden bitter misty silent empty dry dark summer
    icy delicate quiet white cool spring winter patient
    twilight dawn crimson wispy weathered blue billowing
    broken cold damp falling frosty green long late lingering
    bold little morning muddy old red rough still small
    sparkling bouncing shy wandering withered wild black
    young holy solitary fragrant aged snowy proud floral
    restless divine polished ancient purple lively nameless
    gray orange mauve
    ";
418
419    const NAME_NOUNS: &'static str = "
420    waterfall river breeze moon rain wind sea morning
421    snow lake sunset pine shadow leaf dawn glitter forest
422    hill cloud meadow sun glade bird brook butterfly
423    bush dew dust field fire flower firefly ladybug feather grass
424    haze mountain night pond darkness snowflake silence
425    sound sky shape stapler surf thunder violet water wildflower
426    wave water resonance sun timber dream cherry tree fog autocorrect
427    frost voice paper frog smoke star hamster ocean emoji robot
428    ";
429
430    /// Generate a friendly name for the host based on a random number.
431    /// Names we pulled from a list of friendly or neutral adjectives and nouns suitable for use in
432    /// public and on hosts/domain names
433    fn generate_friendly_name() -> Option<String> {
434        let adjectives: Vec<_> = Self::NAME_ADJECTIVES.split_whitespace().collect();
435        let nouns: Vec<_> = Self::NAME_NOUNS.split_whitespace().collect();
436        names::Generator::new(&adjectives, &nouns, names::Name::Numbered).next()
437    }
438
439    /// Create a new [HostBuilder] instance with the default configuration
440    pub fn new() -> Self {
441        HostBuilder::default()
442    }
443
444    /// Initialize the host with the given event publisher for sending events
445    pub fn with_event_publisher(self, event_publisher: Option<Arc<dyn EventPublisher>>) -> Self {
446        Self {
447            event_publisher,
448            ..self
449        }
450    }
451
452    /// Initialize the host with the given policy manager for evaluating policy decisions
453    pub fn with_policy_manager(self, policy_manager: Option<Arc<dyn PolicyManager>>) -> Self {
454        Self {
455            policy_manager,
456            ..self
457        }
458    }
459
460    /// Initialize the host with the given registry configuration
461    pub fn with_registry_config(self, registry_config: HashMap<String, RegistryConfig>) -> Self {
462        Self {
463            registry_config,
464            ..self
465        }
466    }
467
468    /// Initialize the host with the given secrets manager for managing secrets
469    pub fn with_secrets_manager(self, secrets_manager: Option<Arc<dyn SecretsManager>>) -> Self {
470        Self {
471            secrets_manager,
472            ..self
473        }
474    }
475
476    /// Initialize the host with the given configuration store
477    pub fn with_config_store(self, config_store: Option<Arc<dyn StoreManager>>) -> Self {
478        Self {
479            config_store,
480            ..self
481        }
482    }
483
484    /// Initialize the host with the given data store
485    pub fn with_data_store(self, data_store: Option<Arc<dyn StoreManager>>) -> Self {
486        Self { data_store, ..self }
487    }
488
489    /// Initialize the host with the given configuration watching bundle
490    pub fn with_bundle_generator(self, bundle_generator: Option<BundleGenerator>) -> Self {
491        Self {
492            bundle_generator,
493            ..self
494        }
495    }
496
497    /// Build a new [Host] instance with the given configuration
498    #[instrument(level = "debug", skip_all)]
499    pub async fn build(
500        self,
501    ) -> anyhow::Result<(Arc<Host>, impl Future<Output = anyhow::Result<()>>)> {
502        ensure!(self.config.host_key.key_pair_type() == KeyPairType::Server);
503
504        let mut labels = BTreeMap::from([
505            ("hostcore.arch".into(), ARCH.into()),
506            ("hostcore.os".into(), OS.into()),
507            ("hostcore.osfamily".into(), FAMILY.into()),
508        ]);
509        labels.extend(self.config.labels.clone().into_iter());
510        let friendly_name =
511            Self::generate_friendly_name().context("failed to generate friendly name")?;
512
513        let host_issuer = Arc::new(KeyPair::new_account());
514        let claims = jwt::Claims::<jwt::Host>::new(
515            friendly_name.clone(),
516            host_issuer.public_key(),
517            self.config.host_key.public_key().clone(),
518            Some(HashMap::from_iter([(
519                "self_signed".to_string(),
520                "true".to_string(),
521            )])),
522        );
523        let jwt = claims
524            .encode(&host_issuer)
525            .context("failed to encode host claims")?;
526        let host_token = Arc::new(jwt::Token { jwt, claims });
527
528        let workload_identity_config = if self.config.experimental_features.workload_identity_auth {
529            Some(WorkloadIdentityConfig::from_env()?)
530        } else {
531            None
532        };
533
534        debug!(
535            rpc_nats_url = self.config.rpc_nats_url.as_str(),
536            "connecting to NATS RPC server"
537        );
538        let rpc_nats = Arc::new(
539            connect_nats(
540                self.config.rpc_nats_url.as_str(),
541                self.config.rpc_jwt.as_ref(),
542                self.config.rpc_key.clone(),
543                self.config.rpc_tls,
544                Some(self.config.rpc_timeout),
545                workload_identity_config.clone(),
546            )
547            .await
548            .context("failed to establish NATS RPC server connection")?,
549        );
550
551        let (stop_tx, stop_rx) = watch::channel(None);
552
553        let (runtime, _epoch) = Runtime::builder()
554            .max_execution_time(self.config.max_execution_time)
555            .max_linear_memory(self.config.max_linear_memory)
556            .max_components(self.config.max_components)
557            .max_core_instances_per_component(self.config.max_core_instances_per_component)
558            .max_component_size(self.config.max_component_size)
559            .experimental_features(self.config.experimental_features.into())
560            .build()
561            .context("failed to build runtime")?;
562
563        let scope = InstrumentationScope::builder("wasmcloud-host")
564            .with_version(self.config.version.clone())
565            .with_attributes(vec![
566                KeyValue::new("host.id", self.config.host_key.public_key()),
567                KeyValue::new("host.version", self.config.version.clone()),
568                KeyValue::new("host.arch", ARCH),
569                KeyValue::new("host.os", OS),
570                KeyValue::new("host.osfamily", FAMILY),
571                KeyValue::new("host.friendly_name", friendly_name.clone()),
572                KeyValue::new("host.hostname", System::host_name().unwrap_or_default()),
573                KeyValue::new(
574                    "host.kernel_version",
575                    System::kernel_version().unwrap_or_default(),
576                ),
577                KeyValue::new("host.os_version", System::os_version().unwrap_or_default()),
578            ])
579            .build();
580        let meter = global::meter_with_scope(scope);
581        let metrics = HostMetrics::new(
582            &meter,
583            self.config.host_key.public_key(),
584            self.config.lattice.to_string(),
585            None,
586        )
587        .context("failed to create HostMetrics instance")?;
588
589        debug!("Feature flags: {:?}", self.config.experimental_features);
590        let mut tasks = JoinSet::new();
591        let ready = Arc::new(AtomicBool::new(true));
592        if let Some(addr) = self.config.http_admin {
593            let socket = TcpListener::bind(addr)
594                .await
595                .context("failed to bind on HTTP administration endpoint")?;
596            let ready = Arc::clone(&ready);
597            let svc = hyper::service::service_fn(move |req| {
598                const OK: &str = r#"{"status":"ok"}"#;
599                const FAIL: &str = r#"{"status":"failure"}"#;
600                let ready = Arc::clone(&ready);
601                async move {
602                    let (http::request::Parts { method, uri, .. }, _) = req.into_parts();
603                    match (method.as_str(), uri.path()) {
604                        ("HEAD", "/livez") => Ok(http::Response::default()),
605                        ("GET", "/livez") => Ok(http::Response::new(http_body_util::Full::new(
606                            Bytes::from(OK),
607                        ))),
608                        (method, "/livez") => http::Response::builder()
609                            .status(http::StatusCode::METHOD_NOT_ALLOWED)
610                            .body(http_body_util::Full::new(Bytes::from(format!(
611                                "method `{method}` not supported for path `/livez`"
612                            )))),
613                        ("HEAD", "/readyz") => {
614                            if ready.load(Ordering::Relaxed) {
615                                Ok(http::Response::default())
616                            } else {
617                                http::Response::builder()
618                                    .status(http::StatusCode::INTERNAL_SERVER_ERROR)
619                                    .body(http_body_util::Full::default())
620                            }
621                        }
622                        ("GET", "/readyz") => {
623                            if ready.load(Ordering::Relaxed) {
624                                Ok(http::Response::new(http_body_util::Full::new(Bytes::from(
625                                    OK,
626                                ))))
627                            } else {
628                                http::Response::builder()
629                                    .status(http::StatusCode::INTERNAL_SERVER_ERROR)
630                                    .body(http_body_util::Full::new(Bytes::from(FAIL)))
631                            }
632                        }
633                        (method, "/readyz") => http::Response::builder()
634                            .status(http::StatusCode::METHOD_NOT_ALLOWED)
635                            .body(http_body_util::Full::new(Bytes::from(format!(
636                                "method `{method}` not supported for path `/readyz`"
637                            )))),
638                        (.., path) => http::Response::builder()
639                            .status(http::StatusCode::NOT_FOUND)
640                            .body(http_body_util::Full::new(Bytes::from(format!(
641                                "unknown endpoint `{path}`"
642                            )))),
643                    }
644                }
645            });
646            let srv = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new());
647            tasks.spawn(async move {
648                loop {
649                    let stream = match socket.accept().await {
650                        Ok((stream, _)) => stream,
651                        Err(err) => {
652                            error!(?err, "failed to accept HTTP administration connection");
653                            continue;
654                        }
655                    };
656                    let svc = svc.clone();
657                    if let Err(err) = srv.serve_connection(TokioIo::new(stream), svc).await {
658                        error!(?err, "failed to serve HTTP administration connection");
659                    }
660                }
661            });
662        }
663
664        let (heartbeat_abort, heartbeat_abort_reg) = AbortHandle::new_pair();
665        let start_at = Instant::now();
666
667        let host = Host {
668            components: Arc::new(RwLock::new(HashMap::new())),
669            providers: RwLock::new(HashMap::new()),
670            friendly_name,
671            heartbeat: heartbeat_abort.clone(),
672            host_key: self.config.host_key.clone(),
673            host_token,
674            secrets_xkey: Arc::new(XKey::new()),
675            labels: Arc::new(RwLock::new(labels)),
676            experimental_features: self.config.experimental_features,
677            runtime,
678            start_at,
679            stop_rx,
680            stop_tx,
681            links: RwLock::new(HashMap::new()),
682            component_claims: Arc::new(RwLock::new(HashMap::new())),
683            provider_claims: Arc::new(RwLock::new(HashMap::new())),
684            metrics: Arc::new(metrics),
685            max_execution_time: self.config.max_execution_time,
686            messaging_links: Arc::default(),
687            ready: Arc::clone(&ready),
688            tasks,
689            rpc_nats: Arc::clone(&rpc_nats),
690            registry_config: RwLock::new(self.registry_config),
691            // Extension traits that we fallback to defaults for
692            event_publisher: self
693                .event_publisher
694                .unwrap_or_else(|| Arc::new(DefaultEventPublisher::default())),
695            policy_manager: self
696                .policy_manager
697                .unwrap_or_else(|| Arc::new(DefaultPolicyManager)),
698            secrets_manager: self
699                .secrets_manager
700                .unwrap_or_else(|| Arc::new(DefaultSecretsManager::default())),
701            data_store: self
702                .data_store
703                .unwrap_or_else(|| Arc::new(DefaultStore::default())),
704            config_store: self
705                .config_store
706                .unwrap_or_else(|| Arc::new(DefaultStore::default())),
707            config_generator: self
708                .bundle_generator
709                .unwrap_or_else(|| BundleGenerator::new(Arc::new(DefaultStore::default()))),
710            // TODO(#4407): This trait abstraction isn't actually abstracted since all capability
711            // providers are NATS based. As we revise communication with providers, we can update
712            // this to be a trait object from the builder instead.
713            provider_manager: Arc::new(NatsProviderManager::new(
714                rpc_nats,
715                self.config.lattice.to_string(),
716            )),
717            host_config: self.config,
718        };
719
720        let host = Arc::new(host);
721
722        let heartbeat_interval = host
723            .host_config
724            .heartbeat_interval
725            .unwrap_or(Self::DEFAULT_HEARTBEAT_INTERVAL);
726        let heartbeat_start_at = start_at
727            .checked_add(heartbeat_interval)
728            .context("failed to compute heartbeat start time")?;
729        let heartbeat = IntervalStream::new(interval_at(heartbeat_start_at, heartbeat_interval));
730        let heartbeat = spawn({
731            let host = Arc::clone(&host);
732            async move {
733                let mut heartbeat = Abortable::new(heartbeat, heartbeat_abort_reg);
734                heartbeat
735                    .by_ref()
736                    .for_each({
737                        let host = Arc::clone(&host);
738                        move |_| {
739                            let host = Arc::clone(&host);
740                            async move {
741                                let heartbeat = match host.heartbeat().await {
742                                    Ok(heartbeat) => heartbeat,
743                                    Err(e) => {
744                                        error!("failed to generate heartbeat: {e}");
745                                        return;
746                                    }
747                                };
748
749                                if let Err(e) = host
750                                    .event_publisher
751                                    .publish_event("host_heartbeat", heartbeat)
752                                    .await
753                                {
754                                    error!("failed to publish heartbeat: {e}");
755                                }
756                            }
757                        }
758                    })
759                    .await;
760                let deadline = { *host.stop_rx.borrow() };
761                host.stop_tx.send_replace(deadline);
762                if heartbeat.is_aborted() {
763                    info!("heartbeat task gracefully stopped");
764                } else {
765                    error!("heartbeat task unexpectedly stopped");
766                }
767            }
768        });
769
770        let start_evt = json!({
771            "id": host.host_key.public_key(),
772            "friendly_name": host.friendly_name,
773            "labels": *host.labels.read().await,
774            "uptime_seconds": 0,
775            "version": host.host_config.version,
776        });
777        host.event_publisher
778            .publish_event("host_started", start_evt)
779            .await
780            .context("failed to publish start event")?;
781        info!(
782            host_id = host.host_key.public_key(),
783            "wasmCloud host started"
784        );
785
786        Ok((Arc::clone(&host), async move {
787            ready.store(false, Ordering::Relaxed);
788            heartbeat_abort.abort();
789            heartbeat.await.context("failed to await heartbeat")?;
790            host.event_publisher
791                .publish_event(
792                    "host_stopped",
793                    json!({
794                        "labels": *host.labels.read().await,
795                    }),
796                )
797                .await
798                .context("failed to publish stop event")?;
799            // Before we exit, make sure to flush all messages or we may lose some that we've
800            // thought were sent (like the host_stopped event)
801            host.rpc_nats
802                .flush()
803                .await
804                .context("failed to flush NATS clients")?;
805            Ok(())
806        }))
807    }
808}
809
810impl From<HostConfig> for HostBuilder {
811    fn from(config: HostConfig) -> Self {
812        HostBuilder {
813            config,
814            ..Default::default()
815        }
816    }
817}
818
819impl Host {
820    /// Create a new HostBuilder
821    #[instrument(level = "debug", skip_all)]
822    pub fn builder() -> HostBuilder {
823        HostBuilder::default()
824    }
825
826    /// Waits for host to be stopped via lattice commands and returns the shutdown deadline on
827    /// success
828    ///
829    /// # Errors
830    ///
831    /// Returns an error if internal stop channel is closed prematurely
832    #[instrument(level = "debug", skip_all)]
833    pub async fn stopped(&self) -> anyhow::Result<Option<Instant>> {
834        self.stop_rx
835            .clone()
836            .changed()
837            .await
838            .context("failed to wait for stop")?;
839        Ok(*self.stop_rx.borrow())
840    }
841
842    /// Returns the host's unique identifier
843    #[instrument(level = "trace", skip_all)]
844    pub fn id(&self) -> String {
845        self.host_key.public_key()
846    }
847
848    /// Returns the lattice the host is running on
849    #[instrument(level = "trace", skip_all)]
850    pub fn lattice(&self) -> &str {
851        &self.host_config.lattice
852    }
853
854    #[instrument(level = "debug", skip_all)]
855    async fn inventory(&self) -> HostInventory {
856        trace!("generating host inventory");
857        let components: Vec<_> = {
858            let components = self.components.read().await;
859            stream::iter(components.iter())
860                .filter_map(|(id, component)| async move {
861                    let mut description = ComponentDescription::builder()
862                        .id(id.into())
863                        .image_ref(component.image_reference.to_string())
864                        .annotations(component.annotations.clone().into_iter().collect())
865                        .max_instances(component.max_instances.get().try_into().unwrap_or(u32::MAX))
866                        .limits(component.limits.map(|limits| limits.to_string_map()))
867                        .revision(
868                            component
869                                .claims()
870                                .and_then(|claims| claims.metadata.as_ref())
871                                .and_then(|jwt::Component { rev, .. }| *rev)
872                                .unwrap_or_default(),
873                        );
874                    // Add name if present
875                    if let Some(name) = component
876                        .claims()
877                        .and_then(|claims| claims.metadata.as_ref())
878                        .and_then(|metadata| metadata.name.as_ref())
879                        .cloned()
880                    {
881                        description = description.name(name);
882                    };
883
884                    Some(
885                        description
886                            .build()
887                            .expect("failed to build component description: {e}"),
888                    )
889                })
890                .collect()
891                .await
892        };
893
894        let providers: Vec<_> = self
895            .providers
896            .read()
897            .await
898            .iter()
899            .map(
900                |(
901                    provider_id,
902                    Provider {
903                        annotations,
904                        claims_token,
905                        image_ref,
906                        ..
907                    },
908                )| {
909                    let mut provider_description = ProviderDescription::builder()
910                        .id(provider_id)
911                        .image_ref(image_ref);
912                    if let Some(name) = claims_token
913                        .as_ref()
914                        .and_then(|claims| claims.claims.metadata.as_ref())
915                        .and_then(|metadata| metadata.name.as_ref())
916                    {
917                        provider_description = provider_description.name(name);
918                    }
919                    provider_description
920                        .annotations(
921                            annotations
922                                .clone()
923                                .into_iter()
924                                .collect::<BTreeMap<String, String>>(),
925                        )
926                        .revision(
927                            claims_token
928                                .as_ref()
929                                .and_then(|claims| claims.claims.metadata.as_ref())
930                                .and_then(|jwt::CapabilityProvider { rev, .. }| *rev)
931                                .unwrap_or_default(),
932                        )
933                        .build()
934                        .expect("failed to build provider description")
935                },
936            )
937            .collect();
938
939        let uptime = self.start_at.elapsed();
940        HostInventory::builder()
941            .components(components)
942            .providers(providers)
943            .friendly_name(self.friendly_name.clone())
944            .labels(self.labels.read().await.clone())
945            .uptime_human(human_friendly_uptime(uptime))
946            .uptime_seconds(uptime.as_secs())
947            .version(self.host_config.version.clone())
948            .host_id(self.host_key.public_key())
949            .build()
950            .expect("failed to build host inventory")
951    }
952
953    #[instrument(level = "debug", skip_all)]
954    async fn heartbeat(&self) -> anyhow::Result<serde_json::Value> {
955        trace!("generating heartbeat");
956        Ok(serde_json::to_value(self.inventory().await)?)
957    }
958
959    /// Instantiate a component
960    #[allow(clippy::too_many_arguments)] // TODO: refactor into a config struct
961    #[instrument(level = "debug", skip_all)]
962    async fn instantiate_component(
963        &self,
964        annotations: &Annotations,
965        image_reference: Arc<str>,
966        id: Arc<str>,
967        max_instances: NonZeroUsize,
968        limits: Option<Limits>,
969        mut component: wasmcloud_runtime::Component<Handler>,
970        handler: Handler,
971    ) -> anyhow::Result<Arc<Component>> {
972        trace!(
973            component_ref = ?image_reference,
974            max_instances,
975            "instantiating component"
976        );
977
978        let max_execution_time = self.max_execution_time; // TODO: Needs approval to go ahead.
979        component.set_max_execution_time(max_execution_time);
980
981        let (events_tx, mut events_rx) = mpsc::channel(
982            max_instances
983                .get()
984                .clamp(MIN_INVOCATION_CHANNEL_SIZE, MAX_INVOCATION_CHANNEL_SIZE),
985        );
986        let prefix = Arc::from(format!("{}.{id}", &self.host_config.lattice));
987        let nats = wrpc_transport_nats::Client::new(
988            Arc::clone(&self.rpc_nats),
989            Arc::clone(&prefix),
990            Some(prefix),
991        )
992        .await?;
993        let exports = component
994            .serve_wrpc(
995                &WrpcServer {
996                    nats,
997                    claims: component.claims().cloned().map(Arc::new),
998                    id: Arc::clone(&id),
999                    image_reference: Arc::clone(&image_reference),
1000                    annotations: Arc::new(annotations.clone()),
1001                    policy_manager: Arc::clone(&self.policy_manager),
1002                    metrics: Arc::clone(&self.metrics),
1003                },
1004                handler.clone(),
1005                events_tx.clone(),
1006            )
1007            .await?;
1008        let permits = Arc::new(Semaphore::new(
1009            usize::from(max_instances).min(Semaphore::MAX_PERMITS),
1010        ));
1011        let component_attributes = Arc::new(vec![
1012            KeyValue::new("component.id", id.to_string()),
1013            KeyValue::new("component.ref", image_reference.to_string()),
1014            KeyValue::new("lattice", self.host_config.lattice.clone()),
1015            KeyValue::new("host", self.host_key.public_key()),
1016        ]);
1017        self.metrics
1018            .set_max_instances(max_instances.get() as u64, &component_attributes);
1019
1020        let metrics = Arc::clone(&self.metrics);
1021        Ok(Arc::new(Component {
1022            component,
1023            id: Arc::clone(&id),
1024            handler,
1025            events: events_tx,
1026            permits: Arc::clone(&permits),
1027            exports: spawn(async move {
1028                // Since we are joining two `move` closures, we need two separate `Arc`s
1029                let metrics_left = Arc::clone(&metrics);
1030                let metrics_right = Arc::clone(&metrics);
1031                join!(
1032                    async move {
1033                        let mut exports = stream::select_all(exports);
1034                        loop {
1035                            let metrics_left = Arc::clone(&metrics_left);
1036                            let component_attributes = Arc::clone(&component_attributes);
1037                            let permits = Arc::clone(&permits);
1038                            if let Some(fut) = exports.next().await {
1039                                match fut {
1040                                    Ok(fut) => {
1041                                        debug!("accepted invocation, acquiring permit");
1042                                        let permit = permits.acquire_owned().await;
1043
1044                                        // Record that an instance is active
1045                                        metrics_left
1046                                            .increment_active_instance(&component_attributes);
1047                                        spawn(async move {
1048                                            let _permit = permit;
1049                                            debug!("handling invocation");
1050                                            // Awaiting this future drives the execution of the component
1051                                            let result = timeout(max_execution_time, fut).await;
1052                                            metrics_left
1053                                                .decrement_active_instance(&component_attributes);
1054
1055                                            match result {
1056                                                Ok(Ok(())) => {
1057                                                    debug!("successfully handled invocation");
1058                                                    Ok(())
1059                                                }
1060                                                Ok(Err(err)) => {
1061                                                    warn!(?err, "failed to handle invocation");
1062                                                    Err(err)
1063                                                }
1064                                                Err(_err) => {
1065                                                    warn!("component invocation timed out");
1066                                                    Err(anyhow::anyhow!(
1067                                                        "component invocation timed out"
1068                                                    ))
1069                                                }
1070                                            }
1071                                        });
1072                                    }
1073                                    Err(err) => {
1074                                        warn!(?err, "failed to accept invocation")
1075                                    }
1076                                }
1077                            }
1078                        }
1079                    },
1080                    async move {
1081                        while let Some(evt) = events_rx.recv().await {
1082                            match evt {
1083                                WrpcServeEvent::HttpIncomingHandlerHandleReturned {
1084                                    context:
1085                                        InvocationContext {
1086                                            start_at,
1087                                            ref attributes,
1088                                            ..
1089                                        },
1090                                    success,
1091                                }
1092                                | WrpcServeEvent::MessagingHandlerHandleMessageReturned {
1093                                    context:
1094                                        InvocationContext {
1095                                            start_at,
1096                                            ref attributes,
1097                                            ..
1098                                        },
1099                                    success,
1100                                }
1101                                | WrpcServeEvent::DynamicExportReturned {
1102                                    context:
1103                                        InvocationContext {
1104                                            start_at,
1105                                            ref attributes,
1106                                            ..
1107                                        },
1108                                    success,
1109                                } => metrics_right.record_component_invocation(
1110                                    u64::try_from(start_at.elapsed().as_nanos())
1111                                        .unwrap_or_default(),
1112                                    attributes,
1113                                    !success,
1114                                ),
1115                            }
1116                        }
1117                        debug!("serving event stream is done");
1118                    },
1119                );
1120                debug!("export serving task done");
1121            }),
1122            annotations: annotations.clone(),
1123            max_instances,
1124            limits,
1125            image_reference: Arc::clone(&image_reference),
1126        }))
1127    }
1128
1129    #[allow(clippy::too_many_arguments)]
1130    #[instrument(level = "debug", skip_all)]
1131    async fn start_component<'a>(
1132        &self,
1133        entry: hash_map::VacantEntry<'a, String, Arc<Component>>,
1134        wasm: &[u8],
1135        claims: Option<jwt::Claims<jwt::Component>>,
1136        component_ref: Arc<str>,
1137        component_id: Arc<str>,
1138        max_instances: NonZeroUsize,
1139        limits: Option<Limits>,
1140        annotations: &Annotations,
1141        config: ConfigBundle,
1142        secrets: HashMap<String, SecretBox<SecretValue>>,
1143    ) -> anyhow::Result<&'a mut Arc<Component>> {
1144        debug!(?component_ref, ?max_instances, "starting new component");
1145
1146        if let Some(ref claims) = claims {
1147            self.store_claims(Claims::Component(claims.clone()))
1148                .await
1149                .context("failed to store claims")?;
1150        }
1151
1152        let component_spec = self
1153            .get_component_spec(&component_id)
1154            .await?
1155            .unwrap_or_else(|| ComponentSpecification::new(&component_ref));
1156        self.store_component_spec(&component_id, &component_spec)
1157            .await?;
1158
1159        // Map the imports to pull out the result types of the functions for lookup when invoking them
1160        let handler = Handler {
1161            nats: Arc::clone(&self.rpc_nats),
1162            config_data: Arc::new(RwLock::new(config)),
1163            lattice: Arc::clone(&self.host_config.lattice),
1164            component_id: Arc::clone(&component_id),
1165            secrets: Arc::new(RwLock::new(secrets)),
1166            targets: Arc::default(),
1167            instance_links: Arc::new(RwLock::new(component_import_links(&component_spec.links))),
1168            messaging_links: {
1169                let mut links = self.messaging_links.write().await;
1170                Arc::clone(links.entry(Arc::clone(&component_id)).or_default())
1171            },
1172            invocation_timeout: Duration::from_secs(10), // TODO: Make this configurable
1173            experimental_features: self.experimental_features,
1174            host_labels: Arc::clone(&self.labels),
1175        };
1176        let component = wasmcloud_runtime::Component::new(&self.runtime, wasm, limits)?;
1177        let component = self
1178            .instantiate_component(
1179                annotations,
1180                Arc::clone(&component_ref),
1181                Arc::clone(&component_id),
1182                max_instances,
1183                limits,
1184                component,
1185                handler,
1186            )
1187            .await
1188            .context("failed to instantiate component")?;
1189
1190        info!(?component_ref, "component started");
1191        self.event_publisher
1192            .publish_event(
1193                "component_scaled",
1194                crate::event::component_scaled(
1195                    claims.as_ref(),
1196                    annotations,
1197                    self.host_key.public_key(),
1198                    max_instances,
1199                    &component_ref,
1200                    &component_id,
1201                ),
1202            )
1203            .await?;
1204
1205        Ok(entry.insert(component))
1206    }
1207
1208    #[instrument(level = "debug", skip_all)]
1209    async fn stop_component(&self, component: &Component, _host_id: &str) -> anyhow::Result<()> {
1210        trace!(component_id = %component.id, "stopping component");
1211
1212        component.exports.abort();
1213
1214        Ok(())
1215    }
1216
1217    #[instrument(level = "trace", skip_all)]
1218    async fn fetch_component(&self, component_ref: &str) -> anyhow::Result<Vec<u8>> {
1219        let registry_config = self.registry_config.read().await;
1220        fetch_component(
1221            component_ref,
1222            self.host_config.allow_file_load,
1223            &self.host_config.oci_opts,
1224            &registry_config,
1225        )
1226        .await
1227        .context("failed to fetch component")
1228    }
1229
1230    #[instrument(level = "debug", skip_all)]
1231    pub(crate) async fn handle_auction_component(
1232        &self,
1233        payload: impl AsRef<[u8]>,
1234    ) -> anyhow::Result<Option<CtlResponse<ComponentAuctionAck>>> {
1235        let request = serde_json::from_slice::<ComponentAuctionRequest>(payload.as_ref())
1236            .context("failed to deserialize component auction command")?;
1237        <Self as ControlInterfaceServer>::handle_auction_component(self, request).await
1238    }
1239
1240    #[instrument(level = "debug", skip_all)]
1241    pub(crate) async fn handle_auction_provider(
1242        &self,
1243        payload: impl AsRef<[u8]>,
1244    ) -> anyhow::Result<Option<CtlResponse<ProviderAuctionAck>>> {
1245        let request = serde_json::from_slice::<ProviderAuctionRequest>(payload.as_ref())
1246            .context("failed to deserialize provider auction command")?;
1247        <Self as ControlInterfaceServer>::handle_auction_provider(self, request).await
1248    }
1249
1250    #[instrument(level = "debug", skip_all)]
1251    pub(crate) async fn handle_stop_host(
1252        &self,
1253        payload: impl AsRef<[u8]>,
1254        transport_host_id: &str,
1255    ) -> anyhow::Result<CtlResponse<()>> {
1256        // Allow an empty payload to be used for stopping hosts
1257        let timeout = if payload.as_ref().is_empty() {
1258            None
1259        } else {
1260            let cmd = serde_json::from_slice::<StopHostCommand>(payload.as_ref())
1261                .context("failed to deserialize stop command")?;
1262            let timeout = cmd.timeout();
1263            let host_id = cmd.host_id();
1264
1265            // If the Host ID was provided (i..e not the empty string, due to #[serde(default)]), then
1266            // we should check it against the known transport-provided host_id, and this actual host's ID
1267            if !host_id.is_empty() {
1268                anyhow::ensure!(
1269                    host_id == transport_host_id && host_id == self.host_key.public_key(),
1270                    "invalid host_id [{host_id}]"
1271                );
1272            }
1273            timeout
1274        };
1275
1276        // It *should* be impossible for the transport-derived host ID to not match at this point
1277        anyhow::ensure!(
1278            transport_host_id == self.host_key.public_key(),
1279            "invalid host_id [{transport_host_id}]"
1280        );
1281
1282        let mut stop_command = StopHostCommand::builder().host_id(transport_host_id);
1283        if let Some(timeout) = timeout {
1284            stop_command = stop_command.timeout(timeout);
1285        }
1286        <Self as ControlInterfaceServer>::handle_stop_host(
1287            self,
1288            stop_command
1289                .build()
1290                .map_err(|e| anyhow!(e))
1291                .context("failed to build stop host command")?,
1292        )
1293        .await
1294    }
1295
1296    #[instrument(level = "debug", skip_all)]
1297    pub(crate) async fn handle_scale_component(
1298        self: Arc<Self>,
1299        payload: impl AsRef<[u8]>,
1300    ) -> anyhow::Result<CtlResponse<()>> {
1301        let request = serde_json::from_slice::<ScaleComponentCommand>(payload.as_ref())
1302            .context("failed to deserialize component scale command")?;
1303        <Self as ControlInterfaceServer>::handle_scale_component(self, request).await
1304    }
1305
    #[instrument(level = "debug", skip_all)]
    /// Handles scaling a component to a supplied number of `max` concurrently executing instances.
    /// Supplying `0` will result in stopping that component instance.
    ///
    /// The request is first evaluated by the policy manager. Then, depending on whether the
    /// component is already tracked on this host and whether `max_instances` is zero, the
    /// component is left stopped, started, stopped, or re-instantiated with the new scale
    /// and/or configuration. On success a `component_scaled` event is published.
    ///
    /// If the component is not running and `wasm` is an `Err`, a `component_scale_failed`
    /// event is published on a best-effort basis and `Ok(())` is returned.
    ///
    /// # Errors
    ///
    /// Returns an error in these cases:
    /// - policy evaluation fails or the policy denies the request;
    /// - fetching configuration/secrets fails;
    /// - starting, instantiating or stopping the component fails;
    /// - publishing the final `component_scaled` event fails.
    #[allow(clippy::too_many_arguments)]
    async fn handle_scale_component_task(
        &self,
        component_ref: Arc<str>,
        component_id: Arc<str>,
        host_id: &str,
        max_instances: u32,
        component_limits: Option<HashMap<String, String>>,
        annotations: &Annotations,
        config: Vec<String>,
        wasm: anyhow::Result<Vec<u8>>,
        claims_token: Option<&jwt::Token<jwt::Component>>,
    ) -> anyhow::Result<()> {
        trace!(?component_ref, max_instances, "scale component task");

        let claims = claims_token.map(|c| c.claims.clone());
        // Policy gate: any non-permitted response aborts the scale with an error
        match self
            .policy_manager
            .evaluate_start_component(
                &component_id,
                &component_ref,
                max_instances,
                annotations,
                claims.as_ref(),
            )
            .await?
        {
            PolicyResponse {
                permitted: false,
                message: Some(message),
                ..
            } => bail!("Policy denied request to scale component `{component_id}`: `{message:?}`"),
            PolicyResponse {
                permitted: false, ..
            } => bail!("Policy denied request to scale component `{component_id}`"),
            PolicyResponse {
                permitted: true, ..
            } => (),
        };

        let limits: Option<Limits> = from_string_map(component_limits.as_ref());

        // NOTE: the `components` write guard created in the scrutinee below is a temporary
        // that lives until the end of this `let` statement. The lock is therefore held across
        // every `.await` in the match arms (config/secret fetches, starting and stopping
        // components).
        let scaled_event = match (
            self.components
                .write()
                .await
                .entry(component_id.to_string()),
            NonZeroUsize::new(max_instances as usize),
        ) {
            // No component is running and we requested to scale to zero, noop.
            // We still publish the event to indicate that the component has been scaled to zero
            (hash_map::Entry::Vacant(_), None) => crate::event::component_scaled(
                claims.as_ref(),
                annotations,
                host_id,
                0_usize,
                &component_ref,
                &component_id,
            ),
            // No component is running and we requested to scale to some amount, start with specified max
            (hash_map::Entry::Vacant(entry), Some(max)) => {
                // Configuration and secrets are fetched before `wasm` is inspected
                let (config, secrets) = self
                    .fetch_config_and_secrets(
                        &config,
                        claims_token.as_ref().map(|c| &c.jwt),
                        annotations.get("wasmcloud.dev/appspec"),
                    )
                    .await?;
                match &wasm {
                    Ok(wasm) => {
                        // NOTE(review): `start_component` already publishes its own
                        // `component_scaled` event, and the event built below is published
                        // again after this match. Confirm the duplicate is intended.
                        self.start_component(
                            entry,
                            wasm,
                            claims.clone(),
                            Arc::clone(&component_ref),
                            Arc::clone(&component_id),
                            max,
                            limits,
                            annotations,
                            config,
                            secrets,
                        )
                        .await?;

                        crate::event::component_scaled(
                            claims.as_ref(),
                            annotations,
                            host_id,
                            max,
                            &component_ref,
                            &component_id,
                        )
                    }
                    Err(e) => {
                        error!(%component_ref, %component_id, err = ?e, "failed to scale component");
                        // Best effort: a failure to publish is only logged
                        if let Err(e) = self
                            .event_publisher
                            .publish_event(
                                "component_scale_failed",
                                crate::event::component_scale_failed(
                                    claims_token.map(|c| c.claims.clone()).as_ref(),
                                    annotations,
                                    host_id,
                                    &component_ref,
                                    &component_id,
                                    max_instances,
                                    e,
                                ),
                            )
                            .await
                        {
                            error!(%component_ref, %component_id, err = ?e, "failed to publish component scale failed event");
                        }
                        // The failure is reported through the event above, not to the caller
                        return Ok(());
                    }
                }
            }
            // Component is running and we requested to scale to zero instances, stop component
            (hash_map::Entry::Occupied(entry), None) => {
                let component = entry.remove();
                self.stop_component(&component, host_id)
                    .await
                    .context("failed to stop component in response to scale to zero")?;

                info!(?component_ref, "component stopped");
                crate::event::component_scaled(
                    claims.as_ref(),
                    &component.annotations,
                    host_id,
                    0_usize,
                    &component.image_reference,
                    &component.id,
                )
            }
            // Component is running and we requested to scale to some amount or unbounded, scale component
            (hash_map::Entry::Occupied(mut entry), Some(max)) => {
                let component = entry.get_mut();
                // Config changes are detected by comparing the requested config names with the
                // names the running handler was built from
                let config_changed =
                    &config != component.handler.config_data.read().await.config_names();

                // Create the event first to avoid borrowing the component
                // This event is idempotent.
                let event = crate::event::component_scaled(
                    claims.as_ref(),
                    &component.annotations,
                    host_id,
                    max,
                    &component.image_reference,
                    &component.id,
                );

                // Modify scale only if the requested max differs from the current max or if the configuration has changed
                if component.max_instances != max || config_changed {
                    // We must partially clone the handler as we can't be sharing the targets between components
                    let handler = component.handler.copy_for_new();
                    if config_changed {
                        let (config, secrets) = self
                            .fetch_config_and_secrets(
                                &config,
                                claims_token.as_ref().map(|c| &c.jwt),
                                annotations.get("wasmcloud.dev/appspec"),
                            )
                            .await?;
                        *handler.config_data.write().await = config;
                        *handler.secrets.write().await = secrets;
                    }
                    // The already-compiled component is reused; only a new instance is created
                    let instance = self
                        .instantiate_component(
                            annotations,
                            Arc::clone(&component_ref),
                            Arc::clone(&component.id),
                            max,
                            limits,
                            component.component.clone(),
                            handler,
                        )
                        .await
                        .context("failed to instantiate component")?;
                    // `OccupiedEntry::insert` returns the previous value, so the *old* instance
                    // is stopped only after its replacement is in place
                    let component = entry.insert(instance);
                    self.stop_component(&component, host_id)
                        .await
                        .context("failed to stop component after scaling")?;

                    info!(?component_ref, ?max, "component scaled");
                } else {
                    debug!(?component_ref, ?max, "component already at desired scale");
                }
                event
            }
        };

        self.event_publisher
            .publish_event("component_scaled", scaled_event)
            .await?;

        Ok(())
    }
1506
1507    // TODO(#1548): With component IDs, new component references, configuration, etc, we're going to need to do some
1508    // design thinking around how update component should work. Should it be limited to a single host or latticewide?
1509    // Should it also update configuration, or is that separate? Should scaling be done via an update?
1510    #[instrument(level = "debug", skip_all)]
1511    pub(crate) async fn handle_update_component(
1512        self: Arc<Self>,
1513        payload: impl AsRef<[u8]>,
1514    ) -> anyhow::Result<CtlResponse<()>> {
1515        let cmd = serde_json::from_slice::<UpdateComponentCommand>(payload.as_ref())
1516            .context("failed to deserialize component update command")?;
1517        <Self as ControlInterfaceServer>::handle_update_component(self, cmd).await
1518    }
1519
1520    async fn handle_update_component_task(
1521        &self,
1522        component_id: Arc<str>,
1523        new_component_ref: Arc<str>,
1524        host_id: &str,
1525        annotations: Option<BTreeMap<String, String>>,
1526    ) -> anyhow::Result<()> {
1527        // NOTE: This block is specifically scoped to ensure we drop the read lock on `self.components` before
1528        // we attempt to grab a write lock.
1529        let component = {
1530            let components = self.components.read().await;
1531            let existing_component = components
1532                .get(&*component_id)
1533                .context("component not found")?;
1534            let annotations = annotations.unwrap_or_default().into_iter().collect();
1535
1536            // task is a no-op if the component image reference is the same
1537            if existing_component.image_reference == new_component_ref {
1538                info!(%component_id, %new_component_ref, "component already updated");
1539                return Ok(());
1540            }
1541
1542            let new_component = self.fetch_component(&new_component_ref).await?;
1543            let new_component = wasmcloud_runtime::Component::new(
1544                &self.runtime,
1545                &new_component,
1546                existing_component.limits,
1547            )
1548            .context("failed to initialize component")?;
1549            let new_claims = new_component.claims().cloned();
1550            if let Some(ref claims) = new_claims {
1551                self.store_claims(Claims::Component(claims.clone()))
1552                    .await
1553                    .context("failed to store claims")?;
1554            }
1555
1556            let max = existing_component.max_instances;
1557            let Ok(component) = self
1558                .instantiate_component(
1559                    &annotations,
1560                    Arc::clone(&new_component_ref),
1561                    Arc::clone(&component_id),
1562                    max,
1563                    existing_component.limits,
1564                    new_component,
1565                    existing_component.handler.copy_for_new(),
1566                )
1567                .await
1568            else {
1569                bail!("failed to instantiate component from new reference");
1570            };
1571
1572            info!(%new_component_ref, "component updated");
1573            self.event_publisher
1574                .publish_event(
1575                    "component_scaled",
1576                    crate::event::component_scaled(
1577                        new_claims.as_ref(),
1578                        &component.annotations,
1579                        host_id,
1580                        max,
1581                        new_component_ref,
1582                        &component_id,
1583                    ),
1584                )
1585                .await?;
1586
1587            // TODO(#1548): If this errors, we need to rollback
1588            self.stop_component(&component, host_id)
1589                .await
1590                .context("failed to stop old component")?;
1591            self.event_publisher
1592                .publish_event(
1593                    "component_scaled",
1594                    crate::event::component_scaled(
1595                        component.claims(),
1596                        &component.annotations,
1597                        host_id,
1598                        0_usize,
1599                        &component.image_reference,
1600                        &component.id,
1601                    ),
1602                )
1603                .await?;
1604
1605            component
1606        };
1607
1608        self.components
1609            .write()
1610            .await
1611            .insert(component_id.to_string(), component);
1612        Ok(())
1613    }
1614
1615    #[instrument(level = "debug", skip_all)]
1616    pub(crate) async fn handle_start_provider(
1617        self: Arc<Self>,
1618        payload: impl AsRef<[u8]>,
1619    ) -> anyhow::Result<Option<CtlResponse<()>>> {
1620        let cmd = serde_json::from_slice::<StartProviderCommand>(payload.as_ref())
1621            .context("failed to deserialize provider start command")?;
1622        <Self as ControlInterfaceServer>::handle_start_provider(self, cmd).await
1623    }
1624
1625    #[instrument(level = "debug", skip_all)]
1626    async fn handle_start_provider_task(
1627        self: Arc<Self>,
1628        config_names: &[String],
1629        provider_id: &str,
1630        provider_ref: &str,
1631        annotations: BTreeMap<String, String>,
1632        host_id: &str,
1633    ) -> anyhow::Result<()> {
1634        trace!(provider_ref, provider_id, "start provider task");
1635
1636        let registry_config = self.registry_config.read().await;
1637        let provider_ref =
1638            ResourceRef::try_from(provider_ref).context("failed to parse provider reference")?;
1639        let (path, claims_token) = match &provider_ref {
1640            ResourceRef::Builtin(..) => (None, None),
1641            _ => {
1642                let (path, claims_token) = crate::fetch_provider(
1643                    &provider_ref,
1644                    host_id,
1645                    self.host_config.allow_file_load,
1646                    &self.host_config.oci_opts,
1647                    &registry_config,
1648                )
1649                .await
1650                .context("failed to fetch provider")?;
1651                (Some(path), claims_token)
1652            }
1653        };
1654        let claims = claims_token.as_ref().map(|t| t.claims.clone());
1655
1656        if let Some(claims) = claims.clone() {
1657            self.store_claims(Claims::Provider(claims))
1658                .await
1659                .context("failed to store claims")?;
1660        }
1661
1662        let annotations: Annotations = annotations.into_iter().collect();
1663
1664        let PolicyResponse {
1665            permitted,
1666            request_id,
1667            message,
1668        } = self
1669            .policy_manager
1670            .evaluate_start_provider(
1671                provider_id,
1672                provider_ref.as_ref(),
1673                &annotations,
1674                claims.as_ref(),
1675            )
1676            .await?;
1677        ensure!(
1678            permitted,
1679            "policy denied request to start provider `{request_id}`: `{message:?}`",
1680        );
1681
1682        let component_specification = self
1683            .get_component_spec(provider_id)
1684            .await?
1685            .unwrap_or_else(|| ComponentSpecification::new(provider_ref.as_ref()));
1686
1687        self.store_component_spec(&provider_id, &component_specification)
1688            .await?;
1689        self.update_host_with_spec(&provider_id, &component_specification)
1690            .await?;
1691
1692        let mut providers = self.providers.write().await;
1693        if let hash_map::Entry::Vacant(entry) = providers.entry(provider_id.into()) {
1694            let provider_xkey = XKey::new();
1695            // We only need to store the public key of the provider xkey, as the private key is only needed by the provider
1696            let xkey = XKey::from_public_key(&provider_xkey.public_key())
1697                .context("failed to create XKey from provider public key xkey")?;
1698            // Generate the HostData and ConfigBundle for the provider
1699            let (host_data, config_bundle) = self
1700                .prepare_provider_config(
1701                    config_names,
1702                    claims_token.as_ref(),
1703                    provider_id,
1704                    &provider_xkey,
1705                    &annotations,
1706                )
1707                .await?;
1708            let config_bundle = Arc::new(RwLock::new(config_bundle));
1709            // Used by provider child tasks (health check, config watch, process restarter) to
1710            // know when to shutdown.
1711            let shutdown = Arc::new(AtomicBool::new(false));
1712            let tasks = match (path, &provider_ref) {
1713                (Some(path), ..) => {
1714                    Arc::clone(&self)
1715                        .start_binary_provider(
1716                            path,
1717                            host_data,
1718                            Arc::clone(&config_bundle),
1719                            provider_xkey,
1720                            provider_id,
1721                            // Arguments to allow regenerating configuration later
1722                            config_names.to_vec(),
1723                            claims_token.clone(),
1724                            annotations.clone(),
1725                            shutdown.clone(),
1726                        )
1727                        .await?
1728                }
1729                (None, ResourceRef::Builtin(name)) => match *name {
1730                    "http-client" if self.experimental_features.builtin_http_client => {
1731                        self.start_http_client_provider(host_data, provider_xkey, provider_id)
1732                            .await?
1733                    }
1734                    "http-client" => {
1735                        bail!("feature `builtin-http-client` is not enabled, denying start")
1736                    }
1737                    "http-server" if self.experimental_features.builtin_http_server => {
1738                        self.start_http_server_provider(host_data, provider_xkey, provider_id)
1739                            .await?
1740                    }
1741                    "http-server" => {
1742                        bail!("feature `builtin-http-server` is not enabled, denying start")
1743                    }
1744                    "messaging-nats" if self.experimental_features.builtin_messaging_nats => {
1745                        self.start_messaging_nats_provider(host_data, provider_xkey, provider_id)
1746                            .await?
1747                    }
1748                    "messaging-nats" => {
1749                        bail!("feature `builtin-messaging-nats` is not enabled, denying start")
1750                    }
1751                    _ => bail!("unknown builtin name: {name}"),
1752                },
1753                _ => bail!("invalid provider reference"),
1754            };
1755
1756            info!(
1757                provider_ref = provider_ref.as_ref(),
1758                provider_id, "provider started"
1759            );
1760            self.event_publisher
1761                .publish_event(
1762                    "provider_started",
1763                    crate::event::provider_started(
1764                        claims.as_ref(),
1765                        &annotations,
1766                        host_id,
1767                        &provider_ref,
1768                        provider_id,
1769                    ),
1770                )
1771                .await?;
1772
1773            // Add the provider
1774            entry.insert(Provider {
1775                tasks,
1776                annotations,
1777                claims_token,
1778                image_ref: provider_ref.as_ref().to_string(),
1779                xkey,
1780                shutdown,
1781            });
1782        } else {
1783            bail!("provider is already running with that ID")
1784        }
1785
1786        Ok(())
1787    }
1788
1789    #[instrument(level = "debug", skip_all)]
1790    pub(crate) async fn handle_stop_provider(
1791        &self,
1792        payload: impl AsRef<[u8]>,
1793    ) -> anyhow::Result<CtlResponse<()>> {
1794        let cmd = serde_json::from_slice::<StopProviderCommand>(payload.as_ref())
1795            .context("failed to deserialize provider stop command")?;
1796        <Self as ControlInterfaceServer>::handle_stop_provider(self, cmd).await
1797    }
1798
1799    #[instrument(level = "debug", skip_all)]
1800    pub(crate) async fn handle_inventory(&self) -> anyhow::Result<CtlResponse<HostInventory>> {
1801        <Self as ControlInterfaceServer>::handle_inventory(self).await
1802    }
1803
1804    #[instrument(level = "trace", skip_all)]
1805    pub(crate) async fn handle_claims(
1806        &self,
1807    ) -> anyhow::Result<CtlResponse<Vec<HashMap<String, String>>>> {
1808        <Self as ControlInterfaceServer>::handle_claims(self).await
1809    }
1810
1811    #[instrument(level = "trace", skip_all)]
1812    pub(crate) async fn handle_links(&self) -> anyhow::Result<Vec<u8>> {
1813        <Self as ControlInterfaceServer>::handle_links(self).await
1814    }
1815
1816    #[instrument(level = "trace", skip(self))]
1817    pub(crate) async fn handle_config_get(&self, config_name: &str) -> anyhow::Result<Vec<u8>> {
1818        <Self as ControlInterfaceServer>::handle_config_get(self, config_name).await
1819    }
1820
1821    #[instrument(level = "debug", skip_all)]
1822    pub(crate) async fn handle_label_put(
1823        &self,
1824        host_id: &str,
1825        payload: impl AsRef<[u8]>,
1826    ) -> anyhow::Result<CtlResponse<()>> {
1827        let host_label = serde_json::from_slice::<HostLabel>(payload.as_ref())
1828            .context("failed to deserialize put label request")?;
1829        <Self as ControlInterfaceServer>::handle_label_put(self, host_label, host_id).await
1830    }
1831
1832    #[instrument(level = "debug", skip_all)]
1833    pub(crate) async fn handle_label_del(
1834        &self,
1835        host_id: &str,
1836        payload: impl AsRef<[u8]>,
1837    ) -> anyhow::Result<CtlResponse<()>> {
1838        let label = serde_json::from_slice::<HostLabelIdentifier>(payload.as_ref())
1839            .context("failed to deserialize delete label request")?;
1840        <Self as ControlInterfaceServer>::handle_label_del(self, label, host_id).await
1841    }
1842
1843    /// Handle a new link by modifying the relevant source [ComponentSpecification]. Once
1844    /// the change is written to the LATTICEDATA store, each host in the lattice (including this one)
1845    /// will handle the new specification and update their own internal link maps via [process_component_spec_put].
1846    #[instrument(level = "debug", skip_all)]
1847    pub(crate) async fn handle_link_put(
1848        &self,
1849        payload: impl AsRef<[u8]>,
1850    ) -> anyhow::Result<CtlResponse<()>> {
1851        let link: Link = serde_json::from_slice(payload.as_ref())
1852            .context("failed to deserialize wrpc link definition")?;
1853        <Self as ControlInterfaceServer>::handle_link_put(self, link).await
1854    }
1855
1856    #[instrument(level = "debug", skip_all)]
1857    /// Remove an interface link on a source component for a specific package
1858    pub(crate) async fn handle_link_del(
1859        &self,
1860        payload: impl AsRef<[u8]>,
1861    ) -> anyhow::Result<CtlResponse<()>> {
1862        let req = serde_json::from_slice::<DeleteInterfaceLinkDefinitionRequest>(payload.as_ref())
1863            .context("failed to deserialize wrpc link definition")?;
1864        <Self as ControlInterfaceServer>::handle_link_del(self, req).await
1865    }
1866
1867    #[instrument(level = "debug", skip_all)]
1868    pub(crate) async fn handle_registries_put(
1869        &self,
1870        payload: impl AsRef<[u8]>,
1871    ) -> anyhow::Result<CtlResponse<()>> {
1872        let registry_creds: HashMap<String, RegistryCredential> =
1873            serde_json::from_slice(payload.as_ref())
1874                .context("failed to deserialize registries put command")?;
1875        <Self as ControlInterfaceServer>::handle_registries_put(self, registry_creds).await
1876    }
1877
1878    #[instrument(level = "debug", skip_all, fields(%config_name))]
1879    pub(crate) async fn handle_config_put(
1880        &self,
1881        config_name: &str,
1882        data: Bytes,
1883    ) -> anyhow::Result<CtlResponse<()>> {
1884        // Validate that the data is of the proper type by deserialing it
1885        serde_json::from_slice::<HashMap<String, String>>(&data)
1886            .context("config data should be a map of string -> string")?;
1887        <Self as ControlInterfaceServer>::handle_config_put(self, config_name, data).await
1888    }
1889
1890    #[instrument(level = "debug", skip_all, fields(%config_name))]
1891    pub(crate) async fn handle_config_delete(
1892        &self,
1893        config_name: &str,
1894    ) -> anyhow::Result<CtlResponse<()>> {
1895        <Self as ControlInterfaceServer>::handle_config_delete(self, config_name).await
1896    }
1897
1898    #[instrument(level = "debug", skip_all)]
1899    pub(crate) async fn handle_ping_hosts(
1900        &self,
1901    ) -> anyhow::Result<CtlResponse<wasmcloud_control_interface::Host>> {
1902        <Self as ControlInterfaceServer>::handle_ping_hosts(self).await
1903    }
1904
1905    // NOTE(brooksmtownsend): This is only necessary when running capability providers that were
1906    // build for wasmCloud versions before 1.2. This is a backwards-compatible
1907    // provider link definition put that is published to the provider's id, which is what
1908    // providers built for wasmCloud 1.0 expected.
1909    //
1910    // Thankfully, in a lattice where there are no "older" providers running, these publishes
1911    // will return immediately as there will be no subscribers on those topics.
1912    async fn put_backwards_compat_provider_link(&self, link: &Link) -> anyhow::Result<()> {
1913        // Only attempt to publish the backwards-compatible provider link definition if the link
1914        // does not contain any secret values.
1915        let source_config_contains_secret = link
1916            .source_config()
1917            .iter()
1918            .any(|c| c.starts_with(SECRET_PREFIX));
1919        let target_config_contains_secret = link
1920            .target_config()
1921            .iter()
1922            .any(|c| c.starts_with(SECRET_PREFIX));
1923        if source_config_contains_secret || target_config_contains_secret {
1924            debug!("link contains secrets and is not backwards compatible, skipping");
1925            return Ok(());
1926        }
1927        let provider_link = self
1928            .resolve_link_config(link.clone(), None, None, &XKey::new())
1929            .await
1930            .context("failed to resolve link config")?;
1931
1932        if let Err(e) = self
1933            .provider_manager
1934            .put_link(&provider_link, link.source_id())
1935            .await
1936        {
1937            warn!(
1938                ?e,
1939                "failed to publish backwards-compatible provider link to source"
1940            );
1941        }
1942
1943        if let Err(e) = self
1944            .provider_manager
1945            .put_link(&provider_link, link.target())
1946            .await
1947        {
1948            warn!(
1949                ?e,
1950                "failed to publish backwards-compatible provider link to target"
1951            );
1952        }
1953
1954        Ok(())
1955    }
1956
1957    /// Publishes a link to a provider running on this host to handle.
1958    #[instrument(level = "debug", skip_all)]
1959    async fn put_provider_link(&self, provider: &Provider, link: &Link) -> anyhow::Result<()> {
1960        let provider_link = self
1961            .resolve_link_config(
1962                link.clone(),
1963                provider.claims_token.as_ref().map(|t| &t.jwt),
1964                provider.annotations.get("wasmcloud.dev/appspec"),
1965                &provider.xkey,
1966            )
1967            .await
1968            .context("failed to resolve link config and secrets")?;
1969
1970        self.provider_manager
1971            .put_link(&provider_link, &provider.xkey.public_key())
1972            .await
1973            .context("failed to publish provider link definition put")
1974    }
1975
1976    /// Publishes a delete link to the lattice for all instances of a provider to handle
1977    /// Right now this is publishing _both_ to the source and the target in order to
1978    /// ensure that the provider is aware of the link delete. This would cause problems if a provider
1979    /// is linked to a provider (which it should never be.)
1980    #[instrument(level = "debug", skip(self))]
1981    async fn del_provider_link(&self, link: &Link) -> anyhow::Result<()> {
1982        // The provider expects the [`wasmcloud_core::InterfaceLinkDefinition`]
1983        let link = wasmcloud_core::InterfaceLinkDefinition {
1984            source_id: link.source_id().to_string(),
1985            target: link.target().to_string(),
1986            wit_namespace: link.wit_namespace().to_string(),
1987            wit_package: link.wit_package().to_string(),
1988            name: link.name().to_string(),
1989            interfaces: link.interfaces().clone(),
1990            // Configuration isn't needed for deletion
1991            ..Default::default()
1992        };
1993        let source_id = &link.source_id;
1994        let target = &link.target;
1995
1996        let (source_result, target_result) = futures::future::join(
1997            self.provider_manager.delete_link(&link, source_id),
1998            self.provider_manager.delete_link(&link, target),
1999        )
2000        .await;
2001
2002        source_result
2003            .and(target_result)
2004            .context("failed to publish provider link definition delete")
2005    }
2006
2007    async fn fetch_config_and_secrets(
2008        &self,
2009        config_names: &[String],
2010        entity_jwt: Option<&String>,
2011        application: Option<&String>,
2012    ) -> anyhow::Result<(ConfigBundle, HashMap<String, SecretBox<SecretValue>>)> {
2013        let (secret_names, config_names) = config_names
2014            .iter()
2015            .map(|s| s.to_string())
2016            .partition(|name| name.starts_with(SECRET_PREFIX));
2017
2018        let config = self
2019            .config_generator
2020            .generate(config_names)
2021            .await
2022            .context("Unable to fetch requested config")?;
2023
2024        let secrets = self
2025            .secrets_manager
2026            .fetch_secrets(secret_names, entity_jwt, &self.host_token.jwt, application)
2027            .await
2028            .context("Unable to fetch requested secrets")?;
2029
2030        Ok((config, secrets))
2031    }
2032
2033    /// Validates that the provided configuration names exist in the store and are valid.
2034    ///
2035    /// For any configuration that starts with `SECRET_`, the configuration is expected to be a secret reference.
2036    /// For any other configuration, the configuration is expected to be a [`HashMap<String, String>`].
2037    async fn validate_config<I>(&self, config_names: I) -> anyhow::Result<()>
2038    where
2039        I: IntoIterator<Item: AsRef<str>>,
2040    {
2041        let config_store = self.config_store.clone();
2042        let validation_errors =
2043            futures::future::join_all(config_names.into_iter().map(|config_name| {
2044                let config_store = config_store.clone();
2045                let config_name = config_name.as_ref().to_string();
2046                async move {
2047                    match config_store.get(&config_name).await {
2048                        Ok(Some(_)) => None,
2049                        Ok(None) if config_name.starts_with(SECRET_PREFIX) => Some(format!(
2050                            "Secret reference {config_name} not found in config store"
2051                        )),
2052                        Ok(None) => Some(format!(
2053                            "Configuration {config_name} not found in config store"
2054                        )),
2055                        Err(e) => Some(e.to_string()),
2056                    }
2057                }
2058            }))
2059            .await;
2060
2061        // NOTE(brooksmtownsend): Not using `join` here because it requires a `String` and we
2062        // need to flatten out the `None` values.
2063        let validation_errors = validation_errors
2064            .into_iter()
2065            .flatten()
2066            .fold(String::new(), |acc, e| acc + &e + ". ");
2067        if !validation_errors.is_empty() {
2068            bail!(format!(
2069                "Failed to validate configuration and secrets. {validation_errors}",
2070            ));
2071        }
2072
2073        Ok(())
2074    }
2075
2076    /// Transform a [`wasmcloud_control_interface::Link`] into a [`wasmcloud_core::InterfaceLinkDefinition`]
2077    /// by fetching the source and target configurations and secrets, and encrypting the secrets.
2078    async fn resolve_link_config(
2079        &self,
2080        link: Link,
2081        provider_jwt: Option<&String>,
2082        application: Option<&String>,
2083        provider_xkey: &XKey,
2084    ) -> anyhow::Result<wasmcloud_core::InterfaceLinkDefinition> {
2085        let (source_bundle, raw_source_secrets) = self
2086            .fetch_config_and_secrets(link.source_config().as_slice(), provider_jwt, application)
2087            .await?;
2088        let (target_bundle, raw_target_secrets) = self
2089            .fetch_config_and_secrets(link.target_config().as_slice(), provider_jwt, application)
2090            .await?;
2091
2092        let source_config = source_bundle.get_config().await;
2093        let target_config = target_bundle.get_config().await;
2094        // NOTE(brooksmtownsend): This trait import is used here to ensure we're only exposing secret
2095        // values when we need them.
2096        use secrecy::ExposeSecret;
2097        let source_secrets_map: HashMap<String, wasmcloud_core::secrets::SecretValue> =
2098            raw_source_secrets
2099                .iter()
2100                .map(|(k, v)| match v.expose_secret() {
2101                    SecretValue::String(s) => (
2102                        k.clone(),
2103                        wasmcloud_core::secrets::SecretValue::String(s.to_owned()),
2104                    ),
2105                    SecretValue::Bytes(b) => (
2106                        k.clone(),
2107                        wasmcloud_core::secrets::SecretValue::Bytes(b.to_owned()),
2108                    ),
2109                })
2110                .collect();
2111        let target_secrets_map: HashMap<String, wasmcloud_core::secrets::SecretValue> =
2112            raw_target_secrets
2113                .iter()
2114                .map(|(k, v)| match v.expose_secret() {
2115                    SecretValue::String(s) => (
2116                        k.clone(),
2117                        wasmcloud_core::secrets::SecretValue::String(s.to_owned()),
2118                    ),
2119                    SecretValue::Bytes(b) => (
2120                        k.clone(),
2121                        wasmcloud_core::secrets::SecretValue::Bytes(b.to_owned()),
2122                    ),
2123                })
2124                .collect();
2125        // Serializing & sealing an empty map results in a non-empty Vec, which is difficult to tell the
2126        // difference between an empty map and an encrypted empty map. To avoid this, we explicitly handle
2127        // the case where the map is empty.
2128        let source_secrets = if source_secrets_map.is_empty() {
2129            None
2130        } else {
2131            Some(
2132                serde_json::to_vec(&source_secrets_map)
2133                    .map(|secrets| self.secrets_xkey.seal(&secrets, provider_xkey))
2134                    .context("failed to serialize and encrypt source secrets")??,
2135            )
2136        };
2137        let target_secrets = if target_secrets_map.is_empty() {
2138            None
2139        } else {
2140            Some(
2141                serde_json::to_vec(&target_secrets_map)
2142                    .map(|secrets| self.secrets_xkey.seal(&secrets, provider_xkey))
2143                    .context("failed to serialize and encrypt target secrets")??,
2144            )
2145        };
2146
2147        Ok(wasmcloud_core::InterfaceLinkDefinition {
2148            source_id: link.source_id().to_string(),
2149            target: link.target().to_string(),
2150            name: link.name().to_string(),
2151            wit_namespace: link.wit_namespace().to_string(),
2152            wit_package: link.wit_package().to_string(),
2153            interfaces: link.interfaces().clone(),
2154            source_config: source_config.clone(),
2155            target_config: target_config.clone(),
2156            source_secrets,
2157            target_secrets,
2158        })
2159    }
2160}
2161
2162/// Helper function to transform a Vec of [`Link`]s into the structure components expect to be able
2163/// to quickly look up the desired target for a given interface
2164///
2165/// # Arguments
2166/// - links: A Vec of [`Link`]s
2167///
2168/// # Returns
2169/// - A `HashMap` in the form of `link_name` -> `instance` -> target
2170fn component_import_links(links: &[Link]) -> HashMap<Box<str>, HashMap<Box<str>, Box<str>>> {
2171    let mut m = HashMap::new();
2172    for link in links {
2173        let instances: &mut HashMap<Box<str>, Box<str>> = m
2174            .entry(link.name().to_string().into_boxed_str())
2175            .or_default();
2176        for interface in link.interfaces() {
2177            instances.insert(
2178                format!(
2179                    "{}:{}/{interface}",
2180                    link.wit_namespace(),
2181                    link.wit_package(),
2182                )
2183                .into_boxed_str(),
2184                link.target().to_string().into_boxed_str(),
2185            );
2186        }
2187    }
2188    m
2189}
2190
2191fn human_friendly_uptime(uptime: Duration) -> String {
2192    // strip sub-seconds, then convert to human-friendly format
2193    humantime::format_duration(
2194        uptime.saturating_sub(Duration::from_nanos(uptime.subsec_nanos().into())),
2195    )
2196    .to_string()
2197}
2198
2199/// Helper function to inject trace context into NATS headers
2200pub fn injector_to_headers(injector: &TraceContextInjector) -> async_nats::header::HeaderMap {
2201    injector
2202        .iter()
2203        .filter_map(|(k, v)| {
2204            // There's not really anything we can do about headers that don't parse
2205            let name = async_nats::header::HeaderName::from_str(k.as_str()).ok()?;
2206            let value = async_nats::header::HeaderValue::from_str(v.as_str()).ok()?;
2207            Some((name, value))
2208        })
2209        .collect()
2210}
2211
#[cfg(test)]
mod test {
    // Ensure that the helper function to translate a list of links into a map of imports works as expected
    #[test]
    fn can_compute_component_links() {
        use std::collections::HashMap;
        use wasmcloud_control_interface::Link;

        let links = vec![
            Link::builder()
                .source_id("source_component")
                .target("kv-redis")
                .wit_namespace("wasi")
                .wit_package("keyvalue")
                .interfaces(vec!["atomics".into(), "store".into()])
                .name("default")
                .build()
                .expect("failed to build link"),
            Link::builder()
                .source_id("source_component")
                .target("kv-vault")
                .wit_namespace("wasi")
                .wit_package("keyvalue")
                .interfaces(vec!["atomics".into(), "store".into()])
                .name("secret")
                .source_config(vec![])
                .target_config(vec!["my-secret".into()])
                .build()
                .expect("failed to build link"),
            Link::builder()
                .source_id("source_component")
                .target("kv-vault-offsite")
                .wit_namespace("wasi")
                .wit_package("keyvalue")
                .interfaces(vec!["atomics".into()])
                .name("secret")
                .source_config(vec![])
                .target_config(vec!["my-secret".into()])
                .build()
                .expect("failed to build link"),
            Link::builder()
                .source_id("http")
                .target("source_component")
                .wit_namespace("wasi")
                .wit_package("http")
                .interfaces(vec!["incoming-handler".into()])
                .name("default")
                .source_config(vec!["some-port".into()])
                .target_config(vec![])
                .build()
                .expect("failed to build link"),
            Link::builder()
                .source_id("source_component")
                .target("httpclient")
                .wit_namespace("wasi")
                .wit_package("http")
                .interfaces(vec!["outgoing-handler".into()])
                .name("default")
                .source_config(vec![])
                .target_config(vec!["some-port".into()])
                .build()
                .expect("failed to build link"),
            Link::builder()
                .source_id("source_component")
                .target("other_component")
                .wit_namespace("custom")
                .wit_package("foo")
                .interfaces(vec!["bar".into(), "baz".into()])
                .name("default")
                .source_config(vec![])
                .target_config(vec![])
                .build()
                .expect("failed to build link"),
            Link::builder()
                .source_id("other_component")
                .target("target")
                .wit_namespace("wit")
                .wit_package("package")
                .interfaces(vec!["interface3".into()])
                .name("link2")
                .source_config(vec![])
                .target_config(vec![])
                .build()
                .expect("failed to build link"),
        ];

        let links_map = super::component_import_links(&links);

        // Expected structure: link name -> fully-qualified `namespace:package/interface` -> target.
        //
        // `component_import_links` does not filter on `source_id`, so every link contributes,
        // including the one whose *target* is `source_component` (`http` -> `source_component`)
        // and the one sourced from `other_component`. When two links share a name and an
        // interface, the later link in the list overwrites the earlier one, which is why
        // `secret` / `wasi:keyvalue/atomics` resolves to `kv-vault-offsite`.
        // {
        //     "default": {
        //         "wasi:keyvalue/atomics": "kv-redis",
        //         "wasi:keyvalue/store": "kv-redis",
        //         "wasi:http/incoming-handler": "source_component",
        //         "wasi:http/outgoing-handler": "httpclient",
        //         "custom:foo/bar": "other_component",
        //         "custom:foo/baz": "other_component"
        //     },
        //     "secret": {
        //         "wasi:keyvalue/atomics": "kv-vault-offsite",
        //         "wasi:keyvalue/store": "kv-vault"
        //     },
        //     "link2": {
        //         "wit:package/interface3": "target"
        //     }
        // }
        let expected_result = HashMap::from([
            (
                "default".into(),
                HashMap::from([
                    ("wasi:keyvalue/atomics".into(), "kv-redis".into()),
                    ("wasi:keyvalue/store".into(), "kv-redis".into()),
                    (
                        "wasi:http/incoming-handler".into(),
                        "source_component".into(),
                    ),
                    ("wasi:http/outgoing-handler".into(), "httpclient".into()),
                    ("custom:foo/bar".into(), "other_component".into()),
                    ("custom:foo/baz".into(), "other_component".into()),
                ]),
            ),
            (
                "secret".into(),
                HashMap::from([
                    ("wasi:keyvalue/atomics".into(), "kv-vault-offsite".into()),
                    ("wasi:keyvalue/store".into(), "kv-vault".into()),
                ]),
            ),
            (
                "link2".into(),
                HashMap::from([("wit:package/interface3".into(), "target".into())]),
            ),
        ]);

        assert_eq!(links_map, expected_result);
    }

    // An empty list of links must produce an empty lookup table
    #[test]
    fn empty_links_produce_empty_map() {
        assert!(super::component_import_links(&[]).is_empty());
    }
}