wasmcloud_provider_http_server/
path.rs1use core::time::Duration;
7
8use std::collections::HashMap;
9use std::net::SocketAddr;
10use std::str::FromStr;
11use std::sync::Arc;
12
13use anyhow::{bail, Context as _};
14use axum::extract::{self};
15use axum::handler::Handler;
16use axum_server::tls_rustls::RustlsConfig;
17use axum_server::Handle;
18use tokio::sync::RwLock;
19use tokio::task::JoinHandle;
20use tracing::{debug, error, info, instrument};
21use wasmcloud_provider_sdk::provider::WrpcClient;
22use wasmcloud_provider_sdk::{get_connection, HostData, LinkConfig, LinkDeleteInfo, Provider};
23
24use crate::{
25 build_request, get_cors_layer, get_tcp_listener, invoke_component, load_settings,
26 ServiceSettings,
27};
28
29#[derive(Default)]
32struct Router {
33 paths: HashMap<Arc<str>, (Arc<str>, WrpcClient)>,
35 components: HashMap<(Arc<str>, Arc<str>), Arc<str>>,
37}
38
39#[derive(Clone)]
41pub struct HttpServerProvider {
42 path_router: Arc<RwLock<Router>>,
44 handle: Handle,
46 task: Arc<JoinHandle<()>>,
48}
49
50impl Drop for HttpServerProvider {
51 fn drop(&mut self) {
52 self.handle.shutdown();
53 self.task.abort();
54 }
55}
56
57impl HttpServerProvider {
58 pub(crate) async fn new(host_data: &HostData) -> anyhow::Result<Self> {
59 let default_address = host_data
60 .config
61 .get("default_address")
62 .map(|s| SocketAddr::from_str(s))
63 .transpose()
64 .context("failed to parse default_address")?;
65 let settings = load_settings(default_address, &host_data.config)
66 .context("failed to load settings in path mode")?;
67 let settings = Arc::new(settings);
68
69 let path_router = Arc::default();
70
71 let addr = settings.address;
72 info!(
73 %addr,
74 "httpserver starting listener in path-based mode",
75 );
76 let cors = get_cors_layer(&settings)?;
77 let listener = get_tcp_listener(&settings)?;
78 let service = handle_request.layer(cors);
79
80 let handle = axum_server::Handle::new();
81 let task_handle = handle.clone();
82 let task_router = Arc::clone(&path_router);
83 let task = if let (Some(crt), Some(key)) =
84 (&settings.tls_cert_file, &settings.tls_priv_key_file)
85 {
86 debug!(?addr, "bind HTTPS listener");
87 let tls = RustlsConfig::from_pem_file(crt, key)
88 .await
89 .context("failed to construct TLS config")?;
90
91 tokio::spawn(async move {
92 if let Err(e) = axum_server::from_tcp_rustls(listener, tls)
93 .handle(task_handle)
94 .serve(
95 service
96 .with_state(RequestContext {
97 router: task_router,
98 scheme: http::uri::Scheme::HTTPS,
99 settings: Arc::clone(&settings),
100 })
101 .into_make_service(),
102 )
103 .await
104 {
105 error!(error = %e, "failed to serve HTTPS for path-based mode");
106 }
107 })
108 } else {
109 debug!(?addr, "bind HTTP listener");
110
111 tokio::spawn(async move {
112 if let Err(e) = axum_server::from_tcp(listener)
113 .handle(task_handle)
114 .serve(
115 service
116 .with_state(RequestContext {
117 router: task_router,
118 scheme: http::uri::Scheme::HTTP,
119 settings: Arc::clone(&settings),
120 })
121 .into_make_service(),
122 )
123 .await
124 {
125 error!(error = %e, "failed to serve HTTP for path-based mode");
126 }
127 })
128 };
129
130 Ok(Self {
131 path_router,
132 handle,
133 task: Arc::new(task),
134 })
135 }
136}
137
138impl Provider for HttpServerProvider {
139 async fn receive_link_config_as_source(
144 &self,
145 link_config: LinkConfig<'_>,
146 ) -> anyhow::Result<()> {
147 let Some(path) = link_config.config.get("path") else {
148 error!(?link_config.config, ?link_config.target_id, "path not found in link config, cannot register path");
149 bail!(
150 "path not found in link config, cannot register path for component {}",
151 link_config.target_id
152 );
153 };
154
155 let target = Arc::from(link_config.target_id);
156 let name = Arc::from(link_config.link_name);
157
158 let key = (Arc::clone(&target), Arc::clone(&name));
159
160 let mut path_router = self.path_router.write().await;
161 if path_router.components.contains_key(&key) {
162 bail!("Component {target} already has a path registered with link name {name}");
164 }
165 if path_router.paths.contains_key(path.as_str()) {
166 bail!("Path {path} already in use by a different component");
168 }
169
170 let wrpc = get_connection()
171 .get_wrpc_client(link_config.target_id)
172 .await
173 .context("failed to construct wRPC client")?;
174
175 let path = Arc::from(path.clone());
176 path_router.components.insert(key, Arc::clone(&path));
178 path_router.paths.insert(path, (target, wrpc));
179
180 Ok(())
181 }
182
183 #[instrument(level = "debug", skip_all, fields(target_id = info.get_target_id()))]
185 async fn delete_link_as_source(&self, info: impl LinkDeleteInfo) -> anyhow::Result<()> {
186 debug!(
187 source = info.get_source_id(),
188 target = info.get_target_id(),
189 link = info.get_link_name(),
190 "deleting http path link"
191 );
192 let component_id = info.get_target_id();
193 let link_name = info.get_link_name();
194
195 let mut path_router = self.path_router.write().await;
196 let path = path_router
197 .components
198 .remove(&(Arc::from(component_id), Arc::from(link_name)));
199 if let Some(path) = path {
200 path_router.paths.remove(&path);
201 }
202
203 Ok(())
204 }
205
206 async fn shutdown(&self) -> anyhow::Result<()> {
208 self.handle.shutdown();
209 self.task.abort();
210
211 Ok(())
212 }
213}
214
215#[derive(Clone)]
216struct RequestContext {
217 router: Arc<RwLock<Router>>,
218 scheme: http::uri::Scheme,
219 settings: Arc<ServiceSettings>,
220}
221
222#[instrument(level = "debug", skip(router, settings))]
224async fn handle_request(
225 extract::State(RequestContext {
226 router,
227 scheme,
228 settings,
229 }): extract::State<RequestContext>,
230 axum_extra::extract::Host(authority): axum_extra::extract::Host,
231 request: extract::Request,
232) -> impl axum::response::IntoResponse {
233 let timeout = settings.timeout_ms.map(Duration::from_millis);
234 let req = build_request(request, scheme, authority, &settings).map_err(|err| *err)?;
235 let path = req.uri().path();
236 let Some((target_component, wrpc)) = router.read().await.paths.get(path).cloned() else {
237 Err((http::StatusCode::NOT_FOUND, "path not found"))?
238 };
239 axum::response::Result::<_, axum::response::ErrorResponse>::Ok(
240 invoke_component(
241 &wrpc,
242 &target_component,
243 req,
244 timeout,
245 settings.cache_control.as_ref(),
246 )
247 .await,
248 )
249}