Skip to content

Commit

Permalink
feat(services/azblob): support sink (#2574)
Browse files Browse the repository at this point in the history
* feat(services/azblob): support sink

Signed-off-by: suyanhanx <suyanhanx@gmail.com>

* polish

Signed-off-by: suyanhanx <suyanhanx@gmail.com>

---------

Signed-off-by: suyanhanx <suyanhanx@gmail.com>
  • Loading branch information
suyanhanx committed Jun 29, 2023
1 parent f887b67 commit 1deb087
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 12 deletions.
1 change: 1 addition & 0 deletions core/src/services/azblob/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,7 @@ impl Accessor for AzblobBackend {
read_with_override_content_disposition: true,

write: true,
write_can_sink: true,
write_with_cache_control: true,
write_with_content_type: true,

Expand Down
2 changes: 1 addition & 1 deletion core/src/services/azblob/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ impl AzblobCore {
pub fn azblob_put_blob_request(
&self,
path: &str,
size: Option<usize>,
size: Option<u64>,
content_type: Option<&str>,
cache_control: Option<&str>,
body: AsyncBody,
Expand Down
24 changes: 13 additions & 11 deletions core/src/services/azblob/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,14 @@ impl AzblobWriter {
pub fn new(core: Arc<AzblobCore>, op: OpWrite, path: String) -> Self {
AzblobWriter { core, op, path }
}
}

#[async_trait]
impl oio::Write for AzblobWriter {
async fn write(&mut self, bs: Bytes) -> Result<()> {
async fn write_oneshot(&self, size: u64, body: AsyncBody) -> Result<()> {
let mut req = self.core.azblob_put_blob_request(
&self.path,
Some(bs.len()),
Some(size),
self.op.content_type(),
self.op.cache_control(),
AsyncBody::Bytes(bs),
body,
)?;

self.core.sign(&mut req).await?;
Expand All @@ -64,12 +61,17 @@ impl oio::Write for AzblobWriter {
_ => Err(parse_error(resp).await?),
}
}
}

#[async_trait]
impl oio::Write for AzblobWriter {
async fn write(&mut self, bs: Bytes) -> Result<()> {
self.write_oneshot(bs.len() as u64, AsyncBody::Bytes(bs))
.await
}

async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
Err(Error::new(
ErrorKind::Unsupported,
"Write::sink is not supported",
))
async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
self.write_oneshot(size, AsyncBody::Stream(s)).await
}

async fn abort(&mut self) -> Result<()> {
Expand Down

0 comments on commit 1deb087

Please sign in to comment.