kafka/protocol/
mod.rs

1use std::io::{Read, Write};
2use std::mem;
3use std::time::Duration;
4
5use crate::codecs::{FromByte, ToByte};
6use crate::error::{Error, KafkaCode, Result};
7use crc::Crc;
8
/// Evaluates each given `Result`-producing expression in order and
/// yields `Ok(())` once all of them have succeeded.
///
/// Every expression is followed by `?`, so the first error is returned
/// from the *enclosing function* right away and the remaining
/// expressions are not evaluated.  The macro must therefore be used in
/// a function whose return type can absorb the expressions' errors.
///
/// Expressions are separated by commas; a trailing comma is accepted.
macro_rules! try_multi {
    ($($input_expr:expr),* $(,)?) => {{
        $($input_expr?;)*
        Ok(())
    }};
}
16
17pub mod consumer;
18pub mod metadata;
19pub mod offset;
20pub mod produce;
21
22pub mod fetch;
23mod zreader;
24
25// ~ convenient re-exports for request/response types defined in the
26// submodules
27pub use self::consumer::{
28    GroupCoordinatorRequest, GroupCoordinatorResponse, OffsetCommitRequest, OffsetCommitResponse,
29    OffsetCommitVersion, OffsetFetchRequest, OffsetFetchResponse, OffsetFetchVersion,
30};
31pub use self::fetch::FetchRequest;
32pub use self::metadata::{MetadataRequest, MetadataResponse};
33pub use self::offset::{OffsetRequest, OffsetResponse};
34pub use self::produce::{ProduceRequest, ProduceResponse};
35
36// --------------------------------------------------------------------
37
// ~ numeric keys identifying the individual kafka api services
const API_KEY_PRODUCE: i16 = 0;
const API_KEY_FETCH: i16 = 1;
const API_KEY_OFFSET: i16 = 2;
const API_KEY_METADATA: i16 = 3;
// ~ keys 4 through 7 are reserved for non-public kafka api services
const API_KEY_OFFSET_COMMIT: i16 = 8;
const API_KEY_OFFSET_FETCH: i16 = 9;
const API_KEY_GROUP_COORDINATOR: i16 = 10;

// ~ the version of the kafka api we request by default
const API_VERSION: i16 = 0;
49
50// --------------------------------------------------------------------
51
52/// Provides a way to parse the full raw response data into a
53/// particular response structure.
54pub trait ResponseParser {
55    type T;
56    fn parse(&self, response: Vec<u8>) -> Result<Self::T>;
57}
58
59// --------------------------------------------------------------------
60
61impl KafkaCode {
62    fn from_protocol(n: i16) -> Option<KafkaCode> {
63        if n == 0 {
64            return None;
65        }
66        if n >= KafkaCode::OffsetOutOfRange as i16 && n <= KafkaCode::UnsupportedVersion as i16 {
67            return Some(unsafe { mem::transmute(n as i8) });
68        }
69        Some(KafkaCode::Unknown)
70    }
71}
72
#[test]
fn test_kafka_code_from_protocol() {
    // ~ asserts that protocol code `$n` decodes to `Some($kcode)`
    macro_rules! assert_kafka_code {
        ($kcode:path, $n:expr) => {
            assert!(matches!(KafkaCode::from_protocol($n), Some($kcode)))
        };
    }

    // ~ zero never produces a code
    assert!(KafkaCode::from_protocol(0).is_none());

    // ~ mapped codes decode to the variant with the same discriminant
    assert_kafka_code!(KafkaCode::OffsetOutOfRange, KafkaCode::OffsetOutOfRange as i16);
    assert_kafka_code!(KafkaCode::IllegalGeneration, KafkaCode::IllegalGeneration as i16);
    assert_kafka_code!(KafkaCode::UnsupportedVersion, KafkaCode::UnsupportedVersion as i16);

    // ~ the discriminant of "unknown" itself as well as some unmapped
    // non-zero codes; all of them should map to "unknown"
    for &n in &[KafkaCode::Unknown as i16, i16::MAX, i16::MIN, -100, 100] {
        assert_kafka_code!(KafkaCode::Unknown, n);
    }
}
107
108// a (sub-) module private method for error
109impl Error {
110    fn from_protocol(n: i16) -> Option<Error> {
111        KafkaCode::from_protocol(n).map(Error::Kafka)
112    }
113}
114
115// --------------------------------------------------------------------
116
/// The header fields of a request.
///
/// The client id is borrowed for `'a`, so a header never owns its
/// string data.
#[derive(Debug)]
pub struct HeaderRequest<'a> {
    /// Key of the api the request is addressed to.
    pub api_key: i16,
    /// Version of that api.
    pub api_version: i16,
    /// Correlation id of the request.
    pub correlation_id: i32,
    /// Id of the client issuing the request.
    pub client_id: &'a str,
}

