wasmcloud_provider_blobstore_nats/
config.rs

1use std::collections::HashMap;
2
3use anyhow::{bail, Result};
4use serde::{Deserialize, Serialize};
5
6use tracing::warn;
7use wasmcloud_provider_sdk::core::secrets::SecretValue;
8
/// NATS server address used when no `cluster_uri` value is supplied.
pub(crate) const DEFAULT_NATS_URI: &str = "nats://0.0.0.0:4222";

// Keys describing how to reach and authenticate against NATS.

/// Key holding the URI of the NATS cluster to connect to.
const CONFIG_NATS_URI: &str = "cluster_uri";
/// Key holding the JetStream domain.
const CONFIG_NATS_JETSTREAM_DOMAIN: &str = "js_domain";
/// Key holding the client JWT (looked up in secrets first, then in plain configuration).
const CONFIG_NATS_CLIENT_JWT: &str = "client_jwt";
/// Key holding the client seed (looked up in secrets first, then in plain configuration).
const CONFIG_NATS_CLIENT_SEED: &str = "client_seed";
/// Key holding the TLS certificate authority, encoded as a string.
const CONFIG_NATS_TLS_CA: &str = "tls_ca";
/// Key holding the path of a TLS certificate authority file on disk.
const CONFIG_NATS_TLS_CA_FILE: &str = "tls_ca_file";
/// Key holding the write operation timeout.
const CONFIG_NATS_MAX_WRITE_WAIT: &str = "max_write_wait";

// Keys describing the object store (container) itself.

