Skip to content

Commit

Permalink
backend/s3: Implement RFC-57 Auto Region (#59)
Browse files Browse the repository at this point in the history
* backend/s3: Implement RFC-57 Auto Region

Signed-off-by: Xuanwo <github@xuanwo.io>

* Add more info for error

Signed-off-by: Xuanwo <github@xuanwo.io>

* Better error

Signed-off-by: Xuanwo <github@xuanwo.io>

* Make clippy happy

Signed-off-by: Xuanwo <github@xuanwo.io>
  • Loading branch information
Xuanwo committed Feb 24, 2022
1 parent 244ffe9 commit 80e8bca
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 71 deletions.
1 change: 0 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ OPENDAL_FS_ROOT=/path/to/dir
# s3
OPENDAL_S3_TEST=false
OPENDAL_S3_BUCKET=<bucket>
OPENDAL_S3_REGION=<region>
OPENDAL_S3_ENDPOINT=<endpoint>
OPENDAL_S3_ACCESS_KEY_ID=<access_key_id>
OPENDAL_S3_SECRET_ACCESS_KEY=<secret_access_key>
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ pin-project = "1.0.10"
aws-config = "0.6.0"
blocking = "1.1.0"
anyhow = "1"
reqwest = "0.11"
lazy_static = "1"

[dev-dependencies]
uuid = { version = "0.8.2", features = ["serde", "v4"] }
Expand Down
3 changes: 1 addition & 2 deletions benches/ops/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ pub async fn init() -> Result<Operator> {
s3::Backend::build()
.root(&format!("/{}", uuid::Uuid::new_v4()))
.bucket(&env::var("OPENDAL_S3_BUCKET")?)
.region(&env::var("OPENDAL_S3_REGION")?)
.endpoint(&env::var("OPENDAL_S3_ENDPOINT")?)
.endpoint(&env::var("OPENDAL_S3_ENDPOINT").unwrap_or_default())
.credential(Credential::hmac(
&env::var("OPENDAL_S3_ACCESS_KEY_ID")?,
&env::var("OPENDAL_S3_SECRET_ACCESS_KEY")?,
Expand Down
190 changes: 125 additions & 65 deletions src/services/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ use aws_smithy_http::body::SdkBody;
use aws_smithy_http::byte_stream::ByteStream;
use aws_smithy_http::result::SdkError;
use futures::TryStreamExt;
use http::{HeaderValue, StatusCode};
use lazy_static::lazy_static;

use crate::credential::Credential;
use crate::error::Error;
Expand All @@ -46,6 +48,18 @@ use crate::readers::ReaderStream;
use crate::Accessor;
use crate::BoxedAsyncReader;

// Lookup table from a generic (region-less) S3 endpoint to the template of
// its region-specific endpoint. When the generic endpoint answers the bucket
// probe with a permanent redirect, the `{region}` placeholder in the template
// is replaced with the region reported for the bucket.
lazy_static! {
static ref ENDPOINT_TEMPLATES: HashMap<&'static str, &'static str> = {
let mut m = HashMap::new();
// AWS S3 Service.
m.insert(
"https://s3.amazonaws.com",
"https://s3.{region}.amazonaws.com",
);
m
};
}

/// # TODO
///
/// enable_path_style and enable_signature_v2 need sdk support.
Expand All @@ -56,7 +70,6 @@ pub struct Builder {
root: Option<String>,

bucket: String,
region: Option<String>,
credential: Option<Credential>,
/// endpoint must be full uri, e.g.
/// - https://s3.amazonaws.com
Expand Down Expand Up @@ -84,16 +97,6 @@ impl Builder {
self
}

pub fn region(&mut self, region: &str) -> &mut Self {
self.region = if region.is_empty() {
None
} else {
Some(region.to_string())
};

self
}

pub fn credential(&mut self, credential: Credential) -> &mut Self {
self.credential = Some(credential);

Expand All @@ -111,21 +114,105 @@ impl Builder {
}

pub async fn finish(&mut self) -> Result<Arc<dyn Accessor>> {
if self.bucket.is_empty() {
return Err(Error::Backend {
kind: Kind::BackendConfigurationInvalid,
context: HashMap::from([("bucket".to_string(), "".to_string())]),
source: anyhow!("bucket is empty"),
});
}

// strip the prefix of "/" in root only once.
let root = if let Some(root) = &self.root {
root.strip_prefix('/').unwrap_or(root).to_string()
} else {
String::new()
};

// Handle endpoint, region and bucket name.
let bucket = match self.bucket.is_empty() {
false => Ok(&self.bucket),
true => Err(Error::Backend {
kind: Kind::BackendConfigurationInvalid,
context: HashMap::from([("bucket".to_string(), "".to_string())]),
source: anyhow!("bucket is empty"),
}),
}?;

let endpoint = match &self.endpoint {
Some(endpoint) => endpoint,
None => "https://s3.amazonaws.com",
};

// Setup error context so that we don't need to construct many times.
let mut context: HashMap<String, String> = HashMap::from([
("endpoint".to_string(), endpoint.to_string()),
("bucket".to_string(), bucket.to_string()),
]);

let hc = reqwest::Client::new();
let res = hc
.head(format!("{endpoint}/{bucket}"))
.send()
.await
.map_err(|e| Error::Backend {
kind: Kind::BackendConfigurationInvalid,
context: context.clone(),
source: anyhow::Error::new(e),
})?;
// Read RFC-0057: Auto Region for detailed behavior.
let (endpoint, region) = match res.status() {
// The endpoint works: keep it unchanged and use the region reported by
// the response, falling back to the default region if none is given.
StatusCode::OK | StatusCode::FORBIDDEN => {
let region = res
.headers()
.get("x-amz-bucket-region")
.unwrap_or(&HeaderValue::from_static("us-east-1"))
.to_str()
.map_err(|e| Error::Backend {
kind: Kind::BackendConfigurationInvalid,
context: context.clone(),
source: anyhow::Error::new(e),
})?
.to_string();
(endpoint.to_string(), region)
}
// The endpoint should move, return with constructed endpoint
StatusCode::MOVED_PERMANENTLY => {
let region = res
.headers()
.get("x-amz-bucket-region")
.ok_or(Error::Backend {
kind: Kind::BackendConfigurationInvalid,
context: context.clone(),
source: anyhow!("can't detect region automatically, region is empty"),
})?
.to_str()
.map_err(|e| Error::Backend {
kind: Kind::BackendConfigurationInvalid,
context: context.clone(),
source: anyhow::Error::new(e),
})?
.to_string();
let template = ENDPOINT_TEMPLATES.get(endpoint).ok_or(Error::Backend {
kind: Kind::BackendConfigurationInvalid,
context: context.clone(),
source: anyhow!(
"can't detect region automatically, no valid endpoint template for {}",
&endpoint
),
})?;

let endpoint = template.replace("{region}", &region);

(endpoint, region)
}
// Unexpected status code
code => {
return Err(Error::Backend {
kind: Kind::BackendConfigurationInvalid,
context: context.clone(),
source: anyhow!(
"can't detect region automatically, unexpected response: status code {}",
code
),
});
}
};

// Config Loader will load config from environment.
//
// We will take user's input first if any. If there is no user input, we
Expand All @@ -138,19 +225,33 @@ impl Builder {
// - EC2 Instance Metadata Service (IAM Roles attached to instance)
//
// Please keep in mind that the config loader only detects region and credentials.
let mut cfg_loader = aws_config::ConfigLoader::default();
let cfg_loader = aws_config::ConfigLoader::default();
let mut cfg = AwsS3::config::Builder::from(&cfg_loader.load().await);

{
// Set region.
cfg = cfg.region(AwsS3::Region::new(Cow::from(region.clone())));
}

{
// Set endpoint
let uri = http::Uri::from_str(&endpoint).map_err(|e| Error::Backend {
kind: Kind::BackendConfigurationInvalid,
context: context.clone(),
source: anyhow::Error::from(e),
})?;

if let Some(region) = &self.region {
cfg_loader = cfg_loader.region(AwsS3::Region::new(Cow::from(region.clone())));
cfg = cfg.endpoint_resolver(AwsS3::Endpoint::immutable(uri));
}

if let Some(cred) = &self.credential {
context.insert("credential".to_string(), "*".to_string());
match cred {
Credential::HMAC {
access_key_id,
secret_access_key,
} => {
cfg_loader = cfg_loader.credentials_provider(AwsS3::Credentials::from_keys(
cfg = cfg.credentials_provider(AwsS3::Credentials::from_keys(
access_key_id,
secret_access_key,
None,
Expand All @@ -159,54 +260,13 @@ impl Builder {
_ => {
return Err(Error::Backend {
kind: Kind::BackendConfigurationInvalid,
context: HashMap::from([("credential".to_string(), "*".to_string())]),
context: context.clone(),
source: anyhow!("credential is invalid"),
});
}
}
}

let mut cfg = AwsS3::config::Builder::from(&cfg_loader.load().await);

// Use the user's input first; if the user gave none, we fall back to the
// AWS default loading logic.
if let Some(endpoint) = &self.endpoint {
let mut uri = http::Uri::from_str(endpoint).map_err(|e| Error::Backend {
kind: Kind::BackendConfigurationInvalid,
context: HashMap::from([("endpoint".to_string(), endpoint.clone())]),
source: anyhow::Error::from(e),
})?;

let mut parts = uri.into_parts();

// If the uri's authority is empty, it must be an invalid url.
if parts.authority.is_none() {
return Err(Error::Backend {
kind: Kind::BackendConfigurationInvalid,
context: HashMap::from([("endpoint".to_string(), endpoint.clone())]),
source: anyhow!("uri is invalid"),
});
}

// If user doesn't input scheme, we will use https as default.
if parts.scheme.is_none() {
parts.scheme = Some(http::uri::Scheme::HTTPS);
}

// If user doesn't input path, we will set it to "/" as default.
if parts.path_and_query.is_none() {
parts.path_and_query = Some(http::uri::PathAndQuery::from_static("/"));
}

uri = http::Uri::from_parts(parts).map_err(|e| Error::Backend {
kind: Kind::BackendConfigurationInvalid,
context: HashMap::from([("endpoint".to_string(), endpoint.clone())]),
source: anyhow::Error::from(e),
})?;

cfg = cfg.endpoint_resolver(AwsS3::Endpoint::immutable(uri));
}

Ok(Arc::new(Backend {
// Make `/` as the default of root.
root,
Expand Down
4 changes: 1 addition & 3 deletions tests/behavior/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use super::BehaviorTest;
///
/// - `OPENDAL_S3_TEST=on`: set to `on` to enable the test.
/// - `OPENDAL_S3_BUCKET=<bucket>`: set the bucket name.
/// - `OPENDAL_S3_REGION=<region>`: set the bucket region.
/// - `OPENDAL_S3_ENDPOINT=<endpoint>`: set the endpoint of the s3 service.
/// - `OPENDAL_S3_ACCESS_KEY_ID=<access_key_id>`: set the access key id.
/// - `OPENDAL_S3_SECRET_ACCESS_KEY=<secret_access_key>`: set the secret access key.
Expand All @@ -42,8 +41,7 @@ async fn test_s3() -> Result<()> {
s3::Backend::build()
.root(&format!("/{}", uuid::Uuid::new_v4()))
.bucket(&env::var("OPENDAL_S3_BUCKET")?)
.region(&env::var("OPENDAL_S3_REGION")?)
.endpoint(&env::var("OPENDAL_S3_ENDPOINT")?)
.endpoint(&env::var("OPENDAL_S3_ENDPOINT").unwrap_or_default())
.credential(Credential::hmac(
&env::var("OPENDAL_S3_ACCESS_KEY_ID")?,
&env::var("OPENDAL_S3_SECRET_ACCESS_KEY")?,
Expand Down

0 comments on commit 80e8bca

Please sign in to comment.