Skip to content

Commit

Permalink
Add ObjectStore::get_opts (apache#2241)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed May 12, 2023
1 parent d6c3c01 commit a513d8c
Show file tree
Hide file tree
Showing 17 changed files with 440 additions and 234 deletions.
41 changes: 23 additions & 18 deletions object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ use crate::aws::credential::{AwsCredential, CredentialExt, CredentialProvider};
use crate::aws::STRICT_PATH_ENCODE_SET;
use crate::client::pagination::stream_paginated;
use crate::client::retry::RetryExt;
use crate::client::GetOptionsExt;
use crate::multipart::UploadPart;
use crate::path::DELIMITER;
use crate::util::{format_http_range, format_prefix};
use crate::util::format_prefix;
use crate::{
BoxStream, ClientOptions, ListResult, MultipartId, ObjectMeta, Path, Result,
RetryConfig, StreamExt,
BoxStream, ClientOptions, GetOptions, ListResult, MultipartId, ObjectMeta, Path,
Result, RetryConfig, StreamExt,
};
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
Expand All @@ -37,7 +38,6 @@ use reqwest::{
};
use serde::{Deserialize, Serialize};
use snafu::{ResultExt, Snafu};
use std::ops::Range;
use std::sync::Arc;

/// A specialized `Error` for object store-related errors
Expand Down Expand Up @@ -102,14 +102,24 @@ impl From<Error> for crate::Error {
Error::GetRequest { source, path }
| Error::DeleteRequest { source, path }
| Error::CopyRequest { source, path }
| Error::PutRequest { source, path }
if matches!(source.status(), Some(StatusCode::NOT_FOUND)) =>
{
Self::NotFound {
| Error::PutRequest { source, path } => match source.status() {
Some(StatusCode::NOT_FOUND) => Self::NotFound {
path,
source: Box::new(source),
}
}
},
Some(StatusCode::NOT_MODIFIED) => Self::NotModified {
path,
source: Box::new(source),
},
Some(StatusCode::PRECONDITION_FAILED) => Self::Precondition {
path,
source: Box::new(source),
},
_ => Self::Generic {
store: "S3",
source: Box::new(source),
},
},
_ => Self::Generic {
store: "S3",
source: Box::new(err),
Expand Down Expand Up @@ -245,25 +255,20 @@ impl S3Client {
pub async fn get_request(
&self,
path: &Path,
range: Option<Range<usize>>,
options: GetOptions,
head: bool,
) -> Result<Response> {
use reqwest::header::RANGE;

let credential = self.get_credential().await?;
let url = self.config.path_url(path);
let method = match head {
true => Method::HEAD,
false => Method::GET,
};

let mut builder = self.client.request(method, url);

if let Some(range) = range {
builder = builder.header(RANGE, format_http_range(range));
}
let builder = self.client.request(method, url);

let response = builder
.with_get_options(options)
.with_aws_sigv4(
credential.as_ref(),
&self.config.region,
Expand Down
31 changes: 9 additions & 22 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ use itertools::Itertools;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use std::collections::BTreeSet;
use std::ops::Range;
use std::str::FromStr;
use std::sync::Arc;
use tokio::io::AsyncWrite;
Expand All @@ -57,8 +56,8 @@ use crate::client::ClientConfigKey;
use crate::config::ConfigValue;
use crate::multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, UploadPart};
use crate::{
ClientOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Path,
Result, RetryConfig, StreamExt,
ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta,
ObjectStore, Path, Result, RetryConfig, StreamExt,
};

mod checksum;
Expand Down Expand Up @@ -246,8 +245,8 @@ impl ObjectStore for AmazonS3 {
.await
}

async fn get(&self, location: &Path) -> Result<GetResult> {
let response = self.client.get_request(location, None, false).await?;
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
let response = self.client.get_request(location, options, false).await?;
let stream = response
.bytes_stream()
.map_err(|source| crate::Error::Generic {
Expand All @@ -259,26 +258,13 @@ impl ObjectStore for AmazonS3 {
Ok(GetResult::Stream(stream))
}

async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
let bytes = self
.client
.get_request(location, Some(range), false)
.await?
.bytes()
.await
.map_err(|source| client::Error::GetResponseBody {
source,
path: location.to_string(),
})?;
Ok(bytes)
}

async fn head(&self, location: &Path) -> Result<ObjectMeta> {
use reqwest::header::{CONTENT_LENGTH, ETAG, LAST_MODIFIED};

let options = GetOptions::default();
// Extract meta from headers
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_ResponseSyntax
let response = self.client.get_request(location, None, true).await?;
let response = self.client.get_request(location, options, true).await?;
let headers = response.headers();

let last_modified = headers
Expand Down Expand Up @@ -1169,8 +1155,8 @@ fn profile_credentials(
mod tests {
use super::*;
use crate::tests::{
get_nonexistent_object, list_uses_directories_correctly, list_with_delimiter,
put_get_delete_list_opts, rename_and_copy, stream_get,
get_nonexistent_object, get_opts, list_uses_directories_correctly,
list_with_delimiter, put_get_delete_list_opts, rename_and_copy, stream_get,
};
use bytes::Bytes;
use std::collections::HashMap;
Expand Down Expand Up @@ -1417,6 +1403,7 @@ mod tests {

// Localstack doesn't support listing with spaces https://github.com/localstack/localstack/issues/6328
put_get_delete_list_opts(&integration, is_local).await;
get_opts(&integration).await;
list_uses_directories_correctly(&integration).await;
list_with_delimiter(&integration).await;
rename_and_copy(&integration).await;
Expand Down
42 changes: 24 additions & 18 deletions object_store/src/azure/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ use super::credential::{AzureCredential, CredentialProvider};
use crate::azure::credential::*;
use crate::client::pagination::stream_paginated;
use crate::client::retry::RetryExt;
use crate::client::GetOptionsExt;
use crate::path::DELIMITER;
use crate::util::{deserialize_rfc1123, format_http_range, format_prefix};
use crate::util::{deserialize_rfc1123, format_prefix};
use crate::{
BoxStream, ClientOptions, ListResult, ObjectMeta, Path, Result, RetryConfig,
StreamExt,
BoxStream, ClientOptions, GetOptions, ListResult, ObjectMeta, Path, Result,
RetryConfig, StreamExt,
};
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
Expand All @@ -32,13 +33,12 @@ use chrono::{DateTime, Utc};
use itertools::Itertools;
use reqwest::header::CONTENT_TYPE;
use reqwest::{
header::{HeaderValue, CONTENT_LENGTH, IF_NONE_MATCH, RANGE},
header::{HeaderValue, CONTENT_LENGTH, IF_NONE_MATCH},
Client as ReqwestClient, Method, Response, StatusCode,
};
use serde::{Deserialize, Serialize};
use snafu::{ResultExt, Snafu};
use std::collections::HashMap;
use std::ops::Range;
use url::Url;

/// A specialized `Error` for object store-related errors
Expand Down Expand Up @@ -95,15 +95,24 @@ impl From<Error> for crate::Error {
match err {
Error::GetRequest { source, path }
| Error::DeleteRequest { source, path }
| Error::CopyRequest { source, path }
| Error::PutRequest { source, path }
if matches!(source.status(), Some(StatusCode::NOT_FOUND)) =>
{
Self::NotFound {
| Error::PutRequest { source, path } => match source.status() {
Some(StatusCode::NOT_FOUND) => Self::NotFound {
path,
source: Box::new(source),
}
}
},
Some(StatusCode::NOT_MODIFIED) => Self::NotModified {
path,
source: Box::new(source),
},
Some(StatusCode::PRECONDITION_FAILED) => Self::Precondition {
path,
source: Box::new(source),
},
_ => Self::Generic {
store: "S3",
source: Box::new(source),
},
},
Error::CopyRequest { source, path }
if matches!(source.status(), Some(StatusCode::CONFLICT)) =>
{
Expand Down Expand Up @@ -253,7 +262,7 @@ impl AzureClient {
pub async fn get_request(
&self,
path: &Path,
range: Option<Range<usize>>,
options: GetOptions,
head: bool,
) -> Result<Response> {
let credential = self.get_credential().await?;
Expand All @@ -263,17 +272,14 @@ impl AzureClient {
false => Method::GET,
};

let mut builder = self
let builder = self
.client
.request(method, url)
.header(CONTENT_LENGTH, HeaderValue::from_static("0"))
.body(Bytes::new());

if let Some(range) = range {
builder = builder.header(RANGE, format_http_range(range));
}

let response = builder
.with_get_options(options)
.with_azure_authorization(&credential, &self.config.account)
.send_retry(&self.config.retry_config)
.await
Expand Down
34 changes: 12 additions & 22 deletions object_store/src/azure/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ use crate::client::token::TokenCache;
use crate::{
multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, UploadPart},
path::Path,
ClientOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result,
RetryConfig,
ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta,
ObjectStore, Result, RetryConfig,
};
use async_trait::async_trait;
use base64::prelude::BASE64_STANDARD;
Expand All @@ -45,7 +45,6 @@ use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, Snafu};
use std::fmt::{Debug, Formatter};
use std::io;
use std::ops::Range;
use std::sync::Arc;
use std::{collections::BTreeSet, str::FromStr};
use tokio::io::AsyncWrite;
Expand Down Expand Up @@ -150,6 +149,7 @@ enum Error {
impl From<Error> for super::Error {
fn from(source: Error) -> Self {
match source {

Error::UnknownConfigurationKey { key } => Self::UnknownConfigurationKey {
store: "MicrosoftAzure",
key,
Expand Down Expand Up @@ -209,8 +209,8 @@ impl ObjectStore for MicrosoftAzure {
Ok(())
}

async fn get(&self, location: &Path) -> Result<GetResult> {
let response = self.client.get_request(location, None, false).await?;
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
let response = self.client.get_request(location, options, false).await?;
let stream = response
.bytes_stream()
.map_err(|source| crate::Error::Generic {
Expand All @@ -222,26 +222,13 @@ impl ObjectStore for MicrosoftAzure {
Ok(GetResult::Stream(stream))
}

async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
let bytes = self
.client
.get_request(location, Some(range), false)
.await?
.bytes()
.await
.map_err(|source| client::Error::GetResponseBody {
source,
path: location.to_string(),
})?;
Ok(bytes)
}

async fn head(&self, location: &Path) -> Result<ObjectMeta> {
use reqwest::header::{CONTENT_LENGTH, ETAG, LAST_MODIFIED};
let options = GetOptions::default();

// Extract meta from headers
// https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob-properties
let response = self.client.get_request(location, None, true).await?;
let response = self.client.get_request(location, options, true).await?;
let headers = response.headers();

let last_modified = headers
Expand Down Expand Up @@ -1103,8 +1090,9 @@ fn split_sas(sas: &str) -> Result<Vec<(String, String)>, Error> {
mod tests {
use super::*;
use crate::tests::{
copy_if_not_exists, list_uses_directories_correctly, list_with_delimiter,
put_get_delete_list, put_get_delete_list_opts, rename_and_copy, stream_get,
copy_if_not_exists, get_opts, list_uses_directories_correctly,
list_with_delimiter, put_get_delete_list, put_get_delete_list_opts,
rename_and_copy, stream_get,
};
use std::collections::HashMap;
use std::env;
Expand Down Expand Up @@ -1175,6 +1163,7 @@ mod tests {
async fn azure_blob_test() {
let integration = maybe_skip_integration!().build().unwrap();
put_get_delete_list_opts(&integration, false).await;
get_opts(&integration).await;
list_uses_directories_correctly(&integration).await;
list_with_delimiter(&integration).await;
rename_and_copy(&integration).await;
Expand Down Expand Up @@ -1203,6 +1192,7 @@ mod tests {
let integration = builder.build().unwrap();

put_get_delete_list(&integration).await;
get_opts(&integration).await;
list_uses_directories_correctly(&integration).await;
list_with_delimiter(&integration).await;
rename_and_copy(&integration).await;
Expand Down
7 changes: 4 additions & 3 deletions object_store/src/chunked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use tokio::io::AsyncWrite;

use crate::path::Path;
use crate::util::maybe_spawn_blocking;
use crate::{GetResult, ListResult, ObjectMeta, ObjectStore};
use crate::{GetOptions, GetResult, ListResult, ObjectMeta, ObjectStore};
use crate::{MultipartId, Result};

/// Wraps a [`ObjectStore`] and makes its get response return chunks
Expand Down Expand Up @@ -81,8 +81,8 @@ impl ObjectStore for ChunkedStore {
self.inner.abort_multipart(location, multipart_id).await
}

async fn get(&self, location: &Path) -> Result<GetResult> {
match self.inner.get(location).await? {
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
match self.inner.get_opts(location, options).await? {
GetResult::File(std_file, ..) => {
let reader = BufReader::new(std_file);
let chunk_size = self.chunk_size;
Expand Down Expand Up @@ -245,6 +245,7 @@ mod tests {
let integration = ChunkedStore::new(Arc::clone(integration), 100);

put_get_delete_list(&integration).await;
get_opts(&integration).await;
list_uses_directories_correctly(&integration).await;
list_with_delimiter(&integration).await;
rename_and_copy(&integration).await;
Expand Down
Loading

0 comments on commit a513d8c

Please sign in to comment.