Skip to content

Commit

Permalink
refactor: Use Buffer as cache in MultipartWrite (#4493)
Browse files Browse the repository at this point in the history
Signed-off-by: tison <wander4096@gmail.com>
  • Loading branch information
tisonkun authored Apr 16, 2024
1 parent 4486d62 commit a991b95
Showing 1 changed file with 11 additions and 17 deletions.
28 changes: 11 additions & 17 deletions core/src/raw/oio/write/multipart_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use std::sync::Arc;
use std::task::Context;
use std::task::Poll;

use bytes::Bytes;
use futures::Future;
use futures::FutureExt;
use futures::StreamExt;
Expand Down Expand Up @@ -119,8 +118,8 @@ pub struct MultipartPart {

/// WritePartResult is the result returned by [`WritePartFuture`].
///
/// The error part will carries input `(part_number, bytes, err)` so caller can retry them.
type WritePartResult = std::result::Result<MultipartPart, (usize, Bytes, Error)>;
/// The error part will carries input `(part_number, buffer, err)` so caller can retry them.
type WritePartResult = std::result::Result<MultipartPart, (usize, Buffer, Error)>;

struct WritePartFuture(BoxedStaticFuture<WritePartResult>);

Expand All @@ -146,17 +145,12 @@ impl WritePartFuture {
w: Arc<W>,
upload_id: Arc<String>,
part_number: usize,
bytes: Bytes,
bytes: Buffer,
) -> Self {
let fut = async move {
w.write_part(
&upload_id,
part_number,
bytes.len() as u64,
Buffer::from(bytes.clone()),
)
.await
.map_err(|err| (part_number, bytes, err))
w.write_part(&upload_id, part_number, bytes.len() as u64, bytes.clone())
.await
.map_err(|err| (part_number, bytes, err))
};

WritePartFuture(Box::pin(fut))
Expand All @@ -170,7 +164,7 @@ pub struct MultipartWriter<W: MultipartWrite> {

upload_id: Option<Arc<String>>,
parts: Vec<MultipartPart>,
cache: Option<Bytes>,
cache: Option<Buffer>,
futures: ConcurrentFutures<WritePartFuture>,
next_part_number: usize,
}
Expand All @@ -192,7 +186,7 @@ impl<W: MultipartWrite> MultipartWriter<W> {
}
}

fn fill_cache(&mut self, bs: Bytes) -> usize {
fn fill_cache(&mut self, bs: Buffer) -> usize {
let size = bs.len();
assert!(self.cache.is_none());
self.cache = Some(bs);
Expand All @@ -210,7 +204,7 @@ where
None => {
// Fill cache with the first write.
if self.cache.is_none() {
let size = self.fill_cache(bs.to_bytes());
let size = self.fill_cache(bs);
return Ok(size);
}

Expand All @@ -233,7 +227,7 @@ where
part_number,
cache,
));
let size = self.fill_cache(bs.to_bytes());
let size = self.fill_cache(bs);
return Ok(size);
}

Expand Down Expand Up @@ -262,7 +256,7 @@ where
Some(v) => v,
None => {
let (size, body) = match self.cache.clone() {
Some(cache) => (cache.len(), Buffer::from(cache)),
Some(cache) => (cache.len(), cache),
None => (0, Buffer::new()),
};
// Call write_once if there is no upload_id.
Expand Down

0 comments on commit a991b95

Please sign in to comment.