kafka/client/
state.rs

1use std::collections::hash_map::{Entry, HashMap, Keys};
2use std::convert::AsRef;
3use std::slice;
4use std::u32;
5
6use crate::error::Result;
7use crate::protocol;
8
/// The client's cached view of the Kafka cluster: the known brokers,
/// the partitions (and their leaders) of known topics, the
/// coordinators of consumer groups, and the correlation id counter.
#[derive(Debug)]
pub struct ClientState {
    // ~ the last correlation used when communicating with kafka
    // (see `ClientState::next_correlation_id`)
    correlation: i32,

    // ~ a list of known brokers referred to by the index in this
    // vector.  This index is also referred to as `BrokerRef` and is
    // enforced by this module.
    //
    // Note: loading of additional topic metadata must preserve
    // already present brokers in this vector at their position.
    // See `ClientState::update_metadata` and `update_brokers`, which
    // only appends new brokers or updates existing ones in place.
    brokers: Vec<Broker>,

    // ~ a mapping of topic to information about its partitions
    topic_partitions: HashMap<String, TopicPartitions>,

    // ~ a mapping of groups to their coordinators; the values are
    // `BrokerRef`s and hence positions within `brokers`
    group_coordinators: HashMap<String, BrokerRef>,
}
30
31// --------------------------------------------------------------------
32
// ~ note: this type is re-exported to the crate's public api through
// client::metadata
/// A Kafka broker node that `kafka-rust` talks to.
#[derive(Debug)]
pub struct Broker {
    /// The node id the Kafka cluster uses to identify this broker.
    node_id: i32,
    /// The broker's address in "host:port" form, as advertised by
    /// the Kafka cluster itself.
    host: String,
}

impl Broker {
    /// Returns the node id under which the remote Kafka cluster knows
    /// this broker.
    #[inline]
    pub fn id(&self) -> i32 {
        let Broker { node_id, .. } = self;
        *node_id
    }

    /// Returns the "host:port" address of this Kafka broker.
    #[inline]
    pub fn host(&self) -> &str {
        self.host.as_str()
    }
}
60
// Sentinel index standing for "no known broker"; see `BrokerRef`.
const UNKNOWN_BROKER_INDEX: u32 = u32::MAX;

/// ~ An opaque handle to a broker.  A `TopicPartition` refers to its
/// `Broker` only indirectly, through a position in the client state's
/// broker list; this lets broker data be shared without resorting to
/// `Rc`/`Arc` or otherwise fighting the borrowck.
// ~ `UNKNOWN_BROKER_INDEX` is an artificial position meaning "unknown
// broker" (the null value).  Any code indexing `self.brokers` with a
// `BrokerRef` _must_ check against that constant and/or treat the
// lookup conditionally.
#[derive(Debug, Copy, Clone)]
pub struct BrokerRef {
    _index: u32,
}

impl BrokerRef {
    // ~ deliberately private: only this module hands out broker refs
    fn new(index: u32) -> Self {
        Self { _index: index }
    }

    // ~ the referenced position within the broker list
    fn index(&self) -> usize {
        self._index as usize
    }

    // ~ makes `self` refer to the same broker as `other`
    fn set(&mut self, other: BrokerRef) {
        *self = other;
    }

