1use std::io::{Read, Write};
2
3use crate::codecs::{self, FromByte, ToByte};
4use crate::error::{self, Error, KafkaCode, Result};
5use crate::utils::PartitionOffset;
6
7use super::{HeaderRequest, HeaderResponse};
8use super::{API_KEY_GROUP_COORDINATOR, API_KEY_OFFSET_COMMIT, API_KEY_OFFSET_FETCH, API_VERSION};
9
10#[derive(Debug)]
13pub struct GroupCoordinatorRequest<'a, 'b> {
14 pub header: HeaderRequest<'a>,
15 pub group: &'b str,
16}
17
18impl<'a, 'b> GroupCoordinatorRequest<'a, 'b> {
19 pub fn new(
20 group: &'b str,
21 correlation_id: i32,
22 client_id: &'a str,
23 ) -> GroupCoordinatorRequest<'a, 'b> {
24 GroupCoordinatorRequest {
25 header: HeaderRequest::new(
26 API_KEY_GROUP_COORDINATOR,
27 API_VERSION,
28 correlation_id,
29 client_id,
30 ),
31 group,
32 }
33 }
34}
35
36impl<'a, 'b> ToByte for GroupCoordinatorRequest<'a, 'b> {
37 fn encode<W: Write>(&self, buffer: &mut W) -> Result<()> {
38 try_multi!(self.header.encode(buffer), self.group.encode(buffer))
39 }
40}
41
42#[derive(Debug, Default)]
43pub struct GroupCoordinatorResponse {
44 pub header: HeaderResponse,
45 pub error: i16,
46 pub broker_id: i32,
47 pub port: i32,
48 pub host: String,
49}
50
51impl GroupCoordinatorResponse {
52 pub fn into_result(self) -> Result<Self> {
53 match Error::from_protocol(self.error) {
54 Some(e) => Err(e),
55 None => Ok(self),
56 }
57 }
58}
59
60impl FromByte for GroupCoordinatorResponse {
61 type R = GroupCoordinatorResponse;
62
63 fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
64 try_multi!(
65 self.header.decode(buffer),
66 self.error.decode(buffer),
67 self.broker_id.decode(buffer),
68 self.host.decode(buffer),
69 self.port.decode(buffer)
70 )
71 }
72}
73
74#[derive(Debug, Copy, Clone, PartialEq, Eq)]
77pub enum OffsetFetchVersion {
78 V0 = 0,
80 V1 = 1,
83}
84
85#[derive(Debug)]
86pub struct OffsetFetchRequest<'a, 'b, 'c> {
87 pub header: HeaderRequest<'a>,
88 pub group: &'b str,
89 pub topic_partitions: Vec<TopicPartitionOffsetFetchRequest<'c>>,
90}
91
92#[derive(Debug)]
93pub struct TopicPartitionOffsetFetchRequest<'a> {
94 pub topic: &'a str,
95 pub partitions: Vec<PartitionOffsetFetchRequest>,
96}
97
98#[derive(Debug)]
99pub struct PartitionOffsetFetchRequest {
100 pub partition: i32,
101}
102
103impl<'a, 'b, 'c> OffsetFetchRequest<'a, 'b, 'c> {
104 pub fn new(
105 group: &'b str,
106 version: OffsetFetchVersion,
107 correlation_id: i32,
108 client_id: &'a str,
109 ) -> OffsetFetchRequest<'a, 'b, 'c> {
110 OffsetFetchRequest {
111 header: HeaderRequest::new(
112 API_KEY_OFFSET_FETCH,
113 version as i16,
114 correlation_id,
115 client_id,
116 ),
117 group,
118 topic_partitions: vec![],
119 }
120 }
121
122 pub fn add(&mut self, topic: &'c str, partition: i32) {
123 for tp in &mut self.topic_partitions {
124 if tp.topic == topic {
125 tp.add(partition);
126 return;
127 }
128 }
129 let mut tp = TopicPartitionOffsetFetchRequest::new(topic);
130 tp.add(partition);
131 self.topic_partitions.push(tp);
132 }
133}
134
135impl<'a> TopicPartitionOffsetFetchRequest<'a> {
136 pub fn new(topic: &'a str) -> TopicPartitionOffsetFetchRequest<'a> {
137 TopicPartitionOffsetFetchRequest {
138 topic,
139 partitions: vec![],
140 }
141 }
142
143 pub fn add(&mut self, partition: i32) {
144 self.partitions
145 .push(PartitionOffsetFetchRequest::new(partition));
146 }
147}
148
149impl PartitionOffsetFetchRequest {
150 pub fn new(partition: i32) -> PartitionOffsetFetchRequest {
151 PartitionOffsetFetchRequest { partition }
152 }
153}
154
155impl<'a, 'b, 'c> ToByte for OffsetFetchRequest<'a, 'b, 'c> {
156 fn encode<W: Write>(&self, buffer: &mut W) -> Result<()> {
157 try_multi!(
158 self.header.encode(buffer),
159 self.group.encode(buffer),
160 self.topic_partitions.encode(buffer)
161 )
162 }
163}
164
165impl<'a> ToByte for TopicPartitionOffsetFetchRequest<'a> {
166 fn encode<W: Write>(&self, buffer: &mut W) -> Result<()> {
167 try_multi!(self.topic.encode(buffer), self.partitions.encode(buffer))
168 }
169}
170
171impl ToByte for PartitionOffsetFetchRequest {
172 fn encode<W: Write>(&self, buffer: &mut W) -> Result<()> {
173 self.partition.encode(buffer)
174 }
175}
176
177#[derive(Default, Debug)]
180pub struct OffsetFetchResponse {
181 pub header: HeaderResponse,
182 pub topic_partitions: Vec<TopicPartitionOffsetFetchResponse>,
183}
184
185#[derive(Default, Debug)]
186pub struct TopicPartitionOffsetFetchResponse {
187 pub topic: String,
188 pub partitions: Vec<PartitionOffsetFetchResponse>,
189}
190
191#[derive(Default, Debug)]
192pub struct PartitionOffsetFetchResponse {
193 pub partition: i32,
194 pub offset: i64,
195 pub metadata: String,
196 pub error: i16,
197}
198
199impl PartitionOffsetFetchResponse {
200 pub fn get_offsets(&self) -> Result<PartitionOffset> {
201 match Error::from_protocol(self.error) {
202 Some(Error::Kafka(KafkaCode::UnknownTopicOrPartition)) => {
203 Ok(PartitionOffset {
207 partition: self.partition,
208 offset: -1,
209 })
210 }
211 Some(e) => Err(e),
212 None => Ok(PartitionOffset {
213 partition: self.partition,
214 offset: self.offset,
215 }),
216 }
217 }
218}
219
220impl FromByte for OffsetFetchResponse {
221 type R = OffsetFetchResponse;
222
223 fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
224 try_multi!(
225 self.header.decode(buffer),
226 self.topic_partitions.decode(buffer)
227 )
228 }
229}
230
231impl FromByte for TopicPartitionOffsetFetchResponse {
232 type R = TopicPartitionOffsetFetchResponse;
233
234 fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
235 try_multi!(self.topic.decode(buffer), self.partitions.decode(buffer))
236 }
237}
238
239impl FromByte for PartitionOffsetFetchResponse {
240 type R = PartitionOffsetFetchResponse;
241
242 fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
243 try_multi!(
244 self.partition.decode(buffer),
245 self.offset.decode(buffer),
246 self.metadata.decode(buffer),
247 self.error.decode(buffer)
248 )
249 }
250}
251
252#[derive(Debug, Copy, Clone, PartialEq, Eq)]
255pub enum OffsetCommitVersion {
256 V0 = 0,
258 V1 = 1,
261 V2 = 2,
264}
265
266impl OffsetCommitVersion {
267 fn from_protocol(n: i16) -> OffsetCommitVersion {
268 match n {
269 0 => OffsetCommitVersion::V0,
270 1 => OffsetCommitVersion::V1,
271 2 => OffsetCommitVersion::V2,
272 _ => panic!("Unknown offset commit version code: {}", n),
273 }
274 }
275}
276
277#[derive(Debug)]
278pub struct OffsetCommitRequest<'a, 'b> {
279 pub header: HeaderRequest<'a>,
280 pub group: &'b str,
281 pub topic_partitions: Vec<TopicPartitionOffsetCommitRequest<'b>>,
282}
283
284#[derive(Debug)]
285pub struct TopicPartitionOffsetCommitRequest<'a> {
286 pub topic: &'a str,
287 pub partitions: Vec<PartitionOffsetCommitRequest<'a>>,
288}
289
290#[derive(Debug)]
291pub struct PartitionOffsetCommitRequest<'a> {
292 pub partition: i32,
293 pub offset: i64,
294 pub metadata: &'a str,
295}
296
297impl<'a, 'b> OffsetCommitRequest<'a, 'b> {
298 pub fn new(
299 group: &'b str,
300 version: OffsetCommitVersion,
301 correlation_id: i32,
302 client_id: &'a str,
303 ) -> OffsetCommitRequest<'a, 'b> {
304 OffsetCommitRequest {
305 header: HeaderRequest::new(
306 API_KEY_OFFSET_COMMIT,
307 version as i16,
308 correlation_id,
309 client_id,
310 ),
311 group,
312 topic_partitions: vec![],
313 }
314 }
315
316 pub fn add(&mut self, topic: &'b str, partition: i32, offset: i64, metadata: &'b str) {
317 for tp in &mut self.topic_partitions {
318 if tp.topic == topic {
319 tp.add(partition, offset, metadata);
320 return;
321 }
322 }
323 let mut tp = TopicPartitionOffsetCommitRequest::new(topic);
324 tp.add(partition, offset, metadata);
325 self.topic_partitions.push(tp);
326 }
327}
328
329impl<'a> TopicPartitionOffsetCommitRequest<'a> {
330 pub fn new(topic: &'a str) -> TopicPartitionOffsetCommitRequest<'a> {
331 TopicPartitionOffsetCommitRequest {
332 topic,
333 partitions: vec![],
334 }
335 }
336
337 pub fn add(&mut self, partition: i32, offset: i64, metadata: &'a str) {
338 self.partitions.push(PartitionOffsetCommitRequest::new(
339 partition, offset, metadata,
340 ))
341 }
342}
343
344impl<'a> PartitionOffsetCommitRequest<'a> {
345 pub fn new(partition: i32, offset: i64, metadata: &'a str) -> PartitionOffsetCommitRequest<'a> {
346 PartitionOffsetCommitRequest {
347 partition,
348 offset,
349 metadata,
350 }
351 }
352}
353
354impl<'a, 'b> ToByte for OffsetCommitRequest<'a, 'b> {
355 fn encode<W: Write>(&self, buffer: &mut W) -> Result<()> {
356 let v = OffsetCommitVersion::from_protocol(self.header.api_version);
357 self.header.encode(buffer)?;
358 self.group.encode(buffer)?;
359 match v {
360 OffsetCommitVersion::V1 => {
361 (-1i32).encode(buffer)?;
362 "".encode(buffer)?;
363 }
364 OffsetCommitVersion::V2 => {
365 (-1i32).encode(buffer)?;
366 "".encode(buffer)?;
367 (-1i64).encode(buffer)?;
368 }
369 _ => {
370 }
372 }
373 codecs::encode_as_array(buffer, &self.topic_partitions, |buffer, tp| {
374 try_multi!(
375 tp.topic.encode(buffer),
376 codecs::encode_as_array(buffer, &tp.partitions, |buffer, p| {
377 p.partition.encode(buffer)?;
378 p.offset.encode(buffer)?;
379 if v == OffsetCommitVersion::V1 {
380 (-1i64).encode(buffer)?;
381 }
382 p.metadata.encode(buffer)
383 })
384 )
385 })
386 }
387}
388
389#[derive(Default, Debug)]
392pub struct OffsetCommitResponse {
393 pub header: HeaderResponse,
394 pub topic_partitions: Vec<TopicPartitionOffsetCommitResponse>,
395}
396
397impl FromByte for OffsetCommitResponse {
398 type R = OffsetCommitResponse;
399
400 fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
401 try_multi!(
402 self.header.decode(buffer),
403 self.topic_partitions.decode(buffer)
404 )
405 }
406}
407
408#[derive(Default, Debug)]
409pub struct TopicPartitionOffsetCommitResponse {
410 pub topic: String,
411 pub partitions: Vec<PartitionOffsetCommitResponse>,
412}
413
414impl FromByte for TopicPartitionOffsetCommitResponse {
415 type R = TopicPartitionOffsetCommitResponse;
416
417 fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
418 try_multi!(self.topic.decode(buffer), self.partitions.decode(buffer))
419 }
420}
421
422#[derive(Default, Debug)]
423pub struct PartitionOffsetCommitResponse {
424 pub partition: i32,
425 pub error: i16,
426}
427
428impl PartitionOffsetCommitResponse {
429 pub fn to_error(&self) -> Option<error::KafkaCode> {
430 error::KafkaCode::from_protocol(self.error)
431 }
432}
433
434impl FromByte for PartitionOffsetCommitResponse {
435 type R = PartitionOffsetCommitResponse;
436
437 fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
438 try_multi!(self.partition.decode(buffer), self.error.decode(buffer))
439 }
440}