wasmcloud_provider_http_server/
path.rs

1//! This module contains the implementation of the `wrpc:http/incoming-handler` provider in path-based mode.
2//!
3//! In path-based mode, the HTTP server listens on a single address and routes requests to different components
4//! based on the path of the request.
5
6use core::time::Duration;
7
8use std::collections::HashMap;
9use std::net::SocketAddr;
10use std::str::FromStr;
11use std::sync::Arc;
12
13use anyhow::{bail, Context as _};
14use axum::extract::{self};
15use axum::handler::Handler;
16use axum_server::tls_rustls::RustlsConfig;
17use axum_server::Handle;
18use tokio::sync::RwLock;
19use tokio::task::JoinHandle;
20use tracing::{debug, error, info, instrument};
21use wasmcloud_provider_sdk::provider::WrpcClient;
22use wasmcloud_provider_sdk::{get_connection, HostData, LinkConfig, LinkDeleteInfo, Provider};
23
24use crate::{
25    build_request, get_cors_layer, get_tcp_listener, invoke_component, load_settings,
26    ServiceSettings,
27};
28
29/// This struct holds both the forward and reverse mappings for path-based routing
30/// so that they can be modified by just acquiring a single lock in the [`HttpServerProvider`]
31#[derive(Default)]
32struct Router {
33    /// Lookup from a path to the component ID that is handling that path
34    paths: HashMap<Arc<str>, (Arc<str>, WrpcClient)>,
35    /// Reverse lookup to find the path for a (component,link_name) pair
36    components: HashMap<(Arc<str>, Arc<str>), Arc<str>>,
37}
38
39/// `wrpc:http/incoming-handler` provider implementation with path-based routing
40#[derive(Clone)]
41pub struct HttpServerProvider {
42    /// Struct that holds the routing information based on path/component_id
43    path_router: Arc<RwLock<Router>>,
44    /// [`Handle`] to the server task
45    handle: Handle,
46    /// Task handle for the server task
47    task: Arc<JoinHandle<()>>,
48}
49
50impl Drop for HttpServerProvider {
51    fn drop(&mut self) {
52        self.handle.shutdown();
53        self.task.abort();
54    }
55}
56
57impl HttpServerProvider {
58    pub(crate) async fn new(host_data: &HostData) -> anyhow::Result<Self> {
59        let default_address = host_data
60            .config
61            .get("default_address")
62            .map(|s| SocketAddr::from_str(s))
63            .transpose()
64            .context("failed to parse default_address")?;
65        let settings = load_settings(default_address, &host_data.config)
66            .context("failed to load settings in path mode")?;
67        let settings = Arc::new(settings);
68
69        let path_router = Arc::default();
70
71        let addr = settings.address;
72        info!(
73            %addr,
74            "httpserver starting listener in path-based mode",
75        );
76        let cors = get_cors_layer(&settings)?;
77        let listener = get_tcp_listener(&settings)?;
78        let service = handle_request.layer(cors);
79
80        let handle = axum_server::Handle::new();
81        let task_handle = handle.clone();
82        let task_router = Arc::clone(&path_router);
83        let task = if let (Some(crt), Some(key)) =
84            (&settings.tls_cert_file, &settings.tls_priv_key_file)
85        {
86            debug!(?addr, "bind HTTPS listener");
87            let tls = RustlsConfig::from_pem_file(crt, key)
88                .await
89                .context("failed to construct TLS config")?;
90
91            tokio::spawn(async move {
92                if let Err(e) = axum_server::from_tcp_rustls(listener, tls)
93                    .handle(task_handle)
94                    .serve(
95                        service
96                            .with_state(RequestContext {
97                                router: task_router,
98                                scheme: http::uri::Scheme::HTTPS,
99                                settings: Arc::clone(&settings),
100                            })
101                            .into_make_service(),
102                    )
103                    .await
104                {
105                    error!(error = %e, "failed to serve HTTPS for path-based mode");
106                }
107            })
108        } else {
109            debug!(?addr, "bind HTTP listener");
110
111            tokio::spawn(async move {
112                if let Err(e) = axum_server::from_tcp(listener)
113                    .handle(task_handle)
114                    .serve(
115                        service
116                            .with_state(RequestContext {
117                                router: task_router,
118                                scheme: http::uri::Scheme::HTTP,
119                                settings: Arc::clone(&settings),
120                            })
121                            .into_make_service(),
122                    )
123                    .await
124                {
125                    error!(error = %e, "failed to serve HTTP for path-based mode");
126                }
127            })
128        };
129
130        Ok(Self {
131            path_router,
132            handle,
133            task: Arc::new(task),
134        })
135    }
136}
137
138impl Provider for HttpServerProvider {
139    /// This is called when the HTTP server provider is linked to a component
140    ///
141    /// This HTTP server mode will register the path in the link for routing to the target
142    /// component when a request is received on the listen address.
143    async fn receive_link_config_as_source(
144        &self,
145        link_config: LinkConfig<'_>,
146    ) -> anyhow::Result<()> {
147        let Some(path) = link_config.config.get("path") else {
148            error!(?link_config.config, ?link_config.target_id, "path not found in link config, cannot register path");
149            bail!(
150                "path not found in link config, cannot register path for component {}",
151                link_config.target_id
152            );
153        };
154
155        let target = Arc::from(link_config.target_id);
156        let name = Arc::from(link_config.link_name);
157
158        let key = (Arc::clone(&target), Arc::clone(&name));
159
160        let mut path_router = self.path_router.write().await;
161        if path_router.components.contains_key(&key) {
162            // When we can return errors from links, tell the host this was invalid
163            bail!("Component {target} already has a path registered with link name {name}");
164        }
165        if path_router.paths.contains_key(path.as_str()) {
166            // When we can return errors from links, tell the host this was invalid
167            bail!("Path {path} already in use by a different component");
168        }
169
170        let wrpc = get_connection()
171            .get_wrpc_client(link_config.target_id)
172            .await
173            .context("failed to construct wRPC client")?;
174
175        let path = Arc::from(path.clone());
176        // Insert the path into the paths map for future lookups
177        path_router.components.insert(key, Arc::clone(&path));
178        path_router.paths.insert(path, (target, wrpc));
179
180        Ok(())
181    }
182
183    /// Remove the path for a particular component/link_name pair
184    #[instrument(level = "debug", skip_all, fields(target_id = info.get_target_id()))]
185    async fn delete_link_as_source(&self, info: impl LinkDeleteInfo) -> anyhow::Result<()> {
186        debug!(
187            source = info.get_source_id(),
188            target = info.get_target_id(),
189            link = info.get_link_name(),
190            "deleting http path link"
191        );
192        let component_id = info.get_target_id();
193        let link_name = info.get_link_name();
194
195        let mut path_router = self.path_router.write().await;
196        let path = path_router
197            .components
198            .remove(&(Arc::from(component_id), Arc::from(link_name)));
199        if let Some(path) = path {
200            path_router.paths.remove(&path);
201        }
202
203        Ok(())
204    }
205
206    /// Handle shutdown request by shutting down the http server task
207    async fn shutdown(&self) -> anyhow::Result<()> {
208        self.handle.shutdown();
209        self.task.abort();
210
211        Ok(())
212    }
213}
214
215#[derive(Clone)]
216struct RequestContext {
217    router: Arc<RwLock<Router>>,
218    scheme: http::uri::Scheme,
219    settings: Arc<ServiceSettings>,
220}
221
222/// Handle an HTTP request by looking up the component ID for the path and invoking the component
223#[instrument(level = "debug", skip(router, settings))]
224async fn handle_request(
225    extract::State(RequestContext {
226        router,
227        scheme,
228        settings,
229    }): extract::State<RequestContext>,
230    axum_extra::extract::Host(authority): axum_extra::extract::Host,
231    request: extract::Request,
232) -> impl axum::response::IntoResponse {
233    let timeout = settings.timeout_ms.map(Duration::from_millis);
234    let req = build_request(request, scheme, authority, &settings).map_err(|err| *err)?;
235    let path = req.uri().path();
236    let Some((target_component, wrpc)) = router.read().await.paths.get(path).cloned() else {
237        Err((http::StatusCode::NOT_FOUND, "path not found"))?
238    };
239    axum::response::Result::<_, axum::response::ErrorResponse>::Ok(
240        invoke_component(
241            &wrpc,
242            &target_component,
243            req,
244            timeout,
245            settings.cache_control.as_ref(),
246        )
247        .await,
248    )
249}