/// Key holding the maximum object age, in seconds.
const CONFIG_NATS_STORAGE_MAX_AGE: &str = "max_age";
/// Key holding the storage backend type (`file`/`0` or `memory`/`1`).
const CONFIG_NATS_STORAGE_TYPE: &str = "storage_type";
/// Key holding the number of replicas to keep for each object.
const CONFIG_NATS_STORAGE_NUM_REPLICAS: &str = "num_replicas";
/// Key holding the flag that turns on compression of the underlying stream.
const CONFIG_NATS_STORAGE_COMPRESSION: &str = "compression";
22
23/// Configuration for connecting a NATS client.
24#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
25pub struct NatsConnectionConfig {
26    /// Cluster(s) to connect to
27    #[serde(default)]
28    pub cluster_uri: Option<String>,
29
30    /// JetStream Domain to connect to
31    #[serde(default)]
32    pub js_domain: Option<String>,
33
34    /// Auth JWT to use (if necessary)
35    #[serde(default)]
36    pub auth_jwt: Option<String>,
37
38    /// Auth seed to use (if necessary)
39    #[serde(default)]
40    pub auth_seed: Option<String>,
41
42    /// TLS Certificate Authority, encoded as a string
43    #[serde(default)]
44    pub tls_ca: Option<String>,
45
46    /// TLS Certificate Authority, as a path on disk
47    #[serde(default)]
48    pub tls_ca_file: Option<String>,
49
50    /// Backend storage configuration (opaque service provider specific configuration)
51    #[serde(default)]
52    pub storage_config: Option<StorageConfig>,
53
54    /// Write operation timeout configuration
55    #[serde(default)]
56    pub max_write_wait: Option<u64>,
57}
58
59/// NATS Object-store is backed by its Key-Value store; so an object-store
60/// is identified by its backing bucket name.
61///
62/// Note that when storage config is provided via link configuration
63/// the following keys are expected:
64/// - `max_age` (optional): the maximum age of any object in the container, expressed in seconds; defaults to 10 years
65/// - `storage_type` (optional): the type of storage backend, File (default) and Memory
66/// - `num_replicas` (optional): how many replicas to keep for each object in a cluster, maximum 5; defaults to 1
67/// - `compression` (optional): whether the underlying stream should be compressed; defaults to false
68#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
69pub struct StorageConfig {
70    /// Maximum age of any object in the container, expressed in nanoseconds
71    #[serde(default)]
72    pub max_age: core::time::Duration,
73    /// Maximum size of the object store container, expressed in bytes
74    #[serde(default)]
75    pub max_bytes: i64,
76    /// The type of storage backend, File (default) and Memory
77    #[serde(default)]
78    pub storage_type: StorageType,
79    /// How many replicas to keep for each object in a cluster, maximum 5
80    #[serde(default)]
81    pub num_replicas: usize,
82    /// Whether the underlying stream should be compressed
83    #[serde(default)]
84    pub compression: bool,
85}
86
87use std::str::FromStr;
88
89/// StorageType represents the type of storage backend to use; it maps the configuration value to the proper 'nats_async' type
90#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
91pub struct StorageType(pub async_nats::jetstream::stream::StorageType);
92
93/// Default implementation for [`StorageType`]
94impl Default for StorageType {
95    fn default() -> Self {
96        // Use File as default storage type
97        Self(async_nats::jetstream::stream::StorageType::File)
98    }
99}
100
101/// Implementing FromStr for StorageType to allow custom parsing from a string
102impl FromStr for StorageType {
103    type Err = ();
104
105    fn from_str(s: &str) -> Result<Self, Self::Err> {
106        match s.to_lowercase().as_str() {
107            "file" => Ok(StorageType(
108                async_nats::jetstream::stream::StorageType::File,
109            )),
110            "0" => Ok(StorageType(
111                async_nats::jetstream::stream::StorageType::File,
112            )),
113            "memory" => Ok(StorageType(
114                async_nats::jetstream::stream::StorageType::Memory,
115            )),
116            "1" => Ok(StorageType(
117                async_nats::jetstream::stream::StorageType::Memory,
118            )),
119            _ => Err(()),
120        }
121    }
122}
123
124/// Default implementation for [`StorageConfig`]
125impl Default for StorageConfig {
126    fn default() -> StorageConfig {
127        StorageConfig {
128            max_age: core::time::Duration::from_secs(0), // unlimited
129            max_bytes: -1,                               // unlimited
130            storage_type: StorageType::default(),
131            num_replicas: 1,
132            compression: false,
133        }
134    }
135}
136
137impl NatsConnectionConfig {
138    /// Merge a given [`NatsConnectionConfig`] with another, coalescing fields and overriding
139    /// where necessary
140    pub fn merge(&self, extra: &NatsConnectionConfig) -> NatsConnectionConfig {
141        let mut out = self.clone();
142        // If the default configuration has a URI in it, and then the link definition
143        // also provides a URI, the assumption is to replace/override rather than combine
144        // the two into a potentially incompatible set of URIs
145        if extra.cluster_uri.is_some() {
146            out.cluster_uri.clone_from(&extra.cluster_uri);
147        }
148        if extra.js_domain.is_some() {
149            out.js_domain.clone_from(&extra.js_domain);
150        }
151        if extra.auth_jwt.is_some() {
152            out.auth_jwt.clone_from(&extra.auth_jwt);
153        }
154        if extra.auth_seed.is_some() {
155            out.auth_seed.clone_from(&extra.auth_seed);
156        }
157        if extra.tls_ca.is_some() {
158            out.tls_ca.clone_from(&extra.tls_ca);
159        }
160        if extra.tls_ca_file.is_some() {
161            out.tls_ca_file.clone_from(&extra.tls_ca_file);
162        }
163        if extra.storage_config.is_some() {
164            out.storage_config.clone_from(&extra.storage_config);
165        }
166        if extra.max_write_wait.is_some() {
167            out.max_write_wait.clone_from(&extra.max_write_wait);
168        }
169        out
170    }
171}
172
173/// Default implementation for [`NatsConnectionConfig`]
174impl Default for NatsConnectionConfig {
175    fn default() -> NatsConnectionConfig {
176        NatsConnectionConfig {
177            cluster_uri: Some(DEFAULT_NATS_URI.into()),
178            js_domain: None,
179            auth_jwt: None,
180            auth_seed: None,
181            tls_ca: None,
182            tls_ca_file: None,
183            storage_config: None,
184            max_write_wait: Some(30),
185        }
186    }
187}
188
189impl NatsConnectionConfig {
190    /// Construct configuration from a given [`LinkConfig`] or [`HostData`], utilizing both config and secrets provided
191    pub fn from_link_config(
192        config: &HashMap<String, String>,
193        secrets: &HashMap<String, SecretValue>,
194    ) -> Result<NatsConnectionConfig> {
195        let mut map = HashMap::clone(config);
196
197        if let Some(jwt) = secrets
198            .get(CONFIG_NATS_CLIENT_JWT)
199            .and_then(SecretValue::as_string)
200            .or_else(|| config.get(CONFIG_NATS_CLIENT_JWT).map(String::as_str))
201        {
202            if secrets.get(CONFIG_NATS_CLIENT_JWT).is_none() {
203                warn!("secret value [{CONFIG_NATS_CLIENT_JWT}] is missing, but was found in configuration. Please prefer using secrets for sensitive values.");
204            }
205            map.insert(CONFIG_NATS_CLIENT_JWT.into(), jwt.to_string());
206        }
207
208        if let Some(seed) = secrets
209            .get(CONFIG_NATS_CLIENT_SEED)
210            .and_then(SecretValue::as_string)
211            .or_else(|| config.get(CONFIG_NATS_CLIENT_SEED).map(String::as_str))
212        {
213            if secrets.get(CONFIG_NATS_CLIENT_SEED).is_none() {
214                warn!("secret value [{CONFIG_NATS_CLIENT_SEED}] is missing, but was found configuration. Please prefer using secrets for sensitive values.");
215            }
216            map.insert(CONFIG_NATS_CLIENT_SEED.into(), seed.to_string());
217        }
218
219        if let Some(tls_ca) = secrets
220            .get(CONFIG_NATS_TLS_CA)
221            .and_then(SecretValue::as_string)
222            .or_else(|| config.get(CONFIG_NATS_TLS_CA).map(String::as_str))
223        {
224            if secrets.get(CONFIG_NATS_TLS_CA).is_none() {
225                warn!("secret value [{CONFIG_NATS_TLS_CA}] is missing, but was found configuration. Please prefer using secrets for sensitive values.");
226            }
227            map.insert(CONFIG_NATS_TLS_CA.into(), tls_ca.to_string());
228        } else if let Some(tls_ca_file) = config.get(CONFIG_NATS_TLS_CA_FILE)
229        // .or_else(|| config.get(CONFIG_NATS_TLS_CA_FILE).map(String::as_str))
230        {
231            map.insert(CONFIG_NATS_TLS_CA_FILE.into(), tls_ca_file.to_string());
232        }
233
234        // NATS Object Storage Configuration
235        let mut storage_config = StorageConfig::default();
236        if let Some(max_age) = config.get(CONFIG_NATS_STORAGE_MAX_AGE) {
237            storage_config.max_age = core::time::Duration::from_secs(
238                max_age.parse().expect("max_age must be a number (seconds)"),
239            );
240        }
241        if let Some(storage_type) = config.get(CONFIG_NATS_STORAGE_TYPE) {
242            storage_config.storage_type = storage_type
243                .parse::<StorageType>()
244                .expect("invalid storage_type");
245        }
246        if let Some(num_replicas) = config.get(CONFIG_NATS_STORAGE_NUM_REPLICAS) {
247            storage_config.num_replicas =
248                num_replicas.parse().expect("num_replicas must be a number");
249        }
250        if let Some(compression) = config.get(CONFIG_NATS_STORAGE_COMPRESSION) {
251            storage_config.compression =
252                compression.parse().expect("compression must be a boolean");
253        }
254        map.insert(
255            "storage_config".into(),
256            serde_json::to_string(&storage_config).expect("failed to serialize storage_config"),
257        );
258
259        Self::from_map(&map)
260    }
261
262    /// Construct configuration Struct from the passed hostdata config
263    pub fn from_map(values: &HashMap<String, String>) -> Result<NatsConnectionConfig> {
264        let mut config = NatsConnectionConfig::default();
265
266        if let Some(uri) = values.get(CONFIG_NATS_URI) {
267            config.cluster_uri = Some(uri.clone());
268        }
269        if let Some(domain) = values.get(CONFIG_NATS_JETSTREAM_DOMAIN) {
270            config.js_domain = Some(domain.clone());
271        }
272        if let Some(jwt) = values.get(CONFIG_NATS_CLIENT_JWT) {
273            config.auth_jwt = Some(jwt.clone());
274        }
275        if let Some(seed) = values.get(CONFIG_NATS_CLIENT_SEED) {
276            config.auth_seed = Some(seed.clone());
277        }
278        if let Some(tls_ca) = values.get(CONFIG_NATS_TLS_CA) {
279            config.tls_ca = Some(tls_ca.clone());
280        } else if let Some(tls_ca_file) = values.get(CONFIG_NATS_TLS_CA_FILE) {
281            config.tls_ca_file = Some(tls_ca_file.clone());
282        }
283        if config.auth_jwt.is_some() && config.auth_seed.is_none() {
284            bail!("if you specify jwt, you must also specify a seed");
285        }
286        if config.storage_config.is_none() {
287            if let Some(storage_config) = values.get("storage_config") {
288                config.storage_config = Some(serde_json::from_str(storage_config)?);
289            }
290        }
291        if let Some(max_write_wait) = values.get(CONFIG_NATS_MAX_WRITE_WAIT) {
292            config.max_write_wait = Some(max_write_wait.clone().parse().unwrap());
293        }
294
295        Ok(config)
296    }
297}
298
// Tests covering construction, parsing and merging of the provider configuration
#[cfg(test)]
mod test {
    use super::*;
    use std::collections::HashMap;

