// wasmcloud_provider_blobstore_s3/lib.rs

1#![allow(clippy::type_complexity)]
2
3//! blobstore-s3 capability provider
4//!
5//! This capability provider exposes [S3](https://aws.amazon.com/s3/)-compatible object storage
6//! (AKA "blob store") as a [wasmcloud capability](https://wasmcloud.com/docs/concepts/capabilities) which
7//! can be used by actors on your lattice.
8//!
9
10use core::future::Future;
11use core::pin::Pin;
12use core::str::FromStr;
13
14use std::collections::HashMap;
15use std::env;
16use std::sync::Arc;
17
18use anyhow::{anyhow, bail, Context as _, Result};
19use aws_config::default_provider::credentials::DefaultCredentialsChain;
20use aws_config::default_provider::region::DefaultRegionChain;
21use aws_config::retry::RetryConfig;
22use aws_config::sts::AssumeRoleProvider;
23use aws_sdk_s3::config::{Region, SharedCredentialsProvider};
24use aws_sdk_s3::error::{ProvideErrorMetadata, SdkError};
25use aws_sdk_s3::operation::create_bucket::{CreateBucketError, CreateBucketOutput};
26use aws_sdk_s3::operation::get_object::GetObjectOutput;
27use aws_sdk_s3::operation::head_bucket::HeadBucketError;
28use aws_sdk_s3::operation::head_object::{HeadObjectError, HeadObjectOutput};
29use aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Output;
30use aws_sdk_s3::types::{
31    BucketLocationConstraint, CreateBucketConfiguration, Delete, Object, ObjectIdentifier,
32};
33use aws_smithy_runtime::client::http::hyper_014::HyperClientBuilder;
34use base64::Engine as _;
35use bytes::{Bytes, BytesMut};
36use futures::{stream, Stream, StreamExt as _};
37use serde::Deserialize;
38use tokio::io::AsyncReadExt as _;
39use tokio::sync::{mpsc, RwLock};
40use tokio_stream::wrappers::ReceiverStream;
41use tokio_util::io::ReaderStream;
42use tracing::{debug, error, instrument, warn};
43use wasmcloud_provider_sdk::core::secrets::SecretValue;
44use wasmcloud_provider_sdk::core::tls;
45use wasmcloud_provider_sdk::{
46    get_connection, initialize_observability, propagate_trace_for_ctx, run_provider,
47    serve_provider_exports, Context, LinkConfig, LinkDeleteInfo, Provider,
48};
49use wrpc_interface_blobstore::bindings::{
50    exports::wrpc::blobstore::blobstore::Handler,
51    serve,
52    wrpc::blobstore::types::{ContainerMetadata, ObjectId, ObjectMetadata},
53};
54
55const ALIAS_PREFIX: &str = "alias_";
56const DEFAULT_STS_SESSION: &str = "blobstore_s3_provider";
57
/// Configuration for connecting to S3-compatible storage
///
/// This value is meant to be parsed from link configuration, and can
/// represent any S3-compatible storage (excluding AWS-specific things like STS)
///
/// NOTE that when storage config is provided via link configuration, parts of
/// it may be overridden by environment variables (see
/// [`StorageConfig::from_link_config`])
#[derive(Clone, Debug, Default, Deserialize)]
pub struct StorageConfig {
    /// AWS_ACCESS_KEY_ID, can be specified from environment
    pub access_key_id: Option<String>,
    /// AWS_SECRET_ACCESS_KEY, can be in environment
    pub secret_access_key: Option<String>,
    /// Session Token
    pub session_token: Option<String>,
    /// AWS_REGION
    pub region: Option<String>,
    /// override default max_attempts (3) for retries
    pub max_attempts: Option<u32>,
    /// optional configuration for STS Assume Role
    pub sts_config: Option<StsAssumeRoleConfig>,
    /// optional override for the AWS endpoint
    pub endpoint: Option<String>,
    /// optional map of bucket aliases to real bucket names (defaults to empty)
    #[serde(default)]
    pub aliases: HashMap<String, String>,
    /// Region in which buckets will be created
    pub bucket_region: Option<String>,
}
86
/// Configuration for STS (Security Token Service) assume-role credentials
///
/// When present, the provider exchanges its base credentials for temporary
/// credentials of the given role before talking to S3
#[derive(Clone, Debug, Default, Deserialize)]
pub struct StsAssumeRoleConfig {
    /// Role to assume (AWS_ASSUME_ROLE_ARN)
    /// Should be in the form "arn:aws:iam::123456789012:role/example"
    pub role: String,
    /// AWS Region for using sts, not for S3
    pub region: Option<String>,
    /// Optional Session name (defaults to `DEFAULT_STS_SESSION`)
    pub session: Option<String>,
    /// Optional external id
    pub external_id: Option<String>,
}
99
impl StorageConfig {
    /// Initialize a [`StorageConfig`] from link configuration and secrets
    ///
    /// The serialized config payload is looked up in order:
    /// 1. `config_b64` (base64-encoded JSON), preferring secrets over plain config
    /// 2. `config_json` (plain JSON), preferring secrets over plain config
    /// 3. [`StorageConfig::default`] when neither is present
    ///
    /// After decoding, individual values may be overridden:
    /// - a `BUCKET_REGION` link config value sets `bucket_region`
    /// - the `AWS_ROLE_ARN` environment variable (plus optional `AWS_ROLE_REGION`,
    ///   `AWS_ROLE_SESSION_NAME`, `AWS_ROLE_EXTERNAL_ID`) enables or overrides
    ///   STS assume-role configuration
    /// - the `AWS_ENDPOINT` environment variable overrides the endpoint
    ///
    /// # Errors
    /// Fails if the payload is not valid base64 or not valid JSON.
    pub async fn from_link_config(
        LinkConfig {
            config, secrets, ..
        }: &LinkConfig<'_>,
    ) -> Result<StorageConfig> {
        let mut storage_config = if let Some(config_b64) = secrets
            .get("config_b64")
            .and_then(SecretValue::as_string)
            .or_else(|| config.get("config_b64").map(String::as_str))
        {
            // Warn when the value came from plain config rather than secrets
            if secrets.get("config_b64").is_none() {
                warn!("secret value [config_b64] was not found, but present in configuration. Please prefer using secrets for sensitive values.");
            }
            let bytes = base64::engine::general_purpose::STANDARD
                .decode(config_b64.as_bytes())
                .context("invalid base64 encoding")?;
            serde_json::from_slice::<StorageConfig>(&bytes).context("corrupt config_b64")?
        } else if let Some(encoded) = secrets
            .get("config_json")
            .and_then(SecretValue::as_string)
            .or_else(|| config.get("config_json").map(String::as_str))
        {
            // Warn when the value came from plain config rather than secrets
            if secrets.get("config_json").is_none() {
                warn!("secret value [config_json] was not found, but was present in configuration. Please prefer using secrets for sensitive values.");
            }
            serde_json::from_str::<StorageConfig>(encoded).context("corrupt config_json")?
        } else {
            StorageConfig::default()
        };

        // If a top-level BUCKET_REGION was specified in config, use it
        if let Some(region) = config.get("BUCKET_REGION") {
            storage_config.bucket_region = Some(region.into());
        }

        // AWS_ROLE_ARN in the environment enables (or overrides) STS assume-role
        if let Ok(arn) = env::var("AWS_ROLE_ARN") {
            let mut sts_config = storage_config.sts_config.unwrap_or_default();
            sts_config.role = arn;
            if let Ok(region) = env::var("AWS_ROLE_REGION") {
                sts_config.region = Some(region);
            }
            if let Ok(session) = env::var("AWS_ROLE_SESSION_NAME") {
                sts_config.session = Some(session);
            }
            if let Ok(external_id) = env::var("AWS_ROLE_EXTERNAL_ID") {
                sts_config.external_id = Some(external_id);
            }
            storage_config.sts_config = Some(sts_config);
        }

        if let Ok(endpoint) = env::var("AWS_ENDPOINT") {
            storage_config.endpoint = Some(endpoint);
        }

        // aliases are added from linkdefs in StorageClient::new()
        Ok(storage_config)
    }
}
160
/// A configured S3 client together with bucket-alias mappings
#[derive(Clone)]
pub struct StorageClient {
    // AWS SDK S3 client (internally shared, cheap to clone)
    s3_client: aws_sdk_s3::Client,
    // Bucket alias -> real bucket name mappings, consulted by `unalias`
    aliases: Arc<HashMap<String, String>>,
    /// Preferred region for bucket creation
    bucket_region: Option<BucketLocationConstraint>,
}
168
169impl StorageClient {
    /// Build a [`StorageClient`] from a [`StorageConfig`] and link config values
    ///
    /// Resolves the region and credentials (static keys when both are given,
    /// otherwise the default AWS provider chains), optionally wraps them in an
    /// STS assume-role provider, applies retry/endpoint overrides, and merges
    /// `alias_`-prefixed entries from `config_values` into the alias map.
    pub async fn new(
        StorageConfig {
            access_key_id,
            secret_access_key,
            session_token,
            region,
            max_attempts,
            sts_config,
            endpoint,
            mut aliases,
            bucket_region,
        }: StorageConfig,
        config_values: &HashMap<String, String>,
    ) -> Self {
        // An explicitly configured region wins; otherwise fall back to the
        // default region chain (environment, profile, IMDS, ...)
        let region = match region {
            Some(region) => Some(Region::new(region)),
            _ => DefaultRegionChain::builder().build().region().await,
        };

        // use static credentials or defaults from environment
        let mut cred_provider = match (access_key_id, secret_access_key) {
            (Some(access_key_id), Some(secret_access_key)) => {
                SharedCredentialsProvider::new(aws_sdk_s3::config::Credentials::new(
                    access_key_id,
                    secret_access_key,
                    session_token,
                    None,
                    "static",
                ))
            }
            _ => SharedCredentialsProvider::new(
                DefaultCredentialsChain::builder()
                    .region(region.clone())
                    .build()
                    .await,
            ),
        };
        // When STS is configured, exchange the base credentials for the role's
        if let Some(StsAssumeRoleConfig {
            role,
            region,
            session,
            external_id,
        }) = sts_config
        {
            let mut role = AssumeRoleProvider::builder(role)
                .session_name(session.unwrap_or_else(|| DEFAULT_STS_SESSION.to_string()));
            if let Some(region) = region {
                role = role.region(Region::new(region));
            }
            if let Some(external_id) = external_id {
                role = role.external_id(external_id);
            }
            cred_provider = SharedCredentialsProvider::new(role.build().await);
        }

        let mut retry_config = RetryConfig::standard();
        if let Some(max_attempts) = max_attempts {
            retry_config = retry_config.with_max_attempts(max_attempts);
        }
        let mut loader = aws_config::defaults(aws_config::BehaviorVersion::v2025_08_07())
            .region(region)
            .credentials_provider(cred_provider)
            .retry_config(retry_config);
        if let Some(endpoint) = endpoint {
            loader = loader.endpoint_url(endpoint);
        };
        let s3_client = aws_sdk_s3::Client::from_conf(
            aws_sdk_s3::Config::from(&loader.load().await)
                .to_builder()
                // Since minio requires force path style,
                // turn it on since it's disabled by default
                // due to deprecation by AWS.
                // https://github.com/awslabs/aws-sdk-rust/issues/390
                .force_path_style(true)
                .http_client(
                    HyperClientBuilder::new().build(
                        hyper_rustls::HttpsConnectorBuilder::new()
                            .with_tls_config(
                                // use `tls::DEFAULT_CLIENT_CONFIG` directly once `rustls` versions
                                // are in sync
                                rustls::ClientConfig::builder()
                                    .with_root_certificates(rustls::RootCertStore {
                                        roots: tls::DEFAULT_ROOTS.roots.clone(),
                                    })
                                    .with_no_client_auth(),
                            )
                            .https_or_http()
                            .enable_all_versions()
                            .build(),
                    ),
                )
                .build(),
        );

        // Process aliases
        // `alias_<name>` keys in the link config map <name> to a real bucket
        for (k, v) in config_values {
            if let Some(alias) = k.strip_prefix(ALIAS_PREFIX) {
                if alias.is_empty() || v.is_empty() {
                    error!("invalid bucket alias_ key and value must not be empty");
                } else {
                    aliases.insert(alias.to_string(), v.to_string());
                }
            }
        }

        StorageClient {
            s3_client,
            aliases: Arc::new(aliases),
            // Unrecognized region strings are silently dropped here
            bucket_region: bucket_region.and_then(|v| BucketLocationConstraint::from_str(&v).ok()),
        }
    }
281
282    /// perform alias lookup on bucket name
283    /// This can be used either for giving shortcuts to actors in the linkdefs, for example:
284    /// - component could use bucket names `alias_today`, `alias_images`, etc. and the linkdef aliases
285    ///   will remap them to the real bucket name
286    ///
287    /// The `'alias_'` prefix is not required, so this also works as a general redirect capability
288    pub fn unalias<'n, 's: 'n>(&'s self, bucket_or_alias: &'n str) -> &'n str {
289        debug!(%bucket_or_alias, aliases = ?self.aliases);
290        let name = bucket_or_alias
291            .strip_prefix(ALIAS_PREFIX)
292            .unwrap_or(bucket_or_alias);
293        if let Some(name) = self.aliases.get(name) {
294            name.as_ref()
295        } else {
296            name
297        }
298    }
299
300    /// Check whether a container exists
301    #[instrument(level = "debug", skip(self))]
302    pub async fn container_exists(&self, bucket: &str) -> anyhow::Result<bool> {
303        match self.s3_client.head_bucket().bucket(bucket).send().await {
304            Ok(_) => Ok(true),
305            Err(se) => match se.into_service_error() {
306                HeadBucketError::NotFound(_) => Ok(false),
307                err => {
308                    error!(?err, code = err.code(), "Unable to head bucket");
309                    bail!(anyhow!(err).context("failed to `head` bucket"))
310                }
311            },
312        }
313    }
314
315    /// Create a bucket
316    #[instrument(level = "debug", skip(self))]
317    pub async fn create_container(&self, bucket: &str) -> anyhow::Result<()> {
318        let mut builder = self.s3_client.create_bucket();
319
320        // Only add BucketLocationConstraint if bucket_region was set.
321        if let Some(bucket_region) = &self.bucket_region {
322            // Build bucket config, using location constraint if necessary
323            let bucket_config = CreateBucketConfiguration::builder()
324                .set_location_constraint(Some(bucket_region.clone()))
325                .build();
326
327            builder = builder.create_bucket_configuration(bucket_config);
328        }
329
330        match builder.bucket(bucket).send().await {
331            Ok(CreateBucketOutput { location, .. }) => {
332                debug!(?location, "bucket created");
333                Ok(())
334            }
335            Err(se) => match se.into_service_error() {
336                CreateBucketError::BucketAlreadyOwnedByYou(..) => Ok(()),
337                err => {
338                    error!(?err, code = err.code(), "failed to create bucket");
339                    bail!(anyhow!(err).context("failed to create bucket"))
340                }
341            },
342        }
343    }
344
345    #[instrument(level = "debug", skip(self))]
346    pub async fn get_container_info(&self, bucket: &str) -> anyhow::Result<ContainerMetadata> {
347        match self.s3_client.head_bucket().bucket(bucket).send().await {
348            Ok(_) => Ok(ContainerMetadata {
349                // unfortunately, HeadBucketOut doesn't include any information
350                // so we can't fill in creation date
351                created_at: 0,
352            }),
353            Err(se) => match se.into_service_error() {
354                HeadBucketError::NotFound(_) => {
355                    error!("bucket [{bucket}] not found");
356                    bail!("bucket [{bucket}] not found")
357                }
358                err => {
359                    error!(?err, code = err.code(), "unexpected error");
360                    bail!(anyhow!(err).context("unexpected error"));
361                }
362            },
363        }
364    }
365
366    #[instrument(level = "debug", skip(self))]
367    pub async fn list_container_objects(
368        &self,
369        bucket: &str,
370        limit: Option<u64>,
371        offset: Option<u64>,
372    ) -> anyhow::Result<impl Iterator<Item = String>> {
373        // TODO: Stream names
374        match self
375            .s3_client
376            .list_objects_v2()
377            .bucket(bucket)
378            .set_max_keys(limit.map(|limit| limit.try_into().unwrap_or(i32::MAX)))
379            .send()
380            .await
381        {
382            Ok(ListObjectsV2Output { contents, .. }) => Ok(contents
383                .into_iter()
384                .flatten()
385                .filter_map(|Object { key, .. }| key)
386                .skip(offset.unwrap_or_default().try_into().unwrap_or(usize::MAX))
387                .take(limit.unwrap_or(u64::MAX).try_into().unwrap_or(usize::MAX))),
388            Err(SdkError::ServiceError(err)) => {
389                error!(?err, "service error");
390                bail!(anyhow!("{err:?}").context("service error"))
391            }
392            Err(err) => {
393                error!(%err, code = err.code(), "unexpected error");
394                bail!(anyhow!("{err:?}").context("unexpected error"))
395            }
396        }
397    }
398
399    #[instrument(level = "debug", skip(self))]
400    pub async fn copy_object(
401        &self,
402        src_bucket: &str,
403        src_key: &str,
404        dest_bucket: &str,
405        dest_key: &str,
406    ) -> anyhow::Result<()> {
407        self.s3_client
408            .copy_object()
409            .copy_source(format!("{src_bucket}/{src_key}"))
410            .bucket(dest_bucket)
411            .key(dest_key)
412            .send()
413            .await
414            .context("failed to copy object")?;
415        Ok(())
416    }
417
418    #[instrument(level = "debug", skip(self, object))]
419    pub async fn delete_object(&self, container: &str, object: String) -> anyhow::Result<()> {
420        self.s3_client
421            .delete_object()
422            .bucket(container)
423            .key(object)
424            .send()
425            .await
426            .context("failed to delete object")?;
427        Ok(())
428    }
429
430    #[instrument(level = "debug", skip(self, objects))]
431    pub async fn delete_objects(
432        &self,
433        container: &str,
434        objects: impl IntoIterator<Item = String>,
435    ) -> anyhow::Result<()> {
436        let objects: Vec<_> = objects
437            .into_iter()
438            .map(|key| ObjectIdentifier::builder().key(key).build())
439            .collect::<Result<_, _>>()
440            .context("failed to build object identifier list")?;
441        if objects.is_empty() {
442            debug!("no objects to delete, return");
443            return Ok(());
444        }
445        let delete = Delete::builder()
446            .set_objects(Some(objects))
447            .build()
448            .context("failed to build `delete_objects` command")?;
449        let out = self
450            .s3_client
451            .delete_objects()
452            .bucket(container)
453            .delete(delete)
454            .send()
455            .await
456            .context("failed to delete objects")?;
457        let errs = out.errors();
458        if !errs.is_empty() {
459            bail!("failed with errors {errs:?}")
460        }
461        Ok(())
462    }
463
464    #[instrument(level = "debug", skip(self))]
465    pub async fn delete_container(&self, bucket: &str) -> anyhow::Result<()> {
466        match self.s3_client.delete_bucket().bucket(bucket).send().await {
467            Ok(_) => Ok(()),
468            Err(SdkError::ServiceError(err)) => {
469                bail!("{err:?}")
470            }
471            Err(err) => {
472                error!(%err, code = err.code(), "unexpected error");
473                bail!(err)
474            }
475        }
476    }
477
478    /// Find out whether object exists
479    #[instrument(level = "debug", skip(self))]
480    pub async fn has_object(&self, bucket: &str, key: &str) -> anyhow::Result<bool> {
481        match self
482            .s3_client
483            .head_object()
484            .bucket(bucket)
485            .key(key)
486            .send()
487            .await
488        {
489            Ok(_) => Ok(true),
490            Err(se) => match se.into_service_error() {
491                HeadObjectError::NotFound(_) => Ok(false),
492                err => {
493                    error!(
494                        %err,
495                        code = err.code(),
496                        "unexpected error for object_exists"
497                    );
498                    bail!(anyhow!(err).context("unexpected error for object_exists"))
499                }
500            },
501        }
502    }
503
    /// Retrieves metadata about the object
    ///
    /// Returns the object's size in bytes; `created_at` is always `0` since
    /// S3's `HeadObject` response does not report a creation time.
    ///
    /// # Errors
    /// Fails when the object does not exist or the request errors.
    #[instrument(level = "debug", skip(self))]
    pub async fn get_object_info(&self, bucket: &str, key: &str) -> anyhow::Result<ObjectMetadata> {
        match self
            .s3_client
            .head_object()
            .bucket(bucket)
            .key(key)
            .send()
            .await
        {
            Ok(HeadObjectOutput { content_length, .. }) => {
                Ok(ObjectMetadata {
                    // NOTE: The `created_at` value is not reported by S3
                    created_at: 0,
                    // Missing or negative content lengths collapse to 0
                    size: content_length
                        .and_then(|v| v.try_into().ok())
                        .unwrap_or_default(),
                })
            }
            Err(se) => match se.into_service_error() {
                HeadObjectError::NotFound(_) => {
                    error!("object [{bucket}/{key}] not found");
                    bail!("object [{bucket}/{key}] not found")
                }
                err => {
                    error!(
                        ?err,
                        code = err.code(),
                        "get_object_metadata failed for object [{bucket}/{key}]"
                    );
                    bail!(anyhow!(err).context(format!(
                        "get_object_metadata failed for object [{bucket}/{key}]"
                    )))
                }
            },
        }
    }
542}
543
/// Blobstore S3 provider
///
/// This struct will be the target of generated implementations (via wit-provider-bindgen)
/// for the blobstore provider WIT contract
#[derive(Default, Clone)]
pub struct BlobstoreS3Provider {
    /// Per-component [`StorageClient`]s, keyed by source (component) ID
    /// (populated by link configuration; consulted on each invocation)
    actors: Arc<RwLock<HashMap<String, StorageClient>>>,
}
553
/// Run the blobstore-s3 capability provider until shutdown
///
/// Convenience entry point that delegates to [`BlobstoreS3Provider::run`]
pub async fn run() -> anyhow::Result<()> {
    BlobstoreS3Provider::run().await
}
557
558impl BlobstoreS3Provider {
559    pub async fn run() -> anyhow::Result<()> {
560        initialize_observability!(
561            "blobstore-s3-provider",
562            std::env::var_os("PROVIDER_BLOBSTORE_S3_FLAMEGRAPH_PATH")
563        );
564
565        let provider = Self::default();
566        let shutdown = run_provider(provider.clone(), "blobstore-s3-provider")
567            .await
568            .context("failed to run provider")?;
569        let connection = get_connection();
570        let wrpc = connection
571            .get_wrpc_client(connection.provider_key())
572            .await?;
573        serve_provider_exports(&wrpc, provider, shutdown, serve)
574            .await
575            .context("failed to serve provider exports")
576    }
577
578    /// Retrieve the per-component [`StorageClient`] for a given link context
579    async fn client(&self, context: Option<Context>) -> Result<StorageClient> {
580        if let Some(ref source_id) = context.and_then(|Context { component, .. }| component) {
581            self.actors
582                .read()
583                .await
584                .get(source_id)
585                .with_context(|| format!("failed to lookup {source_id} configuration"))
586                .cloned()
587        } else {
588            // TODO: Support a default here
589            bail!("failed to lookup invocation source ID")
590        }
591    }
592}
593
594impl Handler<Option<Context>> for BlobstoreS3Provider {
595    #[instrument(level = "trace", skip(self))]
596    async fn clear_container(
597        &self,
598        cx: Option<Context>,
599        name: String,
600    ) -> anyhow::Result<Result<(), String>> {
601        Ok(async {
602            propagate_trace_for_ctx!(cx);
603            let client = self.client(cx).await?;
604            let bucket = client.unalias(&name);
605            let objects = client
606                .list_container_objects(bucket, None, None)
607                .await
608                .context("failed to list container objects")?;
609            client.delete_objects(bucket, objects).await
610        }
611        .await
612        .map_err(|err| format!("{err:#}")))
613    }
614
615    #[instrument(level = "trace", skip(self))]
616    async fn container_exists(
617        &self,
618        cx: Option<Context>,
619        name: String,
620    ) -> anyhow::Result<Result<bool, String>> {
621        Ok(async {
622            propagate_trace_for_ctx!(cx);
623            let client = self.client(cx).await?;
624            client.container_exists(client.unalias(&name)).await
625        }
626        .await
627        .map_err(|err| format!("{err:#}")))
628    }
629
630    #[instrument(level = "trace", skip(self))]
631    async fn create_container(
632        &self,
633        cx: Option<Context>,
634        name: String,
635    ) -> anyhow::Result<Result<(), String>> {
636        Ok(async {
637            propagate_trace_for_ctx!(cx);
638            let client = self.client(cx).await?;
639            client.create_container(client.unalias(&name)).await
640        }
641        .await
642        .map_err(|err| format!("{err:#}")))
643    }
644
645    #[instrument(level = "trace", skip(self))]
646    async fn delete_container(
647        &self,
648        cx: Option<Context>,
649        name: String,
650    ) -> anyhow::Result<Result<(), String>> {
651        Ok(async {
652            propagate_trace_for_ctx!(cx);
653            let client = self.client(cx).await?;
654            client.delete_container(client.unalias(&name)).await
655        }
656        .await
657        .map_err(|err| format!("{err:#}")))
658    }
659
660    #[instrument(level = "trace", skip(self))]
661    async fn get_container_info(
662        &self,
663        cx: Option<Context>,
664        name: String,
665    ) -> anyhow::Result<Result<ContainerMetadata, String>> {
666        Ok(async {
667            propagate_trace_for_ctx!(cx);
668            let client = self.client(cx).await?;
669            client.get_container_info(client.unalias(&name)).await
670        }
671        .await
672        .map_err(|err| format!("{err:#}")))
673    }
674
    /// List object names in the (alias-resolved) container `name`
    ///
    /// The entire listing is materialized up front and yielded as a single
    /// `Vec` on the returned stream; the accompanying future resolves
    /// immediately with `Ok(())`.
    #[instrument(level = "trace", skip(self))]
    async fn list_container_objects(
        &self,
        cx: Option<Context>,
        name: String,
        limit: Option<u64>,
        offset: Option<u64>,
    ) -> anyhow::Result<
        Result<
            (
                Pin<Box<dyn Stream<Item = Vec<String>> + Send>>,
                Pin<Box<dyn Future<Output = Result<(), String>> + Send>>,
            ),
            String,
        >,
    > {
        Ok(async {
            propagate_trace_for_ctx!(cx);
            let client = self.client(cx).await?;
            let names = client
                .list_container_objects(client.unalias(&name), limit, offset)
                .await
                .map(Vec::from_iter)?;
            anyhow::Ok((
                // Single-item stream carrying the whole listing
                Box::pin(stream::iter([names])) as Pin<Box<dyn Stream<Item = _> + Send>>,
                // Nothing left to drive once the listing is collected
                Box::pin(async move { Ok(()) }) as Pin<Box<dyn Future<Output = _> + Send>>,
            ))
        }
        .await
        .map_err(|err| format!("{err:#}")))
    }
706
707    #[instrument(level = "trace", skip(self))]
708    async fn copy_object(
709        &self,
710        cx: Option<Context>,
711        src: ObjectId,
712        dest: ObjectId,
713    ) -> anyhow::Result<Result<(), String>> {
714        Ok(async {
715            propagate_trace_for_ctx!(cx);
716            let client = self.client(cx).await?;
717            let src_bucket = client.unalias(&src.container);
718            let dest_bucket = client.unalias(&dest.container);
719            client
720                .copy_object(src_bucket, &src.object, dest_bucket, &dest.object)
721                .await
722        }
723        .await
724        .map_err(|err| format!("{err:#}")))
725    }
726
727    #[instrument(level = "trace", skip(self))]
728    async fn delete_object(
729        &self,
730        cx: Option<Context>,
731        id: ObjectId,
732    ) -> anyhow::Result<Result<(), String>> {
733        Ok(async {
734            propagate_trace_for_ctx!(cx);
735            let client = self.client(cx).await?;
736            client
737                .delete_object(client.unalias(&id.container), id.object)
738                .await
739        }
740        .await
741        .map_err(|err| format!("{err:#}")))
742    }
743
744    #[instrument(level = "trace", skip(self))]
745    async fn delete_objects(
746        &self,
747        cx: Option<Context>,
748        container: String,
749        objects: Vec<String>,
750    ) -> anyhow::Result<Result<(), String>> {
751        Ok(async {
752            propagate_trace_for_ctx!(cx);
753            let client = self.client(cx).await?;
754            client
755                .delete_objects(client.unalias(&container), objects)
756                .await
757        }
758        .await
759        .map_err(|err| format!("{err:#}")))
760    }
761
    /// Stream a byte range of an object as chunks of [`Bytes`]
    ///
    /// Returns a stream of data chunks plus a future that drives the download
    /// and resolves once the body has been fully forwarded (or errors when the
    /// receiver side is dropped).
    ///
    /// NOTE(review): an HTTP `Range: bytes=start-end` request is inclusive of
    /// both ends (`end - start + 1` bytes), while the body is truncated to
    /// `end - start` bytes via `take(limit)`, so the final byte of the HTTP
    /// range is never delivered. Presumably callers treat `end` as exclusive —
    /// confirm against the blobstore interface contract.
    #[instrument(level = "trace", skip(self))]
    async fn get_container_data(
        &self,
        cx: Option<Context>,
        id: ObjectId,
        start: u64,
        end: u64,
    ) -> anyhow::Result<
        Result<
            (
                Pin<Box<dyn Stream<Item = Bytes> + Send>>,
                Pin<Box<dyn Future<Output = Result<(), String>> + Send>>,
            ),
            String,
        >,
    > {
        Ok(async {
            propagate_trace_for_ctx!(cx);
            // Number of bytes to forward; note that end == start is accepted
            // here (limit 0) despite the "greater than" wording
            let limit = end
                .checked_sub(start)
                .context("`end` must be greater than `start`")?;
            let client = self.client(cx).await?;
            let bucket = client.unalias(&id.container);
            let GetObjectOutput { body, .. } = client
                .s3_client
                .get_object()
                .bucket(bucket)
                .key(id.object)
                .range(format!("bytes={start}-{end}"))
                .send()
                .await
                .context("failed to get object")?;
            // Cap the body at `limit` bytes and re-chunk it through a bounded
            // channel so the consumer controls backpressure
            let mut data = ReaderStream::new(body.into_async_read().take(limit));
            let (tx, rx) = mpsc::channel(16);
            anyhow::Ok((
                Box::pin(ReceiverStream::new(rx)) as Pin<Box<dyn Stream<Item = _> + Send>>,
                Box::pin(async move {
                    // Forward chunks until the body is exhausted or the
                    // receiving end of the stream is dropped
                    while let Some(buf) = data.next().await {
                        let buf = buf
                            .context("failed to read object")
                            .map_err(|err| format!("{err:#}"))?;
                        if tx.send(buf).await.is_err() {
                            return Err("stream receiver closed".to_string());
                        }
                    }
                    Ok(())
                }) as Pin<Box<dyn Future<Output = _> + Send>>,
            ))
        }
        .await
        .map_err(|err| format!("{err:#}")))
    }
814
815    #[instrument(level = "trace", skip(self))]
816    async fn get_object_info(
817        &self,
818        cx: Option<Context>,
819        id: ObjectId,
820    ) -> anyhow::Result<Result<ObjectMetadata, String>> {
821        Ok(async {
822            propagate_trace_for_ctx!(cx);
823            let client = self.client(cx).await?;
824            client
825                .get_object_info(client.unalias(&id.container), &id.object)
826                .await
827        }
828        .await
829        .map_err(|err| format!("{err:#}")))
830    }
831
832    #[instrument(level = "trace", skip(self))]
833    async fn has_object(
834        &self,
835        cx: Option<Context>,
836        id: ObjectId,
837    ) -> anyhow::Result<Result<bool, String>> {
838        Ok(async {
839            propagate_trace_for_ctx!(cx);
840            let client = self.client(cx).await?;
841            client
842                .has_object(client.unalias(&id.container), &id.object)
843                .await
844        }
845        .await
846        .map_err(|err| format!("{err:#}")))
847    }
848
849    #[instrument(level = "trace", skip(self))]
850    async fn move_object(
851        &self,
852        cx: Option<Context>,
853        src: ObjectId,
854        dest: ObjectId,
855    ) -> anyhow::Result<Result<(), String>> {
856        Ok(async {
857            propagate_trace_for_ctx!(cx);
858            let client = self.client(cx).await?;
859            let src_bucket = client.unalias(&src.container);
860            let dest_bucket = client.unalias(&dest.container);
861            client
862                .copy_object(src_bucket, &src.object, dest_bucket, &dest.object)
863                .await
864                .context("failed to copy object")?;
865            client
866                .delete_object(src_bucket, src.object)
867                .await
868                .context("failed to delete source object")
869        }
870        .await
871        .map_err(|err| format!("{err:#}")))
872    }
873
874    #[instrument(level = "trace", skip(self, data))]
875    async fn write_container_data(
876        &self,
877        cx: Option<Context>,
878        id: ObjectId,
879        data: Pin<Box<dyn Stream<Item = Bytes> + Send>>,
880    ) -> anyhow::Result<Result<Pin<Box<dyn Future<Output = Result<(), String>> + Send>>, String>>
881    {
882        Ok(async {
883            propagate_trace_for_ctx!(cx);
884            let client = self.client(cx).await?;
885            let req = client
886                .s3_client
887                .put_object()
888                .bucket(client.unalias(&id.container))
889                .key(&id.object);
890            anyhow::Ok(Box::pin(async {
891                // TODO: Stream data to S3
892                let data: BytesMut = data.collect().await;
893                req.body(data.freeze().into())
894                    .send()
895                    .await
896                    .context("failed to put object")
897                    .map_err(|err| format!("{err:#}"))?;
898                Ok(())
899            }) as Pin<Box<dyn Future<Output = _> + Send>>)
900        }
901        .await
902        .map_err(|err| format!("{err:#}")))
903    }
904}
905
906/// Handle provider control commands
907/// `put_link` (new component link command), `del_link` (remove link command), and shutdown
908impl Provider for BlobstoreS3Provider {
909    /// Provider should perform any operations needed for a new link,
910    /// including setting up per-component resources, and checking authorization.
911    /// If the link is allowed, return true, otherwise return false to deny the link.
912    async fn receive_link_config_as_target(
913        &self,
914        link_config: LinkConfig<'_>,
915    ) -> anyhow::Result<()> {
916        // Build storage config
917        let config = match StorageConfig::from_link_config(&link_config).await {
918            Ok(v) => v,
919            Err(e) => {
920                error!(error = %e, %link_config.source_id, "failed to build storage config");
921                return Err(anyhow!(e).context("failed to build source config"));
922            }
923        };
924
925        let link = StorageClient::new(config, link_config.config).await;
926
927        let mut update_map = self.actors.write().await;
928        update_map.insert(link_config.source_id.to_string(), link);
929
930        Ok(())
931    }
932
933    /// Handle notification that a link is dropped: close the connection
934    #[instrument(level = "info", skip_all, fields(source_id = info.get_source_id()))]
935    async fn delete_link_as_target(&self, info: impl LinkDeleteInfo) -> anyhow::Result<()> {
936        let component_id = info.get_source_id();
937        let mut aw = self.actors.write().await;
938        aw.remove(component_id);
939        Ok(())
940    }
941
942    /// Handle shutdown request by closing all connections
943    async fn shutdown(&self) -> anyhow::Result<()> {
944        let mut aw = self.actors.write().await;
945        // empty the component link data and stop all servers
946        aw.drain();
947        Ok(())
948    }
949}
950
#[cfg(test)]
mod test {
    use super::*;

    /// Verify alias resolution in `StorageClient::unalias` for every
    /// combination of prefixed/unprefixed and defined/undefined names.
    #[tokio::test]
    async fn aliases() {
        let aliases = HashMap::from([(format!("{ALIAS_PREFIX}foo"), String::from("bar"))]);
        let client = StorageClient::new(StorageConfig::default(), &aliases).await;

        // Names with no alias entry pass through untouched
        assert_eq!(client.unalias("boo"), "boo");
        // A bare alias name resolves to its target
        assert_eq!(client.unalias("foo"), "bar");
        // A prefixed alias name resolves to the same target
        assert_eq!(client.unalias(&format!("{ALIAS_PREFIX}foo")), "bar");
        // A prefixed name with no matching alias falls back to the raw name
        assert_eq!(client.unalias(&format!("{ALIAS_PREFIX}baz")), "baz");
    }
}