Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): sets default chunk_size and sends buffer > chunk_size directly #4710

Merged
merged 11 commits into from
Jun 12, 2024
2 changes: 1 addition & 1 deletion core/benches/oio/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub fn bench_exact_buf_write(c: &mut Criterion) {
group.throughput(criterion::Throughput::Bytes(size.bytes() as u64));
group.bench_with_input(size.to_string(), &content, |b, content| {
b.to_async(&*TOKIO).iter(|| async {
let mut w = ChunkedWriter::new(BlackHoleWriter, 256 * 1024);
let mut w = ChunkedWriter::new(BlackHoleWriter, 256 * 1024, true);

let mut bs = content.clone();
while !bs.is_empty() {
Expand Down
37 changes: 22 additions & 15 deletions core/src/layers/complete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,28 +423,35 @@ impl<A: Access> LayeredAccess for CompleteAccessor<A> {
}

// Calculate buffer size.
let chunk_size = args.chunk().map(|mut size| {
if let Some(v) = capability.write_multi_max_size {
size = size.min(v);
}
if let Some(v) = capability.write_multi_min_size {
size = size.max(v);
}
if let Some(v) = capability.write_multi_align_size {
// Make sure size >= size first.
size = size.max(v);
size -= size % v;
}
// If `chunk` is not set, we use `write_multi_min_size` or `write_multi_align_size`
// as the default size.
let chunk_size = args
.chunk()
.or(capability.write_multi_min_size)
.or(capability.write_multi_align_size)
.map(|mut size| {
if let Some(v) = capability.write_multi_max_size {
size = size.min(v);
}
if let Some(v) = capability.write_multi_min_size {
size = size.max(v);
}
if let Some(v) = capability.write_multi_align_size {
// Make sure size >= size first.
size = size.max(v);
size -= size % v;
}

size
});
size
});
let exact = args.chunk().is_some() || capability.write_multi_align_size.is_some();

let (rp, w) = self.inner.write(path, args.clone()).await?;
let w = CompleteWriter::new(w);

let w = match chunk_size {
None => TwoWays::One(w),
Some(size) => TwoWays::Two(oio::ChunkedWriter::new(w, size)),
Some(size) => TwoWays::Two(oio::ChunkedWriter::new(w, size, exact)),
};

Ok((rp, w))
Expand Down
260 changes: 250 additions & 10 deletions core/src/raw/oio/write/chunked_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,43 +15,85 @@
// specific language governing permissions and limitations
// under the License.

use bytes::Buf;

use crate::raw::*;
use crate::*;

/// ChunkedWriter is used to implement [`oio::Write`] based on chunk:
/// flush the underlying storage at the `chunk` size.
///
/// ChunkedWriter makes sure that the size of the data written to the
/// underlying storage is
/// - exactly `chunk` bytes if `exact` is true
/// - at least `chunk` bytes if `exact` is false
pub struct ChunkedWriter<W: oio::Write> {
    inner: W,

    /// The size for buffer, we will flush the underlying storage at the size of this buffer.
    chunk_size: usize,
    /// If `exact` is true, the size of the data written to the underlying storage is
    /// exactly `chunk_size` bytes.
    exact: bool,
    /// Data queued here until it is flushed into `inner` by `write`/`close`.
    buffer: oio::QueueBuf,
}

impl<W: oio::Write> ChunkedWriter<W> {
/// Create a new exact buf writer.
pub fn new(inner: W, chunk_size: usize) -> Self {
pub fn new(inner: W, chunk_size: usize, exact: bool) -> Self {
Self {
inner,
chunk_size,
exact,
buffer: oio::QueueBuf::new(),
}
}
}

impl<W: oio::Write> oio::Write for ChunkedWriter<W> {
async fn write(&mut self, mut bs: Buffer) -> Result<usize> {
if self.buffer.len() >= self.chunk_size {
let written = self.inner.write(self.buffer.clone().collect()).await?;
self.buffer.advance(written);
if self.exact {
if self.buffer.len() >= self.chunk_size {
let written = self.inner.write(self.buffer.clone().collect()).await?;
self.buffer.advance(written);
}

let remaining = self.chunk_size - self.buffer.len();
bs.truncate(remaining);
let n = bs.len();
self.buffer.push(bs);
return Ok(n);
}
// We are in inexact mode.

if self.buffer.len() + bs.len() < self.chunk_size {
// We haven't buffered enough data.
let n = bs.len();
self.buffer.push(bs);
return Ok(n);
}
// We have enough data to send.

if self.buffer.is_empty() {
// Fast path: Sends the buffer directly if the buffer queue is empty.
return self.inner.write(bs).await;
}

let remaining = self.chunk_size - self.buffer.len();
bs.truncate(remaining);
let n = bs.len();
// If we always push `bs` to the buffer queue, the buffer queue may grow infinitely if inner
// doesn't fully consume the queue. So we clone the buffer queue and send it with `bs` first.
let mut buffer = self.buffer.clone();
buffer.push(bs.clone());
let written = self.inner.write(buffer.collect()).await?;
// The number of bytes in `self.buffer` that already written.
let queue_written = written.min(self.buffer.len());
self.buffer.advance(queue_written);
// The number of bytes in `bs` that already written.
let bs_written = written - queue_written;
// Skip bytes that already written.
bs.advance(bs_written);
// We already sent `written` bytes so we put more `written` bytes into the buffer queue.
bs.truncate(written);
let n = bs_written + bs.len();
self.buffer.push(bs);
Ok(n)
}
Expand Down Expand Up @@ -124,7 +166,7 @@ mod tests {
let mut expected = vec![0; 5];
rng.fill_bytes(&mut expected);

let mut w = ChunkedWriter::new(MockWriter { buf: vec![] }, 10);
let mut w = ChunkedWriter::new(MockWriter { buf: vec![] }, 10, true);

let mut bs = Bytes::from(expected.clone());
while !bs.is_empty() {
Expand All @@ -142,6 +184,204 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_inexact_buf_writer_large_write() -> Result<()> {
    // Best-effort tracing setup; ignore the error if a subscriber already exists.
    let _ = tracing_subscriber::fmt()
        .pretty()
        .with_test_writer()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .try_init();

    // 15 random bytes — larger than the chunk size of 10.
    let mut rng = thread_rng();
    let mut data = vec![0u8; 15];
    rng.fill_bytes(&mut data);

    let mut writer = ChunkedWriter::new(MockWriter { buf: vec![] }, 10, false);

    // The MockWriter always returns the first chunk size; the buffer is
    // contiguous here, so the whole payload is accepted in one call.
    let written = writer.write(Bytes::from(data.clone()).into()).await?;
    assert_eq!(written, data.len());

    writer.close().await?;

    // Every byte must have reached the inner writer unchanged.
    assert_eq!(writer.inner.buf.len(), data.len());
    assert_eq!(
        format!("{:x}", Sha256::digest(&writer.inner.buf)),
        format!("{:x}", Sha256::digest(&data))
    );
    Ok(())
}

#[tokio::test]
async fn test_inexact_buf_writer_combine_small() -> Result<()> {
    // Best-effort tracing setup; ignore the error if a subscriber already exists.
    let _ = tracing_subscriber::fmt()
        .pretty()
        .with_test_writer()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .try_init();

    let mut rng = thread_rng();
    let mut expected = vec![];

    // Produces `size` fresh random bytes and records them in `expected`.
    let mut next_chunk = |size| {
        let mut buf = vec![0; size];
        rng.fill_bytes(&mut buf);
        expected.extend_from_slice(&buf);
        Bytes::from(buf)
    };

    let mut writer = ChunkedWriter::new(MockWriter { buf: vec![] }, 10, false);

    // Payload larger than the chunk size is accepted in full.
    let payload = next_chunk(15);
    assert_eq!(15, writer.write(payload.into()).await?);
    // Payload smaller than the chunk size is buffered in full.
    let payload = next_chunk(5);
    assert_eq!(5, writer.write(payload.into()).await?);
    // Larger than the chunk size again, with 5 bytes already queued.
    // The MockWriter can only send 5 bytes each time, so each call advances 5.
    let mut payload = next_chunk(15);
    for _ in 0..3 {
        assert_eq!(5, writer.write(payload.clone().into()).await?);
        payload.advance(5);
    }

    writer.close().await?;

    // Every byte must have reached the inner writer, in order.
    assert_eq!(writer.inner.buf.len(), expected.len());
    assert_eq!(
        format!("{:x}", Sha256::digest(&writer.inner.buf)),
        format!("{:x}", Sha256::digest(&expected))
    );
    Ok(())
}

/// Drives the inexact writer through a write sequence where the inner writer
/// (per the inline comments below, MockWriter acknowledges only the first
/// chunk of each call) keeps part of the queue unconsumed, and verifies that
/// the final output matches everything fed in.
#[tokio::test]
async fn test_inexact_buf_writer_queue_remaining() -> Result<()> {
    // Best-effort tracing setup; ignore the error if a subscriber already exists.
    let _ = tracing_subscriber::fmt()
        .pretty()
        .with_test_writer()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .try_init();

    // Inexact writer with a chunk size of 10 bytes.
    let mut w = ChunkedWriter::new(MockWriter { buf: vec![] }, 10, false);

    let mut rng = thread_rng();
    let mut expected = vec![];

    // Generates `size` random bytes and records them in `expected`.
    let mut new_content = |size| {
        let mut content = vec![0; size];
        rng.fill_bytes(&mut content);
        expected.extend_from_slice(&content);
        Bytes::from(content)
    };

    // content > chunk size.
    let content = new_content(15);
    assert_eq!(15, w.write(content.into()).await?);
    // content < chunk size.
    let content = new_content(5);
    assert_eq!(5, w.write(content.into()).await?);
    // content < chunk size.
    let content = new_content(3);
    assert_eq!(3, w.write(content.into()).await?);
    // content > chunk size, but only sends the first chunk in the queue.
    let mut content = new_content(15);
    assert_eq!(5, w.write(content.clone().into()).await?);
    // queue: 3, 5, bs: 10
    content.advance(5);
    assert_eq!(3, w.write(content.clone().into()).await?);
    // queue: 5, 3, bs: 7
    content.advance(3);
    assert_eq!(5, w.write(content.clone().into()).await?);
    // queue: 3, 5, bs: 2
    content.advance(5);
    assert_eq!(2, w.write(content.clone().into()).await?);
    // queue: 5, 2, bs: empty.
    content.advance(2);
    assert!(content.is_empty());

    // close is expected to flush the remaining queue — verified by the digest below.
    w.close().await?;

    // Every byte must have reached the inner writer, in order.
    assert_eq!(w.inner.buf.len(), expected.len());
    assert_eq!(
        format!("{:x}", Sha256::digest(&w.inner.buf)),
        format!("{:x}", Sha256::digest(&expected))
    );
    Ok(())
}

/// Test writer that accumulates written bytes in `buf`, but only partially
/// consumes non-contiguous buffers (see its `Write` impl).
struct PartialWriter {
    // Bytes acknowledged as written so far.
    buf: Vec<u8>,
}

impl Write for PartialWriter {
    /// Copies `bs` into the internal buffer.
    ///
    /// For a non-contiguous buffer (more than one chunk) it copies every
    /// chunk except the last and reports only those bytes as written,
    /// simulating an inner writer that partially consumes a queued write.
    /// Otherwise it copies the current chunk and reports its full length.
    async fn write(&mut self, mut bs: Buffer) -> Result<usize> {
        // NOTE(review): `Buffer::count` is called via UFCS — presumably to
        // disambiguate from `Iterator::count`; confirm against `Buffer`'s API.
        if Buffer::count(&bs) > 1 {
            // Always leaves last buffer for non-contiguous buffer.
            let mut written = 0;
            while Buffer::count(&bs) > 1 {
                let chunk = bs.chunk();
                self.buf.extend_from_slice(chunk);
                written += chunk.len();
                bs.advance(chunk.len());
            }
            Ok(written)
        } else {
            // Single-chunk case: consume the chunk in full.
            let chunk = bs.chunk();
            self.buf.extend_from_slice(chunk);
            Ok(chunk.len())
        }
    }

    // Nothing buffered internally beyond `buf`, so close is a no-op.
    async fn close(&mut self) -> Result<()> {
        Ok(())
    }

    // Abort discards nothing; `buf` keeps whatever was already written.
    async fn abort(&mut self) -> Result<()> {
        Ok(())
    }
}

/// Feeds the inexact writer a non-contiguous `Buffer` through `PartialWriter`
/// (which consumes all but the last chunk of a non-contiguous write) and
/// checks that no bytes are lost or reordered.
#[tokio::test]
async fn test_inexact_buf_writer_partial_send() -> Result<()> {
    // Best-effort tracing setup; ignore the error if a subscriber already exists.
    let _ = tracing_subscriber::fmt()
        .pretty()
        .with_test_writer()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .try_init();

    // Inexact writer with a chunk size of 10 bytes.
    let mut w = ChunkedWriter::new(PartialWriter { buf: vec![] }, 10, false);

    let mut rng = thread_rng();
    let mut expected = vec![];

    // Generates `size` random bytes and records them in `expected`.
    let mut new_content = |size| {
        let mut content = vec![0; size];
        rng.fill_bytes(&mut content);
        expected.extend_from_slice(&content);
        Bytes::from(content)
    };

    // content < chunk size.
    let content = new_content(5);
    assert_eq!(5, w.write(content.into()).await?);
    // Non-contiguous buffer.
    let content = Buffer::from(vec![new_content(3), new_content(2)]);
    assert_eq!(5, w.write(content).await?);

    w.close().await?;

    // Every byte must have reached the inner writer, in order.
    assert_eq!(w.inner.buf.len(), expected.len());
    assert_eq!(
        format!("{:x}", Sha256::digest(&w.inner.buf)),
        format!("{:x}", Sha256::digest(&expected))
    );
    Ok(())
}

#[tokio::test]
async fn test_fuzz_exact_buf_writer() -> Result<()> {
let _ = tracing_subscriber::fmt()
Expand All @@ -154,7 +394,7 @@ mod tests {
let mut expected = vec![];

let buffer_size = rng.gen_range(1..10);
let mut writer = ChunkedWriter::new(MockWriter { buf: vec![] }, buffer_size);
let mut writer = ChunkedWriter::new(MockWriter { buf: vec![] }, buffer_size, true);
debug!("test_fuzz_exact_buf_writer: buffer size: {buffer_size}");

for _ in 0..1000 {
Expand Down
Loading
Loading