1#![allow(clippy::type_complexity)]
2
3use core::sync::atomic::Ordering;
4
5use std::collections::{hash_map, BTreeMap, HashMap};
6use std::env::consts::{ARCH, FAMILY, OS};
7use std::future::Future;
8use std::num::NonZeroUsize;
9use std::ops::Deref;
10use std::pin::Pin;
11use std::str::FromStr;
12use std::sync::atomic::AtomicBool;
13use std::sync::Arc;
14use std::task::{Context, Poll};
15use std::time::Duration;
16
17use anyhow::{anyhow, bail, ensure, Context as _};
18use bytes::{BufMut, Bytes, BytesMut};
19use claims::{Claims, StoredClaims};
20use futures::stream::{AbortHandle, Abortable};
21use futures::{join, stream, Stream, StreamExt, TryStreamExt};
22use hyper_util::rt::{TokioExecutor, TokioIo};
23use nkeys::{KeyPair, KeyPairType, XKey};
24use providers::Provider;
25use secrecy::SecretBox;
26use serde_json::json;
27use sysinfo::System;
28use tokio::io::AsyncWrite;
29use tokio::net::TcpListener;
30use tokio::spawn;
31use tokio::sync::{mpsc, watch, RwLock, Semaphore};
32use tokio::task::{JoinHandle, JoinSet};
33use tokio::time::{interval_at, timeout, Instant};
34use tokio_stream::wrappers::IntervalStream;
35use tracing::{debug, debug_span, error, info, instrument, trace, warn, Instrument as _};
36use tracing_opentelemetry::OpenTelemetrySpanExt;
37use wascap::jwt;
38use wasmcloud_control_interface::{
39 ComponentAuctionAck, ComponentAuctionRequest, ComponentDescription, CtlResponse,
40 DeleteInterfaceLinkDefinitionRequest, HostInventory, HostLabel, HostLabelIdentifier, Link,
41 ProviderAuctionAck, ProviderAuctionRequest, ProviderDescription, RegistryCredential,
42 ScaleComponentCommand, StartProviderCommand, StopHostCommand, StopProviderCommand,
43 UpdateComponentCommand,
44};
45use wasmcloud_core::ComponentId;
46use wasmcloud_runtime::capability::secrets::store::SecretValue;
47use wasmcloud_runtime::component::{from_string_map, Limits, WrpcServeEvent};
48use wasmcloud_runtime::Runtime;
49use wasmcloud_secrets_types::SECRET_PREFIX;
50use wasmcloud_tracing::context::TraceContextInjector;
51use wasmcloud_tracing::{global, InstrumentationScope, KeyValue};
52
53use crate::event::{DefaultEventPublisher, EventPublisher};
54use crate::metrics::HostMetrics;
55use crate::nats::connect_nats;
56use crate::nats::provider::NatsProviderManager;
57use crate::policy::DefaultPolicyManager;
58use crate::secrets::{DefaultSecretsManager, SecretsManager};
59use crate::store::{DefaultStore, StoreManager};
60use crate::wasmbus::ctl::ControlInterfaceServer;
61use crate::workload_identity::WorkloadIdentityConfig;
62use crate::{fetch_component, PolicyManager, PolicyResponse, RegistryConfig, ResourceRef};
63
64mod component_spec;
65mod experimental;
66mod handler;
67
68pub(crate) mod claims;
69pub(crate) mod providers;
70
71pub mod ctl;
73
74pub mod config;
76
77pub mod host_config;
79
80pub use self::experimental::Features;
81pub use self::host_config::Host as HostConfig;
82pub use component_spec::ComponentSpecification;
83pub use providers::ProviderManager;
84
85use self::config::{BundleGenerator, ConfigBundle};
86use self::handler::Handler;
87
88const MAX_INVOCATION_CHANNEL_SIZE: usize = 5000;
89const MIN_INVOCATION_CHANNEL_SIZE: usize = 256;
90
91#[derive(Clone, Default)]
92struct AsyncBytesMut(Arc<std::sync::Mutex<BytesMut>>);
93
94impl AsyncWrite for AsyncBytesMut {
95 fn poll_write(
96 self: Pin<&mut Self>,
97 _cx: &mut Context<'_>,
98 buf: &[u8],
99 ) -> Poll<Result<usize, std::io::Error>> {
100 Poll::Ready({
101 self.0
102 .lock()
103 .map_err(|e| std::io::Error::other(e.to_string()))?
104 .put_slice(buf);
105 Ok(buf.len())
106 })
107 }
108
109 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
110 Poll::Ready(Ok(()))
111 }
112
113 fn poll_shutdown(
114 self: Pin<&mut Self>,
115 _cx: &mut Context<'_>,
116 ) -> Poll<Result<(), std::io::Error>> {
117 Poll::Ready(Ok(()))
118 }
119}
120
121impl TryFrom<AsyncBytesMut> for Vec<u8> {
122 type Error = anyhow::Error;
123
124 fn try_from(buf: AsyncBytesMut) -> Result<Self, Self::Error> {
125 buf.0
126 .lock()
127 .map(|buf| buf.clone().into())
128 .map_err(|e| anyhow!(e.to_string()).context("failed to lock"))
129 }
130}
131
132type Annotations = BTreeMap<String, String>;
133
134#[derive(Debug)]
135struct Component {
136 component: wasmcloud_runtime::Component<Handler>,
137 id: Arc<str>,
139 handler: Handler,
140 exports: JoinHandle<()>,
141 annotations: Annotations,
142 max_instances: NonZeroUsize,
144 limits: Option<Limits>,
145 image_reference: Arc<str>,
146 events: mpsc::Sender<WrpcServeEvent<<WrpcServer as wrpc_transport::Serve>::Context>>,
147 permits: Arc<Semaphore>,
148}
149
150impl Deref for Component {
151 type Target = wasmcloud_runtime::Component<Handler>;
152
153 fn deref(&self) -> &Self::Target {
154 &self.component
155 }
156}
157
158#[derive(Clone)]
159struct WrpcServer {
160 nats: wrpc_transport_nats::Client,
161 claims: Option<Arc<jwt::Claims<jwt::Component>>>,
162 id: Arc<str>,
163 image_reference: Arc<str>,
164 annotations: Arc<Annotations>,
165 policy_manager: Arc<dyn PolicyManager>,
166 metrics: Arc<HostMetrics>,
167}
168
169struct InvocationContext {
170 start_at: Instant,
171 attributes: Vec<KeyValue>,
172 span: tracing::Span,
173}
174
175impl Deref for InvocationContext {
176 type Target = tracing::Span;
177
178 fn deref(&self) -> &Self::Target {
179 &self.span
180 }
181}
182
183impl wrpc_transport::Serve for WrpcServer {
184 type Context = InvocationContext;
185 type Outgoing = <wrpc_transport_nats::Client as wrpc_transport::Serve>::Outgoing;
186 type Incoming = <wrpc_transport_nats::Client as wrpc_transport::Serve>::Incoming;
187
188 #[instrument(
189 level = "info",
190 skip(self, paths),
191 fields(
192 component_id = ?self.id,
193 component_ref = ?self.image_reference)
194 )]
195 async fn serve(
196 &self,
197 instance: &str,
198 func: &str,
199 paths: impl Into<Arc<[Box<[Option<usize>]>]>> + Send,
200 ) -> anyhow::Result<
201 impl Stream<Item = anyhow::Result<(Self::Context, Self::Outgoing, Self::Incoming)>>
202 + Send
203 + 'static,
204 > {
205 debug!("serving invocations");
206 let invocations = self.nats.serve(instance, func, paths).await?;
207
208 let func: Arc<str> = Arc::from(func);
209 let instance: Arc<str> = Arc::from(instance);
210 let annotations = Arc::clone(&self.annotations);
211 let id = Arc::clone(&self.id);
212 let image_reference = Arc::clone(&self.image_reference);
213 let metrics = Arc::clone(&self.metrics);
214 let policy_manager = Arc::clone(&self.policy_manager);
215 let claims = self.claims.clone();
216 Ok(invocations.and_then(move |(cx, tx, rx)| {
217 let annotations = Arc::clone(&annotations);
218 let claims = claims.clone();
219 let func = Arc::clone(&func);
220 let id = Arc::clone(&id);
221 let image_reference = Arc::clone(&image_reference);
222 let instance = Arc::clone(&instance);
223 let metrics = Arc::clone(&metrics);
224 let policy_manager = Arc::clone(&policy_manager);
225 let span = tracing::info_span!("component_invocation", func = %func, id = %id, instance = %instance);
226 async move {
227 if let Some(ref cx) = cx {
228 let trace_context = cx
231 .iter()
232 .flat_map(|(key, value)| {
233 value
234 .iter()
235 .map(|v| (key.to_string(), v.to_string()))
236 .collect::<Vec<_>>()
237 })
238 .collect::<Vec<(String, String)>>();
239 span.set_parent(wasmcloud_tracing::context::get_span_context(&trace_context));
240 }
241
242 let PolicyResponse {
243 request_id,
244 permitted,
245 message,
246 } = policy_manager
247 .evaluate_perform_invocation(
248 &id,
249 &image_reference,
250 &annotations,
251 claims.as_deref(),
252 instance.to_string(),
253 func.to_string(),
254 )
255 .instrument(debug_span!(parent: &span, "policy_check"))
256 .await?;
257 ensure!(
258 permitted,
259 "policy denied request to invoke component `{request_id}`: `{message:?}`",
260 );
261
262 Ok((
263 InvocationContext{
264 start_at: Instant::now(),
265 attributes: vec![
267 KeyValue::new("component.ref", image_reference),
268 KeyValue::new("lattice", metrics.lattice_id.clone()),
269 KeyValue::new("host", metrics.host_id.clone()),
270 KeyValue::new("operation", format!("{instance}/{func}")),
271 ],
272 span,
273 },
274 tx,
275 rx,
276 ))
277 }
278 }))
279 }
280}
281
282pub struct Host {
286 friendly_name: String,
288
289 heartbeat: AbortHandle,
291
292 host_config: HostConfig,
294
295 host_key: Arc<KeyPair>,
297
298 host_token: Arc<jwt::Token<jwt::Host>>,
300
301 labels: Arc<RwLock<BTreeMap<String, String>>>,
303
304 max_execution_time: Duration,
306
307 start_at: Instant,
309
310 pub stop_tx: watch::Sender<Option<Instant>>,
312
313 pub stop_rx: watch::Receiver<Option<Instant>>,
315
316 experimental_features: Features,
318
319 ready: Arc<AtomicBool>,
321
322 secrets_xkey: Arc<XKey>,
324
325 components: Arc<RwLock<HashMap<ComponentId, Arc<Component>>>>,
327
328 component_claims: Arc<RwLock<HashMap<ComponentId, jwt::Claims<jwt::Component>>>>,
330
331 links: RwLock<HashMap<String, Vec<Link>>>,
333
334 messaging_links:
336 Arc<RwLock<HashMap<Arc<str>, Arc<RwLock<HashMap<Box<str>, async_nats::Client>>>>>>,
337
338 providers: RwLock<HashMap<String, Provider>>,
340
341 provider_claims: Arc<RwLock<HashMap<String, jwt::Claims<jwt::CapabilityProvider>>>>,
343
344 runtime: Runtime,
346
347 registry_config: RwLock<HashMap<String, RegistryConfig>>,
349
350 rpc_nats: Arc<async_nats::Client>,
352
353 provider_manager: Arc<dyn ProviderManager>,
355
356 metrics: Arc<HostMetrics>,
358
359 pub(crate) event_publisher: Arc<dyn EventPublisher>,
361
362 policy_manager: Arc<dyn PolicyManager>,
364
365 secrets_manager: Arc<dyn SecretsManager>,
367
368 config_store: Arc<dyn StoreManager>,
370
371 data_store: Arc<dyn StoreManager>,
373
374 config_generator: BundleGenerator,
376
377 #[allow(unused)]
379 tasks: JoinSet<()>,
380}
381
382#[derive(Default)]
384pub struct HostBuilder {
385 config: HostConfig,
386
387 registry_config: HashMap<String, RegistryConfig>,
389
390 bundle_generator: Option<BundleGenerator>,
392 config_store: Option<Arc<dyn StoreManager>>,
394 data_store: Option<Arc<dyn StoreManager>>,
396 event_publisher: Option<Arc<dyn EventPublisher>>,
398 policy_manager: Option<Arc<dyn PolicyManager>>,
400 secrets_manager: Option<Arc<dyn SecretsManager>>,
402}
403
404impl HostBuilder {
405 const DEFAULT_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(30);
406
407 const NAME_ADJECTIVES: &'static str = "
408 autumn hidden bitter misty silent empty dry dark summer
409 icy delicate quiet white cool spring winter patient
410 twilight dawn crimson wispy weathered blue billowing
411 broken cold damp falling frosty green long late lingering
412 bold little morning muddy old red rough still small
413 sparkling bouncing shy wandering withered wild black
414 young holy solitary fragrant aged snowy proud floral
415 restless divine polished ancient purple lively nameless
416 gray orange mauve
417 ";
418
419 const NAME_NOUNS: &'static str = "
420 waterfall river breeze moon rain wind sea morning
421 snow lake sunset pine shadow leaf dawn glitter forest
422 hill cloud meadow sun glade bird brook butterfly
423 bush dew dust field fire flower firefly ladybug feather grass
424 haze mountain night pond darkness snowflake silence
425 sound sky shape stapler surf thunder violet water wildflower
426 wave water resonance sun timber dream cherry tree fog autocorrect
427 frost voice paper frog smoke star hamster ocean emoji robot
428 ";
429
430 fn generate_friendly_name() -> Option<String> {
434 let adjectives: Vec<_> = Self::NAME_ADJECTIVES.split_whitespace().collect();
435 let nouns: Vec<_> = Self::NAME_NOUNS.split_whitespace().collect();
436 names::Generator::new(&adjectives, &nouns, names::Name::Numbered).next()
437 }
438
439 pub fn new() -> Self {
441 HostBuilder::default()
442 }
443
444 pub fn with_event_publisher(self, event_publisher: Option<Arc<dyn EventPublisher>>) -> Self {
446 Self {
447 event_publisher,
448 ..self
449 }
450 }
451
452 pub fn with_policy_manager(self, policy_manager: Option<Arc<dyn PolicyManager>>) -> Self {
454 Self {
455 policy_manager,
456 ..self
457 }
458 }
459
460 pub fn with_registry_config(self, registry_config: HashMap<String, RegistryConfig>) -> Self {
462 Self {
463 registry_config,
464 ..self
465 }
466 }
467
468 pub fn with_secrets_manager(self, secrets_manager: Option<Arc<dyn SecretsManager>>) -> Self {
470 Self {
471 secrets_manager,
472 ..self
473 }
474 }
475
476 pub fn with_config_store(self, config_store: Option<Arc<dyn StoreManager>>) -> Self {
478 Self {
479 config_store,
480 ..self
481 }
482 }
483
484 pub fn with_data_store(self, data_store: Option<Arc<dyn StoreManager>>) -> Self {
486 Self { data_store, ..self }
487 }
488
489 pub fn with_bundle_generator(self, bundle_generator: Option<BundleGenerator>) -> Self {
491 Self {
492 bundle_generator,
493 ..self
494 }
495 }
496
497 #[instrument(level = "debug", skip_all)]
499 pub async fn build(
500 self,
501 ) -> anyhow::Result<(Arc<Host>, impl Future<Output = anyhow::Result<()>>)> {
502 ensure!(self.config.host_key.key_pair_type() == KeyPairType::Server);
503
504 let mut labels = BTreeMap::from([
505 ("hostcore.arch".into(), ARCH.into()),
506 ("hostcore.os".into(), OS.into()),
507 ("hostcore.osfamily".into(), FAMILY.into()),
508 ]);
509 labels.extend(self.config.labels.clone().into_iter());
510 let friendly_name =
511 Self::generate_friendly_name().context("failed to generate friendly name")?;
512
513 let host_issuer = Arc::new(KeyPair::new_account());
514 let claims = jwt::Claims::<jwt::Host>::new(
515 friendly_name.clone(),
516 host_issuer.public_key(),
517 self.config.host_key.public_key().clone(),
518 Some(HashMap::from_iter([(
519 "self_signed".to_string(),
520 "true".to_string(),
521 )])),
522 );
523 let jwt = claims
524 .encode(&host_issuer)
525 .context("failed to encode host claims")?;
526 let host_token = Arc::new(jwt::Token { jwt, claims });
527
528 let workload_identity_config = if self.config.experimental_features.workload_identity_auth {
529 Some(WorkloadIdentityConfig::from_env()?)
530 } else {
531 None
532 };
533
534 debug!(
535 rpc_nats_url = self.config.rpc_nats_url.as_str(),
536 "connecting to NATS RPC server"
537 );
538 let rpc_nats = Arc::new(
539 connect_nats(
540 self.config.rpc_nats_url.as_str(),
541 self.config.rpc_jwt.as_ref(),
542 self.config.rpc_key.clone(),
543 self.config.rpc_tls,
544 Some(self.config.rpc_timeout),
545 workload_identity_config.clone(),
546 )
547 .await
548 .context("failed to establish NATS RPC server connection")?,
549 );
550
551 let (stop_tx, stop_rx) = watch::channel(None);
552
553 let (runtime, _epoch) = Runtime::builder()
554 .max_execution_time(self.config.max_execution_time)
555 .max_linear_memory(self.config.max_linear_memory)
556 .max_components(self.config.max_components)
557 .max_core_instances_per_component(self.config.max_core_instances_per_component)
558 .max_component_size(self.config.max_component_size)
559 .experimental_features(self.config.experimental_features.into())
560 .build()
561 .context("failed to build runtime")?;
562
563 let scope = InstrumentationScope::builder("wasmcloud-host")
564 .with_version(self.config.version.clone())
565 .with_attributes(vec![
566 KeyValue::new("host.id", self.config.host_key.public_key()),
567 KeyValue::new("host.version", self.config.version.clone()),
568 KeyValue::new("host.arch", ARCH),
569 KeyValue::new("host.os", OS),
570 KeyValue::new("host.osfamily", FAMILY),
571 KeyValue::new("host.friendly_name", friendly_name.clone()),
572 KeyValue::new("host.hostname", System::host_name().unwrap_or_default()),
573 KeyValue::new(
574 "host.kernel_version",
575 System::kernel_version().unwrap_or_default(),
576 ),
577 KeyValue::new("host.os_version", System::os_version().unwrap_or_default()),
578 ])
579 .build();
580 let meter = global::meter_with_scope(scope);
581 let metrics = HostMetrics::new(
582 &meter,
583 self.config.host_key.public_key(),
584 self.config.lattice.to_string(),
585 None,
586 )
587 .context("failed to create HostMetrics instance")?;
588
589 debug!("Feature flags: {:?}", self.config.experimental_features);
590 let mut tasks = JoinSet::new();
591 let ready = Arc::new(AtomicBool::new(true));
592 if let Some(addr) = self.config.http_admin {
593 let socket = TcpListener::bind(addr)
594 .await
595 .context("failed to bind on HTTP administration endpoint")?;
596 let ready = Arc::clone(&ready);
597 let svc = hyper::service::service_fn(move |req| {
598 const OK: &str = r#"{"status":"ok"}"#;
599 const FAIL: &str = r#"{"status":"failure"}"#;
600 let ready = Arc::clone(&ready);
601 async move {
602 let (http::request::Parts { method, uri, .. }, _) = req.into_parts();
603 match (method.as_str(), uri.path()) {
604 ("HEAD", "/livez") => Ok(http::Response::default()),
605 ("GET", "/livez") => Ok(http::Response::new(http_body_util::Full::new(
606 Bytes::from(OK),
607 ))),
608 (method, "/livez") => http::Response::builder()
609 .status(http::StatusCode::METHOD_NOT_ALLOWED)
610 .body(http_body_util::Full::new(Bytes::from(format!(
611 "method `{method}` not supported for path `/livez`"
612 )))),
613 ("HEAD", "/readyz") => {
614 if ready.load(Ordering::Relaxed) {
615 Ok(http::Response::default())
616 } else {
617 http::Response::builder()
618 .status(http::StatusCode::INTERNAL_SERVER_ERROR)
619 .body(http_body_util::Full::default())
620 }
621 }
622 ("GET", "/readyz") => {
623 if ready.load(Ordering::Relaxed) {
624 Ok(http::Response::new(http_body_util::Full::new(Bytes::from(
625 OK,
626 ))))
627 } else {
628 http::Response::builder()
629 .status(http::StatusCode::INTERNAL_SERVER_ERROR)
630 .body(http_body_util::Full::new(Bytes::from(FAIL)))
631 }
632 }
633 (method, "/readyz") => http::Response::builder()
634 .status(http::StatusCode::METHOD_NOT_ALLOWED)
635 .body(http_body_util::Full::new(Bytes::from(format!(
636 "method `{method}` not supported for path `/readyz`"
637 )))),
638 (.., path) => http::Response::builder()
639 .status(http::StatusCode::NOT_FOUND)
640 .body(http_body_util::Full::new(Bytes::from(format!(
641 "unknown endpoint `{path}`"
642 )))),
643 }
644 }
645 });
646 let srv = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new());
647 tasks.spawn(async move {
648 loop {
649 let stream = match socket.accept().await {
650 Ok((stream, _)) => stream,
651 Err(err) => {
652 error!(?err, "failed to accept HTTP administration connection");
653 continue;
654 }
655 };
656 let svc = svc.clone();
657 if let Err(err) = srv.serve_connection(TokioIo::new(stream), svc).await {
658 error!(?err, "failed to serve HTTP administration connection");
659 }
660 }
661 });
662 }
663
664 let (heartbeat_abort, heartbeat_abort_reg) = AbortHandle::new_pair();
665 let start_at = Instant::now();
666
667 let host = Host {
668 components: Arc::new(RwLock::new(HashMap::new())),
669 providers: RwLock::new(HashMap::new()),
670 friendly_name,
671 heartbeat: heartbeat_abort.clone(),
672 host_key: self.config.host_key.clone(),
673 host_token,
674 secrets_xkey: Arc::new(XKey::new()),
675 labels: Arc::new(RwLock::new(labels)),
676 experimental_features: self.config.experimental_features,
677 runtime,
678 start_at,
679 stop_rx,
680 stop_tx,
681 links: RwLock::new(HashMap::new()),
682 component_claims: Arc::new(RwLock::new(HashMap::new())),
683 provider_claims: Arc::new(RwLock::new(HashMap::new())),
684 metrics: Arc::new(metrics),
685 max_execution_time: self.config.max_execution_time,
686 messaging_links: Arc::default(),
687 ready: Arc::clone(&ready),
688 tasks,
689 rpc_nats: Arc::clone(&rpc_nats),
690 registry_config: RwLock::new(self.registry_config),
691 event_publisher: self
693 .event_publisher
694 .unwrap_or_else(|| Arc::new(DefaultEventPublisher::default())),
695 policy_manager: self
696 .policy_manager
697 .unwrap_or_else(|| Arc::new(DefaultPolicyManager)),
698 secrets_manager: self
699 .secrets_manager
700 .unwrap_or_else(|| Arc::new(DefaultSecretsManager::default())),
701 data_store: self
702 .data_store
703 .unwrap_or_else(|| Arc::new(DefaultStore::default())),
704 config_store: self
705 .config_store
706 .unwrap_or_else(|| Arc::new(DefaultStore::default())),
707 config_generator: self
708 .bundle_generator
709 .unwrap_or_else(|| BundleGenerator::new(Arc::new(DefaultStore::default()))),
710 provider_manager: Arc::new(NatsProviderManager::new(
714 rpc_nats,
715 self.config.lattice.to_string(),
716 )),
717 host_config: self.config,
718 };
719
720 let host = Arc::new(host);
721
722 let heartbeat_interval = host
723 .host_config
724 .heartbeat_interval
725 .unwrap_or(Self::DEFAULT_HEARTBEAT_INTERVAL);
726 let heartbeat_start_at = start_at
727 .checked_add(heartbeat_interval)
728 .context("failed to compute heartbeat start time")?;
729 let heartbeat = IntervalStream::new(interval_at(heartbeat_start_at, heartbeat_interval));
730 let heartbeat = spawn({
731 let host = Arc::clone(&host);
732 async move {
733 let mut heartbeat = Abortable::new(heartbeat, heartbeat_abort_reg);
734 heartbeat
735 .by_ref()
736 .for_each({
737 let host = Arc::clone(&host);
738 move |_| {
739 let host = Arc::clone(&host);
740 async move {
741 let heartbeat = match host.heartbeat().await {
742 Ok(heartbeat) => heartbeat,
743 Err(e) => {
744 error!("failed to generate heartbeat: {e}");
745 return;
746 }
747 };
748
749 if let Err(e) = host
750 .event_publisher
751 .publish_event("host_heartbeat", heartbeat)
752 .await
753 {
754 error!("failed to publish heartbeat: {e}");
755 }
756 }
757 }
758 })
759 .await;
760 let deadline = { *host.stop_rx.borrow() };
761 host.stop_tx.send_replace(deadline);
762 if heartbeat.is_aborted() {
763 info!("heartbeat task gracefully stopped");
764 } else {
765 error!("heartbeat task unexpectedly stopped");
766 }
767 }
768 });
769
770 let start_evt = json!({
771 "id": host.host_key.public_key(),
772 "friendly_name": host.friendly_name,
773 "labels": *host.labels.read().await,
774 "uptime_seconds": 0,
775 "version": host.host_config.version,
776 });
777 host.event_publisher
778 .publish_event("host_started", start_evt)
779 .await
780 .context("failed to publish start event")?;
781 info!(
782 host_id = host.host_key.public_key(),
783 "wasmCloud host started"
784 );
785
786 Ok((Arc::clone(&host), async move {
787 ready.store(false, Ordering::Relaxed);
788 heartbeat_abort.abort();
789 heartbeat.await.context("failed to await heartbeat")?;
790 host.event_publisher
791 .publish_event(
792 "host_stopped",
793 json!({
794 "labels": *host.labels.read().await,
795 }),
796 )
797 .await
798 .context("failed to publish stop event")?;
799 host.rpc_nats
802 .flush()
803 .await
804 .context("failed to flush NATS clients")?;
805 Ok(())
806 }))
807 }
808}
809
810impl From<HostConfig> for HostBuilder {
811 fn from(config: HostConfig) -> Self {
812 HostBuilder {
813 config,
814 ..Default::default()
815 }
816 }
817}
818
819impl Host {
820 #[instrument(level = "debug", skip_all)]
822 pub fn builder() -> HostBuilder {
823 HostBuilder::default()
824 }
825
826 #[instrument(level = "debug", skip_all)]
833 pub async fn stopped(&self) -> anyhow::Result<Option<Instant>> {
834 self.stop_rx
835 .clone()
836 .changed()
837 .await
838 .context("failed to wait for stop")?;
839 Ok(*self.stop_rx.borrow())
840 }
841
842 #[instrument(level = "trace", skip_all)]
844 pub fn id(&self) -> String {
845 self.host_key.public_key()
846 }
847
848 #[instrument(level = "trace", skip_all)]
850 pub fn lattice(&self) -> &str {
851 &self.host_config.lattice
852 }
853
854 #[instrument(level = "debug", skip_all)]
855 async fn inventory(&self) -> HostInventory {
856 trace!("generating host inventory");
857 let components: Vec<_> = {
858 let components = self.components.read().await;
859 stream::iter(components.iter())
860 .filter_map(|(id, component)| async move {
861 let mut description = ComponentDescription::builder()
862 .id(id.into())
863 .image_ref(component.image_reference.to_string())
864 .annotations(component.annotations.clone().into_iter().collect())
865 .max_instances(component.max_instances.get().try_into().unwrap_or(u32::MAX))
866 .limits(component.limits.map(|limits| limits.to_string_map()))
867 .revision(
868 component
869 .claims()
870 .and_then(|claims| claims.metadata.as_ref())
871 .and_then(|jwt::Component { rev, .. }| *rev)
872 .unwrap_or_default(),
873 );
874 if let Some(name) = component
876 .claims()
877 .and_then(|claims| claims.metadata.as_ref())
878 .and_then(|metadata| metadata.name.as_ref())
879 .cloned()
880 {
881 description = description.name(name);
882 };
883
884 Some(
885 description
886 .build()
887 .expect("failed to build component description: {e}"),
888 )
889 })
890 .collect()
891 .await
892 };
893
894 let providers: Vec<_> = self
895 .providers
896 .read()
897 .await
898 .iter()
899 .map(
900 |(
901 provider_id,
902 Provider {
903 annotations,
904 claims_token,
905 image_ref,
906 ..
907 },
908 )| {
909 let mut provider_description = ProviderDescription::builder()
910 .id(provider_id)
911 .image_ref(image_ref);
912 if let Some(name) = claims_token
913 .as_ref()
914 .and_then(|claims| claims.claims.metadata.as_ref())
915 .and_then(|metadata| metadata.name.as_ref())
916 {
917 provider_description = provider_description.name(name);
918 }
919 provider_description
920 .annotations(
921 annotations
922 .clone()
923 .into_iter()
924 .collect::<BTreeMap<String, String>>(),
925 )
926 .revision(
927 claims_token
928 .as_ref()
929 .and_then(|claims| claims.claims.metadata.as_ref())
930 .and_then(|jwt::CapabilityProvider { rev, .. }| *rev)
931 .unwrap_or_default(),
932 )
933 .build()
934 .expect("failed to build provider description")
935 },
936 )
937 .collect();
938
939 let uptime = self.start_at.elapsed();
940 HostInventory::builder()
941 .components(components)
942 .providers(providers)
943 .friendly_name(self.friendly_name.clone())
944 .labels(self.labels.read().await.clone())
945 .uptime_human(human_friendly_uptime(uptime))
946 .uptime_seconds(uptime.as_secs())
947 .version(self.host_config.version.clone())
948 .host_id(self.host_key.public_key())
949 .build()
950 .expect("failed to build host inventory")
951 }
952
953 #[instrument(level = "debug", skip_all)]
954 async fn heartbeat(&self) -> anyhow::Result<serde_json::Value> {
955 trace!("generating heartbeat");
956 Ok(serde_json::to_value(self.inventory().await)?)
957 }
958
959 #[allow(clippy::too_many_arguments)] #[instrument(level = "debug", skip_all)]
962 async fn instantiate_component(
963 &self,
964 annotations: &Annotations,
965 image_reference: Arc<str>,
966 id: Arc<str>,
967 max_instances: NonZeroUsize,
968 limits: Option<Limits>,
969 mut component: wasmcloud_runtime::Component<Handler>,
970 handler: Handler,
971 ) -> anyhow::Result<Arc<Component>> {
972 trace!(
973 component_ref = ?image_reference,
974 max_instances,
975 "instantiating component"
976 );
977
978 let max_execution_time = self.max_execution_time; component.set_max_execution_time(max_execution_time);
980
981 let (events_tx, mut events_rx) = mpsc::channel(
982 max_instances
983 .get()
984 .clamp(MIN_INVOCATION_CHANNEL_SIZE, MAX_INVOCATION_CHANNEL_SIZE),
985 );
986 let prefix = Arc::from(format!("{}.{id}", &self.host_config.lattice));
987 let nats = wrpc_transport_nats::Client::new(
988 Arc::clone(&self.rpc_nats),
989 Arc::clone(&prefix),
990 Some(prefix),
991 )
992 .await?;
993 let exports = component
994 .serve_wrpc(
995 &WrpcServer {
996 nats,
997 claims: component.claims().cloned().map(Arc::new),
998 id: Arc::clone(&id),
999 image_reference: Arc::clone(&image_reference),
1000 annotations: Arc::new(annotations.clone()),
1001 policy_manager: Arc::clone(&self.policy_manager),
1002 metrics: Arc::clone(&self.metrics),
1003 },
1004 handler.clone(),
1005 events_tx.clone(),
1006 )
1007 .await?;
1008 let permits = Arc::new(Semaphore::new(
1009 usize::from(max_instances).min(Semaphore::MAX_PERMITS),
1010 ));
1011 let component_attributes = Arc::new(vec![
1012 KeyValue::new("component.id", id.to_string()),
1013 KeyValue::new("component.ref", image_reference.to_string()),
1014 KeyValue::new("lattice", self.host_config.lattice.clone()),
1015 KeyValue::new("host", self.host_key.public_key()),
1016 ]);
1017 self.metrics
1018 .set_max_instances(max_instances.get() as u64, &component_attributes);
1019
1020 let metrics = Arc::clone(&self.metrics);
1021 Ok(Arc::new(Component {
1022 component,
1023 id: Arc::clone(&id),
1024 handler,
1025 events: events_tx,
1026 permits: Arc::clone(&permits),
1027 exports: spawn(async move {
1028 let metrics_left = Arc::clone(&metrics);
1030 let metrics_right = Arc::clone(&metrics);
1031 join!(
1032 async move {
1033 let mut exports = stream::select_all(exports);
1034 loop {
1035 let metrics_left = Arc::clone(&metrics_left);
1036 let component_attributes = Arc::clone(&component_attributes);
1037 let permits = Arc::clone(&permits);
1038 if let Some(fut) = exports.next().await {
1039 match fut {
1040 Ok(fut) => {
1041 debug!("accepted invocation, acquiring permit");
1042 let permit = permits.acquire_owned().await;
1043
1044 metrics_left
1046 .increment_active_instance(&component_attributes);
1047 spawn(async move {
1048 let _permit = permit;
1049 debug!("handling invocation");
1050 let result = timeout(max_execution_time, fut).await;
1052 metrics_left
1053 .decrement_active_instance(&component_attributes);
1054
1055 match result {
1056 Ok(Ok(())) => {
1057 debug!("successfully handled invocation");
1058 Ok(())
1059 }
1060 Ok(Err(err)) => {
1061 warn!(?err, "failed to handle invocation");
1062 Err(err)
1063 }
1064 Err(_err) => {
1065 warn!("component invocation timed out");
1066 Err(anyhow::anyhow!(
1067 "component invocation timed out"
1068 ))
1069 }
1070 }
1071 });
1072 }
1073 Err(err) => {
1074 warn!(?err, "failed to accept invocation")
1075 }
1076 }
1077 }
1078 }
1079 },
1080 async move {
1081 while let Some(evt) = events_rx.recv().await {
1082 match evt {
1083 WrpcServeEvent::HttpIncomingHandlerHandleReturned {
1084 context:
1085 InvocationContext {
1086 start_at,
1087 ref attributes,
1088 ..
1089 },
1090 success,
1091 }
1092 | WrpcServeEvent::MessagingHandlerHandleMessageReturned {
1093 context:
1094 InvocationContext {
1095 start_at,
1096 ref attributes,
1097 ..
1098 },
1099 success,
1100 }
1101 | WrpcServeEvent::DynamicExportReturned {
1102 context:
1103 InvocationContext {
1104 start_at,
1105 ref attributes,
1106 ..
1107 },
1108 success,
1109 } => metrics_right.record_component_invocation(
1110 u64::try_from(start_at.elapsed().as_nanos())
1111 .unwrap_or_default(),
1112 attributes,
1113 !success,
1114 ),
1115 }
1116 }
1117 debug!("serving event stream is done");
1118 },
1119 );
1120 debug!("export serving task done");
1121 }),
1122 annotations: annotations.clone(),
1123 max_instances,
1124 limits,
1125 image_reference: Arc::clone(&image_reference),
1126 }))
1127 }
1128
1129 #[allow(clippy::too_many_arguments)]
1130 #[instrument(level = "debug", skip_all)]
1131 async fn start_component<'a>(
1132 &self,
1133 entry: hash_map::VacantEntry<'a, String, Arc<Component>>,
1134 wasm: &[u8],
1135 claims: Option<jwt::Claims<jwt::Component>>,
1136 component_ref: Arc<str>,
1137 component_id: Arc<str>,
1138 max_instances: NonZeroUsize,
1139 limits: Option<Limits>,
1140 annotations: &Annotations,
1141 config: ConfigBundle,
1142 secrets: HashMap<String, SecretBox<SecretValue>>,
1143 ) -> anyhow::Result<&'a mut Arc<Component>> {
1144 debug!(?component_ref, ?max_instances, "starting new component");
1145
1146 if let Some(ref claims) = claims {
1147 self.store_claims(Claims::Component(claims.clone()))
1148 .await
1149 .context("failed to store claims")?;
1150 }
1151
1152 let component_spec = self
1153 .get_component_spec(&component_id)
1154 .await?
1155 .unwrap_or_else(|| ComponentSpecification::new(&component_ref));
1156 self.store_component_spec(&component_id, &component_spec)
1157 .await?;
1158
1159 let handler = Handler {
1161 nats: Arc::clone(&self.rpc_nats),
1162 config_data: Arc::new(RwLock::new(config)),
1163 lattice: Arc::clone(&self.host_config.lattice),
1164 component_id: Arc::clone(&component_id),
1165 secrets: Arc::new(RwLock::new(secrets)),
1166 targets: Arc::default(),
1167 instance_links: Arc::new(RwLock::new(component_import_links(&component_spec.links))),
1168 messaging_links: {
1169 let mut links = self.messaging_links.write().await;
1170 Arc::clone(links.entry(Arc::clone(&component_id)).or_default())
1171 },
1172 invocation_timeout: Duration::from_secs(10), experimental_features: self.experimental_features,
1174 host_labels: Arc::clone(&self.labels),
1175 };
1176 let component = wasmcloud_runtime::Component::new(&self.runtime, wasm, limits)?;
1177 let component = self
1178 .instantiate_component(
1179 annotations,
1180 Arc::clone(&component_ref),
1181 Arc::clone(&component_id),
1182 max_instances,
1183 limits,
1184 component,
1185 handler,
1186 )
1187 .await
1188 .context("failed to instantiate component")?;
1189
1190 info!(?component_ref, "component started");
1191 self.event_publisher
1192 .publish_event(
1193 "component_scaled",
1194 crate::event::component_scaled(
1195 claims.as_ref(),
1196 annotations,
1197 self.host_key.public_key(),
1198 max_instances,
1199 &component_ref,
1200 &component_id,
1201 ),
1202 )
1203 .await?;
1204
1205 Ok(entry.insert(component))
1206 }
1207
1208 #[instrument(level = "debug", skip_all)]
1209 async fn stop_component(&self, component: &Component, _host_id: &str) -> anyhow::Result<()> {
1210 trace!(component_id = %component.id, "stopping component");
1211
1212 component.exports.abort();
1213
1214 Ok(())
1215 }
1216
1217 #[instrument(level = "trace", skip_all)]
1218 async fn fetch_component(&self, component_ref: &str) -> anyhow::Result<Vec<u8>> {
1219 let registry_config = self.registry_config.read().await;
1220 fetch_component(
1221 component_ref,
1222 self.host_config.allow_file_load,
1223 &self.host_config.oci_opts,
1224 ®istry_config,
1225 )
1226 .await
1227 .context("failed to fetch component")
1228 }
1229
1230 #[instrument(level = "debug", skip_all)]
1231 pub(crate) async fn handle_auction_component(
1232 &self,
1233 payload: impl AsRef<[u8]>,
1234 ) -> anyhow::Result<Option<CtlResponse<ComponentAuctionAck>>> {
1235 let request = serde_json::from_slice::<ComponentAuctionRequest>(payload.as_ref())
1236 .context("failed to deserialize component auction command")?;
1237 <Self as ControlInterfaceServer>::handle_auction_component(self, request).await
1238 }
1239
1240 #[instrument(level = "debug", skip_all)]
1241 pub(crate) async fn handle_auction_provider(
1242 &self,
1243 payload: impl AsRef<[u8]>,
1244 ) -> anyhow::Result<Option<CtlResponse<ProviderAuctionAck>>> {
1245 let request = serde_json::from_slice::<ProviderAuctionRequest>(payload.as_ref())
1246 .context("failed to deserialize provider auction command")?;
1247 <Self as ControlInterfaceServer>::handle_auction_provider(self, request).await
1248 }
1249
1250 #[instrument(level = "debug", skip_all)]
1251 pub(crate) async fn handle_stop_host(
1252 &self,
1253 payload: impl AsRef<[u8]>,
1254 transport_host_id: &str,
1255 ) -> anyhow::Result<CtlResponse<()>> {
1256 let timeout = if payload.as_ref().is_empty() {
1258 None
1259 } else {
1260 let cmd = serde_json::from_slice::<StopHostCommand>(payload.as_ref())
1261 .context("failed to deserialize stop command")?;
1262 let timeout = cmd.timeout();
1263 let host_id = cmd.host_id();
1264
1265 if !host_id.is_empty() {
1268 anyhow::ensure!(
1269 host_id == transport_host_id && host_id == self.host_key.public_key(),
1270 "invalid host_id [{host_id}]"
1271 );
1272 }
1273 timeout
1274 };
1275
1276 anyhow::ensure!(
1278 transport_host_id == self.host_key.public_key(),
1279 "invalid host_id [{transport_host_id}]"
1280 );
1281
1282 let mut stop_command = StopHostCommand::builder().host_id(transport_host_id);
1283 if let Some(timeout) = timeout {
1284 stop_command = stop_command.timeout(timeout);
1285 }
1286 <Self as ControlInterfaceServer>::handle_stop_host(
1287 self,
1288 stop_command
1289 .build()
1290 .map_err(|e| anyhow!(e))
1291 .context("failed to build stop host command")?,
1292 )
1293 .await
1294 }
1295
1296 #[instrument(level = "debug", skip_all)]
1297 pub(crate) async fn handle_scale_component(
1298 self: Arc<Self>,
1299 payload: impl AsRef<[u8]>,
1300 ) -> anyhow::Result<CtlResponse<()>> {
1301 let request = serde_json::from_slice::<ScaleComponentCommand>(payload.as_ref())
1302 .context("failed to deserialize component scale command")?;
1303 <Self as ControlInterfaceServer>::handle_scale_component(self, request).await
1304 }
1305
1306 #[instrument(level = "debug", skip_all)]
1307 #[allow(clippy::too_many_arguments)]
1310 async fn handle_scale_component_task(
1311 &self,
1312 component_ref: Arc<str>,
1313 component_id: Arc<str>,
1314 host_id: &str,
1315 max_instances: u32,
1316 component_limits: Option<HashMap<String, String>>,
1317 annotations: &Annotations,
1318 config: Vec<String>,
1319 wasm: anyhow::Result<Vec<u8>>,
1320 claims_token: Option<&jwt::Token<jwt::Component>>,
1321 ) -> anyhow::Result<()> {
1322 trace!(?component_ref, max_instances, "scale component task");
1323
1324 let claims = claims_token.map(|c| c.claims.clone());
1325 match self
1326 .policy_manager
1327 .evaluate_start_component(
1328 &component_id,
1329 &component_ref,
1330 max_instances,
1331 annotations,
1332 claims.as_ref(),
1333 )
1334 .await?
1335 {
1336 PolicyResponse {
1337 permitted: false,
1338 message: Some(message),
1339 ..
1340 } => bail!("Policy denied request to scale component `{component_id}`: `{message:?}`"),
1341 PolicyResponse {
1342 permitted: false, ..
1343 } => bail!("Policy denied request to scale component `{component_id}`"),
1344 PolicyResponse {
1345 permitted: true, ..
1346 } => (),
1347 };
1348
1349 let limits: Option<Limits> = from_string_map(component_limits.as_ref());
1350
1351 let scaled_event = match (
1352 self.components
1353 .write()
1354 .await
1355 .entry(component_id.to_string()),
1356 NonZeroUsize::new(max_instances as usize),
1357 ) {
1358 (hash_map::Entry::Vacant(_), None) => crate::event::component_scaled(
1361 claims.as_ref(),
1362 annotations,
1363 host_id,
1364 0_usize,
1365 &component_ref,
1366 &component_id,
1367 ),
1368 (hash_map::Entry::Vacant(entry), Some(max)) => {
1370 let (config, secrets) = self
1371 .fetch_config_and_secrets(
1372 &config,
1373 claims_token.as_ref().map(|c| &c.jwt),
1374 annotations.get("wasmcloud.dev/appspec"),
1375 )
1376 .await?;
1377 match &wasm {
1378 Ok(wasm) => {
1379 self.start_component(
1380 entry,
1381 wasm,
1382 claims.clone(),
1383 Arc::clone(&component_ref),
1384 Arc::clone(&component_id),
1385 max,
1386 limits,
1387 annotations,
1388 config,
1389 secrets,
1390 )
1391 .await?;
1392
1393 crate::event::component_scaled(
1394 claims.as_ref(),
1395 annotations,
1396 host_id,
1397 max,
1398 &component_ref,
1399 &component_id,
1400 )
1401 }
1402 Err(e) => {
1403 error!(%component_ref, %component_id, err = ?e, "failed to scale component");
1404 if let Err(e) = self
1405 .event_publisher
1406 .publish_event(
1407 "component_scale_failed",
1408 crate::event::component_scale_failed(
1409 claims_token.map(|c| c.claims.clone()).as_ref(),
1410 annotations,
1411 host_id,
1412 &component_ref,
1413 &component_id,
1414 max_instances,
1415 e,
1416 ),
1417 )
1418 .await
1419 {
1420 error!(%component_ref, %component_id, err = ?e, "failed to publish component scale failed event");
1421 }
1422 return Ok(());
1423 }
1424 }
1425 }
1426 (hash_map::Entry::Occupied(entry), None) => {
1428 let component = entry.remove();
1429 self.stop_component(&component, host_id)
1430 .await
1431 .context("failed to stop component in response to scale to zero")?;
1432
1433 info!(?component_ref, "component stopped");
1434 crate::event::component_scaled(
1435 claims.as_ref(),
1436 &component.annotations,
1437 host_id,
1438 0_usize,
1439 &component.image_reference,
1440 &component.id,
1441 )
1442 }
1443 (hash_map::Entry::Occupied(mut entry), Some(max)) => {
1445 let component = entry.get_mut();
1446 let config_changed =
1447 &config != component.handler.config_data.read().await.config_names();
1448
1449 let event = crate::event::component_scaled(
1452 claims.as_ref(),
1453 &component.annotations,
1454 host_id,
1455 max,
1456 &component.image_reference,
1457 &component.id,
1458 );
1459
1460 if component.max_instances != max || config_changed {
1462 let handler = component.handler.copy_for_new();
1464 if config_changed {
1465 let (config, secrets) = self
1466 .fetch_config_and_secrets(
1467 &config,
1468 claims_token.as_ref().map(|c| &c.jwt),
1469 annotations.get("wasmcloud.dev/appspec"),
1470 )
1471 .await?;
1472 *handler.config_data.write().await = config;
1473 *handler.secrets.write().await = secrets;
1474 }
1475 let instance = self
1476 .instantiate_component(
1477 annotations,
1478 Arc::clone(&component_ref),
1479 Arc::clone(&component.id),
1480 max,
1481 limits,
1482 component.component.clone(),
1483 handler,
1484 )
1485 .await
1486 .context("failed to instantiate component")?;
1487 let component = entry.insert(instance);
1488 self.stop_component(&component, host_id)
1489 .await
1490 .context("failed to stop component after scaling")?;
1491
1492 info!(?component_ref, ?max, "component scaled");
1493 } else {
1494 debug!(?component_ref, ?max, "component already at desired scale");
1495 }
1496 event
1497 }
1498 };
1499
1500 self.event_publisher
1501 .publish_event("component_scaled", scaled_event)
1502 .await?;
1503
1504 Ok(())
1505 }
1506
1507 #[instrument(level = "debug", skip_all)]
1511 pub(crate) async fn handle_update_component(
1512 self: Arc<Self>,
1513 payload: impl AsRef<[u8]>,
1514 ) -> anyhow::Result<CtlResponse<()>> {
1515 let cmd = serde_json::from_slice::<UpdateComponentCommand>(payload.as_ref())
1516 .context("failed to deserialize component update command")?;
1517 <Self as ControlInterfaceServer>::handle_update_component(self, cmd).await
1518 }
1519
1520 async fn handle_update_component_task(
1521 &self,
1522 component_id: Arc<str>,
1523 new_component_ref: Arc<str>,
1524 host_id: &str,
1525 annotations: Option<BTreeMap<String, String>>,
1526 ) -> anyhow::Result<()> {
1527 let component = {
1530 let components = self.components.read().await;
1531 let existing_component = components
1532 .get(&*component_id)
1533 .context("component not found")?;
1534 let annotations = annotations.unwrap_or_default().into_iter().collect();
1535
1536 if existing_component.image_reference == new_component_ref {
1538 info!(%component_id, %new_component_ref, "component already updated");
1539 return Ok(());
1540 }
1541
1542 let new_component = self.fetch_component(&new_component_ref).await?;
1543 let new_component = wasmcloud_runtime::Component::new(
1544 &self.runtime,
1545 &new_component,
1546 existing_component.limits,
1547 )
1548 .context("failed to initialize component")?;
1549 let new_claims = new_component.claims().cloned();
1550 if let Some(ref claims) = new_claims {
1551 self.store_claims(Claims::Component(claims.clone()))
1552 .await
1553 .context("failed to store claims")?;
1554 }
1555
1556 let max = existing_component.max_instances;
1557 let Ok(component) = self
1558 .instantiate_component(
1559 &annotations,
1560 Arc::clone(&new_component_ref),
1561 Arc::clone(&component_id),
1562 max,
1563 existing_component.limits,
1564 new_component,
1565 existing_component.handler.copy_for_new(),
1566 )
1567 .await
1568 else {
1569 bail!("failed to instantiate component from new reference");
1570 };
1571
1572 info!(%new_component_ref, "component updated");
1573 self.event_publisher
1574 .publish_event(
1575 "component_scaled",
1576 crate::event::component_scaled(
1577 new_claims.as_ref(),
1578 &component.annotations,
1579 host_id,
1580 max,
1581 new_component_ref,
1582 &component_id,
1583 ),
1584 )
1585 .await?;
1586
1587 self.stop_component(&component, host_id)
1589 .await
1590 .context("failed to stop old component")?;
1591 self.event_publisher
1592 .publish_event(
1593 "component_scaled",
1594 crate::event::component_scaled(
1595 component.claims(),
1596 &component.annotations,
1597 host_id,
1598 0_usize,
1599 &component.image_reference,
1600 &component.id,
1601 ),
1602 )
1603 .await?;
1604
1605 component
1606 };
1607
1608 self.components
1609 .write()
1610 .await
1611 .insert(component_id.to_string(), component);
1612 Ok(())
1613 }
1614
1615 #[instrument(level = "debug", skip_all)]
1616 pub(crate) async fn handle_start_provider(
1617 self: Arc<Self>,
1618 payload: impl AsRef<[u8]>,
1619 ) -> anyhow::Result<Option<CtlResponse<()>>> {
1620 let cmd = serde_json::from_slice::<StartProviderCommand>(payload.as_ref())
1621 .context("failed to deserialize provider start command")?;
1622 <Self as ControlInterfaceServer>::handle_start_provider(self, cmd).await
1623 }
1624
1625 #[instrument(level = "debug", skip_all)]
1626 async fn handle_start_provider_task(
1627 self: Arc<Self>,
1628 config_names: &[String],
1629 provider_id: &str,
1630 provider_ref: &str,
1631 annotations: BTreeMap<String, String>,
1632 host_id: &str,
1633 ) -> anyhow::Result<()> {
1634 trace!(provider_ref, provider_id, "start provider task");
1635
1636 let registry_config = self.registry_config.read().await;
1637 let provider_ref =
1638 ResourceRef::try_from(provider_ref).context("failed to parse provider reference")?;
1639 let (path, claims_token) = match &provider_ref {
1640 ResourceRef::Builtin(..) => (None, None),
1641 _ => {
1642 let (path, claims_token) = crate::fetch_provider(
1643 &provider_ref,
1644 host_id,
1645 self.host_config.allow_file_load,
1646 &self.host_config.oci_opts,
1647 ®istry_config,
1648 )
1649 .await
1650 .context("failed to fetch provider")?;
1651 (Some(path), claims_token)
1652 }
1653 };
1654 let claims = claims_token.as_ref().map(|t| t.claims.clone());
1655
1656 if let Some(claims) = claims.clone() {
1657 self.store_claims(Claims::Provider(claims))
1658 .await
1659 .context("failed to store claims")?;
1660 }
1661
1662 let annotations: Annotations = annotations.into_iter().collect();
1663
1664 let PolicyResponse {
1665 permitted,
1666 request_id,
1667 message,
1668 } = self
1669 .policy_manager
1670 .evaluate_start_provider(
1671 provider_id,
1672 provider_ref.as_ref(),
1673 &annotations,
1674 claims.as_ref(),
1675 )
1676 .await?;
1677 ensure!(
1678 permitted,
1679 "policy denied request to start provider `{request_id}`: `{message:?}`",
1680 );
1681
1682 let component_specification = self
1683 .get_component_spec(provider_id)
1684 .await?
1685 .unwrap_or_else(|| ComponentSpecification::new(provider_ref.as_ref()));
1686
1687 self.store_component_spec(&provider_id, &component_specification)
1688 .await?;
1689 self.update_host_with_spec(&provider_id, &component_specification)
1690 .await?;
1691
1692 let mut providers = self.providers.write().await;
1693 if let hash_map::Entry::Vacant(entry) = providers.entry(provider_id.into()) {
1694 let provider_xkey = XKey::new();
1695 let xkey = XKey::from_public_key(&provider_xkey.public_key())
1697 .context("failed to create XKey from provider public key xkey")?;
1698 let (host_data, config_bundle) = self
1700 .prepare_provider_config(
1701 config_names,
1702 claims_token.as_ref(),
1703 provider_id,
1704 &provider_xkey,
1705 &annotations,
1706 )
1707 .await?;
1708 let config_bundle = Arc::new(RwLock::new(config_bundle));
1709 let shutdown = Arc::new(AtomicBool::new(false));
1712 let tasks = match (path, &provider_ref) {
1713 (Some(path), ..) => {
1714 Arc::clone(&self)
1715 .start_binary_provider(
1716 path,
1717 host_data,
1718 Arc::clone(&config_bundle),
1719 provider_xkey,
1720 provider_id,
1721 config_names.to_vec(),
1723 claims_token.clone(),
1724 annotations.clone(),
1725 shutdown.clone(),
1726 )
1727 .await?
1728 }
1729 (None, ResourceRef::Builtin(name)) => match *name {
1730 "http-client" if self.experimental_features.builtin_http_client => {
1731 self.start_http_client_provider(host_data, provider_xkey, provider_id)
1732 .await?
1733 }
1734 "http-client" => {
1735 bail!("feature `builtin-http-client` is not enabled, denying start")
1736 }
1737 "http-server" if self.experimental_features.builtin_http_server => {
1738 self.start_http_server_provider(host_data, provider_xkey, provider_id)
1739 .await?
1740 }
1741 "http-server" => {
1742 bail!("feature `builtin-http-server` is not enabled, denying start")
1743 }
1744 "messaging-nats" if self.experimental_features.builtin_messaging_nats => {
1745 self.start_messaging_nats_provider(host_data, provider_xkey, provider_id)
1746 .await?
1747 }
1748 "messaging-nats" => {
1749 bail!("feature `builtin-messaging-nats` is not enabled, denying start")
1750 }
1751 _ => bail!("unknown builtin name: {name}"),
1752 },
1753 _ => bail!("invalid provider reference"),
1754 };
1755
1756 info!(
1757 provider_ref = provider_ref.as_ref(),
1758 provider_id, "provider started"
1759 );
1760 self.event_publisher
1761 .publish_event(
1762 "provider_started",
1763 crate::event::provider_started(
1764 claims.as_ref(),
1765 &annotations,
1766 host_id,
1767 &provider_ref,
1768 provider_id,
1769 ),
1770 )
1771 .await?;
1772
1773 entry.insert(Provider {
1775 tasks,
1776 annotations,
1777 claims_token,
1778 image_ref: provider_ref.as_ref().to_string(),
1779 xkey,
1780 shutdown,
1781 });
1782 } else {
1783 bail!("provider is already running with that ID")
1784 }
1785
1786 Ok(())
1787 }
1788
1789 #[instrument(level = "debug", skip_all)]
1790 pub(crate) async fn handle_stop_provider(
1791 &self,
1792 payload: impl AsRef<[u8]>,
1793 ) -> anyhow::Result<CtlResponse<()>> {
1794 let cmd = serde_json::from_slice::<StopProviderCommand>(payload.as_ref())
1795 .context("failed to deserialize provider stop command")?;
1796 <Self as ControlInterfaceServer>::handle_stop_provider(self, cmd).await
1797 }
1798
1799 #[instrument(level = "debug", skip_all)]
1800 pub(crate) async fn handle_inventory(&self) -> anyhow::Result<CtlResponse<HostInventory>> {
1801 <Self as ControlInterfaceServer>::handle_inventory(self).await
1802 }
1803
1804 #[instrument(level = "trace", skip_all)]
1805 pub(crate) async fn handle_claims(
1806 &self,
1807 ) -> anyhow::Result<CtlResponse<Vec<HashMap<String, String>>>> {
1808 <Self as ControlInterfaceServer>::handle_claims(self).await
1809 }
1810
1811 #[instrument(level = "trace", skip_all)]
1812 pub(crate) async fn handle_links(&self) -> anyhow::Result<Vec<u8>> {
1813 <Self as ControlInterfaceServer>::handle_links(self).await
1814 }
1815
1816 #[instrument(level = "trace", skip(self))]
1817 pub(crate) async fn handle_config_get(&self, config_name: &str) -> anyhow::Result<Vec<u8>> {
1818 <Self as ControlInterfaceServer>::handle_config_get(self, config_name).await
1819 }
1820
1821 #[instrument(level = "debug", skip_all)]
1822 pub(crate) async fn handle_label_put(
1823 &self,
1824 host_id: &str,
1825 payload: impl AsRef<[u8]>,
1826 ) -> anyhow::Result<CtlResponse<()>> {
1827 let host_label = serde_json::from_slice::<HostLabel>(payload.as_ref())
1828 .context("failed to deserialize put label request")?;
1829 <Self as ControlInterfaceServer>::handle_label_put(self, host_label, host_id).await
1830 }
1831
1832 #[instrument(level = "debug", skip_all)]
1833 pub(crate) async fn handle_label_del(
1834 &self,
1835 host_id: &str,
1836 payload: impl AsRef<[u8]>,
1837 ) -> anyhow::Result<CtlResponse<()>> {
1838 let label = serde_json::from_slice::<HostLabelIdentifier>(payload.as_ref())
1839 .context("failed to deserialize delete label request")?;
1840 <Self as ControlInterfaceServer>::handle_label_del(self, label, host_id).await
1841 }
1842
1843 #[instrument(level = "debug", skip_all)]
1847 pub(crate) async fn handle_link_put(
1848 &self,
1849 payload: impl AsRef<[u8]>,
1850 ) -> anyhow::Result<CtlResponse<()>> {
1851 let link: Link = serde_json::from_slice(payload.as_ref())
1852 .context("failed to deserialize wrpc link definition")?;
1853 <Self as ControlInterfaceServer>::handle_link_put(self, link).await
1854 }
1855
1856 #[instrument(level = "debug", skip_all)]
1857 pub(crate) async fn handle_link_del(
1859 &self,
1860 payload: impl AsRef<[u8]>,
1861 ) -> anyhow::Result<CtlResponse<()>> {
1862 let req = serde_json::from_slice::<DeleteInterfaceLinkDefinitionRequest>(payload.as_ref())
1863 .context("failed to deserialize wrpc link definition")?;
1864 <Self as ControlInterfaceServer>::handle_link_del(self, req).await
1865 }
1866
1867 #[instrument(level = "debug", skip_all)]
1868 pub(crate) async fn handle_registries_put(
1869 &self,
1870 payload: impl AsRef<[u8]>,
1871 ) -> anyhow::Result<CtlResponse<()>> {
1872 let registry_creds: HashMap<String, RegistryCredential> =
1873 serde_json::from_slice(payload.as_ref())
1874 .context("failed to deserialize registries put command")?;
1875 <Self as ControlInterfaceServer>::handle_registries_put(self, registry_creds).await
1876 }
1877
1878 #[instrument(level = "debug", skip_all, fields(%config_name))]
1879 pub(crate) async fn handle_config_put(
1880 &self,
1881 config_name: &str,
1882 data: Bytes,
1883 ) -> anyhow::Result<CtlResponse<()>> {
1884 serde_json::from_slice::<HashMap<String, String>>(&data)
1886 .context("config data should be a map of string -> string")?;
1887 <Self as ControlInterfaceServer>::handle_config_put(self, config_name, data).await
1888 }
1889
1890 #[instrument(level = "debug", skip_all, fields(%config_name))]
1891 pub(crate) async fn handle_config_delete(
1892 &self,
1893 config_name: &str,
1894 ) -> anyhow::Result<CtlResponse<()>> {
1895 <Self as ControlInterfaceServer>::handle_config_delete(self, config_name).await
1896 }
1897
1898 #[instrument(level = "debug", skip_all)]
1899 pub(crate) async fn handle_ping_hosts(
1900 &self,
1901 ) -> anyhow::Result<CtlResponse<wasmcloud_control_interface::Host>> {
1902 <Self as ControlInterfaceServer>::handle_ping_hosts(self).await
1903 }
1904
1905 async fn put_backwards_compat_provider_link(&self, link: &Link) -> anyhow::Result<()> {
1913 let source_config_contains_secret = link
1916 .source_config()
1917 .iter()
1918 .any(|c| c.starts_with(SECRET_PREFIX));
1919 let target_config_contains_secret = link
1920 .target_config()
1921 .iter()
1922 .any(|c| c.starts_with(SECRET_PREFIX));
1923 if source_config_contains_secret || target_config_contains_secret {
1924 debug!("link contains secrets and is not backwards compatible, skipping");
1925 return Ok(());
1926 }
1927 let provider_link = self
1928 .resolve_link_config(link.clone(), None, None, &XKey::new())
1929 .await
1930 .context("failed to resolve link config")?;
1931
1932 if let Err(e) = self
1933 .provider_manager
1934 .put_link(&provider_link, link.source_id())
1935 .await
1936 {
1937 warn!(
1938 ?e,
1939 "failed to publish backwards-compatible provider link to source"
1940 );
1941 }
1942
1943 if let Err(e) = self
1944 .provider_manager
1945 .put_link(&provider_link, link.target())
1946 .await
1947 {
1948 warn!(
1949 ?e,
1950 "failed to publish backwards-compatible provider link to target"
1951 );
1952 }
1953
1954 Ok(())
1955 }
1956
1957 #[instrument(level = "debug", skip_all)]
1959 async fn put_provider_link(&self, provider: &Provider, link: &Link) -> anyhow::Result<()> {
1960 let provider_link = self
1961 .resolve_link_config(
1962 link.clone(),
1963 provider.claims_token.as_ref().map(|t| &t.jwt),
1964 provider.annotations.get("wasmcloud.dev/appspec"),
1965 &provider.xkey,
1966 )
1967 .await
1968 .context("failed to resolve link config and secrets")?;
1969
1970 self.provider_manager
1971 .put_link(&provider_link, &provider.xkey.public_key())
1972 .await
1973 .context("failed to publish provider link definition put")
1974 }
1975
1976 #[instrument(level = "debug", skip(self))]
1981 async fn del_provider_link(&self, link: &Link) -> anyhow::Result<()> {
1982 let link = wasmcloud_core::InterfaceLinkDefinition {
1984 source_id: link.source_id().to_string(),
1985 target: link.target().to_string(),
1986 wit_namespace: link.wit_namespace().to_string(),
1987 wit_package: link.wit_package().to_string(),
1988 name: link.name().to_string(),
1989 interfaces: link.interfaces().clone(),
1990 ..Default::default()
1992 };
1993 let source_id = &link.source_id;
1994 let target = &link.target;
1995
1996 let (source_result, target_result) = futures::future::join(
1997 self.provider_manager.delete_link(&link, source_id),
1998 self.provider_manager.delete_link(&link, target),
1999 )
2000 .await;
2001
2002 source_result
2003 .and(target_result)
2004 .context("failed to publish provider link definition delete")
2005 }
2006
2007 async fn fetch_config_and_secrets(
2008 &self,
2009 config_names: &[String],
2010 entity_jwt: Option<&String>,
2011 application: Option<&String>,
2012 ) -> anyhow::Result<(ConfigBundle, HashMap<String, SecretBox<SecretValue>>)> {
2013 let (secret_names, config_names) = config_names
2014 .iter()
2015 .map(|s| s.to_string())
2016 .partition(|name| name.starts_with(SECRET_PREFIX));
2017
2018 let config = self
2019 .config_generator
2020 .generate(config_names)
2021 .await
2022 .context("Unable to fetch requested config")?;
2023
2024 let secrets = self
2025 .secrets_manager
2026 .fetch_secrets(secret_names, entity_jwt, &self.host_token.jwt, application)
2027 .await
2028 .context("Unable to fetch requested secrets")?;
2029
2030 Ok((config, secrets))
2031 }
2032
2033 async fn validate_config<I>(&self, config_names: I) -> anyhow::Result<()>
2038 where
2039 I: IntoIterator<Item: AsRef<str>>,
2040 {
2041 let config_store = self.config_store.clone();
2042 let validation_errors =
2043 futures::future::join_all(config_names.into_iter().map(|config_name| {
2044 let config_store = config_store.clone();
2045 let config_name = config_name.as_ref().to_string();
2046 async move {
2047 match config_store.get(&config_name).await {
2048 Ok(Some(_)) => None,
2049 Ok(None) if config_name.starts_with(SECRET_PREFIX) => Some(format!(
2050 "Secret reference {config_name} not found in config store"
2051 )),
2052 Ok(None) => Some(format!(
2053 "Configuration {config_name} not found in config store"
2054 )),
2055 Err(e) => Some(e.to_string()),
2056 }
2057 }
2058 }))
2059 .await;
2060
2061 let validation_errors = validation_errors
2064 .into_iter()
2065 .flatten()
2066 .fold(String::new(), |acc, e| acc + &e + ". ");
2067 if !validation_errors.is_empty() {
2068 bail!(format!(
2069 "Failed to validate configuration and secrets. {validation_errors}",
2070 ));
2071 }
2072
2073 Ok(())
2074 }
2075
2076 async fn resolve_link_config(
2079 &self,
2080 link: Link,
2081 provider_jwt: Option<&String>,
2082 application: Option<&String>,
2083 provider_xkey: &XKey,
2084 ) -> anyhow::Result<wasmcloud_core::InterfaceLinkDefinition> {
2085 let (source_bundle, raw_source_secrets) = self
2086 .fetch_config_and_secrets(link.source_config().as_slice(), provider_jwt, application)
2087 .await?;
2088 let (target_bundle, raw_target_secrets) = self
2089 .fetch_config_and_secrets(link.target_config().as_slice(), provider_jwt, application)
2090 .await?;
2091
2092 let source_config = source_bundle.get_config().await;
2093 let target_config = target_bundle.get_config().await;
2094 use secrecy::ExposeSecret;
2097 let source_secrets_map: HashMap<String, wasmcloud_core::secrets::SecretValue> =
2098 raw_source_secrets
2099 .iter()
2100 .map(|(k, v)| match v.expose_secret() {
2101 SecretValue::String(s) => (
2102 k.clone(),
2103 wasmcloud_core::secrets::SecretValue::String(s.to_owned()),
2104 ),
2105 SecretValue::Bytes(b) => (
2106 k.clone(),
2107 wasmcloud_core::secrets::SecretValue::Bytes(b.to_owned()),
2108 ),
2109 })
2110 .collect();
2111 let target_secrets_map: HashMap<String, wasmcloud_core::secrets::SecretValue> =
2112 raw_target_secrets
2113 .iter()
2114 .map(|(k, v)| match v.expose_secret() {
2115 SecretValue::String(s) => (
2116 k.clone(),
2117 wasmcloud_core::secrets::SecretValue::String(s.to_owned()),
2118 ),
2119 SecretValue::Bytes(b) => (
2120 k.clone(),
2121 wasmcloud_core::secrets::SecretValue::Bytes(b.to_owned()),
2122 ),
2123 })
2124 .collect();
2125 let source_secrets = if source_secrets_map.is_empty() {
2129 None
2130 } else {
2131 Some(
2132 serde_json::to_vec(&source_secrets_map)
2133 .map(|secrets| self.secrets_xkey.seal(&secrets, provider_xkey))
2134 .context("failed to serialize and encrypt source secrets")??,
2135 )
2136 };
2137 let target_secrets = if target_secrets_map.is_empty() {
2138 None
2139 } else {
2140 Some(
2141 serde_json::to_vec(&target_secrets_map)
2142 .map(|secrets| self.secrets_xkey.seal(&secrets, provider_xkey))
2143 .context("failed to serialize and encrypt target secrets")??,
2144 )
2145 };
2146
2147 Ok(wasmcloud_core::InterfaceLinkDefinition {
2148 source_id: link.source_id().to_string(),
2149 target: link.target().to_string(),
2150 name: link.name().to_string(),
2151 wit_namespace: link.wit_namespace().to_string(),
2152 wit_package: link.wit_package().to_string(),
2153 interfaces: link.interfaces().clone(),
2154 source_config: source_config.clone(),
2155 target_config: target_config.clone(),
2156 source_secrets,
2157 target_secrets,
2158 })
2159 }
2160}
2161
2162fn component_import_links(links: &[Link]) -> HashMap<Box<str>, HashMap<Box<str>, Box<str>>> {
2171 let mut m = HashMap::new();
2172 for link in links {
2173 let instances: &mut HashMap<Box<str>, Box<str>> = m
2174 .entry(link.name().to_string().into_boxed_str())
2175 .or_default();
2176 for interface in link.interfaces() {
2177 instances.insert(
2178 format!(
2179 "{}:{}/{interface}",
2180 link.wit_namespace(),
2181 link.wit_package(),
2182 )
2183 .into_boxed_str(),
2184 link.target().to_string().into_boxed_str(),
2185 );
2186 }
2187 }
2188 m
2189}
2190
2191fn human_friendly_uptime(uptime: Duration) -> String {
2192 humantime::format_duration(
2194 uptime.saturating_sub(Duration::from_nanos(uptime.subsec_nanos().into())),
2195 )
2196 .to_string()
2197}
2198
2199pub fn injector_to_headers(injector: &TraceContextInjector) -> async_nats::header::HeaderMap {
2201 injector
2202 .iter()
2203 .filter_map(|(k, v)| {
2204 let name = async_nats::header::HeaderName::from_str(k.as_str()).ok()?;
2206 let value = async_nats::header::HeaderValue::from_str(v.as_str()).ok()?;
2207 Some((name, value))
2208 })
2209 .collect()
2210}
2211
2212#[cfg(test)]
2213mod test {
2214 #[test]
2216 fn can_compute_component_links() {
2217 use std::collections::HashMap;
2218 use wasmcloud_control_interface::Link;
2219
2220 let links = vec![
2221 Link::builder()
2222 .source_id("source_component")
2223 .target("kv-redis")
2224 .wit_namespace("wasi")
2225 .wit_package("keyvalue")
2226 .interfaces(vec!["atomics".into(), "store".into()])
2227 .name("default")
2228 .build()
2229 .expect("failed to build link"),
2230 Link::builder()
2231 .source_id("source_component")
2232 .target("kv-vault")
2233 .wit_namespace("wasi")
2234 .wit_package("keyvalue")
2235 .interfaces(vec!["atomics".into(), "store".into()])
2236 .name("secret")
2237 .source_config(vec![])
2238 .target_config(vec!["my-secret".into()])
2239 .build()
2240 .expect("failed to build link"),
2241 Link::builder()
2242 .source_id("source_component")
2243 .target("kv-vault-offsite")
2244 .wit_namespace("wasi")
2245 .wit_package("keyvalue")
2246 .interfaces(vec!["atomics".into()])
2247 .name("secret")
2248 .source_config(vec![])
2249 .target_config(vec!["my-secret".into()])
2250 .build()
2251 .expect("failed to build link"),
2252 Link::builder()
2253 .source_id("http")
2254 .target("source_component")
2255 .wit_namespace("wasi")
2256 .wit_package("http")
2257 .interfaces(vec!["incoming-handler".into()])
2258 .name("default")
2259 .source_config(vec!["some-port".into()])
2260 .target_config(vec![])
2261 .build()
2262 .expect("failed to build link"),
2263 Link::builder()
2264 .source_id("source_component")
2265 .target("httpclient")
2266 .wit_namespace("wasi")
2267 .wit_package("http")
2268 .interfaces(vec!["outgoing-handler".into()])
2269 .name("default")
2270 .source_config(vec![])
2271 .target_config(vec!["some-port".into()])
2272 .build()
2273 .expect("failed to build link"),
2274 Link::builder()
2275 .source_id("source_component")
2276 .target("other_component")
2277 .wit_namespace("custom")
2278 .wit_package("foo")
2279 .interfaces(vec!["bar".into(), "baz".into()])
2280 .name("default")
2281 .source_config(vec![])
2282 .target_config(vec![])
2283 .build()
2284 .expect("failed to build link"),
2285 Link::builder()
2286 .source_id("other_component")
2287 .target("target")
2288 .wit_namespace("wit")
2289 .wit_package("package")
2290 .interfaces(vec!["interface3".into()])
2291 .name("link2")
2292 .source_config(vec![])
2293 .target_config(vec![])
2294 .build()
2295 .expect("failed to build link"),
2296 ];
2297
2298 let links_map = super::component_import_links(&links);
2299
2300 let expected_result = HashMap::from_iter([
2328 (
2329 "default".into(),
2330 HashMap::from([
2331 ("wasi:keyvalue/atomics".into(), "kv-redis".into()),
2332 ("wasi:keyvalue/store".into(), "kv-redis".into()),
2333 (
2334 "wasi:http/incoming-handler".into(),
2335 "source_component".into(),
2336 ),
2337 ("wasi:http/outgoing-handler".into(), "httpclient".into()),
2338 ("custom:foo/bar".into(), "other_component".into()),
2339 ("custom:foo/baz".into(), "other_component".into()),
2340 ]),
2341 ),
2342 (
2343 "secret".into(),
2344 HashMap::from([
2345 ("wasi:keyvalue/atomics".into(), "kv-vault-offsite".into()),
2346 ("wasi:keyvalue/store".into(), "kv-vault".into()),
2347 ]),
2348 ),
2349 (
2350 "link2".into(),
2351 HashMap::from([("wit:package/interface3".into(), "target".into())]),
2352 ),
2353 ]);
2354
2355 assert_eq!(links_map, expected_result);
2356 }
2357}