1use core::any::Any;
2use core::iter::{repeat, zip};
3use std::collections::{BTreeMap, HashMap};
4use std::ops::Deref;
5use std::sync::Arc;
6use std::time::Duration;
7
8use anyhow::{anyhow, bail, Context as _};
9use async_nats::header::{IntoHeaderName as _, IntoHeaderValue as _};
10use async_trait::async_trait;
11use bytes::Bytes;
12use secrecy::SecretBox;
13#[cfg(unix)]
14use spire_api::{
15 selectors::Selector, DelegateAttestationRequest::Selectors, DelegatedIdentityClient,
16};
17use tokio::sync::RwLock;
18use tracing::{error, instrument, warn};
19use wasmcloud_runtime::capability::logging::logging;
20use wasmcloud_runtime::capability::secrets::store::SecretValue;
21use wasmcloud_runtime::capability::{
22 self, identity, messaging0_2_0, messaging0_3_0, secrets, CallTargetInterface,
23};
24use wasmcloud_runtime::component::{
25 Bus, Bus1_0_0, Config, Error, Identity, InvocationErrorIntrospect, InvocationErrorKind,
26 Logging, Messaging0_2, Messaging0_3, MessagingClient0_3, MessagingGuestMessage0_3,
27 MessagingHostMessage0_3, ReplacedInstanceTarget, Secrets,
28};
29use wasmcloud_tracing::context::TraceContextInjector;
30use wrpc_transport::InvokeExt as _;
31
32use super::config::ConfigBundle;
33use super::{injector_to_headers, Features};
34
/// Selector type placed in the first slot of every generic workload identity
/// selector this file builds.
const WASMCLOUD_SELECTOR_TYPE: &str = "wasmcloud";
/// Key prepended (as `component:<component id>`) to the selector value that
/// identifies the calling component.
const WASMCLOUD_SELECTOR_COMPONENT: &str = "component";
42
43#[derive(Clone, Debug)]
44pub struct Handler {
45 pub nats: Arc<async_nats::Client>,
46 pub config_data: Arc<RwLock<ConfigBundle>>,
50 pub secrets: Arc<RwLock<HashMap<String, SecretBox<SecretValue>>>>,
54 pub lattice: Arc<str>,
56 pub component_id: Arc<str>,
58 pub targets: Arc<RwLock<HashMap<Box<str>, Arc<str>>>>,
61
62 #[allow(clippy::type_complexity)]
72 pub instance_links: Arc<RwLock<HashMap<Box<str>, HashMap<Box<str>, Box<str>>>>>,
73 pub messaging_links: Arc<RwLock<HashMap<Box<str>, async_nats::Client>>>,
75
76 pub invocation_timeout: Duration,
77 pub experimental_features: Features,
79 pub host_labels: Arc<RwLock<BTreeMap<String, String>>>,
81}
82
83impl Handler {
84 pub fn copy_for_new(&self) -> Self {
87 Handler {
88 nats: self.nats.clone(),
89 config_data: self.config_data.clone(),
90 secrets: self.secrets.clone(),
91 lattice: self.lattice.clone(),
92 component_id: self.component_id.clone(),
93 targets: Arc::default(),
94 instance_links: self.instance_links.clone(),
95 messaging_links: self.messaging_links.clone(),
96 invocation_timeout: self.invocation_timeout,
97 experimental_features: self.experimental_features,
98 host_labels: self.host_labels.clone(),
99 }
100 }
101}
102
103#[async_trait]
104impl Bus1_0_0 for Handler {
105 #[instrument(level = "debug", skip(self))]
110 async fn set_link_name(&self, link_name: String, interfaces: Vec<Arc<CallTargetInterface>>) {
111 let interfaces = interfaces.iter().map(Deref::deref);
112 let mut targets = self.targets.write().await;
113 if link_name == "default" {
114 for CallTargetInterface {
115 namespace,
116 package,
117 interface,
118 } in interfaces
119 {
120 targets.remove(&format!("{namespace}:{package}/{interface}").into_boxed_str());
121 }
122 } else {
123 let link_name = Arc::from(link_name);
124 for CallTargetInterface {
125 namespace,
126 package,
127 interface,
128 } in interfaces
129 {
130 targets.insert(
131 format!("{namespace}:{package}/{interface}").into_boxed_str(),
132 Arc::clone(&link_name),
133 );
134 }
135 }
136 }
137}
138
139#[async_trait]
140impl Bus for Handler {
141 #[instrument(level = "debug", skip(self))]
146 async fn set_link_name(
147 &self,
148 link_name: String,
149 interfaces: Vec<Arc<CallTargetInterface>>,
150 ) -> anyhow::Result<Result<(), String>> {
151 let links = self.instance_links.read().await;
152 if let Some(interface_missing_link) = interfaces.iter().find_map(|i| {
154 let instance = i.as_instance();
155 if links
157 .get(link_name.as_str())
158 .and_then(|l| l.get(instance.as_str()))
159 .is_none()
160 {
161 Some(instance)
162 } else {
163 None
164 }
165 }) {
166 return Ok(Err(format!(
167 "interface `{interface_missing_link}` does not have an existing link with name `{link_name}`"
168 )));
169 }
170 drop(links);
172
173 Bus1_0_0::set_link_name(self, link_name, interfaces).await;
174 Ok(Ok(()))
175 }
176}
177
178impl wrpc_transport::Invoke for Handler {
179 type Context = Option<ReplacedInstanceTarget>;
180 type Outgoing = <wrpc_transport_nats::Client as wrpc_transport::Invoke>::Outgoing;
181 type Incoming = <wrpc_transport_nats::Client as wrpc_transport::Invoke>::Incoming;
182
183 #[instrument(level = "debug", skip_all)]
184 async fn invoke<P>(
185 &self,
186 target_instance: Self::Context,
187 instance: &str,
188 func: &str,
189 params: Bytes,
190 paths: impl AsRef<[P]> + Send,
191 ) -> anyhow::Result<(Self::Outgoing, Self::Incoming)>
192 where
193 P: AsRef<[Option<usize>]> + Send + Sync,
194 {
195 let links = self.instance_links.read().await;
196 let targets = self.targets.read().await;
197
198 let target_instance = match target_instance {
199 Some(
200 ReplacedInstanceTarget::BlobstoreBlobstore
201 | ReplacedInstanceTarget::BlobstoreContainer,
202 ) => "wasi:blobstore/blobstore",
203 Some(ReplacedInstanceTarget::KeyvalueAtomics) => "wasi:keyvalue/atomics",
204 Some(ReplacedInstanceTarget::KeyvalueStore) => "wasi:keyvalue/store",
205 Some(ReplacedInstanceTarget::KeyvalueBatch) => "wasi:keyvalue/batch",
206 Some(ReplacedInstanceTarget::KeyvalueWatch) => "wasi:keyvalue/watcher",
207 Some(ReplacedInstanceTarget::HttpIncomingHandler) => "wasi:http/incoming-handler",
208 Some(ReplacedInstanceTarget::HttpOutgoingHandler) => "wasi:http/outgoing-handler",
209 None => instance.split_once('@').map_or(instance, |(l, _)| l),
210 };
211
212 let link_name = targets
213 .get(target_instance)
214 .map_or("default", AsRef::as_ref);
215
216 let instances = links
217 .get(link_name)
218 .with_context(|| {
219 warn!(
220 instance,
221 link_name,
222 ?target_instance,
223 ?self.component_id,
224 "no links with link name found for instance"
225 );
226 format!("link `{link_name}` not found for instance `{target_instance}`")
227 })
228 .map_err(Error::LinkNotFound)?;
229
230 let id = instances.get(target_instance).with_context(||{
232 warn!(
233 instance,
234 ?target_instance,
235 ?self.component_id,
236 "component is not linked to a lattice target for the given instance"
237 );
238 format!("failed to call `{func}` in instance `{instance}` (failed to find a configured link with name `{link_name}` from component `{id}`, please check your configuration)", id = self.component_id)
239 }).map_err(Error::LinkNotFound)?;
240
241 let mut headers = injector_to_headers(&TraceContextInjector::default_with_span());
242 headers.insert("source-id", &*self.component_id);
243 headers.insert("link-name", link_name);
244 let nats = wrpc_transport_nats::Client::new(
245 Arc::clone(&self.nats),
246 format!("{}.{id}", &self.lattice),
247 None,
248 )
249 .await
250 .map_err(Error::Handler)?;
251 let (tx, rx) = nats
252 .timeout(self.invocation_timeout)
253 .invoke(Some(headers), instance, func, params, paths)
254 .await
255 .map_err(Error::Handler)?;
256 Ok((tx, rx))
257 }
258}
259
260#[async_trait]
261impl Config for Handler {
262 #[instrument(level = "debug", skip_all)]
263 async fn get(
264 &self,
265 key: &str,
266 ) -> anyhow::Result<Result<Option<String>, capability::config::store::Error>> {
267 let lock = self.config_data.read().await;
268 let conf = lock.get_config().await;
269 let data = conf.get(key).cloned();
270 Ok(Ok(data))
271 }
272
273 #[instrument(level = "debug", skip_all)]
274 async fn get_all(
275 &self,
276 ) -> anyhow::Result<Result<Vec<(String, String)>, capability::config::store::Error>> {
277 Ok(Ok(self
278 .config_data
279 .read()
280 .await
281 .get_config()
282 .await
283 .clone()
284 .into_iter()
285 .collect()))
286 }
287}
288
289#[async_trait]
290impl Logging for Handler {
291 #[instrument(level = "trace", skip(self))]
292 async fn log(
293 &self,
294 level: logging::Level,
295 context: String,
296 message: String,
297 ) -> anyhow::Result<()> {
298 match level {
299 logging::Level::Trace => {
300 tracing::event!(
301 tracing::Level::TRACE,
302 component_id = ?self.component_id,
303 level = level.to_string(),
304 context,
305 "{message}"
306 );
307 }
308 logging::Level::Debug => {
309 tracing::event!(
310 tracing::Level::DEBUG,
311 component_id = ?self.component_id,
312 level = level.to_string(),
313 context,
314 "{message}"
315 );
316 }
317 logging::Level::Info => {
318 tracing::event!(
319 tracing::Level::INFO,
320 component_id = ?self.component_id,
321 level = level.to_string(),
322 context,
323 "{message}"
324 );
325 }
326 logging::Level::Warn => {
327 tracing::event!(
328 tracing::Level::WARN,
329 component_id = ?self.component_id,
330 level = level.to_string(),
331 context,
332 "{message}"
333 );
334 }
335 logging::Level::Error => {
336 tracing::event!(
337 tracing::Level::ERROR,
338 component_id = ?self.component_id,
339 level = level.to_string(),
340 context,
341 "{message}"
342 );
343 }
344 logging::Level::Critical => {
345 tracing::event!(
346 tracing::Level::ERROR,
347 component_id = ?self.component_id,
348 level = level.to_string(),
349 context,
350 "{message}"
351 );
352 }
353 };
354 Ok(())
355 }
356}
357
358#[async_trait]
359impl Secrets for Handler {
360 #[instrument(level = "debug", skip_all)]
361 async fn get(
362 &self,
363 key: &str,
364 ) -> anyhow::Result<Result<secrets::store::Secret, secrets::store::SecretsError>> {
365 if self.secrets.read().await.get(key).is_some() {
366 Ok(Ok(Arc::new(key.to_string())))
367 } else {
368 Ok(Err(secrets::store::SecretsError::NotFound))
369 }
370 }
371
372 async fn reveal(
373 &self,
374 secret: secrets::store::Secret,
375 ) -> anyhow::Result<secrets::store::SecretValue> {
376 let read_lock = self.secrets.read().await;
377 let Some(secret_val) = read_lock.get(secret.as_str()) else {
378 const ERROR_MSG: &str = "secret not found to reveal, ensure the secret is declared and associated with this component at startup";
381 error!(?secret, ERROR_MSG);
384 bail!(ERROR_MSG)
385 };
386 use secrecy::ExposeSecret;
387 Ok(secret_val.expose_secret().clone())
388 }
389}
390
391impl Messaging0_2 for Handler {
392 #[instrument(level = "debug", skip_all)]
393 async fn request(
394 &self,
395 subject: String,
396 body: Vec<u8>,
397 timeout_ms: u32,
398 ) -> anyhow::Result<Result<messaging0_2_0::types::BrokerMessage, String>> {
399 use wasmcloud_runtime::capability::wrpc::wasmcloud::messaging0_2_0 as messaging;
400
401 {
402 let targets = self.targets.read().await;
403 let target = targets
404 .get("wasmcloud:messaging/consumer")
405 .map(AsRef::as_ref)
406 .unwrap_or("default");
407 if let Some(nats) = self.messaging_links.read().await.get(target) {
408 match nats.request(subject, body.into()).await {
409 Ok(async_nats::Message {
410 subject,
411 payload,
412 reply,
413 ..
414 }) => {
415 return Ok(Ok(messaging0_2_0::types::BrokerMessage {
416 subject: subject.into_string(),
417 body: payload.into(),
418 reply_to: reply.map(async_nats::Subject::into_string),
419 }))
420 }
421 Err(err) => return Ok(Err(err.to_string())),
422 }
423 }
424 }
425
426 match messaging::consumer::request(self, None, &subject, &Bytes::from(body), timeout_ms)
427 .await?
428 {
429 Ok(messaging::types::BrokerMessage {
430 subject,
431 body,
432 reply_to,
433 }) => Ok(Ok(messaging0_2_0::types::BrokerMessage {
434 subject,
435 body: body.into(),
436 reply_to,
437 })),
438 Err(err) => Ok(Err(err)),
439 }
440 }
441
442 #[instrument(level = "debug", skip_all)]
443 async fn publish(
444 &self,
445 messaging0_2_0::types::BrokerMessage {
446 subject,
447 body,
448 reply_to,
449 }: messaging0_2_0::types::BrokerMessage,
450 ) -> anyhow::Result<Result<(), String>> {
451 use wasmcloud_runtime::capability::wrpc::wasmcloud::messaging0_2_0 as messaging;
452
453 {
454 let targets = self.targets.read().await;
455 let target = targets
456 .get("wasmcloud:messaging/consumer")
457 .map(AsRef::as_ref)
458 .unwrap_or("default");
459 if let Some(nats) = self.messaging_links.read().await.get(target) {
460 if let Some(reply_to) = reply_to {
461 match nats
462 .publish_with_reply(subject, reply_to, body.into())
463 .await
464 {
465 Ok(()) => return Ok(Ok(())),
466 Err(err) => return Ok(Err(err.to_string())),
467 }
468 }
469 match nats.publish(subject, body.into()).await {
470 Ok(()) => return Ok(Ok(())),
471 Err(err) => return Ok(Err(err.to_string())),
472 }
473 }
474 }
475
476 messaging::consumer::publish(
477 self,
478 None,
479 &messaging::types::BrokerMessage {
480 subject,
481 body: body.into(),
482 reply_to,
483 },
484 )
485 .await
486 }
487}
488
489struct MessagingClient {
490 name: Box<str>,
491}
492
493#[async_trait]
494impl MessagingClient0_3 for MessagingClient {
495 async fn disconnect(&mut self) -> anyhow::Result<Result<(), messaging0_3_0::types::Error>> {
496 Ok(Ok(()))
497 }
498
499 fn as_any(&self) -> &dyn Any {
500 self
501 }
502}
503
504enum Message {
507 Nats(async_nats::Message),
508}
509
510#[async_trait]
511impl MessagingHostMessage0_3 for Message {
512 async fn topic(&self) -> anyhow::Result<Option<messaging0_3_0::types::Topic>> {
513 match self {
514 Message::Nats(async_nats::Message { subject, .. }) => Ok(Some(subject.to_string())),
515 }
516 }
517 async fn content_type(&self) -> anyhow::Result<Option<String>> {
518 Ok(None)
519 }
520 async fn set_content_type(&mut self, _content_type: String) -> anyhow::Result<()> {
521 bail!("`content-type` not supported")
522 }
523 async fn data(&self) -> anyhow::Result<Vec<u8>> {
524 match self {
525 Message::Nats(async_nats::Message { payload, .. }) => Ok(payload.to_vec()),
526 }
527 }
528 async fn set_data(&mut self, buf: Vec<u8>) -> anyhow::Result<()> {
529 match self {
530 Message::Nats(msg) => {
531 msg.payload = buf.into();
532 }
533 }
534 Ok(())
535 }
536 async fn metadata(&self) -> anyhow::Result<Option<messaging0_3_0::types::Metadata>> {
537 match self {
538 Message::Nats(async_nats::Message { headers: None, .. }) => Ok(None),
539 Message::Nats(async_nats::Message {
540 headers: Some(headers),
541 ..
542 }) => Ok(Some(headers.iter().fold(
543 Vec::default(),
548 |mut headers, (k, vs)| {
549 for v in vs {
550 headers.push((k.to_string(), v.to_string()))
551 }
552 headers
553 },
554 ))),
555 }
556 }
557 async fn add_metadata(&mut self, key: String, value: String) -> anyhow::Result<()> {
558 match self {
559 Message::Nats(async_nats::Message {
560 headers: Some(headers),
561 ..
562 }) => {
563 headers.append(key, value);
564 Ok(())
565 }
566 Message::Nats(async_nats::Message { headers, .. }) => {
567 *headers = Some(async_nats::HeaderMap::from_iter([(
568 key.into_header_name(),
569 value.into_header_value(),
570 )]));
571 Ok(())
572 }
573 }
574 }
575 async fn set_metadata(&mut self, meta: messaging0_3_0::types::Metadata) -> anyhow::Result<()> {
576 match self {
577 Message::Nats(async_nats::Message { headers, .. }) => {
578 *headers = Some(
579 meta.into_iter()
580 .map(|(k, v)| (k.into_header_name(), v.into_header_value()))
581 .collect(),
582 );
583 Ok(())
584 }
585 }
586 }
587 async fn remove_metadata(&mut self, key: String) -> anyhow::Result<()> {
588 match self {
589 Message::Nats(async_nats::Message {
590 headers: Some(headers),
591 ..
592 }) => {
593 *headers = headers
594 .iter()
595 .filter(|(k, ..)| <&async_nats::HeaderName as AsRef<str>>::as_ref(k) != key)
598 .flat_map(|(k, vs)| zip(repeat(k.clone()), vs.iter().cloned()))
599 .collect();
600 Ok(())
601 }
602 Message::Nats(..) => Ok(()),
603 }
604 }
605
606 fn as_any(&self) -> &dyn Any {
607 self
608 }
609
610 fn into_any(self: Box<Self>) -> Box<dyn Any> {
611 self
612 }
613}
614
615impl Messaging0_3 for Handler {
616 #[instrument(level = "debug", skip_all)]
617 async fn connect(
618 &self,
619 name: String,
620 ) -> anyhow::Result<
621 Result<Box<dyn MessagingClient0_3 + Send + Sync>, messaging0_3_0::types::Error>,
622 > {
623 Ok(Ok(Box::new(MessagingClient {
624 name: name.into_boxed_str(),
625 })))
626 }
627
628 #[instrument(level = "debug", skip_all)]
629 async fn send(
630 &self,
631 client: &(dyn MessagingClient0_3 + Send + Sync),
632 topic: messaging0_3_0::types::Topic,
633 message: messaging0_3_0::types::Message,
634 ) -> anyhow::Result<Result<(), messaging0_3_0::types::Error>> {
635 use wasmcloud_runtime::capability::wrpc::wasmcloud::messaging0_2_0 as messaging;
636
637 let MessagingClient { name } = client
638 .as_any()
639 .downcast_ref()
640 .context("unknown client type")?;
641 {
642 let targets = self.targets.read().await;
643 let target = targets
644 .get("wasmcloud:messaging/producer")
645 .map(AsRef::as_ref)
646 .unwrap_or("default");
647 let name = if name.is_empty() {
648 "default"
649 } else {
650 name.as_ref()
651 };
652 if name != target {
653 return Ok(Err(messaging0_3_0::types::Error::Other(format!(
654 "mismatch between link name and client connection name, `{name}` != `{target}`"
655 ))));
656 }
657 if let Some(nats) = self.messaging_links.read().await.get(target) {
658 match match message {
659 messaging0_3_0::types::Message::Host(message) => {
660 let message = message
661 .into_any()
662 .downcast::<Message>()
663 .map_err(|_| anyhow!("unknown message type"))?;
664 match *message {
665 Message::Nats(async_nats::Message {
666 payload,
667 headers: Some(headers),
668 ..
669 }) => nats.publish_with_headers(topic, headers, payload).await,
670 Message::Nats(async_nats::Message { payload, .. }) => {
671 nats.publish(topic, payload).await
672 }
673 }
674 }
675 messaging0_3_0::types::Message::Wrpc(messaging::types::BrokerMessage {
676 body,
677 ..
678 }) => nats.publish(topic, body).await,
679 messaging0_3_0::types::Message::Guest(MessagingGuestMessage0_3 {
680 content_type,
681 data,
682 metadata,
683 }) => {
684 if let Some(content_type) = content_type {
685 warn!(
686 content_type,
687 "`content-type` not supported by NATS.io, value is ignored"
688 );
689 }
690 if let Some(metadata) = metadata {
691 nats.publish_with_headers(
692 topic,
693 metadata
694 .into_iter()
695 .map(|(k, v)| (k.into_header_name(), v.into_header_value()))
696 .collect(),
697 data.into(),
698 )
699 .await
700 } else {
701 nats.publish(topic, data.into()).await
702 }
703 }
704 } {
705 Ok(()) => return Ok(Ok(())),
706 Err(err) => {
707 return Ok(Err(messaging0_3_0::types::Error::Other(err.to_string())));
709 }
710 }
711 }
712 let body = match message {
713 messaging0_3_0::types::Message::Host(message) => {
714 let message = message
715 .into_any()
716 .downcast::<Message>()
717 .map_err(|_| anyhow!("unknown message type"))?;
718 match *message {
719 Message::Nats(async_nats::Message {
720 headers: Some(..), ..
721 }) => {
722 return Ok(Err(messaging0_3_0::types::Error::Other(
723 "headers not currently supported by wRPC targets".into(),
724 )));
725 }
726 Message::Nats(async_nats::Message { payload, .. }) => payload,
727 }
728 }
729 messaging0_3_0::types::Message::Wrpc(messaging::types::BrokerMessage {
730 body,
731 ..
732 }) => body,
733 messaging0_3_0::types::Message::Guest(MessagingGuestMessage0_3 {
734 metadata: Some(..),
735 ..
736 }) => {
737 return Ok(Err(messaging0_3_0::types::Error::Other(
738 "`metadata` not currently supported by wRPC targets".into(),
739 )));
740 }
741 messaging0_3_0::types::Message::Guest(MessagingGuestMessage0_3 {
742 content_type,
743 data,
744 ..
745 }) => {
746 if let Some(content_type) = content_type {
747 warn!(
748 content_type,
749 "`content-type` not currently supported by wRPC targets, value is ignored",
750 );
751 }
752 data.into()
753 }
754 };
755 match messaging::consumer::publish(
756 self,
757 None,
758 &messaging::types::BrokerMessage {
759 subject: topic,
760 body,
761 reply_to: None,
762 },
763 )
764 .await
765 {
766 Ok(Ok(())) => Ok(Ok(())),
767 Ok(Err(err)) => Ok(Err(messaging0_3_0::types::Error::Other(err))),
768 Err(err) => Ok(Err(messaging0_3_0::types::Error::Other(err.to_string()))),
770 }
771 }
772 }
773
774 #[instrument(level = "debug", skip_all)]
775 async fn request(
776 &self,
777 client: &(dyn MessagingClient0_3 + Send + Sync),
778 topic: messaging0_3_0::types::Topic,
779 message: &messaging0_3_0::types::Message,
780 options: Option<messaging0_3_0::request_reply::RequestOptions>,
781 ) -> anyhow::Result<
782 Result<Vec<Box<dyn MessagingHostMessage0_3 + Send + Sync>>, messaging0_3_0::types::Error>,
783 > {
784 if options.is_some() {
785 return Ok(Err(messaging0_3_0::types::Error::Other(
786 "`options` not currently supported".into(),
787 )));
788 }
789
790 use wasmcloud_runtime::capability::wrpc::wasmcloud::messaging0_2_0 as messaging;
791
792 let MessagingClient { name } = client
793 .as_any()
794 .downcast_ref()
795 .context("unknown client type")?;
796 {
797 let targets = self.targets.read().await;
798 let target = targets
799 .get("wasmcloud:messaging/request-reply")
800 .map(AsRef::as_ref)
801 .unwrap_or("default");
802 let name = if name.is_empty() {
803 "default"
804 } else {
805 name.as_ref()
806 };
807 if name != target {
808 return Ok(Err(messaging0_3_0::types::Error::Other(format!(
809 "mismatch between link name and client connection name, `{name}` != `{target}`"
810 ))));
811 }
812 if let Some(nats) = self.messaging_links.read().await.get(target) {
813 match match message {
814 messaging0_3_0::types::Message::Host(message) => {
815 let message = message
816 .as_any()
817 .downcast_ref::<Message>()
818 .context("unknown message type")?;
819 match message {
820 Message::Nats(async_nats::Message {
821 payload,
822 headers: Some(headers),
823 ..
824 }) => {
825 nats.request_with_headers(topic, headers.clone(), payload.clone())
826 .await
827 }
828 Message::Nats(async_nats::Message { payload, .. }) => {
829 nats.request(topic, payload.clone()).await
830 }
831 }
832 }
833 messaging0_3_0::types::Message::Wrpc(messaging::types::BrokerMessage {
834 body,
835 ..
836 }) => nats.request(topic, body.clone()).await,
837 messaging0_3_0::types::Message::Guest(MessagingGuestMessage0_3 {
838 content_type,
839 data,
840 metadata,
841 }) => {
842 if let Some(content_type) = content_type {
843 warn!(
844 content_type,
845 "`content-type` not supported by NATS.io, value is ignored"
846 );
847 }
848 if let Some(metadata) = metadata {
849 nats.request_with_headers(
850 topic,
851 metadata
852 .iter()
853 .map(|(k, v)| {
854 (
855 k.as_str().into_header_name(),
856 v.as_str().into_header_value(),
857 )
858 })
859 .collect(),
860 Bytes::copy_from_slice(data),
861 )
862 .await
863 } else {
864 nats.request(topic, Bytes::copy_from_slice(data)).await
865 }
866 }
867 } {
868 Ok(msg) => return Ok(Ok(vec![Box::new(Message::Nats(msg))])),
869 Err(err) => {
870 return Ok(Err(messaging0_3_0::types::Error::Other(err.to_string())));
872 }
873 }
874 }
875 let body = match message {
876 messaging0_3_0::types::Message::Host(message) => {
877 let message = message
878 .as_any()
879 .downcast_ref::<Message>()
880 .context("unknown message type")?;
881 match message {
882 Message::Nats(async_nats::Message {
883 headers: Some(..), ..
884 }) => {
885 return Ok(Err(messaging0_3_0::types::Error::Other(
886 "headers not currently supported by wRPC targets".into(),
887 )));
888 }
889 Message::Nats(async_nats::Message { payload, .. }) => payload.clone(),
890 }
891 }
892 messaging0_3_0::types::Message::Wrpc(messaging::types::BrokerMessage {
893 body,
894 ..
895 }) => body.clone(),
896 messaging0_3_0::types::Message::Guest(MessagingGuestMessage0_3 {
897 metadata: Some(..),
898 ..
899 }) => {
900 return Ok(Err(messaging0_3_0::types::Error::Other(
901 "`metadata` not currently supported by wRPC targets".into(),
902 )));
903 }
904 messaging0_3_0::types::Message::Guest(MessagingGuestMessage0_3 {
905 content_type,
906 data,
907 ..
908 }) => {
909 if let Some(content_type) = content_type {
910 warn!(
911 content_type,
912 "`content-type` not currently supported by wRPC targets, value is ignored",
913 );
914 }
915 Bytes::copy_from_slice(data)
916 }
917 };
918
919 match messaging::consumer::publish(
920 self,
921 None,
922 &messaging::types::BrokerMessage {
923 subject: topic,
924 body,
925 reply_to: None,
926 },
927 )
928 .await
929 {
930 Ok(Ok(())) => Ok(Err(messaging0_3_0::types::Error::Other(
931 "message sent, but returning responses is not currently supported by wRPC targets".into(),
932 ))),
933 Ok(Err(err)) => Ok(Err(messaging0_3_0::types::Error::Other(err))),
934 Err(err) => Ok(Err(messaging0_3_0::types::Error::Other(err.to_string()))),
936 }
937 }
938 }
939
940 #[instrument(level = "debug", skip_all)]
941 async fn reply(
942 &self,
943 reply_to: &messaging0_3_0::types::Message,
944 message: messaging0_3_0::types::Message,
945 ) -> anyhow::Result<Result<(), messaging0_3_0::types::Error>> {
946 use wasmcloud_runtime::capability::wrpc::wasmcloud::messaging0_2_0 as messaging;
947
948 {
949 let targets = self.targets.read().await;
950 let target = targets
951 .get("wasmcloud:messaging/request-reply")
952 .map(AsRef::as_ref)
953 .unwrap_or("default");
954 if let Some(nats) = self.messaging_links.read().await.get(target) {
955 let subject = match reply_to {
956 messaging0_3_0::types::Message::Host(reply_to) => {
957 match reply_to
958 .as_any()
959 .downcast_ref::<Message>()
960 .context("unknown message type")?
961 {
962 Message::Nats(async_nats::Message {
963 reply: Some(reply), ..
964 }) => reply.clone(),
965 Message::Nats(async_nats::Message { reply: None, .. }) => {
966 return Ok(Err(messaging0_3_0::types::Error::Other(
967 "reply not set in incoming NATS.io message".into(),
968 )))
969 }
970 }
971 }
972 messaging0_3_0::types::Message::Wrpc(messaging::types::BrokerMessage {
973 reply_to: Some(reply_to),
974 ..
975 }) => reply_to.as_str().into(),
976 messaging0_3_0::types::Message::Wrpc(messaging::types::BrokerMessage {
977 reply_to: None,
978 ..
979 }) => {
980 return Ok(Err(messaging0_3_0::types::Error::Other(
981 "reply not set in incoming wRPC message".into(),
982 )))
983 }
984 messaging0_3_0::types::Message::Guest(..) => {
985 return Ok(Err(messaging0_3_0::types::Error::Other(
986 "cannot reply to guest message".into(),
987 )))
988 }
989 };
990 match match message {
991 messaging0_3_0::types::Message::Host(message) => {
992 let message = message
993 .into_any()
994 .downcast::<Message>()
995 .map_err(|_| anyhow!("unknown message type"))?;
996 match *message {
997 Message::Nats(async_nats::Message {
998 payload,
999 headers: Some(headers),
1000 ..
1001 }) => nats.publish_with_headers(subject, headers, payload).await,
1002 Message::Nats(async_nats::Message { payload, .. }) => {
1003 nats.publish(subject, payload).await
1004 }
1005 }
1006 }
1007 messaging0_3_0::types::Message::Wrpc(messaging::types::BrokerMessage {
1008 body,
1009 ..
1010 }) => nats.publish(subject, body).await,
1011 messaging0_3_0::types::Message::Guest(MessagingGuestMessage0_3 {
1012 content_type,
1013 data,
1014 metadata,
1015 }) => {
1016 if let Some(content_type) = content_type {
1017 warn!(
1018 content_type,
1019 "`content-type` not supported by NATS.io, value is ignored"
1020 );
1021 }
1022 if let Some(metadata) = metadata {
1023 nats.publish_with_headers(
1024 subject,
1025 metadata
1026 .into_iter()
1027 .map(|(k, v)| (k.into_header_name(), v.into_header_value()))
1028 .collect(),
1029 data.into(),
1030 )
1031 .await
1032 } else {
1033 nats.publish(subject, data.into()).await
1034 }
1035 }
1036 } {
1037 Ok(()) => return Ok(Ok(())),
1038 Err(err) => {
1039 return Ok(Err(messaging0_3_0::types::Error::Other(err.to_string())));
1041 }
1042 }
1043 }
1044 let body = match message {
1045 messaging0_3_0::types::Message::Host(message) => {
1046 let message = message
1047 .into_any()
1048 .downcast::<Message>()
1049 .map_err(|_| anyhow!("unknown message type"))?;
1050 match *message {
1051 Message::Nats(async_nats::Message {
1052 headers: Some(..), ..
1053 }) => {
1054 return Ok(Err(messaging0_3_0::types::Error::Other(
1055 "headers not currently supported by wRPC targets".into(),
1056 )));
1057 }
1058 Message::Nats(async_nats::Message { payload, .. }) => payload,
1059 }
1060 }
1061 messaging0_3_0::types::Message::Wrpc(messaging::types::BrokerMessage {
1062 body,
1063 ..
1064 }) => body,
1065 messaging0_3_0::types::Message::Guest(MessagingGuestMessage0_3 {
1066 metadata: Some(..),
1067 ..
1068 }) => {
1069 return Ok(Err(messaging0_3_0::types::Error::Other(
1070 "`metadata` not currently supported by wRPC targets".into(),
1071 )));
1072 }
1073 messaging0_3_0::types::Message::Guest(MessagingGuestMessage0_3 {
1074 content_type,
1075 data,
1076 ..
1077 }) => {
1078 if let Some(content_type) = content_type {
1079 warn!(
1080 content_type,
1081 "`content-type` not currently supported by wRPC targets, value is ignored",
1082 );
1083 }
1084 data.into()
1085 }
1086 };
1087 let subject = match reply_to {
1088 messaging0_3_0::types::Message::Host(reply_to) => {
1089 match reply_to
1090 .as_any()
1091 .downcast_ref::<Message>()
1092 .context("unknown message type")?
1093 {
1094 Message::Nats(async_nats::Message {
1095 reply: Some(reply), ..
1096 }) => reply.to_string(),
1097 Message::Nats(async_nats::Message { reply: None, .. }) => {
1098 return Ok(Err(messaging0_3_0::types::Error::Other(
1099 "reply not set in incoming NATS.io message".into(),
1100 )))
1101 }
1102 }
1103 }
1104 messaging0_3_0::types::Message::Wrpc(messaging::types::BrokerMessage {
1105 reply_to: Some(reply_to),
1106 ..
1107 }) => reply_to.clone(),
1108 messaging0_3_0::types::Message::Wrpc(messaging::types::BrokerMessage {
1109 reply_to: None,
1110 ..
1111 }) => {
1112 return Ok(Err(messaging0_3_0::types::Error::Other(
1113 "reply not set in incoming wRPC message".into(),
1114 )))
1115 }
1116 messaging0_3_0::types::Message::Guest(..) => {
1117 return Ok(Err(messaging0_3_0::types::Error::Other(
1118 "cannot reply to guest message".into(),
1119 )))
1120 }
1121 };
1122 match messaging::consumer::publish(
1123 self,
1124 None,
1125 &messaging::types::BrokerMessage {
1126 subject,
1127 body,
1128 reply_to: None,
1129 },
1130 )
1131 .await
1132 {
1133 Ok(Ok(())) => Ok(Ok(())),
1134 Ok(Err(err)) => Ok(Err(messaging0_3_0::types::Error::Other(err))),
1135 Err(err) => Ok(Err(messaging0_3_0::types::Error::Other(err.to_string()))),
1137 }
1138 }
1139 }
1140}
1141
1142#[async_trait]
1143impl Identity for Handler {
1144 #[cfg(unix)]
1145 #[instrument(level = "debug", skip_all)]
1146 async fn get(
1147 &self,
1148 audience: &str,
1149 ) -> anyhow::Result<Result<Option<String>, identity::store::Error>> {
1150 let mut client = match DelegatedIdentityClient::default().await {
1151 Ok(client) => client,
1152 Err(err) => {
1153 return Ok(Err(identity::store::Error::Io(format!(
1154 "Unable to connect to workload identity service: {err}"
1155 ))));
1156 }
1157 };
1158
1159 let mut selectors =
1160 parse_selectors_from_host_labels(self.host_labels.read().await.deref()).await;
1161 selectors.push(Selector::Generic((
1163 WASMCLOUD_SELECTOR_TYPE.to_string(),
1164 format!("{}:{}", WASMCLOUD_SELECTOR_COMPONENT, self.component_id),
1165 )));
1166
1167 let svids = match client
1168 .fetch_jwt_svids(&[audience], Selectors(selectors))
1169 .await
1170 {
1171 Ok(svids) => svids,
1172 Err(err) => {
1173 return Ok(Err(identity::store::Error::Io(format!(
1174 "Unable to query workload identity service: {err}"
1175 ))));
1176 }
1177 };
1178
1179 if !svids.is_empty() {
1180 let svid = svids.first().map(|svid| svid.token()).unwrap_or_default();
1182 Ok(Ok(Some(svid.to_string())))
1183 } else {
1184 Ok(Err(identity::store::Error::NotFound))
1185 }
1186 }
1187
1188 #[cfg(target_family = "windows")]
1189 #[instrument(level = "debug", skip_all)]
1190 async fn get(
1191 &self,
1192 _audience: &str,
1193 ) -> anyhow::Result<Result<Option<String>, identity::store::Error>> {
1194 Ok(Err(identity::store::Error::Other(
1195 "workload identity is not supported on Windows".to_string(),
1196 )))
1197 }
1198}
1199
1200impl InvocationErrorIntrospect for Handler {
1201 fn invocation_error_kind(&self, err: &anyhow::Error) -> InvocationErrorKind {
1202 if let Some(err) = err.root_cause().downcast_ref::<std::io::Error>() {
1203 if err.kind() == std::io::ErrorKind::NotConnected {
1204 return InvocationErrorKind::NotFound;
1205 }
1206 }
1207 InvocationErrorKind::Trap
1208 }
1209}
1210
1211#[cfg(unix)]
1218async fn parse_selectors_from_host_labels(host_labels: &BTreeMap<String, String>) -> Vec<Selector> {
1219 let mut selectors = vec![];
1220
1221 for (key, value) in host_labels.iter() {
1222 if key.starts_with("wasmcloud__") && !key.ends_with("__") {
1224 let selector = key
1225 .replace("__", ":")
1227 .split_once(":")
1229 .map(|(_, selector)| format!("{selector}:{value}"))
1231 .unwrap_or("unknown".to_string());
1233
1234 selectors.push(Selector::Generic((
1235 WASMCLOUD_SELECTOR_TYPE.to_string(),
1236 selector,
1237 )));
1238 }
1239 }
1240
1241 selectors
1242}
1243
#[cfg(unix)]
#[cfg(test)]
mod tests {
    use super::*;
    use std::env::consts::{ARCH, FAMILY, OS};

