wasmcloud_host/wasmbus/providers/http_server/
mod.rs

1use core::net::SocketAddr;
2use core::str::FromStr as _;
3
4use std::future::Future;
5use std::sync::Arc;
6
7use anyhow::{bail, Context as _};
8use hyper_util::rt::{TokioExecutor, TokioIo};
9use nkeys::XKey;
10use tokio::sync::{broadcast, Mutex};
11use tokio::task::JoinSet;
12use tracing::{error, instrument};
13use wasmcloud_core::{http::default_listen_address, HostData};
14use wasmcloud_provider_sdk::provider::{
15    handle_provider_commands, receive_link_for_provider, ProviderCommandReceivers,
16};
17use wasmcloud_provider_sdk::ProviderConnection;
18
19pub(crate) mod address;
20pub(crate) mod host;
21pub(crate) mod path;
22
23/// Helper enum to allow for code reuse between different routing modes
24enum HttpServerProvider {
25    Address(address::Provider),
26    Path(path::Provider),
27    Host(host::Provider),
28}
29
30impl crate::wasmbus::Host {
31    #[instrument(level = "debug", skip_all)]
32    pub(crate) async fn start_http_server_provider(
33        &self,
34        host_data: HostData,
35        provider_xkey: XKey,
36        provider_id: &str,
37    ) -> anyhow::Result<JoinSet<()>> {
38        let host_id = self.host_key.public_key();
39        let default_address = host_data
40            .config
41            .get("default_address")
42            .map(|s| SocketAddr::from_str(s))
43            .transpose()
44            .context("failed to parse default_address")?
45            .unwrap_or_else(default_listen_address);
46
47        let provider = match host_data.config.get("routing_mode").map(String::as_str) {
48            // Run provider in address mode by default
49            Some("address") | None => HttpServerProvider::Address(address::Provider {
50                address: default_address,
51                components: Arc::clone(&self.components),
52                links: Mutex::default(),
53                host_id: Arc::from(host_id.as_str()),
54                lattice_id: Arc::clone(&self.host_config.lattice),
55            }),
56            // Run provider in path mode
57            Some("path") => HttpServerProvider::Path(
58                path::Provider::new(
59                    default_address,
60                    Arc::clone(&self.components),
61                    Arc::from(host_id.as_str()),
62                    Arc::clone(&self.host_config.lattice),
63                )
64                .await?,
65            ),
66            Some("host") => HttpServerProvider::Host(
67                host::Provider::new(
68                    default_address,
69                    Arc::clone(&self.components),
70                    Arc::from(host_id.as_str()),
71                    Arc::clone(&self.host_config.lattice),
72                    host_data.config.get("header").cloned(),
73                )
74                .await?,
75            ),
76            Some(other) => bail!("unknown routing_mode: {other}"),
77        };
78
79        let (quit_tx, quit_rx) = broadcast::channel(1);
80        let commands = ProviderCommandReceivers::new(
81            Arc::clone(&self.rpc_nats),
82            &quit_tx,
83            &self.host_config.lattice,
84            provider_id,
85            provider_id,
86            &host_id,
87        )
88        .await?;
89        let conn = ProviderConnection::new(
90            Arc::clone(&self.rpc_nats),
91            Arc::from(provider_id),
92            Arc::clone(&self.host_config.lattice),
93            host_id.to_string(),
94            host_data.config,
95            provider_xkey,
96            Arc::clone(&self.secrets_xkey),
97        )
98        .context("failed to establish provider connection")?;
99
100        let mut tasks = JoinSet::new();
101        match provider {
102            HttpServerProvider::Address(provider) => {
103                for ld in host_data.link_definitions {
104                    if let Err(e) = receive_link_for_provider(&provider, &conn, ld).await {
105                        error!(
106                            error = %e,
107                            "failed to initialize link during provider startup",
108                        );
109                    }
110                }
111
112                tasks.spawn(async move {
113                    handle_provider_commands(provider, &conn, quit_rx, quit_tx, commands).await
114                });
115            }
116            HttpServerProvider::Path(provider) => {
117                for ld in host_data.link_definitions {
118                    if let Err(e) = receive_link_for_provider(&provider, &conn, ld).await {
119                        error!(
120                            error = %e,
121                            "failed to initialize link during provider startup",
122                        );
123                    }
124                }
125
126                tasks.spawn(async move {
127                    handle_provider_commands(provider, &conn, quit_rx, quit_tx, commands).await
128                });
129            }
130            HttpServerProvider::Host(provider) => {
131                for ld in host_data.link_definitions {
132                    if let Err(e) = receive_link_for_provider(&provider, &conn, ld).await {
133                        error!(
134                            error = %e,
135                            "failed to initialize link during provider startup",
136                        );
137                    }
138                }
139
140                tasks.spawn(async move {
141                    handle_provider_commands(provider, &conn, quit_rx, quit_tx, commands).await
142                });
143            }
144        }
145
146        Ok(tasks)
147    }
148}
149
150pub(crate) async fn listen<F, S>(address: SocketAddr, svc: F) -> anyhow::Result<JoinSet<()>>
151where
152    F: Fn(hyper::Request<hyper::body::Incoming>) -> S,
153    F: Clone + Send + Sync + 'static,
154    S: Future<Output = anyhow::Result<http::Response<wasmtime_wasi_http::body::HyperOutgoingBody>>>,
155    S: Send + 'static,
156{
157    let socket = match &address {
158        SocketAddr::V4(_) => tokio::net::TcpSocket::new_v4()?,
159        SocketAddr::V6(_) => tokio::net::TcpSocket::new_v6()?,
160    };
161    socket.set_reuseaddr(!cfg!(windows))?;
162    socket.set_nodelay(true)?;
163    socket.bind(address)?;
164    let listener = socket.listen(8196)?;
165
166    let svc = hyper::service::service_fn(svc);
167    let srv = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new());
168    let srv = Arc::new(srv);
169
170    let mut task = JoinSet::new();
171    task.spawn(async move {
172        loop {
173            let stream = match listener.accept().await {
174                Ok((stream, _)) => stream,
175                Err(err) => {
176                    error!(?err, "failed to accept HTTP server connection");
177                    continue;
178                }
179            };
180            let svc = svc.clone();
181            tokio::spawn({
182                let srv = Arc::clone(&srv);
183                let svc = svc.clone();
184                async move {
185                    if let Err(err) = srv.serve_connection(TokioIo::new(stream), svc).await {
186                        error!(?err, "failed to serve connection");
187                    }
188                }
189            });
190        }
191    });
192
193    Ok(task)
194}