wasmcloud_provider_blobstore_nats/
blobstore.rs

1//! NATS implementation for `wrpc:blobstore/blobstore@0.2.0` interface.
2
3#![allow(clippy::type_complexity)]
4use anyhow::{Context as _, Result};
5use bytes::Bytes;
6use core::future::Future;
7use core::pin::Pin;
8use futures::{Stream, StreamExt};
9use std::collections::HashMap;
10use std::time::Duration;
11use tokio::sync::mpsc;
12use tokio_stream::wrappers::ReceiverStream;
13use tracing::{info, instrument};
14use wasmcloud_provider_sdk::{propagate_trace_for_ctx, Context};
15
16// Import the wrpc interface bindings
17use wrpc_interface_blobstore::bindings;
18
19impl bindings::exports::wrpc::blobstore::blobstore::Handler<Option<Context>>
20    for crate::NatsBlobstoreProvider
21{
22    // Create a new NATS Blobstore Container
23    #[instrument(level = "debug", skip(self))]
24    async fn create_container(
25        &self,
26        context: Option<Context>,
27        name: String,
28    ) -> anyhow::Result<Result<(), String>> {
29        Ok(async {
30            propagate_trace_for_ctx!(context);
31
32            // Create a new NATS Blobstore Container, with provided storage configuration
33            let blobstore = self.get_blobstore(context).await.context(
34                "failed to get NATS Blobstore connection and container storage configuration",
35            )?;
36
37            // Create a NATS Blobstore Container configuration
38            let container_config = async_nats::jetstream::object_store::Config {
39                bucket: name,
40                description: Some("NATS Blobstore".to_string()),
41                max_age: blobstore.storage_config.max_age,
42                max_bytes: blobstore.storage_config.max_bytes,
43                storage: blobstore.storage_config.storage_type.0,
44                num_replicas: blobstore.storage_config.num_replicas,
45                compression: blobstore.storage_config.compression,
46                placement: None,
47            };
48
49            // Create a NATS Blobstore Container
50            blobstore
51                .jetstream
52                .create_object_store(container_config)
53                .await
54                .context("failed to create NATS Blobstore Container")
55                .map(|_| ())
56        }
57        .await
58        .map_err(|err| format!("{err:#}")))
59    }
60
61    // Get metadata of an existing NATS Blobstore Container
62    #[instrument(level = "trace", skip(self))]
63    async fn get_container_info(
64        &self,
65        context: Option<Context>,
66        name: String,
67    ) -> anyhow::Result<Result<bindings::wrpc::blobstore::types::ContainerMetadata, String>> {
68        Ok(async {
69            propagate_trace_for_ctx!(context);
70
71            // Retrieve the blobstore connection
72            let blobstore = self
73                .get_blobstore(context)
74                .await
75                .context("failed to get NATS Blobstore connection")?;
76
77            // Attempt to get the container metadata
78            let _container = blobstore
79                .jetstream
80                .get_object_store(name)
81                .await
82                .context("failed to get container info")?;
83
84            // Construct and return the container metadata
85            let metadata = bindings::wrpc::blobstore::types::ContainerMetadata {
86                created_at: 0u64, // Unix epoch as a placeholder
87            };
88            Ok(metadata)
89        }
90        .await
91        .map_err(|err: anyhow::Error| err.to_string()))
92    }
93
94    // Check if a NATS Blobstore Container exists
95    #[instrument(level = "debug", skip(self))]
96    async fn container_exists(
97        &self,
98        context: Option<Context>,
99        name: String,
100    ) -> anyhow::Result<Result<bool, String>> {
101        Ok(async {
102            propagate_trace_for_ctx!(context);
103
104            // Retrieve the NATS blobstore connection
105            let blobstore = self
106                .get_blobstore(context)
107                .await
108                .map_err(|e| e.to_string())?;
109
110            // Check if the container exists
111            match blobstore.jetstream.get_object_store(&name).await {
112                Ok(_) => Ok(true),
113                Err(e)
114                    if matches!(
115                        e.kind(),
116                        async_nats::jetstream::context::ObjectStoreErrorKind::GetStore
117                    ) =>
118                {
119                    Ok(false)
120                }
121                Err(e) => Err(format!("failed to check container existence: {e}")),
122            }
123        }
124        .await
125        .map_err(|err| err.to_string()))
126    }
127
128    /// Retrieve data from an object in the specified NATS blobstore Container
129    /// Optionally specify start and end byte offsets for partial reads
130    #[instrument(level = "debug", skip(self))]
131    async fn get_container_data(
132        &self,
133        context: Option<Context>,
134        id: bindings::wrpc::blobstore::types::ObjectId,
135        start: u64,
136        end: u64,
137    ) -> anyhow::Result<
138        Result<
139            (
140                Pin<Box<dyn Stream<Item = Bytes> + Send>>,
141                Pin<Box<dyn Future<Output = Result<(), String>> + Send>>,
142            ),
143            String,
144        >,
145    > {
146        use tokio::io::AsyncReadExt; // Import the trait to use `read`
147
148        Ok(async {
149            propagate_trace_for_ctx!(context);
150
151            // Retrieve the NATS blobstore connection
152            let blobstore = self
153                .get_blobstore(context)
154                .await
155                .context("failed to get NATS Blobstore connection")?;
156
157            // Get the container (object store) for the specified container name
158            let container = blobstore
159                .jetstream
160                .get_object_store(&id.container)
161                .await
162                .context("failed to get container")?;
163
164            // Retrieve the object data as a stream
165            let mut object = container
166                .get(&id.object)
167                .await
168                .context("failed to get object data")?;
169
170            // Create a channel to stream the data
171            let (tx, rx) = mpsc::channel(16);
172            anyhow::Ok((
173                Box::pin(ReceiverStream::new(rx)) as Pin<Box<dyn Stream<Item = _> + Send>>,
174                Box::pin(async move {
175                    async move {
176                        // Stream the object data in chunks of 1024 bytes
177                        let mut buffer = vec![0; 1024];
178                        while let Ok(bytes_read) = object.read(&mut buffer).await {
179                            if bytes_read == 0 {
180                                break;
181                            }
182                            let chunk = Bytes::copy_from_slice(&buffer[..bytes_read]);
183                            tx.send(chunk).await.context("stream receiver closed")?;
184                        }
185                        anyhow::Ok(())
186                    }
187                    .await
188                    .map_err(|err| format!("{err:#}"))
189                }) as Pin<Box<dyn Future<Output = _> + Send>>,
190            ))
191        }
192        .await
193        .map_err(|err| format!("{err:#}")))
194    }
195
196    // Create or replace an object with the data blob in the specified NATS blobstore Container
197    #[instrument(level = "debug", skip(self, data))]
198    async fn write_container_data(
199        &self,
200        context: Option<Context>,
201        id: bindings::wrpc::blobstore::types::ObjectId,
202        data: Pin<Box<dyn Stream<Item = Bytes> + Send>>,
203    ) -> anyhow::Result<Result<Pin<Box<dyn Future<Output = Result<(), String>> + Send>>, String>>
204    {
205        Ok(async {
206            propagate_trace_for_ctx!(context);
207
208            let blobstore = self
209                .get_blobstore(context)
210                .await
211                .context("failed to get NATS Blobstore connection")?;
212
213            let container = blobstore
214                .jetstream
215                .get_object_store(&id.container)
216                .await
217                .context("failed to get container")?;
218
219            let metadata = async_nats::jetstream::object_store::ObjectMetadata {
220                name: id.object.clone(),
221                description: Some("NATS WASI Blobstore Object".to_string()),
222                chunk_size: Some(256 * 1024), // 256KB chunks
223                headers: None,                // No custom headers
224                metadata: HashMap::new(),     // Empty metadata map
225            };
226
227            let result: Result<(), String> = async move {
228                let data = data.map(Ok::<_, std::io::Error>);
229                let mut reader = tokio_util::io::StreamReader::new(data);
230
231                // Get timeout from config, defaulting to 30 seconds if not set
232                let timeout = Duration::from_secs(self.default_config.max_write_wait.unwrap_or(30));
233
234                tokio::time::timeout(timeout, container.put(metadata, &mut reader))
235                    .await
236                    .context("operation timed out")
237                    .map_err(|e| e.to_string())?
238                    .context("failed to write container data")
239                    .map_err(|e| e.to_string())?;
240
241                Ok(())
242            }
243            .await;
244
245            Ok(Box::pin(async move { result })
246                as Pin<Box<dyn Future<Output = Result<(), String>> + Send>>)
247        }
248        .await
249        .map_err(|err: anyhow::Error| format!("{err:#}")))
250    }
251
252    /// Helper function to list all objects in a NATS blobstore container.
253    /// This ensures consistent implementation across all functions that need to list objects.
254    /// Delegates to the core implementation in blobstore.rs.
255    #[instrument(level = "debug", skip_all)]
256    async fn list_container_objects(
257        &self,
258        context: Option<Context>,
259        name: String,
260        _offset: Option<u64>,
261        _limit: Option<u64>,
262    ) -> anyhow::Result<
263        Result<
264            (
265                Pin<Box<dyn Stream<Item = Vec<String>> + Send>>,
266                Pin<Box<dyn Future<Output = Result<(), String>> + Send>>,
267            ),
268            String,
269        >,
270    > {
271        Ok(async {
272            propagate_trace_for_ctx!(context);
273
274            // Retrieve the NATS blobstore connection
275            let blobstore = self
276                .get_blobstore(context)
277                .await
278                .context("failed to get NATS Blobstore connection")?;
279
280            // Get the container (object store) for the specified container name
281            let container = blobstore
282                .jetstream
283                .get_object_store(&name)
284                .await
285                .context("failed to get container")?;
286
287            // Get the list of objects in the container
288            let mut objects = container
289                .list()
290                .await
291                .context("failed to list container objects")?;
292
293            // Create a channel to stream the data
294            let (tx, rx) = tokio::sync::mpsc::channel(16);
295            anyhow::Ok((
296                Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx))
297                    as Pin<Box<dyn Stream<Item = Vec<String>> + Send>>,
298                Box::pin(async move {
299                    while let Some(object) = objects.next().await {
300                        let object = object.map_err(|e| format!("{e:#}"))?;
301                        tx.send(vec![object.name])
302                            .await
303                            .map_err(|e| format!("{e:#}"))?;
304                    }
305                    Ok(())
306                }) as Pin<Box<dyn Future<Output = Result<(), String>> + Send>>,
307            ))
308        }
309        .await
310        .map_err(|err: anyhow::Error| format!("{err:#}")))
311    }
312
313    // Remove all objects within the NATS blobstore Container, leaving the container empty.
314    #[instrument(level = "debug", skip(self))]
315    async fn clear_container(
316        &self,
317        context: Option<Context>,
318        name: String,
319    ) -> anyhow::Result<Result<(), String>> {
320        Ok(async {
321            propagate_trace_for_ctx!(context);
322
323            // List all objects in the container
324            let (mut objects_stream, _) = self
325                .list_container_objects(context.clone(), name.clone(), None, None)
326                .await
327                .context("failed to list container objects")
328                .map(|r| r.map_err(|e| anyhow::anyhow!(e)))??;
329
330            // Collect ALL objects from the stream into a single vector
331            let mut all_objects = Vec::new();
332            while let Some(batch) = objects_stream.next().await {
333                all_objects.extend(batch);
334            }
335
336            // Delete all objects in the container
337            self.delete_objects(context, name, all_objects)
338                .await
339                .context("failed to delete objects")
340                .map(|_| ())
341                .map_err(|e| anyhow::anyhow!(e))
342        }
343        .await
344        .map_err(|err| format!("{err:#}")))
345    }
346
347    // Delete an existing NATS Blobstore Container
348    #[instrument(level = "debug", skip(self))]
349    async fn delete_container(
350        &self,
351        context: Option<Context>,
352        name: String,
353    ) -> anyhow::Result<Result<(), String>> {
354        Ok(async {
355            propagate_trace_for_ctx!(context);
356
357            // Delete an existing NATS Blobstore
358            let blobstore = self
359                .get_blobstore(context)
360                .await
361                .context("failed to get NATS Blobstore connection")?;
362
363            // Delete the container
364            blobstore
365                .jetstream
366                .delete_object_store(name)
367                .await
368                .context("failed to delete NATS Blobstore Container")
369                .map(|_| ())
370        }
371        .await
372        .map_err(|err| format!("{err:#}")))
373    }
374
375    // Check if an object exists in the specified NATS Blobstore Container
376    #[instrument(level = "debug", skip(self))]
377    async fn has_object(
378        &self,
379        context: Option<Context>,
380        id: bindings::wrpc::blobstore::types::ObjectId,
381    ) -> anyhow::Result<Result<bool, String>> {
382        Ok(async {
383            propagate_trace_for_ctx!(context);
384
385            // Retrieve the NATS blobstore connection
386            let blobstore = self
387                .get_blobstore(context)
388                .await
389                .context("failed to get NATS Blobstore connection")?;
390
391            // Get the container (object store) for the specified container name
392            let container = blobstore
393                .jetstream
394                .get_object_store(&id.container)
395                .await
396                .context("failed to get container")?;
397
398            // Check if the object exists
399            container
400                .info(id.object)
401                .await
402                .context("failed to get object info")
403                .map(|_| true)
404                .map_err(|e| anyhow::anyhow!(e))
405        }
406        .await
407        .map_err(|err| format!("{err:#}")))
408    }
409
410    // Get metadata of an object in the specified NATS blobstore Container
411    #[instrument(level = "debug", skip(self))]
412    async fn get_object_info(
413        &self,
414        context: Option<Context>,
415        id: bindings::wrpc::blobstore::types::ObjectId,
416    ) -> anyhow::Result<Result<bindings::wrpc::blobstore::types::ObjectMetadata, String>> {
417        Ok(async {
418            propagate_trace_for_ctx!(context);
419
420            // Retrieve the NATS blobstore connection
421            let blobstore = self
422                .get_blobstore(context)
423                .await
424                .context("failed to get NATS Blobstore connection")?;
425
426            // Get the container (object store) for the specified container name
427            let container = blobstore
428                .jetstream
429                .get_object_store(&id.container)
430                .await
431                .context("failed to get container")?;
432
433            // Get the object info
434            container
435                .info(id.object)
436                .await
437                .context("failed to get object info")
438                .map(
439                    |object_info| bindings::wrpc::blobstore::types::ObjectMetadata {
440                        // NATS doesn't store the object creation time, so always return the Unix epoch
441                        created_at: 0,
442                        size: object_info.size as u64,
443                    },
444                )
445                .map_err(|e| anyhow::anyhow!(e))
446        }
447        .await
448        .map_err(|err| format!("{err:#}")))
449    }
450
451    // Copy an object from one to the same (different key name, or revision number update), or another NATS Blobstore Container
452    #[instrument(level = "debug", skip(self))]
453    async fn copy_object(
454        &self,
455        context: Option<Context>,
456        source: bindings::wrpc::blobstore::types::ObjectId,
457        destination: bindings::wrpc::blobstore::types::ObjectId,
458    ) -> anyhow::Result<Result<(), String>> {
459        Ok(async {
460            propagate_trace_for_ctx!(context);
461
462            // Skip the copy if source and destination are the same
463            if source.container == destination.container && source.object == destination.object {
464                info!(
465                    "skipping copying object '{}' to itself in container '{}'",
466                    source.object, source.container
467                );
468                return Ok(());
469            }
470
471            // Retrieve the NATS blobstore connection
472            let blobstore = self
473                .get_blobstore(context)
474                .await
475                .context("failed to get NATS Blobstore connection")?;
476
477            // Get the source container (object store)
478            let src_container = blobstore
479                .jetstream
480                .get_object_store(&source.container)
481                .await
482                .context("failed to open source NATS Blobstore Container")?;
483
484            // Get the destination container (object store)
485            let dst_container = blobstore
486                .jetstream
487                .get_object_store(&destination.container)
488                .await
489                .context("failed to open destination NATS Blobstore Container")?;
490
491            // Get the source object
492            let mut src_object = src_container
493                .get(source.object.clone())
494                .await
495                .context("failed to read object from source container")?;
496
497            // Prepare metadata for the destination object
498            let metadata = async_nats::jetstream::object_store::ObjectMetadata {
499                name: destination.object.clone(),
500                description: src_object.info.description.clone(),
501                chunk_size: Some(src_object.info.chunks),
502                headers: None,            // No custom headers
503                metadata: HashMap::new(), // Empty metadata map
504            };
505
506            // Put the object into the destination container
507            dst_container
508                .put(metadata, &mut src_object)
509                .await
510                .context("failed to copy object to destination container")
511                .map(|_| ())
512        }
513        .await
514        .map_err(|err| format!("{err:#}")))
515    }
516
517    // Move an object from one to the same (different key name, or revision number update), or another NATS Blobstore Container
518    #[instrument(level = "debug", skip(self))]
519    async fn move_object(
520        &self,
521        context: Option<Context>,
522        source: bindings::wrpc::blobstore::types::ObjectId,
523        destination: bindings::wrpc::blobstore::types::ObjectId,
524    ) -> anyhow::Result<Result<(), String>> {
525        Ok(async {
526            propagate_trace_for_ctx!(context);
527
528            // Skip the move if source and destination are the same
529            if source.container == destination.container && source.object == destination.object {
530                info!(
531                    "skipping moving object '{}' to itself in container '{}'",
532                    source.object, source.container
533                );
534                return Ok(());
535            }
536
537            // Retrieve the NATS blobstore connection
538            let blobstore = self
539                .get_blobstore(context)
540                .await
541                .context("failed to get NATS Blobstore connection")?;
542
543            // Get the source container (object store)
544            let src_container = blobstore
545                .jetstream
546                .get_object_store(&source.container)
547                .await
548                .context("failed to open source NATS Blobstore Container")?;
549
550            // Get the destination container (object store)
551            let dst_container = blobstore
552                .jetstream
553                .get_object_store(&destination.container)
554                .await
555                .context("failed to open destination NATS Blobstore Container")?;
556
557            // Get the source object
558            let mut src_object = src_container
559                .get(source.object.clone())
560                .await
561                .context("failed to read object from source container")?;
562
563            // Prepare metadata for the destination object
564            let metadata = async_nats::jetstream::object_store::ObjectMetadata {
565                name: destination.object.clone(),
566                description: src_object.info.description.clone(),
567                chunk_size: Some(src_object.info.chunks),
568                headers: None,            // No custom headers
569                metadata: HashMap::new(), // Empty metadata map
570            };
571
572            // Put the object into the destination container
573            dst_container
574                .put(metadata, &mut src_object)
575                .await
576                .context("failed to move object to destination container")?;
577
578            // Delete the source object
579            src_container
580                .delete(source.object.clone())
581                .await
582                .context("failed to delete object from source container")
583                .map(|_| ())
584        }
585        .await
586        .map_err(|err| format!("{err:#}")))
587    }
588
589    // Delete an object in the specified NATS Blobstore Container
590    #[instrument(level = "debug", skip(self))]
591    async fn delete_object(
592        &self,
593        context: Option<Context>,
594        id: bindings::wrpc::blobstore::types::ObjectId,
595    ) -> anyhow::Result<Result<(), String>> {
596        Ok(async {
597            propagate_trace_for_ctx!(context);
598
599            // Retrieve the NATS blobstore connection
600            let blobstore = self
601                .get_blobstore(context)
602                .await
603                .map_err(|e| e.to_string())?;
604
605            // Get the container (object store) for the specified container name
606            let container = blobstore
607                .jetstream
608                .get_object_store(&id.container)
609                .await
610                .map_err(|e| e.to_string())?;
611            // Delete the object
612            let result: Result<(), String> =
613                container.delete(id.object).await.map_err(|e| e.to_string());
614
615            result
616        }
617        .await
618        .map_err(|err: String| format!("{err:#}")))
619    }
620
621    // Delete multiple objects in the specified NATS Blobstore Container
622    // Objects are deleted concurrently in batches for improved performance
623    #[instrument(level = "trace", skip(self))]
624    async fn delete_objects(
625        &self,
626        context: Option<Context>,
627        name: String,
628        objects: Vec<String>,
629    ) -> anyhow::Result<Result<(), String>> {
630        Ok(async {
631            propagate_trace_for_ctx!(context);
632
633            // Create deletion tasks in batches of 50 to prevent overwhelming the server
634            const BATCH_SIZE: usize = 50;
635            let mut handles = Vec::with_capacity(objects.len());
636
637            // Process objects in chunks to maintain reasonable resource usage
638            for chunk in objects.chunks(BATCH_SIZE) {
639                let mut chunk_handles = chunk
640                    .iter()
641                    .map(|object| {
642                        let ctx = context.clone();
643                        let container = name.clone();
644                        let object = object.clone();
645                        let this = self.clone();
646
647                        tokio::spawn(async move {
648                            this.delete_object(
649                                ctx,
650                                bindings::wrpc::blobstore::types::ObjectId { container, object },
651                            )
652                            .await
653                        })
654                    })
655                    .collect::<Vec<_>>();
656                handles.append(&mut chunk_handles);
657            }
658
659            // Wait for all deletion tasks to complete
660            let results = futures::future::join_all(handles).await;
661
662            // Process results and collect any errors
663            let errors: Vec<String> = results
664                .into_iter()
665                .filter_map(|r| match r {
666                    Ok(Ok(Ok(()))) => None,                               // Successful deletion
667                    Ok(Ok(Err(e))) => Some(e),                            // Operation error
668                    Ok(Err(e)) => Some(format!("Provider error: {e:#}")), // Provider error
669                    Err(e) => Some(format!("Task join error: {e:#}")),    // Task execution error
670                })
671                .collect();
672
673            if errors.is_empty() {
674                Ok(())
675            } else {
676                Err(format!(
677                    "Failed to delete some objects: {}",
678                    errors.join("; ")
679                ))
680            }
681        }
682        .await
683        .map_err(|err| format!("{err:#}")))
684    }
685}