kafka/consumer/mod.rs

//! Kafka Consumer - A higher-level API for consuming kafka topics.
//!
//! A consumer of Kafka topics on behalf of a specified group,
//! providing help in offset management.  The consumer requires at
//! least one topic for consumption and allows consuming multiple
//! topics at the same time. Further, clients can restrict the
//! consumer to only specific topic partitions as demonstrated in the
//! following example.
//!
//! # Example
//! ```no_run
//! use kafka::consumer::{Consumer, FetchOffset, GroupOffsetStorage};
//!
//! let mut consumer =
//!    Consumer::from_hosts(vec!("localhost:9092".to_owned()))
//!       .with_topic_partitions("my-topic".to_owned(), &[0, 1])
//!       .with_fallback_offset(FetchOffset::Earliest)
//!       .with_group("my-group".to_owned())
//!       .with_offset_storage(Some(GroupOffsetStorage::Kafka))
//!       .create()
//!       .unwrap();
//! loop {
//!   for ms in consumer.poll().unwrap().iter() {
//!     for m in ms.messages() {
//!       println!("{:?}", m);
//!     }
//!     consumer.consume_messageset(ms).unwrap();
//!   }
//!   consumer.commit_consumed().unwrap();
//! }
//! ```
//!
//! Please refer to the documentation of the individual "with" methods
//! used to set up the consumer. These contain further information or
//! links to more detail.
//!
//! A call to `.poll()` on the consumer will ask for the next
//! available "chunk of data" for the client code to process.  The
//! returned data are `MessageSet`s. There is at most one for each partition
//! of the consumed topics. Individual messages are embedded in the
//! retrieved messagesets and can be processed using the `messages()`
//! iterator.  Due to this embedding, an individual message's lifetime
//! is bound to the `MessageSet` it is part of. Typically, client
//! code accesses the raw data/bytes, parses it into custom data types,
//! and passes that along for further processing within the application.
//! Although inconvenient, this helps in reducing the number of
//! allocations within the pipeline of processing incoming messages.
//!
//! If the consumer is configured for a non-empty group, it helps in
//! keeping track of already consumed messages by maintaining a map of
//! the consumed offsets.  Messages can be marked as "consumed" through
//! either the `consume_message` or the `consume_messageset` method.
//! Once these consumed messages are committed to Kafka using
//! `commit_consumed`, the consumer will resume fetching from that
//! position even after a restart.  Since committing incurs a certain
//! overhead, it is up to the client to decide the frequency of the
//! commits.  The consumer will *not* commit any messages to Kafka
//! automatically.
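//!
//! For example, a client might choose to commit only every so many
//! polls rather than after every single one (a sketch; the interval
//! of 100 polls is an arbitrary choice, as are the broker address,
//! topic, and group names):
//!
//! ```no_run
//! use kafka::consumer::{Consumer, FetchOffset, GroupOffsetStorage};
//!
//! let mut consumer =
//!    Consumer::from_hosts(vec!("localhost:9092".to_owned()))
//!       .with_topic("my-topic".to_owned())
//!       .with_fallback_offset(FetchOffset::Earliest)
//!       .with_group("my-group".to_owned())
//!       .with_offset_storage(Some(GroupOffsetStorage::Kafka))
//!       .create()
//!       .unwrap();
//! let mut polls = 0u64;
//! loop {
//!   for ms in consumer.poll().unwrap().iter() {
//!     // ... process ms.messages() here ...
//!     consumer.consume_messageset(ms).unwrap();
//!   }
//!   polls += 1;
//!   if polls % 100 == 0 {
//!     consumer.commit_consumed().unwrap();
//!   }
//! }
//! ```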
//!
//! The configuration of a group is optional.  If the consumer has no
//! group configured, it will behave as if it had one, except that
//! committing consumed message offsets becomes a no-op.

use std::collections::hash_map::{Entry, HashMap};
use std::slice;

use crate::client::fetch;
use crate::client::{CommitOffset, FetchPartition, KafkaClient};
use crate::error::{Error, KafkaCode, Result};

// public re-exports
pub use self::builder::Builder;
pub use crate::client::fetch::Message;
pub use crate::client::FetchOffset;
pub use crate::client::GroupOffsetStorage;

mod assignment;
mod builder;
mod config;
mod state;

/// The default value for `Builder::with_retry_max_bytes_limit`.
pub const DEFAULT_RETRY_MAX_BYTES_LIMIT: i32 = 0;

/// The default value for `Builder::with_fallback_offset`.
pub const DEFAULT_FALLBACK_OFFSET: FetchOffset = FetchOffset::Latest;

/// The Kafka Consumer
///
/// See the module-level documentation.
#[derive(Debug)]
pub struct Consumer {
    client: KafkaClient,
    state: state::State,
    config: config::Config,
}

// XXX 1) Allow returning to a previous offset (aka seeking)
// XXX 2) Issue IO in a separate (background) thread and pre-fetch messagesets

impl Consumer {
    /// Starts building a consumer using the given kafka client.
    pub fn from_client(client: KafkaClient) -> Builder {
        builder::new(Some(client), Vec::new())
    }

    /// Starts building a consumer, internally bootstrapping a new
    /// kafka client from the given kafka hosts.
    pub fn from_hosts(hosts: Vec<String>) -> Builder {
        builder::new(None, hosts)
    }

    /// Borrows the underlying kafka client.
    pub fn client(&self) -> &KafkaClient {
        &self.client
    }

    /// Mutably borrows the underlying kafka client.
    pub fn client_mut(&mut self) -> &mut KafkaClient {
        &mut self.client
    }

    /// Destroys this consumer, returning the underlying kafka client.
    pub fn into_client(self) -> KafkaClient {
        self.client
    }

    /// Retrieves the topic partitions currently being consumed by
    /// this consumer.
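    ///
    /// A small sketch (the broker address, topic, and partition ids
    /// below are placeholders):
    ///
    /// ```no_run
    /// use kafka::consumer::{Consumer, FetchOffset};
    ///
    /// let consumer =
    ///    Consumer::from_hosts(vec!("localhost:9092".to_owned()))
    ///       .with_topic_partitions("my-topic".to_owned(), &[0, 1])
    ///       .with_fallback_offset(FetchOffset::Earliest)
    ///       .create()
    ///       .unwrap();
    /// for (topic, partitions) in consumer.subscriptions() {
    ///     println!("{}: {:?}", topic, partitions);
    /// }
    /// ```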
    pub fn subscriptions(&self) -> HashMap<String, Vec<i32>> {
        // ~ current subscriptions are reflected by
        // `self.state.fetch_offsets`; see `self.fetch_messages()`.
        // ~ the number of topics subscribed can be estimated from the
        // user-specified assignments stored in `self.state.assignments`.
        let mut h: HashMap<String, Vec<i32>> =
            HashMap::with_capacity(self.state.assignments.as_slice().len());
        // ~ expand subscriptions to (topic-name, partition id)
        let tps = self
            .state
            .fetch_offsets
            .keys()
            .map(|tp| (self.state.topic_name(tp.topic_ref), tp.partition));
        // ~ group by topic-name
        for tp in tps {
            // ~ allocate topic-name only once per topic
            if let Some(ps) = h.get_mut(tp.0) {
                ps.push(tp.1);
                continue;
            }
            h.insert(tp.0.to_owned(), vec![tp.1]);
        }
        h
    }

    /// Polls for the next available message data.
    pub fn poll(&mut self) -> Result<MessageSets> {
        let (n, resps) = self.fetch_messages();
        self.process_fetch_responses(n, resps?)
    }

    /// Determines whether this consumer is set up to consume only a
    /// single topic partition.
    fn single_partition_consumer(&self) -> bool {
        self.state.fetch_offsets.len() == 1
    }

    /// Retrieves the group on whose behalf this consumer is acting.
    /// The empty group name specifies a group-less consumer.
    pub fn group(&self) -> &str {
        &self.config.group
    }

    // ~ returns (number of partitions queried, fetch responses)
    fn fetch_messages(&mut self) -> (u32, Result<Vec<fetch::Response>>) {
        // ~ if there's a retry partition ... fetch messages just for
        // that one. Otherwise try to fetch messages for all assigned
        // partitions.
        match self.state.retry_partitions.pop_front() {
            Some(tp) => {
                let s = match self.state.fetch_offsets.get(&tp) {
                    Some(fstate) => fstate,
                    None => return (1, Err(Error::Kafka(KafkaCode::UnknownTopicOrPartition))),
                };
                let topic = self.state.topic_name(tp.topic_ref);
                debug!(
                    "fetching retry messages: (fetch-offset: {{\"{}:{}\": {:?}}})",
                    topic, tp.partition, s
                );
                (
                    1,
                    self.client.fetch_messages_for_partition(
                        &FetchPartition::new(topic, tp.partition, s.offset)
                            .with_max_bytes(s.max_bytes),
                    ),
                )
            }
            None => {
                let client = &mut self.client;
                let state = &self.state;
                debug!(
                    "fetching messages: (fetch-offsets: {:?})",
                    state.fetch_offsets_debug()
                );
                let reqs = state.fetch_offsets.iter().map(|(tp, s)| {
                    let topic = state.topic_name(tp.topic_ref);
                    FetchPartition::new(topic, tp.partition, s.offset).with_max_bytes(s.max_bytes)
                });
                (
                    state.fetch_offsets.len() as u32,
                    client.fetch_messages(reqs),
                )
            }
        }
    }

    // ~ post-processes the data retrieved through fetch_messages before
    // handing it out to client code
    //   - update the fetch state for the next fetch cycle
    // ~ num_partitions_queried: the original number of partitions requested/queried for
    //   the responses
    // ~ resps: the responses to post-process
    fn process_fetch_responses(
        &mut self,
        num_partitions_queried: u32,
        resps: Vec<fetch::Response>,
    ) -> Result<MessageSets> {
        let single_partition_consumer = self.single_partition_consumer();
        let mut empty = true;
        let retry_partitions = &mut self.state.retry_partitions;

        for resp in &resps {
            for t in resp.topics() {
                let topic_ref = self
                    .state
                    .assignments
                    .topic_ref(t.topic())
                    .expect("unknown topic in response");

                for p in t.partitions() {
                    let tp = state::TopicPartition {
                        topic_ref,
                        partition: p.partition(),
                    };

                    // ~ for now, as soon as a partition has an error
                    // we fail to prevent client programs from not
                    // noticing.  however, in future we don't need to
                    // fail immediately, we can try to recover from
                    // certain errors and retry the fetch operation
                    // transparently for the caller.

                    // XXX need to prevent updating fetch_offsets in case we're going to fail here
                    let data = p.data()?;

                    let fetch_state = self
                        .state
                        .fetch_offsets
                        .get_mut(&tp)
                        .expect("non-requested partition");
                    // ~ bookkeeping
                    if let Some(last_msg) = data.messages().last() {
                        fetch_state.offset = last_msg.offset + 1;
                        empty = false;

                        // ~ reset the max_bytes again to its usual
                        // value if we had a retry request and finally
                        // got some data
                        if fetch_state.max_bytes != self.client.fetch_max_bytes_per_partition() {
                            let prev_max_bytes = fetch_state.max_bytes;
                            fetch_state.max_bytes = self.client.fetch_max_bytes_per_partition();
                            debug!(
                                "reset max_bytes for {}:{} from {} to {}",
                                t.topic(),
                                tp.partition,
                                prev_max_bytes,
                                fetch_state.max_bytes
                            );
                        }
                    } else {
                        debug!(
                            "no data received for {}:{} (max_bytes: {} / fetch_offset: {} / \
                                highwatermark_offset: {})",
                            t.topic(),
                            tp.partition,
                            fetch_state.max_bytes,
                            fetch_state.offset,
                            data.highwatermark_offset()
                        );

                        // ~ when a partition is empty but its
                        // highwatermark-offset is greater than the
                        // offset we tried to fetch from ... we'll
                        // try to increase the max-fetch-size in the
                        // next fetch request
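                        // ~ e.g. (illustrative numbers only): with a
                        // retry_max_bytes_limit of 4 MiB, a 1 MiB
                        // fetch would be retried with 2 MiB, then
                        // 4 MiB, and not grow beyond that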
                        if fetch_state.offset < data.highwatermark_offset() {
                            if fetch_state.max_bytes < self.config.retry_max_bytes_limit {
                                // ~ try to double the max_bytes
                                let prev_max_bytes = fetch_state.max_bytes;
                                let incr_max_bytes = prev_max_bytes + prev_max_bytes;
                                if incr_max_bytes > self.config.retry_max_bytes_limit {
                                    fetch_state.max_bytes = self.config.retry_max_bytes_limit;
                                } else {
                                    fetch_state.max_bytes = incr_max_bytes;
                                }
                                debug!(
                                    "increased max_bytes for {}:{} from {} to {}",
                                    t.topic(),
                                    tp.partition,
                                    prev_max_bytes,
                                    fetch_state.max_bytes
                                );
                            } else if num_partitions_queried == 1 {
                                // ~ this was a single partition
                                // request and we didn't get anything
                                // and we won't be increasing the max
                                // fetch size ... this will fail
                                // forever ... signal the problem to
                                // the user
                                return Err(Error::Kafka(KafkaCode::MessageSizeTooLarge));
                            }
                            // ~ if this consumer is subscribed to one
                            // partition only, there's no need to push
                            // the partition to the 'retry_partitions'
                            // (this is just a small optimization)
                            if !single_partition_consumer {
                                // ~ mark this partition for a retry on its own
                                debug!("rescheduled for retry: {}:{}", t.topic(), tp.partition);
                                retry_partitions.push_back(tp)
                            }
                        }
                    }
                }
            }
        }

        // XXX in future, issue one more fetch_messages request in the
        // background such that the next time the client polls that
        // request's response will likely be already ready for
        // consumption

        Ok(MessageSets {
            responses: resps,
            empty,
        })
    }

    /// Retrieves the offset of the last "consumed" message in the
    /// specified partition. Results in `None` if there is no such
    /// "consumed" message.
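    ///
    /// A sketch (the broker address, topic name, and offset below are
    /// placeholders):
    ///
    /// ```no_run
    /// use kafka::consumer::{Consumer, FetchOffset};
    ///
    /// let mut consumer =
    ///    Consumer::from_hosts(vec!("localhost:9092".to_owned()))
    ///       .with_topic("my-topic".to_owned())
    ///       .with_fallback_offset(FetchOffset::Earliest)
    ///       .create()
    ///       .unwrap();
    /// consumer.consume_message("my-topic", 0, 42).unwrap();
    /// assert_eq!(consumer.last_consumed_message("my-topic", 0), Some(42));
    /// ```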
    pub fn last_consumed_message(&self, topic: &str, partition: i32) -> Option<i64> {
        self.state
            .topic_ref(topic)
            .and_then(|tref| {
                self.state.consumed_offsets.get(&state::TopicPartition {
                    topic_ref: tref,
                    partition,
                })
            })
            .map(|co| co.offset)
    }

    /// Marks the message at the specified offset in the specified
    /// topic partition as consumed by the caller.
    ///
    /// Note: a message with a "later/higher" offset automatically
    /// marks all preceding messages as "consumed", that is, messages
    /// with "earlier/lower" offsets in the same partition.
    /// Therefore, it is not necessary to invoke this method for
    /// every consumed message.
    ///
    /// Results in an error if the specified topic partition is not
    /// being consumed by this consumer.
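    ///
    /// For illustration (the broker address and topic below are
    /// placeholders), marking each message individually instead of
    /// using `consume_messageset`:
    ///
    /// ```no_run
    /// use kafka::consumer::{Consumer, FetchOffset};
    ///
    /// let mut consumer =
    ///    Consumer::from_hosts(vec!("localhost:9092".to_owned()))
    ///       .with_topic("my-topic".to_owned())
    ///       .with_fallback_offset(FetchOffset::Earliest)
    ///       .create()
    ///       .unwrap();
    /// for ms in consumer.poll().unwrap().iter() {
    ///     for m in ms.messages() {
    ///         // ... process the message, then mark it as consumed ...
    ///         consumer.consume_message(ms.topic(), ms.partition(), m.offset).unwrap();
    ///     }
    /// }
    /// ```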
    pub fn consume_message(&mut self, topic: &str, partition: i32, offset: i64) -> Result<()> {
        let topic_ref = match self.state.topic_ref(topic) {
            None => return Err(Error::Kafka(KafkaCode::UnknownTopicOrPartition)),
            Some(topic_ref) => topic_ref,
        };
        let tp = state::TopicPartition {
            topic_ref,
            partition,
        };
        match self.state.consumed_offsets.entry(tp) {
            Entry::Vacant(v) => {
                v.insert(state::ConsumedOffset {
                    offset,
                    dirty: true,
                });
            }
            Entry::Occupied(mut v) => {
                let o = v.get_mut();
                if offset > o.offset {
                    o.offset = offset;
                    o.dirty = true;
                }
            }
        }
        Ok(())
    }

    /// A convenience method to mark the given message set as consumed
    /// as a whole by the caller. This is equivalent to marking the
    /// last message of the given set as consumed.
    pub fn consume_messageset(&mut self, msgs: MessageSet<'_>) -> Result<()> {
        if !msgs.messages.is_empty() {
            self.consume_message(
                msgs.topic,
                msgs.partition,
                msgs.messages.last().unwrap().offset,
            )
        } else {
            Ok(())
        }
    }

    /// Persists the so-far "marked as consumed" messages on behalf of
    /// this consumer's group (if any) for the underlying topics.
    ///
    /// See also `Consumer::consume_message` and
    /// `Consumer::consume_messageset`.
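    ///
    /// Note that committing on a consumer without a configured group
    /// results in an error. A small sketch (the broker address and
    /// topic below are placeholders):
    ///
    /// ```no_run
    /// use kafka::consumer::{Consumer, FetchOffset};
    ///
    /// let mut consumer =
    ///    Consumer::from_hosts(vec!("localhost:9092".to_owned()))
    ///       .with_topic("my-topic".to_owned())
    ///       .with_fallback_offset(FetchOffset::Earliest)
    ///       .create()
    ///       .unwrap();
    /// // no group configured => there is nowhere to commit to
    /// assert!(consumer.commit_consumed().is_err());
    /// ```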
    pub fn commit_consumed(&mut self) -> Result<()> {
        if self.config.group.is_empty() {
            return Err(Error::UnsetGroupId);
        }
        debug!(
            "commit_consumed: committing dirty-only consumer offsets (group: {} / offsets: {:?})",
            self.config.group,
            self.state.consumed_offsets_debug()
        );
        let (client, state) = (&mut self.client, &mut self.state);
        client.commit_offsets(
            &self.config.group,
            state
                .consumed_offsets
                .iter()
                .filter(|&(_, o)| o.dirty)
                .map(|(tp, o)| {
                    let topic = state.topic_name(tp.topic_ref);

                    // Note that the offset that is committed should be the
                    // offset of the next message a consumer should read, so
                    // add one to the consumed message's offset.
                    //
                    // https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
                    CommitOffset::new(topic, tp.partition, o.offset + 1)
                }),
        )?;
        for co in state.consumed_offsets.values_mut() {
            if co.dirty {
                co.dirty = false;
            }
        }
        Ok(())
    }
}

// --------------------------------------------------------------------

/// Messages retrieved from kafka in one fetch request.  This is a
/// concatenation of blocks of messages successfully retrieved from
/// the consumed topic partitions.  Each such partition is guaranteed
/// to be present at most once in this structure.
#[derive(Debug)]
pub struct MessageSets {
    responses: Vec<fetch::Response>,

    /// Precomputed; says whether the responses actually contain any
    /// consumable messages
    empty: bool,
}

impl MessageSets {
    /// Determines efficiently whether there are any consumable
    /// messages in this data set.
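    ///
    /// For example, a client might skip further processing when a
    /// poll returned no data (a sketch; the broker address and topic
    /// below are placeholders):
    ///
    /// ```no_run
    /// use kafka::consumer::{Consumer, FetchOffset};
    ///
    /// let mut consumer =
    ///    Consumer::from_hosts(vec!("localhost:9092".to_owned()))
    ///       .with_topic("my-topic".to_owned())
    ///       .with_fallback_offset(FetchOffset::Earliest)
    ///       .create()
    ///       .unwrap();
    /// let msgs = consumer.poll().unwrap();
    /// if msgs.is_empty() {
    ///     println!("no new messages available");
    /// } else {
    ///     for ms in msgs.iter() {
    ///         println!("{}:{}: {} messages", ms.topic(), ms.partition(), ms.messages().len());
    ///     }
    /// }
    /// ```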
    pub fn is_empty(&self) -> bool {
        self.empty
    }

    /// Iterates over the message sets delivering the fetched message
    /// data of consumed topic partitions.
    pub fn iter(&self) -> MessageSetsIter<'_> {
        let mut responses = self.responses.iter();
        let mut topics = responses.next().map(|r| r.topics().iter());
        let (curr_topic, partitions) = topics
            .as_mut()
            .and_then(|t| t.next())
            .map_or((None, None), |t| {
                (Some(t.topic()), Some(t.partitions().iter()))
            });
        MessageSetsIter {
            responses,
            topics,
            curr_topic: curr_topic.unwrap_or(""),
            partitions,
        }
    }
}

/// A set of messages successfully retrieved from a specific topic
/// partition.
pub struct MessageSet<'a> {
    topic: &'a str,
    partition: i32,
    messages: &'a [Message<'a>],
}

impl<'a> MessageSet<'a> {
    #[inline]
    pub fn topic(&self) -> &'a str {
        self.topic
    }

    #[inline]
    pub fn partition(&self) -> i32 {
        self.partition
    }

    #[inline]
    pub fn messages(&self) -> &'a [Message<'a>] {
        self.messages
    }
}

/// An iterator over the consumed topic partition message sets.
pub struct MessageSetsIter<'a> {
    responses: slice::Iter<'a, fetch::Response>,
    topics: Option<slice::Iter<'a, fetch::Topic<'a>>>,
    curr_topic: &'a str,
    partitions: Option<slice::Iter<'a, fetch::Partition<'a>>>,
}

impl<'a> Iterator for MessageSetsIter<'a> {
    type Item = MessageSet<'a>;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // ~ then the next available partition
            if let Some(p) = self.partitions.as_mut().and_then(|p| p.next()) {
                // ~ skip erroneous partitions
                // ~ skip empty partitions
                match p.data() {
                    Err(_) => {
                        continue;
                    }
                    Ok(pdata) => {
                        let msgs = pdata.messages();
                        if msgs.is_empty() {
                            continue;
                        } else {
                            return Some(MessageSet {
                                topic: self.curr_topic,
                                partition: p.partition(),
                                messages: msgs,
                            });
                        }
                    }
                }
            }
            // ~ then the next available topic
            if let Some(t) = self.topics.as_mut().and_then(|t| t.next()) {
                self.curr_topic = t.topic();
                self.partitions = Some(t.partitions().iter());
                continue;
            }
            // ~ then the next available response
            if let Some(r) = self.responses.next() {
                self.curr_topic = "";
                self.topics = Some(r.topics().iter());
                continue;
            }
            // ~ finally we know there's nothing available anymore
            return None;
        }
    }
}