    // ~ makes `self` refer to the unknown broker
    fn set_unknown(&mut self) {
        self._index = UNKNOWN_BROKER_INDEX;
    }
}
98
99// --------------------------------------------------------------------
100
101/// A representation of partitions for a single topic.
102#[derive(Debug)]
103pub struct TopicPartitions {
104    // ~ This list keeps information about each partition of the
105    // corresponding topic - even about partitions currently without a
106    // leader.  The index into this list specifies the partition
107    // identifier.  (This works due to Kafka numbering partitions 0..N
108    // where N is the number of partitions of the topic.)
109    partitions: Vec<TopicPartition>,
110}
111
112impl TopicPartitions {
113    /// Creates a new partitions vector with all partitions leaderless
114    fn new_with_partitions(n: usize) -> TopicPartitions {
115        TopicPartitions {
116            partitions: (0..n).map(|_| TopicPartition::new()).collect(),
117        }
118    }
119
120    pub fn len(&self) -> usize {
121        self.partitions.len()
122    }
123
124    pub fn is_empty(&self) -> bool {
125        self.partitions.is_empty()
126    }
127
128    pub fn partition(&self, partition_id: i32) -> Option<&TopicPartition> {
129        self.partitions.get(partition_id as usize)
130    }
131
132    pub fn iter(&self) -> TopicPartitionIter<'_> {
133        self.into_iter()
134    }
135}
136
137impl<'a> IntoIterator for &'a TopicPartitions {
138    type Item = (i32, &'a TopicPartition);
139    type IntoIter = TopicPartitionIter<'a>;
140
141    fn into_iter(self) -> Self::IntoIter {
142        TopicPartitionIter {
143            partition_id: 0,
144            iter: self.partitions.iter(),
145        }
146    }
147}
148
149/// Metadata for a single topic partition.
150#[derive(Debug)]
151pub struct TopicPartition {
152    broker: BrokerRef,
153}
154
155impl TopicPartition {
156    fn new() -> TopicPartition {
157        TopicPartition {
158            broker: BrokerRef::new(UNKNOWN_BROKER_INDEX),
159        }
160    }
161
162    pub fn broker<'a>(&self, state: &'a ClientState) -> Option<&'a Broker> {
163        state.brokers.get(self.broker.index())
164    }
165}
166
167/// An iterator over a topic's partitions.
168pub struct TopicPartitionIter<'a> {
169    iter: slice::Iter<'a, TopicPartition>,
170    partition_id: i32,
171}
172
173impl<'a> Iterator for TopicPartitionIter<'a> {
174    type Item = (i32, &'a TopicPartition);
175    fn next(&mut self) -> Option<Self::Item> {
176        self.iter.next().map(|tp| {
177            let partition_id = self.partition_id;
178            self.partition_id += 1;
179            (partition_id, tp)
180        })
181    }
182}
183
184// --------------------------------------------------------------------
185
186// ~ note: this type is re-exported to the crate's public api through
187// client::metadata
188/// An iterator over the topic names.
189pub struct TopicNames<'a> {
190    iter: Keys<'a, String, TopicPartitions>,
191}
192
193impl<'a> Iterator for TopicNames<'a> {
194    type Item = &'a str;
195
196    #[inline]
197    fn next(&mut self) -> Option<Self::Item> {
198        self.iter.next().map(AsRef::as_ref)
199    }
200}
201
202impl Default for ClientState {
203    fn default() -> Self {
204        Self::new()
205    }
206}
207
208impl ClientState {
209    pub fn new() -> Self {
210        ClientState {
211            correlation: 0,
212            brokers: Vec::new(),
213            topic_partitions: HashMap::new(),
214            group_coordinators: HashMap::new(),
215        }
216    }
217
218    pub fn num_topics(&self) -> usize {
219        self.topic_partitions.len()
220    }
221
222    pub fn contains_topic(&self, topic: &str) -> bool {
223        self.topic_partitions.contains_key(topic)
224    }
225
226    pub fn contains_topic_partition(&self, topic: &str, partition_id: i32) -> bool {
227        self.topic_partitions
228            .get(topic)
229            .map(|tp| tp.partition(partition_id))
230            .is_some()
231    }
232
233    pub fn topic_names(&self) -> TopicNames<'_> {
234        TopicNames {
235            iter: self.topic_partitions.keys(),
236        }
237    }
238
239    // exposed for the sake of the metadata module
240    pub fn topic_partitions(&self) -> &HashMap<String, TopicPartitions> {
241        &self.topic_partitions
242    }
243
244    pub fn partitions_for<'a>(&'a self, topic: &str) -> Option<&'a TopicPartitions> {
245        self.topic_partitions.get(topic)
246    }
247
248    pub fn next_correlation_id(&mut self) -> i32 {
249        self.correlation = (self.correlation + 1) % (1i32 << 30);
250        self.correlation
251    }
252
253    pub fn find_broker<'a>(&'a self, topic: &str, partition_id: i32) -> Option<&'a str> {
254        self.topic_partitions
255            .get(topic)
256            .and_then(|tp| tp.partition(partition_id))
257            .and_then(|p| p.broker(self))
258            .map(|b| &b.host[..])
259    }
260
261    /// Clears all metadata.
262    pub fn clear_metadata(&mut self) {
263        // ~ important to clear both since one references the other
264        // through `BrokerIndex`
265        self.topic_partitions.clear();
266        self.brokers.clear();
267    }
268
269    /// Loads new and updates existing metadata from the given
270    /// metadata response.
271    pub fn update_metadata(&mut self, md: protocol::MetadataResponse) -> Result<()> {
272        debug!("updating metadata from: {:?}", md);
273
274        // ~ register new brokers with self.brokers and obtain an
275        // index over them by broker-node-id
276        let brokers = self.update_brokers(&md);
277
278        // ~ now update partitions
279        for t in md.topics {
280            // ~ get a mutable reference to the partitions vector
281            // (maintained in self.topic_partitions) for the topic
282            let tps = match self.topic_partitions.entry(t.topic) {
283                Entry::Occupied(e) => {
284                    let ps = &mut e.into_mut().partitions;
285                    match (ps.len(), t.partitions.len()) {
286                        (n, m) if n > m => ps.truncate(m),
287                        (n, m) if n < m => {
288                            ps.reserve(m);
289                            for _ in 0..(m - n) {
290                                ps.push(TopicPartition::new());
291                            }
292                        }
293                        _ => {}
294                    }
295                    ps
296                }
297                Entry::Vacant(e) => {
298                    &mut e
299                        .insert(TopicPartitions::new_with_partitions(t.partitions.len()))
300                        .partitions
301                }
302            };
303            // ~ sync the partitions vector with the new information
304            for partition in t.partitions {
305                let tp = &mut tps[partition.id as usize];
306                if let Some(bref) = brokers.get(&partition.leader) {
307                    tp.broker.set(*bref)
308                } else {
309                    tp.broker.set_unknown()
310                }
311            }
312        }
313        Ok(())
314    }
315
316    /// Updates self.brokers from the given metadata returning an
317    /// index `NodeId -> BrokerRef`
318    fn update_brokers(&mut self, md: &protocol::MetadataResponse) -> HashMap<i32, BrokerRef> {
319        // ~ build an index of the already loaded brokers -- if any
320        let mut brokers = HashMap::with_capacity(self.brokers.len() + md.brokers.len());
321        for (i, broker) in (0u32..).zip(self.brokers.iter()) {
322            brokers.insert(broker.node_id, BrokerRef::new(i));
323        }
324
325        // ~ now add new brokers or updated existing ones while
326        // keeping the above 'broker' index up-to-date
327        for broker in &md.brokers {
328            let broker_host = format!("{}:{}", broker.host, broker.port);
329            match brokers.entry(broker.node_id) {
330                Entry::Occupied(e) => {
331                    // ~ verify our information of the already tracked
332                    // broker is up-to-date
333                    let bref = *e.get();
334                    let b = &mut self.brokers[bref.index()];
335                    if b.host != broker_host {
336                        b.host = broker_host;
337                    }
338                }
339                Entry::Vacant(e) => {
340                    // ~ insert the new broker
341                    let new_index = self.brokers.len();
342                    self.brokers.push(Broker {
343                        node_id: broker.node_id,
344                        host: broker_host,
345                    });
346                    // ~ track the pushed broker's index
347                    e.insert(BrokerRef::new(new_index as u32));
348                }
349            }
350        }
351        brokers
352    }
353
354    /// ~ Retrieves the host:port of the coordinator for the specified
355    /// group - if any.
356    pub fn group_coordinator<'a>(&'a self, group: &str) -> Option<&'a str> {
357        self.group_coordinators
358            .get(group)
359            .and_then(|b| self.brokers.get(b.index()))
360            .map(|b| &b.host[..])
361    }
362
363    /// ~ Removes the current coordinator - if any - for the specified
364    /// group.
365    pub fn remove_group_coordinator(&mut self, group: &str) {
366        self.group_coordinators.remove(group);
367    }
368
369    /// ~ Updates the coordinator for the specified group and returns
370    /// the coordinator host as if `group_coordinator` would have
371    /// been called subsequently.
372    pub fn set_group_coordinator<'a>(
373        &'a mut self,
374        group: &str,
375        gc: &protocol::GroupCoordinatorResponse,
376    ) -> &'a str {
377        debug!(
378            "set_group_coordinator: registering coordinator for '{}': {:?}",
379            group, gc
380        );
381
382        let group_host = format!("{}:{}", gc.host, gc.port);
383        // ~ try to find an already existing broker
384        let mut broker_ref = BrokerRef::new(UNKNOWN_BROKER_INDEX);
385        for (i, broker) in (0u32..).zip(self.brokers.iter()) {
386            if gc.broker_id == broker.node_id {
387                if group_host != broker.host {
388                    warn!(
389                        "set_group_coordinator: coord_host({}) != broker_host({}) for \
390                           broker_id({})!",
391                        group_host, broker.host, broker.node_id
392                    );
393                }
394                broker_ref._index = i;
395                break;
396            }
397        }
398        // ~ if not found, add it to the list of known brokers
399        if broker_ref._index == UNKNOWN_BROKER_INDEX {
400            broker_ref._index = self.brokers.len() as u32;
401            self.brokers.push(Broker {
402                node_id: gc.broker_id,
403                host: group_host,
404            });
405        }
406        if let Some(br) = self.group_coordinators.get_mut(group) {
407            if br._index != broker_ref._index {
408                br._index = broker_ref._index;
409            }
410        }
411        self.group_coordinators.insert(group.to_owned(), broker_ref);
412        &self.brokers[broker_ref.index()].host
413    }
414}
415
#[cfg(test)]
mod tests {
    use super::ClientState;
    use crate::protocol;
    use crate::protocol::metadata as md;

