wasmcloud_runtime/component/
http.rs1use core::ops::Deref;
2
3use anyhow::{bail, Context as _};
4use futures::stream::StreamExt as _;
5use tokio::sync::oneshot;
6use tokio::{join, spawn};
7use tracing::{debug, debug_span, info_span, instrument, trace, warn, Instrument as _, Span};
8use tracing_opentelemetry::OpenTelemetrySpanExt as _;
9use wasmtime::component::ResourceTable;
10use wasmtime_wasi_http::body::{HostIncomingBody, HyperOutgoingBody};
11use wasmtime_wasi_http::types::{
12 HostFutureIncomingResponse, HostIncomingRequest, IncomingResponse, OutgoingRequestConfig,
13};
14use wasmtime_wasi_http::{HttpResult, WasiHttpCtx, WasiHttpView};
15use wrpc_interface_http::ServeIncomingHandlerWasmtime;
16
17use crate::capability::http::types;
18
19use super::{new_store, Ctx, Handler, Instance, ReplacedInstanceTarget, WrpcServeEvent};
20
/// Wasmtime component bindings generated for the `incoming-http` world.
pub mod incoming_http_bindings {
    // Both imports and exports are generated with the `async | trappable | tracing`
    // defaults. `wasi:http/types` is mapped (`with`) onto the bindings already
    // provided by `wasmtime_wasi_http`.
    wasmtime::component::bindgen!({
        world: "incoming-http",
        imports: { default: async | trappable | tracing },
        exports: { default: async | trappable | tracing },
        with: {
            "wasi:http/types": wasmtime_wasi_http::bindings::http::types,
        },
    });
}
31
32#[instrument(level = "debug", skip_all)]
33async fn invoke_outgoing_handle<H>(
34 handler: H,
35 request: http::Request<HyperOutgoingBody>,
36 config: OutgoingRequestConfig,
37) -> anyhow::Result<Result<IncomingResponse, types::ErrorCode>>
38where
39 H: Handler,
40{
41 use wrpc_interface_http::InvokeOutgoingHandler as _;
42
43 let between_bytes_timeout = config.between_bytes_timeout;
44 debug!(
45 target: "runtime::http::outgoing",
46 method = %request.method(),
47 uri = %request.uri(),
48 headers = ?request.headers(),
49 connect_timeout = ?config.connect_timeout,
50 first_byte_timeout = ?config.first_byte_timeout,
51 between_bytes_timeout = ?config.between_bytes_timeout,
52 use_tls = ?config.use_tls,
53 "Invoking `wrpc:http/outgoing-handler.handle`"
54 );
55
56 match handler
57 .invoke_handle_wasmtime(
58 Some(ReplacedInstanceTarget::HttpOutgoingHandler),
59 request,
60 config,
61 )
62 .await?
63 {
64 (Ok(resp), errs, io) => {
65 debug!(
66 target: "runtime::http::outgoing",
67 status = %resp.status(),
68 headers = ?resp.headers(),
69 "`wrpc:http/outgoing-handler.handle` succeeded"
70 );
71 let worker = wasmtime_wasi::runtime::spawn(
72 async move {
73 join!(
75 errs.for_each(|err| async move {
76 warn!(
77 target: "runtime::http::outgoing",
78 ?err,
79 "Body processing error encountered"
80 );
81 }),
82 async move {
83 if let Some(io) = io {
84 debug!(target: "runtime::http::outgoing", "Performing async I/O");
85 if let Err(err) = io.await {
86 warn!(
87 target: "runtime::http::outgoing",
88 ?err,
89 "Failed to complete async I/O"
90 );
91 }
92 debug!(target: "runtime::http::outgoing", "Async I/O completed");
93 }
94 }
95 );
96 }
97 .in_current_span(),
98 );
99 Ok(Ok(IncomingResponse {
100 resp,
101 worker: Some(worker),
102 between_bytes_timeout,
103 }))
104 }
105 (Err(err), _, _) => {
106 warn!(
107 target: "runtime::http::outgoing",
108 ?err,
109 "`wrpc:http/outgoing-handler.handle` returned an error code"
110 );
111 Ok(Err(err))
112 }
113 }
114}
115
116impl<H> WasiHttpView for Ctx<H>
117where
118 H: Handler,
119{
120 fn ctx(&mut self) -> &mut WasiHttpCtx {
121 &mut self.http
122 }
123
124 fn table(&mut self) -> &mut ResourceTable {
125 &mut self.table
126 }
127
128 #[instrument(level = "debug", skip_all)]
129 fn send_request(
130 &mut self,
131 request: http::Request<HyperOutgoingBody>,
132 config: OutgoingRequestConfig,
133 ) -> HttpResult<HostFutureIncomingResponse>
134 where
135 Self: Sized,
136 {
137 debug!(
138 target: "runtime::http::send_request",
139 method = %request.method(),
140 uri = %request.uri(),
141 headers = ?request.headers(),
142 connect_timeout = ?config.connect_timeout,
143 first_byte_timeout = ?config.first_byte_timeout,
144 between_bytes_timeout = ?config.between_bytes_timeout,
145 use_tls = ?config.use_tls,
146 "WasiHttpView::send_request called"
147 );
148
149 self.attach_parent_context();
150 debug!(
151 target: "runtime::http::send_request",
152 "Parent context attached, spawning request handler"
153 );
154
155 let current_span = Span::current();
157
158 let future = wasmtime_wasi::runtime::spawn(
160 invoke_outgoing_handle(self.handler.clone(), request, config).in_current_span(),
161 );
162
163 debug!(
164 target: "runtime::http::send_request",
165 span_id = ?current_span.id(),
166 "Request handler spawned"
167 );
168 Ok(HostFutureIncomingResponse::pending(future))
169 }
170}
171
172impl<H, C> ServeIncomingHandlerWasmtime<C> for Instance<H, C>
173where
174 H: Handler,
175 C: Send + Deref<Target = tracing::Span>,
176{
177 #[instrument(level = "debug", skip_all)]
178 async fn handle(
179 &self,
180 cx: C,
181 request: ::http::Request<wasmtime_wasi_http::body::HyperIncomingBody>,
182 ) -> anyhow::Result<
183 Result<
184 http::Response<wasmtime_wasi_http::body::HyperOutgoingBody>,
185 wasmtime_wasi_http::bindings::http::types::ErrorCode,
186 >,
187 > {
188 Span::current().set_parent(cx.deref().context());
190 let scheme = request.uri().scheme().context("scheme missing")?;
191 let scheme = wrpc_interface_http::bindings::wrpc::http::types::Scheme::from(scheme).into();
192
193 let (tx, rx) = oneshot::channel();
194 let mut store = new_store(&self.engine, self.handler.clone(), self.max_execution_time);
195 let pre = incoming_http_bindings::IncomingHttpPre::new(self.pre.clone())
196 .context("failed to pre-instantiate `wasi:http/incoming-handler`")?;
197 trace!("instantiating `wasi:http/incoming-handler`");
198 let bindings = pre
199 .instantiate_async(&mut store)
200 .instrument(debug_span!("instantiate_async"))
201 .await
202 .context("failed to instantiate `wasi:http/incoming-handler`")?;
203 let data = store.data_mut();
204
205 let (parts, body) = request.into_parts();
209 let body = HostIncomingBody::new(
210 body,
211 std::time::Duration::from_millis(600 * 1000),
213 );
214 let incoming_req = HostIncomingRequest::new(data, parts, scheme, Some(body))?;
215 let request = data.table().push(incoming_req)?;
216
217 let response = data
218 .new_response_outparam(tx)
219 .context("failed to create response")?;
220
221 let call_incoming_handle = info_span!("call_http_incoming_handle");
224 store.data_mut().parent_context = Some(call_incoming_handle.context());
225 let handle = spawn(
226 async move {
227 debug!("invoking `wasi:http/incoming-handler.handle`");
228 if let Err(err) = bindings
229 .wasi_http_incoming_handler()
230 .call_handle(&mut store, request, response)
231 .instrument(call_incoming_handle)
232 .await
233 {
234 warn!(?err, "failed to call `wasi:http/incoming-handler.handle`");
235 bail!(err.context("failed to call `wasi:http/incoming-handler.handle`"));
236 }
237 Ok(())
238 }
239 .in_current_span(),
240 );
241 let res = async {
242 debug!("awaiting `wasi:http/incoming-handler.handle` response");
243 match rx.await {
244 Ok(Ok(res)) => {
245 debug!("successful `wasi:http/incoming-handler.handle` response received");
246 Ok(Ok(res))
247 }
248 Ok(Err(err)) => {
249 debug!(
250 ?err,
251 "unsuccessful `wasi:http/incoming-handler.handle` response received"
252 );
253 Ok(Err(err))
254 }
255 Err(_) => {
256 debug!("`wasi:http/incoming-handler.handle` response sender dropped");
257 handle
258 .instrument(debug_span!("await_response"))
259 .await
260 .context("failed to join handle task")??;
261 bail!("component did not call `response-outparam::set`")
262 }
263 }
264 }
265 .in_current_span()
266 .await;
267 let success = res.as_ref().is_ok_and(Result::is_ok);
268 if let Err(err) = self
269 .events
270 .try_send(WrpcServeEvent::HttpIncomingHandlerHandleReturned {
271 context: cx,
272 success,
273 })
274 {
275 warn!(
276 ?err,
277 success, "failed to send `wasi:http/incoming-handler.handle` return event"
278 );
279 }
280 res
281 }
282}