1#![allow(clippy::type_complexity)]
2
3use core::future::Future;
6use core::pin::Pin;
7use core::time::Duration;
8
9use std::collections::HashMap;
10use std::io::SeekFrom;
11use std::path::{Path, PathBuf};
12use std::sync::Arc;
13use std::time::SystemTime;
14
15use anyhow::{anyhow, bail, Context as _};
16use bytes::Bytes;
17use futures::{Stream, StreamExt as _, TryStreamExt as _};
18use path_clean::PathClean;
19use tokio::fs::{self, create_dir_all, File};
20use tokio::io::{self, AsyncReadExt as _, AsyncSeekExt as _};
21use tokio::sync::{mpsc, RwLock};
22use tokio_stream::wrappers::{ReadDirStream, ReceiverStream};
23use tokio_util::io::{ReaderStream, StreamReader};
24use tracing::{debug, error, info, instrument, trace};
25use wasmcloud_provider_sdk::{
26 get_connection, initialize_observability, propagate_trace_for_ctx, run_provider,
27 serve_provider_exports, Context, LinkConfig, LinkDeleteInfo, Provider,
28};
29use wrpc_interface_blobstore::bindings::{
30 exports::wrpc::blobstore::blobstore::Handler,
31 serve,
32 wrpc::blobstore::types::{ContainerMetadata, ObjectId, ObjectMetadata},
33};
34
/// Configuration stored for a single component link.
#[derive(Clone, Debug, Default)]
struct FsProviderConfig {
    /// Directory under which the link's container and object paths are resolved.
    /// Shared via `Arc` so lookups can hand it out cheaply.
    root: Arc<PathBuf>,
}
39
40#[derive(Default, Clone)]
42pub struct FsProvider {
43 config: Arc<RwLock<HashMap<String, FsProviderConfig>>>,
44}
45
46pub async fn run() -> anyhow::Result<()> {
47 FsProvider::run().await
48}
49
50impl FsProvider {
51 pub async fn run() -> anyhow::Result<()> {
52 initialize_observability!(
53 "blobstore-fs-provider",
54 std::env::var_os("PROVIDER_BLOBSTORE_FS_FLAMEGRAPH_PATH")
55 );
56
57 let provider = Self::default();
58 let shutdown = run_provider(provider.clone(), "blobstore-fs-provider")
59 .await
60 .context("failed to run provider")?;
61 let connection = get_connection();
62 let wrpc = connection
63 .get_wrpc_client(connection.provider_key())
64 .await?;
65 serve_provider_exports(&wrpc, provider, shutdown, serve)
66 .await
67 .context("failed to serve provider exports")
68 }
69}
70
71fn resolve_subpath(root: &Path, path: impl AsRef<Path>) -> Result<PathBuf, std::io::Error> {
74 let joined = root.join(&path);
75 let joined = joined.clean();
76
77 let mut joined_abs_iter = joined.components();
79 for root_part in root.components() {
80 let joined_part = joined_abs_iter.next();
81
82 if joined_part.is_none() || joined_part != Some(root_part) {
85 return Err(std::io::Error::new(
86 std::io::ErrorKind::PermissionDenied,
87 format!(
88 "Invalid path [{}], is not contained by root path [{}]",
89 path.as_ref().display(),
90 root.display(),
91 ),
92 ));
93 }
94 }
95
96 Ok(joined)
99}
100
101impl FsProvider {
102 async fn get_root(&self, context: Option<Context>) -> anyhow::Result<Arc<PathBuf>> {
103 if let Some(ref source_id) = context.and_then(|Context { component, .. }| component) {
104 self.config
105 .read()
106 .await
107 .get(source_id)
108 .with_context(|| format!("failed to lookup {source_id} configuration"))
109 .map(|FsProviderConfig { root }| Arc::clone(root))
110 } else {
111 bail!("failed to lookup invocation source ID")
113 }
114 }
115
116 async fn get_container(
117 &self,
118 context: Option<Context>,
119 container: impl AsRef<Path>,
120 ) -> anyhow::Result<PathBuf> {
121 let root = self
122 .get_root(context)
123 .await
124 .context("failed to get container root")?;
125 resolve_subpath(&root, container).context("failed to resolve subpath")
126 }
127
128 async fn get_object(
129 &self,
130 context: Option<Context>,
131 ObjectId { container, object }: ObjectId,
132 ) -> anyhow::Result<PathBuf> {
133 let container = self
134 .get_container(context, container)
135 .await
136 .context("failed to get container")?;
137 resolve_subpath(&container, object).context("failed to resolve subpath")
138 }
139}
140
141impl Handler<Option<Context>> for FsProvider {
142 #[instrument(level = "trace", skip(self))]
143 async fn clear_container(
144 &self,
145 cx: Option<Context>,
146 name: String,
147 ) -> anyhow::Result<Result<(), String>> {
148 Ok(async {
149 propagate_trace_for_ctx!(cx);
150 let path = self.get_container(cx, name).await?;
151 debug!("read directory at `{}`", path.display());
152 let dir = fs::read_dir(path).await.context("failed to read path")?;
153 ReadDirStream::new(dir)
154 .map(|entry| entry.context("failed to lookup directory entry"))
155 .try_for_each_concurrent(None, |entry| async move {
156 let ty = entry
157 .file_type()
158 .await
159 .context("failed to lookup directory entry type")?;
160 let path = entry.path();
161 if ty.is_dir() {
162 fs::remove_dir_all(&path).await.with_context(|| {
163 format!("failed to remove directory at `{}`", path.display())
164 })?;
165 } else {
166 fs::remove_file(&path).await.with_context(|| {
167 format!("failed to remove file at `{}`", path.display())
168 })?;
169 }
170 Ok(())
171 })
172 .await
173 .context("failed to remove directory contents")
174 }
175 .await
176 .map_err(|err| format!("{err:#}")))
177 }
178
179 #[instrument(level = "trace", skip(self))]
180 async fn container_exists(
181 &self,
182 cx: Option<Context>,
183 name: String,
184 ) -> anyhow::Result<Result<bool, String>> {
185 Ok(async {
186 propagate_trace_for_ctx!(cx);
187 let path = self.get_container(cx, name).await?;
188 fs::try_exists(path)
189 .await
190 .context("failed to check if path exists")
191 }
192 .await
193 .map_err(|err| format!("{err:#}")))
194 }
195
196 #[instrument(level = "trace", skip(self))]
197 async fn create_container(
198 &self,
199 cx: Option<Context>,
200 name: String,
201 ) -> anyhow::Result<Result<(), String>> {
202 Ok(async {
203 propagate_trace_for_ctx!(cx);
204 let path = self.get_container(cx, name).await?;
205 fs::create_dir_all(path)
206 .await
207 .context("failed to create path")
208 }
209 .await
210 .map_err(|err| format!("{err:#}")))
211 }
212
213 #[instrument(level = "trace", skip(self))]
214 async fn delete_container(
215 &self,
216 cx: Option<Context>,
217 name: String,
218 ) -> anyhow::Result<Result<(), String>> {
219 Ok(async {
220 propagate_trace_for_ctx!(cx);
221 let path = self.get_container(cx, name).await?;
222 fs::remove_dir_all(path)
223 .await
224 .context("failed to remove path")
225 }
226 .await
227 .map_err(|err| format!("{err:#}")))
228 }
229
230 #[instrument(level = "trace", skip(self))]
231 async fn get_container_info(
232 &self,
233 cx: Option<Context>,
234 name: String,
235 ) -> anyhow::Result<Result<ContainerMetadata, String>> {
236 Ok(async {
237 propagate_trace_for_ctx!(cx);
238 let path = self.get_container(cx, name).await?;
239 let md = fs::metadata(&path)
240 .await
241 .context("failed to lookup directory metadata")?;
242
243 let created_at = match md.created() {
244 Ok(created_time) => created_time
245 .duration_since(SystemTime::UNIX_EPOCH)
246 .context("creation time before Unix epoch")?,
247 Err(e) => {
248 debug!(
250 error = ?e,
251 ?path,
252 "failed to get creation time for container, defaulting to 0"
253 );
254 Duration::from_secs(0)
255 }
256 };
257 anyhow::Ok(ContainerMetadata {
260 created_at: created_at.as_secs(),
261 })
262 }
263 .await
264 .map_err(|err| format!("{err:#}")))
265 }
266
267 #[instrument(level = "trace", skip(self))]
268 async fn list_container_objects(
269 &self,
270 cx: Option<Context>,
271 name: String,
272 limit: Option<u64>,
273 offset: Option<u64>,
274 ) -> anyhow::Result<
275 Result<
276 (
277 Pin<Box<dyn Stream<Item = Vec<String>> + Send>>,
278 Pin<Box<dyn Future<Output = Result<(), String>> + Send>>,
279 ),
280 String,
281 >,
282 > {
283 Ok(async {
284 propagate_trace_for_ctx!(cx);
285 let path = self.get_container(cx, name).await?;
286 let offset = offset.unwrap_or_default().try_into().unwrap_or(usize::MAX);
287 let limit = limit.unwrap_or(u64::MAX).try_into().unwrap_or(usize::MAX);
288 debug!(path = ?path.display(), offset, limit, "read directory");
289 let dir = fs::read_dir(path).await.context("failed to read path")?;
290 let mut names = ReadDirStream::new(dir)
291 .skip(offset)
292 .take(limit)
293 .map(move |entry| {
294 let entry = entry.context("failed to lookup directory entry")?;
295 let name = entry.file_name().to_string_lossy().to_string();
296 trace!(name, "list file name");
297 anyhow::Ok(name)
298 });
299 let (tx, rx) = mpsc::channel(16);
300 anyhow::Ok((
301 Box::pin(ReceiverStream::new(rx).ready_chunks(128))
302 as Pin<Box<dyn Stream<Item = _> + Send>>,
303 Box::pin(async move {
304 async move {
305 while let Some(name) = names.next().await {
306 let name = name.context("failed to list file names")?;
307 tx.send(name).await.context("stream receiver closed")?;
308 }
309 anyhow::Ok(())
310 }
311 .await
312 .map_err(|err| format!("{err:#}"))
313 }) as Pin<Box<dyn Future<Output = _> + Send>>,
314 ))
315 }
316 .await
317 .map_err(|err| format!("{err:#}")))
318 }
319
320 #[instrument(level = "trace", skip(self))]
321 async fn copy_object(
322 &self,
323 cx: Option<Context>,
324 src: ObjectId,
325 dest: ObjectId,
326 ) -> anyhow::Result<Result<(), String>> {
327 Ok(async {
328 propagate_trace_for_ctx!(cx);
329 let root = self.get_root(cx).await.context("failed to get root")?;
330 let src_container = resolve_subpath(&root, src.container)
331 .context("failed to resolve source container path")?;
332 let src = resolve_subpath(&src_container, src.object)
333 .context("failed to resolve source object path")?;
334
335 let dest_container = resolve_subpath(&root, dest.container)
336 .context("failed to resolve destination container path")?;
337 let dest = resolve_subpath(&dest_container, dest.object)
338 .context("failed to resolve destination object path")?;
339 debug!("copy `{}` to `{}`", src.display(), dest.display());
340 fs::copy(src, dest).await.context("failed to copy")?;
341 anyhow::Ok(())
342 }
343 .await
344 .map_err(|err| format!("{err:#}")))
345 }
346
347 #[instrument(level = "trace", skip(self))]
348 async fn delete_object(
349 &self,
350 cx: Option<Context>,
351 id: ObjectId,
352 ) -> anyhow::Result<Result<(), String>> {
353 Ok(async {
354 propagate_trace_for_ctx!(cx);
355 let path = self.get_object(cx, id).await?;
356 debug!("remove file at `{}`", path.display());
357 match fs::remove_file(&path).await {
358 Ok(()) => Ok(()),
359 Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
360 Err(err) => {
361 Err(anyhow!(err)
362 .context(format!("failed to remove file at `{}`", path.display())))
363 }
364 }
365 }
366 .await
367 .map_err(|err| format!("{err:#}")))
368 }
369
370 #[instrument(level = "trace", skip(self))]
371 async fn delete_objects(
372 &self,
373 cx: Option<Context>,
374 container: String,
375 objects: Vec<String>,
376 ) -> anyhow::Result<Result<(), String>> {
377 Ok(async {
378 propagate_trace_for_ctx!(cx);
379 let container = self.get_container(cx, container).await?;
380 for name in objects {
381 let path =
382 resolve_subpath(&container, name).context("failed to resolve object path")?;
383 debug!("remove file at `{}`", path.display());
384 match fs::remove_file(&path).await {
385 Ok(()) => Ok(()),
386 Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
387 Err(err) => Err(anyhow!(err)
388 .context(format!("failed to remove file at `{}`", path.display()))),
389 }?;
390 }
391 anyhow::Ok(())
392 }
393 .await
394 .map_err(|err| format!("{err:#}")))
395 }
396
397 #[instrument(level = "trace", skip(self))]
398 async fn get_container_data(
399 &self,
400 cx: Option<Context>,
401 id: ObjectId,
402 start: u64,
403 end: u64,
404 ) -> anyhow::Result<
405 Result<
406 (
407 Pin<Box<dyn Stream<Item = Bytes> + Send>>,
408 Pin<Box<dyn Future<Output = Result<(), String>> + Send>>,
409 ),
410 String,
411 >,
412 > {
413 Ok(async {
414 propagate_trace_for_ctx!(cx);
415 let limit = end
416 .checked_sub(start)
417 .context("`end` must be greater than `start`")?;
418 let path = self.get_object(cx, id).await?;
419 debug!(path = ?path.display(), "open file");
420 let mut object = File::open(&path)
421 .await
422 .with_context(|| format!("failed to open object file [{}]", path.display()))?;
423 if start > 0 {
424 debug!("seek file");
425 object
426 .seek(SeekFrom::Start(start))
427 .await
428 .context("failed to seek from start")?;
429 }
430 let mut data = ReaderStream::new(object.take(limit));
431 let (tx, rx) = mpsc::channel(16);
432 anyhow::Ok((
433 Box::pin(ReceiverStream::new(rx)) as Pin<Box<dyn Stream<Item = _> + Send>>,
434 Box::pin(async move {
435 async move {
436 while let Some(buf) = data.next().await {
437 let buf = buf.context("failed to read file")?;
438 debug!(?buf, "sending chunk");
439 tx.send(buf).await.context("stream receiver closed")?;
440 }
441 debug!("finished reading file");
442 anyhow::Ok(())
443 }
444 .await
445 .map_err(|err| format!("{err:#}"))
446 }) as Pin<Box<dyn Future<Output = _> + Send>>,
447 ))
448 }
449 .await
450 .map_err(|err| format!("{err:#}")))
451 }
452
453 #[instrument(level = "trace", skip(self))]
454 async fn get_object_info(
455 &self,
456 cx: Option<Context>,
457 id: ObjectId,
458 ) -> anyhow::Result<Result<ObjectMetadata, String>> {
459 Ok(async {
460 propagate_trace_for_ctx!(cx);
461 let path = self.get_object(cx, id).await?;
462 let md = fs::metadata(&path)
463 .await
464 .context("failed to lookup file metadata")?;
465
466 let created_at = match md.created() {
467 Ok(created_time) => created_time
468 .duration_since(SystemTime::UNIX_EPOCH)
469 .context("creation time before Unix epoch")?,
470 Err(e) => {
471 debug!(
473 error = ?e,
474 ?path,
475 "failed to get creation time for object, defaulting to 0"
476 );
477 Duration::from_secs(0)
478 }
479 };
480 #[cfg(unix)]
483 let size = std::os::unix::fs::MetadataExt::size(&md);
484 #[cfg(windows)]
485 let size = std::os::windows::fs::MetadataExt::file_size(&md);
486 anyhow::Ok(ObjectMetadata {
487 created_at: created_at.as_secs(),
488 size,
489 })
490 }
491 .await
492 .map_err(|err| format!("{err:#}")))
493 }
494
495 #[instrument(level = "trace", skip(self))]
496 async fn has_object(
497 &self,
498 cx: Option<Context>,
499 id: ObjectId,
500 ) -> anyhow::Result<Result<bool, String>> {
501 Ok(async {
502 propagate_trace_for_ctx!(cx);
503 let path = self.get_object(cx, id).await?;
504 fs::try_exists(path)
505 .await
506 .context("failed to check if path exists")
507 }
508 .await
509 .map_err(|err| format!("{err:#}")))
510 }
511
512 #[instrument(level = "trace", skip(self))]
513 async fn move_object(
514 &self,
515 cx: Option<Context>,
516 src: ObjectId,
517 dest: ObjectId,
518 ) -> anyhow::Result<Result<(), String>> {
519 Ok(async {
520 propagate_trace_for_ctx!(cx);
521 let root = self.get_root(cx).await.context("failed to get root")?;
522 let src_container = resolve_subpath(&root, src.container)
523 .context("failed to resolve source container path")?;
524 let src = resolve_subpath(&src_container, src.object)
525 .context("failed to resolve source object path")?;
526
527 let dest_container = resolve_subpath(&root, dest.container)
528 .context("failed to resolve destination container path")?;
529 let dest = resolve_subpath(&dest_container, dest.object)
530 .context("failed to resolve destination object path")?;
531 debug!("copy `{}` to `{}`", src.display(), dest.display());
532 fs::copy(&src, dest).await.context("failed to copy")?;
533 debug!("remove `{}`", src.display());
534 fs::remove_file(src)
535 .await
536 .context("failed to remove source")
537 }
538 .await
539 .map_err(|err| format!("{err:#}")))
540 }
541
542 #[instrument(level = "trace", skip(self, data))]
543 async fn write_container_data(
544 &self,
545 cx: Option<Context>,
546 id: ObjectId,
547 data: Pin<Box<dyn Stream<Item = Bytes> + Send>>,
548 ) -> anyhow::Result<Result<Pin<Box<dyn Future<Output = Result<(), String>> + Send>>, String>>
549 {
550 Ok(async {
551 propagate_trace_for_ctx!(cx);
552 let path = self.get_object(cx, id).await?;
553 if let Some(parent) = path.parent() {
554 info!(parent = ?parent.display(), "creating directory");
555 fs::create_dir_all(parent)
556 .await
557 .context("failed to create parent directories")?;
558 }
559 let mut file = File::options()
560 .create(true)
561 .truncate(true)
562 .write(true)
563 .open(&path)
564 .await
565 .context("failed to open file")?;
566 anyhow::Ok(Box::pin(async move {
567 debug!(path = ?path.display(), "streaming data to file");
568 let n = io::copy(
569 &mut StreamReader::new(data.map(|chunk| {
570 trace!(?chunk, "received data chunk");
571 std::io::Result::Ok(chunk)
572 })),
573 &mut file,
574 )
575 .await
576 .context("failed to write file")
577 .map_err(|err| format!("{err:#}"))?;
578 debug!(n, path = ?path.display(), "finished writing file");
579 Ok(())
580 }) as Pin<Box<dyn Future<Output = _> + Send>>)
581 }
582 .await
583 .map_err(|err| format!("{err:#}")))
584 }
585}
586
587impl Provider for FsProvider {
588 async fn receive_link_config_as_target(
590 &self,
591 LinkConfig {
592 source_id, config, ..
593 }: LinkConfig<'_>,
594 ) -> anyhow::Result<()> {
595 for (k, v) in config {
596 info!("link definition configuration [{k}] set to [{v}]");
597 }
598
599 let root_val: PathBuf = match config.iter().find(|(key, _)| key.to_uppercase() == "ROOT") {
601 None => {
602 let root = std::env::temp_dir();
604 match resolve_subpath(&root, source_id) {
606 Ok(path) => path,
607 Err(e) => {
608 error!("Failed to resolve subpath to component directory: {e}");
609 return Err(
610 anyhow!(e).context("failed to resolve subpath to component dir")
611 );
612 }
613 }
614 }
615 Some((_, value)) => value.into(),
617 };
618
619 if let Err(e) = create_dir_all(&root_val).await {
621 error!("Could not create component directory: {:?}", e);
622 return Err(anyhow!(e).context("failed to create component directory"));
623 }
624
625 let config = FsProviderConfig {
627 root: Arc::new(root_val.clean()),
628 };
629
630 info!("Saved FsProviderConfig: {:#?}", config);
631 info!(
632 "File System Blob Store Container Root: '{:?}'",
633 &config.root
634 );
635
636 self.config
638 .write()
639 .await
640 .insert(source_id.into(), config.clone());
641
642 Ok(())
643 }
644
645 #[instrument(level = "info", skip_all, fields(source_id = info.get_source_id()))]
646 async fn delete_link_as_target(&self, info: impl LinkDeleteInfo) -> anyhow::Result<()> {
647 let component_id = info.get_source_id();
648 self.config.write().await.remove(component_id);
649 Ok(())
650 }
651
652 async fn shutdown(&self) -> anyhow::Result<()> {
653 self.config.write().await.drain();
654 Ok(())
655 }
656}
657
#[cfg(test)]
mod tests {
    use super::*;
    use futures::stream;
    use tempfile::tempdir;
    use wrpc_interface_blobstore::bindings::exports::wrpc::blobstore::blobstore::Handler;

