1use core::iter::zip;
2use core::pin::pin;
3
4use std::collections::HashMap;
5use std::sync::Arc;
6
7use anyhow::{bail, ensure, Context as _};
8use bytes::BytesMut;
9use futures::future::try_join_all;
10use tokio::io::AsyncWriteExt as _;
11use tokio::time::Instant;
12use tokio::try_join;
13use tokio_util::codec::Encoder;
14use tracing::{debug, instrument, trace, warn, Instrument as _, Span};
15use wasmtime::component::{types, LinkerInstance, ResourceType, Type, Val};
16use wasmtime::{AsContextMut, Engine, StoreContextMut};
17use wrpc_transport::{Index as _, Invoke, InvokeExt as _};
18
19use crate::rpc::Error;
20use crate::{read_value, rpc_func_name, rpc_result_type, ValEncoder, WrpcView, WrpcViewExt as _};
21
22#[instrument(level = "trace", skip_all)]
24pub fn link_item<V>(
25 engine: &Engine,
26 linker: &mut LinkerInstance<V>,
27 guest_resources: impl Into<Arc<[ResourceType]>>,
28 host_resources: impl Into<Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>>,
29 ty: types::ComponentItem,
30 instance: impl Into<Arc<str>>,
31 name: impl Into<Arc<str>>,
32) -> wasmtime::Result<()>
33where
34 V: WrpcView,
35{
36 let instance = instance.into();
37 let guest_resources = guest_resources.into();
38 let host_resources = host_resources.into();
39 match ty {
40 types::ComponentItem::ComponentFunc(ty) => {
41 let name = name.into();
42 debug!(?instance, ?name, "linking function");
43 link_function(
44 linker,
45 Arc::clone(&guest_resources),
46 Arc::clone(&host_resources),
47 ty,
48 instance,
49 name,
50 )?;
51 }
52 types::ComponentItem::CoreFunc(_) => {
53 bail!("polyfilling core functions not supported yet")
54 }
55 types::ComponentItem::Module(_) => bail!("polyfilling modules not supported yet"),
56 types::ComponentItem::Component(ty) => {
57 for (name, ty) in ty.imports(engine) {
58 debug!(?instance, name, "linking component item");
59 link_item(
60 engine,
61 linker,
62 Arc::clone(&guest_resources),
63 Arc::clone(&host_resources),
64 ty,
65 "",
66 name,
67 )?;
68 }
69 }
70 types::ComponentItem::ComponentInstance(ty) => {
71 let name = name.into();
72 let mut linker = linker
73 .instance(&name)
74 .with_context(|| format!("failed to instantiate `{name}` in the linker"))?;
75 debug!(?instance, ?name, "linking instance");
76 link_instance(
77 engine,
78 &mut linker,
79 guest_resources,
80 host_resources,
81 ty,
82 name,
83 )?;
84 }
85 types::ComponentItem::Type(_) => {}
86 types::ComponentItem::Resource(ty) => {
87 let name = name.into();
88 let Some((guest_ty, host_ty)) = host_resources
89 .get(&*instance)
90 .and_then(|instance| instance.get(&*name))
91 else {
92 bail!("resource type for {instance}/{name} not defined");
93 };
94 ensure!(ty == *guest_ty, "{instance}/{name} resource type mismatch");
95
96 debug!(?instance, ?name, "linking resource");
97 linker.resource(&name, *host_ty, |_, _| Ok(()))?;
98 }
99 }
100 Ok(())
101}
102
103#[instrument(level = "trace", skip_all)]
105pub fn link_instance<V>(
106 engine: &Engine,
107 linker: &mut LinkerInstance<V>,
108 guest_resources: impl Into<Arc<[ResourceType]>>,
109 host_resources: impl Into<Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>>,
110 ty: types::ComponentInstance,
111 name: impl Into<Arc<str>>,
112) -> wasmtime::Result<()>
113where
114 V: WrpcView,
115{
116 let instance = name.into();
117 let guest_resources = guest_resources.into();
118 let host_resources = host_resources.into();
119 for (name, ty) in ty.exports(engine) {
120 debug!(name, "linking instance item");
121 link_item(
122 engine,
123 linker,
124 Arc::clone(&guest_resources),
125 Arc::clone(&host_resources),
126 ty,
127 Arc::clone(&instance),
128 name,
129 )?;
130 }
131 Ok(())
132}
133
134#[allow(clippy::too_many_arguments)]
135async fn invoke<T: WrpcView>(
136 mut store: &mut StoreContextMut<'_, T>,
137 params: &[Val],
138 results: &mut [Val],
139 guest_resources: Arc<[ResourceType]>,
140 params_ty: impl IntoIterator<Item = (&str, Type)>,
141 results_ty: impl IntoIterator<Item = Type>,
142 instance: Arc<str>,
143 name: Arc<str>,
144) -> wasmtime::Result<anyhow::Result<()>> {
145 let mut buf = BytesMut::default();
146 let mut deferred = vec![];
147 for (v, (name, ref ty)) in zip(params, params_ty) {
148 let mut enc = ValEncoder::new(store.as_context_mut(), ty, &guest_resources);
149 enc.encode(v, &mut buf)
150 .with_context(|| format!("failed to encode parameter `{name}`"))?;
151 deferred.push(enc.deferred);
152 }
153 let view = store.data_mut().wrpc();
154 let clt = view.ctx.client();
155 let cx = view.ctx.context();
156 let timeout = view.ctx.timeout();
157 let buf = buf.freeze();
158 let paths = &[[]; 0];
160 let rpc_name = rpc_func_name(&name);
161 let start = Instant::now();
162 let invocation = if let Some(timeout) = timeout {
163 clt.timeout(timeout)
164 .invoke(cx, &instance, rpc_name, buf, paths)
165 .await
166 } else {
167 clt.invoke(cx, &instance, rpc_name, buf, paths).await
168 }
169 .with_context(|| format!("failed to invoke `{instance}.{name}` polyfill via wRPC"));
170 let (outgoing, incoming) = match invocation {
171 Ok((outgoing, incoming)) => (outgoing, incoming),
172 Err(err) => return Ok(Err(err)),
173 };
174 let tx = async {
175 try_join_all(
176 zip(0.., deferred)
177 .filter_map(|(i, f)| f.map(|f| (outgoing.index(&[i]), f)))
178 .map(|(w, f)| async move {
179 let w = w?;
180 f(w).await
181 }),
182 )
183 .await
184 .context("failed to write asynchronous parameters")?;
185 let mut outgoing = pin!(outgoing);
186 outgoing
187 .flush()
188 .await
189 .context("failed to flush outgoing stream")?;
190 if let Err(err) = outgoing.shutdown().await {
191 trace!(?err, "failed to shutdown outgoing stream");
192 }
193 anyhow::Ok(())
194 };
195 let rx = async {
196 let mut incoming = pin!(incoming);
197 for (i, (v, ref ty)) in zip(results, results_ty).enumerate() {
198 read_value(&mut store, &mut incoming, &guest_resources, v, ty, &[i])
199 .await
200 .with_context(|| format!("failed to decode return value {i}"))?;
201 }
202 Ok(())
203 };
204 let res = if let Some(timeout) = timeout {
205 let timeout = timeout.saturating_sub(Instant::now().saturating_duration_since(start));
206 try_join!(
207 async {
208 tokio::time::timeout(timeout, tx)
209 .await
210 .context("data transmission timed out")?
211 },
212 async {
213 tokio::time::timeout(timeout, rx)
214 .await
215 .context("data receipt timed out")?
216 },
217 )
218 } else {
219 try_join!(tx, rx)
220 };
221 match res {
222 Ok(((), ())) => Ok(Ok(())),
223 Err(err) => Ok(Err(err)),
224 }
225}
226
227#[instrument(level = "trace", skip_all)]
229pub fn link_function<V>(
230 linker: &mut LinkerInstance<V>,
231 guest_resources: impl Into<Arc<[ResourceType]>>,
232 host_resources: impl Into<Arc<HashMap<Box<str>, HashMap<Box<str>, (ResourceType, ResourceType)>>>>,
233 ty: types::ComponentFunc,
234 instance: impl Into<Arc<str>>,
235 name: impl Into<Arc<str>>,
236) -> wasmtime::Result<()>
237where
238 V: WrpcView,
239{
240 let span = Span::current();
241 let instance = instance.into();
242 let name = name.into();
243 let guest_resources = guest_resources.into();
244 let host_resources = host_resources.into();
245 match rpc_result_type(&host_resources, ty.results()) {
246 None => linker.func_new_async(&Arc::clone(&name), move |mut store, params, results| {
247 let ty = ty.clone();
248 let instance = Arc::clone(&instance);
249 let name = Arc::clone(&name);
250 let resources = Arc::clone(&guest_resources);
251 Box::new(
252 async move {
253 match invoke(
254 &mut store,
255 params,
256 results,
257 resources,
258 ty.params(),
259 ty.results(),
260 instance,
261 name,
262 )
263 .await
264 {
265 Ok(Ok(())) => Ok(()),
266 Ok(Err(err)) => Err(err),
267 Err(err) => Err(err),
268 }
269 }
270 .instrument(span.clone()),
271 )
272 }),
273 Some(None) => {
275 linker.func_new_async(&Arc::clone(&name), move |mut store, params, results| {
276 let ty = ty.clone();
277 let instance = Arc::clone(&instance);
278 let name = Arc::clone(&name);
279 let resources = Arc::clone(&guest_resources);
280 Box::new(
281 async move {
282 let [result] = results else {
283 bail!("result type mismatch");
284 };
285 match invoke(
286 &mut store,
287 params,
288 &mut [],
289 resources,
290 ty.params(),
291 None,
292 instance,
293 name,
294 )
295 .await?
296 {
297 Ok(()) => {
298 *result = Val::Result(Ok(None));
299 }
300 Err(err) => {
301 let err = store.data_mut().push_error(Error::Invoke(err))?;
302 let err = err
303 .try_into_resource_any(&mut store)
304 .context("failed to lower error resource")?;
305 *result = Val::Result(Err(Some(Box::new(Val::Resource(err)))));
306 }
307 }
308 Ok(())
309 }
310 .instrument(span.clone()),
311 )
312 })
313 }
314 Some(Some(result_ty)) => {
316 linker.func_new_async(&Arc::clone(&name), move |mut store, params, results| {
317 let ty = ty.clone();
318 let instance = Arc::clone(&instance);
319 let name = Arc::clone(&name);
320 let resources = Arc::clone(&guest_resources);
321 let result_ty = result_ty.clone();
322 Box::new(
323 async move {
324 let [result] = results else {
325 bail!("result type mismatch");
326 };
327 let mut ok = [Val::Bool(false); 1];
328 match invoke(
329 &mut store,
330 params,
331 ok.as_mut_slice(),
332 resources,
333 ty.params(),
334 [result_ty],
335 instance,
336 name,
337 )
338 .await?
339 {
340 Ok(()) => {
341 let [ok] = ok;
342 *result = Val::Result(Ok(Some(Box::new(ok))));
343 }
344 Err(err) => {
345 let err = store.data_mut().push_error(Error::Invoke(err))?;
346 let err = err
347 .try_into_resource_any(&mut store)
348 .context("failed to lower error resource")?;
349 *result = Val::Result(Err(Some(Box::new(Val::Resource(err)))));
350 }
351 }
352 Ok(())
353 }
354 .instrument(span.clone()),
355 )
356 })
357 }
358 }
359}