kafka/
producer.rs

1//! Kafka Producer - A higher-level API for sending messages to Kafka
2//! topics.
3//!
4//! This module hosts a multi-topic capable producer for a Kafka
5//! cluster providing a convenient API for sending messages
6//! synchronously.
7//!
8//! In Kafka, each message is a key/value pair where one or the other
9//! is optional.  A `Record` represents all the data necessary to
10//! produce such a message to Kafka using the `Producer`.  It
11//! specifies the target topic and the target partition the message is
12//! supposed to be delivered to as well as the key and the value.
13//!
14//! # Example
15//! ```no_run
16//! use std::fmt::Write;
17//! use std::time::Duration;
18//! use kafka::producer::{Producer, Record, RequiredAcks};
19//!
20//! let mut producer =
21//!     Producer::from_hosts(vec!("localhost:9092".to_owned()))
22//!         .with_ack_timeout(Duration::from_secs(1))
23//!         .with_required_acks(RequiredAcks::One)
24//!         .create()
25//!         .unwrap();
26//!
27//! let mut buf = String::with_capacity(2);
28//! for i in 0..10 {
29//!   let _ = write!(&mut buf, "{}", i); // some computation of the message data to be sent
30//!   producer.send(&Record::from_value("my-topic", buf.as_bytes())).unwrap();
31//!   buf.clear();
32//! }
33//! ```
34//!
35//! In this example, when the `producer.send(..)` returns
36//! successfully, we are guaranteed the message is delivered to Kafka
37//! and persisted by at least one Kafka broker.  However, when sending
38//! multiple messages just like in this example, it is more efficient
39//! to send them in batches using `Producer::send_all`.
40//!
41//! Since some of the `Record`s attributes are optional, convenience
42//! methods exist to ease their creation.  In this example, the call
43//! to `Record::from_value` creates a key-less, value-only record with
44//! an unspecified partition.  The `Record` struct, however, is
45//! intended to provide full control over its lifecycle to client
46//! code, and, hence, is fully open.  Its current constructor methods
47//! are provided for convenience only.
48//!
49//! Beside the target topic, key, and the value of a `Record`, client
50//! code is allowed to specify the topic partition the message is
51//! supposed to be delivered to.  If the partition of a `Record` is
52//! not specified - more precisely speaking if it's negative -
53//! `Producer` will rely on its underlying `Partitioner` to find a
54//! suitable one.  A `Partitioner` implementation can be supplied by
55//! client code at the `Producer`'s construction time and defaults to
56//! `DefaultPartitioner`.  See that for more information for its
57//! strategy to find a partition.
58
59// XXX 1) rethink return values for the send_all() method
60// XXX 2) Handle recoverable errors behind the scenes through retry attempts
61
62use crate::client::{self, KafkaClient};
63use crate::error::{Error, Result};
64use std::collections::HashMap;
65use std::fmt;
66use std::hash::{BuildHasher, BuildHasherDefault, Hasher};
67use std::slice::from_ref;
68use std::time::Duration;
69use twox_hash::XxHash32;
70
71#[cfg(feature = "security")]
72use crate::client::SecurityConfig;
73
74#[cfg(not(feature = "security"))]
75type SecurityConfig = ();
76use crate::client_internals::KafkaClientInternals;
77use crate::protocol;
78
79// public re-exports
80pub use crate::client::{Compression, ProduceConfirm, ProducePartitionConfirm, RequiredAcks};
81
82/// The default value for `Builder::with_ack_timeout`.
83pub const DEFAULT_ACK_TIMEOUT_MILLIS: u64 = 30 * 1000;
84
85/// The default value for `Builder::with_required_acks`.
86pub const DEFAULT_REQUIRED_ACKS: RequiredAcks = RequiredAcks::One;
87
88// --------------------------------------------------------------------
89
90/// A trait used by `Producer` to obtain the bytes `Record::key` and
91/// `Record::value` represent.  This leaves the choice of the types
92/// for `key` and `value` with the client.
93pub trait AsBytes {
94    fn as_bytes(&self) -> &[u8];
95}
96
97impl AsBytes for () {
98    fn as_bytes(&self) -> &[u8] {
99        &[]
100    }
101}
102
103// There seems to be some compiler issue with this:
104// impl<T: AsRef<[u8]>> AsBytes for T {
105//     fn as_bytes(&self) -> &[u8] { self.as_ref() }
106// }
107
108// for now we provide the impl for some standard library types
109impl AsBytes for String {
110    fn as_bytes(&self) -> &[u8] {
111        self.as_ref()
112    }
113}
114impl AsBytes for Vec<u8> {
115    fn as_bytes(&self) -> &[u8] {
116        self.as_ref()
117    }
118}
119
120impl<'a> AsBytes for &'a [u8] {
121    fn as_bytes(&self) -> &[u8] {
122        self
123    }
124}
125impl<'a> AsBytes for &'a str {
126    fn as_bytes(&self) -> &[u8] {
127        str::as_bytes(self)
128    }
129}
130
131// --------------------------------------------------------------------
132
133/// A structure representing a message to be sent to Kafka through the
134/// `Producer` API.  Such a message is basically a key/value pair
135/// specifying the target topic and optionally the topic's partition.
136pub struct Record<'a, K, V> {
137    /// Key data of this (message) record.
138    pub key: K,
139
140    /// Value data of this (message) record.
141    pub value: V,
142
143    /// Name of the topic this message is supposed to be delivered to.
144    pub topic: &'a str,
145
146    /// The partition id of the topic to deliver this message to.
147    /// This partition may be `< 0` in which case it is considered
148    /// "unspecified".  A `Producer` will then typically try to derive
149    /// a partition on its own.
150    pub partition: i32,
151}
152
153impl<'a, K, V> Record<'a, K, V> {
154    /// Convenience function to create a new key/value record with an
155    /// "unspecified" partition - this is, a partition set to a negative
156    /// value.
157    #[inline]
158    pub fn from_key_value(topic: &'a str, key: K, value: V) -> Record<'a, K, V> {
159        Record {
160            key,
161            value,
162            topic,
163            partition: -1,
164        }
165    }
166
167    /// Convenience method to set the partition.
168    #[inline]
169    pub fn with_partition(mut self, partition: i32) -> Self {
170        self.partition = partition;
171        self
172    }
173}
174
175impl<'a, V> Record<'a, (), V> {
176    /// Convenience function to create a new value only record with an
177    /// "unspecified" partition - this is, a partition set to a negative
178    /// value.
179    #[inline]
180    pub fn from_value(topic: &'a str, value: V) -> Record<'a, (), V> {
181        Record {
182            key: (),
183            value,
184            topic,
185            partition: -1,
186        }
187    }
188}
189
190impl<'a, K: fmt::Debug, V: fmt::Debug> fmt::Debug for Record<'a, K, V> {
191    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
192        write!(
193            f,
194            "Record {{ topic: {}, partition: {}, key: {:?}, value: {:?} }}",
195            self.topic, self.partition, self.key, self.value
196        )
197    }
198}
199
200// --------------------------------------------------------------------
201
202/// The Kafka Producer
203///
204/// See module level documentation.
205pub struct Producer<P = DefaultPartitioner> {
206    client: KafkaClient,
207    state: State<P>,
208    config: Config,
209}
210
211struct State<P> {
212    /// A list of available partition IDs for each topic.
213    partitions: HashMap<String, Partitions>,
214    /// The partitioner to decide how to distribute messages
215    partitioner: P,
216}
217
218struct Config {
219    /// The maximum time to wait for acknowledgements. See
220    /// `KafkaClient::produce_messages`.
221    ack_timeout: i32,
222    /// The number of acks to request. See
223    /// `KafkaClient::produce_messages`.
224    required_acks: i16,
225}
226
227impl Producer {
228    /// Starts building a new producer using the given Kafka client.
229    pub fn from_client(client: KafkaClient) -> Builder<DefaultPartitioner> {
230        Builder::new(Some(client), Vec::new())
231    }
232
233    /// Starts building a producer bootstraping internally a new kafka
234    /// client from the given kafka hosts.
235    pub fn from_hosts(hosts: Vec<String>) -> Builder<DefaultPartitioner> {
236        Builder::new(None, hosts)
237    }
238
239    /// Borrows the underlying kafka client.
240    pub fn client(&self) -> &KafkaClient {
241        &self.client
242    }
243
244    /// Borrows the underlying kafka client as mut.
245    pub fn client_mut(&mut self) -> &mut KafkaClient {
246        &mut self.client
247    }
248
249    /// Destroys this producer returning the underlying kafka client.
250    pub fn into_client(self) -> KafkaClient {
251        self.client
252    }
253}
254
255impl<P: Partitioner> Producer<P> {
256    /// Synchronously send the specified message to Kafka.
257    pub fn send<'a, K, V>(&mut self, rec: &Record<'a, K, V>) -> Result<()>
258    where
259        K: AsBytes,
260        V: AsBytes,
261    {
262        let mut rs = self.send_all(from_ref(rec))?;
263
264        if self.config.required_acks == 0 {
265            // ~ with no required_acks we get no response and
266            // consider the send-data request blindly as successful
267            Ok(())
268        } else {
269            assert_eq!(1, rs.len());
270            let mut produce_confirm = rs.pop().unwrap();
271
272            assert_eq!(1, produce_confirm.partition_confirms.len());
273            produce_confirm
274                .partition_confirms
275                .pop()
276                .unwrap()
277                .offset
278                .map_err(Error::Kafka)?;
279            Ok(())
280        }
281    }
282
283    /// Synchronously send all of the specified messages to Kafka. To validate
284    /// that all of the specified records have been successfully delivered,
285    /// inspection of the offsets on the returned confirms is necessary.
286    pub fn send_all<'a, K, V>(&mut self, recs: &[Record<'a, K, V>]) -> Result<Vec<ProduceConfirm>>
287    where
288        K: AsBytes,
289        V: AsBytes,
290    {
291        let partitioner = &mut self.state.partitioner;
292        let partitions = &self.state.partitions;
293        let client = &mut self.client;
294        let config = &self.config;
295
296        client.internal_produce_messages(
297            config.required_acks,
298            config.ack_timeout,
299            recs.iter().map(|r| {
300                let mut m = client::ProduceMessage {
301                    key: to_option(r.key.as_bytes()),
302                    value: to_option(r.value.as_bytes()),
303                    topic: r.topic,
304                    partition: r.partition,
305                };
306                partitioner.partition(Topics::new(partitions), &mut m);
307                m
308            }),
309        )
310    }
311}
312
313fn to_option(data: &[u8]) -> Option<&[u8]> {
314    if data.is_empty() {
315        None
316    } else {
317        Some(data)
318    }
319}
320
321// --------------------------------------------------------------------
322
323impl<P> State<P> {
324    fn new(client: &mut KafkaClient, partitioner: P) -> Result<State<P>> {
325        let ts = client.topics();
326        let mut ids = HashMap::with_capacity(ts.len());
327        for t in ts {
328            let ps = t.partitions();
329            ids.insert(
330                t.name().to_owned(),
331                Partitions {
332                    available_ids: ps.available_ids(),
333                    num_all_partitions: ps.len() as u32,
334                },
335            );
336        }
337        Ok(State {
338            partitions: ids,
339            partitioner,
340        })
341    }
342}
343
344// --------------------------------------------------------------------
345
346/// A Kafka Producer builder easing the process of setting up various
347/// configuration settings.
348pub struct Builder<P = DefaultPartitioner> {
349    client: Option<KafkaClient>,
350    hosts: Vec<String>,
351    compression: Compression,
352    ack_timeout: Duration,
353    conn_idle_timeout: Duration,
354    required_acks: RequiredAcks,
355    partitioner: P,
356    security_config: Option<SecurityConfig>,
357    client_id: Option<String>,
358}
359
360impl Builder {
361    fn new(client: Option<KafkaClient>, hosts: Vec<String>) -> Builder<DefaultPartitioner> {
362        let mut b = Builder {
363            client,
364            hosts,
365            compression: client::DEFAULT_COMPRESSION,
366            ack_timeout: Duration::from_millis(DEFAULT_ACK_TIMEOUT_MILLIS),
367            conn_idle_timeout: Duration::from_millis(
368                client::DEFAULT_CONNECTION_IDLE_TIMEOUT_MILLIS,
369            ),
370            required_acks: DEFAULT_REQUIRED_ACKS,
371            partitioner: DefaultPartitioner::default(),
372            security_config: None,
373            client_id: None,
374        };
375        if let Some(ref c) = b.client {
376            b.compression = c.compression();
377            b.conn_idle_timeout = c.connection_idle_timeout();
378        }
379        b
380    }
381
382    /// Specifies the security config to use.
383    /// See `KafkaClient::new_secure` for more info.
384    #[cfg(feature = "security")]
385    pub fn with_security(mut self, security: SecurityConfig) -> Self {
386        self.security_config = Some(security);
387        self
388    }
389
390    /// Sets the compression algorithm to use when sending out data.
391    ///
392    /// See `KafkaClient::set_compression`.
393    pub fn with_compression(mut self, compression: Compression) -> Self {
394        self.compression = compression;
395        self
396    }
397
398    /// Sets the maximum time the kafka brokers can await the receipt
399    /// of required acknowledgements (which is specified through
400    /// `Builder::with_required_acks`.)  Note that Kafka explicitly
401    /// documents this not to be a hard limit.
402    pub fn with_ack_timeout(mut self, timeout: Duration) -> Self {
403        self.ack_timeout = timeout;
404        self
405    }
406
407    /// Specifies the timeout for idle connections.
408    /// See `KafkaClient::set_connection_idle_timeout`.
409    pub fn with_connection_idle_timeout(mut self, timeout: Duration) -> Self {
410        self.conn_idle_timeout = timeout;
411        self
412    }
413
414    /// Sets how many acknowledgements the kafka brokers should
415    /// receive before responding to sent messages.
416    ///
417    /// See `RequiredAcks`.
418    pub fn with_required_acks(mut self, acks: RequiredAcks) -> Self {
419        self.required_acks = acks;
420        self
421    }
422
423    /// Specifies a client_id to be sent along every request to Kafka
424    /// brokers. See `KafkaClient::set_client_id`.
425    pub fn with_client_id(mut self, client_id: String) -> Self {
426        self.client_id = Some(client_id);
427        self
428    }
429}
430
431impl<P> Builder<P> {
432    /// Sets the partitioner to dispatch when sending messages without
433    /// an explicit partition assignment.
434    pub fn with_partitioner<Q: Partitioner>(self, partitioner: Q) -> Builder<Q> {
435        Builder {
436            client: self.client,
437            hosts: self.hosts,
438            compression: self.compression,
439            ack_timeout: self.ack_timeout,
440            conn_idle_timeout: self.conn_idle_timeout,
441            required_acks: self.required_acks,
442            partitioner,
443            security_config: None,
444            client_id: None,
445        }
446    }
447
448    #[cfg(not(feature = "security"))]
449    fn new_kafka_client(hosts: Vec<String>, _: Option<SecurityConfig>) -> KafkaClient {
450        KafkaClient::new(hosts)
451    }
452
453    #[cfg(feature = "security")]
454    fn new_kafka_client(hosts: Vec<String>, security: Option<SecurityConfig>) -> KafkaClient {
455        if let Some(security) = security {
456            KafkaClient::new_secure(hosts, security)
457        } else {
458            KafkaClient::new(hosts)
459        }
460    }
461
462    /// Finally creates/builds a new producer based on the so far
463    /// supplied settings.
464    pub fn create(self) -> Result<Producer<P>> {
465        // ~ create the client if necessary
466        let (mut client, need_metadata) = match self.client {
467            Some(client) => (client, false),
468            None => (
469                Self::new_kafka_client(self.hosts, self.security_config),
470                true,
471            ),
472        };
473        // ~ apply configuration settings
474        client.set_compression(self.compression);
475        client.set_connection_idle_timeout(self.conn_idle_timeout);
476        if let Some(client_id) = self.client_id {
477            client.set_client_id(client_id);
478        }
479        let producer_config = Config {
480            ack_timeout: protocol::to_millis_i32(self.ack_timeout)?,
481            required_acks: self.required_acks as i16,
482        };
483        // ~ load metadata if necessary
484        if need_metadata {
485            client.load_metadata_all()?;
486        }
487        // ~ create producer state
488        let state = State::new(&mut client, self.partitioner)?;
489        Ok(Producer {
490            client,
491            state,
492            config: producer_config,
493        })
494    }
495}
496
497// --------------------------------------------------------------------
498
499/// A description of available topics and their available partitions.
500///
501/// Indented for use by `Partitioner`s.
502pub struct Topics<'a> {
503    partitions: &'a HashMap<String, Partitions>,
504}
505
506/// Producer relevant partition information of a particular topic.
507///
508/// Indented for use by `Partition`s.
509pub struct Partitions {
510    available_ids: Vec<i32>,
511    num_all_partitions: u32,
512}
513
514impl Partitions {
515    /// Retrieves the list of the identifiers of currently "available"
516    /// partitions for the given topic.  This list excludes partitions
517    /// which do not have a leader broker assigned.
518    #[inline]
519    pub fn available_ids(&self) -> &[i32] {
520        &self.available_ids
521    }
522
523    /// Retrieves the number of "available" partitions. This is a
524    /// merely a convenience method. See `Partitions::available_ids`.
525    #[inline]
526    pub fn num_available(&self) -> u32 {
527        self.available_ids.len() as u32
528    }
529
530    /// The total number of partitions of the underlygin topic.  This
531    /// number includes also partitions without a current leader
532    /// assignment.
533    #[inline]
534    pub fn num_all(&self) -> u32 {
535        self.num_all_partitions
536    }
537}
538
539impl<'a> Topics<'a> {
540    fn new(partitions: &'a HashMap<String, Partitions>) -> Topics<'a> {
541        Topics { partitions }
542    }
543
544    /// Retrieves informationa about a topic's partitions.
545    #[inline]
546    pub fn partitions(&'a self, topic: &str) -> Option<&'a Partitions> {
547        self.partitions.get(topic)
548    }
549}
550
551/// A partitioner is given a chance to choose/redefine a partition for
552/// a message to be sent to Kafka.  See also
553/// `Record#with_partition`.
554///
555/// Implementations can be stateful.
556pub trait Partitioner {
557    /// Supposed to inspect the given message and if desired re-assign
558    /// the message's target partition.
559    ///
560    /// `topics` a description of the currently known topics and their
561    /// currently available partitions.
562    ///
563    /// `msg` the message whose partition assignment potentially to
564    /// change.
565    fn partition(&mut self, topics: Topics<'_>, msg: &mut client::ProduceMessage<'_, '_>);
566}
567
568/// The default hasher implementation used of `DefaultPartitioner`.
569pub type DefaultHasher = XxHash32;
570
571/// As its name implies `DefaultPartitioner` is the default
572/// partitioner for `Producer`.
573///
574/// For every message it proceeds as follows:
575///
576/// - If the messages contains a non-negative partition value it
577/// leaves the message untouched.  This will cause `Producer` to try
578/// to send the message to exactly that partition to.
579///
580/// - Otherwise, if the message has an "unspecified" `partition` -
581/// this is, it has a negative partition value - and a specified key,
582/// `DefaultPartitioner` will compute a hash from the key using the
583/// underlying hasher and take `hash % num_all_partitions` to derive
584/// the partition to send the message to.  This will consistently
585/// cause messages with the same key to be sent to the same partition.
586///
587/// - Otherwise - a message with an "unspecified" `partition` and no
588/// key - `DefaultPartitioner` will "randomly" pick one from the
589/// "available" partitions trying to distribute the messages across
590/// the multiple partitions.  In particular, it tries to distribute
591/// such messages across the "available" partitions in a round robin
592/// fashion.  "Available" it this context means partitions with a
593/// known leader.
594///
595/// This behavior may not suffice every workload.  If your application
596/// is dependent on a particular distribution scheme different from
597/// the one outlined above, you want to provide your own partioner to
598/// the `Producer` at its initialization time.
599///
600/// See `Builder::with_partitioner`.
601#[derive(Default)]
602pub struct DefaultPartitioner<H = BuildHasherDefault<DefaultHasher>> {
603    // ~ a hasher builder; used to consistently hash keys
604    hash_builder: H,
605    // ~ a counter incremented with each partitioned message to
606    // achieve a different partition assignment for each message
607    cntr: u32,
608}
609
610impl DefaultPartitioner {
611    /// Creates a new partitioner which will use the given hash
612    /// builder to hash message keys.
613    pub fn with_hasher<B: BuildHasher>(hash_builder: B) -> DefaultPartitioner<B> {
614        DefaultPartitioner {
615            hash_builder,
616            cntr: 0,
617        }
618    }
619
620    pub fn with_default_hasher<B>() -> DefaultPartitioner<BuildHasherDefault<B>>
621    where
622        B: Hasher + Default,
623    {
624        DefaultPartitioner {
625            hash_builder: BuildHasherDefault::<B>::default(),
626            cntr: 0,
627        }
628    }
629}
630
631impl<H: BuildHasher> Partitioner for DefaultPartitioner<H> {
632    #[allow(unused_variables)]
633    fn partition(&mut self, topics: Topics<'_>, rec: &mut client::ProduceMessage<'_, '_>) {
634        if rec.partition >= 0 {
635            // ~ partition explicitly defined, trust the user
636            return;
637        }
638        let partitions = match topics.partitions(rec.topic) {
639            None => return, // ~ unknown topic, this is not the place to deal with it.
640            Some(partitions) => partitions,
641        };
642        match rec.key {
643            Some(key) => {
644                let num_partitions = partitions.num_all();
645                if num_partitions == 0 {
646                    // ~ no partitions at all ... a rather strange
647                    // topic. again, this is not the right place to
648                    // deal with it.
649                    return;
650                }
651                let mut h = self.hash_builder.build_hasher();
652                h.write(key);
653                // ~ unconditionally dispatch to partitions no matter
654                // whether they are currently available or not.  this
655                // guarantees consistency which is the point of
656                // partitioning by key.  other behaviour - if desired
657                // - can be implemented in custom, user provided
658                // partitioners.
659                let hash = h.finish() as u32;
660                // if `num_partitions == u32::MAX` this can lead to a
661                // negative partition ... such a partition count is very
662                // unlikely though
663                rec.partition = (hash % num_partitions) as i32;
664            }
665            None => {
666                // ~ no key available, determine a partition from the
667                // available ones.
668                let avail = partitions.available_ids();
669                if !avail.is_empty() {
670                    rec.partition = avail[self.cntr as usize % avail.len()];
671                    // ~ update internal state so that the next time we choose
672                    // a different partition
673                    self.cntr = self.cntr.wrapping_add(1);
674                }
675            }
676        }
677    }
678}
679
680// --------------------------------------------------------------------
681
682#[cfg(test)]
683mod default_partitioner_tests {
684    use std::collections::HashMap;
685    use std::hash::{BuildHasherDefault, Hasher};
686
687    use super::{DefaultHasher, DefaultPartitioner, Partitioner, Partitions, Topics};
688    use crate::client;
689
690    fn topics_map(topics: Vec<(&str, Partitions)>) -> HashMap<String, Partitions> {
691        let mut h = HashMap::new();
692        for topic in topics {
693            h.insert(topic.0.into(), topic.1);
694        }
695        h
696    }
697
698    fn assert_partitioning<P: Partitioner>(
699        topics: &HashMap<String, Partitions>,
700        p: &mut P,
701        topic: &str,
702        key: &str,
703    ) -> i32 {
704        let mut msg = client::ProduceMessage {
705            key: Some(key.as_bytes()),
706            value: None,
707            topic,
708            partition: -1,
709        };
710        p.partition(Topics::new(topics), &mut msg);
711        let num_partitions = topics.get(topic).unwrap().num_all_partitions as i32;
712        assert!(msg.partition >= 0 && msg.partition < num_partitions);
713        msg.partition
714    }
715
716    /// Validate consistent partitioning on a message's key
717    #[test]
718    fn test_key_partitioning() {
719        let h = topics_map(vec![
720            (
721                "foo",
722                Partitions {
723                    available_ids: vec![0, 1, 4],
724                    num_all_partitions: 5,
725                },
726            ),
727            (
728                "bar",
729                Partitions {
730                    available_ids: vec![0, 1],
731                    num_all_partitions: 2,
732                },
733            ),
734        ]);
735
736        let mut p: DefaultPartitioner<BuildHasherDefault<DefaultHasher>> = Default::default();
737
738        // ~ validate that partitioning by the same key leads to the same
739        // partition
740        let h1 = assert_partitioning(&h, &mut p, "foo", "foo-key");
741        let h2 = assert_partitioning(&h, &mut p, "foo", "foo-key");
742        assert_eq!(h1, h2);
743
744        // ~ validate that partitioning by different keys leads to
745        // different partitions (the keys are chosen such that they lead
746        // to different partitions)
747        let h3 = assert_partitioning(&h, &mut p, "foo", "foo-key");
748        let h4 = assert_partitioning(&h, &mut p, "foo", "bar-key");
749        assert!(h3 != h4);
750    }
751
752    #[derive(Default)]
753    struct MyCustomHasher(u64);
754
755    impl Hasher for MyCustomHasher {
756        fn finish(&self) -> u64 {
757            self.0
758        }
759        fn write(&mut self, bytes: &[u8]) {
760            self.0 = bytes[0] as u64;
761        }
762    }
763
764    /// Validate it is possible to register a custom hasher with the
765    /// default partitioner
766    #[test]
767    fn default_partitioner_with_custom_hasher_default() {
768        // this must compile
769        let mut p = DefaultPartitioner::with_default_hasher::<MyCustomHasher>();
770
771        let h = topics_map(vec![
772            (
773                "confirms",
774                Partitions {
775                    available_ids: vec![0, 1],
776                    num_all_partitions: 2,
777                },
778            ),
779            (
780                "contents",
781                Partitions {
782                    available_ids: vec![0, 1, 9],
783                    num_all_partitions: 10,
784                },
785            ),
786        ]);
787
788        // verify also the partitioner derives the correct partition
789        // ... this is hash modulo num_all_partitions. here it is a
790        // topic with a total of 2 partitions.
791        let p1 = assert_partitioning(&h, &mut p, "confirms", "A" /* ascii: 65 */);
792        assert_eq!(1, p1);
793
794        // here it is a topic with a total of 10 partitions
795        let p2 = assert_partitioning(&h, &mut p, "contents", "B" /* ascii: 66 */);
796        assert_eq!(6, p2);
797    }
798}