wasmcloud_runtime/component/messaging/
mod.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
use core::ops::Deref;

use anyhow::Context as _;
use tracing::{instrument, warn, Span};
use tracing_opentelemetry::OpenTelemetrySpanExt as _;

use crate::capability::wrpc;
use crate::component::{new_store, Handler, Instance, WrpcServeEvent};

pub mod v0_2;
pub mod v0_3;

impl<H, C> wrpc::exports::wasmcloud::messaging0_2_0::handler::Handler<C> for Instance<H, C>
where
    H: Handler,
    C: Send + Deref<Target = Span>,
{
    #[instrument(level = "debug", skip_all)]
    async fn handle_message(
        &self,
        cx: C,
        msg: wrpc::wasmcloud::messaging0_2_0::types::BrokerMessage,
    ) -> anyhow::Result<Result<(), String>> {
        // Set the parent of the current context to the span passed in
        Span::current().set_parent(cx.deref().context());
        let mut store = new_store(&self.engine, self.handler.clone(), self.max_execution_time);

        // If wasmcloud:messaging@0.3.0 is enabled and we can instantiate the 0.3.0 bindings,
        // handle the message using 0.3.0. Otherwise, use the 0.2.0 bindings.
        let res = if self.experimental_features.wasmcloud_messaging_v3 {
            if let Ok(pre) = v0_3::bindings::MessagingHandlerPre::new(self.pre.clone()) {
                v0_3::handle_message(pre, &mut store, msg).await
            } else {
                let pre = v0_2::bindings::MessagingHandlerOhTwoPre::new(self.pre.clone())
                    .context("failed to pre-instantiate `wasmcloud:messaging/handler`")?;
                v0_2::handle_message(pre, &mut store, msg).await
            }
        } else {
            let pre = v0_2::bindings::MessagingHandlerOhTwoPre::new(self.pre.clone())
                .context("failed to pre-instantiate `wasmcloud:messaging/handler`")?;
            v0_2::handle_message(pre, &mut store, msg).await
        };

        let success = res.is_ok();
        if let Err(err) =
            self.events
                .try_send(WrpcServeEvent::MessagingHandlerHandleMessageReturned {
                    context: cx,
                    success,
                })
        {
            warn!(
                ?err,
                success, "failed to send `wasmcloud:messaging/handler.handle-message` return event"
            );
        }
        res
    }
}