wasmcloud_host/wasmbus/providers/http_server/
path.rs

1use core::net::SocketAddr;
2
3use std::collections::HashMap;
4use std::sync::Arc;
5
6use anyhow::{bail, Context as _};
7use http::header::HOST;
8use http::uri::Scheme;
9use http::Uri;
10use http_body_util::BodyExt as _;
11use tokio::time::Instant;
12use tokio::{sync::RwLock, task::JoinSet};
13use tracing::{debug, error, info_span, instrument, trace_span, warn, Instrument as _, Span};
14use wasmcloud_provider_sdk::{LinkConfig, LinkDeleteInfo};
15use wasmcloud_tracing::KeyValue;
16use wrpc_interface_http::ServeIncomingHandlerWasmtime as _;
17
18use crate::wasmbus::{Component, InvocationContext};
19
20use super::listen;
21
/// Forward and reverse mappings used for path-based routing.
///
/// Both maps live in one struct so that they can be read or modified together by acquiring
/// the single lock held in [`Provider::path_router`], which keeps them consistent with
/// each other.
#[derive(Debug, Default)]
pub(crate) struct Router {
    /// Lookup from a path to the component ID that is handling that path
    pub(crate) paths: HashMap<Arc<str>, Arc<str>>,
    /// Reverse lookup to find the path for a (component,link_name) pair
    pub(crate) components: HashMap<(Arc<str>, Arc<str>), Arc<str>>,
}
31
32pub(crate) struct Provider {
33    /// Handle to the server task. The use of the [`JoinSet`] allows for the server to be
34    /// gracefully shutdown when the provider is shutdown
35    #[allow(unused)]
36    pub(crate) handle: JoinSet<()>,
37    /// Struct that holds the routing information based on path/component_id
38    pub(crate) path_router: Arc<RwLock<Router>>,
39}
40
41// Implementations of put and delete link are done in the `impl Provider` block to aid in testing
42impl wasmcloud_provider_sdk::Provider for Provider {
43    #[instrument(level = "debug", skip_all)]
44    async fn receive_link_config_as_source(&self, link: LinkConfig<'_>) -> anyhow::Result<()> {
45        self.put_link(link.target_id, link.link_name, link.config)
46            .await
47    }
48
49    #[instrument(level = "debug", skip_all)]
50    async fn delete_link_as_source(&self, info: impl LinkDeleteInfo) -> anyhow::Result<()> {
51        self.delete_link(
52            info.get_source_id(),
53            info.get_target_id(),
54            info.get_link_name(),
55        )
56        .await
57    }
58}
59
60impl Provider {
61    #[instrument(level = "debug", skip(self))]
62    async fn put_link(
63        &self,
64        target_id: &str,
65        link_name: &str,
66        config: &HashMap<String, String>,
67    ) -> anyhow::Result<()> {
68        let Some(path) = config.get("path") else {
69            error!(
70                ?config,
71                ?target_id,
72                "path not found in link config, cannot register path"
73            );
74            bail!("path not found in link config, cannot register path for component {target_id}",);
75        };
76
77        let target = Arc::from(target_id);
78        let name = Arc::from(link_name);
79        let key = (Arc::clone(&target), Arc::clone(&name));
80
81        let mut path_router = self.path_router.write().await;
82        if path_router.components.contains_key(&key) {
83            bail!("Component {target} already has a path registered with link name {name}");
84        }
85        if path_router.paths.contains_key(path.as_str()) {
86            bail!("Path {path} already in use by a different component");
87        }
88
89        let path = Arc::from(path.clone());
90        // Insert the path into the paths map for future lookups
91        path_router.components.insert(key, Arc::clone(&path));
92        path_router.paths.insert(path, target);
93
94        Ok(())
95    }
96
97    #[instrument(level = "debug", skip(self))]
98    async fn delete_link(
99        &self,
100        source_id: &str,
101        target_id: &str,
102        link_name: &str,
103    ) -> anyhow::Result<()> {
104        debug!(
105            source = source_id,
106            target = target_id,
107            link = link_name,
108            "deleting http path link"
109        );
110
111        let mut path_router = self.path_router.write().await;
112        let path = path_router
113            .components
114            .remove(&(Arc::from(target_id), Arc::from(link_name)));
115        if let Some(path) = path {
116            path_router.paths.remove(&path);
117        }
118
119        Ok(())
120    }
121}
122
123impl Provider {
124    pub(crate) async fn new(
125        address: SocketAddr,
126        components: Arc<RwLock<HashMap<String, Arc<Component>>>>,
127        lattice_id: Arc<str>,
128        host_id: Arc<str>,
129    ) -> anyhow::Result<Self> {
130        let path_router: Arc<RwLock<Router>> = Arc::default();
131        let handle = listen(address, {
132            let path_router = Arc::clone(&path_router);
133            move |req: hyper::Request<hyper::body::Incoming>| {
134                let lattice_id = Arc::clone(&lattice_id);
135                let host_id = Arc::clone(&host_id);
136                let components = Arc::clone(&components);
137                let path_router = Arc::clone(&path_router);
138                async move {
139                    let (
140                        http::request::Parts {
141                            method,
142                            uri,
143                            headers,
144                            ..
145                        },
146                        body,
147                    ) = req.into_parts();
148                    let http::uri::Parts {
149                        scheme,
150                        authority,
151                        path_and_query,
152                        ..
153                    } = uri.into_parts();
154
155                    // TODO(#3705): Propagate trace context from headers
156                    let mut uri = Uri::builder().scheme(scheme.unwrap_or(Scheme::HTTP));
157                    let component = if let Some(path_and_query) = path_and_query {
158                        let component_id = {
159                            let router = path_router.read().await;
160                            let Some(component_id) = router.paths.get(path_and_query.path()) else {
161                                warn!(path = path_and_query.path(), "received request for unregistered http path");
162                                return anyhow::Ok(
163                                    http::Response::builder()
164                                        .status(404)
165                                        .body(wasmtime_wasi_http::body::HyperOutgoingBody::default())
166                                        .context("failed to construct missing path error response")?,
167                                );
168                            };
169                            component_id.to_string()
170                        };
171
172                        uri = uri.path_and_query(path_and_query);
173
174                        let components = components.read().await;
175                        let component = components
176                            .get(&component_id)
177                            .context("linked component not found")?;
178                        Arc::clone(component)
179                    } else {
180                        warn!("path not found in URI, could not look up component");
181                        return anyhow::Ok(
182                            http::Response::builder()
183                                .status(404)
184                                .body(wasmtime_wasi_http::body::HyperOutgoingBody::default())
185                                .context("failed to construct missing path error response")?,
186                        );
187                    };
188                    if let Some(authority) = authority {
189                        uri = uri.authority(authority);
190                    } else if let Some(authority) = headers.get("X-Forwarded-Host") {
191                        uri = uri.authority(authority.as_bytes());
192                    } else if let Some(authority) = headers.get(HOST) {
193                        uri = uri.authority(authority.as_bytes());
194                    }
195
196                    let uri = uri.build().context("invalid URI")?;
197                    let mut req = http::Request::builder().method(method);
198                    *req.headers_mut().expect("headers missing") = headers;
199                    let req = req
200                        .uri(uri)
201                        .body(
202                            body.map_err(wasmtime_wasi_http::hyper_response_error)
203                                .boxed(),
204                        )
205                        .context("invalid request")?;
206                    let _permit = component
207                        .permits
208                        .acquire()
209                        .instrument(trace_span!("acquire_permit"))
210                        .await
211                        .context("failed to acquire execution permit")?;
212                    let res = component
213                        .instantiate(component.handler.copy_for_new(), component.events.clone())
214                        .handle(
215                            InvocationContext {
216                                span: Span::current(),
217                                start_at: Instant::now(),
218                                attributes: vec![
219                                    KeyValue::new(
220                                        "component.ref",
221                                        Arc::clone(&component.image_reference),
222                                    ),
223                                    KeyValue::new("lattice", Arc::clone(&lattice_id)),
224                                    KeyValue::new("host", Arc::clone(&host_id)),
225                                ],
226                            },
227                            req,
228                        )
229                        .await?;
230                    let res = res?;
231                    anyhow::Ok(res)
232                }
233                .instrument(info_span!("handle"))
234            }
235        })
236        .await
237        .context("failed to listen on address for path based http server")?;
238
239        Ok(Provider {
240            handle,
241            path_router,
242        })
243    }
244}
245
#[cfg(test)]
mod test {
    use std::{collections::HashMap, sync::Arc};

