kafka/protocol/
fetch.rs

1//! A representation of fetched messages from Kafka.
2
3use std::borrow::Cow;
4use std::collections::HashMap;
5use std::hash::BuildHasherDefault;
6use std::io::Write;
7use std::sync::Arc;
8use std::{mem, result};
9
10use fnv::FnvHasher;
11
12use crate::codecs::ToByte;
13#[cfg(feature = "gzip")]
14use crate::compression::gzip;
15#[cfg(feature = "snappy")]
16use crate::compression::snappy::SnappyReader;
17use crate::compression::Compression;
18use crate::error::KafkaCode;
19use crate::{Error, Result};
20
21use super::to_crc;
22use super::zreader::ZReader;
23use super::{HeaderRequest, API_KEY_FETCH, API_VERSION};
24
/// Hasher used for partition-id keyed maps; FNV is a good fit for
/// such small integer keys.
pub type PartitionHasher = BuildHasherDefault<FnvHasher>;
26
/// A request to fetch messages from particular topic partitions, as
/// defined by the Kafka protocol.  Populate it through
/// `FetchRequest::add`.
#[derive(Debug)]
pub struct FetchRequest<'a, 'b> {
    /// The common request header (api key/version, correlation id,
    /// client id).
    pub header: HeaderRequest<'a>,
    /// The replica id; always set to -1 by `FetchRequest::new`.
    pub replica: i32,
    /// Maximum time the broker may await `min_bytes` of data before
    /// answering (presumably milliseconds per the Kafka protocol —
    /// passed through to the broker unmodified).
    pub max_wait_time: i32,
    /// Minimum number of bytes of data the broker should accumulate
    /// before answering.
    pub min_bytes: i32,
    // topic -> partitions
    pub topic_partitions: HashMap<&'b str, TopicPartitionFetchRequest>,
}
36
/// The per-topic part of a fetch request: the set of partitions of
/// that topic to fetch messages from.
#[derive(Debug)]
pub struct TopicPartitionFetchRequest {
    // partition-id -> partition-data
    pub partitions: HashMap<i32, PartitionFetchRequest, PartitionHasher>,
}
42
/// The per-partition part of a fetch request: where to start fetching
/// and how much data to deliver at most.
#[derive(Debug)]
pub struct PartitionFetchRequest {
    /// The offset to begin fetching messages from.
    pub offset: i64,
    /// The maximum number of bytes to deliver for this partition.
    pub max_bytes: i32,
}
48
49impl<'a, 'b> FetchRequest<'a, 'b> {
50    pub fn new(
51        correlation_id: i32,
52        client_id: &'a str,
53        max_wait_time: i32,
54        min_bytes: i32,
55    ) -> FetchRequest<'a, 'b> {
56        FetchRequest {
57            header: HeaderRequest::new(API_KEY_FETCH, API_VERSION, correlation_id, client_id),
58            replica: -1,
59            max_wait_time,
60            min_bytes,
61            topic_partitions: HashMap::new(),
62        }
63    }
64
65    pub fn add(&mut self, topic: &'b str, partition: i32, offset: i64, max_bytes: i32) {
66        self.topic_partitions
67            .entry(topic)
68            .or_insert_with(TopicPartitionFetchRequest::new)
69            .add(partition, offset, max_bytes)
70    }
71
72    pub fn get<'d>(&'a self, topic: &'d str) -> Option<&'a TopicPartitionFetchRequest> {
73        self.topic_partitions.get(topic)
74    }
75}
76
77impl TopicPartitionFetchRequest {
78    pub fn new() -> TopicPartitionFetchRequest {
79        TopicPartitionFetchRequest {
80            partitions: HashMap::default(),
81        }
82    }
83
84    pub fn add(&mut self, partition: i32, offset: i64, max_bytes: i32) {
85        self.partitions
86            .insert(partition, PartitionFetchRequest::new(offset, max_bytes));
87    }
88
89    pub fn get(&self, partition: i32) -> Option<&PartitionFetchRequest> {
90        self.partitions.get(&partition)
91    }
92}
93
94impl PartitionFetchRequest {
95    pub fn new(offset: i64, max_bytes: i32) -> PartitionFetchRequest {
96        PartitionFetchRequest { offset, max_bytes }
97    }
98}
99
100impl<'a, 'b> ToByte for FetchRequest<'a, 'b> {
101    fn encode<W: Write>(&self, buffer: &mut W) -> Result<()> {
102        self.header.encode(buffer)?;
103        self.replica.encode(buffer)?;
104        self.max_wait_time.encode(buffer)?;
105        self.min_bytes.encode(buffer)?;
106        // encode the hashmap as a vector
107        (self.topic_partitions.len() as i32).encode(buffer)?;
108        for (name, tp) in self.topic_partitions.iter() {
109            tp.encode(name, buffer)?;
110        }
111        Ok(())
112    }
113}
114
115impl TopicPartitionFetchRequest {
116    fn encode<W: Write>(&self, topic: &str, buffer: &mut W) -> Result<()> {
117        topic.encode(buffer)?;
118        // encode the hashmap as a vector
119        (self.partitions.len() as i32).encode(buffer)?;
120        for (&pid, p) in self.partitions.iter() {
121            p.encode(pid, buffer)?;
122        }
123        Ok(())
124    }
125}
126
127impl PartitionFetchRequest {
128    fn encode<T: Write>(&self, partition: i32, buffer: &mut T) -> Result<()> {
129        try_multi!(
130            partition.encode(buffer),
131            self.offset.encode(buffer),
132            self.max_bytes.encode(buffer)
133        )
134    }
135}
136
137// ~ response related -------------------------------------------------
138
/// Parser for fetch responses, optionally validating message CRCs and
/// correlating the parsed data with the originating fetch requests.
pub struct ResponseParser<'a, 'b, 'c> {
    /// Whether to verify the CRC checksum of each parsed message.
    pub validate_crc: bool,
    /// The originating fetch request, if available; used to skip
    /// delivered messages with an offset lower than actually
    /// requested.
    pub requests: Option<&'c FetchRequest<'a, 'b>>,
}
143
impl<'a, 'b, 'c> super::ResponseParser for ResponseParser<'a, 'b, 'c> {
    type T = Response;
    // Parses the raw broker response, forwarding the configured
    // request correlation and crc-validation settings.
    fn parse(&self, response: Vec<u8>) -> Result<Self::T> {
        Response::from_vec(response, self.requests, self.validate_crc)
    }
}
150
// ~ helper macro to aid parsing arrays of values (as defined by the
// Kafka protocol): reads a length prefix from the given reader and
// then parses that many elements with the supplied expression.  Any
// parse error aborts the surrounding function via `?`.
macro_rules! array_of {
    ($zreader:ident, $parse_elem:expr) => {{
        let n_elems = $zreader.read_array_len()?;
        let mut array = Vec::with_capacity(n_elems);
        for _ in 0..n_elems {
            array.push($parse_elem?);
        }
        array
    }};
}
163
/// The result of a "fetch messages" request from a particular Kafka
/// broker. Such a response can contain messages for multiple topic
/// partitions.
#[derive(Debug)]
pub struct Response {
    // used to "own" the data all other references of this struct
    // point to.
    #[allow(dead_code)]
    raw_data: Vec<u8>,

