1use std::io::{Read, Write};
2
3use super::{HeaderRequest, HeaderResponse};
4use super::{API_KEY_OFFSET, API_VERSION};
5use crate::codecs::{FromByte, ToByte};
6use crate::error::{KafkaCode, Result};
7use crate::utils::PartitionOffset;
8use std;
9
10#[derive(Debug)]
11pub struct OffsetRequest<'a> {
12 pub header: HeaderRequest<'a>,
13 pub replica: i32,
14 pub topic_partitions: Vec<TopicPartitionOffsetRequest<'a>>,
15}
16
17#[derive(Default, Debug)]
18pub struct TopicPartitionOffsetRequest<'a> {
19 pub topic: &'a str,
20 pub partitions: Vec<PartitionOffsetRequest>,
21}
22
/// A single partition query inside an offset request.
///
/// Note that the wire encoding writes `partition`, then `time`, then
/// `max_offsets`, which differs from the declaration order below.
#[derive(Debug, Default)]
pub struct PartitionOffsetRequest {
    /// Id of the partition being queried.
    pub partition: i32,
    /// Maximum number of offsets requested; `PartitionOffsetRequest::new`
    /// sets this to `1`.
    pub max_offsets: i32,
    /// Time value forwarded unchanged to the broker.
    // NOTE(review): presumably a Kafka timestamp / special marker such as
    // "latest" or "earliest" -- confirm against the callers.
    pub time: i64,
}
29
30impl<'a> OffsetRequest<'a> {
31 pub fn new(correlation_id: i32, client_id: &'a str) -> OffsetRequest<'a> {
32 OffsetRequest {
33 header: HeaderRequest::new(API_KEY_OFFSET, API_VERSION, correlation_id, client_id),
34 replica: -1,
35 topic_partitions: vec![],
36 }
37 }
38
39 pub fn add(&mut self, topic: &'a str, partition: i32, time: i64) {
40 for tp in &mut self.topic_partitions {
41 if tp.topic == topic {
42 tp.add(partition, time);
43 return;
44 }
45 }
46 let mut tp = TopicPartitionOffsetRequest::new(topic);
47 tp.add(partition, time);
48 self.topic_partitions.push(tp);
49 }
50}
51
52impl<'a> TopicPartitionOffsetRequest<'a> {
53 pub fn new(topic: &'a str) -> TopicPartitionOffsetRequest<'a> {
54 TopicPartitionOffsetRequest {
55 topic,
56 partitions: vec![],
57 }
58 }
59
60 pub fn add(&mut self, partition: i32, time: i64) {
61 self.partitions
62 .push(PartitionOffsetRequest::new(partition, time));
63 }
64}
65
66impl PartitionOffsetRequest {
67 pub fn new(partition: i32, time: i64) -> PartitionOffsetRequest {
68 PartitionOffsetRequest {
69 partition,
70 max_offsets: 1,
71 time,
72 }
73 }
74}
75
76impl<'a> ToByte for OffsetRequest<'a> {
77 fn encode<T: Write>(&self, buffer: &mut T) -> Result<()> {
78 try_multi!(
79 self.header.encode(buffer),
80 self.replica.encode(buffer),
81 self.topic_partitions.encode(buffer)
82 )
83 }
84}
85
86impl<'a> ToByte for TopicPartitionOffsetRequest<'a> {
87 fn encode<T: Write>(&self, buffer: &mut T) -> Result<()> {
88 try_multi!(self.topic.encode(buffer), self.partitions.encode(buffer))
89 }
90}
91
92impl ToByte for PartitionOffsetRequest {
93 fn encode<T: Write>(&self, buffer: &mut T) -> Result<()> {
94 try_multi!(
95 self.partition.encode(buffer),
96 self.time.encode(buffer),
97 self.max_offsets.encode(buffer)
98 )
99 }
100}
101
102#[derive(Default, Debug)]
105pub struct OffsetResponse {
106 pub header: HeaderResponse,
107 pub topic_partitions: Vec<TopicPartitionOffsetResponse>,
108}
109
110#[derive(Default, Debug)]
111pub struct TopicPartitionOffsetResponse {
112 pub topic: String,
113 pub partitions: Vec<PartitionOffsetResponse>,
114}
115
/// The offset result for a single partition.
#[derive(Debug, Default)]
pub struct PartitionOffsetResponse {
    /// Id of the partition this result refers to.
    pub partition: i32,
    /// Raw protocol error code; `to_offset` interprets it through
    /// `KafkaCode::from_protocol`.
    pub error: i16,
    /// Offsets returned for the partition; `to_offset` only looks at the
    /// first element.
    pub offset: Vec<i64>,
}
122
123impl PartitionOffsetResponse {
124 pub fn to_offset(&self) -> std::result::Result<PartitionOffset, KafkaCode> {
125 match KafkaCode::from_protocol(self.error) {
126 Some(code) => Err(code),
127 None => {
128 let offset = match self.offset.first() {
129 Some(offs) => *offs,
130 None => -1,
131 };
132
133 Ok(PartitionOffset {
134 partition: self.partition,
135 offset,
136 })
137 }
138 }
139 }
140}
141
142impl FromByte for OffsetResponse {
143 type R = OffsetResponse;
144
145 #[allow(unused_must_use)]
146 fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
147 try_multi!(
148 self.header.decode(buffer),
149 self.topic_partitions.decode(buffer)
150 )
151 }
152}
153
154impl FromByte for TopicPartitionOffsetResponse {
155 type R = TopicPartitionOffsetResponse;
156
157 #[allow(unused_must_use)]
158 fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
159 try_multi!(self.topic.decode(buffer), self.partitions.decode(buffer))
160 }
161}
162
163impl FromByte for PartitionOffsetResponse {
164 type R = PartitionOffsetResponse;
165
166 #[allow(unused_must_use)]
167 fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
168 try_multi!(
169 self.partition.decode(buffer),
170 self.error.decode(buffer),
171 self.offset.decode(buffer)
172 )
173 }
174}