refactor: Allow reusing the same operator to speed up tests (#2068)
* refactor: Allow reusing the same operator to speed up tests

Signed-off-by: Xuanwo <github@xuanwo.io>

* Fix scan root

Signed-off-by: Xuanwo <github@xuanwo.io>

* Fix debug for cap

Signed-off-by: Xuanwo <github@xuanwo.io>

* avoid conflict

Signed-off-by: Xuanwo <github@xuanwo.io>

* Fix fs

Signed-off-by: Xuanwo <github@xuanwo.io>

* Allow retry content incomplete

Signed-off-by: Xuanwo <github@xuanwo.io>

* Allow retry for copy and rename

Signed-off-by: Xuanwo <github@xuanwo.io>

* Fix list tests

Signed-off-by: Xuanwo <github@xuanwo.io>

* Fix ensure root

Signed-off-by: Xuanwo <github@xuanwo.io>

* Allow retry for redis error

Signed-off-by: Xuanwo <github@xuanwo.io>

* Refactor random root

Signed-off-by: Xuanwo <github@xuanwo.io>

* Use better cap debug format

Signed-off-by: Xuanwo <github@xuanwo.io>

* Better!

Signed-off-by: Xuanwo <github@xuanwo.io>

* Fix webdav

Signed-off-by: Xuanwo <github@xuanwo.io>

* ftp should ensure parent dir exists

Signed-off-by: Xuanwo <github@xuanwo.io>

* Refactor tests

Signed-off-by: Xuanwo <github@xuanwo.io>

* format code

Signed-off-by: Xuanwo <github@xuanwo.io>

---------

Signed-off-by: Xuanwo <github@xuanwo.io>
Xuanwo authored Apr 23, 2023
1 parent bf9dad6 commit ca53000
Showing 23 changed files with 425 additions and 320 deletions.
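
The heart of the refactor, sketched below: build the Operator once per test binary and let every behavior test borrow it, instead of re-initializing the service for each test (the test macros further down now match on a shared OPERATOR static). This is a minimal sketch with assumed names; the Memory service and the once_cell wiring are illustrations, not the exact helpers added in this commit.

use once_cell::sync::Lazy;
use opendal::services::Memory;
use opendal::Operator;

// Shared operator, constructed once and reused by every test. The real harness
// would pick the service and a (possibly random) root from environment variables;
// Memory is only a stand-in here.
pub static OPERATOR: Lazy<Option<Operator>> =
    Lazy::new(|| Operator::new(Memory::default()).ok().map(|b| b.finish()));

#[test]
fn reuses_the_same_operator() {
    // Tests skip themselves when the service is not configured (None), mirroring
    // the `None => log::warn!(...)` arms in the macros below.
    let op = OPERATOR.as_ref().expect("operator must be initialized");
    assert!(op.info().can_read());
}
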
2 changes: 2 additions & 0 deletions .github/workflows/service_test_http.yml
@@ -66,6 +66,7 @@ jobs:
RUST_LOG: debug
OPENDAL_HTTP_TEST: on
OPENDAL_HTTP_ENDPOINT: http://127.0.0.1:8080
OPENDAL_DISABLE_RANDOM_ROOT: true

caddy:
runs-on: ${{ matrix.os }}
@@ -102,3 +103,4 @@ jobs:
RUST_LOG: debug
OPENDAL_HTTP_TEST: on
OPENDAL_HTTP_ENDPOINT: http://127.0.0.1:8080
OPENDAL_DISABLE_RANDOM_ROOT: true
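
OPENDAL_DISABLE_RANDOM_ROOT appears to opt a service out of the per-run random root that keeps a shared, reused operator from clashing across runs; a read-only fixture like this HTTP endpoint needs the configured root untouched. A rough sketch of how a harness might honor the flag (the helper name and exact semantics are assumptions):

// Hypothetical helper: append a random directory to the configured root so test
// runs don't interfere with each other, unless the service disables it.
fn test_root(configured: &str) -> String {
    let disabled = std::env::var("OPENDAL_DISABLE_RANDOM_ROOT")
        .map(|v| v == "true")
        .unwrap_or(false);
    if disabled {
        configured.to_string()
    } else {
        format!("{}/{}/", configured.trim_end_matches('/'), uuid::Uuid::new_v4())
    }
}
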
2 changes: 2 additions & 0 deletions .github/workflows/service_test_ipfs.yml
@@ -63,6 +63,8 @@ jobs:
OPENDAL_IPFS_TEST: on
OPENDAL_IPFS_ROOT: /ipfs/QmPpCt1aYGb9JWJRmXRUnmJtVgeFFTJGzWFYEEX7bo9zGJ/
OPENDAL_IPFS_ENDPOINT: "http://127.0.0.1:8080"
OPENDAL_DISABLE_RANDOM_ROOT: true

# # ipfs.io can't pass our test by now, we should address them in the future.
# ipfs-io:
# runs-on: ubuntu-latest
28 changes: 28 additions & 0 deletions core/src/layers/retry.rs
@@ -247,6 +247,34 @@ impl<A: Accessor> LayeredAccessor for RetryAccessor<A> {
.await
}

async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
{ || self.inner.copy(from, to, args.clone()) }
.retry(&self.builder)
.when(|e| e.is_temporary())
.notify(|err, dur| {
warn!(
target: "opendal::service",
"operation={} -> retry after {}s: error={:?}",
Operation::Copy, dur.as_secs_f64(), err)
})
.map(|v| v.map_err(|e| e.set_persistent()))
.await
}

async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
{ || self.inner.rename(from, to, args.clone()) }
.retry(&self.builder)
.when(|e| e.is_temporary())
.notify(|err, dur| {
warn!(
target: "opendal::service",
"operation={} -> retry after {}s: error={:?}",
Operation::Rename, dur.as_secs_f64(), err)
})
.map(|v| v.map_err(|e| e.set_persistent()))
.await
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> {
{ || self.inner.list(path, args.clone()) }
.retry(&self.builder)
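
Nothing changes in how the layer is applied; once an operator is wrapped in RetryLayer, copy and rename go through the same temporary-error retry path as read, write and stat. A minimal construction sketch, using Memory purely as a placeholder service:

use opendal::layers::RetryLayer;
use opendal::services::Memory;
use opendal::{Operator, Result};

fn main() -> Result<()> {
    // Copy/rename issued through `op` are retried on temporary errors,
    // with the same backoff and warning logs as the other operations.
    let op = Operator::new(Memory::default())?
        .layer(RetryLayer::new())
        .finish();
    println!("copy supported: {}", op.info().can_copy());
    Ok(())
}
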
6 changes: 4 additions & 2 deletions core/src/raw/http_util/body.rs
@@ -146,11 +146,13 @@ impl IncomingAsyncBody {
Ordering::Less => Err(Error::new(
ErrorKind::ContentIncomplete,
&format!("reader got too less data, expect: {expect}, actual: {actual}"),
- )),
+ )
+ .set_temporary()),
Ordering::Greater => Err(Error::new(
ErrorKind::ContentTruncated,
&format!("reader got too much data, expect: {expect}, actual: {actual}"),
- )),
+ )
+ .set_temporary()),
}
}
}
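
The point of set_temporary() here is the contract with the retry layer above: RetryLayer only retries errors flagged as temporary, so marking short and over-long reads this way is what makes them retryable. The same pairing in isolation:

use opendal::{Error, ErrorKind};

fn main() {
    // A short read reported as temporary...
    let err = Error::new(ErrorKind::ContentIncomplete, "reader got less data than expected")
        .set_temporary();
    // ...is exactly what RetryLayer's `.when(|e| e.is_temporary())` looks for.
    assert!(err.is_temporary());
}
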
46 changes: 31 additions & 15 deletions core/src/services/ftp/backend.rs
@@ -335,22 +335,16 @@ impl Accessor for FtpBackend {

for path in paths {
curr_path.push_str(path);
// try to create directory
if curr_path.ends_with('/') {
match ftp_stream.mkdir(&curr_path).await {
// Do nothing if status is FileUnavailable or Ok(()) is returned.
Err(FtpError::UnexpectedResponse(Response {
status: Status::FileUnavailable,
..
}))
| Ok(()) => (),
Err(e) => {
return Err(e.into());
}
match ftp_stream.mkdir(&curr_path).await {
// Do nothing if status is FileUnavailable or Ok(()) is returned.
Err(FtpError::UnexpectedResponse(Response {
status: Status::FileUnavailable,
..
}))
| Ok(()) => (),
Err(e) => {
return Err(e.into());
}
} else {
// else, create file
ftp_stream.put_file(&curr_path, &mut "".as_bytes()).await?;
}
}

@@ -398,6 +392,28 @@ impl Accessor for FtpBackend {
));
}

// Ensure the parent dir exists.
let parent = get_parent(path);
let paths: Vec<&str> = parent.split('/').collect();

// TODO: we can optimize this by checking dir existence first.
let mut ftp_stream = self.ftp_connect(Operation::Write).await?;
let mut curr_path = String::new();
for path in paths {
curr_path.push_str(path);
match ftp_stream.mkdir(&curr_path).await {
// Do nothing if status is FileUnavailable or Ok(()) is returned.
Err(FtpError::UnexpectedResponse(Response {
status: Status::FileUnavailable,
..
}))
| Ok(()) => (),
Err(e) => {
return Err(e.into());
}
}
}

Ok((
RpWrite::new(),
FtpWriter::new(self.clone(), path.to_string()),
4 changes: 3 additions & 1 deletion core/src/services/redis/backend.rs
@@ -356,6 +356,8 @@ impl kv::Adapter for Adapter {

impl From<RedisError> for Error {
fn from(e: RedisError) -> Self {
- Error::new(ErrorKind::Unexpected, e.category()).set_source(e)
+ Error::new(ErrorKind::Unexpected, e.category())
+ .set_source(e)
+ .set_temporary()
}
}
4 changes: 4 additions & 0 deletions core/src/services/webdav/backend.rs
@@ -648,6 +648,10 @@ impl WebdavBackend {
}

async fn ensure_parent_path(&self, path: &str) -> Result<()> {
if path == "/" {
return Ok(());
}

// create dir recursively, split path by `/` and create each dir except the last one
let abs_path = build_abs_path(&self.root, path);
let abs_path = abs_path.as_str();
11 changes: 6 additions & 5 deletions core/src/services/webdav/pager.rs
@@ -54,12 +54,13 @@ impl oio::Page for WebdavPager {
.into_iter()
.filter_map(|de| {
let path = de.href;
- let normalized_path = if self.root != path {
- build_rel_path(&self.root, &path)
- } else {
- path
- };

+ // Ignore the root path itself.
+ if self.root == path {
+ return None;
+ }
+
+ let normalized_path = build_rel_path(&self.root, &path);
if normalized_path == self.path {
// WebDav server may return the current path as an entry.
return None;
49 changes: 48 additions & 1 deletion core/src/types/capability.rs
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

use std::fmt::Debug;

/// Capability is used to describe what operations are supported
/// by current Operator.
///
@@ -42,7 +44,7 @@
/// - Operation with variants should be named like `read_can_seek`.
/// - Operation with arguments should be named like `read_with_range`.
/// - Operation with limitations should be named like `batch_max_operations`.
- #[derive(Copy, Clone, Debug, Default)]
+ #[derive(Copy, Clone, Default)]
pub struct Capability {
/// If operator supports stat natively, it will be true.
pub stat: bool,
@@ -127,3 +129,48 @@ pub struct Capability {
/// If operator supports blocking natively, it will be true.
pub blocking: bool,
}

impl Debug for Capability {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut s = vec![];

if self.read {
s.push("Read");
}
if self.stat {
s.push("Stat");
}
if self.write {
s.push("Write");
}
if self.create_dir {
s.push("CreateDir");
}
if self.delete {
s.push("Delete");
}
if self.list {
s.push("List");
}
if self.scan {
s.push("Scan");
}
if self.copy {
s.push("Copy");
}
if self.rename {
s.push("Rename");
}
if self.presign {
s.push("Presign");
}
if self.batch {
s.push("Batch");
}
if self.blocking {
s.push("Blocking");
}

write!(f, "{{ {} }}", s.join(" | "))
}
}
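
The manual Debug impl turns capability output into a compact set instead of a dump of every boolean field, which keeps test logs readable. For example, constructing a Capability directly (the struct and its fields are public):

use opendal::Capability;

fn main() {
    let cap = Capability {
        read: true,
        write: true,
        ..Default::default()
    };
    // Prints the flags in the order checked above: "{ Read | Write }".
    assert_eq!(format!("{:?}", cap), "{ Read | Write }");
}
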
41 changes: 19 additions & 22 deletions core/tests/behavior/blocking_copy.rs
@@ -30,31 +30,28 @@ use super::utils::*;
macro_rules! behavior_blocking_copy_test {
($service:ident, $($(#[$meta:meta])* $test:ident),*,) => {
paste::item! {
mod [<services_ $service:lower _blocking_copy>] {
$(
#[test]
$(
#[test]
$(
#[$meta]
)*
fn [< $test >]() -> anyhow::Result<()> {
let op = $crate::utils::init_service::<opendal::services::$service>(true);
match op {
Some(op) if op.info().can_read()
&& op.info().can_write()
&& op.info().can_copy()
&& op.info().can_blocking() => $crate::blocking_copy::$test(op.blocking()),
Some(_) => {
log::warn!("service {} doesn't support blocking_copy, ignored", opendal::Scheme::$service);
Ok(())
},
None => {
log::warn!("service {} not initiated, ignored", opendal::Scheme::$service);
Ok(())
}
#[$meta]
)*
fn [<blocking_copy_ $test >]() -> anyhow::Result<()> {
match OPERATOR.as_ref() {
Some(op) if op.info().can_read()
&& op.info().can_write()
&& op.info().can_copy()
&& op.info().can_blocking() => $crate::blocking_copy::$test(op.blocking()),
Some(_) => {
log::warn!("service {} doesn't support blocking_copy, ignored", opendal::Scheme::$service);
Ok(())
},
None => {
log::warn!("service {} not initiated, ignored", opendal::Scheme::$service);
Ok(())
}
}
)*
}
}
)*
}
};
}
60 changes: 33 additions & 27 deletions core/tests/behavior/blocking_list.rs
@@ -34,30 +34,27 @@ use super::utils::*;
macro_rules! behavior_blocking_list_test {
($service:ident, $($(#[$meta:meta])* $test:ident),*,) => {
paste::item! {
mod [<services_ $service:lower _blocking_list>] {
$(
#[test]
$(
#[test]
$(
#[$meta]
)*
fn [< $test >]() -> anyhow::Result<()> {
let op = $crate::utils::init_service::<opendal::services::$service>(true);
match op {
Some(op) if op.info().can_read()
&& op.info().can_write()
&& op.info().can_blocking() && (op.info().can_list()||op.info().can_scan()) => $crate::blocking_list::$test(op.blocking()),
Some(_) => {
log::warn!("service {} doesn't support blocking_list, ignored", opendal::Scheme::$service);
Ok(())
},
None => {
log::warn!("service {} not initiated, ignored", opendal::Scheme::$service);
Ok(())
}
#[$meta]
)*
fn [<blocking_list_ $test >]() -> anyhow::Result<()> {
match OPERATOR.as_ref() {
Some(op) if op.info().can_read()
&& op.info().can_write()
&& op.info().can_blocking() && (op.info().can_list()||op.info().can_scan()) => $crate::blocking_list::$test(op.blocking()),
Some(_) => {
log::warn!("service {} doesn't support blocking_list, ignored", opendal::Scheme::$service);
Ok(())
},
None => {
log::warn!("service {} not initiated, ignored", opendal::Scheme::$service);
Ok(())
}
}
)*
}
}
)*
}
};
}
@@ -79,13 +76,14 @@ macro_rules! behavior_blocking_list_tests {

/// List dir should return newly created file.
pub fn test_list_dir(op: BlockingOperator) -> Result<()> {
let path = uuid::Uuid::new_v4().to_string();
let parent = uuid::Uuid::new_v4().to_string();
let path = format!("{parent}/{}", uuid::Uuid::new_v4());
debug!("Generate a random file: {}", &path);
let (content, size) = gen_bytes();

op.write(&path, content).expect("write must succeed");

let obs = op.list("/")?;
let obs = op.list(&format!("{parent}/"))?;
let mut found = false;
for de in obs {
let de = de?;
@@ -122,22 +120,30 @@ pub fn test_list_non_exist_dir(op: BlockingOperator) -> Result<()> {

// Walk top down should output as expected
pub fn test_scan(op: BlockingOperator) -> Result<()> {
let parent = uuid::Uuid::new_v4().to_string();

let expected = vec![
"x/", "x/y", "x/x/", "x/x/y", "x/x/x/", "x/x/x/y", "x/x/x/x/",
];
for path in expected.iter() {
if path.ends_with('/') {
op.create_dir(path)?;
op.create_dir(&format!("{parent}/{path}"))?;
} else {
op.write(path, "test_scan")?;
op.write(&format!("{parent}/{path}"), "test_scan")?;
}
}

let w = op.scan("x/")?;
let w = op.scan(&format!("{parent}/x/"))?;
let actual = w
.collect::<Vec<_>>()
.into_iter()
.map(|v| v.unwrap().path().to_string())
.map(|v| {
v.unwrap()
.path()
.strip_prefix(&format!("{parent}/"))
.unwrap()
.to_string()
})
.collect::<HashSet<_>>();

debug!("walk top down: {:?}", actual);
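
The pattern running through these test changes, in miniature: each test writes under its own random parent directory so many tests can share one operator without colliding, and paths are stripped of that prefix before being compared. A small illustration with a hypothetical helper:

// Hypothetical helper mirroring the strip_prefix calls above.
fn relative_to(parent: &str, path: &str) -> Option<String> {
    path.strip_prefix(&format!("{parent}/")).map(str::to_string)
}

fn main() {
    let parent = uuid::Uuid::new_v4().to_string();
    let path = format!("{parent}/x/y");
    assert_eq!(relative_to(&parent, &path).as_deref(), Some("x/y"));
}
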