impl<'a> HeaderRequest<'a> {
    /// Assembles a header from its four fields without validating or
    /// copying any of them.
    fn new(api_key: i16, api_version: i16, correlation_id: i32, client_id: &'a str) -> Self {
        Self {
            api_key,
            api_version,
            correlation_id,
            client_id,
        }
    }
}
140
141impl<'a> ToByte for HeaderRequest<'a> {
142    fn encode<W: Write>(&self, buffer: &mut W) -> Result<()> {
143        try_multi!(
144            self.api_key.encode(buffer),
145            self.api_version.encode(buffer),
146            self.correlation_id.encode(buffer),
147            self.client_id.encode(buffer)
148        )
149    }
150}
151
152// --------------------------------------------------------------------
153
/// The header of a response; it carries nothing but the correlation
/// value.
#[derive(Clone, Debug, Default)]
pub struct HeaderResponse {
    /// Correlation value read from the response; `0` for a
    /// default-constructed header.
    pub correlation: i32,
}
158
159impl FromByte for HeaderResponse {
160    type R = HeaderResponse;
161
162    #[allow(unused_must_use)]
163    fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
164        self.correlation.decode(buffer)
165    }
166}
167
168// --------------------------------------------------------------------
169
170pub fn to_crc(data: &[u8]) -> u32 {
171    Crc::<u32>::new(&crc::CRC_32_ISO_HDLC).checksum(data)
172}
173
174// --------------------------------------------------------------------
175
176/// Safely converts a Duration into the number of milliseconds as a
177/// i32 as often required in the kafka protocol.
178pub fn to_millis_i32(d: Duration) -> Result<i32> {
179    use std::i32;
180    let m = d
181        .as_secs()
182        .saturating_mul(1_000)
183        .saturating_add(d.subsec_millis() as u64);
184    if m > i32::MAX as u64 {
185        Err(Error::InvalidDuration)
186    } else {
187        Ok(m as i32)
188    }
189}
190
#[test]
fn test_to_millis_i32() {
    // ~ the conversion must be rejected with `InvalidDuration`
    fn assert_invalid(d: Duration) {
        let outcome = to_millis_i32(d);
        if !matches!(outcome, Err(Error::InvalidDuration)) {
            panic!("Expected Err(InvalidDuration) but got {:?}", outcome);
        }
    }

    // ~ the conversion must succeed with exactly `expected_millis`
    fn assert_valid(d: Duration, expected_millis: i32) {
        match to_millis_i32(d) {
            Err(e) => panic!("Expected Ok({}) but got Err({:?})", expected_millis, e),
            Ok(m) => assert_eq!(expected_millis, m),
        }
    }

    // ~ plain values; the sub-millisecond part gets dropped
    assert_valid(Duration::from_millis(1_234), 1_234);
    assert_valid(Duration::new(540, 123_456_789), 540_123);

    // ~ too large for an i32
    assert_invalid(Duration::from_millis(u64::MAX));
    assert_invalid(Duration::from_millis(u32::MAX as u64));
    assert_invalid(Duration::from_millis(i32::MAX as u64 + 1));

    // ~ just below the limit
    assert_valid(Duration::from_millis(i32::MAX as u64 - 1), i32::MAX - 1);
}