diff --git a/core/src/raw/oio/write/range_write.rs b/core/src/raw/oio/write/range_write.rs index 7b72805ffc6..9e76f50db59 100644 --- a/core/src/raw/oio/write/range_write.rs +++ b/core/src/raw/oio/write/range_write.rs @@ -20,7 +20,6 @@ use std::sync::Arc; use std::task::Context; use std::task::Poll; -use bytes::Bytes; use futures::Future; use futures::FutureExt; use futures::StreamExt; @@ -93,7 +92,7 @@ pub trait RangeWrite: Send + Sync + Unpin + 'static { /// WritePartResult is the result returned by [`WriteRangeFuture`]. /// /// The error part will carries input `(offset, bytes, err)` so caller can retry them. -type WriteRangeResult = std::result::Result<(), (u64, Bytes, Error)>; +type WriteRangeResult = std::result::Result<(), (u64, Buffer, Error)>; struct WriteRangeFuture(BoxedStaticFuture); @@ -115,16 +114,16 @@ impl Future for WriteRangeFuture { } impl WriteRangeFuture { - pub fn new(w: Arc, location: Arc, offset: u64, bytes: Bytes) -> Self { + pub fn new( + w: Arc, + location: Arc, + offset: u64, + bytes: Buffer, + ) -> Self { let fut = async move { - w.write_range( - &location, - offset, - bytes.len() as u64, - Buffer::from(bytes.clone()), - ) - .await - .map_err(|err| (offset, bytes, err)) + w.write_range(&location, offset, bytes.len() as u64, bytes.clone()) + .await + .map_err(|err| (offset, bytes, err)) }; WriteRangeFuture(Box::pin(fut)) @@ -135,7 +134,7 @@ impl WriteRangeFuture { pub struct RangeWriter { location: Option>, next_offset: u64, - buffer: Option, + buffer: Option, futures: ConcurrentFutures, w: Arc, @@ -154,7 +153,7 @@ impl RangeWriter { } } - fn fill_cache(&mut self, bs: Bytes) -> usize { + fn fill_cache(&mut self, bs: Buffer) -> usize { let size = bs.len(); assert!(self.buffer.is_none()); self.buffer = Some(bs); @@ -169,7 +168,7 @@ impl oio::Write for RangeWriter { None => { // Fill cache with the first write. if self.buffer.is_none() { - let size = self.fill_cache(bs.to_bytes()); + let size = self.fill_cache(bs); return Ok(size); } @@ -192,7 +191,7 @@ impl oio::Write for RangeWriter { cache, )); - let size = self.fill_cache(bs.to_bytes()); + let size = self.fill_cache(bs); return Ok(size); } @@ -211,7 +210,7 @@ impl oio::Write for RangeWriter { async fn close(&mut self) -> Result<()> { let Some(location) = self.location.clone() else { let (size, body) = match self.buffer.clone() { - Some(cache) => (cache.len(), Buffer::from(cache)), + Some(cache) => (cache.len(), cache), None => (0, Buffer::new()), }; // Call write_once if there is no data in buffer and no location. @@ -235,7 +234,7 @@ impl oio::Write for RangeWriter { if let Some(buffer) = self.buffer.clone() { let offset = self.next_offset; self.w - .complete_range(&location, offset, buffer.len() as u64, Buffer::from(buffer)) + .complete_range(&location, offset, buffer.len() as u64, buffer) .await?; self.buffer = None; }