1use std::io::{Read, Write};
2use std::mem;
3use std::time::Duration;
4
5use crate::codecs::{FromByte, ToByte};
6use crate::error::{Error, KafkaCode, Result};
7use crc::Crc;
8
/// Evaluates each fallible expression in order, applying `?` to every one.
///
/// The first `Err` is returned from the enclosing function immediately and
/// the remaining expressions are not evaluated. If every expression succeeds
/// the macro evaluates to `Ok(())`. A trailing comma after the last
/// expression is accepted.
macro_rules! try_multi {
    ($($input_expr:expr),* $(,)?) => {{
        $($input_expr?;)*
        Ok(())
    }};
}
16
17pub mod consumer;
18pub mod metadata;
19pub mod offset;
20pub mod produce;
21
22pub mod fetch;
23mod zreader;
24
25pub use self::consumer::{
28 GroupCoordinatorRequest, GroupCoordinatorResponse, OffsetCommitRequest, OffsetCommitResponse,
29 OffsetCommitVersion, OffsetFetchRequest, OffsetFetchResponse, OffsetFetchVersion,
30};
31pub use self::fetch::FetchRequest;
32pub use self::metadata::{MetadataRequest, MetadataResponse};
33pub use self::offset::{OffsetRequest, OffsetResponse};
34pub use self::produce::{ProduceRequest, ProduceResponse};
35
// Numeric API keys, one per request kind. Nothing in this part of the file
// uses them directly; presumably the request types in the submodules pass
// them as `HeaderRequest::api_key` — TODO confirm against the submodules.
const API_KEY_PRODUCE: i16 = 0;
const API_KEY_FETCH: i16 = 1;
const API_KEY_OFFSET: i16 = 2;
const API_KEY_METADATA: i16 = 3;
const API_KEY_OFFSET_COMMIT: i16 = 8;
const API_KEY_OFFSET_FETCH: i16 = 9;
const API_KEY_GROUP_COORDINATOR: i16 = 10;

// Default API version; presumably the value sent as
// `HeaderRequest::api_version` when a request has no version of its own —
// TODO confirm against the submodules.
const API_VERSION: i16 = 0;
49
/// Turns the raw bytes of a response into a typed value.
pub trait ResponseParser {
    /// The type produced by a successful parse.
    type T;
    /// Parses `response`, taking ownership of the byte buffer, and returns
    /// the parsed value or an error.
    fn parse(&self, response: Vec<u8>) -> Result<Self::T>;
}
58
59impl KafkaCode {
62 fn from_protocol(n: i16) -> Option<KafkaCode> {
63 if n == 0 {
64 return None;
65 }
66 if n >= KafkaCode::OffsetOutOfRange as i16 && n <= KafkaCode::UnsupportedVersion as i16 {
67 return Some(unsafe { mem::transmute(n as i8) });
68 }
69 Some(KafkaCode::Unknown)
70 }
71}
72
#[test]
fn test_kafka_code_from_protocol() {
    // Asserts that `KafkaCode::from_protocol($n)` is `Some($kcode)`.
    macro_rules! assert_kafka_code {
        ($kcode:path, $n:expr) => {
            assert!(matches!(KafkaCode::from_protocol($n), Some($kcode)))
        };
    }

    // Zero is the "no error" code.
    assert!(KafkaCode::from_protocol(0).is_none());

    // Both ends of the known range, and one value inside it.
    assert_kafka_code!(KafkaCode::OffsetOutOfRange, KafkaCode::OffsetOutOfRange as i16);
    assert_kafka_code!(KafkaCode::IllegalGeneration, KafkaCode::IllegalGeneration as i16);
    assert_kafka_code!(KafkaCode::UnsupportedVersion, KafkaCode::UnsupportedVersion as i16);

    // Everything outside the known range collapses to `Unknown`.
    assert_kafka_code!(KafkaCode::Unknown, KafkaCode::Unknown as i16);
    assert_kafka_code!(KafkaCode::Unknown, i16::MAX);
    assert_kafka_code!(KafkaCode::Unknown, i16::MIN);
    assert_kafka_code!(KafkaCode::Unknown, -100);
    assert_kafka_code!(KafkaCode::Unknown, 100);
}
107
108impl Error {
110 fn from_protocol(n: i16) -> Option<Error> {
111 KafkaCode::from_protocol(n).map(Error::Kafka)
112 }
113}
114
/// Header fields encoded at the start of a request.
#[derive(Debug)]
pub struct HeaderRequest<'a> {
    /// Numeric key identifying the kind of request.
    pub api_key: i16,
    /// Version of the API the request is encoded for.
    pub api_version: i16,
    /// Caller-chosen id carried with the request.
    pub correlation_id: i32,
    /// Borrowed client identifier.
    pub client_id: &'a str,
}

