Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(tests): Add fuzz test for writer without content length #2100

Merged
merged 3 commits into from
Apr 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 44 additions & 7 deletions core/src/layers/complete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,21 +509,33 @@ where
}

pub struct CompleteWriter<W> {
inner: W,
inner: Option<W>,
size: Option<u64>,
written: u64,
}

impl<W> CompleteWriter<W> {
pub fn new(inner: W, size: Option<u64>) -> CompleteWriter<W> {
CompleteWriter {
inner,
inner: Some(inner),
size,
written: 0,
}
}
}

/// Check if the writer has been closed or aborted while debug_assertions
/// enabled. This code will never be executed in release mode.
#[cfg(debug_assertions)]
impl<W> Drop for CompleteWriter<W> {
fn drop(&mut self) {
if self.inner.is_some() {
// Do we need to panic here?
log::warn!("writer has not been closed or aborted, must be a bug")
}
}
}

#[async_trait]
impl<W> oio::Write for CompleteWriter<W>
where
Expand All @@ -544,13 +556,23 @@ where
}
}

self.inner.write(bs).await?;
let w = self.inner.as_mut().ok_or_else(|| {
Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
})?;
w.write(bs).await?;
self.written += n as u64;
Ok(())
}

async fn abort(&mut self) -> Result<()> {
self.inner.abort().await
let w = self.inner.as_mut().ok_or_else(|| {
Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
})?;

w.abort().await?;
self.inner = None;

Ok(())
}

async fn close(&mut self) -> Result<()> {
Expand All @@ -566,7 +588,13 @@ where
}
}

self.inner.close().await?;
let w = self.inner.as_mut().ok_or_else(|| {
Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
})?;

w.close().await?;
self.inner = None;

Ok(())
}
}
Expand All @@ -590,7 +618,11 @@ where
}
}

self.inner.write(bs)?;
let w = self.inner.as_mut().ok_or_else(|| {
Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
})?;

w.write(bs)?;
self.written += n as u64;
Ok(())
}
Expand All @@ -608,7 +640,12 @@ where
}
}

self.inner.close()?;
let w = self.inner.as_mut().ok_or_else(|| {
Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
})?;

w.close()?;
self.inner = None;
Ok(())
}
}
37 changes: 22 additions & 15 deletions core/src/layers/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1340,19 +1340,6 @@ impl<W> LoggingWriter<W> {
}
}

impl<W> Drop for LoggingWriter<W> {
fn drop(&mut self) {
debug!(
target: LOGGING_TARGET,
"service={} operation={} path={} written={} -> data written finished",
self.scheme,
self.op,
self.path,
self.written
);
}
}

