kafka/consumer/
builder.rs1use std::collections::HashMap;
2use std::time::Duration;
3
4use crate::client::{self, FetchOffset, GroupOffsetStorage, KafkaClient};
5use crate::error::{Error, Result};
6
7use super::assignment;
8use super::config::Config;
9use super::state::State;
10use super::{Consumer, DEFAULT_FALLBACK_OFFSET, DEFAULT_RETRY_MAX_BYTES_LIMIT};
11
12#[cfg(feature = "security")]
13use crate::client::SecurityConfig;
14
15#[cfg(not(feature = "security"))]
16type SecurityConfig = ();
17
18#[derive(Debug)]
21pub struct Builder {
22 client: Option<KafkaClient>,
23 hosts: Vec<String>,
24 group: String,
25 assignments: HashMap<String, Vec<i32>>,
26 fallback_offset: FetchOffset,
27 fetch_max_wait_time: Duration,
28 fetch_min_bytes: i32,
29 fetch_max_bytes_per_partition: i32,
30 retry_max_bytes_limit: i32,
31 fetch_crc_validation: bool,
32 security_config: Option<SecurityConfig>,
33 group_offset_storage: Option<GroupOffsetStorage>,
34 conn_idle_timeout: Duration,
35 client_id: Option<String>,
36}
37
38pub fn new(client: Option<KafkaClient>, hosts: Vec<String>) -> Builder {
41 let mut b = Builder {
42 client,
43 hosts,
44 fetch_max_wait_time: Duration::from_millis(client::DEFAULT_FETCH_MAX_WAIT_TIME_MILLIS),
45 fetch_min_bytes: client::DEFAULT_FETCH_MIN_BYTES,
46 fetch_max_bytes_per_partition: client::DEFAULT_FETCH_MAX_BYTES_PER_PARTITION,
47 fetch_crc_validation: client::DEFAULT_FETCH_CRC_VALIDATION,
48 retry_max_bytes_limit: DEFAULT_RETRY_MAX_BYTES_LIMIT,
49 group: String::new(),
50 assignments: HashMap::new(),
51 fallback_offset: DEFAULT_FALLBACK_OFFSET,
52 security_config: None,
53 group_offset_storage: client::DEFAULT_GROUP_OFFSET_STORAGE,
54 conn_idle_timeout: Duration::from_millis(client::DEFAULT_CONNECTION_IDLE_TIMEOUT_MILLIS),
55 client_id: None,
56 };
57 if let Some(ref c) = b.client {
58 b.fetch_max_wait_time = c.fetch_max_wait_time();
59 b.fetch_min_bytes = c.fetch_min_bytes();
60 b.fetch_max_bytes_per_partition = c.fetch_max_bytes_per_partition();
61 b.fetch_crc_validation = c.fetch_crc_validation();
62 b.group_offset_storage = c.group_offset_storage();
63 b.conn_idle_timeout = c.connection_idle_timeout();
64 }
65 b
66}
67
68impl Builder {
69 pub fn with_group(mut self, group: String) -> Builder {
75 self.group = group;
76 self
77 }
78
79 pub fn with_topic(mut self, topic: String) -> Builder {
89 self.assignments.insert(topic, Vec::new());
90 self
91 }
92
93 pub fn with_topic_partitions(mut self, topic: String, partitions: &[i32]) -> Builder {
103 self.assignments.insert(topic, partitions.to_vec());
104 self
105 }
106
107 #[cfg(feature = "security")]
110 pub fn with_security(mut self, sec: SecurityConfig) -> Builder {
111 self.security_config = Some(sec);
112 self
113 }
114
115 pub fn with_fallback_offset(mut self, fallback_offset: FetchOffset) -> Builder {
128 self.fallback_offset = fallback_offset;
129 self
130 }
131
132 pub fn with_fetch_max_wait_time(mut self, max_wait_time: Duration) -> Builder {
134 self.fetch_max_wait_time = max_wait_time;
135 self
136 }
137
138 pub fn with_fetch_min_bytes(mut self, min_bytes: i32) -> Builder {
140 self.fetch_min_bytes = min_bytes;
141 self
142 }
143
144 pub fn with_fetch_max_bytes_per_partition(mut self, max_bytes_per_partition: i32) -> Builder {
146 self.fetch_max_bytes_per_partition = max_bytes_per_partition;
147 self
148 }
149
150 pub fn with_fetch_crc_validation(mut self, validate_crc: bool) -> Builder {
152 self.fetch_crc_validation = validate_crc;
153 self
154 }
155
156 pub fn with_offset_storage(mut self, storage: Option<GroupOffsetStorage>) -> Builder {
158 self.group_offset_storage = storage;
159 self
160 }
161
162 pub fn with_retry_max_bytes_limit(mut self, limit: i32) -> Builder {
185 self.retry_max_bytes_limit = limit;
186 self
187 }
188
189 pub fn with_connection_idle_timeout(mut self, timeout: Duration) -> Self {
192 self.conn_idle_timeout = timeout;
193 self
194 }
195
196 pub fn with_client_id(mut self, client_id: String) -> Self {
199 self.client_id = Some(client_id);
200 self
201 }
202
203 #[cfg(not(feature = "security"))]
204 fn new_kafka_client(hosts: Vec<String>, _: Option<SecurityConfig>) -> KafkaClient {
205 KafkaClient::new(hosts)
206 }
207
208 #[cfg(feature = "security")]
209 fn new_kafka_client(hosts: Vec<String>, security: Option<SecurityConfig>) -> KafkaClient {
210 if let Some(security) = security {
211 KafkaClient::new_secure(hosts, security)
212 } else {
213 KafkaClient::new(hosts)
214 }
215 }
216
217 pub fn create(self) -> Result<Consumer> {
224 if self.assignments.is_empty() {
226 return Err(Error::NoTopicsAssigned);
227 }
228 let (mut client, need_metadata) = match self.client {
230 Some(client) => (client, false),
231 None => (
232 Self::new_kafka_client(self.hosts, self.security_config),
233 true,
234 ),
235 };
236 client.set_fetch_max_wait_time(self.fetch_max_wait_time)?;
238 client.set_fetch_min_bytes(self.fetch_min_bytes);
239 client.set_fetch_max_bytes_per_partition(self.fetch_max_bytes_per_partition);
240 client.set_group_offset_storage(self.group_offset_storage);
241 client.set_connection_idle_timeout(self.conn_idle_timeout);
242 if let Some(client_id) = self.client_id {
243 client.set_client_id(client_id)
244 }
245 if need_metadata {
247 client.load_metadata_all()?;
248 }
249 let config = Config {
251 group: self.group,
252 fallback_offset: self.fallback_offset,
253 retry_max_bytes_limit: self.retry_max_bytes_limit,
254 };
255 let state = State::new(&mut client, &config, assignment::from_map(self.assignments))?;
256 debug!(
257 "initialized: Consumer {{ config: {:?}, state: {:?} }}",
258 config, state
259 );
260 Ok(Consumer {
261 client,
262 state,
263 config,
264 })
265 }
266}