wasmcloud_host/wasmbus/providers/http_client/
mod.rs1use anyhow::Context as _;
2use futures::StreamExt as _;
3use nkeys::XKey;
4use rustls_pemfile;
5use std::io::BufReader;
6use std::sync::Arc;
7use tokio::select;
8use tokio::sync::broadcast;
9use tokio::task::JoinSet;
10use tracing::{debug, error, info, warn};
11use webpki_roots;
12
13use wasmcloud_core::HostData;
14use wasmcloud_provider_sdk::{
15 provider::{handle_provider_commands, receive_link_for_provider, ProviderCommandReceivers},
16 ProviderConnection,
17};
18use wrpc_interface_http::ServeHttp;
19
20pub(crate) mod provider;
22
23use wasmcloud_core::http_client::{
25 DEFAULT_IDLE_TIMEOUT, LOAD_NATIVE_CERTS, LOAD_WEBPKI_CERTS, SSL_CERTS_FILE,
26};
27
28impl crate::wasmbus::Host {
29 pub(crate) async fn start_http_client_provider(
46 &self,
47 host_data: HostData,
48 provider_xkey: XKey,
49 provider_id: &str,
50 ) -> anyhow::Result<JoinSet<()>> {
51 info!("Starting HTTP client provider with ID: {}", provider_id);
52 let host_id = self.host_key.public_key();
53
54 let tls = if host_data.config.is_empty() {
56 debug!("Using default TLS connector");
57 wasmcloud_provider_sdk::core::tls::DEFAULT_RUSTLS_CONNECTOR.clone()
58 } else {
59 debug!("Configuring custom TLS connector");
60 let mut ca = rustls::RootCertStore::empty();
61
62 if host_data
64 .config
65 .get(LOAD_NATIVE_CERTS)
66 .map(|v| v.eq_ignore_ascii_case("true"))
67 .unwrap_or(true)
68 {
69 let (added, ignored) = ca.add_parsable_certificates(
70 wasmcloud_provider_sdk::core::tls::NATIVE_ROOTS
71 .iter()
72 .cloned(),
73 );
74 debug!(added, ignored, "loaded native root certificate store");
75 }
76
77 if host_data
79 .config
80 .get(LOAD_WEBPKI_CERTS)
81 .map(|v| v.eq_ignore_ascii_case("true"))
82 .unwrap_or(true)
83 {
84 ca.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
85 debug!("loaded webpki root certificate store");
86 }
87
88 if let Some(file_path) = host_data.config.get(SSL_CERTS_FILE) {
90 let f = std::fs::File::open(file_path)?;
91 let mut reader = BufReader::new(f);
92 let certs = rustls_pemfile::certs(&mut reader).collect::<Result<Vec<_>, _>>()?;
93 let (added, ignored) = ca.add_parsable_certificates(certs);
94 debug!(
95 added,
96 ignored, "added additional root certificates from file"
97 );
98 }
99
100 tokio_rustls::TlsConnector::from(Arc::new(
101 rustls::ClientConfig::builder()
102 .with_root_certificates(ca)
103 .with_no_client_auth(),
104 ))
105 };
106
107 debug!("Creating HTTP client provider instance");
109 let provider = provider::HttpClientProvider::new(tls, DEFAULT_IDLE_TIMEOUT).await?;
110
111 let mut tasks = JoinSet::new();
112
113 debug!("Setting up provider command receivers");
114 let (quit_tx, quit_rx) = broadcast::channel(1);
115 let commands = ProviderCommandReceivers::new(
116 Arc::clone(&self.rpc_nats),
117 &quit_tx,
118 &self.host_config.lattice,
119 provider_id,
120 provider_id,
121 &host_id,
122 )
123 .await?;
124
125 debug!("Creating provider connection");
126 let conn = ProviderConnection::new(
127 Arc::clone(&self.rpc_nats),
128 Arc::from(provider_id),
129 Arc::clone(&self.host_config.lattice),
130 host_id.to_string(),
131 host_data.config,
132 provider_xkey,
133 Arc::clone(&self.secrets_xkey),
134 )
135 .context("failed to establish provider connection")?;
136
137 debug!(
138 target: "http_client::connection",
139 provider_id = %provider_id,
140 lattice = %self.host_config.lattice,
141 "Provider connection created"
142 );
143
144 for ld in host_data.link_definitions {
146 debug!(
147 target: "http_client::link",
148 link_name = %ld.name,
149 source_id = %ld.source_id,
150 target = %ld.target,
151 interfaces = ?ld.interfaces,
152 "Processing link definitions"
153 );
154 if let Err(err) = receive_link_for_provider(&provider, &conn, ld.clone()).await {
155 error!(
156 target: "http_client::link",
157 error = %err,
158 "Failed to initialize link during provider startup"
159 );
160 } else {
161 debug!(
162 target: "http_client::link",
163 instance = "wasi:http/outgoing-handler",
164 link_name = %ld.name,
165 target = %provider_id,
166 "Successfully initialized link"
167 );
168 }
169 }
170
171 let provider_clone = provider.clone();
172 let conn_clone = conn.clone();
173
174 info!("Starting provider command handler");
175 tasks.spawn(async move {
176 handle_provider_commands(provider, &conn, quit_rx, quit_tx, commands).await
177 });
178
179 tasks.spawn(async move {
181 debug!("Setting up wrpc interface");
182 let wrpc = match conn_clone.get_wrpc_client(conn_clone.provider_key()).await {
183 Ok(wrpc) => wrpc,
184 Err(err) => {
185 error!("Failed to get wRPC client: {}", err);
186 return;
187 }
188 };
189
190 let [(_, _, mut invocations)] = match wrpc_interface_http::bindings::exports::wrpc::http::outgoing_handler::serve_interface(
191 &wrpc,
192 ServeHttp(provider_clone.clone()),
193 ).await {
194 Ok(interfaces) => interfaces,
195 Err(err) => {
196 error!("Failed to serve exports: {}", err);
197 return;
198 }
199 };
200
201 info!("HTTP client provider ready to handle requests");
202 let mut tasks = JoinSet::new();
203
204 loop {
205 select! {
206 Some(res) = invocations.next() => {
207 match res {
208 Ok(fut) => {
209 tasks.spawn(async move {
210 if let Err(err) = fut.await {
211 warn!(?err, "failed to serve invocation");
212 }
213 });
214 },
215 Err(err) => {
216 warn!(?err, "failed to accept invocation");
217 }
218 }
219 }
220 }
221 }
222 });
223
224 info!("HTTP client provider started successfully");
225 Ok(tasks)
226 }
227}