    /// Builds partition metadata with the given id and leader node id
    /// (`-1` meaning "no leader").
    fn new_partition(id: i32, leader: i32) -> md::PartitionMetadata {
        md::PartitionMetadata {
            error: 0,
            id,
            leader,
            replicas: vec![],
            isr: vec![],
        }
    }

    /// Utility to sort the given vector and return it.
    fn sorted<O: Ord>(mut xs: Vec<O>) -> Vec<O> {
        xs.sort();
        xs
    }

    // mock data for an initial kafka metadata response
    fn metadata_response_initial() -> protocol::MetadataResponse {
        protocol::MetadataResponse {
            header: protocol::HeaderResponse { correlation: 1 },
            brokers: vec![
                md::BrokerMetadata {
                    node_id: 10,
                    host: "gin1.dev".to_owned(),
                    port: 1234,
                },
                md::BrokerMetadata {
                    node_id: 50,
                    host: "gin2.dev".to_owned(),
                    port: 9876,
                },
                md::BrokerMetadata {
                    node_id: 30,
                    host: "gin3.dev".to_owned(),
                    port: 9092,
                },
            ],
            topics: vec![
                md::TopicMetadata {
                    error: 0,
                    topic: "tee-one".to_owned(),
                    partitions: vec![
                        new_partition(0, 50),
                        new_partition(1, 10),
                        new_partition(2, 30),
                        new_partition(3, -1),
                        new_partition(4, 50),
                    ],
                },
                md::TopicMetadata {
                    error: 0,
                    topic: "tee-two".to_owned(),
                    partitions: vec![
                        new_partition(0, 30),
                        new_partition(1, -1),
                        new_partition(2, -1),
                        new_partition(3, 10),
                    ],
                },
                md::TopicMetadata {
                    error: 0,
                    topic: "tee-three".to_owned(),
                    partitions: vec![],
                },
            ],
        }
    }

