wasmcloud_host/wasmbus/providers/http_client/
mod.rs

1use anyhow::Context as _;
2use futures::StreamExt as _;
3use nkeys::XKey;
4use rustls_pemfile;
5use std::io::BufReader;
6use std::sync::Arc;
7use tokio::select;
8use tokio::sync::broadcast;
9use tokio::task::JoinSet;
10use tracing::{debug, error, info, warn};
11use webpki_roots;
12
13use wasmcloud_core::HostData;
14use wasmcloud_provider_sdk::{
15    provider::{handle_provider_commands, receive_link_for_provider, ProviderCommandReceivers},
16    ProviderConnection,
17};
18use wrpc_interface_http::ServeHttp;
19
20// Re-export the main HTTP client provider implementation
21pub(crate) mod provider;
22
23// Common features shared with external HTTP client provider
24use wasmcloud_core::http_client::{
25    DEFAULT_IDLE_TIMEOUT, LOAD_NATIVE_CERTS, LOAD_WEBPKI_CERTS, SSL_CERTS_FILE,
26};
27
28impl crate::wasmbus::Host {
29    /// Initializes and starts the internal HTTP client provider.
30    ///
31    /// This method is called by the wasmCloud host to create and start the built-in
32    /// HTTP client provider. It sets up the provider with the host's configuration,
33    /// establishes the necessary NATS connections, and starts the provider's command
34    /// handling tasks.
35    ///
36    /// # Arguments
37    ///
38    /// * `host_data` - Configuration data from the host
39    /// * `provider_xkey` - The provider's signing key for secure communications
40    /// * `provider_id` - Unique identifier for this provider instance
41    ///
42    /// # Returns
43    ///
44    /// A JoinSet containing the provider's background tasks, or an error if initialization fails
45    pub(crate) async fn start_http_client_provider(
46        &self,
47        host_data: HostData,
48        provider_xkey: XKey,
49        provider_id: &str,
50    ) -> anyhow::Result<JoinSet<()>> {
51        info!("Starting HTTP client provider with ID: {}", provider_id);
52        let host_id = self.host_key.public_key();
53
54        // Initialize TLS connector based on configuration
55        let tls = if host_data.config.is_empty() {
56            debug!("Using default TLS connector");
57            wasmcloud_provider_sdk::core::tls::DEFAULT_RUSTLS_CONNECTOR.clone()
58        } else {
59            debug!("Configuring custom TLS connector");
60            let mut ca = rustls::RootCertStore::empty();
61
62            // Load native certificates if configured
63            if host_data
64                .config
65                .get(LOAD_NATIVE_CERTS)
66                .map(|v| v.eq_ignore_ascii_case("true"))
67                .unwrap_or(true)
68            {
69                let (added, ignored) = ca.add_parsable_certificates(
70                    wasmcloud_provider_sdk::core::tls::NATIVE_ROOTS
71                        .iter()
72                        .cloned(),
73                );
74                debug!(added, ignored, "loaded native root certificate store");
75            }
76
77            // Load Mozilla trusted root certificates if configured
78            if host_data
79                .config
80                .get(LOAD_WEBPKI_CERTS)
81                .map(|v| v.eq_ignore_ascii_case("true"))
82                .unwrap_or(true)
83            {
84                ca.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
85                debug!("loaded webpki root certificate store");
86            }
87
88            // Load additional certificates from file if specified
89            if let Some(file_path) = host_data.config.get(SSL_CERTS_FILE) {
90                let f = std::fs::File::open(file_path)?;
91                let mut reader = BufReader::new(f);
92                let certs = rustls_pemfile::certs(&mut reader).collect::<Result<Vec<_>, _>>()?;
93                let (added, ignored) = ca.add_parsable_certificates(certs);
94                debug!(
95                    added,
96                    ignored, "added additional root certificates from file"
97                );
98            }
99
100            tokio_rustls::TlsConnector::from(Arc::new(
101                rustls::ClientConfig::builder()
102                    .with_root_certificates(ca)
103                    .with_no_client_auth(),
104            ))
105        };
106
107        // Create provider instance
108        debug!("Creating HTTP client provider instance");
109        let provider = provider::HttpClientProvider::new(tls, DEFAULT_IDLE_TIMEOUT).await?;
110
111        let mut tasks = JoinSet::new();
112
113        debug!("Setting up provider command receivers");
114        let (quit_tx, quit_rx) = broadcast::channel(1);
115        let commands = ProviderCommandReceivers::new(
116            Arc::clone(&self.rpc_nats),
117            &quit_tx,
118            &self.host_config.lattice,
119            provider_id,
120            provider_id,
121            &host_id,
122        )
123        .await?;
124
125        debug!("Creating provider connection");
126        let conn = ProviderConnection::new(
127            Arc::clone(&self.rpc_nats),
128            Arc::from(provider_id),
129            Arc::clone(&self.host_config.lattice),
130            host_id.to_string(),
131            host_data.config,
132            provider_xkey,
133            Arc::clone(&self.secrets_xkey),
134        )
135        .context("failed to establish provider connection")?;
136
137        debug!(
138            target: "http_client::connection",
139            provider_id = %provider_id,
140            lattice = %self.host_config.lattice,
141            "Provider connection created"
142        );
143
144        // Process link definitions with enhanced logging
145        for ld in host_data.link_definitions {
146            debug!(
147                target: "http_client::link",
148                link_name = %ld.name,
149                source_id = %ld.source_id,
150                target = %ld.target,
151                interfaces = ?ld.interfaces,
152                "Processing link definitions"
153            );
154            if let Err(err) = receive_link_for_provider(&provider, &conn, ld.clone()).await {
155                error!(
156                    target: "http_client::link",
157                    error = %err,
158                    "Failed to initialize link during provider startup"
159                );
160            } else {
161                debug!(
162                    target: "http_client::link",
163                    instance = "wasi:http/outgoing-handler",
164                    link_name = %ld.name,
165                    target = %provider_id,
166                    "Successfully initialized link"
167                );
168            }
169        }
170
171        let provider_clone = provider.clone();
172        let conn_clone = conn.clone();
173
174        info!("Starting provider command handler");
175        tasks.spawn(async move {
176            handle_provider_commands(provider, &conn, quit_rx, quit_tx, commands).await
177        });
178
179        // Start the HTTP client interface handler
180        tasks.spawn(async move {
181            debug!("Setting up wrpc interface");
182            let wrpc = match conn_clone.get_wrpc_client(conn_clone.provider_key()).await {
183                Ok(wrpc) => wrpc,
184                Err(err) => {
185                    error!("Failed to get wRPC client: {}", err);
186                    return;
187                }
188            };
189
190            let [(_, _, mut invocations)] = match wrpc_interface_http::bindings::exports::wrpc::http::outgoing_handler::serve_interface(
191                &wrpc,
192                ServeHttp(provider_clone.clone()),
193            ).await {
194                Ok(interfaces) => interfaces,
195                Err(err) => {
196                    error!("Failed to serve exports: {}", err);
197                    return;
198                }
199            };
200
201            info!("HTTP client provider ready to handle requests");
202            let mut tasks = JoinSet::new();
203
204            loop {
205                select! {
206                    Some(res) = invocations.next() => {
207                        match res {
208                            Ok(fut) => {
209                                tasks.spawn(async move {
210                                    if let Err(err) = fut.await {
211                                        warn!(?err, "failed to serve invocation");
212                                    }
213                                });
214                            },
215                            Err(err) => {
216                                warn!(?err, "failed to accept invocation");
217                            }
218                        }
219                    }
220                }
221            }
222        });
223
224        info!("HTTP client provider started successfully");
225        Ok(tasks)
226    }
227}