wrpc_runtime_wasmtime/
serve.rs

1use core::future::Future;
2use core::pin::Pin;
3
4use std::{collections::HashMap, sync::Arc};
5
6use anyhow::Context as _;
7use futures::{Stream, TryStreamExt as _};
8use tokio::sync::Mutex;
9use tracing::{debug, instrument, Instrument as _, Span};
10use wasmtime::component::types;
11use wasmtime::component::{Instance, InstancePre, ResourceType};
12use wasmtime::AsContextMut;
13use wasmtime_wasi::WasiView;
14
15use crate::{call, rpc_func_name, WrpcView};
16
17pub trait ServeExt: wrpc_transport::Serve {
18    /// Serve [`types::ComponentFunc`] from an [`InstancePre`] instantiating it on each call.
19    /// This serving method does not support guest-exported resources.
20    #[instrument(level = "trace", skip(self, store, instance_pre, host_resources))]
21    fn serve_function<T>(
22        &self,
23        store: impl Fn() -> wasmtime::Store<T> + Send + 'static,
24        instance_pre: InstancePre<T>,
25        host_resources: impl Into<
26            Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>,
27        >,
28        ty: types::ComponentFunc,
29        instance_name: &str,
30        name: &str,
31    ) -> impl Future<
32        Output = anyhow::Result<
33            impl Stream<
34                    Item = anyhow::Result<(
35                        Self::Context,
36                        Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + 'static>>,
37                    )>,
38                > + Send
39                + 'static,
40        >,
41    > + Send
42    where
43        T: WasiView + WrpcView + 'static,
44    {
45        let span = Span::current();
46        let host_resources = host_resources.into();
47        async move {
48            debug!(instance = instance_name, name, "serving function export");
49            let component_ty = instance_pre.component();
50            let idx = if instance_name.is_empty() {
51                None
52            } else {
53                let idx = component_ty
54                    .get_export_index(None, instance_name)
55                    .with_context(|| format!("export `{instance_name}` not found"))?;
56                Some(idx)
57            };
58            let idx = component_ty
59                .get_export_index(idx.as_ref(), name)
60                .with_context(|| format!("export `{name}` not found"))?;
61
62            // TODO: set paths
63            let invocations = self.serve(instance_name, rpc_func_name(name), []).await?;
64            let name = Arc::<str>::from(name);
65            let params_ty: Arc<[_]> = ty.params().map(|(_, ty)| ty).collect();
66            let results_ty: Arc<[_]> = ty.results().collect();
67            let host_resources = Arc::clone(&host_resources);
68            Ok(invocations.map_ok(move |(cx, tx, rx)| {
69                let instance_pre = instance_pre.clone();
70                let name = Arc::clone(&name);
71                let params_ty = Arc::clone(&params_ty);
72                let results_ty = Arc::clone(&results_ty);
73                let host_resources = Arc::clone(&host_resources);
74
75                let mut store = store();
76                (
77                    cx,
78                    Box::pin(
79                        async move {
80                            let instance = instance_pre
81                                .instantiate_async(&mut store)
82                                .await
83                                .context("failed to instantiate component")?;
84                            let func = instance
85                                .get_func(&mut store, idx)
86                                .with_context(|| format!("function export `{name}` not found"))?;
87                            call(
88                                &mut store,
89                                rx,
90                                tx,
91                                &[],
92                                &host_resources,
93                                params_ty.iter(),
94                                &results_ty,
95                                func,
96                            )
97                            .await?;
98                            Ok(())
99                        }
100                        .instrument(span.clone()),
101                    ) as Pin<Box<dyn Future<Output = _> + Send + 'static>>,
102                )
103            }))
104        }
105    }
106
107    /// Like [`Self::serve_function`], but with a shared `store` instance.
108    /// This is required to allow for serving functions, which operate on guest-exported resources.
109    #[instrument(
110        level = "trace",
111        skip(self, store, instance, guest_resources, host_resources)
112    )]
113    #[allow(clippy::too_many_arguments)]
114    fn serve_function_shared<T>(
115        &self,
116        store: Arc<Mutex<wasmtime::Store<T>>>,
117        instance: Instance,
118        guest_resources: impl Into<Arc<[ResourceType]>>,
119        host_resources: impl Into<
120            Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>,
121        >,
122        ty: types::ComponentFunc,
123        instance_name: &str,
124        name: &str,
125    ) -> impl Future<
126        Output = anyhow::Result<
127            impl Stream<
128                    Item = anyhow::Result<(
129                        Self::Context,
130                        Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + 'static>>,
131                    )>,
132                > + Send
133                + 'static,
134        >,
135    > + Send
136    where
137        T: WasiView + WrpcView + 'static,
138    {
139        let span = Span::current();
140        let guest_resources = guest_resources.into();
141        let host_resources = host_resources.into();
142        async move {
143            let func = {
144                let mut store = store.lock().await;
145                let idx = if instance_name.is_empty() {
146                    None
147                } else {
148                    let idx = instance
149                        .get_export_index(store.as_context_mut(), None, instance_name)
150                        .with_context(|| format!("export `{instance_name}` not found"))?;
151                    Some(idx)
152                };
153                let idx = instance
154                    .get_export_index(store.as_context_mut(), idx.as_ref(), name)
155                    .with_context(|| format!("export `{name}` not found"))?;
156                instance.get_func(store.as_context_mut(), idx)
157            }
158            .with_context(|| format!("function export `{name}` not found"))?;
159            debug!(instance = instance_name, name, "serving function export");
160            // TODO: set paths
161            let invocations = self.serve(instance_name, rpc_func_name(name), []).await?;
162            let params_ty: Arc<[_]> = ty.params().map(|(_, ty)| ty).collect();
163            let results_ty: Arc<[_]> = ty.results().collect();
164            let guest_resources = Arc::clone(&guest_resources);
165            let host_resources = Arc::clone(&host_resources);
166            Ok(invocations.map_ok(move |(cx, tx, rx)| {
167                let params_ty = Arc::clone(&params_ty);
168                let results_ty = Arc::clone(&results_ty);
169                let guest_resources = Arc::clone(&guest_resources);
170                let host_resources = Arc::clone(&host_resources);
171                let store = Arc::clone(&store);
172                (
173                    cx,
174                    Box::pin(
175                        async move {
176                            let mut store = store.lock().await;
177                            call(
178                                &mut *store,
179                                rx,
180                                tx,
181                                &guest_resources,
182                                &host_resources,
183                                params_ty.iter(),
184                                &results_ty,
185                                func,
186                            )
187                            .await?;
188                            Ok(())
189                        }
190                        .instrument(span.clone()),
191                    ) as Pin<Box<dyn Future<Output = _> + Send + 'static>>,
192                )
193            }))
194        }
195    }
196}
197
198impl<T: wrpc_transport::Serve> ServeExt for T {}