wasmcloud_runtime/component/messaging/
v0_2.rs

1use core::future::Future;
2
3use anyhow::Context as _;
4use tracing::{info_span, instrument, Instrument as _};
5use tracing_opentelemetry::OpenTelemetrySpanExt as _;
6use wasmtime::Store;
7
8use crate::capability::messaging0_2_0::{consumer, types};
9use crate::capability::wrpc;
10use crate::component::{Ctx, Handler};
11
12pub mod bindings {
13    wasmtime::component::bindgen!({
14        world: "messaging-handler-oh-two",
15        imports: { default: async | trappable | tracing },
16        exports: { default: async | trappable | tracing },
17        with: {
18           "wasmcloud:messaging/types": crate::capability::messaging0_2_0::types,
19        },
20    });
21}
22
23/// `wasmcloud:messaging` abstraction
24pub trait Messaging {
25    /// Handle `wasmcloud:messaging/request`
26    fn request(
27        &self,
28        subject: String,
29        body: Vec<u8>,
30        timeout_ms: u32,
31    ) -> impl Future<Output = anyhow::Result<Result<types::BrokerMessage, String>>> + Send;
32
33    /// Handle `wasmcloud:messaging/publish`
34    fn publish(
35        &self,
36        msg: types::BrokerMessage,
37    ) -> impl Future<Output = anyhow::Result<Result<(), String>>> + Send;
38}
39
40impl<H> types::Host for Ctx<H> where H: Handler {}
41
42impl<H> consumer::Host for Ctx<H>
43where
44    H: Handler,
45{
46    #[instrument(level = "debug", skip_all)]
47    async fn request(
48        &mut self,
49        subject: String,
50        body: Vec<u8>,
51        timeout_ms: u32,
52    ) -> anyhow::Result<Result<types::BrokerMessage, String>> {
53        self.attach_parent_context();
54        Messaging::request(&self.handler, subject, body, timeout_ms).await
55    }
56
57    #[instrument(level = "debug", skip_all)]
58    async fn publish(&mut self, msg: types::BrokerMessage) -> anyhow::Result<Result<(), String>> {
59        self.attach_parent_context();
60        self.handler.publish(msg).await
61    }
62}
63
64#[instrument(level = "debug", skip_all)]
65pub(crate) async fn handle_message<H>(
66    pre: bindings::MessagingHandlerOhTwoPre<Ctx<H>>,
67    mut store: &mut Store<Ctx<H>>,
68    msg: wrpc::wasmcloud::messaging0_2_0::types::BrokerMessage,
69) -> anyhow::Result<Result<(), String>>
70where
71    H: Handler,
72{
73    let call_handle_message = info_span!("call_handle_message");
74    store.data_mut().parent_context = Some(call_handle_message.context());
75    let bindings = pre.instantiate_async(&mut store).await?;
76    bindings
77        .wasmcloud_messaging0_2_0_handler()
78        .call_handle_message(
79            &mut store,
80            &types::BrokerMessage {
81                subject: msg.subject,
82                body: msg.body.into(),
83                reply_to: msg.reply_to,
84            },
85        )
86        .instrument(call_handle_message)
87        .await
88        .context("failed to call `wasmcloud:messaging/handler@0.2.0#handle-message`")
89}