wasmcloud_host/wasmbus/
mod.rs

1#![allow(clippy::type_complexity)]
2
3use core::sync::atomic::Ordering;
4
5use std::collections::hash_map::Entry;
6use std::collections::{hash_map, BTreeMap, HashMap};
7use std::env::consts::{ARCH, FAMILY, OS};
8use std::future::Future;
9use std::num::NonZeroUsize;
10use std::ops::Deref;
11use std::pin::Pin;
12use std::str::FromStr;
13use std::sync::atomic::AtomicBool;
14use std::sync::Arc;
15use std::task::{Context, Poll};
16use std::time::Duration;
17
18use anyhow::{anyhow, bail, ensure, Context as _};
19use async_nats::jetstream::kv::Store;
20use bytes::{BufMut, Bytes, BytesMut};
21use claims::{Claims, StoredClaims};
22use cloudevents::{EventBuilder, EventBuilderV10};
23use ctl::ControlInterfaceServer;
24use futures::future::Either;
25use futures::stream::{AbortHandle, Abortable, SelectAll};
26use futures::{join, stream, try_join, Stream, StreamExt, TryFutureExt, TryStreamExt};
27use hyper_util::rt::{TokioExecutor, TokioIo};
28use nkeys::{KeyPair, KeyPairType, XKey};
29use providers::Provider;
30use secrecy::SecretBox;
31use serde::{Deserialize, Serialize};
32use serde_json::json;
33use sysinfo::System;
34use tokio::io::AsyncWrite;
35use tokio::net::TcpListener;
36use tokio::spawn;
37use tokio::sync::{mpsc, watch, RwLock, Semaphore};
38use tokio::task::{JoinHandle, JoinSet};
39use tokio::time::{interval_at, timeout, Instant};
40use tokio_stream::wrappers::IntervalStream;
41use tracing::{debug, debug_span, error, info, instrument, trace, warn, Instrument as _};
42use tracing_opentelemetry::OpenTelemetrySpanExt;
43use wascap::jwt;
44use wasmcloud_control_interface::{
45    ComponentAuctionAck, ComponentAuctionRequest, ComponentDescription, CtlResponse,
46    DeleteInterfaceLinkDefinitionRequest, HostInventory, HostLabel, HostLabelIdentifier, Link,
47    ProviderAuctionAck, ProviderAuctionRequest, ProviderDescription, RegistryCredential,
48    ScaleComponentCommand, StartProviderCommand, StopHostCommand, StopProviderCommand,
49    UpdateComponentCommand,
50};
51use wasmcloud_core::{ComponentId, CTL_API_VERSION_1};
52use wasmcloud_runtime::capability::secrets::store::SecretValue;
53use wasmcloud_runtime::component::{from_string_map, Limits, WrpcServeEvent};
54use wasmcloud_runtime::Runtime;
55use wasmcloud_secrets_types::SECRET_PREFIX;
56use wasmcloud_tracing::context::TraceContextInjector;
57use wasmcloud_tracing::{global, InstrumentationScope, KeyValue};
58
59use crate::registry::RegistryCredentialExt;
60use crate::wasmbus::jetstream::create_bucket;
61use crate::workload_identity::WorkloadIdentityConfig;
62use crate::{
63    fetch_component, HostMetrics, OciConfig, PolicyHostInfo, PolicyManager, PolicyResponse,
64    RegistryAuth, RegistryConfig, RegistryType, ResourceRef, SecretsManager,
65};
66
67mod claims;
68mod ctl;
69mod event;
70mod experimental;
71mod handler;
72mod jetstream;
73mod providers;
74
75pub mod config;
76/// wasmCloud host configuration
77pub mod host_config;
78
79pub use self::experimental::Features;
80pub use self::host_config::Host as HostConfig;
81pub use jetstream::ComponentSpecification;
82
83use self::config::{BundleGenerator, ConfigBundle};
84use self::handler::Handler;
85
86const MAX_INVOCATION_CHANNEL_SIZE: usize = 5000;
87const MIN_INVOCATION_CHANNEL_SIZE: usize = 256;
88
89#[derive(Debug)]
90struct Queue {
91    all_streams: SelectAll<async_nats::Subscriber>,
92}
93
94impl Stream for Queue {
95    type Item = async_nats::Message;
96
97    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
98        self.all_streams.poll_next_unpin(cx)
99    }
100}
101
102#[derive(Clone, Default)]
103struct AsyncBytesMut(Arc<std::sync::Mutex<BytesMut>>);
104
105impl AsyncWrite for AsyncBytesMut {
106    fn poll_write(
107        self: Pin<&mut Self>,
108        _cx: &mut Context<'_>,
109        buf: &[u8],
110    ) -> Poll<Result<usize, std::io::Error>> {
111        Poll::Ready({
112            self.0
113                .lock()
114                .map_err(|e| std::io::Error::other(e.to_string()))?
115                .put_slice(buf);
116            Ok(buf.len())
117        })
118    }
119
120    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
121        Poll::Ready(Ok(()))
122    }
123
124    fn poll_shutdown(
125        self: Pin<&mut Self>,
126        _cx: &mut Context<'_>,
127    ) -> Poll<Result<(), std::io::Error>> {
128        Poll::Ready(Ok(()))
129    }
130}
131
132impl TryFrom<AsyncBytesMut> for Vec<u8> {
133    type Error = anyhow::Error;
134
135    fn try_from(buf: AsyncBytesMut) -> Result<Self, Self::Error> {
136        buf.0
137            .lock()
138            .map(|buf| buf.clone().into())
139            .map_err(|e| anyhow!(e.to_string()).context("failed to lock"))
140    }
141}
142
143impl Queue {
144    #[instrument]
145    async fn new(
146        nats: &async_nats::Client,
147        topic_prefix: &str,
148        lattice: &str,
149        host_key: &KeyPair,
150        component_auction: bool,
151        provider_auction: bool,
152    ) -> anyhow::Result<Self> {
153        let host_id = host_key.public_key();
154        let mut subs = vec![
155            Either::Left(nats.subscribe(format!(
156                "{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.registry.put",
157            ))),
158            Either::Left(nats.subscribe(format!(
159                "{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.host.ping",
160            ))),
161            Either::Right(nats.queue_subscribe(
162                format!("{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.link.*"),
163                format!("{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.link",),
164            )),
165            Either::Right(nats.queue_subscribe(
166                format!("{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.claims.get"),
167                format!("{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.claims"),
168            )),
169            Either::Left(nats.subscribe(format!(
170                "{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.component.*.{host_id}"
171            ))),
172            Either::Left(nats.subscribe(format!(
173                "{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.provider.*.{host_id}"
174            ))),
175            Either::Left(nats.subscribe(format!(
176                "{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.label.*.{host_id}"
177            ))),
178            Either::Left(nats.subscribe(format!(
179                "{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.host.*.{host_id}"
180            ))),
181            Either::Right(nats.queue_subscribe(
182                format!("{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.config.>"),
183                format!("{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.config"),
184            )),
185        ];
186        if component_auction {
187            subs.push(Either::Left(nats.subscribe(format!(
188                "{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.component.auction",
189            ))));
190        }
191        if provider_auction {
192            subs.push(Either::Left(nats.subscribe(format!(
193                "{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.provider.auction",
194            ))));
195        }
196        let streams = futures::future::join_all(subs)
197            .await
198            .into_iter()
199            .collect::<Result<Vec<_>, async_nats::SubscribeError>>()
200            .context("failed to subscribe to queues")?;
201        Ok(Self {
202            all_streams: futures::stream::select_all(streams),
203        })
204    }
205}
206
207type Annotations = BTreeMap<String, String>;
208
209#[derive(Debug)]
210struct Component {
211    component: wasmcloud_runtime::Component<Handler>,
212    /// Unique component identifier for this component
213    id: Arc<str>,
214    handler: Handler,
215    exports: JoinHandle<()>,
216    annotations: Annotations,
217    /// Maximum number of instances of this component that can be running at once
218    max_instances: NonZeroUsize,
219    limits: Option<Limits>,
220    image_reference: Arc<str>,
221    events: mpsc::Sender<WrpcServeEvent<<WrpcServer as wrpc_transport::Serve>::Context>>,
222    permits: Arc<Semaphore>,
223}
224
225impl Deref for Component {
226    type Target = wasmcloud_runtime::Component<Handler>;
227
228    fn deref(&self) -> &Self::Target {
229        &self.component
230    }
231}
232
233#[derive(Clone)]
234struct WrpcServer {
235    nats: wrpc_transport_nats::Client,
236    claims: Option<Arc<jwt::Claims<jwt::Component>>>,
237    id: Arc<str>,
238    image_reference: Arc<str>,
239    annotations: Arc<Annotations>,
240    policy_manager: Arc<PolicyManager>,
241    metrics: Arc<HostMetrics>,
242}
243
244struct InvocationContext {
245    start_at: Instant,
246    attributes: Vec<KeyValue>,
247    span: tracing::Span,
248}
249
250impl Deref for InvocationContext {
251    type Target = tracing::Span;
252
253    fn deref(&self) -> &Self::Target {
254        &self.span
255    }
256}
257
258impl wrpc_transport::Serve for WrpcServer {
259    type Context = InvocationContext;
260    type Outgoing = <wrpc_transport_nats::Client as wrpc_transport::Serve>::Outgoing;
261    type Incoming = <wrpc_transport_nats::Client as wrpc_transport::Serve>::Incoming;
262
263    #[instrument(
264        level = "info",
265        skip(self, paths),
266        fields(
267            component_id = ?self.id,
268            component_ref = ?self.image_reference)
269    )]
270    async fn serve(
271        &self,
272        instance: &str,
273        func: &str,
274        paths: impl Into<Arc<[Box<[Option<usize>]>]>> + Send,
275    ) -> anyhow::Result<
276        impl Stream<Item = anyhow::Result<(Self::Context, Self::Outgoing, Self::Incoming)>>
277            + Send
278            + 'static,
279    > {
280        debug!("serving invocations");
281        let invocations = self.nats.serve(instance, func, paths).await?;
282
283        let func: Arc<str> = Arc::from(func);
284        let instance: Arc<str> = Arc::from(instance);
285        let annotations = Arc::clone(&self.annotations);
286        let id = Arc::clone(&self.id);
287        let image_reference = Arc::clone(&self.image_reference);
288        let metrics = Arc::clone(&self.metrics);
289        let policy_manager = Arc::clone(&self.policy_manager);
290        let claims = self.claims.clone();
291        Ok(invocations.and_then(move |(cx, tx, rx)| {
292            let annotations = Arc::clone(&annotations);
293            let claims = claims.clone();
294            let func = Arc::clone(&func);
295            let id = Arc::clone(&id);
296            let image_reference = Arc::clone(&image_reference);
297            let instance = Arc::clone(&instance);
298            let metrics = Arc::clone(&metrics);
299            let policy_manager = Arc::clone(&policy_manager);
300            let span = tracing::info_span!("component_invocation", func = %func, id = %id, instance = %instance);
301            async move {
302                if let Some(ref cx) = cx {
303                    // Coerce the HashMap<String, Vec<String>> into a Vec<(String, String)> by
304                    // flattening the values
305                    let trace_context = cx
306                        .iter()
307                        .flat_map(|(key, value)| {
308                            value
309                                .iter()
310                                .map(|v| (key.to_string(), v.to_string()))
311                                .collect::<Vec<_>>()
312                        })
313                        .collect::<Vec<(String, String)>>();
314                    let _ = span.set_parent(wasmcloud_tracing::context::get_span_context(&trace_context));
315                }
316
317                let PolicyResponse {
318                    request_id,
319                    permitted,
320                    message,
321                } = policy_manager
322                    .evaluate_perform_invocation(
323                        &id,
324                        &image_reference,
325                        &annotations,
326                        claims.as_deref(),
327                        instance.to_string(),
328                        func.to_string(),
329                    )
330                    .instrument(debug_span!(parent: &span, "policy_check"))
331                    .await?;
332                ensure!(
333                    permitted,
334                    "policy denied request to invoke component `{request_id}`: `{message:?}`",
335                );
336
337                Ok((
338                    InvocationContext{
339                        start_at: Instant::now(),
340                        // TODO(metrics): insert information about the source once we have concrete context data
341                        attributes: vec![
342                            KeyValue::new("component.ref", image_reference),
343                            KeyValue::new("lattice", metrics.lattice_id.clone()),
344                            KeyValue::new("host", metrics.host_id.clone()),
345                            KeyValue::new("operation", format!("{instance}/{func}")),
346                        ],
347                        span,
348                    },
349                    tx,
350                    rx,
351                ))
352            }
353        }))
354    }
355}
356
357/// wasmCloud Host
358pub struct Host {
359    components: Arc<RwLock<HashMap<ComponentId, Arc<Component>>>>,
360    event_builder: EventBuilderV10,
361    friendly_name: String,
362    heartbeat: AbortHandle,
363    host_config: HostConfig,
364    host_key: Arc<KeyPair>,
365    host_token: Arc<jwt::Token<jwt::Host>>,
366    /// The Xkey used to encrypt secrets when sending them over NATS
367    secrets_xkey: Arc<XKey>,
368    labels: Arc<RwLock<BTreeMap<String, String>>>,
369    ctl_topic_prefix: String,
370    /// NATS client to use for control interface subscriptions and jetstream queries
371    ctl_nats: async_nats::Client,
372    /// NATS client to use for RPC calls
373    rpc_nats: Arc<async_nats::Client>,
374    data: Store,
375    /// Task to watch for changes in the LATTICEDATA store
376    data_watch: AbortHandle,
377    config_data: Store,
378    config_generator: BundleGenerator,
379    policy_manager: Arc<PolicyManager>,
380    secrets_manager: Arc<SecretsManager>,
381    /// The provider map is a map of provider component ID to provider
382    providers: RwLock<HashMap<String, Provider>>,
383    registry_config: RwLock<HashMap<String, RegistryConfig>>,
384    runtime: Runtime,
385    start_at: Instant,
386    stop_tx: watch::Sender<Option<Instant>>,
387    stop_rx: watch::Receiver<Option<Instant>>,
388    queue: AbortHandle,
389    // Component ID -> All Links
390    links: RwLock<HashMap<String, Vec<Link>>>,
391    component_claims: Arc<RwLock<HashMap<ComponentId, jwt::Claims<jwt::Component>>>>, // TODO: use a single map once Claims is an enum
392    provider_claims: Arc<RwLock<HashMap<String, jwt::Claims<jwt::CapabilityProvider>>>>,
393    metrics: Arc<HostMetrics>,
394    max_execution_time: Duration,
395    messaging_links:
396        Arc<RwLock<HashMap<Arc<str>, Arc<RwLock<HashMap<Box<str>, async_nats::Client>>>>>>,
397    /// Experimental features to enable in the host that gate functionality
398    experimental_features: Features,
399    ready: Arc<AtomicBool>,
400    /// A set of host tasks
401    #[allow(unused)]
402    tasks: JoinSet<()>,
403}
404
405/// Given the NATS address, authentication jwt, seed, tls requirement and optional request timeout,
406/// attempt to establish connection.
407///
408/// This function should be used to create a NATS client for Host communication, for non-host NATS
409/// clients we recommend using async-nats directly.
410///
411/// # Errors
412///
413/// Returns an error if:
414/// - Only one of JWT or seed is specified, as we cannot authenticate with only one of them
415/// - Connection fails
416pub async fn connect_nats(
417    addr: impl async_nats::ToServerAddrs,
418    jwt: Option<&String>,
419    key: Option<Arc<KeyPair>>,
420    require_tls: bool,
421    request_timeout: Option<Duration>,
422    workload_identity_config: Option<WorkloadIdentityConfig>,
423) -> anyhow::Result<async_nats::Client> {
424    let opts = match (jwt, key, workload_identity_config) {
425        (Some(jwt), Some(key), None) => {
426            async_nats::ConnectOptions::with_jwt(jwt.to_string(), move |nonce| {
427                let key = key.clone();
428                async move { key.sign(&nonce).map_err(async_nats::AuthError::new) }
429            })
430            .name("wasmbus")
431        }
432        (Some(_), None, _) | (None, Some(_), _) => {
433            bail!("cannot authenticate if only one of jwt or seed is specified")
434        }
435        (jwt, key, Some(wid_cfg)) => {
436            setup_workload_identity_nats_connect_options(jwt, key, wid_cfg).await?
437        }
438        _ => async_nats::ConnectOptions::new().name("wasmbus"),
439    };
440    let opts = if let Some(timeout) = request_timeout {
441        opts.request_timeout(Some(timeout))
442    } else {
443        opts
444    };
445    let opts = opts.require_tls(require_tls);
446    opts.connect(addr)
447        .await
448        .context("failed to connect to NATS")
449}
450
451#[cfg(unix)]
452async fn setup_workload_identity_nats_connect_options(
453    jwt: Option<&String>,
454    key: Option<Arc<KeyPair>>,
455    wid_cfg: WorkloadIdentityConfig,
456) -> anyhow::Result<async_nats::ConnectOptions> {
457    let wid_cfg = Arc::new(wid_cfg);
458    let jwt = jwt.map(String::to_string).map(Arc::new);
459    let key = key.clone();
460
461    // Return an auth callback that'll get called any time the
462    // NATS connection needs to be (re-)established. This is
463    // necessary to ensure that we always provide a recently
464    // issued JWT-SVID.
465    Ok(
466        async_nats::ConnectOptions::with_auth_callback(move |nonce| {
467            let key = key.clone();
468            let jwt = jwt.clone();
469            let wid_cfg = wid_cfg.clone();
470
471            let fetch_svid_handle = tokio::spawn(async move {
472                let mut client = spiffe::WorkloadApiClient::default()
473                    .await
474                    .map_err(async_nats::AuthError::new)?;
475                client
476                    .fetch_jwt_svid(&[wid_cfg.auth_service_audience.as_str()], None)
477                    .await
478                    .map_err(async_nats::AuthError::new)
479            });
480
481            async move {
482                let svid = fetch_svid_handle
483                    .await
484                    .map_err(async_nats::AuthError::new)?
485                    .map_err(async_nats::AuthError::new)?;
486
487                let mut auth = async_nats::Auth::new();
488                if let Some(key) = key {
489                    let signature = key.sign(&nonce).map_err(async_nats::AuthError::new)?;
490                    auth.signature = Some(signature);
491                }
492                if let Some(jwt) = jwt {
493                    auth.jwt = Some(jwt.to_string());
494                }
495                auth.token = Some(svid.token().into());
496                Ok(auth)
497            }
498        })
499        .name("wasmbus"),
500    )
501}
502
503#[cfg(target_family = "windows")]
504async fn setup_workload_identity_nats_connect_options(
505    jwt: Option<&String>,
506    key: Option<Arc<KeyPair>>,
507    wid_cfg: WorkloadIdentityConfig,
508) -> anyhow::Result<async_nats::ConnectOptions> {
509    bail!("workload identity is not supported on Windows")
510}
511
512#[derive(Debug, Default)]
513struct SupplementalConfig {
514    registry_config: Option<HashMap<String, RegistryConfig>>,
515}
516
517#[instrument(level = "debug", skip_all)]
518async fn load_supplemental_config(
519    ctl_nats: &async_nats::Client,
520    lattice: &str,
521    labels: &BTreeMap<String, String>,
522) -> anyhow::Result<SupplementalConfig> {
523    #[derive(Deserialize, Default)]
524    struct SerializedSupplementalConfig {
525        #[serde(default, rename = "registryCredentials")]
526        registry_credentials: Option<HashMap<String, RegistryCredential>>,
527    }
528
529    let cfg_topic = format!("wasmbus.cfg.{lattice}.req");
530    let cfg_payload = serde_json::to_vec(&json!({
531        "labels": labels,
532    }))
533    .context("failed to serialize config payload")?;
534
535    debug!("requesting supplemental config");
536    match ctl_nats.request(cfg_topic, cfg_payload.into()).await {
537        Ok(resp) => {
538            match serde_json::from_slice::<SerializedSupplementalConfig>(resp.payload.as_ref()) {
539                Ok(ser_cfg) => Ok(SupplementalConfig {
540                    registry_config: ser_cfg.registry_credentials.and_then(|creds| {
541                        creds
542                            .into_iter()
543                            .map(|(k, v)| {
544                                debug!(registry_url = %k, "set registry config");
545                                v.into_registry_config().map(|v| (k, v))
546                            })
547                            .collect::<anyhow::Result<_>>()
548                            .ok()
549                    }),
550                }),
551                Err(e) => {
552                    error!(
553                        ?e,
554                        "failed to deserialize supplemental config. Defaulting to empty config"
555                    );
556                    Ok(SupplementalConfig::default())
557                }
558            }
559        }
560        Err(e) => {
561            error!(
562                ?e,
563                "failed to request supplemental config. Defaulting to empty config"
564            );
565            Ok(SupplementalConfig::default())
566        }
567    }
568}
569
570#[instrument(level = "debug", skip_all)]
571async fn merge_registry_config(
572    registry_config: &RwLock<HashMap<String, RegistryConfig>>,
573    oci_opts: OciConfig,
574) -> () {
575    let mut registry_config = registry_config.write().await;
576    let allow_latest = oci_opts.allow_latest;
577    let additional_ca_paths = oci_opts.additional_ca_paths;
578
579    // update auth for specific registry, if provided
580    if let Some(reg) = oci_opts.oci_registry {
581        match registry_config.entry(reg.clone()) {
582            Entry::Occupied(_entry) => {
583                // note we don't update config here, since the config service should take priority
584                warn!(oci_registry_url = %reg, "ignoring OCI registry config, overridden by config service");
585            }
586            Entry::Vacant(entry) => {
587                debug!(oci_registry_url = %reg, "set registry config");
588                entry.insert(
589                    RegistryConfig::builder()
590                        .reg_type(RegistryType::Oci)
591                        .auth(RegistryAuth::from((
592                            oci_opts.oci_user,
593                            oci_opts.oci_password,
594                        )))
595                        .build()
596                        .expect("failed to build registry config"),
597                );
598            }
599        }
600    }
601
602    // update or create entry for all registries in allowed_insecure
603    oci_opts.allowed_insecure.into_iter().for_each(|reg| {
604        match registry_config.entry(reg.clone()) {
605            Entry::Occupied(mut entry) => {
606                debug!(oci_registry_url = %reg, "set allowed_insecure");
607                entry.get_mut().set_allow_insecure(true);
608            }
609            Entry::Vacant(entry) => {
610                debug!(oci_registry_url = %reg, "set allowed_insecure");
611                entry.insert(
612                    RegistryConfig::builder()
613                        .reg_type(RegistryType::Oci)
614                        .allow_insecure(true)
615                        .build()
616                        .expect("failed to build registry config"),
617                );
618            }
619        }
620    });
621
622    // update allow_latest for all registries
623    registry_config.iter_mut().for_each(|(url, config)| {
624        if !additional_ca_paths.is_empty() {
625            config.set_additional_ca_paths(additional_ca_paths.clone());
626        }
627        if allow_latest {
628            debug!(oci_registry_url = %url, "set allow_latest");
629        }
630        config.set_allow_latest(allow_latest);
631    });
632}
633
634impl Host {
635    const DEFAULT_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(30);
636
637    const NAME_ADJECTIVES: &'static str = "
638    autumn hidden bitter misty silent empty dry dark summer
639    icy delicate quiet white cool spring winter patient
640    twilight dawn crimson wispy weathered blue billowing
641    broken cold damp falling frosty green long late lingering
642    bold little morning muddy old red rough still small
643    sparkling bouncing shy wandering withered wild black
644    young holy solitary fragrant aged snowy proud floral
645    restless divine polished ancient purple lively nameless
646    gray orange mauve
647    ";
648
649    const NAME_NOUNS: &'static str = "
650    waterfall river breeze moon rain wind sea morning
651    snow lake sunset pine shadow leaf dawn glitter forest
652    hill cloud meadow sun glade bird brook butterfly
653    bush dew dust field fire flower firefly ladybug feather grass
654    haze mountain night pond darkness snowflake silence
655    sound sky shape stapler surf thunder violet water wildflower
656    wave water resonance sun timber dream cherry tree fog autocorrect
657    frost voice paper frog smoke star hamster ocean emoji robot
658    ";
659
660    /// Generate a friendly name for the host based on a random number.
661    /// Names we pulled from a list of friendly or neutral adjectives and nouns suitable for use in
662    /// public and on hosts/domain names
663    fn generate_friendly_name() -> Option<String> {
664        let adjectives: Vec<_> = Self::NAME_ADJECTIVES.split_whitespace().collect();
665        let nouns: Vec<_> = Self::NAME_NOUNS.split_whitespace().collect();
666        names::Generator::new(&adjectives, &nouns, names::Name::Numbered).next()
667    }
668
669    /// Construct a new [Host] returning a tuple of its [Arc] and an async shutdown function.
670    #[instrument(level = "debug", skip_all)]
671    pub async fn new(
672        config: HostConfig,
673    ) -> anyhow::Result<(Arc<Self>, impl Future<Output = anyhow::Result<()>>)> {
674        let host_key = if let Some(host_key) = &config.host_key {
675            ensure!(host_key.key_pair_type() == KeyPairType::Server);
676            Arc::clone(host_key)
677        } else {
678            Arc::new(KeyPair::new(KeyPairType::Server))
679        };
680
681        let mut labels = BTreeMap::from([
682            ("hostcore.arch".into(), ARCH.into()),
683            ("hostcore.os".into(), OS.into()),
684            ("hostcore.osfamily".into(), FAMILY.into()),
685        ]);
686        labels.extend(config.labels.clone().into_iter());
687        let friendly_name =
688            Self::generate_friendly_name().context("failed to generate friendly name")?;
689
690        let host_issuer = Arc::new(KeyPair::new_account());
691        let claims = jwt::Claims::<jwt::Host>::new(
692            friendly_name.clone(),
693            host_issuer.public_key(),
694            host_key.public_key().clone(),
695            Some(HashMap::from_iter([(
696                "self_signed".to_string(),
697                "true".to_string(),
698            )])),
699        );
700        let jwt = claims
701            .encode(&host_issuer)
702            .context("failed to encode host claims")?;
703        let host_token = Arc::new(jwt::Token { jwt, claims });
704
705        let start_evt = json!({
706            "friendly_name": friendly_name,
707            "labels": labels,
708            "uptime_seconds": 0,
709            "version": config.version,
710        });
711
712        let workload_identity_config = if config.experimental_features.workload_identity_auth {
713            Some(WorkloadIdentityConfig::from_env()?)
714        } else {
715            None
716        };
717
718        let ((ctl_nats, queue), rpc_nats) = try_join!(
719            async {
720                debug!(
721                    ctl_nats_url = config.ctl_nats_url.as_str(),
722                    "connecting to NATS control server"
723                );
724                let ctl_nats = connect_nats(
725                    config.ctl_nats_url.as_str(),
726                    config.ctl_jwt.as_ref(),
727                    config.ctl_key.clone(),
728                    config.ctl_tls,
729                    None,
730                    workload_identity_config.clone(),
731                )
732                .await
733                .context("failed to establish NATS control server connection")?;
734                let queue = Queue::new(
735                    &ctl_nats,
736                    &config.ctl_topic_prefix,
737                    &config.lattice,
738                    &host_key,
739                    config.enable_component_auction,
740                    config.enable_provider_auction,
741                )
742                .await
743                .context("failed to initialize queue")?;
744                ctl_nats.flush().await.context("failed to flush")?;
745                Ok((ctl_nats, queue))
746            },
747            async {
748                debug!(
749                    rpc_nats_url = config.rpc_nats_url.as_str(),
750                    "connecting to NATS RPC server"
751                );
752                connect_nats(
753                    config.rpc_nats_url.as_str(),
754                    config.rpc_jwt.as_ref(),
755                    config.rpc_key.clone(),
756                    config.rpc_tls,
757                    Some(config.rpc_timeout),
758                    workload_identity_config.clone(),
759                )
760                .await
761                .context("failed to establish NATS RPC server connection")
762            }
763        )?;
764
765        let start_at = Instant::now();
766
767        let heartbeat_interval = config
768            .heartbeat_interval
769            .unwrap_or(Self::DEFAULT_HEARTBEAT_INTERVAL);
770        let heartbeat_start_at = start_at
771            .checked_add(heartbeat_interval)
772            .context("failed to compute heartbeat start time")?;
773        let heartbeat = IntervalStream::new(interval_at(heartbeat_start_at, heartbeat_interval));
774
775        let (stop_tx, stop_rx) = watch::channel(None);
776
777        let (runtime, _epoch) = Runtime::builder()
778            .max_execution_time(config.max_execution_time)
779            .max_linear_memory(config.max_linear_memory)
780            .max_components(config.max_components)
781            .max_core_instances_per_component(config.max_core_instances_per_component)
782            .max_component_size(config.max_component_size)
783            .experimental_features(config.experimental_features.into())
784            .build()
785            .context("failed to build runtime")?;
786        let event_builder = EventBuilderV10::new().source(host_key.public_key());
787
788        let ctl_jetstream = if let Some(domain) = config.js_domain.as_ref() {
789            async_nats::jetstream::with_domain(ctl_nats.clone(), domain)
790        } else {
791            async_nats::jetstream::new(ctl_nats.clone())
792        };
793        let bucket = format!("LATTICEDATA_{}", config.lattice);
794        let data = create_bucket(&ctl_jetstream, &bucket).await?;
795
796        let config_bucket = format!("CONFIGDATA_{}", config.lattice);
797        let config_data = create_bucket(&ctl_jetstream, &config_bucket).await?;
798
799        let (queue_abort, queue_abort_reg) = AbortHandle::new_pair();
800        let (heartbeat_abort, heartbeat_abort_reg) = AbortHandle::new_pair();
801        let (data_watch_abort, data_watch_abort_reg) = AbortHandle::new_pair();
802
803        let supplemental_config = if config.config_service_enabled {
804            load_supplemental_config(&ctl_nats, &config.lattice, &labels).await?
805        } else {
806            SupplementalConfig::default()
807        };
808
809        let registry_config = RwLock::new(supplemental_config.registry_config.unwrap_or_default());
810        merge_registry_config(&registry_config, config.oci_opts.clone()).await;
811
812        let policy_manager = PolicyManager::new(
813            ctl_nats.clone(),
814            PolicyHostInfo {
815                public_key: host_key.public_key(),
816                lattice: config.lattice.to_string(),
817                labels: HashMap::from_iter(labels.clone()),
818            },
819            config.policy_service_config.policy_topic.clone(),
820            config.policy_service_config.policy_timeout_ms,
821            config.policy_service_config.policy_changes_topic.clone(),
822        )
823        .await?;
824
825        // If provided, secrets topic must be non-empty
826        // TODO(#2411): Validate secrets topic prefix as a valid NATS subject
827        ensure!(
828            config.secrets_topic_prefix.is_none()
829                || config
830                    .secrets_topic_prefix
831                    .as_ref()
832                    .is_some_and(|topic| !topic.is_empty()),
833            "secrets topic prefix must be non-empty"
834        );
835
836        let secrets_manager = Arc::new(SecretsManager::new(
837            &config_data,
838            config.secrets_topic_prefix.as_ref(),
839            &ctl_nats,
840        ));
841
842        let scope = InstrumentationScope::builder("wasmcloud-host")
843            .with_version(config.version.clone())
844            .with_attributes(vec![
845                KeyValue::new("host.id", host_key.public_key()),
846                KeyValue::new("host.version", config.version.clone()),
847                KeyValue::new("host.arch", ARCH),
848                KeyValue::new("host.os", OS),
849                KeyValue::new("host.osfamily", FAMILY),
850                KeyValue::new("host.friendly_name", friendly_name.clone()),
851                KeyValue::new("host.hostname", System::host_name().unwrap_or_default()),
852                KeyValue::new(
853                    "host.kernel_version",
854                    System::kernel_version().unwrap_or_default(),
855                ),
856                KeyValue::new("host.os_version", System::os_version().unwrap_or_default()),
857            ])
858            .build();
859        let meter = global::meter_with_scope(scope);
860        let metrics = HostMetrics::new(
861            &meter,
862            host_key.public_key(),
863            config.lattice.to_string(),
864            None,
865        )
866        .context("failed to create HostMetrics instance")?;
867
868        let config_generator = BundleGenerator::new(config_data.clone());
869
870        let max_execution_time_ms = config.max_execution_time;
871
872        debug!("Feature flags: {:?}", config.experimental_features);
873
874        let mut tasks = JoinSet::new();
875        let ready = Arc::new(AtomicBool::new(true));
876        if let Some(addr) = config.http_admin {
877            let socket = TcpListener::bind(addr)
878                .await
879                .context("failed to bind on HTTP administration endpoint")?;
880            let ready = Arc::clone(&ready);
881            let svc = hyper::service::service_fn(move |req| {
882                const OK: &str = r#"{"status":"ok"}"#;
883                const FAIL: &str = r#"{"status":"failure"}"#;
884                let ready = Arc::clone(&ready);
885                async move {
886                    let (http::request::Parts { method, uri, .. }, _) = req.into_parts();
887                    match (method.as_str(), uri.path()) {
888                        ("HEAD", "/livez") => Ok(http::Response::default()),
889                        ("GET", "/livez") => Ok(http::Response::new(http_body_util::Full::new(
890                            Bytes::from(OK),
891                        ))),
892                        (method, "/livez") => http::Response::builder()
893                            .status(http::StatusCode::METHOD_NOT_ALLOWED)
894                            .body(http_body_util::Full::new(Bytes::from(format!(
895                                "method `{method}` not supported for path `/livez`"
896                            )))),
897                        ("HEAD", "/readyz") => {
898                            if ready.load(Ordering::Relaxed) {
899                                Ok(http::Response::default())
900                            } else {
901                                http::Response::builder()
902                                    .status(http::StatusCode::INTERNAL_SERVER_ERROR)
903                                    .body(http_body_util::Full::default())
904                            }
905                        }
906                        ("GET", "/readyz") => {
907                            if ready.load(Ordering::Relaxed) {
908                                Ok(http::Response::new(http_body_util::Full::new(Bytes::from(
909                                    OK,
910                                ))))
911                            } else {
912                                http::Response::builder()
913                                    .status(http::StatusCode::INTERNAL_SERVER_ERROR)
914                                    .body(http_body_util::Full::new(Bytes::from(FAIL)))
915                            }
916                        }
917                        (method, "/readyz") => http::Response::builder()
918                            .status(http::StatusCode::METHOD_NOT_ALLOWED)
919                            .body(http_body_util::Full::new(Bytes::from(format!(
920                                "method `{method}` not supported for path `/readyz`"
921                            )))),
922                        (.., path) => http::Response::builder()
923                            .status(http::StatusCode::NOT_FOUND)
924                            .body(http_body_util::Full::new(Bytes::from(format!(
925                                "unknown endpoint `{path}`"
926                            )))),
927                    }
928                }
929            });
930            let srv = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new());
931            tasks.spawn(async move {
932                loop {
933                    let stream = match socket.accept().await {
934                        Ok((stream, _)) => stream,
935                        Err(err) => {
936                            error!(?err, "failed to accept HTTP administration connection");
937                            continue;
938                        }
939                    };
940                    let svc = svc.clone();
941                    if let Err(err) = srv.serve_connection(TokioIo::new(stream), svc).await {
942                        error!(?err, "failed to serve HTTP administration connection");
943                    }
944                }
945            });
946        }
947
948        let host = Host {
949            components: Arc::default(),
950            event_builder,
951            friendly_name,
952            heartbeat: heartbeat_abort.clone(),
953            ctl_topic_prefix: config.ctl_topic_prefix.clone(),
954            host_key,
955            host_token,
956            secrets_xkey: Arc::new(XKey::new()),
957            labels: Arc::new(RwLock::new(labels)),
958            ctl_nats,
959            rpc_nats: Arc::new(rpc_nats),
960            experimental_features: config.experimental_features,
961            host_config: config,
962            data: data.clone(),
963            data_watch: data_watch_abort.clone(),
964            config_data: config_data.clone(),
965            config_generator,
966            policy_manager,
967            secrets_manager,
968            providers: RwLock::default(),
969            registry_config,
970            runtime,
971            start_at,
972            stop_rx,
973            stop_tx,
974            queue: queue_abort.clone(),
975            links: RwLock::default(),
976            component_claims: Arc::default(),
977            provider_claims: Arc::default(),
978            metrics: Arc::new(metrics),
979            max_execution_time: max_execution_time_ms,
980            messaging_links: Arc::default(),
981            ready: Arc::clone(&ready),
982            tasks,
983        };
984
985        let host = Arc::new(host);
986        let queue = spawn({
987            let host = Arc::clone(&host);
988            async move {
989                let mut queue = Abortable::new(queue, queue_abort_reg);
990                queue
991                    .by_ref()
992                    .for_each_concurrent(None, {
993                        let host = Arc::clone(&host);
994                        move |msg| {
995                            let host = Arc::clone(&host);
996                            async move { host.handle_ctl_message(msg).await }
997                        }
998                    })
999                    .await;
1000                let deadline = { *host.stop_rx.borrow() };
1001                host.stop_tx.send_replace(deadline);
1002                if queue.is_aborted() {
1003                    info!("control interface queue task gracefully stopped");
1004                } else {
1005                    error!("control interface queue task unexpectedly stopped");
1006                }
1007            }
1008        });
1009
1010        let data_watch: JoinHandle<anyhow::Result<_>> = spawn({
1011            let data = data.clone();
1012            let host = Arc::clone(&host);
1013            async move {
1014                let data_watch = data
1015                    .watch_all()
1016                    .await
1017                    .context("failed to watch lattice data bucket")?;
1018                let mut data_watch = Abortable::new(data_watch, data_watch_abort_reg);
1019                data_watch
1020                    .by_ref()
1021                    .for_each({
1022                        let host = Arc::clone(&host);
1023                        move |entry| {
1024                            let host = Arc::clone(&host);
1025                            async move {
1026                                match entry {
1027                                    Err(error) => {
1028                                        error!("failed to watch lattice data bucket: {error}");
1029                                    }
1030                                    Ok(entry) => host.process_entry(entry).await,
1031                                }
1032                            }
1033                        }
1034                    })
1035                    .await;
1036                let deadline = { *host.stop_rx.borrow() };
1037                host.stop_tx.send_replace(deadline);
1038                if data_watch.is_aborted() {
1039                    info!("data watch task gracefully stopped");
1040                } else {
1041                    error!("data watch task unexpectedly stopped");
1042                }
1043                Ok(())
1044            }
1045        });
1046
1047        let heartbeat = spawn({
1048            let host = Arc::clone(&host);
1049            async move {
1050                let mut heartbeat = Abortable::new(heartbeat, heartbeat_abort_reg);
1051                heartbeat
1052                    .by_ref()
1053                    .for_each({
1054                        let host = Arc::clone(&host);
1055                        move |_| {
1056                            let host = Arc::clone(&host);
1057                            async move {
1058                                let heartbeat = match host.heartbeat().await {
1059                                    Ok(heartbeat) => heartbeat,
1060                                    Err(e) => {
1061                                        error!("failed to generate heartbeat: {e}");
1062                                        return;
1063                                    }
1064                                };
1065
1066                                if let Err(e) =
1067                                    host.publish_event("host_heartbeat", heartbeat).await
1068                                {
1069                                    error!("failed to publish heartbeat: {e}");
1070                                }
1071                            }
1072                        }
1073                    })
1074                    .await;
1075                let deadline = { *host.stop_rx.borrow() };
1076                host.stop_tx.send_replace(deadline);
1077                if heartbeat.is_aborted() {
1078                    info!("heartbeat task gracefully stopped");
1079                } else {
1080                    error!("heartbeat task unexpectedly stopped");
1081                }
1082            }
1083        });
1084
1085        // Process existing data without emitting events
1086        data.keys()
1087            .await
1088            .context("failed to read keys of lattice data bucket")?
1089            .map_err(|e| anyhow!(e).context("failed to read lattice data stream"))
1090            .try_filter_map(|key| async {
1091                data.entry(key)
1092                    .await
1093                    .context("failed to get entry in lattice data bucket")
1094            })
1095            .for_each(|entry| async {
1096                match entry {
1097                    Ok(entry) => host.process_entry(entry).await,
1098                    Err(err) => error!(%err, "failed to read entry from lattice data bucket"),
1099                }
1100            })
1101            .await;
1102
1103        host.publish_event("host_started", start_evt)
1104            .await
1105            .context("failed to publish start event")?;
1106        info!(
1107            host_id = host.host_key.public_key(),
1108            "wasmCloud host started"
1109        );
1110
1111        Ok((Arc::clone(&host), async move {
1112            ready.store(false, Ordering::Relaxed);
1113            heartbeat_abort.abort();
1114            queue_abort.abort();
1115            data_watch_abort.abort();
1116            host.policy_manager.policy_changes.abort();
1117            let _ = try_join!(queue, data_watch, heartbeat).context("failed to await tasks")?;
1118            host.publish_event(
1119                "host_stopped",
1120                json!({
1121                    "labels": *host.labels.read().await,
1122                }),
1123            )
1124            .await
1125            .context("failed to publish stop event")?;
1126            // Before we exit, make sure to flush all messages or we may lose some that we've
1127            // thought were sent (like the host_stopped event)
1128            try_join!(host.ctl_nats.flush(), host.rpc_nats.flush())
1129                .context("failed to flush NATS clients")?;
1130            Ok(())
1131        }))
1132    }
1133
1134    /// Waits for host to be stopped via lattice commands and returns the shutdown deadline on
1135    /// success
1136    ///
1137    /// # Errors
1138    ///
1139    /// Returns an error if internal stop channel is closed prematurely
1140    #[instrument(level = "debug", skip_all)]
1141    pub async fn stopped(&self) -> anyhow::Result<Option<Instant>> {
1142        self.stop_rx
1143            .clone()
1144            .changed()
1145            .await
1146            .context("failed to wait for stop")?;
1147        Ok(*self.stop_rx.borrow())
1148    }
1149
1150    #[instrument(level = "debug", skip_all)]
1151    async fn inventory(&self) -> HostInventory {
1152        trace!("generating host inventory");
1153        let components: Vec<_> = {
1154            let components = self.components.read().await;
1155            stream::iter(components.iter())
1156                .filter_map(|(id, component)| async move {
1157                    let mut description = ComponentDescription::builder()
1158                        .id(id.into())
1159                        .image_ref(component.image_reference.to_string())
1160                        .annotations(component.annotations.clone().into_iter().collect())
1161                        .max_instances(component.max_instances.get().try_into().unwrap_or(u32::MAX))
1162                        .limits(component.limits.map(|limits| limits.to_string_map()))
1163                        .revision(
1164                            component
1165                                .claims()
1166                                .and_then(|claims| claims.metadata.as_ref())
1167                                .and_then(|jwt::Component { rev, .. }| *rev)
1168                                .unwrap_or_default(),
1169                        );
1170                    // Add name if present
1171                    if let Some(name) = component
1172                        .claims()
1173                        .and_then(|claims| claims.metadata.as_ref())
1174                        .and_then(|metadata| metadata.name.as_ref())
1175                        .cloned()
1176                    {
1177                        description = description.name(name);
1178                    };
1179
1180                    Some(
1181                        description
1182                            .build()
1183                            .expect("failed to build component description: {e}"),
1184                    )
1185                })
1186                .collect()
1187                .await
1188        };
1189
1190        let providers: Vec<_> = self
1191            .providers
1192            .read()
1193            .await
1194            .iter()
1195            .map(
1196                |(
1197                    provider_id,
1198                    Provider {
1199                        annotations,
1200                        claims_token,
1201                        image_ref,
1202                        ..
1203                    },
1204                )| {
1205                    let mut provider_description = ProviderDescription::builder()
1206                        .id(provider_id)
1207                        .image_ref(image_ref);
1208                    if let Some(name) = claims_token
1209                        .as_ref()
1210                        .and_then(|claims| claims.claims.metadata.as_ref())
1211                        .and_then(|metadata| metadata.name.as_ref())
1212                    {
1213                        provider_description = provider_description.name(name);
1214                    }
1215                    provider_description
1216                        .annotations(
1217                            annotations
1218                                .clone()
1219                                .into_iter()
1220                                .collect::<BTreeMap<String, String>>(),
1221                        )
1222                        .revision(
1223                            claims_token
1224                                .as_ref()
1225                                .and_then(|claims| claims.claims.metadata.as_ref())
1226                                .and_then(|jwt::CapabilityProvider { rev, .. }| *rev)
1227                                .unwrap_or_default(),
1228                        )
1229                        .build()
1230                        .expect("failed to build provider description")
1231                },
1232            )
1233            .collect();
1234
1235        let uptime = self.start_at.elapsed();
1236        HostInventory::builder()
1237            .components(components)
1238            .providers(providers)
1239            .friendly_name(self.friendly_name.clone())
1240            .labels(self.labels.read().await.clone())
1241            .uptime_human(human_friendly_uptime(uptime))
1242            .uptime_seconds(uptime.as_secs())
1243            .version(self.host_config.version.clone())
1244            .host_id(self.host_key.public_key())
1245            .build()
1246            .expect("failed to build host inventory")
1247    }
1248
1249    #[instrument(level = "debug", skip_all)]
1250    async fn heartbeat(&self) -> anyhow::Result<serde_json::Value> {
1251        trace!("generating heartbeat");
1252        Ok(serde_json::to_value(self.inventory().await)?)
1253    }
1254
1255    #[instrument(level = "debug", skip(self))]
1256    async fn publish_event(&self, name: &str, data: serde_json::Value) -> anyhow::Result<()> {
1257        event::publish(
1258            &self.event_builder,
1259            &self.ctl_nats,
1260            &self.host_config.lattice,
1261            name,
1262            data,
1263        )
1264        .await
1265    }
1266
1267    /// Instantiate a component
1268    #[allow(clippy::too_many_arguments)] // TODO: refactor into a config struct
1269    #[instrument(level = "debug", skip_all)]
1270    async fn instantiate_component(
1271        &self,
1272        annotations: &Annotations,
1273        image_reference: Arc<str>,
1274        id: Arc<str>,
1275        max_instances: NonZeroUsize,
1276        limits: Option<Limits>,
1277        mut component: wasmcloud_runtime::Component<Handler>,
1278        handler: Handler,
1279    ) -> anyhow::Result<Arc<Component>> {
1280        trace!(
1281            component_ref = ?image_reference,
1282            max_instances,
1283            "instantiating component"
1284        );
1285
1286        let max_execution_time = self.max_execution_time; // TODO: Needs approval to go ahead.
1287        component.set_max_execution_time(max_execution_time);
1288
1289        let (events_tx, mut events_rx) = mpsc::channel(
1290            max_instances
1291                .get()
1292                .clamp(MIN_INVOCATION_CHANNEL_SIZE, MAX_INVOCATION_CHANNEL_SIZE),
1293        );
1294        let prefix = Arc::from(format!("{}.{id}", &self.host_config.lattice));
1295        let nats = wrpc_transport_nats::Client::new(
1296            Arc::clone(&self.rpc_nats),
1297            Arc::clone(&prefix),
1298            Some(prefix),
1299        )
1300        .await?;
1301        let exports = component
1302            .serve_wrpc(
1303                &WrpcServer {
1304                    nats,
1305                    claims: component.claims().cloned().map(Arc::new),
1306                    id: Arc::clone(&id),
1307                    image_reference: Arc::clone(&image_reference),
1308                    annotations: Arc::new(annotations.clone()),
1309                    policy_manager: Arc::clone(&self.policy_manager),
1310                    metrics: Arc::clone(&self.metrics),
1311                },
1312                handler.clone(),
1313                events_tx.clone(),
1314            )
1315            .await?;
1316        let permits = Arc::new(Semaphore::new(
1317            usize::from(max_instances).min(Semaphore::MAX_PERMITS),
1318        ));
1319        let component_attributes = Arc::new(vec![
1320            KeyValue::new("component.id", id.to_string()),
1321            KeyValue::new("component.ref", image_reference.to_string()),
1322            KeyValue::new("lattice", self.host_config.lattice.clone()),
1323            KeyValue::new("host", self.host_key.public_key()),
1324        ]);
1325        self.metrics
1326            .set_max_instances(max_instances.get() as u64, &component_attributes);
1327
1328        let metrics = Arc::clone(&self.metrics);
1329        Ok(Arc::new(Component {
1330            component,
1331            id: Arc::clone(&id),
1332            handler,
1333            events: events_tx,
1334            permits: Arc::clone(&permits),
1335            exports: spawn(async move {
1336                // Since we are joining two `move` closures, we need two separate `Arc`s
1337                let metrics_left = Arc::clone(&metrics);
1338                let metrics_right = Arc::clone(&metrics);
1339                join!(
1340                    async move {
1341                        let mut exports = stream::select_all(exports);
1342                        loop {
1343                            let metrics_left = Arc::clone(&metrics_left);
1344                            let component_attributes = Arc::clone(&component_attributes);
1345                            let permits = Arc::clone(&permits);
1346                            if let Some(fut) = exports.next().await {
1347                                match fut {
1348                                    Ok(fut) => {
1349                                        debug!("accepted invocation, acquiring permit");
1350                                        let permit = permits.acquire_owned().await;
1351
1352                                        // Record that an instance is active
1353                                        metrics_left
1354                                            .increment_active_instance(&component_attributes);
1355                                        spawn(async move {
1356                                            let _permit = permit;
1357                                            debug!("handling invocation");
1358                                            // Awaiting this future drives the execution of the component
1359                                            let result = timeout(max_execution_time, fut).await;
1360                                            metrics_left
1361                                                .decrement_active_instance(&component_attributes);
1362
1363                                            match result {
1364                                                Ok(Ok(())) => {
1365                                                    debug!("successfully handled invocation");
1366                                                    Ok(())
1367                                                }
1368                                                Ok(Err(err)) => {
1369                                                    warn!(?err, "failed to handle invocation");
1370                                                    Err(err)
1371                                                }
1372                                                Err(_err) => {
1373                                                    warn!("component invocation timed out");
1374                                                    Err(anyhow::anyhow!(
1375                                                        "component invocation timed out"
1376                                                    ))
1377                                                }
1378                                            }
1379                                        });
1380                                    }
1381                                    Err(err) => {
1382                                        warn!(?err, "failed to accept invocation")
1383                                    }
1384                                }
1385                            }
1386                        }
1387                    },
1388                    async move {
1389                        while let Some(evt) = events_rx.recv().await {
1390                            match evt {
1391                                WrpcServeEvent::HttpIncomingHandlerHandleReturned {
1392                                    context:
1393                                        InvocationContext {
1394                                            start_at,
1395                                            ref attributes,
1396                                            ..
1397                                        },
1398                                    success,
1399                                }
1400                                | WrpcServeEvent::MessagingHandlerHandleMessageReturned {
1401                                    context:
1402                                        InvocationContext {
1403                                            start_at,
1404                                            ref attributes,
1405                                            ..
1406                                        },
1407                                    success,
1408                                }
1409                                | WrpcServeEvent::DynamicExportReturned {
1410                                    context:
1411                                        InvocationContext {
1412                                            start_at,
1413                                            ref attributes,
1414                                            ..
1415                                        },
1416                                    success,
1417                                } => metrics_right.record_component_invocation(
1418                                    u64::try_from(start_at.elapsed().as_nanos())
1419                                        .unwrap_or_default(),
1420                                    attributes,
1421                                    !success,
1422                                ),
1423                            }
1424                        }
1425                        debug!("serving event stream is done");
1426                    },
1427                );
1428                debug!("export serving task done");
1429            }),
1430            annotations: annotations.clone(),
1431            max_instances,
1432            limits,
1433            image_reference: Arc::clone(&image_reference),
1434        }))
1435    }
1436
1437    #[allow(clippy::too_many_arguments)]
1438    #[instrument(level = "debug", skip_all)]
1439    async fn start_component<'a>(
1440        &self,
1441        entry: hash_map::VacantEntry<'a, String, Arc<Component>>,
1442        wasm: &[u8],
1443        claims: Option<jwt::Claims<jwt::Component>>,
1444        component_ref: Arc<str>,
1445        component_id: Arc<str>,
1446        max_instances: NonZeroUsize,
1447        limits: Option<Limits>,
1448        annotations: &Annotations,
1449        config: ConfigBundle,
1450        secrets: HashMap<String, SecretBox<SecretValue>>,
1451    ) -> anyhow::Result<&'a mut Arc<Component>> {
1452        debug!(?component_ref, ?max_instances, "starting new component");
1453
1454        if let Some(ref claims) = claims {
1455            self.store_claims(Claims::Component(claims.clone()))
1456                .await
1457                .context("failed to store claims")?;
1458        }
1459
1460        let component_spec = self
1461            .get_component_spec(&component_id)
1462            .await?
1463            .unwrap_or_else(|| ComponentSpecification::new(&component_ref));
1464        self.store_component_spec(&component_id, &component_spec)
1465            .await?;
1466
1467        // Map the imports to pull out the result types of the functions for lookup when invoking them
1468        let handler = Handler {
1469            nats: Arc::clone(&self.rpc_nats),
1470            config_data: Arc::new(RwLock::new(config)),
1471            lattice: Arc::clone(&self.host_config.lattice),
1472            component_id: Arc::clone(&component_id),
1473            secrets: Arc::new(RwLock::new(secrets)),
1474            targets: Arc::default(),
1475            instance_links: Arc::new(RwLock::new(component_import_links(&component_spec.links))),
1476            messaging_links: {
1477                let mut links = self.messaging_links.write().await;
1478                Arc::clone(links.entry(Arc::clone(&component_id)).or_default())
1479            },
1480            invocation_timeout: Duration::from_secs(10), // TODO: Make this configurable
1481            experimental_features: self.experimental_features,
1482            host_labels: Arc::clone(&self.labels),
1483        };
1484        let component = wasmcloud_runtime::Component::new(&self.runtime, wasm, limits)?;
1485        let component = self
1486            .instantiate_component(
1487                annotations,
1488                Arc::clone(&component_ref),
1489                Arc::clone(&component_id),
1490                max_instances,
1491                limits,
1492                component,
1493                handler,
1494            )
1495            .await
1496            .context("failed to instantiate component")?;
1497
1498        info!(?component_ref, "component started");
1499        self.publish_event(
1500            "component_scaled",
1501            event::component_scaled(
1502                claims.as_ref(),
1503                annotations,
1504                self.host_key.public_key(),
1505                max_instances,
1506                &component_ref,
1507                &component_id,
1508            ),
1509        )
1510        .await?;
1511
1512        Ok(entry.insert(component))
1513    }
1514
1515    #[instrument(level = "debug", skip_all)]
1516    async fn stop_component(&self, component: &Component, _host_id: &str) -> anyhow::Result<()> {
1517        trace!(component_id = %component.id, "stopping component");
1518
1519        component.exports.abort();
1520
1521        Ok(())
1522    }
1523
1524    #[instrument(level = "trace", skip_all)]
1525    async fn fetch_component(&self, component_ref: &str) -> anyhow::Result<Vec<u8>> {
1526        let registry_config = self.registry_config.read().await;
1527        fetch_component(
1528            component_ref,
1529            self.host_config.allow_file_load,
1530            &self.host_config.oci_opts,
1531            &registry_config,
1532        )
1533        .await
1534        .context("failed to fetch component")
1535    }
1536
1537    #[instrument(level = "trace", skip_all)]
1538    async fn store_component_claims(
1539        &self,
1540        claims: jwt::Claims<jwt::Component>,
1541    ) -> anyhow::Result<()> {
1542        let mut component_claims = self.component_claims.write().await;
1543        component_claims.insert(claims.subject.clone(), claims);
1544        Ok(())
1545    }
1546
1547    #[instrument(level = "debug", skip_all)]
1548    async fn handle_auction_component(
1549        &self,
1550        payload: impl AsRef<[u8]>,
1551    ) -> anyhow::Result<Option<CtlResponse<ComponentAuctionAck>>> {
1552        let request = serde_json::from_slice::<ComponentAuctionRequest>(payload.as_ref())
1553            .context("failed to deserialize component auction command")?;
1554        <Self as ControlInterfaceServer>::handle_auction_component(self, request).await
1555    }
1556
1557    #[instrument(level = "debug", skip_all)]
1558    async fn handle_auction_provider(
1559        &self,
1560        payload: impl AsRef<[u8]>,
1561    ) -> anyhow::Result<Option<CtlResponse<ProviderAuctionAck>>> {
1562        let request = serde_json::from_slice::<ProviderAuctionRequest>(payload.as_ref())
1563            .context("failed to deserialize provider auction command")?;
1564        <Self as ControlInterfaceServer>::handle_auction_provider(self, request).await
1565    }
1566
1567    #[instrument(level = "debug", skip_all)]
1568    async fn handle_stop_host(
1569        &self,
1570        payload: impl AsRef<[u8]>,
1571        transport_host_id: &str,
1572    ) -> anyhow::Result<CtlResponse<()>> {
1573        // Allow an empty payload to be used for stopping hosts
1574        let timeout = if payload.as_ref().is_empty() {
1575            None
1576        } else {
1577            let cmd = serde_json::from_slice::<StopHostCommand>(payload.as_ref())
1578                .context("failed to deserialize stop command")?;
1579            let timeout = cmd.timeout();
1580            let host_id = cmd.host_id();
1581
1582            // If the Host ID was provided (i..e not the empty string, due to #[serde(default)]), then
1583            // we should check it against the known transport-provided host_id, and this actual host's ID
1584            if !host_id.is_empty() {
1585                anyhow::ensure!(
1586                    host_id == transport_host_id && host_id == self.host_key.public_key(),
1587                    "invalid host_id [{host_id}]"
1588                );
1589            }
1590            timeout
1591        };
1592
1593        // It *should* be impossible for the transport-derived host ID to not match at this point
1594        anyhow::ensure!(
1595            transport_host_id == self.host_key.public_key(),
1596            "invalid host_id [{transport_host_id}]"
1597        );
1598
1599        let mut stop_command = StopHostCommand::builder().host_id(transport_host_id);
1600        if let Some(timeout) = timeout {
1601            stop_command = stop_command.timeout(timeout);
1602        }
1603        <Self as ControlInterfaceServer>::handle_stop_host(
1604            self,
1605            stop_command
1606                .build()
1607                .map_err(|e| anyhow!(e))
1608                .context("failed to build stop host command")?,
1609        )
1610        .await
1611    }
1612
1613    #[instrument(level = "debug", skip_all)]
1614    async fn handle_scale_component(
1615        self: Arc<Self>,
1616        payload: impl AsRef<[u8]>,
1617    ) -> anyhow::Result<CtlResponse<()>> {
1618        let request = serde_json::from_slice::<ScaleComponentCommand>(payload.as_ref())
1619            .context("failed to deserialize component scale command")?;
1620        <Self as ControlInterfaceServer>::handle_scale_component(self, request).await
1621    }
1622
1623    #[instrument(level = "debug", skip_all)]
1624    /// Handles scaling an component to a supplied number of `max` concurrently executing instances.
1625    /// Supplying `0` will result in stopping that component instance.
1626    #[allow(clippy::too_many_arguments)]
1627    async fn handle_scale_component_task(
1628        &self,
1629        component_ref: Arc<str>,
1630        component_id: Arc<str>,
1631        host_id: &str,
1632        max_instances: u32,
1633        component_limits: Option<HashMap<String, String>>,
1634        annotations: &Annotations,
1635        config: Vec<String>,
1636        wasm: anyhow::Result<Vec<u8>>,
1637        claims_token: Option<&jwt::Token<jwt::Component>>,
1638    ) -> anyhow::Result<()> {
1639        trace!(?component_ref, max_instances, "scale component task");
1640
1641        let claims = claims_token.map(|c| c.claims.clone());
1642        match self
1643            .policy_manager
1644            .evaluate_start_component(
1645                &component_id,
1646                &component_ref,
1647                max_instances,
1648                annotations,
1649                claims.as_ref(),
1650            )
1651            .await?
1652        {
1653            PolicyResponse {
1654                permitted: false,
1655                message: Some(message),
1656                ..
1657            } => bail!("Policy denied request to scale component `{component_id}`: `{message:?}`"),
1658            PolicyResponse {
1659                permitted: false, ..
1660            } => bail!("Policy denied request to scale component `{component_id}`"),
1661            PolicyResponse {
1662                permitted: true, ..
1663            } => (),
1664        };
1665
1666        let limits: Option<Limits> = from_string_map(component_limits.as_ref());
1667
1668        let scaled_event = match (
1669            self.components
1670                .write()
1671                .await
1672                .entry(component_id.to_string()),
1673            NonZeroUsize::new(max_instances as usize),
1674        ) {
1675            // No component is running and we requested to scale to zero, noop.
1676            // We still publish the event to indicate that the component has been scaled to zero
1677            (hash_map::Entry::Vacant(_), None) => event::component_scaled(
1678                claims.as_ref(),
1679                annotations,
1680                host_id,
1681                0_usize,
1682                &component_ref,
1683                &component_id,
1684            ),
1685            // No component is running and we requested to scale to some amount, start with specified max
1686            (hash_map::Entry::Vacant(entry), Some(max)) => {
1687                let (config, secrets) = self
1688                    .fetch_config_and_secrets(
1689                        &config,
1690                        claims_token.as_ref().map(|c| &c.jwt),
1691                        annotations.get("wasmcloud.dev/appspec"),
1692                    )
1693                    .await?;
1694                match &wasm {
1695                    Ok(wasm) => {
1696                        self.start_component(
1697                            entry,
1698                            wasm,
1699                            claims.clone(),
1700                            Arc::clone(&component_ref),
1701                            Arc::clone(&component_id),
1702                            max,
1703                            limits,
1704                            annotations,
1705                            config,
1706                            secrets,
1707                        )
1708                        .await?;
1709
1710                        event::component_scaled(
1711                            claims.as_ref(),
1712                            annotations,
1713                            host_id,
1714                            max,
1715                            &component_ref,
1716                            &component_id,
1717                        )
1718                    }
1719                    Err(e) => {
1720                        error!(%component_ref, %component_id, err = ?e, "failed to scale component");
1721                        if let Err(e) = self
1722                            .publish_event(
1723                                "component_scale_failed",
1724                                event::component_scale_failed(
1725                                    claims_token.map(|c| c.claims.clone()).as_ref(),
1726                                    annotations,
1727                                    host_id,
1728                                    &component_ref,
1729                                    &component_id,
1730                                    max_instances,
1731                                    e,
1732                                ),
1733                            )
1734                            .await
1735                        {
1736                            error!(%component_ref, %component_id, err = ?e, "failed to publish component scale failed event");
1737                        }
1738                        return Ok(());
1739                    }
1740                }
1741            }
1742            // Component is running and we requested to scale to zero instances, stop component
1743            (hash_map::Entry::Occupied(entry), None) => {
1744                let component = entry.remove();
1745                self.stop_component(&component, host_id)
1746                    .await
1747                    .context("failed to stop component in response to scale to zero")?;
1748
1749                info!(?component_ref, "component stopped");
1750                event::component_scaled(
1751                    claims.as_ref(),
1752                    &component.annotations,
1753                    host_id,
1754                    0_usize,
1755                    &component.image_reference,
1756                    &component.id,
1757                )
1758            }
1759            // Component is running and we requested to scale to some amount or unbounded, scale component
1760            (hash_map::Entry::Occupied(mut entry), Some(max)) => {
1761                let component = entry.get_mut();
1762                let config_changed =
1763                    &config != component.handler.config_data.read().await.config_names();
1764
1765                // Create the event first to avoid borrowing the component
1766                // This event is idempotent.
1767                let event = event::component_scaled(
1768                    claims.as_ref(),
1769                    &component.annotations,
1770                    host_id,
1771                    max,
1772                    &component.image_reference,
1773                    &component.id,
1774                );
1775
1776                // Modify scale only if the requested max differs from the current max or if the configuration has changed
1777                if component.max_instances != max || config_changed {
1778                    // We must partially clone the handler as we can't be sharing the targets between components
1779                    let handler = component.handler.copy_for_new();
1780                    if config_changed {
1781                        let (config, secrets) = self
1782                            .fetch_config_and_secrets(
1783                                &config,
1784                                claims_token.as_ref().map(|c| &c.jwt),
1785                                annotations.get("wasmcloud.dev/appspec"),
1786                            )
1787                            .await?;
1788                        *handler.config_data.write().await = config;
1789                        *handler.secrets.write().await = secrets;
1790                    }
1791                    let instance = self
1792                        .instantiate_component(
1793                            annotations,
1794                            Arc::clone(&component_ref),
1795                            Arc::clone(&component.id),
1796                            max,
1797                            limits,
1798                            component.component.clone(),
1799                            handler,
1800                        )
1801                        .await
1802                        .context("failed to instantiate component")?;
1803                    let component = entry.insert(instance);
1804                    self.stop_component(&component, host_id)
1805                        .await
1806                        .context("failed to stop component after scaling")?;
1807
1808                    info!(?component_ref, ?max, "component scaled");
1809                } else {
1810                    debug!(?component_ref, ?max, "component already at desired scale");
1811                }
1812                event
1813            }
1814        };
1815
1816        self.publish_event("component_scaled", scaled_event).await?;
1817
1818        Ok(())
1819    }
1820
1821    // TODO(#1548): With component IDs, new component references, configuration, etc, we're going to need to do some
1822    // design thinking around how update component should work. Should it be limited to a single host or latticewide?
1823    // Should it also update configuration, or is that separate? Should scaling be done via an update?
1824    #[instrument(level = "debug", skip_all)]
1825    async fn handle_update_component(
1826        self: Arc<Self>,
1827        payload: impl AsRef<[u8]>,
1828    ) -> anyhow::Result<CtlResponse<()>> {
1829        let cmd = serde_json::from_slice::<UpdateComponentCommand>(payload.as_ref())
1830            .context("failed to deserialize component update command")?;
1831        <Self as ControlInterfaceServer>::handle_update_component(self, cmd).await
1832    }
1833
1834    async fn handle_update_component_task(
1835        &self,
1836        component_id: Arc<str>,
1837        new_component_ref: Arc<str>,
1838        host_id: &str,
1839        annotations: Option<BTreeMap<String, String>>,
1840    ) -> anyhow::Result<()> {
1841        // NOTE: This block is specifically scoped to ensure we drop the read lock on `self.components` before
1842        // we attempt to grab a write lock.
1843        let component = {
1844            let components = self.components.read().await;
1845            let existing_component = components
1846                .get(&*component_id)
1847                .context("component not found")?;
1848            let annotations = annotations.unwrap_or_default().into_iter().collect();
1849
1850            // task is a no-op if the component image reference is the same
1851            if existing_component.image_reference == new_component_ref {
1852                info!(%component_id, %new_component_ref, "component already updated");
1853                return Ok(());
1854            }
1855
1856            let new_component = self.fetch_component(&new_component_ref).await?;
1857            let new_component = wasmcloud_runtime::Component::new(
1858                &self.runtime,
1859                &new_component,
1860                existing_component.limits,
1861            )
1862            .context("failed to initialize component")?;
1863            let new_claims = new_component.claims().cloned();
1864            if let Some(ref claims) = new_claims {
1865                self.store_claims(Claims::Component(claims.clone()))
1866                    .await
1867                    .context("failed to store claims")?;
1868            }
1869
1870            let max = existing_component.max_instances;
1871            let Ok(component) = self
1872                .instantiate_component(
1873                    &annotations,
1874                    Arc::clone(&new_component_ref),
1875                    Arc::clone(&component_id),
1876                    max,
1877                    existing_component.limits,
1878                    new_component,
1879                    existing_component.handler.copy_for_new(),
1880                )
1881                .await
1882            else {
1883                bail!("failed to instantiate component from new reference");
1884            };
1885
1886            info!(%new_component_ref, "component updated");
1887            self.publish_event(
1888                "component_scaled",
1889                event::component_scaled(
1890                    new_claims.as_ref(),
1891                    &component.annotations,
1892                    host_id,
1893                    max,
1894                    new_component_ref,
1895                    &component_id,
1896                ),
1897            )
1898            .await?;
1899
1900            // TODO(#1548): If this errors, we need to rollback
1901            self.stop_component(&component, host_id)
1902                .await
1903                .context("failed to stop old component")?;
1904            self.publish_event(
1905                "component_scaled",
1906                event::component_scaled(
1907                    component.claims(),
1908                    &component.annotations,
1909                    host_id,
1910                    0_usize,
1911                    &component.image_reference,
1912                    &component.id,
1913                ),
1914            )
1915            .await?;
1916
1917            component
1918        };
1919
1920        self.components
1921            .write()
1922            .await
1923            .insert(component_id.to_string(), component);
1924        Ok(())
1925    }
1926
1927    #[instrument(level = "debug", skip_all)]
1928    async fn handle_start_provider(
1929        self: Arc<Self>,
1930        payload: impl AsRef<[u8]>,
1931    ) -> anyhow::Result<Option<CtlResponse<()>>> {
1932        let cmd = serde_json::from_slice::<StartProviderCommand>(payload.as_ref())
1933            .context("failed to deserialize provider start command")?;
1934        <Self as ControlInterfaceServer>::handle_start_provider(self, cmd).await
1935    }
1936
1937    #[instrument(level = "debug", skip_all)]
1938    async fn handle_start_provider_task(
1939        self: Arc<Self>,
1940        config_names: &[String],
1941        provider_id: &str,
1942        provider_ref: &str,
1943        annotations: BTreeMap<String, String>,
1944        host_id: &str,
1945    ) -> anyhow::Result<()> {
1946        trace!(provider_ref, provider_id, "start provider task");
1947
1948        let registry_config = self.registry_config.read().await;
1949        let provider_ref =
1950            ResourceRef::try_from(provider_ref).context("failed to parse provider reference")?;
1951        let (path, claims_token) = match &provider_ref {
1952            ResourceRef::Builtin(..) => (None, None),
1953            _ => {
1954                let (path, claims_token) = crate::fetch_provider(
1955                    &provider_ref,
1956                    host_id,
1957                    self.host_config.allow_file_load,
1958                    &self.host_config.oci_opts,
1959                    &registry_config,
1960                )
1961                .await
1962                .context("failed to fetch provider")?;
1963                (Some(path), claims_token)
1964            }
1965        };
1966        let claims = claims_token.as_ref().map(|t| t.claims.clone());
1967
1968        if let Some(claims) = claims.clone() {
1969            self.store_claims(Claims::Provider(claims))
1970                .await
1971                .context("failed to store claims")?;
1972        }
1973
1974        let annotations: Annotations = annotations.into_iter().collect();
1975
1976        let PolicyResponse {
1977            permitted,
1978            request_id,
1979            message,
1980        } = self
1981            .policy_manager
1982            .evaluate_start_provider(
1983                provider_id,
1984                provider_ref.as_ref(),
1985                &annotations,
1986                claims.as_ref(),
1987            )
1988            .await?;
1989        ensure!(
1990            permitted,
1991            "policy denied request to start provider `{request_id}`: `{message:?}`",
1992        );
1993
1994        let component_specification = self
1995            .get_component_spec(provider_id)
1996            .await?
1997            .unwrap_or_else(|| ComponentSpecification::new(provider_ref.as_ref()));
1998
1999        self.store_component_spec(&provider_id, &component_specification)
2000            .await?;
2001
2002        let mut providers = self.providers.write().await;
2003        if let hash_map::Entry::Vacant(entry) = providers.entry(provider_id.into()) {
2004            let provider_xkey = XKey::new();
2005            // We only need to store the public key of the provider xkey, as the private key is only needed by the provider
2006            let xkey = XKey::from_public_key(&provider_xkey.public_key())
2007                .context("failed to create XKey from provider public key xkey")?;
2008            // Generate the HostData and ConfigBundle for the provider
2009            let (host_data, config_bundle) = self
2010                .prepare_provider_config(
2011                    config_names,
2012                    claims_token.as_ref(),
2013                    provider_id,
2014                    &provider_xkey,
2015                    &annotations,
2016                )
2017                .await?;
2018            let config_bundle = Arc::new(RwLock::new(config_bundle));
2019            // Used by provider child tasks (health check, config watch, process restarter) to
2020            // know when to shutdown.
2021            let shutdown = Arc::new(AtomicBool::new(false));
2022            let tasks = match (path, &provider_ref) {
2023                (Some(path), ..) => {
2024                    Arc::clone(&self)
2025                        .start_binary_provider(
2026                            path,
2027                            host_data,
2028                            Arc::clone(&config_bundle),
2029                            provider_xkey,
2030                            provider_id,
2031                            // Arguments to allow regenerating configuration later
2032                            config_names.to_vec(),
2033                            claims_token.clone(),
2034                            annotations.clone(),
2035                            shutdown.clone(),
2036                        )
2037                        .await?
2038                }
2039                (None, ResourceRef::Builtin(name)) => match *name {
2040                    "http-client" if self.experimental_features.builtin_http_client => {
2041                        self.start_http_client_provider(host_data, provider_xkey, provider_id)
2042                            .await?
2043                    }
2044                    "http-client" => {
2045                        bail!("feature `builtin-http-client` is not enabled, denying start")
2046                    }
2047                    "http-server" if self.experimental_features.builtin_http_server => {
2048                        self.start_http_server_provider(host_data, provider_xkey, provider_id)
2049                            .await?
2050                    }
2051                    "http-server" => {
2052                        bail!("feature `builtin-http-server` is not enabled, denying start")
2053                    }
2054                    "messaging-nats" if self.experimental_features.builtin_messaging_nats => {
2055                        self.start_messaging_nats_provider(host_data, provider_xkey, provider_id)
2056                            .await?
2057                    }
2058                    "messaging-nats" => {
2059                        bail!("feature `builtin-messaging-nats` is not enabled, denying start")
2060                    }
2061                    _ => bail!("unknown builtin name: {name}"),
2062                },
2063                _ => bail!("invalid provider reference"),
2064            };
2065
2066            info!(
2067                provider_ref = provider_ref.as_ref(),
2068                provider_id, "provider started"
2069            );
2070            self.publish_event(
2071                "provider_started",
2072                event::provider_started(
2073                    claims.as_ref(),
2074                    &annotations,
2075                    host_id,
2076                    &provider_ref,
2077                    provider_id,
2078                ),
2079            )
2080            .await?;
2081
2082            // Add the provider
2083            entry.insert(Provider {
2084                tasks,
2085                annotations,
2086                claims_token,
2087                image_ref: provider_ref.as_ref().to_string(),
2088                xkey,
2089                shutdown,
2090            });
2091        } else {
2092            bail!("provider is already running with that ID")
2093        }
2094
2095        Ok(())
2096    }
2097
2098    #[instrument(level = "debug", skip_all)]
2099    async fn handle_stop_provider(
2100        &self,
2101        payload: impl AsRef<[u8]>,
2102    ) -> anyhow::Result<CtlResponse<()>> {
2103        let cmd = serde_json::from_slice::<StopProviderCommand>(payload.as_ref())
2104            .context("failed to deserialize provider stop command")?;
2105        <Self as ControlInterfaceServer>::handle_stop_provider(self, cmd).await
2106    }
2107
2108    #[instrument(level = "debug", skip_all)]
2109    async fn handle_inventory(&self) -> anyhow::Result<CtlResponse<HostInventory>> {
2110        <Self as ControlInterfaceServer>::handle_inventory(self).await
2111    }
2112
2113    #[instrument(level = "trace", skip_all)]
2114    async fn handle_claims(&self) -> anyhow::Result<CtlResponse<Vec<HashMap<String, String>>>> {
2115        <Self as ControlInterfaceServer>::handle_claims(self).await
2116    }
2117
2118    #[instrument(level = "trace", skip_all)]
2119    async fn handle_links(&self) -> anyhow::Result<Vec<u8>> {
2120        <Self as ControlInterfaceServer>::handle_links(self).await
2121    }
2122
2123    #[instrument(level = "trace", skip(self))]
2124    async fn handle_config_get(&self, config_name: &str) -> anyhow::Result<Vec<u8>> {
2125        <Self as ControlInterfaceServer>::handle_config_get(self, config_name).await
2126    }
2127
2128    #[instrument(level = "debug", skip_all)]
2129    async fn handle_label_put(
2130        &self,
2131        host_id: &str,
2132        payload: impl AsRef<[u8]>,
2133    ) -> anyhow::Result<CtlResponse<()>> {
2134        let host_label = serde_json::from_slice::<HostLabel>(payload.as_ref())
2135            .context("failed to deserialize put label request")?;
2136        <Self as ControlInterfaceServer>::handle_label_put(self, host_label, host_id).await
2137    }
2138
2139    #[instrument(level = "debug", skip_all)]
2140    async fn handle_label_del(
2141        &self,
2142        host_id: &str,
2143        payload: impl AsRef<[u8]>,
2144    ) -> anyhow::Result<CtlResponse<()>> {
2145        let label = serde_json::from_slice::<HostLabelIdentifier>(payload.as_ref())
2146            .context("failed to deserialize delete label request")?;
2147        <Self as ControlInterfaceServer>::handle_label_del(self, label, host_id).await
2148    }
2149
2150    /// Handle a new link by modifying the relevant source [ComponentSpecification]. Once
2151    /// the change is written to the LATTICEDATA store, each host in the lattice (including this one)
2152    /// will handle the new specification and update their own internal link maps via [process_component_spec_put].
2153    #[instrument(level = "debug", skip_all)]
2154    async fn handle_link_put(&self, payload: impl AsRef<[u8]>) -> anyhow::Result<CtlResponse<()>> {
2155        let link: Link = serde_json::from_slice(payload.as_ref())
2156            .context("failed to deserialize wrpc link definition")?;
2157        <Self as ControlInterfaceServer>::handle_link_put(self, link).await
2158    }
2159
2160    #[instrument(level = "debug", skip_all)]
2161    /// Remove an interface link on a source component for a specific package
2162    async fn handle_link_del(&self, payload: impl AsRef<[u8]>) -> anyhow::Result<CtlResponse<()>> {
2163        let req = serde_json::from_slice::<DeleteInterfaceLinkDefinitionRequest>(payload.as_ref())
2164            .context("failed to deserialize wrpc link definition")?;
2165        <Self as ControlInterfaceServer>::handle_link_del(self, req).await
2166    }
2167
2168    #[instrument(level = "debug", skip_all)]
2169    async fn handle_registries_put(
2170        &self,
2171        payload: impl AsRef<[u8]>,
2172    ) -> anyhow::Result<CtlResponse<()>> {
2173        let registry_creds: HashMap<String, RegistryCredential> =
2174            serde_json::from_slice(payload.as_ref())
2175                .context("failed to deserialize registries put command")?;
2176        <Self as ControlInterfaceServer>::handle_registries_put(self, registry_creds).await
2177    }
2178
2179    #[instrument(level = "debug", skip_all, fields(%config_name))]
2180    async fn handle_config_put(
2181        &self,
2182        config_name: &str,
2183        data: Bytes,
2184    ) -> anyhow::Result<CtlResponse<()>> {
2185        // Validate that the data is of the proper type by deserialing it
2186        serde_json::from_slice::<HashMap<String, String>>(&data)
2187            .context("config data should be a map of string -> string")?;
2188        <Self as ControlInterfaceServer>::handle_config_put(self, config_name, data).await
2189    }
2190
2191    #[instrument(level = "debug", skip_all, fields(%config_name))]
2192    async fn handle_config_delete(&self, config_name: &str) -> anyhow::Result<CtlResponse<()>> {
2193        <Self as ControlInterfaceServer>::handle_config_delete(self, config_name).await
2194    }
2195
2196    #[instrument(level = "debug", skip_all)]
2197    async fn handle_ping_hosts(
2198        &self,
2199    ) -> anyhow::Result<CtlResponse<wasmcloud_control_interface::Host>> {
2200        <Self as ControlInterfaceServer>::handle_ping_hosts(self).await
2201    }
2202
2203    #[instrument(level = "trace", skip_all, fields(subject = %message.subject))]
2204    async fn handle_ctl_message(self: Arc<Self>, message: async_nats::Message) {
2205        // NOTE: if log level is not `trace`, this won't have an effect, since the current span is
2206        // disabled. In most cases that's fine, since we aren't aware of any control interface
2207        // requests including a trace context
2208        opentelemetry_nats::attach_span_context(&message);
2209        // Skip the topic prefix, the version, and the lattice
2210        // e.g. `wasmbus.ctl.v1.{prefix}`
2211        let subject = message.subject;
2212        let mut parts = subject
2213            .trim()
2214            .trim_start_matches(&self.ctl_topic_prefix)
2215            .trim_start_matches('.')
2216            .split('.')
2217            .skip(2);
2218        trace!(%subject, "handling control interface request");
2219
2220        // This response is a wrapped Result<Option<Result<Vec<u8>>>> for a good reason.
2221        // The outer Result is for reporting protocol errors in handling the request, e.g. failing to
2222        //    deserialize the request payload.
2223        // The Option is for the case where the request is handled successfully, but the handler
2224        //    doesn't want to send a response back to the client, like with an auction.
2225        // The inner Result is purely for the success or failure of serializing the [CtlResponse], which
2226        //    should never fail but it's a result we must handle.
2227        // And finally, the Vec<u8> is the serialized [CtlResponse] that we'll send back to the client
2228        let ctl_response = match (parts.next(), parts.next(), parts.next(), parts.next()) {
2229            // Component commands
2230            (Some("component"), Some("auction"), None, None) => self
2231                .handle_auction_component(message.payload)
2232                .await
2233                .map(serialize_ctl_response),
2234            (Some("component"), Some("scale"), Some(_host_id), None) => Arc::clone(&self)
2235                .handle_scale_component(message.payload)
2236                .await
2237                .map(Some)
2238                .map(serialize_ctl_response),
2239            (Some("component"), Some("update"), Some(_host_id), None) => Arc::clone(&self)
2240                .handle_update_component(message.payload)
2241                .await
2242                .map(Some)
2243                .map(serialize_ctl_response),
2244            // Provider commands
2245            (Some("provider"), Some("auction"), None, None) => self
2246                .handle_auction_provider(message.payload)
2247                .await
2248                .map(serialize_ctl_response),
2249            (Some("provider"), Some("start"), Some(_host_id), None) => Arc::clone(&self)
2250                .handle_start_provider(message.payload)
2251                .await
2252                .map(serialize_ctl_response),
2253            (Some("provider"), Some("stop"), Some(_host_id), None) => self
2254                .handle_stop_provider(message.payload)
2255                .await
2256                .map(Some)
2257                .map(serialize_ctl_response),
2258            // Host commands
2259            (Some("host"), Some("get"), Some(_host_id), None) => self
2260                .handle_inventory()
2261                .await
2262                .map(Some)
2263                .map(serialize_ctl_response),
2264            (Some("host"), Some("ping"), None, None) => self
2265                .handle_ping_hosts()
2266                .await
2267                .map(Some)
2268                .map(serialize_ctl_response),
2269            (Some("host"), Some("stop"), Some(host_id), None) => self
2270                .handle_stop_host(message.payload, host_id)
2271                .await
2272                .map(Some)
2273                .map(serialize_ctl_response),
2274            // Claims commands
2275            (Some("claims"), Some("get"), None, None) => self
2276                .handle_claims()
2277                .await
2278                .map(Some)
2279                .map(serialize_ctl_response),
2280            // Link commands
2281            (Some("link"), Some("del"), None, None) => self
2282                .handle_link_del(message.payload)
2283                .await
2284                .map(Some)
2285                .map(serialize_ctl_response),
2286            (Some("link"), Some("get"), None, None) => {
2287                // Explicitly returning a Vec<u8> for non-cloning efficiency within handle_links
2288                self.handle_links().await.map(|bytes| Some(Ok(bytes)))
2289            }
2290            (Some("link"), Some("put"), None, None) => self
2291                .handle_link_put(message.payload)
2292                .await
2293                .map(Some)
2294                .map(serialize_ctl_response),
2295            // Label commands
2296            (Some("label"), Some("del"), Some(host_id), None) => self
2297                .handle_label_del(host_id, message.payload)
2298                .await
2299                .map(Some)
2300                .map(serialize_ctl_response),
2301            (Some("label"), Some("put"), Some(host_id), None) => self
2302                .handle_label_put(host_id, message.payload)
2303                .await
2304                .map(Some)
2305                .map(serialize_ctl_response),
2306            // Registry commands
2307            (Some("registry"), Some("put"), None, None) => self
2308                .handle_registries_put(message.payload)
2309                .await
2310                .map(Some)
2311                .map(serialize_ctl_response),
2312            // Config commands
2313            (Some("config"), Some("get"), Some(config_name), None) => self
2314                .handle_config_get(config_name)
2315                .await
2316                .map(|bytes| Some(Ok(bytes))),
2317            (Some("config"), Some("put"), Some(config_name), None) => self
2318                .handle_config_put(config_name, message.payload)
2319                .await
2320                .map(Some)
2321                .map(serialize_ctl_response),
2322            (Some("config"), Some("del"), Some(config_name), None) => self
2323                .handle_config_delete(config_name)
2324                .await
2325                .map(Some)
2326                .map(serialize_ctl_response),
2327            // Topic fallback
2328            _ => {
2329                warn!(%subject, "received control interface request on unsupported subject");
2330                Ok(serialize_ctl_response(Some(CtlResponse::error(
2331                    "unsupported subject",
2332                ))))
2333            }
2334        };
2335
2336        if let Err(err) = &ctl_response {
2337            error!(%subject, ?err, "failed to handle control interface request");
2338        } else {
2339            trace!(%subject, "handled control interface request");
2340        }
2341
2342        if let Some(reply) = message.reply {
2343            let headers = injector_to_headers(&TraceContextInjector::default_with_span());
2344
2345            let payload: Option<Bytes> = match ctl_response {
2346                Ok(Some(Ok(payload))) => Some(payload.into()),
2347                // No response from the host (e.g. auctioning provider)
2348                Ok(None) => None,
2349                Err(e) => Some(
2350                    serde_json::to_vec(&CtlResponse::error(&e.to_string()))
2351                        .context("failed to encode control interface response")
2352                        // This should never fail to serialize, but the fallback ensures that we send
2353                        // something back to the client even if we somehow fail.
2354                        .unwrap_or_else(|_| format!(r#"{{"success":false,"error":"{e}"}}"#).into())
2355                        .into(),
2356                ),
2357                // This would only occur if we failed to serialize a valid CtlResponse. This is
2358                // programmer error.
2359                Ok(Some(Err(e))) => Some(
2360                    serde_json::to_vec(&CtlResponse::error(&e.to_string()))
2361                        .context("failed to encode control interface response")
2362                        .unwrap_or_else(|_| format!(r#"{{"success":false,"error":"{e}"}}"#).into())
2363                        .into(),
2364                ),
2365            };
2366
2367            if let Some(payload) = payload {
2368                let max_payload = self.ctl_nats.server_info().max_payload;
2369                if payload.len() > max_payload {
2370                    warn!(
2371                        size = payload.len(),
2372                        max_size = max_payload,
2373                        "ctl response payload is too large to publish and may fail",
2374                    );
2375                }
2376                if let Err(err) = self
2377                    .ctl_nats
2378                    .publish_with_headers(reply.clone(), headers, payload)
2379                    .err_into::<anyhow::Error>()
2380                    .and_then(|()| self.ctl_nats.flush().err_into::<anyhow::Error>())
2381                    .await
2382                {
2383                    error!(%subject, ?err, "failed to publish reply to control interface request");
2384                }
2385            }
2386        }
2387    }
2388
2389    // TODO: Remove this before wasmCloud 1.2 is released. This is a backwards-compatible
2390    // provider link definition put that is published to the provider's id, which is what
2391    // providers built for wasmCloud 1.0 expected.
2392    //
2393    // Thankfully, in a lattice where there are no "older" providers running, these publishes
2394    // will return immediately as there will be no subscribers on those topics.
2395    async fn put_backwards_compat_provider_link(&self, link: &Link) -> anyhow::Result<()> {
2396        // Only attempt to publish the backwards-compatible provider link definition if the link
2397        // does not contain any secret values.
2398        let source_config_contains_secret = link
2399            .source_config()
2400            .iter()
2401            .any(|c| c.starts_with(SECRET_PREFIX));
2402        let target_config_contains_secret = link
2403            .target_config()
2404            .iter()
2405            .any(|c| c.starts_with(SECRET_PREFIX));
2406        if source_config_contains_secret || target_config_contains_secret {
2407            debug!("link contains secrets and is not backwards compatible, skipping");
2408            return Ok(());
2409        }
2410        let provider_link = self
2411            .resolve_link_config(link.clone(), None, None, &XKey::new())
2412            .await
2413            .context("failed to resolve link config")?;
2414        let lattice = &self.host_config.lattice;
2415        let payload: Bytes = serde_json::to_vec(&provider_link)
2416            .context("failed to serialize provider link definition")?
2417            .into();
2418
2419        if let Err(e) = self
2420            .rpc_nats
2421            .publish_with_headers(
2422                format!("wasmbus.rpc.{lattice}.{}.linkdefs.put", link.source_id()),
2423                injector_to_headers(&TraceContextInjector::default_with_span()),
2424                payload.clone(),
2425            )
2426            .await
2427        {
2428            warn!(
2429                ?e,
2430                "failed to publish backwards-compatible provider link to source"
2431            );
2432        }
2433
2434        if let Err(e) = self
2435            .rpc_nats
2436            .publish_with_headers(
2437                format!("wasmbus.rpc.{lattice}.{}.linkdefs.put", link.target()),
2438                injector_to_headers(&TraceContextInjector::default_with_span()),
2439                payload,
2440            )
2441            .await
2442        {
2443            warn!(
2444                ?e,
2445                "failed to publish backwards-compatible provider link to target"
2446            );
2447        }
2448
2449        Ok(())
2450    }
2451
2452    /// Publishes a link to a provider running on this host to handle.
2453    #[instrument(level = "debug", skip_all)]
2454    async fn put_provider_link(&self, provider: &Provider, link: &Link) -> anyhow::Result<()> {
2455        let provider_link = self
2456            .resolve_link_config(
2457                link.clone(),
2458                provider.claims_token.as_ref().map(|t| &t.jwt),
2459                provider.annotations.get("wasmcloud.dev/appspec"),
2460                &provider.xkey,
2461            )
2462            .await
2463            .context("failed to resolve link config and secrets")?;
2464        let lattice = &self.host_config.lattice;
2465        let payload: Bytes = serde_json::to_vec(&provider_link)
2466            .context("failed to serialize provider link definition")?
2467            .into();
2468
2469        self.rpc_nats
2470            .publish_with_headers(
2471                format!(
2472                    "wasmbus.rpc.{lattice}.{}.linkdefs.put",
2473                    provider.xkey.public_key()
2474                ),
2475                injector_to_headers(&TraceContextInjector::default_with_span()),
2476                payload.clone(),
2477            )
2478            .await
2479            .context("failed to publish provider link definition put")
2480    }
2481
2482    /// Publishes a delete link to the lattice for all instances of a provider to handle
2483    /// Right now this is publishing _both_ to the source and the target in order to
2484    /// ensure that the provider is aware of the link delete. This would cause problems if a provider
2485    /// is linked to a provider (which it should never be.)
2486    #[instrument(level = "debug", skip(self))]
2487    async fn del_provider_link(&self, link: &Link) -> anyhow::Result<()> {
2488        let lattice = &self.host_config.lattice;
2489        // The provider expects the [`wasmcloud_core::InterfaceLinkDefinition`]
2490        let link = wasmcloud_core::InterfaceLinkDefinition {
2491            source_id: link.source_id().to_string(),
2492            target: link.target().to_string(),
2493            wit_namespace: link.wit_namespace().to_string(),
2494            wit_package: link.wit_package().to_string(),
2495            name: link.name().to_string(),
2496            interfaces: link.interfaces().clone(),
2497            // Configuration isn't needed for deletion
2498            ..Default::default()
2499        };
2500        let source_id = &link.source_id;
2501        let target = &link.target;
2502        let payload: Bytes = serde_json::to_vec(&link)
2503            .context("failed to serialize provider link definition for deletion")?
2504            .into();
2505
2506        let (source_result, target_result) = futures::future::join(
2507            self.rpc_nats.publish_with_headers(
2508                format!("wasmbus.rpc.{lattice}.{source_id}.linkdefs.del"),
2509                injector_to_headers(&TraceContextInjector::default_with_span()),
2510                payload.clone(),
2511            ),
2512            self.rpc_nats.publish_with_headers(
2513                format!("wasmbus.rpc.{lattice}.{target}.linkdefs.del"),
2514                injector_to_headers(&TraceContextInjector::default_with_span()),
2515                payload,
2516            ),
2517        )
2518        .await;
2519
2520        source_result
2521            .and(target_result)
2522            .context("failed to publish provider link definition delete")
2523    }
2524
2525    async fn fetch_config_and_secrets(
2526        &self,
2527        config_names: &[String],
2528        entity_jwt: Option<&String>,
2529        application: Option<&String>,
2530    ) -> anyhow::Result<(ConfigBundle, HashMap<String, SecretBox<SecretValue>>)> {
2531        let (secret_names, config_names) = config_names
2532            .iter()
2533            .map(|s| s.to_string())
2534            .partition(|name| name.starts_with(SECRET_PREFIX));
2535
2536        let config = self
2537            .config_generator
2538            .generate(config_names)
2539            .await
2540            .context("Unable to fetch requested config")?;
2541
2542        let secrets = self
2543            .secrets_manager
2544            .fetch_secrets(secret_names, entity_jwt, &self.host_token.jwt, application)
2545            .await
2546            .context("Unable to fetch requested secrets")?;
2547
2548        Ok((config, secrets))
2549    }
2550
2551    /// Validates that the provided configuration names exist in the store and are valid.
2552    ///
2553    /// For any configuration that starts with `SECRET_`, the configuration is expected to be a secret reference.
2554    /// For any other configuration, the configuration is expected to be a [`HashMap<String, String>`].
2555    async fn validate_config<I>(&self, config_names: I) -> anyhow::Result<()>
2556    where
2557        I: IntoIterator<Item: AsRef<str>>,
2558    {
2559        let config_store = self.config_data.clone();
2560        let validation_errors =
2561            futures::future::join_all(config_names.into_iter().map(|config_name| {
2562                let config_store = config_store.clone();
2563                let config_name = config_name.as_ref().to_string();
2564                async move {
2565                    match config_store.get(&config_name).await {
2566                        Ok(Some(_)) => None,
2567                        Ok(None) if config_name.starts_with(SECRET_PREFIX) => Some(format!(
2568                            "Secret reference {config_name} not found in config store"
2569                        )),
2570                        Ok(None) => Some(format!(
2571                            "Configuration {config_name} not found in config store"
2572                        )),
2573                        Err(e) => Some(e.to_string()),
2574                    }
2575                }
2576            }))
2577            .await;
2578
2579        // NOTE(brooksmtownsend): Not using `join` here because it requires a `String` and we
2580        // need to flatten out the `None` values.
2581        let validation_errors = validation_errors
2582            .into_iter()
2583            .flatten()
2584            .fold(String::new(), |acc, e| acc + &e + ". ");
2585        if !validation_errors.is_empty() {
2586            bail!(format!(
2587                "Failed to validate configuration and secrets. {validation_errors}",
2588            ));
2589        }
2590
2591        Ok(())
2592    }
2593
2594    /// Transform a [`wasmcloud_control_interface::Link`] into a [`wasmcloud_core::InterfaceLinkDefinition`]
2595    /// by fetching the source and target configurations and secrets, and encrypting the secrets.
2596    async fn resolve_link_config(
2597        &self,
2598        link: Link,
2599        provider_jwt: Option<&String>,
2600        application: Option<&String>,
2601        provider_xkey: &XKey,
2602    ) -> anyhow::Result<wasmcloud_core::InterfaceLinkDefinition> {
2603        let (source_bundle, raw_source_secrets) = self
2604            .fetch_config_and_secrets(link.source_config().as_slice(), provider_jwt, application)
2605            .await?;
2606        let (target_bundle, raw_target_secrets) = self
2607            .fetch_config_and_secrets(link.target_config().as_slice(), provider_jwt, application)
2608            .await?;
2609
2610        let source_config = source_bundle.get_config().await;
2611        let target_config = target_bundle.get_config().await;
2612        // NOTE(brooksmtownsend): This trait import is used here to ensure we're only exposing secret
2613        // values when we need them.
2614        use secrecy::ExposeSecret;
2615        let source_secrets_map: HashMap<String, wasmcloud_core::secrets::SecretValue> =
2616            raw_source_secrets
2617                .iter()
2618                .map(|(k, v)| match v.expose_secret() {
2619                    SecretValue::String(s) => (
2620                        k.clone(),
2621                        wasmcloud_core::secrets::SecretValue::String(s.to_owned()),
2622                    ),
2623                    SecretValue::Bytes(b) => (
2624                        k.clone(),
2625                        wasmcloud_core::secrets::SecretValue::Bytes(b.to_owned()),
2626                    ),
2627                })
2628                .collect();
2629        let target_secrets_map: HashMap<String, wasmcloud_core::secrets::SecretValue> =
2630            raw_target_secrets
2631                .iter()
2632                .map(|(k, v)| match v.expose_secret() {
2633                    SecretValue::String(s) => (
2634                        k.clone(),
2635                        wasmcloud_core::secrets::SecretValue::String(s.to_owned()),
2636                    ),
2637                    SecretValue::Bytes(b) => (
2638                        k.clone(),
2639                        wasmcloud_core::secrets::SecretValue::Bytes(b.to_owned()),
2640                    ),
2641                })
2642                .collect();
2643        // Serializing & sealing an empty map results in a non-empty Vec, which is difficult to tell the
2644        // difference between an empty map and an encrypted empty map. To avoid this, we explicitly handle
2645        // the case where the map is empty.
2646        let source_secrets = if source_secrets_map.is_empty() {
2647            None
2648        } else {
2649            Some(
2650                serde_json::to_vec(&source_secrets_map)
2651                    .map(|secrets| self.secrets_xkey.seal(&secrets, provider_xkey))
2652                    .context("failed to serialize and encrypt source secrets")??,
2653            )
2654        };
2655        let target_secrets = if target_secrets_map.is_empty() {
2656            None
2657        } else {
2658            Some(
2659                serde_json::to_vec(&target_secrets_map)
2660                    .map(|secrets| self.secrets_xkey.seal(&secrets, provider_xkey))
2661                    .context("failed to serialize and encrypt target secrets")??,
2662            )
2663        };
2664
2665        Ok(wasmcloud_core::InterfaceLinkDefinition {
2666            source_id: link.source_id().to_string(),
2667            target: link.target().to_string(),
2668            name: link.name().to_string(),
2669            wit_namespace: link.wit_namespace().to_string(),
2670            wit_package: link.wit_package().to_string(),
2671            interfaces: link.interfaces().clone(),
2672            source_config: source_config.clone(),
2673            target_config: target_config.clone(),
2674            source_secrets,
2675            target_secrets,
2676        })
2677    }
2678}
2679
2680/// Helper function to transform a Vec of [`Link`]s into the structure components expect to be able
2681/// to quickly look up the desired target for a given interface
2682///
2683/// # Arguments
2684/// - links: A Vec of [`Link`]s
2685///
2686/// # Returns
2687/// - A `HashMap` in the form of `link_name` -> `instance` -> target
2688fn component_import_links(links: &[Link]) -> HashMap<Box<str>, HashMap<Box<str>, Box<str>>> {
2689    let mut m = HashMap::new();
2690    for link in links {
2691        let instances: &mut HashMap<Box<str>, Box<str>> = m
2692            .entry(link.name().to_string().into_boxed_str())
2693            .or_default();
2694        for interface in link.interfaces() {
2695            instances.insert(
2696                format!(
2697                    "{}:{}/{interface}",
2698                    link.wit_namespace(),
2699                    link.wit_package(),
2700                )
2701                .into_boxed_str(),
2702                link.target().to_string().into_boxed_str(),
2703            );
2704        }
2705    }
2706    m
2707}
2708
2709/// Helper function to serialize `CtlResponse`<T> into a Vec<u8> if the response is Some
2710fn serialize_ctl_response<T: Serialize>(
2711    ctl_response: Option<CtlResponse<T>>,
2712) -> Option<anyhow::Result<Vec<u8>>> {
2713    ctl_response.map(|resp| serde_json::to_vec(&resp).map_err(anyhow::Error::from))
2714}
2715
2716fn human_friendly_uptime(uptime: Duration) -> String {
2717    // strip sub-seconds, then convert to human-friendly format
2718    humantime::format_duration(
2719        uptime.saturating_sub(Duration::from_nanos(uptime.subsec_nanos().into())),
2720    )
2721    .to_string()
2722}
2723
2724fn injector_to_headers(injector: &TraceContextInjector) -> async_nats::header::HeaderMap {
2725    injector
2726        .iter()
2727        .filter_map(|(k, v)| {
2728            // There's not really anything we can do about headers that don't parse
2729            let name = async_nats::header::HeaderName::from_str(k.as_str()).ok()?;
2730            let value = async_nats::header::HeaderValue::from_str(v.as_str()).ok()?;
2731            Some((name, value))
2732        })
2733        .collect()
2734}
2735
2736#[cfg(test)]
2737mod test {
2738    // Ensure that the helper function to translate a list of links into a map of imports works as expected
2739    #[test]
2740    fn can_compute_component_links() {
2741        use std::collections::HashMap;
2742        use wasmcloud_control_interface::Link;
2743
2744        let links = vec![
2745            Link::builder()
2746                .source_id("source_component")
2747                .target("kv-redis")
2748                .wit_namespace("wasi")
2749                .wit_package("keyvalue")
2750                .interfaces(vec!["atomics".into(), "store".into()])
2751                .name("default")
2752                .build()
2753                .expect("failed to build link"),
2754            Link::builder()
2755                .source_id("source_component")
2756                .target("kv-vault")
2757                .wit_namespace("wasi")
2758                .wit_package("keyvalue")
2759                .interfaces(vec!["atomics".into(), "store".into()])
2760                .name("secret")
2761                .source_config(vec![])
2762                .target_config(vec!["my-secret".into()])
2763                .build()
2764                .expect("failed to build link"),
2765            Link::builder()
2766                .source_id("source_component")
2767                .target("kv-vault-offsite")
2768                .wit_namespace("wasi")
2769                .wit_package("keyvalue")
2770                .interfaces(vec!["atomics".into()])
2771                .name("secret")
2772                .source_config(vec![])
2773                .target_config(vec!["my-secret".into()])
2774                .build()
2775                .expect("failed to build link"),
2776            Link::builder()
2777                .source_id("http")
2778                .target("source_component")
2779                .wit_namespace("wasi")
2780                .wit_package("http")
2781                .interfaces(vec!["incoming-handler".into()])
2782                .name("default")
2783                .source_config(vec!["some-port".into()])
2784                .target_config(vec![])
2785                .build()
2786                .expect("failed to build link"),
2787            Link::builder()
2788                .source_id("source_component")
2789                .target("httpclient")
2790                .wit_namespace("wasi")
2791                .wit_package("http")
2792                .interfaces(vec!["outgoing-handler".into()])
2793                .name("default")
2794                .source_config(vec![])
2795                .target_config(vec!["some-port".into()])
2796                .build()
2797                .expect("failed to build link"),
2798            Link::builder()
2799                .source_id("source_component")
2800                .target("other_component")
2801                .wit_namespace("custom")
2802                .wit_package("foo")
2803                .interfaces(vec!["bar".into(), "baz".into()])
2804                .name("default")
2805                .source_config(vec![])
2806                .target_config(vec![])
2807                .build()
2808                .expect("failed to build link"),
2809            Link::builder()
2810                .source_id("other_component")
2811                .target("target")
2812                .wit_namespace("wit")
2813                .wit_package("package")
2814                .interfaces(vec!["interface3".into()])
2815                .name("link2")
2816                .source_config(vec![])
2817                .target_config(vec![])
2818                .build()
2819                .expect("failed to build link"),
2820        ];
2821
2822        let links_map = super::component_import_links(&links);
2823
2824        // Expected structure:
2825        // {
2826        //     "default": {
2827        //         "wasi:keyvalue": {
2828        //             "atomics": "kv-redis",
2829        //             "store": "kv-redis"
2830        //         },
2831        //         "wasi:http": {
2832        //             "incoming-handler": "source_component"
2833        //         },
2834        //         "custom:foo": {
2835        //             "bar": "other_component",
2836        //             "baz": "other_component"
2837        //         }
2838        //     },
2839        //     "secret": {
2840        //         "wasi:keyvalue": {
2841        //             "atomics": "kv-vault-offsite",
2842        //             "store": "kv-vault"
2843        //         }
2844        //     },
2845        //     "link2": {
2846        //         "wit:package": {
2847        //             "interface3": "target"
2848        //         }
2849        //     }
2850        // }
2851        let expected_result = HashMap::from_iter([
2852            (
2853                "default".into(),
2854                HashMap::from([
2855                    ("wasi:keyvalue/atomics".into(), "kv-redis".into()),
2856                    ("wasi:keyvalue/store".into(), "kv-redis".into()),
2857                    (
2858                        "wasi:http/incoming-handler".into(),
2859                        "source_component".into(),
2860                    ),
2861                    ("wasi:http/outgoing-handler".into(), "httpclient".into()),
2862                    ("custom:foo/bar".into(), "other_component".into()),
2863                    ("custom:foo/baz".into(), "other_component".into()),
2864                ]),
2865            ),
2866            (
2867                "secret".into(),
2868                HashMap::from([
2869                    ("wasi:keyvalue/atomics".into(), "kv-vault-offsite".into()),
2870                    ("wasi:keyvalue/store".into(), "kv-vault".into()),
2871                ]),
2872            ),
2873            (
2874                "link2".into(),
2875                HashMap::from([("wit:package/interface3".into(), "target".into())]),
2876            ),
2877        ]);
2878
2879        assert_eq!(links_map, expected_result);
2880    }
2881}