wrpc_transport/frame/conn/
client.rs1use core::marker::PhantomData;
2
3use anyhow::Context as _;
4use bytes::{BufMut as _, Bytes, BytesMut};
5use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt as _};
6use tokio_util::codec::Encoder;
7use tracing::{instrument, trace};
8use wasm_tokio::{CoreNameEncoder, CoreVecEncoderBytes};
9
10use crate::frame::conn::{Incoming, Outgoing};
11use crate::frame::{Conn, ConnHandler, PROTOCOL};
12
13#[derive(Clone)]
15pub struct InvokeBuilder<H = ()>(PhantomData<H>)
16where
17 H: ?Sized;
18
19impl<H> InvokeBuilder<H> {
20 #[instrument(level = "trace", skip_all)]
22 pub async fn invoke<P, I, O>(
23 self,
24 mut tx: O,
25 rx: I,
26 instance: &str,
27 func: &str,
28 params: Bytes,
29 paths: impl AsRef<[P]> + Send,
30 ) -> anyhow::Result<(Outgoing, Incoming)>
31 where
32 P: AsRef<[Option<usize>]> + Send + Sync,
33 I: AsyncRead + Unpin + Send + 'static,
34 O: AsyncWrite + Unpin + Send + 'static,
35 H: ConnHandler<I, O>,
36 {
37 let mut buf = BytesMut::with_capacity(
38 17_usize .saturating_add(instance.len())
40 .saturating_add(func.len())
41 .saturating_add(params.len()),
42 );
43 buf.put_u8(PROTOCOL);
44 CoreNameEncoder.encode(instance, &mut buf)?;
45 CoreNameEncoder.encode(func, &mut buf)?;
46 buf.put_u8(0);
47 CoreVecEncoderBytes.encode(params, &mut buf)?;
48 trace!(?buf, "writing invocation");
49 tx.write_all(&buf)
50 .await
51 .context("failed to initialize connection")?;
52
53 let Conn { tx, rx } = Conn::new::<H, _, _, _>(rx, tx, paths.as_ref());
54 Ok((tx, rx))
55 }
56}
57
58impl<H> Default for InvokeBuilder<H> {
59 fn default() -> Self {
60 Self(PhantomData)
61 }
62}
63
64#[instrument(level = "trace", skip_all)]
66pub async fn invoke<P, I, O>(
67 tx: O,
68 rx: I,
69 instance: &str,
70 func: &str,
71 params: Bytes,
72 paths: impl AsRef<[P]> + Send,
73) -> anyhow::Result<(Outgoing, Incoming)>
74where
75 P: AsRef<[Option<usize>]> + Send + Sync,
76 I: AsyncRead + Unpin + Send + 'static,
77 O: AsyncWrite + Unpin + Send + 'static,
78{
79 InvokeBuilder::<()>::default()
80 .invoke(tx, rx, instance, func, params, paths)
81 .await
82}