1use std::io::{Read, Write};
2
3use crate::codecs::{FromByte, ToByte};
4#[cfg(feature = "gzip")]
5use crate::compression::gzip;
6#[cfg(feature = "snappy")]
7use crate::compression::snappy;
8use crate::compression::Compression;
9
10use crate::error::{KafkaCode, Result};
11
12use super::to_crc;
13use super::{HeaderRequest, HeaderResponse};
14use super::{API_KEY_PRODUCE, API_VERSION};
15use crate::producer::{ProduceConfirm, ProducePartitionConfirm};
16
/// Magic byte handed to `_encode_to_buf` for every message encoded in this
/// module, including the wrapper message built by `render_compressed`.
// NOTE(review): presumably selects version 0 of the Kafka message format —
// confirm against the protocol documentation.
const MESSAGE_MAGIC_BYTE: i8 = 0;
19
/// A produce request: a request header plus the acknowledgement setting,
/// the timeout and the message batches grouped by topic.
///
/// `'a` is the lifetime of the client id borrowed by the header; `'b` is the
/// lifetime of the topic names and message payloads borrowed by the batches.
#[derive(Debug)]
pub struct ProduceRequest<'a, 'b> {
    /// Request header; `new` builds it from `API_KEY_PRODUCE`, `API_VERSION`,
    /// the correlation id and the client id.
    pub header: HeaderRequest<'a>,
    /// Encoded directly after the header.
    // NOTE(review): presumably the acknowledgement level requested from the
    // broker — confirm against the Kafka protocol documentation.
    pub required_acks: i16,
    /// Encoded after `required_acks`.
    // NOTE(review): unit is not visible here (presumably milliseconds) — confirm.
    pub timeout: i32,
    /// One entry per distinct topic, in the order topics were first passed to `add`.
    pub topic_partitions: Vec<TopicPartitionProduceRequest<'b>>,
    /// Compression setting copied into every topic entry that `add` creates.
    pub compression: Compression,
}
28
/// All messages of a produce request that target a single topic, grouped by
/// partition.
#[derive(Debug)]
pub struct TopicPartitionProduceRequest<'a> {
    /// Name of the topic; encoded first when this entry is serialized.
    pub topic: &'a str,
    /// One entry per distinct partition id, in the order partitions were
    /// first passed to `add`.
    pub partitions: Vec<PartitionProduceRequest<'a>>,
    /// Compression setting passed to each partition when it is encoded.
    pub compression: Compression,
}
35
/// The messages of a produce request that target a single partition.
#[derive(Debug)]
pub struct PartitionProduceRequest<'a> {
    /// Id of the target partition; encoded before the messages.
    pub partition: i32,
    /// Messages in insertion order; `new` always inserts the first one.
    pub messages: Vec<MessageProduceRequest<'a>>,
}
41
/// A single message to be produced, borrowing its key and value bytes.
#[derive(Debug)]
pub struct MessageProduceRequest<'a> {
    /// Optional message key; `None` is encoded as the `i32` value `-1`.
    key: Option<&'a [u8]>,
    /// Optional message value; `None` is encoded as the `i32` value `-1`.
    value: Option<&'a [u8]>,
}
47
48impl<'a, 'b> ProduceRequest<'a, 'b> {
49 pub fn new(
50 required_acks: i16,
51 timeout: i32,
52 correlation_id: i32,
53 client_id: &'a str,
54 compression: Compression,
55 ) -> ProduceRequest<'a, 'b> {
56 ProduceRequest {
57 header: HeaderRequest::new(API_KEY_PRODUCE, API_VERSION, correlation_id, client_id),
58 required_acks,
59 timeout,
60 topic_partitions: vec![],
61 compression,
62 }
63 }
64
65 pub fn add(
66 &mut self,
67 topic: &'b str,
68 partition: i32,
69 key: Option<&'b [u8]>,
70 value: Option<&'b [u8]>,
71 ) {
72 for tp in &mut self.topic_partitions {
73 if tp.topic == topic {
74 tp.add(partition, key, value);
75 return;
76 }
77 }
78 let mut tp = TopicPartitionProduceRequest::new(topic, self.compression);
79 tp.add(partition, key, value);
80 self.topic_partitions.push(tp);
81 }
82}
83
84impl<'a> TopicPartitionProduceRequest<'a> {
85 pub fn new(topic: &'a str, compression: Compression) -> TopicPartitionProduceRequest<'a> {
86 TopicPartitionProduceRequest {
87 topic,
88 partitions: vec![],
89 compression,
90 }
91 }
92
93 pub fn add(&mut self, partition: i32, key: Option<&'a [u8]>, value: Option<&'a [u8]>) {
94 for pp in &mut self.partitions {
95 if pp.partition == partition {
96 pp.add(key, value);
97 return;
98 }
99 }
100 self.partitions
101 .push(PartitionProduceRequest::new(partition, key, value));
102 }
103}
104
105impl<'a> PartitionProduceRequest<'a> {
106 pub fn new<'b>(
107 partition: i32,
108 key: Option<&'b [u8]>,
109 value: Option<&'b [u8]>,
110 ) -> PartitionProduceRequest<'b> {
111 let mut r = PartitionProduceRequest {
112 partition,
113 messages: Vec::new(),
114 };
115 r.add(key, value);
116 r
117 }
118
119 pub fn add(&mut self, key: Option<&'a [u8]>, value: Option<&'a [u8]>) {
120 self.messages.push(MessageProduceRequest::new(key, value));
121 }
122}
123
/// Serializes the request: header, `required_acks`, `timeout`, then the
/// per-topic batches. The `compression` field is not written here; it is
/// carried by the topic entries instead.
impl<'a, 'b> ToByte for ProduceRequest<'a, 'b> {
    fn encode<W: Write>(&self, buffer: &mut W) -> Result<()> {
        // NOTE(review): `try_multi!` is defined elsewhere; presumably it
        // evaluates the expressions in order and stops at the first error —
        // confirm against the macro definition.
        try_multi!(
            self.header.encode(buffer),
            self.required_acks.encode(buffer),
            self.timeout.encode(buffer),
            self.topic_partitions.encode(buffer)
        )
    }
}
134
135impl<'a> ToByte for TopicPartitionProduceRequest<'a> {
136 fn encode<W: Write>(&self, buffer: &mut W) -> Result<()> {
138 self.topic.encode(buffer)?;
139 (self.partitions.len() as i32).encode(buffer)?;
140 for e in &self.partitions {
141 e._encode(buffer, self.compression)?
142 }
143 Ok(())
144 }
145}
146
147impl<'a> PartitionProduceRequest<'a> {
148 fn _encode<W: Write>(&self, out: &mut W, compression: Compression) -> Result<()> {
153 self.partition.encode(out)?;
154
155 let mut buf = Vec::new();
157 for msg in &self.messages {
158 msg._encode_to_buf(&mut buf, MESSAGE_MAGIC_BYTE, 0)?;
159 }
160 match compression {
161 Compression::NONE => {
162 }
164 #[cfg(feature = "gzip")]
165 Compression::GZIP => {
166 let cdata = gzip::compress(&buf)?;
167 render_compressed(&mut buf, &cdata, compression)?;
168 }
169 #[cfg(feature = "snappy")]
170 Compression::SNAPPY => {
171 let cdata = snappy::compress(&buf)?;
172 render_compressed(&mut buf, &cdata, compression)?;
173 }
174 }
175 buf.encode(out)
176 }
177}
178
/// Replaces the contents of `out` with a single message that has no key,
/// `cdata` as its value and `compression` (cast to `i8`) as its attributes.
#[cfg(any(feature = "snappy", feature = "gzip"))]
fn render_compressed(out: &mut Vec<u8>, cdata: &[u8], compression: Compression) -> Result<()> {
    let attributes = compression as i8;
    let wrapper = MessageProduceRequest::new(None, Some(cdata));

    // Discard the uncompressed rendering before writing the wrapper message.
    out.clear();
    wrapper._encode_to_buf(out, MESSAGE_MAGIC_BYTE, attributes)
}
187
188impl<'a> MessageProduceRequest<'a> {
189 fn new<'b>(key: Option<&'b [u8]>, value: Option<&'b [u8]>) -> MessageProduceRequest<'b> {
190 MessageProduceRequest { key, value }
191 }
192
193 fn _encode_to_buf(&self, buffer: &mut Vec<u8>, magic: i8, attributes: i8) -> Result<()> {
206 (0i64).encode(buffer)?; let size_pos = buffer.len();
209 let mut size: i32 = 0;
210 size.encode(buffer)?; let crc_pos = buffer.len(); let mut crc: i32 = 0;
214 crc.encode(buffer)?; magic.encode(buffer)?;
216 attributes.encode(buffer)?;
217 self.key.encode(buffer)?;
218 self.value.encode(buffer)?;
219
220 crc = to_crc(&buffer[(crc_pos + 4)..]) as i32;
222 crc.encode(&mut &mut buffer[crc_pos..crc_pos + 4])?;
223
224 size = (buffer.len() - crc_pos) as i32;
226 size.encode(&mut &mut buffer[size_pos..size_pos + 4])?;
227
228 Ok(())
229 }
230}
231
232impl<'a> ToByte for Option<&'a [u8]> {
233 fn encode<W: Write>(&self, buffer: &mut W) -> Result<()> {
234 match *self {
235 Some(xs) => xs.encode(buffer),
236 None => (-1i32).encode(buffer),
237 }
238 }
239}
240
/// A decoded produce response: the response header followed by the
/// per-topic results.
#[derive(Default, Debug, Clone)]
pub struct ProduceResponse {
    /// Response header; decoded first.
    pub header: HeaderResponse,
    /// Results grouped by topic; decoded after the header.
    pub topic_partitions: Vec<TopicPartitionProduceResponse>,
}
248
/// The part of a produce response that concerns a single topic.
#[derive(Default, Debug, Clone)]
pub struct TopicPartitionProduceResponse {
    /// Name of the topic.
    pub topic: String,
    /// Results for the individual partitions of this topic.
    pub partitions: Vec<PartitionProduceResponse>,
}
254
/// The part of a produce response that concerns a single partition.
#[derive(Default, Debug, Clone)]
pub struct PartitionProduceResponse {
    /// Id of the partition.
    pub partition: i32,
    /// Raw protocol error code; `get_response` maps it through
    /// `KafkaCode::from_protocol`, treating `None` as success.
    pub error: i16,
    /// Offset that `get_response` reports when `error` maps to no `KafkaCode`.
    pub offset: i64,
}
261
262impl ProduceResponse {
263 pub fn get_response(self) -> Vec<ProduceConfirm> {
264 self.topic_partitions
265 .into_iter()
266 .map(|tp| tp.get_response())
267 .collect()
268 }
269}
270
271impl TopicPartitionProduceResponse {
272 pub fn get_response(self) -> ProduceConfirm {
273 let confirms = self.partitions.iter().map(|p| p.get_response()).collect();
274
275 ProduceConfirm {
276 topic: self.topic,
277 partition_confirms: confirms,
278 }
279 }
280}
281
282impl PartitionProduceResponse {
283 pub fn get_response(&self) -> ProducePartitionConfirm {
284 ProducePartitionConfirm {
285 partition: self.partition,
286 offset: match KafkaCode::from_protocol(self.error) {
287 None => Ok(self.offset),
288 Some(code) => Err(code),
289 },
290 }
291 }
292}
293
/// Deserializes a produce response in place: the header, then the per-topic
/// results.
impl FromByte for ProduceResponse {
    type R = ProduceResponse;

