diff --git a/core/src/services/oss/appender.rs b/core/src/services/oss/appender.rs new file mode 100644 index 00000000000..b327d824931 --- /dev/null +++ b/core/src/services/oss/appender.rs @@ -0,0 +1,143 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use async_trait::async_trait; +use bytes::Bytes; +use http::StatusCode; + +use super::core::*; +use super::error::parse_error; +use crate::ops::OpAppend; +use crate::raw::*; +use crate::*; + +pub const X_OSS_NEXT_APPEND_POSITION: &str = "x-oss-next-append-position"; + +pub struct OssAppender { + core: Arc, + + op: OpAppend, + path: String, + + position: Option, +} + +impl OssAppender { + pub fn new(core: Arc, path: &str, op: OpAppend) -> Self { + Self { + core, + op, + path: path.to_string(), + position: None, + } + } +} + +#[async_trait] +impl oio::Append for OssAppender { + async fn append(&mut self, bs: Bytes) -> Result<()> { + // If the position is not set, we need to get the current position. + if self.position.is_none() { + let resp = self.core.oss_head_object(&self.path, None, None).await?; + + let status = resp.status(); + match status { + StatusCode::OK => { + let position = resp + .headers() + .get(X_OSS_NEXT_APPEND_POSITION) + .and_then(|v| v.to_str().ok()) + .and_then(|v| v.parse::().ok()) + .ok_or_else(|| { + Error::new( + ErrorKind::Unexpected, + "missing x-oss-next-append-position, the object may not be appendable", + ) + })?; + self.position = Some(position); + } + StatusCode::NOT_FOUND => { + self.position = Some(0); + } + _ => { + return Err(parse_error(resp).await?); + } + } + } + + let mut req = self.core.oss_append_object_request( + &self.path, + self.position.expect("position is not set"), + bs.len(), + &self.op, + AsyncBody::Bytes(bs), + )?; + + self.core.sign(&mut req).await?; + + let resp = self.core.send(req).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => { + let position = resp + .headers() + .get(X_OSS_NEXT_APPEND_POSITION) + .and_then(|v| v.to_str().ok()) + .and_then(|v| v.parse::().ok()) + .ok_or_else(|| { + Error::new( + ErrorKind::Unexpected, + "missing x-oss-next-append-position, the object may not be appendable", + ) + })?; + self.position = Some(position); + Ok(()) + } + StatusCode::CONFLICT => { + // The object is not appendable or the position is not match with the object's length. + // If the position is not match, we could get the current position and retry. + let position = resp + .headers() + .get(X_OSS_NEXT_APPEND_POSITION) + .and_then(|v| v.to_str().ok()) + .and_then(|v| v.parse::().ok()) + .ok_or_else(|| { + Error::new( + ErrorKind::Unexpected, + "missing x-oss-next-append-position, the object may not be appendable", + ) + })?; + self.position = Some(position); + + // Then return the error to the caller, so the caller could retry. + Err(Error::new( + ErrorKind::ConditionNotMatch, + "the position is not match with the object's length. position has been updated.", + )) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn close(&mut self) -> Result<()> { + Ok(()) + } +} diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs index 713236f148e..c436fc98314 100644 --- a/core/src/services/oss/backend.rs +++ b/core/src/services/oss/backend.rs @@ -30,6 +30,7 @@ use reqsign::AliyunConfig; use reqsign::AliyunLoader; use reqsign::AliyunOssSigner; +use super::appender::OssAppender; use super::core::*; use super::error::parse_error; use super::pager::OssPager; @@ -441,7 +442,7 @@ impl Accessor for OssBackend { type BlockingReader = (); type Writer = OssWriter; type BlockingWriter = (); - type Appender = (); + type Appender = OssAppender; type Pager = OssPager; type BlockingPager = (); @@ -469,6 +470,11 @@ impl Accessor for OssBackend { create_dir: true, copy: true, + append: true, + append_with_cache_control: true, + append_with_content_type: true, + append_with_content_disposition: true, + list: true, list_with_delimiter_slash: true, list_without_delimiter: true, @@ -533,6 +539,13 @@ impl Accessor for OssBackend { )) } + async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> { + Ok(( + RpAppend::default(), + OssAppender::new(self.core.clone(), path, args), + )) + } + async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result { let resp = self.core.oss_copy_object(from, to).await?; let status = resp.status(); diff --git a/core/src/services/oss/core.rs b/core/src/services/oss/core.rs index 68297dfcc49..6fd36ce27c7 100644 --- a/core/src/services/oss/core.rs +++ b/core/src/services/oss/core.rs @@ -37,6 +37,7 @@ use reqsign::AliyunOssSigner; use serde::Deserialize; use serde::Serialize; +use crate::ops::OpAppend; use crate::ops::OpWrite; use crate::raw::*; use crate::*; @@ -186,6 +187,54 @@ impl OssCore { Ok(req) } + /// Oss append object request + /// + /// # Note + /// + /// This request is used to append data to an existing object or create an appendable object. + /// So we must set the `append` and `position` header. + /// + /// https://www.alibabacloud.com/help/object-storage-service/latest/appendobject + pub fn oss_append_object_request( + &self, + path: &str, + position: u64, + size: usize, + args: &OpAppend, + body: AsyncBody, + ) -> Result> { + let p = build_abs_path(&self.root, path); + let endpoint = self.get_endpoint(false); + let url = format!( + "{}/{}?append&position={}", + endpoint, + percent_encode_path(&p), + position + ); + + let mut req = Request::post(&url); + + req = req.header(CONTENT_LENGTH, size); + + if let Some(mime) = args.content_type() { + req = req.header(CONTENT_TYPE, mime); + } + + if let Some(pos) = args.content_disposition() { + req = req.header(CONTENT_DISPOSITION, pos); + } + + if let Some(cache_control) = args.cache_control() { + req = req.header(CACHE_CONTROL, cache_control) + } + + // set sse headers + req = self.insert_sse_headers(req); + + let req = req.body(body).map_err(new_request_build_error)?; + Ok(req) + } + pub fn oss_get_object_request( &self, path: &str, diff --git a/core/src/services/oss/mod.rs b/core/src/services/oss/mod.rs index 9bfd1692c7d..21829ea51dd 100644 --- a/core/src/services/oss/mod.rs +++ b/core/src/services/oss/mod.rs @@ -18,6 +18,7 @@ mod backend; pub use backend::OssBuilder as Oss; +mod appender; mod core; mod error; mod pager; diff --git a/core/src/types/capability.rs b/core/src/types/capability.rs index c6b8e981b94..1153a93f832 100644 --- a/core/src/types/capability.rs +++ b/core/src/types/capability.rs @@ -151,6 +151,9 @@ impl Debug for Capability { if self.write { s.push("Write"); } + if self.append { + s.push("Append"); + } if self.create_dir { s.push("CreateDir"); } diff --git a/core/src/types/operator/operator.rs b/core/src/types/operator/operator.rs index e97ed5dc5de..da0cf1c7dc1 100644 --- a/core/src/types/operator/operator.rs +++ b/core/src/types/operator/operator.rs @@ -1006,9 +1006,9 @@ impl Operator { } let bs = bs.into(); - let (_, mut w) = self.inner().append(&path, args).await?; - w.append(bs).await?; - w.close().await?; + let (_, mut a) = self.inner().append(&path, args).await?; + a.append(bs).await?; + a.close().await?; Ok(()) } diff --git a/core/tests/behavior/append.rs b/core/tests/behavior/append.rs index 1c03a3e6a6d..e9e55d2aed8 100644 --- a/core/tests/behavior/append.rs +++ b/core/tests/behavior/append.rs @@ -16,10 +16,13 @@ // under the License. use anyhow::Result; +use log::warn; use opendal::ops::OpAppend; use opendal::EntryMode; use opendal::ErrorKind; use opendal::Operator; +use sha2::Digest; +use sha2::Sha256; use super::utils::*; @@ -68,6 +71,7 @@ macro_rules! behavior_append_tests { test_append_with_content_type, test_append_with_content_disposition, + test_appender_futures_copy, test_fuzz_appender, ); )* @@ -101,7 +105,7 @@ pub async fn test_append(op: Operator) -> Result<()> { /// Test append to a directory path must fail. pub async fn test_append_with_dir_path(op: Operator) -> Result<()> { - let path = uuid::Uuid::new_v4().to_string(); + let path = format!("{}/", uuid::Uuid::new_v4()); let (content, _) = gen_bytes(); let res = op.append(&path, content).await; @@ -197,6 +201,39 @@ pub async fn test_append_with_content_disposition(op: Operator) -> Result<()> { Ok(()) } +/// Copy data from reader to writer +pub async fn test_appender_futures_copy(op: Operator) -> Result<()> { + let path = uuid::Uuid::new_v4().to_string(); + let (content, size): (Vec, usize) = + gen_bytes_with_range(10 * 1024 * 1024..20 * 1024 * 1024); + + let mut a = match op.appender(&path).await { + Ok(a) => a, + Err(err) if err.kind() == ErrorKind::Unsupported => { + warn!("service doesn't support write with append"); + return Ok(()); + } + Err(err) => return Err(err.into()), + }; + + futures::io::copy(&mut content.as_slice(), &mut a).await?; + a.close().await?; + + let meta = op.stat(&path).await.expect("stat must succeed"); + assert_eq!(meta.content_length(), size as u64); + + let bs = op.read(&path).await?; + assert_eq!(bs.len(), size, "read size"); + assert_eq!( + format!("{:x}", Sha256::digest(&bs[..size])), + format!("{:x}", Sha256::digest(content)), + "read content" + ); + + op.delete(&path).await.expect("delete must succeed"); + Ok(()) +} + /// Test for fuzzing appender. pub async fn test_fuzz_appender(op: Operator) -> Result<()> { let path = uuid::Uuid::new_v4().to_string();