kafka/protocol/
metadata.rs1use std::io::{Read, Write};
2
3use crate::codecs::{AsStrings, FromByte, ToByte};
4use crate::error::Result;
5
6use super::{HeaderRequest, HeaderResponse};
7use super::{API_KEY_METADATA, API_VERSION};
8
9#[derive(Debug)]
10pub struct MetadataRequest<'a, T> {
11 pub header: HeaderRequest<'a>,
12 pub topics: &'a [T],
13}
14
15impl<'a, T: AsRef<str>> MetadataRequest<'a, T> {
16 pub fn new(correlation_id: i32, client_id: &'a str, topics: &'a [T]) -> MetadataRequest<'a, T> {
17 MetadataRequest {
18 header: HeaderRequest::new(API_KEY_METADATA, API_VERSION, correlation_id, client_id),
19 topics,
20 }
21 }
22}
23
24impl<'a, T: AsRef<str> + 'a> ToByte for MetadataRequest<'a, T> {
25 fn encode<W: Write>(&self, buffer: &mut W) -> Result<()> {
26 try_multi!(
27 self.header.encode(buffer),
28 AsStrings(self.topics).encode(buffer)
29 )
30 }
31}
32
33#[derive(Default, Debug)]
36pub struct MetadataResponse {
37 pub header: HeaderResponse,
38 pub brokers: Vec<BrokerMetadata>,
39 pub topics: Vec<TopicMetadata>,
40}
41
/// One broker entry of a metadata response.
///
/// The fields are decoded in declaration order: `node_id`, `host`, `port`.
/// `Clone`, `PartialEq` and `Eq` are derived so callers can keep copies of
/// an entry and compare entries directly.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct BrokerMetadata {
    /// Numeric identifier of the broker node.
    pub node_id: i32,
    /// Host name of the broker as sent in the response.
    pub host: String,
    /// Port of the broker as sent in the response.
    pub port: i32,
}
48
49#[derive(Default, Debug)]
50pub struct TopicMetadata {
51 pub error: i16,
52 pub topic: String,
53 pub partitions: Vec<PartitionMetadata>,
54}
55
/// One partition entry of a topic in a metadata response.
///
/// The fields are decoded in declaration order: `error`, `id`, `leader`,
/// `replicas`, `isr`. `Clone`, `PartialEq` and `Eq` are derived so callers
/// can keep copies of an entry and compare entries directly.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct PartitionMetadata {
    /// Error code attached to this partition
    /// (presumably 0 means "no error" — confirm against the protocol docs).
    pub error: i16,
    /// Partition identifier.
    pub id: i32,
    /// Identifier of the leader for this partition
    /// (presumably a broker `node_id` — confirm).
    pub leader: i32,
    /// Identifiers listed as replicas of this partition.
    pub replicas: Vec<i32>,
    /// Identifiers listed in the `isr` field
    /// (presumably the in-sync replica set — confirm).
    pub isr: Vec<i32>,
}
64
65impl FromByte for MetadataResponse {
66 type R = MetadataResponse;
67
68 #[allow(unused_must_use)]
69 fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
70 try_multi!(
71 self.header.decode(buffer),
72 self.brokers.decode(buffer),
73 self.topics.decode(buffer)
74 )
75 }
76}
77
78impl FromByte for BrokerMetadata {
79 type R = BrokerMetadata;
80
81 #[allow(unused_must_use)]
82 fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
83 try_multi!(
84 self.node_id.decode(buffer),
85 self.host.decode(buffer),
86 self.port.decode(buffer)
87 )
88 }
89}
90
91impl FromByte for TopicMetadata {
92 type R = TopicMetadata;
93
94 #[allow(unused_must_use)]
95 fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
96 try_multi!(
97 self.error.decode(buffer),
98 self.topic.decode(buffer),
99 self.partitions.decode(buffer)
100 )
101 }
102}
103
104impl FromByte for PartitionMetadata {
105 type R = PartitionMetadata;
106
107 #[allow(unused_must_use)]
108 fn decode<T: Read>(&mut self, buffer: &mut T) -> Result<()> {
109 try_multi!(
110 self.error.decode(buffer),
111 self.id.decode(buffer),
112 self.leader.decode(buffer),
113 self.replicas.decode(buffer),
114 self.isr.decode(buffer)
115 )
116 }
117}