wasmcloud_host/wasmbus/providers/http_server/
path.rs1use core::net::SocketAddr;
2
3use std::collections::HashMap;
4use std::sync::Arc;
5
6use anyhow::{bail, Context as _};
7use http::header::HOST;
8use http::uri::Scheme;
9use http::Uri;
10use http_body_util::BodyExt as _;
11use tokio::time::Instant;
12use tokio::{sync::RwLock, task::JoinSet};
13use tracing::{debug, error, info_span, instrument, trace_span, warn, Instrument as _, Span};
14use wasmcloud_provider_sdk::{LinkConfig, LinkDeleteInfo};
15use wasmcloud_tracing::KeyValue;
16use wrpc_interface_http::ServeIncomingHandlerWasmtime as _;
17
18use crate::wasmbus::{Component, InvocationContext};
19
20use super::listen;
21
/// Routing table that maps registered HTTP paths to the components serving them.
///
/// The two maps are kept in sync: every `paths` entry has a matching
/// `components` entry and vice versa.
#[derive(Default)]
pub(crate) struct Router {
    /// Lookup from a registered request path to the ID of the target component.
    pub(crate) paths: HashMap<Arc<str>, Arc<str>>,
    /// Reverse lookup from `(target component ID, link name)` to the path
    /// registered for that link, used to find the path when a link is deleted.
    pub(crate) components: HashMap<(Arc<str>, Arc<str>), Arc<str>>,
}
31
32pub(crate) struct Provider {
33 #[allow(unused)]
36 pub(crate) handle: JoinSet<()>,
37 pub(crate) path_router: Arc<RwLock<Router>>,
39}
40
41impl wasmcloud_provider_sdk::Provider for Provider {
43 #[instrument(level = "debug", skip_all)]
44 async fn receive_link_config_as_source(&self, link: LinkConfig<'_>) -> anyhow::Result<()> {
45 self.put_link(link.target_id, link.link_name, link.config)
46 .await
47 }
48
49 #[instrument(level = "debug", skip_all)]
50 async fn delete_link_as_source(&self, info: impl LinkDeleteInfo) -> anyhow::Result<()> {
51 self.delete_link(
52 info.get_source_id(),
53 info.get_target_id(),
54 info.get_link_name(),
55 )
56 .await
57 }
58}
59
60impl Provider {
61 #[instrument(level = "debug", skip(self))]
62 async fn put_link(
63 &self,
64 target_id: &str,
65 link_name: &str,
66 config: &HashMap<String, String>,
67 ) -> anyhow::Result<()> {
68 let Some(path) = config.get("path") else {
69 error!(
70 ?config,
71 ?target_id,
72 "path not found in link config, cannot register path"
73 );
74 bail!("path not found in link config, cannot register path for component {target_id}",);
75 };
76
77 let target = Arc::from(target_id);
78 let name = Arc::from(link_name);
79 let key = (Arc::clone(&target), Arc::clone(&name));
80
81 let mut path_router = self.path_router.write().await;
82 if path_router.components.contains_key(&key) {
83 bail!("Component {target} already has a path registered with link name {name}");
84 }
85 if path_router.paths.contains_key(path.as_str()) {
86 bail!("Path {path} already in use by a different component");
87 }
88
89 let path = Arc::from(path.clone());
90 path_router.components.insert(key, Arc::clone(&path));
92 path_router.paths.insert(path, target);
93
94 Ok(())
95 }
96
97 #[instrument(level = "debug", skip(self))]
98 async fn delete_link(
99 &self,
100 source_id: &str,
101 target_id: &str,
102 link_name: &str,
103 ) -> anyhow::Result<()> {
104 debug!(
105 source = source_id,
106 target = target_id,
107 link = link_name,
108 "deleting http path link"
109 );
110
111 let mut path_router = self.path_router.write().await;
112 let path = path_router
113 .components
114 .remove(&(Arc::from(target_id), Arc::from(link_name)));
115 if let Some(path) = path {
116 path_router.paths.remove(&path);
117 }
118
119 Ok(())
120 }
121}
122
123impl Provider {
124 pub(crate) async fn new(
125 address: SocketAddr,
126 components: Arc<RwLock<HashMap<String, Arc<Component>>>>,
127 lattice_id: Arc<str>,
128 host_id: Arc<str>,
129 ) -> anyhow::Result<Self> {
130 let path_router: Arc<RwLock<Router>> = Arc::default();
131 let handle = listen(address, {
132 let path_router = Arc::clone(&path_router);
133 move |req: hyper::Request<hyper::body::Incoming>| {
134 let lattice_id = Arc::clone(&lattice_id);
135 let host_id = Arc::clone(&host_id);
136 let components = Arc::clone(&components);
137 let path_router = Arc::clone(&path_router);
138 async move {
139 let (
140 http::request::Parts {
141 method,
142 uri,
143 headers,
144 ..
145 },
146 body,
147 ) = req.into_parts();
148 let http::uri::Parts {
149 scheme,
150 authority,
151 path_and_query,
152 ..
153 } = uri.into_parts();
154
155 let mut uri = Uri::builder().scheme(scheme.unwrap_or(Scheme::HTTP));
157 let component = if let Some(path_and_query) = path_and_query {
158 let component_id = {
159 let router = path_router.read().await;
160 let Some(component_id) = router.paths.get(path_and_query.path()) else {
161 warn!(path = path_and_query.path(), "received request for unregistered http path");
162 return anyhow::Ok(
163 http::Response::builder()
164 .status(404)
165 .body(wasmtime_wasi_http::body::HyperOutgoingBody::default())
166 .context("failed to construct missing path error response")?,
167 );
168 };
169 component_id.to_string()
170 };
171
172 uri = uri.path_and_query(path_and_query);
173
174 let components = components.read().await;
175 let component = components
176 .get(&component_id)
177 .context("linked component not found")?;
178 Arc::clone(component)
179 } else {
180 warn!("path not found in URI, could not look up component");
181 return anyhow::Ok(
182 http::Response::builder()
183 .status(404)
184 .body(wasmtime_wasi_http::body::HyperOutgoingBody::default())
185 .context("failed to construct missing path error response")?,
186 );
187 };
188 if let Some(authority) = authority {
189 uri = uri.authority(authority);
190 } else if let Some(authority) = headers.get("X-Forwarded-Host") {
191 uri = uri.authority(authority.as_bytes());
192 } else if let Some(authority) = headers.get(HOST) {
193 uri = uri.authority(authority.as_bytes());
194 }
195
196 let uri = uri.build().context("invalid URI")?;
197 let mut req = http::Request::builder().method(method);
198 *req.headers_mut().expect("headers missing") = headers;
199 let req = req
200 .uri(uri)
201 .body(
202 body.map_err(wasmtime_wasi_http::hyper_response_error)
203 .boxed(),
204 )
205 .context("invalid request")?;
206 let _permit = component
207 .permits
208 .acquire()
209 .instrument(trace_span!("acquire_permit"))
210 .await
211 .context("failed to acquire execution permit")?;
212 let res = component
213 .instantiate(component.handler.copy_for_new(), component.events.clone())
214 .handle(
215 InvocationContext {
216 span: Span::current(),
217 start_at: Instant::now(),
218 attributes: vec![
219 KeyValue::new(
220 "component.ref",
221 Arc::clone(&component.image_reference),
222 ),
223 KeyValue::new("lattice", Arc::clone(&lattice_id)),
224 KeyValue::new("host", Arc::clone(&host_id)),
225 ],
226 },
227 req,
228 )
229 .await?;
230 let res = res?;
231 anyhow::Ok(res)
232 }
233 .instrument(info_span!("handle"))
234 }
235 })
236 .await
237 .context("failed to listen on address for path based http server")?;
238
239 Ok(Provider {
240 handle,
241 path_router,
242 })
243 }
244}
245
#[cfg(test)]
mod test {
    use std::{collections::HashMap, sync::Arc};

