1use core::any::Any;
2use core::future::Future;
3
4use anyhow::{bail, Context as _};
5use async_trait::async_trait;
6use tracing::{info_span, instrument, Instrument as _};
7use tracing_opentelemetry::OpenTelemetrySpanExt as _;
8use wasmtime::component::Resource;
9use wasmtime::Store;
10
11use crate::capability::messaging0_3_0::types::{Error, Metadata, Topic};
12use crate::capability::messaging0_3_0::{producer, request_reply, types};
13use crate::capability::wrpc;
14use crate::component::{Ctx, Handler};
15
/// Component bindings generated by `wasmtime::component::bindgen!` for the
/// `messaging-handler` world.
pub mod bindings {
    wasmtime::component::bindgen!({
        world: "messaging-handler",
        // Imports and exports share the same defaults: `async | trappable | tracing`.
        imports: { default: async | trappable | tracing },
        exports: { default: async | trappable | tracing },
        // Map the `wasmcloud:messaging/types` interface onto the already existing
        // `messaging0_3_0::types` module of this crate.
        with: {
            "wasmcloud:messaging/types": crate::capability::messaging0_3_0::types,
        },
    });
}
26
27#[instrument(level = "debug", skip_all)]
28pub(crate) async fn handle_message<H>(
29 pre: bindings::MessagingHandlerPre<Ctx<H>>,
30 mut store: &mut Store<Ctx<H>>,
31 msg: wrpc::wasmcloud::messaging0_2_0::types::BrokerMessage,
32) -> anyhow::Result<Result<(), String>>
33where
34 H: Handler,
35{
36 let call_handle_message = info_span!("call_handle_message");
37 store.data_mut().parent_context = Some(call_handle_message.context());
38 let bindings = pre.instantiate_async(&mut store).await?;
39 let msg = store
40 .data_mut()
41 .table
42 .push(Message::Wrpc(msg))
43 .context("failed to push message to table")?;
44 bindings
45 .wasmcloud_messaging0_3_0_incoming_handler()
46 .call_handle(&mut store, msg)
47 .instrument(call_handle_message)
48 .await
49 .context("failed to call `wasmcloud:messaging/incoming-handler@0.3.0#handle`")
50 .map(|err| err.map_err(|err| err.to_string()))
51}
52
/// Options attached to a request/reply operation.
///
/// Both fields default to `None`.
#[derive(Debug, Default)]
pub struct RequestOptions {
    /// Timeout in milliseconds, if one was set.
    // NOTE(review): presumably the maximum time to wait for replies; the behavior for `None`
    // is decided by the `Messaging` implementation - confirm there.
    pub timeout_ms: Option<u32>,

    /// Number of expected replies, if one was set.
    // NOTE(review): presumably the number of replies to collect before returning - confirm
    // against the `Messaging` implementation.
    pub expected_replies: Option<u32>,
}
63
/// A message whose storage is owned by the host (see [`Message::Host`]).
///
/// The accessors mirror the operations exposed on the `message` resource; every method
/// returns a `wasmtime::Result`, so implementations can fail on any of them.
#[async_trait]
pub trait HostMessage {
    /// Returns the topic of this message, if it has one.
    async fn topic(&self) -> wasmtime::Result<Option<Topic>>;
    /// Returns the content type of this message, if it has one.
    async fn content_type(&self) -> wasmtime::Result<Option<String>>;
    /// Sets the content type of this message.
    async fn set_content_type(&mut self, content_type: String) -> wasmtime::Result<()>;
    /// Returns the payload of this message as an owned buffer.
    async fn data(&self) -> wasmtime::Result<Vec<u8>>;
    /// Replaces the payload of this message with `buf`.
    async fn set_data(&mut self, buf: Vec<u8>) -> wasmtime::Result<()>;
    /// Returns the metadata of this message, if it has any.
    async fn metadata(&self) -> wasmtime::Result<Option<Metadata>>;
    /// Adds a `key`/`value` pair to the metadata of this message.
    // NOTE(review): whether an existing `key` is replaced or duplicated is up to the
    // implementation - confirm.
    async fn add_metadata(&mut self, key: String, value: String) -> wasmtime::Result<()>;
    /// Replaces the metadata of this message with `meta`.
    async fn set_metadata(&mut self, meta: Metadata) -> wasmtime::Result<()>;
    /// Removes the metadata stored under `key`.
    async fn remove_metadata(&mut self, key: String) -> wasmtime::Result<()>;

