wasmcloud_provider_messaging_kafka/client.rs

use anyhow::{Context as _, Result};
use futures::Stream;
use kafka::client::KafkaClient;
use kafka::consumer::{Builder as ConsumerBuilder, Consumer, Message};
use tokio::sync::oneshot::Sender;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{error, trace};

/// An Async Kafka Client built on the [`tokio`] runtime
pub(crate) struct AsyncKafkaClient(pub(crate) KafkaClient);

impl AsyncKafkaClient {
    /// Build an [`AsyncKafkaClient`] for a list of hosts
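    ///
    /// A minimal usage sketch; the broker address below is a placeholder, not something
    /// provided by this crate:
    ///
    /// ```ignore
    /// let client = AsyncKafkaClient::from_hosts(vec!["127.0.0.1:9092".to_string()]).await?;
    /// ```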
    pub async fn from_hosts(hosts: Vec<String>) -> Result<Self> {
        Self::from_client(KafkaClient::new(hosts)).await
    }

    /// Build an [`AsyncKafkaClient`] from an existing [`KafkaClient`]
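    ///
    /// A sketch of wrapping a pre-built client; the host list is a placeholder:
    ///
    /// ```ignore
    /// let kc = KafkaClient::new(vec!["127.0.0.1:9092".to_string()]);
    /// let client = AsyncKafkaClient::from_client(kc).await?;
    /// ```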
    pub async fn from_client(mut kc: KafkaClient) -> Result<Self> {
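        // `load_metadata_all` performs blocking network I/O, so run it off the async runtime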
        let kc = tokio::task::spawn_blocking(move || {
            kc.load_metadata_all().context("failed to load metadata")?;
            Ok::<KafkaClient, anyhow::Error>(kc)
        })
        .await
        .context("failed to perform spawn blocking")?
        .context("failed to load metadata")?;
        Ok(Self(kc))
    }
}

/// A wrapper for easily using a [`kafka::consumer::Consumer`] asynchronously
pub(crate) struct AsyncKafkaConsumer(Consumer);

/// A fetched message from a remote Kafka broker for a particular topic & partition.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub(crate) struct KafkaMessage {
    /// The offset at which this message resides in the remote kafka
    /// broker topic partition.
    pub offset: i64,

    /// The "key" data of this message.  Empty if there is no such
    /// data for this message.
    pub key: Vec<u8>,

    /// The value data of this message.  Empty if there is no such
    /// data for this message.
    pub value: Vec<u8>,
}

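// Copy the borrowed key/value bytes out of a fetched `Message` so the resulting
// `KafkaMessage` owns its data and can outlive the consumer's fetch buffer.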
impl<'a> From<&Message<'a>> for KafkaMessage {
    fn from(Message { offset, key, value }: &Message<'a>) -> Self {
        Self {
            offset: *offset,
            key: Vec::<u8>::from(*key),
            value: Vec::<u8>::from(*value),
        }
    }
}

impl AsyncKafkaConsumer {
    /// Build from an [`AsyncKafkaClient`] which is guaranteed to have had metadata loaded at least once (during construction).
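    ///
    /// A minimal sketch; `client` is assumed to be an [`AsyncKafkaClient`] built as above, and
    /// the topic and group names are placeholders:
    ///
    /// ```ignore
    /// let consumer = AsyncKafkaConsumer::from_async_client(client, |builder| {
    ///     builder
    ///         .with_topic("example-topic".to_string())
    ///         .with_group("example-group".to_string())
    /// })
    /// .await?;
    /// ```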
    pub async fn from_async_client(
        ac: AsyncKafkaClient,
        builder_fn: impl FnOnce(ConsumerBuilder) -> ConsumerBuilder,
    ) -> Result<Self> {
        let builder = builder_fn(Consumer::from_client(ac.0));
        let consumer = builder.create().context("failed to create consumer")?;
        Ok(Self(consumer))
    }

    /// Produce an unending stream of messages from the inner consumer, along with a [`Sender`] that can be used to stop consumption
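    ///
    /// A consumption sketch, assuming a `consumer` built as above:
    ///
    /// ```ignore
    /// use futures::StreamExt as _;
    ///
    /// let (messages, stop_tx) = consumer.messages().await?;
    /// futures::pin_mut!(messages);
    /// while let Some(msg) = messages.next().await {
    ///     println!("offset {}: {} byte value", msg.offset, msg.value.len());
    /// }
    /// // Signal the blocking poll loop to shut down once we're done
    /// let _ = stop_tx.send(());
    /// ```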
    pub async fn messages(self) -> Result<(impl Stream<Item = KafkaMessage>, Sender<()>)> {
        let mut consumer = self.0;
        let (stop_tx, mut stop_rx) = tokio::sync::oneshot::channel::<()>();
        let (msg_tx, msg_rx) = tokio::sync::mpsc::unbounded_channel();

        // Listen forever for new messages with the consumer
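        // NOTE: `Consumer::poll` performs blocking network I/O, so the loop runs on a dedicated
        // blocking task; the stop signal is only checked between polls.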
        tokio::task::spawn_blocking(move || {
            loop {
                match consumer.poll() {
                    // If we received message sets, process them
                    Ok(message_sets) => {
                        for message_set in message_sets.iter() {
                            for message in message_set.messages() {
                                trace!(
                                    topic = message_set.topic(),
                                    partition = message_set.partition(),
                                    offset = message.offset,
                                    "received message",
                                );
                                if let Err(e) = msg_tx
                                    .send(KafkaMessage::from(message))
                                    .context("failed to send kafka message")
                                {
                                    error!("failed to send kafka message: {e}");
                                }
                            }
                            if let Err(e) = consumer.consume_messageset(message_set) {
                                error!("failed to consume message set: {e}");
                            }
                        }
                        // Commit all consumed offsets, but only if we're part of a consumer group
                        if !consumer.group().is_empty() {
                            if let Err(e) = consumer.commit_consumed() {
                                error!("failed to commit consumed messages: {e}");
                            }
                        }
                    }
                    Err(e) => {
                        error!("failed to poll: {e}");
                    }
                }

                // If we've been told to stop, then we should stop
                if let Ok(()) = stop_rx.try_recv() {
                    trace!("received stop, shutting down consuming thread...");
                    return Ok(()) as Result<()>;
                }
            }
        });

        Ok((UnboundedReceiverStream::new(msg_rx), stop_tx))
    }
}