Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for partial range headers #1482

Merged
merged 1 commit into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 44 additions & 52 deletions sdk/core/src/request_options/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,67 +2,66 @@ use crate::error::{Error, ErrorKind, ResultExt};
use crate::headers::{self, AsHeaders, HeaderName, HeaderValue};
use std::convert::From;
use std::fmt;
use std::ops::{Range as StdRange, RangeFrom};
use std::str::FromStr;

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct Range {
pub start: u64,
pub end: u64,
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Range {
Range(StdRange<u64>),
RangeFrom(RangeFrom<u64>),
}

impl Range {
pub fn new(start: u64, end: u64) -> Range {
Range { start, end }
(start..end).into()
}

pub fn len(&self) -> u64 {
self.end - self.start
fn optional_len(&self) -> Option<u64> {
match self {
Range::Range(r) => Some(r.end - r.start),
Range::RangeFrom(_) => None,
}
}
}

pub fn is_empty(&self) -> bool {
self.end == self.start
impl From<StdRange<u64>> for Range {
fn from(r: StdRange<u64>) -> Self {
Self::Range(r)
}
}

impl AsHeaders for Range {
type Iter = std::vec::IntoIter<(HeaderName, HeaderValue)>;

fn as_headers(&self) -> Self::Iter {
let mut headers = vec![(headers::MS_RANGE, format!("{self}").into())];
if self.len() < 1024 * 1024 * 4 {
headers.push((
headers::RANGE_GET_CONTENT_CRC64,
HeaderValue::from_static("true"),
));
}
headers.into_iter()
impl From<RangeFrom<u64>> for Range {
fn from(r: RangeFrom<u64>) -> Self {
Self::RangeFrom(r)
}
}

impl From<std::ops::Range<u64>> for Range {
fn from(r: std::ops::Range<u64>) -> Self {
Self {
start: r.start,
end: r.end,
}
impl From<StdRange<usize>> for Range {
fn from(r: StdRange<usize>) -> Self {
(r.start as u64..r.end as u64).into()
}
}

impl From<std::ops::Range<i32>> for Range {
fn from(r: std::ops::Range<i32>) -> Self {
Self {
start: r.start as u64,
end: r.end as u64,
}
impl From<RangeFrom<usize>> for Range {
fn from(r: RangeFrom<usize>) -> Self {
(r.start as u64..).into()
}
}

impl From<std::ops::Range<usize>> for Range {
fn from(r: std::ops::Range<usize>) -> Self {
Self {
start: r.start as u64,
end: r.end as u64,
impl AsHeaders for Range {
type Iter = std::vec::IntoIter<(HeaderName, HeaderValue)>;

fn as_headers(&self) -> Self::Iter {
let mut headers = vec![(headers::MS_RANGE, format!("{self}").into())];
if let Some(len) = self.optional_len() {
if len < 1024 * 1024 * 4 {
headers.push((
headers::RANGE_GET_CONTENT_CRC64,
HeaderValue::from_static("true"),
));
}
}
headers.into_iter()
}
}

Expand All @@ -82,16 +81,16 @@ impl FromStr for Range {
let cp_start = v[0].parse::<u64>().map_kind(ErrorKind::DataConversion)?;
let cp_end = v[1].parse::<u64>().map_kind(ErrorKind::DataConversion)? + 1;

Ok(Range {
start: cp_start,
end: cp_end,
})
Ok((cp_start..cp_end).into())
}
}

impl fmt::Display for Range {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "bytes={}-{}", self.start, self.end - 1)
match self {
Range::Range(r) => write!(f, "bytes={}-{}", r.start, r.end - 1),
Range::RangeFrom(r) => write!(f, "bytes={}-", r.start),
}
}
}

Expand All @@ -102,9 +101,7 @@ mod test {
#[test]
fn test_range_parse() {
let range = "1000/2000".parse::<Range>().unwrap();

assert_eq!(range.start, 1000);
assert_eq!(range.end, 2001);
assert_eq!(range, Range::new(1000, 2001));
}

#[test]
Expand All @@ -119,13 +116,8 @@ mod test {

#[test]
fn test_range_display() {
let range = Range {
start: 100,
end: 501,
};

let range = Range::new(100, 501);
let txt = format!("{range}");

assert_eq!(txt, "bytes=100-500");
}
}
45 changes: 45 additions & 0 deletions sdk/storage_blobs/examples/partial_range.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use azure_storage::prelude::*;
use azure_storage_blobs::prelude::*;
use futures::stream::StreamExt;
use uuid::Uuid;

#[tokio::main]
async fn main() -> azure_core::Result<()> {
env_logger::init();

// First we retrieve the account name and access key from environment variables.
let account =
std::env::var("STORAGE_ACCOUNT").expect("Set env variable STORAGE_ACCOUNT first!");
let access_key =
std::env::var("STORAGE_ACCESS_KEY").expect("Set env variable STORAGE_ACCESS_KEY first!");

let container_name = format!("range-example-{}", Uuid::new_v4());
let blob_name = format!("blob-{}.txt", Uuid::new_v4());

let storage_credentials = StorageCredentials::access_key(account.clone(), access_key);
let container_client =
BlobServiceClient::new(account, storage_credentials).container_client(container_name);
container_client.create().await?;

let blob_client = container_client.blob_client(&blob_name);

let buf = "0123456789".repeat(100);

blob_client.put_block_blob(buf.clone()).await?;

let range = 3usize..;
let mut stream = blob_client.get().range(range.clone()).into_stream();

let mut data: Vec<u8> = vec![];
while let Some(value) = stream.next().await {
let value = value?.data.collect().await?;
println!("{}", value.len());
data.extend(&value);
}
let value = String::from_utf8(data)?;
assert_eq!(&buf[range.clone()], value);

container_client.delete().await?;

Ok(())
}
37 changes: 21 additions & 16 deletions sdk/storage_blobs/src/blob/operations/get_blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ impl GetBlobBuilder {

let range = match continuation {
Some(range) => range,
None => {
initial_range(this.chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE), this.range)
}
None => initial_range(
this.chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE),
this.range.clone(),
),
};

this.blob_versioning.append_to_url_query(&mut url);
Expand Down Expand Up @@ -105,17 +106,18 @@ impl GetBlobResponse {
impl Continuable for GetBlobResponse {
type Continuation = Range;
fn continuation(&self) -> Option<Self::Continuation> {
self.remaining_range
self.remaining_range.clone()
}
}

// calculate the first Range for use at the beginning of the Pageable.
fn initial_range(chunk_size: u64, request_range: Option<Range>) -> Range {
match request_range {
Some(range) => {
let len = std::cmp::min(range.len(), chunk_size);
Range::new(range.start, range.start + len)
Some(Range::Range(x)) => {
let len = std::cmp::min(x.end - x.start, chunk_size);
(x.start..x.start + len).into()
}
Some(Range::RangeFrom(x)) => (x.start..x.start + chunk_size).into(),
None => Range::new(0, chunk_size),
}
}
Expand Down Expand Up @@ -152,19 +154,22 @@ fn remaining_range(
// if the response said the end of the blob was downloaded, we're done
// Note, we add + 1, as we don't need to re-fetch the last
// byte of the previous request.
if content_range.end() + 1 >= requested_range.end {
return None;
}
let after = content_range.end() + 1;

// if the user specified range is smaller than the blob, truncate the
// requested range. Note, we add + 1, as we don't need to re-fetch the last
// byte of the previous request.
let start = content_range.end() + 1;
let remaining_size = requested_range.end - start;
let remaining_size = match requested_range {
Range::Range(x) => {
if after >= x.end {
return None;
}
x.end - after
}
// no requested end
Range::RangeFrom(_) => after,
};

let size = std::cmp::min(remaining_size, chunk_size);

Some(Range::new(start, start + size))
Some(Range::new(after, after + size))
}

#[cfg(test)]
Expand Down
6 changes: 2 additions & 4 deletions sdk/storage_blobs/src/blob/page_range_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,8 @@ mod test {

let prl = PageRangeList::try_from_xml(page_list).unwrap();
assert!(prl.ranges.len() == 2);
assert!(prl.ranges[0].start == 0);
assert!(prl.ranges[0].end == 511);
assert!(prl.ranges[1].start == 1024);
assert!(prl.ranges[1].end == 1535);
assert!(prl.ranges[0] == Range::new(0, 511));
assert!(prl.ranges[1] == Range::new(1024, 1535));

let page_list = "<?xml version=\"1.0\" encoding=\"utf-8\"?><PageList></PageList>";
let prl = PageRangeList::try_from_xml(page_list).unwrap();
Expand Down
17 changes: 8 additions & 9 deletions sdk/storage_blobs/src/options/ba512_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,20 @@ impl BA512Range {

impl From<BA512Range> for Range {
fn from(range: BA512Range) -> Self {
Self {
start: range.start(),
end: range.end(),
}
(range.start()..range.end()).into()
}
}

impl TryFrom<Range> for BA512Range {
type Error = Error;

fn try_from(r: Range) -> azure_core::Result<Self> {
BA512Range::new(r.start, r.end)
match r {
Range::Range(r) => BA512Range::new(r.start, r.end),
Range::RangeFrom(r) => Err(Error::with_message(ErrorKind::DataConversion, || {
format!("error converting RangeFrom<{:?}> into BA512Range", r)
})),
}
}
}

Expand Down Expand Up @@ -109,10 +111,7 @@ impl fmt::Display for BA512Range {

impl<'a> From<&'a BA512Range> for Range {
fn from(ba: &'a BA512Range) -> Range {
Range {
start: ba.start(),
end: ba.end(),
}
(ba.start()..ba.end()).into()
}
}

Expand Down