    // ~ the correlation id echoed back by the broker; exposed through
    // `Response::correlation_id` for debugging purposes.
    correlation_id: i32,

    // ~ Static is used here to get around the fact that we don't want
    // Response have to a lifetime parameter as well.  The field is
    // exposed only through an accessor which binds the exposed
    // lifetime to the lifetime of the Response instance.
    topics: Vec<Topic<'static>>,
}
182
impl Response {
    /// Parses a Response from binary data as defined by the
    /// Kafka Protocol.  `reqs` (when given) is used to filter out
    /// messages below the requested offsets; `validate_crc` enables
    /// per-message checksum verification.
    fn from_vec(
        response: Vec<u8>,
        reqs: Option<&FetchRequest<'_, '_>>,
        validate_crc: bool,
    ) -> Result<Response> {
        // ~ SAFETY/NOTE(review): the transmute extends the lifetime of
        // the `&[u8]` borrow of `response` (to satisfy the
        // `Topic<'static>` field above).  This is sound only because
        // the owning `raw_data` vector is stored in the returned
        // Response and never mutated afterwards — confirm no API
        // mutates or moves it out.
        let slice = unsafe { mem::transmute(&response[..]) };
        let mut r = ZReader::new(slice);
        let correlation_id = r.read_i32()?;
        let topics = array_of!(r, Topic::read(&mut r, reqs, validate_crc));
        Ok(Response {
            raw_data: response,
            correlation_id,
            topics,
        })
    }

