wasmcloud_provider_blobstore_azure/
lib.rs1#![allow(clippy::type_complexity)]
2
3use core::future::Future;
4use core::pin::Pin;
5
6use std::collections::HashMap;
7use std::sync::Arc;
8
9use anyhow::{bail, Context as _, Result};
10use azure_storage::CloudLocation;
11use azure_storage_blobs::prelude::*;
12use bytes::{Bytes, BytesMut};
13use futures::{Stream, StreamExt as _};
14use tokio::sync::{mpsc, RwLock};
15use tokio_stream::wrappers::ReceiverStream;
16use tracing::{error, instrument};
17use wasmcloud_provider_sdk::{
18 get_connection, initialize_observability, load_host_data, propagate_trace_for_ctx,
19 run_provider, serve_provider_exports, Context, HostData, LinkConfig, LinkDeleteInfo, Provider,
20};
21use wrpc_interface_blobstore::bindings::{
22 exports::wrpc::blobstore::blobstore::Handler,
23 serve,
24 wrpc::blobstore::types::{ContainerMetadata, ObjectId, ObjectMetadata},
25};
26
27use config::StorageConfig;
28
29mod config;
30
31#[derive(Default, Clone)]
36pub struct BlobstoreAzblobProvider {
37 config: Arc<RwLock<HashMap<String, BlobServiceClient>>>,
39}
40
41pub async fn run() -> anyhow::Result<()> {
42 BlobstoreAzblobProvider::run().await
43}
44
45impl Provider for BlobstoreAzblobProvider {
48 #[instrument(level = "info", skip_all)]
49 async fn receive_link_config_as_target(
50 &self,
51 link_config: LinkConfig<'_>,
52 ) -> anyhow::Result<()> {
53 let config = match StorageConfig::from_link_config(&link_config) {
54 Ok(v) => v,
55 Err(e) => {
56 error!(error = %e, source_id = %link_config.source_id, "failed to read storage config");
57 return Err(e);
58 }
59 };
60
61 let builder = match &link_config.config.get("CLOUD_LOCATION") {
62 Some(custom_location) => ClientBuilder::with_location(
63 CloudLocation::Custom {
64 account: config.storage_account.clone(),
65 uri: custom_location.to_string(),
66 },
67 config.access_key(),
68 ),
69 None => ClientBuilder::new(config.storage_account.clone(), config.access_key()),
70 };
71 let client = builder.blob_service_client();
72
73 let mut update_map = self.config.write().await;
74 update_map.insert(link_config.source_id.to_string(), client);
75
76 Ok(())
77 }
78
79 #[instrument(level = "info", skip_all, fields(source_id = info.get_source_id()))]
80 async fn delete_link_as_target(&self, info: impl LinkDeleteInfo) -> anyhow::Result<()> {
81 let component_id = info.get_source_id();
82 self.config.write().await.remove(component_id);
83 Ok(())
84 }
85
86 async fn shutdown(&self) -> anyhow::Result<()> {
87 self.config.write().await.drain();
88 Ok(())
89 }
90}
91
92impl BlobstoreAzblobProvider {
93 pub async fn run() -> anyhow::Result<()> {
94 let HostData { config, .. } = load_host_data().context("failed to load host data")?;
95 let flamegraph_path = config
96 .get("FLAMEGRAPH_PATH")
97 .map(String::from)
98 .or_else(|| std::env::var("PROVIDER_BLOBSTORE_AZURE_FLAMEGRAPH_PATH").ok());
99 initialize_observability!("blobstore-azure-provider", flamegraph_path);
100
101 let provider = Self::default();
102 let shutdown = run_provider(provider.clone(), "blobstore-azure-provider")
103 .await
104 .context("failed to run provider")?;
105 let connection = get_connection();
106 let wrpc = connection
107 .get_wrpc_client(connection.provider_key())
108 .await?;
109 serve_provider_exports(&wrpc, provider, shutdown, serve)
110 .await
111 .context("failed to serve provider exports")
112 }
113
114 async fn get_config(&self, context: Option<&Context>) -> anyhow::Result<BlobServiceClient> {
115 if let Some(source_id) = context.and_then(|Context { component, .. }| component.as_ref()) {
116 self.config
117 .read()
118 .await
119 .get(source_id)
120 .with_context(|| format!("failed to lookup {source_id} configuration"))
121 .cloned()
122 } else {
123 bail!(
124 "failed to lookup source of invocation, could not construct Azure blobstore client"
125 )
126 }
127 }
128}
129
130impl Handler<Option<Context>> for BlobstoreAzblobProvider {
131 #[instrument(level = "trace", skip(self))]
132 async fn clear_container(
133 &self,
134 cx: Option<Context>,
135 name: String,
136 ) -> anyhow::Result<Result<(), String>> {
137 Ok(async {
138 propagate_trace_for_ctx!(cx);
139 let client = self
140 .get_config(cx.as_ref())
141 .await
142 .context("failed to retrieve azure blobstore client")?;
143
144 let client = client.container_client(&name);
145 let mut blob_stream = client.list_blobs().into_stream();
146 while let Some(blob_entry) = blob_stream.next().await {
147 let blob_entry =
148 blob_entry.with_context(|| format!("failed to list blobs in '{name}'"))?;
149 for blob in blob_entry.blobs.blobs() {
150 client
151 .blob_client(&blob.name)
152 .delete()
153 .await
154 .with_context(|| {
155 format!("failed to delete blob '{}' in '{name}'", blob.name)
156 })?;
157 }
158 }
159 Ok(())
160 }
161 .await
162 .map_err(|err: anyhow::Error| format!("{err:#}")))
163 }
164
165 #[instrument(level = "trace", skip(self))]
166 async fn container_exists(
167 &self,
168 cx: Option<Context>,
169 name: String,
170 ) -> anyhow::Result<Result<bool, String>> {
171 Ok(async {
172 propagate_trace_for_ctx!(cx);
173 let client = self
174 .get_config(cx.as_ref())
175 .await
176 .context("failed to retrieve azure blobstore client")?;
177
178 client
179 .container_client(name)
180 .exists()
181 .await
182 .context("failed to check container existence")
183 }
184 .await
185 .map_err(|err| format!("{err:#}")))
186 }
187
188 #[instrument(level = "trace", skip(self))]
189 async fn create_container(
190 &self,
191 cx: Option<Context>,
192 name: String,
193 ) -> anyhow::Result<Result<(), String>> {
194 Ok(async {
195 propagate_trace_for_ctx!(cx);
196 let client = self
197 .get_config(cx.as_ref())
198 .await
199 .context("failed to retrieve azure blobstore client")?;
200
201 client
202 .container_client(name)
203 .create()
204 .await
205 .context("failed to create container")
206 }
207 .await
208 .map_err(|err| format!("{err:#}")))
209 }
210
211 #[instrument(level = "trace", skip(self))]
212 async fn delete_container(
213 &self,
214 cx: Option<Context>,
215 name: String,
216 ) -> anyhow::Result<Result<(), String>> {
217 Ok(async {
218 propagate_trace_for_ctx!(cx);
219 let client = self
220 .get_config(cx.as_ref())
221 .await
222 .context("failed to retrieve azure blobstore client")?;
223
224 client
225 .container_client(name)
226 .delete()
227 .await
228 .context("failed to delete container")
229 }
230 .await
231 .map_err(|err| format!("{err:#}")))
232 }
233
234 #[instrument(level = "trace", skip(self))]
235 async fn get_container_info(
236 &self,
237 cx: Option<Context>,
238 name: String,
239 ) -> anyhow::Result<Result<ContainerMetadata, String>> {
240 Ok(async {
241 propagate_trace_for_ctx!(cx);
242 let client = self
243 .get_config(cx.as_ref())
244 .await
245 .context("failed to retrieve azure blobstore client")?;
246
247 let properties = client
248 .container_client(name)
249 .get_properties()
250 .await
251 .context("failed to get container properties")?;
252
253 let created_at = properties
254 .date
255 .unix_timestamp()
256 .try_into()
257 .context("failed to convert created_at date to u64")?;
258
259 anyhow::Ok(ContainerMetadata { created_at })
262 }
263 .await
264 .map_err(|err| format!("{err:#}")))
265 }
266
267 #[instrument(level = "trace", skip(self))]
268 async fn list_container_objects(
269 &self,
270 cx: Option<Context>,
271 name: String,
272 limit: Option<u64>,
273 offset: Option<u64>,
274 ) -> anyhow::Result<
275 Result<
276 (
277 Pin<Box<dyn Stream<Item = Vec<String>> + Send>>,
278 Pin<Box<dyn Future<Output = Result<(), String>> + Send>>,
279 ),
280 String,
281 >,
282 > {
283 Ok(async {
284 propagate_trace_for_ctx!(cx);
285 let client = self
286 .get_config(cx.as_ref())
287 .await
288 .context("failed to retrieve azure blobstore client")?;
289
290 let mut names = client.container_client(name).list_blobs().into_stream();
291 let (tx, rx) = mpsc::channel(16);
292 anyhow::Ok((
293 Box::pin(ReceiverStream::new(rx)) as Pin<Box<dyn Stream<Item = _> + Send>>,
294 Box::pin(async move {
295 let mut offset = offset.unwrap_or_default().try_into().unwrap_or(usize::MAX);
296 let mut limit = limit
297 .and_then(|limit| limit.try_into().ok())
298 .unwrap_or(usize::MAX);
299 while let Some(res) = names.next().await {
300 let res = res
301 .context("failed to receive response")
302 .map_err(|err| format!("{err:#}"))?;
303 let mut chunk = vec![];
304 for name in res.blobs.blobs().map(|Blob { name, .. }| name) {
305 if limit == 0 {
306 break;
307 }
308 if offset > 0 {
309 offset -= 1;
310 continue;
311 }
312 chunk.push(name.clone());
313 limit -= 1;
314 }
315 if !chunk.is_empty() && tx.send(chunk).await.is_err() {
316 return Err("stream receiver closed".to_string());
317 }
318 }
319 Ok(())
320 }) as Pin<Box<dyn Future<Output = _> + Send>>,
321 ))
322 }
323 .await
324 .map_err(|err| format!("{err:#}")))
325 }
326
327 #[instrument(level = "trace", skip(self))]
328 async fn copy_object(
329 &self,
330 cx: Option<Context>,
331 src: ObjectId,
332 dest: ObjectId,
333 ) -> anyhow::Result<Result<(), String>> {
334 Ok(async {
335 propagate_trace_for_ctx!(cx);
336 let client = self
337 .get_config(cx.as_ref())
338 .await
339 .context("failed to retrieve azure blobstore client")?;
340
341 let copy_source = client
342 .container_client(src.container)
343 .blob_client(src.object)
344 .url()
345 .context("failed to get source object for copy")?;
346
347 client
348 .container_client(dest.container)
349 .blob_client(dest.object)
350 .copy(copy_source)
351 .await
352 .map(|_| ())
353 .context("failed to copy source object")
354 }
355 .await
356 .map_err(|err| format!("{err:#}")))
357 }
358
359 #[instrument(level = "trace", skip(self))]
360 async fn delete_object(
361 &self,
362 cx: Option<Context>,
363 id: ObjectId,
364 ) -> anyhow::Result<Result<(), String>> {
365 Ok(async {
366 propagate_trace_for_ctx!(cx);
367 let client = self
368 .get_config(cx.as_ref())
369 .await
370 .context("failed to retrieve azure blobstore client")?;
371
372 client
373 .container_client(id.container)
374 .blob_client(id.object)
375 .delete()
376 .await
377 .map(|_| ())
378 .context("failed to delete object")
379 }
380 .await
381 .map_err(|err| format!("{err:#}")))
382 }
383
384 #[instrument(level = "trace", skip(self))]
385 async fn delete_objects(
386 &self,
387 cx: Option<Context>,
388 container: String,
389 objects: Vec<String>,
390 ) -> anyhow::Result<Result<(), String>> {
391 Ok(async {
392 propagate_trace_for_ctx!(cx);
393 let client = self
394 .get_config(cx.as_ref())
395 .await
396 .context("failed to retrieve azure blobstore client")?;
397
398 let deletes = objects.iter().map(|object| async {
399 client
400 .container_client(container.clone())
401 .blob_client(object.clone())
402 .delete()
403 .await
404 });
405 futures::future::join_all(deletes)
406 .await
407 .into_iter()
408 .collect::<Result<Vec<_>, azure_storage::Error>>()
409 .map(|_| ())
410 .context("failed to delete objects")
411 }
412 .await
413 .map_err(|err| format!("{err:#}")))
414 }
415
416 #[instrument(level = "trace", skip(self))]
417 async fn get_container_data(
418 &self,
419 cx: Option<Context>,
420 id: ObjectId,
421 start: u64,
422 end: u64,
423 ) -> anyhow::Result<
424 Result<
425 (
426 Pin<Box<dyn Stream<Item = Bytes> + Send>>,
427 Pin<Box<dyn Future<Output = Result<(), String>> + Send>>,
428 ),
429 String,
430 >,
431 > {
432 Ok(async {
433 propagate_trace_for_ctx!(cx);
434 let client = self
435 .get_config(cx.as_ref())
436 .await
437 .context("failed to retrieve azure blobstore client")?;
438
439 let mut stream = client
440 .container_client(id.container)
441 .blob_client(id.object)
442 .get()
443 .range(start..end)
444 .into_stream();
445
446 let (tx, rx) = mpsc::channel(16);
447 anyhow::Ok((
448 Box::pin(ReceiverStream::new(rx)) as Pin<Box<dyn Stream<Item = _> + Send>>,
449 Box::pin(async move {
450 async move {
451 while let Some(res) = stream.next().await {
452 let res = res.context("failed to receive blob")?;
453 let buf = res
454 .data
455 .collect()
456 .await
457 .context("failed to receive bytes")?;
458 tx.send(buf).await.context("stream receiver closed")?;
459 }
460 anyhow::Ok(())
461 }
462 .await
463 .map_err(|err| format!("{err:#}"))
464 }) as Pin<Box<dyn Future<Output = _> + Send>>,
465 ))
466 }
467 .await
468 .map_err(|err| format!("{err:#}")))
469 }
470
471 #[instrument(level = "trace", skip(self))]
472 async fn get_object_info(
473 &self,
474 cx: Option<Context>,
475 id: ObjectId,
476 ) -> anyhow::Result<Result<ObjectMetadata, String>> {
477 Ok(async {
478 propagate_trace_for_ctx!(cx);
479 let client = self
480 .get_config(cx.as_ref())
481 .await
482 .context("failed to retrieve azure blobstore client")?;
483
484 let info = client
485 .container_client(id.container)
486 .blob_client(id.object)
487 .get_properties()
488 .await
489 .map_err(|e| anyhow::anyhow!(e))?;
490
491 let created_at = info
494 .blob
495 .properties
496 .creation_time
497 .unix_timestamp()
498 .try_into()
499 .context("failed to convert created_at date to u64")?;
500 anyhow::Ok(ObjectMetadata {
501 created_at,
502 size: info.blob.properties.content_length,
503 })
504 }
505 .await
506 .map_err(|err| format!("{err:#}")))
507 }
508
509 #[instrument(level = "trace", skip(self))]
510 async fn has_object(
511 &self,
512 cx: Option<Context>,
513 id: ObjectId,
514 ) -> anyhow::Result<Result<bool, String>> {
515 Ok(async {
516 propagate_trace_for_ctx!(cx);
517 let client = self
518 .get_config(cx.as_ref())
519 .await
520 .context("failed to retrieve azure blobstore client")?;
521
522 client
523 .container_client(id.container)
524 .blob_client(id.object)
525 .exists()
526 .await
527 .map_err(|e| anyhow::anyhow!(e))
528 }
529 .await
530 .map_err(|err| format!("{err:#}")))
531 }
532
533 #[instrument(level = "trace", skip(self))]
534 async fn move_object(
535 &self,
536 cx: Option<Context>,
537 src: ObjectId,
538 dest: ObjectId,
539 ) -> anyhow::Result<Result<(), String>> {
540 Ok(async {
541 propagate_trace_for_ctx!(cx);
542 let client = self
543 .get_config(cx.as_ref())
544 .await
545 .context("failed to retrieve azure blobstore client")?;
546
547 let source_client = client
548 .container_client(src.container)
549 .blob_client(src.object);
550
551 let copy_source = source_client
553 .url()
554 .context("failed to get source object for copy")?;
555
556 client
557 .container_client(dest.container)
558 .blob_client(dest.object)
559 .copy(copy_source)
560 .await
561 .map(|_| ())
562 .context("failed to copy source object to move")?;
563
564 source_client
565 .delete()
566 .await
567 .map(|_| ())
568 .context("failed to delete source object")
569 }
570 .await
571 .map_err(|err| format!("{err:#}")))
572 }
573
574 #[instrument(level = "trace", skip(self, data))]
575 async fn write_container_data(
576 &self,
577 cx: Option<Context>,
578 id: ObjectId,
579 data: Pin<Box<dyn Stream<Item = Bytes> + Send>>,
580 ) -> anyhow::Result<Result<Pin<Box<dyn Future<Output = Result<(), String>> + Send>>, String>>
581 {
582 Ok(async {
583 propagate_trace_for_ctx!(cx);
584 let client = self
585 .get_config(cx.as_ref())
586 .await
587 .context("failed to retrieve azure blobstore client")?;
588 let client = client.container_client(id.container).blob_client(id.object);
589 anyhow::Ok(Box::pin(async move {
590 let data: BytesMut = data.collect().await;
592 client
593 .put_block_blob(data)
594 .await
595 .map(|_| ())
596 .context("failed to write container data")
597 .map_err(|err| format!("{err:#}"))?;
598 Ok(())
599 }) as Pin<Box<dyn Future<Output = _> + Send>>)
600 }
601 .await
602 .map_err(|err| format!("{err:#}")))
603 }
604}