wasmcloud_host/wasmbus/
handler.rs

1use core::any::Any;
2use core::iter::{repeat, zip};
3use std::collections::{BTreeMap, HashMap};
4use std::ops::Deref;
5use std::sync::Arc;
6use std::time::Duration;
7
8use anyhow::{anyhow, bail, Context as _};
9use async_nats::header::{IntoHeaderName as _, IntoHeaderValue as _};
10use async_trait::async_trait;
11use bytes::Bytes;
12use secrecy::SecretBox;
13#[cfg(unix)]
14use spire_api::{
15    selectors::Selector, DelegateAttestationRequest::Selectors, DelegatedIdentityClient,
16};
17use tokio::sync::RwLock;
18use tracing::{error, instrument, warn};
19use wasmcloud_runtime::capability::logging::logging;
20use wasmcloud_runtime::capability::secrets::store::SecretValue;
21use wasmcloud_runtime::capability::{
22    self, identity, messaging0_2_0, messaging0_3_0, secrets, CallTargetInterface,
23};
24use wasmcloud_runtime::component::{
25    Bus, Bus1_0_0, Config, Error, Identity, InvocationErrorIntrospect, InvocationErrorKind,
26    Logging, Messaging0_2, Messaging0_3, MessagingClient0_3, MessagingGuestMessage0_3,
27    MessagingHostMessage0_3, ReplacedInstanceTarget, Secrets,
28};
29use wasmcloud_tracing::context::TraceContextInjector;
30use wrpc_transport::InvokeExt as _;
31
32use super::config::ConfigBundle;
33use super::{injector_to_headers, Features};
34
35// The key used to represent a wasmCloud-specific selector:
36// https://github.com/spiffe/spire-api-sdk/blob/3c6b1447f3d82210b91462d003f6c2774ffbe472/proto/spire/api/types/selector.proto#L6-L8
37//
38// Similar to existing types defined in the spire-api crate: https://github.com/maxlambrecht/rust-spiffe/blob/929a090f99d458dd67fa499b74afbeb2fc44b114/spire-api/src/selectors.rs#L4-L5
39const WASMCLOUD_SELECTOR_TYPE: &str = "wasmcloud";
40// Similar to the existing Kubernetes types: https://github.com/maxlambrecht/rust-spiffe/blob/929a090f99d458dd67fa499b74afbeb2fc44b114/spire-api/src/selectors.rs#L38-L39
41const WASMCLOUD_SELECTOR_COMPONENT: &str = "component";
42
43#[derive(Clone, Debug)]
44pub struct Handler {
45    pub nats: Arc<async_nats::Client>,
46    // ConfigBundle is perfectly safe to pass around, but in order to update it on the fly, we need
47    // to have it behind a lock since it can be cloned and because the `Actor` struct this gets
48    // placed into is also inside of an Arc
49    pub config_data: Arc<RwLock<ConfigBundle>>,
50    /// Secrets are cached per-[`Handler`] so they can be used at runtime without consulting the secrets
51    /// backend for each request. The [`SecretValue`] is wrapped in the [`Secret`] type from the `secrecy`
52    /// crate to ensure that it is not accidentally logged or exposed in error messages.
53    pub secrets: Arc<RwLock<HashMap<String, SecretBox<SecretValue>>>>,
54    /// The lattice this handler will use for RPC
55    pub lattice: Arc<str>,
56    /// The identifier of the component that this handler is associated with
57    pub component_id: Arc<str>,
58    /// The current link targets. `instance` -> `link-name`
59    /// Instance specification does not include a version
60    pub targets: Arc<RwLock<HashMap<Box<str>, Arc<str>>>>,
61
62    /// Map of link names -> instance -> Target
63    ///
64    /// While a target may often be a component ID, it is not guaranteed to be one, and could be
65    /// some other identifier of where to send invocations, representing one or more lattice entities.
66    ///
67    /// Lattice entities could be:
68    /// - A (single) Component ID
69    /// - A routing group
70    /// - Some other opaque string
71    #[allow(clippy::type_complexity)]
72    pub instance_links: Arc<RwLock<HashMap<Box<str>, HashMap<Box<str>, Box<str>>>>>,
73    /// Link name -> messaging client
74    pub messaging_links: Arc<RwLock<HashMap<Box<str>, async_nats::Client>>>,
75
76    pub invocation_timeout: Duration,
77    /// Experimental features enabled in the host for gating handler functionality
78    pub experimental_features: Features,
79    /// Labels associated with the wasmCloud Host the component is running on
80    pub host_labels: Arc<RwLock<BTreeMap<String, String>>>,
81}
82
83impl Handler {
84    /// Used for creating a new handler from an existing one. This is different than clone because
85    /// some fields shouldn't be copied between component instances such as link targets.
86    pub fn copy_for_new(&self) -> Self {
87        Handler {
88            nats: self.nats.clone(),
89            config_data: self.config_data.clone(),
90            secrets: self.secrets.clone(),
91            lattice: self.lattice.clone(),
92            component_id: self.component_id.clone(),
93            targets: Arc::default(),
94            instance_links: self.instance_links.clone(),
95            messaging_links: self.messaging_links.clone(),
96            invocation_timeout: self.invocation_timeout,
97            experimental_features: self.experimental_features,
98            host_labels: self.host_labels.clone(),
99        }
100    }
101}
102
103#[async_trait]
104impl Bus1_0_0 for Handler {
105    /// Set the current link name in use by the handler, which is otherwise "default".
106    ///
107    /// Link names are important to set to differentiate similar operations (ex. `wasi:keyvalue/store.get`)
108    /// that should go to different targets (ex. a capability provider like `kv-redis` vs `kv-vault`)
109    #[instrument(level = "debug", skip(self))]
110    async fn set_link_name(&self, link_name: String, interfaces: Vec<Arc<CallTargetInterface>>) {
111        let interfaces = interfaces.iter().map(Deref::deref);
112        let mut targets = self.targets.write().await;
113        if link_name == "default" {
114            for CallTargetInterface {
115                namespace,
116                package,
117                interface,
118            } in interfaces
119            {
120                targets.remove(&format!("{namespace}:{package}/{interface}").into_boxed_str());
121            }
122        } else {
123            let link_name = Arc::from(link_name);
124            for CallTargetInterface {
125                namespace,
126                package,
127                interface,
128            } in interfaces
129            {
130                targets.insert(
131                    format!("{namespace}:{package}/{interface}").into_boxed_str(),
132                    Arc::clone(&link_name),
133                );
134            }
135        }
136    }
137}
138
139#[async_trait]
140impl Bus for Handler {
141    /// Set the current link name in use by the handler, which is otherwise "default".
142    ///
143    /// Link names are important to set to differentiate similar operations (ex. `wasi:keyvalue/store.get`)
144    /// that should go to different targets (ex. a capability provider like `kv-redis` vs `kv-vault`)
145    #[instrument(level = "debug", skip(self))]
146    async fn set_link_name(
147        &self,
148        link_name: String,
149        interfaces: Vec<Arc<CallTargetInterface>>,
150    ) -> anyhow::Result<Result<(), String>> {
151        let links = self.instance_links.read().await;
152        // Ensure that all interfaces have an established link with the given name.
153        if let Some(interface_missing_link) = interfaces.iter().find_map(|i| {
154            let instance = i.as_instance();
155            // This could be expressed in one line as a `!(bool).then_some`, but the negation makes it confusing
156            if links
157                .get(link_name.as_str())
158                .and_then(|l| l.get(instance.as_str()))
159                .is_none()
160            {
161                Some(instance)
162            } else {
163                None
164            }
165        }) {
166            return Ok(Err(format!(
167                "interface `{interface_missing_link}` does not have an existing link with name `{link_name}`"
168            )));
169        }
170        // Explicitly drop the lock before calling `set_link_name` just to avoid holding the lock for longer than needed
171        drop(links);
172
173        Bus1_0_0::set_link_name(self, link_name, interfaces).await;
174        Ok(Ok(()))
175    }
176}
177
178impl wrpc_transport::Invoke for Handler {
179    type Context = Option<ReplacedInstanceTarget>;
180    type Outgoing = <wrpc_transport_nats::Client as wrpc_transport::Invoke>::Outgoing;
181    type Incoming = <wrpc_transport_nats::Client as wrpc_transport::Invoke>::Incoming;
182
183    #[instrument(level = "debug", skip_all)]
184    async fn invoke<P>(
185        &self,
186        target_instance: Self::Context,
187        instance: &str,
188        func: &str,
189        params: Bytes,
190        paths: impl AsRef<[P]> + Send,
191    ) -> anyhow::Result<(Self::Outgoing, Self::Incoming)>
192    where
193        P: AsRef<[Option<usize>]> + Send + Sync,
194    {
195        let links = self.instance_links.read().await;
196        let targets = self.targets.read().await;
197
198        let target_instance = match target_instance {
199            Some(
200                ReplacedInstanceTarget::BlobstoreBlobstore
201                | ReplacedInstanceTarget::BlobstoreContainer,
202            ) => "wasi:blobstore/blobstore",
203            Some(ReplacedInstanceTarget::KeyvalueAtomics) => "wasi:keyvalue/atomics",
204            Some(ReplacedInstanceTarget::KeyvalueStore) => "wasi:keyvalue/store",
205            Some(ReplacedInstanceTarget::KeyvalueBatch) => "wasi:keyvalue/batch",
206            Some(ReplacedInstanceTarget::KeyvalueWatch) => "wasi:keyvalue/watcher",
207            Some(ReplacedInstanceTarget::HttpIncomingHandler) => "wasi:http/incoming-handler",
208            Some(ReplacedInstanceTarget::HttpOutgoingHandler) => "wasi:http/outgoing-handler",
209            None => instance.split_once('@').map_or(instance, |(l, _)| l),
210        };
211
212        let link_name = targets
213            .get(target_instance)
214            .map_or("default", AsRef::as_ref);
215
216        let instances = links
217            .get(link_name)
218            .with_context(|| {
219                warn!(
220                    instance,
221                    link_name,
222                    ?target_instance,
223                    ?self.component_id,
224                    "no links with link name found for instance"
225                );
226                format!("link `{link_name}` not found for instance `{target_instance}`")
227            })
228            .map_err(Error::LinkNotFound)?;
229
230        // Determine the lattice target ID we should be sending to
231        let id = instances.get(target_instance).with_context(||{
232            warn!(
233                instance,
234                ?target_instance,
235                ?self.component_id,
236                "component is not linked to a lattice target for the given instance"
237            );
238            format!("failed to call `{func}` in instance `{instance}` (failed to find a configured link with name `{link_name}` from component `{id}`, please check your configuration)", id = self.component_id)
239        }).map_err(Error::LinkNotFound)?;
240
241        let mut headers = injector_to_headers(&TraceContextInjector::default_with_span());
242        headers.insert("source-id", &*self.component_id);
243        headers.insert("link-name", link_name);
244        let nats = wrpc_transport_nats::Client::new(
245            Arc::clone(&self.nats),
246            format!("{}.{id}", &self.lattice),
247            None,
248        )
249        .await
250        .map_err(Error::Handler)?;
251        let (tx, rx) = nats
252            .timeout(self.invocation_timeout)
253            .invoke(Some(headers), instance, func, params, paths)
254            .await
255            .map_err(Error::Handler)?;
256        Ok((tx, rx))
257    }
258}
259
260#[async_trait]
261impl Config for Handler {
262    #[instrument(level = "debug", skip_all)]
263    async fn get(
264        &self,
265        key: &str,
266    ) -> anyhow::Result<Result<Option<String>, capability::config::store::Error>> {
267        let lock = self.config_data.read().await;
268        let conf = lock.get_config().await;
269        let data = conf.get(key).cloned();
270        Ok(Ok(data))
271    }
272
273    #[instrument(level = "debug", skip_all)]
274    async fn get_all(
275        &self,
276    ) -> anyhow::Result<Result<Vec<(String, String)>, capability::config::store::Error>> {
277        Ok(Ok(self
278            .config_data
279            .read()
280            .await
281            .get_config()
282            .await
283            .clone()
284            .into_iter()
285            .collect()))
286    }
287}
288
289#[async_trait]
290impl Logging for Handler {
291    #[instrument(level = "trace", skip(self))]
292    async fn log(
293        &self,
294        level: logging::Level,
295        context: String,
296        message: String,
297    ) -> anyhow::Result<()> {
298        match level {
299            logging::Level::Trace => {
300                tracing::event!(
301                    tracing::Level::TRACE,
302                    component_id = ?self.component_id,
303                    level = level.to_string(),
304                    context,
305                    "{message}"
306                );
307            }
308            logging::Level::Debug => {
309                tracing::event!(
310                    tracing::Level::DEBUG,
311                    component_id = ?self.component_id,
312                    level = level.to_string(),
313                    context,
314                    "{message}"
315                );
316            }
317            logging::Level::Info => {
318                tracing::event!(
319                    tracing::Level::INFO,
320                    component_id = ?self.component_id,
321                    level = level.to_string(),
322                    context,
323                    "{message}"
324                );
325            }
326            logging::Level::Warn => {
327                tracing::event!(
328                    tracing::Level::WARN,
329                    component_id = ?self.component_id,
330                    level = level.to_string(),
331                    context,
332                    "{message}"
333                );
334            }
335            logging::Level::Error => {
336                tracing::event!(
337                    tracing::Level::ERROR,
338                    component_id = ?self.component_id,
339                    level = level.to_string(),
340                    context,
341                    "{message}"
342                );
343            }
344            logging::Level::Critical => {
345                tracing::event!(
346                    tracing::Level::ERROR,
347                    component_id = ?self.component_id,
348                    level = level.to_string(),
349                    context,
350                    "{message}"
351                );
352            }
353        };
354        Ok(())
355    }
356}
357
358#[async_trait]
359impl Secrets for Handler {
360    #[instrument(level = "debug", skip_all)]
361    async fn get(
362        &self,
363        key: &str,
364    ) -> anyhow::Result<Result<secrets::store::Secret, secrets::store::SecretsError>> {
365        if self.secrets.read().await.get(key).is_some() {
366            Ok(Ok(Arc::new(key.to_string())))
367        } else {
368            Ok(Err(secrets::store::SecretsError::NotFound))
369        }
370    }
371
372    async fn reveal(
373        &self,
374        secret: secrets::store::Secret,
375    ) -> anyhow::Result<secrets::store::SecretValue> {
376        let read_lock = self.secrets.read().await;
377        let Some(secret_val) = read_lock.get(secret.as_str()) else {
378            // NOTE(brooksmtownsend): This error case should never happen, since we check for existence during `get` and
379            // fail to start the component if the secret is missing. We might hit this during wRPC testing with resources.
380            const ERROR_MSG: &str = "secret not found to reveal, ensure the secret is declared and associated with this component at startup";
381            // NOTE: This "secret" is just the name of the key, not the actual secret value. Regardless the secret itself
382            // both wasn't found and is wrapped by `secrecy` so it won't be logged.
383            error!(?secret, ERROR_MSG);
384            bail!(ERROR_MSG)
385        };
386        use secrecy::ExposeSecret;
387        Ok(secret_val.expose_secret().clone())
388    }
389}
390
391impl Messaging0_2 for Handler {
392    #[instrument(level = "debug", skip_all)]
393    async fn request(
394        &self,
395        subject: String,
396        body: Vec<u8>,
397        timeout_ms: u32,
398    ) -> anyhow::Result<Result<messaging0_2_0::types::BrokerMessage, String>> {
399        use wasmcloud_runtime::capability::wrpc::wasmcloud::messaging0_2_0 as messaging;
400
401        {
402            let targets = self.targets.read().await;
403            let target = targets
404                .get("wasmcloud:messaging/consumer")
405                .map(AsRef::as_ref)
406                .unwrap_or("default");
407            if let Some(nats) = self.messaging_links.read().await.get(target) {
408                match nats.request(subject, body.into()).await {
409                    Ok(async_nats::Message {
410                        subject,
411                        payload,
412                        reply,
413                        ..
414                    }) => {
415                        return Ok(Ok(messaging0_2_0::types::BrokerMessage {
416                            subject: subject.into_string(),
417                            body: payload.into(),
418                            reply_to: reply.map(async_nats::Subject::into_string),
419                        }))
420                    }
421                    Err(err) => return Ok(Err(err.to_string())),
422                }
423            }
424        }
425
426        match messaging::consumer::request(self, None, &subject, &Bytes::from(body), timeout_ms)
427            .await?
428        {
429            Ok(messaging::types::BrokerMessage {
430                subject,
431                body,
432                reply_to,
433            }) => Ok(Ok(messaging0_2_0::types::BrokerMessage {
434                subject,
435                body: body.into(),
436                reply_to,
437            })),
438            Err(err) => Ok(Err(err)),
439        }
440    }
441
442    #[instrument(level = "debug", skip_all)]
443    async fn publish(
444        &self,
445        messaging0_2_0::types::BrokerMessage {
446            subject,
447            body,
448            reply_to,
449        }: messaging0_2_0::types::BrokerMessage,
450    ) -> anyhow::Result<Result<(), String>> {
451        use wasmcloud_runtime::capability::wrpc::wasmcloud::messaging0_2_0 as messaging;
452
453        {
454            let targets = self.targets.read().await;
455            let target = targets
456                .get("wasmcloud:messaging/consumer")
457                .map(AsRef::as_ref)
458                .unwrap_or("default");
459            if let Some(nats) = self.messaging_links.read().await.get(target) {
460                if let Some(reply_to) = reply_to {
461                    match nats
462                        .publish_with_reply(subject, reply_to, body.into())
463                        .await
464                    {
465                        Ok(()) => return Ok(Ok(())),
466                        Err(err) => return Ok(Err(err.to_string())),
467                    }
468                }
469                match nats.publish(subject, body.into()).await {
470                    Ok(()) => return Ok(Ok(())),
471                    Err(err) => return Ok(Err(err.to_string())),
472                }
473            }
474        }
475
476        messaging::consumer::publish(
477            self,
478            None,
479            &messaging::types::BrokerMessage {
480                subject,
481                body: body.into(),
482                reply_to,
483            },
484        )
485        .await
486    }
487}
488
489struct MessagingClient {
490    name: Box<str>,
491}
492
493#[async_trait]
494impl MessagingClient0_3 for MessagingClient {
495    async fn disconnect(&mut self) -> anyhow::Result<Result<(), messaging0_3_0::types::Error>> {
496        Ok(Ok(()))
497    }
498
499    fn as_any(&self) -> &dyn Any {
500        self
501    }
502}
503
504/// Concrete implementation of a message originating directly from the host, i.e. not received via
505/// wRPC.
506enum Message {
507    Nats(async_nats::Message),
508}
509
510#[async_trait]
511impl MessagingHostMessage0_3 for Message {
512    async fn topic(&self) -> anyhow::Result<Option<messaging0_3_0::types::Topic>> {
513        match self {
514            Message::Nats(async_nats::Message { subject, .. }) => Ok(Some(subject.to_string())),
515        }
516    }
517    async fn content_type(&self) -> anyhow::Result<Option<String>> {
518        Ok(None)
519    }
520    async fn set_content_type(&mut self, _content_type: String) -> anyhow::Result<()> {
521        bail!("`content-type` not supported")
522    }
523    async fn data(&self) -> anyhow::Result<Vec<u8>> {
524        match self {
525            Message::Nats(async_nats::Message { payload, .. }) => Ok(payload.to_vec()),
526        }
527    }
528    async fn set_data(&mut self, buf: Vec<u8>) -> anyhow::Result<()> {
529        match self {
530            Message::Nats(msg) => {
531                msg.payload = buf.into();
532            }
533        }
534        Ok(())
535    }
536    async fn metadata(&self) -> anyhow::Result<Option<messaging0_3_0::types::Metadata>> {
537        match self {
538            Message::Nats(async_nats::Message { headers: None, .. }) => Ok(None),
539            Message::Nats(async_nats::Message {
540                headers: Some(headers),
541                ..
542            }) => Ok(Some(headers.iter().fold(
543                // TODO: Initialize vector with capacity, once `async-nats` is updated to 0.37,
544                // where `len` method is introduced:
545                // https://docs.rs/async-nats/0.37.0/async_nats/header/struct.HeaderMap.html#method.len
546                //Vec::with_capacity(headers.len()),
547                Vec::default(),
548                |mut headers, (k, vs)| {
549                    for v in vs {
550                        headers.push((k.to_string(), v.to_string()))
551                    }
552                    headers
553                },
554            ))),
555        }
556    }
557    async fn add_metadata(&mut self, key: String, value: String) -> anyhow::Result<()> {
558        match self {
559            Message::Nats(async_nats::Message {
560                headers: Some(headers),
561                ..
562            }) => {
563                headers.append(key, value);
564                Ok(())
565            }
566            Message::Nats(async_nats::Message { headers, .. }) => {
567                *headers = Some(async_nats::HeaderMap::from_iter([(
568                    key.into_header_name(),
569                    value.into_header_value(),
570                )]));
571                Ok(())
572            }
573        }
574    }
575    async fn set_metadata(&mut self, meta: messaging0_3_0::types::Metadata) -> anyhow::Result<()> {
576        match self {
577            Message::Nats(async_nats::Message { headers, .. }) => {
578                *headers = Some(
579                    meta.into_iter()
580                        .map(|(k, v)| (k.into_header_name(), v.into_header_value()))
581                        .collect(),
582                );
583                Ok(())
584            }
585        }
586    }
587    async fn remove_metadata(&mut self, key: String) -> anyhow::Result<()> {
588        match self {
589            Message::Nats(async_nats::Message {
590                headers: Some(headers),
591                ..
592            }) => {
593                *headers = headers
594                    .iter()
595                    // NOTE(brooksmtownsend): The funky construction here is to provide a concrete type
596                    // to the `as_ref()` call, which is necessary to satisfy the type inference on Windows.
597                    .filter(|(k, ..)| <&async_nats::HeaderName as AsRef<str>>::as_ref(k) != key)
598                    .flat_map(|(k, vs)| zip(repeat(k.clone()), vs.iter().cloned()))
599                    .collect();
600                Ok(())
601            }
602            Message::Nats(..) => Ok(()),
603        }
604    }
605
606    fn as_any(&self) -> &dyn Any {
607        self
608    }
609
610    fn into_any(self: Box<Self>) -> Box<dyn Any> {
611        self
612    }
613}
614
615impl Messaging0_3 for Handler {
616    #[instrument(level = "debug", skip_all)]
617    async fn connect(
618        &self,
619        name: String,
620    ) -> anyhow::Result<
621        Result<Box<dyn MessagingClient0_3 + Send + Sync>, messaging0_3_0::types::Error>,
622    > {
623        Ok(Ok(Box::new(MessagingClient {
624            name: name.into_boxed_str(),
625        })))
626    }
627
628    #[instrument(level = "debug", skip_all)]
629    async fn send(
630        &self,
631        client: &(dyn MessagingClient0_3 + Send + Sync),
632        topic: messaging0_3_0::types::Topic,
633        message: messaging0_3_0::types::Message,
634    ) -> anyhow::Result<Result<(), messaging0_3_0::types::Error>> {
635        use wasmcloud_runtime::capability::wrpc::wasmcloud::messaging0_2_0 as messaging;
636
637        let MessagingClient { name } = client
638            .as_any()
639            .downcast_ref()
640            .context("unknown client type")?;
641        {
642            let targets = self.targets.read().await;
643            let target = targets
644                .get("wasmcloud:messaging/producer")
645                .map(AsRef::as_ref)
646                .unwrap_or("default");
647            let name = if name.is_empty() {
648                "default"
649            } else {
650                name.as_ref()
651            };
652            if name != target {
653                return Ok(Err(messaging0_3_0::types::Error::Other(format!(
654                    "mismatch between link name and client connection name, `{name}` != `{target}`"
655                ))));
656            }
657            if let Some(nats) = self.messaging_links.read().await.get(target) {
658                match match message {
659                    messaging0_3_0::types::Message::Host(message) => {
660                        let message = message
661                            .into_any()
662                            .downcast::<Message>()
663                            .map_err(|_| anyhow!("unknown message type"))?;
664                        match *message {
665                            Message::Nats(async_nats::Message {
666                                payload,
667                                headers: Some(headers),
668                                ..
669                            }) => nats.publish_with_headers(topic, headers, payload).await,
670                            Message::Nats(async_nats::Message { payload, .. }) => {
671                                nats.publish(topic, payload).await
672                            }
673                        }
674                    }
675                    messaging0_3_0::types::Message::Wrpc(messaging::types::BrokerMessage {
676                        body,
677                        ..
678                    }) => nats.publish(topic, body).await,
679                    messaging0_3_0::types::Message::Guest(MessagingGuestMessage0_3 {
680                        content_type,
681                        data,
682                        metadata,
683                    }) => {
684                        if let Some(content_type) = content_type {
685                            warn!(
686                                content_type,
687                                "`content-type` not supported by NATS.io, value is ignored"
688                            );
689                        }
690                        if let Some(metadata) = metadata {
691                            nats.publish_with_headers(
692                                topic,
693                                metadata
694                                    .into_iter()
695                                    .map(|(k, v)| (k.into_header_name(), v.into_header_value()))
696                                    .collect(),
697                                data.into(),
698                            )
699                            .await
700                        } else {
701                            nats.publish(topic, data.into()).await
702                        }
703                    }
704                } {
705                    Ok(()) => return Ok(Ok(())),
706                    Err(err) => {
707                        // TODO: Correctly handle error kind
708                        return Ok(Err(messaging0_3_0::types::Error::Other(err.to_string())));
709                    }
710                }
711            }
712            let body = match message {
713                messaging0_3_0::types::Message::Host(message) => {
714                    let message = message
715                        .into_any()
716                        .downcast::<Message>()
717                        .map_err(|_| anyhow!("unknown message type"))?;
718                    match *message {
719                        Message::Nats(async_nats::Message {
720                            headers: Some(..), ..
721                        }) => {
722                            return Ok(Err(messaging0_3_0::types::Error::Other(
723                                "headers not currently supported by wRPC targets".into(),
724                            )));
725                        }
726                        Message::Nats(async_nats::Message { payload, .. }) => payload,
727                    }
728                }
729                messaging0_3_0::types::Message::Wrpc(messaging::types::BrokerMessage {
730                    body,
731                    ..
732                }) => body,
733                messaging0_3_0::types::Message::Guest(MessagingGuestMessage0_3 {
734                    metadata: Some(..),
735                    ..
736                }) => {
737                    return Ok(Err(messaging0_3_0::types::Error::Other(
738                        "`metadata` not currently supported by wRPC targets".into(),
739                    )));
740                }
741                messaging0_3_0::types::Message::Guest(MessagingGuestMessage0_3 {
742                    content_type,
743                    data,
744                    ..
745                }) => {
746                    if let Some(content_type) = content_type {
747                        warn!(
748                            content_type,
749                            "`content-type` not currently supported by wRPC targets, value is ignored",
750                        );
751                    }
752                    data.into()
753                }
754            };
755            match messaging::consumer::publish(
756                self,
757                None,
758                &messaging::types::BrokerMessage {
759                    subject: topic,
760                    body,
761                    reply_to: None,
762                },
763            )
764            .await
765            {
766                Ok(Ok(())) => Ok(Ok(())),
767                Ok(Err(err)) => Ok(Err(messaging0_3_0::types::Error::Other(err))),
768                // TODO: Correctly handle error kind
769                Err(err) => Ok(Err(messaging0_3_0::types::Error::Other(err.to_string()))),
770            }
771        }
772    }
773
774    #[instrument(level = "debug", skip_all)]
775    async fn request(
776        &self,
777        client: &(dyn MessagingClient0_3 + Send + Sync),
778        topic: messaging0_3_0::types::Topic,
779        message: &messaging0_3_0::types::Message,
780        options: Option<messaging0_3_0::request_reply::RequestOptions>,
781    ) -> anyhow::Result<
782        Result<Vec<Box<dyn MessagingHostMessage0_3 + Send + Sync>>, messaging0_3_0::types::Error>,
783    > {
784        if options.is_some() {
785            return Ok(Err(messaging0_3_0::types::Error::Other(
786                "`options` not currently supported".into(),
787            )));
788        }
789
790        use wasmcloud_runtime::capability::wrpc::wasmcloud::messaging0_2_0 as messaging;
791
792        let MessagingClient { name } = client
793            .as_any()
794            .downcast_ref()
795            .context("unknown client type")?;
796        {
797            let targets = self.targets.read().await;
798            let target = targets
799                .get("wasmcloud:messaging/request-reply")
800                .map(AsRef::as_ref)
801                .unwrap_or("default");
802            let name = if name.is_empty() {
803                "default"
804            } else {
805                name.as_ref()
806            };
807            if name != target {
808                return Ok(Err(messaging0_3_0::types::Error::Other(format!(
809                    "mismatch between link name and client connection name, `{name}` != `{target}`"
810                ))));
811            }
812            if let Some(nats) = self.messaging_links.read().await.get(target) {
813                match match message {
814                    messaging0_3_0::types::Message::Host(message) => {
815                        let message = message
816                            .as_any()
817                            .downcast_ref::<Message>()
818                            .context("unknown message type")?;
819                        match message {
820                            Message::Nats(async_nats::Message {
821                                payload,
822                                headers: Some(headers),
823                                ..
824                            }) => {
825                                nats.request_with_headers(topic, headers.clone(), payload.clone())
826                                    .await
827                            }
828                            Message::Nats(async_nats::Message { payload, .. }) => {
829                                nats.request(topic, payload.clone()).await
830                            }
831                        }
832                    }
833                    messaging0_3_0::types::Message::Wrpc(messaging::types::BrokerMessage {
834                        body,
835                        ..
836                    }) => nats.request(topic, body.clone()).await,
837                    messaging0_3_0::types::Message::Guest(MessagingGuestMessage0_3 {
838                        content_type,
839                        data,
840                        metadata,
841                    }) => {
842                        if let Some(content_type) = content_type {
843                            warn!(
844                                content_type,
845                                "`content-type` not supported by NATS.io, value is ignored"
846                            );
847                        }
848                        if let Some(metadata) = metadata {
849                            nats.request_with_headers(
850                                topic,
851                                metadata
852                                    .iter()
853                                    .map(|(k, v)| {
854                                        (
855                                            k.as_str().into_header_name(),
856                                            v.as_str().into_header_value(),
857                                        )
858                                    })
859                                    .collect(),
860                                Bytes::copy_from_slice(data),
861                            )
862                            .await
863                        } else {
864                            nats.request(topic, Bytes::copy_from_slice(data)).await
865                        }
866                    }
867                } {
868                    Ok(msg) => return Ok(Ok(vec![Box::new(Message::Nats(msg))])),
869                    Err(err) => {
870                        // TODO: Correctly handle error kind
871                        return Ok(Err(messaging0_3_0::types::Error::Other(err.to_string())));
872                    }
873                }
874            }
875            let body = match message {
876                messaging0_3_0::types::Message::Host(message) => {
877                    let message = message
878                        .as_any()
879                        .downcast_ref::<Message>()
880                        .context("unknown message type")?;
881                    match message {
882                        Message::Nats(async_nats::Message {
883                            headers: Some(..), ..
884                        }) => {
885                            return Ok(Err(messaging0_3_0::types::Error::Other(
886                                "headers not currently supported by wRPC targets".into(),
887                            )));
888                        }
889                        Message::Nats(async_nats::Message { payload, .. }) => payload.clone(),
890                    }
891                }
892                messaging0_3_0::types::Message::Wrpc(messaging::types::BrokerMessage {
893                    body,
894                    ..
895                }) => body.clone(),
896                messaging0_3_0::types::Message::Guest(MessagingGuestMessage0_3 {
897                    metadata: Some(..),
898                    ..
899                }) => {
900                    return Ok(Err(messaging0_3_0::types::Error::Other(
901                        "`metadata` not currently supported by wRPC targets".into(),
902                    )));
903                }
904                messaging0_3_0::types::Message::Guest(MessagingGuestMessage0_3 {
905                    content_type,
906                    data,
907                    ..
908                }) => {
909                    if let Some(content_type) = content_type {
910                        warn!(
911                            content_type,
912                            "`content-type` not currently supported by wRPC targets, value is ignored",
913                        );
914                    }
915                    Bytes::copy_from_slice(data)
916                }
917            };
918
919            match messaging::consumer::publish(
920                self,
921                None,
922                &messaging::types::BrokerMessage {
923                    subject: topic,
924                    body,
925                    reply_to: None,
926                },
927            )
928            .await
929            {
930                Ok(Ok(())) => Ok(Err(messaging0_3_0::types::Error::Other(
931                    "message sent, but returning responses is not currently supported by wRPC targets".into(),
932                ))),
933                Ok(Err(err)) => Ok(Err(messaging0_3_0::types::Error::Other(err))),
934                // TODO: Correctly handle error kind
935                Err(err) => Ok(Err(messaging0_3_0::types::Error::Other(err.to_string()))),
936            }
937        }
938    }
939
940    #[instrument(level = "debug", skip_all)]
941    async fn reply(
942        &self,
943        reply_to: &messaging0_3_0::types::Message,
944        message: messaging0_3_0::types::Message,
945    ) -> anyhow::Result<Result<(), messaging0_3_0::types::Error>> {
946        use wasmcloud_runtime::capability::wrpc::wasmcloud::messaging0_2_0 as messaging;
947
948        {
949            let targets = self.targets.read().await;
950            let target = targets
951                .get("wasmcloud:messaging/request-reply")
952                .map(AsRef::as_ref)
953                .unwrap_or("default");
954            if let Some(nats) = self.messaging_links.read().await.get(target) {
955                let subject = match reply_to {
956                    messaging0_3_0::types::Message::Host(reply_to) => {
957                        match reply_to
958                            .as_any()
959                            .downcast_ref::<Message>()
960                            .context("unknown message type")?
961                        {
962                            Message::Nats(async_nats::Message {
963                                reply: Some(reply), ..
964                            }) => reply.clone(),
965                            Message::Nats(async_nats::Message { reply: None, .. }) => {
966                                return Ok(Err(messaging0_3_0::types::Error::Other(
967                                    "reply not set in incoming NATS.io message".into(),
968                                )))
969                            }
970                        }
971                    }
972                    messaging0_3_0::types::Message::Wrpc(messaging::types::BrokerMessage {
973                        reply_to: Some(reply_to),
974                        ..
975                    }) => reply_to.as_str().into(),
976                    messaging0_3_0::types::Message::Wrpc(messaging::types::BrokerMessage {
977                        reply_to: None,
978                        ..
979                    }) => {
980                        return Ok(Err(messaging0_3_0::types::Error::Other(
981                            "reply not set in incoming wRPC message".into(),
982                        )))
983                    }
984                    messaging0_3_0::types::Message::Guest(..) => {
985                        return Ok(Err(messaging0_3_0::types::Error::Other(
986                            "cannot reply to guest message".into(),
987                        )))
988                    }
989                };
990                match match message {
991                    messaging0_3_0::types::Message::Host(message) => {
992                        let message = message
993                            .into_any()
994                            .downcast::<Message>()
995                            .map_err(|_| anyhow!("unknown message type"))?;
996                        match *message {
997                            Message::Nats(async_nats::Message {
998                                payload,
999                                headers: Some(headers),
1000                                ..
1001                            }) => nats.publish_with_headers(subject, headers, payload).await,
1002                            Message::Nats(async_nats::Message { payload, .. }) => {
1003                                nats.publish(subject, payload).await
1004                            }
1005                        }
1006                    }
1007                    messaging0_3_0::types::Message::Wrpc(messaging::types::BrokerMessage {
1008                        body,
1009                        ..
1010                    }) => nats.publish(subject, body).await,
1011                    messaging0_3_0::types::Message::Guest(MessagingGuestMessage0_3 {
1012                        content_type,
1013                        data,
1014                        metadata,
1015                    }) => {
1016                        if let Some(content_type) = content_type {
1017                            warn!(
1018                                content_type,
1019                                "`content-type` not supported by NATS.io, value is ignored"
1020                            );
1021                        }
1022                        if let Some(metadata) = metadata {
1023                            nats.publish_with_headers(
1024                                subject,
1025                                metadata
1026                                    .into_iter()
1027                                    .map(|(k, v)| (k.into_header_name(), v.into_header_value()))
1028                                    .collect(),
1029                                data.into(),
1030                            )
1031                            .await
1032                        } else {
1033                            nats.publish(subject, data.into()).await
1034                        }
1035                    }
1036                } {
1037                    Ok(()) => return Ok(Ok(())),
1038                    Err(err) => {
1039                        // TODO: Correctly handle error kind
1040                        return Ok(Err(messaging0_3_0::types::Error::Other(err.to_string())));
1041                    }
1042                }
1043            }
1044            let body = match message {
1045                messaging0_3_0::types::Message::Host(message) => {
1046                    let message = message
1047                        .into_any()
1048                        .downcast::<Message>()
1049                        .map_err(|_| anyhow!("unknown message type"))?;
1050                    match *message {
1051                        Message::Nats(async_nats::Message {
1052                            headers: Some(..), ..
1053                        }) => {
1054                            return Ok(Err(messaging0_3_0::types::Error::Other(
1055                                "headers not currently supported by wRPC targets".into(),
1056                            )));
1057                        }
1058                        Message::Nats(async_nats::Message { payload, .. }) => payload,
1059                    }
1060                }
1061                messaging0_3_0::types::Message::Wrpc(messaging::types::BrokerMessage {
1062                    body,
1063                    ..
1064                }) => body,
1065                messaging0_3_0::types::Message::Guest(MessagingGuestMessage0_3 {
1066                    metadata: Some(..),
1067                    ..
1068                }) => {
1069                    return Ok(Err(messaging0_3_0::types::Error::Other(
1070                        "`metadata` not currently supported by wRPC targets".into(),
1071                    )));
1072                }
1073                messaging0_3_0::types::Message::Guest(MessagingGuestMessage0_3 {
1074                    content_type,
1075                    data,
1076                    ..
1077                }) => {
1078                    if let Some(content_type) = content_type {
1079                        warn!(
1080                            content_type,
1081                            "`content-type` not currently supported by wRPC targets, value is ignored",
1082                        );
1083                    }
1084                    data.into()
1085                }
1086            };
1087            let subject = match reply_to {
1088                messaging0_3_0::types::Message::Host(reply_to) => {
1089                    match reply_to
1090                        .as_any()
1091                        .downcast_ref::<Message>()
1092                        .context("unknown message type")?
1093                    {
1094                        Message::Nats(async_nats::Message {
1095                            reply: Some(reply), ..
1096                        }) => reply.to_string(),
1097                        Message::Nats(async_nats::Message { reply: None, .. }) => {
1098                            return Ok(Err(messaging0_3_0::types::Error::Other(
1099                                "reply not set in incoming NATS.io message".into(),
1100                            )))
1101                        }
1102                    }
1103                }
1104                messaging0_3_0::types::Message::Wrpc(messaging::types::BrokerMessage {
1105                    reply_to: Some(reply_to),
1106                    ..
1107                }) => reply_to.clone(),
1108                messaging0_3_0::types::Message::Wrpc(messaging::types::BrokerMessage {
1109                    reply_to: None,
1110                    ..
1111                }) => {
1112                    return Ok(Err(messaging0_3_0::types::Error::Other(
1113                        "reply not set in incoming wRPC message".into(),
1114                    )))
1115                }
1116                messaging0_3_0::types::Message::Guest(..) => {
1117                    return Ok(Err(messaging0_3_0::types::Error::Other(
1118                        "cannot reply to guest message".into(),
1119                    )))
1120                }
1121            };
1122            match messaging::consumer::publish(
1123                self,
1124                None,
1125                &messaging::types::BrokerMessage {
1126                    subject,
1127                    body,
1128                    reply_to: None,
1129                },
1130            )
1131            .await
1132            {
1133                Ok(Ok(())) => Ok(Ok(())),
1134                Ok(Err(err)) => Ok(Err(messaging0_3_0::types::Error::Other(err))),
1135                // TODO: Correctly handle error kind
1136                Err(err) => Ok(Err(messaging0_3_0::types::Error::Other(err.to_string()))),
1137            }
1138        }
1139    }
1140}
1141
1142#[async_trait]
1143impl Identity for Handler {
1144    #[cfg(unix)]
1145    #[instrument(level = "debug", skip_all)]
1146    async fn get(
1147        &self,
1148        audience: &str,
1149    ) -> anyhow::Result<Result<Option<String>, identity::store::Error>> {
1150        let mut client = match DelegatedIdentityClient::default().await {
1151            Ok(client) => client,
1152            Err(err) => {
1153                return Ok(Err(identity::store::Error::Io(format!(
1154                    "Unable to connect to workload identity service: {err}"
1155                ))));
1156            }
1157        };
1158
1159        let mut selectors =
1160            parse_selectors_from_host_labels(self.host_labels.read().await.deref()).await;
1161        // "wasmcloud", "component:{component_id}" is inserted at the end to make sure it can't be overridden.
1162        selectors.push(Selector::Generic((
1163            WASMCLOUD_SELECTOR_TYPE.to_string(),
1164            format!("{}:{}", WASMCLOUD_SELECTOR_COMPONENT, self.component_id),
1165        )));
1166
1167        let svids = match client
1168            .fetch_jwt_svids(&[audience], Selectors(selectors))
1169            .await
1170        {
1171            Ok(svids) => svids,
1172            Err(err) => {
1173                return Ok(Err(identity::store::Error::Io(format!(
1174                    "Unable to query workload identity service: {err}"
1175                ))));
1176            }
1177        };
1178
1179        if !svids.is_empty() {
1180            // TODO: Is there a better way to determine which SVID to return here?
1181            let svid = svids.first().map(|svid| svid.token()).unwrap_or_default();
1182            Ok(Ok(Some(svid.to_string())))
1183        } else {
1184            Ok(Err(identity::store::Error::NotFound))
1185        }
1186    }
1187
1188    #[cfg(target_family = "windows")]
1189    #[instrument(level = "debug", skip_all)]
1190    async fn get(
1191        &self,
1192        _audience: &str,
1193    ) -> anyhow::Result<Result<Option<String>, identity::store::Error>> {
1194        Ok(Err(identity::store::Error::Other(
1195            "workload identity is not supported on Windows".to_string(),
1196        )))
1197    }
1198}
1199
1200impl InvocationErrorIntrospect for Handler {
1201    fn invocation_error_kind(&self, err: &anyhow::Error) -> InvocationErrorKind {
1202        if let Some(err) = err.root_cause().downcast_ref::<std::io::Error>() {
1203            if err.kind() == std::io::ErrorKind::NotConnected {
1204                return InvocationErrorKind::NotFound;
1205            }
1206        }
1207        InvocationErrorKind::Trap
1208    }
1209}
1210
1211// TODO(joonas): Make this more generalized so we can support non-wasmcloud-specific
1212// selectors as well.
1213//
1214// environment variable -> WASMCLOUD_LABEL_wasmcloud__ns=my-namespace-goes-here
1215// becomes:
1216// SPIRE Selector -> wasmcloud:ns:my-namespace-goes-here
1217#[cfg(unix)]
1218async fn parse_selectors_from_host_labels(host_labels: &BTreeMap<String, String>) -> Vec<Selector> {
1219    let mut selectors = vec![];
1220
1221    for (key, value) in host_labels.iter() {
1222        // Ensure the label starts with `wasmcloud__` and doesn't end in `__`, i.e. just `wasmcloud__`
1223        if key.starts_with("wasmcloud__") && !key.ends_with("__") {
1224            let selector = key
1225                // Replace all __ with :
1226                .replace("__", ":")
1227                // Remove the leading "wasmcloud"
1228                .split_once(":")
1229                // Map the remaining part of the label key together with the value `` to make it a selector
1230                .map(|(_, selector)| format!("{selector}:{value}"))
1231                // This should never get triggered, but just in case.
1232                .unwrap_or("unknown".to_string());
1233
1234            selectors.push(Selector::Generic((
1235                WASMCLOUD_SELECTOR_TYPE.to_string(),
1236                selector,
1237            )));
1238        }
1239    }
1240
1241    selectors
1242}
1243
1244#[cfg(unix)]
1245#[cfg(test)]
1246mod tests {
1247    use super::*;
1248    use std::env::consts::{ARCH, FAMILY, OS};
1249
1250    #[tokio::test]
1251    async fn test_parse_selectors_from_host_labels() {
1252        let labels = BTreeMap::from([
1253            ("hostcore.arch".into(), ARCH.into()),
1254            ("hostcore.os".into(), OS.into()),
1255            ("hostcore.osfamily".into(), FAMILY.into()),
1256            ("wasmcloud__lattice".into(), "default".into()),
1257        ]);
1258
1259        let selectors = parse_selectors_from_host_labels(&labels).await;
1260
1261        assert_eq!(selectors.len(), 1);
1262
1263        let (selector_type, selector_value) = match selectors.first() {
1264            Some(Selector::Generic(pair)) => pair,
1265            _ => &("wrong-value".into(), "wrong-value".into()),
1266        };
1267        assert_eq!(selector_type, WASMCLOUD_SELECTOR_TYPE);
1268        assert_eq!(selector_value, "lattice:default");
1269    }
1270
1271    #[tokio::test]
1272    async fn test_parse_selectors_from_host_labels_defaults_to_no_selectors() {
1273        let no_labels = BTreeMap::new();
1274        let selectors = parse_selectors_from_host_labels(&no_labels).await;
1275        assert_eq!(selectors.len(), 0);
1276    }
1277}