Skip to content

Commit

Permalink
Consistently use GCP XML API (#4207)
Browse files Browse the repository at this point in the history
* Consistently use GCP XML API

* Use updated fake-gcs-server

* Review feedback
  • Loading branch information
tustvold authored May 15, 2023
1 parent 4e92f93 commit 108b7a8
Show file tree
Hide file tree
Showing 11 changed files with 247 additions and 287 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/object_store.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,9 @@ jobs:
- uses: actions/checkout@v3

- name: Configure Fake GCS Server (GCP emulation)
# Custom image - see fsouza/fake-gcs-server#1164
run: |
docker run -d -p 4443:4443 fsouza/fake-gcs-server -scheme http
docker run -d -p 4443:4443 tustvold/fake-gcs-server -scheme http -backend memory -public-host localhost:4443
# Give the container a moment to start up prior to configuring it
sleep 1
curl -v -X POST --data-binary '{"name":"test-bucket"}' -H "Content-Type: application/json" "http://localhost:4443/storage/v1/b"
Expand Down
2 changes: 1 addition & 1 deletion object_store/CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ To test the GCS integration, we use [Fake GCS Server](https://github.com/fsouza/
Startup the fake server:

```shell
docker run -p 4443:4443 fsouza/fake-gcs-server -scheme http
docker run -p 4443:4443 tustvold/fake-gcs-server -scheme http
```

Configure the account:
Expand Down
2 changes: 1 addition & 1 deletion object_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ tokio = { version = "1.25.0", features = ["sync", "macros", "rt", "time", "io-ut
nix = "0.26.1"

[features]
cloud = ["serde", "serde_json", "quick-xml", "hyper", "reqwest", "reqwest/json","reqwest/stream", "chrono/serde", "base64", "rand", "ring"]
cloud = ["serde", "serde_json", "quick-xml", "hyper", "reqwest", "reqwest/json", "reqwest/stream", "chrono/serde", "base64", "rand", "ring"]
azure = ["cloud"]
gcp = ["cloud", "rustls-pemfile"]
aws = ["cloud"]
Expand Down
69 changes: 3 additions & 66 deletions object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,20 @@
use crate::aws::checksum::Checksum;
use crate::aws::credential::{AwsCredential, CredentialExt, CredentialProvider};
use crate::aws::{STORE, STRICT_PATH_ENCODE_SET};
use crate::client::list::ListResponse;
use crate::client::pagination::stream_paginated;
use crate::client::retry::RetryExt;
use crate::client::GetOptionsExt;
use crate::multipart::UploadPart;
use crate::path::DELIMITER;
use crate::util::format_prefix;
use crate::{
BoxStream, ClientOptions, GetOptions, ListResult, MultipartId, ObjectMeta, Path,
Result, RetryConfig, StreamExt,
BoxStream, ClientOptions, GetOptions, ListResult, MultipartId, Path, Result,
RetryConfig, StreamExt,
};
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use bytes::{Buf, Bytes};
use chrono::{DateTime, Utc};
use percent_encoding::{utf8_percent_encode, PercentEncode};
use reqwest::{header::CONTENT_TYPE, Client as ReqwestClient, Method, Response};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -109,69 +109,6 @@ impl From<Error> for crate::Error {
}
}

#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct ListResponse {
#[serde(default)]
pub contents: Vec<ListContents>,
#[serde(default)]
pub common_prefixes: Vec<ListPrefix>,
#[serde(default)]
pub next_continuation_token: Option<String>,
}

impl TryFrom<ListResponse> for ListResult {
type Error = crate::Error;

fn try_from(value: ListResponse) -> Result<Self> {
let common_prefixes = value
.common_prefixes
.into_iter()
.map(|x| Ok(Path::parse(x.prefix)?))
.collect::<Result<_>>()?;

let objects = value
.contents
.into_iter()
.map(TryFrom::try_from)
.collect::<Result<_>>()?;

Ok(Self {
common_prefixes,
objects,
})
}
}

#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct ListPrefix {
pub prefix: String,
}

#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct ListContents {
pub key: String,
pub size: usize,
pub last_modified: DateTime<Utc>,
#[serde(rename = "ETag")]
pub e_tag: Option<String>,
}

impl TryFrom<ListContents> for ObjectMeta {
type Error = crate::Error;

fn try_from(value: ListContents) -> Result<Self> {
Ok(Self {
location: Path::parse(value.key)?,
last_modified: value.last_modified,
size: value.size,
e_tag: value.e_tag,
})
}
}

