wasmcloud_runtime/component/
blobstore.rs

1use core::future::Future;
2use core::mem;
3use core::pin::Pin;
4
5use std::collections::VecDeque;
6use std::sync::Arc;
7
8use anyhow::{anyhow, bail, Context as _};
9use async_trait::async_trait;
10use bytes::Bytes;
11use futures::future::OptionFuture;
12use futures::{future, FutureExt, Stream, StreamExt as _};
13use tokio::sync::mpsc;
14use tokio::{join, select, try_join};
15use tokio_stream::wrappers::ReceiverStream;
16use tracing::{debug, instrument};
17use wasmtime::component::Resource;
18use wasmtime_wasi::p2::{DynInputStream, DynOutputStream, Pollable, StreamError, StreamResult};
19use wasmtime_wasi::runtime::AbortOnDropJoinHandle;
20use wrpc_interface_blobstore::bindings;
21
22use crate::capability::blobstore::blobstore::ContainerName;
23use crate::capability::blobstore::container::Container;
24use crate::capability::blobstore::types::{
25    ContainerMetadata, Error, ObjectId, ObjectMetadata, ObjectName,
26};
27use crate::capability::blobstore::{blobstore, container, types};
28use crate::capability::wrpc::wrpc::blobstore::blobstore as blobstore_0_1_0;
29use crate::io::BufferedIncomingStream;
30
31use super::{Ctx, Handler, InvocationErrorIntrospect, InvocationErrorKind, ReplacedInstanceTarget};
32
/// Maximum number of bytes reported as writable by `check_write` once a send permit has been
/// acquired (`1 << 16`, i.e. 64 KiB).
///
/// The value is fairly arbitrary: it is meant to fit in a single transport packet, but it has
/// not been profiled yet, so the optimal value is still unknown.
///
/// TODO: make this configurable by users of this crate.
const MAX_CHUNK_SIZE: usize = 1 << 16;
37
/// Result type used by the blobstore host functions; the error type defaults to the blobstore
/// [`Error`].
type Result<T, E = Error> = core::result::Result<T, E>;
39
40async fn invoke_with_fallback<
41    T,
42    Fut: Future<Output = anyhow::Result<T>>,
43    Fut0_1_0: Future<Output = anyhow::Result<T>>,
44>(
45    name: &str,
46    introspect: &impl InvocationErrorIntrospect,
47    f: impl FnOnce() -> Fut,
48    f_0_1_0: impl FnOnce() -> Fut0_1_0,
49) -> anyhow::Result<T> {
50    match f().await {
51        Ok(res) => Ok(res),
52        Err(err) => match introspect.invocation_error_kind(&err) {
53            InvocationErrorKind::NotFound => {
54                debug!(
55                    name,
56                    desired_instance = "wrpc:blobstore/blobstore@0.2.0",
57                    fallback_instance = "wrpc:blobstore/blobstore@0.1.0",
58                    "desired function export not found, fallback to older version"
59                );
60                f_0_1_0().await
61            }
62            InvocationErrorKind::Trap => Err(err),
63        },
64    }
65}
66
/// Outgoing value resource, pairing the guest-facing and host-facing ends of a byte channel.
pub struct OutgoingValue {
    /// Guest-side state; holds the channel sender until the body stream is requested.
    guest: GuestOutgoingValue,
    /// Host-side state; holds the channel receiver until `write_data` consumes it.
    host: HostOutgoingValue,
}
71
/// Guest-facing state of an [`OutgoingValue`].
#[derive(Default)]
pub enum GuestOutgoingValue {
    /// Default state, left behind by `mem::take` once the sender has been moved out.
    #[default]
    Corrupted,
    /// Initial state: the sender has not been turned into an output stream yet.
    Init(mpsc::Sender<Bytes>),
}
78
/// Host-facing state of an [`OutgoingValue`].
#[derive(Default)]
pub enum HostOutgoingValue {
    /// Default state, left behind by `mem::take` once the receiver has been moved out.
    #[default]
    Corrupted,
    /// Initial state: the receiver has not been passed to a write operation yet.
    Init(mpsc::Receiver<Bytes>),
    /// A write operation was started for this value.
    Writing {
        /// Future resolving to the final status of the write.
        status: Pin<Box<dyn Future<Output = Result<(), String>> + Send>>,
        /// Handle of the spawned I/O task, if the invocation returned one.
        io: Option<AbortOnDropJoinHandle<anyhow::Result<()>>>,
    },
}
89
/// Incoming value resource, as returned by `get_data`.
pub struct IncomingValue {
    /// Stream of received byte chunks.
    stream: Pin<Box<dyn Stream<Item = Bytes> + Send>>,
    /// Future resolving to the final status of the read.
    status: Pin<Box<dyn Future<Output = Result<(), String>> + Send>>,
    /// Handle of the spawned I/O task, if the invocation returned one.
    io: Option<AbortOnDropJoinHandle<anyhow::Result<()>>>,
}
95
/// Object name stream resource, as returned by `list_objects`.
pub struct StreamObjectNames {
    /// Buffered stream of received object names.
    stream: BufferedIncomingStream<String>,
    /// Fused future resolving to the final status of the listing.
    status: future::Fuse<Pin<Box<dyn Future<Output = Result<(), String>> + Send>>>,
    /// Fused handle of the spawned I/O task; wraps `None` if the invocation returned no task.
    io: OptionFuture<future::Fuse<AbortOnDropJoinHandle<anyhow::Result<()>>>>,
}
101
102impl<H> container::HostContainer for Ctx<H>
103where
104    H: Handler,
105{
106    #[instrument(skip(self))]
107    async fn drop(&mut self, container: Resource<Container>) -> anyhow::Result<()> {
108        self.attach_parent_context();
109        self.table
110            .delete(container)
111            .context("failed to delete container")?;
112        Ok(())
113    }
114
115    #[instrument(skip(self))]
116    async fn name(&mut self, container: Resource<Container>) -> anyhow::Result<Result<String>> {
117        self.attach_parent_context();
118        let name = self
119            .table
120            .get(&container)
121            .context("failed to get container")?;
122        Ok(Ok(name.to_string()))
123    }
124
125    #[instrument(skip(self))]
126    async fn info(
127        &mut self,
128        container: Resource<Container>,
129    ) -> anyhow::Result<Result<ContainerMetadata>> {
130        self.attach_parent_context();
131        let name = self
132            .table
133            .get(&container)
134            .context("failed to get container")?;
135        match invoke_with_fallback(
136            "get-container-info",
137            &self.handler,
138            || {
139                bindings::wrpc::blobstore::blobstore::get_container_info(
140                    &self.handler,
141                    Some(ReplacedInstanceTarget::BlobstoreContainer),
142                    name,
143                )
144            },
145            || {
146                blobstore_0_1_0::get_container_info(
147                    &self.handler,
148                    Some(ReplacedInstanceTarget::BlobstoreContainer),
149                    name,
150                )
151            },
152        )
153        .await?
154        {
155            Ok(bindings::wrpc::blobstore::types::ContainerMetadata { created_at }) => {
156                Ok(Ok(ContainerMetadata {
157                    name: name.to_string(),
158                    created_at,
159                }))
160            }
161            Err(err) => Ok(Err(err)),
162        }
163    }
164
    /// Requests the data of object `name` in `container` and returns it as an
    /// [`IncomingValue`] resource.
    ///
    /// `start` and `end` are forwarded unchanged to the `get-container-data` invocation. The
    /// `0.2.0` interface is tried first, falling back to `0.1.0` via `invoke_with_fallback`.
    ///
    /// The outer `Err` signals an invalid container handle, a failed invocation or a failure to
    /// push the resulting value to the table; the inner `Err` carries the error reported by the
    /// invoked function.
    #[instrument(skip(self))]
    async fn get_data(
        &mut self,
        container: Resource<Container>,
        name: ObjectName,
        start: u64,
        end: u64,
    ) -> anyhow::Result<Result<Resource<IncomingValue>>> {
        self.attach_parent_context();
        let container = self
            .table
            .get(&container)
            .context("failed to get container")?;
        let id = bindings::wasi::blobstore::types::ObjectId {
            container: container.to_string(),
            object: name,
        };
        match invoke_with_fallback(
            "get-container-data",
            &self.handler,
            || async {
                let (res, io) = bindings::wrpc::blobstore::blobstore::get_container_data(
                    &self.handler,
                    Some(ReplacedInstanceTarget::BlobstoreContainer),
                    &id,
                    start,
                    end,
                )
                .await?;
                // Hand the optional I/O future to the runtime so that it runs as a task.
                Ok((res, io.map(wasmtime_wasi::runtime::spawn)))
            },
            || async {
                let (res, io) = blobstore_0_1_0::get_container_data(
                    &self.handler,
                    Some(ReplacedInstanceTarget::BlobstoreContainer),
                    &id,
                    start,
                    end,
                )
                .await?;
                Ok((
                    // The `0.1.0` result only carries the stream, so pair it with a status
                    // future that always succeeds to match the `0.2.0` shape.
                    res.map(|stream| {
                        (
                            stream,
                            Box::pin(async { Ok(()) }) as Pin<Box<dyn Future<Output = _> + Send>>,
                        )
                    }),
                    io.map(wasmtime_wasi::runtime::spawn),
                ))
            },
        )
        .await?
        {
            (Ok((stream, status)), io) => {
                let value = self
                    .table
                    .push(IncomingValue { stream, status, io })
                    .context("failed to push stream and I/O future")?;
                Ok(Ok(value))
            }
            // The I/O task handle (if any) is dropped together with the error.
            (Err(err), _) => Ok(Err(err)),
        }
    }
228
    /// Writes the bytes produced through `data` to `object` in `container`.
    ///
    /// Takes the receiving half out of the outgoing value, forwards its chunks to the
    /// `write-container-data` invocation (`0.2.0` first, `0.1.0` as fallback) and, on success,
    /// records the resulting status future and I/O task in the outgoing value so that `finish`
    /// can await them.
    ///
    /// The outer `Err` signals an invalid handle, an outgoing value that is not in its initial
    /// state, or a failed invocation; the inner `Err` carries the error reported by the invoked
    /// function.
    ///
    /// NOTE(review): `mem::take` leaves `HostOutgoingValue::Corrupted` behind and the state is
    /// only replaced on success, so after a failed write the value stays `Corrupted`.
    #[instrument(skip(self))]
    async fn write_data(
        &mut self,
        container: Resource<Container>,
        object: ObjectName,
        data: Resource<OutgoingValue>,
    ) -> anyhow::Result<Result<()>> {
        self.attach_parent_context();
        let container = self
            .table
            .get(&container)
            .cloned()
            .context("failed to get container")?;
        let OutgoingValue { host, .. } = self
            .table
            .get_mut(&data)
            .context("failed to get outgoing value")?;
        // Move the receiver out of the resource; any state other than `Init` is rejected.
        let HostOutgoingValue::Init(mut rx) = mem::take(host) else {
            bail!("outgoing-value.write-data was already called")
        };
        let id = bindings::wrpc::blobstore::types::ObjectId {
            container: container.to_string(),
            object,
        };
        let (tx, rx_wrpc) = mpsc::channel(128);
        let (tx_0_1_0, rx_wrpc_0_1_0) = mpsc::channel(128);
        // Due to the fallback, we cannot directly pass `rx` to the invocation, instead,
        // spawn a task, which forwards messages to both invocation streams.
        tokio::spawn(async move {
            while let Some(item) = rx.recv().await {
                // Stop forwarding only once sending to both destinations has failed.
                if let (Err(_), Err(_)) = join!(tx.send(item.clone()), tx_0_1_0.send(item)) {
                    return;
                }
            }
        });
        match invoke_with_fallback(
            "write-container-data",
            &self.handler,
            || async {
                let (res, io) = bindings::wrpc::blobstore::blobstore::write_container_data(
                    &self.handler,
                    Some(ReplacedInstanceTarget::BlobstoreContainer),
                    &id,
                    Box::pin(ReceiverStream::new(rx_wrpc)),
                )
                .await?;
                Ok((res, io.map(wasmtime_wasi::runtime::spawn)))
            },
            || async {
                let (res, io) = blobstore_0_1_0::write_container_data(
                    &self.handler,
                    Some(ReplacedInstanceTarget::BlobstoreContainer),
                    &id,
                    Box::pin(ReceiverStream::new(rx_wrpc_0_1_0)),
                )
                .await?;
                Ok((
                    // The `0.1.0` result carries no status future, so substitute one that
                    // always succeeds to match the `0.2.0` shape.
                    res.map(|()| {
                        Box::pin(async { Ok(()) }) as Pin<Box<dyn Future<Output = _> + Send>>
                    }),
                    io.map(wasmtime_wasi::runtime::spawn),
                ))
            },
        )
        .await?
        {
            (Ok(status), io) => {
                // Remember the pending status and I/O task; `finish` awaits both.
                *host = HostOutgoingValue::Writing { status, io };
                Ok(Ok(()))
            }
            (Err(err), _) => Ok(Err(err)),
        }
    }
302
    /// Starts listing the objects of `container` and returns a [`StreamObjectNames`] resource
    /// from which the names can be read.
    ///
    /// Invokes `list-container-objects` (`0.2.0` first, `0.1.0` as fallback), passing `None`
    /// for both optional arguments.
    ///
    /// The outer `Err` signals an invalid handle, a failed invocation or a failure to push the
    /// resulting stream to the table; the inner `Err` carries the error reported by the invoked
    /// function.
    #[instrument(skip(self))]
    async fn list_objects(
        &mut self,
        container: Resource<Container>,
    ) -> anyhow::Result<Result<Resource<StreamObjectNames>>> {
        self.attach_parent_context();
        let container = self
            .table
            .get(&container)
            .context("failed to get container")?;
        // TODO: implement a stream with limit and offset
        match invoke_with_fallback(
            "list-container-objects",
            &self.handler,
            || async {
                let (res, io) = bindings::wrpc::blobstore::blobstore::list_container_objects(
                    &self.handler,
                    Some(ReplacedInstanceTarget::BlobstoreContainer),
                    container,
                    None,
                    None,
                )
                .await?;
                Ok((res, io.map(wasmtime_wasi::runtime::spawn)))
            },
            || async {
                let (res, io) = blobstore_0_1_0::list_container_objects(
                    &self.handler,
                    Some(ReplacedInstanceTarget::BlobstoreContainer),
                    container,
                    None,
                    None,
                )
                .await?;
                Ok((
                    // The `0.1.0` result only carries the stream, so pair it with a status
                    // future that always succeeds to match the `0.2.0` shape.
                    res.map(|stream| {
                        (
                            stream,
                            Box::pin(async { Ok(()) }) as Pin<Box<dyn Future<Output = _> + Send>>,
                        )
                    }),
                    io.map(wasmtime_wasi::runtime::spawn),
                ))
            },
        )
        .await?
        {
            (Ok((stream, status)), io) => {
                // Fuse the status and I/O futures so they can be polled repeatedly by the
                // read/skip operations after having completed.
                let stream = BufferedIncomingStream::new(stream);
                let status = status.fuse();
                let io = io.map(FutureExt::fuse).into();
                let stream = self
                    .table
                    .push(StreamObjectNames { stream, status, io })
                    .context("failed to push object name stream")?;
                Ok(Ok(stream))
            }
            (Err(err), _) => Ok(Err(err)),
        }
    }
363
364    #[instrument(skip(self))]
365    async fn delete_object(
366        &mut self,
367        container: Resource<Container>,
368        name: ObjectName,
369    ) -> anyhow::Result<Result<()>> {
370        self.attach_parent_context();
371        self.delete_objects(container, vec![name]).await
372    }
373
374    #[instrument(skip(self))]
375    async fn delete_objects(
376        &mut self,
377        container: Resource<Container>,
378        names: Vec<ObjectName>,
379    ) -> anyhow::Result<Result<()>> {
380        self.attach_parent_context();
381        let container = self
382            .table
383            .get(&container)
384            .context("failed to get container")?;
385        let names = names.iter().map(String::as_str).collect::<Vec<_>>();
386        invoke_with_fallback(
387            "delete-objects",
388            &self.handler,
389            || {
390                bindings::wrpc::blobstore::blobstore::delete_objects(
391                    &self.handler,
392                    Some(ReplacedInstanceTarget::BlobstoreContainer),
393                    container,
394                    &names,
395                )
396            },
397            || {
398                blobstore_0_1_0::delete_objects(
399                    &self.handler,
400                    Some(ReplacedInstanceTarget::BlobstoreContainer),
401                    container,
402                    &names,
403                )
404            },
405        )
406        .await
407    }
408
409    #[instrument(skip(self))]
410    async fn has_object(
411        &mut self,
412        container: Resource<Container>,
413        object: ObjectName,
414    ) -> anyhow::Result<Result<bool>> {
415        self.attach_parent_context();
416        let container = self
417            .table
418            .get(&container)
419            .context("failed to get container")?;
420        let id = bindings::wrpc::blobstore::types::ObjectId {
421            container: container.to_string(),
422            object,
423        };
424        invoke_with_fallback(
425            "has-object",
426            &self.handler,
427            || {
428                bindings::wrpc::blobstore::blobstore::has_object(
429                    &self.handler,
430                    Some(ReplacedInstanceTarget::BlobstoreContainer),
431                    &id,
432                )
433            },
434            || {
435                blobstore_0_1_0::has_object(
436                    &self.handler,
437                    Some(ReplacedInstanceTarget::BlobstoreContainer),
438                    &id,
439                )
440            },
441        )
442        .await
443    }
444
445    #[instrument(skip(self))]
446    async fn object_info(
447        &mut self,
448        container: Resource<Container>,
449        name: ObjectName,
450    ) -> anyhow::Result<Result<ObjectMetadata>> {
451        self.attach_parent_context();
452        let container = self
453            .table
454            .get(&container)
455            .context("failed to get container")?;
456        let id = bindings::wrpc::blobstore::types::ObjectId {
457            container: container.to_string(),
458            object: name.clone(),
459        };
460        match invoke_with_fallback(
461            "get-object-info",
462            &self.handler,
463            || {
464                bindings::wrpc::blobstore::blobstore::get_object_info(
465                    &self.handler,
466                    Some(ReplacedInstanceTarget::BlobstoreContainer),
467                    &id,
468                )
469            },
470            || {
471                blobstore_0_1_0::get_object_info(
472                    &self.handler,
473                    Some(ReplacedInstanceTarget::BlobstoreContainer),
474                    &id,
475                )
476            },
477        )
478        .await?
479        {
480            Ok(bindings::wrpc::blobstore::types::ObjectMetadata { created_at, size }) => {
481                Ok(Ok(ObjectMetadata {
482                    name,
483                    container: container.to_string(),
484                    created_at,
485                    size,
486                }))
487            }
488            Err(err) => Ok(Err(err)),
489        }
490    }
491
492    #[instrument(skip(self))]
493    async fn clear(&mut self, container: Resource<Container>) -> anyhow::Result<Result<()>> {
494        self.attach_parent_context();
495        let container = self
496            .table
497            .get(&container)
498            .context("failed to get container")?;
499        invoke_with_fallback(
500            "clear-container",
501            &self.handler,
502            || {
503                bindings::wrpc::blobstore::blobstore::clear_container(
504                    &self.handler,
505                    Some(ReplacedInstanceTarget::BlobstoreContainer),
506                    container,
507                )
508            },
509            || {
510                blobstore_0_1_0::clear_container(
511                    &self.handler,
512                    Some(ReplacedInstanceTarget::BlobstoreContainer),
513                    container,
514                )
515            },
516        )
517        .await
518    }
519}
520
521impl<H: Handler> container::HostStreamObjectNames for Ctx<H> {
522    #[instrument(skip(self))]
523    async fn drop(&mut self, names: Resource<StreamObjectNames>) -> anyhow::Result<()> {
524        self.attach_parent_context();
525        let _ = self
526            .table
527            .delete(names)
528            .context("failed to delete object name stream")?;
529        Ok(())
530    }
531
532    #[instrument(skip(self))]
533    async fn read_stream_object_names(
534        &mut self,
535        this: Resource<StreamObjectNames>,
536        len: u64,
537    ) -> anyhow::Result<Result<(Vec<ObjectName>, bool)>> {
538        self.attach_parent_context();
539        let StreamObjectNames {
540            stream,
541            ref mut status,
542            ref mut io,
543        } = self
544            .table
545            .get_mut(&this)
546            .context("failed to get object name stream")?;
547        let mut names = Vec::with_capacity(len.try_into().unwrap_or(usize::MAX));
548        for _ in 0..len {
549            select! {
550                biased;
551
552                Some(Err(err)) = &mut *io => {
553                    return Ok(Err(format!("{:#}", err.context("failed to perform async I/O"))))
554                }
555                Err(err) = &mut *status => {
556                    return Ok(Err(err))
557                }
558                item = stream.next() => {
559                    match item {
560                        Some(name) => names.push(name),
561                        None => return Ok(Ok((names, true))),
562                    }
563                }
564            }
565        }
566        Ok(Ok((names, false)))
567    }
568
    /// Skips up to `num` object names of the stream `this`.
    ///
    /// Returns the number of names skipped together with a flag that is `true` once the end of
    /// the stream was reached. The inner `Err` is returned as soon as the I/O task or the status
    /// future reports a failure; the outer `Err` signals an invalid handle.
    #[instrument(skip(self))]
    async fn skip_stream_object_names(
        &mut self,
        this: Resource<StreamObjectNames>,
        num: u64,
    ) -> anyhow::Result<Result<(u64, bool)>> {
        self.attach_parent_context();
        let StreamObjectNames { stream, status, io } = self
            .table
            .get_mut(&this)
            .context("failed to get object name stream")?;
        for i in 0..num {
            select! {
                // Check for failures before consuming the next item.
                biased;

                // Only matches if the I/O task exists and completed with an error.
                Some(Err(err)) = &mut *io => {
                    return Ok(Err(format!("{:#}", err.context("failed to perform async I/O"))))
                }
                // Only matches if the status future completed with an error.
                Err(err) = &mut *status => {
                    return Ok(Err(err))
                }
                item = stream.next() => {
                    match item {
                        Some(_) => {}
                        // Stream ended after `i` names were skipped.
                        None => return Ok(Ok((i, true))),
                    }
                }
            }
        }
        Ok(Ok((num, false)))
    }
600}
601
/// Output stream backed by a bounded channel sender.
#[derive(Default)]
enum OutputStream {
    /// Default state, left behind by `mem::take` while the state is being replaced.
    #[default]
    Corrupted,
    /// No send permit is held; `ready` must reserve one before `write` may be called.
    Pending(mpsc::Sender<Bytes>),
    /// A send permit is held, so the next `write` can send without waiting.
    Ready(mpsc::OwnedPermit<Bytes>),
    /// Reserving a send permit failed; writes are reported as failed.
    Error(mpsc::error::SendError<()>),
}
610
611impl wasmtime_wasi::p2::OutputStream for OutputStream {
612    fn write(&mut self, bytes: Bytes) -> StreamResult<()> {
613        match mem::take(self) {
614            OutputStream::Corrupted => Err(StreamError::Trap(anyhow!(
615                "corrupted output stream memory state"
616            ))),
617            OutputStream::Pending(sender) => {
618                *self = OutputStream::Pending(sender);
619                Err(StreamError::Trap(anyhow!(
620                    "`check_write` was not called prior to calling `write`"
621                )))
622            }
623            OutputStream::Ready(permit) => {
624                let sender = permit.send(bytes);
625                *self = OutputStream::Pending(sender);
626                Ok(())
627            }
628            OutputStream::Error(err) => {
629                *self = OutputStream::Error(err);
630                Err(StreamError::LastOperationFailed(anyhow!("broken pipe")))
631            }
632        }
633    }
634
635    fn flush(&mut self) -> StreamResult<()> {
636        Ok(())
637    }
638
639    fn check_write(&mut self) -> StreamResult<usize> {
640        match self {
641            OutputStream::Corrupted => Err(StreamError::Trap(anyhow!(
642                "corrupted output stream memory state"
643            ))),
644            OutputStream::Pending(..) => Ok(0),
645            OutputStream::Ready(..) => Ok(MAX_CHUNK_SIZE),
646            OutputStream::Error(..) => {
647                Err(StreamError::LastOperationFailed(anyhow!("broken pipe")))
648            }
649        }
650    }
651}
652
653#[async_trait]
654impl Pollable for OutputStream {
655    async fn ready(&mut self) {
656        match mem::take(self) {
657            OutputStream::Corrupted => {}
658            OutputStream::Pending(sender) => match sender.reserve_owned().await {
659                Ok(permit) => *self = OutputStream::Ready(permit),
660                Err(err) => *self = OutputStream::Error(err),
661            },
662            OutputStream::Ready(permit) => *self = OutputStream::Ready(permit),
663            OutputStream::Error(err) => *self = OutputStream::Error(err),
664        }
665    }
666}
667
668impl<H: Handler> types::HostOutgoingValue for Ctx<H> {
669    #[instrument(skip(self))]
670    async fn drop(&mut self, outgoing_value: Resource<OutgoingValue>) -> anyhow::Result<()> {
671        self.attach_parent_context();
672        self.table
673            .delete(outgoing_value)
674            .context("failed to delete outgoing value")?;
675        Ok(())
676    }
677
678    #[instrument(skip(self))]
679    async fn new_outgoing_value(&mut self) -> anyhow::Result<Resource<OutgoingValue>> {
680        self.attach_parent_context();
681        let (tx, rx) = mpsc::channel(128);
682        self.table
683            .push(OutgoingValue {
684                guest: GuestOutgoingValue::Init(tx),
685                host: HostOutgoingValue::Init(rx),
686            })
687            .context("failed to push outgoing value")
688    }
689
690    #[instrument(skip(self))]
691    async fn outgoing_value_write_body(
692        &mut self,
693        outgoing_value: Resource<OutgoingValue>,
694    ) -> anyhow::Result<Result<Resource<DynOutputStream>, ()>> {
695        let OutgoingValue { guest, .. } = self
696            .table
697            .get_mut(&outgoing_value)
698            .context("failed to get outgoing value")?;
699        let GuestOutgoingValue::Init(tx) = mem::take(guest) else {
700            return Ok(Err(()));
701        };
702        let stream: DynOutputStream = Box::new(OutputStream::Pending(tx));
703        let stream = self
704            .table
705            .push_child(stream, &outgoing_value)
706            .context("failed to push output stream")?;
707        Ok(Ok(stream))
708    }
709
710    #[instrument(skip(self), ret)]
711    async fn finish(&mut self, this: Resource<OutgoingValue>) -> anyhow::Result<Result<()>> {
712        let OutgoingValue { host, .. } = self
713            .table
714            .delete(this)
715            .context("failed to delete outgoing value")?;
716        match host {
717            HostOutgoingValue::Corrupted => Ok(Err("corrupted value state".to_string())),
718            HostOutgoingValue::Init(..) => Ok(Ok(())),
719            HostOutgoingValue::Writing { status, io } => Ok(async {
720                try_join!(
721                    async {
722                        if let Some(io) = io {
723                            io.await
724                                .context("I/O task failed")
725                                .map_err(|err| format!("{err:#}"))?;
726                        }
727                        Ok(())
728                    },
729                    status,
730                )?;
731                Ok(())
732            }
733            .await),
734        }
735    }
736}
737
/// Input stream handing out the chunks of an incoming value.
struct InputStream {
    /// Chunks received from `stream` that were not yet returned by `read`.
    ready: VecDeque<Bytes>,
    /// Source of incoming chunks.
    stream: Pin<Box<dyn Stream<Item = Bytes> + Send>>,
    /// Fused future resolving to the final status of the read.
    status: future::Fuse<Pin<Box<dyn Future<Output = Result<(), String>> + Send>>>,
    /// Fused handle of the spawned I/O task; wraps `None` if there is no such task.
    io: OptionFuture<future::Fuse<AbortOnDropJoinHandle<anyhow::Result<()>>>>,
    /// Error recorded by `ready`, returned (and cleared) by the next `read`.
    error: Option<StreamError>,
    /// Set once `stream` has yielded its last item.
    closed: bool,
}
746
747impl wasmtime_wasi::p2::InputStream for InputStream {
748    fn read(&mut self, size: usize) -> StreamResult<Bytes> {
749        if let Some(err) = self.error.take() {
750            return Err(err);
751        }
752        if let Some(mut buf) = self.ready.pop_front() {
753            if buf.len() > size {
754                self.ready.push_front(buf.split_off(size));
755            }
756            Ok(buf)
757        } else if self.closed {
758            Err(StreamError::Closed)
759        } else {
760            Err(StreamError::Trap(anyhow!(
761                "`ready` was not called prior to calling `read`"
762            )))
763        }
764    }
765}
766
767#[async_trait]
768impl Pollable for InputStream {
769    async fn ready(&mut self) {
770        if !self.ready.is_empty() || self.closed {
771            return;
772        }
773        select! {
774            biased;
775
776            Some(Err(err)) = &mut self.io => {
777                self.error = Some(StreamError::LastOperationFailed(err.context("failed to perform async I/O")));
778            }
779            Err(err) = &mut self.status => {
780                self.error = Some(StreamError::LastOperationFailed(anyhow!(err)));
781            }
782            item = self.stream.next() => {
783                if let Some(buf) = item {
784                    self.ready.push_back(buf);
785                } else {
786                    self.closed = true;
787                }
788            }
789        }
790    }
791}
792
793impl<H: Handler> types::HostIncomingValue for Ctx<H> {
794    #[instrument(skip(self))]
795    async fn drop(&mut self, incoming_value: Resource<IncomingValue>) -> anyhow::Result<()> {
796        self.attach_parent_context();
797        let _ = self
798            .table
799            .delete(incoming_value)
800            .context("failed to delete incoming value")?;
801        Ok(())
802    }
803
    /// Consumes `incoming_value` and returns all of its bytes at once.
    ///
    /// Concurrently collects the chunk stream into a single buffer, awaits the status future
    /// and awaits the I/O task (if any). The first failure is returned as the inner `Err`; the
    /// outer `Err` signals that the resource could not be deleted from the table.
    #[instrument(skip(self))]
    async fn incoming_value_consume_sync(
        &mut self,
        incoming_value: Resource<IncomingValue>,
    ) -> anyhow::Result<Result<Vec<u8>>> {
        self.attach_parent_context();
        let IncomingValue { stream, status, io } = self
            .table
            .delete(incoming_value)
            .context("failed to delete incoming value")?;
        Ok(async {
            let (buf, (), ()) = try_join!(
                // Concatenate every received chunk into one buffer.
                async {
                    Ok(stream
                        .fold(Vec::default(), |mut buf, chunk| async move {
                            buf.extend_from_slice(&chunk);
                            buf
                        })
                        .await)
                },
                status,
                // Surface a failure of the I/O task as a formatted error string.
                async {
                    if let Some(io) = io {
                        io.await
                            .context("failed to perform async I/O")
                            .map_err(|err| format!("{err:#}"))?;
                    }
                    Ok(())
                },
            )?;
            Ok(buf)
        }
        .await)
    }
838
839    #[instrument(skip(self))]
840    async fn incoming_value_consume_async(
841        &mut self,
842        incoming_value: Resource<IncomingValue>,
843    ) -> anyhow::Result<Result<Resource<DynInputStream>>> {
844        self.attach_parent_context();
845        let IncomingValue { stream, status, io } = self
846            .table
847            .delete(incoming_value)
848            .context("failed to delete incoming value")?;
849        let stream = self
850            .table
851            .push(Box::new(InputStream {
852                ready: VecDeque::default(),
853                stream,
854                status: status.fuse(),
855                io: io.map(FutureExt::fuse).into(),
856                error: None,
857                closed: false,
858            }) as _)
859            .context("failed to push input stream")?;
860        Ok(Ok(stream))
861    }
862
863    #[instrument(skip_all)]
864    async fn size(&mut self, _incoming_value: Resource<IncomingValue>) -> anyhow::Result<u64> {
865        self.attach_parent_context();
866        bail!("size unknown")
867    }
868}
869
// `types::Host` is implemented with an empty body: no methods are defined here.
impl<H: Handler> types::Host for Ctx<H> {}
871
872impl<H> blobstore::Host for Ctx<H>
873where
874    H: Handler,
875{
876    #[instrument(skip(self))]
877    async fn create_container(
878        &mut self,
879        name: ContainerName,
880    ) -> anyhow::Result<Result<Resource<Container>>> {
881        self.attach_parent_context();
882        match invoke_with_fallback(
883            "create-container",
884            &self.handler,
885            || {
886                bindings::wrpc::blobstore::blobstore::create_container(
887                    &self.handler,
888                    Some(ReplacedInstanceTarget::BlobstoreBlobstore),
889                    &name,
890                )
891            },
892            || {
893                blobstore_0_1_0::create_container(
894                    &self.handler,
895                    Some(ReplacedInstanceTarget::BlobstoreBlobstore),
896                    &name,
897                )
898            },
899        )
900        .await?
901        {
902            Ok(()) => {
903                let container = self
904                    .table
905                    .push(Arc::from(name))
906                    .context("failed to push container")?;
907                Ok(Ok(container))
908            }
909            Err(err) => Ok(Err(err)),
910        }
911    }
912
913    #[instrument(skip(self))]
914    async fn get_container(
915        &mut self,
916        name: ContainerName,
917    ) -> anyhow::Result<Result<Resource<Container>>> {
918        self.attach_parent_context();
919        match invoke_with_fallback(
920            "container-exists",
921            &self.handler,
922            || {
923                bindings::wrpc::blobstore::blobstore::container_exists(
924                    &self.handler,
925                    Some(ReplacedInstanceTarget::BlobstoreBlobstore),
926                    &name,
927                )
928            },
929            || {
930                blobstore_0_1_0::container_exists(
931                    &self.handler,
932                    Some(ReplacedInstanceTarget::BlobstoreBlobstore),
933                    &name,
934                )
935            },
936        )
937        .await?
938        {
939            Ok(true) => {
940                let container = self
941                    .table
942                    .push(Arc::from(name))
943                    .context("failed to push container")?;
944                Ok(Ok(container))
945            }
946            Ok(false) => Ok(Err("container does not exist".into())),
947            Err(err) => Ok(Err(err)),
948        }
949    }
950
951    #[instrument(skip(self))]
952    async fn delete_container(&mut self, name: ContainerName) -> anyhow::Result<Result<()>> {
953        self.attach_parent_context();
954        invoke_with_fallback(
955            "delete-container",
956            &self.handler,
957            || {
958                bindings::wrpc::blobstore::blobstore::delete_container(
959                    &self.handler,
960                    Some(ReplacedInstanceTarget::BlobstoreBlobstore),
961                    &name,
962                )
963            },
964            || {
965                blobstore_0_1_0::delete_container(
966                    &self.handler,
967                    Some(ReplacedInstanceTarget::BlobstoreBlobstore),
968                    &name,
969                )
970            },
971        )
972        .await
973    }
974
975    #[instrument(skip(self))]
976    async fn container_exists(&mut self, name: ContainerName) -> anyhow::Result<Result<bool>> {
977        self.attach_parent_context();
978        invoke_with_fallback(
979            "container-exists",
980            &self.handler,
981            || {
982                bindings::wrpc::blobstore::blobstore::container_exists(
983                    &self.handler,
984                    Some(ReplacedInstanceTarget::BlobstoreBlobstore),
985                    &name,
986                )
987            },
988            || {
989                blobstore_0_1_0::container_exists(
990                    &self.handler,
991                    Some(ReplacedInstanceTarget::BlobstoreBlobstore),
992                    &name,
993                )
994            },
995        )
996        .await
997    }
998
999    #[instrument(skip(self))]
1000    async fn copy_object(&mut self, src: ObjectId, dest: ObjectId) -> anyhow::Result<Result<()>> {
1001        self.attach_parent_context();
1002        let src = bindings::wasi::blobstore::types::ObjectId {
1003            container: src.container,
1004            object: src.object,
1005        };
1006        let dest = bindings::wasi::blobstore::types::ObjectId {
1007            container: dest.container,
1008            object: dest.object,
1009        };
1010        invoke_with_fallback(
1011            "copy-object",
1012            &self.handler,
1013            || {
1014                bindings::wrpc::blobstore::blobstore::copy_object(
1015                    &self.handler,
1016                    Some(ReplacedInstanceTarget::BlobstoreBlobstore),
1017                    &src,
1018                    &dest,
1019                )
1020            },
1021            || {
1022                blobstore_0_1_0::copy_object(
1023                    &self.handler,
1024                    Some(ReplacedInstanceTarget::BlobstoreBlobstore),
1025                    &src,
1026                    &dest,
1027                )
1028            },
1029        )
1030        .await
1031    }
1032
1033    #[instrument(skip(self))]
1034    async fn move_object(&mut self, src: ObjectId, dest: ObjectId) -> anyhow::Result<Result<()>> {
1035        self.attach_parent_context();
1036        let src = bindings::wasi::blobstore::types::ObjectId {
1037            container: src.container,
1038            object: src.object,
1039        };
1040        let dest = bindings::wasi::blobstore::types::ObjectId {
1041            container: dest.container,
1042            object: dest.object,
1043        };
1044        invoke_with_fallback(
1045            "move-object",
1046            &self.handler,
1047            || {
1048                bindings::wrpc::blobstore::blobstore::move_object(
1049                    &self.handler,
1050                    Some(ReplacedInstanceTarget::BlobstoreBlobstore),
1051                    &src,
1052                    &dest,
1053                )
1054            },
1055            || {
1056                blobstore_0_1_0::move_object(
1057                    &self.handler,
1058                    Some(ReplacedInstanceTarget::BlobstoreBlobstore),
1059                    &src,
1060                    &dest,
1061                )
1062            },
1063        )
1064        .await
1065    }
1066}
1067
1068impl<H> container::Host for Ctx<H> where H: Handler {}