1use std::borrow::Cow;
4use std::collections::HashMap;
5use std::hash::BuildHasherDefault;
6use std::io::Write;
7use std::sync::Arc;
8use std::{mem, result};
9
10use fnv::FnvHasher;
11
12use crate::codecs::ToByte;
13#[cfg(feature = "gzip")]
14use crate::compression::gzip;
15#[cfg(feature = "snappy")]
16use crate::compression::snappy::SnappyReader;
17use crate::compression::Compression;
18use crate::error::KafkaCode;
19use crate::{Error, Result};
20
21use super::to_crc;
22use super::zreader::ZReader;
23use super::{HeaderRequest, API_KEY_FETCH, API_VERSION};
24
/// Hasher builder for the partition-id keyed maps of this module: an
/// `FnvHasher` wrapped in `BuildHasherDefault`.
pub type PartitionHasher = BuildHasherDefault<FnvHasher>;
26
/// A fetch request that can span several topics and partitions.
///
/// Its `ToByte` implementation writes the fields in declaration order, with
/// `topic_partitions` preceded by its entry count.
#[derive(Debug)]
pub struct FetchRequest<'a, 'b> {
    /// Common request header; `FetchRequest::new` builds it with
    /// `API_KEY_FETCH` and `API_VERSION`.
    pub header: HeaderRequest<'a>,
    /// Replica id; `FetchRequest::new` always sets it to `-1`.
    pub replica: i32,
    /// Presumably the longest time the broker may hold the request before
    /// answering; confirm the unit against the Kafka protocol guide.
    pub max_wait_time: i32,
    /// Presumably the minimum amount of data the broker should gather before
    /// answering; confirm against the Kafka protocol guide.
    pub min_bytes: i32,
    /// Per-topic partition requests, keyed by topic name.
    pub topic_partitions: HashMap<&'b str, TopicPartitionFetchRequest>,
}
36
/// The partitions requested for a single topic within a `FetchRequest`.
#[derive(Debug)]
pub struct TopicPartitionFetchRequest {
    /// Per-partition request parameters, keyed by partition id.
    pub partitions: HashMap<i32, PartitionFetchRequest, PartitionHasher>,
}
42
/// Fetch parameters for a single partition.
#[derive(Debug)]
pub struct PartitionFetchRequest {
    /// Requested start offset. When a response is parsed with its request at
    /// hand, uncompressed messages below this offset are skipped.
    pub offset: i64,
    /// Presumably the upper bound on the bytes returned for this partition;
    /// confirm against the Kafka protocol guide.
    pub max_bytes: i32,
}
48
49impl<'a, 'b> FetchRequest<'a, 'b> {
50 pub fn new(
51 correlation_id: i32,
52 client_id: &'a str,
53 max_wait_time: i32,
54 min_bytes: i32,
55 ) -> FetchRequest<'a, 'b> {
56 FetchRequest {
57 header: HeaderRequest::new(API_KEY_FETCH, API_VERSION, correlation_id, client_id),
58 replica: -1,
59 max_wait_time,
60 min_bytes,
61 topic_partitions: HashMap::new(),
62 }
63 }
64
65 pub fn add(&mut self, topic: &'b str, partition: i32, offset: i64, max_bytes: i32) {
66 self.topic_partitions
67 .entry(topic)
68 .or_insert_with(TopicPartitionFetchRequest::new)
69 .add(partition, offset, max_bytes)
70 }
71
72 pub fn get<'d>(&'a self, topic: &'d str) -> Option<&'a TopicPartitionFetchRequest> {
73 self.topic_partitions.get(topic)
74 }
75}
76
77impl TopicPartitionFetchRequest {
78 pub fn new() -> TopicPartitionFetchRequest {
79 TopicPartitionFetchRequest {
80 partitions: HashMap::default(),
81 }
82 }
83
84 pub fn add(&mut self, partition: i32, offset: i64, max_bytes: i32) {
85 self.partitions
86 .insert(partition, PartitionFetchRequest::new(offset, max_bytes));
87 }
88
89 pub fn get(&self, partition: i32) -> Option<&PartitionFetchRequest> {
90 self.partitions.get(&partition)
91 }
92}
93
94impl PartitionFetchRequest {
95 pub fn new(offset: i64, max_bytes: i32) -> PartitionFetchRequest {
96 PartitionFetchRequest { offset, max_bytes }
97 }
98}
99
100impl<'a, 'b> ToByte for FetchRequest<'a, 'b> {
101 fn encode<W: Write>(&self, buffer: &mut W) -> Result<()> {
102 self.header.encode(buffer)?;
103 self.replica.encode(buffer)?;
104 self.max_wait_time.encode(buffer)?;
105 self.min_bytes.encode(buffer)?;
106 (self.topic_partitions.len() as i32).encode(buffer)?;
108 for (name, tp) in self.topic_partitions.iter() {
109 tp.encode(name, buffer)?;
110 }
111 Ok(())
112 }
113}
114
115impl TopicPartitionFetchRequest {
116 fn encode<W: Write>(&self, topic: &str, buffer: &mut W) -> Result<()> {
117 topic.encode(buffer)?;
118 (self.partitions.len() as i32).encode(buffer)?;
120 for (&pid, p) in self.partitions.iter() {
121 p.encode(pid, buffer)?;
122 }
123 Ok(())
124 }
125}
126
impl PartitionFetchRequest {
    /// Serializes this request for `partition` into `buffer` by handing three
    /// writes to `try_multi!`: the partition id, the offset and `max_bytes`.
    ///
    /// NOTE(review): `try_multi!` is defined outside this file; it presumably
    /// evaluates the writes in order and stops at the first error — confirm.
    fn encode<T: Write>(&self, partition: i32, buffer: &mut T) -> Result<()> {
        try_multi!(
            partition.encode(buffer),
            self.offset.encode(buffer),
            self.max_bytes.encode(buffer)
        )
    }
}
136
/// Parser configuration for fetch responses; see its
/// `super::ResponseParser` implementation.
pub struct ResponseParser<'a, 'b, 'c> {
    /// Whether the CRC stored with each message is checked while parsing.
    pub validate_crc: bool,
    /// The request the response answers, if available. It supplies the
    /// requested offsets used to skip messages that lie below them.
    pub requests: Option<&'c FetchRequest<'a, 'b>>,
}
143
144impl<'a, 'b, 'c> super::ResponseParser for ResponseParser<'a, 'b, 'c> {
145 type T = Response;
146 fn parse(&self, response: Vec<u8>) -> Result<Self::T> {
147 Response::from_vec(response, self.requests, self.validate_crc)
148 }
149}
150
/// Reads a length-prefixed array.
///
/// Calls `read_array_len()` on `$zreader`, then evaluates `$parse_elem` that
/// many times and collects the results into a `Vec`. Both the length read and
/// every element use `?`, so the macro must be expanded inside a function
/// returning a compatible `Result`.
macro_rules! array_of {
    ($zreader:ident, $parse_elem:expr) => {{
        let len = $zreader.read_array_len()?;
        let mut items = Vec::with_capacity(len);
        for _ in 0..len {
            let item = $parse_elem?;
            items.push(item);
        }
        items
    }};
}
163
/// A parsed fetch response.
///
/// Owns the raw response bytes; the parsed `topics` borrow from that buffer
/// (see `Response::from_vec`).
#[derive(Debug)]
pub struct Response {
    // Never read directly: kept only so the buffer `topics` points into
    // stays allocated for as long as the response lives.
    #[allow(dead_code)]
    raw_data: Vec<u8>,

