diff --git a/core/benches/oio/write.rs b/core/benches/oio/write.rs index 13f31560d71..d9d8dd8b93e 100644 --- a/core/benches/oio/write.rs +++ b/core/benches/oio/write.rs @@ -44,7 +44,7 @@ pub fn bench_exact_buf_write(c: &mut Criterion) { group.throughput(criterion::Throughput::Bytes(size.bytes() as u64)); group.bench_with_input(size.to_string(), &content, |b, content| { b.to_async(&*TOKIO).iter(|| async { - let mut w = ChunkedWriter::new(BlackHoleWriter, 256 * 1024); + let mut w = ChunkedWriter::new(BlackHoleWriter, 256 * 1024, true); let mut bs = content.clone(); while !bs.is_empty() { diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index 97483f10b17..1d38fbda6b5 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -423,28 +423,35 @@ impl LayeredAccess for CompleteAccessor { } // Calculate buffer size. - let chunk_size = args.chunk().map(|mut size| { - if let Some(v) = capability.write_multi_max_size { - size = size.min(v); - } - if let Some(v) = capability.write_multi_min_size { - size = size.max(v); - } - if let Some(v) = capability.write_multi_align_size { - // Make sure size >= size first. - size = size.max(v); - size -= size % v; - } + // If `chunk` is not set, we use `write_multi_min_size` or `write_multi_align_size` + // as the default size. + let chunk_size = args + .chunk() + .or(capability.write_multi_min_size) + .or(capability.write_multi_align_size) + .map(|mut size| { + if let Some(v) = capability.write_multi_max_size { + size = size.min(v); + } + if let Some(v) = capability.write_multi_min_size { + size = size.max(v); + } + if let Some(v) = capability.write_multi_align_size { + // Make sure size >= v first so the alignment below can't underflow. 
+ size = size.max(v); + size -= size % v; + } - size - }); + size + }); + let exact = args.chunk().is_some() || capability.write_multi_align_size.is_some(); let (rp, w) = self.inner.write(path, args.clone()).await?; let w = CompleteWriter::new(w); let w = match chunk_size { None => TwoWays::One(w), - Some(size) => TwoWays::Two(oio::ChunkedWriter::new(w, size)), + Some(size) => TwoWays::Two(oio::ChunkedWriter::new(w, size, exact)), }; Ok((rp, w)) diff --git a/core/src/raw/oio/write/chunked_write.rs b/core/src/raw/oio/write/chunked_write.rs index 8eadaa3dd26..c98ad41c974 100644 --- a/core/src/raw/oio/write/chunked_write.rs +++ b/core/src/raw/oio/write/chunked_write.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +use bytes::Buf; + use crate::raw::*; use crate::*; @@ -22,21 +24,27 @@ use crate::*; /// flush the underlying storage at the `chunk`` size. /// /// ChunkedWriter makes sure that the size of the data written to the -/// underlying storage is exactly `chunk` bytes. +/// underlying storage is +/// - exactly `chunk` bytes if `exact` is true +/// - at least `chunk` bytes if `exact` is false pub struct ChunkedWriter { inner: W, /// The size for buffer, we will flush the underlying storage at the size of this buffer. chunk_size: usize, + /// If `exact` is true, the size of the data written to the underlying storage is + /// exactly `chunk_size` bytes. + exact: bool, buffer: oio::QueueBuf, } impl ChunkedWriter { /// Create a new exact buf writer. 
- pub fn new(inner: W, chunk_size: usize) -> Self { + pub fn new(inner: W, chunk_size: usize, exact: bool) -> Self { Self { inner, chunk_size, + exact, buffer: oio::QueueBuf::new(), } } @@ -44,14 +52,48 @@ impl ChunkedWriter { impl oio::Write for ChunkedWriter { async fn write(&mut self, mut bs: Buffer) -> Result { - if self.buffer.len() >= self.chunk_size { - let written = self.inner.write(self.buffer.clone().collect()).await?; - self.buffer.advance(written); + if self.exact { + if self.buffer.len() >= self.chunk_size { + let written = self.inner.write(self.buffer.clone().collect()).await?; + self.buffer.advance(written); + } + + let remaining = self.chunk_size - self.buffer.len(); + bs.truncate(remaining); + let n = bs.len(); + self.buffer.push(bs); + return Ok(n); + } + // We are in inexact mode. + + if self.buffer.len() + bs.len() < self.chunk_size { + // We haven't buffered enough data. + let n = bs.len(); + self.buffer.push(bs); + return Ok(n); + } + // We have enough data to send. + + if self.buffer.is_empty() { + // Fast path: Sends the buffer directly if the buffer queue is empty. + return self.inner.write(bs).await; } - let remaining = self.chunk_size - self.buffer.len(); - bs.truncate(remaining); - let n = bs.len(); + // If we always push `bs` to the buffer queue, the buffer queue may grow infinitely if inner + // doesn't fully consume the queue. So we clone the buffer queue and send it with `bs` first. + let mut buffer = self.buffer.clone(); + buffer.push(bs.clone()); + let written = self.inner.write(buffer.collect()).await?; + // The number of bytes in `self.buffer` that already written. + let queue_written = written.min(self.buffer.len()); + self.buffer.advance(queue_written); + // The number of bytes in `bs` that already written. + let bs_written = written - queue_written; + // Skip bytes that already written. + bs.advance(bs_written); + // We already sent `written` bytes so we put more `written` bytes into the buffer queue. 
+ bs.truncate(written); + let n = bs_written + bs.len(); self.buffer.push(bs); Ok(n) } @@ -124,7 +166,7 @@ mod tests { let mut expected = vec![0; 5]; rng.fill_bytes(&mut expected); - let mut w = ChunkedWriter::new(MockWriter { buf: vec![] }, 10); + let mut w = ChunkedWriter::new(MockWriter { buf: vec![] }, 10, true); let mut bs = Bytes::from(expected.clone()); while !bs.is_empty() { @@ -142,6 +184,204 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_inexact_buf_writer_large_write() -> Result<()> { + let _ = tracing_subscriber::fmt() + .pretty() + .with_test_writer() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .try_init(); + + let mut w = ChunkedWriter::new(MockWriter { buf: vec![] }, 10, false); + + let mut rng = thread_rng(); + let mut expected = vec![0; 15]; + rng.fill_bytes(&mut expected); + + let bs = Bytes::from(expected.clone()); + // The MockWriter always returns the first chunk size. + let n = w.write(bs.into()).await?; + assert_eq!(expected.len(), n); + + w.close().await?; + + assert_eq!(w.inner.buf.len(), expected.len()); + assert_eq!( + format!("{:x}", Sha256::digest(&w.inner.buf)), + format!("{:x}", Sha256::digest(&expected)) + ); + Ok(()) + } + + #[tokio::test] + async fn test_inexact_buf_writer_combine_small() -> Result<()> { + let _ = tracing_subscriber::fmt() + .pretty() + .with_test_writer() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .try_init(); + + let mut w = ChunkedWriter::new(MockWriter { buf: vec![] }, 10, false); + + let mut rng = thread_rng(); + let mut expected = vec![]; + + let mut new_content = |size| { + let mut content = vec![0; size]; + rng.fill_bytes(&mut content); + expected.extend_from_slice(&content); + Bytes::from(content) + }; + + // content > chunk size. + let content = new_content(15); + assert_eq!(15, w.write(content.into()).await?); + // content < chunk size. 
+ let content = new_content(5); + assert_eq!(5, w.write(content.into()).await?); + // content > chunk size, but 5 bytes in queue. + let mut content = new_content(15); + // The MockWriter can only send 5 bytes each time, so we can only advance 5 bytes. + assert_eq!(5, w.write(content.clone().into()).await?); + content.advance(5); + assert_eq!(5, w.write(content.clone().into()).await?); + content.advance(5); + assert_eq!(5, w.write(content.clone().into()).await?); + + w.close().await?; + + assert_eq!(w.inner.buf.len(), expected.len()); + assert_eq!( + format!("{:x}", Sha256::digest(&w.inner.buf)), + format!("{:x}", Sha256::digest(&expected)) + ); + Ok(()) + } + + #[tokio::test] + async fn test_inexact_buf_writer_queue_remaining() -> Result<()> { + let _ = tracing_subscriber::fmt() + .pretty() + .with_test_writer() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .try_init(); + + let mut w = ChunkedWriter::new(MockWriter { buf: vec![] }, 10, false); + + let mut rng = thread_rng(); + let mut expected = vec![]; + + let mut new_content = |size| { + let mut content = vec![0; size]; + rng.fill_bytes(&mut content); + expected.extend_from_slice(&content); + Bytes::from(content) + }; + + // content > chunk size. + let content = new_content(15); + assert_eq!(15, w.write(content.into()).await?); + // content < chunk size. + let content = new_content(5); + assert_eq!(5, w.write(content.into()).await?); + // content < chunk size. + let content = new_content(3); + assert_eq!(3, w.write(content.into()).await?); + // content > chunk size, but only sends the first chunk in the queue. 
+ let mut content = new_content(15); + assert_eq!(5, w.write(content.clone().into()).await?); + // queue: 3, 5, bs: 10 + content.advance(5); + assert_eq!(3, w.write(content.clone().into()).await?); + // queue: 5, 3, bs: 7 + content.advance(3); + assert_eq!(5, w.write(content.clone().into()).await?); + // queue: 3, 5, bs: 2 + content.advance(5); + assert_eq!(2, w.write(content.clone().into()).await?); + // queue: 5, 2, bs: empty. + content.advance(2); + assert!(content.is_empty()); + + w.close().await?; + + assert_eq!(w.inner.buf.len(), expected.len()); + assert_eq!( + format!("{:x}", Sha256::digest(&w.inner.buf)), + format!("{:x}", Sha256::digest(&expected)) + ); + Ok(()) + } + + struct PartialWriter { + buf: Vec, + } + + impl Write for PartialWriter { + async fn write(&mut self, mut bs: Buffer) -> Result { + if Buffer::count(&bs) > 1 { + // Always leaves last buffer for non-contiguous buffer. + let mut written = 0; + while Buffer::count(&bs) > 1 { + let chunk = bs.chunk(); + self.buf.extend_from_slice(chunk); + written += chunk.len(); + bs.advance(chunk.len()); + } + Ok(written) + } else { + let chunk = bs.chunk(); + self.buf.extend_from_slice(chunk); + Ok(chunk.len()) + } + } + + async fn close(&mut self) -> Result<()> { + Ok(()) + } + + async fn abort(&mut self) -> Result<()> { + Ok(()) + } + } + + #[tokio::test] + async fn test_inexact_buf_writer_partial_send() -> Result<()> { + let _ = tracing_subscriber::fmt() + .pretty() + .with_test_writer() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .try_init(); + + let mut w = ChunkedWriter::new(PartialWriter { buf: vec![] }, 10, false); + + let mut rng = thread_rng(); + let mut expected = vec![]; + + let mut new_content = |size| { + let mut content = vec![0; size]; + rng.fill_bytes(&mut content); + expected.extend_from_slice(&content); + Bytes::from(content) + }; + + // content < chunk size. 
+ let content = new_content(5); + assert_eq!(5, w.write(content.into()).await?); + // Non-contiguous buffer. + let content = Buffer::from(vec![new_content(3), new_content(2)]); + assert_eq!(5, w.write(content).await?); + + w.close().await?; + + assert_eq!(w.inner.buf.len(), expected.len()); + assert_eq!( + format!("{:x}", Sha256::digest(&w.inner.buf)), + format!("{:x}", Sha256::digest(&expected)) + ); + Ok(()) + } + #[tokio::test] async fn test_fuzz_exact_buf_writer() -> Result<()> { let _ = tracing_subscriber::fmt() @@ -154,7 +394,7 @@ mod tests { let mut expected = vec![]; let buffer_size = rng.gen_range(1..10); - let mut writer = ChunkedWriter::new(MockWriter { buf: vec![] }, buffer_size); + let mut writer = ChunkedWriter::new(MockWriter { buf: vec![] }, buffer_size, true); debug!("test_fuzz_exact_buf_writer: buffer size: {buffer_size}"); for _ in 0..1000 { diff --git a/core/src/types/operator/operator.rs b/core/src/types/operator/operator.rs index fd42fe49ff6..b06687292c1 100644 --- a/core/src/types/operator/operator.rs +++ b/core/src/types/operator/operator.rs @@ -858,12 +858,14 @@ impl Operator { /// /// ## Chunk /// - /// OpenDAL is designed to write files directly without chunking by default, giving users - /// control over the exact size of their writes and helping avoid unnecessary costs. + /// Some storage services have a minimum chunk size requirement. For example, `s3` could return + /// hard errors like `EntityTooSmall` if the chunk size is too small. Some services like `gcs` + /// also return errors if the chunk size is not aligned. Besides, cloud storage services will cost + /// more money if we write data in small chunks. /// - /// This is not efficient for cases when users write small chunks of data. Some storage services - /// like `s3` could even return hard errors like `EntityTooSmall`. Besides, cloud storage services - /// will cost more money if we write data in small chunks. 
+ /// OpenDAL sets the chunk size automatically based on the [Capability](crate::types::Capability) + /// of the service if users don't set it. Users can set `chunk` to control the exact size to send + /// to the storage service. /// /// Users can use [`Operator::writer_with`] to set a good chunk size might improve the performance, /// @@ -919,12 +921,14 @@ impl Operator { /// /// Set `chunk` for the writer. /// - /// OpenDAL is designed to write files directly without chunking by default, giving users - /// control over the exact size of their writes and helping avoid unnecessary costs. + /// Some storage services have a minimum chunk size requirement. For example, `s3` could return + /// hard errors like `EntityTooSmall` if the chunk size is too small. Some services like `gcs` + /// also return errors if the chunk size is not aligned. Besides, cloud storage services will cost + /// more money if we write data in small chunks. /// - /// This is not efficient for cases when users write small chunks of data. Some storage services - /// like `s3` could even return hard errors like `EntityTooSmall`. Besides, cloud storage services - /// will cost more money if we write data in small chunks. + /// OpenDAL sets the chunk size automatically based on the [Capability](crate::types::Capability) + /// of the service if users don't set it. Users can set `chunk` to control the exact size to send + /// to the storage service. /// /// Set a good chunk size might improve the performance, reduce the API calls and save money. ///