kafka/client/
metadata.rs

//! Types related to topic metadata for introspection by clients.
//! See, for example, `KafkaClient::topics()`.
3
4use std::collections::hash_map;
5use std::fmt;
6
7use super::state::{ClientState, TopicPartition, TopicPartitionIter, TopicPartitions};
8use super::KafkaClient;
9
10// public re-export
11pub use super::state::Broker;
12pub use super::state::TopicNames;
13
14/// A view on the loaded metadata about topics and their partitions.
15pub struct Topics<'a> {
16    state: &'a ClientState,
17}
18
19impl<'a> Topics<'a> {
20    /// Constructs a view of the currently loaded topic metadata from
21    /// the specified kafka client.
22    #[inline]
23    pub fn new(client: &KafkaClient) -> Topics<'_> {
24        Topics {
25            state: &client.state,
26        }
27    }
28
29    /// Retrieves the number of the underlying topics.
30    #[inline]
31    pub fn len(&self) -> usize {
32        self.state.num_topics()
33    }
34
35    pub fn is_empty(&self) -> bool {
36        self.len() == 0
37    }
38
39    /// Provides an iterator over the underlying topics.
40    #[inline]
41    pub fn iter(&'a self) -> TopicIter<'a> {
42        TopicIter::new(self.state)
43    }
44
45    /// A convenience method to return an iterator over the topics'
46    /// names.
47    #[inline]
48    pub fn names(&'a self) -> TopicNames<'a> {
49        self.state.topic_names()
50    }
51
52    /// A convenience method to determine whether the specified topic
53    /// is known.
54    #[inline]
55    pub fn contains(&'a self, topic: &str) -> bool {
56        self.state.contains_topic(topic)
57    }
58
59    /// Retrieves the partitions of a specified topic.
60    #[inline]
61    pub fn partitions(&'a self, topic: &str) -> Option<Partitions<'a>> {
62        self.state.partitions_for(topic).map(|tp| Partitions {
63            state: self.state,
64            tp,
65        })
66    }
67}
68
69impl<'a> fmt::Debug for Topics<'a> {
70    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71        write!(f, "Topics {{ topics: [")?;
72        let mut ts = self.iter();
73        if let Some(t) = ts.next() {
74            write!(f, "{:?}", t)?;
75        }
76        for t in ts {
77            write!(f, ", {:?}", t)?;
78        }
79        write!(f, "] }}")
80    }
81}
82
83impl<'a> IntoIterator for &'a Topics<'a> {
84    type Item = Topic<'a>;
85    type IntoIter = TopicIter<'a>;
86
87    fn into_iter(self) -> Self::IntoIter {
88        self.iter()
89    }
90}
91
92impl<'a> IntoIterator for Topics<'a> {
93    type Item = Topic<'a>;
94    type IntoIter = TopicIter<'a>;
95
96    fn into_iter(self) -> Self::IntoIter {
97        TopicIter::new(self.state)
98    }
99}
100
101/// An iterator over topics.
102pub struct TopicIter<'a> {
103    state: &'a ClientState,
104    iter: hash_map::Iter<'a, String, TopicPartitions>,
105}
106
107impl<'a> TopicIter<'a> {
108    fn new(state: &'a ClientState) -> TopicIter<'a> {
109        TopicIter {
110            state,
111            iter: state.topic_partitions().iter(),
112        }
113    }
114}
115
116impl<'a> Iterator for TopicIter<'a> {
117    type Item = Topic<'a>;
118
119    #[inline]
120    fn next(&mut self) -> Option<Self::Item> {
121        self.iter.next().map(|(name, tps)| Topic {
122            state: self.state,
123            name: &name[..],
124            tp: tps,
125        })
126    }
127}
128
129/// A view on the loaded metadata for a particular topic.
130pub struct Topic<'a> {
131    state: &'a ClientState,
132    name: &'a str,
133    tp: &'a TopicPartitions,
134}
135
136impl<'a> Topic<'a> {
137    /// Retrieves the name of this topic.
138    #[inline]
139    pub fn name(&self) -> &str {
140        self.name
141    }
142
143    /// Retrieves the list of all partitions for this topic.
144    #[inline]
145    pub fn partitions(&self) -> Partitions<'a> {
146        Partitions {
147            state: self.state,
148            tp: self.tp,
149        }
150    }
151}
152
153impl<'a> fmt::Debug for Topic<'a> {
154    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
155        write!(
156            f,
157            "Topic {{ name: {}, partitions: {:?} }}",
158            self.name,
159            self.partitions()
160        )
161    }
162}
163
164/// Metadata relevant to partitions of a particular topic.
165pub struct Partitions<'a> {
166    state: &'a ClientState,
167    tp: &'a TopicPartitions,
168}
169
170impl<'a> Partitions<'a> {
171    /// Retrieves the number of the topic's partitions.
172    #[inline]
173    pub fn len(&self) -> usize {
174        self.tp.len()
175    }
176
177    /// Tests for `.len() > 0`.
178    #[inline]
179    pub fn is_empty(&self) -> bool {
180        self.tp.is_empty()
181    }
182
183    /// Retrieves an iterator of the partitions of the underlying topic.
184    #[inline]
185    pub fn iter(&self) -> PartitionIter<'a> {
186        PartitionIter::new(self.state, self.tp)
187    }
188
189    /// Finds a specified partition identified by its id.
190    #[inline]
191    pub fn partition(&self, partition_id: i32) -> Option<Partition<'a>> {
192        self.tp
193            .partition(partition_id)
194            .map(|p| Partition::new(self.state, p, partition_id))
195    }
196
197    /// Convenience method to retrieve the identifiers of all
198    /// currently "available" partitions.  Such partitions are known
199    /// to have a leader broker and can be sent messages to.
200    #[inline]
201    pub fn available_ids(&self) -> Vec<i32> {
202        self.tp
203            .iter()
204            .filter_map(|(id, p)| p.broker(self.state).map(|_| id))
205            .collect()
206    }
207}
208
209impl<'a> fmt::Debug for Partitions<'a> {
210    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
211        write!(f, "Partitions {{ [")?;
212        let mut ps = self.iter();
213        if let Some(p) = ps.next() {
214            write!(f, "{:?}", p)?;
215        }
216        for p in ps {
217            write!(f, ", {:?}", p)?;
218        }
219        write!(f, "] }}")
220    }
221}
222
223impl<'a> IntoIterator for &'a Partitions<'a> {
224    type Item = Partition<'a>;
225    type IntoIter = PartitionIter<'a>;
226
227    fn into_iter(self) -> Self::IntoIter {
228        self.iter()
229    }
230}
231
232impl<'a> IntoIterator for Partitions<'a> {
233    type Item = Partition<'a>;
234    type IntoIter = PartitionIter<'a>;
235
236    fn into_iter(self) -> Self::IntoIter {
237        PartitionIter::new(self.state, self.tp)
238    }
239}
240
241/// An iterator over a topic's partitions.
242pub struct PartitionIter<'a> {
243    state: &'a ClientState,
244    iter: TopicPartitionIter<'a>,
245}
246
247impl<'a> PartitionIter<'a> {
248    fn new(state: &'a ClientState, tp: &'a TopicPartitions) -> Self {
249        PartitionIter {
250            state,
251            iter: tp.iter(),
252        }
253    }
254}
255
256impl<'a> Iterator for PartitionIter<'a> {
257    type Item = Partition<'a>;
258
259    #[inline]
260    fn next(&mut self) -> Option<Self::Item> {
261        self.iter
262            .next()
263            .map(|(id, p)| Partition::new(self.state, p, id))
264    }
265}
266
267/// Metadata about a particular topic partition.
268///
269/// A partition can be seen as either available or not by
270/// `kafka-rust`.  "Available" partitions are partitions with an
271/// assigned leader broker and can be send messages to or fetched
272/// messages from.  Non-available partitions are ignored by
273/// `kafka-rust`.  Whether or not a partition is currently "available"
274/// can be determined by testing for `partition.leader().is_some()` or
275/// more directly through `partition.is_available()`.
276pub struct Partition<'a> {
277    state: &'a ClientState,
278    partition: &'a TopicPartition,
279    id: i32,
280}
281
282impl<'a> Partition<'a> {
283    fn new(state: &'a ClientState, partition: &'a TopicPartition, id: i32) -> Partition<'a> {
284        Partition {
285            state,
286            partition,
287            id,
288        }
289    }
290
291    /// Retrieves the identifier of this topic partition.
292    #[inline]
293    pub fn id(&self) -> i32 {
294        self.id
295    }
296
297    /// Retrieves the current leader broker of this partition - if
298    /// any.  A partition with a leader is said to be "available".
299    #[inline]
300    pub fn leader(&self) -> Option<&'a Broker> {
301        self.partition.broker(self.state)
302    }
303
304    /// Determines whether this partition is currently "available".
305    /// See `Partition::leader()`.
306    pub fn is_available(&self) -> bool {
307        self.leader().is_some()
308    }
309}
310
311impl<'a> fmt::Debug for Partition<'a> {
312    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
313        write!(
314            f,
315            "Partition {{ id: {}, leader: {:?} }}",
316            self.id(),
317            self.leader()
318        )
319    }
320}