1#![allow(clippy::type_complexity)]
2
3use core::future::Future;
11use core::pin::Pin;
12use core::str::FromStr;
13
14use std::collections::HashMap;
15use std::env;
16use std::sync::Arc;
17
18use anyhow::{anyhow, bail, Context as _, Result};
19use aws_config::default_provider::credentials::DefaultCredentialsChain;
20use aws_config::default_provider::region::DefaultRegionChain;
21use aws_config::retry::RetryConfig;
22use aws_config::sts::AssumeRoleProvider;
23use aws_sdk_s3::config::{Region, SharedCredentialsProvider};
24use aws_sdk_s3::error::{ProvideErrorMetadata, SdkError};
25use aws_sdk_s3::operation::create_bucket::{CreateBucketError, CreateBucketOutput};
26use aws_sdk_s3::operation::get_object::GetObjectOutput;
27use aws_sdk_s3::operation::head_bucket::HeadBucketError;
28use aws_sdk_s3::operation::head_object::{HeadObjectError, HeadObjectOutput};
29use aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Output;
30use aws_sdk_s3::types::{
31 BucketLocationConstraint, CreateBucketConfiguration, Delete, Object, ObjectIdentifier,
32};
33use aws_smithy_runtime::client::http::hyper_014::HyperClientBuilder;
34use base64::Engine as _;
35use bytes::{Bytes, BytesMut};
36use futures::{stream, Stream, StreamExt as _};
37use serde::Deserialize;
38use tokio::io::AsyncReadExt as _;
39use tokio::sync::{mpsc, RwLock};
40use tokio_stream::wrappers::ReceiverStream;
41use tokio_util::io::ReaderStream;
42use tracing::{debug, error, instrument, warn};
43use wasmcloud_provider_sdk::core::secrets::SecretValue;
44use wasmcloud_provider_sdk::core::tls;
45use wasmcloud_provider_sdk::{
46 get_connection, initialize_observability, propagate_trace_for_ctx, run_provider,
47 serve_provider_exports, Context, LinkConfig, LinkDeleteInfo, Provider,
48};
49use wrpc_interface_blobstore::bindings::{
50 exports::wrpc::blobstore::blobstore::Handler,
51 serve,
52 wrpc::blobstore::types::{ContainerMetadata, ObjectId, ObjectMetadata},
53};
54
/// Prefix that marks a link configuration key as a bucket alias definition
/// (`alias_<name> = <bucket>`); it is also stripped from names passed to
/// [`StorageClient::unalias`] before the alias lookup.
const ALIAS_PREFIX: &str = "alias_";
/// Session name used when assuming an STS role and no session name is configured.
const DEFAULT_STS_SESSION: &str = "blobstore_s3_provider";
57
/// Per-link S3 settings, deserialized from the `config_b64` / `config_json` link values
/// and consumed by [`StorageClient::new`].
#[derive(Clone, Debug, Default, Deserialize)]
pub struct StorageConfig {
    /// Static access key ID; only used when `secret_access_key` is set as well.
    pub access_key_id: Option<String>,
    /// Static secret access key; only used when `access_key_id` is set as well.
    pub secret_access_key: Option<String>,
    /// Optional session token passed along with the static credentials.
    pub session_token: Option<String>,
    /// Region for the client; when unset the default AWS region chain is consulted.
    pub region: Option<String>,
    /// Overrides the maximum number of attempts of the standard retry configuration.
    pub max_attempts: Option<u32>,
    /// When set, credentials are obtained by assuming the described STS role.
    pub sts_config: Option<StsAssumeRoleConfig>,
    /// Custom endpoint URL; replaced by the `AWS_ENDPOINT` environment variable when that is set.
    pub endpoint: Option<String>,
    /// Bucket aliases (alias name -> bucket name); defaults to empty when absent from the input.
    #[serde(default)]
    pub aliases: HashMap<String, String>,
    /// Location constraint applied to newly created buckets; replaced by the
    /// `BUCKET_REGION` link configuration value when that is set.
    pub bucket_region: Option<String>,
}
86
/// Settings for obtaining credentials by assuming an STS role.
#[derive(Clone, Debug, Default, Deserialize)]
pub struct StsAssumeRoleConfig {
    /// Role to assume; replaced by the `AWS_ROLE_ARN` environment variable when that is set.
    pub role: String,
    /// Region passed to the assume-role provider, if any (`AWS_ROLE_REGION`).
    pub region: Option<String>,
    /// Session name; `DEFAULT_STS_SESSION` is used when unset (`AWS_ROLE_SESSION_NAME`).
    pub session: Option<String>,
    /// External ID passed to the assume-role provider, if any (`AWS_ROLE_EXTERNAL_ID`).
    pub external_id: Option<String>,
}
99
100impl StorageConfig {
101 pub async fn from_link_config(
103 LinkConfig {
104 config, secrets, ..
105 }: &LinkConfig<'_>,
106 ) -> Result<StorageConfig> {
107 let mut storage_config = if let Some(config_b64) = secrets
108 .get("config_b64")
109 .and_then(SecretValue::as_string)
110 .or_else(|| config.get("config_b64").map(String::as_str))
111 {
112 if secrets.get("config_b64").is_none() {
113 warn!("secret value [config_b64] was not found, but present in configuration. Please prefer using secrets for sensitive values.");
114 }
115 let bytes = base64::engine::general_purpose::STANDARD
116 .decode(config_b64.as_bytes())
117 .context("invalid base64 encoding")?;
118 serde_json::from_slice::<StorageConfig>(&bytes).context("corrupt config_b64")?
119 } else if let Some(encoded) = secrets
120 .get("config_json")
121 .and_then(SecretValue::as_string)
122 .or_else(|| config.get("config_json").map(String::as_str))
123 {
124 if secrets.get("config_json").is_none() {
125 warn!("secret value [config_json] was not found, but was present in configuration. Please prefer using secrets for sensitive values.");
126 }
127 serde_json::from_str::<StorageConfig>(encoded).context("corrupt config_json")?
128 } else {
129 StorageConfig::default()
130 };
131
132 if let Some(region) = config.get("BUCKET_REGION") {
134 storage_config.bucket_region = Some(region.into());
135 }
136
137 if let Ok(arn) = env::var("AWS_ROLE_ARN") {
138 let mut sts_config = storage_config.sts_config.unwrap_or_default();
139 sts_config.role = arn;
140 if let Ok(region) = env::var("AWS_ROLE_REGION") {
141 sts_config.region = Some(region);
142 }
143 if let Ok(session) = env::var("AWS_ROLE_SESSION_NAME") {
144 sts_config.session = Some(session);
145 }
146 if let Ok(external_id) = env::var("AWS_ROLE_EXTERNAL_ID") {
147 sts_config.external_id = Some(external_id);
148 }
149 storage_config.sts_config = Some(sts_config);
150 }
151
152 if let Ok(endpoint) = env::var("AWS_ENDPOINT") {
153 storage_config.endpoint = Some(endpoint);
154 }
155
156 Ok(storage_config)
158 }
159}
160
/// S3 client bound to a single link, together with that link's bucket aliases.
#[derive(Clone)]
pub struct StorageClient {
    /// Underlying AWS SDK client.
    s3_client: aws_sdk_s3::Client,
    /// Alias name -> bucket name, consulted by [`StorageClient::unalias`].
    aliases: Arc<HashMap<String, String>>,
    /// Location constraint sent when creating buckets, if configured.
    bucket_region: Option<BucketLocationConstraint>,
}
168
169impl StorageClient {
170 pub async fn new(
171 StorageConfig {
172 access_key_id,
173 secret_access_key,
174 session_token,
175 region,
176 max_attempts,
177 sts_config,
178 endpoint,
179 mut aliases,
180 bucket_region,
181 }: StorageConfig,
182 config_values: &HashMap<String, String>,
183 ) -> Self {
184 let region = match region {
185 Some(region) => Some(Region::new(region)),
186 _ => DefaultRegionChain::builder().build().region().await,
187 };
188
189 let mut cred_provider = match (access_key_id, secret_access_key) {
191 (Some(access_key_id), Some(secret_access_key)) => {
192 SharedCredentialsProvider::new(aws_sdk_s3::config::Credentials::new(
193 access_key_id,
194 secret_access_key,
195 session_token,
196 None,
197 "static",
198 ))
199 }
200 _ => SharedCredentialsProvider::new(
201 DefaultCredentialsChain::builder()
202 .region(region.clone())
203 .build()
204 .await,
205 ),
206 };
207 if let Some(StsAssumeRoleConfig {
208 role,
209 region,
210 session,
211 external_id,
212 }) = sts_config
213 {
214 let mut role = AssumeRoleProvider::builder(role)
215 .session_name(session.unwrap_or_else(|| DEFAULT_STS_SESSION.to_string()));
216 if let Some(region) = region {
217 role = role.region(Region::new(region));
218 }
219 if let Some(external_id) = external_id {
220 role = role.external_id(external_id);
221 }
222 cred_provider = SharedCredentialsProvider::new(role.build().await);
223 }
224
225 let mut retry_config = RetryConfig::standard();
226 if let Some(max_attempts) = max_attempts {
227 retry_config = retry_config.with_max_attempts(max_attempts);
228 }
229 let mut loader = aws_config::defaults(aws_config::BehaviorVersion::v2025_08_07())
230 .region(region)
231 .credentials_provider(cred_provider)
232 .retry_config(retry_config);
233 if let Some(endpoint) = endpoint {
234 loader = loader.endpoint_url(endpoint);
235 };
236 let s3_client = aws_sdk_s3::Client::from_conf(
237 aws_sdk_s3::Config::from(&loader.load().await)
238 .to_builder()
239 .force_path_style(true)
244 .http_client(
245 HyperClientBuilder::new().build(
246 hyper_rustls::HttpsConnectorBuilder::new()
247 .with_tls_config(
248 rustls::ClientConfig::builder()
251 .with_root_certificates(rustls::RootCertStore {
252 roots: tls::DEFAULT_ROOTS.roots.clone(),
253 })
254 .with_no_client_auth(),
255 )
256 .https_or_http()
257 .enable_all_versions()
258 .build(),
259 ),
260 )
261 .build(),
262 );
263
264 for (k, v) in config_values {
266 if let Some(alias) = k.strip_prefix(ALIAS_PREFIX) {
267 if alias.is_empty() || v.is_empty() {
268 error!("invalid bucket alias_ key and value must not be empty");
269 } else {
270 aliases.insert(alias.to_string(), v.to_string());
271 }
272 }
273 }
274
275 StorageClient {
276 s3_client,
277 aliases: Arc::new(aliases),
278 bucket_region: bucket_region.and_then(|v| BucketLocationConstraint::from_str(&v).ok()),
279 }
280 }
281
282 pub fn unalias<'n, 's: 'n>(&'s self, bucket_or_alias: &'n str) -> &'n str {
289 debug!(%bucket_or_alias, aliases = ?self.aliases);
290 let name = bucket_or_alias
291 .strip_prefix(ALIAS_PREFIX)
292 .unwrap_or(bucket_or_alias);
293 if let Some(name) = self.aliases.get(name) {
294 name.as_ref()
295 } else {
296 name
297 }
298 }
299
300 #[instrument(level = "debug", skip(self))]
302 pub async fn container_exists(&self, bucket: &str) -> anyhow::Result<bool> {
303 match self.s3_client.head_bucket().bucket(bucket).send().await {
304 Ok(_) => Ok(true),
305 Err(se) => match se.into_service_error() {
306 HeadBucketError::NotFound(_) => Ok(false),
307 err => {
308 error!(?err, code = err.code(), "Unable to head bucket");
309 bail!(anyhow!(err).context("failed to `head` bucket"))
310 }
311 },
312 }
313 }
314
315 #[instrument(level = "debug", skip(self))]
317 pub async fn create_container(&self, bucket: &str) -> anyhow::Result<()> {
318 let mut builder = self.s3_client.create_bucket();
319
320 if let Some(bucket_region) = &self.bucket_region {
322 let bucket_config = CreateBucketConfiguration::builder()
324 .set_location_constraint(Some(bucket_region.clone()))
325 .build();
326
327 builder = builder.create_bucket_configuration(bucket_config);
328 }
329
330 match builder.bucket(bucket).send().await {
331 Ok(CreateBucketOutput { location, .. }) => {
332 debug!(?location, "bucket created");
333 Ok(())
334 }
335 Err(se) => match se.into_service_error() {
336 CreateBucketError::BucketAlreadyOwnedByYou(..) => Ok(()),
337 err => {
338 error!(?err, code = err.code(), "failed to create bucket");
339 bail!(anyhow!(err).context("failed to create bucket"))
340 }
341 },
342 }
343 }
344
345 #[instrument(level = "debug", skip(self))]
346 pub async fn get_container_info(&self, bucket: &str) -> anyhow::Result<ContainerMetadata> {
347 match self.s3_client.head_bucket().bucket(bucket).send().await {
348 Ok(_) => Ok(ContainerMetadata {
349 created_at: 0,
352 }),
353 Err(se) => match se.into_service_error() {
354 HeadBucketError::NotFound(_) => {
355 error!("bucket [{bucket}] not found");
356 bail!("bucket [{bucket}] not found")
357 }
358 err => {
359 error!(?err, code = err.code(), "unexpected error");
360 bail!(anyhow!(err).context("unexpected error"));
361 }
362 },
363 }
364 }
365
366 #[instrument(level = "debug", skip(self))]
367 pub async fn list_container_objects(
368 &self,
369 bucket: &str,
370 limit: Option<u64>,
371 offset: Option<u64>,
372 ) -> anyhow::Result<impl Iterator<Item = String>> {
373 match self
375 .s3_client
376 .list_objects_v2()
377 .bucket(bucket)
378 .set_max_keys(limit.map(|limit| limit.try_into().unwrap_or(i32::MAX)))
379 .send()
380 .await
381 {
382 Ok(ListObjectsV2Output { contents, .. }) => Ok(contents
383 .into_iter()
384 .flatten()
385 .filter_map(|Object { key, .. }| key)
386 .skip(offset.unwrap_or_default().try_into().unwrap_or(usize::MAX))
387 .take(limit.unwrap_or(u64::MAX).try_into().unwrap_or(usize::MAX))),
388 Err(SdkError::ServiceError(err)) => {
389 error!(?err, "service error");
390 bail!(anyhow!("{err:?}").context("service error"))
391 }
392 Err(err) => {
393 error!(%err, code = err.code(), "unexpected error");
394 bail!(anyhow!("{err:?}").context("unexpected error"))
395 }
396 }
397 }
398
399 #[instrument(level = "debug", skip(self))]
400 pub async fn copy_object(
401 &self,
402 src_bucket: &str,
403 src_key: &str,
404 dest_bucket: &str,
405 dest_key: &str,
406 ) -> anyhow::Result<()> {
407 self.s3_client
408 .copy_object()
409 .copy_source(format!("{src_bucket}/{src_key}"))
410 .bucket(dest_bucket)
411 .key(dest_key)
412 .send()
413 .await
414 .context("failed to copy object")?;
415 Ok(())
416 }
417
418 #[instrument(level = "debug", skip(self, object))]
419 pub async fn delete_object(&self, container: &str, object: String) -> anyhow::Result<()> {
420 self.s3_client
421 .delete_object()
422 .bucket(container)
423 .key(object)
424 .send()
425 .await
426 .context("failed to delete object")?;
427 Ok(())
428 }
429
430 #[instrument(level = "debug", skip(self, objects))]
431 pub async fn delete_objects(
432 &self,
433 container: &str,
434 objects: impl IntoIterator<Item = String>,
435 ) -> anyhow::Result<()> {
436 let objects: Vec<_> = objects
437 .into_iter()
438 .map(|key| ObjectIdentifier::builder().key(key).build())
439 .collect::<Result<_, _>>()
440 .context("failed to build object identifier list")?;
441 if objects.is_empty() {
442 debug!("no objects to delete, return");
443 return Ok(());
444 }
445 let delete = Delete::builder()
446 .set_objects(Some(objects))
447 .build()
448 .context("failed to build `delete_objects` command")?;
449 let out = self
450 .s3_client
451 .delete_objects()
452 .bucket(container)
453 .delete(delete)
454 .send()
455 .await
456 .context("failed to delete objects")?;
457 let errs = out.errors();
458 if !errs.is_empty() {
459 bail!("failed with errors {errs:?}")
460 }
461 Ok(())
462 }
463
464 #[instrument(level = "debug", skip(self))]
465 pub async fn delete_container(&self, bucket: &str) -> anyhow::Result<()> {
466 match self.s3_client.delete_bucket().bucket(bucket).send().await {
467 Ok(_) => Ok(()),
468 Err(SdkError::ServiceError(err)) => {
469 bail!("{err:?}")
470 }
471 Err(err) => {
472 error!(%err, code = err.code(), "unexpected error");
473 bail!(err)
474 }
475 }
476 }
477
478 #[instrument(level = "debug", skip(self))]
480 pub async fn has_object(&self, bucket: &str, key: &str) -> anyhow::Result<bool> {
481 match self
482 .s3_client
483 .head_object()
484 .bucket(bucket)
485 .key(key)
486 .send()
487 .await
488 {
489 Ok(_) => Ok(true),
490 Err(se) => match se.into_service_error() {
491 HeadObjectError::NotFound(_) => Ok(false),
492 err => {
493 error!(
494 %err,
495 code = err.code(),
496 "unexpected error for object_exists"
497 );
498 bail!(anyhow!(err).context("unexpected error for object_exists"))
499 }
500 },
501 }
502 }
503
504 #[instrument(level = "debug", skip(self))]
506 pub async fn get_object_info(&self, bucket: &str, key: &str) -> anyhow::Result<ObjectMetadata> {
507 match self
508 .s3_client
509 .head_object()
510 .bucket(bucket)
511 .key(key)
512 .send()
513 .await
514 {
515 Ok(HeadObjectOutput { content_length, .. }) => {
516 Ok(ObjectMetadata {
517 created_at: 0,
519 size: content_length
520 .and_then(|v| v.try_into().ok())
521 .unwrap_or_default(),
522 })
523 }
524 Err(se) => match se.into_service_error() {
525 HeadObjectError::NotFound(_) => {
526 error!("object [{bucket}/{key}] not found");
527 bail!("object [{bucket}/{key}] not found")
528 }
529 err => {
530 error!(
531 ?err,
532 code = err.code(),
533 "get_object_metadata failed for object [{bucket}/{key}]"
534 );
535 bail!(anyhow!(err).context(format!(
536 "get_object_metadata failed for object [{bucket}/{key}]"
537 )))
538 }
539 },
540 }
541 }
542}
543
/// Blobstore provider backed by S3; keeps one [`StorageClient`] per linked source.
#[derive(Default, Clone)]
pub struct BlobstoreS3Provider {
    /// Storage clients keyed by the source ID of the link they were configured for.
    actors: Arc<RwLock<HashMap<String, StorageClient>>>,
}
553
/// Runs the provider; delegates to [`BlobstoreS3Provider::run`].
pub async fn run() -> anyhow::Result<()> {
    BlobstoreS3Provider::run().await
}
557
558impl BlobstoreS3Provider {
559 pub async fn run() -> anyhow::Result<()> {
560 initialize_observability!(
561 "blobstore-s3-provider",
562 std::env::var_os("PROVIDER_BLOBSTORE_S3_FLAMEGRAPH_PATH")
563 );
564
565 let provider = Self::default();
566 let shutdown = run_provider(provider.clone(), "blobstore-s3-provider")
567 .await
568 .context("failed to run provider")?;
569 let connection = get_connection();
570 let wrpc = connection
571 .get_wrpc_client(connection.provider_key())
572 .await?;
573 serve_provider_exports(&wrpc, provider, shutdown, serve)
574 .await
575 .context("failed to serve provider exports")
576 }
577
578 async fn client(&self, context: Option<Context>) -> Result<StorageClient> {
580 if let Some(ref source_id) = context.and_then(|Context { component, .. }| component) {
581 self.actors
582 .read()
583 .await
584 .get(source_id)
585 .with_context(|| format!("failed to lookup {source_id} configuration"))
586 .cloned()
587 } else {
588 bail!("failed to lookup invocation source ID")
590 }
591 }
592}
593
594impl Handler<Option<Context>> for BlobstoreS3Provider {
595 #[instrument(level = "trace", skip(self))]
596 async fn clear_container(
597 &self,
598 cx: Option<Context>,
599 name: String,
600 ) -> anyhow::Result<Result<(), String>> {
601 Ok(async {
602 propagate_trace_for_ctx!(cx);
603 let client = self.client(cx).await?;
604 let bucket = client.unalias(&name);
605 let objects = client
606 .list_container_objects(bucket, None, None)
607 .await
608 .context("failed to list container objects")?;
609 client.delete_objects(bucket, objects).await
610 }
611 .await
612 .map_err(|err| format!("{err:#}")))
613 }
614
615 #[instrument(level = "trace", skip(self))]
616 async fn container_exists(
617 &self,
618 cx: Option<Context>,
619 name: String,
620 ) -> anyhow::Result<Result<bool, String>> {
621 Ok(async {
622 propagate_trace_for_ctx!(cx);
623 let client = self.client(cx).await?;
624 client.container_exists(client.unalias(&name)).await
625 }
626 .await
627 .map_err(|err| format!("{err:#}")))
628 }
629
630 #[instrument(level = "trace", skip(self))]
631 async fn create_container(
632 &self,
633 cx: Option<Context>,
634 name: String,
635 ) -> anyhow::Result<Result<(), String>> {
636 Ok(async {
637 propagate_trace_for_ctx!(cx);
638 let client = self.client(cx).await?;
639 client.create_container(client.unalias(&name)).await
640 }
641 .await
642 .map_err(|err| format!("{err:#}")))
643 }
644
645 #[instrument(level = "trace", skip(self))]
646 async fn delete_container(
647 &self,
648 cx: Option<Context>,
649 name: String,
650 ) -> anyhow::Result<Result<(), String>> {
651 Ok(async {
652 propagate_trace_for_ctx!(cx);
653 let client = self.client(cx).await?;
654 client.delete_container(client.unalias(&name)).await
655 }
656 .await
657 .map_err(|err| format!("{err:#}")))
658 }
659
660 #[instrument(level = "trace", skip(self))]
661 async fn get_container_info(
662 &self,
663 cx: Option<Context>,
664 name: String,
665 ) -> anyhow::Result<Result<ContainerMetadata, String>> {
666 Ok(async {
667 propagate_trace_for_ctx!(cx);
668 let client = self.client(cx).await?;
669 client.get_container_info(client.unalias(&name)).await
670 }
671 .await
672 .map_err(|err| format!("{err:#}")))
673 }
674
675 #[instrument(level = "trace", skip(self))]
676 async fn list_container_objects(
677 &self,
678 cx: Option<Context>,
679 name: String,
680 limit: Option<u64>,
681 offset: Option<u64>,
682 ) -> anyhow::Result<
683 Result<
684 (
685 Pin<Box<dyn Stream<Item = Vec<String>> + Send>>,
686 Pin<Box<dyn Future<Output = Result<(), String>> + Send>>,
687 ),
688 String,
689 >,
690 > {
691 Ok(async {
692 propagate_trace_for_ctx!(cx);
693 let client = self.client(cx).await?;
694 let names = client
695 .list_container_objects(client.unalias(&name), limit, offset)
696 .await
697 .map(Vec::from_iter)?;
698 anyhow::Ok((
699 Box::pin(stream::iter([names])) as Pin<Box<dyn Stream<Item = _> + Send>>,
700 Box::pin(async move { Ok(()) }) as Pin<Box<dyn Future<Output = _> + Send>>,
701 ))
702 }
703 .await
704 .map_err(|err| format!("{err:#}")))
705 }
706
707 #[instrument(level = "trace", skip(self))]
708 async fn copy_object(
709 &self,
710 cx: Option<Context>,
711 src: ObjectId,
712 dest: ObjectId,
713 ) -> anyhow::Result<Result<(), String>> {
714 Ok(async {
715 propagate_trace_for_ctx!(cx);
716 let client = self.client(cx).await?;
717 let src_bucket = client.unalias(&src.container);
718 let dest_bucket = client.unalias(&dest.container);
719 client
720 .copy_object(src_bucket, &src.object, dest_bucket, &dest.object)
721 .await
722 }
723 .await
724 .map_err(|err| format!("{err:#}")))
725 }
726
727 #[instrument(level = "trace", skip(self))]
728 async fn delete_object(
729 &self,
730 cx: Option<Context>,
731 id: ObjectId,
732 ) -> anyhow::Result<Result<(), String>> {
733 Ok(async {
734 propagate_trace_for_ctx!(cx);
735 let client = self.client(cx).await?;
736 client
737 .delete_object(client.unalias(&id.container), id.object)
738 .await
739 }
740 .await
741 .map_err(|err| format!("{err:#}")))
742 }
743
744 #[instrument(level = "trace", skip(self))]
745 async fn delete_objects(
746 &self,
747 cx: Option<Context>,
748 container: String,
749 objects: Vec<String>,
750 ) -> anyhow::Result<Result<(), String>> {
751 Ok(async {
752 propagate_trace_for_ctx!(cx);
753 let client = self.client(cx).await?;
754 client
755 .delete_objects(client.unalias(&container), objects)
756 .await
757 }
758 .await
759 .map_err(|err| format!("{err:#}")))
760 }
761
762 #[instrument(level = "trace", skip(self))]
763 async fn get_container_data(
764 &self,
765 cx: Option<Context>,
766 id: ObjectId,
767 start: u64,
768 end: u64,
769 ) -> anyhow::Result<
770 Result<
771 (
772 Pin<Box<dyn Stream<Item = Bytes> + Send>>,
773 Pin<Box<dyn Future<Output = Result<(), String>> + Send>>,
774 ),
775 String,
776 >,
777 > {
778 Ok(async {
779 propagate_trace_for_ctx!(cx);
780 let limit = end
781 .checked_sub(start)
782 .context("`end` must be greater than `start`")?;
783 let client = self.client(cx).await?;
784 let bucket = client.unalias(&id.container);
785 let GetObjectOutput { body, .. } = client
786 .s3_client
787 .get_object()
788 .bucket(bucket)
789 .key(id.object)
790 .range(format!("bytes={start}-{end}"))
791 .send()
792 .await
793 .context("failed to get object")?;
794 let mut data = ReaderStream::new(body.into_async_read().take(limit));
795 let (tx, rx) = mpsc::channel(16);
796 anyhow::Ok((
797 Box::pin(ReceiverStream::new(rx)) as Pin<Box<dyn Stream<Item = _> + Send>>,
798 Box::pin(async move {
799 while let Some(buf) = data.next().await {
800 let buf = buf
801 .context("failed to read object")
802 .map_err(|err| format!("{err:#}"))?;
803 if tx.send(buf).await.is_err() {
804 return Err("stream receiver closed".to_string());
805 }
806 }
807 Ok(())
808 }) as Pin<Box<dyn Future<Output = _> + Send>>,
809 ))
810 }
811 .await
812 .map_err(|err| format!("{err:#}")))
813 }
814
815 #[instrument(level = "trace", skip(self))]
816 async fn get_object_info(
817 &self,
818 cx: Option<Context>,
819 id: ObjectId,
820 ) -> anyhow::Result<Result<ObjectMetadata, String>> {
821 Ok(async {
822 propagate_trace_for_ctx!(cx);
823 let client = self.client(cx).await?;
824 client
825 .get_object_info(client.unalias(&id.container), &id.object)
826 .await
827 }
828 .await
829 .map_err(|err| format!("{err:#}")))
830 }
831
832 #[instrument(level = "trace", skip(self))]
833 async fn has_object(
834 &self,
835 cx: Option<Context>,
836 id: ObjectId,
837 ) -> anyhow::Result<Result<bool, String>> {
838 Ok(async {
839 propagate_trace_for_ctx!(cx);
840 let client = self.client(cx).await?;
841 client
842 .has_object(client.unalias(&id.container), &id.object)
843 .await
844 }
845 .await
846 .map_err(|err| format!("{err:#}")))
847 }
848
849 #[instrument(level = "trace", skip(self))]
850 async fn move_object(
851 &self,
852 cx: Option<Context>,
853 src: ObjectId,
854 dest: ObjectId,
855 ) -> anyhow::Result<Result<(), String>> {
856 Ok(async {
857 propagate_trace_for_ctx!(cx);
858 let client = self.client(cx).await?;
859 let src_bucket = client.unalias(&src.container);
860 let dest_bucket = client.unalias(&dest.container);
861 client
862 .copy_object(src_bucket, &src.object, dest_bucket, &dest.object)
863 .await
864 .context("failed to copy object")?;
865 client
866 .delete_object(src_bucket, src.object)
867 .await
868 .context("failed to delete source object")
869 }
870 .await
871 .map_err(|err| format!("{err:#}")))
872 }
873
874 #[instrument(level = "trace", skip(self, data))]
875 async fn write_container_data(
876 &self,
877 cx: Option<Context>,
878 id: ObjectId,
879 data: Pin<Box<dyn Stream<Item = Bytes> + Send>>,
880 ) -> anyhow::Result<Result<Pin<Box<dyn Future<Output = Result<(), String>> + Send>>, String>>
881 {
882 Ok(async {
883 propagate_trace_for_ctx!(cx);
884 let client = self.client(cx).await?;
885 let req = client
886 .s3_client
887 .put_object()
888 .bucket(client.unalias(&id.container))
889 .key(&id.object);
890 anyhow::Ok(Box::pin(async {
891 let data: BytesMut = data.collect().await;
893 req.body(data.freeze().into())
894 .send()
895 .await
896 .context("failed to put object")
897 .map_err(|err| format!("{err:#}"))?;
898 Ok(())
899 }) as Pin<Box<dyn Future<Output = _> + Send>>)
900 }
901 .await
902 .map_err(|err| format!("{err:#}")))
903 }
904}
905
906impl Provider for BlobstoreS3Provider {
909 async fn receive_link_config_as_target(
913 &self,
914 link_config: LinkConfig<'_>,
915 ) -> anyhow::Result<()> {
916 let config = match StorageConfig::from_link_config(&link_config).await {
918 Ok(v) => v,
919 Err(e) => {
920 error!(error = %e, %link_config.source_id, "failed to build storage config");
921 return Err(anyhow!(e).context("failed to build source config"));
922 }
923 };
924
925 let link = StorageClient::new(config, link_config.config).await;
926
927 let mut update_map = self.actors.write().await;
928 update_map.insert(link_config.source_id.to_string(), link);
929
930 Ok(())
931 }
932
933 #[instrument(level = "info", skip_all, fields(source_id = info.get_source_id()))]
935 async fn delete_link_as_target(&self, info: impl LinkDeleteInfo) -> anyhow::Result<()> {
936 let component_id = info.get_source_id();
937 let mut aw = self.actors.write().await;
938 aw.remove(component_id);
939 Ok(())
940 }
941
942 async fn shutdown(&self) -> anyhow::Result<()> {
944 let mut aw = self.actors.write().await;
945 aw.drain();
947 Ok(())
948 }
949}
950
#[cfg(test)]
mod test {
    use super::*;

    /// Checks alias resolution for unknown names, bare aliases and prefixed aliases.
    #[tokio::test]
    async fn aliases() {
        let link_values: HashMap<String, String> =
            HashMap::from([(format!("{ALIAS_PREFIX}foo"), "bar".into())]);
        let client = StorageClient::new(StorageConfig::default(), &link_values).await;

        let cases = [
            // unknown names pass through untouched
            ("boo".to_string(), "boo"),
            // a bare alias resolves to its bucket
            ("foo".to_string(), "bar"),
            // a prefixed alias resolves to its bucket
            (format!("{ALIAS_PREFIX}foo"), "bar"),
            // an unknown prefixed name only loses the prefix
            (format!("{ALIAS_PREFIX}baz"), "baz"),
        ];
        for (input, expected) in cases {
            assert_eq!(client.unalias(&input), expected);
        }
    }
}