wasmcloud_runtime/
io.rs

1//! wasmCloud I/O functionality
2
3use core::pin::Pin;
4use core::task::{Context, Poll};
5
6use futures::{Stream, StreamExt as _};
7
8/// Incoming value [`Stream`] wrapper, which buffers the chunks and flattens them
9pub struct BufferedIncomingStream<T> {
10    stream: Pin<Box<dyn Stream<Item = Vec<T>> + Send>>,
11    buffer: Vec<T>,
12}
13
14impl<T> BufferedIncomingStream<T> {
15    /// Create a new [`BufferedIncomingStream`]
16    #[must_use]
17    pub fn new(stream: Pin<Box<dyn Stream<Item = Vec<T>> + Send>>) -> Self {
18        Self {
19            stream,
20            buffer: Vec::default(),
21        }
22    }
23}
24
25impl<T> Stream for BufferedIncomingStream<T>
26where
27    T: Unpin,
28{
29    type Item = T;
30
31    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
32        if self.buffer.is_empty() {
33            match self.stream.poll_next_unpin(cx) {
34                Poll::Pending => Poll::Pending,
35                Poll::Ready(None) => Poll::Ready(None),
36                Poll::Ready(Some(mut values)) => match values.len() {
37                    0 => Poll::Ready(None),
38                    1 => {
39                        let item = values.pop().expect("element missing");
40                        Poll::Ready(Some(item))
41                    }
42                    _ => {
43                        self.buffer = values.split_off(1);
44                        let item = values.pop().expect("element missing");
45                        assert!(values.is_empty());
46                        Poll::Ready(Some(item))
47                    }
48                },
49            }
50        } else {
51            let tail = self.buffer.split_off(1);
52            let item = self.buffer.pop().expect("element missing");
53            assert!(self.buffer.is_empty());
54            self.buffer = tail;
55            Poll::Ready(Some(item))
56        }
57    }
58}