1use std::collections::hash_map::{Entry, HashMap, Keys};
2use std::convert::AsRef;
3use std::slice;
4use std::u32;
5
6use crate::error::Result;
7use crate::protocol;
8
9#[derive(Debug)]
10pub struct ClientState {
11 correlation: i32,
14
15 brokers: Vec<Broker>,
23
24 topic_partitions: HashMap<String, TopicPartitions>,
26
27 group_coordinators: HashMap<String, BrokerRef>,
29}
30
/// A broker known to the client.
#[derive(Debug)]
pub struct Broker {
    /// Node id identifying the broker.
    node_id: i32,
    /// Address of the broker; this module stores it as `host:port`.
    host: String,
}

impl Broker {
    /// Returns the broker's node id.
    #[inline]
    pub fn id(&self) -> i32 {
        self.node_id
    }

    /// Returns the broker's address string.
    #[inline]
    pub fn host(&self) -> &str {
        self.host.as_str()
    }
}
60
/// Index value marking a `BrokerRef` that does not point at a known broker.
const UNKNOWN_BROKER_INDEX: u32 = u32::MAX;

/// A small, copyable handle to a broker: an index into the broker list kept
/// by `ClientState`.
#[derive(Debug, Copy, Clone)]
pub struct BrokerRef {
    _index: u32,
}

impl BrokerRef {
    /// Wraps the given broker-list index.
    fn new(index: u32) -> Self {
        Self { _index: index }
    }

    /// Returns the wrapped index, widened for use with slices.
    fn index(&self) -> usize {
        self._index as usize
    }

    /// Makes this reference point at whatever `other` points at.
    fn set(&mut self, other: BrokerRef) {
        self._index = other._index;
    }

