use std::collections::hash_map;
use std::fmt;
use super::state::{ClientState, TopicPartition, TopicPartitionIter, TopicPartitions};
use super::KafkaClient;
pub use super::state::Broker;
pub use super::state::TopicNames;
/// A read-only view over the topics recorded in a client's `ClientState`.
///
/// The view only borrows the state; it never copies or modifies it.
pub struct Topics<'a> {
// Borrowed client state that every query on this view is answered from.
state: &'a ClientState,
}
impl<'a> Topics<'a> {
#[inline]
pub fn new(client: &KafkaClient) -> Topics<'_> {
Topics {
state: &client.state,
}
}
#[inline]
pub fn len(&self) -> usize {
self.state.num_topics()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
#[inline]
pub fn iter(&'a self) -> TopicIter<'a> {
TopicIter::new(self.state)
}
#[inline]
pub fn names(&'a self) -> TopicNames<'a> {
self.state.topic_names()
}
#[inline]
pub fn contains(&'a self, topic: &str) -> bool {
self.state.contains_topic(topic)
}
#[inline]
pub fn partitions(&'a self, topic: &str) -> Option<Partitions<'a>> {
self.state.partitions_for(topic).map(|tp| Partitions {
state: self.state,
tp,
})
}
}
impl<'a> fmt::Debug for Topics<'a> {
    /// Writes `Topics { topics: [<topic>, <topic>, ...] }`, rendering each
    /// topic with its own `Debug` implementation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Topics {{ topics: [")?;
        for (idx, topic) in self.iter().enumerate() {
            // Only entries after the first are preceded by a separator.
            if idx == 0 {
                write!(f, "{:?}", topic)?;
            } else {
                write!(f, ", {:?}", topic)?;
            }
        }
        write!(f, "] }}")
    }
}
impl<'a> IntoIterator for &'a Topics<'a> {
type Item = Topic<'a>;
type IntoIter = TopicIter<'a>;
fn into_iter(self) -> Self::IntoIter {
self.iter()
}
}
impl<'a> IntoIterator for Topics<'a> {
type Item = Topic<'a>;
type IntoIter = TopicIter<'a>;
fn into_iter(self) -> Self::IntoIter {
TopicIter::new(self.state)
}
}
/// An iterator over the topics of a `ClientState`, yielding `Topic` views.
pub struct TopicIter<'a> {
// State handed to every yielded `Topic`.
state: &'a ClientState,
// Underlying iteration over the (topic name, partitions) map entries.
iter: hash_map::Iter<'a, String, TopicPartitions>,
}
impl<'a> TopicIter<'a> {
fn new(state: &'a ClientState) -> TopicIter<'a> {
TopicIter {
state,
iter: state.topic_partitions().iter(),
}
}
}
impl<'a> Iterator for TopicIter<'a> {
    type Item = Topic<'a>;

    /// Yields the next topic as a `Topic` view, or `None` once the
    /// underlying map iterator is exhausted.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        let (name, tp) = self.iter.next()?;
        Some(Topic {
            state: self.state,
            name: name.as_str(),
            tp,
        })
    }

    /// Forwards the size hint of the underlying map iterator so that
    /// consumers such as `collect` can pre-allocate.
    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.iter.size_hint()
    }
}
/// A view of a single topic: its name and its partitions.
pub struct Topic<'a> {
// State handed on to the `Partitions` view created by `partitions()`.
state: &'a ClientState,
// The topic's name, borrowed from the state's topic map key.
name: &'a str,
// The topic's partition data.
tp: &'a TopicPartitions,
}
impl<'a> Topic<'a> {
    /// Returns the name of this topic.
    ///
    /// The string is borrowed for `'a` (the lifetime of the underlying
    /// state), not for the lifetime of this `Topic` value, so it can be
    /// kept after the `Topic` itself has been dropped.
    #[inline]
    pub fn name(&self) -> &'a str {
        self.name
    }

    /// Returns a view over the partitions of this topic.
    #[inline]
    pub fn partitions(&self) -> Partitions<'a> {
        Partitions {
            state: self.state,
            tp: self.tp,
        }
    }
}
impl<'a> fmt::Debug for Topic<'a> {
    /// Writes `Topic { name: <name>, partitions: <partitions> }`, where the
    /// partitions are rendered with the `Debug` impl of `Partitions`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let topic_name = self.name;
        let partitions = self.partitions();
        write!(
            f,
            "Topic {{ name: {}, partitions: {:?} }}",
            topic_name, partitions
        )
    }
}
/// A view over the partitions of a single topic.
pub struct Partitions<'a> {
// State used to resolve the broker of each partition.
state: &'a ClientState,
// The partition data this view exposes.
tp: &'a TopicPartitions,
}
impl<'a> Partitions<'a> {
    /// Returns the number of partitions reported by the underlying data.
    #[inline]
    pub fn len(&self) -> usize {
        self.tp.len()
    }