    // NOTE(review): the reason for this `allow` is not visible here; it
    // presumably silences a lint raised by the `try_multi!` expansion —
    // confirm against the macro definition.
    #[allow(unused_must_use)]
    fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
        // NOTE(review): `try_multi!` presumably runs the decodes in order and
        // stops at the first error — confirm against the macro definition.
        try_multi!(
            self.header.decode(buffer),
            self.topic_partitions.decode(buffer)
        )
    }
}
305
/// Deserializes a topic result in place: the topic name, then the
/// per-partition results.
impl FromByte for TopicPartitionProduceResponse {
    type R = TopicPartitionProduceResponse;

    // NOTE(review): the reason for this `allow` is not visible here; it
    // presumably silences a lint raised by the `try_multi!` expansion —
    // confirm against the macro definition.
    #[allow(unused_must_use)]
    fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
        // NOTE(review): `try_multi!` presumably runs the decodes in order and
        // stops at the first error — confirm against the macro definition.
        try_multi!(self.topic.decode(buffer), self.partitions.decode(buffer))
    }
}
314
/// Deserializes a partition result in place: the partition id, the raw error
/// code, then the offset.
impl FromByte for PartitionProduceResponse {
    type R = PartitionProduceResponse;

    // NOTE(review): the reason for this `allow` is not visible here; it
    // presumably silences a lint raised by the `try_multi!` expansion —
    // confirm against the macro definition.
    #[allow(unused_must_use)]
    fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
        // NOTE(review): `try_multi!` presumably runs the decodes in order and
        // stops at the first error — confirm against the macro definition.
        try_multi!(
            self.partition.decode(buffer),
            self.error.decode(buffer),
            self.offset.decode(buffer)
        )
    }
}