Skip to content

Commit

Permalink
feat(tests): Add fuzz test for writer without content length (#2100)
Browse files Browse the repository at this point in the history
* feat(tests): Add fuzz test for writer

Signed-off-by: Xuanwo <github@xuanwo.io>

* Fix writer could lost data

Signed-off-by: Xuanwo <github@xuanwo.io>

* change to log warning

Signed-off-by: Xuanwo <github@xuanwo.io>

---------

Signed-off-by: Xuanwo <github@xuanwo.io>
  • Loading branch information
Xuanwo authored Apr 24, 2023
1 parent 0e887f7 commit 2fed1a1
Show file tree
Hide file tree
Showing 9 changed files with 212 additions and 24 deletions.
51 changes: 44 additions & 7 deletions core/src/layers/complete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,21 +509,33 @@ where
}

pub struct CompleteWriter<W> {
inner: W,
inner: Option<W>,
size: Option<u64>,
written: u64,
}

impl<W> CompleteWriter<W> {
pub fn new(inner: W, size: Option<u64>) -> CompleteWriter<W> {
CompleteWriter {
inner,
inner: Some(inner),
size,
written: 0,
}
}
}

/// Check if the writer has been closed or aborted while debug_assertions
/// enabled. This code will never be executed in release mode.
#[cfg(debug_assertions)]
impl<W> Drop for CompleteWriter<W> {
fn drop(&mut self) {
if self.inner.is_some() {
// Do we need to panic here?
log::warn!("writer has not been closed or aborted, must be a bug")
}
}
}

#[async_trait]
impl<W> oio::Write for CompleteWriter<W>
where
Expand All @@ -544,13 +556,23 @@ where
}
}

self.inner.write(bs).await?;
let w = self.inner.as_mut().ok_or_else(|| {
Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
})?;
w.write(bs).await?;
self.written += n as u64;
Ok(())
}

async fn abort(&mut self) -> Result<()> {
self.inner.abort().await
let w = self.inner.as_mut().ok_or_else(|| {
Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
})?;

w.abort().await?;
self.inner = None;

Ok(())
}

async fn close(&mut self) -> Result<()> {
Expand All @@ -566,7 +588,13 @@ where
}
}

self.inner.close().await?;
let w = self.inner.as_mut().ok_or_else(|| {
Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
})?;

w.close().await?;
self.inner = None;

Ok(())
}
}
Expand All @@ -590,7 +618,11 @@ where
}
}

self.inner.write(bs)?;
let w = self.inner.as_mut().ok_or_else(|| {
Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
})?;

w.write(bs)?;
self.written += n as u64;
Ok(())
}
Expand All @@ -608,7 +640,12 @@ where
}
}

self.inner.close()?;
let w = self.inner.as_mut().ok_or_else(|| {
Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
})?;

w.close()?;
self.inner = None;
Ok(())
}
}
37 changes: 22 additions & 15 deletions core/src/layers/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1340,19 +1340,6 @@ impl<W> LoggingWriter<W> {
}
}

impl<W> Drop for LoggingWriter<W> {
fn drop(&mut self) {
debug!(
target: LOGGING_TARGET,
"service={} operation={} path={} written={} -> data written finished",
self.scheme,
self.op,
self.path,
self.written
);
}
}