    /// Retrieves the id corresponding to the fetch messages request
    /// (provided for debugging purposes only).
    #[inline]
    pub fn correlation_id(&self) -> i32 {
        self.correlation_id
    }

    /// Provides an iterator over all the topics and the fetched data
    /// relative to these topics.
    #[inline]
    pub fn topics<'a>(&'a self) -> &[Topic<'a>] {
        &self.topics
    }
}
216
/// The result of a "fetch messages" request from a particular Kafka
/// broker for a single topic only.  Beside the name of the topic,
/// this structure provides an iterator over the topic partitions from
/// which messages were requested.
#[derive(Debug)]
pub struct Topic<'a> {
    // ~ the topic name, borrowed from the response's raw data
    topic: &'a str,
    // ~ the per-partition results for this topic
    partitions: Vec<Partition<'a>>,
}
226
227impl<'a> Topic<'a> {
228    fn read(
229        r: &mut ZReader<'a>,
230        reqs: Option<&FetchRequest<'_, '_>>,
231        validate_crc: bool,
232    ) -> Result<Topic<'a>> {
233        let name = r.read_str()?;
234        let preqs = reqs.and_then(|reqs| reqs.get(name));
235        let partitions = array_of!(r, Partition::read(r, preqs, validate_crc));
236        Ok(Topic {
237            topic: name,
238            partitions,
239        })
240    }
241
242    /// Retrieves the identifier/name of the represented topic.
243    #[inline]
244    pub fn topic(&self) -> &'a str {
245        self.topic
246    }
247
248    /// Provides an iterator over all the partitions of this topic for
249    /// which messages were requested.
250    #[inline]
251    pub fn partitions(&self) -> &[Partition<'a>] {
252        &self.partitions
253    }
254}
255
/// The result of a "fetch messages" request from a particular Kafka
/// broker for a single topic partition only.  Beside the partition
/// identifier, this structure provides an iterator over the actually
/// requested message data.
///
/// Note: There might have been a (recoverable) error for a particular
/// partition (but not for another).
#[derive(Debug)]
pub struct Partition<'a> {
    /// The identifier of the represented partition.
    partition: i32,

    /// The partition data: either the fetched payload or the
    /// (shared) error the broker reported for this partition.
    data: result::Result<Data<'a>, Arc<Error>>,
}
271
272impl<'a> Partition<'a> {
273    fn read(
274        r: &mut ZReader<'a>,
275        preqs: Option<&TopicPartitionFetchRequest>,
276        validate_crc: bool,
277    ) -> Result<Partition<'a>> {
278        let partition = r.read_i32()?;
279        let proffs = preqs
280            .and_then(|preqs| preqs.get(partition))
281            .map(|preq| preq.offset)
282            .unwrap_or(0);
283
284        let err = Error::from_protocol(r.read_i16()?);
285        // we need to parse the rest even if there was an error to
286        // consume the input stream (zreader)
287        let highwatermark = r.read_i64()?;
288        let msgset = MessageSet::from_slice(r.read_bytes()?, proffs, validate_crc)?;
289
290        Ok(Partition {
291            partition,
292            data: match err {
293                Some(err) => Err(Arc::new(err)),
294                None => Ok(Data {
295                    highwatermark_offset: highwatermark,
296                    message_set: msgset,
297                }),
298            },
299        })
300    }
301
302    /// Retrieves the identifier of the represented partition.
303    #[inline]
304    pub fn partition(&self) -> i32 {
305        self.partition
306    }
307
308    /// Retrieves the data payload for this partition.
309    pub fn data(&'a self) -> result::Result<&'a Data<'a>, Arc<Error>> {
310        match self.data.as_ref() {
311            Ok(data) => Ok(data),
312            Err(err) => Err(err.clone()),
313        }
314    }
315}
316
/// The successfully fetched data payload for a particular partition.
#[derive(Debug)]
pub struct Data<'a> {
    // ~ the broker's latest offset for this partition at fetch time
    highwatermark_offset: i64,
    // ~ the messages delivered for this partition
    message_set: MessageSet<'a>,
}
323
324impl<'a> Data<'a> {
325    /// Retrieves the so-called "high water mark offset" indicating
326    /// the "latest" offset for this partition at the remote broker.
327    /// This can be used by clients to find out how much behind the
328    /// latest available message they are.
329    #[inline]
330    pub fn highwatermark_offset(&self) -> i64 {
331        self.highwatermark_offset
332    }
333
334    /// Retrieves the fetched message data for this partition.
335    #[inline]
336    pub fn messages(&self) -> &[Message<'a>] {
337        &self.message_set.messages
338    }
339}
340
/// A set of parsed messages, either borrowing the response's raw data
/// or owning a decompressed buffer the messages point into.
#[derive(Debug)]
struct MessageSet<'a> {
    #[allow(dead_code)]
    raw_data: Cow<'a, [u8]>, // ~ this field is used to potentially "own" the underlying vector
    messages: Vec<Message<'a>>,
}
347
/// A fetched message from a remote Kafka broker for a particular
/// topic partition.
#[derive(Debug)]
pub struct Message<'a> {
    /// The offset at which this message resides in the remote kafka
    /// broker topic partition.
    pub offset: i64,

    /// The "key" data of this message.  Empty if there is no such
    /// data for this message.
    pub key: &'a [u8],

    /// The value data of this message.  Empty if there is no such
    /// data for this message.
    pub value: &'a [u8],
}
364
impl<'a> MessageSet<'a> {
    /// Parses a message set out of an owned (typically decompressed)
    /// buffer, keeping the buffer alive inside the returned set so
    /// the parsed messages can borrow from it.
    #[allow(dead_code)]
    fn from_vec(data: Vec<u8>, req_offset: i64, validate_crc: bool) -> Result<MessageSet<'a>> {
        // since we're going to keep the original
        // uncompressed vector around without
        // further modifying it and providing
        // publicly no mutability possibilities
        // this is safe
        // ~ SAFETY/NOTE(review): the transmute extends the lifetime of
        // the borrow of `data`; sound only under the immutability
        // assumption stated above.
        let ms = MessageSet::from_slice(
            unsafe { mem::transmute(&data[..]) },
            req_offset,
            validate_crc,
        )?;
        return Ok(MessageSet {
            raw_data: Cow::Owned(data),
            messages: ms.messages,
        });
    }

    /// Parses a message set borrowing from `raw_data`.  Messages with
    /// an offset below `req_offset` are skipped; compressed message
    /// wrappers are decompressed and re-parsed (one level only).
    fn from_slice(raw_data: &[u8], req_offset: i64, validate_crc: bool) -> Result<MessageSet<'_>> {
        let mut r = ZReader::new(raw_data);
        let mut msgs = Vec::new();
        while !r.is_empty() {
            match MessageSet::next_message(&mut r, validate_crc) {
                // this is the last messages which might be
                // incomplete; a valid case to be handled by
                // consumers
                Err(Error::UnexpectedEOF) => {
                    break;
                }
                Err(e) => {
                    return Err(e);
                }
                Ok((offset, pmsg)) => {
                    // handle compression (denoted by the last 3 bits
                    // of the attr field)
                    match pmsg.attr & 0x07 {
                        c if c == Compression::NONE as i8 => {
                            // skip messages with a lower offset
                            // than the request one
                            if offset >= req_offset {
                                msgs.push(Message {
                                    offset,
                                    key: pmsg.key,
                                    value: pmsg.value,
                                });
                            }
                        }
                        // XXX handle recursive compression in future
                        #[cfg(feature = "gzip")]
                        c if c == Compression::GZIP as i8 => {
                            // ~ a compressed wrapper message; its value
                            // holds a whole nested message set
                            let v = gzip::uncompress(pmsg.value)?;
                            return MessageSet::from_vec(v, req_offset, validate_crc);
                        }
                        #[cfg(feature = "snappy")]
                        c if c == Compression::SNAPPY as i8 => {
                            use std::io::Read;
                            let mut v = Vec::new();
                            SnappyReader::new(pmsg.value)?.read_to_end(&mut v)?;
                            return MessageSet::from_vec(v, req_offset, validate_crc);
                        }
                        _ => return Err(Error::UnsupportedCompression),
                    }
                }
            };
        }
        Ok(MessageSet {
            raw_data: Cow::Borrowed(raw_data),
            messages: msgs,
        })
    }

    /// Reads the next length-prefixed message entry (offset plus raw
    /// message body) from the reader.
    fn next_message<'b>(
        r: &mut ZReader<'b>,
        validate_crc: bool,
    ) -> Result<(i64, ProtocolMessage<'b>)> {
        let offset = r.read_i64()?;
        let msg_data = r.read_bytes()?;
        Ok((offset, ProtocolMessage::from_slice(msg_data, validate_crc)?))
    }
}
446
/// Represents a message exactly as defined in the protocol:
/// attributes (compression bits), key, and value.
struct ProtocolMessage<'a> {
    attr: i8,
    key: &'a [u8],
    value: &'a [u8],
}
453
impl<'a> ProtocolMessage<'a> {
    /// Parses a raw message from the given byte slice.  Does _not_
    /// handle any compression.  When `validate_crc` is set, the
    /// message's checksum is verified against the rest of the payload.
    fn from_slice(raw_data: &[u8], validate_crc: bool) -> Result<ProtocolMessage<'_>> {
        let mut r = ZReader::new(raw_data);

