wasmcloud_core/
messaging.rs1use std::collections::HashMap;
4
5use anyhow::{bail, Context as _, Result};
6use serde::{Deserialize, Serialize};
7
/// Address placed in `cluster_uris` by `ConnectionConfig::default()` when no
/// other server list is configured.
pub const DEFAULT_NATS_URI: &str = "0.0.0.0:4222";
/// Map key read by `ConnectionConfig::from_map` for the comma-separated list
/// of subjects to subscribe to.
pub const CONFIG_NATS_SUBSCRIPTION: &str = "subscriptions";
/// Map key read by `ConnectionConfig::from_map` for the consumer
/// configurations; the value is parsed as JSON.
pub const CONFIG_NATS_CONSUMERS: &str = "consumers";
/// Map key read by `ConnectionConfig::from_map` for the comma-separated list
/// of cluster URIs.
pub const CONFIG_NATS_URI: &str = "cluster_uris";
/// Map key read by `ConnectionConfig::from_map` for the JWT stored in
/// `ConnectionConfig::auth_jwt`.
pub const CONFIG_NATS_CLIENT_JWT: &str = "client_jwt";
/// Map key read by `ConnectionConfig::from_map` for the seed stored in
/// `ConnectionConfig::auth_seed`.
pub const CONFIG_NATS_CLIENT_SEED: &str = "client_seed";
/// Map key read by `ConnectionConfig::from_map` for the value stored in
/// `ConnectionConfig::tls_ca`.
pub const CONFIG_NATS_TLS_CA: &str = "tls_ca";
/// Map key read by `ConnectionConfig::from_map` for the value stored in
/// `ConnectionConfig::custom_inbox_prefix`.
pub const CONFIG_NATS_CUSTOM_INBOX_PREFIX: &str = "custom_inbox_prefix";
16
/// One entry of the `consumers` list in [`ConnectionConfig`].
///
/// Entries are deserialized from JSON by `ConnectionConfig::from_map`; every
/// field name below is therefore also the expected JSON key.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ConsumerConfig {
    /// Name of the stream to consume from.
    // NOTE(review): presumably a NATS JetStream stream name — confirm against the code that uses it.
    pub stream: Box<str>,
    /// Name of the consumer on that stream.
    // NOTE(review): presumably a JetStream consumer name — confirm against the code that uses it.
    pub consumer: Box<str>,
    /// Optional upper bound on messages.
    // NOTE(review): looks like a per-fetch/batch limit; exact semantics are not visible here — TODO confirm.
    pub max_messages: Option<usize>,
    /// Optional upper bound on bytes.
    // NOTE(review): looks like a per-fetch/batch limit; exact semantics are not visible here — TODO confirm.
    pub max_bytes: Option<usize>,
}
24
/// Settings describing a NATS connection.
///
/// Every field is marked `#[serde(default)]`, so any of them may be omitted
/// when deserializing. Note that this uses the *field type's* default: a
/// deserialized value with no `cluster_uris` key gets an empty list, whereas
/// `ConnectionConfig::default()` fills in `DEFAULT_NATS_URI`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ConnectionConfig {
    /// Subjects to subscribe to.
    #[serde(default)]
    pub subscriptions: Box<[async_nats::Subject]>,

    /// Stream consumers to attach; see [`ConsumerConfig`].
    #[serde(default)]
    pub consumers: Box<[ConsumerConfig]>,

    /// Addresses of the NATS servers to connect to.
    #[serde(default)]
    pub cluster_uris: Box<[Box<str>]>,

    /// JWT used for authentication. `ConnectionConfig::from_map` rejects a
    /// JWT that is not accompanied by `auth_seed`.
    #[serde(default)]
    pub auth_jwt: Option<Box<str>>,

    /// Seed that accompanies `auth_jwt`.
    #[serde(default)]
    pub auth_seed: Option<Box<str>>,

    /// TLS certificate authority.
    // NOTE(review): presumably PEM text in the form accepted by `add_tls_ca` — confirm against the caller.
    #[serde(default)]
    pub tls_ca: Option<Box<str>>,

    /// TLS certificate authority file.
    // NOTE(review): presumably a filesystem path to a CA file; nothing in this file reads it — TODO confirm.
    #[serde(default)]
    pub tls_ca_file: Option<Box<str>>,

    /// Ping interval, in seconds going by the field name.
    // NOTE(review): how this is applied to the client is not visible here — TODO confirm.
    #[serde(default)]
    pub ping_interval_sec: Option<u16>,

    /// Custom inbox prefix.
    // NOTE(review): presumably forwarded to the NATS client's inbox-prefix option — confirm against the caller.
    #[serde(default)]
    pub custom_inbox_prefix: Option<Box<str>>,
}
65
66impl ConnectionConfig {
67 pub fn merge(&self, extra: &ConnectionConfig) -> ConnectionConfig {
70 let mut out = self.clone();
71 if !extra.subscriptions.is_empty() {
72 out.subscriptions.clone_from(&extra.subscriptions);
73 }
74 if !extra.consumers.is_empty() {
75 out.consumers.clone_from(&extra.consumers);
76 }
77 if !extra.cluster_uris.is_empty() {
81 out.cluster_uris.clone_from(&extra.cluster_uris);
82 }
83 if extra.auth_jwt.is_some() {
84 out.auth_jwt.clone_from(&extra.auth_jwt);
85 }
86 if extra.auth_seed.is_some() {
87 out.auth_seed.clone_from(&extra.auth_seed);
88 }
89 if extra.tls_ca.is_some() {
90 out.tls_ca.clone_from(&extra.tls_ca);
91 }
92 if extra.tls_ca_file.is_some() {
93 out.tls_ca_file.clone_from(&extra.tls_ca_file);
94 }
95 if extra.ping_interval_sec.is_some() {
96 out.ping_interval_sec = extra.ping_interval_sec;
97 }
98 if extra.custom_inbox_prefix.is_some() {
99 out.custom_inbox_prefix
100 .clone_from(&extra.custom_inbox_prefix);
101 }
102 out
103 }
104}
105
106impl Default for ConnectionConfig {
107 fn default() -> ConnectionConfig {
108 ConnectionConfig {
109 subscriptions: Box::default(),
110 consumers: Box::default(),
111 cluster_uris: Box::from([DEFAULT_NATS_URI.into()]),
112 auth_jwt: None,
113 auth_seed: None,
114 tls_ca: None,
115 tls_ca_file: None,
116 ping_interval_sec: None,
117 custom_inbox_prefix: None,
118 }
119 }
120}
121
122impl ConnectionConfig {
123 pub fn from_map(values: &HashMap<String, String>) -> Result<ConnectionConfig> {
125 let mut config = ConnectionConfig::default();
126
127 if let Some(sub) = values.get(CONFIG_NATS_SUBSCRIPTION) {
128 config.subscriptions = sub.split(',').map(async_nats::Subject::from).collect();
129 }
130 if let Some(cons) = values.get(CONFIG_NATS_CONSUMERS) {
131 config.consumers = serde_json::from_str(cons).context("failed to parse `consumers`")?;
132 }
133 if let Some(url) = values.get(CONFIG_NATS_URI) {
134 config.cluster_uris = url.split(',').map(Box::from).collect();
135 }
136 if let Some(custom_inbox_prefix) = values.get(CONFIG_NATS_CUSTOM_INBOX_PREFIX) {
137 config.custom_inbox_prefix = Some(custom_inbox_prefix.as_str().into());
138 }
139 if let Some(jwt) = values.get(CONFIG_NATS_CLIENT_JWT) {
140 config.auth_jwt = Some(jwt.as_str().into());
141 }
142 if let Some(seed) = values.get(CONFIG_NATS_CLIENT_SEED) {
143 config.auth_seed = Some(seed.as_str().into());
144 }
145 if let Some(tls_ca) = values.get(CONFIG_NATS_TLS_CA) {
146 config.tls_ca = Some(tls_ca.as_str().into());
147 }
148 if config.auth_jwt.is_some() && config.auth_seed.is_none() {
149 bail!("if you specify jwt, you must also specify a seed");
150 }
151
152 Ok(config)
153 }
154}
155
156pub fn add_tls_ca(
161 tls_ca: &str,
162 opts: async_nats::ConnectOptions,
163) -> anyhow::Result<async_nats::ConnectOptions> {
164 let ca = rustls_pemfile::read_one(&mut tls_ca.as_bytes()).context("failed to read CA")?;
165 let mut roots = async_nats::rustls::RootCertStore::empty();
166 if let Some(rustls_pemfile::Item::X509Certificate(ca)) = ca {
167 roots.add_parsable_certificates([ca]);
168 } else {
169 bail!("tls ca: invalid certificate type, must be a DER encoded PEM file")
170 };
171 let tls_client = async_nats::rustls::ClientConfig::builder()
172 .with_root_certificates(roots)
173 .with_no_client_auth();
174 Ok(opts.tls_client_config(tls_client).require_tls(true))
175}