Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(core): test for write_with_cache_control #2131

Merged
merged 13 commits
Apr 26, 2023
2 changes: 1 addition & 1 deletion core/src/services/azblob/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,8 +465,8 @@ impl Accessor for AzblobBackend {
read_with_override_content_disposition: true,

write: true,
write_with_content_type: true,
write_with_cache_control: true,
write_with_content_type: true,

delete: true,
create_dir: true,
Expand Down
4 changes: 2 additions & 2 deletions core/src/services/azblob/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use std::fmt::Write;
use std::str::FromStr;

use http::header::HeaderName;
use http::header::CACHE_CONTROL;
use http::header::CONTENT_LENGTH;
use http::header::CONTENT_TYPE;
use http::header::IF_MATCH;
Expand All @@ -41,6 +40,7 @@ use crate::*;
/// Azure Blob Storage vendor-specific HTTP header names used by this backend.
mod constants {
// Header naming the blob type on upload requests.
pub const X_MS_BLOB_TYPE: &str = "x-ms-blob-type";
// Header carrying the source URL for server-side blob copies.
pub const X_MS_COPY_SOURCE: &str = "x-ms-copy-source";
// Header that persists the blob's cache-control value; Azure expects this
// vendor header instead of the standard `Cache-Control` header (see the
// diff above, which replaced `CACHE_CONTROL` with this constant).
pub const X_MS_BLOB_CACHE_CONTROL: &str = "x-ms-blob-cache-control";
}

pub struct AzblobCore {
Expand Down Expand Up @@ -209,7 +209,7 @@ impl AzblobCore {

let mut req = Request::put(&url);
if let Some(cache_control) = cache_control {
req = req.header(CACHE_CONTROL, cache_control);
req = req.header(constants::X_MS_BLOB_CACHE_CONTROL, cache_control);
}
if let Some(size) = size {
req = req.header(CONTENT_LENGTH, size)
Expand Down
9 changes: 5 additions & 4 deletions core/src/services/gcs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,9 +397,9 @@ impl Accessor for GcsBackend {
}

async fn create_dir(&self, path: &str, _: OpCreate) -> Result<RpCreate> {
let mut req =
self.core
.gcs_insert_object_request(path, Some(0), None, None, AsyncBody::Empty)?;
let mut req = self
.core
.gcs_insert_object_request(path, Some(0), None, AsyncBody::Empty)?;

self.core.sign(&mut req).await?;

Expand Down Expand Up @@ -459,6 +459,7 @@ impl Accessor for GcsBackend {
if resp.status().is_success() {
// read http response body
let slc = resp.into_body().bytes().await?;

let meta: GetObjectJsonResponse =
serde_json::from_slice(&slc).map_err(new_json_deserialize_error)?;

Expand Down Expand Up @@ -537,7 +538,7 @@ impl Accessor for GcsBackend {
)?,
PresignOperation::Write(_) => {
self.core
.gcs_insert_object_xml_request(path, None, None, AsyncBody::Empty)?
.gcs_insert_object_xml_request(path, None, AsyncBody::Empty)?
}
};

Expand Down
12 changes: 1 addition & 11 deletions core/src/services/gcs/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use backon::ExponentialBuilder;
use backon::Retryable;
use bytes::Bytes;
use bytes::BytesMut;
use http::header::CACHE_CONTROL;

use http::header::CONTENT_LENGTH;
use http::header::CONTENT_RANGE;
use http::header::CONTENT_TYPE;
Expand Down Expand Up @@ -208,7 +208,6 @@ impl GcsCore {
path: &str,
size: Option<usize>,
content_type: Option<&str>,
cache_control: Option<&str>,
body: AsyncBody,
) -> Result<Request<AsyncBody>> {
let p = build_abs_path(&self.root, path);
Expand All @@ -233,10 +232,6 @@ impl GcsCore {

req = req.header(CONTENT_LENGTH, size.unwrap_or_default());

if let Some(cache_control) = cache_control {
req = req.header(CACHE_CONTROL, cache_control)
}

if let Some(storage_class) = &self.default_storage_class {
req = req.header(CONTENT_TYPE, "multipart/related; boundary=my-boundary");

Expand Down Expand Up @@ -276,7 +271,6 @@ impl GcsCore {
&self,
path: &str,
content_type: Option<&str>,
cache_control: Option<&str>,
body: AsyncBody,
) -> Result<Request<AsyncBody>> {
let p = build_abs_path(&self.root, path);
Expand All @@ -289,10 +283,6 @@ impl GcsCore {
req = req.header(CONTENT_TYPE, content_type);
}

if let Some(cache_control) = cache_control {
req = req.header(CACHE_CONTROL, cache_control)
}

if let Some(acl) = &self.predefined_acl {
req = req.header("x-goog-acl", acl);
}
Expand Down
1 change: 0 additions & 1 deletion core/src/services/gcs/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ impl GcsWriter {
&percent_encode_path(&self.path),
Some(bs.len()),
self.op.content_type(),
self.op.cache_control(),
AsyncBody::Bytes(bs),
)?;

Expand Down
2 changes: 2 additions & 0 deletions core/src/services/obs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,8 @@ impl Accessor for ObsBackend {
read_can_next: true,

write: true,
write_with_cache_control: true,

list: true,
scan: true,
copy: true,
Expand Down
1 change: 1 addition & 0 deletions core/src/services/oss/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,7 @@ impl Accessor for OssBackend {
read_can_next: true,

write: true,
write_with_cache_control: true,
write_without_content_length: true,

list: true,
Expand Down
1 change: 1 addition & 0 deletions core/src/services/s3/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -924,6 +924,7 @@ impl Accessor for S3Backend {
read_with_override_content_disposition: true,

write: true,
write_with_cache_control: true,
write_without_content_length: true,

list: true,
Expand Down
2 changes: 1 addition & 1 deletion core/src/services/s3/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl S3Writer {
// The part size must be 5 MiB to 5 GiB. There is no minimum
// size limit on the last part of your multipart upload.
//
// We pick the default value as 8 MiB for better thoughput.
// We pick the default value as 8 MiB for better throughput.
//
// TODO: allow this value to be configured.
buffer_size: 8 * 1024 * 1024,
Expand Down
31 changes: 30 additions & 1 deletion core/tests/behavior/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use futures::AsyncSeekExt;
use futures::StreamExt;
use log::debug;
use log::warn;
use opendal::ops::{OpRead, OpStat};
use opendal::ops::{OpRead, OpStat, OpWrite};
use opendal::EntryMode;
use opendal::ErrorKind;
use opendal::Operator;
Expand Down Expand Up @@ -75,6 +75,7 @@ macro_rules! behavior_write_tests {
test_write,
test_write_with_dir_path,
test_write_with_special_chars,
test_write_with_cache_control,
test_stat,
test_stat_dir,
test_stat_with_special_chars,
Expand Down Expand Up @@ -179,6 +180,34 @@ pub async fn test_write_with_special_chars(op: Operator) -> Result<()> {
Ok(())
}

/// Behavior test: writing a file with an explicit cache-control value should
/// succeed, and the value must round-trip through a subsequent `stat` call.
///
/// Services that do not advertise the `write_with_cache_control` capability
/// are skipped (the test returns `Ok(())` immediately).
pub async fn test_write_with_cache_control(op: Operator) -> Result<()> {
// Skip silently on backends without cache-control write support.
if !op.info().capability().write_with_cache_control {
return Ok(());
}

// Random path so concurrent test runs cannot collide.
let path = uuid::Uuid::new_v4().to_string();
let (content, _) = gen_bytes();

let target_cache_control = "no-cache, no-store, max-age=300";

// Build write options carrying the cache-control directive.
let mut op_write = OpWrite::default();
op_write = op_write.with_cache_control(target_cache_control);

op.write_with(&path, op_write, content).await?;

// Verify the stored object is a file and the cache-control value survived
// the write/stat round trip unchanged.
let meta = op.stat(&path).await.expect("stat must succeed");
assert_eq!(meta.mode(), EntryMode::FILE);
assert_eq!(
meta.cache_control().expect("cache control must exist"),
target_cache_control
);

// Clean up the test object so repeated runs start from a clean state.
op.delete(&path).await.expect("delete must succeed");

Ok(())
}
}

/// Stat existing file should return metadata
pub async fn test_stat(op: Operator) -> Result<()> {
let path = uuid::Uuid::new_v4().to_string();
Expand Down