// kafka/consumer/state.rs

1use std::collections::HashMap;
2use std::collections::VecDeque;
3use std::fmt;
4use std::hash::BuildHasherDefault;
5
6use fnv::FnvHasher;
7
8use crate::client::metadata::Topics;
9use crate::client::{FetchGroupOffset, FetchOffset, KafkaClient};
10use crate::error::{Error, KafkaCode, Result};
11
12use super::assignment::{Assignment, AssignmentRef, Assignments};
13use super::config::Config;
14
/// Hash builder using the FNV hasher for the partition-keyed maps in
/// this module (keys are small and not attacker controlled).
pub type PartitionHasher = BuildHasherDefault<FnvHasher>;
16
// The "fetch state" for a particular topic partition.
#[derive(Debug)]
pub struct FetchState {
    /// ~ specifies the offset which to fetch from
    pub offset: i64,
    /// ~ specifies the max_bytes to be fetched
    pub max_bytes: i32,
}
25
/// Key identifying a single partition of an assigned topic; used as
/// the map key for the fetch/consumed offset maps below.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct TopicPartition {
    /// ~ indirect reference to the topic through config.topic(..)
    pub topic_ref: AssignmentRef,
    /// ~ the partition id within the referenced topic
    pub partition: i32,
}
33
#[derive(Debug)]
pub struct ConsumedOffset {
    /// ~ the consumed offset
    pub offset: i64,
    /// ~ true if the consumed offset is changed but not committed to
    /// kafka yet
    pub dirty: bool,
}
42
/// The mutable runtime state of a consumer: the (fixed) topic
/// assignments plus the per-partition fetch and consumed offsets
/// tracked while polling.
pub struct State {
    /// Contains the topic partitions the consumer is assigned to
    /// consume; this is a _read-only_ data structure
    pub assignments: Assignments,

    /// Contains the information relevant for the next fetch operation
    /// on the corresponding partitions
    pub fetch_offsets: HashMap<TopicPartition, FetchState, PartitionHasher>,

    /// Specifies partitions to be fetched on their own in the next
    /// poll request.
    pub retry_partitions: VecDeque<TopicPartition>,

