wrpc_transport/
lib.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
#![allow(clippy::type_complexity)]
#![deny(missing_docs)]

//! wRPC transport abstractions, codec and framing
//!
//! wRPC is an RPC framework based on [WIT](https://component-model.bytecodealliance.org/design/wit.html).
//! It follows client-server model, where peers (servers) may serve function and method calls invoked by the other peers (clients).
//!
//! The two main abstractions on top of which wRPC is built are:
//! - [Invoke] - the client-side handle to a wRPC transport, allowing clients to *invoke* WIT functions over wRPC transport
//! - [Serve] - the server-side handle to a wRPC transport, allowing servers to *serve* WIT functions over wRPC transport
//!
//! Implementations of [Invoke] and [Serve] define transport-specific, multiplexed bidirectional byte stream types:
//! - [`Invoke::Incoming`] and [`Serve::Incoming`] represent the stream *incoming* from a peer.
//! - [`Invoke::Outgoing`] and [`Serve::Outgoing`] represent the stream *outgoing* to a peer.

pub mod frame;
pub mod invoke;
pub mod serve;

mod value;

pub use frame::{
    Accept, Decoder as FrameDecoder, Encoder as FrameEncoder, Frame, FrameRef, Server,
};
pub use invoke::{Invoke, InvokeExt};
pub use send_future::SendFuture;
pub use serve::{Serve, ServeExt};
pub use value::*;

#[cfg(any(target_family = "wasm", feature = "net"))]
pub use frame::tcp;
#[cfg(all(unix, feature = "net"))]
pub use frame::unix;

use core::mem;
use core::pin::{pin, Pin};
use core::task::{Context, Poll};

use bytes::BytesMut;
use tokio::io::{AsyncRead, ReadBuf};
use tracing::trace;

/// Internal workaround trait
///
/// This is an internal trait used as a workaround for
/// https://github.com/rust-lang/rust/issues/63033
#[doc(hidden)]
pub trait Captures<'a> {}

impl<'a, T: ?Sized> Captures<'a> for T {}

/// Multiplexes streams
///
/// Implementations of this trait define multiplexing for underlying connections
/// using a particular structural `path`
pub trait Index<T> {
    /// Index the entity using a structural `path`
    fn index(&self, path: &[usize]) -> anyhow::Result<T>;
}

/// Buffered incoming stream used for decoding values
pub struct Incoming<T> {
    buffer: BytesMut,
    inner: T,
}

impl<T: Index<T>> Index<Self> for Incoming<T> {
    fn index(&self, path: &[usize]) -> anyhow::Result<Self> {
        let inner = self.inner.index(path)?;
        Ok(Self {
            buffer: BytesMut::default(),
            inner,
        })
    }
}

impl<T: AsyncRead + Unpin> AsyncRead for Incoming<T> {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        let cap = buf.remaining();
        if cap == 0 {
            trace!("attempt to read empty buffer");
            return Poll::Ready(Ok(()));
        }
        if !self.buffer.is_empty() {
            if self.buffer.len() > cap {
                trace!(cap, len = self.buffer.len(), "reading part of buffer");
                buf.put_slice(&self.buffer.split_to(cap));
            } else {
                trace!(cap, len = self.buffer.len(), "reading full buffer");
                buf.put_slice(&mem::take(&mut self.buffer));
            }
            return Poll::Ready(Ok(()));
        }
        pin!(&mut self.inner).poll_read(cx, buf)
    }
}