wasmcloud_provider_messaging_kafka/
client.rs1use anyhow::{Context as _, Result};
2use futures::Stream;
3use kafka::client::KafkaClient;
4use kafka::consumer::{Builder as ConsumerBuilder, Consumer, Message};
5use tokio::sync::oneshot::Sender;
6use tokio_stream::wrappers::UnboundedReceiverStream;
7use tracing::{error, trace};
8
9pub(crate) struct AsyncKafkaClient(pub(crate) KafkaClient);
11
12impl AsyncKafkaClient {
13 pub async fn from_hosts(hosts: Vec<String>) -> Result<Self> {
15 Self::from_client(KafkaClient::new(hosts)).await
16 }
17
18 pub async fn from_client(mut kc: KafkaClient) -> Result<Self> {
20 let kc = tokio::task::spawn_blocking(move || {
21 kc.load_metadata_all().context("failed to load metadata")?;
22 Ok::<KafkaClient, anyhow::Error>(kc)
23 })
24 .await
25 .context("failed to perform spawn blocking")?
26 .context("failed to load metadata")?;
27 Ok(Self(kc))
28 }
29}
30
31pub(crate) struct AsyncKafkaConsumer(Consumer);
33
34#[derive(Debug, Clone)]
36#[allow(dead_code)]
37pub(crate) struct KafkaMessage {
38 pub offset: i64,
41
42 pub key: Vec<u8>,
45
46 pub value: Vec<u8>,
49}
50
51impl<'a> From<&Message<'a>> for KafkaMessage {
52 fn from(Message { offset, key, value }: &Message<'a>) -> Self {
53 Self {
54 offset: *offset,
55 key: Vec::<u8>::from(*key),
56 value: Vec::<u8>::from(*value),
57 }
58 }
59}
60
61impl AsyncKafkaConsumer {
62 pub async fn from_async_client(
64 ac: AsyncKafkaClient,
65 builder_fn: impl FnOnce(ConsumerBuilder) -> ConsumerBuilder,
66 ) -> Result<Self> {
67 let builder = builder_fn(Consumer::from_client(ac.0));
68 let consumer = builder.create().context("failed to create consumer")?;
69 Ok(Self(consumer))
70 }
71
72 pub async fn messages(self) -> Result<(impl Stream<Item = KafkaMessage>, Sender<()>)> {
74 let mut consumer = self.0;
75 let (stop_tx, mut stop_rx) = tokio::sync::oneshot::channel::<()>();
76 let (msg_tx, msg_rx) = tokio::sync::mpsc::unbounded_channel();
77
78 tokio::task::spawn_blocking(move || {
80 loop {
81 match consumer.poll() {
82 Ok(message_sets) => {
84 for message_set in message_sets.iter() {
85 for message in message_set.messages() {
86 trace!(
87 topic = message_set.topic(),
88 partition = message_set.partition(),
89 offset = message.offset,
90 "received message",
91 );
92 if let Err(e) = msg_tx
93 .send(KafkaMessage::from(message))
94 .context("failed to send kafka message")
95 {
96 error!("failed to send kafka message: {e}");
97 }
98 }
99 if let Err(e) = consumer.consume_messageset(message_set) {
100 error!("failed to consume message set: {e}");
101 }
102 }
103 if !consumer.group().is_empty() {
105 if let Err(e) = consumer.commit_consumed() {
106 error!("failed to commit consumed messages: {e}");
107 }
108 }
109 }
110 Err(e) => {
111 error!("failed to poll: {e}");
112 }
113 }
114
115 if let Ok(()) = stop_rx.try_recv() {
117 trace!("received stop, shutting down consuming thread...");
118 return Ok(()) as Result<()>;
119 }
120 }
121 });
122
123 Ok((UnboundedReceiverStream::new(msg_rx), stop_tx))
124 }
125}