combine/stream/buffered.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
use alloc::collections::VecDeque;
use crate::{
error::StreamError,
stream::{ParseError, Positioned, ResetStream, StreamErrorFor, StreamOnce},
};
/// `Stream` which buffers items from an instance of `StreamOnce` into a ring buffer.
/// Instances of `StreamOnce` which are not able to implement `ResetStream` (such as `ReadStream`)
/// may use this as a way to implement `ResetStream` and become a full `Stream` instance.
///
/// The drawback is that the buffer only stores a limited number of items, which limits how many
/// tokens can be reset and replayed. Resetting a `buffered::Stream` to a checkpoint older than
/// the oldest buffered token fails with a "Backtracked to far" error; `uncons` reports the same
/// error if it is called while the stream is positioned before the oldest buffered token.
///
/// NOTE: If this stream is used in conjunction with an error enhancing stream such as
/// `easy::Stream` (also via the `easy_parser` method) it is recommended that the `buffered::Stream`
/// instance wraps the `easy::Stream` instance instead of the other way around.
///
/// ```ignore
/// // DO
/// buffered::Stream::new(easy::Stream(..), ..)
/// // DON'T
/// easy::Stream(buffered::Stream::new(.., ..))
/// parser.easy_parse(buffered::Stream::new(..));
/// ```
#[derive(Debug, PartialEq)]
pub struct Stream<Input>
where
Input: StreamOnce + Positioned,
{
/// Logical read position of this stream, counted in tokens. It is advanced by every successful
/// `uncons`, handed out as the checkpoint, and overwritten by `reset`.
offset: usize,
/// The wrapped input that new tokens are pulled from.
iter: Input,
/// Total number of tokens that have been pulled out of `iter` so far.
buffer_offset: usize,
/// The most recently pulled tokens, each paired with the position `iter` reported just before
/// the token was read. The oldest entry is dropped once the buffer is filled to its capacity.
buffer: VecDeque<(Input::Token, Input::Position)>,
}
impl<Input> ResetStream for Stream<Input>
where
    Input: Positioned,
{
    /// A checkpoint is simply the logical token offset at the time it was taken.
    type Checkpoint = usize;

    /// Returns the current logical offset so that it can later be passed to `reset`.
    fn checkpoint(&self) -> Self::Checkpoint {
        self.offset
    }

    /// Moves the stream back (or forward) to `checkpoint`.
    ///
    /// # Errors
    ///
    /// Fails with a "Backtracked to far" message, reported at the current position, when
    /// `checkpoint` refers to a token that is older than the oldest token still in the buffer.
    /// In that case the stream's offset is left unchanged.
    fn reset(&mut self, checkpoint: Self::Checkpoint) -> Result<(), Self::Error> {
        // Logical offset of the oldest token that can still be replayed.
        let oldest_buffered = self.buffer_offset - self.buffer.len();

        if checkpoint >= oldest_buffered {
            self.offset = checkpoint;
            return Ok(());
        }

        // We have backtracked too far: the requested token has already been evicted.
        Err(Self::Error::from_error(
            self.position(),
            StreamErrorFor::<Self>::message_static_message("Backtracked to far"),
        ))
    }
}
impl<Input> Stream<Input>
where
    Input: StreamOnce + Positioned,
    Input::Position: Clone,
    Input::Token: Clone,
{
    /// Wraps the `StreamOnce` instance `iter` in a buffered `Stream`.
    ///
    /// `lookahead` is the capacity requested for the internal buffer, i.e. the number of
    /// already-read tokens that are kept around for replay after a `reset`.
    pub fn new(iter: Input, lookahead: usize) -> Stream<Input> {
        let buffer = VecDeque::with_capacity(lookahead);

        // Nothing has been read yet, so both counters start at zero.
        Self {
            iter,
            buffer,
            offset: 0,
            buffer_offset: 0,
        }
    }
}
impl<Input> Positioned for Stream<Input>
where
    Input: StreamOnce + Positioned,
{
    /// Returns the position that corresponds to the current logical offset.
    ///
    /// * When the stream is not replaying (the offset has caught up with everything read so
    ///   far), this is the wrapped input's own position.
    /// * When the offset lies before the oldest buffered token, the position stored with the
    ///   oldest buffered token is returned.
    /// * Otherwise the position stored alongside the buffered token at the offset is returned.
    #[inline]
    fn position(&self) -> Self::Position {
        let total_read = self.buffer_offset;
        if self.offset >= total_read {
            return self.iter.position();
        }

        let buffered = self.buffer.len();
        let entry = if self.offset < total_read - buffered {
            // Positioned before anything we still hold: fall back to the oldest entry.
            self.buffer
                .front()
                .expect("At least 1 element in the buffer")
        } else {
            // Count backwards from the newest entry to the one at `offset`.
            &self.buffer[buffered - (total_read - self.offset)]
        };

        entry.1.clone()
    }
}
impl<Input> StreamOnce for Stream<Input>
where
    Input: StreamOnce + Positioned,
    Input::Token: Clone,
{
    type Token = Input::Token;
    type Range = Input::Range;
    type Position = Input::Position;
    type Error = Input::Error;

    /// Produces the next token.
    ///
    /// When the stream is not replaying, a fresh token is pulled from the wrapped input and a
    /// copy of it (together with the position it was read at) is stored in the buffer. After a
    /// `reset`, tokens are served from the buffer instead until the offset catches up again.
    ///
    /// # Errors
    ///
    /// Propagates any error from the wrapped input, and reports "Backtracked to far" when the
    /// current offset refers to a token that is no longer in the buffer.
    #[inline]
    fn uncons(&mut self) -> Result<Input::Token, StreamErrorFor<Self>> {
        if self.offset >= self.buffer_offset {
            // Record the position before consuming, so it describes where the token started.
            let position = self.iter.position();
            let token = self.iter.uncons()?;
            self.buffer_offset += 1;

            // Keep only the last `capacity()` elements: make room by evicting the oldest entry
            // when the buffer is already full.
            if self.buffer.len() == self.buffer.capacity() {
                self.buffer.pop_front();
            }
            self.buffer.push_back((token.clone(), position));

            self.offset += 1;
            return Ok(token);
        }

        // Logical offset of the oldest token that is still buffered.
        let oldest_buffered = self.buffer_offset - self.buffer.len();
        if self.offset < oldest_buffered {
            // We have backtracked too far.
            return Err(StreamError::message_static_message("Backtracked to far"));
        }

        // Replay: `offset` is within the buffered window, so index relative to its start.
        let replayed = self.buffer[self.offset - oldest_buffered].0.clone();
        self.offset += 1;
        Ok(replayed)
    }

    /// Forwards to the wrapped input's `is_partial`.
    fn is_partial(&self) -> bool {
        self.iter.is_partial()
    }
}