combine/stream/
buffered.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
use alloc::collections::VecDeque;

use crate::{
    error::StreamError,
    stream::{ParseError, Positioned, ResetStream, StreamErrorFor, StreamOnce},
};

/// `Stream` which buffers items from an instance of `StreamOnce` into a ring buffer.
/// Instances of `StreamOnce` which is not able to implement `ResetStream` (such as `ReadStream`) may
/// use this as a way to implement `ResetStream` and become a full `Stream` instance.
///
/// The drawback is that the buffer only stores a limited number of items which limits how many
/// tokens that can be reset and replayed. If a `buffered::Stream` is reset past this limit an error
/// will be returned when `uncons` is next called.
///
/// NOTE: If this stream is used in conjunction with an error enhancing stream such as
/// `easy::Stream` (also via the `easy_parser` method) it is recommended that the `buffered::Stream`
/// instance wraps the `easy::Stream` instance instead of the other way around.
///
/// ```ignore
/// // DO
/// buffered::Stream::new(easy::Stream(..), ..)
/// // DON'T
/// easy::Stream(buffered::Stream::new(.., ..))
/// parser.easy_parse(buffered::Stream::new(..));
/// ```
#[derive(Debug, PartialEq)]
pub struct Stream<Input>
where
    Input: StreamOnce + Positioned,
{
    offset: usize,
    iter: Input,
    buffer_offset: usize,
    buffer: VecDeque<(Input::Token, Input::Position)>,
}

impl<Input> ResetStream for Stream<Input>
where
    Input: Positioned,
{
    type Checkpoint = usize;

    fn checkpoint(&self) -> Self::Checkpoint {
        self.offset
    }

    fn reset(&mut self, checkpoint: Self::Checkpoint) -> Result<(), Self::Error> {
        if checkpoint < self.buffer_offset - self.buffer.len() {
            // We have backtracked to far
            Err(Self::Error::from_error(
                self.position(),
                StreamErrorFor::<Self>::message_static_message("Backtracked to far"),
            ))
        } else {
            self.offset = checkpoint;
            Ok(())
        }
    }
}

impl<Input> Stream<Input>
where
    Input: StreamOnce + Positioned,
    Input::Position: Clone,
    Input::Token: Clone,
{
    /// Constructs a new `BufferedStream` from a `StreamOnce` instance with a `lookahead`
    /// number of elements that can be stored in the buffer.
    pub fn new(iter: Input, lookahead: usize) -> Stream<Input> {
        Stream {
            offset: 0,
            iter,
            buffer_offset: 0,
            buffer: VecDeque::with_capacity(lookahead),
        }
    }
}

impl<Input> Positioned for Stream<Input>
where
    Input: StreamOnce + Positioned,
{
    #[inline]
    fn position(&self) -> Self::Position {
        if self.offset >= self.buffer_offset {
            self.iter.position()
        } else if self.offset < self.buffer_offset - self.buffer.len() {
            self.buffer
                .front()
                .expect("At least 1 element in the buffer")
                .1
                .clone()
        } else {
            self.buffer[self.buffer.len() - (self.buffer_offset - self.offset)]
                .1
                .clone()
        }
    }
}

impl<Input> StreamOnce for Stream<Input>
where
    Input: StreamOnce + Positioned,
    Input::Token: Clone,
{
    type Token = Input::Token;
    type Range = Input::Range;
    type Position = Input::Position;
    type Error = Input::Error;

    #[inline]
    fn uncons(&mut self) -> Result<Input::Token, StreamErrorFor<Self>> {
        if self.offset >= self.buffer_offset {
            let position = self.iter.position();
            let token = self.iter.uncons()?;
            self.buffer_offset += 1;
            // We want the VecDeque to only keep the last .capacity() elements so we need to remove
            // an element if it gets to large
            if self.buffer.len() == self.buffer.capacity() {
                self.buffer.pop_front();
            }
            self.buffer.push_back((token.clone(), position));
            self.offset += 1;
            Ok(token)
        } else if self.offset < self.buffer_offset - self.buffer.len() {
            // We have backtracked to far
            Err(StreamError::message_static_message("Backtracked to far"))
        } else {
            let value = self.buffer[self.buffer.len() - (self.buffer_offset - self.offset)]
                .0
                .clone();
            self.offset += 1;
            Ok(value)
        }
    }

    fn is_partial(&self) -> bool {
        self.iter.is_partial()
    }
}