kafka/protocol/
consumer.rs

1use std::io::{Read, Write};
2
3use crate::codecs::{self, FromByte, ToByte};
4use crate::error::{self, Error, KafkaCode, Result};
5use crate::utils::PartitionOffset;
6
7use super::{HeaderRequest, HeaderResponse};
8use super::{API_KEY_GROUP_COORDINATOR, API_KEY_OFFSET_COMMIT, API_KEY_OFFSET_FETCH, API_VERSION};
9
10// --------------------------------------------------------------------
11
12#[derive(Debug)]
13pub struct GroupCoordinatorRequest<'a, 'b> {
14    pub header: HeaderRequest<'a>,
15    pub group: &'b str,
16}
17
18impl<'a, 'b> GroupCoordinatorRequest<'a, 'b> {
19    pub fn new(
20        group: &'b str,
21        correlation_id: i32,
22        client_id: &'a str,
23    ) -> GroupCoordinatorRequest<'a, 'b> {
24        GroupCoordinatorRequest {
25            header: HeaderRequest::new(
26                API_KEY_GROUP_COORDINATOR,
27                API_VERSION,
28                correlation_id,
29                client_id,
30            ),
31            group,
32        }
33    }
34}
35
36impl<'a, 'b> ToByte for GroupCoordinatorRequest<'a, 'b> {
37    fn encode<W: Write>(&self, buffer: &mut W) -> Result<()> {
38        try_multi!(self.header.encode(buffer), self.group.encode(buffer))
39    }
40}
41
42#[derive(Debug, Default)]
43pub struct GroupCoordinatorResponse {
44    pub header: HeaderResponse,
45    pub error: i16,
46    pub broker_id: i32,
47    pub port: i32,
48    pub host: String,
49}
50
51impl GroupCoordinatorResponse {
52    pub fn into_result(self) -> Result<Self> {
53        match Error::from_protocol(self.error) {
54            Some(e) => Err(e),
55            None => Ok(self),
56        }
57    }
58}
59
60impl FromByte for GroupCoordinatorResponse {
61    type R = GroupCoordinatorResponse;
62
63    fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
64        try_multi!(
65            self.header.decode(buffer),
66            self.error.decode(buffer),
67            self.broker_id.decode(buffer),
68            self.host.decode(buffer),
69            self.port.decode(buffer)
70        )
71    }
72}
73
74// --------------------------------------------------------------------
75
/// Protocol version to use for an offset fetch request.
///
/// The discriminant of each variant is the version number placed in the
/// request header (`version as i16`).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum OffsetFetchVersion {
    /// Version 0: offsets are retrieved from zookeeper.
    V0 = 0,
    /// Version 1 (supported as of kafka 0.8.2): offsets are retrieved
    /// from kafka itself.
    V1 = 1,
}
84
85#[derive(Debug)]
86pub struct OffsetFetchRequest<'a, 'b, 'c> {
87    pub header: HeaderRequest<'a>,
88    pub group: &'b str,
89    pub topic_partitions: Vec<TopicPartitionOffsetFetchRequest<'c>>,
90}
91
92#[derive(Debug)]
93pub struct TopicPartitionOffsetFetchRequest<'a> {
94    pub topic: &'a str,
95    pub partitions: Vec<PartitionOffsetFetchRequest>,
96}
97
/// Per-partition part of an offset fetch request; holds only the
/// partition id.
#[derive(Debug)]
pub struct PartitionOffsetFetchRequest {
    /// Partition id.
    pub partition: i32,
}
102
103impl<'a, 'b, 'c> OffsetFetchRequest<'a, 'b, 'c> {
104    pub fn new(
105        group: &'b str,
106        version: OffsetFetchVersion,
107        correlation_id: i32,
108        client_id: &'a str,
109    ) -> OffsetFetchRequest<'a, 'b, 'c> {
110        OffsetFetchRequest {
111            header: HeaderRequest::new(
112                API_KEY_OFFSET_FETCH,
113                version as i16,
114                correlation_id,
115                client_id,
116            ),
117            group,
118            topic_partitions: vec![],
119        }
120    }
121
122    pub fn add(&mut self, topic: &'c str, partition: i32) {
123        for tp in &mut self.topic_partitions {
124            if tp.topic == topic {
125                tp.add(partition);
126                return;
127            }
128        }
129        let mut tp = TopicPartitionOffsetFetchRequest::new(topic);
130        tp.add(partition);
131        self.topic_partitions.push(tp);
132    }
133}
134
135impl<'a> TopicPartitionOffsetFetchRequest<'a> {
136    pub fn new(topic: &'a str) -> TopicPartitionOffsetFetchRequest<'a> {
137        TopicPartitionOffsetFetchRequest {
138            topic,
139            partitions: vec![],
140        }
141    }
142
143    pub fn add(&mut self, partition: i32) {
144        self.partitions
145            .push(PartitionOffsetFetchRequest::new(partition));
146    }
147}
148
149impl PartitionOffsetFetchRequest {
150    pub fn new(partition: i32) -> PartitionOffsetFetchRequest {
151        PartitionOffsetFetchRequest { partition }
152    }
153}
154
155impl<'a, 'b, 'c> ToByte for OffsetFetchRequest<'a, 'b, 'c> {
156    fn encode<W: Write>(&self, buffer: &mut W) -> Result<()> {
157        try_multi!(
158            self.header.encode(buffer),
159            self.group.encode(buffer),
160            self.topic_partitions.encode(buffer)
161        )
162    }
163}
164
165impl<'a> ToByte for TopicPartitionOffsetFetchRequest<'a> {
166    fn encode<W: Write>(&self, buffer: &mut W) -> Result<()> {
167        try_multi!(self.topic.encode(buffer), self.partitions.encode(buffer))
168    }
169}
170
171impl ToByte for PartitionOffsetFetchRequest {
172    fn encode<W: Write>(&self, buffer: &mut W) -> Result<()> {
173        self.partition.encode(buffer)
174    }
175}
176
177// --------------------------------------------------------------------
178
179#[derive(Default, Debug)]
180pub struct OffsetFetchResponse {
181    pub header: HeaderResponse,
182    pub topic_partitions: Vec<TopicPartitionOffsetFetchResponse>,
183}
184
185#[derive(Default, Debug)]
186pub struct TopicPartitionOffsetFetchResponse {
187    pub topic: String,
188    pub partitions: Vec<PartitionOffsetFetchResponse>,
189}
190
/// Per-partition part of an offset fetch response.
///
/// Fields are declared in the same order in which they are decoded.
#[derive(Debug, Default)]
pub struct PartitionOffsetFetchResponse {
    /// Partition id.
    pub partition: i32,
    /// Offset reported for the partition.
    pub offset: i64,
    /// Metadata string reported alongside the offset.
    pub metadata: String,
    /// Raw protocol error code; interpreted by `get_offsets`.
    pub error: i16,
}
198
199impl PartitionOffsetFetchResponse {
200    pub fn get_offsets(&self) -> Result<PartitionOffset> {
201        match Error::from_protocol(self.error) {
202            Some(Error::Kafka(KafkaCode::UnknownTopicOrPartition)) => {
203                // ~ occurs only on protocol v0 when no offset available
204                // for the group in question; we'll align the behavior
205                // with protocol v1.
206                Ok(PartitionOffset {
207                    partition: self.partition,
208                    offset: -1,
209                })
210            }
211            Some(e) => Err(e),
212            None => Ok(PartitionOffset {
213                partition: self.partition,
214                offset: self.offset,
215            }),
216        }
217    }
218}
219
220impl FromByte for OffsetFetchResponse {
221    type R = OffsetFetchResponse;
222
223    fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
224        try_multi!(
225            self.header.decode(buffer),
226            self.topic_partitions.decode(buffer)
227        )
228    }
229}
230
231impl FromByte for TopicPartitionOffsetFetchResponse {
232    type R = TopicPartitionOffsetFetchResponse;
233
234    fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
235        try_multi!(self.topic.decode(buffer), self.partitions.decode(buffer))
236    }
237}
238
239impl FromByte for PartitionOffsetFetchResponse {
240    type R = PartitionOffsetFetchResponse;
241
242    fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
243        try_multi!(
244            self.partition.decode(buffer),
245            self.offset.decode(buffer),
246            self.metadata.decode(buffer),
247            self.error.decode(buffer)
248        )
249    }
250}
251
252// --------------------------------------------------------------------
253
/// Protocol version to use for an offset commit request.
///
/// The discriminant of each variant is the version number placed in the
/// request header (`version as i16`).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum OffsetCommitVersion {
    /// Version 0: offsets are stored in zookeeper.
    V0 = 0,
    /// Version 1 (supported as of kafka 0.8.2): offsets are stored
    /// directly in kafka.
    V1 = 1,
    /// Version 2 (supported as of kafka 0.9.0): offsets are stored
    /// directly in kafka.
    V2 = 2,
}
265
266impl OffsetCommitVersion {
267    fn from_protocol(n: i16) -> OffsetCommitVersion {
268        match n {
269            0 => OffsetCommitVersion::V0,
270            1 => OffsetCommitVersion::V1,
271            2 => OffsetCommitVersion::V2,
272            _ => panic!("Unknown offset commit version code: {}", n),
273        }
274    }
275}
276
277#[derive(Debug)]
278pub struct OffsetCommitRequest<'a, 'b> {
279    pub header: HeaderRequest<'a>,
280    pub group: &'b str,
281    pub topic_partitions: Vec<TopicPartitionOffsetCommitRequest<'b>>,
282}
283
284#[derive(Debug)]
285pub struct TopicPartitionOffsetCommitRequest<'a> {
286    pub topic: &'a str,
287    pub partitions: Vec<PartitionOffsetCommitRequest<'a>>,
288}
289
/// Per-partition part of an offset commit request.
#[derive(Debug)]
pub struct PartitionOffsetCommitRequest<'a> {
    /// Partition id.
    pub partition: i32,
    /// Offset to commit for the partition.
    pub offset: i64,
    /// Metadata string sent along with the offset.
    pub metadata: &'a str,
}
296
297impl<'a, 'b> OffsetCommitRequest<'a, 'b> {
298    pub fn new(
299        group: &'b str,
300        version: OffsetCommitVersion,
301        correlation_id: i32,
302        client_id: &'a str,
303    ) -> OffsetCommitRequest<'a, 'b> {
304        OffsetCommitRequest {
305            header: HeaderRequest::new(
306                API_KEY_OFFSET_COMMIT,
307                version as i16,
308                correlation_id,
309                client_id,
310            ),
311            group,
312            topic_partitions: vec![],
313        }
314    }
315
316    pub fn add(&mut self, topic: &'b str, partition: i32, offset: i64, metadata: &'b str) {
317        for tp in &mut self.topic_partitions {
318            if tp.topic == topic {
319                tp.add(partition, offset, metadata);
320                return;
321            }
322        }
323        let mut tp = TopicPartitionOffsetCommitRequest::new(topic);
324        tp.add(partition, offset, metadata);
325        self.topic_partitions.push(tp);
326    }
327}
328
329impl<'a> TopicPartitionOffsetCommitRequest<'a> {
330    pub fn new(topic: &'a str) -> TopicPartitionOffsetCommitRequest<'a> {
331        TopicPartitionOffsetCommitRequest {
332            topic,
333            partitions: vec![],
334        }
335    }
336
337    pub fn add(&mut self, partition: i32, offset: i64, metadata: &'a str) {
338        self.partitions.push(PartitionOffsetCommitRequest::new(
339            partition, offset, metadata,
340        ))
341    }
342}
343
344impl<'a> PartitionOffsetCommitRequest<'a> {
345    pub fn new(partition: i32, offset: i64, metadata: &'a str) -> PartitionOffsetCommitRequest<'a> {
346        PartitionOffsetCommitRequest {
347            partition,
348            offset,
349            metadata,
350        }
351    }
352}
353
354impl<'a, 'b> ToByte for OffsetCommitRequest<'a, 'b> {
355    fn encode<W: Write>(&self, buffer: &mut W) -> Result<()> {
356        let v = OffsetCommitVersion::from_protocol(self.header.api_version);
357        self.header.encode(buffer)?;
358        self.group.encode(buffer)?;
359        match v {
360            OffsetCommitVersion::V1 => {
361                (-1i32).encode(buffer)?;
362                "".encode(buffer)?;
363            }
364            OffsetCommitVersion::V2 => {
365                (-1i32).encode(buffer)?;
366                "".encode(buffer)?;
367                (-1i64).encode(buffer)?;
368            }
369            _ => {
370                // nothing to do
371            }
372        }
373        codecs::encode_as_array(buffer, &self.topic_partitions, |buffer, tp| {
374            try_multi!(
375                tp.topic.encode(buffer),
376                codecs::encode_as_array(buffer, &tp.partitions, |buffer, p| {
377                    p.partition.encode(buffer)?;
378                    p.offset.encode(buffer)?;
379                    if v == OffsetCommitVersion::V1 {
380                        (-1i64).encode(buffer)?;
381                    }
382                    p.metadata.encode(buffer)
383                })
384            )
385        })
386    }
387}
388
389// --------------------------------------------------------------------
390
391#[derive(Default, Debug)]
392pub struct OffsetCommitResponse {
393    pub header: HeaderResponse,
394    pub topic_partitions: Vec<TopicPartitionOffsetCommitResponse>,
395}
396
397impl FromByte for OffsetCommitResponse {
398    type R = OffsetCommitResponse;
399
400    fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
401        try_multi!(
402            self.header.decode(buffer),
403            self.topic_partitions.decode(buffer)
404        )
405    }
406}
407
408#[derive(Default, Debug)]
409pub struct TopicPartitionOffsetCommitResponse {
410    pub topic: String,
411    pub partitions: Vec<PartitionOffsetCommitResponse>,
412}
413
414impl FromByte for TopicPartitionOffsetCommitResponse {
415    type R = TopicPartitionOffsetCommitResponse;
416
417    fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
418        try_multi!(self.topic.decode(buffer), self.partitions.decode(buffer))
419    }
420}
421
/// Per-partition part of an offset commit response.
#[derive(Debug, Default)]
pub struct PartitionOffsetCommitResponse {
    /// Partition id.
    pub partition: i32,
    /// Raw protocol error code; interpreted by `to_error`.
    pub error: i16,
}
427
428impl PartitionOffsetCommitResponse {
429    pub fn to_error(&self) -> Option<error::KafkaCode> {
430        error::KafkaCode::from_protocol(self.error)
431    }
432}
433
434impl FromByte for PartitionOffsetCommitResponse {
435    type R = PartitionOffsetCommitResponse;
436
437    fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
438        try_multi!(self.partition.decode(buffer), self.error.decode(buffer))
439    }
440}