wasmcloud_runtime/component/
bus.rs

1use core::fmt;
2
3use std::sync::Arc;
4
5use anyhow::Context as _;
6use async_trait::async_trait;
7use tracing::instrument;
8use wasmcloud_core::CallTargetInterface;
9use wasmtime::component::Resource;
10use wrpc_runtime_wasmtime::rpc;
11
12use crate::capability::bus::{error, lattice};
13
14use super::{Ctx, Handler, TableResult};
15
16/// Wasmcloud error type
17pub enum Error {
18    /// Link was not found for target
19    LinkNotFound(anyhow::Error),
20
21    /// wRPC transport error
22    Transport(rpc::Error),
23
24    /// Handler error
25    Handler(anyhow::Error),
26}
27
28impl std::error::Error for Error {}
29
30impl fmt::Debug for Error {
31    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32        match self {
33            Error::LinkNotFound(error) | Error::Handler(error) => error.fmt(f),
34            Error::Transport(error) => error.fmt(f),
35        }
36    }
37}
38
39impl fmt::Display for Error {
40    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41        match self {
42            Error::LinkNotFound(error) | Error::Handler(error) => error.fmt(f),
43            Error::Transport(error) => error.fmt(f),
44        }
45    }
46}
47
48#[async_trait]
49/// `wasmcloud:bus/lattice@2.0.1` implementation
50pub trait Bus {
51    /// Set the link name to use for a given list of interfaces, returning an error
52    /// if a link doesn't exist on the given interfaces for the given target
53    async fn set_link_name(
54        &self,
55        link_name: String,
56        interfaces: Vec<Arc<CallTargetInterface>>,
57    ) -> anyhow::Result<Result<(), String>>;
58}
59
60impl<H: Handler> lattice::Host for Ctx<H> {
61    #[instrument(level = "debug", skip_all)]
62    async fn set_link_name(
63        &mut self,
64        link_name: String,
65        interfaces: Vec<Resource<Arc<CallTargetInterface>>>,
66    ) -> anyhow::Result<Result<(), String>> {
67        self.attach_parent_context();
68        let interfaces = interfaces
69            .into_iter()
70            .map(|interface| self.table.get(&interface).cloned())
71            .collect::<TableResult<_>>()
72            .context("failed to convert call target interfaces")?;
73        self.handler
74            .set_link_name(link_name, interfaces)
75            .await
76            .context("failed to set link name")
77    }
78}
79
80impl<H: Handler> lattice::HostCallTargetInterface for Ctx<H> {
81    #[instrument(level = "debug", skip_all)]
82    async fn new(
83        &mut self,
84        namespace: String,
85        package: String,
86        interface: String,
87    ) -> anyhow::Result<Resource<Arc<CallTargetInterface>>> {
88        self.attach_parent_context();
89        self.table
90            .push(Arc::new(CallTargetInterface {
91                namespace,
92                package,
93                interface,
94            }))
95            .context("failed to push target interface")
96    }
97
98    async fn drop(&mut self, interface: Resource<Arc<CallTargetInterface>>) -> anyhow::Result<()> {
99        self.table.delete(interface)?;
100        Ok(())
101    }
102}
103
104impl<H: Handler> error::Host for Ctx<H> {}
105
106impl<H: Handler> error::HostError for Ctx<H> {
107    async fn from_rpc_error(
108        &mut self,
109        error: Resource<rpc::Error>,
110    ) -> wasmtime::Result<Resource<Error>> {
111        let error = self
112            .table
113            .delete(error)
114            .context("failed to delete `wrpc:rpc/error.error` from table")?;
115        let error = match error {
116            rpc::Error::Invoke(error) => match error.downcast() {
117                Ok(error) => error,
118                Err(error) => Error::Transport(rpc::Error::Invoke(error)),
119            },
120            rpc::Error::IncomingIndex(error) => match error.downcast() {
121                Ok(error) => error,
122                Err(error) => Error::Transport(rpc::Error::IncomingIndex(error)),
123            },
124            rpc::Error::OutgoingIndex(error) => match error.downcast() {
125                Ok(error) => error,
126                Err(error) => Error::Transport(rpc::Error::OutgoingIndex(error)),
127            },
128            rpc::Error::Stream(error) => Error::Transport(rpc::Error::Stream(error)),
129        };
130        let error = self
131            .table
132            .push(error)
133            .context("failed to push error to table")?;
134        Ok(error)
135    }
136
137    async fn from_io_error(
138        &mut self,
139        error: Resource<wasmtime_wasi::p2::bindings::io::error::Error>,
140    ) -> wasmtime::Result<
141        Result<Resource<Error>, Resource<wasmtime_wasi::p2::bindings::io::error::Error>>,
142    > {
143        let error = self
144            .table
145            .delete(error)
146            .context("failed to delete `wasi:io/error.error` from table")?;
147        match error.downcast() {
148            Ok(error) => {
149                let error = self
150                    .table
151                    .push(Error::Transport(rpc::Error::Stream(error)))
152                    .context("failed to push error to table")?;
153                Ok(Ok(error))
154            }
155            Err(error) => {
156                let error = self
157                    .table
158                    .push(error)
159                    .context("failed to push `wasi:io/error.error` to table")?;
160                Ok(Err(error))
161            }
162        }
163    }
164
165    async fn to_debug_string(&mut self, error: Resource<Error>) -> wasmtime::Result<String> {
166        let error = self
167            .table
168            .get(&error)
169            .context("failed to get error from table")?;
170        Ok(error.to_string())
171    }
172
173    async fn drop(&mut self, error: Resource<Error>) -> wasmtime::Result<()> {
174        self.table
175            .delete(error)
176            .context("failed to delete error from table")?;
177        Ok(())
178    }
179}