wasmcloud_host/nats/
provider.rs

1use std::sync::Arc;
2
3use anyhow::Context as _;
4use tracing::instrument;
5use wasmcloud_core::{link_del_subject, link_put_subject};
6use wasmcloud_tracing::context::TraceContextInjector;
7
8use crate::wasmbus::{injector_to_headers, providers::ProviderManager};
9
10/// NATS implementation of the wasmCloud [crate::wasmbus::providers::ProviderManager] extension trait
11pub struct NatsProviderManager {
12    pub(crate) nats_client: Arc<async_nats::Client>,
13    pub(crate) lattice: String,
14}
15
16impl NatsProviderManager {
17    /// Create a new NATS provider manager
18    pub fn new(nats_client: Arc<async_nats::Client>, lattice: String) -> Self {
19        Self {
20            nats_client,
21            lattice,
22        }
23    }
24}
25
26#[async_trait::async_trait]
27impl ProviderManager for NatsProviderManager {
28    #[instrument(level = "debug", skip(self))]
29    async fn put_link(
30        &self,
31        link: &wasmcloud_core::InterfaceLinkDefinition,
32        target: &str,
33    ) -> anyhow::Result<()> {
34        let lattice = &self.lattice;
35        let payload =
36            serde_json::to_vec(link).context("failed to serialize provider link definition")?;
37        self.nats_client
38            .publish_with_headers(
39                link_put_subject(lattice, target),
40                injector_to_headers(&TraceContextInjector::default_with_span()),
41                payload.into(),
42            )
43            .await
44            .context("failed to publish provider link definition")?;
45        Ok(())
46    }
47
48    #[instrument(level = "debug", skip(self))]
49    async fn delete_link(
50        &self,
51        link: &wasmcloud_core::InterfaceLinkDefinition,
52        target: &str,
53    ) -> anyhow::Result<()> {
54        let lattice = &self.lattice;
55        let payload =
56            serde_json::to_vec(link).context("failed to serialize provider link definition")?;
57        self.nats_client
58            .publish_with_headers(
59                link_del_subject(lattice, target),
60                injector_to_headers(&TraceContextInjector::default_with_span()),
61                payload.into(),
62            )
63            .await
64            .context("failed to publish provider link definition")?;
65        Ok(())
66    }
67}