// kafka/protocol/zreader.rs

1use std::str;
2
3use crate::{error::Result, Error};
4use byteorder::{BigEndian, ByteOrder};
5
/// The string handed out for "null" and zero-length protocol strings.
///
/// A `const` is sufficient here: the value is an immutable literal
/// and nothing relies on it having a single, fixed address.
const EMPTY_STR: &str = "";
7
/// A forward-only cursor over a borrowed byte slice.
///
/// The reader never copies: everything it hands out is a sub-slice of
/// the slice it was created from, so returned values carry the
/// lifetime `'a` of that slice.
#[derive(Debug)]
pub struct ZReader<'a> {
    /// The bytes which have not been consumed yet.
    data: &'a [u8],
}
11
// ~ Keeps the choice of byte order in one single place: expands to
// a call of the named `ByteOrder` method on `BigEndian`, passing
// the given expression as its only argument.
macro_rules! dec {
    ($decode_fn:ident, $bytes:expr) => {
        <BigEndian as ByteOrder>::$decode_fn($bytes)
    };
}
18
19impl<'a> ZReader<'a> {
20    pub fn new(data: &[u8]) -> ZReader<'_> {
21        ZReader { data }
22    }
23
24    /// ~ Consumes `n_bytes` from the underlying slice while returning
25    /// the consumed bytes. The returned slice is guaranteed to be
26    /// `n_bytes` long. This operation either succeeds or fails as a
27    /// whole. Upon failure the reader will _not_ advance.
28    pub fn read<'b>(&'b mut self, n_bytes: usize) -> Result<&'a [u8]> {
29        if n_bytes > self.data.len() {
30            Err(Error::UnexpectedEOF)
31        } else {
32            let (x, rest) = self.data.split_at(n_bytes);
33            self.data = rest;
34            Ok(x)
35        }
36    }
37
38    /// ~ Retrieves the rest of the underlying slice without advancing
39    /// this reader.
40    pub fn rest(&self) -> &[u8] {
41        self.data
42    }
43
44    /// ~ Determines whether there are still some bytes available for
45    /// consumption.
46    pub fn is_empty(&self) -> bool {
47        self.data.is_empty()
48    }
49
50    pub fn read_i8(&mut self) -> Result<i8> {
51        self.read(1).map(|x| unsafe { *x.get_unchecked(0) as i8 })
52    }
53
54    pub fn read_i16(&mut self) -> Result<i16> {
55        self.read(2).map(|x| dec!(read_i16, x))
56    }
57
58    pub fn read_i32(&mut self) -> Result<i32> {
59        self.read(4).map(|x| dec!(read_i32, x))
60    }
61
62    pub fn read_i64(&mut self) -> Result<i64> {
63        self.read(8).map(|x| dec!(read_i64, x))
64    }
65
66    /// Reads a string as defined by the Kafka Protocol. The 'null'
67    /// string is delivered as the empty string.
68    pub fn read_str<'b>(&'b mut self) -> Result<&'a str> {
69        let len = self.read_i16()?;
70        if len <= 0 {
71            Ok(EMPTY_STR)
72        } else {
73            // alternatively: str::from_utf8_unchecked(..)
74            match str::from_utf8(self.read(len as usize)?) {
75                Ok(s) => Ok(s),
76                Err(_) => Err(Error::StringDecodeError),
77            }
78        }
79    }
80
81    /// Reads 'bytes' as defined by the Kafka Protocol. The 'null'
82    /// bytes are delivered as an empty slice.
83    pub fn read_bytes<'b>(&'b mut self) -> Result<&'a [u8]> {
84        let len = self.read_i32()?;
85        if len <= 0 {
86            Ok(&self.data[0..0])
87        } else {
88            self.read(len as usize)
89        }
90    }
91
92    /// Reads the size of an array as defined by the Kafka
93    /// Protocol. The size of 'null' array will be returned as the
94    /// size an array of an empty array.
95    pub fn read_array_len(&mut self) -> Result<usize> {
96        let len = self.read_i32()?;
97        Ok(if len < 0 { 0 } else { len as usize })
98    }
99}
100
#[test]
fn test_read() {
    let input = &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9];

    // ~ drain the reader piece by piece; together the pieces make up
    // the whole input
    let mut reader = ZReader::new(input);
    let pieces: [&[u8]; 4] = [&[0, 1], &[2], &[3, 4, 5, 6, 7], &[8, 9]];
    for piece in pieces.iter() {
        assert_eq!(*piece, reader.read(piece.len()).unwrap());
    }
    // ~ nothing is left afterwards
    assert!(reader.read(1).is_err());

    // ~ a single read may take everything at once
    let mut reader = ZReader::new(input);
    assert_eq!(input, reader.read(input.len()).unwrap());

    // ~ asking for more than is available fails ...
    let mut reader = ZReader::new(input);
    assert!(reader.read(input.len() + 1).is_err());
    // ~ ... and leaves the reader where it was before
    assert_eq!(input, reader.read(input.len()).unwrap());
}
124
#[test]
fn test_read_i8() {
    // ~ small values are delivered unchanged, one per input byte
    let bytes = &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
    let mut reader = ZReader::new(bytes);
    for expected in bytes.iter().map(|&b| b as i8) {
        assert_eq!(expected, reader.read_i8().unwrap());
    }
    assert!(reader.read_i8().is_err());

    // ~ 0xff and 0xfe are delivered as negative numbers
    let bytes = &[0xff, 0xfe];
    let mut reader = ZReader::new(bytes);
    for &expected in [-1i8, -2].iter() {
        assert_eq!(expected, reader.read_i8().unwrap());
    }
    assert!(reader.read_i8().is_err());
}
140
#[test]
fn test_read_i16() {
    // ~ two positive numbers, most significant byte first
    let bytes = &[0x01, 0x02, 0x10, 0x01];
    let mut reader = ZReader::new(bytes);
    assert_eq!(0x0102, reader.read_i16().unwrap());
    assert_eq!(0x1001, reader.read_i16().unwrap());
    assert!(reader.read_i16().is_err());

    // ~ two negative numbers
    let bytes = &[0xff, 0xff, 0xff, 0xfe];
    let mut reader = ZReader::new(bytes);
    let first = reader.read_i16().unwrap();
    let second = reader.read_i16().unwrap();
    assert_eq!((-1, -2), (first, second));
    assert!(reader.read_i16().is_err());
}
155
#[test]
fn test_read_i32() {
    // ~ each case holds exactly one number; a second read must fail
    let cases: [(&[u8], i32); 2] = [
        (&[1, 2, 3, 4], 0x0102_0304),
        (&[0xff, 0xff, 0xff, 0xfd], -3),
    ];
    for &(bytes, expected) in cases.iter() {
        let mut reader = ZReader::new(bytes);
        assert_eq!(expected, reader.read_i32().unwrap());
        assert!(reader.read_i32().is_err());
    }
}
168
#[test]
fn test_read_i64() {
    // ~ each case holds exactly one number; a second read must fail
    let cases: [(&[u8], i64); 3] = [
        (&[1, 2, 3, 4, 5, 6, 7, 8], 0x0102_0304_0506_0708),
        (&[0, 0, 0, 0, 0, 0, 0, 1], 1),
        (&[0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xfd], -3),
    ];
    for &(bytes, expected) in cases.iter() {
        let mut reader = ZReader::new(bytes);
        assert_eq!(expected, reader.read_i64().unwrap());
        assert!(reader.read_i64().is_err());
    }
}
186
#[test]
fn test_read_str() {
    let data = &[
        0u8, 5, b'h', b'e', b'l', b'l', b'o', // "hello"
        0u8, 7, b',', b' ', b'w', b'o', b'r', b'l', b'd', // ", world"
        255, 255, // a "null" string, i.e. a length of -1
        28,  // some lone trailing byte
    ];

    let mut r = ZReader::new(data);
    assert_eq!("hello", r.read_str().unwrap());
    assert_eq!(", world", r.read_str().unwrap());
    assert_eq!("", r.read_str().unwrap());
    // ~ the "null" string consumed its two length bytes only
    assert_eq!(&[28u8][..], r.rest());
    // ~ the single byte left cannot even provide the two bytes of a
    // string's length
    assert!(r.read_str().is_err());

    // ~ a string announcing more bytes than available is invalid
    let data = &[0u8, 3, b'a'];
    let mut r = ZReader::new(data);
    assert!(r.read_str().is_err());
}
203
/// Ensure that values handed out by the reader stay usable while the
/// reader itself is advanced further.
#[test]
fn test_mutability_lifetimes() {
    let bytes = &[0, 2, b'h', b'i', 0, 2, b'h', b'o'];

    let mut reader = ZReader::new(bytes);
    let first = reader.read_str().unwrap();
    let second = reader.read_str().unwrap();

    // ~ `first` is still alive although the reader moved on
    assert_eq!(("hi", "ho"), (first, second));
}
216}