    // Verify that a NatsConnectionConfig could be constructed from partial (JSON) input
    #[test]
    fn test_default_connection_serialize() {
        let input = r#"
{
    "cluster_uri": "nats://super-cluster",
    "js_domain": "optional",
    "auth_jwt": "authy",
    "auth_seed": "seedy",
    "max_write_wait": 60
}
"#;

        let config: NatsConnectionConfig = serde_json::from_str(input).unwrap();
        assert_eq!(config.cluster_uri, Some("nats://super-cluster".to_string()));
        assert_eq!(config.js_domain, Some("optional".to_string()));
        assert_eq!(config.auth_jwt.unwrap(), "authy");
        assert_eq!(config.auth_seed.unwrap(), "seedy");
        assert_eq!(config.max_write_wait.unwrap(), 60);
    }

    // Verify that two NatsConnectionConfigs could be merged
    #[test]
    fn test_connectionconfig_merge() {
        let ncc1 = NatsConnectionConfig {
            cluster_uri: Some("old_server".to_string()),
            max_write_wait: Some(45),
            ..Default::default()
        };
        let ncc2 = NatsConnectionConfig {
            cluster_uri: Some("server1".to_string()),
            js_domain: Some("new_domain".to_string()),
            auth_jwt: Some("jawty".to_string()),
            max_write_wait: Some(60),
            ..Default::default()
        };
        let ncc3 = ncc1.merge(&ncc2);
        assert_eq!(ncc3.cluster_uri, ncc2.cluster_uri);
        assert_eq!(ncc3.js_domain, ncc2.js_domain);
        assert_eq!(ncc3.auth_jwt, Some("jawty".to_string()));
        assert_eq!(ncc3.max_write_wait, Some(60));
    }

