wasmcloud_host/nats/
event.rs1use anyhow::Context;
4use cloudevents::{EventBuilder, EventBuilderV10};
5use time::{format_description::well_known::Rfc3339, OffsetDateTime};
6use tracing::{instrument, warn};
7use ulid::Ulid;
8use uuid::Uuid;
9
10use crate::event::EventPublisher;
11
12pub struct NatsEventPublisher {
15 event_builder: EventBuilderV10,
16 lattice: String,
17 ctl_nats: async_nats::Client,
18}
19
20impl NatsEventPublisher {
21 pub fn new(source: String, lattice: String, ctl_nats: async_nats::Client) -> Self {
29 Self {
30 event_builder: EventBuilderV10::new().source(source),
31 lattice,
32 ctl_nats,
33 }
34 }
35}
36
37#[async_trait::async_trait]
38impl EventPublisher for NatsEventPublisher {
39 #[instrument(skip(self, data))]
40 async fn publish_event(&self, name: &str, data: serde_json::Value) -> anyhow::Result<()> {
41 let now = OffsetDateTime::now_utc()
42 .format(&Rfc3339)
43 .context("failed to format current time")?;
44 let ev = self
45 .event_builder
46 .clone()
47 .ty(format!("com.wasmcloud.lattice.{name}"))
48 .id(Uuid::from_u128(Ulid::new().into()).to_string())
49 .time(now)
50 .data("application/json", data)
51 .build()
52 .context("failed to build cloud event")?;
53 let ev = serde_json::to_vec(&ev).context("failed to serialize event")?;
54 let max_payload = self.ctl_nats.server_info().max_payload;
55 let lattice = &self.lattice;
56 if ev.len() > max_payload {
57 warn!(
58 size = ev.len(),
59 max_size = max_payload,
60 event = name,
61 lattice = &lattice,
62 "event payload is too large to publish and may fail",
63 );
64 }
65 self.ctl_nats
66 .publish(format!("wasmbus.evt.{lattice}.{name}"), ev.into())
67 .await
68 .with_context(|| format!("failed to publish `{name}` event"))
69 }
70}