wasmcloud_provider_blobstore_nats/
config.rs1use std::collections::HashMap;
2
3use anyhow::{bail, Result};
4use serde::{Deserialize, Serialize};
5
6use tracing::warn;
7use wasmcloud_provider_sdk::core::secrets::SecretValue;
8
9pub(crate) const DEFAULT_NATS_URI: &str = "nats://0.0.0.0:4222";
10
11const CONFIG_NATS_URI: &str = "cluster_uri";
12const CONFIG_NATS_JETSTREAM_DOMAIN: &str = "js_domain";
13const CONFIG_NATS_CLIENT_JWT: &str = "client_jwt";
14const CONFIG_NATS_CLIENT_SEED: &str = "client_seed";
15const CONFIG_NATS_TLS_CA: &str = "tls_ca";
16const CONFIG_NATS_TLS_CA_FILE: &str = "tls_ca_file";
17const CONFIG_NATS_MAX_WRITE_WAIT: &str = "max_write_wait";
18const CONFIG_NATS_STORAGE_MAX_AGE: &str = "max_age";
19const CONFIG_NATS_STORAGE_TYPE: &str = "storage_type";
20const CONFIG_NATS_STORAGE_NUM_REPLICAS: &str = "num_replicas";
21const CONFIG_NATS_STORAGE_COMPRESSION: &str = "compression";
22
23#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
25pub struct NatsConnectionConfig {
26 #[serde(default)]
28 pub cluster_uri: Option<String>,
29
30 #[serde(default)]
32 pub js_domain: Option<String>,
33
34 #[serde(default)]
36 pub auth_jwt: Option<String>,
37
38 #[serde(default)]
40 pub auth_seed: Option<String>,
41
42 #[serde(default)]
44 pub tls_ca: Option<String>,
45
46 #[serde(default)]
48 pub tls_ca_file: Option<String>,
49
50 #[serde(default)]
52 pub storage_config: Option<StorageConfig>,
53
54 #[serde(default)]
56 pub max_write_wait: Option<u64>,
57}
58
59#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
69pub struct StorageConfig {
70 #[serde(default)]
72 pub max_age: core::time::Duration,
73 #[serde(default)]
75 pub max_bytes: i64,
76 #[serde(default)]
78 pub storage_type: StorageType,
79 #[serde(default)]
81 pub num_replicas: usize,
82 #[serde(default)]
84 pub compression: bool,
85}
86
87use std::str::FromStr;
88
89#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
91pub struct StorageType(pub async_nats::jetstream::stream::StorageType);
92
93impl Default for StorageType {
95 fn default() -> Self {
96 Self(async_nats::jetstream::stream::StorageType::File)
98 }
99}
100
101impl FromStr for StorageType {
103 type Err = ();
104
105 fn from_str(s: &str) -> Result<Self, Self::Err> {
106 match s.to_lowercase().as_str() {
107 "file" => Ok(StorageType(
108 async_nats::jetstream::stream::StorageType::File,
109 )),
110 "0" => Ok(StorageType(
111 async_nats::jetstream::stream::StorageType::File,
112 )),
113 "memory" => Ok(StorageType(
114 async_nats::jetstream::stream::StorageType::Memory,
115 )),
116 "1" => Ok(StorageType(
117 async_nats::jetstream::stream::StorageType::Memory,
118 )),
119 _ => Err(()),
120 }
121 }
122}
123
124impl Default for StorageConfig {
126 fn default() -> StorageConfig {
127 StorageConfig {
128 max_age: core::time::Duration::from_secs(0), max_bytes: -1, storage_type: StorageType::default(),
131 num_replicas: 1,
132 compression: false,
133 }
134 }
135}
136
137impl NatsConnectionConfig {
138 pub fn merge(&self, extra: &NatsConnectionConfig) -> NatsConnectionConfig {
141 let mut out = self.clone();
142 if extra.cluster_uri.is_some() {
146 out.cluster_uri.clone_from(&extra.cluster_uri);
147 }
148 if extra.js_domain.is_some() {
149 out.js_domain.clone_from(&extra.js_domain);
150 }
151 if extra.auth_jwt.is_some() {
152 out.auth_jwt.clone_from(&extra.auth_jwt);
153 }
154 if extra.auth_seed.is_some() {
155 out.auth_seed.clone_from(&extra.auth_seed);
156 }
157 if extra.tls_ca.is_some() {
158 out.tls_ca.clone_from(&extra.tls_ca);
159 }
160 if extra.tls_ca_file.is_some() {
161 out.tls_ca_file.clone_from(&extra.tls_ca_file);
162 }
163 if extra.storage_config.is_some() {
164 out.storage_config.clone_from(&extra.storage_config);
165 }
166 if extra.max_write_wait.is_some() {
167 out.max_write_wait.clone_from(&extra.max_write_wait);
168 }
169 out
170 }
171}
172
173impl Default for NatsConnectionConfig {
175 fn default() -> NatsConnectionConfig {
176 NatsConnectionConfig {
177 cluster_uri: Some(DEFAULT_NATS_URI.into()),
178 js_domain: None,
179 auth_jwt: None,
180 auth_seed: None,
181 tls_ca: None,
182 tls_ca_file: None,
183 storage_config: None,
184 max_write_wait: Some(30),
185 }
186 }
187}
188
189impl NatsConnectionConfig {
190 pub fn from_link_config(
192 config: &HashMap<String, String>,
193 secrets: &HashMap<String, SecretValue>,
194 ) -> Result<NatsConnectionConfig> {
195 let mut map = HashMap::clone(config);
196
197 if let Some(jwt) = secrets
198 .get(CONFIG_NATS_CLIENT_JWT)
199 .and_then(SecretValue::as_string)
200 .or_else(|| config.get(CONFIG_NATS_CLIENT_JWT).map(String::as_str))
201 {
202 if secrets.get(CONFIG_NATS_CLIENT_JWT).is_none() {
203 warn!("secret value [{CONFIG_NATS_CLIENT_JWT}] is missing, but was found in configuration. Please prefer using secrets for sensitive values.");
204 }
205 map.insert(CONFIG_NATS_CLIENT_JWT.into(), jwt.to_string());
206 }
207
208 if let Some(seed) = secrets
209 .get(CONFIG_NATS_CLIENT_SEED)
210 .and_then(SecretValue::as_string)
211 .or_else(|| config.get(CONFIG_NATS_CLIENT_SEED).map(String::as_str))
212 {
213 if secrets.get(CONFIG_NATS_CLIENT_SEED).is_none() {
214 warn!("secret value [{CONFIG_NATS_CLIENT_SEED}] is missing, but was found configuration. Please prefer using secrets for sensitive values.");
215 }
216 map.insert(CONFIG_NATS_CLIENT_SEED.into(), seed.to_string());
217 }
218
219 if let Some(tls_ca) = secrets
220 .get(CONFIG_NATS_TLS_CA)
221 .and_then(SecretValue::as_string)
222 .or_else(|| config.get(CONFIG_NATS_TLS_CA).map(String::as_str))
223 {
224 if secrets.get(CONFIG_NATS_TLS_CA).is_none() {
225 warn!("secret value [{CONFIG_NATS_TLS_CA}] is missing, but was found configuration. Please prefer using secrets for sensitive values.");
226 }
227 map.insert(CONFIG_NATS_TLS_CA.into(), tls_ca.to_string());
228 } else if let Some(tls_ca_file) = config.get(CONFIG_NATS_TLS_CA_FILE)
229 {
231 map.insert(CONFIG_NATS_TLS_CA_FILE.into(), tls_ca_file.to_string());
232 }
233
234 let mut storage_config = StorageConfig::default();
236 if let Some(max_age) = config.get(CONFIG_NATS_STORAGE_MAX_AGE) {
237 storage_config.max_age = core::time::Duration::from_secs(
238 max_age.parse().expect("max_age must be a number (seconds)"),
239 );
240 }
241 if let Some(storage_type) = config.get(CONFIG_NATS_STORAGE_TYPE) {
242 storage_config.storage_type = storage_type
243 .parse::<StorageType>()
244 .expect("invalid storage_type");
245 }
246 if let Some(num_replicas) = config.get(CONFIG_NATS_STORAGE_NUM_REPLICAS) {
247 storage_config.num_replicas =
248 num_replicas.parse().expect("num_replicas must be a number");
249 }
250 if let Some(compression) = config.get(CONFIG_NATS_STORAGE_COMPRESSION) {
251 storage_config.compression =
252 compression.parse().expect("compression must be a boolean");
253 }
254 map.insert(
255 "storage_config".into(),
256 serde_json::to_string(&storage_config).expect("failed to serialize storage_config"),
257 );
258
259 Self::from_map(&map)
260 }
261
262 pub fn from_map(values: &HashMap<String, String>) -> Result<NatsConnectionConfig> {
264 let mut config = NatsConnectionConfig::default();
265
266 if let Some(uri) = values.get(CONFIG_NATS_URI) {
267 config.cluster_uri = Some(uri.clone());
268 }
269 if let Some(domain) = values.get(CONFIG_NATS_JETSTREAM_DOMAIN) {
270 config.js_domain = Some(domain.clone());
271 }
272 if let Some(jwt) = values.get(CONFIG_NATS_CLIENT_JWT) {
273 config.auth_jwt = Some(jwt.clone());
274 }
275 if let Some(seed) = values.get(CONFIG_NATS_CLIENT_SEED) {
276 config.auth_seed = Some(seed.clone());
277 }
278 if let Some(tls_ca) = values.get(CONFIG_NATS_TLS_CA) {
279 config.tls_ca = Some(tls_ca.clone());
280 } else if let Some(tls_ca_file) = values.get(CONFIG_NATS_TLS_CA_FILE) {
281 config.tls_ca_file = Some(tls_ca_file.clone());
282 }
283 if config.auth_jwt.is_some() && config.auth_seed.is_none() {
284 bail!("if you specify jwt, you must also specify a seed");
285 }
286 if config.storage_config.is_none() {
287 if let Some(storage_config) = values.get("storage_config") {
288 config.storage_config = Some(serde_json::from_str(storage_config)?);
289 }
290 }
291 if let Some(max_write_wait) = values.get(CONFIG_NATS_MAX_WRITE_WAIT) {
292 config.max_write_wait = Some(max_write_wait.clone().parse().unwrap());
293 }
294
295 Ok(config)
296 }
297}
298
299#[cfg(test)]
301mod test {
302 use super::*;
303 use std::collections::HashMap;
304
305 #[test]
307 fn test_default_connection_serialize() {
308 let input = r#"
309{
310 "cluster_uri": "nats://super-cluster",
311 "js_domain": "optional",
312 "auth_jwt": "authy",
313 "auth_seed": "seedy",
314 "max_write_wait": 60
315}
316"#;
317
318 let config: NatsConnectionConfig = serde_json::from_str(input).unwrap();
319 assert_eq!(config.cluster_uri, Some("nats://super-cluster".to_string()));
320 assert_eq!(config.js_domain, Some("optional".to_string()));
321 assert_eq!(config.auth_jwt.unwrap(), "authy");
322 assert_eq!(config.auth_seed.unwrap(), "seedy");
323 assert_eq!(config.max_write_wait.unwrap(), 60);
324 }
325
326 #[test]
328 fn test_connectionconfig_merge() {
329 let ncc1 = NatsConnectionConfig {
330 cluster_uri: Some("old_server".to_string()),
331 max_write_wait: Some(45),
332 ..Default::default()
333 };
334 let ncc2 = NatsConnectionConfig {
335 cluster_uri: Some("server1".to_string()),
336 js_domain: Some("new_domain".to_string()),
337 auth_jwt: Some("jawty".to_string()),
338 max_write_wait: Some(60),
339 ..Default::default()
340 };
341 let ncc3 = ncc1.merge(&ncc2);
342 assert_eq!(ncc3.cluster_uri, ncc2.cluster_uri);
343 assert_eq!(ncc3.js_domain, ncc2.js_domain);
344 assert_eq!(ncc3.auth_jwt, Some("jawty".to_string()));
345 assert_eq!(ncc3.max_write_wait, Some(60));
346 }
347
348 #[test]
350 fn test_merge_with_storage_config() {
351 let ncc1 = NatsConnectionConfig {
352 cluster_uri: Some("server1".to_string()),
353 js_domain: Some("domain1".to_string()),
354 auth_jwt: Some("jwt1".to_string()),
355 storage_config: Some(StorageConfig {
356 storage_type: StorageType(async_nats::jetstream::stream::StorageType::File),
357 compression: true,
358 ..Default::default()
359 }),
360 max_write_wait: Some(45),
361 ..Default::default()
362 };
363 let ncc2 = NatsConnectionConfig {
364 cluster_uri: Some("server1".to_string()),
365 js_domain: Some("new_domain".to_string()),
366 auth_jwt: Some("new_jwt".to_string()),
367 storage_config: Some(StorageConfig {
368 storage_type: StorageType(async_nats::jetstream::stream::StorageType::Memory),
369 compression: false,
370 ..Default::default()
371 }),
372 max_write_wait: Some(60),
373 ..Default::default()
374 };
375 let ncc3 = ncc1.merge(&ncc2);
376 assert_eq!(ncc3.cluster_uri, ncc2.cluster_uri);
377 assert_eq!(ncc3.js_domain, ncc2.js_domain);
378 assert_eq!(ncc3.auth_jwt, ncc2.auth_jwt);
379 assert_eq!(ncc3.max_write_wait, ncc2.max_write_wait);
380 assert_eq!(
381 ncc3.storage_config.clone().unwrap().storage_type,
382 ncc2.storage_config.clone().unwrap().storage_type
383 );
384 assert_eq!(
385 ncc3.storage_config.unwrap().compression,
386 ncc2.storage_config.unwrap().compression
387 );
388 }
389
390 #[test]
392 fn test_from_map_multiple_entries() -> anyhow::Result<()> {
393 let mut config = HashMap::new();
394 config.insert("tls_ca".to_string(), "rootCA".to_string());
395 config.insert("js_domain".to_string(), "optional".to_string());
396 config.insert(CONFIG_NATS_CLIENT_JWT.to_string(), "authy".to_string());
397 config.insert(CONFIG_NATS_CLIENT_SEED.to_string(), "seedy".to_string());
398 config.insert(CONFIG_NATS_MAX_WRITE_WAIT.to_string(), "45".to_string());
399 config.insert(CONFIG_NATS_STORAGE_MAX_AGE.to_string(), "3600".to_string());
400
401 let ncc = NatsConnectionConfig::from_link_config(&config, &HashMap::new())?;
402
403 assert_eq!(ncc.tls_ca, Some("rootCA".to_string()));
404 assert_eq!(ncc.js_domain, Some("optional".to_string()));
405 assert_eq!(ncc.auth_jwt, Some("authy".to_string()));
406 assert_eq!(ncc.auth_seed, Some("seedy".to_string()));
407 assert_eq!(ncc.max_write_wait, Some(45));
408
409 if let Some(storage_config) = &ncc.storage_config {
411 assert_eq!(storage_config.max_age, std::time::Duration::from_secs(3600));
412 }
413
414 Ok(())
415 }
416
417 #[test]
419 fn test_from_map_empty() {
420 let config =
421 NatsConnectionConfig::from_link_config(&HashMap::new(), &HashMap::new()).unwrap();
422 assert_eq!(config.cluster_uri, Some(DEFAULT_NATS_URI.into()));
423 assert_eq!(config.max_write_wait, Some(30)); }
425
426 #[test]
428 fn test_merge_non_default_values() {
429 let ncc1 = NatsConnectionConfig {
430 cluster_uri: Some("old_server".to_string()),
431 js_domain: Some("old_domain".to_string()),
432 auth_jwt: Some("old_jawty".to_string()),
433 ..Default::default()
434 };
435 let ncc2 = NatsConnectionConfig {
436 cluster_uri: Some("server1".to_string()),
437 js_domain: Some("new_domain".to_string()),
438 auth_jwt: Some("new_jawty".to_string()),
439 ..Default::default()
440 };
441 let ncc3 = ncc1.merge(&ncc2);
442 assert_eq!(ncc3.cluster_uri, ncc2.cluster_uri);
443 assert_eq!(ncc3.js_domain, ncc2.js_domain);
444 assert_eq!(ncc3.auth_jwt, ncc2.auth_jwt);
445 }
446}