1use std::collections::HashMap;
2use std::collections::VecDeque;
3use std::fmt;
4use std::hash::BuildHasherDefault;
5
6use fnv::FnvHasher;
7
8use crate::client::metadata::Topics;
9use crate::client::{FetchGroupOffset, FetchOffset, KafkaClient};
10use crate::error::{Error, KafkaCode, Result};
11
12use super::assignment::{Assignment, AssignmentRef, Assignments};
13use super::config::Config;
14
15pub type PartitionHasher = BuildHasherDefault<FnvHasher>;
16
/// Fetch parameters tracked for a single topic partition.
#[derive(Debug)]
pub struct FetchState {
    /// Offset recorded for the partition's fetch. `load_fetch_states` seeds
    /// it with the offset following the last consumed message or, failing
    /// that, with a fallback offset (`-1` when no offset could be looked up).
    pub offset: i64,
    /// Byte limit for the partition; initialised from
    /// `KafkaClient::fetch_max_bytes_per_partition()`.
    pub max_bytes: i32,
}
25
26#[derive(Debug, PartialEq, Eq, Hash)]
27pub struct TopicPartition {
28 pub topic_ref: AssignmentRef,
30 pub partition: i32,
32}
33
/// Consumption progress recorded for a single topic partition.
#[derive(Debug)]
pub struct ConsumedOffset {
    /// Offset of the last consumed message. When loaded from the group's
    /// committed offsets this is the committed offset minus one.
    pub offset: i64,
    /// `false` for offsets freshly loaded by `load_consumed_offsets`.
    /// NOTE(review): presumably set once the offset changes locally and has
    /// not been committed back yet; confirm against the code that writes it.
    pub dirty: bool,
}
42
43pub struct State {
44 pub assignments: Assignments,
47
48 pub fetch_offsets: HashMap<TopicPartition, FetchState, PartitionHasher>,
51
52 pub retry_partitions: VecDeque<TopicPartition>,
55
56 pub consumed_offsets: HashMap<TopicPartition, ConsumedOffset, PartitionHasher>,
59}
60
61impl<'a> fmt::Debug for State {
62 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63 write!(
64 f,
65 "State {{ assignments: {:?}, fetch_offsets: {:?}, retry_partitions: {:?}, \
66 consumed_offsets: {:?} }}",
67 self.assignments,
68 self.fetch_offsets_debug(),
69 TopicPartitionsDebug {
70 state: self,
71 tps: &self.retry_partitions,
72 },
73 self.consumed_offsets_debug()
74 )
75 }
76}
77
78impl State {
79 pub fn new(
80 client: &mut KafkaClient,
81 config: &Config,
82 assignments: Assignments,
83 ) -> Result<State> {
84 let (consumed_offsets, fetch_offsets) = {
85 let subscriptions = {
86 let xs = assignments.as_slice();
87 let mut subs = Vec::with_capacity(xs.len());
88 for x in xs {
89 subs.push(determine_partitions(x, client.topics())?);
90 }
91 subs
92 };
93 let n = subscriptions.iter().map(|s| s.partitions.len()).sum();
94 let consumed =
95 load_consumed_offsets(client, &config.group, &assignments, &subscriptions, n)?;
96
97 let fetch_next =
98 load_fetch_states(client, config, &assignments, &subscriptions, &consumed, n)?;
99 (consumed, fetch_next)
100 };
101 Ok(State {
102 assignments,
103 fetch_offsets,
104 retry_partitions: VecDeque::new(),
105 consumed_offsets,
106 })
107 }
108
109 pub fn topic_name(&self, assignment: AssignmentRef) -> &str {
110 self.assignments[assignment].topic()
111 }
112
113 pub fn topic_ref(&self, name: &str) -> Option<AssignmentRef> {
114 self.assignments.topic_ref(name)
115 }
116
117 pub fn fetch_offsets_debug(&self) -> OffsetsMapDebug<'_, FetchState> {
120 OffsetsMapDebug {
121 state: self,
122 offsets: &self.fetch_offsets,
123 }
124 }
125
126 pub fn consumed_offsets_debug(&self) -> OffsetsMapDebug<'_, ConsumedOffset> {
127 OffsetsMapDebug {
128 state: self,
129 offsets: &self.consumed_offsets,
130 }
131 }
132}
133
134struct Subscription<'a> {
136 assignment: &'a Assignment, partitions: Vec<i32>, }
139
140fn determine_partitions<'a>(
144 assignment: &'a Assignment,
145 metadata: Topics<'_>,
146) -> Result<Subscription<'a>> {
147 let topic = assignment.topic();
148 let req_partitions = assignment.partitions();
149 let avail_partitions = match metadata.partitions(topic) {
150 None => {
152 debug!(
153 "determine_partitions: no such topic: {} (all metadata: {:?})",
154 topic, metadata
155 );
156 return Err(Error::Kafka(KafkaCode::UnknownTopicOrPartition));
157 }
158 Some(tp) => tp,
159 };
160 let ps = if req_partitions.is_empty() {
161 let mut ps: Vec<i32> = Vec::with_capacity(avail_partitions.len());
163 for p in avail_partitions {
164 ps.push(p.id());
165 }
166 ps
167 } else {
168 let mut ps: Vec<i32> = Vec::with_capacity(req_partitions.len());
171 for &p in req_partitions {
172 match avail_partitions.partition(p) {
173 None => {
174 debug!(
175 "determine_partitions: no such partition: \"{}:{}\" \
176 (all metadata: {:?})",
177 topic, p, metadata
178 );
179 return Err(Error::Kafka(KafkaCode::UnknownTopicOrPartition));
180 }
181 Some(_) => ps.push(p),
182 };
183 }
184 ps
185 };
186 Ok(Subscription {
187 assignment,
188 partitions: ps,
189 })
190}
191
192fn load_consumed_offsets(
195 client: &mut KafkaClient,
196 group: &str,
197 assignments: &Assignments,
198 subscriptions: &[Subscription<'_>],
199 result_capacity: usize,
200) -> Result<HashMap<TopicPartition, ConsumedOffset, PartitionHasher>> {
201 assert!(!subscriptions.is_empty());
202 let mut offs = HashMap::with_capacity_and_hasher(result_capacity, PartitionHasher::default());
204 if group.is_empty() {
206 return Ok(offs);
207 }
208 let tpos = client.fetch_group_offsets(
210 group,
211 subscriptions.iter().flat_map(|s| {
212 let topic = s.assignment.topic();
213 s.partitions
214 .iter()
215 .map(move |&p| FetchGroupOffset::new(topic, p))
216 }),
217 )?;
218 for (topic, pos) in tpos {
219 for po in pos {
220 if po.offset != -1 {
221 offs.insert(
222 TopicPartition {
223 topic_ref: assignments.topic_ref(&topic).expect("non-assigned topic"),
224 partition: po.partition,
225 },
226 ConsumedOffset {
229 offset: po.offset - 1,
230 dirty: false,
231 },
232 );
233 }
234 }
235 }
236
237 debug!("load_consumed_offsets: constructed consumed: {:#?}", offs);
238
239 Ok(offs)
240}
241
242fn load_fetch_states(
245 client: &mut KafkaClient,
246 config: &Config,
247 assignments: &Assignments,
248 subscriptions: &[Subscription<'_>],
249 consumed_offsets: &HashMap<TopicPartition, ConsumedOffset, PartitionHasher>,
250 result_capacity: usize,
251) -> Result<HashMap<TopicPartition, FetchState, PartitionHasher>> {
252 fn load_partition_offsets(
253 client: &mut KafkaClient,
254 topics: &[&str],
255 offset: FetchOffset,
256 ) -> Result<HashMap<String, HashMap<i32, i64, PartitionHasher>>> {
257 let toffs = client.fetch_offsets(topics, offset)?;
258 let mut m = HashMap::with_capacity(toffs.len());
259 for (topic, poffs) in toffs {
260 let mut pidx =
261 HashMap::with_capacity_and_hasher(poffs.len(), PartitionHasher::default());
262
263 for poff in poffs {
264 pidx.insert(poff.partition, poff.offset);
265 }
266
267 m.insert(topic, pidx);
268 }
269 Ok(m)
270 }
271
272 let mut fetch_offsets =
273 HashMap::with_capacity_and_hasher(result_capacity, PartitionHasher::default());
274 let max_bytes = client.fetch_max_bytes_per_partition();
275 let subscription_topics: Vec<_> = subscriptions.iter().map(|s| s.assignment.topic()).collect();
276 if consumed_offsets.is_empty() {
277 let offsets = load_partition_offsets(client, &subscription_topics, config.fallback_offset)?;
280 for s in subscriptions {
281 let topic_ref = assignments
282 .topic_ref(s.assignment.topic())
283 .expect("unassigned subscription");
284 match offsets.get(s.assignment.topic()) {
285 None => {
286 debug!(
287 "load_fetch_states: failed to load fallback offsets for: {}",
288 s.assignment.topic()
289 );
290 return Err(Error::Kafka(KafkaCode::UnknownTopicOrPartition));
291 }
292 Some(offsets) => {
293 for p in &s.partitions {
294 fetch_offsets.insert(
295 TopicPartition {
296 topic_ref,
297 partition: *p,
298 },
299 FetchState {
300 offset: *offsets.get(p).unwrap_or(&-1),
301 max_bytes,
302 },
303 );
304 }
305 }
306 }
307 }
308 } else {
309 let latest = load_partition_offsets(client, &subscription_topics, FetchOffset::Latest)?;
311 let earliest = load_partition_offsets(client, &subscription_topics, FetchOffset::Earliest)?;
312 for s in subscriptions {
316 let topic_ref = assignments
317 .topic_ref(s.assignment.topic())
318 .expect("unassigned subscription");
319 for p in &s.partitions {
320 let l_off = *latest
321 .get(s.assignment.topic())
322 .and_then(|ps| ps.get(p))
323 .unwrap_or(&-1);
324 let e_off = *earliest
325 .get(s.assignment.topic())
326 .and_then(|ps| ps.get(p))
327 .unwrap_or(&-1);
328
329 let tp = TopicPartition {
330 topic_ref,
331 partition: *p,
332 };
333
334 let offset = match consumed_offsets.get(&tp) {
336 Some(co) if co.offset >= e_off && co.offset < l_off => co.offset + 1,
337 _ => match config.fallback_offset {
338 FetchOffset::Latest => l_off,
339 FetchOffset::Earliest => e_off,
340 _ => {
341 debug!(
342 "cannot determine fetch offset \
343 (group: {} / topic: {} / partition: {})",
344 &config.group,
345 s.assignment.topic(),
346 p
347 );
348 return Err(Error::Kafka(KafkaCode::Unknown));
349 }
350 },
351 };
352 fetch_offsets.insert(tp, FetchState { offset, max_bytes });
353 }
354 }
355 }
356 Ok(fetch_offsets)
357}
358
359pub struct OffsetsMapDebug<'a, T> {
360 state: &'a State,
361 offsets: &'a HashMap<TopicPartition, T, PartitionHasher>,
362}
363
364impl<'a, T: fmt::Debug + 'a> fmt::Debug for OffsetsMapDebug<'a, T> {
365 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
366 write!(f, "{{")?;
367 for (i, (tp, v)) in self.offsets.iter().enumerate() {
368 if i != 0 {
369 write!(f, ", ")?;
370 }
371 let topic = self.state.topic_name(tp.topic_ref);
372 write!(f, "\"{}:{}\": {:?}", topic, tp.partition, v)?;
373 }
374 write!(f, "}}")
375 }
376}
377
378struct TopicPartitionsDebug<'a> {
379 state: &'a State,
380 tps: &'a VecDeque<TopicPartition>,
381}
382
383impl<'a> fmt::Debug for TopicPartitionsDebug<'a> {
384 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
385 write!(f, "[")?;
386 for (i, tp) in self.tps.iter().enumerate() {
387 if i != 0 {
388 write!(f, " ,")?;
389 }
390 write!(
391 f,
392 "\"{}:{}\"",
393 self.state.topic_name(tp.topic_ref),
394 tp.partition
395 )?;
396 }
397 write!(f, "]")
398 }
399}