        // ~ optionally validate the crc checksum
        let msg_crc = r.read_i32()?;
        if validate_crc && to_crc(r.rest()) as i32 != msg_crc {
            return Err(Error::Kafka(KafkaCode::CorruptMessage));
        }
        // ~ we support parsing only messages with the "zero"
        // magic_byte; this covers kafka 0.8 and 0.9.
        let msg_magic = r.read_i8()?;
        if msg_magic != 0 {
            return Err(Error::UnsupportedProtocol);
        }
        let msg_attr = r.read_i8()?;
        let msg_key = r.read_bytes()?;
        let msg_val = r.read_bytes()?;

        // ~ a well-formed message leaves nothing behind
        debug_assert!(r.is_empty());

        Ok(ProtocolMessage {
            attr: msg_attr,
            key: msg_key,
            value: msg_val,
        })
    }
}
484
485// tests --------------------------------------------------------------
486
487#[cfg(test)]
488mod tests {
    use std::str;

    use super::{FetchRequest, Message, Response};
    use crate::error::{Error, KafkaCode};

    // ~ the expected message payloads, one message per line, matching
    // the binary fixture files below
    static FETCH1_TXT: &str = include_str!("../../test-data/fetch1.txt");

    #[rustfmt::skip]
    static FETCH1_FETCH_RESPONSE_NOCOMPRESSION_K0821: &[u8] =
        include_bytes!("../../test-data/fetch1.mytopic.1p.nocompression.kafka.0821");

    #[rustfmt::skip]
    static FETCH1_FETCH_RESPONSE_SNAPPY_K0821: &[u8] =
        include_bytes!("../../test-data/fetch1.mytopic.1p.snappy.kafka.0821");

    #[cfg(feature = "snappy")]
    #[rustfmt::skip]
    static FETCH1_FETCH_RESPONSE_SNAPPY_K0822: &[u8] =
        include_bytes!("../../test-data/fetch1.mytopic.1p.snappy.kafka.0822");

    #[cfg(feature = "gzip")]
    #[rustfmt::skip]
    static FETCH1_FETCH_RESPONSE_GZIP_K0821: &[u8] =
        include_bytes!("../../test-data/fetch1.mytopic.1p.gzip.kafka.0821");

    static FETCH2_TXT: &str = include_str!("../../test-data/fetch2.txt");

    #[rustfmt::skip]
    static FETCH2_FETCH_RESPONSE_NOCOMPRESSION_K0900: &[u8] =
        include_bytes!("../../test-data/fetch2.mytopic.nocompression.kafka.0900");

    // ~ same as the above but with a deliberately corrupted message
    // crc (see `test_crc_validation`)
    #[rustfmt::skip]
    static FETCH2_FETCH_RESPONSE_NOCOMPRESSION_INVALID_CRC_K0900: &[u8] =
        include_bytes!("../../test-data/fetch2.mytopic.nocompression.invalid_crc.kafka.0900");
523
524    fn into_messages(r: &Response) -> Vec<&Message<'_>> {
525        let mut all_msgs = Vec::new();
526        for t in r.topics() {
527            for p in t.partitions() {
528                let data = p.data().unwrap();
529                all_msgs.extend(data.messages());
530            }
531        }
532        all_msgs
533    }
534
    // ~ parses `response` and verifies that the delivered message
    // values match `msg_per_line` (one expected message per line)
    fn test_decode_new_fetch_response(
        msg_per_line: &str,
        response: Vec<u8>,
        requests: Option<&FetchRequest<'_, '_>>,
        validate_crc: bool,
    ) {
        let resp = Response::from_vec(response, requests, validate_crc);
        let resp = resp.unwrap();

        let original: Vec<_> = msg_per_line.lines().collect();

        // ~ response for exactly one topic expected
        assert_eq!(1, resp.topics.len());
        // ~ topic name
        assert_eq!("my-topic", resp.topics[0].topic);
        // ~ exactly one partition
        assert_eq!(1, resp.topics[0].partitions.len());
        // ~ the first partition
        assert_eq!(0, resp.topics[0].partitions[0].partition);
        // ~ no error
        // assert!(resp.topics[0].partitions[0].data.is_ok());

        // ~ every delivered message value must equal the corresponding
        // expected line, in order
        let msgs = into_messages(&resp);
        assert_eq!(original.len(), msgs.len());
        for (msg, orig) in msgs.into_iter().zip(original.iter()) {
            assert_eq!(str::from_utf8(msg.value).unwrap(), *orig);
        }
    }
