wasmcloud_core/
messaging.rs

1//! Common configuration settings for both the NATS messaging provider and the builtin messaging
2//! provider for the host. This module requires the `messaging` feature to be enabled
3use std::collections::HashMap;
4
5use anyhow::{bail, Context as _, Result};
6use serde::{Deserialize, Serialize};
7
8pub const DEFAULT_NATS_URI: &str = "0.0.0.0:4222";
9pub const CONFIG_NATS_SUBSCRIPTION: &str = "subscriptions";
10pub const CONFIG_NATS_CONSUMERS: &str = "consumers";
11pub const CONFIG_NATS_URI: &str = "cluster_uris";
12pub const CONFIG_NATS_CLIENT_JWT: &str = "client_jwt";
13pub const CONFIG_NATS_CLIENT_SEED: &str = "client_seed";
14pub const CONFIG_NATS_TLS_CA: &str = "tls_ca";
15pub const CONFIG_NATS_CUSTOM_INBOX_PREFIX: &str = "custom_inbox_prefix";
16
17#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
18pub struct ConsumerConfig {
19    pub stream: Box<str>,
20    pub consumer: Box<str>,
21    pub max_messages: Option<usize>,
22    pub max_bytes: Option<usize>,
23}
24
25/// Configuration for connecting a nats client.
26/// More options are available if you use the json than variables in the values string map.
27#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
28pub struct ConnectionConfig {
29    /// List of topics to subscribe to
30    #[serde(default)]
31    pub subscriptions: Box<[async_nats::Subject]>,
32
33    /// List of JetStream consumers
34    #[serde(default)]
35    pub consumers: Box<[ConsumerConfig]>,
36
37    /// Cluster(s) to make a subscription on and connect to
38    #[serde(default)]
39    pub cluster_uris: Box<[Box<str>]>,
40
41    /// Auth JWT to use (if necessary)
42    #[serde(default)]
43    pub auth_jwt: Option<Box<str>>,
44
45    /// Auth seed to use (if necessary)
46    #[serde(default)]
47    pub auth_seed: Option<Box<str>>,
48
49    /// TLS Certificate Authority, encoded as a string
50    #[serde(default)]
51    pub tls_ca: Option<Box<str>>,
52
53    /// TLS Certificate Authority, as a path on disk
54    #[serde(default)]
55    pub tls_ca_file: Option<Box<str>>,
56
57    /// Ping interval in seconds
58    #[serde(default)]
59    pub ping_interval_sec: Option<u16>,
60
61    /// Inbox prefix to use (by default
62    #[serde(default)]
63    pub custom_inbox_prefix: Option<Box<str>>,
64}
65
66impl ConnectionConfig {
67    /// Merge a given [`ConnectionConfig`] with another, coalescing fields and overriding
68    /// where necessary
69    pub fn merge(&self, extra: &ConnectionConfig) -> ConnectionConfig {
70        let mut out = self.clone();
71        if !extra.subscriptions.is_empty() {
72            out.subscriptions.clone_from(&extra.subscriptions);
73        }
74        if !extra.consumers.is_empty() {
75            out.consumers.clone_from(&extra.consumers);
76        }
77        // If the default configuration has a URL in it, and then the link definition
78        // also provides a URL, the assumption is to replace/override rather than combine
79        // the two into a potentially incompatible set of URIs
80        if !extra.cluster_uris.is_empty() {
81            out.cluster_uris.clone_from(&extra.cluster_uris);
82        }
83        if extra.auth_jwt.is_some() {
84            out.auth_jwt.clone_from(&extra.auth_jwt);
85        }
86        if extra.auth_seed.is_some() {
87            out.auth_seed.clone_from(&extra.auth_seed);
88        }
89        if extra.tls_ca.is_some() {
90            out.tls_ca.clone_from(&extra.tls_ca);
91        }
92        if extra.tls_ca_file.is_some() {
93            out.tls_ca_file.clone_from(&extra.tls_ca_file);
94        }
95        if extra.ping_interval_sec.is_some() {
96            out.ping_interval_sec = extra.ping_interval_sec;
97        }
98        if extra.custom_inbox_prefix.is_some() {
99            out.custom_inbox_prefix
100                .clone_from(&extra.custom_inbox_prefix);
101        }
102        out
103    }
104}
105
106impl Default for ConnectionConfig {
107    fn default() -> ConnectionConfig {
108        ConnectionConfig {
109            subscriptions: Box::default(),
110            consumers: Box::default(),
111            cluster_uris: Box::from([DEFAULT_NATS_URI.into()]),
112            auth_jwt: None,
113            auth_seed: None,
114            tls_ca: None,
115            tls_ca_file: None,
116            ping_interval_sec: None,
117            custom_inbox_prefix: None,
118        }
119    }
120}
121
122impl ConnectionConfig {
123    /// Construct configuration from the passed hostdata config
124    pub fn from_map(values: &HashMap<String, String>) -> Result<ConnectionConfig> {
125        let mut config = ConnectionConfig::default();
126
127        if let Some(sub) = values.get(CONFIG_NATS_SUBSCRIPTION) {
128            config.subscriptions = sub.split(',').map(async_nats::Subject::from).collect();
129        }
130        if let Some(cons) = values.get(CONFIG_NATS_CONSUMERS) {
131            config.consumers = serde_json::from_str(cons).context("failed to parse `consumers`")?;
132        }
133        if let Some(url) = values.get(CONFIG_NATS_URI) {
134            config.cluster_uris = url.split(',').map(Box::from).collect();
135        }
136        if let Some(custom_inbox_prefix) = values.get(CONFIG_NATS_CUSTOM_INBOX_PREFIX) {
137            config.custom_inbox_prefix = Some(custom_inbox_prefix.as_str().into());
138        }
139        if let Some(jwt) = values.get(CONFIG_NATS_CLIENT_JWT) {
140            config.auth_jwt = Some(jwt.as_str().into());
141        }
142        if let Some(seed) = values.get(CONFIG_NATS_CLIENT_SEED) {
143            config.auth_seed = Some(seed.as_str().into());
144        }
145        if let Some(tls_ca) = values.get(CONFIG_NATS_TLS_CA) {
146            config.tls_ca = Some(tls_ca.as_str().into());
147        }
148        if config.auth_jwt.is_some() && config.auth_seed.is_none() {
149            bail!("if you specify jwt, you must also specify a seed");
150        }
151
152        Ok(config)
153    }
154}
155
156/// Adds the given CA cert to the provided [`async_nats::ConnectOptions`].
157///
158/// This follows the builder pattern in that it returns the same `ConnectOptions` with TLS
159/// configured
160pub fn add_tls_ca(
161    tls_ca: &str,
162    opts: async_nats::ConnectOptions,
163) -> anyhow::Result<async_nats::ConnectOptions> {
164    let ca = rustls_pemfile::read_one(&mut tls_ca.as_bytes()).context("failed to read CA")?;
165    let mut roots = async_nats::rustls::RootCertStore::empty();
166    if let Some(rustls_pemfile::Item::X509Certificate(ca)) = ca {
167        roots.add_parsable_certificates([ca]);
168    } else {
169        bail!("tls ca: invalid certificate type, must be a DER encoded PEM file")
170    };
171    let tls_client = async_nats::rustls::ClientConfig::builder()
172        .with_root_certificates(roots)
173        .with_no_client_auth();
174    Ok(opts.tls_client_config(tls_client).require_tls(true))
175}