1use core::future::Future;
2use core::mem;
3use core::pin::Pin;
4
5use std::collections::VecDeque;
6use std::sync::Arc;
7
8use anyhow::{anyhow, bail, Context as _};
9use async_trait::async_trait;
10use bytes::Bytes;
11use futures::future::OptionFuture;
12use futures::{future, FutureExt, Stream, StreamExt as _};
13use tokio::sync::mpsc;
14use tokio::{join, select, try_join};
15use tokio_stream::wrappers::ReceiverStream;
16use tracing::{debug, instrument};
17use wasmtime::component::Resource;
18use wasmtime_wasi::p2::{DynInputStream, DynOutputStream, Pollable, StreamError, StreamResult};
19use wasmtime_wasi::runtime::AbortOnDropJoinHandle;
20use wrpc_interface_blobstore::bindings;
21
22use crate::capability::blobstore::blobstore::ContainerName;
23use crate::capability::blobstore::container::Container;
24use crate::capability::blobstore::types::{
25 ContainerMetadata, Error, ObjectId, ObjectMetadata, ObjectName,
26};
27use crate::capability::blobstore::{blobstore, container, types};
28use crate::capability::wrpc::wrpc::blobstore::blobstore as blobstore_0_1_0;
29use crate::io::BufferedIncomingStream;
30
31use super::{Ctx, Handler, InvocationErrorIntrospect, InvocationErrorKind, ReplacedInstanceTarget};
32
/// Write budget, in bytes, reported by `check_write` for an output stream that
/// currently holds a channel permit (`1 << 16` = 65536).
const MAX_CHUNK_SIZE: usize = 1 << 16;

