wasmcloud_host/wasmbus/providers/http_server/
address.rs

1use core::net::SocketAddr;
2
3use std::collections::HashMap;
4use std::sync::Arc;
5
6use anyhow::Context as _;
7use http::header::HOST;
8use http::uri::Scheme;
9use http::Uri;
10use http_body_util::BodyExt as _;
11use tokio::sync::{Mutex, RwLock};
12use tokio::task::JoinSet;
13use tokio::time::Instant;
14use tracing::{info_span, instrument, trace_span, Instrument as _, Span};
15use wasmcloud_core::http::{load_settings, ServiceSettings};
16use wasmcloud_provider_sdk::{LinkConfig, LinkDeleteInfo};
17use wasmcloud_tracing::KeyValue;
18use wrpc_interface_http::ServeIncomingHandlerWasmtime as _;
19
20use crate::wasmbus::{Component, InvocationContext};
21
22use super::listen;
23
24pub(crate) struct Provider {
25    /// Default address for the provider to try to listen on if no address is provided
26    pub(crate) address: SocketAddr,
27    /// Map of components that the provider can instantiate, keyed by component ID
28    pub(crate) components: Arc<RwLock<HashMap<String, Arc<Component>>>>,
29    /// Map of links that the provider has established, component ID -> link name -> listener task
30    pub(crate) links: Mutex<HashMap<Arc<str>, HashMap<Box<str>, JoinSet<()>>>>,
31    pub(crate) lattice_id: Arc<str>,
32    pub(crate) host_id: Arc<str>,
33}
34
35impl wasmcloud_provider_sdk::Provider for Provider {
36    #[instrument(level = "debug", skip_all)]
37    async fn receive_link_config_as_source(
38        &self,
39        LinkConfig {
40            target_id,
41            config,
42            link_name,
43            ..
44        }: LinkConfig<'_>,
45    ) -> anyhow::Result<()> {
46        let ServiceSettings { address, .. } =
47            load_settings(Some(self.address), config).context("failed to load settings")?;
48
49        let components = Arc::clone(&self.components);
50        let host_id = Arc::clone(&self.host_id);
51        let lattice_id = Arc::clone(&self.lattice_id);
52        let target_id: Arc<str> = Arc::from(target_id);
53        let tasks = listen(address, {
54            let target_id = Arc::clone(&target_id);
55            move |req: hyper::Request<hyper::body::Incoming>| {
56                let components = Arc::clone(&components);
57                let host_id = Arc::clone(&host_id);
58                let lattice_id = Arc::clone(&lattice_id);
59                let target_id = Arc::clone(&target_id);
60                async move {
61                    let component = {
62                        let components = components.read().await;
63                        let component = components
64                            .get(target_id.as_ref())
65                            .context("linked component not found")?;
66                        Arc::clone(component)
67                    };
68                    let (
69                        http::request::Parts {
70                            method,
71                            uri,
72                            headers,
73                            ..
74                        },
75                        body,
76                    ) = req.into_parts();
77                    let http::uri::Parts {
78                        scheme,
79                        authority,
80                        path_and_query,
81                        ..
82                    } = uri.into_parts();
83                    // TODO(#3705): Propagate trace context from headers
84                    let mut uri = Uri::builder().scheme(scheme.unwrap_or(Scheme::HTTP));
85                    if let Some(authority) = authority {
86                        uri = uri.authority(authority);
87                    } else if let Some(authority) = headers.get("X-Forwarded-Host") {
88                        uri = uri.authority(authority.as_bytes());
89                    } else if let Some(authority) = headers.get(HOST) {
90                        uri = uri.authority(authority.as_bytes());
91                    }
92                    if let Some(path_and_query) = path_and_query {
93                        uri = uri.path_and_query(path_and_query)
94                    };
95                    let uri = uri.build().context("invalid URI")?;
96                    let mut req = http::Request::builder().method(method);
97                    *req.headers_mut().expect("headers missing") = headers;
98
99                    let req = req
100                        .uri(uri)
101                        .body(
102                            body.map_err(wasmtime_wasi_http::hyper_response_error)
103                                .boxed(),
104                        )
105                        .context("invalid request")?;
106                    let _permit = component
107                        .permits
108                        .acquire()
109                        .instrument(trace_span!("acquire_permit"))
110                        .await
111                        .context("failed to acquire execution permit")?;
112                    let res = component
113                        .instantiate(component.handler.copy_for_new(), component.events.clone())
114                        .handle(
115                            InvocationContext {
116                                span: Span::current(),
117                                start_at: Instant::now(),
118                                attributes: vec![
119                                    KeyValue::new(
120                                        "component.ref",
121                                        Arc::clone(&component.image_reference),
122                                    ),
123                                    KeyValue::new("lattice", Arc::clone(&lattice_id)),
124                                    KeyValue::new("host", Arc::clone(&host_id)),
125                                ],
126                            },
127                            req,
128                        )
129                        .await?;
130                    let res = res?;
131                    anyhow::Ok(res)
132                }
133                .instrument(info_span!("handle"))
134            }
135        })
136        .await?;
137
138        self.links
139            .lock()
140            .instrument(trace_span!("insert_link"))
141            .await
142            .entry(target_id)
143            .or_default()
144            .insert(link_name.into(), tasks);
145        Ok(())
146    }
147
148    #[instrument(level = "debug", skip_all)]
149    async fn delete_link_as_source(&self, info: impl LinkDeleteInfo) -> anyhow::Result<()> {
150        let target_id = info.get_target_id();
151        let link_name = info.get_link_name();
152        self.links
153            .lock()
154            .await
155            .get_mut(target_id)
156            .map(|links| links.remove(link_name));
157        Ok(())
158    }
159}