wasmcloud_runtime/component/messaging/
v0_2.rs1use core::future::Future;
2
3use anyhow::Context as _;
4use tracing::{info_span, instrument, Instrument as _};
5use tracing_opentelemetry::OpenTelemetrySpanExt as _;
6use wasmtime::Store;
7
8use crate::capability::messaging0_2_0::{consumer, types};
9use crate::capability::wrpc;
10use crate::component::{Ctx, Handler};
11
12pub mod bindings {
13 wasmtime::component::bindgen!({
14 world: "messaging-handler-oh-two",
15 imports: { default: async | trappable | tracing },
16 exports: { default: async | trappable | tracing },
17 with: {
18 "wasmcloud:messaging/types": crate::capability::messaging0_2_0::types,
19 },
20 });
21}
22
23pub trait Messaging {
25 fn request(
27 &self,
28 subject: String,
29 body: Vec<u8>,
30 timeout_ms: u32,
31 ) -> impl Future<Output = anyhow::Result<Result<types::BrokerMessage, String>>> + Send;
32
33 fn publish(
35 &self,
36 msg: types::BrokerMessage,
37 ) -> impl Future<Output = anyhow::Result<Result<(), String>>> + Send;
38}
39
40impl<H> types::Host for Ctx<H> where H: Handler {}
41
42impl<H> consumer::Host for Ctx<H>
43where
44 H: Handler,
45{
46 #[instrument(level = "debug", skip_all)]
47 async fn request(
48 &mut self,
49 subject: String,
50 body: Vec<u8>,
51 timeout_ms: u32,
52 ) -> anyhow::Result<Result<types::BrokerMessage, String>> {
53 self.attach_parent_context();
54 Messaging::request(&self.handler, subject, body, timeout_ms).await
55 }
56
57 #[instrument(level = "debug", skip_all)]
58 async fn publish(&mut self, msg: types::BrokerMessage) -> anyhow::Result<Result<(), String>> {
59 self.attach_parent_context();
60 self.handler.publish(msg).await
61 }
62}
63
64#[instrument(level = "debug", skip_all)]
65pub(crate) async fn handle_message<H>(
66 pre: bindings::MessagingHandlerOhTwoPre<Ctx<H>>,
67 mut store: &mut Store<Ctx<H>>,
68 msg: wrpc::wasmcloud::messaging0_2_0::types::BrokerMessage,
69) -> anyhow::Result<Result<(), String>>
70where
71 H: Handler,
72{
73 let call_handle_message = info_span!("call_handle_message");
74 store.data_mut().parent_context = Some(call_handle_message.context());
75 let bindings = pre.instantiate_async(&mut store).await?;
76 bindings
77 .wasmcloud_messaging0_2_0_handler()
78 .call_handle_message(
79 &mut store,
80 &types::BrokerMessage {
81 subject: msg.subject,
82 body: msg.body.into(),
83 reply_to: msg.reply_to,
84 },
85 )
86 .instrument(call_handle_message)
87 .await
88 .context("failed to call `wasmcloud:messaging/handler@0.2.0#handle-message`")
89}