    /// Asserts that `topic`'s partitions (and their leaders) are
    /// exactly `expected`, and that `find_broker` and
    /// `contains_topic_partition` agree with that view.
    fn assert_partitions(
        state: &ClientState,
        topic: &str,
        expected: &[(i32, Option<(i32, &str)>)],
    ) {
        let partitions = state.partitions_for(topic).unwrap();
        assert_eq!(expected.len(), partitions.len());
        assert_eq!(expected.is_empty(), partitions.is_empty());
        assert_eq!(
            expected,
            &partitions
                .iter()
                .map(|(id, tp)| {
                    let broker = tp.broker(state).map(|b| (b.id(), b.host()));
                    // ~ verify that find_broker delivers the same information
                    assert_eq!(broker.map(|b| b.1), state.find_broker(topic, id));
                    // ~ every iterated partition must be reported as present
                    assert!(state.contains_topic_partition(topic, id));
                    (id, broker)
                })
                .collect::<Vec<_>>()[..]
        );
    }

    /// Asserts that exactly the topics "tee-one", "tee-two" and
    /// "tee-three" are known to `state`, and that "foobar" is not.
    fn assert_known_topics(state: &ClientState) {
        assert_eq!(
            vec!["tee-one", "tee-three", "tee-two"],
            sorted(state.topic_names().collect::<Vec<_>>())
        );
        assert_eq!(3, state.num_topics());

        for &topic in &["tee-one", "tee-two", "tee-three"] {
            assert!(state.contains_topic(topic), "topic {} should be known", topic);
            assert!(state.partitions_for(topic).is_some());
        }

        assert!(!state.contains_topic("foobar"));
        assert!(state.partitions_for("foobar").is_none());
        assert!(!state.contains_topic_partition("foobar", 0));
    }