#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct InitiateMultipart {
Expand Down
57 changes: 7 additions & 50 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
use async_trait::async_trait;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::stream::BoxStream;
use futures::TryStreamExt;
use itertools::Itertools;
Expand All @@ -52,6 +51,7 @@ use crate::aws::credential::{
AwsCredential, CredentialProvider, InstanceCredentialProvider,
StaticCredentialProvider, WebIdentityProvider,
};
use crate::client::header::header_meta;
use crate::client::ClientConfigKey;
use crate::config::ConfigValue;
use crate::multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, UploadPart};
Expand Down Expand Up @@ -87,24 +87,6 @@ static METADATA_ENDPOINT: &str = "http://169.254.169.254";
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
enum Error {
#[snafu(display("Last-Modified Header missing from response"))]
MissingLastModified,

#[snafu(display("Content-Length Header missing from response"))]
MissingContentLength,

#[snafu(display("Invalid last modified '{}': {}", last_modified, source))]
InvalidLastModified {
last_modified: String,
source: chrono::ParseError,
},

#[snafu(display("Invalid content length '{}': {}", content_length, source))]
InvalidContentLength {
content_length: String,
source: std::num::ParseIntError,
},

#[snafu(display("Missing region"))]
MissingRegion,

Expand Down Expand Up @@ -155,6 +137,11 @@ enum Error {

#[snafu(display("Failed to parse the region for bucket '{}'", bucket))]
RegionParse { bucket: String },

#[snafu(display("Failed to parse headers: {}", source))]
Header {
source: crate::client::header::Error,
},
}

impl From<Error> for super::Error {
Expand Down Expand Up @@ -261,41 +248,11 @@ impl ObjectStore for AmazonS3 {
}

async fn head(&self, location: &Path) -> Result<ObjectMeta> {
use reqwest::header::{CONTENT_LENGTH, ETAG, LAST_MODIFIED};

let options = GetOptions::default();
// Extract meta from headers
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_ResponseSyntax
let response = self.client.get_request(location, options, true).await?;
let headers = response.headers();

let last_modified = headers
.get(LAST_MODIFIED)
.context(MissingLastModifiedSnafu)?;

let content_length = headers
.get(CONTENT_LENGTH)
.context(MissingContentLengthSnafu)?;

let last_modified = last_modified.to_str().context(BadHeaderSnafu)?;
let last_modified = DateTime::parse_from_rfc2822(last_modified)
.context(InvalidLastModifiedSnafu { last_modified })?
.with_timezone(&Utc);

let content_length = content_length.to_str().context(BadHeaderSnafu)?;
let content_length = content_length
.parse()
.context(InvalidContentLengthSnafu { content_length })?;

let e_tag = headers.get(ETAG).context(MissingEtagSnafu)?;
let e_tag = e_tag.to_str().context(BadHeaderSnafu)?;

Ok(ObjectMeta {
location: location.clone(),
last_modified,
size: content_length,
e_tag: Some(e_tag.to_string()),
})
Ok(header_meta(location, response.headers()).context(HeaderSnafu)?)
}

async fn delete(&self, location: &Path) -> Result<()> {
Expand Down
60 changes: 7 additions & 53 deletions object_store/src/azure/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ use async_trait::async_trait;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use bytes::Bytes;
use chrono::{TimeZone, Utc};
use futures::{stream::BoxStream, StreamExt, TryStreamExt};
use percent_encoding::percent_decode_str;
use serde::{Deserialize, Serialize};
Expand All @@ -50,9 +49,9 @@ use std::{collections::BTreeSet, str::FromStr};
use tokio::io::AsyncWrite;
use url::Url;

use crate::client::header::header_meta;
use crate::client::ClientConfigKey;
use crate::config::ConfigValue;
use crate::util::RFC1123_FMT;
pub use credential::authority_hosts;

mod client;
Expand All @@ -75,24 +74,6 @@ const MSI_ENDPOINT_ENV_KEY: &str = "IDENTITY_ENDPOINT";
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
enum Error {
#[snafu(display("Last-Modified Header missing from response"))]
MissingLastModified,

#[snafu(display("Content-Length Header missing from response"))]
MissingContentLength,

#[snafu(display("Invalid last modified '{}': {}", last_modified, source))]
InvalidLastModified {
last_modified: String,
source: chrono::ParseError,
},

#[snafu(display("Invalid content length '{}': {}", content_length, source))]
InvalidContentLength {
content_length: String,
source: std::num::ParseIntError,
},

#[snafu(display("Received header containing non-ASCII data"))]
BadHeader { source: reqwest::header::ToStrError },

Expand Down Expand Up @@ -146,6 +127,11 @@ enum Error {

#[snafu(display("ETag Header missing from response"))]
MissingEtag,

#[snafu(display("Failed to parse headers: {}", source))]
Header {
source: crate::client::header::Error,
},
}

impl From<Error> for super::Error {
Expand Down Expand Up @@ -223,44 +209,12 @@ impl ObjectStore for MicrosoftAzure {
}

async fn head(&self, location: &Path) -> Result<ObjectMeta> {
use reqwest::header::{CONTENT_LENGTH, ETAG, LAST_MODIFIED};
let options = GetOptions::default();

// Extract meta from headers
// https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob-properties
let response = self.client.get_request(location, options, true).await?;
let headers = response.headers();

let last_modified = headers
.get(LAST_MODIFIED)
.ok_or(Error::MissingLastModified)?
.to_str()
.context(BadHeaderSnafu)?;
let last_modified = Utc
.datetime_from_str(last_modified, RFC1123_FMT)
.context(InvalidLastModifiedSnafu { last_modified })?;

let content_length = headers
.get(CONTENT_LENGTH)
.ok_or(Error::MissingContentLength)?
.to_str()
.context(BadHeaderSnafu)?;
let content_length = content_length
.parse()
.context(InvalidContentLengthSnafu { content_length })?;

let e_tag = headers
.get(ETAG)
.ok_or(Error::MissingEtag)?
.to_str()
.context(BadHeaderSnafu)?;

Ok(ObjectMeta {
location: location.clone(),
last_modified,
size: content_length,
e_tag: Some(e_tag.to_string()),
})
Ok(header_meta(location, response.headers()).context(HeaderSnafu)?)
}

async fn delete(&self, location: &Path) -> Result<()> {
Expand Down
Loading

0 comments on commit 108b7a8

Please sign in to comment.