wasmcloud_runtime/component/messaging/
v0_3.rs

1use core::any::Any;
2use core::future::Future;
3
4use anyhow::{bail, Context as _};
5use async_trait::async_trait;
6use tracing::{info_span, instrument, Instrument as _};
7use tracing_opentelemetry::OpenTelemetrySpanExt as _;
8use wasmtime::component::Resource;
9use wasmtime::Store;
10
11use crate::capability::messaging0_3_0::types::{Error, Metadata, Topic};
12use crate::capability::messaging0_3_0::{producer, request_reply, types};
13use crate::capability::wrpc;
14use crate::component::{Ctx, Handler};
15
16pub mod bindings {
17    wasmtime::component::bindgen!({
18        world: "messaging-handler",
19        imports: { default: async | trappable | tracing },
20        exports: { default: async | trappable | tracing },
21        with: {
22           "wasmcloud:messaging/types": crate::capability::messaging0_3_0::types,
23        },
24    });
25}
26
27#[instrument(level = "debug", skip_all)]
28pub(crate) async fn handle_message<H>(
29    pre: bindings::MessagingHandlerPre<Ctx<H>>,
30    mut store: &mut Store<Ctx<H>>,
31    msg: wrpc::wasmcloud::messaging0_2_0::types::BrokerMessage,
32) -> anyhow::Result<Result<(), String>>
33where
34    H: Handler,
35{
36    let call_handle_message = info_span!("call_handle_message");
37    store.data_mut().parent_context = Some(call_handle_message.context());
38    let bindings = pre.instantiate_async(&mut store).await?;
39    let msg = store
40        .data_mut()
41        .table
42        .push(Message::Wrpc(msg))
43        .context("failed to push message to table")?;
44    bindings
45        .wasmcloud_messaging0_3_0_incoming_handler()
46        .call_handle(&mut store, msg)
47        .instrument(call_handle_message)
48        .await
49        .context("failed to call `wasmcloud:messaging/incoming-handler@0.3.0#handle`")
50        .map(|err| err.map_err(|err| err.to_string()))
51}
52
53/// Options for a request/reply operation.
54#[derive(Debug, Default)]
55pub struct RequestOptions {
56    /// The maximum amount of time to wait for a response. If the timeout value is not set, then
57    /// the request/reply operation will block until a message is received in response.
58    pub timeout_ms: Option<u32>,
59
60    /// The maximum number of replies to expect before returning.
61    pub expected_replies: Option<u32>,
62}
63
64#[async_trait]
65/// A message with a binary payload and additional information
66pub trait HostMessage {
67    /// The topic/subject/channel this message was received on
68    async fn topic(&self) -> wasmtime::Result<Option<Topic>>;
69    /// An optional content-type describing the format of the data in the message. This is
70    /// sometimes described as the "format" type
71    async fn content_type(&self) -> wasmtime::Result<Option<String>>;
72    /// Set the content-type describing the format of the data in the message. This is
73    /// sometimes described as the "format" type
74    async fn set_content_type(&mut self, content_type: String) -> wasmtime::Result<()>;
75    /// An opaque blob of data
76    async fn data(&self) -> wasmtime::Result<Vec<u8>>;
77    /// Set the opaque blob of data for this message, discarding the old value
78    async fn set_data(&mut self, buf: Vec<u8>) -> wasmtime::Result<()>;
79    /// Optional metadata (also called headers or attributes in some systems) attached to the
80    /// message. This metadata is simply decoration and should not be interpreted by a host
81    /// to ensure portability across different implementors (e.g., Kafka -> NATS, etc.).
82    async fn metadata(&self) -> wasmtime::Result<Option<Metadata>>;
83    /// Add a new key-value pair to the metadata, overwriting any existing value for the same key
84    async fn add_metadata(&mut self, key: String, value: String) -> wasmtime::Result<()>;
85    /// Set the metadata
86    async fn set_metadata(&mut self, meta: Metadata) -> wasmtime::Result<()>;
87    /// Remove a key-value pair from the metadata
88    async fn remove_metadata(&mut self, key: String) -> wasmtime::Result<()>;
89
90    /// Return [Self] as [Any]
91    fn as_any(&self) -> &dyn Any;
92
93    /// Return [Self] as [Any]
94    fn into_any(self: Box<Self>) -> Box<dyn Any>;
95}
96
97#[async_trait]
98/// A connection to a message-exchange service (e.g., buffer, broker, etc.).
99pub trait Client {
100    /// Disconnect from a message-exchange service (e.g., buffer, broker, etc.).
101    async fn disconnect(&mut self) -> wasmtime::Result<Result<(), Error>>;
102
103    /// Return [Self] as [Any]
104    fn as_any(&self) -> &dyn Any;
105}
106
107/// `wasmcloud:messaging` abstraction
108pub trait Messaging {
109    /// Establish connection to a message-exchange service (e.g., buffer, broker, etc.).
110    fn connect(
111        &self,
112        name: String,
113    ) -> impl Future<Output = wasmtime::Result<Result<Box<dyn Client + Send + Sync>, Error>>> + Send;
114
115    /// Sends the message using the given client.
116    fn send(
117        &self,
118        client: &(dyn Client + Send + Sync),
119        topic: Topic,
120        message: Message,
121    ) -> impl Future<Output = wasmtime::Result<Result<(), Error>>> + Send;
122
123    /// Performs a blocking request/reply operation with an optional set of request options.
124    ///
125    /// The behavior of this function is largely dependent on the options given to the function.
126    /// If no options are provided, then the request/reply operation will block until a single
127    /// message is received in response. If a timeout is provided, then the request/reply operation
128    /// will block for the specified amount of time before returning an error if no messages were
129    /// received (or the list of messages that were received). If both a timeout and an expected
130    /// number of replies are provided, the function should return when either condition is met
131    /// (whichever comes first)—e.g., (1) if no replies were received within the timeout return an
132    /// error, (2) if the maximum expected number of replies were received before timeout, return
133    /// the list of messages, or (3) if the timeout is reached before the expected number of replies,
134    /// return the list of messages received up to that point.
135    fn request(
136        &self,
137        client: &(dyn Client + Send + Sync),
138        topic: Topic,
139        message: &Message,
140        options: Option<RequestOptions>,
141    ) -> impl Future<Output = wasmtime::Result<Result<Vec<Box<dyn HostMessage + Send + Sync>>, Error>>>
142           + Send;
143
144    /// Replies to the given message with the given response message. The details of which topic
145    /// the message is sent to is up to the implementation. This allows for reply-to details to be
146    /// handled in the best way possible for the underlying messaging system.
147    ///
148    /// Please note that this reply functionality is different than something like HTTP because there
149    /// are several use cases in which a reply might not be required for every message (so this would
150    /// be a noop). There are also cases when you might want to reply and then continue processing.
151    /// Additionally, you might want to reply to a message several times (such as providing an
152    /// update). So this function is allowed to be called multiple times, unlike something like HTTP
153    /// where the reply is sent and the connection is closed.
154    fn reply(
155        &self,
156        reply_to: &Message,
157        message: Message,
158    ) -> impl Future<Output = wasmtime::Result<Result<(), Error>>> + Send;
159}
160
161/// A message originating from the guest
162#[derive(Debug, Default)]
163pub struct GuestMessage {
164    /// An optional content-type describing the format of the data in the message. This is
165    /// sometimes described as the "format" type
166    pub content_type: Option<String>,
167    /// An opaque blob of data
168    pub data: Vec<u8>,
169    /// Optional metadata (also called headers or attributes in some systems) attached to the
170    /// message. This metadata is simply decoration and should not be interpreted by a host
171    /// to ensure portability across different implementors (e.g., Kafka -> NATS, etc.).
172    pub metadata: Option<Vec<(String, String)>>,
173}
174
175pub enum Message {
176    Host(Box<dyn HostMessage + Send + Sync>),
177    Wrpc(wrpc::wasmcloud::messaging0_2_0::types::BrokerMessage),
178    Guest(GuestMessage),
179}
180
181impl<H> types::Host for Ctx<H> where H: Handler {}
182
183impl<H> types::HostClient for Ctx<H>
184where
185    H: Handler,
186{
187    #[instrument(level = "debug", skip_all)]
188    async fn connect(
189        &mut self,
190        name: String,
191    ) -> wasmtime::Result<Result<Resource<Box<dyn Client + Send + Sync>>, Error>> {
192        self.attach_parent_context();
193        match self.handler.connect(name).await? {
194            Ok(client) => {
195                let client = self
196                    .table
197                    .push(client)
198                    .context("failed to push client to table")?;
199                Ok(Ok(client))
200            }
201            Err(err) => Ok(Err(err)),
202        }
203    }
204
205    #[instrument(level = "debug", skip_all)]
206    async fn disconnect(
207        &mut self,
208        client: Resource<Box<dyn Client + Send + Sync>>,
209    ) -> wasmtime::Result<Result<(), Error>> {
210        self.attach_parent_context();
211        let client = self
212            .table
213            .get_mut(&client)
214            .context("failed to get client")?;
215        client.disconnect().await
216    }
217
218    #[instrument(level = "debug", skip_all)]
219    async fn drop(
220        &mut self,
221        client: Resource<Box<dyn Client + Send + Sync>>,
222    ) -> wasmtime::Result<()> {
223        self.attach_parent_context();
224        self.table
225            .delete(client)
226            .context("failed to delete client")?;
227        Ok(())
228    }
229}
230
231impl<H> types::HostMessage for Ctx<H>
232where
233    H: Handler,
234{
235    #[instrument(level = "debug", skip_all)]
236    async fn new(&mut self, data: Vec<u8>) -> wasmtime::Result<Resource<Message>> {
237        self.attach_parent_context();
238        self.table
239            .push(Message::Guest(GuestMessage {
240                data,
241                ..Default::default()
242            }))
243            .context("failed to push message to table")
244    }
245
246    #[instrument(level = "debug", skip_all)]
247    async fn topic(&mut self, msg: Resource<Message>) -> wasmtime::Result<Option<Topic>> {
248        self.attach_parent_context();
249        let msg = self.table.get(&msg).context("failed to get message")?;
250        match msg {
251            Message::Host(msg) => msg.topic().await,
252            Message::Wrpc(msg) => Ok(Some(msg.subject.clone())),
253            Message::Guest(GuestMessage { .. }) => Ok(None),
254        }
255    }
256
257    #[instrument(level = "debug", skip_all)]
258    async fn content_type(&mut self, msg: Resource<Message>) -> wasmtime::Result<Option<String>> {
259        self.attach_parent_context();
260        let msg = self.table.get(&msg).context("failed to get message")?;
261        match msg {
262            Message::Host(msg) => msg.content_type().await,
263            Message::Wrpc(..) => Ok(None),
264            Message::Guest(GuestMessage { content_type, .. }) => Ok(content_type.clone()),
265        }
266    }
267
268    #[instrument(level = "debug", skip_all)]
269    async fn set_content_type(
270        &mut self,
271        msg: Resource<Message>,
272        content_type: String,
273    ) -> wasmtime::Result<()> {
274        self.attach_parent_context();
275        let msg = self.table.get_mut(&msg).context("failed to get message")?;
276        match msg {
277            Message::Host(msg) => msg.set_content_type(content_type).await,
278            Message::Wrpc(..) => bail!("content-type not currently supported by wRPC messages"),
279            Message::Guest(msg) => {
280                msg.content_type = Some(content_type);
281                Ok(())
282            }
283        }
284    }
285
286    #[instrument(level = "debug", skip_all)]
287    async fn data(&mut self, msg: Resource<Message>) -> wasmtime::Result<Vec<u8>> {
288        self.attach_parent_context();
289        let msg = self.table.get(&msg).context("failed to get message")?;
290        match msg {
291            Message::Host(msg) => msg.data().await,
292            Message::Wrpc(msg) => Ok(msg.body.to_vec()),
293            Message::Guest(GuestMessage { data, .. }) => Ok(data.clone()),
294        }
295    }
296
297    #[instrument(level = "debug", skip_all)]
298    async fn set_data(&mut self, msg: Resource<Message>, buf: Vec<u8>) -> wasmtime::Result<()> {
299        self.attach_parent_context();
300        let msg = self.table.get_mut(&msg).context("failed to get message")?;
301        match msg {
302            Message::Host(msg) => msg.set_data(buf).await,
303            Message::Wrpc(msg) => {
304                msg.body = buf.into();
305                Ok(())
306            }
307            Message::Guest(GuestMessage { data, .. }) => {
308                *data = buf;
309                Ok(())
310            }
311        }
312    }
313
314    #[instrument(level = "debug", skip_all)]
315    async fn metadata(&mut self, msg: Resource<Message>) -> wasmtime::Result<Option<Metadata>> {
316        self.attach_parent_context();
317        let msg = self.table.get(&msg).context("failed to get message")?;
318        match msg {
319            Message::Host(msg) => msg.metadata().await,
320            Message::Wrpc(..) => Ok(None),
321            Message::Guest(GuestMessage { metadata, .. }) => Ok(metadata.clone()),
322        }
323    }
324
325    #[instrument(level = "debug", skip_all)]
326    async fn add_metadata(
327        &mut self,
328        msg: Resource<Message>,
329        key: String,
330        value: String,
331    ) -> wasmtime::Result<()> {
332        self.attach_parent_context();
333        let msg = self.table.get_mut(&msg).context("failed to get message")?;
334        match msg {
335            Message::Host(msg) => msg.add_metadata(key, value).await,
336            Message::Wrpc(..) => bail!("metadata not currently supported by wRPC messages"),
337            Message::Guest(GuestMessage {
338                metadata: Some(metadata),
339                ..
340            }) => {
341                metadata.push((key, value));
342                Ok(())
343            }
344            Message::Guest(GuestMessage { metadata, .. }) => {
345                *metadata = Some(vec![(key, value)]);
346                Ok(())
347            }
348        }
349    }
350
351    #[instrument(level = "debug", skip_all)]
352    async fn set_metadata(
353        &mut self,
354        msg: Resource<Message>,
355        meta: Metadata,
356    ) -> wasmtime::Result<()> {
357        self.attach_parent_context();
358        let msg = self.table.get_mut(&msg).context("failed to get message")?;
359        match msg {
360            Message::Host(msg) => msg.set_metadata(meta).await,
361            Message::Wrpc(..) if meta.is_empty() => Ok(()),
362            Message::Wrpc(..) => bail!("metadata not currently supported by wRPC messages"),
363            Message::Guest(GuestMessage { metadata, .. }) => {
364                *metadata = Some(meta);
365                Ok(())
366            }
367        }
368    }
369
370    #[instrument(level = "debug", skip_all)]
371    async fn remove_metadata(
372        &mut self,
373        msg: Resource<Message>,
374        key: String,
375    ) -> wasmtime::Result<()> {
376        self.attach_parent_context();
377        let msg = self.table.get_mut(&msg).context("failed to get message")?;
378        match msg {
379            Message::Host(msg) => msg.remove_metadata(key).await,
380            Message::Guest(GuestMessage {
381                metadata: Some(metadata),
382                ..
383            }) => {
384                metadata.retain(|(k, _)| *k != key);
385                Ok(())
386            }
387            Message::Guest(..) | Message::Wrpc(..) => Ok(()),
388        }
389    }
390
391    #[instrument(level = "debug", skip_all)]
392    async fn drop(&mut self, rep: Resource<Message>) -> wasmtime::Result<()> {
393        self.attach_parent_context();
394        self.table.delete(rep).context("failed to delete message")?;
395        Ok(())
396    }
397}
398
399impl<H> producer::Host for Ctx<H>
400where
401    H: Handler,
402{
403    #[instrument(level = "debug", skip_all)]
404    async fn send(
405        &mut self,
406        client: Resource<Box<dyn Client + Send + Sync>>,
407        topic: Topic,
408        message: Resource<Message>,
409    ) -> wasmtime::Result<Result<(), Error>> {
410        self.attach_parent_context();
411        let message = self
412            .table
413            .delete(message)
414            .context("failed to delete outgoing message")?;
415        let client = self.table.get(&client).context("failed to get client")?;
416        self.handler.send(client.as_ref(), topic, message).await
417    }
418}
419
420impl<H> request_reply::Host for Ctx<H>
421where
422    H: Handler,
423{
424    async fn request(
425        &mut self,
426        client: Resource<Box<dyn Client + Send + Sync>>,
427        topic: Topic,
428        message: Resource<Message>,
429        options: Option<Resource<RequestOptions>>,
430    ) -> wasmtime::Result<Result<Vec<Resource<Message>>, Error>> {
431        self.attach_parent_context();
432        let options = options
433            .map(|options| self.table.delete(options))
434            .transpose()
435            .context("failed to delete request options")?;
436        let client = self.table.get(&client).context("failed to get client")?;
437        let message = self
438            .table
439            .get(&message)
440            .context("failed to get outgoing message")?;
441        match Messaging::request(&self.handler, client.as_ref(), topic, message, options).await? {
442            Ok(msgs) => {
443                let msgs = msgs
444                    .into_iter()
445                    .map(|msg| {
446                        self.table
447                            .push(Message::Host(msg))
448                            .context("failed to push message to table")
449                    })
450                    .collect::<wasmtime::Result<Vec<_>>>()?;
451                Ok(Ok(msgs))
452            }
453            Err(err) => Ok(Err(err)),
454        }
455    }
456
457    async fn reply(
458        &mut self,
459        reply_to: Resource<Message>,
460        message: Resource<Message>,
461    ) -> wasmtime::Result<Result<(), Error>> {
462        self.attach_parent_context();
463        let message = self
464            .table
465            .delete(message)
466            .context("failed to delete outgoing message")?;
467        let reply_to = self
468            .table
469            .get(&reply_to)
470            .context("failed to get incoming message")?;
471        self.handler.reply(reply_to, message).await
472    }
473}
474
475impl<H> request_reply::HostRequestOptions for Ctx<H>
476where
477    H: Handler,
478{
479    async fn new(&mut self) -> wasmtime::Result<Resource<RequestOptions>> {
480        self.attach_parent_context();
481        self.table
482            .push(RequestOptions::default())
483            .context("failed to push request options to table")
484    }
485
486    async fn set_timeout_ms(
487        &mut self,
488        opts: Resource<RequestOptions>,
489        timeout_ms: u32,
490    ) -> wasmtime::Result<()> {
491        self.attach_parent_context();
492        let opts = self
493            .table
494            .get_mut(&opts)
495            .context("failed to get request options")?;
496        opts.timeout_ms = Some(timeout_ms);
497        Ok(())
498    }
499
500    async fn set_expected_replies(
501        &mut self,
502        opts: Resource<RequestOptions>,
503        expected_replies: u32,
504    ) -> wasmtime::Result<()> {
505        self.attach_parent_context();
506        let opts = self
507            .table
508            .get_mut(&opts)
509            .context("failed to get request options")?;
510        opts.expected_replies = Some(expected_replies);
511        Ok(())
512    }
513
514    async fn drop(&mut self, opts: Resource<RequestOptions>) -> wasmtime::Result<()> {
515        self.attach_parent_context();
516        self.table
517            .delete(opts)
518            .context("failed to delete request options")?;
519        Ok(())
520    }
521}