Skip to content

Commit

Permalink
feat(core): sets default chunk_size and sends buffer > chunk_size dir…
Browse files Browse the repository at this point in the history
…ectly (apache#4710)
  • Loading branch information
evenyag authored Jun 12, 2024
1 parent e1e430d commit 27c6a77
Show file tree
Hide file tree
Showing 4 changed files with 287 additions and 36 deletions.
2 changes: 1 addition & 1 deletion core/benches/oio/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub fn bench_exact_buf_write(c: &mut Criterion) {
group.throughput(criterion::Throughput::Bytes(size.bytes() as u64));
group.bench_with_input(size.to_string(), &content, |b, content| {
b.to_async(&*TOKIO).iter(|| async {
let mut w = ChunkedWriter::new(BlackHoleWriter, 256 * 1024);
let mut w = ChunkedWriter::new(BlackHoleWriter, 256 * 1024, true);

let mut bs = content.clone();
while !bs.is_empty() {
Expand Down
37 changes: 22 additions & 15 deletions core/src/layers/complete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,28 +423,35 @@ impl<A: Access> LayeredAccess for CompleteAccessor<A> {
}

// Calculate buffer size.
let chunk_size = args.chunk().map(|mut size| {
if let Some(v) = capability.write_multi_max_size {
size = size.min(v);
}
if let Some(v) = capability.write_multi_min_size {
size = size.max(v);
}
if let Some(v) = capability.write_multi_align_size {
// Make sure size >= size first.
size = size.max(v);
size -= size % v;
}
// If `chunk` is not set, we use `write_multi_min_size` or `write_multi_align_size`
// as the default size.
let chunk_size = args
.chunk()
.or(capability.write_multi_min_size)
.or(capability.write_multi_align_size)
.map(|mut size| {
if let Some(v) = capability.write_multi_max_size {
size = size.min(v);
}
if let Some(v) = capability.write_multi_min_size {
size = size.max(v);
}
if let Some(v) = capability.write_multi_align_size {
// Make sure size >= align size first, so rounding down never yields zero.
size = size.max(v);
size -= size % v;
}

size
});
size
});
let exact = args.chunk().is_some() || capability.write_multi_align_size.is_some();

let (rp, w) = self.inner.write(path, args.clone()).await?;
let w = CompleteWriter::new(w);

let w = match chunk_size {
None => TwoWays::One(w),
Some(size) => TwoWays::Two(oio::ChunkedWriter::new(w, size)),
Some(size) => TwoWays::Two(oio::ChunkedWriter::new(w, size, exact)),
};

Ok((rp, w))
Expand Down
260 changes: 250 additions & 10 deletions core/src/raw/oio/write/chunked_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,43 +15,85 @@
// specific language governing permissions and limitations
// under the License.

use bytes::Buf;

use crate::raw::*;
use crate::*;

/// ChunkedWriter is used to implement [`oio::Write`] based on chunk:
/// flush the underlying storage at the `chunk` size.
///
/// ChunkedWriter makes sure that the size of the data written to the
/// underlying storage is exactly `chunk` bytes.
/// underlying storage is
/// - exactly `chunk` bytes if `exact` is true
/// - at least `chunk` bytes if `exact` is false
pub struct ChunkedWriter<W: oio::Write> {
inner: W,

/// The size for buffer, we will flush the underlying storage at the size of this buffer.
chunk_size: usize,
/// If `exact` is true, the size of the data written to the underlying storage is
/// exactly `chunk_size` bytes.
exact: bool,
buffer: oio::QueueBuf,
}

impl<W: oio::Write> ChunkedWriter<W> {
/// Create a new chunked writer that buffers data up to `chunk_size` bytes.
pub fn new(inner: W, chunk_size: usize) -> Self {
pub fn new(inner: W, chunk_size: usize, exact: bool) -> Self {
Self {
inner,
chunk_size,
exact,
buffer: oio::QueueBuf::new(),
}
}
}

impl<W: oio::Write> oio::Write for ChunkedWriter<W> {
async fn write(&mut self, mut bs: Buffer) -> Result<usize> {
if self.buffer.len() >= self.chunk_size {
let written = self.inner.write(self.buffer.clone().collect()).await?;
self.buffer.advance(written);
if self.exact {
if self.buffer.len() >= self.chunk_size {
let written = self.inner.write(self.buffer.clone().collect()).await?;
self.buffer.advance(written);
}

let remaining = self.chunk_size - self.buffer.len();
bs.truncate(remaining);
let n = bs.len();
self.buffer.push(bs);
return Ok(n);
}
// We are in inexact mode.

if self.buffer.len() + bs.len() < self.chunk_size {
// We haven't buffered enough data.
let n = bs.len();
self.buffer.push(bs);
return Ok(n);
}
// We have enough data to send.

if self.buffer.is_empty() {
// Fast path: Sends the buffer directly if the buffer queue is empty.
return self.inner.write(bs).await;
}

let remaining = self.chunk_size - self.buffer.len();
bs.truncate(remaining);
let n = bs.len();
// If we always push `bs` to the buffer queue, the buffer queue may grow infinitely if inner
// doesn't fully consume the queue. So we clone the buffer queue and send it with `bs` first.
let mut buffer = self.buffer.clone();
buffer.push(bs.clone());
let written = self.inner.write(buffer.collect()).await?;
// The number of bytes in `self.buffer` that were already written.
let queue_written = written.min(self.buffer.len());
self.buffer.advance(queue_written);
// The number of bytes in `bs` that were already written.
let bs_written = written - queue_written;
// Skip the bytes that were already written.
bs.advance(bs_written);
// We already sent `written` bytes, so we can put at most `written` more bytes into the buffer queue.
bs.truncate(written);
let n = bs_written + bs.len();
self.buffer.push(bs);
Ok(n)
}
Expand Down Expand Up @@ -124,7 +166,7 @@ mod tests {
let mut expected = vec![0; 5];
rng.fill_bytes(&mut expected);

let mut w = ChunkedWriter::new(MockWriter { buf: vec![] }, 10);
let mut w = ChunkedWriter::new(MockWriter { buf: vec![] }, 10, true);

let mut bs = Bytes::from(expected.clone());
while !bs.is_empty() {
Expand All @@ -142,6 +184,204 @@ mod tests {
Ok(())
}

/// In inexact mode, a single write larger than the chunk size must be accepted
/// in full while the queue is empty, and must reach the inner writer unchanged.
#[tokio::test]
async fn test_inexact_buf_writer_large_write() -> Result<()> {
    let _ = tracing_subscriber::fmt()
        .pretty()
        .with_test_writer()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .try_init();

    // 15 random bytes: more than the 10-byte chunk size configured below.
    let mut payload = vec![0; 15];
    thread_rng().fill_bytes(&mut payload);

    let mut writer = ChunkedWriter::new(MockWriter { buf: vec![] }, 10, false);

    // `MockWriter` (defined elsewhere in this module) is expected to accept the
    // first chunk it is given, which here is the entire payload.
    let accepted = writer.write(Bytes::from(payload.clone()).into()).await?;
    assert_eq!(payload.len(), accepted);

    writer.close().await?;

    // Everything handed to the writer must have arrived, byte for byte.
    let sink = &writer.inner.buf;
    assert_eq!(sink.len(), payload.len());
    let actual_digest = format!("{:x}", Sha256::digest(sink));
    let expected_digest = format!("{:x}", Sha256::digest(&payload));
    assert_eq!(actual_digest, expected_digest);
    Ok(())
}

/// In inexact mode, a small queued write followed by a large write must be
/// drained in steps, with each call reporting only the bytes it accepted.
#[tokio::test]
async fn test_inexact_buf_writer_combine_small() -> Result<()> {
    let _ = tracing_subscriber::fmt()
        .pretty()
        .with_test_writer()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .try_init();

    let mut rng = thread_rng();
    let mut expected = vec![];
    let mut writer = ChunkedWriter::new(MockWriter { buf: vec![] }, 10, false);

    // Produces `len` random bytes and appends them to the expected output.
    let mut random_bytes = |len: usize| {
        let mut data = vec![0; len];
        rng.fill_bytes(&mut data);
        expected.extend_from_slice(&data);
        Bytes::from(data)
    };

    // Larger than the chunk size.
    let large = random_bytes(15);
    assert_eq!(15, writer.write(large.into()).await?);
    // Smaller than the chunk size.
    let small = random_bytes(5);
    assert_eq!(5, writer.write(small.into()).await?);
    // Larger than the chunk size again, but now 5 bytes are already queued.
    // Each call is expected to accept only 5 bytes, so the caller retries with
    // the unaccepted remainder.
    let mut pending = random_bytes(15);
    for _ in 0..2 {
        assert_eq!(5, writer.write(pending.clone().into()).await?);
        pending.advance(5);
    }
    assert_eq!(5, writer.write(pending.clone().into()).await?);

    writer.close().await?;

    // The inner writer must have received exactly the generated bytes, in order.
    let sink = &writer.inner.buf;
    assert_eq!(sink.len(), expected.len());
    let actual_digest = format!("{:x}", Sha256::digest(sink));
    let expected_digest = format!("{:x}", Sha256::digest(&expected));
    assert_eq!(actual_digest, expected_digest);
    Ok(())
}

/// In inexact mode, when several small writes are queued, a following large
/// write must be accepted piecewise while the queue is drained part by part.
#[tokio::test]
async fn test_inexact_buf_writer_queue_remaining() -> Result<()> {
    let _ = tracing_subscriber::fmt()
        .pretty()
        .with_test_writer()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .try_init();

    let mut rng = thread_rng();
    let mut expected = vec![];
    let mut writer = ChunkedWriter::new(MockWriter { buf: vec![] }, 10, false);

    // Produces `len` random bytes and appends them to the expected output.
    let mut random_bytes = |len: usize| {
        let mut data = vec![0; len];
        rng.fill_bytes(&mut data);
        expected.extend_from_slice(&data);
        Bytes::from(data)
    };

    // Larger than the chunk size.
    let large = random_bytes(15);
    assert_eq!(15, writer.write(large.into()).await?);
    // Two writes that are each smaller than the chunk size: 5 bytes, then 3.
    for len in [5, 3] {
        let small = random_bytes(len);
        assert_eq!(len, writer.write(small.into()).await?);
    }

    // Larger than the chunk size, but each call only sends the first part of
    // the queue. Expected progression per call:
    //   accepted 5 -> queue: 3, 5, remaining input: 10
    //   accepted 3 -> queue: 5, 3, remaining input: 7
    //   accepted 5 -> queue: 3, 5, remaining input: 2
    //   accepted 2 -> queue: 5, 2, remaining input: empty
    let mut pending = random_bytes(15);
    for accepted in [5, 3, 5, 2] {
        assert_eq!(accepted, writer.write(pending.clone().into()).await?);
        pending.advance(accepted);
    }
    assert!(pending.is_empty());

    writer.close().await?;

    // The inner writer must have received exactly the generated bytes, in order.
    let sink = &writer.inner.buf;
    assert_eq!(sink.len(), expected.len());
    let actual_digest = format!("{:x}", Sha256::digest(sink));
    let expected_digest = format!("{:x}", Sha256::digest(&expected));
    assert_eq!(actual_digest, expected_digest);
    Ok(())
}

/// Test-only writer that appends every byte it accepts to `buf`.
///
/// Its `Write` impl deliberately accepts only part of a multi-part buffer,
/// so callers have to cope with partial writes.
struct PartialWriter {
// Bytes accepted so far, in the order they were written.
buf: Vec<u8>,
}

impl Write for PartialWriter {
    /// Accepts a prefix of `bs`, appends it to `self.buf`, and returns the
    /// number of bytes taken.
    ///
    /// While `Buffer::count` reports more than one part, parts are consumed
    /// from the front, so the last part of a non-contiguous buffer is always
    /// left unwritten. Otherwise the current chunk is accepted as is.
    async fn write(&mut self, mut bs: Buffer) -> Result<usize> {
        // Single-part (or empty) input: take the current chunk in one go.
        if Buffer::count(&bs) <= 1 {
            let only = bs.chunk();
            self.buf.extend_from_slice(only);
            return Ok(only.len());
        }

        // Multi-part input: drain leading parts until only one remains.
        let mut accepted = 0;
        while Buffer::count(&bs) > 1 {
            let part_len = {
                let part = bs.chunk();
                self.buf.extend_from_slice(part);
                part.len()
            };
            accepted += part_len;
            bs.advance(part_len);
        }
        Ok(accepted)
    }

    /// Nothing to finalize; always succeeds.
    async fn close(&mut self) -> Result<()> {
        Ok(())
    }

    /// Nothing to roll back; always succeeds.
    async fn abort(&mut self) -> Result<()> {
        Ok(())
    }
}

/// In inexact mode, when the inner writer accepts only part of a combined
/// (queue + input) buffer, no bytes may be lost or duplicated.
#[tokio::test]
async fn test_inexact_buf_writer_partial_send() -> Result<()> {
    let _ = tracing_subscriber::fmt()
        .pretty()
        .with_test_writer()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .try_init();

    let mut rng = thread_rng();
    let mut expected = vec![];
    let mut writer = ChunkedWriter::new(PartialWriter { buf: vec![] }, 10, false);

    // Produces `len` random bytes and appends them to the expected output.
    let mut random_bytes = |len: usize| {
        let mut data = vec![0; len];
        rng.fill_bytes(&mut data);
        expected.extend_from_slice(&data);
        Bytes::from(data)
    };

    // Smaller than the chunk size.
    let small = random_bytes(5);
    assert_eq!(5, writer.write(small.into()).await?);
    // Non-contiguous buffer built from two separate parts (3 + 2 bytes).
    let parts = vec![random_bytes(3), random_bytes(2)];
    let split = Buffer::from(parts);
    assert_eq!(5, writer.write(split).await?);

    writer.close().await?;

    // The inner writer must have received exactly the generated bytes, in order.
    let sink = &writer.inner.buf;
    assert_eq!(sink.len(), expected.len());
    let actual_digest = format!("{:x}", Sha256::digest(sink));
    let expected_digest = format!("{:x}", Sha256::digest(&expected));
    assert_eq!(actual_digest, expected_digest);
    Ok(())
}

#[tokio::test]
async fn test_fuzz_exact_buf_writer() -> Result<()> {
let _ = tracing_subscriber::fmt()
Expand All @@ -154,7 +394,7 @@ mod tests {
let mut expected = vec![];

let buffer_size = rng.gen_range(1..10);
let mut writer = ChunkedWriter::new(MockWriter { buf: vec![] }, buffer_size);
let mut writer = ChunkedWriter::new(MockWriter { buf: vec![] }, buffer_size, true);
debug!("test_fuzz_exact_buf_writer: buffer size: {buffer_size}");

for _ in 0..1000 {
Expand Down
Loading

0 comments on commit 27c6a77

Please sign in to comment.