    use anyhow::Context as _;
    use tokio::task::JoinSet;

    /// Builds a link configuration that contains only the `path` entry
    fn path_config(path: &str) -> HashMap<String, String> {
        HashMap::from([("path".to_string(), path.to_string())])
    }

    /// Ensure we can register and deregister a bunch of paths properly
    #[tokio::test]
    async fn can_manage_paths() -> anyhow::Result<()> {
        let provider = super::Provider {
            handle: JoinSet::new(),
            path_router: Arc::default(),
        };

        // (component, path) registrations, all using the "default" link name:
        // /foo -> foo
        // /api/bar -> bar
        // /foo/api/baz -> baz
        let registrations = [("foo", "/foo"), ("bar", "/api/bar"), ("baz", "/foo/api/baz")];

        for (component, path) in registrations {
            provider
                .put_link(component, "default", &path_config(path))
                .await
                .with_context(|| format!("should register {component} path"))?;
        }

        // Every registration must be visible through both the forward and reverse lookups
        {
            let router = provider.path_router.read().await;
            assert_eq!(router.paths.len(), 3);
            assert_eq!(router.components.len(), 3);
            for (component, path) in registrations {
                assert!(router
                    .paths
                    .get(path)
                    .is_some_and(|target| &**target == component));
                assert!(router
                    .components
                    .get(&(Arc::from(component), Arc::from("default")))
                    .is_some_and(|p| &**p == path));
            }
        }

        // Rejecting reserved paths / linked components
        let duplicate_path = provider
            .put_link("notbaz", "default", &path_config("/foo/api/baz"))
            .await;
        assert!(
            duplicate_path.is_err(),
            "should fail to register a path that's already registered"
        );
        let duplicate_component = provider
            .put_link("baz", "default", &path_config("/foo/api/notbaz"))
            .await;
        assert!(
            duplicate_component.is_err(),
            "should fail to register a path to a component that already has a path"
        );

        // Delete path registrations
        for (component, _) in registrations {
            provider
                .delete_link("builtin", component, "default")
                .await
                .context("should delete link")?;
        }
        {
            let router = provider.path_router.read().await;
            assert!(router.paths.is_empty());
            assert!(router.components.is_empty());
        }

        Ok(())
    }
}