wasmcloud_host/wasmbus/
config.rs

1//! Module with structs for use in managing and accessing config used by various wasmCloud entities
2use std::{collections::HashMap, fmt::Debug, sync::Arc};
3
4use anyhow::{bail, Context};
5use async_nats::jetstream::kv::{Operation, Store};
6use futures::{future::AbortHandle, stream::Abortable, TryStreamExt};
7use tokio::sync::{
8    watch::{self, Receiver, Sender},
9    RwLock, RwLockReadGuard,
10};
11use tracing::{error, warn, Instrument};
12
13type LockedConfig = Arc<RwLock<HashMap<String, String>>>;
14/// A cache of named config mapped to an existing receiver
15type WatchCache = Arc<RwLock<HashMap<String, Receiver<HashMap<String, String>>>>>;
16
17/// A struct used for mapping a config name to a receiver for logging/tracing purposes
18struct ConfigReceiver {
19    pub name: String,
20    pub receiver: Receiver<HashMap<String, String>>,
21}
22
23/// Helper struct that aborts on drop so we don't abort them when something is cloned in an arc. It
24/// will only abort after the last arc has been dropped.
25#[derive(Default)]
26struct AbortHandles {
27    handles: Vec<AbortHandle>,
28}
29
30impl Drop for AbortHandles {
31    fn drop(&mut self) {
32        for handle in &self.handles {
33            handle.abort();
34        }
35    }
36}
37
38/// A merged bundle of configuration for use with components that watches for updates to all named
39/// configs specified.
40///
41/// There are two main ways to get config from this struct:
42///
43/// 1. You can call [`get_config`](ConfigBundle::get_config) which will return a reference to the
44///    merged config. This is mainly for use in components, which will fetch needed config on demand
45/// 2. You can call [`changed`](ConfigBundle::changed) which will return a reference to the merged
46///    config. This is for use in situations where you want to be notified when a config changes,
47///    such as for a provider that needs to be notified when a config changes
48pub struct ConfigBundle {
49    /// A live view of the configuration that is being managed/updated by this bundle
50    merged_config: LockedConfig,
51    /// Names of named config that contribute to this bundle
52    config_names: Vec<String>,
53    /// A receiver that fires when changes are made to the bundle
54    changed_receiver: Receiver<()>,
55    /// Abort handles to the tasks that are watching for updates
56    ///
57    /// These are `drop()`ed when the bundle is dropped
58    _handles: Arc<AbortHandles>,
59    /// The sender that is used to notify the receiver that the config has changed, this
60    /// must not be dropped until the receiver is dropped so we ensure it's kept alive
61    _changed_notifier: Arc<Sender<()>>,
62}
63
64impl Clone for ConfigBundle {
65    fn clone(&self) -> Self {
66        // Cloning marks the value in the new receiver as seen, so we mark it as unseen, even if it
67        // was already viewed before cloning. This ensures that the newly cloned bundle will return
68        // the current config rather than needing to wait for a change.
69        let mut changed_receiver = self.changed_receiver.clone();
70        changed_receiver.mark_changed();
71        ConfigBundle {
72            merged_config: self.merged_config.clone(),
73            config_names: self.config_names.clone(),
74            changed_receiver,
75            _changed_notifier: self._changed_notifier.clone(),
76            _handles: self._handles.clone(),
77        }
78    }
79}
80
81impl Debug for ConfigBundle {
82    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83        f.debug_struct("ConfigBundle")
84            .field("merged_config", &self.merged_config)
85            .finish()
86    }
87}
88
89impl ConfigBundle {
90    /// Create a new config bundle.
91    ///
92    /// It takes an ordered list of receivers that should match the
93    /// order of config given by the user.
94    ///
95    /// This is only called internally.
96    #[must_use]
97    async fn new(receivers: Vec<ConfigReceiver>) -> Self {
98        // Generate the initial abort handles so we can construct the bundle
99        let (abort_handles, mut registrations): (Vec<_>, Vec<_>) =
100            std::iter::repeat_with(AbortHandle::new_pair)
101                .take(receivers.len())
102                .unzip();
103        // Now that we've set initial config, create the bundle and update the merged config with the latest values
104        let (changed_notifier, changed_receiver) = watch::channel(());
105        let changed_notifier = Arc::new(changed_notifier);
106        let mut bundle = ConfigBundle {
107            merged_config: Arc::default(),
108            config_names: receivers.iter().map(|r| r.name.clone()).collect(),
109            changed_receiver,
110            _changed_notifier: changed_notifier.clone(),
111            _handles: Arc::new(AbortHandles {
112                handles: abort_handles,
113            }),
114        };
115        let ordered_configs: Arc<Vec<Receiver<HashMap<String, String>>>> =
116            Arc::new(receivers.iter().map(|r| r.receiver.clone()).collect());
117        update_merge(&bundle.merged_config, &changed_notifier, &ordered_configs).await;
118        // Move all the receivers into spawned tasks to update the config
119        for ConfigReceiver { name, mut receiver } in receivers {
120            // SAFETY: We know we have the right amount of registrations because we just created
121            // them using the len above
122            let reg = registrations
123                .pop()
124                .expect("missing registration, this is developer error");
125            let cloned_name = name.clone();
126            let ordered_receivers = ordered_configs.clone();
127            let merged_config = bundle.merged_config.clone();
128            let notifier = changed_notifier.clone();
129            tokio::spawn(
130                Abortable::new(
131                    async move {
132                        loop {
133                            match receiver.changed().await {
134                                Ok(()) => {
135                                    update_merge(&merged_config, &notifier, &ordered_receivers)
136                                        .await;
137                                }
138                                Err(e) => {
139                                    warn!(error = %e, %name, "config sender dropped, updates will not be delivered");
140                                    return;
141                                }
142                            }
143                        }
144                    },
145                    reg,
146                )
147                .instrument(tracing::trace_span!("config_update", name = %cloned_name)),
148            );
149        }
150        // More likely than not, there will be a new value in the watch channel because we always
151        // read the latest value from the store before putting it here. But just in case, this
152        // ensures that the newly create bundle will return the current config rather than needing
153        // to wait for a change.
154        bundle.changed_receiver.mark_changed();
155        bundle
156    }
157
158    /// Returns a reference to the merged config behind a lock guard. This guard must be dropped
159    /// when you are no longer consuming the config
160    pub async fn get_config(&self) -> RwLockReadGuard<'_, HashMap<String, String>> {
161        self.merged_config.read().await
162    }
163
164    /// Waits for the config to change and returns a reference to the merged config behind a lock
165    /// guard. This guard must be dropped when you are no longer consuming the config.
166    ///
167    /// Please note that this requires a mutable borrow in order to manage underlying notification
168    /// acknowledgement.
169    pub async fn changed(
170        &mut self,
171    ) -> anyhow::Result<RwLockReadGuard<'_, HashMap<String, String>>> {
172        // NOTE(thomastaylor312): We use a watch channel here because we want everything to get
173        // notified individually (including clones) if config changes. Notify doesn't quite work
174        // because we have to have a permit existing when we create otherwise the notify_watchers
175        // won't actually get picked up (that only happens with notify_one).
176        if let Err(e) = self.changed_receiver.changed().await {
177            // If we get here, it likely means that a whole bunch of stuff has failed above it.
178            // Might be worth changing this to a panic
179            error!(error = %e, "Config changed receiver errored, this means that the config sender has dropped and the whole bundle has failed");
180            bail!("failed to read receiver: {e}");
181        }
182        Ok(self.merged_config.read().await)
183    }
184
185    /// Returns a reference to the ordered list of config names handled by this bundle
186    #[must_use]
187    pub fn config_names(&self) -> &Vec<String> {
188        &self.config_names
189    }
190}
191
192/// A struct used for generating a config bundle given a list of named configs
193#[derive(Clone)]
194pub struct BundleGenerator {
195    store: Store,
196    watch_cache: WatchCache,
197    watch_handles: Arc<RwLock<AbortHandles>>,
198}
199
200impl BundleGenerator {
201    /// Create a new bundle generator
202    #[must_use]
203    pub fn new(store: Store) -> Self {
204        Self {
205            store,
206            watch_cache: Arc::default(),
207            watch_handles: Arc::default(),
208        }
209    }
210
211    /// Generate a new config bundle. Will return an error if any of the configs do not exist or if
212    /// there was an error fetching the initial config
213    pub async fn generate(&self, config_names: Vec<String>) -> anyhow::Result<ConfigBundle> {
214        let receivers: Vec<ConfigReceiver> =
215            futures::future::join_all(config_names.into_iter().map(|name| self.get_receiver(name)))
216                .await
217                .into_iter()
218                .collect::<anyhow::Result<_>>()?;
219        Ok(ConfigBundle::new(receivers).await)
220    }
221
222    async fn get_receiver(&self, name: String) -> anyhow::Result<ConfigReceiver> {
223        // First check the cache to see if we already have a receiver for this config
224        if let Some(receiver) = self.watch_cache.read().await.get(&name) {
225            return Ok(ConfigReceiver {
226                name,
227                receiver: receiver.clone(),
228            });
229        }
230
231        // We need to actually try and fetch the config here. If we don't do this, then a watch will
232        // just blindly watch even if the key doesn't exist. We should return an error if the config
233        // doesn't exist or has data issues. It also allows us to set the initial value
234        let config: HashMap<String, String> = match self.store.get(&name).await {
235            Ok(Some(data)) => serde_json::from_slice(&data)
236                .context("Data corruption error, unable to decode data from store")?,
237            Ok(None) => return Err(anyhow::anyhow!("Config {} does not exist", name)),
238            Err(e) => return Err(anyhow::anyhow!("Error fetching config {}: {}", name, e)),
239        };
240
241        // Otherwise we need to setup the watcher. We start by setting up the watch so we don't miss
242        // any events after we query the initial config
243        let (tx, rx) = watch::channel(config);
244        let (done, wait) = tokio::sync::oneshot::channel();
245        let (handle, reg) = AbortHandle::new_pair();
246        tokio::task::spawn(Abortable::new(
247            watcher_loop(self.store.clone(), name.clone(), tx, done),
248            reg,
249        ));
250
251        wait.await
252            .context("Error waiting for watcher to start")?
253            .context("Error waiting for watcher to start")?;
254
255        // NOTE(thomastaylor312): We should probably find a way to clear out this cache. The Sender
256        // part of the channel can tell you how many receivers it has, but we pass that along to the
257        // watcher, so there would need to be more work to expose that, probably via a struct. We
258        // could also do something with a resource counter and track that way with a cleanup task.
259        // But for now going the easy route as we already cache everything anyway
260        self.watch_handles.write().await.handles.push(handle);
261        self.watch_cache
262            .write()
263            .await
264            .insert(name.clone(), rx.clone());
265
266        Ok(ConfigReceiver { name, receiver: rx })
267    }
268}
269
270async fn watcher_loop(
271    store: Store,
272    name: String,
273    tx: watch::Sender<HashMap<String, String>>,
274    done: tokio::sync::oneshot::Sender<anyhow::Result<()>>,
275) {
276    // We need to watch with history so we can get the initial config.
277    let mut watcher = match store.watch(&name).await {
278        Ok(watcher) => {
279            done.send(Ok(())).expect(
280                "Receiver for watcher setup should not have been dropped. This is programmer error",
281            );
282            watcher
283        }
284        Err(e) => {
285            done.send(Err(anyhow::anyhow!(
286                "Error setting up watcher for {}: {}",
287                name,
288                e
289            )))
290            .expect(
291                "Receiver for watcher setup should not have been dropped. This is programmer error",
292            );
293            return;
294        }
295    };
296    loop {
297        match watcher.try_next().await {
298            Ok(Some(entry)) if matches!(entry.operation, Operation::Delete | Operation::Purge) => {
299                // NOTE(thomastaylor312): We should probably do something and notify something up
300                // the chain if we get a delete or purge event of a config that is still being used.
301                // For now we just zero it out
302                tx.send_replace(HashMap::new());
303            }
304            Ok(Some(entry)) => {
305                let config: HashMap<String, String> = match serde_json::from_slice(&entry.value) {
306                    Ok(config) => config,
307                    Err(e) => {
308                        error!(%name, error = %e, "Error decoding config from store during watch");
309                        continue;
310                    }
311                };
312                tx.send_if_modified(|current| {
313                    if current == &config {
314                        false
315                    } else {
316                        *current = config;
317                        true
318                    }
319                });
320            }
321            Ok(None) => {
322                error!(%name, "Watcher for config has closed");
323                return;
324            }
325            Err(e) => {
326                error!(%name, error = %e, "Error reading from watcher for config. Will wait for next entry");
327                continue;
328            }
329        }
330    }
331}
332
333async fn update_merge(
334    merged_config: &RwLock<HashMap<String, String>>,
335    changed_notifier: &Sender<()>,
336    ordered_receivers: &[Receiver<HashMap<String, String>>],
337) {
338    // We get a write lock to start so nothing else can update the merged config while we merge
339    // in the other configs (e.g. when one of the ordered configs is write locked)
340    let mut hashmap = merged_config.write().await;
341    hashmap.clear();
342
343    // NOTE(thomastaylor312): There is a possible optimization here where we could just create a
344    // temporary hashmap of borrowed strings and then after extending everything we could
345    // into_iter it and clone it into the final hashmap. This would avoid extra allocations at
346    // the cost of a few more iterations
347    for recv in ordered_receivers {
348        hashmap.extend(recv.borrow().clone());
349    }
350    // Send a notification that the config has changed
351    changed_notifier.send_replace(());
352}
353
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    use tokio::sync::watch;

    /// Builds an owned config map out of borrowed key/value pairs
    fn config(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs
            .iter()
            .map(|(key, value)| ((*key).to_string(), (*value).to_string()))
            .collect()
    }

    /// Wraps a receiver together with its config name
    fn named(name: &str, receiver: Receiver<HashMap<String, String>>) -> ConfigReceiver {
        ConfigReceiver {
            name: name.to_string(),
            receiver,
        }
    }

    /// Waits (up to 50ms) for the bundle to report a change and returns a copy of the merged
    /// config at that point
    async fn next_config(bundle: &mut ConfigBundle) -> HashMap<String, String> {
        let guard = tokio::time::timeout(Duration::from_millis(50), bundle.changed())
            .await
            .expect("conf should have been present")
            .expect("Should have received a config");
        (*guard).clone()
    }

    #[tokio::test]
    async fn test_config_bundle() {
        let (foo_tx, foo_rx) = watch::channel(config(&[("foo", "bar")]));
        let (bar_tx, bar_rx) = watch::channel(HashMap::new());
        let (baz_tx, baz_rx) = watch::channel(HashMap::new());

        let mut bundle = ConfigBundle::new(vec![
            named("foo", foo_rx),
            named("bar", bar_rx),
            named("baz", baz_rx),
        ])
        .await;

        // The initial config must be available before anything has been sent
        assert_eq!(*bundle.get_config().await, config(&[("foo", "bar")]));

        // The changed method must also yield a value right away
        let _ = tokio::time::timeout(Duration::from_millis(50), bundle.changed())
            .await
            .expect("Should have received a config");

        // bar comes after foo, so its value for the "foo" key overrides the one from foo
        bar_tx.send_replace(config(&[("foo", "baz")]));
        assert_eq!(next_config(&mut bundle).await, config(&[("foo", "baz")]));

        // baz contributes an additional key
        baz_tx.send_replace(config(&[("star", "wars")]));
        assert_eq!(
            next_config(&mut bundle).await,
            config(&[("foo", "baz"), ("star", "wars")])
        );

        // foo gains a new key; its "foo" key is still overridden by bar
        foo_tx.send_replace(config(&[("starship", "troopers"), ("foo", "bar")]));
        assert_eq!(
            next_config(&mut bundle).await,
            config(&[
                ("foo", "baz"),
                ("star", "wars"),
                ("starship", "troopers")
            ]),
        );
    }
}