563
564    fn skip_lines(mut lines: &str, mut n: usize) -> &str {
565        while n > 0 {
566            n -= 1;
567            lines = &lines[lines.find('\n').map(|i| i + 1).unwrap_or(0)..];
568        }
569        lines
570    }
571
    // ~ uncompressed fixture: full parse, then a parse filtered by a
    // higher requested offset
    #[test]
    fn test_from_slice_nocompression_k0821() {
        let mut req = FetchRequest::new(0, "test", -1, -1);
        req.add("my-topic", 0, 0, -1);
        // ~ the foo-quux entry must not affect parsing of my-topic
        req.add("foo-quux", 0, 100, -1);
        test_decode_new_fetch_response(
            FETCH1_TXT,
            FETCH1_FETCH_RESPONSE_NOCOMPRESSION_K0821.to_owned(),
            Some(&req),
            false,
        );

        // ~ pretend we asked for messages as of offset five (while
        // the server delivered the zero-offset message as well)
        req = FetchRequest::new(0, "test", -1, -1);
        req.add("my-topic", 0, 5, -1);
        test_decode_new_fetch_response(
            skip_lines(FETCH1_TXT, 5),
            FETCH1_FETCH_RESPONSE_NOCOMPRESSION_K0821.to_owned(),
            Some(&req),
            false,
        );
    }