    // `resolve_subpath` is synchronous, so these tests don't need an async runtime.

    #[test]
    fn resolve_safe_samepath() {
        assert!(resolve_subpath(&PathBuf::from("./"), "./././").is_ok());
    }

    #[test]
    fn resolve_fail_ancestor() {
        let res = resolve_subpath(&PathBuf::from("./"), "../").unwrap_err();
        assert_eq!(res.kind(), std::io::ErrorKind::PermissionDenied);
    }

    #[test]
    fn resolve_nested_subpath() {
        let resolved = resolve_subpath(Path::new("/srv/data"), "a/b/../c").unwrap();
        assert_eq!(resolved, Path::new("/srv/data/a/c"));
    }

    #[test]
    fn resolve_fail_absolute_outside_root() {
        let res = resolve_subpath(Path::new("/srv/data"), "/etc/passwd").unwrap_err();
        assert_eq!(res.kind(), std::io::ErrorKind::PermissionDenied);
    }

    #[tokio::test]
    async fn test_write_container_data() {
        let temp_dir = tempdir().unwrap();
        let root_path = temp_dir.path().to_path_buf();

        let config = Arc::new(RwLock::new(HashMap::new()));
        config.write().await.insert(
            "test_source".to_string(),
            FsProviderConfig {
                root: Arc::new(root_path.clone()),
            },
        );
        let provider = FsProvider { config };

        let context = Some(Context {
            component: Some("test_source".to_string()),
            ..Default::default()
        });
        let object_id = ObjectId {
            container: "test_container".to_string(),
            object: "test_object/with_slash.txt".to_string(),
        };

        let data = stream::iter(vec![Bytes::from("Hello, "), Bytes::from("world!")]);

        let write_future = provider
            .write_container_data(context, object_id, Box::pin(data))
            .await
            .expect("handler invocation failed")
            .expect("failed to start writing object");
        write_future.await.expect("failed to write object data");

        let file_path = root_path.join("test_container/test_object/with_slash.txt");
        let contents = tokio::fs::read_to_string(file_path).await.unwrap();
        assert_eq!(contents, "Hello, world!");
    }
}