kafka/protocol/
offset.rs

1use std::io::{Read, Write};
2
3use super::{HeaderRequest, HeaderResponse};
4use super::{API_KEY_OFFSET, API_VERSION};
5use crate::codecs::{FromByte, ToByte};
6use crate::error::{KafkaCode, Result};
7use crate::utils::PartitionOffset;
8use std;
9
/// An offset request: a request header, a replica id and the list of
/// topics (each with its partitions) whose offsets are being asked for.
#[derive(Debug)]
pub struct OffsetRequest<'a> {
    /// Request header; `OffsetRequest::new` builds it from `API_KEY_OFFSET`,
    /// `API_VERSION`, the correlation id and the client id.
    pub header: HeaderRequest<'a>,
    /// Replica id encoded right after the header; `OffsetRequest::new` sets it to `-1`.
    pub replica: i32,
    /// Per-topic partition queries; `OffsetRequest::add` reuses an existing
    /// entry when the topic name is already present.
    pub topic_partitions: Vec<TopicPartitionOffsetRequest<'a>>,
}
16
/// The partitions of a single topic that are part of an offset request.
#[derive(Default, Debug)]
pub struct TopicPartitionOffsetRequest<'a> {
    /// Name of the topic, borrowed from the caller.
    pub topic: &'a str,
    /// One entry per `add` call; no de-duplication is performed here.
    pub partitions: Vec<PartitionOffsetRequest>,
}
22
/// The offset query for one partition.
#[derive(Default, Debug)]
pub struct PartitionOffsetRequest {
    /// Id of the partition being queried.
    pub partition: i32,
    /// Maximum number of offsets asked for; `PartitionOffsetRequest::new` sets it to `1`.
    pub max_offsets: i32,
    /// Time value sent with the query.
    // NOTE(review): presumably a Kafka timestamp selector (e.g. latest/earliest) — confirm against callers.
    pub time: i64,
}
29
30impl<'a> OffsetRequest<'a> {
31    pub fn new(correlation_id: i32, client_id: &'a str) -> OffsetRequest<'a> {
32        OffsetRequest {
33            header: HeaderRequest::new(API_KEY_OFFSET, API_VERSION, correlation_id, client_id),
34            replica: -1,
35            topic_partitions: vec![],
36        }
37    }
38
39    pub fn add(&mut self, topic: &'a str, partition: i32, time: i64) {
40        for tp in &mut self.topic_partitions {
41            if tp.topic == topic {
42                tp.add(partition, time);
43                return;
44            }
45        }
46        let mut tp = TopicPartitionOffsetRequest::new(topic);
47        tp.add(partition, time);
48        self.topic_partitions.push(tp);
49    }
50}
51
52impl<'a> TopicPartitionOffsetRequest<'a> {
53    pub fn new(topic: &'a str) -> TopicPartitionOffsetRequest<'a> {
54        TopicPartitionOffsetRequest {
55            topic,
56            partitions: vec![],
57        }
58    }
59
60    pub fn add(&mut self, partition: i32, time: i64) {
61        self.partitions
62            .push(PartitionOffsetRequest::new(partition, time));
63    }
64}
65
66impl PartitionOffsetRequest {
67    pub fn new(partition: i32, time: i64) -> PartitionOffsetRequest {
68        PartitionOffsetRequest {
69            partition,
70            max_offsets: 1,
71            time,
72        }
73    }
74}
75
76impl<'a> ToByte for OffsetRequest<'a> {
77    fn encode<T: Write>(&self, buffer: &mut T) -> Result<()> {
78        try_multi!(
79            self.header.encode(buffer),
80            self.replica.encode(buffer),
81            self.topic_partitions.encode(buffer)
82        )
83    }
84}
85
86impl<'a> ToByte for TopicPartitionOffsetRequest<'a> {
87    fn encode<T: Write>(&self, buffer: &mut T) -> Result<()> {
88        try_multi!(self.topic.encode(buffer), self.partitions.encode(buffer))
89    }
90}
91
92impl ToByte for PartitionOffsetRequest {
93    fn encode<T: Write>(&self, buffer: &mut T) -> Result<()> {
94        try_multi!(
95            self.partition.encode(buffer),
96            self.time.encode(buffer),
97            self.max_offsets.encode(buffer)
98        )
99    }
100}
101
102// --------------------------------------------------------------------
103
/// Decoded reply to an `OffsetRequest`: a response header plus the
/// per-topic partition results.
#[derive(Default, Debug)]
pub struct OffsetResponse {
    /// Response header, decoded first.
    pub header: HeaderResponse,
    /// Results grouped by topic.
    pub topic_partitions: Vec<TopicPartitionOffsetResponse>,
}
109
/// The offset results for all queried partitions of one topic.
#[derive(Default, Debug)]
pub struct TopicPartitionOffsetResponse {
    /// Name of the topic (owned, as decoded from the response).
    pub topic: String,
    /// One result per partition in the response.
    pub partitions: Vec<PartitionOffsetResponse>,
}
115
/// The offset result for a single partition.
#[derive(Default, Debug)]
pub struct PartitionOffsetResponse {
    /// Id of the partition this result belongs to.
    pub partition: i32,
    /// Raw protocol error code; `to_offset` interprets it through
    /// `KafkaCode::from_protocol`.
    pub error: i16,
    /// Offsets returned for the partition; `to_offset` only looks at the
    /// first element.
    pub offset: Vec<i64>,
}
122
123impl PartitionOffsetResponse {
124    pub fn to_offset(&self) -> std::result::Result<PartitionOffset, KafkaCode> {
125        match KafkaCode::from_protocol(self.error) {
126            Some(code) => Err(code),
127            None => {
128                let offset = match self.offset.first() {
129                    Some(offs) => *offs,
130                    None => -1,
131                };
132
133                Ok(PartitionOffset {
134                    partition: self.partition,
135                    offset,
136                })
137            }
138        }
139    }
140}
141
142impl FromByte for OffsetResponse {
143    type R = OffsetResponse;
144
145    #[allow(unused_must_use)]
146    fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
147        try_multi!(
148            self.header.decode(buffer),
149            self.topic_partitions.decode(buffer)
150        )
151    }
152}
153
154impl FromByte for TopicPartitionOffsetResponse {
155    type R = TopicPartitionOffsetResponse;
156
157    #[allow(unused_must_use)]
158    fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
159        try_multi!(self.topic.decode(buffer), self.partitions.decode(buffer))
160    }
161}
162
163impl FromByte for PartitionOffsetResponse {
164    type R = PartitionOffsetResponse;
165
166    #[allow(unused_must_use)]
167    fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
168        try_multi!(
169            self.partition.decode(buffer),
170            self.error.decode(buffer),
171            self.offset.decode(buffer)
172        )
173    }
174}