    /// Makes this reference point at no broker.
    fn set_unknown(&mut self) {
        self._index = UNKNOWN_BROKER_INDEX;
    }
}
98
99#[derive(Debug)]
103pub struct TopicPartitions {
104 partitions: Vec<TopicPartition>,
110}
111
112impl TopicPartitions {
113 fn new_with_partitions(n: usize) -> TopicPartitions {
115 TopicPartitions {
116 partitions: (0..n).map(|_| TopicPartition::new()).collect(),
117 }
118 }
119
120 pub fn len(&self) -> usize {
121 self.partitions.len()
122 }
123
124 pub fn is_empty(&self) -> bool {
125 self.partitions.is_empty()
126 }
127
128 pub fn partition(&self, partition_id: i32) -> Option<&TopicPartition> {
129 self.partitions.get(partition_id as usize)
130 }
131
132 pub fn iter(&self) -> TopicPartitionIter<'_> {
133 self.into_iter()
134 }
135}
136
137impl<'a> IntoIterator for &'a TopicPartitions {
138 type Item = (i32, &'a TopicPartition);
139 type IntoIter = TopicPartitionIter<'a>;
140
141 fn into_iter(self) -> Self::IntoIter {
142 TopicPartitionIter {
143 partition_id: 0,
144 iter: self.partitions.iter(),
145 }
146 }
147}
148
149#[derive(Debug)]
151pub struct TopicPartition {
152 broker: BrokerRef,
153}
154
155impl TopicPartition {
156 fn new() -> TopicPartition {
157 TopicPartition {
158 broker: BrokerRef::new(UNKNOWN_BROKER_INDEX),
159 }
160 }
161
162 pub fn broker<'a>(&self, state: &'a ClientState) -> Option<&'a Broker> {
163 state.brokers.get(self.broker.index())
164 }
165}
166
167pub struct TopicPartitionIter<'a> {
169 iter: slice::Iter<'a, TopicPartition>,
170 partition_id: i32,
171}
172
173impl<'a> Iterator for TopicPartitionIter<'a> {
174 type Item = (i32, &'a TopicPartition);
175 fn next(&mut self) -> Option<Self::Item> {
176 self.iter.next().map(|tp| {
177 let partition_id = self.partition_id;
178 self.partition_id += 1;
179 (partition_id, tp)
180 })
181 }
182}
183
184pub struct TopicNames<'a> {
190 iter: Keys<'a, String, TopicPartitions>,
191}
192
193impl<'a> Iterator for TopicNames<'a> {
194 type Item = &'a str;
195
196 #[inline]
197 fn next(&mut self) -> Option<Self::Item> {
198 self.iter.next().map(AsRef::as_ref)
199 }
200}
201
202impl Default for ClientState {
203 fn default() -> Self {
204 Self::new()
205 }
206}
207
208impl ClientState {
209 pub fn new() -> Self {
210 ClientState {
211 correlation: 0,
212 brokers: Vec::new(),
213 topic_partitions: HashMap::new(),
214 group_coordinators: HashMap::new(),
215 }
216 }
217
218 pub fn num_topics(&self) -> usize {
219 self.topic_partitions.len()
220 }
221
222 pub fn contains_topic(&self, topic: &str) -> bool {
223 self.topic_partitions.contains_key(topic)
224 }
225
226 pub fn contains_topic_partition(&self, topic: &str, partition_id: i32) -> bool {
227 self.topic_partitions
228 .get(topic)
229 .map(|tp| tp.partition(partition_id))
230 .is_some()
231 }
232
233 pub fn topic_names(&self) -> TopicNames<'_> {
234 TopicNames {
235 iter: self.topic_partitions.keys(),
236 }
237 }
238
239 pub fn topic_partitions(&self) -> &HashMap<String, TopicPartitions> {
241 &self.topic_partitions
242 }
243
244 pub fn partitions_for<'a>(&'a self, topic: &str) -> Option<&'a TopicPartitions> {
245 self.topic_partitions.get(topic)
246 }
247
248 pub fn next_correlation_id(&mut self) -> i32 {
249 self.correlation = (self.correlation + 1) % (1i32 << 30);
250 self.correlation
251 }
252
253 pub fn find_broker<'a>(&'a self, topic: &str, partition_id: i32) -> Option<&'a str> {
254 self.topic_partitions
255 .get(topic)
256 .and_then(|tp| tp.partition(partition_id))
257 .and_then(|p| p.broker(self))
258 .map(|b| &b.host[..])
259 }
260
261 pub fn clear_metadata(&mut self) {
263 self.topic_partitions.clear();
266 self.brokers.clear();
267 }
268
269 pub fn update_metadata(&mut self, md: protocol::MetadataResponse) -> Result<()> {
272 debug!("updating metadata from: {:?}", md);
273
274 let brokers = self.update_brokers(&md);
277
278 for t in md.topics {
280 let tps = match self.topic_partitions.entry(t.topic) {
283 Entry::Occupied(e) => {
284 let ps = &mut e.into_mut().partitions;
285 match (ps.len(), t.partitions.len()) {
286 (n, m) if n > m => ps.truncate(m),
287 (n, m) if n < m => {
288 ps.reserve(m);
289 for _ in 0..(m - n) {
290 ps.push(TopicPartition::new());
291 }
292 }
293 _ => {}
294 }
295 ps
296 }
297 Entry::Vacant(e) => {
298 &mut e
299 .insert(TopicPartitions::new_with_partitions(t.partitions.len()))
300 .partitions
301 }
302 };
303 for partition in t.partitions {
305 let tp = &mut tps[partition.id as usize];
306 if let Some(bref) = brokers.get(&partition.leader) {
307 tp.broker.set(*bref)
308 } else {
309 tp.broker.set_unknown()
310 }
311 }
312 }
313 Ok(())
314 }
315
316 fn update_brokers(&mut self, md: &protocol::MetadataResponse) -> HashMap<i32, BrokerRef> {
319 let mut brokers = HashMap::with_capacity(self.brokers.len() + md.brokers.len());
321 for (i, broker) in (0u32..).zip(self.brokers.iter()) {
322 brokers.insert(broker.node_id, BrokerRef::new(i));
323 }
324
325 for broker in &md.brokers {
328 let broker_host = format!("{}:{}", broker.host, broker.port);
329 match brokers.entry(broker.node_id) {
330 Entry::Occupied(e) => {
331 let bref = *e.get();
334 let b = &mut self.brokers[bref.index()];
335 if b.host != broker_host {
336 b.host = broker_host;
337 }
338 }
339 Entry::Vacant(e) => {
340 let new_index = self.brokers.len();
342 self.brokers.push(Broker {
343 node_id: broker.node_id,
344 host: broker_host,
345 });
346 e.insert(BrokerRef::new(new_index as u32));
348 }
349 }
350 }
351 brokers
352 }
353
354 pub fn group_coordinator<'a>(&'a self, group: &str) -> Option<&'a str> {
357 self.group_coordinators
358 .get(group)
359 .and_then(|b| self.brokers.get(b.index()))
360 .map(|b| &b.host[..])
361 }
362
363 pub fn remove_group_coordinator(&mut self, group: &str) {
366 self.group_coordinators.remove(group);
367 }
368
369 pub fn set_group_coordinator<'a>(
373 &'a mut self,
374 group: &str,
375 gc: &protocol::GroupCoordinatorResponse,
376 ) -> &'a str {
377 debug!(
378 "set_group_coordinator: registering coordinator for '{}': {:?}",
379 group, gc
380 );
381
382 let group_host = format!("{}:{}", gc.host, gc.port);
383 let mut broker_ref = BrokerRef::new(UNKNOWN_BROKER_INDEX);
385 for (i, broker) in (0u32..).zip(self.brokers.iter()) {
386 if gc.broker_id == broker.node_id {
387 if group_host != broker.host {
388 warn!(
389 "set_group_coordinator: coord_host({}) != broker_host({}) for \
390 broker_id({})!",
391 group_host, broker.host, broker.node_id
392 );
393 }
394 broker_ref._index = i;
395 break;
396 }
397 }
398 if broker_ref._index == UNKNOWN_BROKER_INDEX {
400 broker_ref._index = self.brokers.len() as u32;
401 self.brokers.push(Broker {
402 node_id: gc.broker_id,
403 host: group_host,
404 });
405 }
406 if let Some(br) = self.group_coordinators.get_mut(group) {
407 if br._index != broker_ref._index {
408 br._index = broker_ref._index;
409 }
410 }
411 self.group_coordinators.insert(group.to_owned(), broker_ref);
412 &self.brokers[broker_ref.index()].host
413 }
414}
415
#[cfg(test)]
mod tests {
    use super::ClientState;
    use crate::protocol;
    use crate::protocol::metadata as md;

