wrpc_runtime_wasmtime/
polyfill.rs

1use core::iter::zip;
2use core::pin::pin;
3
4use std::collections::HashMap;
5use std::sync::Arc;
6
7use anyhow::{bail, ensure, Context as _};
8use bytes::BytesMut;
9use futures::future::try_join_all;
10use tokio::io::AsyncWriteExt as _;
11use tokio::time::Instant;
12use tokio::try_join;
13use tokio_util::codec::Encoder;
14use tracing::{debug, instrument, trace, warn, Instrument as _, Span};
15use wasmtime::component::{types, LinkerInstance, ResourceType, Type, Val};
16use wasmtime::{AsContextMut, Engine, StoreContextMut};
17use wrpc_transport::{Index as _, Invoke, InvokeExt as _};
18
19use crate::rpc::Error;
20use crate::{read_value, rpc_func_name, rpc_result_type, ValEncoder, WrpcView, WrpcViewExt as _};
21
22/// Polyfill [`types::ComponentItem`] in a [`LinkerInstance`] using [`wrpc_transport::Invoke`]
23#[instrument(level = "trace", skip_all)]
24pub fn link_item<V>(
25    engine: &Engine,
26    linker: &mut LinkerInstance<V>,
27    guest_resources: impl Into<Arc<[ResourceType]>>,
28    host_resources: impl Into<Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>>,
29    ty: types::ComponentItem,
30    instance: impl Into<Arc<str>>,
31    name: impl Into<Arc<str>>,
32) -> wasmtime::Result<()>
33where
34    V: WrpcView,
35{
36    let instance = instance.into();
37    let guest_resources = guest_resources.into();
38    let host_resources = host_resources.into();
39    match ty {
40        types::ComponentItem::ComponentFunc(ty) => {
41            let name = name.into();
42            debug!(?instance, ?name, "linking function");
43            link_function(
44                linker,
45                Arc::clone(&guest_resources),
46                Arc::clone(&host_resources),
47                ty,
48                instance,
49                name,
50            )?;
51        }
52        types::ComponentItem::CoreFunc(_) => {
53            bail!("polyfilling core functions not supported yet")
54        }
55        types::ComponentItem::Module(_) => bail!("polyfilling modules not supported yet"),
56        types::ComponentItem::Component(ty) => {
57            for (name, ty) in ty.imports(engine) {
58                debug!(?instance, name, "linking component item");
59                link_item(
60                    engine,
61                    linker,
62                    Arc::clone(&guest_resources),
63                    Arc::clone(&host_resources),
64                    ty,
65                    "",
66                    name,
67                )?;
68            }
69        }
70        types::ComponentItem::ComponentInstance(ty) => {
71            let name = name.into();
72            let mut linker = linker
73                .instance(&name)
74                .with_context(|| format!("failed to instantiate `{name}` in the linker"))?;
75            debug!(?instance, ?name, "linking instance");
76            link_instance(
77                engine,
78                &mut linker,
79                guest_resources,
80                host_resources,
81                ty,
82                name,
83            )?;
84        }
85        types::ComponentItem::Type(_) => {}
86        types::ComponentItem::Resource(ty) => {
87            let name = name.into();
88            let Some((guest_ty, host_ty)) = host_resources
89                .get(&*instance)
90                .and_then(|instance| instance.get(&*name))
91            else {
92                bail!("resource type for {instance}/{name} not defined");
93            };
94            ensure!(ty == *guest_ty, "{instance}/{name} resource type mismatch");
95
96            debug!(?instance, ?name, "linking resource");
97            linker.resource(&name, *host_ty, |_, _| Ok(()))?;
98        }
99    }
100    Ok(())
101}
102
103/// Polyfill [`types::ComponentInstance`] in a [`LinkerInstance`] using [`wrpc_transport::Invoke`]
104#[instrument(level = "trace", skip_all)]
105pub fn link_instance<V>(
106    engine: &Engine,
107    linker: &mut LinkerInstance<V>,
108    guest_resources: impl Into<Arc<[ResourceType]>>,
109    host_resources: impl Into<Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>>,
110    ty: types::ComponentInstance,
111    name: impl Into<Arc<str>>,
112) -> wasmtime::Result<()>
113where
114    V: WrpcView,
115{
116    let instance = name.into();
117    let guest_resources = guest_resources.into();
118    let host_resources = host_resources.into();
119    for (name, ty) in ty.exports(engine) {
120        debug!(name, "linking instance item");
121        link_item(
122            engine,
123            linker,
124            Arc::clone(&guest_resources),
125            Arc::clone(&host_resources),
126            ty,
127            Arc::clone(&instance),
128            name,
129        )?;
130    }
131    Ok(())
132}
133
134#[allow(clippy::too_many_arguments)]
135async fn invoke<T: WrpcView>(
136    mut store: &mut StoreContextMut<'_, T>,
137    params: &[Val],
138    results: &mut [Val],
139    guest_resources: Arc<[ResourceType]>,
140    params_ty: impl IntoIterator<Item = (&str, Type)>,
141    results_ty: impl IntoIterator<Item = Type>,
142    instance: Arc<str>,
143    name: Arc<str>,
144) -> wasmtime::Result<anyhow::Result<()>> {
145    let mut buf = BytesMut::default();
146    let mut deferred = vec![];
147    for (v, (name, ref ty)) in zip(params, params_ty) {
148        let mut enc = ValEncoder::new(store.as_context_mut(), ty, &guest_resources);
149        enc.encode(v, &mut buf)
150            .with_context(|| format!("failed to encode parameter `{name}`"))?;
151        deferred.push(enc.deferred);
152    }
153    let view = store.data_mut().wrpc();
154    let clt = view.ctx.client();
155    let cx = view.ctx.context();
156    let timeout = view.ctx.timeout();
157    let buf = buf.freeze();
158    // TODO: set paths
159    let paths = &[[]; 0];
160    let rpc_name = rpc_func_name(&name);
161    let start = Instant::now();
162    let invocation = if let Some(timeout) = timeout {
163        clt.timeout(timeout)
164            .invoke(cx, &instance, rpc_name, buf, paths)
165            .await
166    } else {
167        clt.invoke(cx, &instance, rpc_name, buf, paths).await
168    }
169    .with_context(|| format!("failed to invoke `{instance}.{name}` polyfill via wRPC"));
170    let (outgoing, incoming) = match invocation {
171        Ok((outgoing, incoming)) => (outgoing, incoming),
172        Err(err) => return Ok(Err(err)),
173    };
174    let tx = async {
175        try_join_all(
176            zip(0.., deferred)
177                .filter_map(|(i, f)| f.map(|f| (outgoing.index(&[i]), f)))
178                .map(|(w, f)| async move {
179                    let w = w?;
180                    f(w).await
181                }),
182        )
183        .await
184        .context("failed to write asynchronous parameters")?;
185        let mut outgoing = pin!(outgoing);
186        outgoing
187            .flush()
188            .await
189            .context("failed to flush outgoing stream")?;
190        if let Err(err) = outgoing.shutdown().await {
191            trace!(?err, "failed to shutdown outgoing stream");
192        }
193        anyhow::Ok(())
194    };
195    let rx = async {
196        let mut incoming = pin!(incoming);
197        for (i, (v, ref ty)) in zip(results, results_ty).enumerate() {
198            read_value(&mut store, &mut incoming, &guest_resources, v, ty, &[i])
199                .await
200                .with_context(|| format!("failed to decode return value {i}"))?;
201        }
202        Ok(())
203    };
204    let res = if let Some(timeout) = timeout {
205        let timeout = timeout.saturating_sub(Instant::now().saturating_duration_since(start));
206        try_join!(
207            async {
208                tokio::time::timeout(timeout, tx)
209                    .await
210                    .context("data transmission timed out")?
211            },
212            async {
213                tokio::time::timeout(timeout, rx)
214                    .await
215                    .context("data receipt timed out")?
216            },
217        )
218    } else {
219        try_join!(tx, rx)
220    };
221    match res {
222        Ok(((), ())) => Ok(Ok(())),
223        Err(err) => Ok(Err(err)),
224    }
225}
226
227/// Polyfill [`types::ComponentFunc`] in a [`LinkerInstance`] using [`wrpc_transport::Invoke`]
228#[instrument(level = "trace", skip_all)]
229pub fn link_function<V>(
230    linker: &mut LinkerInstance<V>,
231    guest_resources: impl Into<Arc<[ResourceType]>>,
232    host_resources: impl Into<Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>>,
233    ty: types::ComponentFunc,
234    instance: impl Into<Arc<str>>,
235    name: impl Into<Arc<str>>,
236) -> wasmtime::Result<()>
237where
238    V: WrpcView,
239{
240    let span = Span::current();
241    let instance = instance.into();
242    let name = name.into();
243    let guest_resources = guest_resources.into();
244    let host_resources = host_resources.into();
245    match rpc_result_type(&host_resources, ty.results()) {
246        None => linker.func_new_async(&Arc::clone(&name), move |mut store, params, results| {
247            let ty = ty.clone();
248            let instance = Arc::clone(&instance);
249            let name = Arc::clone(&name);
250            let resources = Arc::clone(&guest_resources);
251            Box::new(
252                async move {
253                    match invoke(
254                        &mut store,
255                        params,
256                        results,
257                        resources,
258                        ty.params(),
259                        ty.results(),
260                        instance,
261                        name,
262                    )
263                    .await
264                    {
265                        Ok(Ok(())) => Ok(()),
266                        Ok(Err(err)) => Err(err),
267                        Err(err) => Err(err),
268                    }
269                }
270                .instrument(span.clone()),
271            )
272        }),
273        // `result<_, rpc-eror>`
274        Some(None) => {
275            linker.func_new_async(&Arc::clone(&name), move |mut store, params, results| {
276                let ty = ty.clone();
277                let instance = Arc::clone(&instance);
278                let name = Arc::clone(&name);
279                let resources = Arc::clone(&guest_resources);
280                Box::new(
281                    async move {
282                        let [result] = results else {
283                            bail!("result type mismatch");
284                        };
285                        match invoke(
286                            &mut store,
287                            params,
288                            &mut [],
289                            resources,
290                            ty.params(),
291                            None,
292                            instance,
293                            name,
294                        )
295                        .await?
296                        {
297                            Ok(()) => {
298                                *result = Val::Result(Ok(None));
299                            }
300                            Err(err) => {
301                                let err = store.data_mut().push_error(Error::Invoke(err))?;
302                                let err = err
303                                    .try_into_resource_any(&mut store)
304                                    .context("failed to lower error resource")?;
305                                *result = Val::Result(Err(Some(Box::new(Val::Resource(err)))));
306                            }
307                        }
308                        Ok(())
309                    }
310                    .instrument(span.clone()),
311                )
312            })
313        }
314        // `result<T, rpc-eror>`
315        Some(Some(result_ty)) => {
316            linker.func_new_async(&Arc::clone(&name), move |mut store, params, results| {
317                let ty = ty.clone();
318                let instance = Arc::clone(&instance);
319                let name = Arc::clone(&name);
320                let resources = Arc::clone(&guest_resources);
321                let result_ty = result_ty.clone();
322                Box::new(
323                    async move {
324                        let [result] = results else {
325                            bail!("result type mismatch");
326                        };
327                        let mut ok = [Val::Bool(false); 1];
328                        match invoke(
329                            &mut store,
330                            params,
331                            ok.as_mut_slice(),
332                            resources,
333                            ty.params(),
334                            [result_ty],
335                            instance,
336                            name,
337                        )
338                        .await?
339                        {
340                            Ok(()) => {
341                                let [ok] = ok;
342                                *result = Val::Result(Ok(Some(Box::new(ok))));
343                            }
344                            Err(err) => {
345                                let err = store.data_mut().push_error(Error::Invoke(err))?;
346                                let err = err
347                                    .try_into_resource_any(&mut store)
348                                    .context("failed to lower error resource")?;
349                                *result = Val::Result(Err(Some(Box::new(Val::Resource(err)))));
350                            }
351                        }
352                        Ok(())
353                    }
354                    .instrument(span.clone()),
355                )
356            })
357        }
358    }
359}