595
596    // verify we don't crash but cleanly fail and report we don't
597    // support the compression
598    #[cfg(not(feature = "snappy"))]
599    #[test]
600    fn test_unsupported_compression_snappy() {
601        let mut req = FetchRequest::new(0, "test", -1, -1);
602        req.add("my-topic", 0, 0, -1);
603        let r = Response::from_vec(
604            FETCH1_FETCH_RESPONSE_SNAPPY_K0821.to_owned(),
605            Some(&req),
606            false,
607        );
608        assert!(match r {
609            return Err(Error::UnsupportedCompression) => true,
610            _ => false,
611        });
612    }
613
    // ~ snappy fixture (kafka 0.8.2.1): full parse, then a parse
    // filtered by a higher requested offset
    #[cfg(feature = "snappy")]
    #[test]
    fn test_from_slice_snappy_k0821() {
        let mut req = FetchRequest::new(0, "test", -1, -1);
        req.add("my-topic", 0, 0, -1);
        test_decode_new_fetch_response(
            FETCH1_TXT,
            FETCH1_FETCH_RESPONSE_SNAPPY_K0821.to_owned(),
            Some(&req),
            false,
        );

        // ~ pretend we asked for messages as of offset three (while
        // the server delivered the zero-offset message as well)
        req = FetchRequest::new(0, "test", -1, -1);
        req.add("my-topic", 0, 3, -1);
        test_decode_new_fetch_response(
            skip_lines(FETCH1_TXT, 3),
            FETCH1_FETCH_RESPONSE_SNAPPY_K0821.to_owned(),
            Some(&req),
            false,
        );
    }