#[async_trait]
impl<W: oio::Write> oio::Write for LoggingWriter<W> {
async fn write(&mut self, bs: Bytes) -> Result<()> {
Expand Down Expand Up @@ -1420,7 +1407,17 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {

async fn close(&mut self) -> Result<()> {
match self.inner.close().await {
Ok(_) => Ok(()),
Ok(_) => {
debug!(
target: LOGGING_TARGET,
"service={} operation={} path={} written={} -> data written finished",
self.scheme,
self.op,
self.path,
self.written
);
Ok(())
}
Err(err) => {
if let Some(lvl) = self.failure_level {
log!(
Expand Down Expand Up @@ -1475,7 +1472,17 @@ impl<W: oio::BlockingWrite> oio::BlockingWrite for LoggingWriter<W> {

fn close(&mut self) -> Result<()> {
match self.inner.close() {
Ok(_) => Ok(()),
Ok(_) => {
debug!(
target: LOGGING_TARGET,
"service={} operation={} path={} written={} -> data written finished",
self.scheme,
self.op,
self.path,
self.written
);
Ok(())
}
Err(err) => {
if let Some(lvl) = self.failure_level {
log!(
Expand Down
1 change: 1 addition & 0 deletions core/src/services/fs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ impl Accessor for FsBackend {
read: true,
read_can_seek: true,
write: true,
write_without_content_length: true,
create_dir: true,
list: true,
copy: true,
Expand Down
1 change: 1 addition & 0 deletions core/src/services/gcs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ impl Accessor for GcsBackend {
read: true,
read_can_next: true,
write: true,
write_without_content_length: true,
list: true,
scan: true,
copy: true,
Expand Down
1 change: 1 addition & 0 deletions core/src/services/oss/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ impl Accessor for OssBackend {
read: true,
read_can_next: true,
write: true,
write_without_content_length: true,
list: true,
scan: true,
copy: true,
Expand Down
2 changes: 2 additions & 0 deletions core/src/services/s3/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -924,6 +924,8 @@ impl Accessor for S3Backend {
read_with_override_content_disposition: true,

write: true,
write_without_content_length: true,

list: true,
scan: true,
copy: true,
Expand Down
19 changes: 17 additions & 2 deletions core/src/types/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ use crate::*;
///
/// ## Notes
///
/// Please make sure either `close` or `abort` has been called before
/// dropping the writer otherwise the data could be lost.
///
/// ## Notes
///
/// Writer can be used in two ways:
///
/// - Sized: write data with a known size by specify the content length.
Expand Down Expand Up @@ -81,7 +86,12 @@ impl Writer {
}
}

/// Abort inner writer.
/// Abort the writer and clean up all written data.
///
/// ## Notes
///
/// Abort should only be called when the writer is not closed or
/// aborted, otherwise an unexpected error could be returned.
pub async fn abort(&mut self) -> Result<()> {
if let State::Idle(Some(w)) = &mut self.state {
w.abort().await
Expand All @@ -93,7 +103,12 @@ impl Writer {
}
}

/// Close the writer and make sure all data have been stored.
/// Close the writer and make sure all data have been committed.
///
/// ## Notes
///
/// Close should only be called when the writer is not closed or
/// aborted, otherwise an unexpected error could be returned.
pub async fn close(&mut self) -> Result<()> {
if let State::Idle(Some(w)) = &mut self.state {
w.close().await
Expand Down
96 changes: 96 additions & 0 deletions core/tests/behavior/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

use std::collections::HashMap;
use std::env;
use std::fmt;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::io::SeekFrom;
use std::usize;

Expand Down Expand Up @@ -279,3 +282,96 @@ impl ObjectReaderFuzzer {
}
}
}

/// ObjectWriterFuzzer is the fuzzer for object writer.
///
/// We will generate random write operations to operate on object
/// write to check if the output is expected.
///
/// # TODO
///
/// This fuzzer only generate valid operations.
///
/// In the future, we need to generate invalid operations to check if we
/// handled correctly.
pub struct ObjectWriterFuzzer {
name: String,
bs: Vec<u8>,

size: Option<usize>,
cur: usize,
rng: ThreadRng,
actions: Vec<ObjectWriterAction>,
}

#[derive(Clone)]
pub enum ObjectWriterAction {
Write(Bytes),
}

impl Debug for ObjectWriterAction {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
ObjectWriterAction::Write(bs) => write!(f, "Write({})", bs.len()),
}
}
}

impl ObjectWriterFuzzer {
/// Create a new fuzzer.
pub fn new(name: &str, size: Option<usize>) -> Self {
Self {
name: name.to_string(),
bs: Vec::new(),

size,
cur: 0,

rng: thread_rng(),
actions: vec![],
}
}

/// Generate a new action.
pub fn fuzz(&mut self) -> ObjectWriterAction {
let max = if let Some(size) = self.size {
size - self.cur
} else {
// Set max to 1MiB
1024 * 1024
};

let size = self.rng.gen_range(0..max);

let mut bs = vec![0; size];
self.rng.fill_bytes(&mut bs);

let bs = Bytes::from(bs);
self.bs.extend_from_slice(&bs);
self.cur += bs.len();

let action = ObjectWriterAction::Write(bs);
debug!("{} perform fuzz action: {:?}", self.name, action);

self.actions.push(action.clone());

action
}

/// Check if read operation is expected.
pub fn check(&mut self, actual_bs: &[u8]) {
assert_eq!(
self.bs.len(),
actual_bs.len(),
"check failed: expected len is different with actual len, actions: {:?}",
self.actions
);

assert_eq!(
format!("{:x}", Sha256::digest(&self.bs)),
format!("{:x}", Sha256::digest(actual_bs)),
"check failed: expected bs is different with actual bs, actions: {:?}",
self.actions,
);
}
}
28 changes: 28 additions & 0 deletions core/tests/behavior/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ macro_rules! behavior_write_tests {
test_writer_write,
test_writer_abort,
test_writer_futures_copy,
test_fuzz_unsized_writer,
);
)*
};
Expand Down Expand Up @@ -885,3 +886,30 @@ pub async fn test_writer_futures_copy(op: Operator) -> Result<()> {
op.delete(&path).await.expect("delete must succeed");
Ok(())
}

/// Add test for unsized writer
pub async fn test_fuzz_unsized_writer(op: Operator) -> Result<()> {
if !op.info().capability().write_without_content_length {
warn!("{op:?} doesn't support write without content length, test skip");
return Ok(());
}

let path = uuid::Uuid::new_v4().to_string();

let mut fuzzer = ObjectWriterFuzzer::new(&path, None);

let mut w = op.writer(&path).await?;

for _ in 0..100 {
match fuzzer.fuzz() {
ObjectWriterAction::Write(bs) => w.write(bs).await?,
}
}
w.close().await?;

let content = op.read(&path).await?;
fuzzer.check(&content);

op.delete(&path).await.expect("delete must succeed");
Ok(())
}