wrpc_runtime_wasmtime/rpc/host/
invoker.rs

1use anyhow::Context as _;
2use tokio::io::AsyncWriteExt as _;
3use wasmtime::component::Resource;
4use wrpc_transport::Invoke;
5
6use crate::bindings::rpc::context::Context;
7use crate::bindings::rpc::invoker::Host;
8use crate::bindings::rpc::transport::Invocation;
9use crate::rpc::WrpcRpcImpl;
10use crate::{WrpcView, WrpcViewExt as _};
11
12impl<T: WrpcView> Host for WrpcRpcImpl<T>
13where
14    T::Invoke: Clone + 'static,
15    <T::Invoke as Invoke>::Context: 'static,
16{
17    fn invoke(
18        &mut self,
19        cx: Resource<Context>,
20        instance: String,
21        name: String,
22        params: Vec<u8>,
23        paths: Vec<Vec<Option<u32>>>,
24    ) -> wasmtime::Result<Resource<Invocation>> {
25        let client = self.0.wrpc().ctx.client().clone();
26        let cx = self.0.delete_context(cx)?;
27        let paths = paths
28            .into_iter()
29            .map(|path| {
30                path.into_iter()
31                    .map(|i| i.map(usize::try_from).transpose())
32                    .collect::<Result<Box<[_]>, _>>()
33            })
34            .collect::<Result<Box<[_]>, _>>()
35            .context("failed to construct subscription paths")?;
36        let invocation = async move {
37            let (mut tx, rx) = client
38                .invoke(cx, &instance, &name, params.into(), paths)
39                .await?;
40            tx.flush()
41                .await
42                .context("failed to flush outgoing stream")?;
43            Ok((tx, rx))
44        };
45        self.0.push_invocation(invocation)
46    }
47}