Expand description
Kafka Consumer - A higher-level API for consuming kafka topics.
A consumer for Kafka topics on behalf of a specified group providing help in offset management. The consumer requires at least one topic for consumption and allows consuming multiple topics at the same time. Further, clients can restrict the consumer to only specific topic partitions as demonstrated in the following example.
§Example
use kafka::consumer::{Consumer, FetchOffset, GroupOffsetStorage};
let mut consumer =
Consumer::from_hosts(vec!("localhost:9092".to_owned()))
.with_topic_partitions("my-topic".to_owned(), &[0, 1])
.with_fallback_offset(FetchOffset::Earliest)
.with_group("my-group".to_owned())
.with_offset_storage(Some(GroupOffsetStorage::Kafka))
.create()
.unwrap();
loop {
for ms in consumer.poll().unwrap().iter() {
for m in ms.messages() {
println!("{:?}", m);
}
consumer.consume_messageset(ms);
}
consumer.commit_consumed().unwrap();
}
Please refer to the documentation of the individual “with” methods used to set up the consumer. These contain further information or links to such.
A call to .poll()
on the consumer will ask for the next
available “chunk of data” for the client code to process. The
returned data are MessageSet
s. There is at most one for each partition
of the consumed topics. Individual messages are embedded in the
retrieved messagesets and can be processed using the messages()
iterator. Due to this embedding, an individual message’s lifetime
is bound to the MessageSet
it is part of. Typically, client
code accesses the raw data/bytes, parses it into custom data types,
and passes that along for further processing within the application.
Although inconvenient, this helps in reducing the number of
allocations within the pipeline of processing incoming messages.
If the consumer is configured for a non-empty group, it helps in
keeping track of already consumed messages by maintaining a map of
the consumed offsets. Messages can be told “consumed” either
through consume_message
or consume_messages
methods. Once
these consumed messages are committed to Kafka using
commit_consumed
, the consumer will start fetching messages from
here even after restart. Since committing is a certain overhead,
it is up to the client to decide the frequency of the commits.
The consumer will not commit any messages to Kafka
automatically.
The configuration of a group is optional. If the consumer has no group configured, it will behave as if it had one, only that committing consumed message offsets resolves into a void operation.
Re-exports§
pub use crate::client::fetch::Message;
pub use crate::client::FetchOffset;
pub use crate::client::GroupOffsetStorage;
Structs§
- A Kafka Consumer builder easing the process of setting up various configuration settings.
- The Kafka Consumer
- A set of messages successfully retrieved from a specific topic partition.
- Messages retrieved from kafka in one fetch request. This is a concatenation of blocks of messages successfully retrieved from the consumed topic partitions. Each such partitions is guaranteed to be present at most once in this structure.
- An iterator over the consumed topic partition message sets.
Constants§
- The default value for
Builder::with_fallback_offset
. - The default value for
Builder::with_retry_max_bytes_limit
.