wrpc_transport/frame/conn/
client.rs

1use core::marker::PhantomData;
2
3use anyhow::Context as _;
4use bytes::{BufMut as _, Bytes, BytesMut};
5use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt as _};
6use tokio_util::codec::Encoder;
7use tracing::{instrument, trace};
8use wasm_tokio::{CoreNameEncoder, CoreVecEncoderBytes};
9
10use crate::frame::conn::{Incoming, Outgoing};
11use crate::frame::{Conn, ConnHandler, PROTOCOL};
12
/// Defines invocation behavior
///
/// `H` is a type-level marker only: the single field is a `PhantomData<H>`,
/// so no value of type `H` is ever stored in the builder.
pub struct InvokeBuilder<H = ()>(PhantomData<H>)
where
    H: ?Sized;

// Implemented by hand rather than derived: `#[derive(Clone)]` would add an
// `H: Clone` bound (and with it `H: Sized`) even though cloning the builder
// never clones an `H`.
impl<H> Clone for InvokeBuilder<H>
where
    H: ?Sized,
{
    fn clone(&self) -> Self {
        Self(PhantomData)
    }
}
18
19impl<H> InvokeBuilder<H> {
20    /// Invoke function `func` on instance `instance`
21    #[instrument(level = "trace", skip_all)]
22    pub async fn invoke<P, I, O>(
23        self,
24        mut tx: O,
25        rx: I,
26        instance: &str,
27        func: &str,
28        params: Bytes,
29        paths: impl AsRef<[P]> + Send,
30    ) -> anyhow::Result<(Outgoing, Incoming)>
31    where
32        P: AsRef<[Option<usize>]> + Send + Sync,
33        I: AsyncRead + Unpin + Send + 'static,
34        O: AsyncWrite + Unpin + Send + 'static,
35        H: ConnHandler<I, O>,
36    {
37        let mut buf = BytesMut::with_capacity(
38            17_usize // len(PROTOCOL) + len(instance) + len(func) + len([]) + len(params)
39                .saturating_add(instance.len())
40                .saturating_add(func.len())
41                .saturating_add(params.len()),
42        );
43        buf.put_u8(PROTOCOL);
44        CoreNameEncoder.encode(instance, &mut buf)?;
45        CoreNameEncoder.encode(func, &mut buf)?;
46        buf.put_u8(0);
47        CoreVecEncoderBytes.encode(params, &mut buf)?;
48        trace!(?buf, "writing invocation");
49        tx.write_all(&buf)
50            .await
51            .context("failed to initialize connection")?;
52
53        let Conn { tx, rx } = Conn::new::<H, _, _, _>(rx, tx, paths.as_ref());
54        Ok((tx, rx))
55    }
56}
57
58impl<H> Default for InvokeBuilder<H> {
59    fn default() -> Self {
60        Self(PhantomData)
61    }
62}
63
64/// Invoke function `func` on instance `instance`
65#[instrument(level = "trace", skip_all)]
66pub async fn invoke<P, I, O>(
67    tx: O,
68    rx: I,
69    instance: &str,
70    func: &str,
71    params: Bytes,
72    paths: impl AsRef<[P]> + Send,
73) -> anyhow::Result<(Outgoing, Incoming)>
74where
75    P: AsRef<[Option<usize>]> + Send + Sync,
76    I: AsyncRead + Unpin + Send + 'static,
77    O: AsyncWrite + Unpin + Send + 'static,
78{
79    InvokeBuilder::<()>::default()
80        .invoke(tx, rx, instance, func, params, paths)
81        .await
82}