wasmcloud/
main.rs

1use core::net::SocketAddr;
2
3use std::collections::{HashMap, HashSet};
4use std::env;
5use std::path::PathBuf;
6use std::sync::{Arc, LazyLock};
7use std::time::Duration;
8
9use anyhow::{bail, Context};
10use clap::{ArgAction, Parser};
11use nkeys::KeyPair;
12use regex::Regex;
13use tokio::time::{timeout, timeout_at};
14use tokio::{select, signal};
15use tracing::{warn, Level as TracingLogLevel};
16use tracing_subscriber::util::SubscriberInitExt as _;
17use url::Url;
18use wasmcloud_core::logging::Level as WasmcloudLogLevel;
19use wasmcloud_core::{OtelConfig, OtelProtocol};
20use wasmcloud_host::nats::builder::NatsHostBuilder;
21use wasmcloud_host::oci::Config as OciConfig;
22use wasmcloud_host::workload_identity::WorkloadIdentityConfig;
23use wasmcloud_host::WasmbusHostConfig;
24use wasmcloud_host::{nats::connect_nats, wasmbus::Features};
25use wasmcloud_tracing::configure_observability;
26
27#[derive(Debug, Parser)]
28#[allow(clippy::struct_excessive_bools)]
29#[clap(name = "wasmcloud")]
30#[command(version, about, long_about = None)]
31struct Args {
32    /// Controls the verbosity of traces emitted from the wasmCloud host
33    #[clap(long = "trace-level", default_value_t = TracingLogLevel::INFO, env = "WASMCLOUD_TRACE_LEVEL")]
34    pub trace_level: TracingLogLevel,
35    /// Controls the verbosity of logs from the wasmCloud host
36    #[clap(long = "log-level", alias = "structured-log-level", default_value_t = TracingLogLevel::INFO, env = "WASMCLOUD_LOG_LEVEL")]
37    pub log_level: TracingLogLevel,
38    /// NATS server host to connect to
39    #[clap(
40        long = "nats-host",
41        default_value = "127.0.0.1",
42        env = "WASMCLOUD_NATS_HOST"
43    )]
44    nats_host: String,
45    /// NATS server port to connect to
46    #[clap(
47        long = "nats-port",
48        default_value_t = 4222,
49        env = "WASMCLOUD_NATS_PORT"
50    )]
51    nats_port: u16,
52    /// A user JWT to use to authenticate to NATS
53    #[clap(
54        long = "nats-jwt",
55        env = "WASMCLOUD_NATS_JWT",
56        requires = "nats_seed",
57        conflicts_with = "nats_creds"
58    )]
59    nats_jwt: Option<String>,
60    /// A seed nkey to use to authenticate to NATS
61    #[clap(
62        long = "nats-seed",
63        env = "WASMCLOUD_NATS_SEED",
64        requires = "nats_jwt",
65        conflicts_with = "nats_creds"
66    )]
67    nats_seed: Option<String>,
68    /// A NATS credentials file that contains the JWT and seed for authenticating to NATS
69    #[clap(long = "nats-creds", env = "WASMCLOUD_NATS_CREDS", conflicts_with_all = ["nats_jwt", "nats_seed"])]
70    nats_creds: Option<PathBuf>,
71    /// The lattice the host belongs to
72    #[clap(
73        short = 'x',
74        long = "lattice",
75        default_value = "default",
76        env = "WASMCLOUD_LATTICE"
77    )]
78    lattice: String,
79    /// The seed key (a printable 256-bit Ed25519 private key) used by this host to generate its public key
80    #[clap(long = "host-seed", env = "WASMCLOUD_HOST_SEED")]
81    host_seed: Option<String>,
82    /// Delay, in milliseconds, between requesting a provider shut down and forcibly terminating its process
83    #[clap(long = "provider-shutdown-delay-ms", alias = "provider-shutdown-delay", default_value = "300", env = "WASMCLOUD_PROV_SHUTDOWN_DELAY_MS", value_parser = parse_duration_millis)]
84    provider_shutdown_delay: Duration,
85    /// Determines whether OCI images tagged latest are allowed to be pulled from OCI registries and started
86    #[clap(long = "allow-latest", env = "WASMCLOUD_OCI_ALLOW_LATEST")]
87    allow_latest: bool,
88    /// A comma-separated list of OCI hosts to which insecure (non-TLS) connections are allowed
89    #[clap(
90        long = "allowed-insecure",
91        env = "WASMCLOUD_OCI_ALLOWED_INSECURE",
92        value_delimiter = ','
93    )]
94    allowed_insecure: Vec<String>,
95    /// NATS Jetstream domain name
96    #[clap(
97        long = "js-domain",
98        alias = "wasmcloud-js-domain",
99        env = "WASMCLOUD_JS_DOMAIN"
100    )]
101    js_domain: Option<String>,
102    /// Denotes if a wasmCloud host should issue requests to a config service on startup
103    #[clap(long = "config-service-enabled", env = "WASMCLOUD_CONFIG_SERVICE")]
104    config_service_enabled: bool,
105    /// Denotes if a wasmCloud host should allow starting components from the file system
106    #[clap(
107        long = "allow-file-load",
108        default_value_t = false,
109        env = "WASMCLOUD_ALLOW_FILE_LOAD"
110    )]
111    allow_file_load: bool,
112    /// Enable JSON structured logging from the wasmCloud host
113    #[clap(
114        long = "enable-structured-logging",
115        env = "WASMCLOUD_STRUCTURED_LOGGING_ENABLED"
116    )]
117    enable_structured_logging: bool,
118    /// Start the host with a set of labels, can be specified multiple times. This can alternatively be specified via environment variables prefixed with `WASMCLOUD_LABEL_`, e.g. `WASMCLOUD_LABEL_foo=bar`
119    #[clap(short = 'l', long = "label")]
120    label: Option<Vec<String>>,
121
122    /// An IP address or DNS name to use to connect to NATS for Control Interface (CTL) messages, defaults to the value supplied to --nats-host if not supplied
123    #[clap(long = "ctl-host", env = "WASMCLOUD_CTL_HOST", hide = true)]
124    ctl_host: Option<String>,
125    /// A port to use to connect to NATS for CTL messages, defaults to the value supplied to --nats-port if not supplied
126    #[clap(long = "ctl-port", env = "WASMCLOUD_CTL_PORT", hide = true)]
127    ctl_port: Option<u16>,
128    /// A user JWT to use to authenticate to NATS for CTL messages, defaults to the value supplied to --nats-jwt if not supplied
129    #[clap(
130        long = "ctl-jwt",
131        env = "WASMCLOUD_CTL_JWT",
132        requires = "ctl_seed",
133        hide = true,
134        conflicts_with = "ctl_creds"
135    )]
136    ctl_jwt: Option<String>,
137    /// A seed nkey to use to authenticate to NATS for CTL messages, defaults to the value supplied to --nats-seed if not supplied
138    #[clap(
139        long = "ctl-seed",
140        env = "WASMCLOUD_CTL_SEED",
141        requires = "ctl_jwt",
142        hide = true,
143        conflicts_with = "ctl_creds"
144    )]
145    ctl_seed: Option<String>,
146    /// A NATS credentials file to use to authenticate to NATS for CTL messages, defaults to the value supplied to --nats-creds or --nats-jwt and --nats-seed
147    #[clap(long = "ctl-creds", env = "WASMCLOUD_CTL_CREDS", hide = true, conflicts_with_all = ["ctl_jwt", "ctl_seed"])]
148    ctl_creds: Option<PathBuf>,
149    /// Optional flag to require host communication over TLS with a NATS server for CTL messages
150    #[clap(long = "ctl-tls", env = "WASMCLOUD_CTL_TLS", hide = true)]
151    ctl_tls: bool,
152    /// Advanced: A prefix to use for all CTL topics
153    #[clap(
154        long = "ctl-topic-prefix",
155        env = "WASMCLOUD_CTL_TOPIC_PREFIX",
156        default_value = "wasmbus.ctl",
157        hide = true
158    )]
159    ctl_topic_prefix: String,
160
161    /// An IP address or DNS name to use to connect to NATS for RPC messages, defaults to the value supplied to --nats-host if not supplied
162    #[clap(long = "rpc-host", env = "WASMCLOUD_RPC_HOST", hide = true)]
163    rpc_host: Option<String>,
164    /// A port to use to connect to NATS for RPC messages, defaults to the value supplied to --nats-port if not supplied
165    #[clap(long = "rpc-port", env = "WASMCLOUD_RPC_PORT", hide = true)]
166    rpc_port: Option<u16>,
167    /// A user JWT to use to authenticate to NATS for RPC messages, defaults to the value supplied to --nats-jwt if not supplied
168    #[clap(
169        long = "rpc-jwt",
170        env = "WASMCLOUD_RPC_JWT",
171        requires = "rpc_seed",
172        hide = true,
173        conflicts_with = "rpc_creds"
174    )]
175    rpc_jwt: Option<String>,
176    /// A seed nkey to use to authenticate to NATS for RPC messages, defaults to the value supplied to --nats-seed if not supplied
177    #[clap(
178        long = "rpc-seed",
179        env = "WASMCLOUD_RPC_SEED",
180        requires = "rpc_jwt",
181        hide = true,
182        conflicts_with = "rpc_creds"
183    )]
184    rpc_seed: Option<String>,
185    /// A NATS credentials file to use to authenticate to NATS for RPC messages, defaults to the value supplied to --nats-creds or --nats-jwt and --nats-seed
186    #[clap(long = "rpc-creds", env = "WASMCLOUD_RPC_CREDS", hide = true, conflicts_with_all = ["rpc_jwt", "rpc_seed"])]
187    rpc_creds: Option<PathBuf>,
188    /// Timeout in milliseconds for all RPC calls
189    #[clap(long = "rpc-timeout-ms", default_value = "2000", env = "WASMCLOUD_RPC_TIMEOUT_MS", value_parser = parse_duration_millis, hide = true)]
190    rpc_timeout_ms: Duration,
191    /// Optional flag to require host communication over TLS with a NATS server for RPC messages
192    #[clap(long = "rpc-tls", env = "WASMCLOUD_RPC_TLS", hide = true)]
193    rpc_tls: bool,
194
195    /// If provided, enables policy checks on start actions and component invocations
196    #[clap(long = "policy-topic", env = "WASMCLOUD_POLICY_TOPIC")]
197    policy_topic: Option<String>,
198    /// If provided, allows the host to subscribe to updates on past policy decisions. Requires `policy_topic` to be set.
199    #[clap(
200        long = "policy-changes-topic",
201        env = "WASMCLOUD_POLICY_CHANGES_TOPIC",
202        requires = "policy_topic"
203    )]
204    policy_changes_topic: Option<String>,
205    /// If provided, allows to set a custom Max Execution time for the Host in ms.
206    #[clap(long = "max-execution-time-ms", default_value = "600000", env = "WASMCLOUD_MAX_EXECUTION_TIME_MS", value_parser = parse_duration_millis)]
207    max_execution_time: Duration,
208    /// The maximum amount of memory bytes that a component can allocate (default 256 MiB)
209    #[clap(long = "max-linear-memory-bytes", default_value_t = 256 * 1024 * 1024, env = "WASMCLOUD_MAX_LINEAR_MEMORY")]
210    max_linear_memory: u32,
211    /// The maximum byte size of a component binary that can be loaded (default 50 MiB)
212    #[clap(long = "max-component-size-bytes", default_value_t = 50 * 1024 * 1024, env = "WASMCLOUD_MAX_COMPONENT_SIZE")]
213    max_component_size: u64,
214    /// The maximum number of components that can be run simultaneously
215    #[clap(
216        long = "max-components",
217        default_value_t = 10_000,
218        env = "WASMCLOUD_MAX_COMPONENTS"
219    )]
220    max_components: u32,
221
222    /// The maximum number of core instances per component
223    #[clap(
224        long = "max-core-instances-per-component",
225        default_value_t = 30,
226        env = "WASMCLOUD_MAX_CORE_INSTANCES_PER_COMPONENT"
227    )]
228    max_core_instances_per_component: u32,
229
230    /// If provided, allows setting a custom timeout for requesting policy decisions. Defaults to one second. Requires `policy_topic` to be set.
231    #[clap(
232        long = "policy-timeout-ms",
233        env = "WASMCLOUD_POLICY_TIMEOUT",
234        requires = "policy_topic",
235        value_parser = parse_duration_millis,
236    )]
237    policy_timeout_ms: Option<Duration>,
238
239    /// If provided, enables interfacing with a secrets backend for secret retrieval over the given topic prefix. Must not be empty.
240    #[clap(long = "secrets-topic", env = "WASMCLOUD_SECRETS_TOPIC")]
241    secrets_topic_prefix: Option<String>,
242
243    /// Used in tandem with `oci_user` and `oci_password` to override credentials for a specific OCI registry.
244    #[clap(
245        long = "oci-registry",
246        env = "WASMCLOUD_OCI_REGISTRY",
247        requires = "oci_user",
248        requires = "oci_password"
249    )]
250    oci_registry: Option<String>,
251    /// Username for the OCI registry specified by `oci_registry`.
252    #[clap(
253        long = "oci-user",
254        env = "WASMCLOUD_OCI_REGISTRY_USER",
255        requires = "oci_registry",
256        requires = "oci_password"
257    )]
258    oci_user: Option<String>,
259    /// Password for the OCI registry specified by `oci_registry`.
260    #[clap(
261        long = "oci-password",
262        env = "WASMCLOUD_OCI_REGISTRY_PASSWORD",
263        requires = "oci_registry",
264        requires = "oci_user"
265    )]
266    oci_password: Option<String>,
267
268    /// Determines whether observability should be enabled.
269    #[clap(
270        long = "enable-observability",
271        env = "WASMCLOUD_OBSERVABILITY_ENABLED",
272        conflicts_with_all = ["enable_traces", "enable_metrics", "enable_logs"]
273    )]
274    enable_observability: bool,
275
276    /// Determines whether traces should be enabled.
277    #[clap(long = "enable-traces", env = "WASMCLOUD_TRACES_ENABLED", hide = true)]
278    enable_traces: Option<bool>,
279
280    /// Determines whether metrics should be enabled.
281    #[clap(
282        long = "enable-metrics",
283        env = "WASMCLOUD_METRICS_ENABLED",
284        hide = true
285    )]
286    enable_metrics: Option<bool>,
287
288    /// Determines whether logs should be enabled.
289    #[clap(long = "enable-logs", env = "WASMCLOUD_LOGS_ENABLED", hide = true)]
290    enable_logs: Option<bool>,
291
292    /// Overrides the OpenTelemetry endpoint used for emitting traces, metrics and logs.
293    ///
294    /// If not provided, defaults to:
295    ///   - HTTP: http://127.0.0.1:4318/v1/<signal> (e.g., /v1/traces, /v1/metrics, /v1/logs)
296    ///   - gRPC: http://127.0.0.1:4317
297    #[clap(
298        long = "override-observability-endpoint",
299        env = "OTEL_EXPORTER_OTLP_ENDPOINT"
300    )]
301    observability_endpoint: Option<String>,
302
303    /// Overrides the OpenTelemetry endpoint used for emitting traces.
304    ///
305    /// If not provided, defaults to:
306    ///   - HTTP: http://127.0.0.1:4318/v1/traces
307    ///   - gRPC: http://127.0.0.1:4317
308    #[clap(
309        long = "override-traces-endpoint",
310        env = "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT",
311        hide = true
312    )]
313    traces_endpoint: Option<String>,
314
315    /// Overrides the OpenTelemetry endpoint used for emitting metrics.
316    ///
317    /// If not provided, defaults to:
318    ///   - HTTP: http://127.0.0.1:4318/v1/metrics
319    ///   - gRPC: http://127.0.0.1:4317
320    #[clap(
321        long = "override-metrics-endpoint",
322        env = "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT",
323        hide = true
324    )]
325    metrics_endpoint: Option<String>,
326
327    /// Overrides the OpenTelemetry endpoint used for emitting logs.
328    ///
329    /// If not provided, defaults to:
330    ///   - HTTP: http://127.0.0.1:4318/v1/logs
331    ///   - gRPC: http://127.0.0.1:4317
332    #[clap(
333        long = "override-logs-endpoint",
334        env = "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT",
335        hide = true
336    )]
337    logs_endpoint: Option<String>,
338
339    /// Configures whether grpc or http will be used for exporting the enabled telemetry. This defaults to 'http'.
340    #[clap(
341        long = "observability-protocol",
342        env = "WASMCLOUD_OBSERVABILITY_PROTOCOL",
343        hide = true
344    )]
345    observability_protocol: Option<OtelProtocol>,
346
347    /// Path to generate flame graph at
348    #[clap(long = "flame-graph", env = "WASMCLOUD_FLAME_GRAPH")]
349    flame_graph: Option<String>,
350
351    /// Configures the set of certificate authorities as repeatable set of file paths to load into the OCI and OpenTelemetry clients
352    #[arg(
353        long = "tls-ca-path",
354        env = "WASMCLOUD_TLS_CA_PATH",
355        value_delimiter = ','
356    )]
357    pub tls_ca_paths: Option<Vec<PathBuf>>,
358
359    /// If provided, overrides the default heartbeat interval of every 30 seconds. Provided value is interpreted as seconds.
360    #[arg(long = "heartbeat-interval-seconds", env = "WASMCLOUD_HEARTBEAT_INTERVAL", value_parser = parse_duration_secs, hide = true)]
361    heartbeat_interval: Option<Duration>,
362
363    /// Experimental features to enable in the host. This is a repeatable option.
364    #[arg(
365        long = "feature",
366        env = "WASMCLOUD_EXPERIMENTAL_FEATURES",
367        value_delimiter = ',',
368        hide = true
369    )]
370    experimental_features: Vec<Features>,
371
372    #[clap(
373        long = "help-markdown",
374        action=ArgAction::SetTrue,
375        conflicts_with = "help",
376        hide = true
377    )]
378    help_markdown: bool,
379
380    #[clap(long = "http-admin", env = "WASMCLOUD_HTTP_ADMIN")]
381    /// HTTP administration endpoint address
382    http_admin: Option<SocketAddr>,
383
384    #[clap(
385        long = "enable-component-auction",
386        env = "WASMCLOUD_COMPONENT_AUCTION_ENABLED"
387    )]
388    /// Determines whether component auctions should be enabled (defaults to true)
389    enable_component_auction: Option<bool>,
390
391    #[clap(
392        long = "enable-provider-auction",
393        env = "WASMCLOUD_PROVIDER_AUCTION_ENABLED"
394    )]
395    /// Determines whether capability provider auctions should be enabled (defaults to true)
396    enable_provider_auction: Option<bool>,
397}
398
399const DEFAULT_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10);
400
401#[tokio::main]
402#[allow(clippy::too_many_lines)]
403async fn main() -> anyhow::Result<()> {
404    let args: Args = Args::parse();
405
406    // Implements clap_markdown for markdown generation of command line documentation.`
407    if args.help_markdown {
408        clap_markdown::print_help_markdown::<Args>();
409        std::process::exit(0);
410    }
411
412    if let Some(tls_ca_paths) = args.tls_ca_paths.clone() {
413        ensure_certs_for_paths(tls_ca_paths)?;
414    }
415
416    let trace_level = WasmcloudLogLevel::from(args.trace_level);
417    let otel_config = OtelConfig {
418        enable_observability: args.enable_observability,
419        enable_traces: args.enable_traces,
420        enable_metrics: args.enable_metrics,
421        enable_logs: args.enable_logs,
422        observability_endpoint: args.observability_endpoint,
423        traces_endpoint: args.traces_endpoint,
424        metrics_endpoint: args.metrics_endpoint,
425        logs_endpoint: args.logs_endpoint,
426        protocol: args.observability_protocol.unwrap_or_default(),
427        additional_ca_paths: args.tls_ca_paths.clone().unwrap_or_default(),
428        trace_level,
429        ..Default::default()
430    };
431    let log_level = WasmcloudLogLevel::from(args.log_level);
432
433    let _guard = match configure_observability(
434        "wasmcloud-host",
435        &otel_config,
436        args.enable_structured_logging,
437        args.flame_graph,
438        Some(&log_level),
439        Some(&otel_config.trace_level),
440    ) {
441        Ok((dispatch, guard)) => {
442            dispatch
443                .try_init()
444                .context("failed to init observability for host")?;
445
446            // Now that tracing is initialized, log the effective OTEL configuration
447            tracing::debug!(
448                ?otel_config,
449                traces_endpoint = %otel_config.traces_endpoint(),
450                metrics_endpoint = %otel_config.metrics_endpoint(),
451                logs_endpoint = %otel_config.logs_endpoint(),
452                protocol = ?otel_config.protocol,
453                "combined OpenTelemetry configuration (cli > env > defaults)"
454            );
455
456            Some(guard)
457        }
458        Err(e) => {
459            eprintln!("Failed to configure observability: {e:?}");
460            None
461        }
462    };
463
464    let ctl_nats_url = Url::parse(&format!(
465        "nats://{}:{}",
466        args.ctl_host.unwrap_or_else(|| args.nats_host.clone()),
467        args.ctl_port.unwrap_or(args.nats_port)
468    ))
469    .context("failed to construct a valid `ctl_nats_url` using `ctl-host` and `ctl-port`")?;
470    let rpc_nats_url = Url::parse(&format!(
471        "nats://{}:{}",
472        args.rpc_host.unwrap_or_else(|| args.nats_host.clone()),
473        args.rpc_port.unwrap_or(args.nats_port)
474    ))
475    .context("failed to construct a valid `rpc_nats_url` using `rpc-host` and `rpc-port`")?;
476
477    let host_key = args
478        .host_seed
479        .as_deref()
480        .map(KeyPair::from_seed)
481        .transpose()
482        .context("failed to construct host key pair from seed")?
483        .map(Arc::new)
484        .unwrap_or_else(|| Arc::new(KeyPair::new_server()));
485    let (nats_jwt, nats_key) =
486        parse_nats_credentials(args.nats_creds, args.nats_jwt, args.nats_seed)
487            .await
488            .context("failed to parse NATS credentials from provided arguments")?;
489    let (ctl_jwt, ctl_key) = parse_nats_credentials(args.ctl_creds, args.ctl_jwt, args.ctl_seed)
490        .await
491        .context("failed to parse control interface credentials from provided arguments")?;
492    let (rpc_jwt, rpc_key) = parse_nats_credentials(args.rpc_creds, args.rpc_jwt, args.rpc_seed)
493        .await
494        .context("failed to parse RPC credentials from provided arguments")?;
495    let oci_opts = OciConfig {
496        additional_ca_paths: args.tls_ca_paths.unwrap_or_default(),
497        allow_latest: args.allow_latest,
498        allowed_insecure: args.allowed_insecure,
499        oci_registry: args.oci_registry,
500        oci_user: args.oci_user,
501        oci_password: args.oci_password,
502    };
503
504    let mut labels = args
505        .label
506        .unwrap_or_default()
507        .iter()
508        .map(|labelpair| parse_label(labelpair))
509        .collect::<anyhow::Result<HashMap<String, String>, anyhow::Error>>()
510        .context("failed to parse labels")?;
511    let labels_from_args: HashSet<String> = labels.keys().cloned().collect();
512    labels.extend(env::vars().filter_map(|(key, value)| {
513        let key = if key.starts_with("WASMCLOUD_LABEL_") {
514            key.strip_prefix("WASMCLOUD_LABEL_")?.to_string()
515        } else {
516            return None;
517        };
518        if labels_from_args.contains(&key) {
519            warn!(
520                ?key,
521                "label provided via args will override label set via environment variable"
522            );
523            return None;
524        }
525        Some((key, value))
526    }));
527    if let Some(secrets_topic) = args.secrets_topic_prefix.as_deref() {
528        anyhow::ensure!(
529            validate_nats_subject(secrets_topic).is_ok(),
530            "Invalid secrets topic"
531        );
532    }
533
534    // NOTE(brooksmtownsend): Summing the feature flags "OR"s the multiple flags together.
535    let experimental_features: Features = args.experimental_features.into_iter().sum();
536    let workload_identity_config = if experimental_features.workload_identity_auth_enabled() {
537        Some(WorkloadIdentityConfig::from_env()?)
538    } else {
539        None
540    };
541    let ctl_nats = connect_nats(
542        ctl_nats_url.as_str(),
543        ctl_jwt.or_else(|| nats_jwt.clone()).as_ref(),
544        ctl_key.or_else(|| nats_key.clone()),
545        args.ctl_tls,
546        None,
547        workload_identity_config.clone(),
548    )
549    .await
550    .context("failed to establish NATS control connection")?;
551
552    let builder = NatsHostBuilder::new(
553        ctl_nats,
554        Some(args.ctl_topic_prefix),
555        args.lattice.clone(),
556        args.js_domain.clone(),
557        Some(oci_opts.clone()),
558        labels.clone().into_iter().collect(),
559        args.config_service_enabled,
560        args.enable_component_auction.unwrap_or(true),
561        args.enable_provider_auction.unwrap_or(true),
562    )
563    .await?
564    .with_event_publisher(host_key.public_key());
565
566    let builder = if let Some(policy_topic) = args.policy_topic.as_deref() {
567        anyhow::ensure!(
568            validate_nats_subject(policy_topic).is_ok(),
569            "Invalid policy topic"
570        );
571        builder
572            .with_policy_manager(
573                host_key.clone(),
574                labels.clone(),
575                args.policy_topic.clone(),
576                args.policy_timeout_ms,
577                args.policy_changes_topic.clone(),
578            )
579            .await?
580    } else {
581        builder
582    };
583
584    let builder = if let Some(secrets_topic) = args.secrets_topic_prefix {
585        anyhow::ensure!(
586            validate_nats_subject(&secrets_topic).is_ok(),
587            "Invalid secrets topic"
588        );
589        builder.with_secrets_manager(secrets_topic)?
590    } else {
591        builder
592    };
593
594    let (host_builder, nats_ctl_server) = builder
595        .build(WasmbusHostConfig {
596            lattice: Arc::from(args.lattice.clone()),
597            host_key: host_key.clone(),
598            config_service_enabled: args.config_service_enabled,
599            js_domain: args.js_domain,
600            labels,
601            provider_shutdown_delay: Some(args.provider_shutdown_delay),
602            oci_opts,
603            rpc_nats_url,
604            rpc_timeout: args.rpc_timeout_ms,
605            rpc_jwt: rpc_jwt.or_else(|| nats_jwt.clone()),
606            rpc_key: rpc_key.or_else(|| nats_key.clone()),
607            rpc_tls: args.rpc_tls,
608            allow_file_load: args.allow_file_load,
609            log_level,
610            enable_structured_logging: args.enable_structured_logging,
611            otel_config,
612            version: env!("CARGO_PKG_VERSION").to_string(),
613            max_execution_time: args.max_execution_time,
614            max_linear_memory: args.max_linear_memory,
615            max_component_size: args.max_component_size,
616            max_components: args.max_components,
617            max_core_instances_per_component: args.max_core_instances_per_component,
618            heartbeat_interval: args.heartbeat_interval,
619            experimental_features,
620            http_admin: args.http_admin,
621            enable_component_auction: args.enable_component_auction.unwrap_or(true),
622            enable_provider_auction: args.enable_provider_auction.unwrap_or(true),
623        })
624        .await?;
625    let (host, shutdown) = host_builder
626        .build()
627        .await
628        .context("failed to initialize host")?;
629
630    // Start the control interface server
631    let mut ctl = nats_ctl_server.start(host.clone()).await?;
632
633    #[cfg(unix)]
634    let deadline = {
635        let mut terminate = signal::unix::signal(signal::unix::SignalKind::terminate())?;
636        select! {
637            sig = signal::ctrl_c() => {
638                sig.context("failed to wait for Ctrl-C")?;
639                None
640            },
641            _ = terminate.recv() => None,
642            deadline = host.stopped() => deadline?,
643        }
644    };
645    #[cfg(not(unix))]
646    let deadline = select! {
647        sig = signal::ctrl_c() => {
648            sig.context("failed to wait for Ctrl-C")?;
649            None
650        },
651        deadline = host.stopped() => deadline?,
652    };
653    // TODO(brooksmtownsend): Consider a drain of sorts that can wrap up pending persistent work
654    ctl.abort_all();
655    drop(host);
656    if let Some(deadline) = deadline {
657        timeout_at(deadline, shutdown)
658    } else {
659        timeout(DEFAULT_SHUTDOWN_TIMEOUT, shutdown)
660    }
661    .await
662    .context("host shutdown timed out")?
663    .context("failed to shutdown host")?;
664    Ok(())
665}
666
667fn parse_duration_millis(arg: &str) -> anyhow::Result<Duration> {
668    arg.parse()
669        .map(Duration::from_millis)
670        .map_err(|e| anyhow::anyhow!(e))
671}
672
673/// Validates that a subject string (e.g. secrets-topic and policy-topic) adheres to the rules and conventions
674/// of being a valid NATS subject.
675/// This function is specifically for validating subjects to publish to and not intended to be used for
676/// validating subjects to subscribe to, as those may include wildcard characters.
677fn validate_nats_subject(subject: &str) -> anyhow::Result<()> {
678    let re = Regex::new(r"^(?:[A-Za-z0-9_-]+\.)*[A-Za-z0-9_-]+$")
679        .context("Failed to compile NATS subject regex")?;
680    if re.is_match(subject) && !subject.contains('*') && !subject.contains('>') {
681        Ok(())
682    } else {
683        Err(anyhow::anyhow!("Invalid NATS subject: {}", subject))
684    }
685}
686
687fn parse_duration_secs(arg: &str) -> anyhow::Result<Duration> {
688    arg.parse()
689        .map(Duration::from_secs)
690        .map_err(|e| anyhow::anyhow!(e))
691}
692
693fn parse_label(labelpair: &str) -> anyhow::Result<(String, String)> {
694    match labelpair.split('=').collect::<Vec<&str>>()[..] {
695        [k, v] => Ok((k.to_string(), v.to_string())),
696        _ => bail!("invalid label format `{labelpair}`. Expected `key=value`"),
697    }
698}
699
700static JWT_RE: LazyLock<Regex> = LazyLock::new(|| {
701    Regex::new(r"-----BEGIN NATS USER JWT-----\n(?<jwt>.*)\n------END NATS USER JWT------").unwrap()
702});
703
704static SEED_RE: LazyLock<Regex> = LazyLock::new(|| {
705    Regex::new(r"-----BEGIN USER NKEY SEED-----\n(?<seed>.*)\n------END USER NKEY SEED------")
706        .unwrap()
707});
708
709async fn parse_nats_credentials(
710    nats_creds: Option<PathBuf>,
711    nats_jwt: Option<String>,
712    nats_seed: Option<String>,
713) -> anyhow::Result<(Option<String>, Option<Arc<KeyPair>>)> {
714    match (nats_creds, nats_jwt, nats_seed) {
715        (Some(creds), None, None) => {
716            let contents = tokio::fs::read_to_string(creds).await?;
717            Ok(parse_jwt_and_key_from_creds(&contents)?)
718        }
719        (None, Some(jwt), Some(seed)) => {
720            let kp =
721                KeyPair::from_seed(&seed).context("failed to construct NATS key pair from seed")?;
722            Ok((Some(jwt), Some(Arc::new(kp))))
723        }
724        _ => Ok((None, None)),
725    }
726}
727
728fn parse_jwt_and_key_from_creds(
729    contents: &str,
730) -> anyhow::Result<(Option<String>, Option<Arc<KeyPair>>)> {
731    let jwt = JWT_RE
732        .captures(contents)
733        .map(|capture| capture["jwt"].to_owned())
734        .context("failed to parse JWT from NATS credentials")?;
735    let kp = SEED_RE
736        .captures(contents)
737        .and_then(|capture| KeyPair::from_seed(&capture["seed"]).ok())
738        .map(Arc::new)
739        .context("failed to construct key pair from NATS credentials")?;
740    Ok((Some(jwt), Some(kp)))
741}
742
743fn ensure_certs_for_paths(paths: Vec<PathBuf>) -> anyhow::Result<()> {
744    if wasmcloud_core::tls::load_certs_from_paths(&paths)
745        .context("failed to load certificates from the provided path")?
746        .is_empty()
747    {
748        bail!("failed to parse certificates from the provided path");
749    };
750    Ok(())
751}
752
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs::File;
    use std::io::Write;
    use tempfile::tempdir;

    /// User JWT fixture shared by the credential-parsing tests.
    const TEST_JWT: &str = "eyJ0eXAiOiJKV1QiLCJhbGciOiJlZDI1NTE5LW5rZXkifQ.eyJqdGkiOiJPSTZNRlRQMlpPTlVaSlhTSjVGQ01CUVFGR0xIRUlZTkpXWVJTR0xRQkRHS1JKTVlDQlpBIiwiaWF0IjoxNzI0NzczMDMzLCJpc3MiOiJBQUo3S0s3TkFQQURLM0dUSVNPQ1BFUVk1UVFRMk1MUFdVWlVTWVVNN0pRQVYyNExYSUZGQkU0WCIsIm5hbWUiOiJqdXN0LWZvci10ZXN0aW5nIiwic3ViIjoiVUI2NzJSWk9VQkxaNFZWTjdNVlpPNktHS1JCTDJFSTVLQldYUkhUVlBKUlA3UDY0WEc2NU5YRDciLCJuYXRzIjp7InB1YiI6e30sInN1YiI6e30sInN1YnMiOi0xLCJkYXRhIjotMSwicGF5bG9hZCI6LTEsInR5cGUiOiJ1c2VyIiwidmVyc2lvbiI6Mn19.YgaVafvKp_VLmlQsN26zrhtX8yHMpnxjcUtX51ctd8hh_KqqiSdHtHOlFRapHbpHaiFS_kp9e67L0aqdSn87BA";
    /// NKey seed fixture shared by the credential-parsing tests.
    const TEST_SEED: &str = "SUAO2CXJCBHGBKIR5TPLXQH6WV2QEEP3YQLLPNVLYVTNSDCZFJMCBHEIN4";

    /// Checks which subjects `validate_nats_subject` accepts and which it rejects.
    #[test]
    fn test_nats_subject_validation() {
        let valid_subjects = [
            "wasmcloud.secrets",
            "simple",
            "with_underscore",
            "with-hyphen",
            "multiple.topic.levels",
            "123.456",
            "subject.123",
        ];
        for subject in valid_subjects {
            assert!(
                validate_nats_subject(subject).is_ok(),
                "expected subject {subject:?} to be accepted"
            );
        }

        let invalid_subjects = [
            "",                    // Empty topic
            ".",                   // Just a dot
            ".starts.with.dot",    // Starts with a dot
            "ends.with.dot.",      // Ends with a dot
            "double..dot",         // Double dot
            "contains.*.wildcard", // Contains *
            "contains.>.wildcard", // Contains >
            "spaced words",        // Contains space
            "invalid!chars",       // Contains !
            "invalid@chars",       // Contains @
        ];
        for subject in invalid_subjects {
            assert!(
                validate_nats_subject(subject).is_err(),
                "expected subject {subject:?} to be rejected"
            );
        }
    }

    /// Exercises `parse_nats_credentials` with a creds file, with an explicit JWT and seed,
    /// and with no credentials at all.
    #[tokio::test]
    async fn test_parse_nats_credentials() {
        let expected_jwt = TEST_JWT;
        let expected_seed = TEST_SEED;

        // Test that passing in `--nats-creds` a creds file works
        let creds = format!(
            r#"
-----BEGIN NATS USER JWT-----
{expected_jwt}
------END NATS USER JWT------

-----BEGIN USER NKEY SEED-----
{expected_seed}
------END USER NKEY SEED------
"#,
        );
        let tmpdir = tempdir().expect("should have created a temporary directory");
        let nats_creds_path = tmpdir.path().join("nats.creds");
        {
            // Fail loudly if the fixture cannot be written, rather than letting the parse
            // step below fail with a misleading error. The file is closed at the end of
            // this scope, before it is read back.
            let mut nats_creds = File::create(&nats_creds_path)
                .expect("should have created nats.creds in temporary directory");
            nats_creds
                .write_all(creds.as_bytes())
                .expect("should have written credentials to nats.creds");
            nats_creds
                .flush()
                .expect("should have flushed credentials to nats.creds");
        }

        let (jwt, kp) = parse_nats_credentials(Some(nats_creds_path), None, None)
            .await
            .unwrap();
        assert_eq!(jwt.unwrap(), expected_jwt);
        assert_eq!(kp.unwrap().seed().unwrap(), expected_seed);
        tmpdir
            .close()
            .expect("should have closed the temporary directory handle");

        // Test that passing in `--nats-jwt` and `--nats-seed` works
        let (jwt, kp) = parse_nats_credentials(
            None,
            Some(String::from(expected_jwt)),
            Some(String::from(expected_seed)),
        )
        .await
        .unwrap();
        assert_eq!(jwt.unwrap(), expected_jwt);
        assert_eq!(kp.unwrap().seed().unwrap(), expected_seed);

        // Test that passing in nothing also works
        let (no_nats_jwt, no_nats_key) = parse_nats_credentials(None, None, None).await.unwrap();
        assert!(no_nats_jwt.is_none());
        assert!(no_nats_key.is_none());
    }

    /// Exercises `parse_jwt_and_key_from_creds` on a well-formed creds document and on
    /// documents with a missing JWT, a missing seed, and an invalid seed.
    #[test]
    fn test_parse_jwt_and_key_from_creds() {
        let expected_jwt = TEST_JWT;
        let expected_seed = TEST_SEED;

        let creds = format!(
            r#"
-----BEGIN NATS USER JWT-----
{expected_jwt}
------END NATS USER JWT------

************************* IMPORTANT *************************
NKEY Seed printed below can be used to sign and prove identity.
NKEYs are sensitive and should be treated as secrets.

-----BEGIN USER NKEY SEED-----
{expected_seed}
------END USER NKEY SEED------

*************************************************************
"#
        );

        let (jwt, kp) = parse_jwt_and_key_from_creds(&creds)
            .expect("should have parsed the creds successfully");

        assert!(jwt.is_some());
        assert!(kp.is_some());
        assert_eq!(jwt.unwrap(), expected_jwt);
        assert_eq!(kp.unwrap().seed().unwrap(), expected_seed);

        // Test error cases
        let creds_missing_jwt = format!(
            r#"
-----BEGIN NATS USER JWT-----
------END NATS USER JWT------

-----BEGIN USER NKEY SEED-----
{expected_seed}
------END USER NKEY SEED------
"#
        );
        assert!(parse_jwt_and_key_from_creds(&creds_missing_jwt).is_err());

        let creds_missing_seed = format!(
            r#"
-----BEGIN NATS USER JWT-----
{expected_jwt}
------END NATS USER JWT------

-----BEGIN USER NKEY SEED-----
------END USER NKEY SEED------
"#
        );
        assert!(parse_jwt_and_key_from_creds(&creds_missing_seed).is_err());

        let creds_invalid_seed = format!(
            r#"
-----BEGIN NATS USER JWT-----
{expected_jwt}
------END NATS USER JWT------

-----BEGIN USER NKEY SEED-----
SUANOPE
------END USER NKEY SEED------
"#
        );
        assert!(parse_jwt_and_key_from_creds(&creds_invalid_seed).is_err());
    }
}