kafka/producer.rs
1//! Kafka Producer - A higher-level API for sending messages to Kafka
2//! topics.
3//!
4//! This module hosts a multi-topic capable producer for a Kafka
5//! cluster providing a convenient API for sending messages
6//! synchronously.
7//!
8//! In Kafka, each message is a key/value pair where one or the other
9//! is optional. A `Record` represents all the data necessary to
10//! produce such a message to Kafka using the `Producer`. It
11//! specifies the target topic and the target partition the message is
12//! supposed to be delivered to as well as the key and the value.
13//!
14//! # Example
15//! ```no_run
16//! use std::fmt::Write;
17//! use std::time::Duration;
18//! use kafka::producer::{Producer, Record, RequiredAcks};
19//!
20//! let mut producer =
21//! Producer::from_hosts(vec!("localhost:9092".to_owned()))
22//! .with_ack_timeout(Duration::from_secs(1))
23//! .with_required_acks(RequiredAcks::One)
24//! .create()
25//! .unwrap();
26//!
27//! let mut buf = String::with_capacity(2);
28//! for i in 0..10 {
29//! let _ = write!(&mut buf, "{}", i); // some computation of the message data to be sent
30//! producer.send(&Record::from_value("my-topic", buf.as_bytes())).unwrap();
31//! buf.clear();
32//! }
33//! ```
34//!
35//! In this example, when the `producer.send(..)` returns
36//! successfully, we are guaranteed the message is delivered to Kafka
37//! and persisted by at least one Kafka broker. However, when sending
38//! multiple messages just like in this example, it is more efficient
39//! to send them in batches using `Producer::send_all`.
40//!
41//! Since some of the `Record`s attributes are optional, convenience
42//! methods exist to ease their creation. In this example, the call
43//! to `Record::from_value` creates a key-less, value-only record with
44//! an unspecified partition. The `Record` struct, however, is
45//! intended to provide full control over its lifecycle to client
46//! code, and, hence, is fully open. Its current constructor methods
47//! are provided for convenience only.
48//!
49//! Beside the target topic, key, and the value of a `Record`, client
50//! code is allowed to specify the topic partition the message is
51//! supposed to be delivered to. If the partition of a `Record` is
52//! not specified - more precisely speaking if it's negative -
53//! `Producer` will rely on its underlying `Partitioner` to find a
54//! suitable one. A `Partitioner` implementation can be supplied by
55//! client code at the `Producer`'s construction time and defaults to
56//! `DefaultPartitioner`. See that for more information for its
57//! strategy to find a partition.
58
59// XXX 1) rethink return values for the send_all() method
60// XXX 2) Handle recoverable errors behind the scenes through retry attempts
61
62use crate::client::{self, KafkaClient};
63use crate::error::{Error, Result};
64use std::collections::HashMap;
65use std::fmt;
66use std::hash::{BuildHasher, BuildHasherDefault, Hasher};
67use std::slice::from_ref;
68use std::time::Duration;
69use twox_hash::XxHash32;
70
71#[cfg(feature = "security")]
72use crate::client::SecurityConfig;
73
74#[cfg(not(feature = "security"))]
75type SecurityConfig = ();
76use crate::client_internals::KafkaClientInternals;
77use crate::protocol;
78
79// public re-exports
80pub use crate::client::{Compression, ProduceConfirm, ProducePartitionConfirm, RequiredAcks};
81
82/// The default value for `Builder::with_ack_timeout`.
83pub const DEFAULT_ACK_TIMEOUT_MILLIS: u64 = 30 * 1000;
84
85/// The default value for `Builder::with_required_acks`.
86pub const DEFAULT_REQUIRED_ACKS: RequiredAcks = RequiredAcks::One;
87
88// --------------------------------------------------------------------
89
90/// A trait used by `Producer` to obtain the bytes `Record::key` and
91/// `Record::value` represent. This leaves the choice of the types
92/// for `key` and `value` with the client.
93pub trait AsBytes {
94 fn as_bytes(&self) -> &[u8];
95}
96
97impl AsBytes for () {
98 fn as_bytes(&self) -> &[u8] {
99 &[]
100 }
101}
102
103// There seems to be some compiler issue with this:
104// impl<T: AsRef<[u8]>> AsBytes for T {
105// fn as_bytes(&self) -> &[u8] { self.as_ref() }
106// }
107
108// for now we provide the impl for some standard library types
109impl AsBytes for String {
110 fn as_bytes(&self) -> &[u8] {
111 self.as_ref()
112 }
113}
114impl AsBytes for Vec<u8> {
115 fn as_bytes(&self) -> &[u8] {
116 self.as_ref()
117 }
118}
119
120impl<'a> AsBytes for &'a [u8] {
121 fn as_bytes(&self) -> &[u8] {
122 self
123 }
124}
125impl<'a> AsBytes for &'a str {
126 fn as_bytes(&self) -> &[u8] {
127 str::as_bytes(self)
128 }
129}
130
131// --------------------------------------------------------------------
132
133/// A structure representing a message to be sent to Kafka through the
134/// `Producer` API. Such a message is basically a key/value pair
135/// specifying the target topic and optionally the topic's partition.
136pub struct Record<'a, K, V> {
137 /// Key data of this (message) record.
138 pub key: K,
139
140 /// Value data of this (message) record.
141 pub value: V,
142
143 /// Name of the topic this message is supposed to be delivered to.
144 pub topic: &'a str,
145
146 /// The partition id of the topic to deliver this message to.
147 /// This partition may be `< 0` in which case it is considered
148 /// "unspecified". A `Producer` will then typically try to derive
149 /// a partition on its own.
150 pub partition: i32,
151}
152
153impl<'a, K, V> Record<'a, K, V> {
154 /// Convenience function to create a new key/value record with an
155 /// "unspecified" partition - this is, a partition set to a negative
156 /// value.
157 #[inline]
158 pub fn from_key_value(topic: &'a str, key: K, value: V) -> Record<'a, K, V> {
159 Record {
160 key,
161 value,
162 topic,
163 partition: -1,
164 }
165 }
166
167 /// Convenience method to set the partition.
168 #[inline]
169 pub fn with_partition(mut self, partition: i32) -> Self {
170 self.partition = partition;
171 self
172 }
173}
174
175impl<'a, V> Record<'a, (), V> {
176 /// Convenience function to create a new value only record with an
177 /// "unspecified" partition - this is, a partition set to a negative
178 /// value.
179 #[inline]
180 pub fn from_value(topic: &'a str, value: V) -> Record<'a, (), V> {
181 Record {
182 key: (),
183 value,
184 topic,
185 partition: -1,
186 }
187 }
188}
189
190impl<'a, K: fmt::Debug, V: fmt::Debug> fmt::Debug for Record<'a, K, V> {
191 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
192 write!(
193 f,
194 "Record {{ topic: {}, partition: {}, key: {:?}, value: {:?} }}",
195 self.topic, self.partition, self.key, self.value
196 )
197 }
198}
199
200// --------------------------------------------------------------------
201
202/// The Kafka Producer
203///
204/// See module level documentation.
205pub struct Producer<P = DefaultPartitioner> {
206 client: KafkaClient,
207 state: State<P>,
208 config: Config,
209}
210
211struct State<P> {
212 /// A list of available partition IDs for each topic.
213 partitions: HashMap<String, Partitions>,
214 /// The partitioner to decide how to distribute messages
215 partitioner: P,
216}
217
218struct Config {
219 /// The maximum time to wait for acknowledgements. See
220 /// `KafkaClient::produce_messages`.
221 ack_timeout: i32,
222 /// The number of acks to request. See
223 /// `KafkaClient::produce_messages`.
224 required_acks: i16,
225}
226
227impl Producer {
228 /// Starts building a new producer using the given Kafka client.
229 pub fn from_client(client: KafkaClient) -> Builder<DefaultPartitioner> {
230 Builder::new(Some(client), Vec::new())
231 }
232
233 /// Starts building a producer bootstraping internally a new kafka
234 /// client from the given kafka hosts.
235 pub fn from_hosts(hosts: Vec<String>) -> Builder<DefaultPartitioner> {
236 Builder::new(None, hosts)
237 }
238
239 /// Borrows the underlying kafka client.
240 pub fn client(&self) -> &KafkaClient {
241 &self.client
242 }
243
244 /// Borrows the underlying kafka client as mut.
245 pub fn client_mut(&mut self) -> &mut KafkaClient {
246 &mut self.client
247 }
248
249 /// Destroys this producer returning the underlying kafka client.
250 pub fn into_client(self) -> KafkaClient {
251 self.client
252 }
253}
254
255impl<P: Partitioner> Producer<P> {
256 /// Synchronously send the specified message to Kafka.
257 pub fn send<'a, K, V>(&mut self, rec: &Record<'a, K, V>) -> Result<()>
258 where
259 K: AsBytes,
260 V: AsBytes,
261 {
262 let mut rs = self.send_all(from_ref(rec))?;
263
264 if self.config.required_acks == 0 {
265 // ~ with no required_acks we get no response and
266 // consider the send-data request blindly as successful
267 Ok(())
268 } else {
269 assert_eq!(1, rs.len());
270 let mut produce_confirm = rs.pop().unwrap();
271
272 assert_eq!(1, produce_confirm.partition_confirms.len());
273 produce_confirm
274 .partition_confirms
275 .pop()
276 .unwrap()
277 .offset
278 .map_err(Error::Kafka)?;
279 Ok(())
280 }
281 }
282
283 /// Synchronously send all of the specified messages to Kafka. To validate
284 /// that all of the specified records have been successfully delivered,
285 /// inspection of the offsets on the returned confirms is necessary.
286 pub fn send_all<'a, K, V>(&mut self, recs: &[Record<'a, K, V>]) -> Result<Vec<ProduceConfirm>>
287 where
288 K: AsBytes,
289 V: AsBytes,
290 {
291 let partitioner = &mut self.state.partitioner;
292 let partitions = &self.state.partitions;
293 let client = &mut self.client;
294 let config = &self.config;
295
296 client.internal_produce_messages(
297 config.required_acks,
298 config.ack_timeout,
299 recs.iter().map(|r| {
300 let mut m = client::ProduceMessage {
301 key: to_option(r.key.as_bytes()),
302 value: to_option(r.value.as_bytes()),
303 topic: r.topic,
304 partition: r.partition,
305 };
306 partitioner.partition(Topics::new(partitions), &mut m);
307 m
308 }),
309 )
310 }
311}
312
313fn to_option(data: &[u8]) -> Option<&[u8]> {
314 if data.is_empty() {
315 None
316 } else {
317 Some(data)
318 }
319}
320
321// --------------------------------------------------------------------
322
323impl<P> State<P> {
324 fn new(client: &mut KafkaClient, partitioner: P) -> Result<State<P>> {
325 let ts = client.topics();
326 let mut ids = HashMap::with_capacity(ts.len());
327 for t in ts {
328 let ps = t.partitions();
329 ids.insert(
330 t.name().to_owned(),
331 Partitions {
332 available_ids: ps.available_ids(),
333 num_all_partitions: ps.len() as u32,
334 },
335 );
336 }
337 Ok(State {
338 partitions: ids,
339 partitioner,
340 })
341 }
342}
343
344// --------------------------------------------------------------------
345
346/// A Kafka Producer builder easing the process of setting up various
347/// configuration settings.
348pub struct Builder<P = DefaultPartitioner> {
349 client: Option<KafkaClient>,
350 hosts: Vec<String>,
351 compression: Compression,
352 ack_timeout: Duration,
353 conn_idle_timeout: Duration,
354 required_acks: RequiredAcks,
355 partitioner: P,
356 security_config: Option<SecurityConfig>,
357 client_id: Option<String>,
358}
359
360impl Builder {
361 fn new(client: Option<KafkaClient>, hosts: Vec<String>) -> Builder<DefaultPartitioner> {
362 let mut b = Builder {
363 client,
364 hosts,
365 compression: client::DEFAULT_COMPRESSION,
366 ack_timeout: Duration::from_millis(DEFAULT_ACK_TIMEOUT_MILLIS),
367 conn_idle_timeout: Duration::from_millis(
368 client::DEFAULT_CONNECTION_IDLE_TIMEOUT_MILLIS,
369 ),
370 required_acks: DEFAULT_REQUIRED_ACKS,
371 partitioner: DefaultPartitioner::default(),
372 security_config: None,
373 client_id: None,
374 };
375 if let Some(ref c) = b.client {
376 b.compression = c.compression();
377 b.conn_idle_timeout = c.connection_idle_timeout();
378 }
379 b
380 }
381
382 /// Specifies the security config to use.
383 /// See `KafkaClient::new_secure` for more info.
384 #[cfg(feature = "security")]
385 pub fn with_security(mut self, security: SecurityConfig) -> Self {
386 self.security_config = Some(security);
387 self
388 }
389
390 /// Sets the compression algorithm to use when sending out data.
391 ///
392 /// See `KafkaClient::set_compression`.
393 pub fn with_compression(mut self, compression: Compression) -> Self {
394 self.compression = compression;
395 self
396 }
397
398 /// Sets the maximum time the kafka brokers can await the receipt
399 /// of required acknowledgements (which is specified through
400 /// `Builder::with_required_acks`.) Note that Kafka explicitly
401 /// documents this not to be a hard limit.
402 pub fn with_ack_timeout(mut self, timeout: Duration) -> Self {
403 self.ack_timeout = timeout;
404 self
405 }
406
407 /// Specifies the timeout for idle connections.
408 /// See `KafkaClient::set_connection_idle_timeout`.
409 pub fn with_connection_idle_timeout(mut self, timeout: Duration) -> Self {
410 self.conn_idle_timeout = timeout;
411 self
412 }
413
414 /// Sets how many acknowledgements the kafka brokers should
415 /// receive before responding to sent messages.
416 ///
417 /// See `RequiredAcks`.
418 pub fn with_required_acks(mut self, acks: RequiredAcks) -> Self {
419 self.required_acks = acks;
420 self
421 }
422
423 /// Specifies a client_id to be sent along every request to Kafka
424 /// brokers. See `KafkaClient::set_client_id`.
425 pub fn with_client_id(mut self, client_id: String) -> Self {
426 self.client_id = Some(client_id);
427 self
428 }
429}
430
431impl<P> Builder<P> {
432 /// Sets the partitioner to dispatch when sending messages without
433 /// an explicit partition assignment.
434 pub fn with_partitioner<Q: Partitioner>(self, partitioner: Q) -> Builder<Q> {
435 Builder {
436 client: self.client,
437 hosts: self.hosts,
438 compression: self.compression,
439 ack_timeout: self.ack_timeout,
440 conn_idle_timeout: self.conn_idle_timeout,
441 required_acks: self.required_acks,
442 partitioner,
443 security_config: None,
444 client_id: None,
445 }
446 }
447
448 #[cfg(not(feature = "security"))]
449 fn new_kafka_client(hosts: Vec<String>, _: Option<SecurityConfig>) -> KafkaClient {
450 KafkaClient::new(hosts)
451 }
452
453 #[cfg(feature = "security")]
454 fn new_kafka_client(hosts: Vec<String>, security: Option<SecurityConfig>) -> KafkaClient {
455 if let Some(security) = security {
456 KafkaClient::new_secure(hosts, security)
457 } else {
458 KafkaClient::new(hosts)
459 }
460 }
461
462 /// Finally creates/builds a new producer based on the so far
463 /// supplied settings.
464 pub fn create(self) -> Result<Producer<P>> {
465 // ~ create the client if necessary
466 let (mut client, need_metadata) = match self.client {
467 Some(client) => (client, false),
468 None => (
469 Self::new_kafka_client(self.hosts, self.security_config),
470 true,
471 ),
472 };
473 // ~ apply configuration settings
474 client.set_compression(self.compression);
475 client.set_connection_idle_timeout(self.conn_idle_timeout);
476 if let Some(client_id) = self.client_id {
477 client.set_client_id(client_id);
478 }
479 let producer_config = Config {
480 ack_timeout: protocol::to_millis_i32(self.ack_timeout)?,
481 required_acks: self.required_acks as i16,
482 };
483 // ~ load metadata if necessary
484 if need_metadata {
485 client.load_metadata_all()?;
486 }
487 // ~ create producer state
488 let state = State::new(&mut client, self.partitioner)?;
489 Ok(Producer {
490 client,
491 state,
492 config: producer_config,
493 })
494 }
495}
496
497// --------------------------------------------------------------------
498
499/// A description of available topics and their available partitions.
500///
501/// Indented for use by `Partitioner`s.
502pub struct Topics<'a> {
503 partitions: &'a HashMap<String, Partitions>,
504}
505
506/// Producer relevant partition information of a particular topic.
507///
508/// Indented for use by `Partition`s.
509pub struct Partitions {
510 available_ids: Vec<i32>,
511 num_all_partitions: u32,
512}
513
514impl Partitions {
515 /// Retrieves the list of the identifiers of currently "available"
516 /// partitions for the given topic. This list excludes partitions
517 /// which do not have a leader broker assigned.
518 #[inline]
519 pub fn available_ids(&self) -> &[i32] {
520 &self.available_ids
521 }
522
523 /// Retrieves the number of "available" partitions. This is a
524 /// merely a convenience method. See `Partitions::available_ids`.
525 #[inline]
526 pub fn num_available(&self) -> u32 {
527 self.available_ids.len() as u32
528 }
529
530 /// The total number of partitions of the underlygin topic. This
531 /// number includes also partitions without a current leader
532 /// assignment.
533 #[inline]
534 pub fn num_all(&self) -> u32 {
535 self.num_all_partitions
536 }
537}
538
539impl<'a> Topics<'a> {
540 fn new(partitions: &'a HashMap<String, Partitions>) -> Topics<'a> {
541 Topics { partitions }
542 }
543
544 /// Retrieves informationa about a topic's partitions.
545 #[inline]
546 pub fn partitions(&'a self, topic: &str) -> Option<&'a Partitions> {
547 self.partitions.get(topic)
548 }
549}
550
551/// A partitioner is given a chance to choose/redefine a partition for
552/// a message to be sent to Kafka. See also
553/// `Record#with_partition`.
554///
555/// Implementations can be stateful.
556pub trait Partitioner {
557 /// Supposed to inspect the given message and if desired re-assign
558 /// the message's target partition.
559 ///
560 /// `topics` a description of the currently known topics and their
561 /// currently available partitions.
562 ///
563 /// `msg` the message whose partition assignment potentially to
564 /// change.
565 fn partition(&mut self, topics: Topics<'_>, msg: &mut client::ProduceMessage<'_, '_>);
566}
567
568/// The default hasher implementation used of `DefaultPartitioner`.
569pub type DefaultHasher = XxHash32;
570
571/// As its name implies `DefaultPartitioner` is the default
572/// partitioner for `Producer`.
573///
574/// For every message it proceeds as follows:
575///
576/// - If the messages contains a non-negative partition value it
577/// leaves the message untouched. This will cause `Producer` to try
578/// to send the message to exactly that partition to.
579///
580/// - Otherwise, if the message has an "unspecified" `partition` -
581/// this is, it has a negative partition value - and a specified key,
582/// `DefaultPartitioner` will compute a hash from the key using the
583/// underlying hasher and take `hash % num_all_partitions` to derive
584/// the partition to send the message to. This will consistently
585/// cause messages with the same key to be sent to the same partition.
586///
587/// - Otherwise - a message with an "unspecified" `partition` and no
588/// key - `DefaultPartitioner` will "randomly" pick one from the
589/// "available" partitions trying to distribute the messages across
590/// the multiple partitions. In particular, it tries to distribute
591/// such messages across the "available" partitions in a round robin
592/// fashion. "Available" it this context means partitions with a
593/// known leader.
594///
595/// This behavior may not suffice every workload. If your application
596/// is dependent on a particular distribution scheme different from
597/// the one outlined above, you want to provide your own partioner to
598/// the `Producer` at its initialization time.
599///
600/// See `Builder::with_partitioner`.
601#[derive(Default)]
602pub struct DefaultPartitioner<H = BuildHasherDefault<DefaultHasher>> {
603 // ~ a hasher builder; used to consistently hash keys
604 hash_builder: H,
605 // ~ a counter incremented with each partitioned message to
606 // achieve a different partition assignment for each message
607 cntr: u32,
608}
609
610impl DefaultPartitioner {
611 /// Creates a new partitioner which will use the given hash
612 /// builder to hash message keys.
613 pub fn with_hasher<B: BuildHasher>(hash_builder: B) -> DefaultPartitioner<B> {
614 DefaultPartitioner {
615 hash_builder,
616 cntr: 0,
617 }
618 }
619
620 pub fn with_default_hasher<B>() -> DefaultPartitioner<BuildHasherDefault<B>>
621 where
622 B: Hasher + Default,
623 {
624 DefaultPartitioner {
625 hash_builder: BuildHasherDefault::<B>::default(),
626 cntr: 0,
627 }
628 }
629}
630
631impl<H: BuildHasher> Partitioner for DefaultPartitioner<H> {
632 #[allow(unused_variables)]
633 fn partition(&mut self, topics: Topics<'_>, rec: &mut client::ProduceMessage<'_, '_>) {
634 if rec.partition >= 0 {
635 // ~ partition explicitly defined, trust the user
636 return;
637 }
638 let partitions = match topics.partitions(rec.topic) {
639 None => return, // ~ unknown topic, this is not the place to deal with it.
640 Some(partitions) => partitions,
641 };
642 match rec.key {
643 Some(key) => {
644 let num_partitions = partitions.num_all();
645 if num_partitions == 0 {
646 // ~ no partitions at all ... a rather strange
647 // topic. again, this is not the right place to
648 // deal with it.
649 return;
650 }
651 let mut h = self.hash_builder.build_hasher();
652 h.write(key);
653 // ~ unconditionally dispatch to partitions no matter
654 // whether they are currently available or not. this
655 // guarantees consistency which is the point of
656 // partitioning by key. other behaviour - if desired
657 // - can be implemented in custom, user provided
658 // partitioners.
659 let hash = h.finish() as u32;
660 // if `num_partitions == u32::MAX` this can lead to a
661 // negative partition ... such a partition count is very
662 // unlikely though
663 rec.partition = (hash % num_partitions) as i32;
664 }
665 None => {
666 // ~ no key available, determine a partition from the
667 // available ones.
668 let avail = partitions.available_ids();
669 if !avail.is_empty() {
670 rec.partition = avail[self.cntr as usize % avail.len()];
671 // ~ update internal state so that the next time we choose
672 // a different partition
673 self.cntr = self.cntr.wrapping_add(1);
674 }
675 }
676 }
677 }
678}
679
680// --------------------------------------------------------------------
681
682#[cfg(test)]
683mod default_partitioner_tests {
684 use std::collections::HashMap;
685 use std::hash::{BuildHasherDefault, Hasher};
686
687 use super::{DefaultHasher, DefaultPartitioner, Partitioner, Partitions, Topics};
688 use crate::client;
689
690 fn topics_map(topics: Vec<(&str, Partitions)>) -> HashMap<String, Partitions> {
691 let mut h = HashMap::new();
692 for topic in topics {
693 h.insert(topic.0.into(), topic.1);
694 }
695 h
696 }
697
698 fn assert_partitioning<P: Partitioner>(
699 topics: &HashMap<String, Partitions>,
700 p: &mut P,
701 topic: &str,
702 key: &str,
703 ) -> i32 {
704 let mut msg = client::ProduceMessage {
705 key: Some(key.as_bytes()),
706 value: None,
707 topic,
708 partition: -1,
709 };
710 p.partition(Topics::new(topics), &mut msg);
711 let num_partitions = topics.get(topic).unwrap().num_all_partitions as i32;
712 assert!(msg.partition >= 0 && msg.partition < num_partitions);
713 msg.partition
714 }
715
716 /// Validate consistent partitioning on a message's key
717 #[test]
718 fn test_key_partitioning() {
719 let h = topics_map(vec![
720 (
721 "foo",
722 Partitions {
723 available_ids: vec![0, 1, 4],
724 num_all_partitions: 5,
725 },
726 ),
727 (
728 "bar",
729 Partitions {
730 available_ids: vec![0, 1],
731 num_all_partitions: 2,
732 },
733 ),
734 ]);
735
736 let mut p: DefaultPartitioner<BuildHasherDefault<DefaultHasher>> = Default::default();
737
738 // ~ validate that partitioning by the same key leads to the same
739 // partition
740 let h1 = assert_partitioning(&h, &mut p, "foo", "foo-key");
741 let h2 = assert_partitioning(&h, &mut p, "foo", "foo-key");
742 assert_eq!(h1, h2);
743
744 // ~ validate that partitioning by different keys leads to
745 // different partitions (the keys are chosen such that they lead
746 // to different partitions)
747 let h3 = assert_partitioning(&h, &mut p, "foo", "foo-key");
748 let h4 = assert_partitioning(&h, &mut p, "foo", "bar-key");
749 assert!(h3 != h4);
750 }
751
752 #[derive(Default)]
753 struct MyCustomHasher(u64);
754
755 impl Hasher for MyCustomHasher {
756 fn finish(&self) -> u64 {
757 self.0
758 }
759 fn write(&mut self, bytes: &[u8]) {
760 self.0 = bytes[0] as u64;
761 }
762 }
763
764 /// Validate it is possible to register a custom hasher with the
765 /// default partitioner
766 #[test]
767 fn default_partitioner_with_custom_hasher_default() {
768 // this must compile
769 let mut p = DefaultPartitioner::with_default_hasher::<MyCustomHasher>();
770
771 let h = topics_map(vec![
772 (
773 "confirms",
774 Partitions {
775 available_ids: vec![0, 1],
776 num_all_partitions: 2,
777 },
778 ),
779 (
780 "contents",
781 Partitions {
782 available_ids: vec![0, 1, 9],
783 num_all_partitions: 10,
784 },
785 ),
786 ]);
787
788 // verify also the partitioner derives the correct partition
789 // ... this is hash modulo num_all_partitions. here it is a
790 // topic with a total of 2 partitions.
791 let p1 = assert_partitioning(&h, &mut p, "confirms", "A" /* ascii: 65 */);
792 assert_eq!(1, p1);
793
794 // here it is a topic with a total of 10 partitions
795 let p2 = assert_partitioning(&h, &mut p, "contents", "B" /* ascii: 66 */);
796 assert_eq!(6, p2);
797 }
798}