#[async_trait]
impl<W: oio::Write> oio::Write for LoggingWriter<W> {
async fn write(&mut self, bs: Bytes) -> Result<()> {
Expand Down Expand Up @@ -1420,7 +1407,17 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {

async fn close(&mut self) -> Result<()> {
match self.inner.close().await {
Ok(_) => Ok(()),
Ok(_) => {
debug!(
target: LOGGING_TARGET,
"service={} operation={} path={} written={} -> data written finished",
self.scheme,
self.op,
self.path,
self.written
);
Ok(())
}
Err(err) => {
if let Some(lvl) = self.failure_level {
log!(
Expand Down Expand Up @@ -1475,7 +1472,17 @@ impl<W: oio::BlockingWrite> oio::BlockingWrite for LoggingWriter<W> {

fn close(&mut self) -> Result<()> {
match self.inner.close() {
Ok(_) => Ok(()),
Ok(_) => {
debug!(
target: LOGGING_TARGET,
"service={} operation={} path={} written={} -> data written finished",
self.scheme,
self.op,
self.path,
self.written
);
Ok(())
}
Err(err) => {
if let Some(lvl) = self.failure_level {
log!(
Expand Down
1 change: 1 addition & 0 deletions core/src/services/fs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ impl Accessor for FsBackend {
read: true,
read_can_seek: true,
write: true,
write_without_content_length: true,
create_dir: true,
list: true,
copy: true,
Expand Down
1 change: 1 addition & 0 deletions core/src/services/gcs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ impl Accessor for GcsBackend {
read: true,
read_can_next: true,
write: true,
write_without_content_length: true,
list: true,
scan: true,
copy: true,
Expand Down
1 change: 1 addition & 0 deletions core/src/services/oss/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ impl Accessor for OssBackend {
read: true,
read_can_next: true,
write: true,
write_without_content_length: true,
list: true,
scan: true,
copy: true,
Expand Down
2 changes: 2 additions & 0 deletions core/src/services/s3/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -924,6 +924,8 @@ impl Accessor for S3Backend {
read_with_override_content_disposition: true,

write: true,
write_without_content_length: true,

list: true,
scan: true,
copy: true,
Expand Down
19 changes: 17 additions & 2 deletions core/src/types/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ use crate::*;
///
/// ## Notes
///
/// Please make sure either `close` or `abort` has been called before
/// dropping the writer otherwise the data could be lost.
///
/// ## Notes
///
/// Writer can be used in two ways:
///
/// - Sized: write data with a known size by specify the content length.
Expand Down Expand Up @@ -81,7 +86,12 @@ impl Writer {
}
}

/// Abort inner writer.
/// Abort the writer and clean up all written data.
///
/// ## Notes
///
/// Abort should only be called when the writer is not closed or
/// aborted, otherwise an unexpected error could be returned.
pub async fn abort(&mut self) -> Result<()> {
if let State::Idle(Some(w)) = &mut self.state {
w.abort().await
Expand All @@ -93,7 +103,12 @@ impl Writer {
}
}

/// Close the writer and make sure all data have been stored.
/// Close the writer and make sure all data have been committed.
///
/// ## Notes
///
/// Close should only be called when the writer is not closed or
/// aborted, otherwise an unexpected error could be returned.
pub async fn close(&mut self) -> Result<()> {
if let State::Idle(Some(w)) = &mut self.state {
w.close().await
Expand Down
96 changes: 96 additions & 0 deletions core/tests/behavior/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

use std::collections::HashMap;
use std::env;
use std::fmt;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::io::SeekFrom;
use std::usize;

Expand Down Expand Up @@ -279,3 +282,96 @@ impl ObjectReaderFuzzer {
}
}
}

/// ObjectWriterFuzzer is the fuzzer for object writer.
///
/// We will generate random write operations to operate on object
/// write to check if the output is expected.
///
/// # TODO
///
/// This fuzzer only generate valid operations.
///
/// In the future, we need to generate invalid operations to check if we
/// handled correctly.
pub struct ObjectWriterFuzzer {
name: String,
bs: Vec<u8>,

size: Option<usize>,
cur: usize,
rng: ThreadRng,
actions: Vec<ObjectWriterAction>,
}

#[derive(Clone)]
pub enum ObjectWriterAction {
Write(Bytes),
}

impl Debug for ObjectWriterAction {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
ObjectWriterAction::Write(bs) => write!(f, "Write({})", bs.len()),
}
}
}

impl ObjectWriterFuzzer {
/// Create a new fuzzer.
pub fn new(name: &str, size: Option<usize>) -> Self {
Self {
name: name.to_string(),
bs: Vec::new(),

size,
cur: 0,

rng: thread_rng(),
actions: vec![],
}
}

/// Generate a new action.
pub fn fuzz(&mut self) -> ObjectWriterAction {
let max = if let Some(size) = self.size {
size - self.cur
} else {
// Set max to 1MiB
1024 * 1024
};

let size = self.rng.gen_range(0..max);

let mut bs = vec![0; size];
self.rng.fill_bytes(&mut bs);

let bs = Bytes::from(bs);
self.bs.extend_from_slice(&bs);
self.cur += bs.len();

let action = ObjectWriterAction::Write(bs);
debug!("{} perform fuzz action: {:?}", self.name, action);

self.actions.push(action.clone());

action
}

/// Check if read operation is expected.
pub fn check(&mut self, actual_bs: &[u8]) {
assert_eq!(
self.bs.len(),
actual_bs.len(),
"check failed: expected len is different with actual len, actions: {:?}",
self.actions
);

assert_eq!(
format!("{:x}", Sha256::digest(&self.bs)),
format!("{:x}", Sha256::digest(actual_bs)),
"check failed: expected bs is different with actual bs, actions: {:?}",
self.actions,
);
}
}
28 changes: 28 additions & 0 deletions core/tests/behavior/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ macro_rules! behavior_write_tests {
test_writer_write,
test_writer_abort,
test_writer_futures_copy,
test_fuzz_unsized_writer,
);
)*
};
Expand Down Expand Up @@ -885,3 +886,30 @@ pub async fn test_writer_futures_copy(op: Operator) -> Result<()> {
op.delete(&path).await.expect("delete must succeed");
Ok(())
}

/// Add test for unsized writer
pub async fn test_fuzz_unsized_writer(op: Operator) -> Result<()> {
if !op.info().capability().write_without_content_length {
warn!("{op:?} doesn't support write without content length, test skip");
return Ok(());
}

let path = uuid::Uuid::new_v4().to_string();

let mut fuzzer = ObjectWriterFuzzer::new(&path, None);

let mut w = op.writer(&path).await?;

for _ in 0..100 {
match fuzzer.fuzz() {
ObjectWriterAction::Write(bs) => w.write(bs).await?,
}
}
w.close().await?;

let content = op.read(&path).await?;
fuzzer.check(&content);

op.delete(&path).await.expect("delete must succeed");
Ok(())
}

0 comments on commit 2fed1a1

Please sign in to comment.