pub struct KafkaClient { /* private fields */ }
Client struct keeping track of brokers and topic metadata.
Implements methods described by the Kafka Protocol.
You will have to load metadata before making any other request.
Implementations§
impl KafkaClient
pub fn new(hosts: Vec<String>) -> KafkaClient
Creates a new instance of KafkaClient. Before being able to successfully use the new client, you’ll have to load metadata.
§Examples
let mut client = kafka::client::KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
pub fn hosts(&self) -> &[String]
Exposes the hosts used for discovery of the target Kafka cluster. This set of hosts corresponds to the values supplied to KafkaClient::new.
pub fn set_client_id(&mut self, client_id: String)
Sets the client_id to be sent along with every request to the remote Kafka brokers. By default, this value is the empty string.
Kafka brokers write out this client id to their request/response trace log - if configured appropriately.
pub fn set_compression(&mut self, compression: Compression)
Sets the compression algorithm to use when sending out messages.
§Example
use kafka::client::{Compression, KafkaClient};
let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
client.set_compression(Compression::NONE);
pub fn compression(&self) -> Compression
Retrieves the current KafkaClient::set_compression setting.
pub fn set_fetch_max_wait_time(&mut self, max_wait_time: Duration) -> Result<()>
Sets the maximum time in milliseconds to wait when insufficient data is available at the time a fetch request is issued.
See also KafkaClient::set_fetch_min_bytes(..) and KafkaClient::set_fetch_max_bytes_per_partition(..).
pub fn fetch_max_wait_time(&self) -> Duration
Retrieves the current KafkaClient::set_fetch_max_wait_time setting.
pub fn set_fetch_min_bytes(&mut self, min_bytes: i32)
Sets the minimum number of bytes of available data to wait for, for as long as specified by KafkaClient::set_fetch_max_wait_time, when fetching messages.
By setting higher values in combination with the timeout the consumer can tune for throughput and trade a little additional latency for reading only large chunks of data (e.g. setting MaxWaitTime to 100 ms and setting MinBytes to 64k would allow the server to wait up to 100ms to try to accumulate 64k of data before responding).
§Example
use std::time::Duration;
use kafka::client::{KafkaClient, FetchPartition};
let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
client.set_fetch_max_wait_time(Duration::from_millis(100)).unwrap();
client.set_fetch_min_bytes(64 * 1024);
let r = client.fetch_messages(&[FetchPartition::new("my-topic", 0, 0)]);
See also KafkaClient::set_fetch_max_wait_time(..) and KafkaClient::set_fetch_max_bytes_per_partition(..).
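The interplay of the two settings can be modelled roughly as follows. This is a minimal sketch of the server-side decision as the description above states it, not code from this crate or from the broker; the function name should_respond and its parameters are hypothetical.

```rust
use std::time::Duration;

/// Hypothetical model (not part of the kafka crate): a fetch response is sent
/// as soon as enough data has accumulated or the maximum wait time has elapsed.
fn should_respond(
    available_bytes: i32,
    min_bytes: i32,
    waited: Duration,
    max_wait: Duration,
) -> bool {
    available_bytes >= min_bytes || waited >= max_wait
}
```

With MinBytes at 64k and MaxWaitTime at 100 ms, a request facing only 1 KiB of data would be answered once the 100 ms have passed, while one facing 64 KiB would be answered right away.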
pub fn fetch_min_bytes(&self) -> i32
Retrieves the current KafkaClient::set_fetch_min_bytes setting.
pub fn set_fetch_max_bytes_per_partition(&mut self, max_bytes: i32)
Sets the default maximum number of bytes to obtain from a single kafka partition when fetching messages.
This basically determines the maximum message size this client will be able to fetch. If a topic partition contains a message larger than this specified number of bytes, the server will not deliver it.
Note that this setting relates to a single partition. The overall potential data size of a fetch messages response is thus determined by the number of partitions in the fetch messages request times this “max bytes per partition” value.
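As a rough illustration of that product, the following sketch computes the upper bound on message data in a single response. The helper max_fetch_response_bytes is hypothetical and not part of this crate; it also ignores any protocol framing overhead.

```rust
/// Hypothetical helper (not part of the kafka crate): upper bound on the
/// message bytes a single fetch response may carry.
fn max_fetch_response_bytes(partitions: usize, max_bytes_per_partition: i32) -> u64 {
    // Negative limits are treated as zero in this sketch.
    partitions as u64 * max_bytes_per_partition.max(0) as u64
}
```

For example, a request covering 8 partitions with a 1 MiB per-partition limit may yield up to 8 MiB of message data.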
This client will use this setting by default for all queried partitions; however, fetch_messages does allow you to override this setting for a particular partition being queried.
See also KafkaClient::set_fetch_max_wait_time, KafkaClient::set_fetch_min_bytes, and KafkaClient::fetch_messages.
pub fn fetch_max_bytes_per_partition(&self) -> i32
Retrieves the current KafkaClient::set_fetch_max_bytes_per_partition setting.
pub fn set_fetch_crc_validation(&mut self, validate_crc: bool)
Specifies whether to perform CRC validation on fetched messages.
This ensures detection of on-the-wire or on-disk corruption of fetched messages. This check adds some overhead, so it may be disabled in cases seeking extreme performance.
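To illustrate what such a check involves, here is a minimal sketch of a bitwise CRC-32, assuming the standard IEEE polynomial. The functions crc32 and is_intact are illustrative only; the crate's actual implementation is not shown in this documentation and is likely table-driven or otherwise optimized.

```rust
/// Bitwise CRC-32 using the reflected IEEE polynomial 0xEDB88320.
/// Illustrative only; this is not the kafka crate's implementation.
fn crc32(data: &[u8]) -> u32 {
    let mut crc: u32 = 0xFFFF_FFFF;
    for &byte in data {
        crc ^= u32::from(byte);
        for _ in 0..8 {
            // Shift out one bit; apply the polynomial when that bit was set.
            crc = if crc & 1 != 0 {
                (crc >> 1) ^ 0xEDB8_8320
            } else {
                crc >> 1
            };
        }
    }
    !crc
}

/// Returns true when the payload still matches the checksum stored with it.
fn is_intact(payload: &[u8], stored_crc: u32) -> bool {
    crc32(payload) == stored_crc
}
```

Every byte of every fetched message passes through a computation like this, which is where the overhead mentioned above comes from.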
pub fn fetch_crc_validation(&self) -> bool
Retrieves the current KafkaClient::set_fetch_crc_validation setting.
pub fn set_group_offset_storage(&mut self, storage: Option<GroupOffsetStorage>)
Specifies the group offset storage to address when fetching or committing group offsets.
In addition to Zookeeper, Kafka 0.8.2 brokers or later offer a more performant (and scalable) way to manage group offsets directly by themselves. Note that the remote storages are separate and independent of each other. Hence, you typically want to consistently hard-code your choice in your program.
Unless you have a 0.8.1 broker or want to participate in a group which is already based on Zookeeper, you generally want to choose GroupOffsetStorage::Kafka here.
See also KafkaClient::fetch_group_offsets and KafkaClient::commit_offsets.
pub fn group_offset_storage(&self) -> Option<GroupOffsetStorage>
Retrieves the current KafkaClient::set_group_offset_storage setting.
pub fn set_retry_backoff_time(&mut self, time: Duration)
Specifies the time to wait before retrying a failed, repeatable operation against Kafka. This avoids retrying such operations in a tight loop.
pub fn retry_backoff_time(&self) -> Duration
Retrieves the current KafkaClient::set_retry_backoff_time setting.
pub fn set_retry_max_attempts(&mut self, attempts: u32)
Specifies the upper limit of retry attempts for failed, repeatable operations against kafka. This avoids retrying them forever.
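How a backoff time and an attempt limit typically combine can be sketched as below. This is a generic retry loop written for illustration; the helper retry is hypothetical and does not claim to mirror the client's internal retry logic.

```rust
use std::thread;
use std::time::Duration;

/// Hypothetical retry helper (not part of the kafka crate). Runs `op` until it
/// succeeds or `max_attempts` attempts have been made, sleeping `backoff`
/// between attempts. `op` always runs at least once.
fn retry<T, E>(
    max_attempts: u32,
    backoff: Duration,
    mut op: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut attempt = 1;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            // Out of attempts: hand the last error back to the caller.
            Err(e) if attempt >= max_attempts => return Err(e),
            Err(_) => {
                attempt += 1;
                // Pause so failures are not retried in a tight loop.
                thread::sleep(backoff);
            }
        }
    }
}
```

The attempt limit bounds the total work, and the sleep between attempts keeps a failing operation from spinning.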
pub fn retry_max_attempts(&self) -> u32
Retrieves the current KafkaClient::set_retry_max_attempts setting.
pub fn set_connection_idle_timeout(&mut self, timeout: Duration)
Specifies the timeout after which idle connections will transparently be closed/re-established by KafkaClient.
To be effective this value must be smaller than the remote broker’s connections.max.idle.ms setting.
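A quick sanity check of that relationship might look like the following sketch. The helper idle_timeout_is_effective is hypothetical, and the broker value has to come from your own broker configuration; this crate does not query it for you.

```rust
use std::time::Duration;

/// Hypothetical check (not part of the kafka crate): the client-side idle
/// timeout only takes effect if it is shorter than the broker's
/// `connections.max.idle.ms` value.
fn idle_timeout_is_effective(client_timeout: Duration, broker_max_idle_ms: u64) -> bool {
    client_timeout < Duration::from_millis(broker_max_idle_ms)
}
```

For instance, assuming a broker configured with connections.max.idle.ms=600000, a client timeout of nine minutes would be effective, while one of ten minutes or more would not.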
pub fn connection_idle_timeout(&self) -> Duration
Retrieves the current KafkaClient::set_connection_idle_timeout setting.
pub fn topics(&self) -> Topics<'_>
Provides a view onto the currently loaded metadata of known topics.
§Examples
use kafka::client::KafkaClient;
use kafka::client::metadata::Broker;
let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
for topic in client.topics() {
    for partition in topic.partitions() {
        println!("{} #{} => {}", topic.name(), partition.id(),
                 partition.leader()
                          .map(Broker::host)
                          .unwrap_or("no-leader!"));
    }
}
pub fn load_metadata_all(&mut self) -> Result<()>
Resets and loads metadata for all topics from the underlying brokers.
§Examples
let mut client = kafka::client::KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
for topic in client.topics().names() {
    println!("topic: {}", topic);
}
On success, the metadata for all loaded topics is held by this client and can be inspected through KafkaClient::topics.
pub fn load_metadata<T: AsRef<str>>(&mut self, topics: &[T]) -> Result<()>
Reloads metadata for a list of supplied topics.
Note: if any of the specified topics does not exist yet on the underlying brokers and these have the configuration for “auto create topics” enabled, the remote Kafka instance will create the yet missing topics on the fly as a result of explicitly loading their metadata.
This is in contrast to other methods of this KafkaClient which will silently filter out requests to not-yet-loaded/not-yet-known topics and, thus, not cause topics to be automatically created.
§Examples
let mut client = kafka::client::KafkaClient::new(vec!["localhost:9092".to_owned()]);
let _ = client.load_metadata(&["my-topic"]).unwrap();
On success, this client holds the metadata for all loaded topics, which might be more topics than those specified in this method call.
pub fn reset_metadata(&mut self)
Clears metadata stored in the client. You must load metadata after this call if you want to use the client.
pub fn fetch_offsets<T: AsRef<str>>(&mut self, topics: &[T], offset: FetchOffset) -> Result<HashMap<String, Vec<PartitionOffset>>>
Fetches offsets for a list of topics.
§Examples
use kafka::client::KafkaClient;
let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
let topics: Vec<String> = client.topics().names().map(ToOwned::to_owned).collect();
let offsets = client.fetch_offsets(&topics, kafka::client::FetchOffset::Latest).unwrap();
Returns a mapping of topic name to PartitionOffsets for each currently available partition of the corresponding topic.
pub fn fetch_topic_offsets<T: AsRef<str>>(&mut self, topic: T, offset: FetchOffset) -> Result<Vec<PartitionOffset>>
Fetches offsets for a single topic.
§Examples
use kafka::client::{KafkaClient, FetchOffset};
let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
let offsets = client.fetch_topic_offsets("my-topic", FetchOffset::Latest).unwrap();
Returns a vector of the offset data for each available partition.
See also KafkaClient::fetch_offsets.
pub fn fetch_messages<'a, I, J>(&mut self, input: I) -> Result<Vec<Response>>
Fetches messages from Kafka (multiple topics and partitions).
It takes a vector specifying the topic partitions and the offsets as of which to fetch messages. Additionally, the default “max fetch size per partition” can be explicitly overridden if it is “defined”, that is, if max_bytes is greater than zero.
The result is exposed in a raw, complicated manner but allows for very efficient consumption possibilities. All of the data available through the returned fetch responses is bound to their lifetime as that data is merely a “view” into parts of the response structs. If you need to keep individual messages for a longer time than the whole fetch responses, you’ll need to make a copy of the message data.
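The borrowing constraint described above can be illustrated with a small stand-in type. RawResponse and keep_message below are hypothetical and only model the ownership relationship; they are not the crate's response types.

```rust
/// Hypothetical stand-in for a fetch response that owns the raw bytes.
struct RawResponse {
    buf: Vec<u8>,
}

impl RawResponse {
    /// Returns a borrowed view into the response; it cannot outlive `self`.
    fn value(&self) -> &[u8] {
        &self.buf
    }
}

/// Copies the message bytes so they stay usable after the response is dropped.
fn keep_message(resp: RawResponse) -> Vec<u8> {
    let owned = resp.value().to_vec();
    drop(resp); // the borrowed view ends here; the copy lives on
    owned
}
```

Working on the borrowed slice directly avoids the copy, which is what makes the raw interface efficient; the copy is only needed when a message must outlive its response.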
- This method transparently uncompresses messages (while Kafka might send them in compressed format).
- This method ensures that messages with a lower offset than requested are skipped (while Kafka might, for efficiency reasons, send messages with a lower offset).
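The offset skipping amounts to a simple filter, sketched below with a hypothetical Msg type that carries only an offset. The method performs this for you; the sketch merely shows the effect.

```rust
/// Hypothetical message type; only the offset matters for this sketch.
#[derive(Debug, PartialEq)]
struct Msg {
    offset: i64,
}

/// Drops every message whose offset lies before the requested one.
fn skip_below(messages: Vec<Msg>, requested_offset: i64) -> Vec<Msg> {
    messages
        .into_iter()
        .filter(|m| m.offset >= requested_offset)
        .collect()
}
```

If offset 10 was requested and the server delivered offsets 8 through 11, only the messages at offsets 10 and 11 remain.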
Note: before using this method, consider using kafka::consumer::Consumer instead, which provides an easier-to-use API for the regular use case of fetching messages from Kafka.
§Example
This example demonstrates iterating over all fetched messages from two topic partitions. From one partition we allow Kafka to deliver the default number of bytes as defined by KafkaClient::set_fetch_max_bytes_per_partition; from the other partition we allow Kafka to deliver up to 1MiB of messages.
use kafka::client::{KafkaClient, FetchPartition};
let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
let reqs = &[FetchPartition::new("my-topic", 0, 0),
             FetchPartition::new("my-topic-2", 0, 0).with_max_bytes(1024*1024)];
let resps = client.fetch_messages(reqs).unwrap();
for resp in resps {
    for t in resp.topics() {
        for p in t.partitions() {
            match p.data() {
                Err(ref e) => {
                    println!("partition error: {}:{}: {}", t.topic(), p.partition(), e)
                }
                Ok(ref data) => {
                    println!("topic: {} / partition: {} / latest available message offset: {}",
                             t.topic(), p.partition(), data.highwatermark_offset());
                    for msg in data.messages() {
                        println!("topic: {} / partition: {} / message.offset: {} / message.len: {}",
                                 t.topic(), p.partition(), msg.offset, msg.value.len());
                    }
                }
            }
        }
    }
}
See also kafka::consumer.
See also KafkaClient::set_fetch_max_bytes_per_partition.
pub fn fetch_messages_for_partition<'a>(&mut self, req: &FetchPartition<'a>) -> Result<Vec<Response>>
Fetch messages from a single kafka partition.
See KafkaClient::fetch_messages.
pub fn produce_messages<'a, 'b, I, J>(&mut self, acks: RequiredAcks, ack_timeout: Duration, messages: I) -> Result<Vec<ProduceConfirm>>
Sends messages to Kafka.
- acks - indicates how many acknowledgements the servers should receive before responding to the request
- ack_timeout - provides a maximum time in milliseconds the server can await the receipt of the number of acknowledgements in acks
- messages - the set of ProduceMessages to send
Note: Unlike the higher-level Producer API, this method will not automatically determine the partition to deliver the message to. It will strictly try to send the message to the specified partition.
Note: Trying to send messages to non-existing topics or non-existing partitions will result in an error.
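Since the partition has to be chosen by the caller, one common approach is to derive it from a message key. The sketch below is a hypothetical helper, partition_for_key, built on the standard library hasher; it is not the partitioner used by the Producer API, and the resulting hash values are not guaranteed to be stable across Rust releases.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical helper (not part of the kafka crate): maps a key to a
/// partition index in `0..num_partitions` by hashing the key.
fn partition_for_key(key: &[u8], num_partitions: i32) -> i32 {
    assert!(num_partitions > 0, "topic must have at least one partition");
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    // The remainder is below `num_partitions`, so it fits into an i32.
    (hasher.finish() % num_partitions as u64) as i32
}
```

The partition count would have to come from the loaded topic metadata, so that the computed index always refers to an existing partition.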
§Example
use std::time::Duration;
use kafka::client::{KafkaClient, ProduceMessage, RequiredAcks};
let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
let req = vec![ProduceMessage::new("my-topic", 0, None, Some("a".as_bytes())),
               ProduceMessage::new("my-topic-2", 0, None, Some("b".as_bytes()))];
let resp = client.produce_messages(RequiredAcks::One, Duration::from_millis(100), req);
println!("{:?}", resp);
The return value is either a vector of confirmations carrying topic, partition, offset, and error (if any), or an Error.
pub fn commit_offsets<'a, J, I>(&mut self, group: &str, offsets: I) -> Result<()>
Commits offsets for topic partitions on behalf of a consumer group.
§Examples
use kafka::client::{KafkaClient, CommitOffset};
let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
client.commit_offsets("my-group",
                      &[CommitOffset::new("my-topic", 0, 100),
                        CommitOffset::new("my-topic", 1, 99)])
      .unwrap();
In this example, we commit the offset 100 for the topic partition “my-topic:0” and 99 for the topic partition “my-topic:1”. Once successfully committed, these can then be retrieved using fetch_group_offsets, even from another process or at a much later point in time, to resume consuming the topic partitions as of these offsets.
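Conceptually, the remote offset storage behaves like a map keyed by group, topic, and partition. The in-memory OffsetStore below is a hypothetical model of that idea, useful for reasoning about the commit and fetch round trip; it is not how the crate or the brokers implement it.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory model of group offset storage.
#[derive(Default)]
struct OffsetStore {
    // Key: (group, topic, partition); value: last committed offset.
    committed: HashMap<(String, String, i32), i64>,
}

impl OffsetStore {
    /// Records `offset` for the given group and topic partition.
    fn commit(&mut self, group: &str, topic: &str, partition: i32, offset: i64) {
        self.committed
            .insert((group.to_owned(), topic.to_owned(), partition), offset);
    }

    /// Looks up the committed offset, if the group has committed one.
    fn fetch(&self, group: &str, topic: &str, partition: i32) -> Option<i64> {
        self.committed
            .get(&(group.to_owned(), topic.to_owned(), partition))
            .copied()
    }
}
```

In this model, offsets committed by one group are invisible to any other group, so each group resumes from its own position.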
pub fn commit_offset(&mut self, group: &str, topic: &str, partition: i32, offset: i64) -> Result<()>
Commit offset of a particular topic partition on behalf of a consumer group.
§Examples
use kafka::client::KafkaClient;
let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
client.commit_offset("my-group", "my-topic", 0, 100).unwrap();
See also KafkaClient::commit_offsets.
pub fn fetch_group_offsets<'a, J, I>(&mut self, group: &str, partitions: I) -> Result<HashMap<String, Vec<PartitionOffset>>>
Fetches offsets for a specified list of topic partitions of a consumer group.
§Examples
use kafka::client::{KafkaClient, FetchGroupOffset};
let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
let offsets =
    client.fetch_group_offsets("my-group",
                               &[FetchGroupOffset::new("my-topic", 0),
                                 FetchGroupOffset::new("my-topic", 1)])
          .unwrap();
See also KafkaClient::fetch_group_topic_offset.
pub fn fetch_group_topic_offset(&mut self, group: &str, topic: &str) -> Result<Vec<PartitionOffset>>
Fetches offsets for all partitions of a particular topic of a consumer group.
§Examples
use kafka::client::KafkaClient;
let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
let offsets = client.fetch_group_topic_offset("my-group", "my-topic").unwrap();