// azure_storage_blobs/clients/blob_client.rs
use crate::{blob::operations::*, prelude::*};
use azure_core::{
error::{Error, ErrorKind},
headers::Headers,
prelude::*,
Body, Method, Request, Response, StatusCode, Url,
};
use azure_storage::{
prelude::*,
shared_access_signature::service_sas::{BlobSharedAccessSignature, UserDeligationKey},
StorageCredentialsInner,
};
use futures::StreamExt;
use std::ops::Deref;
use time::OffsetDateTime;
/// Client bound to a single blob, identified by its name inside a container.
///
/// The builder-returning methods on this type clone the client into each builder.
#[derive(Debug, Clone)]
pub struct BlobClient {
/// Client for the container holding the blob; URL construction, credentials and
/// request sending are delegated to it.
container_client: ContainerClient,
/// Name of the blob as supplied at construction; may contain `/` separators.
blob_name: String,
}
impl BlobClient {
pub(crate) fn new(container_client: ContainerClient, blob_name: String) -> Self {
Self {
container_client,
blob_name,
}
}
pub fn from_sas_url(url: &Url) -> azure_core::Result<Self> {
let container_client = ContainerClient::from_sas_url(url)?;
let path: Vec<_> = url.path().split_terminator('/').skip(2).collect();
if path.is_empty() {
Err(azure_core::Error::with_message(
azure_core::error::ErrorKind::DataConversion,
|| "unable to find blob path",
))
} else {
let path = path.join("/");
Ok(container_client.blob_client(path))
}
}
/// Returns a [`GetBlobBuilder`] constructed from a clone of this client.
pub fn get(&self) -> GetBlobBuilder {
    let client = self.clone();
    GetBlobBuilder::new(client)
}
/// Downloads the blob and returns its bytes as one contiguous buffer.
///
/// Drives the stream produced by [`BlobClient::get`] to completion, collecting the
/// data of each streamed response and appending it to the output in arrival order.
///
/// # Errors
///
/// Stops at, and returns, the first error yielded by the stream or by collecting a
/// response's data.
pub async fn get_content(&self) -> azure_core::Result<Vec<u8>> {
    let mut content = Vec::new();
    let mut responses = self.get().into_stream();
    while let Some(response) = responses.next().await {
        let chunk = response?.data.collect().await?;
        content.extend(&chunk);
    }
    Ok(content)
}
/// Returns a [`GetPropertiesBuilder`] constructed from a clone of this client.
pub fn get_properties(&self) -> GetPropertiesBuilder {
    let client = self.clone();
    GetPropertiesBuilder::new(client)
}
/// Returns a [`SetPropertiesBuilder`] constructed from a clone of this client.
pub fn set_properties(&self) -> SetPropertiesBuilder {
    let client = self.clone();
    SetPropertiesBuilder::new(client)
}
/// Returns a [`GetMetadataBuilder`] constructed from a clone of this client.
pub fn get_metadata(&self) -> GetMetadataBuilder {
    let client = self.clone();
    GetMetadataBuilder::new(client)
}
/// Returns a [`SetMetadataBuilder`] constructed from a clone of this client.
pub fn set_metadata(&self) -> SetMetadataBuilder {
    let client = self.clone();
    SetMetadataBuilder::new(client)
}
/// Returns a [`SetBlobTierBuilder`] constructed from a clone of this client and the
/// requested `access_tier`.
pub fn set_blob_tier(&self, access_tier: AccessTier) -> SetBlobTierBuilder {
    let client = self.clone();
    SetBlobTierBuilder::new(client, access_tier)
}
/// Returns a [`SetBlobExpiryBuilder`] constructed from a clone of this client and the
/// given `blob_expiry`.
pub fn set_blob_expiry(&self, blob_expiry: BlobExpiry) -> SetBlobExpiryBuilder {
    let client = self.clone();
    SetBlobExpiryBuilder::new(client, blob_expiry)
}
/// Returns a [`PutPageBlobBuilder`] constructed from a clone of this client and
/// `length`.
// NOTE(review): `length` is presumably the page blob size in bytes — confirm against
// `PutPageBlobBuilder`.
pub fn put_page_blob(&self, length: u128) -> PutPageBlobBuilder {
    let client = self.clone();
    PutPageBlobBuilder::new(client, length)
}
/// Returns a [`PutAppendBlobBuilder`] constructed from a clone of this client.
pub fn put_append_blob(&self) -> PutAppendBlobBuilder {
    let client = self.clone();
    PutAppendBlobBuilder::new(client)
}
/// Returns a [`PutBlockBlobBuilder`] constructed from a clone of this client and
/// `body` converted into a [`Body`].
pub fn put_block_blob(&self, body: impl Into<Body>) -> PutBlockBlobBuilder {
    let client = self.clone();
    let body: Body = body.into();
    PutBlockBlobBuilder::new(client, body)
}
/// Returns a [`CopyBlobBuilder`] constructed from a clone of this client and the
/// `copy_source` URL.
pub fn copy(&self, copy_source: Url) -> CopyBlobBuilder {
    let client = self.clone();
    CopyBlobBuilder::new(client, copy_source)
}
/// Returns a [`CopyBlobFromUrlBuilder`] constructed from a clone of this client and
/// the `copy_source` URL.
pub fn copy_from_url(&self, copy_source: Url) -> CopyBlobFromUrlBuilder {
    let client = self.clone();
    CopyBlobFromUrlBuilder::new(client, copy_source)
}
/// Returns an [`AcquireLeaseBuilder`] constructed from a clone of this client and
/// `lease_duration` converted into a [`LeaseDuration`].
pub fn acquire_lease<LD: Into<LeaseDuration>>(
    &self,
    lease_duration: LD,
) -> AcquireLeaseBuilder {
    let client = self.clone();
    let duration: LeaseDuration = lease_duration.into();
    AcquireLeaseBuilder::new(client, duration)
}
/// Returns a [`BreakLeaseBuilder`] constructed from a clone of this client.
pub fn break_lease(&self) -> BreakLeaseBuilder {
    let client = self.clone();
    BreakLeaseBuilder::new(client)
}
/// Returns a [`DeleteBlobBuilder`] constructed from a clone of this client.
pub fn delete(&self) -> DeleteBlobBuilder {
    let client = self.clone();
    DeleteBlobBuilder::new(client)
}
/// Returns a [`DeleteBlobSnapshotBuilder`] constructed from a clone of this client
/// and the given `snapshot`.
pub fn delete_snapshot(&self, snapshot: Snapshot) -> DeleteBlobSnapshotBuilder {
    let client = self.clone();
    DeleteBlobSnapshotBuilder::new(client, snapshot)
}
/// Returns a [`DeleteBlobVersionBuilder`] constructed from a clone of this client and
/// the given `version_id`.
pub fn delete_version_id(&self, version_id: VersionId) -> DeleteBlobVersionBuilder {
    let client = self.clone();
    DeleteBlobVersionBuilder::new(client, version_id)
}
/// Returns a [`PutBlockBuilder`] constructed from a clone of this client, `block_id`
/// converted into a [`BlockId`], and `body` converted into a [`Body`].
pub fn put_block(
    &self,
    block_id: impl Into<BlockId>,
    body: impl Into<Body>,
) -> PutBlockBuilder {
    let client = self.clone();
    let block_id: BlockId = block_id.into();
    let body: Body = body.into();
    PutBlockBuilder::new(client, block_id, body)
}
/// Returns a [`PutBlockUrlBuilder`] constructed from a clone of this client,
/// `block_id` converted into a [`BlockId`], and the `copy_source` URL.
pub fn put_block_url(
    &self,
    block_id: impl Into<BlockId>,
    copy_source: Url,
) -> PutBlockUrlBuilder {
    let client = self.clone();
    let block_id: BlockId = block_id.into();
    PutBlockUrlBuilder::new(client, block_id, copy_source)
}
/// Returns a [`GetBlockListBuilder`] constructed from a clone of this client.
pub fn get_block_list(&self) -> GetBlockListBuilder {
    let client = self.clone();
    GetBlockListBuilder::new(client)
}
/// Returns a [`GetTagsBuilder`] constructed from a clone of this client.
pub fn get_tags(&self) -> GetTagsBuilder {
    let client = self.clone();
    GetTagsBuilder::new(client)
}
/// Returns a [`SetTagsBuilder`] constructed from a clone of this client and `tags`
/// converted into [`Tags`].
pub fn set_tags(&self, tags: impl Into<Tags>) -> SetTagsBuilder {
    let client = self.clone();
    let tags: Tags = tags.into();
    SetTagsBuilder::new(client, tags)
}
/// Returns a [`PutBlockListBuilder`] constructed from a clone of this client and the
/// given `block_list`.
pub fn put_block_list(&self, block_list: BlockList) -> PutBlockListBuilder {
    let client = self.clone();
    PutBlockListBuilder::new(client, block_list)
}
/// Returns a [`PutPageBuilder`] constructed from a clone of this client, the
/// `ba512_range`, and `content` converted into a [`Body`].
pub fn put_page(&self, ba512_range: BA512Range, content: impl Into<Body>) -> PutPageBuilder {
    let client = self.clone();
    let body: Body = content.into();
    PutPageBuilder::new(client, ba512_range, body)
}
/// Returns a [`GetPageRangesBuilder`] constructed from a clone of this client.
pub fn get_page_ranges(&self) -> GetPageRangesBuilder {
    let client = self.clone();
    GetPageRangesBuilder::new(client)
}
/// Returns an [`AppendBlockBuilder`] constructed from a clone of this client and
/// `body` converted into a [`Body`].
pub fn append_block(&self, body: impl Into<Body>) -> AppendBlockBuilder {
    let client = self.clone();
    let body: Body = body.into();
    AppendBlockBuilder::new(client, body)
}
/// Returns a [`ClearPageBuilder`] constructed from a clone of this client and the
/// given `ba512_range`.
pub fn clear_page(&self, ba512_range: BA512Range) -> ClearPageBuilder {
    let client = self.clone();
    ClearPageBuilder::new(client, ba512_range)
}
/// Creates a blob-scoped shared access signature signed with a user delegation key.
///
/// The signature is built for the canonical resource
/// `/blob/{account}/{container}/{blob}` and uses
/// `user_delegation_key.signed_expiry` as its expiry.
///
/// # Errors
///
/// Returns a `Credential` error unless the client's credentials are the
/// `TokenCredential` variant.
pub async fn user_delegation_shared_access_signature(
    &self,
    permissions: BlobSasPermissions,
    user_delegation_key: &UserDeligationKey,
) -> azure_core::Result<BlobSharedAccessSignature> {
    // The read guard is held until the function returns.
    let credentials = self.container_client.credentials().0.read().await;
    let uses_token_auth = matches!(
        credentials.deref(),
        StorageCredentialsInner::TokenCredential(_)
    );
    if !uses_token_auth {
        return Err(Error::message(
            ErrorKind::Credential,
            "User delegation access signature generation requires Token authentication",
        ));
    }

    let service_client = self.container_client().service_client();
    let resource = format!(
        "/blob/{}/{}/{}",
        service_client.account(),
        self.container_client.container_name(),
        self.blob_name()
    );

    Ok(BlobSharedAccessSignature::new(
        user_delegation_key.clone(),
        resource,
        permissions,
        user_delegation_key.signed_expiry,
        BlobSignedResource::Blob,
    ))
}
/// Creates a blob-scoped shared access signature signed with the account key.
///
/// The signature is built for the canonical resource
/// `/blob/{account}/{container}/{blob}` and expires at `expiry`.
///
/// # Errors
///
/// Returns a `Credential` error unless the client's credentials are the `Key`
/// variant (account name plus access key).
pub async fn shared_access_signature(
    &self,
    permissions: BlobSasPermissions,
    expiry: OffsetDateTime,
) -> azure_core::Result<BlobSharedAccessSignature> {
    // The read guard is held until the function returns.
    let credentials = self.container_client.credentials().0.read().await;
    let (account, key) = match credentials.deref() {
        StorageCredentialsInner::Key(account, key) => (account, key),
        _ => {
            return Err(Error::message(
                ErrorKind::Credential,
                "Shared access signature generation - SAS can be generated with access_key clients",
            ));
        }
    };

    let resource = format!(
        "/blob/{}/{}/{}",
        account,
        self.container_client.container_name(),
        self.blob_name()
    );

    Ok(BlobSharedAccessSignature::new(
        key.clone(),
        resource,
        permissions,
        expiry,
        BlobSignedResource::Blob,
    ))
}
/// Returns this blob's URL with its query string replaced by the token produced by
/// `signature`.
///
/// # Errors
///
/// Propagates failures from [`BlobClient::url`] first, then from
/// `signature.token()`.
pub fn generate_signed_blob_url<T>(&self, signature: &T) -> azure_core::Result<Url>
where
    T: SasToken,
{
    let mut signed_url = self.url()?;
    let token = signature.token()?;
    signed_url.set_query(Some(token.as_str()));
    Ok(signed_url)
}
/// Reports whether the blob exists, based on the outcome of a properties request.
///
/// Returns `Ok(true)` when the request succeeds and `Ok(false)` when it fails with
/// an HTTP error whose status is `NotFound`.
///
/// # Errors
///
/// Any other failure of the properties request is returned unchanged.
pub async fn exists(&self) -> azure_core::Result<bool> {
    let err = match self.get_properties().await {
        Ok(_) => return Ok(true),
        Err(err) => err,
    };

    // Only an HTTP-level "not found" means the blob is absent; non-HTTP errors and
    // other statuses are surfaced to the caller.
    let not_found = matches!(
        err.as_http_error(),
        Some(http_err) if http_err.status() == StatusCode::NotFound
    );
    if not_found {
        Ok(false)
    } else {
        Err(err)
    }
}
/// Returns a [`SnapshotBlobBuilder`] constructed from a clone of this client.
pub fn snapshot(&self) -> SnapshotBlobBuilder {
    let client = self.clone();
    SnapshotBlobBuilder::new(client)
}
/// Returns the blob name this client was created with.
pub fn blob_name(&self) -> &str {
    self.blob_name.as_str()
}
/// Returns a [`BlobLeaseClient`] constructed from a clone of this client and the
/// given `lease_id`.
pub fn blob_lease_client(&self, lease_id: LeaseId) -> BlobLeaseClient {
    let client = self.clone();
    BlobLeaseClient::new(client, lease_id)
}
pub fn container_client(&self) -> &ContainerClient {
&self.container_client
}
pub fn url(&self) -> azure_core::Result<Url> {
let mut url = self.container_client().url()?;
let parts = self.blob_name().trim_matches('/').split('/');
url.path_segments_mut()
.map_err(|()| Error::message(ErrorKind::DataConversion, "Invalid url"))?
.extend(parts);
Ok(url)
}
/// Assembles a [`Request`] by forwarding all arguments, unchanged, to
/// [`ContainerClient::finalize_request`].
pub(crate) fn finalize_request(
    url: Url,
    method: Method,
    headers: Headers,
    request_body: Option<Body>,
) -> azure_core::Result<Request> {
    let request = ContainerClient::finalize_request(url, method, headers, request_body);
    request
}
/// Sends `request` by delegating to the container client's `send`, returning its
/// result as-is.
pub(crate) async fn send(
    &self,
    context: &mut Context,
    request: &mut Request,
) -> azure_core::Result<Response> {
    let container = &self.container_client;
    container.send(context, request).await
}
}
#[cfg(test)]
mod tests {
use super::*;
/// Exercises `BlobClient::from_sas_url` with SAS, anonymous, malformed and
/// non-default-domain URLs.
#[test]
fn test_from_url() -> azure_core::Result<()> {
    let account = "accountname";
    let container = "mycontainer";
    let path = "my/complex/nested/path/here";

    // A URL with a query token: names are extracted and credentials are a SAS token.
    let sas_url = Url::parse(&format!(
        "https://{account}.blob.core.windows.net/{container}/{path}?token=1"
    ))?;
    let sas_client = BlobClient::from_sas_url(&sas_url)?;
    assert_eq!(sas_client.blob_name(), path);
    assert_eq!(sas_client.container_client().container_name(), container);
    let sas_creds = sas_client
        .container_client
        .credentials()
        .0
        .try_read()
        .expect("creds should be unlocked at this point");
    assert!(matches!(
        sas_creds.deref(),
        StorageCredentialsInner::SASToken(_)
    ));

    // No query string: credentials are anonymous.
    let anonymous_url = Url::parse("https://accountname.blob.core.windows.net/mycontainer/myblob")?;
    let anonymous_client = BlobClient::from_sas_url(&anonymous_url)?;
    let anonymous_creds = anonymous_client
        .container_client
        .credentials()
        .0
        .try_read()
        .expect("creds should be unlocked at this point");
    assert!(matches!(
        anonymous_creds.deref(),
        StorageCredentialsInner::Anonymous
    ));

    // URLs lacking a blob path or a container are rejected.
    let no_blob = Url::parse("https://accountname.blob.core.windows.net/mycontainer?token=1")?;
    assert!(BlobClient::from_sas_url(&no_blob).is_err(), "missing path");
    let no_container = Url::parse("https://accountname.blob.core.windows.net/?token=1")?;
    assert!(
        BlobClient::from_sas_url(&no_container).is_err(),
        "missing container"
    );

    // A different endpoint domain parses the same way.
    let china_url = Url::parse(&format!(
        "https://{account}.blob.core.chinacloudapi.cn/{container}/{path}?token=1"
    ))?;
    let china_client = BlobClient::from_sas_url(&china_url)?;
    assert_eq!(china_client.blob_name(), path);
    assert_eq!(china_client.container_client().container_name(), container);

    Ok(())
}
/// Test double whose `SasToken` implementation returns a fixed token string.
struct FakeSas {
/// Value handed back by `token()`.
token: String,
}
/// `SasToken` implementation that always succeeds with a copy of the stored token.
impl SasToken for FakeSas {
    fn token(&self) -> azure_core::Result<String> {
        let token = self.token.to_owned();
        Ok(token)
    }
}
/// Builds a signed URL for `blob_name` in `container_name` using an emulator-backed
/// client and the supplied fake SAS; panics if URL generation fails.
fn build_url(container_name: &str, blob_name: &str, sas: &FakeSas) -> Url {
    ClientBuilder::emulator()
        .blob_client(container_name, blob_name)
        .generate_signed_blob_url(sas)
        .expect("build url failed")
}
/// Checks signed URL generation for plain, nested, leading-slash and
/// space-containing blob names.
#[test]
fn test_generate_url() {
    let sas = FakeSas {
        token: "fake_token".to_owned(),
    };

    // (blob name, expected signed URL) — all within container "a".
    let cases = [
        ("b", "http://127.0.0.1:10000/devstoreaccount1/a/b?fake_token"),
        (
            "b/c/d",
            "http://127.0.0.1:10000/devstoreaccount1/a/b/c/d?fake_token",
        ),
        (
            "/b/c/d",
            "http://127.0.0.1:10000/devstoreaccount1/a/b/c/d?fake_token",
        ),
        (
            "b/c/d/hi there",
            "http://127.0.0.1:10000/devstoreaccount1/a/b/c/d/hi%20there?fake_token",
        ),
    ];

    for (blob_name, expected) in cases {
        let url = build_url("a", blob_name, &sas);
        assert_eq!(url.as_str(), expected);
    }
}
}