    // Verify that two configs, which include StorageConfigs could be merged
    #[test]
    fn test_merge_with_storage_config() {
        let ncc1 = NatsConnectionConfig {
            cluster_uri: Some("server1".to_string()),
            js_domain: Some("domain1".to_string()),
            auth_jwt: Some("jwt1".to_string()),
            storage_config: Some(StorageConfig {
                storage_type: StorageType(async_nats::jetstream::stream::StorageType::File),
                compression: true,
                ..Default::default()
            }),
            max_write_wait: Some(45),
            ..Default::default()
        };
        let ncc2 = NatsConnectionConfig {
            cluster_uri: Some("server1".to_string()),
            js_domain: Some("new_domain".to_string()),
            auth_jwt: Some("new_jwt".to_string()),
            storage_config: Some(StorageConfig {
                storage_type: StorageType(async_nats::jetstream::stream::StorageType::Memory),
                compression: false,
                ..Default::default()
            }),
            max_write_wait: Some(60),
            ..Default::default()
        };
        let ncc3 = ncc1.merge(&ncc2);
        assert_eq!(ncc3.cluster_uri, ncc2.cluster_uri);
        assert_eq!(ncc3.js_domain, ncc2.js_domain);
        assert_eq!(ncc3.auth_jwt, ncc2.auth_jwt);
        assert_eq!(ncc3.max_write_wait, ncc2.max_write_wait);
        // The storage configuration of the second config replaces the first one entirely
        assert_eq!(ncc3.storage_config, ncc2.storage_config);
    }

    // Verify that a NatsConnectionConfig could be constructed from a HashMap
    #[test]
    fn test_from_map_multiple_entries() -> anyhow::Result<()> {
        let mut config = HashMap::new();
        config.insert("tls_ca".to_string(), "rootCA".to_string());
        config.insert("js_domain".to_string(), "optional".to_string());
        config.insert(CONFIG_NATS_CLIENT_JWT.to_string(), "authy".to_string());
        config.insert(CONFIG_NATS_CLIENT_SEED.to_string(), "seedy".to_string());
        config.insert(CONFIG_NATS_MAX_WRITE_WAIT.to_string(), "45".to_string());
        config.insert(CONFIG_NATS_STORAGE_MAX_AGE.to_string(), "3600".to_string());

        let ncc = NatsConnectionConfig::from_link_config(&config, &HashMap::new())?;

        assert_eq!(ncc.tls_ca, Some("rootCA".to_string()));
        assert_eq!(ncc.js_domain, Some("optional".to_string()));
        assert_eq!(ncc.auth_jwt, Some("authy".to_string()));
        assert_eq!(ncc.auth_seed, Some("seedy".to_string()));
        assert_eq!(ncc.max_write_wait, Some(45));

        // Validate StorageConfig and max_age; a missing storage config must fail the test
        // instead of silently skipping the assertion
        let storage_config = ncc
            .storage_config
            .expect("from_link_config always provides a storage config");
        assert_eq!(storage_config.max_age, std::time::Duration::from_secs(3600));

        Ok(())
    }

