
Struct KafkaClient

pub struct KafkaClient { /* private fields */ }

Client struct keeping track of brokers and topic metadata.

Implements methods described by the Kafka Protocol.

You will have to load metadata before making any other request.



impl KafkaClient


pub fn new(hosts: Vec<String>) -> KafkaClient

Creates a new instance of KafkaClient. Before being able to successfully use the new client, you’ll have to load metadata.

let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_owned()));
client.load_metadata_all().unwrap();

pub fn hosts(&self) -> &[String]

Exposes the hosts used for discovery of the target kafka cluster. This set of hosts corresponds to the values supplied to KafkaClient::new.


pub fn set_client_id(&mut self, client_id: String)

Sets the client_id to be sent along every request to the remote Kafka brokers. By default, this value is the empty string.

Kafka brokers write out this client id to their request/response trace log - if configured appropriately.


pub fn client_id(&self) -> &str

Retrieves the current KafkaClient::set_client_id setting.


pub fn set_compression(&mut self, compression: Compression)

Sets the compression algorithm to use when sending out messages.

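use kafka::client::{Compression, KafkaClient};

let mut client = KafkaClient::new(vec!("localhost:9092".to_owned()));
client.load_metadata_all().unwrap();
// `Compression::NONE` disables compression; pick another variant to enable it.
client.set_compression(Compression::NONE);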

pub fn compression(&self) -> Compression

Retrieves the current KafkaClient::set_compression setting.


pub fn set_fetch_max_wait_time(&mut self, max_wait_time: Duration) -> Result<()>

Sets the maximum time to wait, when fetching messages, for sufficient data to become available.

See also KafkaClient::set_fetch_min_bytes(..) and KafkaClient::set_fetch_max_bytes_per_partition(..).
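
This setter takes a Duration and yet returns a Result. One plausible reason, which is an assumption here and not something this page confirms, is that the wait time travels to the broker as a 32-bit millisecond count, so a very long Duration cannot be represented. The hypothetical helper `to_millis_i32` below sketches such a range check using only the standard library:

```rust
use std::convert::TryFrom;
use std::time::Duration;

/// Hypothetical helper (not part of the kafka crate): converts a `Duration`
/// to whole milliseconds, or returns `None` if the value does not fit an `i32`.
fn to_millis_i32(d: Duration) -> Option<i32> {
    // `as_millis` yields a `u128`; `try_from` rejects anything above `i32::MAX`.
    i32::try_from(d.as_millis()).ok()
}
```

Under this assumption, a wait time of 100 ms converts cleanly, while a duration of several weeks would be rejected.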


pub fn fetch_max_wait_time(&self) -> Duration

Retrieves the current KafkaClient::set_fetch_max_wait_time setting.


pub fn set_fetch_min_bytes(&mut self, min_bytes: i32)

Sets the minimum number of bytes of available data to wait for when fetching messages. The wait lasts at most as long as specified by KafkaClient::set_fetch_max_wait_time.

By setting higher values in combination with the timeout the consumer can tune for throughput and trade a little additional latency for reading only large chunks of data (e.g. setting MaxWaitTime to 100 ms and setting MinBytes to 64k would allow the server to wait up to 100ms to try to accumulate 64k of data before responding).

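use std::time::Duration;
use kafka::client::{KafkaClient, FetchPartition};

let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
// Let the server wait up to 100 ms to accumulate 64 KiB before responding.
client.set_fetch_max_wait_time(Duration::from_millis(100)).unwrap();
client.set_fetch_min_bytes(64 * 1024);
let r = client.fetch_messages(&[FetchPartition::new("my-topic", 0, 0)]);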

See also KafkaClient::set_fetch_max_wait_time(..) and KafkaClient::set_fetch_max_bytes_per_partition(..).
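
The interplay of MinBytes and MaxWaitTime described above can be modelled as a single predicate. This is a simplified sketch of the decision the server makes, not broker code; the function name `should_respond` and its parameters are hypothetical:

```rust
use std::time::Duration;

/// Simplified, hypothetical model of when a fetch request is answered:
/// either enough data has accumulated, or the wait time has run out.
fn should_respond(
    available_bytes: i32,
    min_bytes: i32,
    waited: Duration,
    max_wait: Duration,
) -> bool {
    available_bytes >= min_bytes || waited >= max_wait
}
```

With MinBytes at 64k and MaxWaitTime at 100 ms, a request that has seen only 1 KiB after 50 ms keeps waiting, and is answered once either limit is reached.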


pub fn fetch_min_bytes(&self) -> i32

Retrieves the current KafkaClient::set_fetch_min_bytes setting.


pub fn set_fetch_max_bytes_per_partition(&mut self, max_bytes: i32)

Sets the default maximum number of bytes to obtain from a single kafka partition when fetching messages.

This basically determines the maximum message size this client will be able to fetch. If a topic partition contains a message larger than this specified number of bytes, the server will not deliver it.

Note that this setting applies to a single partition. The overall potential data size of a fetch messages response is thus determined by the number of partitions in the fetch messages request times this “max bytes per partition” value.

This client uses this setting by default for all queried partitions; however, fetch_messages does allow you to override it for a particular partition being queried.

See also KafkaClient::set_fetch_max_wait_time, KafkaClient::set_fetch_min_bytes, and KafkaClient::fetch_messages.
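
To make the sizing rule concrete, the sketch below adds up the per-partition limits of one fetch request, using the client-wide default where a partition has no override. The function `max_fetch_payload_bytes` and the `Option<i32>` encoding of overrides are assumptions made for illustration only:

```rust
/// Hypothetical helper: upper bound on message bytes in one fetch response.
/// Each slice entry stands for one requested partition; `Some(n)` is a
/// per-partition override, `None` means "use the client default".
fn max_fetch_payload_bytes(default_max_bytes: i32, overrides: &[Option<i32>]) -> i64 {
    overrides
        .iter()
        .map(|o| i64::from(o.unwrap_or(default_max_bytes)))
        .sum()
}
```

For example, three partitions with a 1 KiB default and one 4 KiB override give an upper bound of 6 KiB.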


pub fn fetch_max_bytes_per_partition(&self) -> i32

Retrieves the current KafkaClient::set_fetch_max_bytes_per_partition setting.


pub fn set_fetch_crc_validation(&mut self, validate_crc: bool)

Specifies whether to perform CRC validation on fetched messages.

This ensures detection of on-the-wire or on-disk corruption to fetched messages. This check adds some overhead, so it may be disabled in cases seeking extreme performance.
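
As an illustration of what such a check involves, here is a minimal bitwise CRC-32 sketch. It assumes the standard CRC-32 (IEEE 802.3) polynomial and is not the crate's implementation; `crc32` and `is_intact` are hypothetical names:

```rust
/// Bitwise CRC-32 (IEEE 802.3, reflected polynomial 0xEDB88320).
/// Slow but dependency-free; production code would use a table or hardware.
fn crc32(data: &[u8]) -> u32 {
    let mut crc = 0xFFFF_FFFFu32;
    for &byte in data {
        crc ^= u32::from(byte);
        for _ in 0..8 {
            // All-ones mask if the lowest bit is set, zero otherwise.
            let mask = (crc & 1).wrapping_neg();
            crc = (crc >> 1) ^ (0xEDB8_8320 & mask);
        }
    }
    !crc
}

/// Hypothetical check: does the payload match the checksum stored with it?
fn is_intact(payload: &[u8], stored_crc: u32) -> bool {
    crc32(payload) == stored_crc
}
```

The per-byte loop is where the overhead mentioned above comes from: every fetched payload byte has to be processed once more on the client.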


pub fn fetch_crc_validation(&self) -> bool

Retrieves the current KafkaClient::set_fetch_crc_validation setting.


pub fn set_group_offset_storage(&mut self, storage: Option<GroupOffsetStorage>)

Specifies the group offset storage to address when fetching or committing group offsets.

In addition to Zookeeper, Kafka brokers 0.8.2 or later offer a more performant (and scalable) way to manage group offsets directly by themselves. Note that the two remote storages are separate and independent of each other. Hence, you typically want to consistently hard-code your choice in your program.

Unless you have a 0.8.1 broker or want to participate in a group which is already based on Zookeeper, you generally want to choose GroupOffsetStorage::Kafka here.

See also KafkaClient::fetch_group_offsets and KafkaClient::commit_offsets.


pub fn group_offset_storage(&self) -> Option<GroupOffsetStorage>

Retrieves the current KafkaClient::set_group_offset_storage setting.


pub fn set_retry_backoff_time(&mut self, time: Duration)

Specifies the time to wait before retrying a failed, repeatable operation against Kafka. This avoids retrying such operations in a tight loop.


pub fn retry_backoff_time(&self) -> Duration

Retrieves the current KafkaClient::set_retry_backoff_time setting.


pub fn set_retry_max_attempts(&mut self, attempts: u32)

Specifies the upper limit of retry attempts for failed, repeatable operations against kafka. This avoids retrying them forever.
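
The two retry settings work together: the backoff time spaces attempts out, and the attempt limit bounds them. The generic helper below is a sketch of that pattern under stated assumptions; `retry_with_backoff` is a hypothetical name and does not reflect how KafkaClient decides which operations are repeatable:

```rust
use std::thread;
use std::time::Duration;

/// Hypothetical retry loop: runs `op` at least once and at most
/// `max_attempts` times, sleeping `backoff` between failed attempts.
fn retry_with_backoff<T, E, F>(max_attempts: u32, backoff: Duration, mut op: F) -> Result<T, E>
where
    F: FnMut() -> Result<T, E>,
{
    let mut attempt = 1;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            Err(err) => {
                if attempt >= max_attempts {
                    // Limit reached: give up and surface the last error.
                    return Err(err);
                }
                attempt += 1;
                // Pause so a failing operation is not retried in a tight loop.
                thread::sleep(backoff);
            }
        }
    }
}
```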


pub fn retry_max_attempts(&self) -> u32

Retrieves the current KafkaClient::set_retry_max_attempts setting.


pub fn set_connection_idle_timeout(&mut self, timeout: Duration)

Specifies the timeout after which idle connections will transparently be closed/re-established by KafkaClient.

To be effective this value must be smaller than the remote broker’s connections.max.idle.ms setting.


pub fn connection_idle_timeout(&self) -> Duration

Retrieves the current KafkaClient::set_connection_idle_timeout setting.


pub fn topics(&self) -> Topics<'_>

Provides a view onto the currently loaded metadata of known topics.

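use kafka::client::KafkaClient;
use kafka::client::metadata::Broker;

let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
for topic in client.topics() {
  for partition in topic.partitions() {
    // Print the leader's host for each partition, if a leader is known.
    println!("{} #{} => {}", topic.name(), partition.id(),
             partition.leader()
                      .map(Broker::host)
                      .unwrap_or("no-leader!"));
  }
}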

pub fn load_metadata_all(&mut self) -> Result<()>

Resets and loads metadata for all topics from the underlying brokers.

let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_owned()));
client.load_metadata_all().unwrap();
for topic in client.topics().names() {
  println!("topic: {}", topic);
}

On success, the metadata for all loaded topics underlying this client is available through KafkaClient::topics.


pub fn load_metadata<T: AsRef<str>>(&mut self, topics: &[T]) -> Result<()>

Reloads metadata for a list of supplied topics.

Note: if any of the specified topics does not exist yet on the underlying brokers and these have the configuration for “auto create topics” enabled, the remote kafka instance will create the yet missing topics on the fly as a result of explicitly loading their metadata. This is in contrast to other methods of this KafkaClient which will silently filter out requests to not-yet-loaded/not-yet-known topics and, thus, not cause topics to be automatically created.

let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_owned()));
let _ = client.load_metadata(&["my-topic"]).unwrap();

On success, the metadata for all loaded topics underlying this client is available through KafkaClient::topics (this might cover more topics than those passed to this method call).


pub fn reset_metadata(&mut self)

Clears metadata stored in the client. You must load metadata after this call if you want to use the client.


pub fn fetch_offsets<T: AsRef<str>>( &mut self, topics: &[T], offset: FetchOffset, ) -> Result<HashMap<String, Vec<PartitionOffset>>>

Fetch offsets for a list of topics.

use kafka::client::KafkaClient;

let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
let topics: Vec<String> = client.topics().names().map(ToOwned::to_owned).collect();
let offsets = client.fetch_offsets(&topics, kafka::client::FetchOffset::Latest).unwrap();

Returns a mapping of topic name to PartitionOffsets for each currently available partition of the corresponding topic.


pub fn fetch_topic_offsets<T: AsRef<str>>( &mut self, topic: T, offset: FetchOffset, ) -> Result<Vec<PartitionOffset>>

Fetch offsets for a single topic.

use kafka::client::{KafkaClient, FetchOffset};

let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
let offsets = client.fetch_topic_offsets("my-topic", FetchOffset::Latest).unwrap();

Returns a vector of the offset data for each available partition. See also KafkaClient::fetch_offsets.


pub fn fetch_messages<'a, I, J>(&mut self, input: I) -> Result<Vec<Response>>
where J: AsRef<FetchPartition<'a>>, I: IntoIterator<Item = J>,

Fetch messages from Kafka (multiple topics and partitions).

It takes a collection of FetchPartition values specifying the topic partitions and the offsets from which to fetch messages. Additionally, the default “max fetch size per partition” can be explicitly overridden for a partition if it is “defined”, that is, if max_bytes is greater than zero.

The result is exposed in a raw, complicated manner but allows for very efficient consumption possibilities. All of the data available through the returned fetch responses is bound to their lifetime as that data is merely a “view” into parts of the response structs. If you need to keep individual messages for a longer time than the whole fetch responses, you’ll need to make a copy of the message data.
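
The lifetime constraint described above can be sketched with plain standard-library types. `RawResponse` is a hypothetical stand-in for a fetch response that owns its buffer; it is not a type from this crate:

```rust
/// Hypothetical stand-in for a fetch response that owns the raw bytes.
struct RawResponse {
    buf: Vec<u8>,
}

impl RawResponse {
    /// Borrowed view into the response buffer; valid only while `self` lives.
    fn message_value(&self) -> &[u8] {
        &self.buf
    }
}

/// Copies the message bytes so they can outlive the response itself.
fn keep_message(resp: RawResponse) -> Vec<u8> {
    let owned = resp.message_value().to_vec();
    drop(resp); // the response is gone, the owned copy remains usable
    owned
}
```

Returning the borrowed slice from `keep_message` instead of the copy would not compile, which is the borrow checker enforcing the same rule the paragraph states.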

  • This method transparently uncompresses messages (while Kafka might send them in compressed format).

  • This method skips messages with a lower offset than requested (while Kafka might, for efficiency reasons, send messages with a lower offset).
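
The offset filtering in the second point amounts to dropping everything below the requested offset. A minimal sketch, with a hypothetical `Msg` type standing in for a fetched message:

```rust
/// Hypothetical stand-in for a fetched message.
#[derive(Debug, Clone, PartialEq)]
struct Msg {
    offset: i64,
    value: Vec<u8>,
}

/// Keeps only messages at or after the requested offset.
fn skip_below(messages: Vec<Msg>, requested_offset: i64) -> Vec<Msg> {
    messages
        .into_iter()
        .filter(|m| m.offset >= requested_offset)
        .collect()
}
```

If a request for offset 10 is answered with offsets 8 through 11, only 10 and 11 are handed to the caller.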

Note: before using this method, consider using kafka::consumer::Consumer instead, which provides an easier-to-use API for the regular use case of fetching messages from Kafka.


This example demonstrates iterating over all fetched messages from two topic partitions. From one partition we allow Kafka to deliver the default number of bytes as defined by KafkaClient::set_fetch_max_bytes_per_partition; from the other partition we allow Kafka to deliver up to 1MiB of messages.

use kafka::client::{KafkaClient, FetchPartition};

let mut client = KafkaClient::new(vec!("localhost:9092".to_owned()));
client.load_metadata_all().unwrap();
let reqs = &[FetchPartition::new("my-topic", 0, 0),
             FetchPartition::new("my-topic-2", 0, 0).with_max_bytes(1024*1024)];
let resps = client.fetch_messages(reqs).unwrap();
for resp in resps {
  for t in resp.topics() {
    for p in t.partitions() {
      match p.data() {
        Err(ref e) => {
          println!("partition error: {}:{}: {}", t.topic(), p.partition(), e)
        }
        Ok(ref data) => {
          println!("topic: {} / partition: {} / latest available message offset: {}",
                   t.topic(), p.partition(), data.highwatermark_offset());
          for msg in data.messages() {
            println!("topic: {} / partition: {} / message.offset: {} / message.len: {}",
                     t.topic(), p.partition(), msg.offset, msg.value.len());
          }
        }
      }
    }
  }
}

See also kafka::consumer. See also KafkaClient::set_fetch_max_bytes_per_partition.


pub fn fetch_messages_for_partition<'a>( &mut self, req: &FetchPartition<'a>, ) -> Result<Vec<Response>>

Fetch messages from a single kafka partition.

See KafkaClient::fetch_messages.


pub fn produce_messages<'a, 'b, I, J>( &mut self, acks: RequiredAcks, ack_timeout: Duration, messages: I, ) -> Result<Vec<ProduceConfirm>>
where J: AsRef<ProduceMessage<'a, 'b>>, I: IntoIterator<Item = J>,

Send messages to Kafka.

acks - indicates how many acknowledgements the servers should receive before responding to the request

ack_timeout - provides the maximum time the server can await the receipt of the number of acknowledgements specified by acks

messages - the set of ProduceMessages to send

Note: Unlike the higher-level Producer API, this method will not automatically determine the partition to deliver the message to. It will strictly try to send the message to the specified partition.

Note: Trying to send messages to non-existing topics or non-existing partitions will result in an error.
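
Because this method sends each message strictly to the partition named in it, the caller has to pick that partition. One common approach, shown here purely as an assumption and not as the algorithm used by the higher-level Producer, is to hash the message key modulo the partition count. The helper `partition_for_key` is hypothetical:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical partitioner: maps a key to a partition in `0..num_partitions`.
/// Uses the standard library's hasher, which differs from Kafka's own clients.
fn partition_for_key(key: &[u8], num_partitions: i32) -> i32 {
    assert!(num_partitions > 0, "a topic needs at least one partition");
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % num_partitions as u64) as i32
}
```

The resulting number would then be passed as the partition argument when constructing each ProduceMessage.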

use std::time::Duration;
use kafka::client::{KafkaClient, ProduceMessage, RequiredAcks};

let mut client = KafkaClient::new(vec!("localhost:9092".to_owned()));
client.load_metadata_all().unwrap();
let req = vec![ProduceMessage::new("my-topic", 0, None, Some("a".as_bytes())),
               ProduceMessage::new("my-topic-2", 0, None, Some("b".as_bytes()))];
let resp = client.produce_messages(RequiredAcks::One, Duration::from_millis(100), req);
println!("{:?}", resp);

On success, the returned vector contains the topic, partition, offset, and error (if any) for the sent messages; if the request as a whole fails, an Error is returned instead.


pub fn commit_offsets<'a, J, I>( &mut self, group: &str, offsets: I, ) -> Result<()>
where J: AsRef<CommitOffset<'a>>, I: IntoIterator<Item = J>,

Commit offsets for topic partitions on behalf of a consumer group.

use kafka::client::{KafkaClient, CommitOffset};

let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
client.commit_offsets("my-group",
    &[CommitOffset::new("my-topic", 0, 100),
      CommitOffset::new("my-topic", 1, 99)])
   .unwrap();

In this example, we commit the offset 100 for the topic partition “my-topic:0” and 99 for the topic partition “my-topic:1”. Once successfully committed, these can then be retrieved using fetch_group_offsets, even from another process or at a much later point in time, to resume consuming the topic partitions as of these offsets.


pub fn commit_offset( &mut self, group: &str, topic: &str, partition: i32, offset: i64, ) -> Result<()>

Commit offset of a particular topic partition on behalf of a consumer group.

use kafka::client::KafkaClient;

let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
client.commit_offset("my-group", "my-topic", 0, 100).unwrap();

See also KafkaClient::commit_offsets.


pub fn fetch_group_offsets<'a, J, I>( &mut self, group: &str, partitions: I, ) -> Result<HashMap<String, Vec<PartitionOffset>>>
where J: AsRef<FetchGroupOffset<'a>>, I: IntoIterator<Item = J>,

Fetch offsets for a specified list of topic partitions of a consumer group.

use kafka::client::{KafkaClient, FetchGroupOffset};

let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();

let offsets =
     client.fetch_group_offsets("my-group",
            &[FetchGroupOffset::new("my-topic", 0),
              FetchGroupOffset::new("my-topic", 1)])
            .unwrap();

See also KafkaClient::fetch_group_topic_offset.
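
Both fetch_offsets and fetch_group_offsets return a map from topic name to per-partition offsets, so the two results can be combined, for instance to estimate how far a group lags behind. The sketch below uses a local `PartitionOffset` stand-in that only mirrors the shape suggested by the signatures; its field names and the `lag` helper are assumptions for illustration:

```rust
use std::collections::HashMap;

/// Local stand-in for illustration; the real type lives in the kafka crate.
#[derive(Debug, Clone, Copy)]
struct PartitionOffset {
    partition: i32,
    offset: i64,
}

/// Hypothetical helper: latest offset minus committed offset, per partition.
/// Partitions without a committed entry are skipped.
fn lag(
    latest: &HashMap<String, Vec<PartitionOffset>>,
    committed: &HashMap<String, Vec<PartitionOffset>>,
) -> HashMap<(String, i32), i64> {
    let mut out = HashMap::new();
    for (topic, parts) in latest {
        for lp in parts {
            let committed_offset = committed
                .get(topic)
                .and_then(|cs| cs.iter().find(|c| c.partition == lp.partition))
                .map(|c| c.offset);
            if let Some(co) = committed_offset {
                out.insert((topic.clone(), lp.partition), lp.offset - co);
            }
        }
    }
    out
}
```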


pub fn fetch_group_topic_offset( &mut self, group: &str, topic: &str, ) -> Result<Vec<PartitionOffset>>

Fetch offsets for all partitions of a particular topic of a consumer group.

use kafka::client::KafkaClient;

let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
let offsets = client.fetch_group_topic_offset("my-group", "my-topic").unwrap();

Trait Implementations


impl Debug for KafkaClient


fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

Auto Trait Implementations

Blanket Implementations


impl<T> Any for T
where T: 'static + ?Sized,


fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,


fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,


fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T


fn from(t: T) -> T

Returns the argument unchanged.


impl<T> Instrument for T


fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,


fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.


impl<T, U> TryFrom<U> for T
where U: Into<T>,


type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,


type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,


fn vzip(self) -> V


impl<T> WithSubscriber for T


fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.