wasmcloud_host/wasmbus/providers/http_server/
address.rs1use core::net::SocketAddr;
2
3use std::collections::HashMap;
4use std::sync::Arc;
5
6use anyhow::Context as _;
7use http::header::HOST;
8use http::uri::Scheme;
9use http::Uri;
10use http_body_util::BodyExt as _;
11use tokio::sync::{Mutex, RwLock};
12use tokio::task::JoinSet;
13use tokio::time::Instant;
14use tracing::{info_span, instrument, trace_span, Instrument as _, Span};
15use wasmcloud_core::http::{load_settings, ServiceSettings};
16use wasmcloud_provider_sdk::{LinkConfig, LinkDeleteInfo};
17use wasmcloud_tracing::KeyValue;
18use wrpc_interface_http::ServeIncomingHandlerWasmtime as _;
19
20use crate::wasmbus::{Component, InvocationContext};
21
22use super::listen;
23
24pub(crate) struct Provider {
25 pub(crate) address: SocketAddr,
27 pub(crate) components: Arc<RwLock<HashMap<String, Arc<Component>>>>,
29 pub(crate) links: Mutex<HashMap<Arc<str>, HashMap<Box<str>, JoinSet<()>>>>,
31 pub(crate) lattice_id: Arc<str>,
32 pub(crate) host_id: Arc<str>,
33}
34
35impl wasmcloud_provider_sdk::Provider for Provider {
36 #[instrument(level = "debug", skip_all)]
37 async fn receive_link_config_as_source(
38 &self,
39 LinkConfig {
40 target_id,
41 config,
42 link_name,
43 ..
44 }: LinkConfig<'_>,
45 ) -> anyhow::Result<()> {
46 let ServiceSettings { address, .. } =
47 load_settings(Some(self.address), config).context("failed to load settings")?;
48
49 let components = Arc::clone(&self.components);
50 let host_id = Arc::clone(&self.host_id);
51 let lattice_id = Arc::clone(&self.lattice_id);
52 let target_id: Arc<str> = Arc::from(target_id);
53 let tasks = listen(address, {
54 let target_id = Arc::clone(&target_id);
55 move |req: hyper::Request<hyper::body::Incoming>| {
56 let components = Arc::clone(&components);
57 let host_id = Arc::clone(&host_id);
58 let lattice_id = Arc::clone(&lattice_id);
59 let target_id = Arc::clone(&target_id);
60 async move {
61 let component = {
62 let components = components.read().await;
63 let component = components
64 .get(target_id.as_ref())
65 .context("linked component not found")?;
66 Arc::clone(component)
67 };
68 let (
69 http::request::Parts {
70 method,
71 uri,
72 headers,
73 ..
74 },
75 body,
76 ) = req.into_parts();
77 let http::uri::Parts {
78 scheme,
79 authority,
80 path_and_query,
81 ..
82 } = uri.into_parts();
83 let mut uri = Uri::builder().scheme(scheme.unwrap_or(Scheme::HTTP));
85 if let Some(authority) = authority {
86 uri = uri.authority(authority);
87 } else if let Some(authority) = headers.get("X-Forwarded-Host") {
88 uri = uri.authority(authority.as_bytes());
89 } else if let Some(authority) = headers.get(HOST) {
90 uri = uri.authority(authority.as_bytes());
91 }
92 if let Some(path_and_query) = path_and_query {
93 uri = uri.path_and_query(path_and_query)
94 };
95 let uri = uri.build().context("invalid URI")?;
96 let mut req = http::Request::builder().method(method);
97 *req.headers_mut().expect("headers missing") = headers;
98
99 let req = req
100 .uri(uri)
101 .body(
102 body.map_err(wasmtime_wasi_http::hyper_response_error)
103 .boxed(),
104 )
105 .context("invalid request")?;
106 let _permit = component
107 .permits
108 .acquire()
109 .instrument(trace_span!("acquire_permit"))
110 .await
111 .context("failed to acquire execution permit")?;
112 let res = component
113 .instantiate(component.handler.copy_for_new(), component.events.clone())
114 .handle(
115 InvocationContext {
116 span: Span::current(),
117 start_at: Instant::now(),
118 attributes: vec![
119 KeyValue::new(
120 "component.ref",
121 Arc::clone(&component.image_reference),
122 ),
123 KeyValue::new("lattice", Arc::clone(&lattice_id)),
124 KeyValue::new("host", Arc::clone(&host_id)),
125 ],
126 },
127 req,
128 )
129 .await?;
130 let res = res?;
131 anyhow::Ok(res)
132 }
133 .instrument(info_span!("handle"))
134 }
135 })
136 .await?;
137
138 self.links
139 .lock()
140 .instrument(trace_span!("insert_link"))
141 .await
142 .entry(target_id)
143 .or_default()
144 .insert(link_name.into(), tasks);
145 Ok(())
146 }
147
148 #[instrument(level = "debug", skip_all)]
149 async fn delete_link_as_source(&self, info: impl LinkDeleteInfo) -> anyhow::Result<()> {
150 let target_id = info.get_target_id();
151 let link_name = info.get_link_name();
152 self.links
153 .lock()
154 .await
155 .get_mut(target_id)
156 .map(|links| links.remove(link_name));
157 Ok(())
158 }
159}