kafka/protocol/
produce.rs

1use std::io::{Read, Write};
2
3use crate::codecs::{FromByte, ToByte};
4#[cfg(feature = "gzip")]
5use crate::compression::gzip;
6#[cfg(feature = "snappy")]
7use crate::compression::snappy;
8use crate::compression::Compression;
9
10use crate::error::{KafkaCode, Result};
11
12use super::to_crc;
13use super::{HeaderRequest, HeaderResponse};
14use super::{API_KEY_PRODUCE, API_VERSION};
15use crate::producer::{ProduceConfirm, ProducePartitionConfirm};
16
17/// The magic byte (a.k.a version) we use for sent messages.
18const MESSAGE_MAGIC_BYTE: i8 = 0;
19
20#[derive(Debug)]
21pub struct ProduceRequest<'a, 'b> {
22    pub header: HeaderRequest<'a>,
23    pub required_acks: i16,
24    pub timeout: i32,
25    pub topic_partitions: Vec<TopicPartitionProduceRequest<'b>>,
26    pub compression: Compression,
27}
28
29#[derive(Debug)]
30pub struct TopicPartitionProduceRequest<'a> {
31    pub topic: &'a str,
32    pub partitions: Vec<PartitionProduceRequest<'a>>,
33    pub compression: Compression,
34}
35
36#[derive(Debug)]
37pub struct PartitionProduceRequest<'a> {
38    pub partition: i32,
39    pub messages: Vec<MessageProduceRequest<'a>>,
40}
41
42#[derive(Debug)]
43pub struct MessageProduceRequest<'a> {
44    key: Option<&'a [u8]>,
45    value: Option<&'a [u8]>,
46}
47
48impl<'a, 'b> ProduceRequest<'a, 'b> {
49    pub fn new(
50        required_acks: i16,
51        timeout: i32,
52        correlation_id: i32,
53        client_id: &'a str,
54        compression: Compression,
55    ) -> ProduceRequest<'a, 'b> {
56        ProduceRequest {
57            header: HeaderRequest::new(API_KEY_PRODUCE, API_VERSION, correlation_id, client_id),
58            required_acks,
59            timeout,
60            topic_partitions: vec![],
61            compression,
62        }
63    }
64
65    pub fn add(
66        &mut self,
67        topic: &'b str,
68        partition: i32,
69        key: Option<&'b [u8]>,
70        value: Option<&'b [u8]>,
71    ) {
72        for tp in &mut self.topic_partitions {
73            if tp.topic == topic {
74                tp.add(partition, key, value);
75                return;
76            }
77        }
78        let mut tp = TopicPartitionProduceRequest::new(topic, self.compression);
79        tp.add(partition, key, value);
80        self.topic_partitions.push(tp);
81    }
82}
83
84impl<'a> TopicPartitionProduceRequest<'a> {
85    pub fn new(topic: &'a str, compression: Compression) -> TopicPartitionProduceRequest<'a> {
86        TopicPartitionProduceRequest {
87            topic,
88            partitions: vec![],
89            compression,
90        }
91    }
92
93    pub fn add(&mut self, partition: i32, key: Option<&'a [u8]>, value: Option<&'a [u8]>) {
94        for pp in &mut self.partitions {
95            if pp.partition == partition {
96                pp.add(key, value);
97                return;
98            }
99        }
100        self.partitions
101            .push(PartitionProduceRequest::new(partition, key, value));
102    }
103}
104
105impl<'a> PartitionProduceRequest<'a> {
106    pub fn new<'b>(
107        partition: i32,
108        key: Option<&'b [u8]>,
109        value: Option<&'b [u8]>,
110    ) -> PartitionProduceRequest<'b> {
111        let mut r = PartitionProduceRequest {
112            partition,
113            messages: Vec::new(),
114        };
115        r.add(key, value);
116        r
117    }
118
119    pub fn add(&mut self, key: Option<&'a [u8]>, value: Option<&'a [u8]>) {
120        self.messages.push(MessageProduceRequest::new(key, value));
121    }
122}
123
124impl<'a, 'b> ToByte for ProduceRequest<'a, 'b> {
125    fn encode<W: Write>(&self, buffer: &mut W) -> Result<()> {
126        try_multi!(
127            self.header.encode(buffer),
128            self.required_acks.encode(buffer),
129            self.timeout.encode(buffer),
130            self.topic_partitions.encode(buffer)
131        )
132    }
133}
134
135impl<'a> ToByte for TopicPartitionProduceRequest<'a> {
136    // render: TopicName [Partition MessageSetSize MessageSet]
137    fn encode<W: Write>(&self, buffer: &mut W) -> Result<()> {
138        self.topic.encode(buffer)?;
139        (self.partitions.len() as i32).encode(buffer)?;
140        for e in &self.partitions {
141            e._encode(buffer, self.compression)?
142        }
143        Ok(())
144    }
145}
146
147impl<'a> PartitionProduceRequest<'a> {
148    // render: Partition MessageSetSize MessageSet
149    //
150    // MessetSet => [Offset MessageSize Message]
151    // MessageSets are not preceded by an int32 like other array elements in the protocol.
152    fn _encode<W: Write>(&self, out: &mut W, compression: Compression) -> Result<()> {
153        self.partition.encode(out)?;
154
155        // ~ render the whole MessageSet first to a temporary buffer
156        let mut buf = Vec::new();
157        for msg in &self.messages {
158            msg._encode_to_buf(&mut buf, MESSAGE_MAGIC_BYTE, 0)?;
159        }
160        match compression {
161            Compression::NONE => {
162                // ~ nothing to do
163            }
164            #[cfg(feature = "gzip")]
165            Compression::GZIP => {
166                let cdata = gzip::compress(&buf)?;
167                render_compressed(&mut buf, &cdata, compression)?;
168            }
169            #[cfg(feature = "snappy")]
170            Compression::SNAPPY => {
171                let cdata = snappy::compress(&buf)?;
172                render_compressed(&mut buf, &cdata, compression)?;
173            }
174        }
175        buf.encode(out)
176    }
177}
178
179// ~ A helper method to render `cdata` into `out` as a compressed message.
180// ~ `out` is first cleared and then populated with the rendered message.
181#[cfg(any(feature = "snappy", feature = "gzip"))]
182fn render_compressed(out: &mut Vec<u8>, cdata: &[u8], compression: Compression) -> Result<()> {
183    out.clear();
184    let cmsg = MessageProduceRequest::new(None, Some(cdata));
185    cmsg._encode_to_buf(out, MESSAGE_MAGIC_BYTE, compression as i8)
186}
187
188impl<'a> MessageProduceRequest<'a> {
189    fn new<'b>(key: Option<&'b [u8]>, value: Option<&'b [u8]>) -> MessageProduceRequest<'b> {
190        MessageProduceRequest { key, value }
191    }
192
193    // render a single message as: Offset MessageSize Message
194    //
195    // Offset => int64 (always encoded as zero here)
196    // MessageSize => int32
197    // Message => Crc MagicByte Attributes Key Value
198    // Crc => int32
199    // MagicByte => int8
200    // Attributes => int8
201    // Key => bytes
202    // Value => bytes
203    //
204    // note: the rendered data corresponds to a single MessageSet in the kafka protocol
205    fn _encode_to_buf(&self, buffer: &mut Vec<u8>, magic: i8, attributes: i8) -> Result<()> {
206        (0i64).encode(buffer)?; // offset in the response request can be anything
207
208        let size_pos = buffer.len();
209        let mut size: i32 = 0;
210        size.encode(buffer)?; // reserve space for the size to be computed later
211
212        let crc_pos = buffer.len(); // remember the position where to update the crc later
213        let mut crc: i32 = 0;
214        crc.encode(buffer)?; // reserve space for the crc to be computed later
215        magic.encode(buffer)?;
216        attributes.encode(buffer)?;
217        self.key.encode(buffer)?;
218        self.value.encode(buffer)?;
219
220        // compute the crc and store it back in the reserved space
221        crc = to_crc(&buffer[(crc_pos + 4)..]) as i32;
222        crc.encode(&mut &mut buffer[crc_pos..crc_pos + 4])?;
223
224        // compute the size and store it back in the reserved space
225        size = (buffer.len() - crc_pos) as i32;
226        size.encode(&mut &mut buffer[size_pos..size_pos + 4])?;
227
228        Ok(())
229    }
230}
231
232impl<'a> ToByte for Option<&'a [u8]> {
233    fn encode<W: Write>(&self, buffer: &mut W) -> Result<()> {
234        match *self {
235            Some(xs) => xs.encode(buffer),
236            None => (-1i32).encode(buffer),
237        }
238    }
239}
240
241// --------------------------------------------------------------------
242
243#[derive(Default, Debug, Clone)]
244pub struct ProduceResponse {
245    pub header: HeaderResponse,
246    pub topic_partitions: Vec<TopicPartitionProduceResponse>,
247}
248
249#[derive(Default, Debug, Clone)]
250pub struct TopicPartitionProduceResponse {
251    pub topic: String,
252    pub partitions: Vec<PartitionProduceResponse>,
253}
254
255#[derive(Default, Debug, Clone)]
256pub struct PartitionProduceResponse {
257    pub partition: i32,
258    pub error: i16,
259    pub offset: i64,
260}
261
262impl ProduceResponse {
263    pub fn get_response(self) -> Vec<ProduceConfirm> {
264        self.topic_partitions
265            .into_iter()
266            .map(|tp| tp.get_response())
267            .collect()
268    }
269}
270
271impl TopicPartitionProduceResponse {
272    pub fn get_response(self) -> ProduceConfirm {
273        let confirms = self.partitions.iter().map(|p| p.get_response()).collect();
274
275        ProduceConfirm {
276            topic: self.topic,
277            partition_confirms: confirms,
278        }
279    }
280}
281
282impl PartitionProduceResponse {
283    pub fn get_response(&self) -> ProducePartitionConfirm {
284        ProducePartitionConfirm {
285            partition: self.partition,
286            offset: match KafkaCode::from_protocol(self.error) {
287                None => Ok(self.offset),
288                Some(code) => Err(code),
289            },
290        }
291    }
292}
293
294impl FromByte for ProduceResponse {
295    type R = ProduceResponse;
296
297    #[allow(unused_must_use)]
298    fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
299        try_multi!(
300            self.header.decode(buffer),
301            self.topic_partitions.decode(buffer)
302        )
303    }
304}
305
306impl FromByte for TopicPartitionProduceResponse {
307    type R = TopicPartitionProduceResponse;
308
309    #[allow(unused_must_use)]
310    fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
311        try_multi!(self.topic.decode(buffer), self.partitions.decode(buffer))
312    }
313}
314
315impl FromByte for PartitionProduceResponse {
316    type R = PartitionProduceResponse;
317
318    #[allow(unused_must_use)]
319    fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
320        try_multi!(
321            self.partition.decode(buffer),
322            self.error.decode(buffer),
323            self.offset.decode(buffer)
324        )
325    }
326}