wasmcloud_host/wasmbus/providers/http_server/
mod.rs1use core::net::SocketAddr;
2use core::str::FromStr as _;
3
4use std::future::Future;
5use std::sync::Arc;
6
7use anyhow::{bail, Context as _};
8use hyper_util::rt::{TokioExecutor, TokioIo};
9use nkeys::XKey;
10use tokio::sync::{broadcast, Mutex};
11use tokio::task::JoinSet;
12use tracing::{error, instrument};
13use wasmcloud_core::{http::default_listen_address, HostData};
14use wasmcloud_provider_sdk::provider::{
15 handle_provider_commands, receive_link_for_provider, ProviderCommandReceivers,
16};
17use wasmcloud_provider_sdk::ProviderConnection;
18
19pub(crate) mod address;
20pub(crate) mod host;
21pub(crate) mod path;
22
23enum HttpServerProvider {
25 Address(address::Provider),
26 Path(path::Provider),
27 Host(host::Provider),
28}
29
30impl crate::wasmbus::Host {
31 #[instrument(level = "debug", skip_all)]
32 pub(crate) async fn start_http_server_provider(
33 &self,
34 host_data: HostData,
35 provider_xkey: XKey,
36 provider_id: &str,
37 ) -> anyhow::Result<JoinSet<()>> {
38 let host_id = self.host_key.public_key();
39 let default_address = host_data
40 .config
41 .get("default_address")
42 .map(|s| SocketAddr::from_str(s))
43 .transpose()
44 .context("failed to parse default_address")?
45 .unwrap_or_else(default_listen_address);
46
47 let provider = match host_data.config.get("routing_mode").map(String::as_str) {
48 Some("address") | None => HttpServerProvider::Address(address::Provider {
50 address: default_address,
51 components: Arc::clone(&self.components),
52 links: Mutex::default(),
53 host_id: Arc::from(host_id.as_str()),
54 lattice_id: Arc::clone(&self.host_config.lattice),
55 }),
56 Some("path") => HttpServerProvider::Path(
58 path::Provider::new(
59 default_address,
60 Arc::clone(&self.components),
61 Arc::from(host_id.as_str()),
62 Arc::clone(&self.host_config.lattice),
63 )
64 .await?,
65 ),
66 Some("host") => HttpServerProvider::Host(
67 host::Provider::new(
68 default_address,
69 Arc::clone(&self.components),
70 Arc::from(host_id.as_str()),
71 Arc::clone(&self.host_config.lattice),
72 host_data.config.get("header").cloned(),
73 )
74 .await?,
75 ),
76 Some(other) => bail!("unknown routing_mode: {other}"),
77 };
78
79 let (quit_tx, quit_rx) = broadcast::channel(1);
80 let commands = ProviderCommandReceivers::new(
81 Arc::clone(&self.rpc_nats),
82 &quit_tx,
83 &self.host_config.lattice,
84 provider_id,
85 provider_id,
86 &host_id,
87 )
88 .await?;
89 let conn = ProviderConnection::new(
90 Arc::clone(&self.rpc_nats),
91 Arc::from(provider_id),
92 Arc::clone(&self.host_config.lattice),
93 host_id.to_string(),
94 host_data.config,
95 provider_xkey,
96 Arc::clone(&self.secrets_xkey),
97 )
98 .context("failed to establish provider connection")?;
99
100 let mut tasks = JoinSet::new();
101 match provider {
102 HttpServerProvider::Address(provider) => {
103 for ld in host_data.link_definitions {
104 if let Err(e) = receive_link_for_provider(&provider, &conn, ld).await {
105 error!(
106 error = %e,
107 "failed to initialize link during provider startup",
108 );
109 }
110 }
111
112 tasks.spawn(async move {
113 handle_provider_commands(provider, &conn, quit_rx, quit_tx, commands).await
114 });
115 }
116 HttpServerProvider::Path(provider) => {
117 for ld in host_data.link_definitions {
118 if let Err(e) = receive_link_for_provider(&provider, &conn, ld).await {
119 error!(
120 error = %e,
121 "failed to initialize link during provider startup",
122 );
123 }
124 }
125
126 tasks.spawn(async move {
127 handle_provider_commands(provider, &conn, quit_rx, quit_tx, commands).await
128 });
129 }
130 HttpServerProvider::Host(provider) => {
131 for ld in host_data.link_definitions {
132 if let Err(e) = receive_link_for_provider(&provider, &conn, ld).await {
133 error!(
134 error = %e,
135 "failed to initialize link during provider startup",
136 );
137 }
138 }
139
140 tasks.spawn(async move {
141 handle_provider_commands(provider, &conn, quit_rx, quit_tx, commands).await
142 });
143 }
144 }
145
146 Ok(tasks)
147 }
148}
149
150pub(crate) async fn listen<F, S>(address: SocketAddr, svc: F) -> anyhow::Result<JoinSet<()>>
151where
152 F: Fn(hyper::Request<hyper::body::Incoming>) -> S,
153 F: Clone + Send + Sync + 'static,
154 S: Future<Output = anyhow::Result<http::Response<wasmtime_wasi_http::body::HyperOutgoingBody>>>,
155 S: Send + 'static,
156{
157 let socket = match &address {
158 SocketAddr::V4(_) => tokio::net::TcpSocket::new_v4()?,
159 SocketAddr::V6(_) => tokio::net::TcpSocket::new_v6()?,
160 };
161 socket.set_reuseaddr(!cfg!(windows))?;
162 socket.set_nodelay(true)?;
163 socket.bind(address)?;
164 let listener = socket.listen(8196)?;
165
166 let svc = hyper::service::service_fn(svc);
167 let srv = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new());
168 let srv = Arc::new(srv);
169
170 let mut task = JoinSet::new();
171 task.spawn(async move {
172 loop {
173 let stream = match listener.accept().await {
174 Ok((stream, _)) => stream,
175 Err(err) => {
176 error!(?err, "failed to accept HTTP server connection");
177 continue;
178 }
179 };
180 let svc = svc.clone();
181 tokio::spawn({
182 let srv = Arc::clone(&srv);
183 let svc = svc.clone();
184 async move {
185 if let Err(err) = srv.serve_connection(TokioIo::new(stream), svc).await {
186 error!(?err, "failed to serve connection");
187 }
188 }
189 });
190 }
191 });
192
193 Ok(task)
194}