    use anyhow::Context as _;
    use tokio::task::JoinSet;

    /// Builds a link configuration containing only the given `path`.
    fn path_config(path: &str) -> HashMap<String, String> {
        HashMap::from([("path".to_string(), path.to_string())])
    }

    /// Builds the `(target, link name)` key used by the router's reverse map.
    fn link_key(target: &str, link_name: &str) -> (Arc<str>, Arc<str>) {
        (Arc::from(target), Arc::from(link_name))
    }

    /// Registers three paths, checks both router maps, verifies that duplicate
    /// registrations are rejected, then deletes every link and expects an
    /// empty router.
    #[tokio::test]
    async fn can_manage_paths() -> anyhow::Result<()> {
        let provider = super::Provider {
            handle: JoinSet::new(),
            path_router: Arc::default(),
        };

        provider
            .put_link("foo", "default", &path_config("/foo"))
            .await
            .context("should register foo path")?;
        provider
            .put_link("bar", "default", &path_config("/api/bar"))
            .await
            .context("should register bar path")?;
        provider
            .put_link("baz", "default", &path_config("/foo/api/baz"))
            .await
            .context("should register baz path")?;

        {
            let router = provider.path_router.read().await;
            assert_eq!(router.paths.len(), 3);
            assert_eq!(router.components.len(), 3);

            // Compare borrowed strings so a failure reports the actual value.
            for (target, path) in [
                ("foo", "/foo"),
                ("bar", "/api/bar"),
                ("baz", "/foo/api/baz"),
            ] {
                assert_eq!(
                    router.paths.get(path).map(|target| &**target),
                    Some(target)
                );
                assert_eq!(
                    router
                        .components
                        .get(&link_key(target, "default"))
                        .map(|path| &**path),
                    Some(path)
                );
            }
        }

        assert!(
            provider
                .put_link("notbaz", "default", &path_config("/foo/api/baz"))
                .await
                .is_err(),
            "should fail to register a path that's already registered"
        );
        assert!(
            provider
                .put_link("baz", "default", &path_config("/foo/api/notbaz"))
                .await
                .is_err(),
            "should fail to register a path to a component that already has a path"
        );

        for target in ["foo", "bar", "baz"] {
            provider
                .delete_link("builtin", target, "default")
                .await
                .context("should delete link")?;
        }
        {
            let router = provider.path_router.read().await;
            assert!(router.paths.is_empty());
            assert!(router.components.is_empty());
        }

        Ok(())
    }
}