    /// Contains the offsets of messages marked as "consumed" (to be
    /// committed)
    pub consumed_offsets: HashMap<TopicPartition, ConsumedOffset, PartitionHasher>,
}
60
61impl<'a> fmt::Debug for State {
62    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63        write!(
64            f,
65            "State {{ assignments: {:?}, fetch_offsets: {:?}, retry_partitions: {:?}, \
66                consumed_offsets: {:?} }}",
67            self.assignments,
68            self.fetch_offsets_debug(),
69            TopicPartitionsDebug {
70                state: self,
71                tps: &self.retry_partitions,
72            },
73            self.consumed_offsets_debug()
74        )
75    }
76}
77
78impl State {
79    pub fn new(
80        client: &mut KafkaClient,
81        config: &Config,
82        assignments: Assignments,
83    ) -> Result<State> {
84        let (consumed_offsets, fetch_offsets) = {
85            let subscriptions = {
86                let xs = assignments.as_slice();
87                let mut subs = Vec::with_capacity(xs.len());
88                for x in xs {
89                    subs.push(determine_partitions(x, client.topics())?);
90                }
91                subs
92            };
93            let n = subscriptions.iter().map(|s| s.partitions.len()).sum();
94            let consumed =
95                load_consumed_offsets(client, &config.group, &assignments, &subscriptions, n)?;
96
97            let fetch_next =
98                load_fetch_states(client, config, &assignments, &subscriptions, &consumed, n)?;
99            (consumed, fetch_next)
100        };
101        Ok(State {
102            assignments,
103            fetch_offsets,
104            retry_partitions: VecDeque::new(),
105            consumed_offsets,
106        })
107    }
108
109    pub fn topic_name(&self, assignment: AssignmentRef) -> &str {
110        self.assignments[assignment].topic()
111    }
112
113    pub fn topic_ref(&self, name: &str) -> Option<AssignmentRef> {
114        self.assignments.topic_ref(name)
115    }
116
117    /// Returns a wrapper around `self.fetch_offsets` for nice dumping
118    /// in debug messages
119    pub fn fetch_offsets_debug(&self) -> OffsetsMapDebug<'_, FetchState> {
120        OffsetsMapDebug {
121            state: self,
122            offsets: &self.fetch_offsets,
123        }
124    }
125
126    pub fn consumed_offsets_debug(&self) -> OffsetsMapDebug<'_, ConsumedOffset> {
127        OffsetsMapDebug {
128            state: self,
129            offsets: &self.consumed_offsets,
130        }
131    }
132}
133
// Specifies the actual partitions of a topic to be consumed; produced
// by `determine_partitions` from a user-configured `Assignment`.
struct Subscription<'a> {
    assignment: &'a Assignment, // the assignment - user configuration
    partitions: Vec<i32>,       // the actual partitions to be consumed
}
139
140/// Determines the partitions to be consumed according to the
141/// specified topic and requested partitions configuration. Returns an
142/// ordered list of the partition ids to consume.
143fn determine_partitions<'a>(
144    assignment: &'a Assignment,
145    metadata: Topics<'_>,
146) -> Result<Subscription<'a>> {
147    let topic = assignment.topic();
148    let req_partitions = assignment.partitions();
149    let avail_partitions = match metadata.partitions(topic) {
150        // ~ fail if the underlying topic is unknown to the given client
151        None => {
152            debug!(
153                "determine_partitions: no such topic: {} (all metadata: {:?})",
154                topic, metadata
155            );
156            return Err(Error::Kafka(KafkaCode::UnknownTopicOrPartition));
157        }
158        Some(tp) => tp,
159    };
160    let ps = if req_partitions.is_empty() {
161        // ~ no partitions configured ... use all available
162        let mut ps: Vec<i32> = Vec::with_capacity(avail_partitions.len());
163        for p in avail_partitions {
164            ps.push(p.id());
165        }
166        ps
167    } else {
168        // ~ validate that all partitions we're going to consume are
169        // available
170        let mut ps: Vec<i32> = Vec::with_capacity(req_partitions.len());
171        for &p in req_partitions {
172            match avail_partitions.partition(p) {
173                None => {
174                    debug!(
175                        "determine_partitions: no such partition: \"{}:{}\" \
176                            (all metadata: {:?})",
177                        topic, p, metadata
178                    );
179                    return Err(Error::Kafka(KafkaCode::UnknownTopicOrPartition));
180                }
181                Some(_) => ps.push(p),
182            };
183        }
184        ps
185    };
186    Ok(Subscription {
187        assignment,
188        partitions: ps,
189    })
190}
191
192// Fetches the so-far commited/consumed offsets for the configured
193// group/topic/partitions.
194fn load_consumed_offsets(
195    client: &mut KafkaClient,
196    group: &str,
197    assignments: &Assignments,
198    subscriptions: &[Subscription<'_>],
199    result_capacity: usize,
200) -> Result<HashMap<TopicPartition, ConsumedOffset, PartitionHasher>> {
201    assert!(!subscriptions.is_empty());
202    // ~ pre-allocate the right size
203    let mut offs = HashMap::with_capacity_and_hasher(result_capacity, PartitionHasher::default());
204    // ~ no group, no persisted consumed offsets
205    if group.is_empty() {
206        return Ok(offs);
207    }
208    // ~ otherwise try load them for the group
209    let tpos = client.fetch_group_offsets(
210        group,
211        subscriptions.iter().flat_map(|s| {
212            let topic = s.assignment.topic();
213            s.partitions
214                .iter()
215                .map(move |&p| FetchGroupOffset::new(topic, p))
216        }),
217    )?;
218    for (topic, pos) in tpos {
219        for po in pos {
220            if po.offset != -1 {
221                offs.insert(
222                    TopicPartition {
223                        topic_ref: assignments.topic_ref(&topic).expect("non-assigned topic"),
224                        partition: po.partition,
225                    },
226                    // the committed offset is the next message to be fetched, so
227                    // the last consumed message is that - 1
228                    ConsumedOffset {
229                        offset: po.offset - 1,
230                        dirty: false,
231                    },
232                );
233            }
234        }
235    }
236
237    debug!("load_consumed_offsets: constructed consumed: {:#?}", offs);
238
239    Ok(offs)
240}
241
/// Fetches the "next fetch" offsets/states based on the specified
/// subscriptions and the given consumed offsets.
///
/// When `consumed_offsets` is empty, the configured fallback offset
/// is resolved for every subscribed partition.  Otherwise each
/// consumed offset is validated against the partition's
/// earliest/latest range: in range, `consumed + 1` becomes the next
/// fetch offset; out of range, the configured fallback offset is
/// used (erroring with `KafkaCode::Unknown` if the fallback is
/// neither `Latest` nor `Earliest`).
fn load_fetch_states(
    client: &mut KafkaClient,
    config: &Config,
    assignments: &Assignments,
    subscriptions: &[Subscription<'_>],
    consumed_offsets: &HashMap<TopicPartition, ConsumedOffset, PartitionHasher>,
    result_capacity: usize,
) -> Result<HashMap<TopicPartition, FetchState, PartitionHasher>> {
    // ~ helper: resolves the given logical offset (earliest/latest/…)
    // for all partitions of `topics` into a
    // topic -> (partition -> offset) lookup table
    fn load_partition_offsets(
        client: &mut KafkaClient,
        topics: &[&str],
        offset: FetchOffset,
    ) -> Result<HashMap<String, HashMap<i32, i64, PartitionHasher>>> {
        let toffs = client.fetch_offsets(topics, offset)?;
        let mut m = HashMap::with_capacity(toffs.len());
        for (topic, poffs) in toffs {
            let mut pidx =
                HashMap::with_capacity_and_hasher(poffs.len(), PartitionHasher::default());

            for poff in poffs {
                pidx.insert(poff.partition, poff.offset);
            }

            m.insert(topic, pidx);
        }
        Ok(m)
    }

    let mut fetch_offsets =
        HashMap::with_capacity_and_hasher(result_capacity, PartitionHasher::default());
    let max_bytes = client.fetch_max_bytes_per_partition();
    let subscription_topics: Vec<_> = subscriptions.iter().map(|s| s.assignment.topic()).collect();
    if consumed_offsets.is_empty() {
        // ~ if there are no offsets on behalf of the consumer
        // group - if any - we can directly use the fallback offsets.
        let offsets = load_partition_offsets(client, &subscription_topics, config.fallback_offset)?;
        for s in subscriptions {
            let topic_ref = assignments
                .topic_ref(s.assignment.topic())
                .expect("unassigned subscription");
            match offsets.get(s.assignment.topic()) {
                None => {
                    debug!(
                        "load_fetch_states: failed to load fallback offsets for: {}",
                        s.assignment.topic()
                    );
                    return Err(Error::Kafka(KafkaCode::UnknownTopicOrPartition));
                }
                Some(offsets) => {
                    for p in &s.partitions {
                        fetch_offsets.insert(
                            TopicPartition {
                                topic_ref,
                                partition: *p,
                            },
                            FetchState {
                                // ~ -1 is the "no offset known" sentinel
                                offset: *offsets.get(p).unwrap_or(&-1),
                                max_bytes,
                            },
                        );
                    }
                }
            }
        }
    } else {
        // fetch the earliest and latest available offsets
        let latest = load_partition_offsets(client, &subscription_topics, FetchOffset::Latest)?;
        let earliest = load_partition_offsets(client, &subscription_topics, FetchOffset::Earliest)?;
        // ~ for each subscribed partition if we have a
        // consumed_offset verify it is in the earliest/latest range
        // and use that consumed_offset+1 as the fetch_message.
        for s in subscriptions {
            let topic_ref = assignments
                .topic_ref(s.assignment.topic())
                .expect("unassigned subscription");
            for p in &s.partitions {
                let l_off = *latest
                    .get(s.assignment.topic())
                    .and_then(|ps| ps.get(p))
                    .unwrap_or(&-1);
                let e_off = *earliest
                    .get(s.assignment.topic())
                    .and_then(|ps| ps.get(p))
                    .unwrap_or(&-1);

                let tp = TopicPartition {
                    topic_ref,
                    partition: *p,
                };

                // the "latest" offset is the offset of the "next coming message"
                let offset = match consumed_offsets.get(&tp) {
                    Some(co) if co.offset >= e_off && co.offset < l_off => co.offset + 1,
                    // ~ consumed offset missing or out of range:
                    // resort to the configured fallback
                    _ => match config.fallback_offset {
                        FetchOffset::Latest => l_off,
                        FetchOffset::Earliest => e_off,
                        _ => {
                            debug!(
                                "cannot determine fetch offset \
                                        (group: {} / topic: {} / partition: {})",
                                &config.group,
                                s.assignment.topic(),
                                p
                            );
                            return Err(Error::Kafka(KafkaCode::Unknown));
                        }
                    },
                };
                fetch_offsets.insert(tp, FetchState { offset, max_bytes });
            }
        }
    }
    Ok(fetch_offsets)
}
358
/// Wrapper rendering an offsets map for debug output with topic
/// names resolved through the owning `State` (see
/// `State::fetch_offsets_debug` / `State::consumed_offsets_debug`).
pub struct OffsetsMapDebug<'a, T> {
    state: &'a State,
    offsets: &'a HashMap<TopicPartition, T, PartitionHasher>,
}
363
364impl<'a, T: fmt::Debug + 'a> fmt::Debug for OffsetsMapDebug<'a, T> {
365    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
366        write!(f, "{{")?;
367        for (i, (tp, v)) in self.offsets.iter().enumerate() {
368            if i != 0 {
369                write!(f, ", ")?;
370            }
371            let topic = self.state.topic_name(tp.topic_ref);
372            write!(f, "\"{}:{}\": {:?}", topic, tp.partition, v)?;
373        }
374        write!(f, "}}")
375    }
376}
377
/// Wrapper rendering a queue of topic partitions for debug output
/// with topic names resolved through the owning `State`.
struct TopicPartitionsDebug<'a> {
    state: &'a State,
    tps: &'a VecDeque<TopicPartition>,
}
382
383impl<'a> fmt::Debug for TopicPartitionsDebug<'a> {
384    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
385        write!(f, "[")?;
386        for (i, tp) in self.tps.iter().enumerate() {
387            if i != 0 {
388                write!(f, " ,")?;
389            }
390            write!(
391                f,
392                "\"{}:{}\"",
393                self.state.topic_name(tp.topic_ref),
394                tp.partition
395            )?;
396        }
397        write!(f, "]")
398    }
399}