    /// Builds metadata for one partition with the given id and leader node
    /// id; all other fields are left empty.
    fn new_partition(id: i32, leader: i32) -> md::PartitionMetadata {
        md::PartitionMetadata {
            error: 0,
            id,
            leader,
            replicas: Vec::new(),
            isr: Vec::new(),
        }
    }

    /// Builds partitions with ids 0, 1, 2, ... led by the given node ids.
    fn partitions_led_by(leaders: &[i32]) -> Vec<md::PartitionMetadata> {
        (0i32..)
            .zip(leaders)
            .map(|(id, &leader)| new_partition(id, leader))
            .collect()
    }

    /// Builds error-free metadata for a topic with the given partitions.
    fn new_topic(name: &str, partitions: Vec<md::PartitionMetadata>) -> md::TopicMetadata {
        md::TopicMetadata {
            error: 0,
            topic: name.to_owned(),
            partitions,
        }
    }

    /// First metadata response: three brokers and three topics, one of
    /// which has no partitions.
    fn metadata_response_initial() -> protocol::MetadataResponse {
        let brokers = vec![
            md::BrokerMetadata {
                node_id: 10,
                host: "gin1.dev".to_owned(),
                port: 1234,
            },
            md::BrokerMetadata {
                node_id: 50,
                host: "gin2.dev".to_owned(),
                port: 9876,
            },
            md::BrokerMetadata {
                node_id: 30,
                host: "gin3.dev".to_owned(),
                port: 9092,
            },
        ];
        let topics = vec![
            new_topic("tee-one", partitions_led_by(&[50, 10, 30, -1, 50])),
            new_topic("tee-two", partitions_led_by(&[30, -1, -1, 10])),
            new_topic("tee-three", partitions_led_by(&[])),
        ];
        protocol::MetadataResponse {
            header: protocol::HeaderResponse { correlation: 1 },
            brokers,
            topics,
        }
    }

