wasmcloud_provider_blobstore_fs/
lib.rs

1#![allow(clippy::type_complexity)]
2
3//! blobstore-fs capability provider
4
5use core::future::Future;
6use core::pin::Pin;
7use core::time::Duration;
8
9use std::collections::HashMap;
10use std::io::SeekFrom;
11use std::path::{Path, PathBuf};
12use std::sync::Arc;
13use std::time::SystemTime;
14
15use anyhow::{anyhow, bail, Context as _};
16use bytes::Bytes;
17use futures::{Stream, StreamExt as _, TryStreamExt as _};
18use path_clean::PathClean;
19use tokio::fs::{self, create_dir_all, File};
20use tokio::io::{self, AsyncReadExt as _, AsyncSeekExt as _};
21use tokio::sync::{mpsc, RwLock};
22use tokio_stream::wrappers::{ReadDirStream, ReceiverStream};
23use tokio_util::io::{ReaderStream, StreamReader};
24use tracing::{debug, error, info, instrument, trace};
25use wasmcloud_provider_sdk::{
26    get_connection, initialize_observability, propagate_trace_for_ctx, run_provider,
27    serve_provider_exports, Context, LinkConfig, LinkDeleteInfo, Provider,
28};
29use wrpc_interface_blobstore::bindings::{
30    exports::wrpc::blobstore::blobstore::Handler,
31    serve,
32    wrpc::blobstore::types::{ContainerMetadata, ObjectId, ObjectMetadata},
33};
34
/// Configuration stored by the provider for a single linked source component.
#[derive(Default, Debug, Clone)]
struct FsProviderConfig {
    /// Root directory below which the component's containers are resolved.
    root: Arc<PathBuf>,
}
39
/// fs capability provider implementation
///
/// Keeps one `FsProviderConfig` per linked component. Clones of the provider
/// share the same configuration map through the inner `Arc`.
#[derive(Default, Clone)]
pub struct FsProvider {
    /// Link configurations, keyed by the ID of the linked source component.
    config: Arc<RwLock<HashMap<String, FsProviderConfig>>>,
}
45
/// Run the blobstore-fs provider by delegating to [`FsProvider::run`].
///
/// # Errors
///
/// Returns whatever error [`FsProvider::run`] returns.
pub async fn run() -> anyhow::Result<()> {
    FsProvider::run().await
}
49
50impl FsProvider {
51    pub async fn run() -> anyhow::Result<()> {
52        initialize_observability!(
53            "blobstore-fs-provider",
54            std::env::var_os("PROVIDER_BLOBSTORE_FS_FLAMEGRAPH_PATH")
55        );
56
57        let provider = Self::default();
58        let shutdown = run_provider(provider.clone(), "blobstore-fs-provider")
59            .await
60            .context("failed to run provider")?;
61        let connection = get_connection();
62        let wrpc = connection
63            .get_wrpc_client(connection.provider_key())
64            .await?;
65        serve_provider_exports(&wrpc, provider, shutdown, serve)
66            .await
67            .context("failed to serve provider exports")
68    }
69}
70
71/// Resolve a path with two components (base & root),
72/// ensuring that the path is below the given root.
73fn resolve_subpath(root: &Path, path: impl AsRef<Path>) -> Result<PathBuf, std::io::Error> {
74    let joined = root.join(&path);
75    let joined = joined.clean();
76
77    // Check components of either path
78    let mut joined_abs_iter = joined.components();
79    for root_part in root.components() {
80        let joined_part = joined_abs_iter.next();
81
82        // If the joined path is shorter or doesn't match
83        // for the duration of the root, path is suspect
84        if joined_part.is_none() || joined_part != Some(root_part) {
85            return Err(std::io::Error::new(
86                std::io::ErrorKind::PermissionDenied,
87                format!(
88                    "Invalid path [{}], is not contained by root path [{}]",
89                    path.as_ref().display(),
90                    root.display(),
91                ),
92            ));
93        }
94    }
95
96    // At this point, the root iterator has ben exhausted
97    // and the remaining components are the paths beneath the root
98    Ok(joined)
99}
100
101impl FsProvider {
102    async fn get_root(&self, context: Option<Context>) -> anyhow::Result<Arc<PathBuf>> {
103        if let Some(ref source_id) = context.and_then(|Context { component, .. }| component) {
104            self.config
105                .read()
106                .await
107                .get(source_id)
108                .with_context(|| format!("failed to lookup {source_id} configuration"))
109                .map(|FsProviderConfig { root }| Arc::clone(root))
110        } else {
111            // TODO: Support a default here
112            bail!("failed to lookup invocation source ID")
113        }
114    }
115
116    async fn get_container(
117        &self,
118        context: Option<Context>,
119        container: impl AsRef<Path>,
120    ) -> anyhow::Result<PathBuf> {
121        let root = self
122            .get_root(context)
123            .await
124            .context("failed to get container root")?;
125        resolve_subpath(&root, container).context("failed to resolve subpath")
126    }
127
128    async fn get_object(
129        &self,
130        context: Option<Context>,
131        ObjectId { container, object }: ObjectId,
132    ) -> anyhow::Result<PathBuf> {
133        let container = self
134            .get_container(context, container)
135            .await
136            .context("failed to get container")?;
137        resolve_subpath(&container, object).context("failed to resolve subpath")
138    }
139}
140
141impl Handler<Option<Context>> for FsProvider {
142    #[instrument(level = "trace", skip(self))]
143    async fn clear_container(
144        &self,
145        cx: Option<Context>,
146        name: String,
147    ) -> anyhow::Result<Result<(), String>> {
148        Ok(async {
149            propagate_trace_for_ctx!(cx);
150            let path = self.get_container(cx, name).await?;
151            debug!("read directory at `{}`", path.display());
152            let dir = fs::read_dir(path).await.context("failed to read path")?;
153            ReadDirStream::new(dir)
154                .map(|entry| entry.context("failed to lookup directory entry"))
155                .try_for_each_concurrent(None, |entry| async move {
156                    let ty = entry
157                        .file_type()
158                        .await
159                        .context("failed to lookup directory entry type")?;
160                    let path = entry.path();
161                    if ty.is_dir() {
162                        fs::remove_dir_all(&path).await.with_context(|| {
163                            format!("failed to remove directory at `{}`", path.display())
164                        })?;
165                    } else {
166                        fs::remove_file(&path).await.with_context(|| {
167                            format!("failed to remove file at `{}`", path.display())
168                        })?;
169                    }
170                    Ok(())
171                })
172                .await
173                .context("failed to remove directory contents")
174        }
175        .await
176        .map_err(|err| format!("{err:#}")))
177    }
178
179    #[instrument(level = "trace", skip(self))]
180    async fn container_exists(
181        &self,
182        cx: Option<Context>,
183        name: String,
184    ) -> anyhow::Result<Result<bool, String>> {
185        Ok(async {
186            propagate_trace_for_ctx!(cx);
187            let path = self.get_container(cx, name).await?;
188            fs::try_exists(path)
189                .await
190                .context("failed to check if path exists")
191        }
192        .await
193        .map_err(|err| format!("{err:#}")))
194    }
195
196    #[instrument(level = "trace", skip(self))]
197    async fn create_container(
198        &self,
199        cx: Option<Context>,
200        name: String,
201    ) -> anyhow::Result<Result<(), String>> {
202        Ok(async {
203            propagate_trace_for_ctx!(cx);
204            let path = self.get_container(cx, name).await?;
205            fs::create_dir_all(path)
206                .await
207                .context("failed to create path")
208        }
209        .await
210        .map_err(|err| format!("{err:#}")))
211    }
212
213    #[instrument(level = "trace", skip(self))]
214    async fn delete_container(
215        &self,
216        cx: Option<Context>,
217        name: String,
218    ) -> anyhow::Result<Result<(), String>> {
219        Ok(async {
220            propagate_trace_for_ctx!(cx);
221            let path = self.get_container(cx, name).await?;
222            fs::remove_dir_all(path)
223                .await
224                .context("failed to remove path")
225        }
226        .await
227        .map_err(|err| format!("{err:#}")))
228    }
229
230    #[instrument(level = "trace", skip(self))]
231    async fn get_container_info(
232        &self,
233        cx: Option<Context>,
234        name: String,
235    ) -> anyhow::Result<Result<ContainerMetadata, String>> {
236        Ok(async {
237            propagate_trace_for_ctx!(cx);
238            let path = self.get_container(cx, name).await?;
239            let md = fs::metadata(&path)
240                .await
241                .context("failed to lookup directory metadata")?;
242
243            let created_at = match md.created() {
244                Ok(created_time) => created_time
245                    .duration_since(SystemTime::UNIX_EPOCH)
246                    .context("creation time before Unix epoch")?,
247                Err(e) => {
248                    // NOTE: Some platforms don't have support for creation time, so we default to the unix epoch
249                    debug!(
250                        error = ?e,
251                        ?path,
252                        "failed to get creation time for container, defaulting to 0"
253                    );
254                    Duration::from_secs(0)
255                }
256            };
257            // NOTE: The `created_at` format is currently undefined
258            // https://github.com/WebAssembly/wasi-blobstore/issues/7
259            anyhow::Ok(ContainerMetadata {
260                created_at: created_at.as_secs(),
261            })
262        }
263        .await
264        .map_err(|err| format!("{err:#}")))
265    }
266
267    #[instrument(level = "trace", skip(self))]
268    async fn list_container_objects(
269        &self,
270        cx: Option<Context>,
271        name: String,
272        limit: Option<u64>,
273        offset: Option<u64>,
274    ) -> anyhow::Result<
275        Result<
276            (
277                Pin<Box<dyn Stream<Item = Vec<String>> + Send>>,
278                Pin<Box<dyn Future<Output = Result<(), String>> + Send>>,
279            ),
280            String,
281        >,
282    > {
283        Ok(async {
284            propagate_trace_for_ctx!(cx);
285            let path = self.get_container(cx, name).await?;
286            let offset = offset.unwrap_or_default().try_into().unwrap_or(usize::MAX);
287            let limit = limit.unwrap_or(u64::MAX).try_into().unwrap_or(usize::MAX);
288            debug!(path = ?path.display(), offset, limit, "read directory");
289            let dir = fs::read_dir(path).await.context("failed to read path")?;
290            let mut names = ReadDirStream::new(dir)
291                .skip(offset)
292                .take(limit)
293                .map(move |entry| {
294                    let entry = entry.context("failed to lookup directory entry")?;
295                    let name = entry.file_name().to_string_lossy().to_string();
296                    trace!(name, "list file name");
297                    anyhow::Ok(name)
298                });
299            let (tx, rx) = mpsc::channel(16);
300            anyhow::Ok((
301                Box::pin(ReceiverStream::new(rx).ready_chunks(128))
302                    as Pin<Box<dyn Stream<Item = _> + Send>>,
303                Box::pin(async move {
304                    async move {
305                        while let Some(name) = names.next().await {
306                            let name = name.context("failed to list file names")?;
307                            tx.send(name).await.context("stream receiver closed")?;
308                        }
309                        anyhow::Ok(())
310                    }
311                    .await
312                    .map_err(|err| format!("{err:#}"))
313                }) as Pin<Box<dyn Future<Output = _> + Send>>,
314            ))
315        }
316        .await
317        .map_err(|err| format!("{err:#}")))
318    }
319
320    #[instrument(level = "trace", skip(self))]
321    async fn copy_object(
322        &self,
323        cx: Option<Context>,
324        src: ObjectId,
325        dest: ObjectId,
326    ) -> anyhow::Result<Result<(), String>> {
327        Ok(async {
328            propagate_trace_for_ctx!(cx);
329            let root = self.get_root(cx).await.context("failed to get root")?;
330            let src_container = resolve_subpath(&root, src.container)
331                .context("failed to resolve source container path")?;
332            let src = resolve_subpath(&src_container, src.object)
333                .context("failed to resolve source object path")?;
334
335            let dest_container = resolve_subpath(&root, dest.container)
336                .context("failed to resolve destination container path")?;
337            let dest = resolve_subpath(&dest_container, dest.object)
338                .context("failed to resolve destination object path")?;
339            debug!("copy `{}` to `{}`", src.display(), dest.display());
340            fs::copy(src, dest).await.context("failed to copy")?;
341            anyhow::Ok(())
342        }
343        .await
344        .map_err(|err| format!("{err:#}")))
345    }
346
347    #[instrument(level = "trace", skip(self))]
348    async fn delete_object(
349        &self,
350        cx: Option<Context>,
351        id: ObjectId,
352    ) -> anyhow::Result<Result<(), String>> {
353        Ok(async {
354            propagate_trace_for_ctx!(cx);
355            let path = self.get_object(cx, id).await?;
356            debug!("remove file at `{}`", path.display());
357            match fs::remove_file(&path).await {
358                Ok(()) => Ok(()),
359                Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
360                Err(err) => {
361                    Err(anyhow!(err)
362                        .context(format!("failed to remove file at `{}`", path.display())))
363                }
364            }
365        }
366        .await
367        .map_err(|err| format!("{err:#}")))
368    }
369
370    #[instrument(level = "trace", skip(self))]
371    async fn delete_objects(
372        &self,
373        cx: Option<Context>,
374        container: String,
375        objects: Vec<String>,
376    ) -> anyhow::Result<Result<(), String>> {
377        Ok(async {
378            propagate_trace_for_ctx!(cx);
379            let container = self.get_container(cx, container).await?;
380            for name in objects {
381                let path =
382                    resolve_subpath(&container, name).context("failed to resolve object path")?;
383                debug!("remove file at `{}`", path.display());
384                match fs::remove_file(&path).await {
385                    Ok(()) => Ok(()),
386                    Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
387                    Err(err) => Err(anyhow!(err)
388                        .context(format!("failed to remove file at `{}`", path.display()))),
389                }?;
390            }
391            anyhow::Ok(())
392        }
393        .await
394        .map_err(|err| format!("{err:#}")))
395    }
396
397    #[instrument(level = "trace", skip(self))]
398    async fn get_container_data(
399        &self,
400        cx: Option<Context>,
401        id: ObjectId,
402        start: u64,
403        end: u64,
404    ) -> anyhow::Result<
405        Result<
406            (
407                Pin<Box<dyn Stream<Item = Bytes> + Send>>,
408                Pin<Box<dyn Future<Output = Result<(), String>> + Send>>,
409            ),
410            String,
411        >,
412    > {
413        Ok(async {
414            propagate_trace_for_ctx!(cx);
415            let limit = end
416                .checked_sub(start)
417                .context("`end` must be greater than `start`")?;
418            let path = self.get_object(cx, id).await?;
419            debug!(path = ?path.display(), "open file");
420            let mut object = File::open(&path)
421                .await
422                .with_context(|| format!("failed to open object file [{}]", path.display()))?;
423            if start > 0 {
424                debug!("seek file");
425                object
426                    .seek(SeekFrom::Start(start))
427                    .await
428                    .context("failed to seek from start")?;
429            }
430            let mut data = ReaderStream::new(object.take(limit));
431            let (tx, rx) = mpsc::channel(16);
432            anyhow::Ok((
433                Box::pin(ReceiverStream::new(rx)) as Pin<Box<dyn Stream<Item = _> + Send>>,
434                Box::pin(async move {
435                    async move {
436                        while let Some(buf) = data.next().await {
437                            let buf = buf.context("failed to read file")?;
438                            debug!(?buf, "sending chunk");
439                            tx.send(buf).await.context("stream receiver closed")?;
440                        }
441                        debug!("finished reading file");
442                        anyhow::Ok(())
443                    }
444                    .await
445                    .map_err(|err| format!("{err:#}"))
446                }) as Pin<Box<dyn Future<Output = _> + Send>>,
447            ))
448        }
449        .await
450        .map_err(|err| format!("{err:#}")))
451    }
452
453    #[instrument(level = "trace", skip(self))]
454    async fn get_object_info(
455        &self,
456        cx: Option<Context>,
457        id: ObjectId,
458    ) -> anyhow::Result<Result<ObjectMetadata, String>> {
459        Ok(async {
460            propagate_trace_for_ctx!(cx);
461            let path = self.get_object(cx, id).await?;
462            let md = fs::metadata(&path)
463                .await
464                .context("failed to lookup file metadata")?;
465
466            let created_at = match md.created() {
467                Ok(created_time) => created_time
468                    .duration_since(SystemTime::UNIX_EPOCH)
469                    .context("creation time before Unix epoch")?,
470                Err(e) => {
471                    // NOTE: Some platforms don't have support for creation time, so we default to the unix epoch
472                    debug!(
473                        error = ?e,
474                        ?path,
475                        "failed to get creation time for object, defaulting to 0"
476                    );
477                    Duration::from_secs(0)
478                }
479            };
480            // NOTE: The `created_at` format is currently undefined
481            // https://github.com/WebAssembly/wasi-blobstore/issues/7
482            #[cfg(unix)]
483            let size = std::os::unix::fs::MetadataExt::size(&md);
484            #[cfg(windows)]
485            let size = std::os::windows::fs::MetadataExt::file_size(&md);
486            anyhow::Ok(ObjectMetadata {
487                created_at: created_at.as_secs(),
488                size,
489            })
490        }
491        .await
492        .map_err(|err| format!("{err:#}")))
493    }
494
495    #[instrument(level = "trace", skip(self))]
496    async fn has_object(
497        &self,
498        cx: Option<Context>,
499        id: ObjectId,
500    ) -> anyhow::Result<Result<bool, String>> {
501        Ok(async {
502            propagate_trace_for_ctx!(cx);
503            let path = self.get_object(cx, id).await?;
504            fs::try_exists(path)
505                .await
506                .context("failed to check if path exists")
507        }
508        .await
509        .map_err(|err| format!("{err:#}")))
510    }
511
512    #[instrument(level = "trace", skip(self))]
513    async fn move_object(
514        &self,
515        cx: Option<Context>,
516        src: ObjectId,
517        dest: ObjectId,
518    ) -> anyhow::Result<Result<(), String>> {
519        Ok(async {
520            propagate_trace_for_ctx!(cx);
521            let root = self.get_root(cx).await.context("failed to get root")?;
522            let src_container = resolve_subpath(&root, src.container)
523                .context("failed to resolve source container path")?;
524            let src = resolve_subpath(&src_container, src.object)
525                .context("failed to resolve source object path")?;
526
527            let dest_container = resolve_subpath(&root, dest.container)
528                .context("failed to resolve destination container path")?;
529            let dest = resolve_subpath(&dest_container, dest.object)
530                .context("failed to resolve destination object path")?;
531            debug!("copy `{}` to `{}`", src.display(), dest.display());
532            fs::copy(&src, dest).await.context("failed to copy")?;
533            debug!("remove `{}`", src.display());
534            fs::remove_file(src)
535                .await
536                .context("failed to remove source")
537        }
538        .await
539        .map_err(|err| format!("{err:#}")))
540    }
541
542    #[instrument(level = "trace", skip(self, data))]
543    async fn write_container_data(
544        &self,
545        cx: Option<Context>,
546        id: ObjectId,
547        data: Pin<Box<dyn Stream<Item = Bytes> + Send>>,
548    ) -> anyhow::Result<Result<Pin<Box<dyn Future<Output = Result<(), String>> + Send>>, String>>
549    {
550        Ok(async {
551            propagate_trace_for_ctx!(cx);
552            let path = self.get_object(cx, id).await?;
553            if let Some(parent) = path.parent() {
554                info!(parent = ?parent.display(), "creating directory");
555                fs::create_dir_all(parent)
556                    .await
557                    .context("failed to create parent directories")?;
558            }
559            let mut file = File::options()
560                .create(true)
561                .truncate(true)
562                .write(true)
563                .open(&path)
564                .await
565                .context("failed to open file")?;
566            anyhow::Ok(Box::pin(async move {
567                debug!(path = ?path.display(), "streaming data to file");
568                let n = io::copy(
569                    &mut StreamReader::new(data.map(|chunk| {
570                        trace!(?chunk, "received data chunk");
571                        std::io::Result::Ok(chunk)
572                    })),
573                    &mut file,
574                )
575                .await
576                .context("failed to write file")
577                .map_err(|err| format!("{err:#}"))?;
578                debug!(n, path = ?path.display(), "finished writing file");
579                Ok(())
580            }) as Pin<Box<dyn Future<Output = _> + Send>>)
581        }
582        .await
583        .map_err(|err| format!("{err:#}")))
584    }
585}
586
587impl Provider for FsProvider {
588    /// The fs provider has one configuration parameter, the root of the file system
589    async fn receive_link_config_as_target(
590        &self,
591        LinkConfig {
592            source_id, config, ..
593        }: LinkConfig<'_>,
594    ) -> anyhow::Result<()> {
595        for (k, v) in config {
596            info!("link definition configuration [{k}] set to [{v}]");
597        }
598
599        // Determine the root path value
600        let root_val: PathBuf = match config.iter().find(|(key, _)| key.to_uppercase() == "ROOT") {
601            None => {
602                // If no root is specified, use the tempdir and create a specific directory for this component
603                let root = std::env::temp_dir();
604                // Resolve the subpath from the root to the component ID, carefully
605                match resolve_subpath(&root, source_id) {
606                    Ok(path) => path,
607                    Err(e) => {
608                        error!("Failed to resolve subpath to component directory: {e}");
609                        return Err(
610                            anyhow!(e).context("failed to resolve subpath to component dir")
611                        );
612                    }
613                }
614            }
615            // If a root is manually specified, use that path exactly
616            Some((_, value)) => value.into(),
617        };
618
619        // Ensure the root path exists
620        if let Err(e) = create_dir_all(&root_val).await {
621            error!("Could not create component directory: {:?}", e);
622            return Err(anyhow!(e).context("failed to create component directory"));
623        }
624
625        // Build configuration for FS Provider to use later
626        let config = FsProviderConfig {
627            root: Arc::new(root_val.clean()),
628        };
629
630        info!("Saved FsProviderConfig: {:#?}", config);
631        info!(
632            "File System Blob Store Container Root: '{:?}'",
633            &config.root
634        );
635
636        // Save the configuration for the component
637        self.config
638            .write()
639            .await
640            .insert(source_id.into(), config.clone());
641
642        Ok(())
643    }
644
645    #[instrument(level = "info", skip_all, fields(source_id = info.get_source_id()))]
646    async fn delete_link_as_target(&self, info: impl LinkDeleteInfo) -> anyhow::Result<()> {
647        let component_id = info.get_source_id();
648        self.config.write().await.remove(component_id);
649        Ok(())
650    }
651
652    async fn shutdown(&self) -> anyhow::Result<()> {
653        self.config.write().await.drain();
654        Ok(())
655    }
656}
657
#[cfg(test)]
mod tests {
    use super::*;
    use futures::stream;
    use tempfile::tempdir;
    use wrpc_interface_blobstore::bindings::exports::wrpc::blobstore::blobstore::Handler;

