wasmcloud_host/nats/
event.rs

//! NATS implementation of the wasmCloud [crate::event::EventPublisher] extension trait
2
3use anyhow::Context;
4use cloudevents::{EventBuilder, EventBuilderV10};
5use time::{format_description::well_known::Rfc3339, OffsetDateTime};
6use tracing::{instrument, warn};
7use ulid::Ulid;
8use uuid::Uuid;
9
10use crate::event::EventPublisher;
11
12/// NATS implementation of the wasmCloud [crate::event::EventPublisher] extension trait,
13/// sending events to the NATS message bus with a CloudEvents payload envelope.
14pub struct NatsEventPublisher {
15    event_builder: EventBuilderV10,
16    lattice: String,
17    ctl_nats: async_nats::Client,
18}
19
20impl NatsEventPublisher {
21    /// Create a new NATS event publisher.
22    ///
23    /// # Arguments
24    ///
25    /// * `source` - The source of the event, typically the host ID.
26    /// * `lattice` - The lattice name to use for the event publisher.
27    /// * `ctl_nats` - The NATS client to use for publishing events.
28    pub fn new(source: String, lattice: String, ctl_nats: async_nats::Client) -> Self {
29        Self {
30            event_builder: EventBuilderV10::new().source(source),
31            lattice,
32            ctl_nats,
33        }
34    }
35}
36
37#[async_trait::async_trait]
38impl EventPublisher for NatsEventPublisher {
39    #[instrument(skip(self, data))]
40    async fn publish_event(&self, name: &str, data: serde_json::Value) -> anyhow::Result<()> {
41        let now = OffsetDateTime::now_utc()
42            .format(&Rfc3339)
43            .context("failed to format current time")?;
44        let ev = self
45            .event_builder
46            .clone()
47            .ty(format!("com.wasmcloud.lattice.{name}"))
48            .id(Uuid::from_u128(Ulid::new().into()).to_string())
49            .time(now)
50            .data("application/json", data)
51            .build()
52            .context("failed to build cloud event")?;
53        let ev = serde_json::to_vec(&ev).context("failed to serialize event")?;
54        let max_payload = self.ctl_nats.server_info().max_payload;
55        let lattice = &self.lattice;
56        if ev.len() > max_payload {
57            warn!(
58                size = ev.len(),
59                max_size = max_payload,
60                event = name,
61                lattice = &lattice,
62                "event payload is too large to publish and may fail",
63            );
64        }
65        self.ctl_nats
66            .publish(format!("wasmbus.evt.{lattice}.{name}"), ev.into())
67            .await
68            .with_context(|| format!("failed to publish `{name}` event"))
69    }
70}