    fn assert_initial_metadata_load(state: &ClientState) {
        assert_known_topics(state);

        assert_partitions(
            state,
            "tee-one",
            &[
                (0, Some((50, "gin2.dev:9876"))),
                (1, Some((10, "gin1.dev:1234"))),
                (2, Some((30, "gin3.dev:9092"))),
                (3, None),
                (4, Some((50, "gin2.dev:9876"))),
            ],
        );
        assert_partitions(
            state,
            "tee-two",
            &[
                (0, Some((30, "gin3.dev:9092"))),
                (1, None),
                (2, None),
                (3, Some((10, "gin1.dev:1234"))),
            ],
        );
        assert_partitions(state, "tee-three", &[]);
    }

    fn metadata_response_update() -> protocol::MetadataResponse {
        protocol::MetadataResponse {
            header: protocol::HeaderResponse { correlation: 2 },
            brokers: vec![
                md::BrokerMetadata {
                    node_id: 10,
                    host: "gin1.dev".to_owned(),
                    port: 1234,
                },
                // note: compared to the initial metadata
                // response this broker moved to a different
                // machine
                md::BrokerMetadata {
                    node_id: 50,
                    host: "aladin1.dev".to_owned(),
                    port: 9091,
                },
                md::BrokerMetadata {
                    node_id: 30,
                    host: "gin3.dev".to_owned(),
                    port: 9092,
                },
            ],
            // metadata for topic "tee-two" only
            topics: vec![md::TopicMetadata {
                error: 0,
                topic: "tee-two".to_owned(),
                partitions: vec![
                    new_partition(0, 10),
                    new_partition(1, 10),
                    new_partition(2, 50),
                    new_partition(3, -1),
                    new_partition(4, 30),
                ],
            }],
        }
    }

    fn assert_updated_metadata_load(state: &ClientState) {
        assert_known_topics(state);

        assert_partitions(
            state,
            "tee-one",
            &[
                (0, Some((50, "aladin1.dev:9091"))),
                (1, Some((10, "gin1.dev:1234"))),
                (2, Some((30, "gin3.dev:9092"))),
                (3, None),
                (4, Some((50, "aladin1.dev:9091"))),
            ],
        );
        assert_partitions(
            state,
            "tee-two",
            &[
                (0, Some((10, "gin1.dev:1234"))),
                (1, Some((10, "gin1.dev:1234"))),
                (2, Some((50, "aladin1.dev:9091"))),
                (3, None),
                (4, Some((30, "gin3.dev:9092"))),
            ],
        );
        assert_partitions(state, "tee-three", &[]);
    }

    #[test]
    fn test_loading_metadata() {
        let mut state = ClientState::new();
        // Test loading metadata into a new, empty client state.
        state.update_metadata(metadata_response_initial()).unwrap();
        assert_initial_metadata_load(&state);

        // Test loading a metadata update into a client state with
        // already some initial metadata loaded.
        state.update_metadata(metadata_response_update()).unwrap();
        assert_updated_metadata_load(&state);
    }

    #[test]
    fn test_empty_state() {
        let state = ClientState::default();
        assert_eq!(0, state.num_topics());
        assert_eq!(0, state.topic_names().count());
        assert!(!state.contains_topic("tee-one"));
        assert!(state.partitions_for("tee-one").is_none());
        assert!(state.find_broker("tee-one", 0).is_none());
    }

    #[test]
    fn test_next_correlation_id_wraps() {
        let mut state = ClientState::new();
        assert_eq!(1, state.next_correlation_id());
        assert_eq!(2, state.next_correlation_id());
        // ~ ids stay below 2^30 and wrap around to zero
        state.correlation = (1 << 30) - 1;
        assert_eq!(0, state.next_correlation_id());
        assert_eq!(1, state.next_correlation_id());
    }
}