Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 21 additions & 21 deletions eng/emitter-package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions eng/emitter-package.json
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
{
"main": "dist/src/index.js",
"dependencies": {
"@azure-tools/typespec-rust": "0.24.1"
"@azure-tools/typespec-rust": "0.26.0"
},
"devDependencies": {
"@azure-tools/typespec-azure-core": "~0.61",
"@azure-tools/typespec-azure-rulesets": "~0.61",
"@azure-tools/typespec-client-generator-core": ">=0.61.1",
"@azure-tools/typespec-client-generator-core": ">=0.61.2",
"@typespec/compiler": "^1.4.0",
"@typespec/http": "^1.4.0",
"@typespec/openapi": "^1.4.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use azure_storage_blob::{
},
BlobContainerClient,
};
use futures::TryStreamExt;
use futures::StreamExt;
use std::{collections::HashMap, sync::Arc};
use time::OffsetDateTime;
use tracing::{debug, info, warn};
Expand Down Expand Up @@ -217,7 +217,7 @@ impl CheckpointStore for BlobCheckpointStore {

debug!("Using checkpoint prefix: {}", prefix);

let mut blobs = self.blob_container_client.list_blobs(Some(
let list_blobs_response = self.blob_container_client.list_blobs(Some(
BlobContainerClientListBlobFlatSegmentOptions {
prefix: Some(prefix),
include: Some(vec![ListBlobsIncludeItem::Metadata]),
Expand All @@ -233,37 +233,37 @@ impl CheckpointStore for BlobCheckpointStore {
..Default::default()
};

while let Some(blob) = blobs.try_next().await? {
let blob_body = blob.into_body()?;
debug!("Blob body: {blob_body:?}, {:?}", blob_body.container_name);
for blob in blob_body.segment.blob_items.iter() {
let mut checkpoint = checkpoint.clone();
if let Some(name) = &blob.name {
if let Some(name) = &name.content {
checkpoint.partition_id = name
.rfind('/')
.map(|pos| &name[pos + 1..])
.unwrap_or_default()
.to_string();
if let Some(additional_properties) = blob
.metadata
.as_ref()
.and_then(|m| m.additional_properties.as_ref())
{
if let Some(sequence_number) =
additional_properties.get(SEQUENCE_NUMBER)
{
checkpoint.sequence_number = Some(sequence_number.parse()?);
}
if let Some(offset) = additional_properties.get(OFFSET) {
checkpoint.offset = Some(offset.clone());
}
let mut pages = list_blobs_response.into_pages();
let list_blob_flat_segment = pages.next().await.unwrap()?.into_body()?;
debug!(
"Blob body: {list_blob_flat_segment:?}, {:?}",
list_blob_flat_segment.container_name
);
for blob in list_blob_flat_segment.segment.blob_items.iter() {
let mut checkpoint = checkpoint.clone();
if let Some(name) = &blob.name {
if let Some(name) = &name.content {
checkpoint.partition_id = name
.rfind('/')
.map(|pos| &name[pos + 1..])
.unwrap_or_default()
.to_string();
if let Some(additional_properties) = blob
.metadata
.as_ref()
.and_then(|m| m.additional_properties.as_ref())
{
if let Some(sequence_number) = additional_properties.get(SEQUENCE_NUMBER) {
checkpoint.sequence_number = Some(sequence_number.parse()?);
}
if let Some(offset) = additional_properties.get(OFFSET) {
checkpoint.offset = Some(offset.clone());
}
}
}

checkpoints.push(checkpoint);
}

checkpoints.push(checkpoint);
}

debug!("Found {} checkpoints", checkpoints.len());
Expand All @@ -286,7 +286,7 @@ impl CheckpointStore for BlobCheckpointStore {

debug!("Using ownership prefix: {}", prefix);

let mut blobs = self.blob_container_client.list_blobs(Some(
let list_blobs_response = self.blob_container_client.list_blobs(Some(
BlobContainerClientListBlobFlatSegmentOptions {
prefix: Some(prefix),
include: Some(vec![ListBlobsIncludeItem::Metadata]),
Expand All @@ -302,32 +302,34 @@ impl CheckpointStore for BlobCheckpointStore {
..Default::default()
};

while let Some(blob) = blobs.try_next().await? {
let blob_body = blob.into_body()?;
debug!("Blob body: {blob_body:?}, {:?}", blob_body.container_name);
for blob in blob_body.segment.blob_items.iter() {
let mut ownership = ownership.clone();
if let Some(name) = &blob.name {
if let Some(name) = &name.content {
ownership.partition_id = name
.rfind('/')
.map(|pos| &name[pos + 1..])
.unwrap_or_default()
.to_string();
ownership.owner_id = blob
.metadata
.as_ref()
.and_then(|m| m.additional_properties.as_ref())
.and_then(|ap| ap.get(OWNER_ID).cloned());
}
}
if let Some(properties) = &blob.properties {
ownership.etag = properties.etag.as_ref().map(|s| Etag::from(s.clone()));
ownership.last_modified_time = properties.last_modified;
let mut pages = list_blobs_response.into_pages();
let list_blob_flat_segment = pages.next().await.unwrap()?.into_body()?;
debug!(
"Blob body: {list_blob_flat_segment:?}, {:?}",
list_blob_flat_segment.container_name
);
for blob in list_blob_flat_segment.segment.blob_items.iter() {
let mut ownership = ownership.clone();
if let Some(name) = &blob.name {
if let Some(name) = &name.content {
ownership.partition_id = name
.rfind('/')
.map(|pos| &name[pos + 1..])
.unwrap_or_default()
.to_string();
ownership.owner_id = blob
.metadata
.as_ref()
.and_then(|m| m.additional_properties.as_ref())
.and_then(|ap| ap.get(OWNER_ID).cloned());
}

ownerships.push(ownership);
}
if let Some(properties) = &blob.properties {
ownership.etag = properties.etag.as_ref().map(|s| Etag::from(s.clone()));
ownership.last_modified_time = properties.last_modified;
}

ownerships.push(ownership);
}

debug!("Found {} ownerships", ownerships.len());
Expand Down
1 change: 1 addition & 0 deletions sdk/storage/.dict.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ RAGRS
restype
ruleid
secondtag
subsecond
testblob1
testblob2
testblob3
Expand Down
1 change: 1 addition & 0 deletions sdk/storage/azure_storage_blob/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ async-trait.workspace = true
azure_core = { workspace = true, features = ["xml"] }
serde.workspace = true
serde_json.workspace = true
time.workspace = true
typespec_client_core = { workspace = true, features = ["derive"] }
url.workspace = true
uuid.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion sdk/storage/azure_storage_blob/assets.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"AssetsRepo": "Azure/azure-sdk-assets",
"AssetsRepoPrefixPath": "rust",
"Tag": "rust/azure_storage_blob_1e5e3b2c6c",
"Tag": "rust/azure_storage_blob_b294798f4f",
"TagPrefix": "rust/azure_storage_blob"
}
7 changes: 2 additions & 5 deletions sdk/storage/azure_storage_blob/perf/list_blob_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,8 @@ impl PerfTest for ListBlobTest {
// The actual performance test code

let mut iterator = self.client.get().unwrap().list_blobs(None)?;
while let Some(blob_segment) = iterator.try_next().await? {
let body = blob_segment.into_body()?;
for blob in body.segment.blob_items.iter() {
std::hint::black_box(blob);
}
while let Some(blob) = iterator.try_next().await? {
std::hint::black_box(blob);
}

Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ impl BlobClient {
metadata: HashMap<String, String>,
options: Option<BlobClientSetMetadataOptions<'_>>,
) -> Result<Response<(), NoFormat>> {
self.client.set_metadata(metadata, options).await
self.client.set_metadata(&metadata, options).await
}

/// Deletes the blob.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ use crate::{
BlobContainerClientBreakLeaseOptions, BlobContainerClientBreakLeaseResult,
BlobContainerClientChangeLeaseOptions, BlobContainerClientChangeLeaseResult,
BlobContainerClientCreateOptions, BlobContainerClientDeleteOptions,
BlobContainerClientFindBlobsByTagsOptions, BlobContainerClientGetAccountInfoOptions,
BlobContainerClientGetAccountInfoResult, BlobContainerClientGetPropertiesOptions,
BlobContainerClientGetPropertiesResult, BlobContainerClientListBlobFlatSegmentOptions,
BlobContainerClientReleaseLeaseOptions, BlobContainerClientReleaseLeaseResult,
BlobContainerClientRenewLeaseOptions, BlobContainerClientRenewLeaseResult,
BlobContainerClientSetMetadataOptions,
BlobContainerClientFindBlobsByTagsOptions, BlobContainerClientGetAccessPolicyOptions,
BlobContainerClientGetAccountInfoOptions, BlobContainerClientGetAccountInfoResult,
BlobContainerClientGetPropertiesOptions, BlobContainerClientGetPropertiesResult,
BlobContainerClientListBlobFlatSegmentOptions, BlobContainerClientReleaseLeaseOptions,
BlobContainerClientReleaseLeaseResult, BlobContainerClientRenewLeaseOptions,
BlobContainerClientRenewLeaseResult, BlobContainerClientSetAccessPolicyOptions,
BlobContainerClientSetAccessPolicyResult, BlobContainerClientSetMetadataOptions,
SignedIdentifiers,
},
models::{FilterBlobSegment, ListBlobsFlatSegmentResponse, StorageErrorCode},
pipeline::StorageHeadersPolicy,
Expand All @@ -25,7 +27,7 @@ use azure_core::{
error::ErrorKind,
http::{
policies::{BearerTokenAuthorizationPolicy, Policy},
NoFormat, PageIterator, Pipeline, Response, StatusCode, Url, XmlFormat,
NoFormat, Pager, Pipeline, RequestContent, Response, StatusCode, Url, XmlFormat,
},
tracing, Result,
};
Expand Down Expand Up @@ -192,7 +194,7 @@ impl BlobContainerClient {
metadata: HashMap<String, String>,
options: Option<BlobContainerClientSetMetadataOptions<'_>>,
) -> Result<Response<(), NoFormat>> {
self.client.set_metadata(metadata, options).await
self.client.set_metadata(&metadata, options).await
}

/// Marks the specified container for deletion. The container and any blobs contained within are later deleted during garbage collection.
Expand Down Expand Up @@ -228,7 +230,7 @@ impl BlobContainerClient {
pub fn list_blobs(
&self,
options: Option<BlobContainerClientListBlobFlatSegmentOptions<'_>>,
) -> Result<PageIterator<Response<ListBlobsFlatSegmentResponse, XmlFormat>>> {
) -> Result<Pager<ListBlobsFlatSegmentResponse, XmlFormat>> {
self.client.list_blob_flat_segment(options)
}

Expand Down Expand Up @@ -365,4 +367,32 @@ impl BlobContainerClient {
Err(e) => Err(e),
}
}

/// Sets the permissions for the specified container. The permissions indicate whether blobs in a
/// container may be accessed publicly.
///
/// # Arguments
///
/// * `container_acl` - The access control list for the container.
/// * `options` - Optional configuration for the request.
pub async fn set_access_policy(
&self,
container_acl: RequestContent<SignedIdentifiers, XmlFormat>,
options: Option<BlobContainerClientSetAccessPolicyOptions<'_>>,
) -> Result<Response<BlobContainerClientSetAccessPolicyResult, NoFormat>> {
self.client.set_access_policy(container_acl, options).await
}

/// Gets the permissions for the specified container. The permissions indicate whether container data
/// may be accessed publicly.
///
/// # Arguments
///
/// * `options` - Optional configuration for the request.
pub async fn get_access_policy(
&self,
options: Option<BlobContainerClientGetAccessPolicyOptions<'_>>,
) -> Result<Response<SignedIdentifiers, XmlFormat>> {
self.client.get_access_policy(options).await
}
}
Loading
Loading