kafka/
codecs.rs

1use std::default::Default;
2use std::io::{Read, Write};
3
4use crate::error::{Error, Result};
5use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
6
7// Helper macro to safely convert an usize expression into a signed
8// integer.  If the conversion is not possible the macro issues a
9// `CodecError`, otherwise returns the expression
10// in the requested target type.
11macro_rules! try_usize_to_int {
12    // ~ $ttype should actually be a 'ty' ... but rust complains for
13    // some reason :/
14    ($value:expr, $ttype:ident) => {{
15        let maxv = $ttype::max_value();
16        let x: usize = $value;
17        if (x as u64) <= (maxv as u64) {
18            x as $ttype
19        } else {
20            return Err(Error::CodecError);
21        }
22    }};
23}
24
25pub trait ToByte {
26    fn encode<T: Write>(&self, buffer: &mut T) -> Result<()>;
27}
28
29impl<'a, T: ToByte + 'a + ?Sized> ToByte for &'a T {
30    fn encode<W: Write>(&self, buffer: &mut W) -> Result<()> {
31        (*self).encode(buffer)
32    }
33}
34
35impl ToByte for i8 {
36    fn encode<T: Write>(&self, buffer: &mut T) -> Result<()> {
37        buffer.write_i8(*self).map_err(From::from)
38    }
39}
40
41impl ToByte for i16 {
42    fn encode<T: Write>(&self, buffer: &mut T) -> Result<()> {
43        buffer.write_i16::<BigEndian>(*self).map_err(From::from)
44    }
45}
46
47impl ToByte for i32 {
48    fn encode<T: Write>(&self, buffer: &mut T) -> Result<()> {
49        buffer.write_i32::<BigEndian>(*self).map_err(From::from)
50    }
51}
52
53impl ToByte for i64 {
54    fn encode<T: Write>(&self, buffer: &mut T) -> Result<()> {
55        buffer.write_i64::<BigEndian>(*self).map_err(From::from)
56    }
57}
58
59impl ToByte for str {
60    fn encode<T: Write>(&self, buffer: &mut T) -> Result<()> {
61        let l = try_usize_to_int!(self.len(), i16);
62        buffer.write_i16::<BigEndian>(l)?;
63        buffer.write_all(self.as_bytes()).map_err(From::from)
64    }
65}
66
67#[test]
68fn test_string_too_long() {
69    use std::str;
70
71    let s = vec![b'a'; i16::max_value() as usize + 1];
72    let s = unsafe { str::from_utf8_unchecked(&s) };
73    let mut buf = Vec::new();
74    assert!(s.encode(&mut buf).is_err());
75    assert!(buf.is_empty());
76}
77
78impl<V: ToByte> ToByte for [V] {
79    fn encode<T: Write>(&self, buffer: &mut T) -> Result<()> {
80        encode_as_array(buffer, self, |buffer, x| x.encode(buffer))
81    }
82}
83
84impl ToByte for [u8] {
85    fn encode<T: Write>(&self, buffer: &mut T) -> Result<()> {
86        let l = try_usize_to_int!(self.len(), i32);
87        buffer.write_i32::<BigEndian>(l)?;
88        buffer.write_all(self).map_err(From::from)
89    }
90}
91
92// ~ this allows to render a slice of various types (typically &str
93// and String) as strings
94pub struct AsStrings<'a, T>(pub &'a [T]);
95
96impl<'a, T: AsRef<str> + 'a> ToByte for AsStrings<'a, T> {
97    fn encode<W: Write>(&self, buffer: &mut W) -> Result<()> {
98        encode_as_array(buffer, self.0, |buffer, x| x.as_ref().encode(buffer))
99    }
100}
101
102/// ~ Renders the length of `xs` to `buffer` as the start of a
103/// protocol array and then for each element of `xs` invokes `f`
104/// assuming that function will render the element to the buffer.
105pub fn encode_as_array<T, F, W>(buffer: &mut W, xs: &[T], mut f: F) -> Result<()>
106where
107    F: FnMut(&mut W, &T) -> Result<()>,
108    W: Write,
109{
110    let l = try_usize_to_int!(xs.len(), i32);
111    buffer.write_i32::<BigEndian>(l)?;
112    for x in xs {
113        f(buffer, x)?;
114    }
115    Ok(())
116}
117
118// --------------------------------------------------------------------
119
120pub trait FromByte {
121    type R: Default + FromByte;
122
123    fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()>;
124    fn decode_new<T: Read>(buffer: &mut T) -> Result<Self::R> {
125        let mut temp: Self::R = Default::default();
126        match temp.decode(buffer) {
127            Ok(_) => Ok(temp),
128            Err(e) => Err(e),
129        }
130    }
131}
132
133macro_rules! dec_helper {
134    ($val: expr, $dest:expr) => {{
135        match $val {
136            Ok(val) => {
137                *$dest = val;
138                Ok(())
139            }
140            Err(e) => Err(From::from(e)),
141        }
142    }};
143}
144macro_rules! decode {
145    ($src:expr, $dest:expr) => {{
146        dec_helper!($src.read_i8(), $dest)
147    }};
148    ($src:expr, $method:ident, $dest:expr) => {{
149        dec_helper!($src.$method::<BigEndian>(), $dest)
150    }};
151}
152
153impl FromByte for i8 {
154    type R = i8;
155
156    fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
157        decode!(buffer, self)
158    }
159}
160
161impl FromByte for i16 {
162    type R = i16;
163
164    fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
165        decode!(buffer, read_i16, self)
166    }
167}
168
169impl FromByte for i32 {
170    type R = i32;
171
172    fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
173        decode!(buffer, read_i32, self)
174    }
175}
176
177impl FromByte for i64 {
178    type R = i64;
179    fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
180        decode!(buffer, read_i64, self)
181    }
182}
183
184impl FromByte for String {
185    type R = String;
186    fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
187        let mut length: i16 = 0;
188        if let Err(e) = decode!(buffer, read_i16, &mut length) {
189            return Err(e);
190        }
191        if length <= 0 {
192            return Ok(());
193        }
194        self.reserve(length as usize);
195        let _ = buffer.take(length as u64).read_to_string(self);
196        if self.len() != length as usize {
197            return Err(Error::UnexpectedEOF);
198        }
199        Ok(())
200    }
201}
202
203impl<V: FromByte + Default> FromByte for Vec<V> {
204    type R = Vec<V>;
205
206    fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
207        let mut length: i32 = 0;
208        if let Err(e) = decode!(buffer, read_i32, &mut length) {
209            return Err(e);
210        }
211        if length <= 0 {
212            return Ok(());
213        }
214        self.reserve(length as usize);
215        for _ in 0..length {
216            let mut e: V = Default::default();
217            e.decode(buffer)?;
218            self.push(e);
219        }
220        Ok(())
221    }
222}
223
224impl FromByte for Vec<u8> {
225    type R = Vec<u8>;
226
227    fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
228        let mut length: i32 = 0;
229        match decode!(buffer, read_i32, &mut length) {
230            Ok(_) => {}
231            Err(e) => return Err(e),
232        }
233        if length <= 0 {
234            return Ok(());
235        }
236        self.reserve(length as usize);
237        match buffer.take(length as u64).read_to_end(self) {
238            Ok(size) => {
239                if size < length as usize {
240                    Err(Error::UnexpectedEOF)
241                } else {
242                    Ok(())
243                }
244            }
245            Err(e) => Err(From::from(e)),
246        }
247    }
248}
249
250#[test]
251fn codec_i8() {
252    use std::io::Cursor;
253    let mut buf = vec![];
254    let orig: i8 = 5;
255
256    // Encode into buffer
257    orig.encode(&mut buf).unwrap();
258    assert_eq!(buf, [5]);
259
260    // Read from buffer into existing variable
261    let mut dec1: i8 = 0;
262    dec1.decode(&mut Cursor::new(&buf[..])).unwrap();
263    assert_eq!(dec1, orig);
264
265    // Read from buffer into new variable
266    let dec2 = i8::decode_new(&mut Cursor::new(&buf[..])).unwrap();
267    assert_eq!(dec2, orig);
268}
269
270#[test]
271fn codec_i16() {
272    use std::io::Cursor;
273    let mut buf = vec![];
274    let orig: i16 = 5;
275
276    // Encode into buffer
277    orig.encode(&mut buf).unwrap();
278    assert_eq!(buf, [0, 5]);
279
280    // Read from buffer into existing variable
281    let mut dec1: i16 = 0;
282    dec1.decode(&mut Cursor::new(&buf[..])).unwrap();
283    assert_eq!(dec1, orig);
284
285    // Read from buffer into new variable
286    let dec2 = i16::decode_new(&mut Cursor::new(&buf[..])).unwrap();
287    assert_eq!(dec2, orig);
288}
289
290#[test]
291fn codec_32() {
292    use std::io::Cursor;
293    let mut buf = vec![];
294    let orig: i32 = 5;
295
296    // Encode into buffer
297    orig.encode(&mut buf).unwrap();
298    assert_eq!(buf, [0, 0, 0, 5]);
299
300    // Read from buffer into existing variable
301    let mut dec1: i32 = 0;
302    dec1.decode(&mut Cursor::new(&buf[..])).unwrap();
303    assert_eq!(dec1, orig);
304
305    // Read from buffer into new variable
306    let dec2 = i32::decode_new(&mut Cursor::new(&buf[..])).unwrap();
307    assert_eq!(dec2, orig);
308}
309
310#[test]
311fn codec_i64() {
312    use std::io::Cursor;
313    let mut buf = vec![];
314    let orig: i64 = 5;
315
316    // Encode into buffer
317    orig.encode(&mut buf).unwrap();
318    assert_eq!(buf, [0, 0, 0, 0, 0, 0, 0, 5]);
319
320    // Read from buffer into existing variable
321    let mut dec1: i64 = 0;
322    dec1.decode(&mut Cursor::new(&buf[..])).unwrap();
323    assert_eq!(dec1, orig);
324
325    // Read from buffer into new variable
326    let dec2 = i64::decode_new(&mut Cursor::new(&buf[..])).unwrap();
327    assert_eq!(dec2, orig);
328}
329
330#[test]
331fn codec_string() {
332    use std::io::Cursor;
333    let mut buf = vec![];
334    let orig = "test".to_owned();
335
336    // Encode into buffer
337    orig.encode(&mut buf).unwrap();
338    assert_eq!(buf, [0, 4, 116, 101, 115, 116]);
339
340    // Read from buffer into existing variable
341    let mut dec1 = String::new();
342    dec1.decode(&mut Cursor::new(&buf[..])).unwrap();
343    assert_eq!(dec1, orig);
344
345    // Read from buffer into new variable
346    let dec2 = String::decode_new(&mut Cursor::new(&buf[..])).unwrap();
347    assert_eq!(dec2, orig);
348}
349
350#[test]
351fn codec_vec_u8() {
352    use std::io::Cursor;
353    let mut buf = vec![];
354    let orig: Vec<u8> = vec![1, 2, 3];
355
356    // Encode into buffer
357    orig.encode(&mut buf).unwrap();
358    assert_eq!(buf, [0, 0, 0, 3, 1, 2, 3]);
359
360    // Read from buffer into existing variable
361    let mut dec1: Vec<u8> = vec![];
362    dec1.decode(&mut Cursor::new(&buf[..])).unwrap();
363    assert_eq!(dec1, orig);
364
365    // Read from buffer into new variable
366    let dec2 = Vec::<u8>::decode_new(&mut Cursor::new(&buf[..])).unwrap();
367    assert_eq!(dec2, orig);
368}
369
370#[test]
371fn codec_as_strings() {
372    macro_rules! enc_dec_cmp {
373        ($orig:expr) => {{
374            use std::io::Cursor;
375
376            let orig = $orig;
377
378            // Encode into buffer
379            let mut buf = Vec::new();
380            AsStrings(&orig).encode(&mut buf).unwrap();
381            assert_eq!(
382                buf,
383                [0, 0, 0, 2, 0, 3, b'a', b'b', b'c', 0, 4, b'd', b'e', b'f', b'g']
384            );
385
386            // Decode from buffer into existing value
387            {
388                let mut dec: Vec<String> = Vec::new();
389                dec.decode(&mut Cursor::new(&buf[..])).unwrap();
390                assert_eq!(dec, orig);
391            }
392
393            // Read from buffer into new variable
394            {
395                let dec = Vec::<String>::decode_new(&mut Cursor::new(&buf[..])).unwrap();
396                assert_eq!(dec, orig);
397            }
398        }};
399    }
400
401    {
402        // slice of &str
403        let orig: &[&str] = &["abc", "defg"];
404        enc_dec_cmp!(orig);
405    }
406
407    {
408        // vec of &str
409        let orig: Vec<&str> = vec!["abc", "defg"];
410        enc_dec_cmp!(orig);
411    }
412
413    {
414        // vec of String
415        let orig: Vec<String> = vec!["abc".to_owned(), "defg".to_owned()];
416        enc_dec_cmp!(orig);
417    }
418}