wasmcloud_host/nats/store.rs

1//! Implementation of the [crate::config::ConfigManager] and [crate::store::StoreManager] traits for NATS JetStream KV [Store].
2
3use std::{collections::HashMap, sync::Arc};
4
5use anyhow::{anyhow, ensure, Context as _};
6use async_nats::jetstream::kv::{Entry as KvEntry, Operation, Store};
7use bytes::Bytes;
8use futures::{StreamExt as _, TryStreamExt as _};
9use tokio::{
10    sync::watch::{self, Receiver},
11    task::JoinSet,
12};
13use tracing::{debug, error, instrument, warn};
14
15use crate::{
16    config::ConfigManager,
17    store::StoreManager,
18    wasmbus::{
19        claims::{Claims, StoredClaims},
20        ComponentSpecification,
21    },
22};
23
24#[async_trait::async_trait]
25impl StoreManager for Store {
26    #[instrument(level = "debug", skip(self))]
27    async fn get(&self, key: &str) -> anyhow::Result<Option<Bytes>> {
28        self.get(key)
29            .await
30            .map_err(|err| anyhow::anyhow!("Failed to get config: {}", err))
31    }
32
33    #[instrument(level = "debug", skip(self, value))]
34    async fn put(&self, key: &str, value: Bytes) -> anyhow::Result<()> {
35        self.put(key, value)
36            .await
37            .map(|_| ())
38            .map_err(|err| anyhow::anyhow!("Failed to set config: {}", err))
39    }
40
41    #[instrument(level = "debug", skip(self))]
42    async fn del(&self, key: &str) -> anyhow::Result<()> {
43        self.purge(key)
44            .await
45            .map_err(|err| anyhow::anyhow!("Failed to delete config: {}", err))
46    }
47}
48
49#[async_trait::async_trait]
50impl ConfigManager for Store {
51    /// Watch the key in the JetStream bucket for changes. This will return a channel that will
52    /// receive updates to the config as they happen.
53    async fn watch(&self, name: &str) -> anyhow::Result<Receiver<HashMap<String, String>>> {
54        let config: HashMap<String, String> = match self.get(name).await {
55            Ok(Some(data)) => serde_json::from_slice(&data)
56                .context("Data corruption error, unable to decode data from store")?,
57            Ok(None) => return Err(anyhow::anyhow!("Config {} does not exist", name)),
58            Err(e) => return Err(anyhow::anyhow!("Error fetching config {}: {}", name, e)),
59        };
60
61        let (tx, rx) = watch::channel(config);
62        // Since we're starting a task, we need to own this data
63        let name = name.to_owned();
64        let mut watcher = self.watch(&name).await.context("Failed to watch config")?;
65
66        tokio::spawn(async move {
67            loop {
68                if tx.is_closed() {
69                    warn!(%name, "config watch channel closed, aborting watch");
70                    return;
71                }
72
73                match watcher.try_next().await {
74                    Ok(Some(entry))
75                        if matches!(entry.operation, Operation::Delete | Operation::Purge) =>
76                    {
77                        // NOTE(thomastaylor312): We should probably do something and notify something up
78                        // the chain if we get a delete or purge event of a config that is still being used.
79                        // For now we just zero it out
80                        tx.send_replace(HashMap::new());
81                    }
82                    Ok(Some(entry)) => {
83                        let config: HashMap<String, String> = match serde_json::from_slice(
84                            &entry.value,
85                        ) {
86                            Ok(config) => config,
87                            Err(e) => {
88                                error!(%name, error = %e, "Error decoding config from store during watch");
89                                continue;
90                            }
91                        };
92                        tx.send_if_modified(|current| {
93                            if current == &config {
94                                false
95                            } else {
96                                *current = config;
97                                true
98                            }
99                        });
100                    }
101                    Ok(None) => {
102                        error!(%name, "Watcher for config has closed");
103                        return;
104                    }
105                    Err(e) => {
106                        error!(%name, error = %e, "Error reading from watcher for config. Will wait for next entry");
107                        continue;
108                    }
109                }
110            }
111        });
112
113        Ok(rx)
114    }
115}
116
117/// This is an extra implementation for the host to process entries coming from a JetStream bucket.
118impl crate::wasmbus::Host {
119    #[instrument(level = "trace", skip_all)]
120    pub(crate) async fn process_entry(
121        &self,
122        KvEntry {
123            key,
124            value,
125            operation,
126            ..
127        }: KvEntry,
128    ) {
129        let key_id = key.split_once('_');
130        let res = match (operation, key_id) {
131            (Operation::Put, Some(("COMPONENT", id))) => {
132                self.process_component_spec_put(id, value).await
133            }
134            (Operation::Delete, Some(("COMPONENT", id))) => {
135                self.process_component_spec_delete(id).await
136            }
137            (Operation::Put, Some(("LINKDEF", _id))) => {
138                debug!("ignoring deprecated LINKDEF put operation");
139                Ok(())
140            }
141            (Operation::Delete, Some(("LINKDEF", _id))) => {
142                debug!("ignoring deprecated LINKDEF delete operation");
143                Ok(())
144            }
145            (Operation::Put, Some(("CLAIMS", pubkey))) => {
146                self.process_claims_put(pubkey, value).await
147            }
148            (Operation::Delete, Some(("CLAIMS", pubkey))) => {
149                self.process_claims_delete(pubkey, value).await
150            }
151            (operation, Some(("REFMAP", id))) => {
152                // TODO: process REFMAP entries
153                debug!(?operation, id, "ignoring REFMAP entry");
154                Ok(())
155            }
156            _ => {
157                warn!(key, ?operation, "unsupported KV bucket entry");
158                Ok(())
159            }
160        };
161        if let Err(error) = &res {
162            error!(key, ?operation, ?error, "failed to process KV bucket entry");
163        }
164    }
165
166    #[instrument(level = "debug", skip_all)]
167    pub(crate) async fn process_component_spec_put(
168        &self,
169        id: impl AsRef<str>,
170        value: impl AsRef<[u8]>,
171    ) -> anyhow::Result<()> {
172        let id = id.as_ref();
173        debug!(id, "process component spec put");
174
175        let spec: ComponentSpecification = serde_json::from_slice(value.as_ref())
176            .context("failed to deserialize component specification")?;
177        self.update_host_with_spec(id, &spec)
178            .await
179            .context("failed to update component spec")?;
180
181        Ok(())
182    }
183
184    #[instrument(level = "debug", skip_all)]
185    pub(crate) async fn process_component_spec_delete(
186        &self,
187        id: impl AsRef<str>,
188    ) -> anyhow::Result<()> {
189        debug!(id = id.as_ref(), "process component delete");
190        self.delete_component_spec(id).await
191    }
192
193    #[instrument(level = "debug", skip_all)]
194    /// Process claims being put into the JetStream data store.
195    ///
196    /// Notably this updates the host map but does not call [Self::store_claims], which
197    /// would cause an infinite loop.
198    pub(crate) async fn process_claims_put(
199        &self,
200        pubkey: impl AsRef<str>,
201        value: impl AsRef<[u8]>,
202    ) -> anyhow::Result<()> {
203        let pubkey = pubkey.as_ref();
204
205        debug!(pubkey, "process claim entry put");
206
207        let stored_claims: StoredClaims =
208            serde_json::from_slice(value.as_ref()).context("failed to decode stored claims")?;
209        let claims = Claims::from(stored_claims);
210
211        ensure!(claims.subject() == pubkey, "subject mismatch");
212        match claims {
213            Claims::Component(claims) => self.store_component_claims(claims).await,
214            Claims::Provider(claims) => self.store_provider_claims(claims).await,
215        }
216    }
217
218    #[instrument(level = "debug", skip_all)]
219    pub(crate) async fn process_claims_delete(
220        &self,
221        pubkey: impl AsRef<str>,
222        value: impl AsRef<[u8]>,
223    ) -> anyhow::Result<()> {
224        let pubkey = pubkey.as_ref();
225
226        debug!(pubkey, "process claim entry deletion");
227
228        let stored_claims: StoredClaims =
229            serde_json::from_slice(value.as_ref()).context("failed to decode stored claims")?;
230        let claims = Claims::from(stored_claims);
231
232        ensure!(claims.subject() == pubkey, "subject mismatch");
233
234        match claims {
235            Claims::Component(_) => self.delete_component_claims(pubkey).await,
236            Claims::Provider(_) => self.delete_provider_claims(pubkey).await,
237        }
238    }
239}
240
241/// Watch the JetStream bucket for changes to the ComponentSpec and claims data
242pub async fn data_watch(
243    tasks: &mut JoinSet<anyhow::Result<()>>,
244    store: Store,
245    host: Arc<crate::wasmbus::Host>,
246) -> anyhow::Result<()> {
247    tasks.spawn({
248        let host = Arc::clone(&host);
249        let data = store.clone();
250        async move {
251            // Setup data watch first
252            let data_watch = data
253                .watch_all()
254                .await
255                .context("failed to watch lattice data bucket")?;
256
257            // Process existing data without emitting events
258            data.keys()
259                .await
260                .context("failed to read keys of lattice data bucket")?
261                .map_err(|e| anyhow!(e).context("failed to read lattice data stream"))
262                .try_filter_map(|key| async {
263                    data.entry(key)
264                        .await
265                        .context("failed to get entry in lattice data bucket")
266                })
267                .for_each(|entry| async {
268                    match entry {
269                        Ok(entry) => host.process_entry(entry).await,
270                        Err(err) => {
271                            error!(%err, "failed to read entry from lattice data bucket")
272                        }
273                    }
274                })
275                .await;
276            data_watch
277                .for_each({
278                    let host = Arc::clone(&host);
279                    move |entry| {
280                        let host = Arc::clone(&host);
281                        async move {
282                            match entry {
283                                Err(error) => {
284                                    error!("failed to watch lattice data bucket: {error}");
285                                }
286                                Ok(entry) => host.process_entry(entry).await,
287                            }
288                        }
289                    }
290                })
291                .await;
292            let deadline = { *host.stop_rx.borrow() };
293            host.stop_tx.send_replace(deadline);
294            Ok(())
295        }
296    });
297
298    Ok(())
299}