    /// Returns this message as `&dyn Any` (presumably so callers can downcast it to the
    /// concrete implementation).
    fn as_any(&self) -> &dyn Any;

    /// Converts this boxed message into `Box<dyn Any>`, consuming it.
    fn into_any(self: Box<Self>) -> Box<dyn Any>;
}
96
/// A messaging client handle, as returned by [`Messaging::connect`].
#[async_trait]
pub trait Client {
    /// Disconnects this client.
    ///
    /// The outer `wasmtime::Result` and the inner messaging [`Error`] are separate failure
    /// channels.
    async fn disconnect(&mut self) -> wasmtime::Result<Result<(), Error>>;

    /// Returns this client as `&dyn Any` (presumably so callers can downcast it to the
    /// concrete implementation).
    fn as_any(&self) -> &dyn Any;
}
106
/// Host-side messaging operations.
///
/// Every method returns a `Send` future resolving to
/// `wasmtime::Result<Result<_, Error>>`; the outer and inner results are separate failure
/// channels.
pub trait Messaging {
    /// Establishes a connection identified by `name` and returns the resulting client.
    // NOTE(review): the meaning of `name` is defined by the implementation - confirm.
    fn connect(
        &self,
        name: String,
    ) -> impl Future<Output = wasmtime::Result<Result<Box<dyn Client + Send + Sync>, Error>>> + Send;

    /// Sends `message` on `topic` using `client`. The message is consumed.
    fn send(
        &self,
        client: &(dyn Client + Send + Sync),
        topic: Topic,
        message: Message,
    ) -> impl Future<Output = wasmtime::Result<Result<(), Error>>> + Send;

    /// Performs a request on `topic` using `client` and returns the replies received.
    ///
    /// `message` is only borrowed. `options` optionally carries a timeout and an expected
    /// reply count (see [`RequestOptions`]).
    fn request(
        &self,
        client: &(dyn Client + Send + Sync),
        topic: Topic,
        message: &Message,
        options: Option<RequestOptions>,
    ) -> impl Future<Output = wasmtime::Result<Result<Vec<Box<dyn HostMessage + Send + Sync>>, Error>>>
           + Send;

