// kafka/protocol/metadata.rs

use std::io::{Read, Write};

use crate::codecs::{AsStrings, FromByte, ToByte};
use crate::error::Result;

use super::{HeaderRequest, HeaderResponse};
use super::{API_KEY_METADATA, API_VERSION};

9#[derive(Debug)]
10pub struct MetadataRequest<'a, T> {
11    pub header: HeaderRequest<'a>,
12    pub topics: &'a [T],
13}
14
15impl<'a, T: AsRef<str>> MetadataRequest<'a, T> {
16    pub fn new(correlation_id: i32, client_id: &'a str, topics: &'a [T]) -> MetadataRequest<'a, T> {
17        MetadataRequest {
18            header: HeaderRequest::new(API_KEY_METADATA, API_VERSION, correlation_id, client_id),
19            topics,
20        }
21    }
22}
23
24impl<'a, T: AsRef<str> + 'a> ToByte for MetadataRequest<'a, T> {
25    fn encode<W: Write>(&self, buffer: &mut W) -> Result<()> {
26        try_multi!(
27            self.header.encode(buffer),
28            AsStrings(self.topics).encode(buffer)
29        )
30    }
31}
32
// --------------------------------------------------------------------

35#[derive(Default, Debug)]
36pub struct MetadataResponse {
37    pub header: HeaderResponse,
38    pub brokers: Vec<BrokerMetadata>,
39    pub topics: Vec<TopicMetadata>,
40}
41
/// One broker entry of a metadata response.
///
/// Fields are declared in the order the `FromByte` impl decodes them:
/// `node_id`, `host`, `port`.
#[derive(Default, Debug)]
pub struct BrokerMetadata {
    /// Numeric id of the broker node.
    pub node_id: i32,
    /// Host of the broker, as the string read from the response.
    pub host: String,
    /// Port of the broker, kept as the `i32` read from the response.
    pub port: i32,
}

49#[derive(Default, Debug)]
50pub struct TopicMetadata {
51    pub error: i16,
52    pub topic: String,
53    pub partitions: Vec<PartitionMetadata>,
54}
55
/// One partition entry of a topic in a metadata response.
///
/// Fields are declared in the order the `FromByte` impl decodes them:
/// `error`, `id`, `leader`, `replicas`, `isr`.
#[derive(Default, Debug)]
pub struct PartitionMetadata {
    /// Error code attached to this partition entry.
    // NOTE(review): presumably 0 means "no error"; confirm against the Kafka
    // protocol error-code table.
    pub error: i16,
    /// Numeric id of the partition.
    pub id: i32,
    /// Id of the partition's leader.
    // NOTE(review): presumably a broker `node_id`; confirm against the caller.
    pub leader: i32,
    /// Ids listed as replicas of this partition (presumably broker node ids).
    pub replicas: Vec<i32>,
    /// Ids listed in the `isr` array of the response.
    // NOTE(review): presumably Kafka's "in-sync replicas" set; confirm.
    pub isr: Vec<i32>,
}

65impl FromByte for MetadataResponse {
66    type R = MetadataResponse;
67
68    #[allow(unused_must_use)]
69    fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
70        try_multi!(
71            self.header.decode(buffer),
72            self.brokers.decode(buffer),
73            self.topics.decode(buffer)
74        )
75    }
76}
77
78impl FromByte for BrokerMetadata {
79    type R = BrokerMetadata;
80
81    #[allow(unused_must_use)]
82    fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
83        try_multi!(
84            self.node_id.decode(buffer),
85            self.host.decode(buffer),
86            self.port.decode(buffer)
87        )
88    }
89}
90
91impl FromByte for TopicMetadata {
92    type R = TopicMetadata;
93
94    #[allow(unused_must_use)]
95    fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
96        try_multi!(
97            self.error.decode(buffer),
98            self.topic.decode(buffer),
99            self.partitions.decode(buffer)
100        )
101    }
102}
103
104impl FromByte for PartitionMetadata {
105    type R = PartitionMetadata;
106
107    #[allow(unused_must_use)]
108    fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
109        try_multi!(
110            self.error.decode(buffer),
111            self.id.decode(buffer),
112            self.leader.decode(buffer),
113            self.replicas.decode(buffer),
114            self.isr.decode(buffer)
115        )
116    }
117}