kafka/consumer/
assignment.rs

1//! Encapsulates topic partition assignments to a consumer. Once
2//! constructed these assignments are not modified.
3
4use std::collections::HashMap;
5use std::ops::Index;
6use std::u32;
7
8/// A read-only configuration for `Consumer`.
9#[derive(Debug)]
10pub struct Assignment {
11    /// ~ name of the topic to consume
12    topic: String,
13    /// ~ list of partitions to consumer, empty if consuming all
14    /// available partitions
15    /// ~ kept in ascending order
16    /// ~ no duplicates
17    partitions: Vec<i32>,
18}
19
20impl Assignment {
21    pub fn topic(&self) -> &str {
22        &self.topic
23    }
24
25    pub fn partitions(&self) -> &[i32] {
26        &self.partitions
27    }
28}
29
30/// A "pointer" to an assignment stored in `Config`.
31#[derive(Debug, PartialEq, Eq, Hash, Copy, Clone)]
32pub struct AssignmentRef(u32);
33
34/// A set of assignments.
35#[derive(Debug)]
36pub struct Assignments(Vec<Assignment>);
37
38impl Assignments {
39    pub fn as_slice(&self) -> &[Assignment] {
40        &self.0
41    }
42
43    pub fn topic_ref(&self, topic: &str) -> Option<AssignmentRef> {
44        self.0
45            .binary_search_by(|x| x.topic.as_str().cmp(topic))
46            .ok()
47            .map(|i| AssignmentRef(i as u32))
48    }
49}
50
51impl Index<AssignmentRef> for Assignments {
52    type Output = Assignment;
53    fn index(&self, index: AssignmentRef) -> &Self::Output {
54        &self.0[index.0 as usize]
55    }
56}
57
58pub fn from_map(src: HashMap<String, Vec<i32>>) -> Assignments {
59    let mut xs = Vec::with_capacity(src.len());
60    for (topic, mut partitions) in src {
61        partitions.sort_unstable();
62        partitions.dedup();
63        xs.push(Assignment { topic, partitions });
64    }
65    // ~ sort by topic such has we can apply binary search by that
66    // attribute later
67    xs.sort_by(|a, b| a.topic.cmp(&b.topic));
68    Assignments(xs)
69}