1use std::collections::hash_map;
5use std::fmt;
6
7use super::state::{ClientState, TopicPartition, TopicPartitionIter, TopicPartitions};
8use super::KafkaClient;
9
10pub use super::state::Broker;
12pub use super::state::TopicNames;
13
14pub struct Topics<'a> {
16 state: &'a ClientState,
17}
18
19impl<'a> Topics<'a> {
20 #[inline]
23 pub fn new(client: &KafkaClient) -> Topics<'_> {
24 Topics {
25 state: &client.state,
26 }
27 }
28
29 #[inline]
31 pub fn len(&self) -> usize {
32 self.state.num_topics()
33 }
34
35 pub fn is_empty(&self) -> bool {
36 self.len() == 0
37 }
38
39 #[inline]
41 pub fn iter(&'a self) -> TopicIter<'a> {
42 TopicIter::new(self.state)
43 }
44
45 #[inline]
48 pub fn names(&'a self) -> TopicNames<'a> {
49 self.state.topic_names()
50 }
51
52 #[inline]
55 pub fn contains(&'a self, topic: &str) -> bool {
56 self.state.contains_topic(topic)
57 }
58
59 #[inline]
61 pub fn partitions(&'a self, topic: &str) -> Option<Partitions<'a>> {
62 self.state.partitions_for(topic).map(|tp| Partitions {
63 state: self.state,
64 tp,
65 })
66 }
67}
68
69impl<'a> fmt::Debug for Topics<'a> {
70 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71 write!(f, "Topics {{ topics: [")?;
72 let mut ts = self.iter();
73 if let Some(t) = ts.next() {
74 write!(f, "{:?}", t)?;
75 }
76 for t in ts {
77 write!(f, ", {:?}", t)?;
78 }
79 write!(f, "] }}")
80 }
81}
82
83impl<'a> IntoIterator for &'a Topics<'a> {
84 type Item = Topic<'a>;
85 type IntoIter = TopicIter<'a>;
86
87 fn into_iter(self) -> Self::IntoIter {
88 self.iter()
89 }
90}
91
92impl<'a> IntoIterator for Topics<'a> {
93 type Item = Topic<'a>;
94 type IntoIter = TopicIter<'a>;
95
96 fn into_iter(self) -> Self::IntoIter {
97 TopicIter::new(self.state)
98 }
99}
100
101pub struct TopicIter<'a> {
103 state: &'a ClientState,
104 iter: hash_map::Iter<'a, String, TopicPartitions>,
105}
106
107impl<'a> TopicIter<'a> {
108 fn new(state: &'a ClientState) -> TopicIter<'a> {
109 TopicIter {
110 state,
111 iter: state.topic_partitions().iter(),
112 }
113 }
114}
115
116impl<'a> Iterator for TopicIter<'a> {
117 type Item = Topic<'a>;
118
119 #[inline]
120 fn next(&mut self) -> Option<Self::Item> {
121 self.iter.next().map(|(name, tps)| Topic {
122 state: self.state,
123 name: &name[..],
124 tp: tps,
125 })
126 }
127}
128
129pub struct Topic<'a> {
131 state: &'a ClientState,
132 name: &'a str,
133 tp: &'a TopicPartitions,
134}
135
136impl<'a> Topic<'a> {
137 #[inline]
139 pub fn name(&self) -> &str {
140 self.name
141 }
142
143 #[inline]
145 pub fn partitions(&self) -> Partitions<'a> {
146 Partitions {
147 state: self.state,
148 tp: self.tp,
149 }
150 }
151}
152
153impl<'a> fmt::Debug for Topic<'a> {
154 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
155 write!(
156 f,
157 "Topic {{ name: {}, partitions: {:?} }}",
158 self.name,
159 self.partitions()
160 )
161 }
162}
163
164pub struct Partitions<'a> {
166 state: &'a ClientState,
167 tp: &'a TopicPartitions,
168}
169
170impl<'a> Partitions<'a> {
171 #[inline]
173 pub fn len(&self) -> usize {
174 self.tp.len()
175 }
176
177 #[inline]
179 pub fn is_empty(&self) -> bool {
180 self.tp.is_empty()
181 }
182
183 #[inline]
185 pub fn iter(&self) -> PartitionIter<'a> {
186 PartitionIter::new(self.state, self.tp)
187 }
188
189 #[inline]
191 pub fn partition(&self, partition_id: i32) -> Option<Partition<'a>> {
192 self.tp
193 .partition(partition_id)
194 .map(|p| Partition::new(self.state, p, partition_id))
195 }
196
197 #[inline]
201 pub fn available_ids(&self) -> Vec<i32> {
202 self.tp
203 .iter()
204 .filter_map(|(id, p)| p.broker(self.state).map(|_| id))
205 .collect()
206 }
207}
208
209impl<'a> fmt::Debug for Partitions<'a> {
210 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
211 write!(f, "Partitions {{ [")?;
212 let mut ps = self.iter();
213 if let Some(p) = ps.next() {
214 write!(f, "{:?}", p)?;
215 }
216 for p in ps {
217 write!(f, ", {:?}", p)?;
218 }
219 write!(f, "] }}")
220 }
221}
222
223impl<'a> IntoIterator for &'a Partitions<'a> {
224 type Item = Partition<'a>;
225 type IntoIter = PartitionIter<'a>;
226
227 fn into_iter(self) -> Self::IntoIter {
228 self.iter()
229 }
230}
231
232impl<'a> IntoIterator for Partitions<'a> {
233 type Item = Partition<'a>;
234 type IntoIter = PartitionIter<'a>;
235
236 fn into_iter(self) -> Self::IntoIter {
237 PartitionIter::new(self.state, self.tp)
238 }
239}
240
241pub struct PartitionIter<'a> {
243 state: &'a ClientState,
244 iter: TopicPartitionIter<'a>,
245}
246
247impl<'a> PartitionIter<'a> {
248 fn new(state: &'a ClientState, tp: &'a TopicPartitions) -> Self {
249 PartitionIter {
250 state,
251 iter: tp.iter(),
252 }
253 }
254}
255
256impl<'a> Iterator for PartitionIter<'a> {
257 type Item = Partition<'a>;
258
259 #[inline]
260 fn next(&mut self) -> Option<Self::Item> {
261 self.iter
262 .next()
263 .map(|(id, p)| Partition::new(self.state, p, id))
264 }
265}
266
267pub struct Partition<'a> {
277 state: &'a ClientState,
278 partition: &'a TopicPartition,
279 id: i32,
280}
281
282impl<'a> Partition<'a> {
283 fn new(state: &'a ClientState, partition: &'a TopicPartition, id: i32) -> Partition<'a> {
284 Partition {
285 state,
286 partition,
287 id,
288 }
289 }
290
291 #[inline]
293 pub fn id(&self) -> i32 {
294 self.id
295 }
296
297 #[inline]
300 pub fn leader(&self) -> Option<&'a Broker> {
301 self.partition.broker(self.state)
302 }
303
304 pub fn is_available(&self) -> bool {
307 self.leader().is_some()
308 }
309}
310
311impl<'a> fmt::Debug for Partition<'a> {
312 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
313 write!(
314 f,
315 "Partition {{ id: {}, leader: {:?} }}",
316 self.id(),
317 self.leader()
318 )
319 }
320}