    /// A path that resolves to the root itself is accepted
    #[tokio::test]
    async fn resolve_safe_samepath() {
        let resolved = resolve_subpath(Path::new("./"), "./././");
        assert!(resolved.is_ok());
    }

    /// A path that climbs above the root is rejected with `PermissionDenied`
    #[tokio::test]
    async fn resolve_fail_ancestor() {
        let err = resolve_subpath(Path::new("./"), "../").unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::PermissionDenied);
    }

    /// Writing an object whose name contains a slash creates the intermediate
    /// directory and stores the concatenated chunks
    #[tokio::test]
    async fn test_write_container_data() {
        // The temporary directory acts as the root for the test component
        let temp_dir = tempdir().unwrap();
        let root_path = temp_dir.path().to_path_buf();

        // Provider with a single link configuration pointing at the root
        let links = HashMap::from([(
            "test_source".to_string(),
            FsProviderConfig {
                root: Arc::new(root_path.clone()),
            },
        )]);
        let provider = FsProvider {
            config: Arc::new(RwLock::new(links)),
        };

        // Invocation context naming the configured component
        let context = Some(Context {
            component: Some("test_source".to_string()),
            ..Default::default()
        });
        let object_id = ObjectId {
            container: "test_container".to_string(),
            object: "test_object/with_slash.txt".to_string(),
        };

        // Two chunks that have to end up concatenated in the file
        let data = stream::iter([Bytes::from("Hello, "), Bytes::from("world!")]);

        let result = provider
            .write_container_data(context, object_id, Box::pin(data))
            .await;
        assert!(result.is_ok());

        // The returned future performs the actual write
        let write_future = result.unwrap().unwrap();
        let write_result = write_future.await;
        assert!(write_result.is_ok());

        // File path with slashes
        let file_path = root_path.join("test_container/test_object/with_slash.txt");
        let contents = tokio::fs::read_to_string(file_path).await.unwrap();
        assert_eq!(contents, "Hello, world!");
    }
}