637
    // ~ snappy fixture produced by kafka 0.8.2.2
    #[cfg(feature = "snappy")]
    #[test]
    fn test_from_slice_snappy_k0822() {
        let mut req = FetchRequest::new(0, "test", -1, -1);
        req.add("my-topic", 0, 0, -1);
        test_decode_new_fetch_response(
            FETCH1_TXT,
            FETCH1_FETCH_RESPONSE_SNAPPY_K0822.to_owned(),
            Some(&req),
            false,
        );
    }
650
    // ~ gzip fixture: full parse, then parses filtered by
    // successively higher requested offsets (note: `add` replaces the
    // previous registration for the same topic/partition)
    #[cfg(feature = "gzip")]
    #[test]
    fn test_from_slice_gzip_k0821() {
        let mut req = FetchRequest::new(0, "test", -1, -1);
        req.add("my-topic", 0, 0, -1);
        test_decode_new_fetch_response(
            FETCH1_TXT,
            FETCH1_FETCH_RESPONSE_GZIP_K0821.to_owned(),
            Some(&req),
            false,
        );

        // ~ pretend we asked for messages as of offset one (while the
        // server delivered the zero-offset message as well)
        req.add("my-topic", 0, 1, -1);
        test_decode_new_fetch_response(
            skip_lines(FETCH1_TXT, 1),
            FETCH1_FETCH_RESPONSE_GZIP_K0821.to_owned(),
            Some(&req),
            false,
        );

        // ~ pretend we asked for messages as of offset ten (while the
        // server delivered the zero-offset message as well)
        req.add("my-topic", 0, 10, -1);
        test_decode_new_fetch_response(
            skip_lines(FETCH1_TXT, 10),
            FETCH1_FETCH_RESPONSE_GZIP_K0821.to_owned(),
            Some(&req),
            false,
        );
    }
683
    // ~ verifies crc handling: a valid fixture passes with validation
    // on; a crc-corrupted fixture passes with validation off but
    // fails with CorruptMessage when validation is on
    #[test]
    fn test_crc_validation() {
        test_decode_new_fetch_response(
            FETCH2_TXT,
            FETCH2_FETCH_RESPONSE_NOCOMPRESSION_K0900.to_owned(),
            None,
            true,
        );

        // now test the same message but with an invalid crc checksum
        // (modified by hand)
        // 1) without checking the crc ... since only the crc field is
        // artificially falsified ... we expect the rest of the
        // message to be parsed correctly
        test_decode_new_fetch_response(
            FETCH2_TXT,
            FETCH2_FETCH_RESPONSE_NOCOMPRESSION_INVALID_CRC_K0900.to_owned(),
            None,
            false,
        );
        // 2) with checking the crc ... parsing should fail immediately
        match Response::from_vec(
            FETCH2_FETCH_RESPONSE_NOCOMPRESSION_INVALID_CRC_K0900.to_owned(),
            None,
            true,
        ) {
            Ok(_) => panic!("Expected error, but got successful response!"),
            Err(Error::Kafka(KafkaCode::CorruptMessage)) => {}
            Err(e) => panic!("Expected KafkaCode::CorruptMessage error, but got: {:?}", e),
        }
    }