    // Verify that the storage related link configuration keys end up in the StorageConfig
    #[test]
    fn test_from_link_config_storage_options() -> anyhow::Result<()> {
        let mut config = HashMap::new();
        config.insert(CONFIG_NATS_STORAGE_TYPE.to_string(), "memory".to_string());
        config.insert(CONFIG_NATS_STORAGE_NUM_REPLICAS.to_string(), "3".to_string());
        config.insert(CONFIG_NATS_STORAGE_COMPRESSION.to_string(), "true".to_string());

        let ncc = NatsConnectionConfig::from_link_config(&config, &HashMap::new())?;
        let storage_config = ncc
            .storage_config
            .expect("from_link_config always provides a storage config");

        assert_eq!(
            storage_config.storage_type,
            StorageType(async_nats::jetstream::stream::StorageType::Memory)
        );
        assert_eq!(storage_config.num_replicas, 3);
        assert!(storage_config.compression);
        // Untouched options keep their defaults
        assert_eq!(storage_config.max_age, std::time::Duration::from_secs(0));
        assert_eq!(storage_config.max_bytes, -1);

        Ok(())
    }

    // Verify that a default NatsConnectionConfig will be constructed from an empty HashMap
    #[test]
    fn test_from_map_empty() {
        let config =
            NatsConnectionConfig::from_link_config(&HashMap::new(), &HashMap::new()).unwrap();
        assert_eq!(config.cluster_uri, Some(DEFAULT_NATS_URI.into()));
        assert_eq!(config.max_write_wait, Some(30)); // Default value
    }

    // Verify that a JWT without a matching seed is rejected
    #[test]
    fn test_from_map_jwt_requires_seed() {
        let mut values = HashMap::new();
        values.insert(CONFIG_NATS_CLIENT_JWT.to_string(), "authy".to_string());

        assert!(NatsConnectionConfig::from_map(&values).is_err());
    }

    // Verify that an inline certificate authority wins over a certificate authority file
    #[test]
    fn test_from_map_tls_ca_precedence() -> anyhow::Result<()> {
        let mut values = HashMap::new();
        values.insert(CONFIG_NATS_TLS_CA.to_string(), "rootCA".to_string());
        values.insert(CONFIG_NATS_TLS_CA_FILE.to_string(), "/tmp/ca.pem".to_string());

        let ncc = NatsConnectionConfig::from_map(&values)?;
        assert_eq!(ncc.tls_ca, Some("rootCA".to_string()));
        assert_eq!(ncc.tls_ca_file, None);

        Ok(())
    }

    // Verify the accepted spellings of the storage type, and that anything else is rejected
    #[test]
    fn test_storage_type_from_str() {
        let file = StorageType(async_nats::jetstream::stream::StorageType::File);
        let memory = StorageType(async_nats::jetstream::stream::StorageType::Memory);

        for raw in ["file", "FILE", "0"] {
            assert_eq!(raw.parse::<StorageType>(), Ok(file.clone()));
        }
        for raw in ["memory", "Memory", "1"] {
            assert_eq!(raw.parse::<StorageType>(), Ok(memory.clone()));
        }
        assert_eq!("disk".parse::<StorageType>(), Err(()));
    }

    // Verify that the NatsConnectionConfig's merge function prioritizes the new values over the old ones
    #[test]
    fn test_merge_non_default_values() {
        let ncc1 = NatsConnectionConfig {
            cluster_uri: Some("old_server".to_string()),
            js_domain: Some("old_domain".to_string()),
            auth_jwt: Some("old_jawty".to_string()),
            ..Default::default()
        };
        let ncc2 = NatsConnectionConfig {
            cluster_uri: Some("server1".to_string()),
            js_domain: Some("new_domain".to_string()),
            auth_jwt: Some("new_jawty".to_string()),
            ..Default::default()
        };
        let ncc3 = ncc1.merge(&ncc2);
        assert_eq!(ncc3.cluster_uri, ncc2.cluster_uri);
        assert_eq!(ncc3.js_domain, ncc2.js_domain);
        assert_eq!(ncc3.auth_jwt, ncc2.auth_jwt);
    }
}
446}