wasmcloud_host/wasmbus/
config.rs

1//! Module with structs for use in managing and accessing config used by various wasmCloud entities
2use std::{collections::HashMap, fmt::Debug, sync::Arc};
3
4use anyhow::{bail, Context as _};
5use futures::{future::AbortHandle, stream::Abortable};
6use tokio::sync::{
7    watch::{self, Receiver, Sender},
8    RwLock, RwLockReadGuard,
9};
10use tracing::{error, warn, Instrument};
11
12use crate::config::ConfigManager;
13
14type LockedConfig = Arc<RwLock<HashMap<String, String>>>;
15/// A cache of named config mapped to an existing receiver
16type WatchCache = Arc<RwLock<HashMap<String, Receiver<HashMap<String, String>>>>>;
17
18/// A struct used for mapping a config name to a receiver for logging/tracing purposes
19struct ConfigReceiver {
20    pub name: String,
21    pub receiver: Receiver<HashMap<String, String>>,
22}
23
24/// Helper struct that aborts on drop so we don't abort them when something is cloned in an arc. It
25/// will only abort after the last arc has been dropped.
26#[derive(Default)]
27struct AbortHandles {
28    handles: Vec<AbortHandle>,
29}
30
31impl Drop for AbortHandles {
32    fn drop(&mut self) {
33        for handle in &self.handles {
34            handle.abort();
35        }
36    }
37}
38
39/// A merged bundle of configuration for use with components that watches for updates to all named
40/// configs specified.
41///
42/// There are two main ways to get config from this struct:
43///
44/// 1. You can call [`get_config`](ConfigBundle::get_config) which will return a reference to the
45///    merged config. This is mainly for use in components, which will fetch needed config on demand
46/// 2. You can call [`changed`](ConfigBundle::changed) which will return a reference to the merged
47///    config. This is for use in situations where you want to be notified when a config changes,
48///    such as for a provider that needs to be notified when a config changes
49pub struct ConfigBundle {
50    /// A live view of the configuration that is being managed/updated by this bundle
51    merged_config: LockedConfig,
52    /// Names of named config that contribute to this bundle
53    config_names: Vec<String>,
54    /// A receiver that fires when changes are made to the bundle
55    changed_receiver: Receiver<()>,
56    /// Abort handles to the tasks that are watching for updates
57    ///
58    /// These are `drop()`ed when the bundle is dropped
59    _handles: Arc<AbortHandles>,
60    /// The sender that is used to notify the receiver that the config has changed, this
61    /// must not be dropped until the receiver is dropped so we ensure it's kept alive
62    _changed_notifier: Arc<Sender<()>>,
63}
64
65impl Clone for ConfigBundle {
66    fn clone(&self) -> Self {
67        // Cloning marks the value in the new receiver as seen, so we mark it as unseen, even if it
68        // was already viewed before cloning. This ensures that the newly cloned bundle will return
69        // the current config rather than needing to wait for a change.
70        let mut changed_receiver = self.changed_receiver.clone();
71        changed_receiver.mark_changed();
72        ConfigBundle {
73            merged_config: self.merged_config.clone(),
74            config_names: self.config_names.clone(),
75            changed_receiver,
76            _changed_notifier: self._changed_notifier.clone(),
77            _handles: self._handles.clone(),
78        }
79    }
80}
81
82impl Debug for ConfigBundle {
83    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84        f.debug_struct("ConfigBundle")
85            .field("merged_config", &self.merged_config)
86            .finish()
87    }
88}
89
90impl ConfigBundle {
91    /// Create a new config bundle.
92    ///
93    /// It takes an ordered list of receivers that should match the
94    /// order of config given by the user.
95    ///
96    /// This is only called internally.
97    #[must_use]
98    async fn new(receivers: Vec<ConfigReceiver>) -> Self {
99        // Generate the initial abort handles so we can construct the bundle
100        let (abort_handles, mut registrations): (Vec<_>, Vec<_>) =
101            std::iter::repeat_with(AbortHandle::new_pair)
102                .take(receivers.len())
103                .unzip();
104        // Now that we've set initial config, create the bundle and update the merged config with the latest values
105        let (changed_notifier, changed_receiver) = watch::channel(());
106        let changed_notifier = Arc::new(changed_notifier);
107        let mut bundle = ConfigBundle {
108            merged_config: Arc::default(),
109            config_names: receivers.iter().map(|r| r.name.clone()).collect(),
110            changed_receiver,
111            _changed_notifier: changed_notifier.clone(),
112            _handles: Arc::new(AbortHandles {
113                handles: abort_handles,
114            }),
115        };
116        let ordered_configs: Arc<Vec<Receiver<HashMap<String, String>>>> =
117            Arc::new(receivers.iter().map(|r| r.receiver.clone()).collect());
118        update_merge(&bundle.merged_config, &changed_notifier, &ordered_configs).await;
119        // Move all the receivers into spawned tasks to update the config
120        for ConfigReceiver { name, mut receiver } in receivers {
121            // SAFETY: We know we have the right amount of registrations because we just created
122            // them using the len above
123            let reg = registrations
124                .pop()
125                .expect("missing registration, this is developer error");
126            let cloned_name = name.clone();
127            let ordered_receivers = ordered_configs.clone();
128            let merged_config = bundle.merged_config.clone();
129            let notifier = changed_notifier.clone();
130            tokio::spawn(
131                Abortable::new(
132                    async move {
133                        loop {
134                            match receiver.changed().await {
135                                Ok(()) => {
136                                    update_merge(&merged_config, &notifier, &ordered_receivers)
137                                        .await;
138                                }
139                                Err(e) => {
140                                    warn!(error = %e, %name, "config sender dropped, updates will not be delivered");
141                                    return;
142                                }
143                            }
144                        }
145                    },
146                    reg,
147                )
148                .instrument(tracing::trace_span!("config_update", name = %cloned_name)),
149            );
150        }
151        // More likely than not, there will be a new value in the watch channel because we always
152        // read the latest value from the store before putting it here. But just in case, this
153        // ensures that the newly create bundle will return the current config rather than needing
154        // to wait for a change.
155        bundle.changed_receiver.mark_changed();
156        bundle
157    }
158
159    /// Returns a reference to the merged config behind a lock guard. This guard must be dropped
160    /// when you are no longer consuming the config
161    pub async fn get_config(&self) -> RwLockReadGuard<'_, HashMap<String, String>> {
162        self.merged_config.read().await
163    }
164
165    /// Waits for the config to change and returns a reference to the merged config behind a lock
166    /// guard. This guard must be dropped when you are no longer consuming the config.
167    ///
168    /// Please note that this requires a mutable borrow in order to manage underlying notification
169    /// acknowledgement.
170    pub async fn changed(
171        &mut self,
172    ) -> anyhow::Result<RwLockReadGuard<'_, HashMap<String, String>>> {
173        // NOTE(thomastaylor312): We use a watch channel here because we want everything to get
174        // notified individually (including clones) if config changes. Notify doesn't quite work
175        // because we have to have a permit existing when we create otherwise the notify_watchers
176        // won't actually get picked up (that only happens with notify_one).
177        if let Err(e) = self.changed_receiver.changed().await {
178            // If we get here, it likely means that a whole bunch of stuff has failed above it.
179            // Might be worth changing this to a panic
180            error!(error = %e, "Config changed receiver errored, this means that the config sender has dropped and the whole bundle has failed");
181            bail!("failed to read receiver: {e}");
182        }
183        Ok(self.merged_config.read().await)
184    }
185
186    /// Returns a reference to the ordered list of config names handled by this bundle
187    #[must_use]
188    pub fn config_names(&self) -> &Vec<String> {
189        &self.config_names
190    }
191}
192
193/// A struct used for generating a config bundle given a list of named configs
194#[derive(Clone)]
195pub struct BundleGenerator {
196    store: Arc<dyn ConfigManager>,
197    watch_cache: WatchCache,
198}
199
200impl BundleGenerator {
201    /// Create a new bundle generator
202    #[must_use]
203    pub fn new(store: Arc<dyn ConfigManager>) -> Self {
204        Self {
205            store,
206            watch_cache: Arc::default(),
207        }
208    }
209
210    /// Generate a new config bundle. Will return an error if any of the configs do not exist or if
211    /// there was an error fetching the initial config
212    pub async fn generate(&self, config_names: Vec<String>) -> anyhow::Result<ConfigBundle> {
213        let receivers: Vec<ConfigReceiver> =
214            futures::future::join_all(config_names.into_iter().map(|name| self.get_receiver(name)))
215                .await
216                .into_iter()
217                .collect::<anyhow::Result<_>>()?;
218        Ok(ConfigBundle::new(receivers).await)
219    }
220
221    async fn get_receiver(&self, name: String) -> anyhow::Result<ConfigReceiver> {
222        // First check the cache to see if we already have a receiver for this config
223        if let Some(receiver) = self.watch_cache.read().await.get(&name) {
224            return Ok(ConfigReceiver {
225                name,
226                receiver: receiver.clone(),
227            });
228        }
229
230        let receiver = self
231            .store
232            .watch(&name)
233            .await
234            .context(format!("error setting up watcher for {name}"))?;
235        self.watch_cache
236            .write()
237            .await
238            .insert(name.clone(), receiver.clone());
239        Ok(ConfigReceiver { name, receiver })
240    }
241}
242
243async fn update_merge(
244    merged_config: &RwLock<HashMap<String, String>>,
245    changed_notifier: &Sender<()>,
246    ordered_receivers: &[Receiver<HashMap<String, String>>],
247) {
248    // We get a write lock to start so nothing else can update the merged config while we merge
249    // in the other configs (e.g. when one of the ordered configs is write locked)
250    let mut hashmap = merged_config.write().await;
251    hashmap.clear();
252
253    // NOTE(thomastaylor312): There is a possible optimization here where we could just create a
254    // temporary hashmap of borrowed strings and then after extending everything we could
255    // into_iter it and clone it into the final hashmap. This would avoid extra allocations at
256    // the cost of a few more iterations
257    for recv in ordered_receivers {
258        hashmap.extend(recv.borrow().clone());
259    }
260    // Send a notification that the config has changed
261    changed_notifier.send_replace(());
262}
263
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    use tokio::sync::watch;

    /// Builds an owned config map from borrowed key/value pairs
    fn config_map(entries: &[(&str, &str)]) -> HashMap<String, String> {
        entries
            .iter()
            .map(|&(key, value)| (key.to_string(), value.to_string()))
            .collect()
    }

    /// Exercises the initial merge, change notification and override order of a bundle built
    /// from three configs (`foo`, `bar`, `baz`, in that order)
    #[tokio::test]
    async fn test_config_bundle() {
        let (foo_tx, foo_rx) = watch::channel(config_map(&[("foo", "bar")]));
        let (bar_tx, bar_rx) = watch::channel(HashMap::new());
        let (baz_tx, baz_rx) = watch::channel(HashMap::new());

        let mut bundle = ConfigBundle::new(vec![
            ConfigReceiver {
                name: "foo".to_string(),
                receiver: foo_rx,
            },
            ConfigReceiver {
                name: "bar".to_string(),
                receiver: bar_rx,
            },
            ConfigReceiver {
                name: "baz".to_string(),
                receiver: baz_rx,
            },
        ])
        .await;

        // We should be able to get the initial config before sending anything
        assert_eq!(*bundle.get_config().await, config_map(&[("foo", "bar")]));

        // Should also be able to get a value from the changed method immediately. Both the
        // timeout and the result of `changed` itself are checked so a failed receiver is caught
        let conf = tokio::time::timeout(Duration::from_millis(50), bundle.changed())
            .await
            .expect("conf should have been present")
            .expect("Should have received a config");
        assert_eq!(*conf, config_map(&[("foo", "bar")]));
        // The guard has to go before the next update, the merge needs the write lock
        drop(conf);

        // Update the bar config to overwrite the foo config
        bar_tx.send_replace(config_map(&[("foo", "baz")]));
        // Wait for the new config to come. The guard returned here reads the same merged config
        // that get_config returns
        let conf = tokio::time::timeout(Duration::from_millis(50), bundle.changed())
            .await
            .expect("conf should have been present")
            .expect("Should have received a config");
        assert_eq!(*conf, config_map(&[("foo", "baz")]));
        drop(conf);

        // Update the baz config with additional data
        baz_tx.send_replace(config_map(&[("star", "wars")]));
        let conf = tokio::time::timeout(Duration::from_millis(50), bundle.changed())
            .await
            .expect("conf should have been present")
            .expect("Should have received a config");
        assert_eq!(*conf, config_map(&[("foo", "baz"), ("star", "wars")]));
        drop(conf);

        // Update foo config with additional data
        foo_tx.send_replace(config_map(&[("starship", "troopers"), ("foo", "bar")]));
        let conf = tokio::time::timeout(Duration::from_millis(50), bundle.changed())
            .await
            .expect("conf should have been present")
            .expect("Should have received a config");
        // Check that the config merged correctly: bar still overrides foo's value for "foo"
        assert_eq!(
            *conf,
            config_map(&[("foo", "baz"), ("star", "wars"), ("starship", "troopers")]),
        );
    }
}