diff --git a/.github/workflows/object_store.yml b/.github/workflows/object_store.yml index 65c78df18466..df43ae3bf76a 100644 --- a/.github/workflows/object_store.yml +++ b/.github/workflows/object_store.yml @@ -95,8 +95,9 @@ jobs: - uses: actions/checkout@v3 - name: Configure Fake GCS Server (GCP emulation) + # Custom image - see fsouza/fake-gcs-server#1164 run: | - docker run -d -p 4443:4443 fsouza/fake-gcs-server -scheme http + docker run -d -p 4443:4443 tustvold/fake-gcs-server -scheme http -backend memory -public-host localhost:4443 # Give the container a moment to start up prior to configuring it sleep 1 curl -v -X POST --data-binary '{"name":"test-bucket"}' -H "Content-Type: application/json" "http://localhost:4443/storage/v1/b" diff --git a/object_store/CONTRIBUTING.md b/object_store/CONTRIBUTING.md index 550640d931b4..47c294022659 100644 --- a/object_store/CONTRIBUTING.md +++ b/object_store/CONTRIBUTING.md @@ -103,7 +103,7 @@ To test the GCS integration, we use [Fake GCS Server](https://github.com/fsouza/ Startup the fake server: ```shell -docker run -p 4443:4443 fsouza/fake-gcs-server -scheme http +docker run -p 4443:4443 tustvold/fake-gcs-server -scheme http ``` Configure the account: diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml index e25801b6c92d..c6b89fa23186 100644 --- a/object_store/Cargo.toml +++ b/object_store/Cargo.toml @@ -68,7 +68,7 @@ tokio = { version = "1.25.0", features = ["sync", "macros", "rt", "time", "io-ut nix = "0.26.1" [features] -cloud = ["serde", "serde_json", "quick-xml", "hyper", "reqwest", "reqwest/json","reqwest/stream", "chrono/serde", "base64", "rand", "ring"] +cloud = ["serde", "serde_json", "quick-xml", "hyper", "reqwest", "reqwest/json", "reqwest/stream", "chrono/serde", "base64", "rand", "ring"] azure = ["cloud"] gcp = ["cloud", "rustls-pemfile"] aws = ["cloud"] diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs index b2d01abfb6f3..1cdf785e5f4d 100644 --- a/object_store/src/aws/client.rs +++ b/object_store/src/aws/client.rs @@ -18,6 +18,7 @@ use crate::aws::checksum::Checksum; use crate::aws::credential::{AwsCredential, CredentialExt, CredentialProvider}; use crate::aws::{STORE, STRICT_PATH_ENCODE_SET}; +use crate::client::list::ListResponse; use crate::client::pagination::stream_paginated; use crate::client::retry::RetryExt; use crate::client::GetOptionsExt; @@ -25,13 +26,12 @@ use crate::multipart::UploadPart; use crate::path::DELIMITER; use crate::util::format_prefix; use crate::{ - BoxStream, ClientOptions, GetOptions, ListResult, MultipartId, ObjectMeta, Path, - Result, RetryConfig, StreamExt, + BoxStream, ClientOptions, GetOptions, ListResult, MultipartId, Path, Result, + RetryConfig, StreamExt, }; use base64::prelude::BASE64_STANDARD; use base64::Engine; use bytes::{Buf, Bytes}; -use chrono::{DateTime, Utc}; use percent_encoding::{utf8_percent_encode, PercentEncode}; use reqwest::{header::CONTENT_TYPE, Client as ReqwestClient, Method, Response}; use serde::{Deserialize, Serialize}; @@ -109,69 +109,6 @@ impl From for crate::Error { } } -#[derive(Debug, Deserialize)] -#[serde(rename_all = "PascalCase")] -pub struct ListResponse { - #[serde(default)] - pub contents: Vec, - #[serde(default)] - pub common_prefixes: Vec, - #[serde(default)] - pub next_continuation_token: Option, -} - -impl TryFrom for ListResult { - type Error = crate::Error; - - fn try_from(value: ListResponse) -> Result { - let common_prefixes = value - .common_prefixes - .into_iter() - .map(|x| Ok(Path::parse(x.prefix)?)) - .collect::>()?; - - let objects = value - .contents - .into_iter() - .map(TryFrom::try_from) - .collect::>()?; - - Ok(Self { - common_prefixes, - objects, - }) - } -} - -#[derive(Debug, Deserialize)] -#[serde(rename_all = "PascalCase")] -pub struct ListPrefix { - pub prefix: String, -} - -#[derive(Debug, Deserialize)] -#[serde(rename_all = "PascalCase")] -pub struct ListContents { - pub key: String, - pub size: usize, - pub last_modified: DateTime, - #[serde(rename = "ETag")] - pub e_tag: Option, -} - -impl TryFrom for ObjectMeta { - type Error = crate::Error; - - fn try_from(value: ListContents) -> Result { - Ok(Self { - location: Path::parse(value.key)?, - last_modified: value.last_modified, - size: value.size, - e_tag: value.e_tag, - }) - } -} - #[derive(Debug, Deserialize)] #[serde(rename_all = "PascalCase")] struct InitiateMultipart { diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index 3f9b4803fe7d..2c38a9b712c2 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -33,7 +33,6 @@ use async_trait::async_trait; use bytes::Bytes; -use chrono::{DateTime, Utc}; use futures::stream::BoxStream; use futures::TryStreamExt; use itertools::Itertools; @@ -52,6 +51,7 @@ use crate::aws::credential::{ AwsCredential, CredentialProvider, InstanceCredentialProvider, StaticCredentialProvider, WebIdentityProvider, }; +use crate::client::header::header_meta; use crate::client::ClientConfigKey; use crate::config::ConfigValue; use crate::multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, UploadPart}; @@ -87,24 +87,6 @@ static METADATA_ENDPOINT: &str = "http://169.254.169.254"; #[derive(Debug, Snafu)] #[allow(missing_docs)] enum Error { - #[snafu(display("Last-Modified Header missing from response"))] - MissingLastModified, - - #[snafu(display("Content-Length Header missing from response"))] - MissingContentLength, - - #[snafu(display("Invalid last modified '{}': {}", last_modified, source))] - InvalidLastModified { - last_modified: String, - source: chrono::ParseError, - }, - - #[snafu(display("Invalid content length '{}': {}", content_length, source))] - InvalidContentLength { - content_length: String, - source: std::num::ParseIntError, - }, - #[snafu(display("Missing region"))] MissingRegion, @@ -155,6 +137,11 @@ enum Error { #[snafu(display("Failed to parse the region for bucket '{}'", bucket))] RegionParse { bucket: String }, + + #[snafu(display("Failed to parse headers: {}", source))] + Header { + source: crate::client::header::Error, + }, } impl From for super::Error { @@ -261,41 +248,11 @@ impl ObjectStore for AmazonS3 { } async fn head(&self, location: &Path) -> Result { - use reqwest::header::{CONTENT_LENGTH, ETAG, LAST_MODIFIED}; - let options = GetOptions::default(); // Extract meta from headers // https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_ResponseSyntax let response = self.client.get_request(location, options, true).await?; - let headers = response.headers(); - - let last_modified = headers - .get(LAST_MODIFIED) - .context(MissingLastModifiedSnafu)?; - - let content_length = headers - .get(CONTENT_LENGTH) - .context(MissingContentLengthSnafu)?; - - let last_modified = last_modified.to_str().context(BadHeaderSnafu)?; - let last_modified = DateTime::parse_from_rfc2822(last_modified) - .context(InvalidLastModifiedSnafu { last_modified })? - .with_timezone(&Utc); - - let content_length = content_length.to_str().context(BadHeaderSnafu)?; - let content_length = content_length - .parse() - .context(InvalidContentLengthSnafu { content_length })?; - - let e_tag = headers.get(ETAG).context(MissingEtagSnafu)?; - let e_tag = e_tag.to_str().context(BadHeaderSnafu)?; - - Ok(ObjectMeta { - location: location.clone(), - last_modified, - size: content_length, - e_tag: Some(e_tag.to_string()), - }) + Ok(header_meta(location, response.headers()).context(HeaderSnafu)?) } async fn delete(&self, location: &Path) -> Result<()> { diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index 6726241aa868..0f8dae00c6c0 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -38,7 +38,6 @@ use async_trait::async_trait; use base64::prelude::BASE64_STANDARD; use base64::Engine; use bytes::Bytes; -use chrono::{TimeZone, Utc}; use futures::{stream::BoxStream, StreamExt, TryStreamExt}; use percent_encoding::percent_decode_str; use serde::{Deserialize, Serialize}; @@ -50,9 +49,9 @@ use std::{collections::BTreeSet, str::FromStr}; use tokio::io::AsyncWrite; use url::Url; +use crate::client::header::header_meta; use crate::client::ClientConfigKey; use crate::config::ConfigValue; -use crate::util::RFC1123_FMT; pub use credential::authority_hosts; mod client; @@ -75,24 +74,6 @@ const MSI_ENDPOINT_ENV_KEY: &str = "IDENTITY_ENDPOINT"; #[derive(Debug, Snafu)] #[allow(missing_docs)] enum Error { - #[snafu(display("Last-Modified Header missing from response"))] - MissingLastModified, - - #[snafu(display("Content-Length Header missing from response"))] - MissingContentLength, - - #[snafu(display("Invalid last modified '{}': {}", last_modified, source))] - InvalidLastModified { - last_modified: String, - source: chrono::ParseError, - }, - - #[snafu(display("Invalid content length '{}': {}", content_length, source))] - InvalidContentLength { - content_length: String, - source: std::num::ParseIntError, - }, - #[snafu(display("Received header containing non-ASCII data"))] BadHeader { source: reqwest::header::ToStrError }, @@ -146,6 +127,11 @@ enum Error { #[snafu(display("ETag Header missing from response"))] MissingEtag, + + #[snafu(display("Failed to parse headers: {}", source))] + Header { + source: crate::client::header::Error, + }, } impl From for super::Error { @@ -223,44 +209,12 @@ impl ObjectStore for MicrosoftAzure { } async fn head(&self, location: &Path) -> Result { - use reqwest::header::{CONTENT_LENGTH, ETAG, LAST_MODIFIED}; let options = GetOptions::default(); // Extract meta from headers // https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob-properties let response = self.client.get_request(location, options, true).await?; - let headers = response.headers(); - - let last_modified = headers - .get(LAST_MODIFIED) - .ok_or(Error::MissingLastModified)? - .to_str() - .context(BadHeaderSnafu)?; - let last_modified = Utc - .datetime_from_str(last_modified, RFC1123_FMT) - .context(InvalidLastModifiedSnafu { last_modified })?; - - let content_length = headers - .get(CONTENT_LENGTH) - .ok_or(Error::MissingContentLength)? - .to_str() - .context(BadHeaderSnafu)?; - let content_length = content_length - .parse() - .context(InvalidContentLengthSnafu { content_length })?; - - let e_tag = headers - .get(ETAG) - .ok_or(Error::MissingEtag)? - .to_str() - .context(BadHeaderSnafu)?; - - Ok(ObjectMeta { - location: location.clone(), - last_modified, - size: content_length, - e_tag: Some(e_tag.to_string()), - }) + Ok(header_meta(location, response.headers()).context(HeaderSnafu)?) } async fn delete(&self, location: &Path) -> Result<()> { diff --git a/object_store/src/client/header.rs b/object_store/src/client/header.rs new file mode 100644 index 000000000000..cc4f16eaa599 --- /dev/null +++ b/object_store/src/client/header.rs @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Logic for extracting ObjectMeta from headers used by AWS, GCP and Azure + +use crate::path::Path; +use crate::ObjectMeta; +use chrono::{DateTime, Utc}; +use hyper::header::{CONTENT_LENGTH, ETAG, LAST_MODIFIED}; +use hyper::HeaderMap; +use snafu::{OptionExt, ResultExt, Snafu}; + +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("ETag Header missing from response"))] + MissingEtag, + + #[snafu(display("Received header containing non-ASCII data"))] + BadHeader { source: reqwest::header::ToStrError }, + + #[snafu(display("Last-Modified Header missing from response"))] + MissingLastModified, + + #[snafu(display("Content-Length Header missing from response"))] + MissingContentLength, + + #[snafu(display("Invalid last modified '{}': {}", last_modified, source))] + InvalidLastModified { + last_modified: String, + source: chrono::ParseError, + }, + + #[snafu(display("Invalid content length '{}': {}", content_length, source))] + InvalidContentLength { + content_length: String, + source: std::num::ParseIntError, + }, +} + +/// Extracts [`ObjectMeta`] from the provided [`HeaderMap`] +pub fn header_meta(location: &Path, headers: &HeaderMap) -> Result { + let last_modified = headers + .get(LAST_MODIFIED) + .context(MissingLastModifiedSnafu)?; + + let content_length = headers + .get(CONTENT_LENGTH) + .context(MissingContentLengthSnafu)?; + + let last_modified = last_modified.to_str().context(BadHeaderSnafu)?; + let last_modified = DateTime::parse_from_rfc2822(last_modified) + .context(InvalidLastModifiedSnafu { last_modified })? + .with_timezone(&Utc); + + let content_length = content_length.to_str().context(BadHeaderSnafu)?; + let content_length = content_length + .parse() + .context(InvalidContentLengthSnafu { content_length })?; + + let e_tag = headers.get(ETAG).context(MissingEtagSnafu)?; + let e_tag = e_tag.to_str().context(BadHeaderSnafu)?; + + Ok(ObjectMeta { + location: location.clone(), + last_modified, + size: content_length, + e_tag: Some(e_tag.to_string()), + }) +} diff --git a/object_store/src/client/list.rs b/object_store/src/client/list.rs new file mode 100644 index 000000000000..6a3889e3be5b --- /dev/null +++ b/object_store/src/client/list.rs @@ -0,0 +1,85 @@ +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! The list response format used by GCP and AWS + +use crate::path::Path; +use crate::{ListResult, ObjectMeta, Result}; +use chrono::{DateTime, Utc}; +use serde::Deserialize; + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct ListResponse { + #[serde(default)] + pub contents: Vec, + #[serde(default)] + pub common_prefixes: Vec, + #[serde(default)] + pub next_continuation_token: Option, +} + +impl TryFrom for ListResult { + type Error = crate::Error; + + fn try_from(value: ListResponse) -> Result { + let common_prefixes = value + .common_prefixes + .into_iter() + .map(|x| Ok(Path::parse(x.prefix)?)) + .collect::>()?; + + let objects = value + .contents + .into_iter() + .map(TryFrom::try_from) + .collect::>()?; + + Ok(Self { + common_prefixes, + objects, + }) + } +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct ListPrefix { + pub prefix: String, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct ListContents { + pub key: String, + pub size: usize, + pub last_modified: DateTime, + #[serde(rename = "ETag")] + pub e_tag: Option, +} + +impl TryFrom for ObjectMeta { + type Error = crate::Error; + + fn try_from(value: ListContents) -> Result { + Ok(Self { + location: Path::parse(value.key)?, + last_modified: value.last_modified, + size: value.size, + e_tag: value.e_tag, + }) + } +} diff --git a/object_store/src/client/mod.rs b/object_store/src/client/mod.rs index be44a9f99b27..c6a73fe7a618 100644 --- a/object_store/src/client/mod.rs +++ b/object_store/src/client/mod.rs @@ -26,6 +26,12 @@ pub mod retry; #[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))] pub mod token; +#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))] +pub mod header; + +#[cfg(any(feature = "aws", feature = "gcp"))] +pub mod list; + use std::collections::HashMap; use std::str::FromStr; use std::time::Duration; diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs index 41a91fef84a9..32f4055f1178 100644 --- a/object_store/src/gcp/mod.rs +++ b/object_store/src/gcp/mod.rs @@ -36,15 +36,16 @@ use std::sync::Arc; use async_trait::async_trait; use bytes::{Buf, Bytes}; -use chrono::{DateTime, Utc}; use futures::{stream::BoxStream, StreamExt, TryStreamExt}; -use percent_encoding::{percent_encode, NON_ALPHANUMERIC}; +use percent_encoding::{percent_encode, utf8_percent_encode, NON_ALPHANUMERIC}; use reqwest::{header, Client, Method, Response, StatusCode}; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt, Snafu}; use tokio::io::AsyncWrite; use url::Url; +use crate::client::header::header_meta; +use crate::client::list::ListResponse; use crate::client::pagination::stream_paginated; use crate::client::retry::RetryExt; use crate::client::{ClientConfigKey, GetOptionsExt}; @@ -82,6 +83,9 @@ enum Error { #[snafu(display("Error getting list response body: {}", source))] ListResponseBody { source: reqwest::Error }, + #[snafu(display("Got invalid list response: {}", source))] + InvalidListResponse { source: quick_xml::de::DeError }, + #[snafu(display("Error performing get request {}: {}", path, source))] GetRequest { source: crate::client::retry::Error, @@ -143,6 +147,11 @@ enum Error { #[snafu(display("Configuration key: '{}' is not known.", key))] UnknownConfigurationKey { key: String }, + + #[snafu(display("Failed to parse headers: {}", source))] + Header { + source: crate::client::header::Error, + }, } impl From for super::Error { @@ -162,25 +171,6 @@ impl From for super::Error { } } -#[derive(serde::Deserialize, Debug)] -#[serde(rename_all = "camelCase")] -struct ListResponse { - next_page_token: Option, - #[serde(default)] - prefixes: Vec, - #[serde(default)] - items: Vec, -} - -#[derive(serde::Deserialize, Debug)] -struct Object { - name: String, - size: String, - updated: DateTime, - #[serde(rename = "etag")] - e_tag: Option, -} - #[derive(serde::Deserialize, Debug)] #[serde(rename_all = "PascalCase")] struct InitiateMultipartUploadResult { @@ -248,15 +238,11 @@ impl GoogleCloudStorageClient { } fn object_url(&self, path: &Path) -> String { - let encoded = - percent_encoding::utf8_percent_encode(path.as_ref(), NON_ALPHANUMERIC); - format!( - "{}/storage/v1/b/{}/o/{}", - self.base_url, self.bucket_name_encoded, encoded - ) + let encoded = utf8_percent_encode(path.as_ref(), NON_ALPHANUMERIC); + format!("{}/{}/{}", self.base_url, self.bucket_name_encoded, encoded) } - /// Perform a get request + /// Perform a get request async fn get_request( &self, path: &Path, @@ -266,16 +252,15 @@ impl GoogleCloudStorageClient { let token = self.get_token().await?; let url = self.object_url(path); - let alt = match head { - true => "json", - false => "media", + let method = match head { + true => Method::HEAD, + false => Method::GET, }; - let builder = self.client.request(Method::GET, url); - - let response = builder + let response = self + .client + .request(method, url) .bearer_auth(token) - .query(&[("alt", alt)]) .with_get_options(options) .send_retry(&self.retry_config) .await @@ -286,13 +271,10 @@ impl GoogleCloudStorageClient { Ok(response) } - /// Perform a put request + /// Perform a put request async fn put_request(&self, path: &Path, payload: Bytes) -> Result<()> { let token = self.get_token().await?; - let url = format!( - "{}/upload/storage/v1/b/{}/o", - self.base_url, self.bucket_name_encoded - ); + let url = self.object_url(path); let content_type = self .client_options @@ -300,11 +282,10 @@ impl GoogleCloudStorageClient { .unwrap_or("application/octet-stream"); self.client - .request(Method::POST, url) + .request(Method::PUT, url) .bearer_auth(token) .header(header::CONTENT_TYPE, content_type) .header(header::CONTENT_LENGTH, payload.len()) - .query(&[("uploadType", "media"), ("name", path.as_ref())]) .body(payload) .send_retry(&self.retry_config) .await @@ -373,7 +354,7 @@ impl GoogleCloudStorageClient { Ok(()) } - /// Perform a delete request + /// Perform a delete request async fn delete_request(&self, path: &Path) -> Result<()> { let token = self.get_token().await?; let url = self.object_url(path); @@ -390,7 +371,7 @@ impl GoogleCloudStorageClient { Ok(()) } - /// Perform a copy request + /// Perform a copy request async fn copy_request( &self, from: &Path, @@ -398,24 +379,18 @@ impl GoogleCloudStorageClient { if_not_exists: bool, ) -> Result<()> { let token = self.get_token().await?; + let url = self.object_url(to); - let source = - percent_encoding::utf8_percent_encode(from.as_ref(), NON_ALPHANUMERIC); - let destination = - percent_encoding::utf8_percent_encode(to.as_ref(), NON_ALPHANUMERIC); - let url = format!( - "{}/storage/v1/b/{}/o/{}/copyTo/b/{}/o/{}", - self.base_url, - self.bucket_name_encoded, - source, - self.bucket_name_encoded, - destination - ); + let from = utf8_percent_encode(from.as_ref(), NON_ALPHANUMERIC); + let source = format!("{}/{}", self.bucket_name_encoded, from); - let mut builder = self.client.request(Method::POST, url); + let mut builder = self + .client + .request(Method::PUT, url) + .header("x-goog-copy-source", source); if if_not_exists { - builder = builder.query(&[("ifGenerationMatch", "0")]); + builder = builder.header("x-goog-if-generation-match", 0); } builder @@ -436,7 +411,7 @@ impl GoogleCloudStorageClient { Ok(()) } - /// Perform a list request + /// Perform a list request async fn list_request( &self, prefix: Option<&str>, @@ -444,13 +419,10 @@ impl GoogleCloudStorageClient { page_token: Option<&str>, ) -> Result { let token = self.get_token().await?; + let url = format!("{}/{}", self.base_url, self.bucket_name_encoded); - let url = format!( - "{}/storage/v1/b/{}/o", - self.base_url, self.bucket_name_encoded - ); - - let mut query = Vec::with_capacity(4); + let mut query = Vec::with_capacity(5); + query.push(("list-type", "2")); if delimiter { query.push(("delimiter", DELIMITER)) } @@ -460,14 +432,14 @@ impl GoogleCloudStorageClient { } if let Some(page_token) = page_token { - query.push(("pageToken", page_token)) + query.push(("continuation-token", page_token)) } if let Some(max_results) = &self.max_list_results { - query.push(("maxResults", max_results)) + query.push(("max-keys", max_results)) } - let response: ListResponse = self + let response = self .client .request(Method::GET, url) .query(&query) @@ -475,10 +447,13 @@ impl GoogleCloudStorageClient { .send_retry(&self.retry_config) .await .context(ListRequestSnafu)? - .json() + .bytes() .await .context(ListResponseBodySnafu)?; + let response: ListResponse = quick_xml::de::from_reader(response.reader()) + .context(InvalidListResponseSnafu)?; + Ok(response) } @@ -487,14 +462,14 @@ impl GoogleCloudStorageClient { &self, prefix: Option<&Path>, delimiter: bool, - ) -> BoxStream<'_, Result> { + ) -> BoxStream<'_, Result> { let prefix = format_prefix(prefix); stream_paginated(prefix, move |prefix, token| async move { let mut r = self .list_request(prefix.as_deref(), delimiter, token.as_deref()) .await?; - let next_token = r.next_page_token.take(); - Ok((r, prefix, next_token)) + let next_token = r.next_continuation_token.take(); + Ok((r.try_into()?, prefix, next_token)) }) .boxed() } @@ -639,12 +614,6 @@ impl ObjectStore for GoogleCloudStorage { } async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { - if options.if_modified_since.is_some() || options.if_unmodified_since.is_some() { - return Err(super::Error::NotSupported { - source: "ModifiedSince Preconditions not supported by GoogleCloudStorage JSON API".to_string().into(), - }); - } - let response = self.client.get_request(location, options, false).await?; let stream = response .bytes_stream() @@ -660,10 +629,7 @@ impl ObjectStore for GoogleCloudStorage { async fn head(&self, location: &Path) -> Result { let options = GetOptions::default(); let response = self.client.get_request(location, options, true).await?; - let object = response.json().await.context(GetResponseBodySnafu { - path: location.as_ref(), - })?; - convert_object_meta(&object) + Ok(header_meta(location, response.headers()).context(HeaderSnafu)?) } async fn delete(&self, location: &Path) -> Result<()> { @@ -677,11 +643,7 @@ impl ObjectStore for GoogleCloudStorage { let stream = self .client .list_paginated(prefix, false) - .map_ok(|r| { - futures::stream::iter( - r.items.into_iter().map(|x| convert_object_meta(&x)), - ) - }) + .map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok))) .try_flatten() .boxed(); @@ -696,15 +658,8 @@ impl ObjectStore for GoogleCloudStorage { while let Some(result) = stream.next().await { let response = result?; - - for p in response.prefixes { - common_prefixes.insert(Path::parse(p)?); - } - - objects.reserve(response.items.len()); - for object in &response.items { - objects.push(convert_object_meta(object)?); - } + common_prefixes.extend(response.common_prefixes.into_iter()); + objects.extend(response.objects.into_iter()); } Ok(ListResult { @@ -1170,20 +1125,6 @@ impl GoogleCloudStorageBuilder { } } -fn convert_object_meta(object: &Object) -> Result { - let location = Path::parse(&object.name)?; - let last_modified = object.updated; - let size = object.size.parse().context(InvalidSizeSnafu)?; - let e_tag = object.e_tag.clone(); - - Ok(ObjectMeta { - location, - last_modified, - size, - e_tag, - }) -} - #[cfg(test)] mod test { use bytes::Bytes; diff --git a/object_store/src/prefix.rs b/object_store/src/prefix.rs index ffe509411911..39585f73b692 100644 --- a/object_store/src/prefix.rs +++ b/object_store/src/prefix.rs @@ -119,11 +119,7 @@ impl ObjectStore for PrefixStore { self.inner.get_range(&full_path, range).await } - async fn get_opts( - &self, - location: &Path, - options: GetOptions, - ) -> Result { + async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { let full_path = self.full_path(location); self.inner.get_opts(&full_path, options).await }