wasmcloud_runtime/component/
http.rs

1use core::ops::Deref;
2
3use anyhow::{bail, Context as _};
4use futures::stream::StreamExt as _;
5use tokio::sync::oneshot;
6use tokio::{join, spawn};
7use tracing::{debug, debug_span, info_span, instrument, trace, warn, Instrument as _, Span};
8use tracing_opentelemetry::OpenTelemetrySpanExt as _;
9use wasmtime::component::ResourceTable;
10use wasmtime_wasi_http::body::{HostIncomingBody, HyperOutgoingBody};
11use wasmtime_wasi_http::types::{
12    HostFutureIncomingResponse, HostIncomingRequest, IncomingResponse, OutgoingRequestConfig,
13};
14use wasmtime_wasi_http::{HttpResult, WasiHttpCtx, WasiHttpView};
15use wrpc_interface_http::ServeIncomingHandlerWasmtime;
16
17use crate::capability::http::types;
18
19use super::{new_store, Ctx, Handler, Instance, ReplacedInstanceTarget, WrpcServeEvent};
20
/// Wasmtime component bindings generated by `bindgen!` for the `incoming-http` world.
///
/// The generated `IncomingHttpPre` type is what `handle` below uses to
/// pre-instantiate the component and reach its `wasi:http/incoming-handler` export.
pub mod incoming_http_bindings {
    wasmtime::component::bindgen!({
        // WIT world the bindings are generated for.
        world: "incoming-http",
        // Default flags applied to every imported and exported function.
        imports: { default: async | trappable | tracing },
        exports: { default: async | trappable | tracing },
        // Map `wasi:http/types` onto the existing `wasmtime_wasi_http` bindings
        // instead of generating a second copy of those types here.
        with: {
            "wasi:http/types": wasmtime_wasi_http::bindings::http::types,
        },
    });
}
31
32#[instrument(level = "debug", skip_all)]
33async fn invoke_outgoing_handle<H>(
34    handler: H,
35    request: http::Request<HyperOutgoingBody>,
36    config: OutgoingRequestConfig,
37) -> anyhow::Result<Result<IncomingResponse, types::ErrorCode>>
38where
39    H: Handler,
40{
41    use wrpc_interface_http::InvokeOutgoingHandler as _;
42
43    let between_bytes_timeout = config.between_bytes_timeout;
44    debug!(
45        target: "runtime::http::outgoing",
46        method = %request.method(),
47        uri = %request.uri(),
48        headers = ?request.headers(),
49        connect_timeout = ?config.connect_timeout,
50        first_byte_timeout = ?config.first_byte_timeout,
51        between_bytes_timeout = ?config.between_bytes_timeout,
52        use_tls = ?config.use_tls,
53        "Invoking `wrpc:http/outgoing-handler.handle`"
54    );
55
56    match handler
57        .invoke_handle_wasmtime(
58            Some(ReplacedInstanceTarget::HttpOutgoingHandler),
59            request,
60            config,
61        )
62        .await?
63    {
64        (Ok(resp), errs, io) => {
65            debug!(
66                target: "runtime::http::outgoing",
67                status = %resp.status(),
68                headers = ?resp.headers(),
69                "`wrpc:http/outgoing-handler.handle` succeeded"
70            );
71            let worker = wasmtime_wasi::runtime::spawn(
72                async move {
73                    // TODO: Do more than just log errors
74                    join!(
75                        errs.for_each(|err| async move {
76                            warn!(
77                                target: "runtime::http::outgoing",
78                                ?err,
79                                "Body processing error encountered"
80                            );
81                        }),
82                        async move {
83                            if let Some(io) = io {
84                                debug!(target: "runtime::http::outgoing", "Performing async I/O");
85                                if let Err(err) = io.await {
86                                    warn!(
87                                        target: "runtime::http::outgoing",
88                                        ?err,
89                                        "Failed to complete async I/O"
90                                    );
91                                }
92                                debug!(target: "runtime::http::outgoing", "Async I/O completed");
93                            }
94                        }
95                    );
96                }
97                .in_current_span(),
98            );
99            Ok(Ok(IncomingResponse {
100                resp,
101                worker: Some(worker),
102                between_bytes_timeout,
103            }))
104        }
105        (Err(err), _, _) => {
106            warn!(
107                target: "runtime::http::outgoing",
108                ?err,
109                "`wrpc:http/outgoing-handler.handle` returned an error code"
110            );
111            Ok(Err(err))
112        }
113    }
114}
115
116impl<H> WasiHttpView for Ctx<H>
117where
118    H: Handler,
119{
120    fn ctx(&mut self) -> &mut WasiHttpCtx {
121        &mut self.http
122    }
123
124    fn table(&mut self) -> &mut ResourceTable {
125        &mut self.table
126    }
127
128    #[instrument(level = "debug", skip_all)]
129    fn send_request(
130        &mut self,
131        request: http::Request<HyperOutgoingBody>,
132        config: OutgoingRequestConfig,
133    ) -> HttpResult<HostFutureIncomingResponse>
134    where
135        Self: Sized,
136    {
137        debug!(
138            target: "runtime::http::send_request",
139            method = %request.method(),
140            uri = %request.uri(),
141            headers = ?request.headers(),
142            connect_timeout = ?config.connect_timeout,
143            first_byte_timeout = ?config.first_byte_timeout,
144            between_bytes_timeout = ?config.between_bytes_timeout,
145            use_tls = ?config.use_tls,
146            "WasiHttpView::send_request called"
147        );
148
149        self.attach_parent_context();
150        debug!(
151            target: "runtime::http::send_request",
152            "Parent context attached, spawning request handler"
153        );
154
155        // Get the current tracing span
156        let current_span = Span::current();
157
158        // Spawn the request handler in the current tracing span
159        let future = wasmtime_wasi::runtime::spawn(
160            invoke_outgoing_handle(self.handler.clone(), request, config).in_current_span(),
161        );
162
163        debug!(
164            target: "runtime::http::send_request",
165            span_id = ?current_span.id(),
166            "Request handler spawned"
167        );
168        Ok(HostFutureIncomingResponse::pending(future))
169    }
170}
171
172impl<H, C> ServeIncomingHandlerWasmtime<C> for Instance<H, C>
173where
174    H: Handler,
175    C: Send + Deref<Target = tracing::Span>,
176{
177    #[instrument(level = "debug", skip_all)]
178    async fn handle(
179        &self,
180        cx: C,
181        request: ::http::Request<wasmtime_wasi_http::body::HyperIncomingBody>,
182    ) -> anyhow::Result<
183        Result<
184            http::Response<wasmtime_wasi_http::body::HyperOutgoingBody>,
185            wasmtime_wasi_http::bindings::http::types::ErrorCode,
186        >,
187    > {
188        // Set the parent of the current context to the span passed in
189        Span::current().set_parent(cx.deref().context());
190        let scheme = request.uri().scheme().context("scheme missing")?;
191        let scheme = wrpc_interface_http::bindings::wrpc::http::types::Scheme::from(scheme).into();
192
193        let (tx, rx) = oneshot::channel();
194        let mut store = new_store(&self.engine, self.handler.clone(), self.max_execution_time);
195        let pre = incoming_http_bindings::IncomingHttpPre::new(self.pre.clone())
196            .context("failed to pre-instantiate `wasi:http/incoming-handler`")?;
197        trace!("instantiating `wasi:http/incoming-handler`");
198        let bindings = pre
199            .instantiate_async(&mut store)
200            .instrument(debug_span!("instantiate_async"))
201            .await
202            .context("failed to instantiate `wasi:http/incoming-handler`")?;
203        let data = store.data_mut();
204
205        // The below is adapted from `WasiHttpView::new_incoming_request`, which is unusable for
206        // us, since it requires a `hyper::Error`
207
208        let (parts, body) = request.into_parts();
209        let body = HostIncomingBody::new(
210            body,
211            // TODO: this needs to be plumbed through
212            std::time::Duration::from_millis(600 * 1000),
213        );
214        let incoming_req = HostIncomingRequest::new(data, parts, scheme, Some(body))?;
215        let request = data.table().push(incoming_req)?;
216
217        let response = data
218            .new_response_outparam(tx)
219            .context("failed to create response")?;
220
221        // TODO: Replicate this for custom interface
222        // Set the current invocation parent context for injection on outgoing wRPC requests
223        let call_incoming_handle = info_span!("call_http_incoming_handle");
224        store.data_mut().parent_context = Some(call_incoming_handle.context());
225        let handle = spawn(
226            async move {
227                debug!("invoking `wasi:http/incoming-handler.handle`");
228                if let Err(err) = bindings
229                    .wasi_http_incoming_handler()
230                    .call_handle(&mut store, request, response)
231                    .instrument(call_incoming_handle)
232                    .await
233                {
234                    warn!(?err, "failed to call `wasi:http/incoming-handler.handle`");
235                    bail!(err.context("failed to call `wasi:http/incoming-handler.handle`"));
236                }
237                Ok(())
238            }
239            .in_current_span(),
240        );
241        let res = async {
242            debug!("awaiting `wasi:http/incoming-handler.handle` response");
243            match rx.await {
244                Ok(Ok(res)) => {
245                    debug!("successful `wasi:http/incoming-handler.handle` response received");
246                    Ok(Ok(res))
247                }
248                Ok(Err(err)) => {
249                    debug!(
250                        ?err,
251                        "unsuccessful `wasi:http/incoming-handler.handle` response received"
252                    );
253                    Ok(Err(err))
254                }
255                Err(_) => {
256                    debug!("`wasi:http/incoming-handler.handle` response sender dropped");
257                    handle
258                        .instrument(debug_span!("await_response"))
259                        .await
260                        .context("failed to join handle task")??;
261                    bail!("component did not call `response-outparam::set`")
262                }
263            }
264        }
265        .in_current_span()
266        .await;
267        let success = res.as_ref().is_ok_and(Result::is_ok);
268        if let Err(err) = self
269            .events
270            .try_send(WrpcServeEvent::HttpIncomingHandlerHandleReturned {
271                context: cx,
272                success,
273            })
274        {
275            warn!(
276                ?err,
277                success, "failed to send `wasi:http/incoming-handler.handle` return event"
278            );
279        }
280        res
281    }
282}