    /// Only the `wasmcloud__`-prefixed label is turned into a selector.
    #[tokio::test]
    async fn test_parse_selectors_from_host_labels() {
        let mut labels: BTreeMap<String, String> = BTreeMap::new();
        labels.insert("hostcore.arch".into(), ARCH.into());
        labels.insert("hostcore.os".into(), OS.into());
        labels.insert("hostcore.osfamily".into(), FAMILY.into());
        labels.insert("wasmcloud__lattice".into(), "default".into());

        let selectors = parse_selectors_from_host_labels(&labels).await;

        assert_eq!(selectors.len(), 1);

        // Any non-generic selector is mapped to values that fail the assertions below.
        let fallback: (String, String) = ("wrong-value".into(), "wrong-value".into());
        let (selector_type, selector_value) = match selectors.first() {
            Some(Selector::Generic(pair)) => pair,
            _ => &fallback,
        };
        assert_eq!(selector_type, WASMCLOUD_SELECTOR_TYPE);
        assert_eq!(selector_value, "lattice:default");
    }

    /// An empty label map produces no selectors.
    #[tokio::test]
    async fn test_parse_selectors_from_host_labels_defaults_to_no_selectors() {
        let no_labels = BTreeMap::new();
        let selectors = parse_selectors_from_host_labels(&no_labels).await;
        assert_eq!(selectors.len(), 0);
    }
}