wasmcloud_runtime/component/
mod.rs

1use core::fmt::{self, Debug};
2use core::future::Future;
3use core::ops::{Bound, Deref};
4use core::pin::Pin;
5use core::time::Duration;
6
7use anyhow::Result;
8use anyhow::{ensure, Context as _};
9use futures::{Stream, TryStreamExt as _};
10use serde::{Deserialize, Serialize};
11use std::collections::{BTreeMap, HashMap};
12use std::sync::Arc;
13use tokio::io::{AsyncRead, AsyncReadExt as _};
14use tokio::sync::mpsc;
15use tracing::{debug, info_span, instrument, warn, Instrument as _, Span};
16use tracing_opentelemetry::OpenTelemetrySpanExt;
17use wascap::jwt;
18use wascap::wasm::extract_claims;
19use wasi_preview1_component_adapter_provider::{
20    WASI_SNAPSHOT_PREVIEW1_ADAPTER_NAME, WASI_SNAPSHOT_PREVIEW1_REACTOR_ADAPTER,
21};
22use wasmtime::component::{
23    types, HasSelf, Linker, ResourceTable, ResourceTableError, ResourceType,
24};
25use wasmtime::InstanceAllocationStrategy;
26use wasmtime_wasi::{WasiCtx, WasiCtxBuilder, WasiCtxView, WasiView};
27use wasmtime_wasi_http::WasiHttpCtx;
28use wrpc_runtime_wasmtime::{
29    collect_component_resource_exports, collect_component_resource_imports, link_item, rpc,
30    RemoteResource, ServeExt as _, SharedResourceTable, WrpcCtxView, WrpcView,
31};
32
33use crate::capability::{self, wrpc};
34use crate::experimental::Features;
35use crate::Runtime;
36
37pub use bus::{Bus, Error};
38pub use bus1_0_0::Bus as Bus1_0_0;
39pub use config::Config;
40pub use identity::Identity;
41pub use logging::Logging;
42pub use messaging::v0_2::Messaging as Messaging0_2;
43pub use messaging::v0_3::{
44    Client as MessagingClient0_3, GuestMessage as MessagingGuestMessage0_3,
45    HostMessage as MessagingHostMessage0_3, Messaging as Messaging0_3,
46};
47pub use secrets::Secrets;
48
49pub(crate) mod blobstore;
50mod bus;
51mod bus1_0_0;
52mod config;
53mod http;
54mod identity;
55mod keyvalue;
56mod logging;
57pub(crate) mod messaging;
58mod secrets;
59
60/// Instance target, which is replaced in wRPC
61///
62/// This enum represents the original instance import invoked by the component
63#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
64pub enum ReplacedInstanceTarget {
65    /// `wasi:blobstore/blobstore` instance replacement
66    BlobstoreBlobstore,
67    /// `wasi:blobstore/container` instance replacement
68    BlobstoreContainer,
69    /// `wasi:keyvalue/atomic` instance replacement
70    KeyvalueAtomics,
71    /// `wasi:keyvalue/store` instance replacement
72    KeyvalueStore,
73    /// `wasi:keyvalue/batch` instance replacement
74    KeyvalueBatch,
75    /// `wasi:keyvalue/watch` instance replacment
76    KeyvalueWatch,
77    /// `wasi:http/incoming-handler` instance replacement
78    HttpIncomingHandler,
79    /// `wasi:http/outgoing-handler` instance replacement
80    HttpOutgoingHandler,
81}
82
83fn is_0_2(version: &str, min_patch: u64) -> bool {
84    if let Ok(semver::Version {
85        major,
86        minor,
87        patch,
88        pre,
89        build,
90    }) = version.parse()
91    {
92        major == 0 && minor == 2 && patch >= min_patch && pre.is_empty() && build.is_empty()
93    } else {
94        false
95    }
96}
97
98/// This represents a kind of wRPC invocation error
99pub enum InvocationErrorKind {
100    /// This occurs when the endpoint is not found, for example as would happen when the runtime
101    /// would attempt to call `foo:bar/baz@0.2.0`, but the peer served `foo:bar/baz@0.1.0`.
102    NotFound,
103
104    /// An error kind, which will result in a trap in the component
105    Trap,
106}
107
108/// Implementations of this trait are able to introspect an error returned by wRPC invocations
109pub trait InvocationErrorIntrospect {
110    /// Classify [`InvocationErrorKind`] of an error returned by wRPC
111    fn invocation_error_kind(&self, err: &anyhow::Error) -> InvocationErrorKind;
112}
113
114/// [`MinimalHandler`] represents the base traits that a handler must implement to be used with
115/// `wasmcloud_runtime`. It is a subset of the [`Handler`] trait, which includes many more traits
116/// for capability implementations when running in wasmCloud.
117pub trait MinimalHandler: Send + Sync + Clone + 'static {}
118impl<T: Send + Sync + Clone + 'static> MinimalHandler for T {}
119
120/// A collection of traits that the host must implement
121pub trait Handler:
122    wrpc_transport::Invoke<Context = Option<ReplacedInstanceTarget>>
123    + Bus
124    + Config
125    + Logging
126    + Secrets
127    + Messaging0_2
128    + Messaging0_3
129    + Identity
130    + InvocationErrorIntrospect
131    + Send
132    + Sync
133    + Clone
134    + 'static
135{
136}
137
138impl<
139        T: wrpc_transport::Invoke<Context = Option<ReplacedInstanceTarget>>
140            + Bus
141            + Config
142            + Logging
143            + Secrets
144            + Messaging0_2
145            + Messaging0_3
146            + Identity
147            + InvocationErrorIntrospect
148            + Send
149            + Sync
150            + Clone
151            + 'static,
152    > Handler for T
153{
154}
155
156/// Component instance configuration
157#[derive(Clone, Debug, Default)]
158pub struct ComponentConfig {
159    /// Whether components are required to be signed to be executed
160    pub require_signature: bool,
161}
162
163/// Component Environment Limits for more intricate resource control
164#[derive(Serialize, Deserialize, Debug, PartialEq, Clone, Copy)]
165pub struct Limits {
166    /// Maximum memory allocation in bytes. None defaults to host runtime limits
167    pub max_memory_limit: Option<usize>,
168    /// Maximum execution time in seconds. None defaults to host runtime limits.
169    pub max_execution_time: Option<u64>,
170}
171impl Limits {
172    /// Converts limits to a string-based key-value map for serialization.
173    ///
174    /// Only includes fields that have values (non-None). Used for converting
175    /// to external configuration formats.
176    ///
177    /// # Returns
178    /// `HashMap` containing string representations of set limits
179    #[must_use]
180    pub fn to_string_map(&self) -> HashMap<String, String> {
181        let mut map = HashMap::new();
182
183        if let Some(memory_limit) = self.max_memory_limit {
184            map.insert("max_memory_limit".to_string(), memory_limit.to_string());
185        }
186
187        if let Some(execution_time) = self.max_execution_time {
188            map.insert("max_execution_time".to_string(), execution_time.to_string());
189        }
190
191        map
192    }
193}
194
195/// Creates limits from a string-based key-value map.
196///
197/// Parses numeric values from string map, ignoring invalid entries.
198/// Missing or unparseable values default to None (no limit).
199///
200/// # Arguments
201/// * `map` - Optional `HashMap` containing string representations of limits
202///
203/// # Returns
204/// Optional Limits struct with parsed values, or None if input is None
205#[must_use]
206pub fn from_string_map<S: std::hash::BuildHasher>(
207    map: Option<&HashMap<String, String, S>>,
208) -> Option<Limits> {
209    map.map(|map| Limits {
210        max_memory_limit: map.get("max_memory_limit").and_then(|s| s.parse().ok()),
211
212        max_execution_time: map.get("max_execution_time").and_then(|s| s.parse().ok()),
213    })
214}
215/// Extracts and validates claims contained within a WebAssembly binary, if present
216///
217/// # Arguments
218///
219/// * `wasm` - Bytes that constitute a valid WebAssembly binary
220///
221/// # Errors
222///
223/// Fails if either parsing fails, or claims are not valid
224///
225/// # Returns
226/// The token embedded in the component, including the [`jwt::Claims`] and the raw JWT
227pub fn claims_token(wasm: impl AsRef<[u8]>) -> anyhow::Result<Option<jwt::Token<jwt::Component>>> {
228    let Some(claims) = extract_claims(wasm).context("failed to extract module claims")? else {
229        return Ok(None);
230    };
231    let v = jwt::validate_token::<jwt::Component>(&claims.jwt)
232        .context("failed to validate module token")?;
233    ensure!(!v.expired, "token expired at `{}`", v.expires_human);
234    ensure!(
235        !v.cannot_use_yet,
236        "token cannot be used before `{}`",
237        v.not_before_human
238    );
239    ensure!(v.signature_valid, "signature is not valid");
240    Ok(Some(claims))
241}
242
243/// Pre-compiled component [Component], which is cheapily-[Cloneable](Clone)
244#[derive(Clone)]
245pub struct Component<H>
246where
247    H: MinimalHandler,
248{
249    engine: wasmtime::Engine,
250    claims: Option<jwt::Claims<jwt::Component>>,
251    instance_pre: wasmtime::component::InstancePre<Ctx<H>>,
252    host_resources: Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>,
253    max_execution_time: Duration,
254    experimental_features: Features,
255    max_memory_limit: usize,
256}
257
258/// The [`CustomCtxComponent`] is similar to [`Component`], but it supports passing a custom context that
259/// implements the [`BaseCtx`] trait. This is useful for when you want to extend the context with additional
260/// host implementations or custom functionality.
261#[derive(Clone)]
262pub struct CustomCtxComponent<C>
263where
264    C: BaseCtx,
265{
266    engine: wasmtime::Engine,
267    instance_pre: wasmtime::component::InstancePre<C>,
268    #[allow(unused)]
269    host_resources: Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>,
270    max_execution_time: Duration,
271}
272
273impl<C> CustomCtxComponent<C>
274where
275    C: BaseCtx,
276{
277    /// Compiles a WebAssembly component using [Runtime]. Modules are not supported with this function.
278    ///
279    /// It is expected that the `linker_fn` is used to link any and all imports to the component. If
280    /// not, then the instantiation of the instance will fail.
281    #[instrument(level = "trace", skip_all)]
282    pub fn new_with_linker_minimal(
283        rt: &Runtime,
284        wasm: &[u8],
285        linker_fn: impl FnOnce(&mut Linker<C>, &wasmtime::component::Component) -> anyhow::Result<()>,
286    ) -> anyhow::Result<Self> {
287        if wasmparser::Parser::is_core_wasm(wasm) {
288            anyhow::bail!("core modules are not supported in the minimal linker");
289        }
290        let engine = rt.engine.clone();
291        let component = wasmtime::component::Component::new(&engine, wasm)
292            .context("failed to compile component")?;
293
294        let mut linker = Linker::new(&engine);
295
296        let ty = component.component_type();
297        let mut guest_resources = Vec::new();
298        let mut host_resources = BTreeMap::new();
299        collect_component_resource_exports(&engine, &ty, &mut guest_resources);
300        collect_component_resource_imports(&engine, &ty, &mut host_resources);
301
302        let host_resources = host_resources
303            .into_iter()
304            .map(|(name, instance)| {
305                let instance = instance
306                    .into_iter()
307                    .map(|(name, ty)| (name, (ty, ResourceType::host::<RemoteResource>())))
308                    .collect::<HashMap<_, _>>();
309                (name, instance)
310            })
311            .collect::<HashMap<_, _>>();
312        let host_resources = Arc::from(host_resources);
313
314        if !guest_resources.is_empty() {
315            debug!("guest resources present, must be fulfilled by the linker");
316        }
317
318        linker_fn(&mut linker, &component)?;
319
320        let instance_pre = linker
321            .instantiate_pre(&component)
322            .context("failed to pre-instantiate component")?;
323
324        Ok(CustomCtxComponent {
325            engine,
326            instance_pre,
327            host_resources,
328            max_execution_time: rt.max_execution_time,
329        })
330    }
331
332    /// Creates a new component store for instantiation
333    pub fn new_store(&self, ctx: C) -> wasmtime::Store<C> {
334        let mut store = wasmtime::Store::new(&self.engine, ctx);
335        store.set_epoch_deadline(self.max_execution_time.as_secs());
336        store
337    }
338
339    /// Returns the precompiled component instance
340    #[must_use]
341    pub fn instance_pre(&self) -> &wasmtime::component::InstancePre<C> {
342        &self.instance_pre
343    }
344}
345
346impl<H> Debug for Component<H>
347where
348    H: MinimalHandler,
349{
350    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
351        f.debug_struct("Component")
352            .field("claims", &self.claims)
353            .field("runtime", &"wasmtime")
354            .field("max_execution_time", &self.max_execution_time)
355            .finish_non_exhaustive()
356    }
357}
358
359fn new_store<H: Handler>(
360    engine: &wasmtime::Engine,
361    handler: H,
362    max_execution_time: Duration,
363) -> wasmtime::Store<Ctx<H>> {
364    let table = ResourceTable::new();
365    let wasi = WasiCtxBuilder::new()
366        .args(&["main.wasm"]) // TODO: Configure argv[0]
367        .inherit_stderr()
368        .build();
369
370    let mut store = wasmtime::Store::new(
371        engine,
372        Ctx {
373            handler: handler.clone(),
374            wasi,
375            http: WasiHttpCtx::new(),
376            table,
377            wrpc: WrpcCtx {
378                handler,
379                shared_resources: SharedResourceTable::default(),
380                timeout: max_execution_time,
381            },
382            parent_context: None,
383        },
384    );
385    store.set_epoch_deadline(max_execution_time.as_secs());
386    store
387}
388
389/// Events sent by [`Component::serve_wrpc`]
390#[derive(Clone, Debug)]
391pub enum WrpcServeEvent<C> {
392    /// `wasi:http/incoming-handler.handle` return event
393    HttpIncomingHandlerHandleReturned {
394        /// Invocation context
395        context: C,
396        /// Whether the invocation was successfully handled
397        success: bool,
398    },
399    /// `wasmcloud:messaging/handler.handle-message` return event
400    MessagingHandlerHandleMessageReturned {
401        /// Invocation context
402        context: C,
403        /// Whether the invocation was successfully handled
404        success: bool,
405    },
406    /// dynamic export return event
407    DynamicExportReturned {
408        /// Invocation context
409        context: C,
410        /// Whether the invocation was successfully handled
411        success: bool,
412    },
413}
414
415/// This represents a [Stream] of incoming invocations.
416/// Each item represents processing of a single invocation.
417pub type InvocationStream = Pin<
418    Box<
419        dyn Stream<
420                Item = anyhow::Result<
421                    Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + 'static>>,
422                >,
423            > + Send
424            + 'static,
425    >,
426>;
427
428impl<H> Component<H>
429where
430    H: MinimalHandler,
431{
432    /// Compiles a WebAssembly component using [Runtime]. Modules are not supported with this function
433    ///
434    /// If you need to pass a custom context, use [CustomCtxComponent::new_with_linker_minimal].
435    ///
436    /// It is expected that the `linker_fn` is used to link any and all imports to the component. If
437    /// not, then the instantiation of the instance will fail.
438    #[instrument(level = "trace", skip_all)]
439    pub fn new_with_linker_minimal(
440        rt: &Runtime,
441        wasm: &[u8],
442        linker_fn: impl FnOnce(&mut Linker<Ctx<H>>) -> anyhow::Result<()>,
443    ) -> anyhow::Result<Self> {
444        if wasmparser::Parser::is_core_wasm(wasm) {
445            anyhow::bail!("core modules are not supported in the minimal linker");
446        }
447        let engine = rt.engine.clone();
448        let claims = None;
449        let component = wasmtime::component::Component::new(&engine, wasm)
450            .context("failed to compile component")?;
451
452        let mut linker = Linker::new(&engine);
453        linker_fn(&mut linker)?;
454
455        let ty = component.component_type();
456        let mut guest_resources = Vec::new();
457        let mut host_resources = BTreeMap::new();
458        collect_component_resource_exports(&engine, &ty, &mut guest_resources);
459        collect_component_resource_imports(&engine, &ty, &mut host_resources);
460
461        let host_resources = host_resources
462            .into_iter()
463            .map(|(name, instance)| {
464                let instance = instance
465                    .into_iter()
466                    .map(|(name, ty)| (name, (ty, ResourceType::host::<RemoteResource>())))
467                    .collect::<HashMap<_, _>>();
468                (name, instance)
469            })
470            .collect::<HashMap<_, _>>();
471        let host_resources = Arc::from(host_resources);
472
473        if !guest_resources.is_empty() {
474            debug!("guest resources present, must be fulfilled by the linker");
475        }
476
477        let instance_pre = linker
478            .instantiate_pre(&component)
479            .context("failed to pre-instantiate component")?;
480
481        Ok(Component {
482            engine,
483            claims,
484            instance_pre,
485            host_resources,
486            max_execution_time: rt.max_execution_time,
487            experimental_features: rt.experimental_features,
488            max_memory_limit: rt.max_linear_memory,
489        })
490    }
491}
492
493impl<H> Component<H>
494where
495    H: Handler,
496{
497    /// Extracts [Claims](jwt::Claims) from WebAssembly component and compiles it using [Runtime].
498    ///
499    /// If `wasm` represents a core Wasm module, then it will first be turned into a component.
500    #[instrument(level = "trace", skip_all)]
501    pub fn new(rt: &Runtime, wasm: &[u8], limits: Option<Limits>) -> anyhow::Result<Self> {
502        Self::new_with_linker(rt, wasm, limits, |linker| {
503            wasmtime_wasi::p2::add_to_linker_async(linker)
504                .context("failed to link core WASI interfaces")?;
505            wasmtime_wasi_http::add_only_http_to_linker_async(linker)
506                .context("failed to link `wasi:http`")?;
507
508            capability::blobstore::blobstore::add_to_linker::<_, HasSelf<Ctx<H>>>(linker, |ctx| {
509                ctx
510            })
511            .context("failed to link `wasi:blobstore/blobstore`")?;
512            capability::blobstore::container::add_to_linker::<_, HasSelf<Ctx<H>>>(linker, |ctx| {
513                ctx
514            })
515            .context("failed to link `wasi:blobstore/container`")?;
516            capability::blobstore::types::add_to_linker::<_, HasSelf<Ctx<H>>>(linker, |ctx| ctx)
517                .context("failed to link `wasi:blobstore/types`")?;
518            capability::config::runtime::add_to_linker::<_, HasSelf<Ctx<H>>>(linker, |ctx| ctx)
519                .context("failed to link `wasi:config/runtime`")?;
520            capability::config::store::add_to_linker::<_, HasSelf<Ctx<H>>>(linker, |ctx| ctx)
521                .context("failed to link `wasi:config/store`")?;
522            capability::keyvalue::atomics::add_to_linker::<_, HasSelf<Ctx<H>>>(linker, |ctx| ctx)
523                .context("failed to link `wasi:keyvalue/atomics`")?;
524            capability::keyvalue::store::add_to_linker::<_, HasSelf<Ctx<H>>>(linker, |ctx| ctx)
525                .context("failed to link `wasi:keyvalue/store`")?;
526            capability::keyvalue::batch::add_to_linker::<_, HasSelf<Ctx<H>>>(linker, |ctx| ctx)
527                .context("failed to link `wasi:keyvalue/batch`")?;
528            capability::logging::logging::add_to_linker::<_, HasSelf<Ctx<H>>>(linker, |ctx| ctx)
529                .context("failed to link `wasi:logging/logging`")?;
530            capability::unversioned_logging::logging::add_to_linker::<_, HasSelf<Ctx<H>>>(
531                linker,
532                |ctx| ctx,
533            )
534            .context("failed to link unversioned `wasi:logging/logging`")?;
535
536            capability::bus1_0_0::lattice::add_to_linker::<_, HasSelf<Ctx<H>>>(linker, |ctx| ctx)
537                .context("failed to link `wasmcloud:bus/lattice@1.0.0`")?;
538            capability::bus2_0_1::lattice::add_to_linker::<_, HasSelf<Ctx<H>>>(linker, |ctx| ctx)
539                .context("failed to link `wasmcloud:bus/lattice@2.0.1`")?;
540            capability::bus2_0_1::error::add_to_linker::<_, HasSelf<Ctx<H>>>(linker, |ctx| ctx)
541                .context("failed to link `wasmcloud:bus/error@2.0.1`")?;
542            capability::messaging0_2_0::types::add_to_linker::<_, HasSelf<Ctx<H>>>(linker, |ctx| {
543                ctx
544            })
545            .context("failed to link `wasmcloud:messaging/types@0.2.0`")?;
546            capability::messaging0_2_0::consumer::add_to_linker::<_, HasSelf<Ctx<H>>>(
547                linker,
548                |ctx| ctx,
549            )
550            .context("failed to link `wasmcloud:messaging/consumer@0.2.0`")?;
551            capability::secrets::reveal::add_to_linker::<_, HasSelf<Ctx<H>>>(linker, |ctx| ctx)
552                .context("failed to link `wasmcloud:secrets/reveal`")?;
553            capability::secrets::store::add_to_linker::<_, HasSelf<Ctx<H>>>(linker, |ctx| ctx)
554                .context("failed to link `wasmcloud:secrets/store`")?;
555            // Only link wasmcloud:messaging@v3 if the feature is enabled
556            if rt.experimental_features.wasmcloud_messaging_v3 {
557                capability::messaging0_3_0::types::add_to_linker::<_, HasSelf<Ctx<H>>>(
558                    linker,
559                    |ctx| ctx,
560                )
561                .context("failed to link `wasmcloud:messaging/types@0.3.0`")?;
562                capability::messaging0_3_0::producer::add_to_linker::<_, HasSelf<Ctx<H>>>(
563                    linker,
564                    |ctx| ctx,
565                )
566                .context("failed to link `wasmcloud:messaging/producer@0.3.0`")?;
567                capability::messaging0_3_0::request_reply::add_to_linker::<_, HasSelf<Ctx<H>>>(
568                    linker,
569                    |ctx| ctx,
570                )
571                .context("failed to link `wasmcloud:messaging/request-reply@0.3.0`")?;
572            }
573            // Only link wasmcloud:identity if the workload identity feature is enabled
574            if rt.experimental_features.workload_identity_interface {
575                capability::identity::store::add_to_linker::<_, HasSelf<Ctx<H>>>(linker, |ctx| ctx)
576                    .context("failed to link `wasmcloud:identity/store`")?;
577            }
578
579            // Only link wrpc:rpc if the RPC feature is enabled
580            if rt.experimental_features.rpc_interface {
581                rpc::add_to_linker(linker).context("failed to link `wrpc:rpc`")?;
582            }
583
584            Ok(())
585        })
586    }
587
588    /// Extracts [Claims](jwt::Claims) from WebAssembly component and compiles it using [Runtime].
589    /// The `linker_fn` is used to link additional interfaces to the component.
590    ///
591    /// If `wasm` represents a core Wasm module, then it will first be turned into a component.
592    #[instrument(level = "trace", skip_all)]
593    pub fn new_with_linker(
594        rt: &Runtime,
595        wasm: &[u8],
596        limits: Option<Limits>,
597        linker_fn: impl FnOnce(&mut Linker<Ctx<H>>) -> anyhow::Result<()>,
598    ) -> anyhow::Result<Self> {
599        if wasmparser::Parser::is_core_wasm(wasm) {
600            let wasm = wit_component::ComponentEncoder::default()
601                .module(wasm)
602                .context("failed to set core component module")?
603                .adapter(
604                    WASI_SNAPSHOT_PREVIEW1_ADAPTER_NAME,
605                    WASI_SNAPSHOT_PREVIEW1_REACTOR_ADAPTER,
606                )
607                .context("failed to add WASI preview1 adapter")?
608                .encode()
609                .context("failed to encode a component from module")?;
610            return Self::new(rt, &wasm, limits);
611        }
612        let engine: wasmtime::Engine = if let Some(limits) = limits {
613            if limits.max_memory_limit.is_none() {
614                rt.engine.clone()
615            } else {
616                //use engine_config and create separate engine per component and edit the PoolingAllocatorConfig
617                let mut component_pooling_config = rt.pooling_config.clone();
618                component_pooling_config.max_memory_size(
619                    limits
620                        .max_memory_limit
621                        .expect("max_memory_limit should be Some"),
622                );
623
624                let mut component_engine_config = rt.engine_config.clone();
625                component_engine_config.allocation_strategy(InstanceAllocationStrategy::Pooling(
626                    component_pooling_config,
627                ));
628
629                match wasmtime::Engine::new(&component_engine_config)
630                    .context("failed to construct engine")
631                {
632                    Ok(engine) => engine,
633                    Err(e) => {
634                        tracing::warn!(err = %e, "failed to construct engine with pooling allocator, falling back to dynamic allocator which may result in slower startup and execution of components.");
635                        component_engine_config
636                            .allocation_strategy(InstanceAllocationStrategy::OnDemand);
637                        wasmtime::Engine::new(&component_engine_config)
638                            .context("failed to construct engine")?
639                    }
640                }
641            }
642        } else {
643            rt.engine.clone()
644        };
645        let claims_token = claims_token(wasm)?;
646        let claims = claims_token.map(|c| c.claims);
647        let component = wasmtime::component::Component::new(&engine, wasm)
648            .context("failed to compile component")?;
649
650        let mut linker = Linker::new(&engine);
651
652        linker_fn(&mut linker)?;
653
654        let ty = component.component_type();
655        let mut guest_resources = Vec::new();
656        let mut host_resources = BTreeMap::new();
657        collect_component_resource_exports(&engine, &ty, &mut guest_resources);
658        collect_component_resource_imports(&engine, &ty, &mut host_resources);
659        let io_err_tys = host_resources
660            .range::<str, _>((
661                Bound::Included("wasi:io/error@0.2"),
662                Bound::Excluded("wasi:io/error@0.3"),
663            ))
664            .flat_map(|(_, instance)| instance.get("error"))
665            .copied()
666            .collect::<Box<[_]>>();
667        let io_pollable_tys = host_resources
668            .range::<str, _>((
669                Bound::Included("wasi:io/poll@0.2"),
670                Bound::Excluded("wasi:io/poll@0.3"),
671            ))
672            .flat_map(|(_, instance)| instance.get("pollable"))
673            .copied()
674            .collect::<Box<[_]>>();
675        let io_input_stream_tys = host_resources
676            .range::<str, _>((
677                Bound::Included("wasi:io/streams@0.2"),
678                Bound::Excluded("wasi:io/streams@0.3"),
679            ))
680            .flat_map(|(_, instance)| instance.get("input-stream"))
681            .copied()
682            .collect::<Box<[_]>>();
683        let io_output_stream_tys = host_resources
684            .range::<str, _>((
685                Bound::Included("wasi:io/streams@0.2"),
686                Bound::Excluded("wasi:io/streams@0.3"),
687            ))
688            .flat_map(|(_, instance)| instance.get("output-stream"))
689            .copied()
690            .collect::<Box<[_]>>();
691        let rpc_err_ty = host_resources
692            .get("wrpc:rpc/error@0.1.0")
693            .and_then(|instance| instance.get("error"))
694            .copied();
695        // TODO: This should include `wasi:http` resources
696        let host_resources = host_resources
697            .into_iter()
698            .map(|(name, instance)| {
699                let instance = instance
700                    .into_iter()
701                    .map(|(name, ty)| {
702                        let host_ty = match ty {
703                            ty if Some(ty) == rpc_err_ty => ResourceType::host::<rpc::Error>(),
704                            ty if io_err_tys.contains(&ty) => ResourceType::host::<
705                                wasmtime_wasi::p2::bindings::io::error::Error,
706                            >(),
707                            ty if io_input_stream_tys.contains(&ty) => ResourceType::host::<
708                                wasmtime_wasi::p2::bindings::io::streams::InputStream,
709                            >(
710                            ),
711                            ty if io_output_stream_tys.contains(&ty) => ResourceType::host::<
712                                wasmtime_wasi::p2::bindings::io::streams::OutputStream,
713                            >(
714                            ),
715                            ty if io_pollable_tys.contains(&ty) => ResourceType::host::<
716                                wasmtime_wasi::p2::bindings::io::poll::Pollable,
717                            >(),
718                            _ => ResourceType::host::<RemoteResource>(),
719                        };
720                        (name, (ty, host_ty))
721                    })
722                    .collect::<HashMap<_, _>>();
723                (name, instance)
724            })
725            .collect::<HashMap<_, _>>();
726        let host_resources = Arc::from(host_resources);
727        if !guest_resources.is_empty() {
728            warn!("exported component resources are not supported in wasmCloud runtime and will be ignored, use a provider instead to enable this functionality");
729        }
730        for (name, ty) in ty.imports(&engine) {
731            // Don't link builtin instances or feature-gated instances if the feature is disabled
732            match name.split_once('/').map(|(pkg, suffix)| {
733                suffix
734                    .split_once('@')
735                    .map_or((pkg, suffix, None), |(iface, version)| {
736                        (pkg, iface, Some(version))
737                    })
738            }) {
739                Some(
740                    ("wasi:blobstore", "blobstore" | "container" | "types", Some("0.2.0-draft"))
741                    | ("wasi:config", "runtime" | "store", Some("0.2.0-draft"))
742                    | ("wasi:keyvalue", "atomics" | "batch" | "store", Some("0.2.0-draft"))
743                    | ("wasi:logging", "logging", None | Some("0.1.0-draft"))
744                    | ("wasmcloud:bus", "lattice", Some("1.0.0" | "2.0.0"))
745                    | ("wasmcloud:messaging", "consumer" | "types", Some("0.2.0"))
746                    | ("wasmcloud:secrets", "reveal" | "store", Some("0.1.0-draft")),
747                ) => {}
748                Some((
749                    "wasi:cli",
750                    "environment" | "exit" | "stderr" | "stdin" | "stdout" | "terminal-input"
751                    | "terminal-output" | "terminal-stderr" | "terminal-stdin" | "terminal-stdout",
752                    Some(version),
753                )) if is_0_2(version, 0) => {}
754                Some((
755                    "wasi:clocks",
756                    "monotonic-clock" | "wall-clock" | "timezone",
757                    Some(version),
758                )) if is_0_2(version, 0) => {}
759                Some(("wasi:clocks", "timezone", Some(version))) if is_0_2(version, 1) => {}
760                Some(("wasi:filesystem", "preopens" | "types", Some(version)))
761                    if is_0_2(version, 0) => {}
762                Some((
763                    "wasi:http",
764                    "incoming-handler" | "outgoing-handler" | "types",
765                    Some(version),
766                )) if is_0_2(version, 0) => {}
767                Some(("wasi:io", "error" | "poll" | "streams", Some(version)))
768                    if is_0_2(version, 0) => {}
769                Some(("wasi:random", "insecure-seed" | "insecure" | "random", Some(version)))
770                    if is_0_2(version, 0) => {}
771                Some((
772                    "wasi:sockets",
773                    "instance-network" | "ip-name-lookup" | "network" | "tcp-create-socket" | "tcp"
774                    | "udp-create-socket" | "udp",
775                    Some(version),
776                )) if is_0_2(version, 0) => {}
777                _ if rt.skip_feature_gated_instance(name) => {}
778                _ => link_item(
779                    &engine,
780                    &mut linker.root(),
781                    [],
782                    Arc::clone(&host_resources),
783                    ty,
784                    "",
785                    name,
786                )
787                .context("failed to link item")?,
788            }
789        }
790        let instance_pre = linker.instantiate_pre(&component)?;
791        // use component specific memorylimit or runtime wide limit
792        let max_memory_limit = limits
793            .and_then(|l| l.max_memory_limit)
794            .unwrap_or(rt.max_linear_memory);
795        Ok(Self {
796            engine,
797            claims,
798            instance_pre,
799            host_resources,
800            max_execution_time: rt.max_execution_time,
801            experimental_features: rt.experimental_features,
802            max_memory_limit,
803        })
804    }
805
806    /// Sets maximum execution time for functionality exported by this component.
807    /// Values below 1 second will be interpreted as 1 second.
808    #[instrument(level = "trace", skip_all)]
809    pub fn set_max_execution_time(&mut self, max_execution_time: Duration) -> &mut Self {
810        self.max_execution_time = max_execution_time.max(Duration::from_secs(1));
811        self
812    }
813
814    /// Reads the WebAssembly binary asynchronously and calls [Component::new].
815    ///
816    /// # Errors
817    ///
818    /// Fails if either reading `wasm` fails or [Self::new] fails
819    #[instrument(level = "trace", skip_all)]
820    pub async fn read(rt: &Runtime, mut wasm: impl AsyncRead + Unpin) -> anyhow::Result<Self> {
821        let mut buf = Vec::new();
822        wasm.read_to_end(&mut buf)
823            .await
824            .context("failed to read Wasm")?;
825        Self::new(rt, &buf, None)
826    }
827
828    /// Reads the WebAssembly binary synchronously and calls [Component::new].
829    ///
830    /// # Errors
831    ///
832    /// Fails if either reading `wasm` fails or [Self::new] fails
833    #[instrument(level = "trace", skip_all)]
834    pub fn read_sync(rt: &Runtime, mut wasm: impl std::io::Read) -> anyhow::Result<Self> {
835        let mut buf = Vec::new();
836        wasm.read_to_end(&mut buf).context("failed to read Wasm")?;
837        Self::new(rt, &buf, None)
838    }
839
840    /// [Claims](jwt::Claims) associated with this [Component].
841    #[instrument(level = "trace")]
842    pub fn claims(&self) -> Option<&jwt::Claims<jwt::Component>> {
843        self.claims.as_ref()
844    }
845
846    /// Instantiates the component given a handler and event channel
847    pub fn instantiate<C>(
848        &self,
849        handler: H,
850        events: mpsc::Sender<WrpcServeEvent<C>>,
851    ) -> Instance<H, C> {
852        Instance {
853            engine: self.engine.clone(),
854            pre: self.instance_pre.clone(),
855            handler,
856            max_execution_time: self.max_execution_time,
857            events,
858            experimental_features: self.experimental_features,
859            max_memory_limit: self.max_memory_limit,
860        }
861    }
862
863    /// Serve all exports of this [Component] using supplied [`wrpc_transport::Serve`]
864    ///
865    /// The returned [Vec] contains an [InvocationStream] per each function exported by the component.
866    /// A [`WrpcServeEvent`] containing the incoming [`wrpc_transport::Serve::Context`] will be sent
867    /// on completion of each invocation.
868    /// The supplied [`Handler`] will be used to satisfy imports.
869    #[instrument(level = "debug", skip_all)]
870    pub async fn serve_wrpc<S>(
871        &self,
872        srv: &S,
873        handler: H,
874        events: mpsc::Sender<WrpcServeEvent<S::Context>>,
875    ) -> anyhow::Result<Vec<InvocationStream>>
876    where
877        S: wrpc_transport::Serve,
878        S::Context: Deref<Target = tracing::Span>,
879    {
880        let max_execution_time = self.max_execution_time;
881        let mut invocations = vec![];
882        let instance = self.instantiate(handler.clone(), events.clone());
883        for (name, ty) in self
884            .instance_pre
885            .component()
886            .component_type()
887            .exports(&self.engine)
888        {
889            match (name, ty) {
890                (_, types::ComponentItem::ComponentInstance(..))
891                    if name.starts_with("wasi:http/incoming-handler@0.2") =>
892                {
893                    let instance = instance.clone();
894                    let [(_, _, handle)] = wrpc_interface_http::bindings::exports::wrpc::http::incoming_handler::serve_interface(
895                        srv,
896                        wrpc_interface_http::ServeWasmtime(instance),
897                    )
898                    .await
899                    .context("failed to serve `wrpc:http/incoming-handler`")?;
900                    invocations.push(handle);
901                }
902                (
903                    "wasmcloud:messaging/handler@0.2.0"
904                    | "wasmcloud:messaging/incoming-handler@0.3.0",
905                    types::ComponentItem::ComponentInstance(..),
906                ) => {
907                    let instance = instance.clone();
908                    let [(_, _, handle_message)] =
909                        wrpc::exports::wasmcloud::messaging0_2_0::handler::serve_interface(
910                            srv, instance,
911                        )
912                        .await
913                        .context("failed to serve `wasmcloud:messaging/handler`")?;
914                    invocations.push(handle_message);
915                }
916                (
917                    "wasi:keyvalue/watcher@0.2.0-draft",
918                    types::ComponentItem::ComponentInstance(..),
919                ) => {
920                    let instance = instance.clone();
921                    let [(_, _, on_set), (_, _, on_delete)] =
922                        wrpc::exports::wrpc::keyvalue::watcher::serve_interface(srv, instance)
923                            .await
924                            .context("failed to serve `wrpc:keyvalue/watcher`")?;
925                    invocations.push(on_set);
926                    invocations.push(on_delete);
927                }
928                (name, types::ComponentItem::ComponentFunc(ty)) => {
929                    let engine = self.engine.clone();
930                    let handler = handler.clone();
931                    let pre = self.instance_pre.clone();
932                    debug!(?name, "serving root function");
933                    let func = srv
934                        .serve_function(
935                            move || {
936                                let span = info_span!("call_instance_function");
937                                let mut store =
938                                    new_store(&engine, handler.clone(), max_execution_time);
939                                store.data_mut().parent_context = Some(span.context());
940                                store
941                            },
942                            pre,
943                            Arc::clone(&self.host_resources),
944                            ty,
945                            "",
946                            name,
947                        )
948                        .await
949                        .context("failed to serve root function")?;
950                    let events = events.clone();
951                    invocations.push(Box::pin(func.map_ok(move |(cx, res)| {
952                        let events = events.clone();
953                        let span = cx.deref().clone();
954                        Box::pin(
955                            async move {
956                                let res =
957                                    res.instrument(info_span!("handle_instance_function")).await;
958                                let success = res.is_ok();
959                                if let Err(err) =
960                                    events.try_send(WrpcServeEvent::DynamicExportReturned {
961                                        context: cx,
962                                        success,
963                                    })
964                                {
965                                    warn!(
966                                        ?err,
967                                        success, "failed to send dynamic root export return event"
968                                    );
969                                }
970                                res
971                            }
972                            .instrument(span),
973                        )
974                            as Pin<Box<dyn Future<Output = _> + Send + 'static>>
975                    })));
976                }
977                (_, types::ComponentItem::CoreFunc(_)) => {
978                    warn!(name, "serving root core function exports not supported yet");
979                }
980                (_, types::ComponentItem::Module(_)) => {
981                    warn!(name, "serving root module exports not supported yet");
982                }
983                (_, types::ComponentItem::Component(_)) => {
984                    warn!(name, "serving root component exports not supported yet");
985                }
986                (instance_name, types::ComponentItem::ComponentInstance(ty)) => {
987                    for (name, ty) in ty.exports(&self.engine) {
988                        match ty {
989                            types::ComponentItem::ComponentFunc(ty) => {
990                                let engine = self.engine.clone();
991                                let handler = handler.clone();
992                                let pre = self.instance_pre.clone();
993                                debug!(?instance_name, ?name, "serving instance function");
994                                let func = srv
995                                    .serve_function(
996                                        move || {
997                                            let span = info_span!("call_instance_function");
998                                            let mut store = new_store(
999                                                &engine,
1000                                                handler.clone(),
1001                                                max_execution_time,
1002                                            );
1003                                            store.data_mut().parent_context = Some(span.context());
1004                                            store
1005                                        },
1006                                        pre,
1007                                        Arc::clone(&self.host_resources),
1008                                        ty,
1009                                        instance_name,
1010                                        name,
1011                                    )
1012                                    .await
1013                                    .context("failed to serve instance function")?;
1014                                let events = events.clone();
1015                                invocations.push(Box::pin(func.map_ok(move |(cx, res)| {
1016                                    let events = events.clone();
1017                                    let span = cx.deref().clone();
1018                                    Box::pin(
1019                                        async move {
1020                                            let res = res.await;
1021                                            let success = res.is_ok();
1022                                            if let Err(err) = events.try_send(
1023                                                WrpcServeEvent::DynamicExportReturned {
1024                                                    context: cx,
1025                                                    success,
1026                                                },
1027                                            ) {
1028                                                warn!(
1029                                                    ?err,
1030                                                    success,
1031                                                    "failed to send dynamic instance export return event"
1032                                                );
1033                                            }
1034                                            res
1035                                        }
1036                                        .instrument(span),
1037                                    )
1038                                        as Pin<Box<dyn Future<Output = _> + Send + 'static>>
1039                                })));
1040                            }
1041                            types::ComponentItem::CoreFunc(_) => {
1042                                warn!(
1043                                    instance_name,
1044                                    name,
1045                                    "serving instance core function exports not supported yet"
1046                                );
1047                            }
1048                            types::ComponentItem::Module(_) => {
1049                                warn!(
1050                                    instance_name,
1051                                    name, "serving instance module exports not supported yet"
1052                                );
1053                            }
1054                            types::ComponentItem::Component(_) => {
1055                                warn!(
1056                                    instance_name,
1057                                    name, "serving instance component exports not supported yet"
1058                                );
1059                            }
1060                            types::ComponentItem::ComponentInstance(_) => {
1061                                warn!(
1062                                    instance_name,
1063                                    name, "serving nested instance exports not supported yet"
1064                                );
1065                            }
1066                            types::ComponentItem::Type(_) | types::ComponentItem::Resource(_) => {}
1067                        }
1068                    }
1069                }
1070                (_, types::ComponentItem::Type(_) | types::ComponentItem::Resource(_)) => {}
1071            }
1072        }
1073        Ok(invocations)
1074    }
1075}
1076
1077impl<H> From<Component<H>> for Option<jwt::Claims<jwt::Component>>
1078where
1079    H: Handler,
1080{
1081    fn from(Component { claims, .. }: Component<H>) -> Self {
1082        claims
1083    }
1084}
1085
1086/// Instantiated component
1087pub struct Instance<H, C>
1088where
1089    H: Handler,
1090{
1091    engine: wasmtime::Engine,
1092    pre: wasmtime::component::InstancePre<Ctx<H>>,
1093    handler: H,
1094    max_execution_time: Duration,
1095    events: mpsc::Sender<WrpcServeEvent<C>>,
1096    experimental_features: Features,
1097    max_memory_limit: usize,
1098}
1099
1100impl<H, C> Clone for Instance<H, C>
1101where
1102    H: Handler,
1103{
1104    fn clone(&self) -> Self {
1105        Self {
1106            engine: self.engine.clone(),
1107            pre: self.pre.clone(),
1108            handler: self.handler.clone(),
1109            max_execution_time: self.max_execution_time,
1110            events: self.events.clone(),
1111            experimental_features: self.experimental_features,
1112            max_memory_limit: self.max_memory_limit,
1113        }
1114    }
1115}
1116
1117type TableResult<T> = Result<T, ResourceTableError>;
1118
1119/// The minimal implementation of a context that is required to run a component instance.
1120pub trait BaseCtx: Debug + 'static {
1121    /// The timeout to use for the duration of the component's invocation
1122    fn timeout(&self) -> Option<Duration> {
1123        None
1124    }
1125
1126    /// The parent context to use for the component's invocation
1127    fn parent_context(&self) -> Option<&opentelemetry::Context> {
1128        None
1129    }
1130}
1131
1132/// Wasmtime Context for a component instance, with access to
1133/// WASI context, HTTP context, and WRPC Invocation context.
1134/// This is a low-level API and has to be paired with `Component::new_with_linker`.
1135pub struct Ctx<H>
1136where
1137    H: MinimalHandler,
1138{
1139    handler: H,
1140    wasi: WasiCtx,
1141    http: WasiHttpCtx,
1142    wrpc: WrpcCtx<H>,
1143    table: ResourceTable,
1144    parent_context: Option<opentelemetry::Context>,
1145}
1146
1147struct WrpcCtx<H>
1148where
1149    H: MinimalHandler,
1150{
1151    handler: H,
1152    shared_resources: SharedResourceTable,
1153    timeout: Duration,
1154}
1155
1156impl<H: MinimalHandler> WasiView for Ctx<H> {
1157    fn ctx(&mut self) -> WasiCtxView<'_> {
1158        WasiCtxView {
1159            ctx: &mut self.wasi,
1160            table: &mut self.table,
1161        }
1162    }
1163}
1164
1165impl<H: Handler> WrpcView for Ctx<H> {
1166    type Invoke = H;
1167
1168    fn wrpc(&mut self) -> WrpcCtxView<'_, Self::Invoke> {
1169        WrpcCtxView {
1170            ctx: &mut self.wrpc,
1171            table: &mut self.table,
1172        }
1173    }
1174}
1175
1176impl<H: Handler> wrpc_runtime_wasmtime::WrpcCtx<H> for WrpcCtx<H> {
1177    fn context(&self) -> H::Context {
1178        None
1179    }
1180
1181    fn client(&self) -> &H {
1182        &self.handler
1183    }
1184
1185    fn shared_resources(&mut self) -> &mut SharedResourceTable {
1186        &mut self.shared_resources
1187    }
1188
1189    fn timeout(&self) -> Option<Duration> {
1190        Some(self.timeout)
1191    }
1192}
1193
1194impl<H: MinimalHandler> Debug for Ctx<H> {
1195    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1196        f.debug_struct("Ctx").field("runtime", &"wasmtime").finish()
1197    }
1198}
1199
1200impl<H: MinimalHandler> Ctx<H> {
1201    fn attach_parent_context(&self) {
1202        if let Some(context) = self.parent_context.as_ref() {
1203            Span::current().set_parent(context.clone());
1204        }
1205    }
1206}