kafka/consumer/
builder.rs

1use std::collections::HashMap;
2use std::time::Duration;
3
4use crate::client::{self, FetchOffset, GroupOffsetStorage, KafkaClient};
5use crate::error::{Error, Result};
6
7use super::assignment;
8use super::config::Config;
9use super::state::State;
10use super::{Consumer, DEFAULT_FALLBACK_OFFSET, DEFAULT_RETRY_MAX_BYTES_LIMIT};
11
12#[cfg(feature = "security")]
13use crate::client::SecurityConfig;
14
15#[cfg(not(feature = "security"))]
16type SecurityConfig = ();
17
18/// A Kafka Consumer builder easing the process of setting up various
19/// configuration settings.
20#[derive(Debug)]
21pub struct Builder {
22    client: Option<KafkaClient>,
23    hosts: Vec<String>,
24    group: String,
25    assignments: HashMap<String, Vec<i32>>,
26    fallback_offset: FetchOffset,
27    fetch_max_wait_time: Duration,
28    fetch_min_bytes: i32,
29    fetch_max_bytes_per_partition: i32,
30    retry_max_bytes_limit: i32,
31    fetch_crc_validation: bool,
32    security_config: Option<SecurityConfig>,
33    group_offset_storage: Option<GroupOffsetStorage>,
34    conn_idle_timeout: Duration,
35    client_id: Option<String>,
36}
37
38// ~ public only to be shared inside the kafka crate; not supposed to
39// be published outside the crate itself
40pub fn new(client: Option<KafkaClient>, hosts: Vec<String>) -> Builder {
41    let mut b = Builder {
42        client,
43        hosts,
44        fetch_max_wait_time: Duration::from_millis(client::DEFAULT_FETCH_MAX_WAIT_TIME_MILLIS),
45        fetch_min_bytes: client::DEFAULT_FETCH_MIN_BYTES,
46        fetch_max_bytes_per_partition: client::DEFAULT_FETCH_MAX_BYTES_PER_PARTITION,
47        fetch_crc_validation: client::DEFAULT_FETCH_CRC_VALIDATION,
48        retry_max_bytes_limit: DEFAULT_RETRY_MAX_BYTES_LIMIT,
49        group: String::new(),
50        assignments: HashMap::new(),
51        fallback_offset: DEFAULT_FALLBACK_OFFSET,
52        security_config: None,
53        group_offset_storage: client::DEFAULT_GROUP_OFFSET_STORAGE,
54        conn_idle_timeout: Duration::from_millis(client::DEFAULT_CONNECTION_IDLE_TIMEOUT_MILLIS),
55        client_id: None,
56    };
57    if let Some(ref c) = b.client {
58        b.fetch_max_wait_time = c.fetch_max_wait_time();
59        b.fetch_min_bytes = c.fetch_min_bytes();
60        b.fetch_max_bytes_per_partition = c.fetch_max_bytes_per_partition();
61        b.fetch_crc_validation = c.fetch_crc_validation();
62        b.group_offset_storage = c.group_offset_storage();
63        b.conn_idle_timeout = c.connection_idle_timeout();
64    }
65    b
66}
67
68impl Builder {
69    /// Specifies the group on whose behalf to maintain consumed
70    /// message offsets.
71    ///
72    /// The group is allowed to be the empty string, in which case the
73    /// resulting consumer will be group-less.
74    pub fn with_group(mut self, group: String) -> Builder {
75        self.group = group;
76        self
77    }
78
79    /// Specifies a topic to consume. All of the available partitions
80    /// of the identified topic will be consumed unless overridden
81    /// later using `with_topic_partitions`.
82    ///
83    /// This method may be called multiple times to assign the
84    /// consumer multiple topics.
85    ///
86    /// This method or `with_topic_partitions` must be called at least
87    /// once, to assign a topic to the consumer.
88    pub fn with_topic(mut self, topic: String) -> Builder {
89        self.assignments.insert(topic, Vec::new());
90        self
91    }
92
93    /// Explicitly specifies topic partitions to consume. Only the
94    /// specified partitions for the identified topic will be consumed
95    /// unless overridden later using `with_topic`.
96    ///
97    /// This method may be called multiple times to subscribe to
98    /// multiple topics.
99    ///
100    /// This method or `with_topic` must be called at least once, to
101    /// assign a topic to the consumer.
102    pub fn with_topic_partitions(mut self, topic: String, partitions: &[i32]) -> Builder {
103        self.assignments.insert(topic, partitions.to_vec());
104        self
105    }
106
107    /// Specifies the security config to use.
108    /// See `KafkaClient::new_secure` for more info.
109    #[cfg(feature = "security")]
110    pub fn with_security(mut self, sec: SecurityConfig) -> Builder {
111        self.security_config = Some(sec);
112        self
113    }
114
115    /// Specifies the offset to use when none was committed for the
116    /// underlying group yet or the consumer has no group configured.
117    ///
118    /// Running the underlying group for the first time against a
119    /// topic or running the consumer without a group results in the
120    /// question where to start reading from the topic, since it might
121    /// already contain a lot of messages.  Common strategies are
122    /// starting at the earliest available message (thereby consuming
123    /// whatever is currently in the topic) or at the latest one
124    /// (thereby staring to consume only newly arriving messages.)
125    /// The "fallback offset" here corresponds to `time` in
126    /// `KafkaClient::fetch_offsets`.
127    pub fn with_fallback_offset(mut self, fallback_offset: FetchOffset) -> Builder {
128        self.fallback_offset = fallback_offset;
129        self
130    }
131
132    /// See `KafkaClient::set_fetch_max_wait_time`
133    pub fn with_fetch_max_wait_time(mut self, max_wait_time: Duration) -> Builder {
134        self.fetch_max_wait_time = max_wait_time;
135        self
136    }
137
138    /// See `KafkaClient::set_fetch_min_bytes`
139    pub fn with_fetch_min_bytes(mut self, min_bytes: i32) -> Builder {
140        self.fetch_min_bytes = min_bytes;
141        self
142    }
143
144    /// See `KafkaClient::set_fetch_max_bytes_per_partition`
145    pub fn with_fetch_max_bytes_per_partition(mut self, max_bytes_per_partition: i32) -> Builder {
146        self.fetch_max_bytes_per_partition = max_bytes_per_partition;
147        self
148    }
149
150    /// See `KafkaClient::set_fetch_crc_validation`
151    pub fn with_fetch_crc_validation(mut self, validate_crc: bool) -> Builder {
152        self.fetch_crc_validation = validate_crc;
153        self
154    }
155
156    /// See `KafkaClient::set_group_offset_storage`
157    pub fn with_offset_storage(mut self, storage: Option<GroupOffsetStorage>) -> Builder {
158        self.group_offset_storage = storage;
159        self
160    }
161
162    /// Specifies the upper bound of data bytes to allow fetching from
163    /// a kafka partition when retrying a fetch request due to a too
164    /// big message in the partition.
165    ///
166    /// By default, this consumer will fetch up to
167    /// `KafkaClient::fetch_max_bytes_per_partition` data from each
168    /// partition.  However, when it discovers that there are messages
169    /// in an underlying partition which could not be delivered, the
170    /// request to that partition might be retried a few times with an
171    /// increased `fetch_max_bytes_per_partition`.  The value
172    /// specified here defines a limit to this increment.
173    ///
174    /// A value smaller than the
175    /// `KafkaClient::fetch_max_bytes_per_partition`, e.g. zero, will
176    /// disable the retry feature of this consumer.  The default value
177    /// for this setting is `DEFAULT_RETRY_MAX_BYTES_LIMIT`.
178    ///
179    /// Note: if the consumed topic partitions are known to host large
180    /// messages it is much more efficient to set
181    /// `KafkaClient::fetch_max_bytes_per_partition` appropriately
182    /// instead of relying on the limit specified here.  This limit is
183    /// just an upper bound for already additional retry requests.
184    pub fn with_retry_max_bytes_limit(mut self, limit: i32) -> Builder {
185        self.retry_max_bytes_limit = limit;
186        self
187    }
188
189    /// Specifies the timeout for idle connections.
190    /// See `KafkaClient::set_connection_idle_timeout`.
191    pub fn with_connection_idle_timeout(mut self, timeout: Duration) -> Self {
192        self.conn_idle_timeout = timeout;
193        self
194    }
195
196    /// Specifies a client_id to be sent along every request to Kafka
197    /// brokers. See `KafkaClient::set_client_id`.
198    pub fn with_client_id(mut self, client_id: String) -> Self {
199        self.client_id = Some(client_id);
200        self
201    }
202
203    #[cfg(not(feature = "security"))]
204    fn new_kafka_client(hosts: Vec<String>, _: Option<SecurityConfig>) -> KafkaClient {
205        KafkaClient::new(hosts)
206    }
207
208    #[cfg(feature = "security")]
209    fn new_kafka_client(hosts: Vec<String>, security: Option<SecurityConfig>) -> KafkaClient {
210        if let Some(security) = security {
211            KafkaClient::new_secure(hosts, security)
212        } else {
213            KafkaClient::new(hosts)
214        }
215    }
216
217    /// Finally creates/builds a new consumer based on the so far
218    /// supplied settings.
219    ///
220    /// Fails with the `NoTopicsAssigned` error, if neither
221    /// `with_topic` nor `with_topic_partitions` have been called to
222    /// assign at least one topic for consumption.
223    pub fn create(self) -> Result<Consumer> {
224        // ~ fail immediately if there's no topic to be consumed
225        if self.assignments.is_empty() {
226            return Err(Error::NoTopicsAssigned);
227        }
228        // ~ create the client if necessary
229        let (mut client, need_metadata) = match self.client {
230            Some(client) => (client, false),
231            None => (
232                Self::new_kafka_client(self.hosts, self.security_config),
233                true,
234            ),
235        };
236        // ~ apply configuration settings
237        client.set_fetch_max_wait_time(self.fetch_max_wait_time)?;
238        client.set_fetch_min_bytes(self.fetch_min_bytes);
239        client.set_fetch_max_bytes_per_partition(self.fetch_max_bytes_per_partition);
240        client.set_group_offset_storage(self.group_offset_storage);
241        client.set_connection_idle_timeout(self.conn_idle_timeout);
242        if let Some(client_id) = self.client_id {
243            client.set_client_id(client_id)
244        }
245        // ~ load metadata if necessary
246        if need_metadata {
247            client.load_metadata_all()?;
248        }
249        // ~ load consumer state
250        let config = Config {
251            group: self.group,
252            fallback_offset: self.fallback_offset,
253            retry_max_bytes_limit: self.retry_max_bytes_limit,
254        };
255        let state = State::new(&mut client, &config, assignment::from_map(self.assignments))?;
256        debug!(
257            "initialized: Consumer {{ config: {:?}, state: {:?} }}",
258            config, state
259        );
260        Ok(Consumer {
261            client,
262            state,
263            config,
264        })
265    }
266}