wrpc_runtime_wasmtime/rpc/host/
invoker.rs1use anyhow::Context as _;
2use tokio::io::AsyncWriteExt as _;
3use wasmtime::component::Resource;
4use wrpc_transport::Invoke;
5
6use crate::bindings::rpc::context::Context;
7use crate::bindings::rpc::invoker::Host;
8use crate::bindings::rpc::transport::Invocation;
9use crate::rpc::WrpcRpcImpl;
10use crate::{WrpcView, WrpcViewExt as _};
11
12impl<T: WrpcView> Host for WrpcRpcImpl<T>
13where
14 T::Invoke: Clone + 'static,
15 <T::Invoke as Invoke>::Context: 'static,
16{
17 fn invoke(
18 &mut self,
19 cx: Resource<Context>,
20 instance: String,
21 name: String,
22 params: Vec<u8>,
23 paths: Vec<Vec<Option<u32>>>,
24 ) -> wasmtime::Result<Resource<Invocation>> {
25 let client = self.0.wrpc().ctx.client().clone();
26 let cx = self.0.delete_context(cx)?;
27 let paths = paths
28 .into_iter()
29 .map(|path| {
30 path.into_iter()
31 .map(|i| i.map(usize::try_from).transpose())
32 .collect::<Result<Box<[_]>, _>>()
33 })
34 .collect::<Result<Box<[_]>, _>>()
35 .context("failed to construct subscription paths")?;
36 let invocation = async move {
37 let (mut tx, rx) = client
38 .invoke(cx, &instance, &name, params.into(), paths)
39 .await?;
40 tx.flush()
41 .await
42 .context("failed to flush outgoing stream")?;
43 Ok((tx, rx))
44 };
45 self.0.push_invocation(invocation)
46 }
47}