Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(s3): allow users to specify storage_class #1854

Merged
merged 4 commits into from
Apr 5, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 58 additions & 3 deletions core/src/services/s3/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ mod constants {
"x-amz-server-side-encryption-customer-key-md5";
pub const X_AMZ_SERVER_SIDE_ENCRYPTION_AWS_KMS_KEY_ID: &str =
"x-amz-server-side-encryption-aws-kms-key-id";
pub const X_AMZ_STORAGE_CLASS: &str = "x-amz-storage-class";

pub const RESPONSE_CONTENT_DISPOSITION: &str = "response-content-disposition";
pub const RESPONSE_CACHE_CONTROL: &str = "response-cache-control";
Expand Down Expand Up @@ -100,6 +101,7 @@ mod constants {
/// - `access_key_id`: Set the access_key_id for backend.
/// - `secret_access_key`: Set the secret_access_key for backend.
/// - `security_token`: Set the security_token for backend.
/// - `storage_class`: Set the storage_class for backend.
/// - `server_side_encryption`: Set the server_side_encryption for backend.
/// - `server_side_encryption_aws_kms_key_id`: Set the server_side_encryption_aws_kms_key_id for backend.
/// - `server_side_encryption_customer_algorithm`: Set the server_side_encryption_customer_algorithm for backend.
Expand Down Expand Up @@ -314,6 +316,7 @@ pub struct S3Builder {
server_side_encryption_customer_algorithm: Option<String>,
server_side_encryption_customer_key: Option<String>,
server_side_encryption_customer_key_md5: Option<String>,
storage_class: Option<String>,

/// temporary credentials, check the official [doc](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp.html) for detail
security_token: Option<String>,
Expand All @@ -336,7 +339,8 @@ impl Debug for S3Builder {
.field("role_arn", &self.role_arn)
.field("external_id", &self.external_id)
.field("disable_config_load", &self.disable_config_load)
.field("enable_virtual_host_style", &self.enable_virtual_host_style);
.field("enable_virtual_host_style", &self.enable_virtual_host_style)
.field("storage_class", &self.storage_class);

if self.access_key_id.is_some() {
d.field("access_key_id", &"<redacted>");
Expand Down Expand Up @@ -463,6 +467,26 @@ impl S3Builder {
self
}

/// Set storage_class for this backend.
///
/// Available values:
/// - `DEEP_ARCHIVE`
/// - `GLACIER`
/// - `GLACIER_IR`
/// - `INTELLIGENT_TIERING`
/// - `ONEZONE_IA`
/// - `OUTPOSTS`
/// - `REDUCED_REDUNDANCY`
/// - `STANDARD`
/// - `STANDARD_IA`
pub fn storage_class(&mut self, v: &str) -> &mut Self {
knight42 marked this conversation as resolved.
Show resolved Hide resolved
if !v.is_empty() {
self.storage_class = Some(v.to_string())
}

self
}

/// Set server_side_encryption for this backend.
///
/// Available values: `AES256`, `aws:kms`.
Expand Down Expand Up @@ -743,6 +767,7 @@ impl Builder for S3Builder {
map.get("enable_virtual_host_style")
.filter(|v| *v == "on" || *v == "true")
.map(|_| builder.enable_virtual_host_style());
map.get("storage_class").map(|v| builder.storage_class(v));

builder
}
Expand All @@ -763,6 +788,15 @@ impl Builder for S3Builder {
}?;
debug!("backend use bucket {}", &bucket);

let storage_class = match &self.storage_class {
None => None,
Some(v) => Some(v.parse().map_err(|e| {
Error::new(ErrorKind::ConfigInvalid, "storage_class value is invalid")
.with_context("value", v)
.set_source(e)
})?),
};

let server_side_encryption = match &self.server_side_encryption {
None => None,
Some(v) => Some(v.parse().map_err(|e| {
Expand Down Expand Up @@ -916,6 +950,7 @@ impl Builder for S3Builder {
server_side_encryption_customer_algorithm,
server_side_encryption_customer_key,
server_side_encryption_customer_key_md5,
storage_class,
})
}
}
Expand All @@ -935,6 +970,7 @@ pub struct S3Backend {
server_side_encryption_customer_algorithm: Option<HeaderValue>,
server_side_encryption_customer_key: Option<HeaderValue>,
server_side_encryption_customer_key_md5: Option<HeaderValue>,
storage_class: Option<HeaderValue>,
}

impl S3Backend {
Expand Down Expand Up @@ -1026,7 +1062,7 @@ impl Accessor for S3Backend {

async fn create(&self, path: &str, _: OpCreate) -> Result<RpCreate> {
let mut req =
self.s3_put_object_request(path, Some(0), None, None, None, AsyncBody::Empty)?;
self.s3_put_object_request(path, Some(0), None, None, None, None, AsyncBody::Empty)?;
Xuanwo marked this conversation as resolved.
Show resolved Hide resolved

self.signer.sign(&mut req).map_err(new_request_sign_error)?;

Expand Down Expand Up @@ -1067,6 +1103,7 @@ impl Accessor for S3Backend {
args.content_type(),
args.content_disposition(),
args.cache_control(),
args.storage_class(),
)
.await?;

Expand Down Expand Up @@ -1150,7 +1187,7 @@ impl Accessor for S3Backend {
v.if_none_match(),
)?,
PresignOperation::Write(_) => {
self.s3_put_object_request(path, None, None, None, None, AsyncBody::Empty)?
self.s3_put_object_request(path, None, None, None, None, None, AsyncBody::Empty)?
}
};

Expand Down Expand Up @@ -1311,6 +1348,7 @@ impl S3Backend {
content_type: Option<&str>,
content_disposition: Option<&str>,
cache_control: Option<&str>,
storage_class: Option<&str>,
knight42 marked this conversation as resolved.
Show resolved Hide resolved
body: AsyncBody,
) -> Result<Request<AsyncBody>> {
let p = build_abs_path(&self.root, path);
Expand All @@ -1335,6 +1373,14 @@ impl S3Backend {
req = req.header(CACHE_CONTROL, cache_control)
}

// Set storage class header
if let Some(v) = storage_class
.and_then(|v| v.parse::<HeaderValue>().ok())
.or_else(|| self.storage_class.clone())
{
req = req.header(HeaderName::from_static(constants::X_AMZ_STORAGE_CLASS), v);
}

Xuanwo marked this conversation as resolved.
Show resolved Hide resolved
// Set SSE headers.
req = self.insert_sse_headers(req, true);

Expand Down Expand Up @@ -1432,6 +1478,7 @@ impl S3Backend {
content_type: Option<&str>,
content_disposition: Option<&str>,
cache_control: Option<&str>,
storage_class: Option<&str>,
) -> Result<Response<IncomingAsyncBody>> {
let p = build_abs_path(&self.root, path);

Expand All @@ -1451,6 +1498,14 @@ impl S3Backend {
req = req.header(CACHE_CONTROL, cache_control)
}

// Set storage class header
if let Some(v) = storage_class
.and_then(|v| v.parse::<HeaderValue>().ok())
.or_else(|| self.storage_class.clone())
{
req = req.header(HeaderName::from_static(constants::X_AMZ_STORAGE_CLASS), v);
}

// Set SSE headers.
let req = self.insert_sse_headers(req, true);

Expand Down
1 change: 1 addition & 0 deletions core/src/services/s3/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ impl oio::Write for S3Writer {
self.op.content_type(),
self.op.content_disposition(),
self.op.cache_control(),
self.op.storage_class(),
AsyncBody::Bytes(bs),
)?;

Expand Down
20 changes: 13 additions & 7 deletions core/src/types/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,20 +309,15 @@ pub struct OpWrite {
content_type: Option<String>,
content_disposition: Option<String>,
cache_control: Option<String>,
storage_class: Option<String>,
}

impl OpWrite {
/// Create a new `OpWrite`.
///
/// If input path is not a file path, an error will be returned.
pub fn new() -> Self {
Self {
append: false,

content_type: None,
content_disposition: None,
cache_control: None,
}
Self::default()
}

pub(crate) fn with_append(mut self) -> Self {
Expand Down Expand Up @@ -366,6 +361,17 @@ impl OpWrite {
self.cache_control = Some(cache_control.to_string());
self
}

/// Get the storage class from option
pub fn storage_class(&self) -> Option<&str> {
self.storage_class.as_deref()
}

/// Set the storage class of option
pub fn with_storage_class(mut self, storage_class: &str) -> Self {
self.storage_class = Some(storage_class.to_string());
self
}
}

/// Args for `copy` operation.
Expand Down