    /// Returns whether the underlying partition data is empty.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.tp.is_empty()
    }

    /// Returns an iterator yielding a `Partition` view for each entry.
    #[inline]
    pub fn iter(&self) -> PartitionIter<'a> {
        let (state, tp) = (self.state, self.tp);
        PartitionIter::new(state, tp)
    }

    /// Looks up the partition with the given id, returning `None` when the
    /// underlying data has no such partition.
    #[inline]
    pub fn partition(&self, partition_id: i32) -> Option<Partition<'a>> {
        let found = self.tp.partition(partition_id)?;
        Some(Partition::new(self.state, found, partition_id))
    }

    /// Collects the ids of all partitions for which a broker can be
    /// resolved through the client state.
    #[inline]
    pub fn available_ids(&self) -> Vec<i32> {
        let state = self.state;
        self.tp
            .iter()
            .filter(|&(_, p)| p.broker(state).is_some())
            .map(|(id, _)| id)
            .collect()
    }
}
impl<'a> fmt::Debug for Partitions<'a> {
    /// Writes `Partitions { [<partition>, <partition>, ...] }`, rendering
    /// each partition with its own `Debug` implementation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Partitions {{ [")?;
        for (idx, partition) in self.iter().enumerate() {
            // Only entries after the first are preceded by a separator.
            if idx == 0 {
                write!(f, "{:?}", partition)?;
            } else {
                write!(f, ", {:?}", partition)?;
            }
        }
        write!(f, "] }}")
    }
}
impl<'a> IntoIterator for &'a Partitions<'a> {
type Item = Partition<'a>;
type IntoIter = PartitionIter<'a>;
fn into_iter(self) -> Self::IntoIter {
self.iter()
}
}
impl<'a> IntoIterator for Partitions<'a> {
type Item = Partition<'a>;
type IntoIter = PartitionIter<'a>;
fn into_iter(self) -> Self::IntoIter {
PartitionIter::new(self.state, self.tp)
}
}
/// An iterator over the partitions of a topic, yielding `Partition` views.
pub struct PartitionIter<'a> {
// State handed to every yielded `Partition`.
state: &'a ClientState,
// Underlying iteration over (partition id, partition data) pairs.
iter: TopicPartitionIter<'a>,
}
impl<'a> PartitionIter<'a> {
fn new(state: &'a ClientState, tp: &'a TopicPartitions) -> Self {
PartitionIter {
state,
iter: tp.iter(),
}
}
}
impl<'a> Iterator for PartitionIter<'a> {
    type Item = Partition<'a>;

    /// Yields the next partition as a `Partition` view, or `None` once the
    /// underlying iterator is exhausted.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        let (id, partition) = self.iter.next()?;
        Some(Partition::new(self.state, partition, id))
    }
}
/// A view of a single partition of a topic.
pub struct Partition<'a> {
// State passed to `TopicPartition::broker` when resolving the leader.
state: &'a ClientState,
// The partition data this view exposes.
partition: &'a TopicPartition,
// The partition's id, as returned by `id()`.
id: i32,
}
impl<'a> Partition<'a> {
fn new(state: &'a ClientState, partition: &'a TopicPartition, id: i32) -> Partition<'a> {
Partition {
state,
partition,
id,
}
}
#[inline]
pub fn id(&self) -> i32 {
self.id
}
#[inline]
pub fn leader(&self) -> Option<&'a Broker> {
self.partition.broker(self.state)
}
pub fn is_available(&self) -> bool {
self.leader().is_some()
}
}
impl<'a> fmt::Debug for Partition<'a> {
    /// Writes `Partition { id: <id>, leader: <leader> }`, where the leader
    /// is the `Debug` rendering of the `Option` returned by `leader()`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let id = self.id();
        let leader = self.leader();
        write!(f, "Partition {{ id: {}, leader: {:?} }}", id, leader)
    }
}