wrpc_runtime_wasmtime/
codec.rs

1use core::future::Future;
2use core::iter::zip;
3use core::ops::{BitOrAssign, Shl};
4use core::pin::{pin, Pin};
5
6use std::collections::HashSet;
7
8use anyhow::{bail, Context as _};
9use bytes::{BufMut as _, BytesMut};
10use futures::stream::FuturesUnordered;
11use futures::TryStreamExt as _;
12use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _};
13use tokio_util::codec::{Encoder, FramedRead};
14use tokio_util::compat::FuturesAsyncReadCompatExt as _;
15use tracing::{error, instrument, trace, warn};
16use uuid::Uuid;
17use wasm_tokio::cm::AsyncReadValue as _;
18use wasm_tokio::{
19    AsyncReadCore as _, AsyncReadLeb128 as _, AsyncReadUtf8 as _, CoreNameEncoder,
20    CoreVecEncoderBytes, Leb128Encoder, Utf8Codec,
21};
22use wasmtime::component::types::{Case, Field};
23use wasmtime::component::{ResourceType, Type, Val};
24use wasmtime::{AsContextMut, StoreContextMut};
25use wasmtime_wasi::p2::pipe::AsyncReadStream;
26use wasmtime_wasi::p2::{DynInputStream, StreamError};
27use wrpc_transport::ListDecoderU8;
28
29use crate::{RemoteResource, WrpcView};
30
31pub struct ValEncoder<'a, T: 'static, W> {
32    pub store: StoreContextMut<'a, T>,
33    pub ty: &'a Type,
34    pub resources: &'a [ResourceType],
35    pub deferred: Option<
36        Box<dyn FnOnce(W) -> Pin<Box<dyn Future<Output = wasmtime::Result<()>> + Send>> + Send>,
37    >,
38}
39
40impl<T, W> ValEncoder<'_, T, W> {
41    #[must_use]
42    pub fn new<'a>(
43        store: StoreContextMut<'a, T>,
44        ty: &'a Type,
45        resources: &'a [ResourceType],
46    ) -> ValEncoder<'a, T, W> {
47        ValEncoder {
48            store,
49            ty,
50            resources,
51            deferred: None,
52        }
53    }
54
55    pub fn with_type<'a>(&'a mut self, ty: &'a Type) -> ValEncoder<'a, T, W> {
56        ValEncoder {
57            store: self.store.as_context_mut(),
58            ty,
59            resources: self.resources,
60            deferred: None,
61        }
62    }
63}
64
65fn find_enum_discriminant<'a, T>(
66    iter: impl IntoIterator<Item = T>,
67    names: impl IntoIterator<Item = &'a str>,
68    discriminant: &str,
69) -> wasmtime::Result<T> {
70    zip(iter, names)
71        .find_map(|(i, name)| (name == discriminant).then_some(i))
72        .context("unknown enum discriminant")
73}
74
75fn find_variant_discriminant<'a, T>(
76    iter: impl IntoIterator<Item = T>,
77    cases: impl IntoIterator<Item = Case<'a>>,
78    discriminant: &str,
79) -> wasmtime::Result<(T, Option<Type>)> {
80    zip(iter, cases)
81        .find_map(|(i, Case { name, ty })| (name == discriminant).then_some((i, ty)))
82        .context("unknown variant discriminant")
83}
84
85#[inline]
86fn flag_bits<'a, T: BitOrAssign + Shl<u8, Output = T> + From<u8>>(
87    names: impl IntoIterator<Item = &'a str>,
88    flags: impl IntoIterator<Item = &'a str>,
89) -> T {
90    let mut v = T::from(0);
91    let flags: HashSet<&str> = flags.into_iter().collect();
92    for (i, name) in zip(0u8.., names) {
93        if flags.contains(name) {
94            v |= T::from(1) << i;
95        }
96    }
97    v
98}
99
100async fn write_deferred<W, I>(w: W, deferred: I) -> wasmtime::Result<()>
101where
102    W: wrpc_transport::Index<W> + Sync + Send + 'static,
103    I: IntoIterator,
104    I::IntoIter: ExactSizeIterator<
105        Item = Option<
106            Box<dyn FnOnce(W) -> Pin<Box<dyn Future<Output = wasmtime::Result<()>> + Send>> + Send>,
107        >,
108    >,
109{
110    let mut futs: FuturesUnordered<_> = zip(0.., deferred)
111        .filter_map(|(i, f)| f.map(|f| (w.index(&[i]), f)))
112        .map(|(w, f)| async move {
113            let w = w?;
114            f(w).await
115        })
116        .collect();
117    while let Some(()) = futs.try_next().await? {}
118    Ok(())
119}
120
121impl<T, W> Encoder<&Val> for ValEncoder<'_, T, W>
122where
123    T: WrpcView,
124    W: AsyncWrite + wrpc_transport::Index<W> + Sync + Send + 'static,
125{
126    type Error = wasmtime::Error;
127
128    #[allow(clippy::too_many_lines)]
129    #[instrument(level = "trace", skip(self))]
130    fn encode(&mut self, v: &Val, dst: &mut BytesMut) -> Result<(), Self::Error> {
131        match (v, self.ty) {
132            (Val::Bool(v), Type::Bool) => {
133                dst.reserve(1);
134                dst.put_u8((*v).into());
135                Ok(())
136            }
137            (Val::S8(v), Type::S8) => {
138                dst.reserve(1);
139                dst.put_i8(*v);
140                Ok(())
141            }
142            (Val::U8(v), Type::U8) => {
143                dst.reserve(1);
144                dst.put_u8(*v);
145                Ok(())
146            }
147            (Val::S16(v), Type::S16) => Leb128Encoder
148                .encode(*v, dst)
149                .context("failed to encode s16"),
150            (Val::U16(v), Type::U16) => Leb128Encoder
151                .encode(*v, dst)
152                .context("failed to encode u16"),
153            (Val::S32(v), Type::S32) => Leb128Encoder
154                .encode(*v, dst)
155                .context("failed to encode s32"),
156            (Val::U32(v), Type::U32) => Leb128Encoder
157                .encode(*v, dst)
158                .context("failed to encode u32"),
159            (Val::S64(v), Type::S64) => Leb128Encoder
160                .encode(*v, dst)
161                .context("failed to encode s64"),
162            (Val::U64(v), Type::U64) => Leb128Encoder
163                .encode(*v, dst)
164                .context("failed to encode u64"),
165            (Val::Float32(v), Type::Float32) => {
166                dst.reserve(4);
167                dst.put_f32_le(*v);
168                Ok(())
169            }
170            (Val::Float64(v), Type::Float64) => {
171                dst.reserve(8);
172                dst.put_f64_le(*v);
173                Ok(())
174            }
175            (Val::Char(v), Type::Char) => {
176                Utf8Codec.encode(*v, dst).context("failed to encode char")
177            }
178            (Val::String(v), Type::String) => CoreNameEncoder
179                .encode(v.as_str(), dst)
180                .context("failed to encode string"),
181            (Val::List(vs), Type::List(ty)) => {
182                let ty = ty.ty();
183                let n = u32::try_from(vs.len()).context("list length does not fit in u32")?;
184                dst.reserve(5 + vs.len());
185                Leb128Encoder
186                    .encode(n, dst)
187                    .context("failed to encode list length")?;
188                let mut deferred = Vec::with_capacity(vs.len());
189                for v in vs {
190                    let mut enc = self.with_type(&ty);
191                    enc.encode(v, dst)
192                        .context("failed to encode list element")?;
193                    deferred.push(enc.deferred);
194                }
195                if deferred.iter().any(Option::is_some) {
196                    self.deferred = Some(Box::new(|w| Box::pin(write_deferred(w, deferred))));
197                }
198                Ok(())
199            }
200            (Val::Record(vs), Type::Record(ty)) => {
201                dst.reserve(vs.len());
202                let mut deferred = Vec::with_capacity(vs.len());
203                for ((name, v), Field { ref ty, .. }) in zip(vs, ty.fields()) {
204                    let mut enc = self.with_type(ty);
205                    enc.encode(v, dst)
206                        .with_context(|| format!("failed to encode `{name}` field"))?;
207                    deferred.push(enc.deferred);
208                }
209                if deferred.iter().any(Option::is_some) {
210                    self.deferred = Some(Box::new(|w| Box::pin(write_deferred(w, deferred))));
211                }
212                Ok(())
213            }
214            (Val::Tuple(vs), Type::Tuple(ty)) => {
215                dst.reserve(vs.len());
216                let mut deferred = Vec::with_capacity(vs.len());
217                for (v, ref ty) in zip(vs, ty.types()) {
218                    let mut enc = self.with_type(ty);
219                    enc.encode(v, dst)
220                        .context("failed to encode tuple element")?;
221                    deferred.push(enc.deferred);
222                }
223                if deferred.iter().any(Option::is_some) {
224                    self.deferred = Some(Box::new(|w| Box::pin(write_deferred(w, deferred))));
225                }
226                Ok(())
227            }
228            (Val::Variant(discriminant, v), Type::Variant(ty)) => {
229                let cases = ty.cases();
230                let ty = match cases.len() {
231                    ..=0x0000_00ff => {
232                        let (discriminant, ty) =
233                            find_variant_discriminant(0u8.., cases, discriminant)?;
234                        dst.reserve(2 + usize::from(v.is_some()));
235                        Leb128Encoder.encode(discriminant, dst)?;
236                        ty
237                    }
238                    0x0000_0100..=0x0000_ffff => {
239                        let (discriminant, ty) =
240                            find_variant_discriminant(0u16.., cases, discriminant)?;
241                        dst.reserve(3 + usize::from(v.is_some()));
242                        Leb128Encoder.encode(discriminant, dst)?;
243                        ty
244                    }
245                    0x0001_0000..=0x00ff_ffff => {
246                        let (discriminant, ty) =
247                            find_variant_discriminant(0u32.., cases, discriminant)?;
248                        dst.reserve(4 + usize::from(v.is_some()));
249                        Leb128Encoder.encode(discriminant, dst)?;
250                        ty
251                    }
252                    0x0100_0000..=0xffff_ffff => {
253                        let (discriminant, ty) =
254                            find_variant_discriminant(0u32.., cases, discriminant)?;
255                        dst.reserve(5 + usize::from(v.is_some()));
256                        Leb128Encoder.encode(discriminant, dst)?;
257                        ty
258                    }
259                    0x1_0000_0000.. => bail!("case count does not fit in u32"),
260                };
261                if let Some(v) = v {
262                    let ty = ty.context("type missing for variant")?;
263                    let mut enc = self.with_type(&ty);
264                    enc.encode(v, dst)
265                        .context("failed to encode variant value")?;
266                    if let Some(f) = enc.deferred {
267                        self.deferred = Some(f);
268                    }
269                }
270                Ok(())
271            }
272            (Val::Enum(discriminant), Type::Enum(ty)) => {
273                let names = ty.names();
274                match names.len() {
275                    ..=0x0000_00ff => {
276                        let discriminant = find_enum_discriminant(0u8.., names, discriminant)?;
277                        dst.reserve(2);
278                        Leb128Encoder.encode(discriminant, dst)?;
279                    }
280                    0x0000_0100..=0x0000_ffff => {
281                        let discriminant = find_enum_discriminant(0u16.., names, discriminant)?;
282                        dst.reserve(3);
283                        Leb128Encoder.encode(discriminant, dst)?;
284                    }
285                    0x0001_0000..=0x00ff_ffff => {
286                        let discriminant = find_enum_discriminant(0u32.., names, discriminant)?;
287                        dst.reserve(4);
288                        Leb128Encoder.encode(discriminant, dst)?;
289                    }
290                    0x0100_0000..=0xffff_ffff => {
291                        let discriminant = find_enum_discriminant(0u32.., names, discriminant)?;
292                        dst.reserve(5);
293                        Leb128Encoder.encode(discriminant, dst)?;
294                    }
295                    0x1_0000_0000.. => bail!("name count does not fit in u32"),
296                }
297                Ok(())
298            }
299            (Val::Option(None), Type::Option(_)) => {
300                dst.reserve(1);
301                dst.put_u8(0);
302                Ok(())
303            }
304            (Val::Option(Some(v)), Type::Option(ty)) => {
305                dst.reserve(2);
306                dst.put_u8(1);
307                let ty = ty.ty();
308                let mut enc = self.with_type(&ty);
309                enc.encode(v, dst)
310                    .context("failed to encode `option::some` value")?;
311                if let Some(f) = enc.deferred {
312                    self.deferred = Some(f);
313                }
314                Ok(())
315            }
316            (Val::Result(v), Type::Result(ty)) => match v {
317                Ok(v) => match (v, ty.ok()) {
318                    (Some(v), Some(ty)) => {
319                        dst.reserve(2);
320                        dst.put_u8(0);
321                        let mut enc = self.with_type(&ty);
322                        enc.encode(v, dst)
323                            .context("failed to encode `result::ok` value")?;
324                        if let Some(f) = enc.deferred {
325                            self.deferred = Some(f);
326                        }
327                        Ok(())
328                    }
329                    (Some(_v), None) => bail!("`result::ok` value of unknown type"),
330                    (None, Some(_ty)) => bail!("`result::ok` value missing"),
331                    (None, None) => {
332                        dst.reserve(1);
333                        dst.put_u8(0);
334                        Ok(())
335                    }
336                },
337                Err(v) => match (v, ty.err()) {
338                    (Some(v), Some(ty)) => {
339                        dst.reserve(2);
340                        dst.put_u8(1);
341                        let mut enc = self.with_type(&ty);
342                        enc.encode(v, dst)
343                            .context("failed to encode `result::err` value")?;
344                        if let Some(f) = enc.deferred {
345                            self.deferred = Some(f);
346                        }
347                        Ok(())
348                    }
349                    (Some(_v), None) => bail!("`result::err` value of unknown type"),
350                    (None, Some(_ty)) => bail!("`result::err` value missing"),
351                    (None, None) => {
352                        dst.reserve(1);
353                        dst.put_u8(1);
354                        Ok(())
355                    }
356                },
357            },
358            (Val::Flags(vs), Type::Flags(ty)) => {
359                let names = ty.names();
360                let vs = vs.iter().map(String::as_str);
361                match names.len() {
362                    ..=8 => {
363                        dst.reserve(1);
364                        dst.put_u8(flag_bits(names, vs));
365                    }
366                    9..=16 => {
367                        dst.reserve(2);
368                        dst.put_u16_le(flag_bits(names, vs));
369                    }
370                    17..=24 => {
371                        dst.reserve(3);
372                        dst.put_slice(&u32::to_le_bytes(flag_bits(names, vs))[..3]);
373                    }
374                    25..=32 => {
375                        dst.reserve(4);
376                        dst.put_u32_le(flag_bits(names, vs));
377                    }
378                    33..=40 => {
379                        dst.reserve(5);
380                        dst.put_slice(&u64::to_le_bytes(flag_bits(names, vs))[..5]);
381                    }
382                    41..=48 => {
383                        dst.reserve(6);
384                        dst.put_slice(&u64::to_le_bytes(flag_bits(names, vs))[..6]);
385                    }
386                    49..=56 => {
387                        dst.reserve(7);
388                        dst.put_slice(&u64::to_le_bytes(flag_bits(names, vs))[..7]);
389                    }
390                    57..=64 => {
391                        dst.reserve(8);
392                        dst.put_u64_le(flag_bits(names, vs));
393                    }
394                    65..=72 => {
395                        dst.reserve(9);
396                        dst.put_slice(&u128::to_le_bytes(flag_bits(names, vs))[..9]);
397                    }
398                    73..=80 => {
399                        dst.reserve(10);
400                        dst.put_slice(&u128::to_le_bytes(flag_bits(names, vs))[..10]);
401                    }
402                    81..=88 => {
403                        dst.reserve(11);
404                        dst.put_slice(&u128::to_le_bytes(flag_bits(names, vs))[..11]);
405                    }
406                    89..=96 => {
407                        dst.reserve(12);
408                        dst.put_slice(&u128::to_le_bytes(flag_bits(names, vs))[..12]);
409                    }
410                    97..=104 => {
411                        dst.reserve(13);
412                        dst.put_slice(&u128::to_le_bytes(flag_bits(names, vs))[..13]);
413                    }
414                    105..=112 => {
415                        dst.reserve(14);
416                        dst.put_slice(&u128::to_le_bytes(flag_bits(names, vs))[..14]);
417                    }
418                    113..=120 => {
419                        dst.reserve(15);
420                        dst.put_slice(&u128::to_le_bytes(flag_bits(names, vs))[..15]);
421                    }
422                    121..=128 => {
423                        dst.reserve(16);
424                        dst.put_u128_le(flag_bits(names, vs));
425                    }
426                    bits @ 129.. => {
427                        let mut cap = bits / 8;
428                        if bits % 8 != 0 {
429                            cap = cap.saturating_add(1);
430                        }
431                        let mut buf = vec![0; cap];
432                        let flags: HashSet<&str> = vs.into_iter().collect();
433                        for (i, name) in names.enumerate() {
434                            if flags.contains(name) {
435                                buf[i / 8] |= 1 << (i % 8);
436                            }
437                        }
438                        dst.extend_from_slice(&buf);
439                    }
440                }
441                Ok(())
442            }
443            (Val::Resource(resource), Type::Own(ty) | Type::Borrow(ty)) => {
444                if *ty == ResourceType::host::<DynInputStream>() {
445                    let stream = resource
446                        .try_into_resource::<DynInputStream>(&mut self.store)
447                        .context("failed to downcast `wasi:io/input-stream`")?;
448                    if stream.owned() {
449                        let mut stream = self
450                            .store
451                            .data_mut()
452                            .wrpc()
453                            .table
454                            .delete(stream)
455                            .context("failed to delete input stream")?;
456                        self.deferred = Some(Box::new(|w| {
457                            Box::pin(async move {
458                                let mut w = pin!(w);
459                                loop {
460                                    stream.ready().await;
461                                    match stream.read(8096) {
462                                        Ok(buf) => {
463                                            let mut chunk = BytesMut::with_capacity(
464                                                buf.len().saturating_add(5),
465                                            );
466                                            CoreVecEncoderBytes
467                                                .encode(buf, &mut chunk)
468                                                .context("failed to encode input stream chunk")?;
469                                            w.write_all(&chunk).await?;
470                                        }
471                                        Err(StreamError::Closed) => {
472                                            w.write_all(&[0x00]).await?;
473                                        }
474                                        Err(err) => return Err(err.into()),
475                                    }
476                                }
477                            })
478                        }));
479                    } else {
480                        self.store
481                            .data_mut()
482                            .wrpc()
483                            .table
484                            .get_mut(&stream)
485                            .context("failed to get input stream")?;
486                        // NOTE: In order to handle this we'd need to know how many bytes the
487                        // receiver has read. That means that some kind of callback would be required from
488                        // the receiver. This is not trivial and generally should be a very rare use case.
489                        bail!("encoding borrowed `wasi:io/input-stream` not supported yet");
490                    };
491                    Ok(())
492                } else if resource.ty() == ResourceType::host::<RemoteResource>() {
493                    let resource = resource
494                        .try_into_resource(&mut self.store)
495                        .context("resource type mismatch")?;
496                    let table = self.store.data_mut().wrpc().table;
497                    if resource.owned() {
498                        let RemoteResource(buf) = table
499                            .delete(resource)
500                            .context("failed to delete remote resource")?;
501                        CoreVecEncoderBytes
502                            .encode(buf, dst)
503                            .context("failed to encode resource handle")
504                    } else {
505                        let RemoteResource(buf) = table
506                            .get(&resource)
507                            .context("failed to get remote resource")?;
508                        CoreVecEncoderBytes
509                            .encode(buf, dst)
510                            .context("failed to encode resource handle")
511                    }
512                } else if self.resources.contains(ty) {
513                    let id = Uuid::now_v7();
514                    CoreVecEncoderBytes
515                        .encode(id.to_bytes_le().as_slice(), dst)
516                        .context("failed to encode resource handle")?;
517                    trace!(?id, "store shared resource");
518                    if self
519                        .store
520                        .data_mut()
521                        .wrpc()
522                        .ctx
523                        .shared_resources()
524                        .0
525                        .insert(id, *resource)
526                        .is_some()
527                    {
528                        error!(?id, "duplicate resource ID generated");
529                    }
530                    Ok(())
531                } else {
532                    bail!("encoding host resources not supported yet")
533                }
534            }
535
536            (_, Type::Future(..) | Type::Stream(..) | Type::ErrorContext) => {
537                bail!("async not supported")
538            }
539            _ => bail!("value type mismatch"),
540        }
541    }
542}
543
544#[inline]
545async fn read_flags(n: usize, r: &mut (impl AsyncRead + Unpin)) -> std::io::Result<u128> {
546    let mut buf = 0u128.to_le_bytes();
547    r.read_exact(&mut buf[..n]).await?;
548    Ok(u128::from_le_bytes(buf))
549}
550
551/// Read encoded value of type [`Type`] from an [`AsyncRead`] into a [`Val`]
552#[instrument(level = "trace", skip_all, fields(ty, path))]
553pub async fn read_value<T, R>(
554    store: &mut impl AsContextMut<Data = T>,
555    r: &mut Pin<&mut R>,
556    resources: &[ResourceType],
557    val: &mut Val,
558    ty: &Type,
559    path: &[usize],
560) -> std::io::Result<()>
561where
562    T: WrpcView + 'static,
563    R: AsyncRead + wrpc_transport::Index<R> + Send + Unpin + 'static,
564{
565    match ty {
566        Type::Bool => {
567            let v = r.read_bool().await?;
568            *val = Val::Bool(v);
569            Ok(())
570        }
571        Type::S8 => {
572            let v = r.read_i8().await?;
573            *val = Val::S8(v);
574            Ok(())
575        }
576        Type::U8 => {
577            let v = r.read_u8().await?;
578            *val = Val::U8(v);
579            Ok(())
580        }
581        Type::S16 => {
582            let v = r.read_i16_leb128().await?;
583            *val = Val::S16(v);
584            Ok(())
585        }
586        Type::U16 => {
587            let v = r.read_u16_leb128().await?;
588            *val = Val::U16(v);
589            Ok(())
590        }
591        Type::S32 => {
592            let v = r.read_i32_leb128().await?;
593            *val = Val::S32(v);
594            Ok(())
595        }
596        Type::U32 => {
597            let v = r.read_u32_leb128().await?;
598            *val = Val::U32(v);
599            Ok(())
600        }
601        Type::S64 => {
602            let v = r.read_i64_leb128().await?;
603            *val = Val::S64(v);
604            Ok(())
605        }
606        Type::U64 => {
607            let v = r.read_u64_leb128().await?;
608            *val = Val::U64(v);
609            Ok(())
610        }
611        Type::Float32 => {
612            let v = r.read_f32_le().await?;
613            *val = Val::Float32(v);
614            Ok(())
615        }
616        Type::Float64 => {
617            let v = r.read_f64_le().await?;
618            *val = Val::Float64(v);
619            Ok(())
620        }
621        Type::Char => {
622            let v = r.read_char_utf8().await?;
623            *val = Val::Char(v);
624            Ok(())
625        }
626        Type::String => {
627            let mut s = String::default();
628            r.read_core_name(&mut s).await?;
629            *val = Val::String(s);
630            Ok(())
631        }
632        Type::List(ty) => {
633            let n = r.read_u32_leb128().await?;
634            let n = n.try_into().unwrap_or(usize::MAX);
635            let mut vs = Vec::with_capacity(n);
636            let ty = ty.ty();
637            let mut path = path.to_vec();
638            for i in 0..n {
639                let mut v = Val::Bool(false);
640                path.push(i);
641                trace!(i, "reading list element value");
642                Box::pin(read_value(store, r, resources, &mut v, &ty, &path)).await?;
643                path.pop();
644                vs.push(v);
645            }
646            *val = Val::List(vs);
647            Ok(())
648        }
649        Type::Record(ty) => {
650            let fields = ty.fields();
651            let mut vs = Vec::with_capacity(fields.len());
652            let mut path = path.to_vec();
653            for (i, Field { name, ty }) in fields.enumerate() {
654                let mut v = Val::Bool(false);
655                path.push(i);
656                trace!(i, "reading struct field value");
657                Box::pin(read_value(store, r, resources, &mut v, &ty, &path)).await?;
658                path.pop();
659                vs.push((name.to_string(), v));
660            }
661            *val = Val::Record(vs);
662            Ok(())
663        }
664        Type::Tuple(ty) => {
665            let types = ty.types();
666            let mut vs = Vec::with_capacity(types.len());
667            let mut path = path.to_vec();
668            for (i, ty) in types.enumerate() {
669                let mut v = Val::Bool(false);
670                path.push(i);
671                trace!(i, "reading tuple element value");
672                Box::pin(read_value(store, r, resources, &mut v, &ty, &path)).await?;
673                path.pop();
674                vs.push(v);
675            }
676            *val = Val::Tuple(vs);
677            Ok(())
678        }
679        Type::Variant(ty) => {
680            let discriminant = r.read_u32_leb128().await?;
681            let discriminant = discriminant
682                .try_into()
683                .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err))?;
684            let Case { name, ty } = ty.cases().nth(discriminant).ok_or_else(|| {
685                std::io::Error::new(
686                    std::io::ErrorKind::InvalidInput,
687                    format!("unknown variant discriminant `{discriminant}`"),
688                )
689            })?;
690            let name = name.to_string();
691            if let Some(ty) = ty {
692                let mut v = Val::Bool(false);
693                trace!(variant = name, "reading nested variant value");
694                Box::pin(read_value(store, r, resources, &mut v, &ty, path)).await?;
695                *val = Val::Variant(name, Some(Box::new(v)));
696            } else {
697                *val = Val::Variant(name, None);
698            }
699            Ok(())
700        }
701        Type::Enum(ty) => {
702            let discriminant = r.read_u32_leb128().await?;
703            let discriminant = discriminant
704                .try_into()
705                .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err))?;
706            let name = ty.names().nth(discriminant).ok_or_else(|| {
707                std::io::Error::new(
708                    std::io::ErrorKind::InvalidInput,
709                    format!("unknown enum discriminant `{discriminant}`"),
710                )
711            })?;
712            *val = Val::Enum(name.to_string());
713            Ok(())
714        }
715        Type::Option(ty) => {
716            let ok = r.read_option_status().await?;
717            if ok {
718                let mut v = Val::Bool(false);
719                trace!("reading nested `option::some` value");
720                Box::pin(read_value(store, r, resources, &mut v, &ty.ty(), path)).await?;
721                *val = Val::Option(Some(Box::new(v)));
722            } else {
723                *val = Val::Option(None);
724            }
725            Ok(())
726        }
727        Type::Result(ty) => {
728            let ok = r.read_result_status().await?;
729            if ok {
730                if let Some(ty) = ty.ok() {
731                    let mut v = Val::Bool(false);
732                    trace!("reading nested `result::ok` value");
733                    Box::pin(read_value(store, r, resources, &mut v, &ty, path)).await?;
734                    *val = Val::Result(Ok(Some(Box::new(v))));
735                } else {
736                    *val = Val::Result(Ok(None));
737                }
738            } else if let Some(ty) = ty.err() {
739                let mut v = Val::Bool(false);
740                trace!("reading nested `result::err` value");
741                Box::pin(read_value(store, r, resources, &mut v, &ty, path)).await?;
742                *val = Val::Result(Err(Some(Box::new(v))));
743            } else {
744                *val = Val::Result(Err(None));
745            }
746            Ok(())
747        }
748        Type::Flags(ty) => {
749            let names = ty.names();
750            let flags = match names.len() {
751                ..=8 => read_flags(1, r).await?,
752                9..=16 => read_flags(2, r).await?,
753                17..=24 => read_flags(3, r).await?,
754                25..=32 => read_flags(4, r).await?,
755                33..=40 => read_flags(5, r).await?,
756                41..=48 => read_flags(6, r).await?,
757                49..=56 => read_flags(7, r).await?,
758                57..=64 => read_flags(8, r).await?,
759                65..=72 => read_flags(9, r).await?,
760                73..=80 => read_flags(10, r).await?,
761                81..=88 => read_flags(11, r).await?,
762                89..=96 => read_flags(12, r).await?,
763                97..=104 => read_flags(13, r).await?,
764                105..=112 => read_flags(14, r).await?,
765                113..=120 => read_flags(15, r).await?,
766                121..=128 => r.read_u128_le().await?,
767                bits @ 129.. => {
768                    let mut cap = bits / 8;
769                    if bits % 8 != 0 {
770                        cap = cap.saturating_add(1);
771                    }
772                    let mut buf = vec![0; cap];
773                    r.read_exact(&mut buf).await?;
774                    let mut vs = Vec::with_capacity(
775                        buf.iter()
776                            .map(|b| b.count_ones())
777                            .sum::<u32>()
778                            .try_into()
779                            .unwrap_or(usize::MAX),
780                    );
781                    for (i, name) in names.enumerate() {
782                        if buf[i / 8] & (1 << (i % 8)) != 0 {
783                            vs.push(name.to_string());
784                        }
785                    }
786                    *val = Val::Flags(vs);
787                    return Ok(());
788                }
789            };
790            let mut vs = Vec::with_capacity(flags.count_ones().try_into().unwrap_or(usize::MAX));
791            for (i, name) in zip(0.., names) {
792                if flags & (1 << i) != 0 {
793                    vs.push(name.to_string());
794                }
795            }
796            *val = Val::Flags(vs);
797            Ok(())
798        }
799        Type::Own(ty) | Type::Borrow(ty) => {
800            if *ty == ResourceType::host::<DynInputStream>() {
801                let mut store = store.as_context_mut();
802                let r = r.index(path).map_err(std::io::Error::other)?;
803                // TODO: Implement a custom reader, this approach ignores the stream end (`\0`),
804                // which will could potentially break/hang with some transports
805                let res = store
806                    .data_mut()
807                    .wrpc()
808                    .table
809                    .push(Box::new(AsyncReadStream::new(
810                        FramedRead::new(r, ListDecoderU8::default())
811                            .into_async_read()
812                            .compat(),
813                    )))
814                    .map_err(|err| std::io::Error::new(std::io::ErrorKind::OutOfMemory, err))?;
815                let v = res
816                    .try_into_resource_any(store)
817                    .map_err(std::io::Error::other)?;
818                *val = Val::Resource(v);
819                Ok(())
820            } else if resources.contains(ty) {
821                let mut store = store.as_context_mut();
822                let mut id = uuid::Bytes::default();
823                debug_assert_eq!(id.len(), 16);
824                let n = r.read_u8_leb128().await?;
825                if usize::from(n) != id.len() {
826                    return Err(std::io::Error::new(
827                        std::io::ErrorKind::InvalidInput,
828                        format!(
829                            "invalid guest resource handle length {n}, expected {}",
830                            id.len()
831                        ),
832                    ));
833                }
834                let n = r.read_exact(&mut id).await?;
835                if n != id.len() {
836                    return Err(std::io::Error::new(
837                        std::io::ErrorKind::InvalidInput,
838                        format!(
839                            "invalid amount of guest resource handle bytes read {n}, expected {}",
840                            id.len()
841                        ),
842                    ));
843                }
844
845                let id = Uuid::from_bytes_le(id);
846                trace!(?id, "lookup shared resource");
847                let resource = store
848                    .data_mut()
849                    .wrpc()
850                    .ctx
851                    .shared_resources()
852                    .0
853                    .get(&id)
854                    .ok_or_else(|| std::io::Error::from(std::io::ErrorKind::NotFound))?;
855                *val = Val::Resource(*resource);
856                Ok(())
857            } else {
858                let mut store = store.as_context_mut();
859                let n = r.read_u32_leb128().await?;
860                let n = usize::try_from(n)
861                    .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err))?;
862                let mut buf = Vec::with_capacity(n);
863                r.read_to_end(&mut buf).await?;
864                let table = store.data_mut().wrpc().table;
865                let resource = table
866                    .push(RemoteResource(buf.into()))
867                    .map_err(|err| std::io::Error::new(std::io::ErrorKind::OutOfMemory, err))?;
868                let resource = resource
869                    .try_into_resource_any(store)
870                    .map_err(std::io::Error::other)?;
871                *val = Val::Resource(resource);
872                Ok(())
873            }
874        }
875        Type::Future(..) | Type::Stream(..) | Type::ErrorContext => Err(std::io::Error::new(
876            std::io::ErrorKind::Unsupported,
877            "async not supported",
878        )),
879    }
880}