    /// Correlation id read from the start of the response.
    correlation_id: i32,

    // `'static` is a stand-in lifetime: the data actually lives in
    // `raw_data`, and `topics()` re-binds it to the borrow of `self`.
    topics: Vec<Topic<'static>>,
}
182
183impl Response {
184 fn from_vec(
187 response: Vec<u8>,
188 reqs: Option<&FetchRequest<'_, '_>>,
189 validate_crc: bool,
190 ) -> Result<Response> {
191 let slice = unsafe { mem::transmute(&response[..]) };
192 let mut r = ZReader::new(slice);
193 let correlation_id = r.read_i32()?;
194 let topics = array_of!(r, Topic::read(&mut r, reqs, validate_crc));
195 Ok(Response {
196 raw_data: response,
197 correlation_id,
198 topics,
199 })
200 }
201
202 #[inline]
205 pub fn correlation_id(&self) -> i32 {
206 self.correlation_id
207 }
208
209 #[inline]
212 pub fn topics<'a>(&'a self) -> &[Topic<'a>] {
213 &self.topics
214 }
215}
216
/// The data of one topic within a fetch response.
#[derive(Debug)]
pub struct Topic<'a> {
    /// Topic name, borrowed from the response buffer.
    topic: &'a str,
    /// The partitions delivered for this topic, in response order.
    partitions: Vec<Partition<'a>>,
}
226
227impl<'a> Topic<'a> {
228 fn read(
229 r: &mut ZReader<'a>,
230 reqs: Option<&FetchRequest<'_, '_>>,
231 validate_crc: bool,
232 ) -> Result<Topic<'a>> {
233 let name = r.read_str()?;
234 let preqs = reqs.and_then(|reqs| reqs.get(name));
235 let partitions = array_of!(r, Partition::read(r, preqs, validate_crc));
236 Ok(Topic {
237 topic: name,
238 partitions,
239 })
240 }
241
242 #[inline]
244 pub fn topic(&self) -> &'a str {
245 self.topic
246 }
247
248 #[inline]
251 pub fn partitions(&self) -> &[Partition<'a>] {
252 &self.partitions
253 }
254}
255
/// The outcome of fetching one partition.
#[derive(Debug)]
pub struct Partition<'a> {
    /// Partition id as read from the response.
    partition: i32,

    /// Either the fetched data or the error reported for this partition.
    /// The error sits in an `Arc` so `data()` can hand out cheap clones.
    data: result::Result<Data<'a>, Arc<Error>>,
}
271
272impl<'a> Partition<'a> {
273 fn read(
274 r: &mut ZReader<'a>,
275 preqs: Option<&TopicPartitionFetchRequest>,
276 validate_crc: bool,
277 ) -> Result<Partition<'a>> {
278 let partition = r.read_i32()?;
279 let proffs = preqs
280 .and_then(|preqs| preqs.get(partition))
281 .map(|preq| preq.offset)
282 .unwrap_or(0);
283
284 let err = Error::from_protocol(r.read_i16()?);
285 let highwatermark = r.read_i64()?;
288 let msgset = MessageSet::from_slice(r.read_bytes()?, proffs, validate_crc)?;
289
290 Ok(Partition {
291 partition,
292 data: match err {
293 Some(err) => Err(Arc::new(err)),
294 None => Ok(Data {
295 highwatermark_offset: highwatermark,
296 message_set: msgset,
297 }),
298 },
299 })
300 }
301
302 #[inline]
304 pub fn partition(&self) -> i32 {
305 self.partition
306 }
307
308 pub fn data(&'a self) -> result::Result<&'a Data<'a>, Arc<Error>> {
310 match self.data.as_ref() {
311 Ok(data) => Ok(data),
312 Err(err) => Err(err.clone()),
313 }
314 }
315}
316
/// The successfully fetched data of one partition.
#[derive(Debug)]
pub struct Data<'a> {
    /// High watermark value read from the response for this partition.
    highwatermark_offset: i64,
    /// The messages parsed for this partition.
    message_set: MessageSet<'a>,
}
323
324impl<'a> Data<'a> {
325 #[inline]
330 pub fn highwatermark_offset(&self) -> i64 {
331 self.highwatermark_offset
332 }
333
334 #[inline]
336 pub fn messages(&self) -> &[Message<'a>] {
337 &self.message_set.messages
338 }
339}
340
/// A parsed set of messages together with the bytes they point into.
#[derive(Debug)]
struct MessageSet<'a> {
    // Borrowed when parsed straight from the response buffer, owned when the
    // set was decompressed first; kept so `messages` stays valid.
    #[allow(dead_code)]
    raw_data: Cow<'a, [u8]>,
    // Key/value slices of these messages point into `raw_data`.
    messages: Vec<Message<'a>>,
}
347
/// A single fetched message.
#[derive(Debug)]
pub struct Message<'a> {
    /// Offset of the message as read from the message set.
    pub offset: i64,

    /// Key bytes of the message.
    pub key: &'a [u8],

    /// Value bytes of the message.
    pub value: &'a [u8],
}
364
365impl<'a> MessageSet<'a> {
366 #[allow(dead_code)]
367 fn from_vec(data: Vec<u8>, req_offset: i64, validate_crc: bool) -> Result<MessageSet<'a>> {
368 let ms = MessageSet::from_slice(
374 unsafe { mem::transmute(&data[..]) },
375 req_offset,
376 validate_crc,
377 )?;
378 return Ok(MessageSet {
379 raw_data: Cow::Owned(data),
380 messages: ms.messages,
381 });
382 }
383
384 fn from_slice(raw_data: &[u8], req_offset: i64, validate_crc: bool) -> Result<MessageSet<'_>> {
385 let mut r = ZReader::new(raw_data);
386 let mut msgs = Vec::new();
387 while !r.is_empty() {
388 match MessageSet::next_message(&mut r, validate_crc) {
389 Err(Error::UnexpectedEOF) => {
393 break;
394 }
395 Err(e) => {
396 return Err(e);
397 }
398 Ok((offset, pmsg)) => {
399 match pmsg.attr & 0x07 {
402 c if c == Compression::NONE as i8 => {
403 if offset >= req_offset {
406 msgs.push(Message {
407 offset,
408 key: pmsg.key,
409 value: pmsg.value,
410 });
411 }
412 }
413 #[cfg(feature = "gzip")]
415 c if c == Compression::GZIP as i8 => {
416 let v = gzip::uncompress(pmsg.value)?;
417 return MessageSet::from_vec(v, req_offset, validate_crc);
418 }
419 #[cfg(feature = "snappy")]
420 c if c == Compression::SNAPPY as i8 => {
421 use std::io::Read;
422 let mut v = Vec::new();
423 SnappyReader::new(pmsg.value)?.read_to_end(&mut v)?;
424 return MessageSet::from_vec(v, req_offset, validate_crc);
425 }
426 _ => return Err(Error::UnsupportedCompression),
427 }
428 }
429 };
430 }
431 Ok(MessageSet {
432 raw_data: Cow::Borrowed(raw_data),
433 messages: msgs,
434 })
435 }
436
437 fn next_message<'b>(
438 r: &mut ZReader<'b>,
439 validate_crc: bool,
440 ) -> Result<(i64, ProtocolMessage<'b>)> {
441 let offset = r.read_i64()?;
442 let msg_data = r.read_bytes()?;
443 Ok((offset, ProtocolMessage::from_slice(msg_data, validate_crc)?))
444 }
445}
446
/// A message as laid out on the wire, before compression is handled.
struct ProtocolMessage<'a> {
    /// Attribute byte; its low bits select the compression codec.
    attr: i8,
    /// Key bytes of the message.
    key: &'a [u8],
    /// Value bytes; for a compressed message this is the compressed payload.
    value: &'a [u8],
}
453
454impl<'a> ProtocolMessage<'a> {
455 fn from_slice(raw_data: &[u8], validate_crc: bool) -> Result<ProtocolMessage<'_>> {
458 let mut r = ZReader::new(raw_data);
459
460 let msg_crc = r.read_i32()?;
462 if validate_crc && to_crc(r.rest()) as i32 != msg_crc {
463 return Err(Error::Kafka(KafkaCode::CorruptMessage));
464 }
465 let msg_magic = r.read_i8()?;
468 if msg_magic != 0 {
469 return Err(Error::UnsupportedProtocol);
470 }
471 let msg_attr = r.read_i8()?;
472 let msg_key = r.read_bytes()?;
473 let msg_val = r.read_bytes()?;
474
475 debug_assert!(r.is_empty());
476
477 Ok(ProtocolMessage {
478 attr: msg_attr,
479 key: msg_key,
480 value: msg_val,
481 })
482 }
483}
484
485#[cfg(test)]
488mod tests {
489 use std::str;
490
491 use super::{FetchRequest, Message, Response};
492 use crate::error::{Error, KafkaCode};
493
    // Expected message values, one per line, for the `fetch1.*` responses.
    static FETCH1_TXT: &str = include_str!("../../test-data/fetch1.txt");

    // Response fixture, embedded at compile time (uncompressed messages).
    #[rustfmt::skip]
    static FETCH1_FETCH_RESPONSE_NOCOMPRESSION_K0821: &[u8] =
        include_bytes!("../../test-data/fetch1.mytopic.1p.nocompression.kafka.0821");

    // Snappy fixture; always compiled in because the "unsupported
    // compression" test needs it when the `snappy` feature is disabled.
    #[rustfmt::skip]
    static FETCH1_FETCH_RESPONSE_SNAPPY_K0821: &[u8] =
        include_bytes!("../../test-data/fetch1.mytopic.1p.snappy.kafka.0821");

    // Second snappy fixture, only needed with the `snappy` feature.
    #[cfg(feature = "snappy")]
    #[rustfmt::skip]
    static FETCH1_FETCH_RESPONSE_SNAPPY_K0822: &[u8] =
        include_bytes!("../../test-data/fetch1.mytopic.1p.snappy.kafka.0822");

    // Gzip fixture, only needed with the `gzip` feature.
    #[cfg(feature = "gzip")]
    #[rustfmt::skip]
    static FETCH1_FETCH_RESPONSE_GZIP_K0821: &[u8] =
        include_bytes!("../../test-data/fetch1.mytopic.1p.gzip.kafka.0821");

    // Expected message values, one per line, for the `fetch2.*` responses.
    static FETCH2_TXT: &str = include_str!("../../test-data/fetch2.txt");

    // Response fixture used with CRC validation enabled.
    #[rustfmt::skip]
    static FETCH2_FETCH_RESPONSE_NOCOMPRESSION_K0900: &[u8] =
        include_bytes!("../../test-data/fetch2.mytopic.nocompression.kafka.0900");

    // Response fixture expected to fail CRC validation.
    #[rustfmt::skip]
    static FETCH2_FETCH_RESPONSE_NOCOMPRESSION_INVALID_CRC_K0900: &[u8] =
        include_bytes!("../../test-data/fetch2.mytopic.nocompression.invalid_crc.kafka.0900");
523
524 fn into_messages(r: &Response) -> Vec<&Message<'_>> {
525 let mut all_msgs = Vec::new();
526 for t in r.topics() {
527 for p in t.partitions() {
528 let data = p.data().unwrap();
529 all_msgs.extend(data.messages());
530 }
531 }
532 all_msgs
533 }
534
535 fn test_decode_new_fetch_response(
536 msg_per_line: &str,
537 response: Vec<u8>,
538 requests: Option<&FetchRequest<'_, '_>>,
539 validate_crc: bool,
540 ) {
541 let resp = Response::from_vec(response, requests, validate_crc);
542 let resp = resp.unwrap();
543
544 let original: Vec<_> = msg_per_line.lines().collect();
545
546 assert_eq!(1, resp.topics.len());
548 assert_eq!("my-topic", resp.topics[0].topic);
550 assert_eq!(1, resp.topics[0].partitions.len());
552 assert_eq!(0, resp.topics[0].partitions[0].partition);
554 let msgs = into_messages(&resp);
558 assert_eq!(original.len(), msgs.len());
559 for (msg, orig) in msgs.into_iter().zip(original.iter()) {
560 assert_eq!(str::from_utf8(msg.value).unwrap(), *orig);
561 }
562 }
563
564 fn skip_lines(mut lines: &str, mut n: usize) -> &str {
565 while n > 0 {
566 n -= 1;
567 lines = &lines[lines.find('\n').map(|i| i + 1).unwrap_or(0)..];
568 }
569 lines
570 }
571
572 #[test]
573 fn test_from_slice_nocompression_k0821() {
574 let mut req = FetchRequest::new(0, "test", -1, -1);
575 req.add("my-topic", 0, 0, -1);
576 req.add("foo-quux", 0, 100, -1);
577 test_decode_new_fetch_response(
578 FETCH1_TXT,
579 FETCH1_FETCH_RESPONSE_NOCOMPRESSION_K0821.to_owned(),
580 Some(&req),
581 false,
582 );
583
584 req = FetchRequest::new(0, "test", -1, -1);
587 req.add("my-topic", 0, 5, -1);
588 test_decode_new_fetch_response(
589 skip_lines(FETCH1_TXT, 5),
590 FETCH1_FETCH_RESPONSE_NOCOMPRESSION_K0821.to_owned(),
591 Some(&req),
592 false,
593 );
594 }
595
596 #[cfg(not(feature = "snappy"))]
599 #[test]
600 fn test_unsupported_compression_snappy() {
601 let mut req = FetchRequest::new(0, "test", -1, -1);
602 req.add("my-topic", 0, 0, -1);
603 let r = Response::from_vec(
604 FETCH1_FETCH_RESPONSE_SNAPPY_K0821.to_owned(),
605 Some(&req),
606 false,
607 );
608 assert!(match r {
609 return Err(Error::UnsupportedCompression) => true,
610 _ => false,
611 });
612 }
613
614 #[cfg(feature = "snappy")]
615 #[test]
616 fn test_from_slice_snappy_k0821() {
617 let mut req = FetchRequest::new(0, "test", -1, -1);
618 req.add("my-topic", 0, 0, -1);
619 test_decode_new_fetch_response(
620 FETCH1_TXT,
621 FETCH1_FETCH_RESPONSE_SNAPPY_K0821.to_owned(),
622 Some(&req),
623 false,
624 );
625
626 req = FetchRequest::new(0, "test", -1, -1);
629 req.add("my-topic", 0, 3, -1);
630 test_decode_new_fetch_response(
631 skip_lines(FETCH1_TXT, 3),
632 FETCH1_FETCH_RESPONSE_SNAPPY_K0821.to_owned(),
633 Some(&req),
634 false,
635 );
636 }
637
638 #[cfg(feature = "snappy")]
639 #[test]
640 fn test_from_slice_snappy_k0822() {
641 let mut req = FetchRequest::new(0, "test", -1, -1);
642 req.add("my-topic", 0, 0, -1);
643 test_decode_new_fetch_response(
644 FETCH1_TXT,
645 FETCH1_FETCH_RESPONSE_SNAPPY_K0822.to_owned(),
646 Some(&req),
647 false,
648 );
649 }
650
651 #[cfg(feature = "gzip")]
652 #[test]
653 fn test_from_slice_gzip_k0821() {
654 let mut req = FetchRequest::new(0, "test", -1, -1);
655 req.add("my-topic", 0, 0, -1);
656 test_decode_new_fetch_response(
657 FETCH1_TXT,
658 FETCH1_FETCH_RESPONSE_GZIP_K0821.to_owned(),
659 Some(&req),
660 false,
661 );
662
663 req.add("my-topic", 0, 1, -1);
666 test_decode_new_fetch_response(
667 skip_lines(FETCH1_TXT, 1),
668 FETCH1_FETCH_RESPONSE_GZIP_K0821.to_owned(),
669 Some(&req),
670 false,
671 );
672
673 req.add("my-topic", 0, 10, -1);
676 test_decode_new_fetch_response(
677 skip_lines(FETCH1_TXT, 10),
678 FETCH1_FETCH_RESPONSE_GZIP_K0821.to_owned(),
679 Some(&req),
680 false,
681 );
682 }
683
684 #[test]
685 fn test_crc_validation() {
686 test_decode_new_fetch_response(
687 FETCH2_TXT,
688 FETCH2_FETCH_RESPONSE_NOCOMPRESSION_K0900.to_owned(),
689 None,
690 true,
691 );
692
693 test_decode_new_fetch_response(
699 FETCH2_TXT,
700 FETCH2_FETCH_RESPONSE_NOCOMPRESSION_INVALID_CRC_K0900.to_owned(),
701 None,
702 false,
703 );
704 match Response::from_vec(
706 FETCH2_FETCH_RESPONSE_NOCOMPRESSION_INVALID_CRC_K0900.to_owned(),
707 None,
708 true,
709 ) {
710 Ok(_) => panic!("Expected error, but got successful response!"),
711 Err(Error::Kafka(KafkaCode::CorruptMessage)) => {}
712 Err(e) => panic!("Expected KafkaCode::CorruptMessage error, but got: {:?}", e),
713 }
714 }
715
716 #[cfg(feature = "nightly")]
717 mod benches {
718 use test::{black_box, Bencher};
719
720 use super::super::{FetchRequest, Response};
721 use super::into_messages;
722
723 fn bench_decode_new_fetch_response(b: &mut Bencher, data: Vec<u8>, validate_crc: bool) {
724 let mut reqs = FetchRequest::new(0, "foo", -1, -1);
725 reqs.add("my-topic", 0, 0, -1);
726 b.bytes = data.len() as u64;
727 b.iter(|| {
728 let data = data.clone();
729 let r = black_box(Response::from_vec(data, Some(&reqs), validate_crc).unwrap());
730 let v = black_box(into_messages(&r));
731 v.len()
732 });
733 }
734
735 #[bench]
736 fn bench_decode_new_fetch_response_nocompression_k0821(b: &mut Bencher) {
737 bench_decode_new_fetch_response(
738 b,
739 super::FETCH1_FETCH_RESPONSE_NOCOMPRESSION_K0821.to_owned(),
740 false,
741 )
742 }
743
744 #[cfg(feature = "snappy")]
745 #[bench]
746 fn bench_decode_new_fetch_response_snappy_k0821(b: &mut Bencher) {
747 bench_decode_new_fetch_response(
748 b,
749 super::FETCH1_FETCH_RESPONSE_SNAPPY_K0821.to_owned(),
750 false,
751 )
752 }
753
754 #[cfg(feature = "snappy")]
755 #[bench]
756 fn bench_decode_new_fetch_response_snappy_k0822(b: &mut Bencher) {
757 bench_decode_new_fetch_response(
758 b,
759 super::FETCH1_FETCH_RESPONSE_SNAPPY_K0822.to_owned(),
760 false,
761 )
762 }
763
764 #[cfg(feature = "gzip")]
765 #[bench]
766 fn bench_decode_new_fetch_response_gzip_k0821(b: &mut Bencher) {
767 bench_decode_new_fetch_response(
768 b,
769 super::FETCH1_FETCH_RESPONSE_GZIP_K0821.to_owned(),
770 false,
771 )
772 }
773
774 #[bench]
775 fn bench_decode_new_fetch_response_nocompression_k0821_validate_crc(b: &mut Bencher) {
776 bench_decode_new_fetch_response(
777 b,
778 super::FETCH1_FETCH_RESPONSE_NOCOMPRESSION_K0821.to_owned(),
779 true,
780 )
781 }
782
783 #[cfg(feature = "snappy")]
784 #[bench]
785 fn bench_decode_new_fetch_response_snappy_k0821_validate_crc(b: &mut Bencher) {
786 bench_decode_new_fetch_response(
787 b,
788 super::FETCH1_FETCH_RESPONSE_SNAPPY_K0821.to_owned(),
789 true,
790 )
791 }
792
793 #[cfg(feature = "snappy")]
794 #[bench]
795 fn bench_decode_new_fetch_response_snappy_k0822_validate_crc(b: &mut Bencher) {
796 bench_decode_new_fetch_response(
797 b,
798 super::FETCH1_FETCH_RESPONSE_SNAPPY_K0822.to_owned(),
799 true,
800 )
801 }
802
803 #[cfg(feature = "gzip")]
804 #[bench]
805 fn bench_decode_new_fetch_response_gzip_k0821_validate_crc(b: &mut Bencher) {
806 bench_decode_new_fetch_response(
807 b,
808 super::FETCH1_FETCH_RESPONSE_GZIP_K0821.to_owned(),
809 true,
810 )
811 }
812 }
813}