1use core::pin::Pin;
4use core::task::{Context, Poll};
5
6use futures::{Stream, StreamExt as _};
7
8pub struct BufferedIncomingStream<T> {
10 stream: Pin<Box<dyn Stream<Item = Vec<T>> + Send>>,
11 buffer: Vec<T>,
12}
13
14impl<T> BufferedIncomingStream<T> {
15 #[must_use]
17 pub fn new(stream: Pin<Box<dyn Stream<Item = Vec<T>> + Send>>) -> Self {
18 Self {
19 stream,
20 buffer: Vec::default(),
21 }
22 }
23}
24
25impl<T> Stream for BufferedIncomingStream<T>
26where
27 T: Unpin,
28{
29 type Item = T;
30
31 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
32 if self.buffer.is_empty() {
33 match self.stream.poll_next_unpin(cx) {
34 Poll::Pending => Poll::Pending,
35 Poll::Ready(None) => Poll::Ready(None),
36 Poll::Ready(Some(mut values)) => match values.len() {
37 0 => Poll::Ready(None),
38 1 => {
39 let item = values.pop().expect("element missing");
40 Poll::Ready(Some(item))
41 }
42 _ => {
43 self.buffer = values.split_off(1);
44 let item = values.pop().expect("element missing");
45 assert!(values.is_empty());
46 Poll::Ready(Some(item))
47 }
48 },
49 }
50 } else {
51 let tail = self.buffer.split_off(1);
52 let item = self.buffer.pop().expect("element missing");
53 assert!(self.buffer.is_empty());
54 self.buffer = tail;
55 Poll::Ready(Some(item))
56 }
57 }
58}