wasmcloud_provider_messaging_kafka/client.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
use anyhow::{Context as _, Result};
use futures::Stream;
use kafka::client::KafkaClient;
use kafka::consumer::{Builder as ConsumerBuilder, Consumer, Message};
use tokio::sync::oneshot::Sender;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{error, trace};
/// An Async Kafka Client built on the [`tokio`] runtime
pub(crate) struct AsyncKafkaClient(pub(crate) KafkaClient);
impl AsyncKafkaClient {
/// Build an [`AsyncKafkaClient`] for a list of hosts
pub async fn from_hosts(hosts: Vec<String>) -> Result<Self> {
Self::from_client(KafkaClient::new(hosts)).await
}
/// Build an [`AsyncKafkaClient`] from an existing [`KafkaClient`]
pub async fn from_client(mut kc: KafkaClient) -> Result<Self> {
let kc = tokio::task::spawn_blocking(move || {
kc.load_metadata_all().context("failed to load metadata")?;
Ok::<KafkaClient, anyhow::Error>(kc)
})
.await
.context("failed to perform spawn blocking")?
.context("failed to load metadata")?;
Ok(Self(kc))
}
}
/// An wrapper for easily using a [`kafka::consumer::Consumer`] asynchronously
pub(crate) struct AsyncKafkaConsumer(Consumer);
/// A fetched message from a remote Kafka broker for a particular topic & partition.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub(crate) struct KafkaMessage {
/// The offset at which this message resides in the remote kafka
/// broker topic partition.
pub offset: i64,
/// The "key" data of this message. Empty if there is no such
/// data for this message.
pub key: Vec<u8>,
/// The value data of this message. Empty if there is no such
/// data for this message.
pub value: Vec<u8>,
}
impl<'a> From<&Message<'a>> for KafkaMessage {
fn from(Message { offset, key, value }: &Message<'a>) -> Self {
Self {
offset: *offset,
key: Vec::<u8>::from(*key),
value: Vec::<u8>::from(*value),
}
}
}
impl AsyncKafkaConsumer {
/// Build from an [`AsyncKafkaClient`] which is guaranteed to have had metadata loaded at least once (during construction).
pub async fn from_async_client(
ac: AsyncKafkaClient,
builder_fn: impl FnOnce(ConsumerBuilder) -> ConsumerBuilder,
) -> Result<Self> {
let builder = builder_fn(Consumer::from_client(ac.0));
let consumer = builder.create().context("failed to create consumer")?;
Ok(Self(consumer))
}
/// Produce an unending stream of messages based on the inner consumer, with a mechanism for stopping
pub async fn messages(self) -> Result<(impl Stream<Item = KafkaMessage>, Sender<()>)> {
let mut consumer = self.0;
let (stop_tx, mut stop_rx) = tokio::sync::oneshot::channel::<()>();
let (msg_tx, msg_rx) = tokio::sync::mpsc::unbounded_channel();
// Listen forever for new messages with the consumer
tokio::task::spawn_blocking(move || {
loop {
match consumer.poll() {
// If we received message sets, process them
Ok(message_sets) => {
for message_set in message_sets.iter() {
for message in message_set.messages() {
trace!(
topic = message_set.topic(),
partition = message_set.partition(),
offset = message.offset,
"received message",
);
if let Err(e) = msg_tx
.send(KafkaMessage::from(message))
.context("failed to send kafka message")
{
error!("failed to send kafka message: {e}");
}
}
if let Err(e) = consumer.consume_messageset(message_set) {
error!("failed to consume message set: {e}");
}
}
// Commit all consumed stuff, but only if we're in a group
if !consumer.group().is_empty() {
if let Err(e) = consumer.commit_consumed() {
error!("failed to commit consumed messages: {e}");
}
}
}
Err(e) => {
error!("failed to poll: {e}");
}
}
// If we've been told to stop, then we should stop
if let Ok(()) = stop_rx.try_recv() {
trace!("received stop, shutting down consuming thread...");
return Ok(()) as Result<()>;
}
}
});
Ok((UnboundedReceiverStream::new(msg_rx), stop_tx))
}
}