    /// Second metadata response: node 50 moved to a new address, and only
    /// "tee-two" is reported, now with one more partition and new leaders.
    fn metadata_response_update() -> protocol::MetadataResponse {
        let brokers = vec![
            md::BrokerMetadata {
                node_id: 10,
                host: "gin1.dev".to_owned(),
                port: 1234,
            },
            md::BrokerMetadata {
                node_id: 50,
                host: "aladin1.dev".to_owned(),
                port: 9091,
            },
            md::BrokerMetadata {
                node_id: 30,
                host: "gin3.dev".to_owned(),
                port: 9092,
            },
        ];
        let topics = vec![new_topic("tee-two", partitions_led_by(&[10, 10, 50, -1, 30]))];
        protocol::MetadataResponse {
            header: protocol::HeaderResponse { correlation: 2 },
            brokers,
            topics,
        }
    }

    /// Checks that `topic` has exactly the expected partitions, each given
    /// as `(partition id, Some((broker node id, broker address)))` or
    /// `(partition id, None)` when no broker is expected. Also checks that
    /// `find_broker` agrees with resolving the partition's broker directly.
    fn assert_partitions(
        state: &ClientState,
        topic: &str,
        expected: &[(i32, Option<(i32, &str)>)],
    ) {
        let partitions = state.partitions_for(topic).unwrap();
        assert_eq!(expected.len(), partitions.len());
        assert_eq!(expected.is_empty(), partitions.is_empty());

        let mut actual = Vec::with_capacity(partitions.len());
        for (id, tp) in partitions {
            let broker = tp.broker(state).map(|b| (b.id(), b.host()));
            assert_eq!(broker.map(|(_, host)| host), state.find_broker(topic, id));
            actual.push((id, broker));
        }
        assert_eq!(expected, &actual[..]);
    }

    /// Checks the set of known topics, which is the same before and after
    /// the update.
    fn assert_known_topics(state: &ClientState) {
        let mut names: Vec<&str> = state.topic_names().collect();
        names.sort();
        assert_eq!(vec!["tee-one", "tee-three", "tee-two"], names);
        assert_eq!(3, state.num_topics());

        for topic in &["tee-one", "tee-two", "tee-three"] {
            assert!(state.contains_topic(topic));
            assert!(state.partitions_for(topic).is_some());
        }

        assert!(!state.contains_topic("foobar"));
        assert!(state.partitions_for("foobar").is_none());
    }

    /// Expectations after loading only the initial response.
    fn assert_initial_metadata_load(state: &ClientState) {
        assert_known_topics(state);

        assert_partitions(
            state,
            "tee-one",
            &[
                (0, Some((50, "gin2.dev:9876"))),
                (1, Some((10, "gin1.dev:1234"))),
                (2, Some((30, "gin3.dev:9092"))),
                (3, None),
                (4, Some((50, "gin2.dev:9876"))),
            ],
        );
        assert_partitions(
            state,
            "tee-two",
            &[
                (0, Some((30, "gin3.dev:9092"))),
                (1, None),
                (2, None),
                (3, Some((10, "gin1.dev:1234"))),
            ],
        );
        assert_partitions(state, "tee-three", &[]);
    }

    /// Expectations after additionally loading the update response.
    fn assert_updated_metadata_load(state: &ClientState) {
        assert_known_topics(state);

        // "tee-one" was not part of the update, but sees the new address of
        // node 50.
        assert_partitions(
            state,
            "tee-one",
            &[
                (0, Some((50, "aladin1.dev:9091"))),
                (1, Some((10, "gin1.dev:1234"))),
                (2, Some((30, "gin3.dev:9092"))),
                (3, None),
                (4, Some((50, "aladin1.dev:9091"))),
            ],
        );
        assert_partitions(
            state,
            "tee-two",
            &[
                (0, Some((10, "gin1.dev:1234"))),
                (1, Some((10, "gin1.dev:1234"))),
                (2, Some((50, "aladin1.dev:9091"))),
                (3, None),
                (4, Some((30, "gin3.dev:9092"))),
            ],
        );
        assert_partitions(state, "tee-three", &[]);
    }

    #[test]
    fn test_loading_metadata() {
        let mut state = ClientState::new();

        state.update_metadata(metadata_response_initial()).unwrap();
        assert_initial_metadata_load(&state);

        state.update_metadata(metadata_response_update()).unwrap();
        assert_updated_metadata_load(&state);
    }
}