wasmcloud_provider_blobstore_nats/
blobstore.rs1#![allow(clippy::type_complexity)]
4use anyhow::{Context as _, Result};
5use bytes::Bytes;
6use core::future::Future;
7use core::pin::Pin;
8use futures::{Stream, StreamExt};
9use std::collections::HashMap;
10use std::time::Duration;
11use tokio::sync::mpsc;
12use tokio_stream::wrappers::ReceiverStream;
13use tracing::{info, instrument};
14use wasmcloud_provider_sdk::{propagate_trace_for_ctx, Context};
15
16use wrpc_interface_blobstore::bindings;
18
19impl bindings::exports::wrpc::blobstore::blobstore::Handler<Option<Context>>
20 for crate::NatsBlobstoreProvider
21{
22 #[instrument(level = "debug", skip(self))]
24 async fn create_container(
25 &self,
26 context: Option<Context>,
27 name: String,
28 ) -> anyhow::Result<Result<(), String>> {
29 Ok(async {
30 propagate_trace_for_ctx!(context);
31
32 let blobstore = self.get_blobstore(context).await.context(
34 "failed to get NATS Blobstore connection and container storage configuration",
35 )?;
36
37 let container_config = async_nats::jetstream::object_store::Config {
39 bucket: name,
40 description: Some("NATS Blobstore".to_string()),
41 max_age: blobstore.storage_config.max_age,
42 max_bytes: blobstore.storage_config.max_bytes,
43 storage: blobstore.storage_config.storage_type.0,
44 num_replicas: blobstore.storage_config.num_replicas,
45 compression: blobstore.storage_config.compression,
46 placement: None,
47 };
48
49 blobstore
51 .jetstream
52 .create_object_store(container_config)
53 .await
54 .context("failed to create NATS Blobstore Container")
55 .map(|_| ())
56 }
57 .await
58 .map_err(|err| format!("{err:#}")))
59 }
60
61 #[instrument(level = "trace", skip(self))]
63 async fn get_container_info(
64 &self,
65 context: Option<Context>,
66 name: String,
67 ) -> anyhow::Result<Result<bindings::wrpc::blobstore::types::ContainerMetadata, String>> {
68 Ok(async {
69 propagate_trace_for_ctx!(context);
70
71 let blobstore = self
73 .get_blobstore(context)
74 .await
75 .context("failed to get NATS Blobstore connection")?;
76
77 let _container = blobstore
79 .jetstream
80 .get_object_store(name)
81 .await
82 .context("failed to get container info")?;
83
84 let metadata = bindings::wrpc::blobstore::types::ContainerMetadata {
86 created_at: 0u64, };
88 Ok(metadata)
89 }
90 .await
91 .map_err(|err: anyhow::Error| err.to_string()))
92 }
93
94 #[instrument(level = "debug", skip(self))]
96 async fn container_exists(
97 &self,
98 context: Option<Context>,
99 name: String,
100 ) -> anyhow::Result<Result<bool, String>> {
101 Ok(async {
102 propagate_trace_for_ctx!(context);
103
104 let blobstore = self
106 .get_blobstore(context)
107 .await
108 .map_err(|e| e.to_string())?;
109
110 match blobstore.jetstream.get_object_store(&name).await {
112 Ok(_) => Ok(true),
113 Err(e)
114 if matches!(
115 e.kind(),
116 async_nats::jetstream::context::ObjectStoreErrorKind::GetStore
117 ) =>
118 {
119 Ok(false)
120 }
121 Err(e) => Err(format!("failed to check container existence: {e}")),
122 }
123 }
124 .await
125 .map_err(|err| err.to_string()))
126 }
127
128 #[instrument(level = "debug", skip(self))]
131 async fn get_container_data(
132 &self,
133 context: Option<Context>,
134 id: bindings::wrpc::blobstore::types::ObjectId,
135 start: u64,
136 end: u64,
137 ) -> anyhow::Result<
138 Result<
139 (
140 Pin<Box<dyn Stream<Item = Bytes> + Send>>,
141 Pin<Box<dyn Future<Output = Result<(), String>> + Send>>,
142 ),
143 String,
144 >,
145 > {
146 use tokio::io::AsyncReadExt; Ok(async {
149 propagate_trace_for_ctx!(context);
150
151 let blobstore = self
153 .get_blobstore(context)
154 .await
155 .context("failed to get NATS Blobstore connection")?;
156
157 let container = blobstore
159 .jetstream
160 .get_object_store(&id.container)
161 .await
162 .context("failed to get container")?;
163
164 let mut object = container
166 .get(&id.object)
167 .await
168 .context("failed to get object data")?;
169
170 let (tx, rx) = mpsc::channel(16);
172 anyhow::Ok((
173 Box::pin(ReceiverStream::new(rx)) as Pin<Box<dyn Stream<Item = _> + Send>>,
174 Box::pin(async move {
175 async move {
176 let mut buffer = vec![0; 1024];
178 while let Ok(bytes_read) = object.read(&mut buffer).await {
179 if bytes_read == 0 {
180 break;
181 }
182 let chunk = Bytes::copy_from_slice(&buffer[..bytes_read]);
183 tx.send(chunk).await.context("stream receiver closed")?;
184 }
185 anyhow::Ok(())
186 }
187 .await
188 .map_err(|err| format!("{err:#}"))
189 }) as Pin<Box<dyn Future<Output = _> + Send>>,
190 ))
191 }
192 .await
193 .map_err(|err| format!("{err:#}")))
194 }
195
196 #[instrument(level = "debug", skip(self, data))]
198 async fn write_container_data(
199 &self,
200 context: Option<Context>,
201 id: bindings::wrpc::blobstore::types::ObjectId,
202 data: Pin<Box<dyn Stream<Item = Bytes> + Send>>,
203 ) -> anyhow::Result<Result<Pin<Box<dyn Future<Output = Result<(), String>> + Send>>, String>>
204 {
205 Ok(async {
206 propagate_trace_for_ctx!(context);
207
208 let blobstore = self
209 .get_blobstore(context)
210 .await
211 .context("failed to get NATS Blobstore connection")?;
212
213 let container = blobstore
214 .jetstream
215 .get_object_store(&id.container)
216 .await
217 .context("failed to get container")?;
218
219 let metadata = async_nats::jetstream::object_store::ObjectMetadata {
220 name: id.object.clone(),
221 description: Some("NATS WASI Blobstore Object".to_string()),
222 chunk_size: Some(256 * 1024), headers: None, metadata: HashMap::new(), };
226
227 let result: Result<(), String> = async move {
228 let data = data.map(Ok::<_, std::io::Error>);
229 let mut reader = tokio_util::io::StreamReader::new(data);
230
231 let timeout = Duration::from_secs(self.default_config.max_write_wait.unwrap_or(30));
233
234 tokio::time::timeout(timeout, container.put(metadata, &mut reader))
235 .await
236 .context("operation timed out")
237 .map_err(|e| e.to_string())?
238 .context("failed to write container data")
239 .map_err(|e| e.to_string())?;
240
241 Ok(())
242 }
243 .await;
244
245 Ok(Box::pin(async move { result })
246 as Pin<Box<dyn Future<Output = Result<(), String>> + Send>>)
247 }
248 .await
249 .map_err(|err: anyhow::Error| format!("{err:#}")))
250 }
251
252 #[instrument(level = "debug", skip_all)]
256 async fn list_container_objects(
257 &self,
258 context: Option<Context>,
259 name: String,
260 _offset: Option<u64>,
261 _limit: Option<u64>,
262 ) -> anyhow::Result<
263 Result<
264 (
265 Pin<Box<dyn Stream<Item = Vec<String>> + Send>>,
266 Pin<Box<dyn Future<Output = Result<(), String>> + Send>>,
267 ),
268 String,
269 >,
270 > {
271 Ok(async {
272 propagate_trace_for_ctx!(context);
273
274 let blobstore = self
276 .get_blobstore(context)
277 .await
278 .context("failed to get NATS Blobstore connection")?;
279
280 let container = blobstore
282 .jetstream
283 .get_object_store(&name)
284 .await
285 .context("failed to get container")?;
286
287 let mut objects = container
289 .list()
290 .await
291 .context("failed to list container objects")?;
292
293 let (tx, rx) = tokio::sync::mpsc::channel(16);
295 anyhow::Ok((
296 Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx))
297 as Pin<Box<dyn Stream<Item = Vec<String>> + Send>>,
298 Box::pin(async move {
299 while let Some(object) = objects.next().await {
300 let object = object.map_err(|e| format!("{e:#}"))?;
301 tx.send(vec![object.name])
302 .await
303 .map_err(|e| format!("{e:#}"))?;
304 }
305 Ok(())
306 }) as Pin<Box<dyn Future<Output = Result<(), String>> + Send>>,
307 ))
308 }
309 .await
310 .map_err(|err: anyhow::Error| format!("{err:#}")))
311 }
312
313 #[instrument(level = "debug", skip(self))]
315 async fn clear_container(
316 &self,
317 context: Option<Context>,
318 name: String,
319 ) -> anyhow::Result<Result<(), String>> {
320 Ok(async {
321 propagate_trace_for_ctx!(context);
322
323 let (mut objects_stream, _) = self
325 .list_container_objects(context.clone(), name.clone(), None, None)
326 .await
327 .context("failed to list container objects")
328 .map(|r| r.map_err(|e| anyhow::anyhow!(e)))??;
329
330 let mut all_objects = Vec::new();
332 while let Some(batch) = objects_stream.next().await {
333 all_objects.extend(batch);
334 }
335
336 self.delete_objects(context, name, all_objects)
338 .await
339 .context("failed to delete objects")
340 .map(|_| ())
341 .map_err(|e| anyhow::anyhow!(e))
342 }
343 .await
344 .map_err(|err| format!("{err:#}")))
345 }
346
347 #[instrument(level = "debug", skip(self))]
349 async fn delete_container(
350 &self,
351 context: Option<Context>,
352 name: String,
353 ) -> anyhow::Result<Result<(), String>> {
354 Ok(async {
355 propagate_trace_for_ctx!(context);
356
357 let blobstore = self
359 .get_blobstore(context)
360 .await
361 .context("failed to get NATS Blobstore connection")?;
362
363 blobstore
365 .jetstream
366 .delete_object_store(name)
367 .await
368 .context("failed to delete NATS Blobstore Container")
369 .map(|_| ())
370 }
371 .await
372 .map_err(|err| format!("{err:#}")))
373 }
374
375 #[instrument(level = "debug", skip(self))]
377 async fn has_object(
378 &self,
379 context: Option<Context>,
380 id: bindings::wrpc::blobstore::types::ObjectId,
381 ) -> anyhow::Result<Result<bool, String>> {
382 Ok(async {
383 propagate_trace_for_ctx!(context);
384
385 let blobstore = self
387 .get_blobstore(context)
388 .await
389 .context("failed to get NATS Blobstore connection")?;
390
391 let container = blobstore
393 .jetstream
394 .get_object_store(&id.container)
395 .await
396 .context("failed to get container")?;
397
398 container
400 .info(id.object)
401 .await
402 .context("failed to get object info")
403 .map(|_| true)
404 .map_err(|e| anyhow::anyhow!(e))
405 }
406 .await
407 .map_err(|err| format!("{err:#}")))
408 }
409
410 #[instrument(level = "debug", skip(self))]
412 async fn get_object_info(
413 &self,
414 context: Option<Context>,
415 id: bindings::wrpc::blobstore::types::ObjectId,
416 ) -> anyhow::Result<Result<bindings::wrpc::blobstore::types::ObjectMetadata, String>> {
417 Ok(async {
418 propagate_trace_for_ctx!(context);
419
420 let blobstore = self
422 .get_blobstore(context)
423 .await
424 .context("failed to get NATS Blobstore connection")?;
425
426 let container = blobstore
428 .jetstream
429 .get_object_store(&id.container)
430 .await
431 .context("failed to get container")?;
432
433 container
435 .info(id.object)
436 .await
437 .context("failed to get object info")
438 .map(
439 |object_info| bindings::wrpc::blobstore::types::ObjectMetadata {
440 created_at: 0,
442 size: object_info.size as u64,
443 },
444 )
445 .map_err(|e| anyhow::anyhow!(e))
446 }
447 .await
448 .map_err(|err| format!("{err:#}")))
449 }
450
451 #[instrument(level = "debug", skip(self))]
453 async fn copy_object(
454 &self,
455 context: Option<Context>,
456 source: bindings::wrpc::blobstore::types::ObjectId,
457 destination: bindings::wrpc::blobstore::types::ObjectId,
458 ) -> anyhow::Result<Result<(), String>> {
459 Ok(async {
460 propagate_trace_for_ctx!(context);
461
462 if source.container == destination.container && source.object == destination.object {
464 info!(
465 "skipping copying object '{}' to itself in container '{}'",
466 source.object, source.container
467 );
468 return Ok(());
469 }
470
471 let blobstore = self
473 .get_blobstore(context)
474 .await
475 .context("failed to get NATS Blobstore connection")?;
476
477 let src_container = blobstore
479 .jetstream
480 .get_object_store(&source.container)
481 .await
482 .context("failed to open source NATS Blobstore Container")?;
483
484 let dst_container = blobstore
486 .jetstream
487 .get_object_store(&destination.container)
488 .await
489 .context("failed to open destination NATS Blobstore Container")?;
490
491 let mut src_object = src_container
493 .get(source.object.clone())
494 .await
495 .context("failed to read object from source container")?;
496
497 let metadata = async_nats::jetstream::object_store::ObjectMetadata {
499 name: destination.object.clone(),
500 description: src_object.info.description.clone(),
501 chunk_size: Some(src_object.info.chunks),
502 headers: None, metadata: HashMap::new(), };
505
506 dst_container
508 .put(metadata, &mut src_object)
509 .await
510 .context("failed to copy object to destination container")
511 .map(|_| ())
512 }
513 .await
514 .map_err(|err| format!("{err:#}")))
515 }
516
517 #[instrument(level = "debug", skip(self))]
519 async fn move_object(
520 &self,
521 context: Option<Context>,
522 source: bindings::wrpc::blobstore::types::ObjectId,
523 destination: bindings::wrpc::blobstore::types::ObjectId,
524 ) -> anyhow::Result<Result<(), String>> {
525 Ok(async {
526 propagate_trace_for_ctx!(context);
527
528 if source.container == destination.container && source.object == destination.object {
530 info!(
531 "skipping moving object '{}' to itself in container '{}'",
532 source.object, source.container
533 );
534 return Ok(());
535 }
536
537 let blobstore = self
539 .get_blobstore(context)
540 .await
541 .context("failed to get NATS Blobstore connection")?;
542
543 let src_container = blobstore
545 .jetstream
546 .get_object_store(&source.container)
547 .await
548 .context("failed to open source NATS Blobstore Container")?;
549
550 let dst_container = blobstore
552 .jetstream
553 .get_object_store(&destination.container)
554 .await
555 .context("failed to open destination NATS Blobstore Container")?;
556
557 let mut src_object = src_container
559 .get(source.object.clone())
560 .await
561 .context("failed to read object from source container")?;
562
563 let metadata = async_nats::jetstream::object_store::ObjectMetadata {
565 name: destination.object.clone(),
566 description: src_object.info.description.clone(),
567 chunk_size: Some(src_object.info.chunks),
568 headers: None, metadata: HashMap::new(), };
571
572 dst_container
574 .put(metadata, &mut src_object)
575 .await
576 .context("failed to move object to destination container")?;
577
578 src_container
580 .delete(source.object.clone())
581 .await
582 .context("failed to delete object from source container")
583 .map(|_| ())
584 }
585 .await
586 .map_err(|err| format!("{err:#}")))
587 }
588
589 #[instrument(level = "debug", skip(self))]
591 async fn delete_object(
592 &self,
593 context: Option<Context>,
594 id: bindings::wrpc::blobstore::types::ObjectId,
595 ) -> anyhow::Result<Result<(), String>> {
596 Ok(async {
597 propagate_trace_for_ctx!(context);
598
599 let blobstore = self
601 .get_blobstore(context)
602 .await
603 .map_err(|e| e.to_string())?;
604
605 let container = blobstore
607 .jetstream
608 .get_object_store(&id.container)
609 .await
610 .map_err(|e| e.to_string())?;
611 let result: Result<(), String> =
613 container.delete(id.object).await.map_err(|e| e.to_string());
614
615 result
616 }
617 .await
618 .map_err(|err: String| format!("{err:#}")))
619 }
620
621 #[instrument(level = "trace", skip(self))]
624 async fn delete_objects(
625 &self,
626 context: Option<Context>,
627 name: String,
628 objects: Vec<String>,
629 ) -> anyhow::Result<Result<(), String>> {
630 Ok(async {
631 propagate_trace_for_ctx!(context);
632
633 const BATCH_SIZE: usize = 50;
635 let mut handles = Vec::with_capacity(objects.len());
636
637 for chunk in objects.chunks(BATCH_SIZE) {
639 let mut chunk_handles = chunk
640 .iter()
641 .map(|object| {
642 let ctx = context.clone();
643 let container = name.clone();
644 let object = object.clone();
645 let this = self.clone();
646
647 tokio::spawn(async move {
648 this.delete_object(
649 ctx,
650 bindings::wrpc::blobstore::types::ObjectId { container, object },
651 )
652 .await
653 })
654 })
655 .collect::<Vec<_>>();
656 handles.append(&mut chunk_handles);
657 }
658
659 let results = futures::future::join_all(handles).await;
661
662 let errors: Vec<String> = results
664 .into_iter()
665 .filter_map(|r| match r {
666 Ok(Ok(Ok(()))) => None, Ok(Ok(Err(e))) => Some(e), Ok(Err(e)) => Some(format!("Provider error: {e:#}")), Err(e) => Some(format!("Task join error: {e:#}")), })
671 .collect();
672
673 if errors.is_empty() {
674 Ok(())
675 } else {
676 Err(format!(
677 "Failed to delete some objects: {}",
678 errors.join("; ")
679 ))
680 }
681 }
682 .await
683 .map_err(|err| format!("{err:#}")))
684 }
685}