/// Result alias for this module; the error type defaults to the blobstore
/// [`Error`] imported from the capability bindings.
type Result<T, E = Error> = core::result::Result<T, E>;
39
40async fn invoke_with_fallback<
41 T,
42 Fut: Future<Output = anyhow::Result<T>>,
43 Fut0_1_0: Future<Output = anyhow::Result<T>>,
44>(
45 name: &str,
46 introspect: &impl InvocationErrorIntrospect,
47 f: impl FnOnce() -> Fut,
48 f_0_1_0: impl FnOnce() -> Fut0_1_0,
49) -> anyhow::Result<T> {
50 match f().await {
51 Ok(res) => Ok(res),
52 Err(err) => match introspect.invocation_error_kind(&err) {
53 InvocationErrorKind::NotFound => {
54 debug!(
55 name,
56 desired_instance = "wrpc:blobstore/blobstore@0.2.0",
57 fallback_instance = "wrpc:blobstore/blobstore@0.1.0",
58 "desired function export not found, fallback to older version"
59 );
60 f_0_1_0().await
61 }
62 InvocationErrorKind::Trap => Err(err),
63 },
64 }
65}
66
/// Host-side state of an `outgoing-value` resource.
///
/// `new_outgoing_value` creates it with the two halves of one bounded channel:
/// the sender is kept in `guest`, the receiver in `host`.
pub struct OutgoingValue {
    /// Sending half; taken when the guest requests the body output stream.
    guest: GuestOutgoingValue,
    /// Receiving half, or the state of the write started by `write_data`.
    host: HostOutgoingValue,
}
71
/// Guest-facing half of an [`OutgoingValue`].
#[derive(Default)]
pub enum GuestOutgoingValue {
    /// Default state; this is what `mem::take` leaves behind once the sender
    /// has been moved into an output stream.
    #[default]
    Corrupted,
    /// Sender that has not been handed out as an output stream yet.
    Init(mpsc::Sender<Bytes>),
}
78
/// Host-facing half of an [`OutgoingValue`].
#[derive(Default)]
pub enum HostOutgoingValue {
    /// Default state; this is what `mem::take` leaves behind while `write_data`
    /// is in progress and after it has failed.
    #[default]
    Corrupted,
    /// Receiver of the chunks written by the guest; no write was started yet.
    Init(mpsc::Receiver<Bytes>),
    /// A write was started; both parts are awaited by `finish`.
    Writing {
        /// Resolves to the final status of the write.
        status: Pin<Box<dyn Future<Output = Result<(), String>> + Send>>,
        /// Handle of the spawned I/O task of the invocation, if it returned one.
        io: Option<AbortOnDropJoinHandle<anyhow::Result<()>>>,
    },
}
89
/// Host-side state of an `incoming-value` resource produced by `get_data`.
pub struct IncomingValue {
    /// Stream of received data chunks.
    stream: Pin<Box<dyn Stream<Item = Bytes> + Send>>,
    /// Resolves to the final status of the transfer. For the 0.1.0 fallback,
    /// `get_data` stores a future that is immediately `Ok(())`.
    status: Pin<Box<dyn Future<Output = Result<(), String>> + Send>>,
    /// Handle of the spawned I/O task of the invocation, if it returned one.
    io: Option<AbortOnDropJoinHandle<anyhow::Result<()>>>,
}
95
/// Host-side state of a `stream-object-names` resource produced by
/// `list_objects`.
pub struct StreamObjectNames {
    /// Buffered stream of object names.
    stream: BufferedIncomingStream<String>,
    /// Fused status future of the listing, so it can be polled again after it
    /// has completed.
    status: future::Fuse<Pin<Box<dyn Future<Output = Result<(), String>> + Send>>>,
    /// Fused handle of the spawned I/O task, if the invocation returned one.
    io: OptionFuture<future::Fuse<AbortOnDropJoinHandle<anyhow::Result<()>>>>,
}
101
102impl<H> container::HostContainer for Ctx<H>
103where
104 H: Handler,
105{
106 #[instrument(skip(self))]
107 async fn drop(&mut self, container: Resource<Container>) -> anyhow::Result<()> {
108 self.attach_parent_context();
109 self.table
110 .delete(container)
111 .context("failed to delete container")?;
112 Ok(())
113 }
114
115 #[instrument(skip(self))]
116 async fn name(&mut self, container: Resource<Container>) -> anyhow::Result<Result<String>> {
117 self.attach_parent_context();
118 let name = self
119 .table
120 .get(&container)
121 .context("failed to get container")?;
122 Ok(Ok(name.to_string()))
123 }
124
125 #[instrument(skip(self))]
126 async fn info(
127 &mut self,
128 container: Resource<Container>,
129 ) -> anyhow::Result<Result<ContainerMetadata>> {
130 self.attach_parent_context();
131 let name = self
132 .table
133 .get(&container)
134 .context("failed to get container")?;
135 match invoke_with_fallback(
136 "get-container-info",
137 &self.handler,
138 || {
139 bindings::wrpc::blobstore::blobstore::get_container_info(
140 &self.handler,
141 Some(ReplacedInstanceTarget::BlobstoreContainer),
142 name,
143 )
144 },
145 || {
146 blobstore_0_1_0::get_container_info(
147 &self.handler,
148 Some(ReplacedInstanceTarget::BlobstoreContainer),
149 name,
150 )
151 },
152 )
153 .await?
154 {
155 Ok(bindings::wrpc::blobstore::types::ContainerMetadata { created_at }) => {
156 Ok(Ok(ContainerMetadata {
157 name: name.to_string(),
158 created_at,
159 }))
160 }
161 Err(err) => Ok(Err(err)),
162 }
163 }
164
165 #[instrument(skip(self))]
166 async fn get_data(
167 &mut self,
168 container: Resource<Container>,
169 name: ObjectName,
170 start: u64,
171 end: u64,
172 ) -> anyhow::Result<Result<Resource<IncomingValue>>> {
173 self.attach_parent_context();
174 let container = self
175 .table
176 .get(&container)
177 .context("failed to get container")?;
178 let id = bindings::wasi::blobstore::types::ObjectId {
179 container: container.to_string(),
180 object: name,
181 };
182 match invoke_with_fallback(
183 "get-container-data",
184 &self.handler,
185 || async {
186 let (res, io) = bindings::wrpc::blobstore::blobstore::get_container_data(
187 &self.handler,
188 Some(ReplacedInstanceTarget::BlobstoreContainer),
189 &id,
190 start,
191 end,
192 )
193 .await?;
194 Ok((res, io.map(wasmtime_wasi::runtime::spawn)))
195 },
196 || async {
197 let (res, io) = blobstore_0_1_0::get_container_data(
198 &self.handler,
199 Some(ReplacedInstanceTarget::BlobstoreContainer),
200 &id,
201 start,
202 end,
203 )
204 .await?;
205 Ok((
206 res.map(|stream| {
207 (
208 stream,
209 Box::pin(async { Ok(()) }) as Pin<Box<dyn Future<Output = _> + Send>>,
210 )
211 }),
212 io.map(wasmtime_wasi::runtime::spawn),
213 ))
214 },
215 )
216 .await?
217 {
218 (Ok((stream, status)), io) => {
219 let value = self
220 .table
221 .push(IncomingValue { stream, status, io })
222 .context("failed to push stream and I/O future")?;
223 Ok(Ok(value))
224 }
225 (Err(err), _) => Ok(Err(err)),
226 }
227 }
228
229 #[instrument(skip(self))]
230 async fn write_data(
231 &mut self,
232 container: Resource<Container>,
233 object: ObjectName,
234 data: Resource<OutgoingValue>,
235 ) -> anyhow::Result<Result<()>> {
236 self.attach_parent_context();
237 let container = self
238 .table
239 .get(&container)
240 .cloned()
241 .context("failed to get container")?;
242 let OutgoingValue { host, .. } = self
243 .table
244 .get_mut(&data)
245 .context("failed to get outgoing value")?;
246 let HostOutgoingValue::Init(mut rx) = mem::take(host) else {
247 bail!("outgoing-value.write-data was already called")
248 };
249 let id = bindings::wrpc::blobstore::types::ObjectId {
250 container: container.to_string(),
251 object,
252 };
253 let (tx, rx_wrpc) = mpsc::channel(128);
254 let (tx_0_1_0, rx_wrpc_0_1_0) = mpsc::channel(128);
255 tokio::spawn(async move {
258 while let Some(item) = rx.recv().await {
259 if let (Err(_), Err(_)) = join!(tx.send(item.clone()), tx_0_1_0.send(item)) {
260 return;
261 }
262 }
263 });
264 match invoke_with_fallback(
265 "write-container-data",
266 &self.handler,
267 || async {
268 let (res, io) = bindings::wrpc::blobstore::blobstore::write_container_data(
269 &self.handler,
270 Some(ReplacedInstanceTarget::BlobstoreContainer),
271 &id,
272 Box::pin(ReceiverStream::new(rx_wrpc)),
273 )
274 .await?;
275 Ok((res, io.map(wasmtime_wasi::runtime::spawn)))
276 },
277 || async {
278 let (res, io) = blobstore_0_1_0::write_container_data(
279 &self.handler,
280 Some(ReplacedInstanceTarget::BlobstoreContainer),
281 &id,
282 Box::pin(ReceiverStream::new(rx_wrpc_0_1_0)),
283 )
284 .await?;
285 Ok((
286 res.map(|()| {
287 Box::pin(async { Ok(()) }) as Pin<Box<dyn Future<Output = _> + Send>>
288 }),
289 io.map(wasmtime_wasi::runtime::spawn),
290 ))
291 },
292 )
293 .await?
294 {
295 (Ok(status), io) => {
296 *host = HostOutgoingValue::Writing { status, io };
297 Ok(Ok(()))
298 }
299 (Err(err), _) => Ok(Err(err)),
300 }
301 }
302
303 #[instrument(skip(self))]
304 async fn list_objects(
305 &mut self,
306 container: Resource<Container>,
307 ) -> anyhow::Result<Result<Resource<StreamObjectNames>>> {
308 self.attach_parent_context();
309 let container = self
310 .table
311 .get(&container)
312 .context("failed to get container")?;
313 match invoke_with_fallback(
315 "list-container-objects",
316 &self.handler,
317 || async {
318 let (res, io) = bindings::wrpc::blobstore::blobstore::list_container_objects(
319 &self.handler,
320 Some(ReplacedInstanceTarget::BlobstoreContainer),
321 container,
322 None,
323 None,
324 )
325 .await?;
326 Ok((res, io.map(wasmtime_wasi::runtime::spawn)))
327 },
328 || async {
329 let (res, io) = blobstore_0_1_0::list_container_objects(
330 &self.handler,
331 Some(ReplacedInstanceTarget::BlobstoreContainer),
332 container,
333 None,
334 None,
335 )
336 .await?;
337 Ok((
338 res.map(|stream| {
339 (
340 stream,
341 Box::pin(async { Ok(()) }) as Pin<Box<dyn Future<Output = _> + Send>>,
342 )
343 }),
344 io.map(wasmtime_wasi::runtime::spawn),
345 ))
346 },
347 )
348 .await?
349 {
350 (Ok((stream, status)), io) => {
351 let stream = BufferedIncomingStream::new(stream);
352 let status = status.fuse();
353 let io = io.map(FutureExt::fuse).into();
354 let stream = self
355 .table
356 .push(StreamObjectNames { stream, status, io })
357 .context("failed to push object name stream")?;
358 Ok(Ok(stream))
359 }
360 (Err(err), _) => Ok(Err(err)),
361 }
362 }
363
364 #[instrument(skip(self))]
365 async fn delete_object(
366 &mut self,
367 container: Resource<Container>,
368 name: ObjectName,
369 ) -> anyhow::Result<Result<()>> {
370 self.attach_parent_context();
371 self.delete_objects(container, vec![name]).await
372 }
373
374 #[instrument(skip(self))]
375 async fn delete_objects(
376 &mut self,
377 container: Resource<Container>,
378 names: Vec<ObjectName>,
379 ) -> anyhow::Result<Result<()>> {
380 self.attach_parent_context();
381 let container = self
382 .table
383 .get(&container)
384 .context("failed to get container")?;
385 let names = names.iter().map(String::as_str).collect::<Vec<_>>();
386 invoke_with_fallback(
387 "delete-objects",
388 &self.handler,
389 || {
390 bindings::wrpc::blobstore::blobstore::delete_objects(
391 &self.handler,
392 Some(ReplacedInstanceTarget::BlobstoreContainer),
393 container,
394 &names,
395 )
396 },
397 || {
398 blobstore_0_1_0::delete_objects(
399 &self.handler,
400 Some(ReplacedInstanceTarget::BlobstoreContainer),
401 container,
402 &names,
403 )
404 },
405 )
406 .await
407 }
408
409 #[instrument(skip(self))]
410 async fn has_object(
411 &mut self,
412 container: Resource<Container>,
413 object: ObjectName,
414 ) -> anyhow::Result<Result<bool>> {
415 self.attach_parent_context();
416 let container = self
417 .table
418 .get(&container)
419 .context("failed to get container")?;
420 let id = bindings::wrpc::blobstore::types::ObjectId {
421 container: container.to_string(),
422 object,
423 };
424 invoke_with_fallback(
425 "has-object",
426 &self.handler,
427 || {
428 bindings::wrpc::blobstore::blobstore::has_object(
429 &self.handler,
430 Some(ReplacedInstanceTarget::BlobstoreContainer),
431 &id,
432 )
433 },
434 || {
435 blobstore_0_1_0::has_object(
436 &self.handler,
437 Some(ReplacedInstanceTarget::BlobstoreContainer),
438 &id,
439 )
440 },
441 )
442 .await
443 }
444
445 #[instrument(skip(self))]
446 async fn object_info(
447 &mut self,
448 container: Resource<Container>,
449 name: ObjectName,
450 ) -> anyhow::Result<Result<ObjectMetadata>> {
451 self.attach_parent_context();
452 let container = self
453 .table
454 .get(&container)
455 .context("failed to get container")?;
456 let id = bindings::wrpc::blobstore::types::ObjectId {
457 container: container.to_string(),
458 object: name.clone(),
459 };
460 match invoke_with_fallback(
461 "get-object-info",
462 &self.handler,
463 || {
464 bindings::wrpc::blobstore::blobstore::get_object_info(
465 &self.handler,
466 Some(ReplacedInstanceTarget::BlobstoreContainer),
467 &id,
468 )
469 },
470 || {
471 blobstore_0_1_0::get_object_info(
472 &self.handler,
473 Some(ReplacedInstanceTarget::BlobstoreContainer),
474 &id,
475 )
476 },
477 )
478 .await?
479 {
480 Ok(bindings::wrpc::blobstore::types::ObjectMetadata { created_at, size }) => {
481 Ok(Ok(ObjectMetadata {
482 name,
483 container: container.to_string(),
484 created_at,
485 size,
486 }))
487 }
488 Err(err) => Ok(Err(err)),
489 }
490 }
491
492 #[instrument(skip(self))]
493 async fn clear(&mut self, container: Resource<Container>) -> anyhow::Result<Result<()>> {
494 self.attach_parent_context();
495 let container = self
496 .table
497 .get(&container)
498 .context("failed to get container")?;
499 invoke_with_fallback(
500 "clear-container",
501 &self.handler,
502 || {
503 bindings::wrpc::blobstore::blobstore::clear_container(
504 &self.handler,
505 Some(ReplacedInstanceTarget::BlobstoreContainer),
506 container,
507 )
508 },
509 || {
510 blobstore_0_1_0::clear_container(
511 &self.handler,
512 Some(ReplacedInstanceTarget::BlobstoreContainer),
513 container,
514 )
515 },
516 )
517 .await
518 }
519}
520
521impl<H: Handler> container::HostStreamObjectNames for Ctx<H> {
522 #[instrument(skip(self))]
523 async fn drop(&mut self, names: Resource<StreamObjectNames>) -> anyhow::Result<()> {
524 self.attach_parent_context();
525 let _ = self
526 .table
527 .delete(names)
528 .context("failed to delete object name stream")?;
529 Ok(())
530 }
531
532 #[instrument(skip(self))]
533 async fn read_stream_object_names(
534 &mut self,
535 this: Resource<StreamObjectNames>,
536 len: u64,
537 ) -> anyhow::Result<Result<(Vec<ObjectName>, bool)>> {
538 self.attach_parent_context();
539 let StreamObjectNames {
540 stream,
541 ref mut status,
542 ref mut io,
543 } = self
544 .table
545 .get_mut(&this)
546 .context("failed to get object name stream")?;
547 let mut names = Vec::with_capacity(len.try_into().unwrap_or(usize::MAX));
548 for _ in 0..len {
549 select! {
550 biased;
551
552 Some(Err(err)) = &mut *io => {
553 return Ok(Err(format!("{:#}", err.context("failed to perform async I/O"))))
554 }
555 Err(err) = &mut *status => {
556 return Ok(Err(err))
557 }
558 item = stream.next() => {
559 match item {
560 Some(name) => names.push(name),
561 None => return Ok(Ok((names, true))),
562 }
563 }
564 }
565 }
566 Ok(Ok((names, false)))
567 }
568
569 #[instrument(skip(self))]
570 async fn skip_stream_object_names(
571 &mut self,
572 this: Resource<StreamObjectNames>,
573 num: u64,
574 ) -> anyhow::Result<Result<(u64, bool)>> {
575 self.attach_parent_context();
576 let StreamObjectNames { stream, status, io } = self
577 .table
578 .get_mut(&this)
579 .context("failed to get object name stream")?;
580 for i in 0..num {
581 select! {
582 biased;
583
584 Some(Err(err)) = &mut *io => {
585 return Ok(Err(format!("{:#}", err.context("failed to perform async I/O"))))
586 }
587 Err(err) = &mut *status => {
588 return Ok(Err(err))
589 }
590 item = stream.next() => {
591 match item {
592 Some(_) => {}
593 None => return Ok(Ok((i, true))),
594 }
595 }
596 }
597 }
598 Ok(Ok((num, false)))
599 }
600}
601
/// State machine behind the output stream handed to the guest for an outgoing
/// value body.
#[derive(Default)]
enum OutputStream {
    /// Default state; only present while a value has been moved out with
    /// `mem::take` and not put back.
    #[default]
    Corrupted,
    /// No channel permit is held; `check_write` reports a budget of 0.
    Pending(mpsc::Sender<Bytes>),
    /// A channel permit is held; the next `write` sends one chunk.
    Ready(mpsc::OwnedPermit<Bytes>),
    /// Reserving a permit failed; reported to the guest as "broken pipe".
    Error(mpsc::error::SendError<()>),
}
610
611impl wasmtime_wasi::p2::OutputStream for OutputStream {
612 fn write(&mut self, bytes: Bytes) -> StreamResult<()> {
613 match mem::take(self) {
614 OutputStream::Corrupted => Err(StreamError::Trap(anyhow!(
615 "corrupted output stream memory state"
616 ))),
617 OutputStream::Pending(sender) => {
618 *self = OutputStream::Pending(sender);
619 Err(StreamError::Trap(anyhow!(
620 "`check_write` was not called prior to calling `write`"
621 )))
622 }
623 OutputStream::Ready(permit) => {
624 let sender = permit.send(bytes);
625 *self = OutputStream::Pending(sender);
626 Ok(())
627 }
628 OutputStream::Error(err) => {
629 *self = OutputStream::Error(err);
630 Err(StreamError::LastOperationFailed(anyhow!("broken pipe")))
631 }
632 }
633 }
634
635 fn flush(&mut self) -> StreamResult<()> {
636 Ok(())
637 }
638
639 fn check_write(&mut self) -> StreamResult<usize> {
640 match self {
641 OutputStream::Corrupted => Err(StreamError::Trap(anyhow!(
642 "corrupted output stream memory state"
643 ))),
644 OutputStream::Pending(..) => Ok(0),
645 OutputStream::Ready(..) => Ok(MAX_CHUNK_SIZE),
646 OutputStream::Error(..) => {
647 Err(StreamError::LastOperationFailed(anyhow!("broken pipe")))
648 }
649 }
650 }
651}
652
653#[async_trait]
654impl Pollable for OutputStream {
655 async fn ready(&mut self) {
656 match mem::take(self) {
657 OutputStream::Corrupted => {}
658 OutputStream::Pending(sender) => match sender.reserve_owned().await {
659 Ok(permit) => *self = OutputStream::Ready(permit),
660 Err(err) => *self = OutputStream::Error(err),
661 },
662 OutputStream::Ready(permit) => *self = OutputStream::Ready(permit),
663 OutputStream::Error(err) => *self = OutputStream::Error(err),
664 }
665 }
666}
667
668impl<H: Handler> types::HostOutgoingValue for Ctx<H> {
669 #[instrument(skip(self))]
670 async fn drop(&mut self, outgoing_value: Resource<OutgoingValue>) -> anyhow::Result<()> {
671 self.attach_parent_context();
672 self.table
673 .delete(outgoing_value)
674 .context("failed to delete outgoing value")?;
675 Ok(())
676 }
677
678 #[instrument(skip(self))]
679 async fn new_outgoing_value(&mut self) -> anyhow::Result<Resource<OutgoingValue>> {
680 self.attach_parent_context();
681 let (tx, rx) = mpsc::channel(128);
682 self.table
683 .push(OutgoingValue {
684 guest: GuestOutgoingValue::Init(tx),
685 host: HostOutgoingValue::Init(rx),
686 })
687 .context("failed to push outgoing value")
688 }
689
690 #[instrument(skip(self))]
691 async fn outgoing_value_write_body(
692 &mut self,
693 outgoing_value: Resource<OutgoingValue>,
694 ) -> anyhow::Result<Result<Resource<DynOutputStream>, ()>> {
695 let OutgoingValue { guest, .. } = self
696 .table
697 .get_mut(&outgoing_value)
698 .context("failed to get outgoing value")?;
699 let GuestOutgoingValue::Init(tx) = mem::take(guest) else {
700 return Ok(Err(()));
701 };
702 let stream: DynOutputStream = Box::new(OutputStream::Pending(tx));
703 let stream = self
704 .table
705 .push_child(stream, &outgoing_value)
706 .context("failed to push output stream")?;
707 Ok(Ok(stream))
708 }
709
710 #[instrument(skip(self), ret)]
711 async fn finish(&mut self, this: Resource<OutgoingValue>) -> anyhow::Result<Result<()>> {
712 let OutgoingValue { host, .. } = self
713 .table
714 .delete(this)
715 .context("failed to delete outgoing value")?;
716 match host {
717 HostOutgoingValue::Corrupted => Ok(Err("corrupted value state".to_string())),
718 HostOutgoingValue::Init(..) => Ok(Ok(())),
719 HostOutgoingValue::Writing { status, io } => Ok(async {
720 try_join!(
721 async {
722 if let Some(io) = io {
723 io.await
724 .context("I/O task failed")
725 .map_err(|err| format!("{err:#}"))?;
726 }
727 Ok(())
728 },
729 status,
730 )?;
731 Ok(())
732 }
733 .await),
734 }
735 }
736}
737
/// Input stream handed to the guest by `incoming_value_consume_async`.
struct InputStream {
    /// Chunks already received and not yet returned by `read`.
    ready: VecDeque<Bytes>,
    /// Source of further chunks.
    stream: Pin<Box<dyn Stream<Item = Bytes> + Send>>,
    /// Fused status future of the transfer.
    status: future::Fuse<Pin<Box<dyn Future<Output = Result<(), String>> + Send>>>,
    /// Fused handle of the spawned I/O task, if there is one.
    io: OptionFuture<future::Fuse<AbortOnDropJoinHandle<anyhow::Result<()>>>>,
    /// Failure recorded by `ready`; returned and cleared by the next `read`.
    error: Option<StreamError>,
    /// Set once `stream` has ended.
    closed: bool,
}
746
747impl wasmtime_wasi::p2::InputStream for InputStream {
748 fn read(&mut self, size: usize) -> StreamResult<Bytes> {
749 if let Some(err) = self.error.take() {
750 return Err(err);
751 }
752 if let Some(mut buf) = self.ready.pop_front() {
753 if buf.len() > size {
754 self.ready.push_front(buf.split_off(size));
755 }
756 Ok(buf)
757 } else if self.closed {
758 Err(StreamError::Closed)
759 } else {
760 Err(StreamError::Trap(anyhow!(
761 "`ready` was not called prior to calling `read`"
762 )))
763 }
764 }
765}
766
767#[async_trait]
768impl Pollable for InputStream {
769 async fn ready(&mut self) {
770 if !self.ready.is_empty() || self.closed {
771 return;
772 }
773 select! {
774 biased;
775
776 Some(Err(err)) = &mut self.io => {
777 self.error = Some(StreamError::LastOperationFailed(err.context("failed to perform async I/O")));
778 }
779 Err(err) = &mut self.status => {
780 self.error = Some(StreamError::LastOperationFailed(anyhow!(err)));
781 }
782 item = self.stream.next() => {
783 if let Some(buf) = item {
784 self.ready.push_back(buf);
785 } else {
786 self.closed = true;
787 }
788 }
789 }
790 }
791}
792
793impl<H: Handler> types::HostIncomingValue for Ctx<H> {
794 #[instrument(skip(self))]
795 async fn drop(&mut self, incoming_value: Resource<IncomingValue>) -> anyhow::Result<()> {
796 self.attach_parent_context();
797 let _ = self
798 .table
799 .delete(incoming_value)
800 .context("failed to delete incoming value")?;
801 Ok(())
802 }
803
804 #[instrument(skip(self))]
805 async fn incoming_value_consume_sync(
806 &mut self,
807 incoming_value: Resource<IncomingValue>,
808 ) -> anyhow::Result<Result<Vec<u8>>> {
809 self.attach_parent_context();
810 let IncomingValue { stream, status, io } = self
811 .table
812 .delete(incoming_value)
813 .context("failed to delete incoming value")?;
814 Ok(async {
815 let (buf, (), ()) = try_join!(
816 async {
817 Ok(stream
818 .fold(Vec::default(), |mut buf, chunk| async move {
819 buf.extend_from_slice(&chunk);
820 buf
821 })
822 .await)
823 },
824 status,
825 async {
826 if let Some(io) = io {
827 io.await
828 .context("failed to perform async I/O")
829 .map_err(|err| format!("{err:#}"))?;
830 }
831 Ok(())
832 },
833 )?;
834 Ok(buf)
835 }
836 .await)
837 }
838
839 #[instrument(skip(self))]
840 async fn incoming_value_consume_async(
841 &mut self,
842 incoming_value: Resource<IncomingValue>,
843 ) -> anyhow::Result<Result<Resource<DynInputStream>>> {
844 self.attach_parent_context();
845 let IncomingValue { stream, status, io } = self
846 .table
847 .delete(incoming_value)
848 .context("failed to delete incoming value")?;
849 let stream = self
850 .table
851 .push(Box::new(InputStream {
852 ready: VecDeque::default(),
853 stream,
854 status: status.fuse(),
855 io: io.map(FutureExt::fuse).into(),
856 error: None,
857 closed: false,
858 }) as _)
859 .context("failed to push input stream")?;
860 Ok(Ok(stream))
861 }
862
863 #[instrument(skip_all)]
864 async fn size(&mut self, _incoming_value: Resource<IncomingValue>) -> anyhow::Result<u64> {
865 self.attach_parent_context();
866 bail!("size unknown")
867 }
868}
869
/// Marker implementation of the `types` host trait; no methods are provided
/// here.
impl<H: Handler> types::Host for Ctx<H> {}
871
872impl<H> blobstore::Host for Ctx<H>
873where
874 H: Handler,
875{
876 #[instrument(skip(self))]
877 async fn create_container(
878 &mut self,
879 name: ContainerName,
880 ) -> anyhow::Result<Result<Resource<Container>>> {
881 self.attach_parent_context();
882 match invoke_with_fallback(
883 "create-container",
884 &self.handler,
885 || {
886 bindings::wrpc::blobstore::blobstore::create_container(
887 &self.handler,
888 Some(ReplacedInstanceTarget::BlobstoreBlobstore),
889 &name,
890 )
891 },
892 || {
893 blobstore_0_1_0::create_container(
894 &self.handler,
895 Some(ReplacedInstanceTarget::BlobstoreBlobstore),
896 &name,
897 )
898 },
899 )
900 .await?
901 {
902 Ok(()) => {
903 let container = self
904 .table
905 .push(Arc::from(name))
906 .context("failed to push container")?;
907 Ok(Ok(container))
908 }
909 Err(err) => Ok(Err(err)),
910 }
911 }
912
913 #[instrument(skip(self))]
914 async fn get_container(
915 &mut self,
916 name: ContainerName,
917 ) -> anyhow::Result<Result<Resource<Container>>> {
918 self.attach_parent_context();
919 match invoke_with_fallback(
920 "container-exists",
921 &self.handler,
922 || {
923 bindings::wrpc::blobstore::blobstore::container_exists(
924 &self.handler,
925 Some(ReplacedInstanceTarget::BlobstoreBlobstore),
926 &name,
927 )
928 },
929 || {
930 blobstore_0_1_0::container_exists(
931 &self.handler,
932 Some(ReplacedInstanceTarget::BlobstoreBlobstore),
933 &name,
934 )
935 },
936 )
937 .await?
938 {
939 Ok(true) => {
940 let container = self
941 .table
942 .push(Arc::from(name))
943 .context("failed to push container")?;
944 Ok(Ok(container))
945 }
946 Ok(false) => Ok(Err("container does not exist".into())),
947 Err(err) => Ok(Err(err)),
948 }
949 }
950
951 #[instrument(skip(self))]
952 async fn delete_container(&mut self, name: ContainerName) -> anyhow::Result<Result<()>> {
953 self.attach_parent_context();
954 invoke_with_fallback(
955 "delete-container",
956 &self.handler,
957 || {
958 bindings::wrpc::blobstore::blobstore::delete_container(
959 &self.handler,
960 Some(ReplacedInstanceTarget::BlobstoreBlobstore),
961 &name,
962 )
963 },
964 || {
965 blobstore_0_1_0::delete_container(
966 &self.handler,
967 Some(ReplacedInstanceTarget::BlobstoreBlobstore),
968 &name,
969 )
970 },
971 )
972 .await
973 }
974
975 #[instrument(skip(self))]
976 async fn container_exists(&mut self, name: ContainerName) -> anyhow::Result<Result<bool>> {
977 self.attach_parent_context();
978 invoke_with_fallback(
979 "container-exists",
980 &self.handler,
981 || {
982 bindings::wrpc::blobstore::blobstore::container_exists(
983 &self.handler,
984 Some(ReplacedInstanceTarget::BlobstoreBlobstore),
985 &name,
986 )
987 },
988 || {
989 blobstore_0_1_0::container_exists(
990 &self.handler,
991 Some(ReplacedInstanceTarget::BlobstoreBlobstore),
992 &name,
993 )
994 },
995 )
996 .await
997 }
998
999 #[instrument(skip(self))]
1000 async fn copy_object(&mut self, src: ObjectId, dest: ObjectId) -> anyhow::Result<Result<()>> {
1001 self.attach_parent_context();
1002 let src = bindings::wasi::blobstore::types::ObjectId {
1003 container: src.container,
1004 object: src.object,
1005 };
1006 let dest = bindings::wasi::blobstore::types::ObjectId {
1007 container: dest.container,
1008 object: dest.object,
1009 };
1010 invoke_with_fallback(
1011 "copy-object",
1012 &self.handler,
1013 || {
1014 bindings::wrpc::blobstore::blobstore::copy_object(
1015 &self.handler,
1016 Some(ReplacedInstanceTarget::BlobstoreBlobstore),
1017 &src,
1018 &dest,
1019 )
1020 },
1021 || {
1022 blobstore_0_1_0::copy_object(
1023 &self.handler,
1024 Some(ReplacedInstanceTarget::BlobstoreBlobstore),
1025 &src,
1026 &dest,
1027 )
1028 },
1029 )
1030 .await
1031 }
1032
1033 #[instrument(skip(self))]
1034 async fn move_object(&mut self, src: ObjectId, dest: ObjectId) -> anyhow::Result<Result<()>> {
1035 self.attach_parent_context();
1036 let src = bindings::wasi::blobstore::types::ObjectId {
1037 container: src.container,
1038 object: src.object,
1039 };
1040 let dest = bindings::wasi::blobstore::types::ObjectId {
1041 container: dest.container,
1042 object: dest.object,
1043 };
1044 invoke_with_fallback(
1045 "move-object",
1046 &self.handler,
1047 || {
1048 bindings::wrpc::blobstore::blobstore::move_object(
1049 &self.handler,
1050 Some(ReplacedInstanceTarget::BlobstoreBlobstore),
1051 &src,
1052 &dest,
1053 )
1054 },
1055 || {
1056 blobstore_0_1_0::move_object(
1057 &self.handler,
1058 Some(ReplacedInstanceTarget::BlobstoreBlobstore),
1059 &src,
1060 &dest,
1061 )
1062 },
1063 )
1064 .await
1065 }
1066}
1067
/// Marker implementation of the `container` host trait; no methods are
/// provided here.
impl<H> container::Host for Ctx<H> where H: Handler {}