    /// Sends `message` as a reply to `reply_to`. The reply message is consumed, the
    /// message being replied to is only borrowed.
    fn reply(
        &self,
        reply_to: &Message,
        message: Message,
    ) -> impl Future<Output = wasmtime::Result<Result<(), Error>>> + Send;
}
160
/// A message constructed by the guest (see [`Message::Guest`]).
///
/// The default value has an empty payload, no content type and no metadata.
#[derive(Debug, Default)]
pub struct GuestMessage {
    /// Content type of the message, if one was set.
    pub content_type: Option<String>,
    /// Payload of the message.
    pub data: Vec<u8>,
    /// Metadata of the message as key/value pairs; `None` until metadata is first added
    /// or set.
    pub metadata: Option<Vec<(String, String)>>,
}
174
/// Backing storage of a `message` resource held in the resource table.
pub enum Message {
    /// Message owned by the host, e.g. a reply returned by [`Messaging::request`].
    Host(Box<dyn HostMessage + Send + Sync>),
    /// Incoming wRPC broker message, as delivered through `handle_message`.
    Wrpc(wrpc::wasmcloud::messaging0_2_0::types::BrokerMessage),
    /// Message constructed by the guest.
    Guest(GuestMessage),
}
180
// `types::Host` is implemented with an empty body: nothing needs to be provided here.
impl<H> types::Host for Ctx<H> where H: Handler {}
182
183impl<H> types::HostClient for Ctx<H>
184where
185 H: Handler,
186{
187 #[instrument(level = "debug", skip_all)]
188 async fn connect(
189 &mut self,
190 name: String,
191 ) -> wasmtime::Result<Result<Resource<Box<dyn Client + Send + Sync>>, Error>> {
192 self.attach_parent_context();
193 match self.handler.connect(name).await? {
194 Ok(client) => {
195 let client = self
196 .table
197 .push(client)
198 .context("failed to push client to table")?;
199 Ok(Ok(client))
200 }
201 Err(err) => Ok(Err(err)),
202 }
203 }
204
205 #[instrument(level = "debug", skip_all)]
206 async fn disconnect(
207 &mut self,
208 client: Resource<Box<dyn Client + Send + Sync>>,
209 ) -> wasmtime::Result<Result<(), Error>> {
210 self.attach_parent_context();
211 let client = self
212 .table
213 .get_mut(&client)
214 .context("failed to get client")?;
215 client.disconnect().await
216 }
217
218 #[instrument(level = "debug", skip_all)]
219 async fn drop(
220 &mut self,
221 client: Resource<Box<dyn Client + Send + Sync>>,
222 ) -> wasmtime::Result<()> {
223 self.attach_parent_context();
224 self.table
225 .delete(client)
226 .context("failed to delete client")?;
227 Ok(())
228 }
229}
230
231impl<H> types::HostMessage for Ctx<H>
232where
233 H: Handler,
234{
235 #[instrument(level = "debug", skip_all)]
236 async fn new(&mut self, data: Vec<u8>) -> wasmtime::Result<Resource<Message>> {
237 self.attach_parent_context();
238 self.table
239 .push(Message::Guest(GuestMessage {
240 data,
241 ..Default::default()
242 }))
243 .context("failed to push message to table")
244 }
245
246 #[instrument(level = "debug", skip_all)]
247 async fn topic(&mut self, msg: Resource<Message>) -> wasmtime::Result<Option<Topic>> {
248 self.attach_parent_context();
249 let msg = self.table.get(&msg).context("failed to get message")?;
250 match msg {
251 Message::Host(msg) => msg.topic().await,
252 Message::Wrpc(msg) => Ok(Some(msg.subject.clone())),
253 Message::Guest(GuestMessage { .. }) => Ok(None),
254 }
255 }
256
257 #[instrument(level = "debug", skip_all)]
258 async fn content_type(&mut self, msg: Resource<Message>) -> wasmtime::Result<Option<String>> {
259 self.attach_parent_context();
260 let msg = self.table.get(&msg).context("failed to get message")?;
261 match msg {
262 Message::Host(msg) => msg.content_type().await,
263 Message::Wrpc(..) => Ok(None),
264 Message::Guest(GuestMessage { content_type, .. }) => Ok(content_type.clone()),
265 }
266 }
267
268 #[instrument(level = "debug", skip_all)]
269 async fn set_content_type(
270 &mut self,
271 msg: Resource<Message>,
272 content_type: String,
273 ) -> wasmtime::Result<()> {
274 self.attach_parent_context();
275 let msg = self.table.get_mut(&msg).context("failed to get message")?;
276 match msg {
277 Message::Host(msg) => msg.set_content_type(content_type).await,
278 Message::Wrpc(..) => bail!("content-type not currently supported by wRPC messages"),
279 Message::Guest(msg) => {
280 msg.content_type = Some(content_type);
281 Ok(())
282 }
283 }
284 }
285
286 #[instrument(level = "debug", skip_all)]
287 async fn data(&mut self, msg: Resource<Message>) -> wasmtime::Result<Vec<u8>> {
288 self.attach_parent_context();
289 let msg = self.table.get(&msg).context("failed to get message")?;
290 match msg {
291 Message::Host(msg) => msg.data().await,
292 Message::Wrpc(msg) => Ok(msg.body.to_vec()),
293 Message::Guest(GuestMessage { data, .. }) => Ok(data.clone()),
294 }
295 }
296
297 #[instrument(level = "debug", skip_all)]
298 async fn set_data(&mut self, msg: Resource<Message>, buf: Vec<u8>) -> wasmtime::Result<()> {
299 self.attach_parent_context();
300 let msg = self.table.get_mut(&msg).context("failed to get message")?;
301 match msg {
302 Message::Host(msg) => msg.set_data(buf).await,
303 Message::Wrpc(msg) => {
304 msg.body = buf.into();
305 Ok(())
306 }
307 Message::Guest(GuestMessage { data, .. }) => {
308 *data = buf;
309 Ok(())
310 }
311 }
312 }
313
314 #[instrument(level = "debug", skip_all)]
315 async fn metadata(&mut self, msg: Resource<Message>) -> wasmtime::Result<Option<Metadata>> {
316 self.attach_parent_context();
317 let msg = self.table.get(&msg).context("failed to get message")?;
318 match msg {
319 Message::Host(msg) => msg.metadata().await,
320 Message::Wrpc(..) => Ok(None),
321 Message::Guest(GuestMessage { metadata, .. }) => Ok(metadata.clone()),
322 }
323 }
324
325 #[instrument(level = "debug", skip_all)]
326 async fn add_metadata(
327 &mut self,
328 msg: Resource<Message>,
329 key: String,
330 value: String,
331 ) -> wasmtime::Result<()> {
332 self.attach_parent_context();
333 let msg = self.table.get_mut(&msg).context("failed to get message")?;
334 match msg {
335 Message::Host(msg) => msg.add_metadata(key, value).await,
336 Message::Wrpc(..) => bail!("metadata not currently supported by wRPC messages"),
337 Message::Guest(GuestMessage {
338 metadata: Some(metadata),
339 ..
340 }) => {
341 metadata.push((key, value));
342 Ok(())
343 }
344 Message::Guest(GuestMessage { metadata, .. }) => {
345 *metadata = Some(vec![(key, value)]);
346 Ok(())
347 }
348 }
349 }
350
351 #[instrument(level = "debug", skip_all)]
352 async fn set_metadata(
353 &mut self,
354 msg: Resource<Message>,
355 meta: Metadata,
356 ) -> wasmtime::Result<()> {
357 self.attach_parent_context();
358 let msg = self.table.get_mut(&msg).context("failed to get message")?;
359 match msg {
360 Message::Host(msg) => msg.set_metadata(meta).await,
361 Message::Wrpc(..) if meta.is_empty() => Ok(()),
362 Message::Wrpc(..) => bail!("metadata not currently supported by wRPC messages"),
363 Message::Guest(GuestMessage { metadata, .. }) => {
364 *metadata = Some(meta);
365 Ok(())
366 }
367 }
368 }
369
370 #[instrument(level = "debug", skip_all)]
371 async fn remove_metadata(
372 &mut self,
373 msg: Resource<Message>,
374 key: String,
375 ) -> wasmtime::Result<()> {
376 self.attach_parent_context();
377 let msg = self.table.get_mut(&msg).context("failed to get message")?;
378 match msg {
379 Message::Host(msg) => msg.remove_metadata(key).await,
380 Message::Guest(GuestMessage {
381 metadata: Some(metadata),
382 ..
383 }) => {
384 metadata.retain(|(k, _)| *k != key);
385 Ok(())
386 }
387 Message::Guest(..) | Message::Wrpc(..) => Ok(()),
388 }
389 }
390
391 #[instrument(level = "debug", skip_all)]
392 async fn drop(&mut self, rep: Resource<Message>) -> wasmtime::Result<()> {
393 self.attach_parent_context();
394 self.table.delete(rep).context("failed to delete message")?;
395 Ok(())
396 }
397}
398
399impl<H> producer::Host for Ctx<H>
400where
401 H: Handler,
402{
403 #[instrument(level = "debug", skip_all)]
404 async fn send(
405 &mut self,
406 client: Resource<Box<dyn Client + Send + Sync>>,
407 topic: Topic,
408 message: Resource<Message>,
409 ) -> wasmtime::Result<Result<(), Error>> {
410 self.attach_parent_context();
411 let message = self
412 .table
413 .delete(message)
414 .context("failed to delete outgoing message")?;
415 let client = self.table.get(&client).context("failed to get client")?;
416 self.handler.send(client.as_ref(), topic, message).await
417 }
418}
419
420impl<H> request_reply::Host for Ctx<H>
421where
422 H: Handler,
423{
424 async fn request(
425 &mut self,
426 client: Resource<Box<dyn Client + Send + Sync>>,
427 topic: Topic,
428 message: Resource<Message>,
429 options: Option<Resource<RequestOptions>>,
430 ) -> wasmtime::Result<Result<Vec<Resource<Message>>, Error>> {
431 self.attach_parent_context();
432 let options = options
433 .map(|options| self.table.delete(options))
434 .transpose()
435 .context("failed to delete request options")?;
436 let client = self.table.get(&client).context("failed to get client")?;
437 let message = self
438 .table
439 .get(&message)
440 .context("failed to get outgoing message")?;
441 match Messaging::request(&self.handler, client.as_ref(), topic, message, options).await? {
442 Ok(msgs) => {
443 let msgs = msgs
444 .into_iter()
445 .map(|msg| {
446 self.table
447 .push(Message::Host(msg))
448 .context("failed to push message to table")
449 })
450 .collect::<wasmtime::Result<Vec<_>>>()?;
451 Ok(Ok(msgs))
452 }
453 Err(err) => Ok(Err(err)),
454 }
455 }
456
457 async fn reply(
458 &mut self,
459 reply_to: Resource<Message>,
460 message: Resource<Message>,
461 ) -> wasmtime::Result<Result<(), Error>> {
462 self.attach_parent_context();
463 let message = self
464 .table
465 .delete(message)
466 .context("failed to delete outgoing message")?;
467 let reply_to = self
468 .table
469 .get(&reply_to)
470 .context("failed to get incoming message")?;
471 self.handler.reply(reply_to, message).await
472 }
473}
474
475impl<H> request_reply::HostRequestOptions for Ctx<H>
476where
477 H: Handler,
478{
479 async fn new(&mut self) -> wasmtime::Result<Resource<RequestOptions>> {
480 self.attach_parent_context();
481 self.table
482 .push(RequestOptions::default())
483 .context("failed to push request options to table")
484 }
485
486 async fn set_timeout_ms(
487 &mut self,
488 opts: Resource<RequestOptions>,
489 timeout_ms: u32,
490 ) -> wasmtime::Result<()> {
491 self.attach_parent_context();
492 let opts = self
493 .table
494 .get_mut(&opts)
495 .context("failed to get request options")?;
496 opts.timeout_ms = Some(timeout_ms);
497 Ok(())
498 }
499
500 async fn set_expected_replies(
501 &mut self,
502 opts: Resource<RequestOptions>,
503 expected_replies: u32,
504 ) -> wasmtime::Result<()> {
505 self.attach_parent_context();
506 let opts = self
507 .table
508 .get_mut(&opts)
509 .context("failed to get request options")?;
510 opts.expected_replies = Some(expected_replies);
511 Ok(())
512 }
513
514 async fn drop(&mut self, opts: Resource<RequestOptions>) -> wasmtime::Result<()> {
515 self.attach_parent_context();
516 self.table
517 .delete(opts)
518 .context("failed to delete request options")?;
519 Ok(())
520 }
521}