1use core::net::SocketAddr;
2
3use std::collections::{HashMap, HashSet};
4use std::env;
5use std::path::PathBuf;
6use std::sync::{Arc, LazyLock};
7use std::time::Duration;
8
9use anyhow::{bail, Context};
10use clap::{ArgAction, Parser};
11use nkeys::KeyPair;
12use regex::Regex;
13use tokio::time::{timeout, timeout_at};
14use tokio::{select, signal};
15use tracing::{warn, Level as TracingLogLevel};
16use tracing_subscriber::util::SubscriberInitExt as _;
17use url::Url;
18use wasmcloud_core::logging::Level as WasmcloudLogLevel;
19use wasmcloud_core::{OtelConfig, OtelProtocol};
20use wasmcloud_host::nats::builder::NatsHostBuilder;
21use wasmcloud_host::oci::Config as OciConfig;
22use wasmcloud_host::workload_identity::WorkloadIdentityConfig;
23use wasmcloud_host::WasmbusHostConfig;
24use wasmcloud_host::{nats::connect_nats, wasmbus::Features};
25use wasmcloud_tracing::configure_observability;
26
27#[derive(Debug, Parser)]
28#[allow(clippy::struct_excessive_bools)]
29#[clap(name = "wasmcloud")]
30#[command(version, about, long_about = None)]
31struct Args {
32 #[clap(long = "trace-level", default_value_t = TracingLogLevel::INFO, env = "WASMCLOUD_TRACE_LEVEL")]
34 pub trace_level: TracingLogLevel,
35 #[clap(long = "log-level", alias = "structured-log-level", default_value_t = TracingLogLevel::INFO, env = "WASMCLOUD_LOG_LEVEL")]
37 pub log_level: TracingLogLevel,
38 #[clap(
40 long = "nats-host",
41 default_value = "127.0.0.1",
42 env = "WASMCLOUD_NATS_HOST"
43 )]
44 nats_host: String,
45 #[clap(
47 long = "nats-port",
48 default_value_t = 4222,
49 env = "WASMCLOUD_NATS_PORT"
50 )]
51 nats_port: u16,
52 #[clap(
54 long = "nats-jwt",
55 env = "WASMCLOUD_NATS_JWT",
56 requires = "nats_seed",
57 conflicts_with = "nats_creds"
58 )]
59 nats_jwt: Option<String>,
60 #[clap(
62 long = "nats-seed",
63 env = "WASMCLOUD_NATS_SEED",
64 requires = "nats_jwt",
65 conflicts_with = "nats_creds"
66 )]
67 nats_seed: Option<String>,
68 #[clap(long = "nats-creds", env = "WASMCLOUD_NATS_CREDS", conflicts_with_all = ["nats_jwt", "nats_seed"])]
70 nats_creds: Option<PathBuf>,
71 #[clap(
73 short = 'x',
74 long = "lattice",
75 default_value = "default",
76 env = "WASMCLOUD_LATTICE"
77 )]
78 lattice: String,
79 #[clap(long = "host-seed", env = "WASMCLOUD_HOST_SEED")]
81 host_seed: Option<String>,
82 #[clap(long = "provider-shutdown-delay-ms", alias = "provider-shutdown-delay", default_value = "300", env = "WASMCLOUD_PROV_SHUTDOWN_DELAY_MS", value_parser = parse_duration_millis)]
84 provider_shutdown_delay: Duration,
85 #[clap(long = "allow-latest", env = "WASMCLOUD_OCI_ALLOW_LATEST")]
87 allow_latest: bool,
88 #[clap(
90 long = "allowed-insecure",
91 env = "WASMCLOUD_OCI_ALLOWED_INSECURE",
92 value_delimiter = ','
93 )]
94 allowed_insecure: Vec<String>,
95 #[clap(
97 long = "js-domain",
98 alias = "wasmcloud-js-domain",
99 env = "WASMCLOUD_JS_DOMAIN"
100 )]
101 js_domain: Option<String>,
102 #[clap(long = "config-service-enabled", env = "WASMCLOUD_CONFIG_SERVICE")]
104 config_service_enabled: bool,
105 #[clap(
107 long = "allow-file-load",
108 default_value_t = false,
109 env = "WASMCLOUD_ALLOW_FILE_LOAD"
110 )]
111 allow_file_load: bool,
112 #[clap(
114 long = "enable-structured-logging",
115 env = "WASMCLOUD_STRUCTURED_LOGGING_ENABLED"
116 )]
117 enable_structured_logging: bool,
118 #[clap(short = 'l', long = "label")]
120 label: Option<Vec<String>>,
121
122 #[clap(long = "ctl-host", env = "WASMCLOUD_CTL_HOST", hide = true)]
124 ctl_host: Option<String>,
125 #[clap(long = "ctl-port", env = "WASMCLOUD_CTL_PORT", hide = true)]
127 ctl_port: Option<u16>,
128 #[clap(
130 long = "ctl-jwt",
131 env = "WASMCLOUD_CTL_JWT",
132 requires = "ctl_seed",
133 hide = true,
134 conflicts_with = "ctl_creds"
135 )]
136 ctl_jwt: Option<String>,
137 #[clap(
139 long = "ctl-seed",
140 env = "WASMCLOUD_CTL_SEED",
141 requires = "ctl_jwt",
142 hide = true,
143 conflicts_with = "ctl_creds"
144 )]
145 ctl_seed: Option<String>,
146 #[clap(long = "ctl-creds", env = "WASMCLOUD_CTL_CREDS", hide = true, conflicts_with_all = ["ctl_jwt", "ctl_seed"])]
148 ctl_creds: Option<PathBuf>,
149 #[clap(long = "ctl-tls", env = "WASMCLOUD_CTL_TLS", hide = true)]
151 ctl_tls: bool,
152 #[clap(
154 long = "ctl-topic-prefix",
155 env = "WASMCLOUD_CTL_TOPIC_PREFIX",
156 default_value = "wasmbus.ctl",
157 hide = true
158 )]
159 ctl_topic_prefix: String,
160
161 #[clap(long = "rpc-host", env = "WASMCLOUD_RPC_HOST", hide = true)]
163 rpc_host: Option<String>,
164 #[clap(long = "rpc-port", env = "WASMCLOUD_RPC_PORT", hide = true)]
166 rpc_port: Option<u16>,
167 #[clap(
169 long = "rpc-jwt",
170 env = "WASMCLOUD_RPC_JWT",
171 requires = "rpc_seed",
172 hide = true,
173 conflicts_with = "rpc_creds"
174 )]
175 rpc_jwt: Option<String>,
176 #[clap(
178 long = "rpc-seed",
179 env = "WASMCLOUD_RPC_SEED",
180 requires = "rpc_jwt",
181 hide = true,
182 conflicts_with = "rpc_creds"
183 )]
184 rpc_seed: Option<String>,
185 #[clap(long = "rpc-creds", env = "WASMCLOUD_RPC_CREDS", hide = true, conflicts_with_all = ["rpc_jwt", "rpc_seed"])]
187 rpc_creds: Option<PathBuf>,
188 #[clap(long = "rpc-timeout-ms", default_value = "2000", env = "WASMCLOUD_RPC_TIMEOUT_MS", value_parser = parse_duration_millis, hide = true)]
190 rpc_timeout_ms: Duration,
191 #[clap(long = "rpc-tls", env = "WASMCLOUD_RPC_TLS", hide = true)]
193 rpc_tls: bool,
194
195 #[clap(long = "policy-topic", env = "WASMCLOUD_POLICY_TOPIC")]
197 policy_topic: Option<String>,
198 #[clap(
200 long = "policy-changes-topic",
201 env = "WASMCLOUD_POLICY_CHANGES_TOPIC",
202 requires = "policy_topic"
203 )]
204 policy_changes_topic: Option<String>,
205 #[clap(long = "max-execution-time-ms", default_value = "600000", env = "WASMCLOUD_MAX_EXECUTION_TIME_MS", value_parser = parse_duration_millis)]
207 max_execution_time: Duration,
208 #[clap(long = "max-linear-memory-bytes", default_value_t = 256 * 1024 * 1024, env = "WASMCLOUD_MAX_LINEAR_MEMORY")]
210 max_linear_memory: u32,
211 #[clap(long = "max-component-size-bytes", default_value_t = 50 * 1024 * 1024, env = "WASMCLOUD_MAX_COMPONENT_SIZE")]
213 max_component_size: u64,
214 #[clap(
216 long = "max-components",
217 default_value_t = 10_000,
218 env = "WASMCLOUD_MAX_COMPONENTS"
219 )]
220 max_components: u32,
221
222 #[clap(
224 long = "max-core-instances-per-component",
225 default_value_t = 30,
226 env = "WASMCLOUD_MAX_CORE_INSTANCES_PER_COMPONENT"
227 )]
228 max_core_instances_per_component: u32,
229
230 #[clap(
232 long = "policy-timeout-ms",
233 env = "WASMCLOUD_POLICY_TIMEOUT",
234 requires = "policy_topic",
235 value_parser = parse_duration_millis,
236 )]
237 policy_timeout_ms: Option<Duration>,
238
239 #[clap(long = "secrets-topic", env = "WASMCLOUD_SECRETS_TOPIC")]
241 secrets_topic_prefix: Option<String>,
242
243 #[clap(
245 long = "oci-registry",
246 env = "WASMCLOUD_OCI_REGISTRY",
247 requires = "oci_user",
248 requires = "oci_password"
249 )]
250 oci_registry: Option<String>,
251 #[clap(
253 long = "oci-user",
254 env = "WASMCLOUD_OCI_REGISTRY_USER",
255 requires = "oci_registry",
256 requires = "oci_password"
257 )]
258 oci_user: Option<String>,
259 #[clap(
261 long = "oci-password",
262 env = "WASMCLOUD_OCI_REGISTRY_PASSWORD",
263 requires = "oci_registry",
264 requires = "oci_user"
265 )]
266 oci_password: Option<String>,
267
268 #[clap(
270 long = "enable-observability",
271 env = "WASMCLOUD_OBSERVABILITY_ENABLED",
272 conflicts_with_all = ["enable_traces", "enable_metrics", "enable_logs"]
273 )]
274 enable_observability: bool,
275
276 #[clap(long = "enable-traces", env = "WASMCLOUD_TRACES_ENABLED", hide = true)]
278 enable_traces: Option<bool>,
279
280 #[clap(
282 long = "enable-metrics",
283 env = "WASMCLOUD_METRICS_ENABLED",
284 hide = true
285 )]
286 enable_metrics: Option<bool>,
287
288 #[clap(long = "enable-logs", env = "WASMCLOUD_LOGS_ENABLED", hide = true)]
290 enable_logs: Option<bool>,
291
292 #[clap(
298 long = "override-observability-endpoint",
299 env = "OTEL_EXPORTER_OTLP_ENDPOINT"
300 )]
301 observability_endpoint: Option<String>,
302
303 #[clap(
309 long = "override-traces-endpoint",
310 env = "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT",
311 hide = true
312 )]
313 traces_endpoint: Option<String>,
314
315 #[clap(
321 long = "override-metrics-endpoint",
322 env = "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT",
323 hide = true
324 )]
325 metrics_endpoint: Option<String>,
326
327 #[clap(
333 long = "override-logs-endpoint",
334 env = "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT",
335 hide = true
336 )]
337 logs_endpoint: Option<String>,
338
339 #[clap(
341 long = "observability-protocol",
342 env = "WASMCLOUD_OBSERVABILITY_PROTOCOL",
343 hide = true
344 )]
345 observability_protocol: Option<OtelProtocol>,
346
347 #[clap(long = "flame-graph", env = "WASMCLOUD_FLAME_GRAPH")]
349 flame_graph: Option<String>,
350
351 #[arg(
353 long = "tls-ca-path",
354 env = "WASMCLOUD_TLS_CA_PATH",
355 value_delimiter = ','
356 )]
357 pub tls_ca_paths: Option<Vec<PathBuf>>,
358
359 #[arg(long = "heartbeat-interval-seconds", env = "WASMCLOUD_HEARTBEAT_INTERVAL", value_parser = parse_duration_secs, hide = true)]
361 heartbeat_interval: Option<Duration>,
362
363 #[arg(
365 long = "feature",
366 env = "WASMCLOUD_EXPERIMENTAL_FEATURES",
367 value_delimiter = ',',
368 hide = true
369 )]
370 experimental_features: Vec<Features>,
371
372 #[clap(
373 long = "help-markdown",
374 action=ArgAction::SetTrue,
375 conflicts_with = "help",
376 hide = true
377 )]
378 help_markdown: bool,
379
380 #[clap(long = "http-admin", env = "WASMCLOUD_HTTP_ADMIN")]
381 http_admin: Option<SocketAddr>,
383
384 #[clap(
385 long = "enable-component-auction",
386 env = "WASMCLOUD_COMPONENT_AUCTION_ENABLED"
387 )]
388 enable_component_auction: Option<bool>,
390
391 #[clap(
392 long = "enable-provider-auction",
393 env = "WASMCLOUD_PROVIDER_AUCTION_ENABLED"
394 )]
395 enable_provider_auction: Option<bool>,
397}
398
399const DEFAULT_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10);
400
401#[tokio::main]
402#[allow(clippy::too_many_lines)]
403async fn main() -> anyhow::Result<()> {
404 let args: Args = Args::parse();
405
406 if args.help_markdown {
408 clap_markdown::print_help_markdown::<Args>();
409 std::process::exit(0);
410 }
411
412 if let Some(tls_ca_paths) = args.tls_ca_paths.clone() {
413 ensure_certs_for_paths(tls_ca_paths)?;
414 }
415
416 let trace_level = WasmcloudLogLevel::from(args.trace_level);
417 let otel_config = OtelConfig {
418 enable_observability: args.enable_observability,
419 enable_traces: args.enable_traces,
420 enable_metrics: args.enable_metrics,
421 enable_logs: args.enable_logs,
422 observability_endpoint: args.observability_endpoint,
423 traces_endpoint: args.traces_endpoint,
424 metrics_endpoint: args.metrics_endpoint,
425 logs_endpoint: args.logs_endpoint,
426 protocol: args.observability_protocol.unwrap_or_default(),
427 additional_ca_paths: args.tls_ca_paths.clone().unwrap_or_default(),
428 trace_level,
429 ..Default::default()
430 };
431 let log_level = WasmcloudLogLevel::from(args.log_level);
432
433 let _guard = match configure_observability(
434 "wasmcloud-host",
435 &otel_config,
436 args.enable_structured_logging,
437 args.flame_graph,
438 Some(&log_level),
439 Some(&otel_config.trace_level),
440 ) {
441 Ok((dispatch, guard)) => {
442 dispatch
443 .try_init()
444 .context("failed to init observability for host")?;
445
446 tracing::debug!(
448 ?otel_config,
449 traces_endpoint = %otel_config.traces_endpoint(),
450 metrics_endpoint = %otel_config.metrics_endpoint(),
451 logs_endpoint = %otel_config.logs_endpoint(),
452 protocol = ?otel_config.protocol,
453 "combined OpenTelemetry configuration (cli > env > defaults)"
454 );
455
456 Some(guard)
457 }
458 Err(e) => {
459 eprintln!("Failed to configure observability: {e:?}");
460 None
461 }
462 };
463
464 let ctl_nats_url = Url::parse(&format!(
465 "nats://{}:{}",
466 args.ctl_host.unwrap_or_else(|| args.nats_host.clone()),
467 args.ctl_port.unwrap_or(args.nats_port)
468 ))
469 .context("failed to construct a valid `ctl_nats_url` using `ctl-host` and `ctl-port`")?;
470 let rpc_nats_url = Url::parse(&format!(
471 "nats://{}:{}",
472 args.rpc_host.unwrap_or_else(|| args.nats_host.clone()),
473 args.rpc_port.unwrap_or(args.nats_port)
474 ))
475 .context("failed to construct a valid `rpc_nats_url` using `rpc-host` and `rpc-port`")?;
476
477 let host_key = args
478 .host_seed
479 .as_deref()
480 .map(KeyPair::from_seed)
481 .transpose()
482 .context("failed to construct host key pair from seed")?
483 .map(Arc::new)
484 .unwrap_or_else(|| Arc::new(KeyPair::new_server()));
485 let (nats_jwt, nats_key) =
486 parse_nats_credentials(args.nats_creds, args.nats_jwt, args.nats_seed)
487 .await
488 .context("failed to parse NATS credentials from provided arguments")?;
489 let (ctl_jwt, ctl_key) = parse_nats_credentials(args.ctl_creds, args.ctl_jwt, args.ctl_seed)
490 .await
491 .context("failed to parse control interface credentials from provided arguments")?;
492 let (rpc_jwt, rpc_key) = parse_nats_credentials(args.rpc_creds, args.rpc_jwt, args.rpc_seed)
493 .await
494 .context("failed to parse RPC credentials from provided arguments")?;
495 let oci_opts = OciConfig {
496 additional_ca_paths: args.tls_ca_paths.unwrap_or_default(),
497 allow_latest: args.allow_latest,
498 allowed_insecure: args.allowed_insecure,
499 oci_registry: args.oci_registry,
500 oci_user: args.oci_user,
501 oci_password: args.oci_password,
502 };
503
504 let mut labels = args
505 .label
506 .unwrap_or_default()
507 .iter()
508 .map(|labelpair| parse_label(labelpair))
509 .collect::<anyhow::Result<HashMap<String, String>, anyhow::Error>>()
510 .context("failed to parse labels")?;
511 let labels_from_args: HashSet<String> = labels.keys().cloned().collect();
512 labels.extend(env::vars().filter_map(|(key, value)| {
513 let key = if key.starts_with("WASMCLOUD_LABEL_") {
514 key.strip_prefix("WASMCLOUD_LABEL_")?.to_string()
515 } else {
516 return None;
517 };
518 if labels_from_args.contains(&key) {
519 warn!(
520 ?key,
521 "label provided via args will override label set via environment variable"
522 );
523 return None;
524 }
525 Some((key, value))
526 }));
527 if let Some(secrets_topic) = args.secrets_topic_prefix.as_deref() {
528 anyhow::ensure!(
529 validate_nats_subject(secrets_topic).is_ok(),
530 "Invalid secrets topic"
531 );
532 }
533
534 let experimental_features: Features = args.experimental_features.into_iter().sum();
536 let workload_identity_config = if experimental_features.workload_identity_auth_enabled() {
537 Some(WorkloadIdentityConfig::from_env()?)
538 } else {
539 None
540 };
541 let ctl_nats = connect_nats(
542 ctl_nats_url.as_str(),
543 ctl_jwt.or_else(|| nats_jwt.clone()).as_ref(),
544 ctl_key.or_else(|| nats_key.clone()),
545 args.ctl_tls,
546 None,
547 workload_identity_config.clone(),
548 )
549 .await
550 .context("failed to establish NATS control connection")?;
551
552 let builder = NatsHostBuilder::new(
553 ctl_nats,
554 Some(args.ctl_topic_prefix),
555 args.lattice.clone(),
556 args.js_domain.clone(),
557 Some(oci_opts.clone()),
558 labels.clone().into_iter().collect(),
559 args.config_service_enabled,
560 args.enable_component_auction.unwrap_or(true),
561 args.enable_provider_auction.unwrap_or(true),
562 )
563 .await?
564 .with_event_publisher(host_key.public_key());
565
566 let builder = if let Some(policy_topic) = args.policy_topic.as_deref() {
567 anyhow::ensure!(
568 validate_nats_subject(policy_topic).is_ok(),
569 "Invalid policy topic"
570 );
571 builder
572 .with_policy_manager(
573 host_key.clone(),
574 labels.clone(),
575 args.policy_topic.clone(),
576 args.policy_timeout_ms,
577 args.policy_changes_topic.clone(),
578 )
579 .await?
580 } else {
581 builder
582 };
583
584 let builder = if let Some(secrets_topic) = args.secrets_topic_prefix {
585 anyhow::ensure!(
586 validate_nats_subject(&secrets_topic).is_ok(),
587 "Invalid secrets topic"
588 );
589 builder.with_secrets_manager(secrets_topic)?
590 } else {
591 builder
592 };
593
594 let (host_builder, nats_ctl_server) = builder
595 .build(WasmbusHostConfig {
596 lattice: Arc::from(args.lattice.clone()),
597 host_key: host_key.clone(),
598 config_service_enabled: args.config_service_enabled,
599 js_domain: args.js_domain,
600 labels,
601 provider_shutdown_delay: Some(args.provider_shutdown_delay),
602 oci_opts,
603 rpc_nats_url,
604 rpc_timeout: args.rpc_timeout_ms,
605 rpc_jwt: rpc_jwt.or_else(|| nats_jwt.clone()),
606 rpc_key: rpc_key.or_else(|| nats_key.clone()),
607 rpc_tls: args.rpc_tls,
608 allow_file_load: args.allow_file_load,
609 log_level,
610 enable_structured_logging: args.enable_structured_logging,
611 otel_config,
612 version: env!("CARGO_PKG_VERSION").to_string(),
613 max_execution_time: args.max_execution_time,
614 max_linear_memory: args.max_linear_memory,
615 max_component_size: args.max_component_size,
616 max_components: args.max_components,
617 max_core_instances_per_component: args.max_core_instances_per_component,
618 heartbeat_interval: args.heartbeat_interval,
619 experimental_features,
620 http_admin: args.http_admin,
621 enable_component_auction: args.enable_component_auction.unwrap_or(true),
622 enable_provider_auction: args.enable_provider_auction.unwrap_or(true),
623 })
624 .await?;
625 let (host, shutdown) = host_builder
626 .build()
627 .await
628 .context("failed to initialize host")?;
629
630 let mut ctl = nats_ctl_server.start(host.clone()).await?;
632
633 #[cfg(unix)]
634 let deadline = {
635 let mut terminate = signal::unix::signal(signal::unix::SignalKind::terminate())?;
636 select! {
637 sig = signal::ctrl_c() => {
638 sig.context("failed to wait for Ctrl-C")?;
639 None
640 },
641 _ = terminate.recv() => None,
642 deadline = host.stopped() => deadline?,
643 }
644 };
645 #[cfg(not(unix))]
646 let deadline = select! {
647 sig = signal::ctrl_c() => {
648 sig.context("failed to wait for Ctrl-C")?;
649 None
650 },
651 deadline = host.stopped() => deadline?,
652 };
653 ctl.abort_all();
655 drop(host);
656 if let Some(deadline) = deadline {
657 timeout_at(deadline, shutdown)
658 } else {
659 timeout(DEFAULT_SHUTDOWN_TIMEOUT, shutdown)
660 }
661 .await
662 .context("host shutdown timed out")?
663 .context("failed to shutdown host")?;
664 Ok(())
665}
666
667fn parse_duration_millis(arg: &str) -> anyhow::Result<Duration> {
668 arg.parse()
669 .map(Duration::from_millis)
670 .map_err(|e| anyhow::anyhow!(e))
671}
672
673fn validate_nats_subject(subject: &str) -> anyhow::Result<()> {
678 let re = Regex::new(r"^(?:[A-Za-z0-9_-]+\.)*[A-Za-z0-9_-]+$")
679 .context("Failed to compile NATS subject regex")?;
680 if re.is_match(subject) && !subject.contains('*') && !subject.contains('>') {
681 Ok(())
682 } else {
683 Err(anyhow::anyhow!("Invalid NATS subject: {}", subject))
684 }
685}
686
687fn parse_duration_secs(arg: &str) -> anyhow::Result<Duration> {
688 arg.parse()
689 .map(Duration::from_secs)
690 .map_err(|e| anyhow::anyhow!(e))
691}
692
693fn parse_label(labelpair: &str) -> anyhow::Result<(String, String)> {
694 match labelpair.split('=').collect::<Vec<&str>>()[..] {
695 [k, v] => Ok((k.to_string(), v.to_string())),
696 _ => bail!("invalid label format `{labelpair}`. Expected `key=value`"),
697 }
698}
699
700static JWT_RE: LazyLock<Regex> = LazyLock::new(|| {
701 Regex::new(r"-----BEGIN NATS USER JWT-----\n(?<jwt>.*)\n------END NATS USER JWT------").unwrap()
702});
703
704static SEED_RE: LazyLock<Regex> = LazyLock::new(|| {
705 Regex::new(r"-----BEGIN USER NKEY SEED-----\n(?<seed>.*)\n------END USER NKEY SEED------")
706 .unwrap()
707});
708
709async fn parse_nats_credentials(
710 nats_creds: Option<PathBuf>,
711 nats_jwt: Option<String>,
712 nats_seed: Option<String>,
713) -> anyhow::Result<(Option<String>, Option<Arc<KeyPair>>)> {
714 match (nats_creds, nats_jwt, nats_seed) {
715 (Some(creds), None, None) => {
716 let contents = tokio::fs::read_to_string(creds).await?;
717 Ok(parse_jwt_and_key_from_creds(&contents)?)
718 }
719 (None, Some(jwt), Some(seed)) => {
720 let kp =
721 KeyPair::from_seed(&seed).context("failed to construct NATS key pair from seed")?;
722 Ok((Some(jwt), Some(Arc::new(kp))))
723 }
724 _ => Ok((None, None)),
725 }
726}
727
728fn parse_jwt_and_key_from_creds(
729 contents: &str,
730) -> anyhow::Result<(Option<String>, Option<Arc<KeyPair>>)> {
731 let jwt = JWT_RE
732 .captures(contents)
733 .map(|capture| capture["jwt"].to_owned())
734 .context("failed to parse JWT from NATS credentials")?;
735 let kp = SEED_RE
736 .captures(contents)
737 .and_then(|capture| KeyPair::from_seed(&capture["seed"]).ok())
738 .map(Arc::new)
739 .context("failed to construct key pair from NATS credentials")?;
740 Ok((Some(jwt), Some(kp)))
741}
742
743fn ensure_certs_for_paths(paths: Vec<PathBuf>) -> anyhow::Result<()> {
744 if wasmcloud_core::tls::load_certs_from_paths(&paths)
745 .context("failed to load certificates from the provided path")?
746 .is_empty()
747 {
748 bail!("failed to parse certificates from the provided path");
749 };
750 Ok(())
751}
752
753#[cfg(test)]
754mod tests {
755 use super::*;
756 use std::fs::File;
757 use std::io::Write;
758 use tempfile::tempdir;
759
760 #[test]
761 fn test_nats_subject_validation() {
762 assert!(validate_nats_subject("wasmcloud.secrets").is_ok());
764 assert!(validate_nats_subject("simple").is_ok());
765 assert!(validate_nats_subject("with_underscore").is_ok());
766 assert!(validate_nats_subject("with-hyphen").is_ok());
767 assert!(validate_nats_subject("multiple.topic.levels").is_ok());
768 assert!(validate_nats_subject("123.456").is_ok());
769 assert!(validate_nats_subject("subject.123").is_ok());
770 assert!(validate_nats_subject("").is_err()); assert!(validate_nats_subject(".").is_err()); assert!(validate_nats_subject(".starts.with.dot").is_err()); assert!(validate_nats_subject("ends.with.dot.").is_err()); assert!(validate_nats_subject("double..dot").is_err()); assert!(validate_nats_subject("contains.*.wildcard").is_err()); assert!(validate_nats_subject("contains.>.wildcard").is_err()); assert!(validate_nats_subject("spaced words").is_err()); assert!(validate_nats_subject("invalid!chars").is_err()); assert!(validate_nats_subject("invalid@chars").is_err()); }
782
783 #[tokio::test]
784 async fn test_parse_nats_credentials() {
785 let expected_jwt = "eyJ0eXAiOiJKV1QiLCJhbGciOiJlZDI1NTE5LW5rZXkifQ.eyJqdGkiOiJPSTZNRlRQMlpPTlVaSlhTSjVGQ01CUVFGR0xIRUlZTkpXWVJTR0xRQkRHS1JKTVlDQlpBIiwiaWF0IjoxNzI0NzczMDMzLCJpc3MiOiJBQUo3S0s3TkFQQURLM0dUSVNPQ1BFUVk1UVFRMk1MUFdVWlVTWVVNN0pRQVYyNExYSUZGQkU0WCIsIm5hbWUiOiJqdXN0LWZvci10ZXN0aW5nIiwic3ViIjoiVUI2NzJSWk9VQkxaNFZWTjdNVlpPNktHS1JCTDJFSTVLQldYUkhUVlBKUlA3UDY0WEc2NU5YRDciLCJuYXRzIjp7InB1YiI6e30sInN1YiI6e30sInN1YnMiOi0xLCJkYXRhIjotMSwicGF5bG9hZCI6LTEsInR5cGUiOiJ1c2VyIiwidmVyc2lvbiI6Mn19.YgaVafvKp_VLmlQsN26zrhtX8yHMpnxjcUtX51ctd8hh_KqqiSdHtHOlFRapHbpHaiFS_kp9e67L0aqdSn87BA";
786 let expected_seed = "SUAO2CXJCBHGBKIR5TPLXQH6WV2QEEP3YQLLPNVLYVTNSDCZFJMCBHEIN4";
787
788 let creds = format!(
790 r#"
791-----BEGIN NATS USER JWT-----
792{expected_jwt}
793------END NATS USER JWT------
794
795-----BEGIN USER NKEY SEED-----
796{expected_seed}
797------END USER NKEY SEED------
798"#,
799 );
800 let tmpdir = tempdir().expect("should have created a temporary directory");
801 let nats_creds_path = tmpdir.path().join("nats.creds");
802 let mut nats_creds = File::create(nats_creds_path.clone())
803 .expect("should have created nats.creds in temporary directory");
804 let _ = nats_creds.write_all(creds.as_bytes());
805 let _ = nats_creds.flush();
806
807 let (jwt, kp) = parse_nats_credentials(Some(nats_creds_path), None, None)
808 .await
809 .unwrap();
810 assert_eq!(jwt.unwrap(), expected_jwt);
811 assert_eq!(kp.unwrap().seed().unwrap(), expected_seed);
812 drop(nats_creds);
813 tmpdir
814 .close()
815 .expect("should have closed the temporary directory handle");
816
817 let (jwt, kp) = parse_nats_credentials(
819 None,
820 Some(String::from(expected_jwt)),
821 Some(String::from(expected_seed)),
822 )
823 .await
824 .unwrap();
825 assert_eq!(jwt.unwrap(), expected_jwt);
826 assert_eq!(kp.unwrap().seed().unwrap(), expected_seed);
827
828 let expected_jwt = "eyJ0eXAiOiJKV1QiLCJhbGciOiJlZDI1NTE5LW5rZXkifQ.eyJqdGkiOiJPSTZNRlRQMlpPTlVaSlhTSjVGQ01CUVFGR0xIRUlZTkpXWVJTR0xRQkRHS1JKTVlDQlpBIiwiaWF0IjoxNzI0NzczMDMzLCJpc3MiOiJBQUo3S0s3TkFQQURLM0dUSVNPQ1BFUVk1UVFRMk1MUFdVWlVTWVVNN0pRQVYyNExYSUZGQkU0WCIsIm5hbWUiOiJqdXN0LWZvci10ZXN0aW5nIiwic3ViIjoiVUI2NzJSWk9VQkxaNFZWTjdNVlpPNktHS1JCTDJFSTVLQldYUkhUVlBKUlA3UDY0WEc2NU5YRDciLCJuYXRzIjp7InB1YiI6e30sInN1YiI6e30sInN1YnMiOi0xLCJkYXRhIjotMSwicGF5bG9hZCI6LTEsInR5cGUiOiJ1c2VyIiwidmVyc2lvbiI6Mn19.YgaVafvKp_VLmlQsN26zrhtX8yHMpnxjcUtX51ctd8hh_KqqiSdHtHOlFRapHbpHaiFS_kp9e67L0aqdSn87BA";
829 let expected_seed = "SUAO2CXJCBHGBKIR5TPLXQH6WV2QEEP3YQLLPNVLYVTNSDCZFJMCBHEIN4";
830 let (jwt, kp) = parse_nats_credentials(
831 None,
832 Some(String::from(expected_jwt)),
833 Some(String::from(expected_seed)),
834 )
835 .await
836 .unwrap();
837 assert_eq!(jwt.unwrap(), expected_jwt);
838 assert_eq!(kp.unwrap().seed().unwrap(), expected_seed);
839
840 let (no_nats_jwt, no_nats_key) = parse_nats_credentials(None, None, None).await.unwrap();
842 assert!(no_nats_jwt.is_none());
843 assert!(no_nats_key.is_none());
844 }
845
846 #[test]
847 fn test_parse_jwt_and_key_from_creds() {
848 let expected_jwt = "eyJ0eXAiOiJKV1QiLCJhbGciOiJlZDI1NTE5LW5rZXkifQ.eyJqdGkiOiJPSTZNRlRQMlpPTlVaSlhTSjVGQ01CUVFGR0xIRUlZTkpXWVJTR0xRQkRHS1JKTVlDQlpBIiwiaWF0IjoxNzI0NzczMDMzLCJpc3MiOiJBQUo3S0s3TkFQQURLM0dUSVNPQ1BFUVk1UVFRMk1MUFdVWlVTWVVNN0pRQVYyNExYSUZGQkU0WCIsIm5hbWUiOiJqdXN0LWZvci10ZXN0aW5nIiwic3ViIjoiVUI2NzJSWk9VQkxaNFZWTjdNVlpPNktHS1JCTDJFSTVLQldYUkhUVlBKUlA3UDY0WEc2NU5YRDciLCJuYXRzIjp7InB1YiI6e30sInN1YiI6e30sInN1YnMiOi0xLCJkYXRhIjotMSwicGF5bG9hZCI6LTEsInR5cGUiOiJ1c2VyIiwidmVyc2lvbiI6Mn19.YgaVafvKp_VLmlQsN26zrhtX8yHMpnxjcUtX51ctd8hh_KqqiSdHtHOlFRapHbpHaiFS_kp9e67L0aqdSn87BA";
849 let expected_seed = "SUAO2CXJCBHGBKIR5TPLXQH6WV2QEEP3YQLLPNVLYVTNSDCZFJMCBHEIN4";
850
851 let creds = format!(
852 r#"
853-----BEGIN NATS USER JWT-----
854{expected_jwt}
855------END NATS USER JWT------
856
857************************* IMPORTANT *************************
858NKEY Seed printed below can be used to sign and prove identity.
859NKEYs are sensitive and should be treated as secrets.
860
861-----BEGIN USER NKEY SEED-----
862{expected_seed}
863------END USER NKEY SEED------
864
865*************************************************************
866"#
867 );
868
869 let (jwt, kp) = parse_jwt_and_key_from_creds(&creds)
870 .expect("should have parsed the creds successfully");
871
872 assert!(jwt.is_some());
873 assert!(kp.is_some());
874 assert_eq!(jwt.unwrap(), expected_jwt);
875 assert_eq!(kp.unwrap().seed().unwrap(), expected_seed);
876
877 let creds_missing_jwt = r#"
879-----BEGIN NATS USER JWT-----
880------END NATS USER JWT------
881
882-----BEGIN USER NKEY SEED-----
883SUAO2CXJCBHGBKIR5TPLXQH6WV2QEEP3YQLLPNVLYVTNSDCZFJMCBHEIN4
884------END USER NKEY SEED------
885"#;
886 assert!(parse_jwt_and_key_from_creds(creds_missing_jwt).is_err());
887
888 let creds_missing_seed = r#"
889-----BEGIN NATS USER JWT-----
890eyJ0eXAiOiJKV1QiLCJhbGciOiJlZDI1NTE5LW5rZXkifQ.eyJqdGkiOiJPSTZNRlRQMlpPTlVaSlhTSjVGQ01CUVFGR0xIRUlZTkpXWVJTR0xRQkRHS1JKTVlDQlpBIiwiaWF0IjoxNzI0NzczMDMzLCJpc3MiOiJBQUo3S0s3TkFQQURLM0dUSVNPQ1BFUVk1UVFRMk1MUFdVWlVTWVVNN0pRQVYyNExYSUZGQkU0WCIsIm5hbWUiOiJqdXN0LWZvci10ZXN0aW5nIiwic3ViIjoiVUI2NzJSWk9VQkxaNFZWTjdNVlpPNktHS1JCTDJFSTVLQldYUkhUVlBKUlA3UDY0WEc2NU5YRDciLCJuYXRzIjp7InB1YiI6e30sInN1YiI6e30sInN1YnMiOi0xLCJkYXRhIjotMSwicGF5bG9hZCI6LTEsInR5cGUiOiJ1c2VyIiwidmVyc2lvbiI6Mn19.YgaVafvKp_VLmlQsN26zrhtX8yHMpnxjcUtX51ctd8hh_KqqiSdHtHOlFRapHbpHaiFS_kp9e67L0aqdSn87BA
891------END NATS USER JWT------
892
893-----BEGIN USER NKEY SEED-----
894------END USER NKEY SEED------
895 "#;
896 assert!(parse_jwt_and_key_from_creds(creds_missing_seed).is_err());
897
898 let creds_invalid_seed = r#"
899-----BEGIN NATS USER JWT-----
900eyJ0eXAiOiJKV1QiLCJhbGciOiJlZDI1NTE5LW5rZXkifQ.eyJqdGkiOiJPSTZNRlRQMlpPTlVaSlhTSjVGQ01CUVFGR0xIRUlZTkpXWVJTR0xRQkRHS1JKTVlDQlpBIiwiaWF0IjoxNzI0NzczMDMzLCJpc3MiOiJBQUo3S0s3TkFQQURLM0dUSVNPQ1BFUVk1UVFRMk1MUFdVWlVTWVVNN0pRQVYyNExYSUZGQkU0WCIsIm5hbWUiOiJqdXN0LWZvci10ZXN0aW5nIiwic3ViIjoiVUI2NzJSWk9VQkxaNFZWTjdNVlpPNktHS1JCTDJFSTVLQldYUkhUVlBKUlA3UDY0WEc2NU5YRDciLCJuYXRzIjp7InB1YiI6e30sInN1YiI6e30sInN1YnMiOi0xLCJkYXRhIjotMSwicGF5bG9hZCI6LTEsInR5cGUiOiJ1c2VyIiwidmVyc2lvbiI6Mn19.YgaVafvKp_VLmlQsN26zrhtX8yHMpnxjcUtX51ctd8hh_KqqiSdHtHOlFRapHbpHaiFS_kp9e67L0aqdSn87BA
901------END NATS USER JWT------
902
903-----BEGIN USER NKEY SEED-----
904SUANOPE
905------END USER NKEY SEED------
906 "#;
907 assert!(parse_jwt_and_key_from_creds(creds_invalid_seed).is_err());
908 }
909}