From fa8dad7f555321c30dfc4e4658845e0d5323d16a Mon Sep 17 00:00:00 2001 From: suyanhanx Date: Fri, 30 Jun 2023 16:52:49 +0800 Subject: [PATCH] feat(services/oss): support sink Signed-off-by: suyanhanx --- core/src/services/oss/backend.rs | 1 + core/src/services/oss/core.rs | 4 ++-- core/src/services/oss/writer.rs | 17 ++++++++--------- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs index e2210b07524..e3bb47456e3 100644 --- a/core/src/services/oss/backend.rs +++ b/core/src/services/oss/backend.rs @@ -408,6 +408,7 @@ impl Accessor for OssBackend { read_with_if_none_match: true, write: true, + write_can_sink: true, write_with_cache_control: true, write_with_content_type: true, write_without_content_length: true, diff --git a/core/src/services/oss/core.rs b/core/src/services/oss/core.rs index 3ccafbcc043..1f4deea85a6 100644 --- a/core/src/services/oss/core.rs +++ b/core/src/services/oss/core.rs @@ -152,7 +152,7 @@ impl OssCore { pub fn oss_put_object_request( &self, path: &str, - size: Option, + size: Option, content_type: Option<&str>, content_disposition: Option<&str>, cache_control: Option<&str>, @@ -376,7 +376,7 @@ impl OssCore { pub async fn oss_put_object( &self, path: &str, - size: Option, + size: Option, content_type: Option<&str>, content_disposition: Option<&str>, cache_control: Option<&str>, diff --git a/core/src/services/oss/writer.rs b/core/src/services/oss/writer.rs index f7cc7b7e878..d81fbebf13a 100644 --- a/core/src/services/oss/writer.rs +++ b/core/src/services/oss/writer.rs @@ -54,14 +54,14 @@ impl OssWriter { } } - async fn write_oneshot(&self, bs: Bytes) -> Result<()> { + async fn write_oneshot(&self, size: u64, body: AsyncBody) -> Result<()> { let mut req = self.core.oss_put_object_request( &self.path, - Some(bs.len()), + Some(size), self.op.content_type(), self.op.content_disposition(), self.op.cache_control(), - AsyncBody::Bytes(bs), + body, false, )?; @@ -136,7 +136,9 @@ impl oio::Write for OssWriter { Some(upload_id) => upload_id, None => { if self.op.content_length().unwrap_or_default() == bs.len() as u64 { - return self.write_oneshot(bs).await; + return self + .write_oneshot(bs.len() as u64, AsyncBody::Bytes(bs)) + .await; } else { let upload_id = self.initiate_upload().await?; self.upload_id = Some(upload_id); @@ -174,11 +176,8 @@ impl oio::Write for OssWriter { } } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { - Err(Error::new( - ErrorKind::Unsupported, - "Write::sink is not supported", - )) + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + self.write_oneshot(size, AsyncBody::Stream(s)).await } // TODO: we can cancel the upload by sending an abort request.