diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs index 9634c740d01d..fb1944d5a1cd 100644 --- a/object_store/src/aws/client.rs +++ b/object_store/src/aws/client.rs @@ -20,12 +20,13 @@ use crate::aws::credential::{AwsCredential, CredentialExt, CredentialProvider}; use crate::aws::STRICT_PATH_ENCODE_SET; use crate::client::pagination::stream_paginated; use crate::client::retry::RetryExt; +use crate::client::GetOptionsExt; use crate::multipart::UploadPart; use crate::path::DELIMITER; -use crate::util::{format_http_range, format_prefix}; +use crate::util::format_prefix; use crate::{ - BoxStream, ClientOptions, ListResult, MultipartId, ObjectMeta, Path, Result, - RetryConfig, StreamExt, + BoxStream, ClientOptions, GetOptions, ListResult, MultipartId, ObjectMeta, Path, + Result, RetryConfig, StreamExt, }; use base64::prelude::BASE64_STANDARD; use base64::Engine; @@ -37,7 +38,6 @@ use reqwest::{ }; use serde::{Deserialize, Serialize}; use snafu::{ResultExt, Snafu}; -use std::ops::Range; use std::sync::Arc; /// A specialized `Error` for object store-related errors @@ -102,14 +102,24 @@ impl From for crate::Error { Error::GetRequest { source, path } | Error::DeleteRequest { source, path } | Error::CopyRequest { source, path } - | Error::PutRequest { source, path } - if matches!(source.status(), Some(StatusCode::NOT_FOUND)) => - { - Self::NotFound { + | Error::PutRequest { source, path } => match source.status() { + Some(StatusCode::NOT_FOUND) => Self::NotFound { path, source: Box::new(source), - } - } + }, + Some(StatusCode::NOT_MODIFIED) => Self::NotModified { + path, + source: Box::new(source), + }, + Some(StatusCode::PRECONDITION_FAILED) => Self::Precondition { + path, + source: Box::new(source), + }, + _ => Self::Generic { + store: "S3", + source: Box::new(source), + }, + }, _ => Self::Generic { store: "S3", source: Box::new(err), @@ -245,11 +255,9 @@ impl S3Client { pub async fn get_request( &self, path: &Path, - range: Option>, + options: GetOptions, head: bool, ) -> Result { - use reqwest::header::RANGE; - let credential = self.get_credential().await?; let url = self.config.path_url(path); let method = match head { @@ -257,13 +265,10 @@ impl S3Client { false => Method::GET, }; - let mut builder = self.client.request(method, url); - - if let Some(range) = range { - builder = builder.header(RANGE, format_http_range(range)); - } + let builder = self.client.request(method, url); let response = builder + .with_get_options(options) .with_aws_sigv4( credential.as_ref(), &self.config.region, diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index 17d779ff6a51..5b9c5a418a93 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -40,7 +40,6 @@ use itertools::Itertools; use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt, ResultExt, Snafu}; use std::collections::BTreeSet; -use std::ops::Range; use std::str::FromStr; use std::sync::Arc; use tokio::io::AsyncWrite; @@ -57,8 +56,8 @@ use crate::client::ClientConfigKey; use crate::config::ConfigValue; use crate::multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, UploadPart}; use crate::{ - ClientOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Path, - Result, RetryConfig, StreamExt, + ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, + ObjectStore, Path, Result, RetryConfig, StreamExt, }; mod checksum; @@ -246,8 +245,8 @@ impl ObjectStore for AmazonS3 { .await } - async fn get(&self, location: &Path) -> Result { - let response = self.client.get_request(location, None, false).await?; + async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { + let response = self.client.get_request(location, options, false).await?; let stream = response .bytes_stream() .map_err(|source| crate::Error::Generic { @@ -259,26 +258,13 @@ impl ObjectStore for AmazonS3 { Ok(GetResult::Stream(stream)) } - async fn get_range(&self, location: &Path, range: Range) -> Result { - let bytes = self - .client - .get_request(location, Some(range), false) - .await? - .bytes() - .await - .map_err(|source| client::Error::GetResponseBody { - source, - path: location.to_string(), - })?; - Ok(bytes) - } - async fn head(&self, location: &Path) -> Result { use reqwest::header::{CONTENT_LENGTH, ETAG, LAST_MODIFIED}; + let options = GetOptions::default(); // Extract meta from headers // https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_ResponseSyntax - let response = self.client.get_request(location, None, true).await?; + let response = self.client.get_request(location, options, true).await?; let headers = response.headers(); let last_modified = headers @@ -1169,8 +1155,8 @@ fn profile_credentials( mod tests { use super::*; use crate::tests::{ - get_nonexistent_object, list_uses_directories_correctly, list_with_delimiter, - put_get_delete_list_opts, rename_and_copy, stream_get, + get_nonexistent_object, get_opts, list_uses_directories_correctly, + list_with_delimiter, put_get_delete_list_opts, rename_and_copy, stream_get, }; use bytes::Bytes; use std::collections::HashMap; @@ -1417,6 +1403,7 @@ mod tests { // Localstack doesn't support listing with spaces https://github.com/localstack/localstack/issues/6328 put_get_delete_list_opts(&integration, is_local).await; + get_opts(&integration).await; list_uses_directories_correctly(&integration).await; list_with_delimiter(&integration).await; rename_and_copy(&integration).await; diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index 87432f62b5cd..48cfb1909d66 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -19,11 +19,12 @@ use super::credential::{AzureCredential, CredentialProvider}; use crate::azure::credential::*; use crate::client::pagination::stream_paginated; use crate::client::retry::RetryExt; +use crate::client::GetOptionsExt; use crate::path::DELIMITER; -use crate::util::{deserialize_rfc1123, format_http_range, format_prefix}; +use crate::util::{deserialize_rfc1123, format_prefix}; use crate::{ - BoxStream, ClientOptions, ListResult, ObjectMeta, Path, Result, RetryConfig, - StreamExt, + BoxStream, ClientOptions, GetOptions, ListResult, ObjectMeta, Path, Result, + RetryConfig, StreamExt, }; use base64::prelude::BASE64_STANDARD; use base64::Engine; @@ -32,13 +33,12 @@ use chrono::{DateTime, Utc}; use itertools::Itertools; use reqwest::header::CONTENT_TYPE; use reqwest::{ - header::{HeaderValue, CONTENT_LENGTH, IF_NONE_MATCH, RANGE}, + header::{HeaderValue, CONTENT_LENGTH, IF_NONE_MATCH}, Client as ReqwestClient, Method, Response, StatusCode, }; use serde::{Deserialize, Serialize}; use snafu::{ResultExt, Snafu}; use std::collections::HashMap; -use std::ops::Range; use url::Url; /// A specialized `Error` for object store-related errors @@ -95,15 +95,24 @@ impl From for crate::Error { match err { Error::GetRequest { source, path } | Error::DeleteRequest { source, path } - | Error::CopyRequest { source, path } - | Error::PutRequest { source, path } - if matches!(source.status(), Some(StatusCode::NOT_FOUND)) => - { - Self::NotFound { + | Error::PutRequest { source, path } => match source.status() { + Some(StatusCode::NOT_FOUND) => Self::NotFound { path, source: Box::new(source), - } - } + }, + Some(StatusCode::NOT_MODIFIED) => Self::NotModified { + path, + source: Box::new(source), + }, + Some(StatusCode::PRECONDITION_FAILED) => Self::Precondition { + path, + source: Box::new(source), + }, + _ => Self::Generic { + store: "S3", + source: Box::new(source), + }, + }, Error::CopyRequest { source, path } if matches!(source.status(), Some(StatusCode::CONFLICT)) => { @@ -253,7 +262,7 @@ impl AzureClient { pub async fn get_request( &self, path: &Path, - range: Option>, + options: GetOptions, head: bool, ) -> Result { let credential = self.get_credential().await?; @@ -263,17 +272,14 @@ impl AzureClient { false => Method::GET, }; - let mut builder = self + let builder = self .client .request(method, url) .header(CONTENT_LENGTH, HeaderValue::from_static("0")) .body(Bytes::new()); - if let Some(range) = range { - builder = builder.header(RANGE, format_http_range(range)); - } - let response = builder + .with_get_options(options) .with_azure_authorization(&credential, &self.config.account) .send_retry(&self.config.retry_config) .await diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index c2cfdfe6af32..119f6a97add5 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -31,8 +31,8 @@ use crate::client::token::TokenCache; use crate::{ multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, UploadPart}, path::Path, - ClientOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result, - RetryConfig, + ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, + ObjectStore, Result, RetryConfig, }; use async_trait::async_trait; use base64::prelude::BASE64_STANDARD; @@ -45,7 +45,6 @@ use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt, Snafu}; use std::fmt::{Debug, Formatter}; use std::io; -use std::ops::Range; use std::sync::Arc; use std::{collections::BTreeSet, str::FromStr}; use tokio::io::AsyncWrite; @@ -150,6 +149,7 @@ enum Error { impl From for super::Error { fn from(source: Error) -> Self { match source { + Error::UnknownConfigurationKey { key } => Self::UnknownConfigurationKey { store: "MicrosoftAzure", key, @@ -209,8 +209,8 @@ impl ObjectStore for MicrosoftAzure { Ok(()) } - async fn get(&self, location: &Path) -> Result { - let response = self.client.get_request(location, None, false).await?; + async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { + let response = self.client.get_request(location, options, false).await?; let stream = response .bytes_stream() .map_err(|source| crate::Error::Generic { @@ -222,26 +222,13 @@ impl ObjectStore for MicrosoftAzure { Ok(GetResult::Stream(stream)) } - async fn get_range(&self, location: &Path, range: Range) -> Result { - let bytes = self - .client - .get_request(location, Some(range), false) - .await? - .bytes() - .await - .map_err(|source| client::Error::GetResponseBody { - source, - path: location.to_string(), - })?; - Ok(bytes) - } - async fn head(&self, location: &Path) -> Result { use reqwest::header::{CONTENT_LENGTH, ETAG, LAST_MODIFIED}; + let options = GetOptions::default(); // Extract meta from headers // https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob-properties - let response = self.client.get_request(location, None, true).await?; + let response = self.client.get_request(location, options, true).await?; let headers = response.headers(); let last_modified = headers @@ -1103,8 +1090,9 @@ fn split_sas(sas: &str) -> Result, Error> { mod tests { use super::*; use crate::tests::{ - copy_if_not_exists, list_uses_directories_correctly, list_with_delimiter, - put_get_delete_list, put_get_delete_list_opts, rename_and_copy, stream_get, + copy_if_not_exists, get_opts, list_uses_directories_correctly, + list_with_delimiter, put_get_delete_list, put_get_delete_list_opts, + rename_and_copy, stream_get, }; use std::collections::HashMap; use std::env; @@ -1175,6 +1163,7 @@ mod tests { async fn azure_blob_test() { let integration = maybe_skip_integration!().build().unwrap(); put_get_delete_list_opts(&integration, false).await; + get_opts(&integration).await; list_uses_directories_correctly(&integration).await; list_with_delimiter(&integration).await; rename_and_copy(&integration).await; @@ -1203,6 +1192,7 @@ mod tests { let integration = builder.build().unwrap(); put_get_delete_list(&integration).await; + get_opts(&integration).await; list_uses_directories_correctly(&integration).await; list_with_delimiter(&integration).await; rename_and_copy(&integration).await; diff --git a/object_store/src/chunked.rs b/object_store/src/chunked.rs index aebefec61559..c639d7e89812 100644 --- a/object_store/src/chunked.rs +++ b/object_store/src/chunked.rs @@ -30,7 +30,7 @@ use tokio::io::AsyncWrite; use crate::path::Path; use crate::util::maybe_spawn_blocking; -use crate::{GetResult, ListResult, ObjectMeta, ObjectStore}; +use crate::{GetOptions, GetResult, ListResult, ObjectMeta, ObjectStore}; use crate::{MultipartId, Result}; /// Wraps a [`ObjectStore`] and makes its get response return chunks @@ -81,8 +81,8 @@ impl ObjectStore for ChunkedStore { self.inner.abort_multipart(location, multipart_id).await } - async fn get(&self, location: &Path) -> Result { - match self.inner.get(location).await? { + async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { + match self.inner.get_opts(location, options).await? { GetResult::File(std_file, ..) => { let reader = BufReader::new(std_file); let chunk_size = self.chunk_size; @@ -245,6 +245,7 @@ mod tests { let integration = ChunkedStore::new(Arc::clone(integration), 100); put_get_delete_list(&integration).await; + get_opts(&integration).await; list_uses_directories_correctly(&integration).await; list_with_delimiter(&integration).await; rename_and_copy(&integration).await; diff --git a/object_store/src/client/mod.rs b/object_store/src/client/mod.rs index d2242dd41089..0b25c00f6ba4 100644 --- a/object_store/src/client/mod.rs +++ b/object_store/src/client/mod.rs @@ -27,8 +27,9 @@ pub mod retry; pub mod token; use crate::config::ConfigValue; +use crate::GetOptions; use reqwest::header::{HeaderMap, HeaderValue}; -use reqwest::{Client, ClientBuilder, Proxy}; +use reqwest::{Client, ClientBuilder, Proxy, RequestBuilder}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::str::FromStr; @@ -333,3 +334,37 @@ impl ClientOptions { .map_err(map_client_error) } } + +pub trait GetOptionsExt { + fn with_get_options(self, options: GetOptions) -> Self; +} + +impl GetOptionsExt for RequestBuilder { + fn with_get_options(mut self, options: GetOptions) -> Self { + use hyper::header::*; + + if let Some(range) = options.range { + let range = format!("bytes={}-{}", range.start, range.end.saturating_sub(1)); + self = self.header(RANGE, range); + } + + if let Some(tag) = options.if_match { + self = self.header(IF_MATCH, tag); + } + + if let Some(tag) = options.if_none_match { + self = self.header(IF_NONE_MATCH, tag); + } + + const DATE_FORMAT: &str = "%a, %d %b %Y %H:%M:%S GMT"; + if let Some(date) = options.if_unmodified_since { + self = self.header(IF_UNMODIFIED_SINCE, date.format(DATE_FORMAT).to_string()); + } + + if let Some(date) = options.if_modified_since { + self = self.header(IF_MODIFIED_SINCE, date.format(DATE_FORMAT).to_string()); + } + + self + } +} diff --git a/object_store/src/client/retry.rs b/object_store/src/client/retry.rs index f9c2dd30088d..c2ea0e751f63 100644 --- a/object_store/src/client/retry.rs +++ b/object_store/src/client/retry.rs @@ -32,6 +32,7 @@ pub struct Error { retries: usize, message: String, source: Option, + status: Option, } impl std::fmt::Display for Error { @@ -57,7 +58,7 @@ impl std::error::Error for Error { impl Error { /// Returns the status code associated with this error if any pub fn status(&self) -> Option { - self.source.as_ref().and_then(|e| e.status()) + self.status } } @@ -146,6 +147,14 @@ impl RetryExt for reqwest::RequestBuilder { match s.send().await { Ok(r) => match r.error_for_status_ref() { Ok(_) if r.status().is_success() => return Ok(r), + Ok(r) if r.status() == StatusCode::NOT_MODIFIED => { + return Err(Error{ + message: "not modified".to_string(), + retries, + status: Some(r.status()), + source: None, + }) + } Ok(r) => { let is_bare_redirect = r.status().is_redirection() && !r.headers().contains_key(LOCATION); let message = match is_bare_redirect { @@ -157,6 +166,7 @@ impl RetryExt for reqwest::RequestBuilder { return Err(Error{ message, retries, + status: Some(r.status()), source: None, }) } @@ -180,6 +190,7 @@ impl RetryExt for reqwest::RequestBuilder { return Err(Error{ message, retries, + status: Some(status), source: Some(e), }) @@ -209,7 +220,8 @@ impl RetryExt for reqwest::RequestBuilder { return Err(Error{ retries, message: "request error".to_string(), - source: Some(e) + status: e.status(), + source: Some(e), }) } let sleep = backoff.next(); diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs index 375b4d8f8c37..63c5dc87b687 100644 --- a/object_store/src/gcp/mod.rs +++ b/object_store/src/gcp/mod.rs @@ -31,7 +31,6 @@ //! week. use std::collections::BTreeSet; use std::io; -use std::ops::Range; use std::str::FromStr; use std::sync::Arc; @@ -40,7 +39,6 @@ use bytes::{Buf, Bytes}; use chrono::{DateTime, Utc}; use futures::{stream::BoxStream, StreamExt, TryStreamExt}; use percent_encoding::{percent_encode, NON_ALPHANUMERIC}; -use reqwest::header::RANGE; use reqwest::{header, Client, Method, Response, StatusCode}; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt, Snafu}; @@ -49,14 +47,14 @@ use url::Url; use crate::client::pagination::stream_paginated; use crate::client::retry::RetryExt; -use crate::client::ClientConfigKey; +use crate::client::{ClientConfigKey, GetOptionsExt}; use crate::{ client::token::TokenCache, multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, UploadPart}, path::{Path, DELIMITER}, - util::{format_http_range, format_prefix}, - ClientOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result, - RetryConfig, + util::format_prefix, + ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, + ObjectStore, Result, RetryConfig, }; use self::credential::{ @@ -159,14 +157,24 @@ impl From for super::Error { match err { Error::GetRequest { source, path } | Error::DeleteRequest { source, path } - | Error::CopyRequest { source, path } - if matches!(source.status(), Some(StatusCode::NOT_FOUND)) => - { - Self::NotFound { + | Error::CopyRequest { source, path } => match source.status() { + Some(StatusCode::NOT_FOUND) => Self::NotFound { path, source: Box::new(source), - } - } + }, + Some(StatusCode::NOT_MODIFIED) => Self::NotModified { + path, + source: Box::new(source), + }, + Some(StatusCode::PRECONDITION_FAILED) => Self::Precondition { + path, + source: Box::new(source), + }, + _ => Self::Generic { + store: "GCS", + source: Box::new(source), + }, + }, Error::AlreadyExists { source, path } => Self::AlreadyExists { source: Box::new(source), path, @@ -280,26 +288,23 @@ impl GoogleCloudStorageClient { async fn get_request( &self, path: &Path, - range: Option>, + options: GetOptions, head: bool, ) -> Result { let token = self.get_token().await?; let url = self.object_url(path); - let mut builder = self.client.request(Method::GET, url); - - if let Some(range) = range { - builder = builder.header(RANGE, format_http_range(range)); - } - let alt = match head { true => "json", false => "media", }; + let builder = self.client.request(Method::GET, url); + let response = builder .bearer_auth(token) .query(&[("alt", alt)]) + .with_get_options(options) .send_retry(&self.retry_config) .await .context(GetRequestSnafu { @@ -667,8 +672,14 @@ impl ObjectStore for GoogleCloudStorage { Ok(()) } - async fn get(&self, location: &Path) -> Result { - let response = self.client.get_request(location, None, false).await?; + async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { + if options.if_modified_since.is_some() || options.if_unmodified_since.is_some() { + return Err(super::Error::NotSupported { + source: "ModifiedSince Preconditions not supported by GoogleCloudStorage JSON API".to_string().into(), + }); + } + + let response = self.client.get_request(location, options, false).await?; let stream = response .bytes_stream() .map_err(|source| crate::Error::Generic { @@ -680,18 +691,9 @@ impl ObjectStore for GoogleCloudStorage { Ok(GetResult::Stream(stream)) } - async fn get_range(&self, location: &Path, range: Range) -> Result { - let response = self - .client - .get_request(location, Some(range), false) - .await?; - Ok(response.bytes().await.context(GetResponseBodySnafu { - path: location.as_ref(), - })?) - } - async fn head(&self, location: &Path) -> Result { - let response = self.client.get_request(location, None, true).await?; + let options = GetOptions::default(); + let response = self.client.get_request(location, options, true).await?; let object = response.json().await.context(GetResponseBodySnafu { path: location.as_ref(), })?; @@ -1224,13 +1226,7 @@ mod test { use std::io::Write; use tempfile::NamedTempFile; - use crate::{ - tests::{ - copy_if_not_exists, get_nonexistent_object, list_uses_directories_correctly, - list_with_delimiter, put_get_delete_list, rename_and_copy, stream_get, - }, - Error as ObjectStoreError, ObjectStore, - }; + use crate::tests::*; use super::*; @@ -1299,6 +1295,8 @@ mod test { // Fake GCS server does not yet implement XML Multipart uploads // https://github.com/fsouza/fake-gcs-server/issues/852 stream_get(&integration).await; + // Fake GCS server doesn't currently honor preconditions + get_opts(&integration).await; } } @@ -1311,7 +1309,7 @@ mod test { let err = integration.get(&location).await.unwrap_err(); assert!( - matches!(err, ObjectStoreError::NotFound { .. }), + matches!(err, crate::Error::NotFound { .. }), "unexpected error type: {err}" ); } @@ -1330,7 +1328,7 @@ mod test { .unwrap_err(); assert!( - matches!(err, ObjectStoreError::NotFound { .. }), + matches!(err, crate::Error::NotFound { .. }), "unexpected error type: {err}" ); } @@ -1343,7 +1341,7 @@ mod test { let err = integration.delete(&location).await.unwrap_err(); assert!( - matches!(err, ObjectStoreError::NotFound { .. }), + matches!(err, crate::Error::NotFound { .. }), "unexpected error type: {err}" ); } @@ -1359,7 +1357,7 @@ mod test { let err = integration.delete(&location).await.unwrap_err(); assert!( - matches!(err, ObjectStoreError::NotFound { .. }), + matches!(err, crate::Error::NotFound { .. }), "unexpected error type: {err}" ); } diff --git a/object_store/src/http/client.rs b/object_store/src/http/client.rs index 5ef272180abc..4e58eb0b2927 100644 --- a/object_store/src/http/client.rs +++ b/object_store/src/http/client.rs @@ -16,17 +16,17 @@ // under the License. use crate::client::retry::{self, RetryConfig, RetryExt}; +use crate::client::GetOptionsExt; use crate::path::{Path, DELIMITER}; -use crate::util::{deserialize_rfc1123, format_http_range}; -use crate::{ClientOptions, ObjectMeta, Result}; +use crate::util::deserialize_rfc1123; +use crate::{ClientOptions, GetOptions, ObjectMeta, Result}; use bytes::{Buf, Bytes}; use chrono::{DateTime, Utc}; use percent_encoding::percent_decode_str; -use reqwest::header::{CONTENT_TYPE, RANGE}; +use reqwest::header::CONTENT_TYPE; use reqwest::{Method, Response, StatusCode}; use serde::Deserialize; use snafu::{OptionExt, ResultExt, Snafu}; -use std::ops::Range; use url::Url; #[derive(Debug, Snafu)] @@ -229,19 +229,12 @@ impl Client { Ok(()) } - pub async fn get( - &self, - location: &Path, - range: Option>, - ) -> Result { + pub async fn get(&self, location: &Path, options: GetOptions) -> Result { let url = self.path_url(location); - let mut builder = self.client.get(url); - - if let Some(range) = range { - builder = builder.header(RANGE, format_http_range(range)); - } + let builder = self.client.get(url); builder + .with_get_options(options) .send_retry(&self.retry_config) .await .map_err(|source| match source.status() { diff --git a/object_store/src/http/mod.rs b/object_store/src/http/mod.rs index c91faa2358ac..bed19722c83a 100644 --- a/object_store/src/http/mod.rs +++ b/object_store/src/http/mod.rs @@ -31,8 +31,6 @@ //! [rfc2518]: https://datatracker.ietf.org/doc/html/rfc2518 //! [WebDAV]: https://en.wikipedia.org/wiki/WebDAV -use std::ops::Range; - use async_trait::async_trait; use bytes::Bytes; use futures::stream::BoxStream; @@ -45,8 +43,8 @@ use url::Url; use crate::http::client::Client; use crate::path::Path; use crate::{ - ClientOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result, - RetryConfig, + ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, + ObjectStore, Result, RetryConfig, }; mod client; @@ -119,8 +117,8 @@ impl ObjectStore for HttpStore { Err(super::Error::NotImplemented) } - async fn get(&self, location: &Path) -> Result { - let response = self.client.get(location, None).await?; + async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { + let response = self.client.get(location, options).await?; let stream = response .bytes_stream() .map_err(|source| Error::Reqwest { source }.into()) @@ -129,17 +127,6 @@ impl ObjectStore for HttpStore { Ok(GetResult::Stream(stream)) } - async fn get_range(&self, location: &Path, range: Range) -> Result { - let bytes = self - .client - .get(location, Some(range)) - .await? - .bytes() - .await - .context(ReqwestSnafu)?; - Ok(bytes) - } - async fn head(&self, location: &Path) -> Result { let status = self.client.list(Some(location), "0").await?; match status.response.len() { diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 1390a0140d1c..f6ddc8030283 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -343,11 +343,24 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static { } /// Return the bytes that are stored at the specified location. - async fn get(&self, location: &Path) -> Result; + async fn get(&self, location: &Path) -> Result { + self.get_opts(location, GetOptions::default()).await + } + + /// Perform a get request with options + /// + /// Note: options.range will be ignored if [`GetResult::File`] + async fn get_opts(&self, location: &Path, options: GetOptions) -> Result; /// Return the bytes that are stored at the specified location /// in the given byte range - async fn get_range(&self, location: &Path, range: Range) -> Result; + async fn get_range(&self, location: &Path, range: Range) -> Result { + let options = GetOptions { + range: Some(range), + ..Default::default() + }; + self.get_opts(location, options).await?.bytes().await + } /// Return the bytes that are stored at the specified location /// in the given byte ranges @@ -475,6 +488,10 @@ impl ObjectStore for Box { self.as_ref().get(location).await } + async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { + self.as_ref().get_opts(location, options).await + } + async fn get_range(&self, location: &Path, range: Range) -> Result { self.as_ref().get_range(location, range).await } @@ -555,6 +572,62 @@ pub struct ObjectMeta { pub e_tag: Option, } +/// Options for a get request +#[derive(Debug, Default)] +pub struct GetOptions { + /// Request will succeed if the ETag matches + /// + /// + pub if_match: Option, + /// Request will succeed if the ETag does not match + /// + /// + pub if_none_match: Option, + /// Request will succeed if the object has been modified since + /// + /// + pub if_modified_since: Option>, + /// Request will succeed if the object has not been modified since + /// + /// Some stores, such as S3, will only return `NotModified` for exact + /// timestamp matches, instead of for any timestamp greater than or equal. + /// + /// + pub if_unmodified_since: Option>, + /// Request transfer of only the specified range of bytes + /// + /// + pub range: Option>, +} + +impl GetOptions { + /// Returns an error if the modification conditions on this request are not satisfied + fn check_modified( + &self, + location: &Path, + last_modified: DateTime, + ) -> Result<()> { + if let Some(date) = self.if_modified_since { + if last_modified <= date { + return Err(Error::NotModified { + path: location.to_string(), + source: format!("{} >= {}", date, last_modified).into(), + }); + } + } + + if let Some(date) = self.if_unmodified_since { + if last_modified > date { + return Err(Error::Precondition { + path: location.to_string(), + source: format!("{} < {}", date, last_modified).into(), + }); + } + } + Ok(()) + } +} + /// Result for a get request /// /// This special cases the case of a local file, as some systems may @@ -699,6 +772,18 @@ pub enum Error { source: Box, }, + #[snafu(display("Request precondition failure for path {}: {}", path, source))] + Precondition { + path: String, + source: Box, + }, + + #[snafu(display("Object at location {} not modified: {}", path, source))] + NotModified { + path: String, + source: Box, + }, + #[snafu(display("Operation not yet implemented."))] NotImplemented, @@ -1022,6 +1107,85 @@ mod tests { delete_fixtures(storage).await; } + pub(crate) async fn get_opts(storage: &dyn ObjectStore) { + let path = Path::from("test"); + storage.put(&path, "foo".into()).await.unwrap(); + let meta = storage.head(&path).await.unwrap(); + + let options = GetOptions { + if_unmodified_since: Some(meta.last_modified), + ..GetOptions::default() + }; + match storage.get_opts(&path, options).await { + Ok(_) | Err(Error::NotSupported { .. }) => {} + Err(e) => panic!("{e}"), + } + + let options = GetOptions { + if_unmodified_since: Some(meta.last_modified + chrono::Duration::hours(10)), + ..GetOptions::default() + }; + match storage.get_opts(&path, options).await { + Ok(_) | Err(Error::NotSupported { .. }) => {} + Err(e) => panic!("{e}"), + } + + let options = GetOptions { + if_unmodified_since: Some(meta.last_modified - chrono::Duration::hours(10)), + ..GetOptions::default() + }; + match storage.get_opts(&path, options).await { + Err(Error::Precondition { .. } | Error::NotSupported { .. }) => {} + d => panic!("{d:?}"), + } + + let options = GetOptions { + if_modified_since: Some(meta.last_modified), + ..GetOptions::default() + }; + match storage.get_opts(&path, options).await { + Err(Error::NotModified { .. } | Error::NotSupported { .. }) => {} + d => panic!("{d:?}"), + } + + let options = GetOptions { + if_modified_since: Some(meta.last_modified - chrono::Duration::hours(10)), + ..GetOptions::default() + }; + match storage.get_opts(&path, options).await { + Ok(_) | Err(Error::NotSupported { .. }) => {} + Err(e) => panic!("{e}"), + } + + if let Some(tag) = meta.e_tag { + let options = GetOptions { + if_match: Some(tag.clone()), + ..GetOptions::default() + }; + storage.get_opts(&path, options).await.unwrap(); + + let options = GetOptions { + if_match: Some("invalid".to_string()), + ..GetOptions::default() + }; + let err = storage.get_opts(&path, options).await.unwrap_err(); + assert!(matches!(err, Error::Precondition { .. }), "{err}"); + + let options = GetOptions { + if_none_match: Some(tag.clone()), + ..GetOptions::default() + }; + let err = storage.get_opts(&path, options).await.unwrap_err(); + assert!(matches!(err, Error::NotModified { .. }), "{err}"); + + let options = GetOptions { + if_none_match: Some("invalid".to_string()), + ..GetOptions::default() + }; + storage.get_opts(&path, options).await.unwrap(); + } + } + fn get_vec_of_bytes(chunk_length: usize, num_chunks: usize) -> Vec { std::iter::repeat(Bytes::from_iter(std::iter::repeat(b'x').take(chunk_length))) .take(num_chunks) diff --git a/object_store/src/limit.rs b/object_store/src/limit.rs index d0d9f73c5c59..e0091115d8f6 100644 --- a/object_store/src/limit.rs +++ b/object_store/src/limit.rs @@ -18,8 +18,8 @@ //! An object store that limits the maximum concurrency of the wrapped implementation use crate::{ - BoxStream, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Path, Result, - StreamExt, + BoxStream, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, + Path, Result, StreamExt, }; use async_trait::async_trait; use bytes::Bytes; @@ -114,6 +114,16 @@ impl ObjectStore for LimitStore { } } + async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { + let permit = Arc::clone(&self.semaphore).acquire_owned().await.unwrap(); + match self.inner.get_opts(location, options).await? { + r @ GetResult::File(_, _) => Ok(r), + GetResult::Stream(s) => { + Ok(GetResult::Stream(PermitWrapper::new(s, permit).boxed())) + } + } + } + async fn get_range(&self, location: &Path, range: Range) -> Result { let _permit = self.semaphore.acquire().await.unwrap(); self.inner.get_range(location, range).await @@ -251,10 +261,7 @@ impl AsyncWrite for PermitWrapper { mod tests { use crate::limit::LimitStore; use crate::memory::InMemory; - use crate::tests::{ - list_uses_directories_correctly, list_with_delimiter, put_get_delete_list, - rename_and_copy, stream_get, - }; + use crate::tests::*; use crate::ObjectStore; use std::time::Duration; use tokio::time::timeout; @@ -266,6 +273,7 @@ mod tests { let integration = LimitStore::new(memory, max_requests); put_get_delete_list(&integration).await; + get_opts(&integration).await; list_uses_directories_correctly(&integration).await; list_with_delimiter(&integration).await; rename_and_copy(&integration).await; diff --git a/object_store/src/local.rs b/object_store/src/local.rs index b40f5a777860..26a8bf336873 100644 --- a/object_store/src/local.rs +++ b/object_store/src/local.rs @@ -19,7 +19,7 @@ use crate::{ maybe_spawn_blocking, path::{absolute_path_to_url, Path}, - GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result, + GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result, }; use async_trait::async_trait; use bytes::Bytes; @@ -56,7 +56,7 @@ pub(crate) enum Error { }, #[snafu(display("Unable to access metadata for {}: {}", path, source))] - UnableToAccessMetadata { + Metadata { source: Box, path: String, }, @@ -360,10 +360,27 @@ impl ObjectStore for LocalFileSystem { Err(super::Error::NotImplemented) } - async fn get(&self, location: &Path) -> Result { - let path = self.config.path_to_filesystem(location)?; + async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { + if options.if_match.is_some() || options.if_none_match.is_some() { + return Err(super::Error::NotSupported { + source: "ETags not supported by LocalFileSystem".to_string().into(), + }); + } + + let location = location.clone(); + let path = self.config.path_to_filesystem(&location)?; maybe_spawn_blocking(move || { let file = open_file(&path)?; + if options.if_unmodified_since.is_some() + || options.if_modified_since.is_some() + { + let metadata = file.metadata().map_err(|e| Error::Metadata { + source: e.into(), + path: location.to_string(), + })?; + options.check_modified(&location, last_modified(&metadata))?; + } + Ok(GetResult::File(file, path)) }) .await @@ -408,7 +425,7 @@ impl ObjectStore for LocalFileSystem { source: e, } } else { - Error::UnableToAccessMetadata { + Error::Metadata { source: e.into(), path: location.to_string(), } @@ -878,21 +895,22 @@ fn open_file(path: &PathBuf) -> Result { } fn convert_entry(entry: DirEntry, location: Path) -> Result { - let metadata = entry - .metadata() - .map_err(|e| Error::UnableToAccessMetadata { - source: e.into(), - path: location.to_string(), - })?; + let metadata = entry.metadata().map_err(|e| Error::Metadata { + source: e.into(), + path: location.to_string(), + })?; convert_metadata(metadata, location) } -fn convert_metadata(metadata: std::fs::Metadata, location: Path) -> Result { - let last_modified: DateTime = metadata +fn last_modified(metadata: &std::fs::Metadata) -> DateTime { + metadata .modified() .expect("Modified file time should be supported on this platform") - .into(); + .into() +} +fn convert_metadata(metadata: std::fs::Metadata, location: Path) -> Result { + let last_modified = last_modified(&metadata); let size = usize::try_from(metadata.len()).context(FileSizeOverflowedUsizeSnafu { path: location.as_ref(), })?; @@ -956,13 +974,7 @@ fn convert_walkdir_result( mod tests { use super::*; use crate::test_util::flatten_list_stream; - use crate::{ - tests::{ - copy_if_not_exists, get_nonexistent_object, list_uses_directories_correctly, - list_with_delimiter, put_get_delete_list, rename_and_copy, stream_get, - }, - Error as ObjectStoreError, ObjectStore, - }; + use crate::tests::*; use futures::TryStreamExt; use tempfile::{NamedTempFile, TempDir}; use tokio::io::AsyncWriteExt; @@ -973,6 +985,7 @@ mod tests { let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap(); put_get_delete_list(&integration).await; + get_opts(&integration).await; list_uses_directories_correctly(&integration).await; list_with_delimiter(&integration).await; rename_and_copy(&integration).await; @@ -1085,7 +1098,7 @@ mod tests { let err = get_nonexistent_object(&integration, Some(location)) .await .unwrap_err(); - if let ObjectStoreError::NotFound { path, source } = err { + if let crate::Error::NotFound { path, source } = err { let source_variant = source.downcast_ref::(); assert!( matches!(source_variant, Some(std::io::Error { .. }),), diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs index b01ffbb02495..82d485997e88 100644 --- a/object_store/src/memory.rs +++ b/object_store/src/memory.rs @@ -16,8 +16,8 @@ // under the License. //! An in-memory object store implementation -use crate::MultipartId; use crate::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStore, Result}; +use crate::{GetOptions, MultipartId}; use async_trait::async_trait; use bytes::Bytes; use chrono::{DateTime, Utc}; @@ -128,12 +128,17 @@ impl ObjectStore for InMemory { })) } - async fn get(&self, location: &Path) -> Result { - let data = self.entry(location).await?; + async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { + if options.if_match.is_some() || options.if_none_match.is_some() { + return Err(super::Error::NotSupported { + source: "ETags not supported by InMemory".to_string().into(), + }); + } + let (data, last_modified) = self.entry(location).await?; + options.check_modified(location, last_modified)?; - Ok(GetResult::Stream( - futures::stream::once(async move { Ok(data.0) }).boxed(), - )) + let stream = futures::stream::once(futures::future::ready(Ok(data))); + Ok(GetResult::Stream(stream.boxed())) } async fn get_range(&self, location: &Path, range: Range) -> Result { @@ -391,19 +396,14 @@ mod tests { use super::*; - use crate::{ - tests::{ - copy_if_not_exists, get_nonexistent_object, list_uses_directories_correctly, - list_with_delimiter, put_get_delete_list, rename_and_copy, stream_get, - }, - Error as ObjectStoreError, ObjectStore, - }; + use crate::tests::*; #[tokio::test] async fn in_memory_test() { let integration = InMemory::new(); put_get_delete_list(&integration).await; + get_opts(&integration).await; list_uses_directories_correctly(&integration).await; list_with_delimiter(&integration).await; rename_and_copy(&integration).await; @@ -443,7 +443,7 @@ mod tests { let err = get_nonexistent_object(&integration, Some(location)) .await .unwrap_err(); - if let ObjectStoreError::NotFound { path, source } = err { + if let crate::Error::NotFound { path, source } = err { let source_variant = source.downcast_ref::(); assert!( matches!(source_variant, Some(Error::NoDataInMemory { .. }),), diff --git a/object_store/src/prefix.rs b/object_store/src/prefix.rs index eba379553733..45464948e951 100644 --- a/object_store/src/prefix.rs +++ b/object_store/src/prefix.rs @@ -23,7 +23,7 @@ use tokio::io::AsyncWrite; use crate::path::Path; use crate::{ - GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, + GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result as ObjectStoreResult, }; @@ -93,6 +93,15 @@ impl ObjectStore for PrefixStore { self.inner.get_range(&full_path, range).await } + async fn get_opts( + &self, + location: &Path, + options: GetOptions, + ) -> ObjectStoreResult { + let full_path = self.full_path(location); + self.inner.get_opts(&full_path, options).await + } + async fn get_ranges( &self, location: &Path, @@ -215,10 +224,7 @@ mod tests { use super::*; use crate::local::LocalFileSystem; use crate::test_util::flatten_list_stream; - use crate::tests::{ - copy_if_not_exists, list_uses_directories_correctly, list_with_delimiter, - put_get_delete_list, rename_and_copy, stream_get, - }; + use crate::tests::*; use tempfile::TempDir; @@ -229,6 +235,7 @@ mod tests { let integration = PrefixStore::new(inner, "prefix"); put_get_delete_list(&integration).await; + get_opts(&integration).await; list_uses_directories_correctly(&integration).await; list_with_delimiter(&integration).await; rename_and_copy(&integration).await; diff --git a/object_store/src/throttle.rs b/object_store/src/throttle.rs index e51303114788..fb90afcec9fb 100644 --- a/object_store/src/throttle.rs +++ b/object_store/src/throttle.rs @@ -20,8 +20,8 @@ use parking_lot::Mutex; use std::ops::Range; use std::{convert::TryInto, sync::Arc}; -use crate::MultipartId; use crate::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStore, Result}; +use crate::{GetOptions, MultipartId}; use async_trait::async_trait; use bytes::Bytes; use futures::{stream::BoxStream, FutureExt, StreamExt}; @@ -179,17 +179,18 @@ impl ObjectStore for ThrottledStore { // need to copy to avoid moving / referencing `self` let wait_get_per_byte = self.config().wait_get_per_byte; - self.inner.get(location).await.map(|result| { - let s = match result { - GetResult::Stream(s) => s, - GetResult::File(_, _) => unimplemented!(), - }; + let result = self.inner.get(location).await?; + Ok(throttle_get(result, wait_get_per_byte)) + } - GetResult::Stream(throttle_stream(s, move |bytes| { - let bytes_len: u32 = usize_to_u32_saturate(bytes.len()); - wait_get_per_byte * bytes_len - })) - }) + async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { + sleep(self.config().wait_get_per_call).await; + + // need to copy to avoid moving / referencing `self` + let wait_get_per_byte = self.config().wait_get_per_byte; + + let result = self.inner.get_opts(location, options).await?; + Ok(throttle_get(result, wait_get_per_byte)) } async fn get_range(&self, location: &Path, range: Range) -> Result { @@ -299,6 +300,18 @@ fn usize_to_u32_saturate(x: usize) -> u32 { x.try_into().unwrap_or(u32::MAX) } +fn throttle_get(result: GetResult, wait_get_per_byte: Duration) -> GetResult { + let s = match result { + GetResult::Stream(s) => s, + GetResult::File(_, _) => unimplemented!(), + }; + + GetResult::Stream(throttle_stream(s, move |bytes| { + let bytes_len: u32 = usize_to_u32_saturate(bytes.len()); + wait_get_per_byte * bytes_len + })) +} + fn throttle_stream( stream: BoxStream<'_, Result>, delay: F, @@ -317,13 +330,7 @@ where #[cfg(test)] mod tests { use super::*; - use crate::{ - memory::InMemory, - tests::{ - copy_if_not_exists, list_uses_directories_correctly, list_with_delimiter, - put_get_delete_list, rename_and_copy, - }, - }; + use crate::{memory::InMemory, tests::*}; use bytes::Bytes; use futures::TryStreamExt; use tokio::time::Duration; diff --git a/object_store/src/util.rs b/object_store/src/util.rs index e5c701dd8b1b..ba4c68345d73 100644 --- a/object_store/src/util.rs +++ b/object_store/src/util.rs @@ -44,13 +44,6 @@ pub fn format_prefix(prefix: Option<&crate::path::Path>) -> Option { .map(|p| format!("{}{}", p.as_ref(), crate::path::DELIMITER)) } -/// Returns a formatted HTTP range header as per -/// -#[cfg(any(feature = "aws", feature = "gcp", feature = "azure", feature = "http"))] -pub fn format_http_range(range: std::ops::Range) -> String { - format!("bytes={}-{}", range.start, range.end.saturating_sub(1)) -} - #[cfg(any(feature = "aws", feature = "azure"))] pub(crate) fn hmac_sha256( secret: impl AsRef<[u8]>,