1use core::future::Future;
2use core::pin::Pin;
3
4use std::{collections::HashMap, sync::Arc};
5
6use anyhow::Context as _;
7use futures::{Stream, TryStreamExt as _};
8use tokio::sync::Mutex;
9use tracing::{debug, instrument, Instrument as _, Span};
10use wasmtime::component::types;
11use wasmtime::component::{Instance, InstancePre, ResourceType};
12use wasmtime::AsContextMut;
13use wasmtime_wasi::WasiView;
14
15use crate::{call, rpc_func_name, WrpcView};
16
17pub trait ServeExt: wrpc_transport::Serve {
18 #[instrument(level = "trace", skip(self, store, instance_pre, host_resources))]
21 fn serve_function<T>(
22 &self,
23 store: impl Fn() -> wasmtime::Store<T> + Send + 'static,
24 instance_pre: InstancePre<T>,
25 host_resources: impl Into<
26 Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>,
27 >,
28 ty: types::ComponentFunc,
29 instance_name: &str,
30 name: &str,
31 ) -> impl Future<
32 Output = anyhow::Result<
33 impl Stream<
34 Item = anyhow::Result<(
35 Self::Context,
36 Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + 'static>>,
37 )>,
38 > + Send
39 + 'static,
40 >,
41 > + Send
42 where
43 T: WasiView + WrpcView + 'static,
44 {
45 let span = Span::current();
46 let host_resources = host_resources.into();
47 async move {
48 debug!(instance = instance_name, name, "serving function export");
49 let component_ty = instance_pre.component();
50 let idx = if instance_name.is_empty() {
51 None
52 } else {
53 let idx = component_ty
54 .get_export_index(None, instance_name)
55 .with_context(|| format!("export `{instance_name}` not found"))?;
56 Some(idx)
57 };
58 let idx = component_ty
59 .get_export_index(idx.as_ref(), name)
60 .with_context(|| format!("export `{name}` not found"))?;
61
62 let invocations = self.serve(instance_name, rpc_func_name(name), []).await?;
64 let name = Arc::<str>::from(name);
65 let params_ty: Arc<[_]> = ty.params().map(|(_, ty)| ty).collect();
66 let results_ty: Arc<[_]> = ty.results().collect();
67 let host_resources = Arc::clone(&host_resources);
68 Ok(invocations.map_ok(move |(cx, tx, rx)| {
69 let instance_pre = instance_pre.clone();
70 let name = Arc::clone(&name);
71 let params_ty = Arc::clone(¶ms_ty);
72 let results_ty = Arc::clone(&results_ty);
73 let host_resources = Arc::clone(&host_resources);
74
75 let mut store = store();
76 (
77 cx,
78 Box::pin(
79 async move {
80 let instance = instance_pre
81 .instantiate_async(&mut store)
82 .await
83 .context("failed to instantiate component")?;
84 let func = instance
85 .get_func(&mut store, idx)
86 .with_context(|| format!("function export `{name}` not found"))?;
87 call(
88 &mut store,
89 rx,
90 tx,
91 &[],
92 &host_resources,
93 params_ty.iter(),
94 &results_ty,
95 func,
96 )
97 .await?;
98 Ok(())
99 }
100 .instrument(span.clone()),
101 ) as Pin<Box<dyn Future<Output = _> + Send + 'static>>,
102 )
103 }))
104 }
105 }
106
107 #[instrument(
110 level = "trace",
111 skip(self, store, instance, guest_resources, host_resources)
112 )]
113 #[allow(clippy::too_many_arguments)]
114 fn serve_function_shared<T>(
115 &self,
116 store: Arc<Mutex<wasmtime::Store<T>>>,
117 instance: Instance,
118 guest_resources: impl Into<Arc<[ResourceType]>>,
119 host_resources: impl Into<
120 Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>,
121 >,
122 ty: types::ComponentFunc,
123 instance_name: &str,
124 name: &str,
125 ) -> impl Future<
126 Output = anyhow::Result<
127 impl Stream<
128 Item = anyhow::Result<(
129 Self::Context,
130 Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + 'static>>,
131 )>,
132 > + Send
133 + 'static,
134 >,
135 > + Send
136 where
137 T: WasiView + WrpcView + 'static,
138 {
139 let span = Span::current();
140 let guest_resources = guest_resources.into();
141 let host_resources = host_resources.into();
142 async move {
143 let func = {
144 let mut store = store.lock().await;
145 let idx = if instance_name.is_empty() {
146 None
147 } else {
148 let idx = instance
149 .get_export_index(store.as_context_mut(), None, instance_name)
150 .with_context(|| format!("export `{instance_name}` not found"))?;
151 Some(idx)
152 };
153 let idx = instance
154 .get_export_index(store.as_context_mut(), idx.as_ref(), name)
155 .with_context(|| format!("export `{name}` not found"))?;
156 instance.get_func(store.as_context_mut(), idx)
157 }
158 .with_context(|| format!("function export `{name}` not found"))?;
159 debug!(instance = instance_name, name, "serving function export");
160 let invocations = self.serve(instance_name, rpc_func_name(name), []).await?;
162 let params_ty: Arc<[_]> = ty.params().map(|(_, ty)| ty).collect();
163 let results_ty: Arc<[_]> = ty.results().collect();
164 let guest_resources = Arc::clone(&guest_resources);
165 let host_resources = Arc::clone(&host_resources);
166 Ok(invocations.map_ok(move |(cx, tx, rx)| {
167 let params_ty = Arc::clone(¶ms_ty);
168 let results_ty = Arc::clone(&results_ty);
169 let guest_resources = Arc::clone(&guest_resources);
170 let host_resources = Arc::clone(&host_resources);
171 let store = Arc::clone(&store);
172 (
173 cx,
174 Box::pin(
175 async move {
176 let mut store = store.lock().await;
177 call(
178 &mut *store,
179 rx,
180 tx,
181 &guest_resources,
182 &host_resources,
183 params_ty.iter(),
184 &results_ty,
185 func,
186 )
187 .await?;
188 Ok(())
189 }
190 .instrument(span.clone()),
191 ) as Pin<Box<dyn Future<Output = _> + Send + 'static>>,
192 )
193 }))
194 }
195 }
196}
197
198impl<T: wrpc_transport::Serve> ServeExt for T {}