715
    // ~ decoding benchmarks; compiled only on nightly toolchains
    // (they rely on the unstable `test` crate's `Bencher`)
    #[cfg(feature = "nightly")]
    mod benches {
        use test::{black_box, Bencher};

        use super::super::{FetchRequest, Response};
        use super::into_messages;

        // ~ measures parsing (and flattening) of the given raw
        // response; `b.bytes` is set so results report throughput
        fn bench_decode_new_fetch_response(b: &mut Bencher, data: Vec<u8>, validate_crc: bool) {
            let mut reqs = FetchRequest::new(0, "foo", -1, -1);
            reqs.add("my-topic", 0, 0, -1);
            b.bytes = data.len() as u64;
            b.iter(|| {
                // ~ from_vec consumes the vector, so clone per iteration
                let data = data.clone();
                let r = black_box(Response::from_vec(data, Some(&reqs), validate_crc).unwrap());
                let v = black_box(into_messages(&r));
                v.len()
            });
        }

        #[bench]
        fn bench_decode_new_fetch_response_nocompression_k0821(b: &mut Bencher) {
            bench_decode_new_fetch_response(
                b,
                super::FETCH1_FETCH_RESPONSE_NOCOMPRESSION_K0821.to_owned(),
                false,
            )
        }

        #[cfg(feature = "snappy")]
        #[bench]
        fn bench_decode_new_fetch_response_snappy_k0821(b: &mut Bencher) {
            bench_decode_new_fetch_response(
                b,
                super::FETCH1_FETCH_RESPONSE_SNAPPY_K0821.to_owned(),
                false,
            )
        }

        #[cfg(feature = "snappy")]
        #[bench]
        fn bench_decode_new_fetch_response_snappy_k0822(b: &mut Bencher) {
            bench_decode_new_fetch_response(
                b,
                super::FETCH1_FETCH_RESPONSE_SNAPPY_K0822.to_owned(),
                false,
            )
        }

        #[cfg(feature = "gzip")]
        #[bench]
        fn bench_decode_new_fetch_response_gzip_k0821(b: &mut Bencher) {
            bench_decode_new_fetch_response(
                b,
                super::FETCH1_FETCH_RESPONSE_GZIP_K0821.to_owned(),
                false,
            )
        }

        #[bench]
        fn bench_decode_new_fetch_response_nocompression_k0821_validate_crc(b: &mut Bencher) {
            bench_decode_new_fetch_response(
                b,
                super::FETCH1_FETCH_RESPONSE_NOCOMPRESSION_K0821.to_owned(),
                true,
            )
        }

        #[cfg(feature = "snappy")]
        #[bench]
        fn bench_decode_new_fetch_response_snappy_k0821_validate_crc(b: &mut Bencher) {
            bench_decode_new_fetch_response(
                b,
                super::FETCH1_FETCH_RESPONSE_SNAPPY_K0821.to_owned(),
                true,
            )
        }

        #[cfg(feature = "snappy")]
        #[bench]
        fn bench_decode_new_fetch_response_snappy_k0822_validate_crc(b: &mut Bencher) {
            bench_decode_new_fetch_response(
                b,
                super::FETCH1_FETCH_RESPONSE_SNAPPY_K0822.to_owned(),
                true,
            )
        }

        #[cfg(feature = "gzip")]
        #[bench]
        fn bench_decode_new_fetch_response_gzip_k0821_validate_crc(b: &mut Bencher) {
            bench_decode_new_fetch_response(
                b,
                super::FETCH1_FETCH_RESPONSE_GZIP_K0821.to_owned(),
                true,
            )
        }
    }
813}