kafka/consumer/mod.rs
//! Kafka Consumer - A higher-level API for consuming kafka topics.
//!
//! A consumer for Kafka topics on behalf of a specified group,
//! providing help with offset management. The consumer requires at
//! least one topic for consumption and allows consuming multiple
//! topics at the same time. Further, clients can restrict the
//! consumer to only specific topic partitions, as demonstrated in the
//! following example.
//!
//! # Example
//! ```no_run
//! use kafka::consumer::{Consumer, FetchOffset, GroupOffsetStorage};
//!
//! let mut consumer =
//!     Consumer::from_hosts(vec!["localhost:9092".to_owned()])
//!         .with_topic_partitions("my-topic".to_owned(), &[0, 1])
//!         .with_fallback_offset(FetchOffset::Earliest)
//!         .with_group("my-group".to_owned())
//!         .with_offset_storage(Some(GroupOffsetStorage::Kafka))
//!         .create()
//!         .unwrap();
//! loop {
//!     for ms in consumer.poll().unwrap().iter() {
//!         for m in ms.messages() {
//!             println!("{:?}", m);
//!         }
//!         consumer.consume_messageset(ms);
//!     }
//!     consumer.commit_consumed().unwrap();
//! }
//! ```
//!
//! Please refer to the documentation of the individual "with" methods
//! used to set up the consumer; they contain further information or
//! pointers to it.
//!
//! A call to `.poll()` on the consumer will ask for the next
//! available "chunk of data" for the client code to process. The
//! returned data are `MessageSet`s. There is at most one for each
//! partition of the consumed topics. Individual messages are embedded
//! in the retrieved messagesets and can be processed using the
//! `messages()` iterator. Due to this embedding, an individual
//! message's lifetime is bound to the `MessageSet` it is part of.
//! Typically, client code accesses the raw data/bytes, parses it into
//! custom data types, and passes that along for further processing
//! within the application. Although inconvenient, this helps in
//! reducing the number of allocations within the pipeline of
//! processing incoming messages.
//!
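//! For instance, here is a minimal sketch of decoding each message's
//! raw `value` bytes into an owned `String` before passing it on (it
//! assumes the message values are UTF-8 encoded; `consumer` is set up
//! as in the example above):
//!
//! ```no_run
//! # use kafka::consumer::Consumer;
//! # let mut consumer = Consumer::from_hosts(vec!["localhost:9092".to_owned()])
//! #     .with_topic_partitions("my-topic".to_owned(), &[0, 1])
//! #     .create()
//! #     .unwrap();
//! for ms in consumer.poll().unwrap().iter() {
//!     for m in ms.messages() {
//!         // ~ copy the payload out of the messageset; the parsed
//!         // value is then independent of the `MessageSet`'s lifetime
//!         let value = String::from_utf8_lossy(m.value).into_owned();
//!         println!("{}", value);
//!     }
//! }
//! ```
//!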
//! If the consumer is configured with a non-empty group, it helps in
//! keeping track of already consumed messages by maintaining a map of
//! the consumed offsets. Messages can be marked as "consumed" through
//! either the `consume_message` or the `consume_messageset` methods.
//! Once these consumed messages are committed to Kafka using
//! `commit_consumed`, the consumer will resume fetching messages from
//! there even after a restart. Since committing incurs a certain
//! overhead, it is up to the client to decide the frequency of the
//! commits. The consumer will *not* commit any messages to Kafka
//! automatically.
//!
//! The configuration of a group is optional. If the consumer has no
//! group configured, it will behave as if it had one, except that
//! committing consumed message offsets is a no-op.

use std::collections::hash_map::{Entry, HashMap};
use std::slice;

use crate::client::fetch;
use crate::client::{CommitOffset, FetchPartition, KafkaClient};
use crate::error::{Error, KafkaCode, Result};

// public re-exports
pub use self::builder::Builder;
pub use crate::client::fetch::Message;
pub use crate::client::FetchOffset;
pub use crate::client::GroupOffsetStorage;

mod assignment;
mod builder;
mod config;
mod state;

/// The default value for `Builder::with_retry_max_bytes_limit`.
pub const DEFAULT_RETRY_MAX_BYTES_LIMIT: i32 = 0;

/// The default value for `Builder::with_fallback_offset`.
pub const DEFAULT_FALLBACK_OFFSET: FetchOffset = FetchOffset::Latest;

/// The Kafka Consumer
///
/// See module level documentation.
#[derive(Debug)]
pub struct Consumer {
    client: KafkaClient,
    state: state::State,
    config: config::Config,
}

// XXX 1) Allow returning to a previous offset (aka seeking)
// XXX 2) Issue IO in a separate (background) thread and pre-fetch messagesets

impl Consumer {
    /// Starts building a consumer using the given kafka client.
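    ///
    /// # Example
    ///
    /// A minimal sketch; it assumes `KafkaClient::new` and
    /// `load_metadata_all` from `kafka::client`, and an existing
    /// "my-topic" topic:
    ///
    /// ```no_run
    /// use kafka::client::KafkaClient;
    /// use kafka::consumer::Consumer;
    ///
    /// let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
    /// client.load_metadata_all().unwrap();
    /// let consumer = Consumer::from_client(client)
    ///     .with_topic_partitions("my-topic".to_owned(), &[0])
    ///     .create()
    ///     .unwrap();
    /// ```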
    pub fn from_client(client: KafkaClient) -> Builder {
        builder::new(Some(client), Vec::new())
    }

    /// Starts building a consumer, internally bootstrapping a new
    /// kafka client from the given kafka hosts.
    pub fn from_hosts(hosts: Vec<String>) -> Builder {
        builder::new(None, hosts)
    }

    /// Borrows the underlying kafka client.
    pub fn client(&self) -> &KafkaClient {
        &self.client
    }

    /// Borrows the underlying kafka client as mut.
    pub fn client_mut(&mut self) -> &mut KafkaClient {
        &mut self.client
    }

    /// Destroys this consumer, returning the underlying kafka client.
    pub fn into_client(self) -> KafkaClient {
        self.client
    }

    /// Retrieves the topic partitions currently being consumed by
    /// this consumer.
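    ///
    /// # Example
    ///
    /// A sketch of inspecting the current assignment (`consumer` is
    /// assumed to be an already created `Consumer`):
    ///
    /// ```no_run
    /// # use kafka::consumer::Consumer;
    /// # let consumer = Consumer::from_hosts(vec!["localhost:9092".to_owned()])
    /// #     .with_topic_partitions("my-topic".to_owned(), &[0, 1])
    /// #     .create()
    /// #     .unwrap();
    /// for (topic, partitions) in consumer.subscriptions() {
    ///     println!("consuming {}: {:?}", topic, partitions);
    /// }
    /// ```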
    pub fn subscriptions(&self) -> HashMap<String, Vec<i32>> {
        // ~ current subscriptions are reflected by
        // `self.state.fetch_offsets` see `self.fetch_messages()`.
        // ~ the number of topics subscribed can be estimated from the
        // user specified assignments stored in `self.state.assignments`.
        let mut h: HashMap<String, Vec<i32>> =
            HashMap::with_capacity(self.state.assignments.as_slice().len());
        // ~ expand subscriptions to (topic-name, partition id)
        let tps = self
            .state
            .fetch_offsets
            .keys()
            .map(|tp| (self.state.topic_name(tp.topic_ref), tp.partition));
        // ~ group by topic-name
        for tp in tps {
            // ~ allocate topic-name only once per topic
            if let Some(ps) = h.get_mut(tp.0) {
                ps.push(tp.1);
                continue;
            }
            h.insert(tp.0.to_owned(), vec![tp.1]);
        }
        h
    }

    /// Polls for the next available message data.
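    ///
    /// # Example
    ///
    /// A sketch of a single poll cycle; a real application would
    /// typically poll in a loop (`consumer` is assumed to be an
    /// already created `Consumer`):
    ///
    /// ```no_run
    /// # use kafka::consumer::Consumer;
    /// # let mut consumer = Consumer::from_hosts(vec!["localhost:9092".to_owned()])
    /// #     .with_topic_partitions("my-topic".to_owned(), &[0, 1])
    /// #     .create()
    /// #     .unwrap();
    /// let mss = consumer.poll().unwrap();
    /// if mss.is_empty() {
    ///     // ~ no new data right now; the caller might back off
    ///     // for a moment before polling again
    /// } else {
    ///     for ms in mss.iter() {
    ///         println!("{}:{} => {} messages",
    ///                  ms.topic(), ms.partition(), ms.messages().len());
    ///     }
    /// }
    /// ```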
    pub fn poll(&mut self) -> Result<MessageSets> {
        let (n, resps) = self.fetch_messages();
        self.process_fetch_responses(n, resps?)
    }

    /// Determines whether this consumer is set up to consume only a
    /// single topic partition.
    fn single_partition_consumer(&self) -> bool {
        self.state.fetch_offsets.len() == 1
    }

    /// Retrieves the group on whose behalf this consumer is acting.
    /// The empty group name specifies a group-less consumer.
    pub fn group(&self) -> &str {
        &self.config.group
    }

    // ~ returns (number of partitions queried, fetch responses)
    fn fetch_messages(&mut self) -> (u32, Result<Vec<fetch::Response>>) {
        // ~ if there's a retry partition ... fetch messages just for
        // that one. Otherwise try to fetch messages for all assigned
        // partitions.
        match self.state.retry_partitions.pop_front() {
            Some(tp) => {
                let s = match self.state.fetch_offsets.get(&tp) {
                    Some(fstate) => fstate,
                    None => return (1, Err(Error::Kafka(KafkaCode::UnknownTopicOrPartition))),
                };
                let topic = self.state.topic_name(tp.topic_ref);
                debug!(
                    "fetching retry messages: (fetch-offset: {{\"{}:{}\": {:?}}})",
                    topic, tp.partition, s
                );
                (
                    1,
                    self.client.fetch_messages_for_partition(
                        &FetchPartition::new(topic, tp.partition, s.offset)
                            .with_max_bytes(s.max_bytes),
                    ),
                )
            }
            None => {
                let client = &mut self.client;
                let state = &self.state;
                debug!(
                    "fetching messages: (fetch-offsets: {:?})",
                    state.fetch_offsets_debug()
                );
                let reqs = state.fetch_offsets.iter().map(|(tp, s)| {
                    let topic = state.topic_name(tp.topic_ref);
                    FetchPartition::new(topic, tp.partition, s.offset).with_max_bytes(s.max_bytes)
                });
                (
                    state.fetch_offsets.len() as u32,
                    client.fetch_messages(reqs),
                )
            }
        }
    }

    // ~ post-processes the data retrieved through fetch_messages before
    // handing it out to client code
    // - update the fetch state for the next fetch cycle
    // ~ num_partitions_queried: the original number of partitions requested/queried for
    //   the responses
    // ~ resps: the responses to post process
    fn process_fetch_responses(
        &mut self,
        num_partitions_queried: u32,
        resps: Vec<fetch::Response>,
    ) -> Result<MessageSets> {
        let single_partition_consumer = self.single_partition_consumer();
        let mut empty = true;
        let retry_partitions = &mut self.state.retry_partitions;

        for resp in &resps {
            for t in resp.topics() {
                let topic_ref = self
                    .state
                    .assignments
                    .topic_ref(t.topic())
                    .expect("unknown topic in response");

                for p in t.partitions() {
                    let tp = state::TopicPartition {
                        topic_ref,
                        partition: p.partition(),
                    };

                    // ~ for now, as soon as a partition has an error
                    // we fail to prevent client programs from not
                    // noticing. however, in future we don't need to
                    // fail immediately; we can try to recover from
                    // certain errors and retry the fetch operation
                    // transparently for the caller.

                    // XXX need to prevent updating fetch_offsets in case we're gonna fail here
                    let data = p.data()?;

                    let fetch_state = self
                        .state
                        .fetch_offsets
                        .get_mut(&tp)
                        .expect("non-requested partition");
                    // ~ book keeping
                    if let Some(last_msg) = data.messages().last() {
                        fetch_state.offset = last_msg.offset + 1;
                        empty = false;

                        // ~ reset the max_bytes again to its usual
                        // value if we had a retry request and finally
                        // got some data
                        if fetch_state.max_bytes != self.client.fetch_max_bytes_per_partition() {
                            let prev_max_bytes = fetch_state.max_bytes;
                            fetch_state.max_bytes = self.client.fetch_max_bytes_per_partition();
                            debug!(
                                "reset max_bytes for {}:{} from {} to {}",
                                t.topic(),
                                tp.partition,
                                prev_max_bytes,
                                fetch_state.max_bytes
                            );
                        }
                    } else {
                        debug!(
                            "no data received for {}:{} (max_bytes: {} / fetch_offset: {} / \
                             highwatermark_offset: {})",
                            t.topic(),
                            tp.partition,
                            fetch_state.max_bytes,
                            fetch_state.offset,
                            data.highwatermark_offset()
                        );

                        // ~ when a partition is empty but has a
                        // highwatermark-offset equal to or greater
                        // than the one we tried to fetch ... we'll
                        // try to increase the max-fetch-size in the
                        // next fetch request
                        if fetch_state.offset < data.highwatermark_offset() {
                            if fetch_state.max_bytes < self.config.retry_max_bytes_limit {
                                // ~ try to double the max_bytes
                                let prev_max_bytes = fetch_state.max_bytes;
                                let incr_max_bytes = prev_max_bytes + prev_max_bytes;
                                if incr_max_bytes > self.config.retry_max_bytes_limit {
                                    fetch_state.max_bytes = self.config.retry_max_bytes_limit;
                                } else {
                                    fetch_state.max_bytes = incr_max_bytes;
                                }
                                debug!(
                                    "increased max_bytes for {}:{} from {} to {}",
                                    t.topic(),
                                    tp.partition,
                                    prev_max_bytes,
                                    fetch_state.max_bytes
                                );
                            } else if num_partitions_queried == 1 {
                                // ~ this was a single partition
                                // request, we didn't get anything,
                                // and we won't be increasing the max
                                // fetch size ... this will fail
                                // forever ... signal the problem to
                                // the user
                                return Err(Error::Kafka(KafkaCode::MessageSizeTooLarge));
                            }
                            // ~ if this consumer is subscribed to one
                            // partition only, there's no need to push
                            // the partition to the 'retry_partitions'
                            // (this is just a small optimization)
                            if !single_partition_consumer {
                                // ~ mark this partition for a retry on its own
                                debug!("rescheduled for retry: {}:{}", t.topic(), tp.partition);
                                retry_partitions.push_back(tp)
                            }
                        }
                    }
                }
            }
        }

        // XXX in future, issue one more fetch_messages request in the
        // background such that the next time the client polls that
        // request's response will likely be already ready for
        // consumption

        Ok(MessageSets {
            responses: resps,
            empty,
        })
    }

    /// Retrieves the offset of the last "consumed" message in the
    /// specified partition. Results in `None` if there is no such
    /// "consumed" message.
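    ///
    /// # Example
    ///
    /// A minimal sketch; `consumer` is assumed to be an already created
    /// `Consumer` that has marked some messages as consumed:
    ///
    /// ```no_run
    /// # use kafka::consumer::Consumer;
    /// # let consumer = Consumer::from_hosts(vec!["localhost:9092".to_owned()])
    /// #     .with_topic_partitions("my-topic".to_owned(), &[0, 1])
    /// #     .create()
    /// #     .unwrap();
    /// match consumer.last_consumed_message("my-topic", 0) {
    ///     Some(offset) => println!("consumed up to offset {}", offset),
    ///     None => println!("nothing consumed in this partition yet"),
    /// }
    /// ```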
    pub fn last_consumed_message(&self, topic: &str, partition: i32) -> Option<i64> {
        self.state
            .topic_ref(topic)
            .and_then(|tref| {
                self.state.consumed_offsets.get(&state::TopicPartition {
                    topic_ref: tref,
                    partition,
                })
            })
            .map(|co| co.offset)
    }

    /// Marks the message at the specified offset in the specified
    /// topic partition as consumed by the caller.
    ///
    /// Note: a message with a "later/higher" offset automatically
    /// marks all preceding messages as "consumed", that is, messages
    /// with "earlier/lower" offsets in the same partition.
    /// Therefore, it is not necessary to invoke this method for
    /// every consumed message.
    ///
    /// Results in an error if the specified topic partition is not
    /// being consumed by this consumer.
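    ///
    /// # Example
    ///
    /// A sketch of acknowledging each message individually; the setup
    /// mirrors the module level example:
    ///
    /// ```no_run
    /// # use kafka::consumer::Consumer;
    /// # let mut consumer = Consumer::from_hosts(vec!["localhost:9092".to_owned()])
    /// #     .with_topic_partitions("my-topic".to_owned(), &[0, 1])
    /// #     .with_group("my-group".to_owned())
    /// #     .create()
    /// #     .unwrap();
    /// let mss = consumer.poll().unwrap();
    /// for ms in mss.iter() {
    ///     for m in ms.messages() {
    ///         // ... process the message ...
    ///         consumer.consume_message(ms.topic(), ms.partition(), m.offset).unwrap();
    ///     }
    /// }
    /// ```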
    pub fn consume_message(&mut self, topic: &str, partition: i32, offset: i64) -> Result<()> {
        let topic_ref = match self.state.topic_ref(topic) {
            None => return Err(Error::Kafka(KafkaCode::UnknownTopicOrPartition)),
            Some(topic_ref) => topic_ref,
        };
        let tp = state::TopicPartition {
            topic_ref,
            partition,
        };
        match self.state.consumed_offsets.entry(tp) {
            Entry::Vacant(v) => {
                v.insert(state::ConsumedOffset {
                    offset,
                    dirty: true,
                });
            }
            Entry::Occupied(mut v) => {
                let o = v.get_mut();
                if offset > o.offset {
                    o.offset = offset;
                    o.dirty = true;
                }
            }
        }
        Ok(())
    }

    /// A convenience method to mark the given message set as consumed
    /// as a whole by the caller. This is equivalent to marking the
    /// last message of the given set as consumed.
    pub fn consume_messageset(&mut self, msgs: MessageSet<'_>) -> Result<()> {
        if !msgs.messages.is_empty() {
            self.consume_message(
                msgs.topic,
                msgs.partition,
                msgs.messages.last().unwrap().offset,
            )
        } else {
            Ok(())
        }
    }

    /// Persists the so-far "marked as consumed" messages on behalf
    /// of this consumer's group (if any) for the underlying topics.
    ///
    /// See also `Consumer::consume_message` and
    /// `Consumer::consume_messageset`.
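    ///
    /// # Example
    ///
    /// A sketch of committing only after every full poll cycle rather
    /// than after each message (the commit frequency is an arbitrary
    /// choice of the application):
    ///
    /// ```no_run
    /// # use kafka::consumer::{Consumer, GroupOffsetStorage};
    /// # let mut consumer = Consumer::from_hosts(vec!["localhost:9092".to_owned()])
    /// #     .with_topic_partitions("my-topic".to_owned(), &[0, 1])
    /// #     .with_group("my-group".to_owned())
    /// #     .with_offset_storage(Some(GroupOffsetStorage::Kafka))
    /// #     .create()
    /// #     .unwrap();
    /// loop {
    ///     for ms in consumer.poll().unwrap().iter() {
    ///         // ... process the messages ...
    ///         consumer.consume_messageset(ms).unwrap();
    ///     }
    ///     // ~ persist all offsets marked as consumed in this cycle
    ///     consumer.commit_consumed().unwrap();
    /// }
    /// ```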
    pub fn commit_consumed(&mut self) -> Result<()> {
        if self.config.group.is_empty() {
            return Err(Error::UnsetGroupId);
        }
        debug!(
            "commit_consumed: committing dirty-only consumer offsets (group: {} / offsets: {:?})",
            self.config.group,
            self.state.consumed_offsets_debug()
        );
        let (client, state) = (&mut self.client, &mut self.state);
        client.commit_offsets(
            &self.config.group,
            state
                .consumed_offsets
                .iter()
                .filter(|&(_, o)| o.dirty)
                .map(|(tp, o)| {
                    let topic = state.topic_name(tp.topic_ref);

                    // Note that the offset that is committed should be the
                    // offset of the next message a consumer should read, so
                    // add one to the consumed message's offset.
                    //
                    // https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
                    CommitOffset::new(topic, tp.partition, o.offset + 1)
                }),
        )?;
        for co in state.consumed_offsets.values_mut() {
            if co.dirty {
                co.dirty = false;
            }
        }
        Ok(())
    }
}

// --------------------------------------------------------------------

/// Messages retrieved from kafka in one fetch request. This is a
/// concatenation of blocks of messages successfully retrieved from
/// the consumed topic partitions. Each such partition is guaranteed
/// to be present at most once in this structure.
#[derive(Debug)]
pub struct MessageSets {
    responses: Vec<fetch::Response>,

    /// Precomputed flag saying whether the responses actually contain
    /// any consumable messages.
    empty: bool,
}

impl MessageSets {
    /// Determines efficiently whether there are any consumable
    /// messages in this data set.
    pub fn is_empty(&self) -> bool {
        self.empty
    }

    /// Iterates over the message sets, delivering the fetched message
    /// data of the consumed topic partitions.
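    ///
    /// # Example
    ///
    /// A sketch of draining the result of one poll; `consumer` is
    /// assumed to be an already created `Consumer`:
    ///
    /// ```no_run
    /// # use kafka::consumer::Consumer;
    /// # let mut consumer = Consumer::from_hosts(vec!["localhost:9092".to_owned()])
    /// #     .with_topic_partitions("my-topic".to_owned(), &[0, 1])
    /// #     .create()
    /// #     .unwrap();
    /// for ms in consumer.poll().unwrap().iter() {
    ///     for m in ms.messages() {
    ///         println!("{}:{}@{}: {:?}", ms.topic(), ms.partition(), m.offset, m.value);
    ///     }
    /// }
    /// ```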
    pub fn iter(&self) -> MessageSetsIter<'_> {
        let mut responses = self.responses.iter();
        let mut topics = responses.next().map(|r| r.topics().iter());
        let (curr_topic, partitions) = topics
            .as_mut()
            .and_then(|t| t.next())
            .map_or((None, None), |t| {
                (Some(t.topic()), Some(t.partitions().iter()))
            });
        MessageSetsIter {
            responses,
            topics,
            curr_topic: curr_topic.unwrap_or(""),
            partitions,
        }
    }
}

/// A set of messages successfully retrieved from a specific topic
/// partition.
pub struct MessageSet<'a> {
    topic: &'a str,
    partition: i32,
    messages: &'a [Message<'a>],
}

impl<'a> MessageSet<'a> {
    #[inline]
    pub fn topic(&self) -> &'a str {
        self.topic
    }

    #[inline]
    pub fn partition(&self) -> i32 {
        self.partition
    }

    #[inline]
    pub fn messages(&self) -> &'a [Message<'a>] {
        self.messages
    }
}

/// An iterator over the consumed topic partition message sets.
pub struct MessageSetsIter<'a> {
    responses: slice::Iter<'a, fetch::Response>,
    topics: Option<slice::Iter<'a, fetch::Topic<'a>>>,
    curr_topic: &'a str,
    partitions: Option<slice::Iter<'a, fetch::Partition<'a>>>,
}

impl<'a> Iterator for MessageSetsIter<'a> {
    type Item = MessageSet<'a>;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // ~ first try the next available partition
            if let Some(p) = self.partitions.as_mut().and_then(|p| p.next()) {
                // ~ skip erroneous partitions
                // ~ skip empty partitions
                match p.data() {
                    Err(_) => {
                        continue;
                    }
                    Ok(pdata) => {
                        let msgs = pdata.messages();
                        if msgs.is_empty() {
                            continue;
                        } else {
                            return Some(MessageSet {
                                topic: self.curr_topic,
                                partition: p.partition(),
                                messages: msgs,
                            });
                        }
                    }
                }
            }
            // ~ then the next available topic
            if let Some(t) = self.topics.as_mut().and_then(|t| t.next()) {
                self.curr_topic = t.topic();
                self.partitions = Some(t.partitions().iter());
                continue;
            }
            // ~ then the next available response
            if let Some(r) = self.responses.next() {
                self.curr_topic = "";
                self.topics = Some(r.topics().iter());
                continue;
            }
            // ~ finally we know there's nothing available anymore
            return None;
        }
    }
}