impl<'a> HeaderRequest<'a> {
    /// Builds a header from its four fields; `client_id` is borrowed, not
    /// copied.
    fn new(api_key: i16, api_version: i16, correlation_id: i32, client_id: &'a str) -> Self {
        Self {
            api_key,
            api_version,
            correlation_id,
            client_id,
        }
    }
}
140
141impl<'a> ToByte for HeaderRequest<'a> {
142 fn encode<W: Write>(&self, buffer: &mut W) -> Result<()> {
143 try_multi!(
144 self.api_key.encode(buffer),
145 self.api_version.encode(buffer),
146 self.correlation_id.encode(buffer),
147 self.client_id.encode(buffer)
148 )
149 }
150}
151
152#[derive(Default, Debug, Clone)]
155pub struct HeaderResponse {
156 pub correlation: i32,
157}
158
159impl FromByte for HeaderResponse {
160 type R = HeaderResponse;
161
162 #[allow(unused_must_use)]
163 fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
164 self.correlation.decode(buffer)
165 }
166}
167
168pub fn to_crc(data: &[u8]) -> u32 {
171 Crc::<u32>::new(&crc::CRC_32_ISO_HDLC).checksum(data)
172}
173
174pub fn to_millis_i32(d: Duration) -> Result<i32> {
179 use std::i32;
180 let m = d
181 .as_secs()
182 .saturating_mul(1_000)
183 .saturating_add(d.subsec_millis() as u64);
184 if m > i32::MAX as u64 {
185 Err(Error::InvalidDuration)
186 } else {
187 Ok(m as i32)
188 }
189}
190
#[test]
fn test_to_millis_i32() {
    // Fails unless the conversion is rejected with `InvalidDuration`.
    fn assert_invalid(d: Duration) {
        let outcome = to_millis_i32(d);
        if !matches!(outcome, Err(Error::InvalidDuration)) {
            panic!("Expected Err(InvalidDuration) but got {:?}", outcome);
        }
    }

    // Fails unless the conversion succeeds with exactly `expected_millis`.
    fn assert_valid(d: Duration, expected_millis: i32) {
        match to_millis_i32(d) {
            Err(e) => panic!("Expected Ok({}) but got Err({:?})", expected_millis, e),
            Ok(m) => assert_eq!(expected_millis, m),
        }
    }

    let accepted = [
        (Duration::from_millis(1_234), 1_234),
        // Nanoseconds below one millisecond are truncated.
        (Duration::new(540, 123_456_789), 540_123),
        (Duration::from_millis(i32::MAX as u64 - 1), i32::MAX - 1),
    ];
    for &(duration, expected_millis) in accepted.iter() {
        assert_valid(duration, expected_millis);
    }

    let rejected = [
        Duration::from_millis(u64::MAX),
        Duration::from_millis(u32::MAX as u64),
        Duration::from_millis(i32::MAX as u64 + 1),
    ];
    for &duration in rejected.iter() {
        assert_invalid(duration);
    }
}