1use core::fmt::{self, Debug};
2use core::future::Future;
3use core::ops::{Bound, Deref};
4use core::pin::Pin;
5use core::time::Duration;
6
7use anyhow::Result;
8use anyhow::{ensure, Context as _};
9use futures::{Stream, TryStreamExt as _};
10use serde::{Deserialize, Serialize};
11use std::collections::{BTreeMap, HashMap};
12use std::sync::Arc;
13use tokio::io::{AsyncRead, AsyncReadExt as _};
14use tokio::sync::mpsc;
15use tracing::{debug, info_span, instrument, warn, Instrument as _, Span};
16use tracing_opentelemetry::OpenTelemetrySpanExt;
17use wascap::jwt;
18use wascap::wasm::extract_claims;
19use wasi_preview1_component_adapter_provider::{
20 WASI_SNAPSHOT_PREVIEW1_ADAPTER_NAME, WASI_SNAPSHOT_PREVIEW1_REACTOR_ADAPTER,
21};
22use wasmtime::component::{
23 types, HasSelf, Linker, ResourceTable, ResourceTableError, ResourceType,
24};
25use wasmtime::InstanceAllocationStrategy;
26use wasmtime_wasi::{WasiCtx, WasiCtxBuilder, WasiCtxView, WasiView};
27use wasmtime_wasi_http::WasiHttpCtx;
28use wrpc_runtime_wasmtime::{
29 collect_component_resource_exports, collect_component_resource_imports, link_item, rpc,
30 RemoteResource, ServeExt as _, SharedResourceTable, WrpcCtxView, WrpcView,
31};
32
33use crate::capability::{self, wrpc};
34use crate::experimental::Features;
35use crate::Runtime;
36
37pub use bus::{Bus, Error};
38pub use bus1_0_0::Bus as Bus1_0_0;
39pub use config::Config;
40pub use identity::Identity;
41pub use logging::Logging;
42pub use messaging::v0_2::Messaging as Messaging0_2;
43pub use messaging::v0_3::{
44 Client as MessagingClient0_3, GuestMessage as MessagingGuestMessage0_3,
45 HostMessage as MessagingHostMessage0_3, Messaging as Messaging0_3,
46};
47pub use secrets::Secrets;
48
49pub(crate) mod blobstore;
50mod bus;
51mod bus1_0_0;
52mod config;
53mod http;
54mod identity;
55mod keyvalue;
56mod logging;
57pub(crate) mod messaging;
58mod secrets;
59
60#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
64pub enum ReplacedInstanceTarget {
65 BlobstoreBlobstore,
67 BlobstoreContainer,
69 KeyvalueAtomics,
71 KeyvalueStore,
73 KeyvalueBatch,
75 KeyvalueWatch,
77 HttpIncomingHandler,
79 HttpOutgoingHandler,
81}
82
83fn is_0_2(version: &str, min_patch: u64) -> bool {
84 if let Ok(semver::Version {
85 major,
86 minor,
87 patch,
88 pre,
89 build,
90 }) = version.parse()
91 {
92 major == 0 && minor == 2 && patch >= min_patch && pre.is_empty() && build.is_empty()
93 } else {
94 false
95 }
96}
97
98pub enum InvocationErrorKind {
100 NotFound,
103
104 Trap,
106}
107
108pub trait InvocationErrorIntrospect {
110 fn invocation_error_kind(&self, err: &anyhow::Error) -> InvocationErrorKind;
112}
113
114pub trait MinimalHandler: Send + Sync + Clone + 'static {}
118impl<T: Send + Sync + Clone + 'static> MinimalHandler for T {}
119
120pub trait Handler:
122 wrpc_transport::Invoke<Context = Option<ReplacedInstanceTarget>>
123 + Bus
124 + Config
125 + Logging
126 + Secrets
127 + Messaging0_2
128 + Messaging0_3
129 + Identity
130 + InvocationErrorIntrospect
131 + Send
132 + Sync
133 + Clone
134 + 'static
135{
136}
137
138impl<
139 T: wrpc_transport::Invoke<Context = Option<ReplacedInstanceTarget>>
140 + Bus
141 + Config
142 + Logging
143 + Secrets
144 + Messaging0_2
145 + Messaging0_3
146 + Identity
147 + InvocationErrorIntrospect
148 + Send
149 + Sync
150 + Clone
151 + 'static,
152 > Handler for T
153{
154}
155
156#[derive(Clone, Debug, Default)]
158pub struct ComponentConfig {
159 pub require_signature: bool,
161}
162
163#[derive(Serialize, Deserialize, Debug, PartialEq, Clone, Copy)]
165pub struct Limits {
166 pub max_memory_limit: Option<usize>,
168 pub max_execution_time: Option<u64>,
170}
171impl Limits {
172 #[must_use]
180 pub fn to_string_map(&self) -> HashMap<String, String> {
181 let mut map = HashMap::new();
182
183 if let Some(memory_limit) = self.max_memory_limit {
184 map.insert("max_memory_limit".to_string(), memory_limit.to_string());
185 }
186
187 if let Some(execution_time) = self.max_execution_time {
188 map.insert("max_execution_time".to_string(), execution_time.to_string());
189 }
190
191 map
192 }
193}
194
195#[must_use]
206pub fn from_string_map<S: std::hash::BuildHasher>(
207 map: Option<&HashMap<String, String, S>>,
208) -> Option<Limits> {
209 map.map(|map| Limits {
210 max_memory_limit: map.get("max_memory_limit").and_then(|s| s.parse().ok()),
211
212 max_execution_time: map.get("max_execution_time").and_then(|s| s.parse().ok()),
213 })
214}
215pub fn claims_token(wasm: impl AsRef<[u8]>) -> anyhow::Result<Option<jwt::Token<jwt::Component>>> {
228 let Some(claims) = extract_claims(wasm).context("failed to extract module claims")? else {
229 return Ok(None);
230 };
231 let v = jwt::validate_token::<jwt::Component>(&claims.jwt)
232 .context("failed to validate module token")?;
233 ensure!(!v.expired, "token expired at `{}`", v.expires_human);
234 ensure!(
235 !v.cannot_use_yet,
236 "token cannot be used before `{}`",
237 v.not_before_human
238 );
239 ensure!(v.signature_valid, "signature is not valid");
240 Ok(Some(claims))
241}
242
243#[derive(Clone)]
245pub struct Component<H>
246where
247 H: MinimalHandler,
248{
249 engine: wasmtime::Engine,
250 claims: Option<jwt::Claims<jwt::Component>>,
251 instance_pre: wasmtime::component::InstancePre<Ctx<H>>,
252 host_resources: Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>,
253 max_execution_time: Duration,
254 experimental_features: Features,
255 max_memory_limit: usize,
256}
257
258#[derive(Clone)]
262pub struct CustomCtxComponent<C>
263where
264 C: BaseCtx,
265{
266 engine: wasmtime::Engine,
267 instance_pre: wasmtime::component::InstancePre<C>,
268 #[allow(unused)]
269 host_resources: Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>,
270 max_execution_time: Duration,
271}
272
273impl<C> CustomCtxComponent<C>
274where
275 C: BaseCtx,
276{
277 #[instrument(level = "trace", skip_all)]
282 pub fn new_with_linker_minimal(
283 rt: &Runtime,
284 wasm: &[u8],
285 linker_fn: impl FnOnce(&mut Linker<C>, &wasmtime::component::Component) -> anyhow::Result<()>,
286 ) -> anyhow::Result<Self> {
287 if wasmparser::Parser::is_core_wasm(wasm) {
288 anyhow::bail!("core modules are not supported in the minimal linker");
289 }
290 let engine = rt.engine.clone();
291 let component = wasmtime::component::Component::new(&engine, wasm)
292 .context("failed to compile component")?;
293
294 let mut linker = Linker::new(&engine);
295
296 let ty = component.component_type();
297 let mut guest_resources = Vec::new();
298 let mut host_resources = BTreeMap::new();
299 collect_component_resource_exports(&engine, &ty, &mut guest_resources);
300 collect_component_resource_imports(&engine, &ty, &mut host_resources);
301
302 let host_resources = host_resources
303 .into_iter()
304 .map(|(name, instance)| {
305 let instance = instance
306 .into_iter()
307 .map(|(name, ty)| (name, (ty, ResourceType::host::<RemoteResource>())))
308 .collect::<HashMap<_, _>>();
309 (name, instance)
310 })
311 .collect::<HashMap<_, _>>();
312 let host_resources = Arc::from(host_resources);
313
314 if !guest_resources.is_empty() {
315 debug!("guest resources present, must be fulfilled by the linker");
316 }
317
318 linker_fn(&mut linker, &component)?;
319
320 let instance_pre = linker
321 .instantiate_pre(&component)
322 .context("failed to pre-instantiate component")?;
323
324 Ok(CustomCtxComponent {
325 engine,
326 instance_pre,
327 host_resources,
328 max_execution_time: rt.max_execution_time,
329 })
330 }
331
332 pub fn new_store(&self, ctx: C) -> wasmtime::Store<C> {
334 let mut store = wasmtime::Store::new(&self.engine, ctx);
335 store.set_epoch_deadline(self.max_execution_time.as_secs());
336 store
337 }
338
339 #[must_use]
341 pub fn instance_pre(&self) -> &wasmtime::component::InstancePre<C> {
342 &self.instance_pre
343 }
344}
345
346impl<H> Debug for Component<H>
347where
348 H: MinimalHandler,
349{
350 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
351 f.debug_struct("Component")
352 .field("claims", &self.claims)
353 .field("runtime", &"wasmtime")
354 .field("max_execution_time", &self.max_execution_time)
355 .finish_non_exhaustive()
356 }
357}
358
359fn new_store<H: Handler>(
360 engine: &wasmtime::Engine,
361 handler: H,
362 max_execution_time: Duration,
363) -> wasmtime::Store<Ctx<H>> {
364 let table = ResourceTable::new();
365 let wasi = WasiCtxBuilder::new()
366 .args(&["main.wasm"]) .inherit_stderr()
368 .build();
369
370 let mut store = wasmtime::Store::new(
371 engine,
372 Ctx {
373 handler: handler.clone(),
374 wasi,
375 http: WasiHttpCtx::new(),
376 table,
377 wrpc: WrpcCtx {
378 handler,
379 shared_resources: SharedResourceTable::default(),
380 timeout: max_execution_time,
381 },
382 parent_context: None,
383 },
384 );
385 store.set_epoch_deadline(max_execution_time.as_secs());
386 store
387}
388
389#[derive(Clone, Debug)]
391pub enum WrpcServeEvent<C> {
392 HttpIncomingHandlerHandleReturned {
394 context: C,
396 success: bool,
398 },
399 MessagingHandlerHandleMessageReturned {
401 context: C,
403 success: bool,
405 },
406 DynamicExportReturned {
408 context: C,
410 success: bool,
412 },
413}
414
415pub type InvocationStream = Pin<
418 Box<
419 dyn Stream<
420 Item = anyhow::Result<
421 Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + 'static>>,
422 >,
423 > + Send
424 + 'static,
425 >,
426>;
427
428impl<H> Component<H>
429where
430 H: MinimalHandler,
431{
432 #[instrument(level = "trace", skip_all)]
439 pub fn new_with_linker_minimal(
440 rt: &Runtime,
441 wasm: &[u8],
442 linker_fn: impl FnOnce(&mut Linker<Ctx<H>>) -> anyhow::Result<()>,
443 ) -> anyhow::Result<Self> {
444 if wasmparser::Parser::is_core_wasm(wasm) {
445 anyhow::bail!("core modules are not supported in the minimal linker");
446 }
447 let engine = rt.engine.clone();
448 let claims = None;
449 let component = wasmtime::component::Component::new(&engine, wasm)
450 .context("failed to compile component")?;
451
452 let mut linker = Linker::new(&engine);
453 linker_fn(&mut linker)?;
454
455 let ty = component.component_type();
456 let mut guest_resources = Vec::new();
457 let mut host_resources = BTreeMap::new();
458 collect_component_resource_exports(&engine, &ty, &mut guest_resources);
459 collect_component_resource_imports(&engine, &ty, &mut host_resources);
460
461 let host_resources = host_resources
462 .into_iter()
463 .map(|(name, instance)| {
464 let instance = instance
465 .into_iter()
466 .map(|(name, ty)| (name, (ty, ResourceType::host::<RemoteResource>())))
467 .collect::<HashMap<_, _>>();
468 (name, instance)
469 })
470 .collect::<HashMap<_, _>>();
471 let host_resources = Arc::from(host_resources);
472
473 if !guest_resources.is_empty() {
474 debug!("guest resources present, must be fulfilled by the linker");
475 }
476
477 let instance_pre = linker
478 .instantiate_pre(&component)
479 .context("failed to pre-instantiate component")?;
480
481 Ok(Component {
482 engine,
483 claims,
484 instance_pre,
485 host_resources,
486 max_execution_time: rt.max_execution_time,
487 experimental_features: rt.experimental_features,
488 max_memory_limit: rt.max_linear_memory,
489 })
490 }
491}
492
493impl<H> Component<H>
494where
495 H: Handler,
496{
497 #[instrument(level = "trace", skip_all)]
501 pub fn new(rt: &Runtime, wasm: &[u8], limits: Option<Limits>) -> anyhow::Result<Self> {
502 Self::new_with_linker(rt, wasm, limits, |linker| {
503 wasmtime_wasi::p2::add_to_linker_async(linker)
504 .context("failed to link core WASI interfaces")?;
505 wasmtime_wasi_http::add_only_http_to_linker_async(linker)
506 .context("failed to link `wasi:http`")?;
507
508 capability::blobstore::blobstore::add_to_linker::<_, HasSelf<Ctx<H>>>(linker, |ctx| {
509 ctx
510 })
511 .context("failed to link `wasi:blobstore/blobstore`")?;
512 capability::blobstore::container::add_to_linker::<_, HasSelf<Ctx<H>>>(linker, |ctx| {
513 ctx
514 })
515 .context("failed to link `wasi:blobstore/container`")?;
516 capability::blobstore::types::add_to_linker::<_, HasSelf<Ctx<H>>>(linker, |ctx| ctx)
517 .context("failed to link `wasi:blobstore/types`")?;
518 capability::config::runtime::add_to_linker::<_, HasSelf<Ctx<H>>>(linker, |ctx| ctx)
519 .context("failed to link `wasi:config/runtime`")?;
520 capability::config::store::add_to_linker::<_, HasSelf<Ctx<H>>>(linker, |ctx| ctx)
521 .context("failed to link `wasi:config/store`")?;
522 capability::keyvalue::atomics::add_to_linker::<_, HasSelf<Ctx<H>>>(linker, |ctx| ctx)
523 .context("failed to link `wasi:keyvalue/atomics`")?;
524 capability::keyvalue::store::add_to_linker::<_, HasSelf<Ctx<H>>>(linker, |ctx| ctx)
525 .context("failed to link `wasi:keyvalue/store`")?;
526 capability::keyvalue::batch::add_to_linker::<_, HasSelf<Ctx<H>>>(linker, |ctx| ctx)
527 .context("failed to link `wasi:keyvalue/batch`")?;
528 capability::logging::logging::add_to_linker::<_, HasSelf<Ctx<H>>>(linker, |ctx| ctx)
529 .context("failed to link `wasi:logging/logging`")?;
530 capability::unversioned_logging::logging::add_to_linker::<_, HasSelf<Ctx<H>>>(
531 linker,
532 |ctx| ctx,
533 )
534 .context("failed to link unversioned `wasi:logging/logging`")?;
535
536 capability::bus1_0_0::lattice::add_to_linker::<_, HasSelf<Ctx<H>>>(linker, |ctx| ctx)
537 .context("failed to link `wasmcloud:bus/lattice@1.0.0`")?;
538 capability::bus2_0_1::lattice::add_to_linker::<_, HasSelf<Ctx<H>>>(linker, |ctx| ctx)
539 .context("failed to link `wasmcloud:bus/lattice@2.0.1`")?;
540 capability::bus2_0_1::error::add_to_linker::<_, HasSelf<Ctx<H>>>(linker, |ctx| ctx)
541 .context("failed to link `wasmcloud:bus/error@2.0.1`")?;
542 capability::messaging0_2_0::types::add_to_linker::<_, HasSelf<Ctx<H>>>(linker, |ctx| {
543 ctx
544 })
545 .context("failed to link `wasmcloud:messaging/types@0.2.0`")?;
546 capability::messaging0_2_0::consumer::add_to_linker::<_, HasSelf<Ctx<H>>>(
547 linker,
548 |ctx| ctx,
549 )
550 .context("failed to link `wasmcloud:messaging/consumer@0.2.0`")?;
551 capability::secrets::reveal::add_to_linker::<_, HasSelf<Ctx<H>>>(linker, |ctx| ctx)
552 .context("failed to link `wasmcloud:secrets/reveal`")?;
553 capability::secrets::store::add_to_linker::<_, HasSelf<Ctx<H>>>(linker, |ctx| ctx)
554 .context("failed to link `wasmcloud:secrets/store`")?;
555 if rt.experimental_features.wasmcloud_messaging_v3 {
557 capability::messaging0_3_0::types::add_to_linker::<_, HasSelf<Ctx<H>>>(
558 linker,
559 |ctx| ctx,
560 )
561 .context("failed to link `wasmcloud:messaging/types@0.3.0`")?;
562 capability::messaging0_3_0::producer::add_to_linker::<_, HasSelf<Ctx<H>>>(
563 linker,
564 |ctx| ctx,
565 )
566 .context("failed to link `wasmcloud:messaging/producer@0.3.0`")?;
567 capability::messaging0_3_0::request_reply::add_to_linker::<_, HasSelf<Ctx<H>>>(
568 linker,
569 |ctx| ctx,
570 )
571 .context("failed to link `wasmcloud:messaging/request-reply@0.3.0`")?;
572 }
573 if rt.experimental_features.workload_identity_interface {
575 capability::identity::store::add_to_linker::<_, HasSelf<Ctx<H>>>(linker, |ctx| ctx)
576 .context("failed to link `wasmcloud:identity/store`")?;
577 }
578
579 if rt.experimental_features.rpc_interface {
581 rpc::add_to_linker(linker).context("failed to link `wrpc:rpc`")?;
582 }
583
584 Ok(())
585 })
586 }
587
588 #[instrument(level = "trace", skip_all)]
593 pub fn new_with_linker(
594 rt: &Runtime,
595 wasm: &[u8],
596 limits: Option<Limits>,
597 linker_fn: impl FnOnce(&mut Linker<Ctx<H>>) -> anyhow::Result<()>,
598 ) -> anyhow::Result<Self> {
599 if wasmparser::Parser::is_core_wasm(wasm) {
600 let wasm = wit_component::ComponentEncoder::default()
601 .module(wasm)
602 .context("failed to set core component module")?
603 .adapter(
604 WASI_SNAPSHOT_PREVIEW1_ADAPTER_NAME,
605 WASI_SNAPSHOT_PREVIEW1_REACTOR_ADAPTER,
606 )
607 .context("failed to add WASI preview1 adapter")?
608 .encode()
609 .context("failed to encode a component from module")?;
610 return Self::new(rt, &wasm, limits);
611 }
612 let engine: wasmtime::Engine = if let Some(limits) = limits {
613 if limits.max_memory_limit.is_none() {
614 rt.engine.clone()
615 } else {
616 let mut component_pooling_config = rt.pooling_config.clone();
618 component_pooling_config.max_memory_size(
619 limits
620 .max_memory_limit
621 .expect("max_memory_limit should be Some"),
622 );
623
624 let mut component_engine_config = rt.engine_config.clone();
625 component_engine_config.allocation_strategy(InstanceAllocationStrategy::Pooling(
626 component_pooling_config,
627 ));
628
629 match wasmtime::Engine::new(&component_engine_config)
630 .context("failed to construct engine")
631 {
632 Ok(engine) => engine,
633 Err(e) => {
634 tracing::warn!(err = %e, "failed to construct engine with pooling allocator, falling back to dynamic allocator which may result in slower startup and execution of components.");
635 component_engine_config
636 .allocation_strategy(InstanceAllocationStrategy::OnDemand);
637 wasmtime::Engine::new(&component_engine_config)
638 .context("failed to construct engine")?
639 }
640 }
641 }
642 } else {
643 rt.engine.clone()
644 };
645 let claims_token = claims_token(wasm)?;
646 let claims = claims_token.map(|c| c.claims);
647 let component = wasmtime::component::Component::new(&engine, wasm)
648 .context("failed to compile component")?;
649
650 let mut linker = Linker::new(&engine);
651
652 linker_fn(&mut linker)?;
653
654 let ty = component.component_type();
655 let mut guest_resources = Vec::new();
656 let mut host_resources = BTreeMap::new();
657 collect_component_resource_exports(&engine, &ty, &mut guest_resources);
658 collect_component_resource_imports(&engine, &ty, &mut host_resources);
659 let io_err_tys = host_resources
660 .range::<str, _>((
661 Bound::Included("wasi:io/error@0.2"),
662 Bound::Excluded("wasi:io/error@0.3"),
663 ))
664 .flat_map(|(_, instance)| instance.get("error"))
665 .copied()
666 .collect::<Box<[_]>>();
667 let io_pollable_tys = host_resources
668 .range::<str, _>((
669 Bound::Included("wasi:io/poll@0.2"),
670 Bound::Excluded("wasi:io/poll@0.3"),
671 ))
672 .flat_map(|(_, instance)| instance.get("pollable"))
673 .copied()
674 .collect::<Box<[_]>>();
675 let io_input_stream_tys = host_resources
676 .range::<str, _>((
677 Bound::Included("wasi:io/streams@0.2"),
678 Bound::Excluded("wasi:io/streams@0.3"),
679 ))
680 .flat_map(|(_, instance)| instance.get("input-stream"))
681 .copied()
682 .collect::<Box<[_]>>();
683 let io_output_stream_tys = host_resources
684 .range::<str, _>((
685 Bound::Included("wasi:io/streams@0.2"),
686 Bound::Excluded("wasi:io/streams@0.3"),
687 ))
688 .flat_map(|(_, instance)| instance.get("output-stream"))
689 .copied()
690 .collect::<Box<[_]>>();
691 let rpc_err_ty = host_resources
692 .get("wrpc:rpc/error@0.1.0")
693 .and_then(|instance| instance.get("error"))
694 .copied();
695 let host_resources = host_resources
697 .into_iter()
698 .map(|(name, instance)| {
699 let instance = instance
700 .into_iter()
701 .map(|(name, ty)| {
702 let host_ty = match ty {
703 ty if Some(ty) == rpc_err_ty => ResourceType::host::<rpc::Error>(),
704 ty if io_err_tys.contains(&ty) => ResourceType::host::<
705 wasmtime_wasi::p2::bindings::io::error::Error,
706 >(),
707 ty if io_input_stream_tys.contains(&ty) => ResourceType::host::<
708 wasmtime_wasi::p2::bindings::io::streams::InputStream,
709 >(
710 ),
711 ty if io_output_stream_tys.contains(&ty) => ResourceType::host::<
712 wasmtime_wasi::p2::bindings::io::streams::OutputStream,
713 >(
714 ),
715 ty if io_pollable_tys.contains(&ty) => ResourceType::host::<
716 wasmtime_wasi::p2::bindings::io::poll::Pollable,
717 >(),
718 _ => ResourceType::host::<RemoteResource>(),
719 };
720 (name, (ty, host_ty))
721 })
722 .collect::<HashMap<_, _>>();
723 (name, instance)
724 })
725 .collect::<HashMap<_, _>>();
726 let host_resources = Arc::from(host_resources);
727 if !guest_resources.is_empty() {
728 warn!("exported component resources are not supported in wasmCloud runtime and will be ignored, use a provider instead to enable this functionality");
729 }
730 for (name, ty) in ty.imports(&engine) {
731 match name.split_once('/').map(|(pkg, suffix)| {
733 suffix
734 .split_once('@')
735 .map_or((pkg, suffix, None), |(iface, version)| {
736 (pkg, iface, Some(version))
737 })
738 }) {
739 Some(
740 ("wasi:blobstore", "blobstore" | "container" | "types", Some("0.2.0-draft"))
741 | ("wasi:config", "runtime" | "store", Some("0.2.0-draft"))
742 | ("wasi:keyvalue", "atomics" | "batch" | "store", Some("0.2.0-draft"))
743 | ("wasi:logging", "logging", None | Some("0.1.0-draft"))
744 | ("wasmcloud:bus", "lattice", Some("1.0.0" | "2.0.0"))
745 | ("wasmcloud:messaging", "consumer" | "types", Some("0.2.0"))
746 | ("wasmcloud:secrets", "reveal" | "store", Some("0.1.0-draft")),
747 ) => {}
748 Some((
749 "wasi:cli",
750 "environment" | "exit" | "stderr" | "stdin" | "stdout" | "terminal-input"
751 | "terminal-output" | "terminal-stderr" | "terminal-stdin" | "terminal-stdout",
752 Some(version),
753 )) if is_0_2(version, 0) => {}
754 Some((
755 "wasi:clocks",
756 "monotonic-clock" | "wall-clock" | "timezone",
757 Some(version),
758 )) if is_0_2(version, 0) => {}
759 Some(("wasi:clocks", "timezone", Some(version))) if is_0_2(version, 1) => {}
760 Some(("wasi:filesystem", "preopens" | "types", Some(version)))
761 if is_0_2(version, 0) => {}
762 Some((
763 "wasi:http",
764 "incoming-handler" | "outgoing-handler" | "types",
765 Some(version),
766 )) if is_0_2(version, 0) => {}
767 Some(("wasi:io", "error" | "poll" | "streams", Some(version)))
768 if is_0_2(version, 0) => {}
769 Some(("wasi:random", "insecure-seed" | "insecure" | "random", Some(version)))
770 if is_0_2(version, 0) => {}
771 Some((
772 "wasi:sockets",
773 "instance-network" | "ip-name-lookup" | "network" | "tcp-create-socket" | "tcp"
774 | "udp-create-socket" | "udp",
775 Some(version),
776 )) if is_0_2(version, 0) => {}
777 _ if rt.skip_feature_gated_instance(name) => {}
778 _ => link_item(
779 &engine,
780 &mut linker.root(),
781 [],
782 Arc::clone(&host_resources),
783 ty,
784 "",
785 name,
786 )
787 .context("failed to link item")?,
788 }
789 }
790 let instance_pre = linker.instantiate_pre(&component)?;
791 let max_memory_limit = limits
793 .and_then(|l| l.max_memory_limit)
794 .unwrap_or(rt.max_linear_memory);
795 Ok(Self {
796 engine,
797 claims,
798 instance_pre,
799 host_resources,
800 max_execution_time: rt.max_execution_time,
801 experimental_features: rt.experimental_features,
802 max_memory_limit,
803 })
804 }
805
806 #[instrument(level = "trace", skip_all)]
809 pub fn set_max_execution_time(&mut self, max_execution_time: Duration) -> &mut Self {
810 self.max_execution_time = max_execution_time.max(Duration::from_secs(1));
811 self
812 }
813
814 #[instrument(level = "trace", skip_all)]
820 pub async fn read(rt: &Runtime, mut wasm: impl AsyncRead + Unpin) -> anyhow::Result<Self> {
821 let mut buf = Vec::new();
822 wasm.read_to_end(&mut buf)
823 .await
824 .context("failed to read Wasm")?;
825 Self::new(rt, &buf, None)
826 }
827
828 #[instrument(level = "trace", skip_all)]
834 pub fn read_sync(rt: &Runtime, mut wasm: impl std::io::Read) -> anyhow::Result<Self> {
835 let mut buf = Vec::new();
836 wasm.read_to_end(&mut buf).context("failed to read Wasm")?;
837 Self::new(rt, &buf, None)
838 }
839
840 #[instrument(level = "trace")]
842 pub fn claims(&self) -> Option<&jwt::Claims<jwt::Component>> {
843 self.claims.as_ref()
844 }
845
846 pub fn instantiate<C>(
848 &self,
849 handler: H,
850 events: mpsc::Sender<WrpcServeEvent<C>>,
851 ) -> Instance<H, C> {
852 Instance {
853 engine: self.engine.clone(),
854 pre: self.instance_pre.clone(),
855 handler,
856 max_execution_time: self.max_execution_time,
857 events,
858 experimental_features: self.experimental_features,
859 max_memory_limit: self.max_memory_limit,
860 }
861 }
862
863 #[instrument(level = "debug", skip_all)]
870 pub async fn serve_wrpc<S>(
871 &self,
872 srv: &S,
873 handler: H,
874 events: mpsc::Sender<WrpcServeEvent<S::Context>>,
875 ) -> anyhow::Result<Vec<InvocationStream>>
876 where
877 S: wrpc_transport::Serve,
878 S::Context: Deref<Target = tracing::Span>,
879 {
880 let max_execution_time = self.max_execution_time;
881 let mut invocations = vec![];
882 let instance = self.instantiate(handler.clone(), events.clone());
883 for (name, ty) in self
884 .instance_pre
885 .component()
886 .component_type()
887 .exports(&self.engine)
888 {
889 match (name, ty) {
890 (_, types::ComponentItem::ComponentInstance(..))
891 if name.starts_with("wasi:http/incoming-handler@0.2") =>
892 {
893 let instance = instance.clone();
894 let [(_, _, handle)] = wrpc_interface_http::bindings::exports::wrpc::http::incoming_handler::serve_interface(
895 srv,
896 wrpc_interface_http::ServeWasmtime(instance),
897 )
898 .await
899 .context("failed to serve `wrpc:http/incoming-handler`")?;
900 invocations.push(handle);
901 }
902 (
903 "wasmcloud:messaging/handler@0.2.0"
904 | "wasmcloud:messaging/incoming-handler@0.3.0",
905 types::ComponentItem::ComponentInstance(..),
906 ) => {
907 let instance = instance.clone();
908 let [(_, _, handle_message)] =
909 wrpc::exports::wasmcloud::messaging0_2_0::handler::serve_interface(
910 srv, instance,
911 )
912 .await
913 .context("failed to serve `wasmcloud:messaging/handler`")?;
914 invocations.push(handle_message);
915 }
916 (
917 "wasi:keyvalue/watcher@0.2.0-draft",
918 types::ComponentItem::ComponentInstance(..),
919 ) => {
920 let instance = instance.clone();
921 let [(_, _, on_set), (_, _, on_delete)] =
922 wrpc::exports::wrpc::keyvalue::watcher::serve_interface(srv, instance)
923 .await
924 .context("failed to serve `wrpc:keyvalue/watcher`")?;
925 invocations.push(on_set);
926 invocations.push(on_delete);
927 }
928 (name, types::ComponentItem::ComponentFunc(ty)) => {
929 let engine = self.engine.clone();
930 let handler = handler.clone();
931 let pre = self.instance_pre.clone();
932 debug!(?name, "serving root function");
933 let func = srv
934 .serve_function(
935 move || {
936 let span = info_span!("call_instance_function");
937 let mut store =
938 new_store(&engine, handler.clone(), max_execution_time);
939 store.data_mut().parent_context = Some(span.context());
940 store
941 },
942 pre,
943 Arc::clone(&self.host_resources),
944 ty,
945 "",
946 name,
947 )
948 .await
949 .context("failed to serve root function")?;
950 let events = events.clone();
951 invocations.push(Box::pin(func.map_ok(move |(cx, res)| {
952 let events = events.clone();
953 let span = cx.deref().clone();
954 Box::pin(
955 async move {
956 let res =
957 res.instrument(info_span!("handle_instance_function")).await;
958 let success = res.is_ok();
959 if let Err(err) =
960 events.try_send(WrpcServeEvent::DynamicExportReturned {
961 context: cx,
962 success,
963 })
964 {
965 warn!(
966 ?err,
967 success, "failed to send dynamic root export return event"
968 );
969 }
970 res
971 }
972 .instrument(span),
973 )
974 as Pin<Box<dyn Future<Output = _> + Send + 'static>>
975 })));
976 }
977 (_, types::ComponentItem::CoreFunc(_)) => {
978 warn!(name, "serving root core function exports not supported yet");
979 }
980 (_, types::ComponentItem::Module(_)) => {
981 warn!(name, "serving root module exports not supported yet");
982 }
983 (_, types::ComponentItem::Component(_)) => {
984 warn!(name, "serving root component exports not supported yet");
985 }
986 (instance_name, types::ComponentItem::ComponentInstance(ty)) => {
987 for (name, ty) in ty.exports(&self.engine) {
988 match ty {
989 types::ComponentItem::ComponentFunc(ty) => {
990 let engine = self.engine.clone();
991 let handler = handler.clone();
992 let pre = self.instance_pre.clone();
993 debug!(?instance_name, ?name, "serving instance function");
994 let func = srv
995 .serve_function(
996 move || {
997 let span = info_span!("call_instance_function");
998 let mut store = new_store(
999 &engine,
1000 handler.clone(),
1001 max_execution_time,
1002 );
1003 store.data_mut().parent_context = Some(span.context());
1004 store
1005 },
1006 pre,
1007 Arc::clone(&self.host_resources),
1008 ty,
1009 instance_name,
1010 name,
1011 )
1012 .await
1013 .context("failed to serve instance function")?;
1014 let events = events.clone();
1015 invocations.push(Box::pin(func.map_ok(move |(cx, res)| {
1016 let events = events.clone();
1017 let span = cx.deref().clone();
1018 Box::pin(
1019 async move {
1020 let res = res.await;
1021 let success = res.is_ok();
1022 if let Err(err) = events.try_send(
1023 WrpcServeEvent::DynamicExportReturned {
1024 context: cx,
1025 success,
1026 },
1027 ) {
1028 warn!(
1029 ?err,
1030 success,
1031 "failed to send dynamic instance export return event"
1032 );
1033 }
1034 res
1035 }
1036 .instrument(span),
1037 )
1038 as Pin<Box<dyn Future<Output = _> + Send + 'static>>
1039 })));
1040 }
1041 types::ComponentItem::CoreFunc(_) => {
1042 warn!(
1043 instance_name,
1044 name,
1045 "serving instance core function exports not supported yet"
1046 );
1047 }
1048 types::ComponentItem::Module(_) => {
1049 warn!(
1050 instance_name,
1051 name, "serving instance module exports not supported yet"
1052 );
1053 }
1054 types::ComponentItem::Component(_) => {
1055 warn!(
1056 instance_name,
1057 name, "serving instance component exports not supported yet"
1058 );
1059 }
1060 types::ComponentItem::ComponentInstance(_) => {
1061 warn!(
1062 instance_name,
1063 name, "serving nested instance exports not supported yet"
1064 );
1065 }
1066 types::ComponentItem::Type(_) | types::ComponentItem::Resource(_) => {}
1067 }
1068 }
1069 }
1070 (_, types::ComponentItem::Type(_) | types::ComponentItem::Resource(_)) => {}
1071 }
1072 }
1073 Ok(invocations)
1074 }
1075}
1076
1077impl<H> From<Component<H>> for Option<jwt::Claims<jwt::Component>>
1078where
1079 H: Handler,
1080{
1081 fn from(Component { claims, .. }: Component<H>) -> Self {
1082 claims
1083 }
1084}
1085
1086pub struct Instance<H, C>
1088where
1089 H: Handler,
1090{
1091 engine: wasmtime::Engine,
1092 pre: wasmtime::component::InstancePre<Ctx<H>>,
1093 handler: H,
1094 max_execution_time: Duration,
1095 events: mpsc::Sender<WrpcServeEvent<C>>,
1096 experimental_features: Features,
1097 max_memory_limit: usize,
1098}
1099
1100impl<H, C> Clone for Instance<H, C>
1101where
1102 H: Handler,
1103{
1104 fn clone(&self) -> Self {
1105 Self {
1106 engine: self.engine.clone(),
1107 pre: self.pre.clone(),
1108 handler: self.handler.clone(),
1109 max_execution_time: self.max_execution_time,
1110 events: self.events.clone(),
1111 experimental_features: self.experimental_features,
1112 max_memory_limit: self.max_memory_limit,
1113 }
1114 }
1115}
1116
1117type TableResult<T> = Result<T, ResourceTableError>;
1118
1119pub trait BaseCtx: Debug + 'static {
1121 fn timeout(&self) -> Option<Duration> {
1123 None
1124 }
1125
1126 fn parent_context(&self) -> Option<&opentelemetry::Context> {
1128 None
1129 }
1130}
1131
1132pub struct Ctx<H>
1136where
1137 H: MinimalHandler,
1138{
1139 handler: H,
1140 wasi: WasiCtx,
1141 http: WasiHttpCtx,
1142 wrpc: WrpcCtx<H>,
1143 table: ResourceTable,
1144 parent_context: Option<opentelemetry::Context>,
1145}
1146
1147struct WrpcCtx<H>
1148where
1149 H: MinimalHandler,
1150{
1151 handler: H,
1152 shared_resources: SharedResourceTable,
1153 timeout: Duration,
1154}
1155
1156impl<H: MinimalHandler> WasiView for Ctx<H> {
1157 fn ctx(&mut self) -> WasiCtxView<'_> {
1158 WasiCtxView {
1159 ctx: &mut self.wasi,
1160 table: &mut self.table,
1161 }
1162 }
1163}
1164
1165impl<H: Handler> WrpcView for Ctx<H> {
1166 type Invoke = H;
1167
1168 fn wrpc(&mut self) -> WrpcCtxView<'_, Self::Invoke> {
1169 WrpcCtxView {
1170 ctx: &mut self.wrpc,
1171 table: &mut self.table,
1172 }
1173 }
1174}
1175
1176impl<H: Handler> wrpc_runtime_wasmtime::WrpcCtx<H> for WrpcCtx<H> {
1177 fn context(&self) -> H::Context {
1178 None
1179 }
1180
1181 fn client(&self) -> &H {
1182 &self.handler
1183 }
1184
1185 fn shared_resources(&mut self) -> &mut SharedResourceTable {
1186 &mut self.shared_resources
1187 }
1188
1189 fn timeout(&self) -> Option<Duration> {
1190 Some(self.timeout)
1191 }
1192}
1193
1194impl<H: MinimalHandler> Debug for Ctx<H> {
1195 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1196 f.debug_struct("Ctx").field("runtime", &"wasmtime").finish()
1197 }
1198}
1199
1200impl<H: MinimalHandler> Ctx<H> {
1201 fn attach_parent_context(&self) {
1202 if let Some(context) = self.parent_context.as_ref() {
1203 Span::current().set_parent(context.clone());
1204 }
1205 }
1206}