1use core::future::Future;
2use core::iter::zip;
3use core::ops::{BitOrAssign, Shl};
4use core::pin::{pin, Pin};
5
6use std::collections::HashSet;
7
8use anyhow::{bail, Context as _};
9use bytes::{BufMut as _, BytesMut};
10use futures::stream::FuturesUnordered;
11use futures::TryStreamExt as _;
12use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _};
13use tokio_util::codec::{Encoder, FramedRead};
14use tokio_util::compat::FuturesAsyncReadCompatExt as _;
15use tracing::{error, instrument, trace, warn};
16use uuid::Uuid;
17use wasm_tokio::cm::AsyncReadValue as _;
18use wasm_tokio::{
19 AsyncReadCore as _, AsyncReadLeb128 as _, AsyncReadUtf8 as _, CoreNameEncoder,
20 CoreVecEncoderBytes, Leb128Encoder, Utf8Codec,
21};
22use wasmtime::component::types::{Case, Field};
23use wasmtime::component::{ResourceType, Type, Val};
24use wasmtime::{AsContextMut, StoreContextMut};
25use wasmtime_wasi::p2::pipe::AsyncReadStream;
26use wasmtime_wasi::p2::{DynInputStream, StreamError};
27use wrpc_transport::ListDecoderU8;
28
29use crate::{RemoteResource, WrpcView};
30
31pub struct ValEncoder<'a, T: 'static, W> {
32 pub store: StoreContextMut<'a, T>,
33 pub ty: &'a Type,
34 pub resources: &'a [ResourceType],
35 pub deferred: Option<
36 Box<dyn FnOnce(W) -> Pin<Box<dyn Future<Output = wasmtime::Result<()>> + Send>> + Send>,
37 >,
38}
39
40impl<T, W> ValEncoder<'_, T, W> {
41 #[must_use]
42 pub fn new<'a>(
43 store: StoreContextMut<'a, T>,
44 ty: &'a Type,
45 resources: &'a [ResourceType],
46 ) -> ValEncoder<'a, T, W> {
47 ValEncoder {
48 store,
49 ty,
50 resources,
51 deferred: None,
52 }
53 }
54
55 pub fn with_type<'a>(&'a mut self, ty: &'a Type) -> ValEncoder<'a, T, W> {
56 ValEncoder {
57 store: self.store.as_context_mut(),
58 ty,
59 resources: self.resources,
60 deferred: None,
61 }
62 }
63}
64
65fn find_enum_discriminant<'a, T>(
66 iter: impl IntoIterator<Item = T>,
67 names: impl IntoIterator<Item = &'a str>,
68 discriminant: &str,
69) -> wasmtime::Result<T> {
70 zip(iter, names)
71 .find_map(|(i, name)| (name == discriminant).then_some(i))
72 .context("unknown enum discriminant")
73}
74
75fn find_variant_discriminant<'a, T>(
76 iter: impl IntoIterator<Item = T>,
77 cases: impl IntoIterator<Item = Case<'a>>,
78 discriminant: &str,
79) -> wasmtime::Result<(T, Option<Type>)> {
80 zip(iter, cases)
81 .find_map(|(i, Case { name, ty })| (name == discriminant).then_some((i, ty)))
82 .context("unknown variant discriminant")
83}
84
/// Builds a bitmask in which bit `i` is set when the `i`-th entry of `names`
/// is present in `flags`.
///
/// Entries of `flags` that do not occur in `names` are ignored.
#[inline]
fn flag_bits<'a, T: BitOrAssign + Shl<u8, Output = T> + From<u8>>(
    names: impl IntoIterator<Item = &'a str>,
    flags: impl IntoIterator<Item = &'a str>,
) -> T {
    let wanted: HashSet<&str> = flags.into_iter().collect();
    let mut bits = T::from(0);
    zip(0u8.., names)
        .filter(|(_, name)| wanted.contains(name))
        .for_each(|(shift, _)| bits |= T::from(1) << shift);
    bits
}
99
100async fn write_deferred<W, I>(w: W, deferred: I) -> wasmtime::Result<()>
101where
102 W: wrpc_transport::Index<W> + Sync + Send + 'static,
103 I: IntoIterator,
104 I::IntoIter: ExactSizeIterator<
105 Item = Option<
106 Box<dyn FnOnce(W) -> Pin<Box<dyn Future<Output = wasmtime::Result<()>> + Send>> + Send>,
107 >,
108 >,
109{
110 let mut futs: FuturesUnordered<_> = zip(0.., deferred)
111 .filter_map(|(i, f)| f.map(|f| (w.index(&[i]), f)))
112 .map(|(w, f)| async move {
113 let w = w?;
114 f(w).await
115 })
116 .collect();
117 while let Some(()) = futs.try_next().await? {}
118 Ok(())
119}
120
121impl<T, W> Encoder<&Val> for ValEncoder<'_, T, W>
122where
123 T: WrpcView,
124 W: AsyncWrite + wrpc_transport::Index<W> + Sync + Send + 'static,
125{
126 type Error = wasmtime::Error;
127
128 #[allow(clippy::too_many_lines)]
129 #[instrument(level = "trace", skip(self))]
130 fn encode(&mut self, v: &Val, dst: &mut BytesMut) -> Result<(), Self::Error> {
131 match (v, self.ty) {
132 (Val::Bool(v), Type::Bool) => {
133 dst.reserve(1);
134 dst.put_u8((*v).into());
135 Ok(())
136 }
137 (Val::S8(v), Type::S8) => {
138 dst.reserve(1);
139 dst.put_i8(*v);
140 Ok(())
141 }
142 (Val::U8(v), Type::U8) => {
143 dst.reserve(1);
144 dst.put_u8(*v);
145 Ok(())
146 }
147 (Val::S16(v), Type::S16) => Leb128Encoder
148 .encode(*v, dst)
149 .context("failed to encode s16"),
150 (Val::U16(v), Type::U16) => Leb128Encoder
151 .encode(*v, dst)
152 .context("failed to encode u16"),
153 (Val::S32(v), Type::S32) => Leb128Encoder
154 .encode(*v, dst)
155 .context("failed to encode s32"),
156 (Val::U32(v), Type::U32) => Leb128Encoder
157 .encode(*v, dst)
158 .context("failed to encode u32"),
159 (Val::S64(v), Type::S64) => Leb128Encoder
160 .encode(*v, dst)
161 .context("failed to encode s64"),
162 (Val::U64(v), Type::U64) => Leb128Encoder
163 .encode(*v, dst)
164 .context("failed to encode u64"),
165 (Val::Float32(v), Type::Float32) => {
166 dst.reserve(4);
167 dst.put_f32_le(*v);
168 Ok(())
169 }
170 (Val::Float64(v), Type::Float64) => {
171 dst.reserve(8);
172 dst.put_f64_le(*v);
173 Ok(())
174 }
175 (Val::Char(v), Type::Char) => {
176 Utf8Codec.encode(*v, dst).context("failed to encode char")
177 }
178 (Val::String(v), Type::String) => CoreNameEncoder
179 .encode(v.as_str(), dst)
180 .context("failed to encode string"),
181 (Val::List(vs), Type::List(ty)) => {
182 let ty = ty.ty();
183 let n = u32::try_from(vs.len()).context("list length does not fit in u32")?;
184 dst.reserve(5 + vs.len());
185 Leb128Encoder
186 .encode(n, dst)
187 .context("failed to encode list length")?;
188 let mut deferred = Vec::with_capacity(vs.len());
189 for v in vs {
190 let mut enc = self.with_type(&ty);
191 enc.encode(v, dst)
192 .context("failed to encode list element")?;
193 deferred.push(enc.deferred);
194 }
195 if deferred.iter().any(Option::is_some) {
196 self.deferred = Some(Box::new(|w| Box::pin(write_deferred(w, deferred))));
197 }
198 Ok(())
199 }
200 (Val::Record(vs), Type::Record(ty)) => {
201 dst.reserve(vs.len());
202 let mut deferred = Vec::with_capacity(vs.len());
203 for ((name, v), Field { ref ty, .. }) in zip(vs, ty.fields()) {
204 let mut enc = self.with_type(ty);
205 enc.encode(v, dst)
206 .with_context(|| format!("failed to encode `{name}` field"))?;
207 deferred.push(enc.deferred);
208 }
209 if deferred.iter().any(Option::is_some) {
210 self.deferred = Some(Box::new(|w| Box::pin(write_deferred(w, deferred))));
211 }
212 Ok(())
213 }
214 (Val::Tuple(vs), Type::Tuple(ty)) => {
215 dst.reserve(vs.len());
216 let mut deferred = Vec::with_capacity(vs.len());
217 for (v, ref ty) in zip(vs, ty.types()) {
218 let mut enc = self.with_type(ty);
219 enc.encode(v, dst)
220 .context("failed to encode tuple element")?;
221 deferred.push(enc.deferred);
222 }
223 if deferred.iter().any(Option::is_some) {
224 self.deferred = Some(Box::new(|w| Box::pin(write_deferred(w, deferred))));
225 }
226 Ok(())
227 }
228 (Val::Variant(discriminant, v), Type::Variant(ty)) => {
229 let cases = ty.cases();
230 let ty = match cases.len() {
231 ..=0x0000_00ff => {
232 let (discriminant, ty) =
233 find_variant_discriminant(0u8.., cases, discriminant)?;
234 dst.reserve(2 + usize::from(v.is_some()));
235 Leb128Encoder.encode(discriminant, dst)?;
236 ty
237 }
238 0x0000_0100..=0x0000_ffff => {
239 let (discriminant, ty) =
240 find_variant_discriminant(0u16.., cases, discriminant)?;
241 dst.reserve(3 + usize::from(v.is_some()));
242 Leb128Encoder.encode(discriminant, dst)?;
243 ty
244 }
245 0x0001_0000..=0x00ff_ffff => {
246 let (discriminant, ty) =
247 find_variant_discriminant(0u32.., cases, discriminant)?;
248 dst.reserve(4 + usize::from(v.is_some()));
249 Leb128Encoder.encode(discriminant, dst)?;
250 ty
251 }
252 0x0100_0000..=0xffff_ffff => {
253 let (discriminant, ty) =
254 find_variant_discriminant(0u32.., cases, discriminant)?;
255 dst.reserve(5 + usize::from(v.is_some()));
256 Leb128Encoder.encode(discriminant, dst)?;
257 ty
258 }
259 0x1_0000_0000.. => bail!("case count does not fit in u32"),
260 };
261 if let Some(v) = v {
262 let ty = ty.context("type missing for variant")?;
263 let mut enc = self.with_type(&ty);
264 enc.encode(v, dst)
265 .context("failed to encode variant value")?;
266 if let Some(f) = enc.deferred {
267 self.deferred = Some(f);
268 }
269 }
270 Ok(())
271 }
272 (Val::Enum(discriminant), Type::Enum(ty)) => {
273 let names = ty.names();
274 match names.len() {
275 ..=0x0000_00ff => {
276 let discriminant = find_enum_discriminant(0u8.., names, discriminant)?;
277 dst.reserve(2);
278 Leb128Encoder.encode(discriminant, dst)?;
279 }
280 0x0000_0100..=0x0000_ffff => {
281 let discriminant = find_enum_discriminant(0u16.., names, discriminant)?;
282 dst.reserve(3);
283 Leb128Encoder.encode(discriminant, dst)?;
284 }
285 0x0001_0000..=0x00ff_ffff => {
286 let discriminant = find_enum_discriminant(0u32.., names, discriminant)?;
287 dst.reserve(4);
288 Leb128Encoder.encode(discriminant, dst)?;
289 }
290 0x0100_0000..=0xffff_ffff => {
291 let discriminant = find_enum_discriminant(0u32.., names, discriminant)?;
292 dst.reserve(5);
293 Leb128Encoder.encode(discriminant, dst)?;
294 }
295 0x1_0000_0000.. => bail!("name count does not fit in u32"),
296 }
297 Ok(())
298 }
299 (Val::Option(None), Type::Option(_)) => {
300 dst.reserve(1);
301 dst.put_u8(0);
302 Ok(())
303 }
304 (Val::Option(Some(v)), Type::Option(ty)) => {
305 dst.reserve(2);
306 dst.put_u8(1);
307 let ty = ty.ty();
308 let mut enc = self.with_type(&ty);
309 enc.encode(v, dst)
310 .context("failed to encode `option::some` value")?;
311 if let Some(f) = enc.deferred {
312 self.deferred = Some(f);
313 }
314 Ok(())
315 }
316 (Val::Result(v), Type::Result(ty)) => match v {
317 Ok(v) => match (v, ty.ok()) {
318 (Some(v), Some(ty)) => {
319 dst.reserve(2);
320 dst.put_u8(0);
321 let mut enc = self.with_type(&ty);
322 enc.encode(v, dst)
323 .context("failed to encode `result::ok` value")?;
324 if let Some(f) = enc.deferred {
325 self.deferred = Some(f);
326 }
327 Ok(())
328 }
329 (Some(_v), None) => bail!("`result::ok` value of unknown type"),
330 (None, Some(_ty)) => bail!("`result::ok` value missing"),
331 (None, None) => {
332 dst.reserve(1);
333 dst.put_u8(0);
334 Ok(())
335 }
336 },
337 Err(v) => match (v, ty.err()) {
338 (Some(v), Some(ty)) => {
339 dst.reserve(2);
340 dst.put_u8(1);
341 let mut enc = self.with_type(&ty);
342 enc.encode(v, dst)
343 .context("failed to encode `result::err` value")?;
344 if let Some(f) = enc.deferred {
345 self.deferred = Some(f);
346 }
347 Ok(())
348 }
349 (Some(_v), None) => bail!("`result::err` value of unknown type"),
350 (None, Some(_ty)) => bail!("`result::err` value missing"),
351 (None, None) => {
352 dst.reserve(1);
353 dst.put_u8(1);
354 Ok(())
355 }
356 },
357 },
358 (Val::Flags(vs), Type::Flags(ty)) => {
359 let names = ty.names();
360 let vs = vs.iter().map(String::as_str);
361 match names.len() {
362 ..=8 => {
363 dst.reserve(1);
364 dst.put_u8(flag_bits(names, vs));
365 }
366 9..=16 => {
367 dst.reserve(2);
368 dst.put_u16_le(flag_bits(names, vs));
369 }
370 17..=24 => {
371 dst.reserve(3);
372 dst.put_slice(&u32::to_le_bytes(flag_bits(names, vs))[..3]);
373 }
374 25..=32 => {
375 dst.reserve(4);
376 dst.put_u32_le(flag_bits(names, vs));
377 }
378 33..=40 => {
379 dst.reserve(5);
380 dst.put_slice(&u64::to_le_bytes(flag_bits(names, vs))[..5]);
381 }
382 41..=48 => {
383 dst.reserve(6);
384 dst.put_slice(&u64::to_le_bytes(flag_bits(names, vs))[..6]);
385 }
386 49..=56 => {
387 dst.reserve(7);
388 dst.put_slice(&u64::to_le_bytes(flag_bits(names, vs))[..7]);
389 }
390 57..=64 => {
391 dst.reserve(8);
392 dst.put_u64_le(flag_bits(names, vs));
393 }
394 65..=72 => {
395 dst.reserve(9);
396 dst.put_slice(&u128::to_le_bytes(flag_bits(names, vs))[..9]);
397 }
398 73..=80 => {
399 dst.reserve(10);
400 dst.put_slice(&u128::to_le_bytes(flag_bits(names, vs))[..10]);
401 }
402 81..=88 => {
403 dst.reserve(11);
404 dst.put_slice(&u128::to_le_bytes(flag_bits(names, vs))[..11]);
405 }
406 89..=96 => {
407 dst.reserve(12);
408 dst.put_slice(&u128::to_le_bytes(flag_bits(names, vs))[..12]);
409 }
410 97..=104 => {
411 dst.reserve(13);
412 dst.put_slice(&u128::to_le_bytes(flag_bits(names, vs))[..13]);
413 }
414 105..=112 => {
415 dst.reserve(14);
416 dst.put_slice(&u128::to_le_bytes(flag_bits(names, vs))[..14]);
417 }
418 113..=120 => {
419 dst.reserve(15);
420 dst.put_slice(&u128::to_le_bytes(flag_bits(names, vs))[..15]);
421 }
422 121..=128 => {
423 dst.reserve(16);
424 dst.put_u128_le(flag_bits(names, vs));
425 }
426 bits @ 129.. => {
427 let mut cap = bits / 8;
428 if bits % 8 != 0 {
429 cap = cap.saturating_add(1);
430 }
431 let mut buf = vec![0; cap];
432 let flags: HashSet<&str> = vs.into_iter().collect();
433 for (i, name) in names.enumerate() {
434 if flags.contains(name) {
435 buf[i / 8] |= 1 << (i % 8);
436 }
437 }
438 dst.extend_from_slice(&buf);
439 }
440 }
441 Ok(())
442 }
443 (Val::Resource(resource), Type::Own(ty) | Type::Borrow(ty)) => {
444 if *ty == ResourceType::host::<DynInputStream>() {
445 let stream = resource
446 .try_into_resource::<DynInputStream>(&mut self.store)
447 .context("failed to downcast `wasi:io/input-stream`")?;
448 if stream.owned() {
449 let mut stream = self
450 .store
451 .data_mut()
452 .wrpc()
453 .table
454 .delete(stream)
455 .context("failed to delete input stream")?;
456 self.deferred = Some(Box::new(|w| {
457 Box::pin(async move {
458 let mut w = pin!(w);
459 loop {
460 stream.ready().await;
461 match stream.read(8096) {
462 Ok(buf) => {
463 let mut chunk = BytesMut::with_capacity(
464 buf.len().saturating_add(5),
465 );
466 CoreVecEncoderBytes
467 .encode(buf, &mut chunk)
468 .context("failed to encode input stream chunk")?;
469 w.write_all(&chunk).await?;
470 }
471 Err(StreamError::Closed) => {
472 w.write_all(&[0x00]).await?;
473 }
474 Err(err) => return Err(err.into()),
475 }
476 }
477 })
478 }));
479 } else {
480 self.store
481 .data_mut()
482 .wrpc()
483 .table
484 .get_mut(&stream)
485 .context("failed to get input stream")?;
486 bail!("encoding borrowed `wasi:io/input-stream` not supported yet");
490 };
491 Ok(())
492 } else if resource.ty() == ResourceType::host::<RemoteResource>() {
493 let resource = resource
494 .try_into_resource(&mut self.store)
495 .context("resource type mismatch")?;
496 let table = self.store.data_mut().wrpc().table;
497 if resource.owned() {
498 let RemoteResource(buf) = table
499 .delete(resource)
500 .context("failed to delete remote resource")?;
501 CoreVecEncoderBytes
502 .encode(buf, dst)
503 .context("failed to encode resource handle")
504 } else {
505 let RemoteResource(buf) = table
506 .get(&resource)
507 .context("failed to get remote resource")?;
508 CoreVecEncoderBytes
509 .encode(buf, dst)
510 .context("failed to encode resource handle")
511 }
512 } else if self.resources.contains(ty) {
513 let id = Uuid::now_v7();
514 CoreVecEncoderBytes
515 .encode(id.to_bytes_le().as_slice(), dst)
516 .context("failed to encode resource handle")?;
517 trace!(?id, "store shared resource");
518 if self
519 .store
520 .data_mut()
521 .wrpc()
522 .ctx
523 .shared_resources()
524 .0
525 .insert(id, *resource)
526 .is_some()
527 {
528 error!(?id, "duplicate resource ID generated");
529 }
530 Ok(())
531 } else {
532 bail!("encoding host resources not supported yet")
533 }
534 }
535
536 (_, Type::Future(..) | Type::Stream(..) | Type::ErrorContext) => {
537 bail!("async not supported")
538 }
539 _ => bail!("value type mismatch"),
540 }
541 }
542}
543
544#[inline]
545async fn read_flags(n: usize, r: &mut (impl AsyncRead + Unpin)) -> std::io::Result<u128> {
546 let mut buf = 0u128.to_le_bytes();
547 r.read_exact(&mut buf[..n]).await?;
548 Ok(u128::from_le_bytes(buf))
549}
550
551#[instrument(level = "trace", skip_all, fields(ty, path))]
553pub async fn read_value<T, R>(
554 store: &mut impl AsContextMut<Data = T>,
555 r: &mut Pin<&mut R>,
556 resources: &[ResourceType],
557 val: &mut Val,
558 ty: &Type,
559 path: &[usize],
560) -> std::io::Result<()>
561where
562 T: WrpcView + 'static,
563 R: AsyncRead + wrpc_transport::Index<R> + Send + Unpin + 'static,
564{
565 match ty {
566 Type::Bool => {
567 let v = r.read_bool().await?;
568 *val = Val::Bool(v);
569 Ok(())
570 }
571 Type::S8 => {
572 let v = r.read_i8().await?;
573 *val = Val::S8(v);
574 Ok(())
575 }
576 Type::U8 => {
577 let v = r.read_u8().await?;
578 *val = Val::U8(v);
579 Ok(())
580 }
581 Type::S16 => {
582 let v = r.read_i16_leb128().await?;
583 *val = Val::S16(v);
584 Ok(())
585 }
586 Type::U16 => {
587 let v = r.read_u16_leb128().await?;
588 *val = Val::U16(v);
589 Ok(())
590 }
591 Type::S32 => {
592 let v = r.read_i32_leb128().await?;
593 *val = Val::S32(v);
594 Ok(())
595 }
596 Type::U32 => {
597 let v = r.read_u32_leb128().await?;
598 *val = Val::U32(v);
599 Ok(())
600 }
601 Type::S64 => {
602 let v = r.read_i64_leb128().await?;
603 *val = Val::S64(v);
604 Ok(())
605 }
606 Type::U64 => {
607 let v = r.read_u64_leb128().await?;
608 *val = Val::U64(v);
609 Ok(())
610 }
611 Type::Float32 => {
612 let v = r.read_f32_le().await?;
613 *val = Val::Float32(v);
614 Ok(())
615 }
616 Type::Float64 => {
617 let v = r.read_f64_le().await?;
618 *val = Val::Float64(v);
619 Ok(())
620 }
621 Type::Char => {
622 let v = r.read_char_utf8().await?;
623 *val = Val::Char(v);
624 Ok(())
625 }
626 Type::String => {
627 let mut s = String::default();
628 r.read_core_name(&mut s).await?;
629 *val = Val::String(s);
630 Ok(())
631 }
632 Type::List(ty) => {
633 let n = r.read_u32_leb128().await?;
634 let n = n.try_into().unwrap_or(usize::MAX);
635 let mut vs = Vec::with_capacity(n);
636 let ty = ty.ty();
637 let mut path = path.to_vec();
638 for i in 0..n {
639 let mut v = Val::Bool(false);
640 path.push(i);
641 trace!(i, "reading list element value");
642 Box::pin(read_value(store, r, resources, &mut v, &ty, &path)).await?;
643 path.pop();
644 vs.push(v);
645 }
646 *val = Val::List(vs);
647 Ok(())
648 }
649 Type::Record(ty) => {
650 let fields = ty.fields();
651 let mut vs = Vec::with_capacity(fields.len());
652 let mut path = path.to_vec();
653 for (i, Field { name, ty }) in fields.enumerate() {
654 let mut v = Val::Bool(false);
655 path.push(i);
656 trace!(i, "reading struct field value");
657 Box::pin(read_value(store, r, resources, &mut v, &ty, &path)).await?;
658 path.pop();
659 vs.push((name.to_string(), v));
660 }
661 *val = Val::Record(vs);
662 Ok(())
663 }
664 Type::Tuple(ty) => {
665 let types = ty.types();
666 let mut vs = Vec::with_capacity(types.len());
667 let mut path = path.to_vec();
668 for (i, ty) in types.enumerate() {
669 let mut v = Val::Bool(false);
670 path.push(i);
671 trace!(i, "reading tuple element value");
672 Box::pin(read_value(store, r, resources, &mut v, &ty, &path)).await?;
673 path.pop();
674 vs.push(v);
675 }
676 *val = Val::Tuple(vs);
677 Ok(())
678 }
679 Type::Variant(ty) => {
680 let discriminant = r.read_u32_leb128().await?;
681 let discriminant = discriminant
682 .try_into()
683 .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err))?;
684 let Case { name, ty } = ty.cases().nth(discriminant).ok_or_else(|| {
685 std::io::Error::new(
686 std::io::ErrorKind::InvalidInput,
687 format!("unknown variant discriminant `{discriminant}`"),
688 )
689 })?;
690 let name = name.to_string();
691 if let Some(ty) = ty {
692 let mut v = Val::Bool(false);
693 trace!(variant = name, "reading nested variant value");
694 Box::pin(read_value(store, r, resources, &mut v, &ty, path)).await?;
695 *val = Val::Variant(name, Some(Box::new(v)));
696 } else {
697 *val = Val::Variant(name, None);
698 }
699 Ok(())
700 }
701 Type::Enum(ty) => {
702 let discriminant = r.read_u32_leb128().await?;
703 let discriminant = discriminant
704 .try_into()
705 .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err))?;
706 let name = ty.names().nth(discriminant).ok_or_else(|| {
707 std::io::Error::new(
708 std::io::ErrorKind::InvalidInput,
709 format!("unknown enum discriminant `{discriminant}`"),
710 )
711 })?;
712 *val = Val::Enum(name.to_string());
713 Ok(())
714 }
715 Type::Option(ty) => {
716 let ok = r.read_option_status().await?;
717 if ok {
718 let mut v = Val::Bool(false);
719 trace!("reading nested `option::some` value");
720 Box::pin(read_value(store, r, resources, &mut v, &ty.ty(), path)).await?;
721 *val = Val::Option(Some(Box::new(v)));
722 } else {
723 *val = Val::Option(None);
724 }
725 Ok(())
726 }
727 Type::Result(ty) => {
728 let ok = r.read_result_status().await?;
729 if ok {
730 if let Some(ty) = ty.ok() {
731 let mut v = Val::Bool(false);
732 trace!("reading nested `result::ok` value");
733 Box::pin(read_value(store, r, resources, &mut v, &ty, path)).await?;
734 *val = Val::Result(Ok(Some(Box::new(v))));
735 } else {
736 *val = Val::Result(Ok(None));
737 }
738 } else if let Some(ty) = ty.err() {
739 let mut v = Val::Bool(false);
740 trace!("reading nested `result::err` value");
741 Box::pin(read_value(store, r, resources, &mut v, &ty, path)).await?;
742 *val = Val::Result(Err(Some(Box::new(v))));
743 } else {
744 *val = Val::Result(Err(None));
745 }
746 Ok(())
747 }
748 Type::Flags(ty) => {
749 let names = ty.names();
750 let flags = match names.len() {
751 ..=8 => read_flags(1, r).await?,
752 9..=16 => read_flags(2, r).await?,
753 17..=24 => read_flags(3, r).await?,
754 25..=32 => read_flags(4, r).await?,
755 33..=40 => read_flags(5, r).await?,
756 41..=48 => read_flags(6, r).await?,
757 49..=56 => read_flags(7, r).await?,
758 57..=64 => read_flags(8, r).await?,
759 65..=72 => read_flags(9, r).await?,
760 73..=80 => read_flags(10, r).await?,
761 81..=88 => read_flags(11, r).await?,
762 89..=96 => read_flags(12, r).await?,
763 97..=104 => read_flags(13, r).await?,
764 105..=112 => read_flags(14, r).await?,
765 113..=120 => read_flags(15, r).await?,
766 121..=128 => r.read_u128_le().await?,
767 bits @ 129.. => {
768 let mut cap = bits / 8;
769 if bits % 8 != 0 {
770 cap = cap.saturating_add(1);
771 }
772 let mut buf = vec![0; cap];
773 r.read_exact(&mut buf).await?;
774 let mut vs = Vec::with_capacity(
775 buf.iter()
776 .map(|b| b.count_ones())
777 .sum::<u32>()
778 .try_into()
779 .unwrap_or(usize::MAX),
780 );
781 for (i, name) in names.enumerate() {
782 if buf[i / 8] & (1 << (i % 8)) != 0 {
783 vs.push(name.to_string());
784 }
785 }
786 *val = Val::Flags(vs);
787 return Ok(());
788 }
789 };
790 let mut vs = Vec::with_capacity(flags.count_ones().try_into().unwrap_or(usize::MAX));
791 for (i, name) in zip(0.., names) {
792 if flags & (1 << i) != 0 {
793 vs.push(name.to_string());
794 }
795 }
796 *val = Val::Flags(vs);
797 Ok(())
798 }
799 Type::Own(ty) | Type::Borrow(ty) => {
800 if *ty == ResourceType::host::<DynInputStream>() {
801 let mut store = store.as_context_mut();
802 let r = r.index(path).map_err(std::io::Error::other)?;
803 let res = store
806 .data_mut()
807 .wrpc()
808 .table
809 .push(Box::new(AsyncReadStream::new(
810 FramedRead::new(r, ListDecoderU8::default())
811 .into_async_read()
812 .compat(),
813 )))
814 .map_err(|err| std::io::Error::new(std::io::ErrorKind::OutOfMemory, err))?;
815 let v = res
816 .try_into_resource_any(store)
817 .map_err(std::io::Error::other)?;
818 *val = Val::Resource(v);
819 Ok(())
820 } else if resources.contains(ty) {
821 let mut store = store.as_context_mut();
822 let mut id = uuid::Bytes::default();
823 debug_assert_eq!(id.len(), 16);
824 let n = r.read_u8_leb128().await?;
825 if usize::from(n) != id.len() {
826 return Err(std::io::Error::new(
827 std::io::ErrorKind::InvalidInput,
828 format!(
829 "invalid guest resource handle length {n}, expected {}",
830 id.len()
831 ),
832 ));
833 }
834 let n = r.read_exact(&mut id).await?;
835 if n != id.len() {
836 return Err(std::io::Error::new(
837 std::io::ErrorKind::InvalidInput,
838 format!(
839 "invalid amount of guest resource handle bytes read {n}, expected {}",
840 id.len()
841 ),
842 ));
843 }
844
845 let id = Uuid::from_bytes_le(id);
846 trace!(?id, "lookup shared resource");
847 let resource = store
848 .data_mut()
849 .wrpc()
850 .ctx
851 .shared_resources()
852 .0
853 .get(&id)
854 .ok_or_else(|| std::io::Error::from(std::io::ErrorKind::NotFound))?;
855 *val = Val::Resource(*resource);
856 Ok(())
857 } else {
858 let mut store = store.as_context_mut();
859 let n = r.read_u32_leb128().await?;
860 let n = usize::try_from(n)
861 .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err))?;
862 let mut buf = Vec::with_capacity(n);
863 r.read_to_end(&mut buf).await?;
864 let table = store.data_mut().wrpc().table;
865 let resource = table
866 .push(RemoteResource(buf.into()))
867 .map_err(|err| std::io::Error::new(std::io::ErrorKind::OutOfMemory, err))?;
868 let resource = resource
869 .try_into_resource_any(store)
870 .map_err(std::io::Error::other)?;
871 *val = Val::Resource(resource);
872 Ok(())
873 }
874 }
875 Type::Future(..) | Type::Stream(..) | Type::ErrorContext => Err(std::io::Error::new(
876 std::io::ErrorKind::Unsupported,
877 "async not supported",
878 )),
879 }
880}