1#![allow(clippy::type_complexity)]
2
3use core::sync::atomic::Ordering;
4
5use std::collections::hash_map::Entry;
6use std::collections::{hash_map, BTreeMap, HashMap};
7use std::env::consts::{ARCH, FAMILY, OS};
8use std::future::Future;
9use std::num::NonZeroUsize;
10use std::ops::Deref;
11use std::pin::Pin;
12use std::str::FromStr;
13use std::sync::atomic::AtomicBool;
14use std::sync::Arc;
15use std::task::{Context, Poll};
16use std::time::Duration;
17
18use anyhow::{anyhow, bail, ensure, Context as _};
19use async_nats::jetstream::kv::Store;
20use bytes::{BufMut, Bytes, BytesMut};
21use claims::{Claims, StoredClaims};
22use cloudevents::{EventBuilder, EventBuilderV10};
23use ctl::ControlInterfaceServer;
24use futures::future::Either;
25use futures::stream::{AbortHandle, Abortable, SelectAll};
26use futures::{join, stream, try_join, Stream, StreamExt, TryFutureExt, TryStreamExt};
27use hyper_util::rt::{TokioExecutor, TokioIo};
28use nkeys::{KeyPair, KeyPairType, XKey};
29use providers::Provider;
30use secrecy::SecretBox;
31use serde::{Deserialize, Serialize};
32use serde_json::json;
33use sysinfo::System;
34use tokio::io::AsyncWrite;
35use tokio::net::TcpListener;
36use tokio::spawn;
37use tokio::sync::{mpsc, watch, RwLock, Semaphore};
38use tokio::task::{JoinHandle, JoinSet};
39use tokio::time::{interval_at, timeout, Instant};
40use tokio_stream::wrappers::IntervalStream;
41use tracing::{debug, debug_span, error, info, instrument, trace, warn, Instrument as _};
42use tracing_opentelemetry::OpenTelemetrySpanExt;
43use wascap::jwt;
44use wasmcloud_control_interface::{
45 ComponentAuctionAck, ComponentAuctionRequest, ComponentDescription, CtlResponse,
46 DeleteInterfaceLinkDefinitionRequest, HostInventory, HostLabel, HostLabelIdentifier, Link,
47 ProviderAuctionAck, ProviderAuctionRequest, ProviderDescription, RegistryCredential,
48 ScaleComponentCommand, StartProviderCommand, StopHostCommand, StopProviderCommand,
49 UpdateComponentCommand,
50};
51use wasmcloud_core::{ComponentId, CTL_API_VERSION_1};
52use wasmcloud_runtime::capability::secrets::store::SecretValue;
53use wasmcloud_runtime::component::{from_string_map, Limits, WrpcServeEvent};
54use wasmcloud_runtime::Runtime;
55use wasmcloud_secrets_types::SECRET_PREFIX;
56use wasmcloud_tracing::context::TraceContextInjector;
57use wasmcloud_tracing::{global, InstrumentationScope, KeyValue};
58
59use crate::registry::RegistryCredentialExt;
60use crate::wasmbus::jetstream::create_bucket;
61use crate::workload_identity::WorkloadIdentityConfig;
62use crate::{
63 fetch_component, HostMetrics, OciConfig, PolicyHostInfo, PolicyManager, PolicyResponse,
64 RegistryAuth, RegistryConfig, RegistryType, ResourceRef, SecretsManager,
65};
66
67mod claims;
68mod ctl;
69mod event;
70mod experimental;
71mod handler;
72mod jetstream;
73mod providers;
74
75pub mod config;
76pub mod host_config;
78
79pub use self::experimental::Features;
80pub use self::host_config::Host as HostConfig;
81pub use jetstream::ComponentSpecification;
82
83use self::config::{BundleGenerator, ConfigBundle};
84use self::handler::Handler;
85
86const MAX_INVOCATION_CHANNEL_SIZE: usize = 5000;
87const MIN_INVOCATION_CHANNEL_SIZE: usize = 256;
88
89#[derive(Debug)]
90struct Queue {
91 all_streams: SelectAll<async_nats::Subscriber>,
92}
93
94impl Stream for Queue {
95 type Item = async_nats::Message;
96
97 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
98 self.all_streams.poll_next_unpin(cx)
99 }
100}
101
102#[derive(Clone, Default)]
103struct AsyncBytesMut(Arc<std::sync::Mutex<BytesMut>>);
104
105impl AsyncWrite for AsyncBytesMut {
106 fn poll_write(
107 self: Pin<&mut Self>,
108 _cx: &mut Context<'_>,
109 buf: &[u8],
110 ) -> Poll<Result<usize, std::io::Error>> {
111 Poll::Ready({
112 self.0
113 .lock()
114 .map_err(|e| std::io::Error::other(e.to_string()))?
115 .put_slice(buf);
116 Ok(buf.len())
117 })
118 }
119
120 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
121 Poll::Ready(Ok(()))
122 }
123
124 fn poll_shutdown(
125 self: Pin<&mut Self>,
126 _cx: &mut Context<'_>,
127 ) -> Poll<Result<(), std::io::Error>> {
128 Poll::Ready(Ok(()))
129 }
130}
131
132impl TryFrom<AsyncBytesMut> for Vec<u8> {
133 type Error = anyhow::Error;
134
135 fn try_from(buf: AsyncBytesMut) -> Result<Self, Self::Error> {
136 buf.0
137 .lock()
138 .map(|buf| buf.clone().into())
139 .map_err(|e| anyhow!(e.to_string()).context("failed to lock"))
140 }
141}
142
143impl Queue {
144 #[instrument]
145 async fn new(
146 nats: &async_nats::Client,
147 topic_prefix: &str,
148 lattice: &str,
149 host_key: &KeyPair,
150 component_auction: bool,
151 provider_auction: bool,
152 ) -> anyhow::Result<Self> {
153 let host_id = host_key.public_key();
154 let mut subs = vec![
155 Either::Left(nats.subscribe(format!(
156 "{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.registry.put",
157 ))),
158 Either::Left(nats.subscribe(format!(
159 "{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.host.ping",
160 ))),
161 Either::Right(nats.queue_subscribe(
162 format!("{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.link.*"),
163 format!("{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.link",),
164 )),
165 Either::Right(nats.queue_subscribe(
166 format!("{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.claims.get"),
167 format!("{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.claims"),
168 )),
169 Either::Left(nats.subscribe(format!(
170 "{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.component.*.{host_id}"
171 ))),
172 Either::Left(nats.subscribe(format!(
173 "{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.provider.*.{host_id}"
174 ))),
175 Either::Left(nats.subscribe(format!(
176 "{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.label.*.{host_id}"
177 ))),
178 Either::Left(nats.subscribe(format!(
179 "{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.host.*.{host_id}"
180 ))),
181 Either::Right(nats.queue_subscribe(
182 format!("{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.config.>"),
183 format!("{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.config"),
184 )),
185 ];
186 if component_auction {
187 subs.push(Either::Left(nats.subscribe(format!(
188 "{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.component.auction",
189 ))));
190 }
191 if provider_auction {
192 subs.push(Either::Left(nats.subscribe(format!(
193 "{topic_prefix}.{CTL_API_VERSION_1}.{lattice}.provider.auction",
194 ))));
195 }
196 let streams = futures::future::join_all(subs)
197 .await
198 .into_iter()
199 .collect::<Result<Vec<_>, async_nats::SubscribeError>>()
200 .context("failed to subscribe to queues")?;
201 Ok(Self {
202 all_streams: futures::stream::select_all(streams),
203 })
204 }
205}
206
207type Annotations = BTreeMap<String, String>;
208
209#[derive(Debug)]
210struct Component {
211 component: wasmcloud_runtime::Component<Handler>,
212 id: Arc<str>,
214 handler: Handler,
215 exports: JoinHandle<()>,
216 annotations: Annotations,
217 max_instances: NonZeroUsize,
219 limits: Option<Limits>,
220 image_reference: Arc<str>,
221 events: mpsc::Sender<WrpcServeEvent<<WrpcServer as wrpc_transport::Serve>::Context>>,
222 permits: Arc<Semaphore>,
223}
224
225impl Deref for Component {
226 type Target = wasmcloud_runtime::Component<Handler>;
227
228 fn deref(&self) -> &Self::Target {
229 &self.component
230 }
231}
232
233#[derive(Clone)]
234struct WrpcServer {
235 nats: wrpc_transport_nats::Client,
236 claims: Option<Arc<jwt::Claims<jwt::Component>>>,
237 id: Arc<str>,
238 image_reference: Arc<str>,
239 annotations: Arc<Annotations>,
240 policy_manager: Arc<PolicyManager>,
241 metrics: Arc<HostMetrics>,
242}
243
244struct InvocationContext {
245 start_at: Instant,
246 attributes: Vec<KeyValue>,
247 span: tracing::Span,
248}
249
250impl Deref for InvocationContext {
251 type Target = tracing::Span;
252
253 fn deref(&self) -> &Self::Target {
254 &self.span
255 }
256}
257
258impl wrpc_transport::Serve for WrpcServer {
259 type Context = InvocationContext;
260 type Outgoing = <wrpc_transport_nats::Client as wrpc_transport::Serve>::Outgoing;
261 type Incoming = <wrpc_transport_nats::Client as wrpc_transport::Serve>::Incoming;
262
263 #[instrument(
264 level = "info",
265 skip(self, paths),
266 fields(
267 component_id = ?self.id,
268 component_ref = ?self.image_reference)
269 )]
270 async fn serve(
271 &self,
272 instance: &str,
273 func: &str,
274 paths: impl Into<Arc<[Box<[Option<usize>]>]>> + Send,
275 ) -> anyhow::Result<
276 impl Stream<Item = anyhow::Result<(Self::Context, Self::Outgoing, Self::Incoming)>>
277 + Send
278 + 'static,
279 > {
280 debug!("serving invocations");
281 let invocations = self.nats.serve(instance, func, paths).await?;
282
283 let func: Arc<str> = Arc::from(func);
284 let instance: Arc<str> = Arc::from(instance);
285 let annotations = Arc::clone(&self.annotations);
286 let id = Arc::clone(&self.id);
287 let image_reference = Arc::clone(&self.image_reference);
288 let metrics = Arc::clone(&self.metrics);
289 let policy_manager = Arc::clone(&self.policy_manager);
290 let claims = self.claims.clone();
291 Ok(invocations.and_then(move |(cx, tx, rx)| {
292 let annotations = Arc::clone(&annotations);
293 let claims = claims.clone();
294 let func = Arc::clone(&func);
295 let id = Arc::clone(&id);
296 let image_reference = Arc::clone(&image_reference);
297 let instance = Arc::clone(&instance);
298 let metrics = Arc::clone(&metrics);
299 let policy_manager = Arc::clone(&policy_manager);
300 let span = tracing::info_span!("component_invocation", func = %func, id = %id, instance = %instance);
301 async move {
302 if let Some(ref cx) = cx {
303 let trace_context = cx
306 .iter()
307 .flat_map(|(key, value)| {
308 value
309 .iter()
310 .map(|v| (key.to_string(), v.to_string()))
311 .collect::<Vec<_>>()
312 })
313 .collect::<Vec<(String, String)>>();
314 let _ = span.set_parent(wasmcloud_tracing::context::get_span_context(&trace_context));
315 }
316
317 let PolicyResponse {
318 request_id,
319 permitted,
320 message,
321 } = policy_manager
322 .evaluate_perform_invocation(
323 &id,
324 &image_reference,
325 &annotations,
326 claims.as_deref(),
327 instance.to_string(),
328 func.to_string(),
329 )
330 .instrument(debug_span!(parent: &span, "policy_check"))
331 .await?;
332 ensure!(
333 permitted,
334 "policy denied request to invoke component `{request_id}`: `{message:?}`",
335 );
336
337 Ok((
338 InvocationContext{
339 start_at: Instant::now(),
340 attributes: vec![
342 KeyValue::new("component.ref", image_reference),
343 KeyValue::new("lattice", metrics.lattice_id.clone()),
344 KeyValue::new("host", metrics.host_id.clone()),
345 KeyValue::new("operation", format!("{instance}/{func}")),
346 ],
347 span,
348 },
349 tx,
350 rx,
351 ))
352 }
353 }))
354 }
355}
356
357pub struct Host {
359 components: Arc<RwLock<HashMap<ComponentId, Arc<Component>>>>,
360 event_builder: EventBuilderV10,
361 friendly_name: String,
362 heartbeat: AbortHandle,
363 host_config: HostConfig,
364 host_key: Arc<KeyPair>,
365 host_token: Arc<jwt::Token<jwt::Host>>,
366 secrets_xkey: Arc<XKey>,
368 labels: Arc<RwLock<BTreeMap<String, String>>>,
369 ctl_topic_prefix: String,
370 ctl_nats: async_nats::Client,
372 rpc_nats: Arc<async_nats::Client>,
374 data: Store,
375 data_watch: AbortHandle,
377 config_data: Store,
378 config_generator: BundleGenerator,
379 policy_manager: Arc<PolicyManager>,
380 secrets_manager: Arc<SecretsManager>,
381 providers: RwLock<HashMap<String, Provider>>,
383 registry_config: RwLock<HashMap<String, RegistryConfig>>,
384 runtime: Runtime,
385 start_at: Instant,
386 stop_tx: watch::Sender<Option<Instant>>,
387 stop_rx: watch::Receiver<Option<Instant>>,
388 queue: AbortHandle,
389 links: RwLock<HashMap<String, Vec<Link>>>,
391 component_claims: Arc<RwLock<HashMap<ComponentId, jwt::Claims<jwt::Component>>>>, provider_claims: Arc<RwLock<HashMap<String, jwt::Claims<jwt::CapabilityProvider>>>>,
393 metrics: Arc<HostMetrics>,
394 max_execution_time: Duration,
395 messaging_links:
396 Arc<RwLock<HashMap<Arc<str>, Arc<RwLock<HashMap<Box<str>, async_nats::Client>>>>>>,
397 experimental_features: Features,
399 ready: Arc<AtomicBool>,
400 #[allow(unused)]
402 tasks: JoinSet<()>,
403}
404
405pub async fn connect_nats(
417 addr: impl async_nats::ToServerAddrs,
418 jwt: Option<&String>,
419 key: Option<Arc<KeyPair>>,
420 require_tls: bool,
421 request_timeout: Option<Duration>,
422 workload_identity_config: Option<WorkloadIdentityConfig>,
423) -> anyhow::Result<async_nats::Client> {
424 let opts = match (jwt, key, workload_identity_config) {
425 (Some(jwt), Some(key), None) => {
426 async_nats::ConnectOptions::with_jwt(jwt.to_string(), move |nonce| {
427 let key = key.clone();
428 async move { key.sign(&nonce).map_err(async_nats::AuthError::new) }
429 })
430 .name("wasmbus")
431 }
432 (Some(_), None, _) | (None, Some(_), _) => {
433 bail!("cannot authenticate if only one of jwt or seed is specified")
434 }
435 (jwt, key, Some(wid_cfg)) => {
436 setup_workload_identity_nats_connect_options(jwt, key, wid_cfg).await?
437 }
438 _ => async_nats::ConnectOptions::new().name("wasmbus"),
439 };
440 let opts = if let Some(timeout) = request_timeout {
441 opts.request_timeout(Some(timeout))
442 } else {
443 opts
444 };
445 let opts = opts.require_tls(require_tls);
446 opts.connect(addr)
447 .await
448 .context("failed to connect to NATS")
449}
450
451#[cfg(unix)]
452async fn setup_workload_identity_nats_connect_options(
453 jwt: Option<&String>,
454 key: Option<Arc<KeyPair>>,
455 wid_cfg: WorkloadIdentityConfig,
456) -> anyhow::Result<async_nats::ConnectOptions> {
457 let wid_cfg = Arc::new(wid_cfg);
458 let jwt = jwt.map(String::to_string).map(Arc::new);
459 let key = key.clone();
460
461 Ok(
466 async_nats::ConnectOptions::with_auth_callback(move |nonce| {
467 let key = key.clone();
468 let jwt = jwt.clone();
469 let wid_cfg = wid_cfg.clone();
470
471 let fetch_svid_handle = tokio::spawn(async move {
472 let mut client = spiffe::WorkloadApiClient::default()
473 .await
474 .map_err(async_nats::AuthError::new)?;
475 client
476 .fetch_jwt_svid(&[wid_cfg.auth_service_audience.as_str()], None)
477 .await
478 .map_err(async_nats::AuthError::new)
479 });
480
481 async move {
482 let svid = fetch_svid_handle
483 .await
484 .map_err(async_nats::AuthError::new)?
485 .map_err(async_nats::AuthError::new)?;
486
487 let mut auth = async_nats::Auth::new();
488 if let Some(key) = key {
489 let signature = key.sign(&nonce).map_err(async_nats::AuthError::new)?;
490 auth.signature = Some(signature);
491 }
492 if let Some(jwt) = jwt {
493 auth.jwt = Some(jwt.to_string());
494 }
495 auth.token = Some(svid.token().into());
496 Ok(auth)
497 }
498 })
499 .name("wasmbus"),
500 )
501}
502
503#[cfg(target_family = "windows")]
504async fn setup_workload_identity_nats_connect_options(
505 jwt: Option<&String>,
506 key: Option<Arc<KeyPair>>,
507 wid_cfg: WorkloadIdentityConfig,
508) -> anyhow::Result<async_nats::ConnectOptions> {
509 bail!("workload identity is not supported on Windows")
510}
511
512#[derive(Debug, Default)]
513struct SupplementalConfig {
514 registry_config: Option<HashMap<String, RegistryConfig>>,
515}
516
517#[instrument(level = "debug", skip_all)]
518async fn load_supplemental_config(
519 ctl_nats: &async_nats::Client,
520 lattice: &str,
521 labels: &BTreeMap<String, String>,
522) -> anyhow::Result<SupplementalConfig> {
523 #[derive(Deserialize, Default)]
524 struct SerializedSupplementalConfig {
525 #[serde(default, rename = "registryCredentials")]
526 registry_credentials: Option<HashMap<String, RegistryCredential>>,
527 }
528
529 let cfg_topic = format!("wasmbus.cfg.{lattice}.req");
530 let cfg_payload = serde_json::to_vec(&json!({
531 "labels": labels,
532 }))
533 .context("failed to serialize config payload")?;
534
535 debug!("requesting supplemental config");
536 match ctl_nats.request(cfg_topic, cfg_payload.into()).await {
537 Ok(resp) => {
538 match serde_json::from_slice::<SerializedSupplementalConfig>(resp.payload.as_ref()) {
539 Ok(ser_cfg) => Ok(SupplementalConfig {
540 registry_config: ser_cfg.registry_credentials.and_then(|creds| {
541 creds
542 .into_iter()
543 .map(|(k, v)| {
544 debug!(registry_url = %k, "set registry config");
545 v.into_registry_config().map(|v| (k, v))
546 })
547 .collect::<anyhow::Result<_>>()
548 .ok()
549 }),
550 }),
551 Err(e) => {
552 error!(
553 ?e,
554 "failed to deserialize supplemental config. Defaulting to empty config"
555 );
556 Ok(SupplementalConfig::default())
557 }
558 }
559 }
560 Err(e) => {
561 error!(
562 ?e,
563 "failed to request supplemental config. Defaulting to empty config"
564 );
565 Ok(SupplementalConfig::default())
566 }
567 }
568}
569
570#[instrument(level = "debug", skip_all)]
571async fn merge_registry_config(
572 registry_config: &RwLock<HashMap<String, RegistryConfig>>,
573 oci_opts: OciConfig,
574) -> () {
575 let mut registry_config = registry_config.write().await;
576 let allow_latest = oci_opts.allow_latest;
577 let additional_ca_paths = oci_opts.additional_ca_paths;
578
579 if let Some(reg) = oci_opts.oci_registry {
581 match registry_config.entry(reg.clone()) {
582 Entry::Occupied(_entry) => {
583 warn!(oci_registry_url = %reg, "ignoring OCI registry config, overridden by config service");
585 }
586 Entry::Vacant(entry) => {
587 debug!(oci_registry_url = %reg, "set registry config");
588 entry.insert(
589 RegistryConfig::builder()
590 .reg_type(RegistryType::Oci)
591 .auth(RegistryAuth::from((
592 oci_opts.oci_user,
593 oci_opts.oci_password,
594 )))
595 .build()
596 .expect("failed to build registry config"),
597 );
598 }
599 }
600 }
601
602 oci_opts.allowed_insecure.into_iter().for_each(|reg| {
604 match registry_config.entry(reg.clone()) {
605 Entry::Occupied(mut entry) => {
606 debug!(oci_registry_url = %reg, "set allowed_insecure");
607 entry.get_mut().set_allow_insecure(true);
608 }
609 Entry::Vacant(entry) => {
610 debug!(oci_registry_url = %reg, "set allowed_insecure");
611 entry.insert(
612 RegistryConfig::builder()
613 .reg_type(RegistryType::Oci)
614 .allow_insecure(true)
615 .build()
616 .expect("failed to build registry config"),
617 );
618 }
619 }
620 });
621
622 registry_config.iter_mut().for_each(|(url, config)| {
624 if !additional_ca_paths.is_empty() {
625 config.set_additional_ca_paths(additional_ca_paths.clone());
626 }
627 if allow_latest {
628 debug!(oci_registry_url = %url, "set allow_latest");
629 }
630 config.set_allow_latest(allow_latest);
631 });
632}
633
634impl Host {
635 const DEFAULT_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(30);
636
637 const NAME_ADJECTIVES: &'static str = "
638 autumn hidden bitter misty silent empty dry dark summer
639 icy delicate quiet white cool spring winter patient
640 twilight dawn crimson wispy weathered blue billowing
641 broken cold damp falling frosty green long late lingering
642 bold little morning muddy old red rough still small
643 sparkling bouncing shy wandering withered wild black
644 young holy solitary fragrant aged snowy proud floral
645 restless divine polished ancient purple lively nameless
646 gray orange mauve
647 ";
648
649 const NAME_NOUNS: &'static str = "
650 waterfall river breeze moon rain wind sea morning
651 snow lake sunset pine shadow leaf dawn glitter forest
652 hill cloud meadow sun glade bird brook butterfly
653 bush dew dust field fire flower firefly ladybug feather grass
654 haze mountain night pond darkness snowflake silence
655 sound sky shape stapler surf thunder violet water wildflower
656 wave water resonance sun timber dream cherry tree fog autocorrect
657 frost voice paper frog smoke star hamster ocean emoji robot
658 ";
659
660 fn generate_friendly_name() -> Option<String> {
664 let adjectives: Vec<_> = Self::NAME_ADJECTIVES.split_whitespace().collect();
665 let nouns: Vec<_> = Self::NAME_NOUNS.split_whitespace().collect();
666 names::Generator::new(&adjectives, &nouns, names::Name::Numbered).next()
667 }
668
669 #[instrument(level = "debug", skip_all)]
671 pub async fn new(
672 config: HostConfig,
673 ) -> anyhow::Result<(Arc<Self>, impl Future<Output = anyhow::Result<()>>)> {
674 let host_key = if let Some(host_key) = &config.host_key {
675 ensure!(host_key.key_pair_type() == KeyPairType::Server);
676 Arc::clone(host_key)
677 } else {
678 Arc::new(KeyPair::new(KeyPairType::Server))
679 };
680
681 let mut labels = BTreeMap::from([
682 ("hostcore.arch".into(), ARCH.into()),
683 ("hostcore.os".into(), OS.into()),
684 ("hostcore.osfamily".into(), FAMILY.into()),
685 ]);
686 labels.extend(config.labels.clone().into_iter());
687 let friendly_name =
688 Self::generate_friendly_name().context("failed to generate friendly name")?;
689
690 let host_issuer = Arc::new(KeyPair::new_account());
691 let claims = jwt::Claims::<jwt::Host>::new(
692 friendly_name.clone(),
693 host_issuer.public_key(),
694 host_key.public_key().clone(),
695 Some(HashMap::from_iter([(
696 "self_signed".to_string(),
697 "true".to_string(),
698 )])),
699 );
700 let jwt = claims
701 .encode(&host_issuer)
702 .context("failed to encode host claims")?;
703 let host_token = Arc::new(jwt::Token { jwt, claims });
704
705 let start_evt = json!({
706 "friendly_name": friendly_name,
707 "labels": labels,
708 "uptime_seconds": 0,
709 "version": config.version,
710 });
711
712 let workload_identity_config = if config.experimental_features.workload_identity_auth {
713 Some(WorkloadIdentityConfig::from_env()?)
714 } else {
715 None
716 };
717
718 let ((ctl_nats, queue), rpc_nats) = try_join!(
719 async {
720 debug!(
721 ctl_nats_url = config.ctl_nats_url.as_str(),
722 "connecting to NATS control server"
723 );
724 let ctl_nats = connect_nats(
725 config.ctl_nats_url.as_str(),
726 config.ctl_jwt.as_ref(),
727 config.ctl_key.clone(),
728 config.ctl_tls,
729 None,
730 workload_identity_config.clone(),
731 )
732 .await
733 .context("failed to establish NATS control server connection")?;
734 let queue = Queue::new(
735 &ctl_nats,
736 &config.ctl_topic_prefix,
737 &config.lattice,
738 &host_key,
739 config.enable_component_auction,
740 config.enable_provider_auction,
741 )
742 .await
743 .context("failed to initialize queue")?;
744 ctl_nats.flush().await.context("failed to flush")?;
745 Ok((ctl_nats, queue))
746 },
747 async {
748 debug!(
749 rpc_nats_url = config.rpc_nats_url.as_str(),
750 "connecting to NATS RPC server"
751 );
752 connect_nats(
753 config.rpc_nats_url.as_str(),
754 config.rpc_jwt.as_ref(),
755 config.rpc_key.clone(),
756 config.rpc_tls,
757 Some(config.rpc_timeout),
758 workload_identity_config.clone(),
759 )
760 .await
761 .context("failed to establish NATS RPC server connection")
762 }
763 )?;
764
765 let start_at = Instant::now();
766
767 let heartbeat_interval = config
768 .heartbeat_interval
769 .unwrap_or(Self::DEFAULT_HEARTBEAT_INTERVAL);
770 let heartbeat_start_at = start_at
771 .checked_add(heartbeat_interval)
772 .context("failed to compute heartbeat start time")?;
773 let heartbeat = IntervalStream::new(interval_at(heartbeat_start_at, heartbeat_interval));
774
775 let (stop_tx, stop_rx) = watch::channel(None);
776
777 let (runtime, _epoch) = Runtime::builder()
778 .max_execution_time(config.max_execution_time)
779 .max_linear_memory(config.max_linear_memory)
780 .max_components(config.max_components)
781 .max_core_instances_per_component(config.max_core_instances_per_component)
782 .max_component_size(config.max_component_size)
783 .experimental_features(config.experimental_features.into())
784 .build()
785 .context("failed to build runtime")?;
786 let event_builder = EventBuilderV10::new().source(host_key.public_key());
787
788 let ctl_jetstream = if let Some(domain) = config.js_domain.as_ref() {
789 async_nats::jetstream::with_domain(ctl_nats.clone(), domain)
790 } else {
791 async_nats::jetstream::new(ctl_nats.clone())
792 };
793 let bucket = format!("LATTICEDATA_{}", config.lattice);
794 let data = create_bucket(&ctl_jetstream, &bucket).await?;
795
796 let config_bucket = format!("CONFIGDATA_{}", config.lattice);
797 let config_data = create_bucket(&ctl_jetstream, &config_bucket).await?;
798
799 let (queue_abort, queue_abort_reg) = AbortHandle::new_pair();
800 let (heartbeat_abort, heartbeat_abort_reg) = AbortHandle::new_pair();
801 let (data_watch_abort, data_watch_abort_reg) = AbortHandle::new_pair();
802
803 let supplemental_config = if config.config_service_enabled {
804 load_supplemental_config(&ctl_nats, &config.lattice, &labels).await?
805 } else {
806 SupplementalConfig::default()
807 };
808
809 let registry_config = RwLock::new(supplemental_config.registry_config.unwrap_or_default());
810 merge_registry_config(®istry_config, config.oci_opts.clone()).await;
811
812 let policy_manager = PolicyManager::new(
813 ctl_nats.clone(),
814 PolicyHostInfo {
815 public_key: host_key.public_key(),
816 lattice: config.lattice.to_string(),
817 labels: HashMap::from_iter(labels.clone()),
818 },
819 config.policy_service_config.policy_topic.clone(),
820 config.policy_service_config.policy_timeout_ms,
821 config.policy_service_config.policy_changes_topic.clone(),
822 )
823 .await?;
824
825 ensure!(
828 config.secrets_topic_prefix.is_none()
829 || config
830 .secrets_topic_prefix
831 .as_ref()
832 .is_some_and(|topic| !topic.is_empty()),
833 "secrets topic prefix must be non-empty"
834 );
835
836 let secrets_manager = Arc::new(SecretsManager::new(
837 &config_data,
838 config.secrets_topic_prefix.as_ref(),
839 &ctl_nats,
840 ));
841
842 let scope = InstrumentationScope::builder("wasmcloud-host")
843 .with_version(config.version.clone())
844 .with_attributes(vec![
845 KeyValue::new("host.id", host_key.public_key()),
846 KeyValue::new("host.version", config.version.clone()),
847 KeyValue::new("host.arch", ARCH),
848 KeyValue::new("host.os", OS),
849 KeyValue::new("host.osfamily", FAMILY),
850 KeyValue::new("host.friendly_name", friendly_name.clone()),
851 KeyValue::new("host.hostname", System::host_name().unwrap_or_default()),
852 KeyValue::new(
853 "host.kernel_version",
854 System::kernel_version().unwrap_or_default(),
855 ),
856 KeyValue::new("host.os_version", System::os_version().unwrap_or_default()),
857 ])
858 .build();
859 let meter = global::meter_with_scope(scope);
860 let metrics = HostMetrics::new(
861 &meter,
862 host_key.public_key(),
863 config.lattice.to_string(),
864 None,
865 )
866 .context("failed to create HostMetrics instance")?;
867
868 let config_generator = BundleGenerator::new(config_data.clone());
869
870 let max_execution_time_ms = config.max_execution_time;
871
872 debug!("Feature flags: {:?}", config.experimental_features);
873
874 let mut tasks = JoinSet::new();
875 let ready = Arc::new(AtomicBool::new(true));
876 if let Some(addr) = config.http_admin {
877 let socket = TcpListener::bind(addr)
878 .await
879 .context("failed to bind on HTTP administration endpoint")?;
880 let ready = Arc::clone(&ready);
881 let svc = hyper::service::service_fn(move |req| {
882 const OK: &str = r#"{"status":"ok"}"#;
883 const FAIL: &str = r#"{"status":"failure"}"#;
884 let ready = Arc::clone(&ready);
885 async move {
886 let (http::request::Parts { method, uri, .. }, _) = req.into_parts();
887 match (method.as_str(), uri.path()) {
888 ("HEAD", "/livez") => Ok(http::Response::default()),
889 ("GET", "/livez") => Ok(http::Response::new(http_body_util::Full::new(
890 Bytes::from(OK),
891 ))),
892 (method, "/livez") => http::Response::builder()
893 .status(http::StatusCode::METHOD_NOT_ALLOWED)
894 .body(http_body_util::Full::new(Bytes::from(format!(
895 "method `{method}` not supported for path `/livez`"
896 )))),
897 ("HEAD", "/readyz") => {
898 if ready.load(Ordering::Relaxed) {
899 Ok(http::Response::default())
900 } else {
901 http::Response::builder()
902 .status(http::StatusCode::INTERNAL_SERVER_ERROR)
903 .body(http_body_util::Full::default())
904 }
905 }
906 ("GET", "/readyz") => {
907 if ready.load(Ordering::Relaxed) {
908 Ok(http::Response::new(http_body_util::Full::new(Bytes::from(
909 OK,
910 ))))
911 } else {
912 http::Response::builder()
913 .status(http::StatusCode::INTERNAL_SERVER_ERROR)
914 .body(http_body_util::Full::new(Bytes::from(FAIL)))
915 }
916 }
917 (method, "/readyz") => http::Response::builder()
918 .status(http::StatusCode::METHOD_NOT_ALLOWED)
919 .body(http_body_util::Full::new(Bytes::from(format!(
920 "method `{method}` not supported for path `/readyz`"
921 )))),
922 (.., path) => http::Response::builder()
923 .status(http::StatusCode::NOT_FOUND)
924 .body(http_body_util::Full::new(Bytes::from(format!(
925 "unknown endpoint `{path}`"
926 )))),
927 }
928 }
929 });
930 let srv = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new());
931 tasks.spawn(async move {
932 loop {
933 let stream = match socket.accept().await {
934 Ok((stream, _)) => stream,
935 Err(err) => {
936 error!(?err, "failed to accept HTTP administration connection");
937 continue;
938 }
939 };
940 let svc = svc.clone();
941 if let Err(err) = srv.serve_connection(TokioIo::new(stream), svc).await {
942 error!(?err, "failed to serve HTTP administration connection");
943 }
944 }
945 });
946 }
947
948 let host = Host {
949 components: Arc::default(),
950 event_builder,
951 friendly_name,
952 heartbeat: heartbeat_abort.clone(),
953 ctl_topic_prefix: config.ctl_topic_prefix.clone(),
954 host_key,
955 host_token,
956 secrets_xkey: Arc::new(XKey::new()),
957 labels: Arc::new(RwLock::new(labels)),
958 ctl_nats,
959 rpc_nats: Arc::new(rpc_nats),
960 experimental_features: config.experimental_features,
961 host_config: config,
962 data: data.clone(),
963 data_watch: data_watch_abort.clone(),
964 config_data: config_data.clone(),
965 config_generator,
966 policy_manager,
967 secrets_manager,
968 providers: RwLock::default(),
969 registry_config,
970 runtime,
971 start_at,
972 stop_rx,
973 stop_tx,
974 queue: queue_abort.clone(),
975 links: RwLock::default(),
976 component_claims: Arc::default(),
977 provider_claims: Arc::default(),
978 metrics: Arc::new(metrics),
979 max_execution_time: max_execution_time_ms,
980 messaging_links: Arc::default(),
981 ready: Arc::clone(&ready),
982 tasks,
983 };
984
985 let host = Arc::new(host);
986 let queue = spawn({
987 let host = Arc::clone(&host);
988 async move {
989 let mut queue = Abortable::new(queue, queue_abort_reg);
990 queue
991 .by_ref()
992 .for_each_concurrent(None, {
993 let host = Arc::clone(&host);
994 move |msg| {
995 let host = Arc::clone(&host);
996 async move { host.handle_ctl_message(msg).await }
997 }
998 })
999 .await;
1000 let deadline = { *host.stop_rx.borrow() };
1001 host.stop_tx.send_replace(deadline);
1002 if queue.is_aborted() {
1003 info!("control interface queue task gracefully stopped");
1004 } else {
1005 error!("control interface queue task unexpectedly stopped");
1006 }
1007 }
1008 });
1009
1010 let data_watch: JoinHandle<anyhow::Result<_>> = spawn({
1011 let data = data.clone();
1012 let host = Arc::clone(&host);
1013 async move {
1014 let data_watch = data
1015 .watch_all()
1016 .await
1017 .context("failed to watch lattice data bucket")?;
1018 let mut data_watch = Abortable::new(data_watch, data_watch_abort_reg);
1019 data_watch
1020 .by_ref()
1021 .for_each({
1022 let host = Arc::clone(&host);
1023 move |entry| {
1024 let host = Arc::clone(&host);
1025 async move {
1026 match entry {
1027 Err(error) => {
1028 error!("failed to watch lattice data bucket: {error}");
1029 }
1030 Ok(entry) => host.process_entry(entry).await,
1031 }
1032 }
1033 }
1034 })
1035 .await;
1036 let deadline = { *host.stop_rx.borrow() };
1037 host.stop_tx.send_replace(deadline);
1038 if data_watch.is_aborted() {
1039 info!("data watch task gracefully stopped");
1040 } else {
1041 error!("data watch task unexpectedly stopped");
1042 }
1043 Ok(())
1044 }
1045 });
1046
1047 let heartbeat = spawn({
1048 let host = Arc::clone(&host);
1049 async move {
1050 let mut heartbeat = Abortable::new(heartbeat, heartbeat_abort_reg);
1051 heartbeat
1052 .by_ref()
1053 .for_each({
1054 let host = Arc::clone(&host);
1055 move |_| {
1056 let host = Arc::clone(&host);
1057 async move {
1058 let heartbeat = match host.heartbeat().await {
1059 Ok(heartbeat) => heartbeat,
1060 Err(e) => {
1061 error!("failed to generate heartbeat: {e}");
1062 return;
1063 }
1064 };
1065
1066 if let Err(e) =
1067 host.publish_event("host_heartbeat", heartbeat).await
1068 {
1069 error!("failed to publish heartbeat: {e}");
1070 }
1071 }
1072 }
1073 })
1074 .await;
1075 let deadline = { *host.stop_rx.borrow() };
1076 host.stop_tx.send_replace(deadline);
1077 if heartbeat.is_aborted() {
1078 info!("heartbeat task gracefully stopped");
1079 } else {
1080 error!("heartbeat task unexpectedly stopped");
1081 }
1082 }
1083 });
1084
1085 data.keys()
1087 .await
1088 .context("failed to read keys of lattice data bucket")?
1089 .map_err(|e| anyhow!(e).context("failed to read lattice data stream"))
1090 .try_filter_map(|key| async {
1091 data.entry(key)
1092 .await
1093 .context("failed to get entry in lattice data bucket")
1094 })
1095 .for_each(|entry| async {
1096 match entry {
1097 Ok(entry) => host.process_entry(entry).await,
1098 Err(err) => error!(%err, "failed to read entry from lattice data bucket"),
1099 }
1100 })
1101 .await;
1102
1103 host.publish_event("host_started", start_evt)
1104 .await
1105 .context("failed to publish start event")?;
1106 info!(
1107 host_id = host.host_key.public_key(),
1108 "wasmCloud host started"
1109 );
1110
1111 Ok((Arc::clone(&host), async move {
1112 ready.store(false, Ordering::Relaxed);
1113 heartbeat_abort.abort();
1114 queue_abort.abort();
1115 data_watch_abort.abort();
1116 host.policy_manager.policy_changes.abort();
1117 let _ = try_join!(queue, data_watch, heartbeat).context("failed to await tasks")?;
1118 host.publish_event(
1119 "host_stopped",
1120 json!({
1121 "labels": *host.labels.read().await,
1122 }),
1123 )
1124 .await
1125 .context("failed to publish stop event")?;
1126 try_join!(host.ctl_nats.flush(), host.rpc_nats.flush())
1129 .context("failed to flush NATS clients")?;
1130 Ok(())
1131 }))
1132 }
1133
1134 #[instrument(level = "debug", skip_all)]
1141 pub async fn stopped(&self) -> anyhow::Result<Option<Instant>> {
1142 self.stop_rx
1143 .clone()
1144 .changed()
1145 .await
1146 .context("failed to wait for stop")?;
1147 Ok(*self.stop_rx.borrow())
1148 }
1149
1150 #[instrument(level = "debug", skip_all)]
1151 async fn inventory(&self) -> HostInventory {
1152 trace!("generating host inventory");
1153 let components: Vec<_> = {
1154 let components = self.components.read().await;
1155 stream::iter(components.iter())
1156 .filter_map(|(id, component)| async move {
1157 let mut description = ComponentDescription::builder()
1158 .id(id.into())
1159 .image_ref(component.image_reference.to_string())
1160 .annotations(component.annotations.clone().into_iter().collect())
1161 .max_instances(component.max_instances.get().try_into().unwrap_or(u32::MAX))
1162 .limits(component.limits.map(|limits| limits.to_string_map()))
1163 .revision(
1164 component
1165 .claims()
1166 .and_then(|claims| claims.metadata.as_ref())
1167 .and_then(|jwt::Component { rev, .. }| *rev)
1168 .unwrap_or_default(),
1169 );
1170 if let Some(name) = component
1172 .claims()
1173 .and_then(|claims| claims.metadata.as_ref())
1174 .and_then(|metadata| metadata.name.as_ref())
1175 .cloned()
1176 {
1177 description = description.name(name);
1178 };
1179
1180 Some(
1181 description
1182 .build()
1183 .expect("failed to build component description: {e}"),
1184 )
1185 })
1186 .collect()
1187 .await
1188 };
1189
1190 let providers: Vec<_> = self
1191 .providers
1192 .read()
1193 .await
1194 .iter()
1195 .map(
1196 |(
1197 provider_id,
1198 Provider {
1199 annotations,
1200 claims_token,
1201 image_ref,
1202 ..
1203 },
1204 )| {
1205 let mut provider_description = ProviderDescription::builder()
1206 .id(provider_id)
1207 .image_ref(image_ref);
1208 if let Some(name) = claims_token
1209 .as_ref()
1210 .and_then(|claims| claims.claims.metadata.as_ref())
1211 .and_then(|metadata| metadata.name.as_ref())
1212 {
1213 provider_description = provider_description.name(name);
1214 }
1215 provider_description
1216 .annotations(
1217 annotations
1218 .clone()
1219 .into_iter()
1220 .collect::<BTreeMap<String, String>>(),
1221 )
1222 .revision(
1223 claims_token
1224 .as_ref()
1225 .and_then(|claims| claims.claims.metadata.as_ref())
1226 .and_then(|jwt::CapabilityProvider { rev, .. }| *rev)
1227 .unwrap_or_default(),
1228 )
1229 .build()
1230 .expect("failed to build provider description")
1231 },
1232 )
1233 .collect();
1234
1235 let uptime = self.start_at.elapsed();
1236 HostInventory::builder()
1237 .components(components)
1238 .providers(providers)
1239 .friendly_name(self.friendly_name.clone())
1240 .labels(self.labels.read().await.clone())
1241 .uptime_human(human_friendly_uptime(uptime))
1242 .uptime_seconds(uptime.as_secs())
1243 .version(self.host_config.version.clone())
1244 .host_id(self.host_key.public_key())
1245 .build()
1246 .expect("failed to build host inventory")
1247 }
1248
1249 #[instrument(level = "debug", skip_all)]
1250 async fn heartbeat(&self) -> anyhow::Result<serde_json::Value> {
1251 trace!("generating heartbeat");
1252 Ok(serde_json::to_value(self.inventory().await)?)
1253 }
1254
1255 #[instrument(level = "debug", skip(self))]
1256 async fn publish_event(&self, name: &str, data: serde_json::Value) -> anyhow::Result<()> {
1257 event::publish(
1258 &self.event_builder,
1259 &self.ctl_nats,
1260 &self.host_config.lattice,
1261 name,
1262 data,
1263 )
1264 .await
1265 }
1266
1267 #[allow(clippy::too_many_arguments)] #[instrument(level = "debug", skip_all)]
1270 async fn instantiate_component(
1271 &self,
1272 annotations: &Annotations,
1273 image_reference: Arc<str>,
1274 id: Arc<str>,
1275 max_instances: NonZeroUsize,
1276 limits: Option<Limits>,
1277 mut component: wasmcloud_runtime::Component<Handler>,
1278 handler: Handler,
1279 ) -> anyhow::Result<Arc<Component>> {
1280 trace!(
1281 component_ref = ?image_reference,
1282 max_instances,
1283 "instantiating component"
1284 );
1285
1286 let max_execution_time = self.max_execution_time; component.set_max_execution_time(max_execution_time);
1288
1289 let (events_tx, mut events_rx) = mpsc::channel(
1290 max_instances
1291 .get()
1292 .clamp(MIN_INVOCATION_CHANNEL_SIZE, MAX_INVOCATION_CHANNEL_SIZE),
1293 );
1294 let prefix = Arc::from(format!("{}.{id}", &self.host_config.lattice));
1295 let nats = wrpc_transport_nats::Client::new(
1296 Arc::clone(&self.rpc_nats),
1297 Arc::clone(&prefix),
1298 Some(prefix),
1299 )
1300 .await?;
1301 let exports = component
1302 .serve_wrpc(
1303 &WrpcServer {
1304 nats,
1305 claims: component.claims().cloned().map(Arc::new),
1306 id: Arc::clone(&id),
1307 image_reference: Arc::clone(&image_reference),
1308 annotations: Arc::new(annotations.clone()),
1309 policy_manager: Arc::clone(&self.policy_manager),
1310 metrics: Arc::clone(&self.metrics),
1311 },
1312 handler.clone(),
1313 events_tx.clone(),
1314 )
1315 .await?;
1316 let permits = Arc::new(Semaphore::new(
1317 usize::from(max_instances).min(Semaphore::MAX_PERMITS),
1318 ));
1319 let component_attributes = Arc::new(vec![
1320 KeyValue::new("component.id", id.to_string()),
1321 KeyValue::new("component.ref", image_reference.to_string()),
1322 KeyValue::new("lattice", self.host_config.lattice.clone()),
1323 KeyValue::new("host", self.host_key.public_key()),
1324 ]);
1325 self.metrics
1326 .set_max_instances(max_instances.get() as u64, &component_attributes);
1327
1328 let metrics = Arc::clone(&self.metrics);
1329 Ok(Arc::new(Component {
1330 component,
1331 id: Arc::clone(&id),
1332 handler,
1333 events: events_tx,
1334 permits: Arc::clone(&permits),
1335 exports: spawn(async move {
1336 let metrics_left = Arc::clone(&metrics);
1338 let metrics_right = Arc::clone(&metrics);
1339 join!(
1340 async move {
1341 let mut exports = stream::select_all(exports);
1342 loop {
1343 let metrics_left = Arc::clone(&metrics_left);
1344 let component_attributes = Arc::clone(&component_attributes);
1345 let permits = Arc::clone(&permits);
1346 if let Some(fut) = exports.next().await {
1347 match fut {
1348 Ok(fut) => {
1349 debug!("accepted invocation, acquiring permit");
1350 let permit = permits.acquire_owned().await;
1351
1352 metrics_left
1354 .increment_active_instance(&component_attributes);
1355 spawn(async move {
1356 let _permit = permit;
1357 debug!("handling invocation");
1358 let result = timeout(max_execution_time, fut).await;
1360 metrics_left
1361 .decrement_active_instance(&component_attributes);
1362
1363 match result {
1364 Ok(Ok(())) => {
1365 debug!("successfully handled invocation");
1366 Ok(())
1367 }
1368 Ok(Err(err)) => {
1369 warn!(?err, "failed to handle invocation");
1370 Err(err)
1371 }
1372 Err(_err) => {
1373 warn!("component invocation timed out");
1374 Err(anyhow::anyhow!(
1375 "component invocation timed out"
1376 ))
1377 }
1378 }
1379 });
1380 }
1381 Err(err) => {
1382 warn!(?err, "failed to accept invocation")
1383 }
1384 }
1385 }
1386 }
1387 },
1388 async move {
1389 while let Some(evt) = events_rx.recv().await {
1390 match evt {
1391 WrpcServeEvent::HttpIncomingHandlerHandleReturned {
1392 context:
1393 InvocationContext {
1394 start_at,
1395 ref attributes,
1396 ..
1397 },
1398 success,
1399 }
1400 | WrpcServeEvent::MessagingHandlerHandleMessageReturned {
1401 context:
1402 InvocationContext {
1403 start_at,
1404 ref attributes,
1405 ..
1406 },
1407 success,
1408 }
1409 | WrpcServeEvent::DynamicExportReturned {
1410 context:
1411 InvocationContext {
1412 start_at,
1413 ref attributes,
1414 ..
1415 },
1416 success,
1417 } => metrics_right.record_component_invocation(
1418 u64::try_from(start_at.elapsed().as_nanos())
1419 .unwrap_or_default(),
1420 attributes,
1421 !success,
1422 ),
1423 }
1424 }
1425 debug!("serving event stream is done");
1426 },
1427 );
1428 debug!("export serving task done");
1429 }),
1430 annotations: annotations.clone(),
1431 max_instances,
1432 limits,
1433 image_reference: Arc::clone(&image_reference),
1434 }))
1435 }
1436
1437 #[allow(clippy::too_many_arguments)]
1438 #[instrument(level = "debug", skip_all)]
1439 async fn start_component<'a>(
1440 &self,
1441 entry: hash_map::VacantEntry<'a, String, Arc<Component>>,
1442 wasm: &[u8],
1443 claims: Option<jwt::Claims<jwt::Component>>,
1444 component_ref: Arc<str>,
1445 component_id: Arc<str>,
1446 max_instances: NonZeroUsize,
1447 limits: Option<Limits>,
1448 annotations: &Annotations,
1449 config: ConfigBundle,
1450 secrets: HashMap<String, SecretBox<SecretValue>>,
1451 ) -> anyhow::Result<&'a mut Arc<Component>> {
1452 debug!(?component_ref, ?max_instances, "starting new component");
1453
1454 if let Some(ref claims) = claims {
1455 self.store_claims(Claims::Component(claims.clone()))
1456 .await
1457 .context("failed to store claims")?;
1458 }
1459
1460 let component_spec = self
1461 .get_component_spec(&component_id)
1462 .await?
1463 .unwrap_or_else(|| ComponentSpecification::new(&component_ref));
1464 self.store_component_spec(&component_id, &component_spec)
1465 .await?;
1466
1467 let handler = Handler {
1469 nats: Arc::clone(&self.rpc_nats),
1470 config_data: Arc::new(RwLock::new(config)),
1471 lattice: Arc::clone(&self.host_config.lattice),
1472 component_id: Arc::clone(&component_id),
1473 secrets: Arc::new(RwLock::new(secrets)),
1474 targets: Arc::default(),
1475 instance_links: Arc::new(RwLock::new(component_import_links(&component_spec.links))),
1476 messaging_links: {
1477 let mut links = self.messaging_links.write().await;
1478 Arc::clone(links.entry(Arc::clone(&component_id)).or_default())
1479 },
1480 invocation_timeout: Duration::from_secs(10), experimental_features: self.experimental_features,
1482 host_labels: Arc::clone(&self.labels),
1483 };
1484 let component = wasmcloud_runtime::Component::new(&self.runtime, wasm, limits)?;
1485 let component = self
1486 .instantiate_component(
1487 annotations,
1488 Arc::clone(&component_ref),
1489 Arc::clone(&component_id),
1490 max_instances,
1491 limits,
1492 component,
1493 handler,
1494 )
1495 .await
1496 .context("failed to instantiate component")?;
1497
1498 info!(?component_ref, "component started");
1499 self.publish_event(
1500 "component_scaled",
1501 event::component_scaled(
1502 claims.as_ref(),
1503 annotations,
1504 self.host_key.public_key(),
1505 max_instances,
1506 &component_ref,
1507 &component_id,
1508 ),
1509 )
1510 .await?;
1511
1512 Ok(entry.insert(component))
1513 }
1514
1515 #[instrument(level = "debug", skip_all)]
1516 async fn stop_component(&self, component: &Component, _host_id: &str) -> anyhow::Result<()> {
1517 trace!(component_id = %component.id, "stopping component");
1518
1519 component.exports.abort();
1520
1521 Ok(())
1522 }
1523
1524 #[instrument(level = "trace", skip_all)]
1525 async fn fetch_component(&self, component_ref: &str) -> anyhow::Result<Vec<u8>> {
1526 let registry_config = self.registry_config.read().await;
1527 fetch_component(
1528 component_ref,
1529 self.host_config.allow_file_load,
1530 &self.host_config.oci_opts,
1531 ®istry_config,
1532 )
1533 .await
1534 .context("failed to fetch component")
1535 }
1536
1537 #[instrument(level = "trace", skip_all)]
1538 async fn store_component_claims(
1539 &self,
1540 claims: jwt::Claims<jwt::Component>,
1541 ) -> anyhow::Result<()> {
1542 let mut component_claims = self.component_claims.write().await;
1543 component_claims.insert(claims.subject.clone(), claims);
1544 Ok(())
1545 }
1546
1547 #[instrument(level = "debug", skip_all)]
1548 async fn handle_auction_component(
1549 &self,
1550 payload: impl AsRef<[u8]>,
1551 ) -> anyhow::Result<Option<CtlResponse<ComponentAuctionAck>>> {
1552 let request = serde_json::from_slice::<ComponentAuctionRequest>(payload.as_ref())
1553 .context("failed to deserialize component auction command")?;
1554 <Self as ControlInterfaceServer>::handle_auction_component(self, request).await
1555 }
1556
1557 #[instrument(level = "debug", skip_all)]
1558 async fn handle_auction_provider(
1559 &self,
1560 payload: impl AsRef<[u8]>,
1561 ) -> anyhow::Result<Option<CtlResponse<ProviderAuctionAck>>> {
1562 let request = serde_json::from_slice::<ProviderAuctionRequest>(payload.as_ref())
1563 .context("failed to deserialize provider auction command")?;
1564 <Self as ControlInterfaceServer>::handle_auction_provider(self, request).await
1565 }
1566
1567 #[instrument(level = "debug", skip_all)]
1568 async fn handle_stop_host(
1569 &self,
1570 payload: impl AsRef<[u8]>,
1571 transport_host_id: &str,
1572 ) -> anyhow::Result<CtlResponse<()>> {
1573 let timeout = if payload.as_ref().is_empty() {
1575 None
1576 } else {
1577 let cmd = serde_json::from_slice::<StopHostCommand>(payload.as_ref())
1578 .context("failed to deserialize stop command")?;
1579 let timeout = cmd.timeout();
1580 let host_id = cmd.host_id();
1581
1582 if !host_id.is_empty() {
1585 anyhow::ensure!(
1586 host_id == transport_host_id && host_id == self.host_key.public_key(),
1587 "invalid host_id [{host_id}]"
1588 );
1589 }
1590 timeout
1591 };
1592
1593 anyhow::ensure!(
1595 transport_host_id == self.host_key.public_key(),
1596 "invalid host_id [{transport_host_id}]"
1597 );
1598
1599 let mut stop_command = StopHostCommand::builder().host_id(transport_host_id);
1600 if let Some(timeout) = timeout {
1601 stop_command = stop_command.timeout(timeout);
1602 }
1603 <Self as ControlInterfaceServer>::handle_stop_host(
1604 self,
1605 stop_command
1606 .build()
1607 .map_err(|e| anyhow!(e))
1608 .context("failed to build stop host command")?,
1609 )
1610 .await
1611 }
1612
1613 #[instrument(level = "debug", skip_all)]
1614 async fn handle_scale_component(
1615 self: Arc<Self>,
1616 payload: impl AsRef<[u8]>,
1617 ) -> anyhow::Result<CtlResponse<()>> {
1618 let request = serde_json::from_slice::<ScaleComponentCommand>(payload.as_ref())
1619 .context("failed to deserialize component scale command")?;
1620 <Self as ControlInterfaceServer>::handle_scale_component(self, request).await
1621 }
1622
1623 #[instrument(level = "debug", skip_all)]
1624 #[allow(clippy::too_many_arguments)]
1627 async fn handle_scale_component_task(
1628 &self,
1629 component_ref: Arc<str>,
1630 component_id: Arc<str>,
1631 host_id: &str,
1632 max_instances: u32,
1633 component_limits: Option<HashMap<String, String>>,
1634 annotations: &Annotations,
1635 config: Vec<String>,
1636 wasm: anyhow::Result<Vec<u8>>,
1637 claims_token: Option<&jwt::Token<jwt::Component>>,
1638 ) -> anyhow::Result<()> {
1639 trace!(?component_ref, max_instances, "scale component task");
1640
1641 let claims = claims_token.map(|c| c.claims.clone());
1642 match self
1643 .policy_manager
1644 .evaluate_start_component(
1645 &component_id,
1646 &component_ref,
1647 max_instances,
1648 annotations,
1649 claims.as_ref(),
1650 )
1651 .await?
1652 {
1653 PolicyResponse {
1654 permitted: false,
1655 message: Some(message),
1656 ..
1657 } => bail!("Policy denied request to scale component `{component_id}`: `{message:?}`"),
1658 PolicyResponse {
1659 permitted: false, ..
1660 } => bail!("Policy denied request to scale component `{component_id}`"),
1661 PolicyResponse {
1662 permitted: true, ..
1663 } => (),
1664 };
1665
1666 let limits: Option<Limits> = from_string_map(component_limits.as_ref());
1667
1668 let scaled_event = match (
1669 self.components
1670 .write()
1671 .await
1672 .entry(component_id.to_string()),
1673 NonZeroUsize::new(max_instances as usize),
1674 ) {
1675 (hash_map::Entry::Vacant(_), None) => event::component_scaled(
1678 claims.as_ref(),
1679 annotations,
1680 host_id,
1681 0_usize,
1682 &component_ref,
1683 &component_id,
1684 ),
1685 (hash_map::Entry::Vacant(entry), Some(max)) => {
1687 let (config, secrets) = self
1688 .fetch_config_and_secrets(
1689 &config,
1690 claims_token.as_ref().map(|c| &c.jwt),
1691 annotations.get("wasmcloud.dev/appspec"),
1692 )
1693 .await?;
1694 match &wasm {
1695 Ok(wasm) => {
1696 self.start_component(
1697 entry,
1698 wasm,
1699 claims.clone(),
1700 Arc::clone(&component_ref),
1701 Arc::clone(&component_id),
1702 max,
1703 limits,
1704 annotations,
1705 config,
1706 secrets,
1707 )
1708 .await?;
1709
1710 event::component_scaled(
1711 claims.as_ref(),
1712 annotations,
1713 host_id,
1714 max,
1715 &component_ref,
1716 &component_id,
1717 )
1718 }
1719 Err(e) => {
1720 error!(%component_ref, %component_id, err = ?e, "failed to scale component");
1721 if let Err(e) = self
1722 .publish_event(
1723 "component_scale_failed",
1724 event::component_scale_failed(
1725 claims_token.map(|c| c.claims.clone()).as_ref(),
1726 annotations,
1727 host_id,
1728 &component_ref,
1729 &component_id,
1730 max_instances,
1731 e,
1732 ),
1733 )
1734 .await
1735 {
1736 error!(%component_ref, %component_id, err = ?e, "failed to publish component scale failed event");
1737 }
1738 return Ok(());
1739 }
1740 }
1741 }
1742 (hash_map::Entry::Occupied(entry), None) => {
1744 let component = entry.remove();
1745 self.stop_component(&component, host_id)
1746 .await
1747 .context("failed to stop component in response to scale to zero")?;
1748
1749 info!(?component_ref, "component stopped");
1750 event::component_scaled(
1751 claims.as_ref(),
1752 &component.annotations,
1753 host_id,
1754 0_usize,
1755 &component.image_reference,
1756 &component.id,
1757 )
1758 }
1759 (hash_map::Entry::Occupied(mut entry), Some(max)) => {
1761 let component = entry.get_mut();
1762 let config_changed =
1763 &config != component.handler.config_data.read().await.config_names();
1764
1765 let event = event::component_scaled(
1768 claims.as_ref(),
1769 &component.annotations,
1770 host_id,
1771 max,
1772 &component.image_reference,
1773 &component.id,
1774 );
1775
1776 if component.max_instances != max || config_changed {
1778 let handler = component.handler.copy_for_new();
1780 if config_changed {
1781 let (config, secrets) = self
1782 .fetch_config_and_secrets(
1783 &config,
1784 claims_token.as_ref().map(|c| &c.jwt),
1785 annotations.get("wasmcloud.dev/appspec"),
1786 )
1787 .await?;
1788 *handler.config_data.write().await = config;
1789 *handler.secrets.write().await = secrets;
1790 }
1791 let instance = self
1792 .instantiate_component(
1793 annotations,
1794 Arc::clone(&component_ref),
1795 Arc::clone(&component.id),
1796 max,
1797 limits,
1798 component.component.clone(),
1799 handler,
1800 )
1801 .await
1802 .context("failed to instantiate component")?;
1803 let component = entry.insert(instance);
1804 self.stop_component(&component, host_id)
1805 .await
1806 .context("failed to stop component after scaling")?;
1807
1808 info!(?component_ref, ?max, "component scaled");
1809 } else {
1810 debug!(?component_ref, ?max, "component already at desired scale");
1811 }
1812 event
1813 }
1814 };
1815
1816 self.publish_event("component_scaled", scaled_event).await?;
1817
1818 Ok(())
1819 }
1820
1821 #[instrument(level = "debug", skip_all)]
1825 async fn handle_update_component(
1826 self: Arc<Self>,
1827 payload: impl AsRef<[u8]>,
1828 ) -> anyhow::Result<CtlResponse<()>> {
1829 let cmd = serde_json::from_slice::<UpdateComponentCommand>(payload.as_ref())
1830 .context("failed to deserialize component update command")?;
1831 <Self as ControlInterfaceServer>::handle_update_component(self, cmd).await
1832 }
1833
1834 async fn handle_update_component_task(
1835 &self,
1836 component_id: Arc<str>,
1837 new_component_ref: Arc<str>,
1838 host_id: &str,
1839 annotations: Option<BTreeMap<String, String>>,
1840 ) -> anyhow::Result<()> {
1841 let component = {
1844 let components = self.components.read().await;
1845 let existing_component = components
1846 .get(&*component_id)
1847 .context("component not found")?;
1848 let annotations = annotations.unwrap_or_default().into_iter().collect();
1849
1850 if existing_component.image_reference == new_component_ref {
1852 info!(%component_id, %new_component_ref, "component already updated");
1853 return Ok(());
1854 }
1855
1856 let new_component = self.fetch_component(&new_component_ref).await?;
1857 let new_component = wasmcloud_runtime::Component::new(
1858 &self.runtime,
1859 &new_component,
1860 existing_component.limits,
1861 )
1862 .context("failed to initialize component")?;
1863 let new_claims = new_component.claims().cloned();
1864 if let Some(ref claims) = new_claims {
1865 self.store_claims(Claims::Component(claims.clone()))
1866 .await
1867 .context("failed to store claims")?;
1868 }
1869
1870 let max = existing_component.max_instances;
1871 let Ok(component) = self
1872 .instantiate_component(
1873 &annotations,
1874 Arc::clone(&new_component_ref),
1875 Arc::clone(&component_id),
1876 max,
1877 existing_component.limits,
1878 new_component,
1879 existing_component.handler.copy_for_new(),
1880 )
1881 .await
1882 else {
1883 bail!("failed to instantiate component from new reference");
1884 };
1885
1886 info!(%new_component_ref, "component updated");
1887 self.publish_event(
1888 "component_scaled",
1889 event::component_scaled(
1890 new_claims.as_ref(),
1891 &component.annotations,
1892 host_id,
1893 max,
1894 new_component_ref,
1895 &component_id,
1896 ),
1897 )
1898 .await?;
1899
1900 self.stop_component(&component, host_id)
1902 .await
1903 .context("failed to stop old component")?;
1904 self.publish_event(
1905 "component_scaled",
1906 event::component_scaled(
1907 component.claims(),
1908 &component.annotations,
1909 host_id,
1910 0_usize,
1911 &component.image_reference,
1912 &component.id,
1913 ),
1914 )
1915 .await?;
1916
1917 component
1918 };
1919
1920 self.components
1921 .write()
1922 .await
1923 .insert(component_id.to_string(), component);
1924 Ok(())
1925 }
1926
1927 #[instrument(level = "debug", skip_all)]
1928 async fn handle_start_provider(
1929 self: Arc<Self>,
1930 payload: impl AsRef<[u8]>,
1931 ) -> anyhow::Result<Option<CtlResponse<()>>> {
1932 let cmd = serde_json::from_slice::<StartProviderCommand>(payload.as_ref())
1933 .context("failed to deserialize provider start command")?;
1934 <Self as ControlInterfaceServer>::handle_start_provider(self, cmd).await
1935 }
1936
1937 #[instrument(level = "debug", skip_all)]
1938 async fn handle_start_provider_task(
1939 self: Arc<Self>,
1940 config_names: &[String],
1941 provider_id: &str,
1942 provider_ref: &str,
1943 annotations: BTreeMap<String, String>,
1944 host_id: &str,
1945 ) -> anyhow::Result<()> {
1946 trace!(provider_ref, provider_id, "start provider task");
1947
1948 let registry_config = self.registry_config.read().await;
1949 let provider_ref =
1950 ResourceRef::try_from(provider_ref).context("failed to parse provider reference")?;
1951 let (path, claims_token) = match &provider_ref {
1952 ResourceRef::Builtin(..) => (None, None),
1953 _ => {
1954 let (path, claims_token) = crate::fetch_provider(
1955 &provider_ref,
1956 host_id,
1957 self.host_config.allow_file_load,
1958 &self.host_config.oci_opts,
1959 ®istry_config,
1960 )
1961 .await
1962 .context("failed to fetch provider")?;
1963 (Some(path), claims_token)
1964 }
1965 };
1966 let claims = claims_token.as_ref().map(|t| t.claims.clone());
1967
1968 if let Some(claims) = claims.clone() {
1969 self.store_claims(Claims::Provider(claims))
1970 .await
1971 .context("failed to store claims")?;
1972 }
1973
1974 let annotations: Annotations = annotations.into_iter().collect();
1975
1976 let PolicyResponse {
1977 permitted,
1978 request_id,
1979 message,
1980 } = self
1981 .policy_manager
1982 .evaluate_start_provider(
1983 provider_id,
1984 provider_ref.as_ref(),
1985 &annotations,
1986 claims.as_ref(),
1987 )
1988 .await?;
1989 ensure!(
1990 permitted,
1991 "policy denied request to start provider `{request_id}`: `{message:?}`",
1992 );
1993
1994 let component_specification = self
1995 .get_component_spec(provider_id)
1996 .await?
1997 .unwrap_or_else(|| ComponentSpecification::new(provider_ref.as_ref()));
1998
1999 self.store_component_spec(&provider_id, &component_specification)
2000 .await?;
2001
2002 let mut providers = self.providers.write().await;
2003 if let hash_map::Entry::Vacant(entry) = providers.entry(provider_id.into()) {
2004 let provider_xkey = XKey::new();
2005 let xkey = XKey::from_public_key(&provider_xkey.public_key())
2007 .context("failed to create XKey from provider public key xkey")?;
2008 let (host_data, config_bundle) = self
2010 .prepare_provider_config(
2011 config_names,
2012 claims_token.as_ref(),
2013 provider_id,
2014 &provider_xkey,
2015 &annotations,
2016 )
2017 .await?;
2018 let config_bundle = Arc::new(RwLock::new(config_bundle));
2019 let shutdown = Arc::new(AtomicBool::new(false));
2022 let tasks = match (path, &provider_ref) {
2023 (Some(path), ..) => {
2024 Arc::clone(&self)
2025 .start_binary_provider(
2026 path,
2027 host_data,
2028 Arc::clone(&config_bundle),
2029 provider_xkey,
2030 provider_id,
2031 config_names.to_vec(),
2033 claims_token.clone(),
2034 annotations.clone(),
2035 shutdown.clone(),
2036 )
2037 .await?
2038 }
2039 (None, ResourceRef::Builtin(name)) => match *name {
2040 "http-client" if self.experimental_features.builtin_http_client => {
2041 self.start_http_client_provider(host_data, provider_xkey, provider_id)
2042 .await?
2043 }
2044 "http-client" => {
2045 bail!("feature `builtin-http-client` is not enabled, denying start")
2046 }
2047 "http-server" if self.experimental_features.builtin_http_server => {
2048 self.start_http_server_provider(host_data, provider_xkey, provider_id)
2049 .await?
2050 }
2051 "http-server" => {
2052 bail!("feature `builtin-http-server` is not enabled, denying start")
2053 }
2054 "messaging-nats" if self.experimental_features.builtin_messaging_nats => {
2055 self.start_messaging_nats_provider(host_data, provider_xkey, provider_id)
2056 .await?
2057 }
2058 "messaging-nats" => {
2059 bail!("feature `builtin-messaging-nats` is not enabled, denying start")
2060 }
2061 _ => bail!("unknown builtin name: {name}"),
2062 },
2063 _ => bail!("invalid provider reference"),
2064 };
2065
2066 info!(
2067 provider_ref = provider_ref.as_ref(),
2068 provider_id, "provider started"
2069 );
2070 self.publish_event(
2071 "provider_started",
2072 event::provider_started(
2073 claims.as_ref(),
2074 &annotations,
2075 host_id,
2076 &provider_ref,
2077 provider_id,
2078 ),
2079 )
2080 .await?;
2081
2082 entry.insert(Provider {
2084 tasks,
2085 annotations,
2086 claims_token,
2087 image_ref: provider_ref.as_ref().to_string(),
2088 xkey,
2089 shutdown,
2090 });
2091 } else {
2092 bail!("provider is already running with that ID")
2093 }
2094
2095 Ok(())
2096 }
2097
2098 #[instrument(level = "debug", skip_all)]
2099 async fn handle_stop_provider(
2100 &self,
2101 payload: impl AsRef<[u8]>,
2102 ) -> anyhow::Result<CtlResponse<()>> {
2103 let cmd = serde_json::from_slice::<StopProviderCommand>(payload.as_ref())
2104 .context("failed to deserialize provider stop command")?;
2105 <Self as ControlInterfaceServer>::handle_stop_provider(self, cmd).await
2106 }
2107
2108 #[instrument(level = "debug", skip_all)]
2109 async fn handle_inventory(&self) -> anyhow::Result<CtlResponse<HostInventory>> {
2110 <Self as ControlInterfaceServer>::handle_inventory(self).await
2111 }
2112
2113 #[instrument(level = "trace", skip_all)]
2114 async fn handle_claims(&self) -> anyhow::Result<CtlResponse<Vec<HashMap<String, String>>>> {
2115 <Self as ControlInterfaceServer>::handle_claims(self).await
2116 }
2117
2118 #[instrument(level = "trace", skip_all)]
2119 async fn handle_links(&self) -> anyhow::Result<Vec<u8>> {
2120 <Self as ControlInterfaceServer>::handle_links(self).await
2121 }
2122
2123 #[instrument(level = "trace", skip(self))]
2124 async fn handle_config_get(&self, config_name: &str) -> anyhow::Result<Vec<u8>> {
2125 <Self as ControlInterfaceServer>::handle_config_get(self, config_name).await
2126 }
2127
2128 #[instrument(level = "debug", skip_all)]
2129 async fn handle_label_put(
2130 &self,
2131 host_id: &str,
2132 payload: impl AsRef<[u8]>,
2133 ) -> anyhow::Result<CtlResponse<()>> {
2134 let host_label = serde_json::from_slice::<HostLabel>(payload.as_ref())
2135 .context("failed to deserialize put label request")?;
2136 <Self as ControlInterfaceServer>::handle_label_put(self, host_label, host_id).await
2137 }
2138
2139 #[instrument(level = "debug", skip_all)]
2140 async fn handle_label_del(
2141 &self,
2142 host_id: &str,
2143 payload: impl AsRef<[u8]>,
2144 ) -> anyhow::Result<CtlResponse<()>> {
2145 let label = serde_json::from_slice::<HostLabelIdentifier>(payload.as_ref())
2146 .context("failed to deserialize delete label request")?;
2147 <Self as ControlInterfaceServer>::handle_label_del(self, label, host_id).await
2148 }
2149
2150 #[instrument(level = "debug", skip_all)]
2154 async fn handle_link_put(&self, payload: impl AsRef<[u8]>) -> anyhow::Result<CtlResponse<()>> {
2155 let link: Link = serde_json::from_slice(payload.as_ref())
2156 .context("failed to deserialize wrpc link definition")?;
2157 <Self as ControlInterfaceServer>::handle_link_put(self, link).await
2158 }
2159
2160 #[instrument(level = "debug", skip_all)]
2161 async fn handle_link_del(&self, payload: impl AsRef<[u8]>) -> anyhow::Result<CtlResponse<()>> {
2163 let req = serde_json::from_slice::<DeleteInterfaceLinkDefinitionRequest>(payload.as_ref())
2164 .context("failed to deserialize wrpc link definition")?;
2165 <Self as ControlInterfaceServer>::handle_link_del(self, req).await
2166 }
2167
2168 #[instrument(level = "debug", skip_all)]
2169 async fn handle_registries_put(
2170 &self,
2171 payload: impl AsRef<[u8]>,
2172 ) -> anyhow::Result<CtlResponse<()>> {
2173 let registry_creds: HashMap<String, RegistryCredential> =
2174 serde_json::from_slice(payload.as_ref())
2175 .context("failed to deserialize registries put command")?;
2176 <Self as ControlInterfaceServer>::handle_registries_put(self, registry_creds).await
2177 }
2178
2179 #[instrument(level = "debug", skip_all, fields(%config_name))]
2180 async fn handle_config_put(
2181 &self,
2182 config_name: &str,
2183 data: Bytes,
2184 ) -> anyhow::Result<CtlResponse<()>> {
2185 serde_json::from_slice::<HashMap<String, String>>(&data)
2187 .context("config data should be a map of string -> string")?;
2188 <Self as ControlInterfaceServer>::handle_config_put(self, config_name, data).await
2189 }
2190
2191 #[instrument(level = "debug", skip_all, fields(%config_name))]
2192 async fn handle_config_delete(&self, config_name: &str) -> anyhow::Result<CtlResponse<()>> {
2193 <Self as ControlInterfaceServer>::handle_config_delete(self, config_name).await
2194 }
2195
2196 #[instrument(level = "debug", skip_all)]
2197 async fn handle_ping_hosts(
2198 &self,
2199 ) -> anyhow::Result<CtlResponse<wasmcloud_control_interface::Host>> {
2200 <Self as ControlInterfaceServer>::handle_ping_hosts(self).await
2201 }
2202
2203 #[instrument(level = "trace", skip_all, fields(subject = %message.subject))]
2204 async fn handle_ctl_message(self: Arc<Self>, message: async_nats::Message) {
2205 opentelemetry_nats::attach_span_context(&message);
2209 let subject = message.subject;
2212 let mut parts = subject
2213 .trim()
2214 .trim_start_matches(&self.ctl_topic_prefix)
2215 .trim_start_matches('.')
2216 .split('.')
2217 .skip(2);
2218 trace!(%subject, "handling control interface request");
2219
2220 let ctl_response = match (parts.next(), parts.next(), parts.next(), parts.next()) {
2229 (Some("component"), Some("auction"), None, None) => self
2231 .handle_auction_component(message.payload)
2232 .await
2233 .map(serialize_ctl_response),
2234 (Some("component"), Some("scale"), Some(_host_id), None) => Arc::clone(&self)
2235 .handle_scale_component(message.payload)
2236 .await
2237 .map(Some)
2238 .map(serialize_ctl_response),
2239 (Some("component"), Some("update"), Some(_host_id), None) => Arc::clone(&self)
2240 .handle_update_component(message.payload)
2241 .await
2242 .map(Some)
2243 .map(serialize_ctl_response),
2244 (Some("provider"), Some("auction"), None, None) => self
2246 .handle_auction_provider(message.payload)
2247 .await
2248 .map(serialize_ctl_response),
2249 (Some("provider"), Some("start"), Some(_host_id), None) => Arc::clone(&self)
2250 .handle_start_provider(message.payload)
2251 .await
2252 .map(serialize_ctl_response),
2253 (Some("provider"), Some("stop"), Some(_host_id), None) => self
2254 .handle_stop_provider(message.payload)
2255 .await
2256 .map(Some)
2257 .map(serialize_ctl_response),
2258 (Some("host"), Some("get"), Some(_host_id), None) => self
2260 .handle_inventory()
2261 .await
2262 .map(Some)
2263 .map(serialize_ctl_response),
2264 (Some("host"), Some("ping"), None, None) => self
2265 .handle_ping_hosts()
2266 .await
2267 .map(Some)
2268 .map(serialize_ctl_response),
2269 (Some("host"), Some("stop"), Some(host_id), None) => self
2270 .handle_stop_host(message.payload, host_id)
2271 .await
2272 .map(Some)
2273 .map(serialize_ctl_response),
2274 (Some("claims"), Some("get"), None, None) => self
2276 .handle_claims()
2277 .await
2278 .map(Some)
2279 .map(serialize_ctl_response),
2280 (Some("link"), Some("del"), None, None) => self
2282 .handle_link_del(message.payload)
2283 .await
2284 .map(Some)
2285 .map(serialize_ctl_response),
2286 (Some("link"), Some("get"), None, None) => {
2287 self.handle_links().await.map(|bytes| Some(Ok(bytes)))
2289 }
2290 (Some("link"), Some("put"), None, None) => self
2291 .handle_link_put(message.payload)
2292 .await
2293 .map(Some)
2294 .map(serialize_ctl_response),
2295 (Some("label"), Some("del"), Some(host_id), None) => self
2297 .handle_label_del(host_id, message.payload)
2298 .await
2299 .map(Some)
2300 .map(serialize_ctl_response),
2301 (Some("label"), Some("put"), Some(host_id), None) => self
2302 .handle_label_put(host_id, message.payload)
2303 .await
2304 .map(Some)
2305 .map(serialize_ctl_response),
2306 (Some("registry"), Some("put"), None, None) => self
2308 .handle_registries_put(message.payload)
2309 .await
2310 .map(Some)
2311 .map(serialize_ctl_response),
2312 (Some("config"), Some("get"), Some(config_name), None) => self
2314 .handle_config_get(config_name)
2315 .await
2316 .map(|bytes| Some(Ok(bytes))),
2317 (Some("config"), Some("put"), Some(config_name), None) => self
2318 .handle_config_put(config_name, message.payload)
2319 .await
2320 .map(Some)
2321 .map(serialize_ctl_response),
2322 (Some("config"), Some("del"), Some(config_name), None) => self
2323 .handle_config_delete(config_name)
2324 .await
2325 .map(Some)
2326 .map(serialize_ctl_response),
2327 _ => {
2329 warn!(%subject, "received control interface request on unsupported subject");
2330 Ok(serialize_ctl_response(Some(CtlResponse::error(
2331 "unsupported subject",
2332 ))))
2333 }
2334 };
2335
2336 if let Err(err) = &ctl_response {
2337 error!(%subject, ?err, "failed to handle control interface request");
2338 } else {
2339 trace!(%subject, "handled control interface request");
2340 }
2341
2342 if let Some(reply) = message.reply {
2343 let headers = injector_to_headers(&TraceContextInjector::default_with_span());
2344
2345 let payload: Option<Bytes> = match ctl_response {
2346 Ok(Some(Ok(payload))) => Some(payload.into()),
2347 Ok(None) => None,
2349 Err(e) => Some(
2350 serde_json::to_vec(&CtlResponse::error(&e.to_string()))
2351 .context("failed to encode control interface response")
2352 .unwrap_or_else(|_| format!(r#"{{"success":false,"error":"{e}"}}"#).into())
2355 .into(),
2356 ),
2357 Ok(Some(Err(e))) => Some(
2360 serde_json::to_vec(&CtlResponse::error(&e.to_string()))
2361 .context("failed to encode control interface response")
2362 .unwrap_or_else(|_| format!(r#"{{"success":false,"error":"{e}"}}"#).into())
2363 .into(),
2364 ),
2365 };
2366
2367 if let Some(payload) = payload {
2368 let max_payload = self.ctl_nats.server_info().max_payload;
2369 if payload.len() > max_payload {
2370 warn!(
2371 size = payload.len(),
2372 max_size = max_payload,
2373 "ctl response payload is too large to publish and may fail",
2374 );
2375 }
2376 if let Err(err) = self
2377 .ctl_nats
2378 .publish_with_headers(reply.clone(), headers, payload)
2379 .err_into::<anyhow::Error>()
2380 .and_then(|()| self.ctl_nats.flush().err_into::<anyhow::Error>())
2381 .await
2382 {
2383 error!(%subject, ?err, "failed to publish reply to control interface request");
2384 }
2385 }
2386 }
2387 }
2388
2389 async fn put_backwards_compat_provider_link(&self, link: &Link) -> anyhow::Result<()> {
2396 let source_config_contains_secret = link
2399 .source_config()
2400 .iter()
2401 .any(|c| c.starts_with(SECRET_PREFIX));
2402 let target_config_contains_secret = link
2403 .target_config()
2404 .iter()
2405 .any(|c| c.starts_with(SECRET_PREFIX));
2406 if source_config_contains_secret || target_config_contains_secret {
2407 debug!("link contains secrets and is not backwards compatible, skipping");
2408 return Ok(());
2409 }
2410 let provider_link = self
2411 .resolve_link_config(link.clone(), None, None, &XKey::new())
2412 .await
2413 .context("failed to resolve link config")?;
2414 let lattice = &self.host_config.lattice;
2415 let payload: Bytes = serde_json::to_vec(&provider_link)
2416 .context("failed to serialize provider link definition")?
2417 .into();
2418
2419 if let Err(e) = self
2420 .rpc_nats
2421 .publish_with_headers(
2422 format!("wasmbus.rpc.{lattice}.{}.linkdefs.put", link.source_id()),
2423 injector_to_headers(&TraceContextInjector::default_with_span()),
2424 payload.clone(),
2425 )
2426 .await
2427 {
2428 warn!(
2429 ?e,
2430 "failed to publish backwards-compatible provider link to source"
2431 );
2432 }
2433
2434 if let Err(e) = self
2435 .rpc_nats
2436 .publish_with_headers(
2437 format!("wasmbus.rpc.{lattice}.{}.linkdefs.put", link.target()),
2438 injector_to_headers(&TraceContextInjector::default_with_span()),
2439 payload,
2440 )
2441 .await
2442 {
2443 warn!(
2444 ?e,
2445 "failed to publish backwards-compatible provider link to target"
2446 );
2447 }
2448
2449 Ok(())
2450 }
2451
2452 #[instrument(level = "debug", skip_all)]
2454 async fn put_provider_link(&self, provider: &Provider, link: &Link) -> anyhow::Result<()> {
2455 let provider_link = self
2456 .resolve_link_config(
2457 link.clone(),
2458 provider.claims_token.as_ref().map(|t| &t.jwt),
2459 provider.annotations.get("wasmcloud.dev/appspec"),
2460 &provider.xkey,
2461 )
2462 .await
2463 .context("failed to resolve link config and secrets")?;
2464 let lattice = &self.host_config.lattice;
2465 let payload: Bytes = serde_json::to_vec(&provider_link)
2466 .context("failed to serialize provider link definition")?
2467 .into();
2468
2469 self.rpc_nats
2470 .publish_with_headers(
2471 format!(
2472 "wasmbus.rpc.{lattice}.{}.linkdefs.put",
2473 provider.xkey.public_key()
2474 ),
2475 injector_to_headers(&TraceContextInjector::default_with_span()),
2476 payload.clone(),
2477 )
2478 .await
2479 .context("failed to publish provider link definition put")
2480 }
2481
2482 #[instrument(level = "debug", skip(self))]
2487 async fn del_provider_link(&self, link: &Link) -> anyhow::Result<()> {
2488 let lattice = &self.host_config.lattice;
2489 let link = wasmcloud_core::InterfaceLinkDefinition {
2491 source_id: link.source_id().to_string(),
2492 target: link.target().to_string(),
2493 wit_namespace: link.wit_namespace().to_string(),
2494 wit_package: link.wit_package().to_string(),
2495 name: link.name().to_string(),
2496 interfaces: link.interfaces().clone(),
2497 ..Default::default()
2499 };
2500 let source_id = &link.source_id;
2501 let target = &link.target;
2502 let payload: Bytes = serde_json::to_vec(&link)
2503 .context("failed to serialize provider link definition for deletion")?
2504 .into();
2505
2506 let (source_result, target_result) = futures::future::join(
2507 self.rpc_nats.publish_with_headers(
2508 format!("wasmbus.rpc.{lattice}.{source_id}.linkdefs.del"),
2509 injector_to_headers(&TraceContextInjector::default_with_span()),
2510 payload.clone(),
2511 ),
2512 self.rpc_nats.publish_with_headers(
2513 format!("wasmbus.rpc.{lattice}.{target}.linkdefs.del"),
2514 injector_to_headers(&TraceContextInjector::default_with_span()),
2515 payload,
2516 ),
2517 )
2518 .await;
2519
2520 source_result
2521 .and(target_result)
2522 .context("failed to publish provider link definition delete")
2523 }
2524
2525 async fn fetch_config_and_secrets(
2526 &self,
2527 config_names: &[String],
2528 entity_jwt: Option<&String>,
2529 application: Option<&String>,
2530 ) -> anyhow::Result<(ConfigBundle, HashMap<String, SecretBox<SecretValue>>)> {
2531 let (secret_names, config_names) = config_names
2532 .iter()
2533 .map(|s| s.to_string())
2534 .partition(|name| name.starts_with(SECRET_PREFIX));
2535
2536 let config = self
2537 .config_generator
2538 .generate(config_names)
2539 .await
2540 .context("Unable to fetch requested config")?;
2541
2542 let secrets = self
2543 .secrets_manager
2544 .fetch_secrets(secret_names, entity_jwt, &self.host_token.jwt, application)
2545 .await
2546 .context("Unable to fetch requested secrets")?;
2547
2548 Ok((config, secrets))
2549 }
2550
2551 async fn validate_config<I>(&self, config_names: I) -> anyhow::Result<()>
2556 where
2557 I: IntoIterator<Item: AsRef<str>>,
2558 {
2559 let config_store = self.config_data.clone();
2560 let validation_errors =
2561 futures::future::join_all(config_names.into_iter().map(|config_name| {
2562 let config_store = config_store.clone();
2563 let config_name = config_name.as_ref().to_string();
2564 async move {
2565 match config_store.get(&config_name).await {
2566 Ok(Some(_)) => None,
2567 Ok(None) if config_name.starts_with(SECRET_PREFIX) => Some(format!(
2568 "Secret reference {config_name} not found in config store"
2569 )),
2570 Ok(None) => Some(format!(
2571 "Configuration {config_name} not found in config store"
2572 )),
2573 Err(e) => Some(e.to_string()),
2574 }
2575 }
2576 }))
2577 .await;
2578
2579 let validation_errors = validation_errors
2582 .into_iter()
2583 .flatten()
2584 .fold(String::new(), |acc, e| acc + &e + ". ");
2585 if !validation_errors.is_empty() {
2586 bail!(format!(
2587 "Failed to validate configuration and secrets. {validation_errors}",
2588 ));
2589 }
2590
2591 Ok(())
2592 }
2593
2594 async fn resolve_link_config(
2597 &self,
2598 link: Link,
2599 provider_jwt: Option<&String>,
2600 application: Option<&String>,
2601 provider_xkey: &XKey,
2602 ) -> anyhow::Result<wasmcloud_core::InterfaceLinkDefinition> {
2603 let (source_bundle, raw_source_secrets) = self
2604 .fetch_config_and_secrets(link.source_config().as_slice(), provider_jwt, application)
2605 .await?;
2606 let (target_bundle, raw_target_secrets) = self
2607 .fetch_config_and_secrets(link.target_config().as_slice(), provider_jwt, application)
2608 .await?;
2609
2610 let source_config = source_bundle.get_config().await;
2611 let target_config = target_bundle.get_config().await;
2612 use secrecy::ExposeSecret;
2615 let source_secrets_map: HashMap<String, wasmcloud_core::secrets::SecretValue> =
2616 raw_source_secrets
2617 .iter()
2618 .map(|(k, v)| match v.expose_secret() {
2619 SecretValue::String(s) => (
2620 k.clone(),
2621 wasmcloud_core::secrets::SecretValue::String(s.to_owned()),
2622 ),
2623 SecretValue::Bytes(b) => (
2624 k.clone(),
2625 wasmcloud_core::secrets::SecretValue::Bytes(b.to_owned()),
2626 ),
2627 })
2628 .collect();
2629 let target_secrets_map: HashMap<String, wasmcloud_core::secrets::SecretValue> =
2630 raw_target_secrets
2631 .iter()
2632 .map(|(k, v)| match v.expose_secret() {
2633 SecretValue::String(s) => (
2634 k.clone(),
2635 wasmcloud_core::secrets::SecretValue::String(s.to_owned()),
2636 ),
2637 SecretValue::Bytes(b) => (
2638 k.clone(),
2639 wasmcloud_core::secrets::SecretValue::Bytes(b.to_owned()),
2640 ),
2641 })
2642 .collect();
2643 let source_secrets = if source_secrets_map.is_empty() {
2647 None
2648 } else {
2649 Some(
2650 serde_json::to_vec(&source_secrets_map)
2651 .map(|secrets| self.secrets_xkey.seal(&secrets, provider_xkey))
2652 .context("failed to serialize and encrypt source secrets")??,
2653 )
2654 };
2655 let target_secrets = if target_secrets_map.is_empty() {
2656 None
2657 } else {
2658 Some(
2659 serde_json::to_vec(&target_secrets_map)
2660 .map(|secrets| self.secrets_xkey.seal(&secrets, provider_xkey))
2661 .context("failed to serialize and encrypt target secrets")??,
2662 )
2663 };
2664
2665 Ok(wasmcloud_core::InterfaceLinkDefinition {
2666 source_id: link.source_id().to_string(),
2667 target: link.target().to_string(),
2668 name: link.name().to_string(),
2669 wit_namespace: link.wit_namespace().to_string(),
2670 wit_package: link.wit_package().to_string(),
2671 interfaces: link.interfaces().clone(),
2672 source_config: source_config.clone(),
2673 target_config: target_config.clone(),
2674 source_secrets,
2675 target_secrets,
2676 })
2677 }
2678}
2679
2680fn component_import_links(links: &[Link]) -> HashMap<Box<str>, HashMap<Box<str>, Box<str>>> {
2689 let mut m = HashMap::new();
2690 for link in links {
2691 let instances: &mut HashMap<Box<str>, Box<str>> = m
2692 .entry(link.name().to_string().into_boxed_str())
2693 .or_default();
2694 for interface in link.interfaces() {
2695 instances.insert(
2696 format!(
2697 "{}:{}/{interface}",
2698 link.wit_namespace(),
2699 link.wit_package(),
2700 )
2701 .into_boxed_str(),
2702 link.target().to_string().into_boxed_str(),
2703 );
2704 }
2705 }
2706 m
2707}
2708
2709fn serialize_ctl_response<T: Serialize>(
2711 ctl_response: Option<CtlResponse<T>>,
2712) -> Option<anyhow::Result<Vec<u8>>> {
2713 ctl_response.map(|resp| serde_json::to_vec(&resp).map_err(anyhow::Error::from))
2714}
2715
2716fn human_friendly_uptime(uptime: Duration) -> String {
2717 humantime::format_duration(
2719 uptime.saturating_sub(Duration::from_nanos(uptime.subsec_nanos().into())),
2720 )
2721 .to_string()
2722}
2723
2724fn injector_to_headers(injector: &TraceContextInjector) -> async_nats::header::HeaderMap {
2725 injector
2726 .iter()
2727 .filter_map(|(k, v)| {
2728 let name = async_nats::header::HeaderName::from_str(k.as_str()).ok()?;
2730 let value = async_nats::header::HeaderValue::from_str(v.as_str()).ok()?;
2731 Some((name, value))
2732 })
2733 .collect()
2734}
2735
2736#[cfg(test)]
2737mod test {
2738 #[test]
2740 fn can_compute_component_links() {
2741 use std::collections::HashMap;
2742 use wasmcloud_control_interface::Link;
2743
2744 let links = vec![
2745 Link::builder()
2746 .source_id("source_component")
2747 .target("kv-redis")
2748 .wit_namespace("wasi")
2749 .wit_package("keyvalue")
2750 .interfaces(vec!["atomics".into(), "store".into()])
2751 .name("default")
2752 .build()
2753 .expect("failed to build link"),
2754 Link::builder()
2755 .source_id("source_component")
2756 .target("kv-vault")
2757 .wit_namespace("wasi")
2758 .wit_package("keyvalue")
2759 .interfaces(vec!["atomics".into(), "store".into()])
2760 .name("secret")
2761 .source_config(vec![])
2762 .target_config(vec!["my-secret".into()])
2763 .build()
2764 .expect("failed to build link"),
2765 Link::builder()
2766 .source_id("source_component")
2767 .target("kv-vault-offsite")
2768 .wit_namespace("wasi")
2769 .wit_package("keyvalue")
2770 .interfaces(vec!["atomics".into()])
2771 .name("secret")
2772 .source_config(vec![])
2773 .target_config(vec!["my-secret".into()])
2774 .build()
2775 .expect("failed to build link"),
2776 Link::builder()
2777 .source_id("http")
2778 .target("source_component")
2779 .wit_namespace("wasi")
2780 .wit_package("http")
2781 .interfaces(vec!["incoming-handler".into()])
2782 .name("default")
2783 .source_config(vec!["some-port".into()])
2784 .target_config(vec![])
2785 .build()
2786 .expect("failed to build link"),
2787 Link::builder()
2788 .source_id("source_component")
2789 .target("httpclient")
2790 .wit_namespace("wasi")
2791 .wit_package("http")
2792 .interfaces(vec!["outgoing-handler".into()])
2793 .name("default")
2794 .source_config(vec![])
2795 .target_config(vec!["some-port".into()])
2796 .build()
2797 .expect("failed to build link"),
2798 Link::builder()
2799 .source_id("source_component")
2800 .target("other_component")
2801 .wit_namespace("custom")
2802 .wit_package("foo")
2803 .interfaces(vec!["bar".into(), "baz".into()])
2804 .name("default")
2805 .source_config(vec![])
2806 .target_config(vec![])
2807 .build()
2808 .expect("failed to build link"),
2809 Link::builder()
2810 .source_id("other_component")
2811 .target("target")
2812 .wit_namespace("wit")
2813 .wit_package("package")
2814 .interfaces(vec!["interface3".into()])
2815 .name("link2")
2816 .source_config(vec![])
2817 .target_config(vec![])
2818 .build()
2819 .expect("failed to build link"),
2820 ];
2821
2822 let links_map = super::component_import_links(&links);
2823
2824 let expected_result = HashMap::from_iter([
2852 (
2853 "default".into(),
2854 HashMap::from([
2855 ("wasi:keyvalue/atomics".into(), "kv-redis".into()),
2856 ("wasi:keyvalue/store".into(), "kv-redis".into()),
2857 (
2858 "wasi:http/incoming-handler".into(),
2859 "source_component".into(),
2860 ),
2861 ("wasi:http/outgoing-handler".into(), "httpclient".into()),
2862 ("custom:foo/bar".into(), "other_component".into()),
2863 ("custom:foo/baz".into(), "other_component".into()),
2864 ]),
2865 ),
2866 (
2867 "secret".into(),
2868 HashMap::from([
2869 ("wasi:keyvalue/atomics".into(), "kv-vault-offsite".into()),
2870 ("wasi:keyvalue/store".into(), "kv-vault".into()),
2871 ]),
2872 ),
2873 (
2874 "link2".into(),
2875 HashMap::from([("wit:package/interface3".into(), "target".into())]),
2876 ),
2877 ]);
2878
2879 assert_eq!(links_map, expected_result);
2880 }
2881}