wasmcloud_provider_blobstore_azure/
lib.rs

1#![allow(clippy::type_complexity)]
2
3use core::future::Future;
4use core::pin::Pin;
5
6use std::collections::HashMap;
7use std::sync::Arc;
8
9use anyhow::{bail, Context as _, Result};
10use azure_storage::CloudLocation;
11use azure_storage_blobs::prelude::*;
12use bytes::{Bytes, BytesMut};
13use futures::{Stream, StreamExt as _};
14use tokio::sync::{mpsc, RwLock};
15use tokio_stream::wrappers::ReceiverStream;
16use tracing::{error, instrument};
17use wasmcloud_provider_sdk::{
18    get_connection, initialize_observability, load_host_data, propagate_trace_for_ctx,
19    run_provider, serve_provider_exports, Context, HostData, LinkConfig, LinkDeleteInfo, Provider,
20};
21use wrpc_interface_blobstore::bindings::{
22    exports::wrpc::blobstore::blobstore::Handler,
23    serve,
24    wrpc::blobstore::types::{ContainerMetadata, ObjectId, ObjectMetadata},
25};
26
27use config::StorageConfig;
28
29mod config;
30
31/// Blobstore Azblob provider
32///
33/// This struct will be the target of generated implementations (via wit-provider-bindgen)
34/// for the blobstore provider WIT contract
35#[derive(Default, Clone)]
36pub struct BlobstoreAzblobProvider {
37    /// Per-config storage for Azure connection clients
38    config: Arc<RwLock<HashMap<String, BlobServiceClient>>>,
39}
40
41pub async fn run() -> anyhow::Result<()> {
42    BlobstoreAzblobProvider::run().await
43}
44
45/// Handle provider control commands
46/// put_link (new component link command), del_link (remove link command), and shutdown
47impl Provider for BlobstoreAzblobProvider {
48    #[instrument(level = "info", skip_all)]
49    async fn receive_link_config_as_target(
50        &self,
51        link_config: LinkConfig<'_>,
52    ) -> anyhow::Result<()> {
53        let config = match StorageConfig::from_link_config(&link_config) {
54            Ok(v) => v,
55            Err(e) => {
56                error!(error = %e, source_id = %link_config.source_id, "failed to read storage config");
57                return Err(e);
58            }
59        };
60
61        let builder = match &link_config.config.get("CLOUD_LOCATION") {
62            Some(custom_location) => ClientBuilder::with_location(
63                CloudLocation::Custom {
64                    account: config.storage_account.clone(),
65                    uri: custom_location.to_string(),
66                },
67                config.access_key(),
68            ),
69            None => ClientBuilder::new(config.storage_account.clone(), config.access_key()),
70        };
71        let client = builder.blob_service_client();
72
73        let mut update_map = self.config.write().await;
74        update_map.insert(link_config.source_id.to_string(), client);
75
76        Ok(())
77    }
78
79    #[instrument(level = "info", skip_all, fields(source_id = info.get_source_id()))]
80    async fn delete_link_as_target(&self, info: impl LinkDeleteInfo) -> anyhow::Result<()> {
81        let component_id = info.get_source_id();
82        self.config.write().await.remove(component_id);
83        Ok(())
84    }
85
86    async fn shutdown(&self) -> anyhow::Result<()> {
87        self.config.write().await.drain();
88        Ok(())
89    }
90}
91
92impl BlobstoreAzblobProvider {
93    pub async fn run() -> anyhow::Result<()> {
94        let HostData { config, .. } = load_host_data().context("failed to load host data")?;
95        let flamegraph_path = config
96            .get("FLAMEGRAPH_PATH")
97            .map(String::from)
98            .or_else(|| std::env::var("PROVIDER_BLOBSTORE_AZURE_FLAMEGRAPH_PATH").ok());
99        initialize_observability!("blobstore-azure-provider", flamegraph_path);
100
101        let provider = Self::default();
102        let shutdown = run_provider(provider.clone(), "blobstore-azure-provider")
103            .await
104            .context("failed to run provider")?;
105        let connection = get_connection();
106        let wrpc = connection
107            .get_wrpc_client(connection.provider_key())
108            .await?;
109        serve_provider_exports(&wrpc, provider, shutdown, serve)
110            .await
111            .context("failed to serve provider exports")
112    }
113
114    async fn get_config(&self, context: Option<&Context>) -> anyhow::Result<BlobServiceClient> {
115        if let Some(source_id) = context.and_then(|Context { component, .. }| component.as_ref()) {
116            self.config
117                .read()
118                .await
119                .get(source_id)
120                .with_context(|| format!("failed to lookup {source_id} configuration"))
121                .cloned()
122        } else {
123            bail!(
124                "failed to lookup source of invocation, could not construct Azure blobstore client"
125            )
126        }
127    }
128}
129
130impl Handler<Option<Context>> for BlobstoreAzblobProvider {
131    #[instrument(level = "trace", skip(self))]
132    async fn clear_container(
133        &self,
134        cx: Option<Context>,
135        name: String,
136    ) -> anyhow::Result<Result<(), String>> {
137        Ok(async {
138            propagate_trace_for_ctx!(cx);
139            let client = self
140                .get_config(cx.as_ref())
141                .await
142                .context("failed to retrieve azure blobstore client")?;
143
144            let client = client.container_client(&name);
145            let mut blob_stream = client.list_blobs().into_stream();
146            while let Some(blob_entry) = blob_stream.next().await {
147                let blob_entry =
148                    blob_entry.with_context(|| format!("failed to list blobs in '{name}'"))?;
149                for blob in blob_entry.blobs.blobs() {
150                    client
151                        .blob_client(&blob.name)
152                        .delete()
153                        .await
154                        .with_context(|| {
155                            format!("failed to delete blob '{}' in '{name}'", blob.name)
156                        })?;
157                }
158            }
159            Ok(())
160        }
161        .await
162        .map_err(|err: anyhow::Error| format!("{err:#}")))
163    }
164
165    #[instrument(level = "trace", skip(self))]
166    async fn container_exists(
167        &self,
168        cx: Option<Context>,
169        name: String,
170    ) -> anyhow::Result<Result<bool, String>> {
171        Ok(async {
172            propagate_trace_for_ctx!(cx);
173            let client = self
174                .get_config(cx.as_ref())
175                .await
176                .context("failed to retrieve azure blobstore client")?;
177
178            client
179                .container_client(name)
180                .exists()
181                .await
182                .context("failed to check container existence")
183        }
184        .await
185        .map_err(|err| format!("{err:#}")))
186    }
187
188    #[instrument(level = "trace", skip(self))]
189    async fn create_container(
190        &self,
191        cx: Option<Context>,
192        name: String,
193    ) -> anyhow::Result<Result<(), String>> {
194        Ok(async {
195            propagate_trace_for_ctx!(cx);
196            let client = self
197                .get_config(cx.as_ref())
198                .await
199                .context("failed to retrieve azure blobstore client")?;
200
201            client
202                .container_client(name)
203                .create()
204                .await
205                .context("failed to create container")
206        }
207        .await
208        .map_err(|err| format!("{err:#}")))
209    }
210
211    #[instrument(level = "trace", skip(self))]
212    async fn delete_container(
213        &self,
214        cx: Option<Context>,
215        name: String,
216    ) -> anyhow::Result<Result<(), String>> {
217        Ok(async {
218            propagate_trace_for_ctx!(cx);
219            let client = self
220                .get_config(cx.as_ref())
221                .await
222                .context("failed to retrieve azure blobstore client")?;
223
224            client
225                .container_client(name)
226                .delete()
227                .await
228                .context("failed to delete container")
229        }
230        .await
231        .map_err(|err| format!("{err:#}")))
232    }
233
234    #[instrument(level = "trace", skip(self))]
235    async fn get_container_info(
236        &self,
237        cx: Option<Context>,
238        name: String,
239    ) -> anyhow::Result<Result<ContainerMetadata, String>> {
240        Ok(async {
241            propagate_trace_for_ctx!(cx);
242            let client = self
243                .get_config(cx.as_ref())
244                .await
245                .context("failed to retrieve azure blobstore client")?;
246
247            let properties = client
248                .container_client(name)
249                .get_properties()
250                .await
251                .context("failed to get container properties")?;
252
253            let created_at = properties
254                .date
255                .unix_timestamp()
256                .try_into()
257                .context("failed to convert created_at date to u64")?;
258
259            // NOTE: The `created_at` format is currently undefined
260            // https://github.com/WebAssembly/wasi-blobstore/issues/7
261            anyhow::Ok(ContainerMetadata { created_at })
262        }
263        .await
264        .map_err(|err| format!("{err:#}")))
265    }
266
267    #[instrument(level = "trace", skip(self))]
268    async fn list_container_objects(
269        &self,
270        cx: Option<Context>,
271        name: String,
272        limit: Option<u64>,
273        offset: Option<u64>,
274    ) -> anyhow::Result<
275        Result<
276            (
277                Pin<Box<dyn Stream<Item = Vec<String>> + Send>>,
278                Pin<Box<dyn Future<Output = Result<(), String>> + Send>>,
279            ),
280            String,
281        >,
282    > {
283        Ok(async {
284            propagate_trace_for_ctx!(cx);
285            let client = self
286                .get_config(cx.as_ref())
287                .await
288                .context("failed to retrieve azure blobstore client")?;
289
290            let mut names = client.container_client(name).list_blobs().into_stream();
291            let (tx, rx) = mpsc::channel(16);
292            anyhow::Ok((
293                Box::pin(ReceiverStream::new(rx)) as Pin<Box<dyn Stream<Item = _> + Send>>,
294                Box::pin(async move {
295                    let mut offset = offset.unwrap_or_default().try_into().unwrap_or(usize::MAX);
296                    let mut limit = limit
297                        .and_then(|limit| limit.try_into().ok())
298                        .unwrap_or(usize::MAX);
299                    while let Some(res) = names.next().await {
300                        let res = res
301                            .context("failed to receive response")
302                            .map_err(|err| format!("{err:#}"))?;
303                        let mut chunk = vec![];
304                        for name in res.blobs.blobs().map(|Blob { name, .. }| name) {
305                            if limit == 0 {
306                                break;
307                            }
308                            if offset > 0 {
309                                offset -= 1;
310                                continue;
311                            }
312                            chunk.push(name.clone());
313                            limit -= 1;
314                        }
315                        if !chunk.is_empty() && tx.send(chunk).await.is_err() {
316                            return Err("stream receiver closed".to_string());
317                        }
318                    }
319                    Ok(())
320                }) as Pin<Box<dyn Future<Output = _> + Send>>,
321            ))
322        }
323        .await
324        .map_err(|err| format!("{err:#}")))
325    }
326
327    #[instrument(level = "trace", skip(self))]
328    async fn copy_object(
329        &self,
330        cx: Option<Context>,
331        src: ObjectId,
332        dest: ObjectId,
333    ) -> anyhow::Result<Result<(), String>> {
334        Ok(async {
335            propagate_trace_for_ctx!(cx);
336            let client = self
337                .get_config(cx.as_ref())
338                .await
339                .context("failed to retrieve azure blobstore client")?;
340
341            let copy_source = client
342                .container_client(src.container)
343                .blob_client(src.object)
344                .url()
345                .context("failed to get source object for copy")?;
346
347            client
348                .container_client(dest.container)
349                .blob_client(dest.object)
350                .copy(copy_source)
351                .await
352                .map(|_| ())
353                .context("failed to copy source object")
354        }
355        .await
356        .map_err(|err| format!("{err:#}")))
357    }
358
359    #[instrument(level = "trace", skip(self))]
360    async fn delete_object(
361        &self,
362        cx: Option<Context>,
363        id: ObjectId,
364    ) -> anyhow::Result<Result<(), String>> {
365        Ok(async {
366            propagate_trace_for_ctx!(cx);
367            let client = self
368                .get_config(cx.as_ref())
369                .await
370                .context("failed to retrieve azure blobstore client")?;
371
372            client
373                .container_client(id.container)
374                .blob_client(id.object)
375                .delete()
376                .await
377                .map(|_| ())
378                .context("failed to delete object")
379        }
380        .await
381        .map_err(|err| format!("{err:#}")))
382    }
383
384    #[instrument(level = "trace", skip(self))]
385    async fn delete_objects(
386        &self,
387        cx: Option<Context>,
388        container: String,
389        objects: Vec<String>,
390    ) -> anyhow::Result<Result<(), String>> {
391        Ok(async {
392            propagate_trace_for_ctx!(cx);
393            let client = self
394                .get_config(cx.as_ref())
395                .await
396                .context("failed to retrieve azure blobstore client")?;
397
398            let deletes = objects.iter().map(|object| async {
399                client
400                    .container_client(container.clone())
401                    .blob_client(object.clone())
402                    .delete()
403                    .await
404            });
405            futures::future::join_all(deletes)
406                .await
407                .into_iter()
408                .collect::<Result<Vec<_>, azure_storage::Error>>()
409                .map(|_| ())
410                .context("failed to delete objects")
411        }
412        .await
413        .map_err(|err| format!("{err:#}")))
414    }
415
416    #[instrument(level = "trace", skip(self))]
417    async fn get_container_data(
418        &self,
419        cx: Option<Context>,
420        id: ObjectId,
421        start: u64,
422        end: u64,
423    ) -> anyhow::Result<
424        Result<
425            (
426                Pin<Box<dyn Stream<Item = Bytes> + Send>>,
427                Pin<Box<dyn Future<Output = Result<(), String>> + Send>>,
428            ),
429            String,
430        >,
431    > {
432        Ok(async {
433            propagate_trace_for_ctx!(cx);
434            let client = self
435                .get_config(cx.as_ref())
436                .await
437                .context("failed to retrieve azure blobstore client")?;
438
439            let mut stream = client
440                .container_client(id.container)
441                .blob_client(id.object)
442                .get()
443                .range(start..end)
444                .into_stream();
445
446            let (tx, rx) = mpsc::channel(16);
447            anyhow::Ok((
448                Box::pin(ReceiverStream::new(rx)) as Pin<Box<dyn Stream<Item = _> + Send>>,
449                Box::pin(async move {
450                    async move {
451                        while let Some(res) = stream.next().await {
452                            let res = res.context("failed to receive blob")?;
453                            let buf = res
454                                .data
455                                .collect()
456                                .await
457                                .context("failed to receive bytes")?;
458                            tx.send(buf).await.context("stream receiver closed")?;
459                        }
460                        anyhow::Ok(())
461                    }
462                    .await
463                    .map_err(|err| format!("{err:#}"))
464                }) as Pin<Box<dyn Future<Output = _> + Send>>,
465            ))
466        }
467        .await
468        .map_err(|err| format!("{err:#}")))
469    }
470
471    #[instrument(level = "trace", skip(self))]
472    async fn get_object_info(
473        &self,
474        cx: Option<Context>,
475        id: ObjectId,
476    ) -> anyhow::Result<Result<ObjectMetadata, String>> {
477        Ok(async {
478            propagate_trace_for_ctx!(cx);
479            let client = self
480                .get_config(cx.as_ref())
481                .await
482                .context("failed to retrieve azure blobstore client")?;
483
484            let info = client
485                .container_client(id.container)
486                .blob_client(id.object)
487                .get_properties()
488                .await
489                .map_err(|e| anyhow::anyhow!(e))?;
490
491            // NOTE: The `created_at` format is currently undefined
492            // https://github.com/WebAssembly/wasi-blobstore/issues/7
493            let created_at = info
494                .blob
495                .properties
496                .creation_time
497                .unix_timestamp()
498                .try_into()
499                .context("failed to convert created_at date to u64")?;
500            anyhow::Ok(ObjectMetadata {
501                created_at,
502                size: info.blob.properties.content_length,
503            })
504        }
505        .await
506        .map_err(|err| format!("{err:#}")))
507    }
508
509    #[instrument(level = "trace", skip(self))]
510    async fn has_object(
511        &self,
512        cx: Option<Context>,
513        id: ObjectId,
514    ) -> anyhow::Result<Result<bool, String>> {
515        Ok(async {
516            propagate_trace_for_ctx!(cx);
517            let client = self
518                .get_config(cx.as_ref())
519                .await
520                .context("failed to retrieve azure blobstore client")?;
521
522            client
523                .container_client(id.container)
524                .blob_client(id.object)
525                .exists()
526                .await
527                .map_err(|e| anyhow::anyhow!(e))
528        }
529        .await
530        .map_err(|err| format!("{err:#}")))
531    }
532
533    #[instrument(level = "trace", skip(self))]
534    async fn move_object(
535        &self,
536        cx: Option<Context>,
537        src: ObjectId,
538        dest: ObjectId,
539    ) -> anyhow::Result<Result<(), String>> {
540        Ok(async {
541            propagate_trace_for_ctx!(cx);
542            let client = self
543                .get_config(cx.as_ref())
544                .await
545                .context("failed to retrieve azure blobstore client")?;
546
547            let source_client = client
548                .container_client(src.container)
549                .blob_client(src.object);
550
551            // Copy and then delete the source object
552            let copy_source = source_client
553                .url()
554                .context("failed to get source object for copy")?;
555
556            client
557                .container_client(dest.container)
558                .blob_client(dest.object)
559                .copy(copy_source)
560                .await
561                .map(|_| ())
562                .context("failed to copy source object to move")?;
563
564            source_client
565                .delete()
566                .await
567                .map(|_| ())
568                .context("failed to delete source object")
569        }
570        .await
571        .map_err(|err| format!("{err:#}")))
572    }
573
574    #[instrument(level = "trace", skip(self, data))]
575    async fn write_container_data(
576        &self,
577        cx: Option<Context>,
578        id: ObjectId,
579        data: Pin<Box<dyn Stream<Item = Bytes> + Send>>,
580    ) -> anyhow::Result<Result<Pin<Box<dyn Future<Output = Result<(), String>> + Send>>, String>>
581    {
582        Ok(async {
583            propagate_trace_for_ctx!(cx);
584            let client = self
585                .get_config(cx.as_ref())
586                .await
587                .context("failed to retrieve azure blobstore client")?;
588            let client = client.container_client(id.container).blob_client(id.object);
589            anyhow::Ok(Box::pin(async move {
590                // TODO: Stream data
591                let data: BytesMut = data.collect().await;
592                client
593                    .put_block_blob(data)
594                    .await
595                    .map(|_| ())
596                    .context("failed to write container data")
597                    .map_err(|err| format!("{err:#}"))?;
598                Ok(())
599            }) as Pin<Box<dyn Future<Output = _> + Send>>)
600        }
601        .await
602        .map_err(|err| format!("{err:#}")))
603    }
604}