Skip to content

Commit

Permalink
feat(services/oss): add append support (#2279)
Browse files Browse the repository at this point in the history
* feat(services/oss): add append support

Signed-off-by: suyanhanx <suyanhanx@gmail.com>

* set capability

Signed-off-by: suyanhanx <suyanhanx@gmail.com>

* fix `you`

Signed-off-by: suyanhanx <suyanhanx@gmail.com>

* add append for oss

Signed-off-by: suyanhanx <suyanhanx@gmail.com>

* fix append

Signed-off-by: suyanhanx <suyanhanx@gmail.com>

* try copy from appender

Signed-off-by: suyanhanx <suyanhanx@gmail.com>

---------

Signed-off-by: suyanhanx <suyanhanx@gmail.com>
  • Loading branch information
suyanhanx authored May 23, 2023
1 parent 65d1a57 commit 01e70a6
Show file tree
Hide file tree
Showing 7 changed files with 251 additions and 5 deletions.
143 changes: 143 additions & 0 deletions core/src/services/oss/appender.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use async_trait::async_trait;
use bytes::Bytes;
use http::StatusCode;

use super::core::*;
use super::error::parse_error;
use crate::ops::OpAppend;
use crate::raw::*;
use crate::*;

pub const X_OSS_NEXT_APPEND_POSITION: &str = "x-oss-next-append-position";

pub struct OssAppender {
core: Arc<OssCore>,

op: OpAppend,
path: String,

position: Option<u64>,
}

impl OssAppender {
pub fn new(core: Arc<OssCore>, path: &str, op: OpAppend) -> Self {
Self {
core,
op,
path: path.to_string(),
position: None,
}
}
}

#[async_trait]
impl oio::Append for OssAppender {
async fn append(&mut self, bs: Bytes) -> Result<()> {
// If the position is not set, we need to get the current position.
if self.position.is_none() {
let resp = self.core.oss_head_object(&self.path, None, None).await?;

let status = resp.status();
match status {
StatusCode::OK => {
let position = resp
.headers()
.get(X_OSS_NEXT_APPEND_POSITION)
.and_then(|v| v.to_str().ok())
.and_then(|v| v.parse::<u64>().ok())
.ok_or_else(|| {
Error::new(
ErrorKind::Unexpected,
"missing x-oss-next-append-position, the object may not be appendable",
)
})?;
self.position = Some(position);
}
StatusCode::NOT_FOUND => {
self.position = Some(0);
}
_ => {
return Err(parse_error(resp).await?);
}
}
}

let mut req = self.core.oss_append_object_request(
&self.path,
self.position.expect("position is not set"),
bs.len(),
&self.op,
AsyncBody::Bytes(bs),
)?;

self.core.sign(&mut req).await?;

let resp = self.core.send(req).await?;

let status = resp.status();

match status {
StatusCode::OK => {
let position = resp
.headers()
.get(X_OSS_NEXT_APPEND_POSITION)
.and_then(|v| v.to_str().ok())
.and_then(|v| v.parse::<u64>().ok())
.ok_or_else(|| {
Error::new(
ErrorKind::Unexpected,
"missing x-oss-next-append-position, the object may not be appendable",
)
})?;
self.position = Some(position);
Ok(())
}
StatusCode::CONFLICT => {
// The object is not appendable or the position is not match with the object's length.
// If the position is not match, we could get the current position and retry.
let position = resp
.headers()
.get(X_OSS_NEXT_APPEND_POSITION)
.and_then(|v| v.to_str().ok())
.and_then(|v| v.parse::<u64>().ok())
.ok_or_else(|| {
Error::new(
ErrorKind::Unexpected,
"missing x-oss-next-append-position, the object may not be appendable",
)
})?;
self.position = Some(position);

// Then return the error to the caller, so the caller could retry.
Err(Error::new(
ErrorKind::ConditionNotMatch,
"the position is not match with the object's length. position has been updated.",
))
}
_ => Err(parse_error(resp).await?),
}
}

async fn close(&mut self) -> Result<()> {
Ok(())
}
}
15 changes: 14 additions & 1 deletion core/src/services/oss/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use reqsign::AliyunConfig;
use reqsign::AliyunLoader;
use reqsign::AliyunOssSigner;

use super::appender::OssAppender;
use super::core::*;
use super::error::parse_error;
use super::pager::OssPager;
Expand Down Expand Up @@ -441,7 +442,7 @@ impl Accessor for OssBackend {
type BlockingReader = ();
type Writer = OssWriter;
type BlockingWriter = ();
type Appender = ();
type Appender = OssAppender;
type Pager = OssPager;
type BlockingPager = ();

Expand Down Expand Up @@ -469,6 +470,11 @@ impl Accessor for OssBackend {
create_dir: true,
copy: true,

append: true,
append_with_cache_control: true,
append_with_content_type: true,
append_with_content_disposition: true,

list: true,
list_with_delimiter_slash: true,
list_without_delimiter: true,
Expand Down Expand Up @@ -533,6 +539,13 @@ impl Accessor for OssBackend {
))
}

async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> {
Ok((
RpAppend::default(),
OssAppender::new(self.core.clone(), path, args),
))
}

async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
let resp = self.core.oss_copy_object(from, to).await?;
let status = resp.status();
Expand Down
49 changes: 49 additions & 0 deletions core/src/services/oss/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use reqsign::AliyunOssSigner;
use serde::Deserialize;
use serde::Serialize;

use crate::ops::OpAppend;
use crate::ops::OpWrite;
use crate::raw::*;
use crate::*;
Expand Down Expand Up @@ -186,6 +187,54 @@ impl OssCore {
Ok(req)
}

/// Oss append object request
///
/// # Note
///
/// This request is used to append data to an existing object or create an appendable object.
/// So we must set the `append` and `position` header.
///
/// https://www.alibabacloud.com/help/object-storage-service/latest/appendobject
pub fn oss_append_object_request(
&self,
path: &str,
position: u64,
size: usize,
args: &OpAppend,
body: AsyncBody,
) -> Result<Request<AsyncBody>> {
let p = build_abs_path(&self.root, path);
let endpoint = self.get_endpoint(false);
let url = format!(
"{}/{}?append&position={}",
endpoint,
percent_encode_path(&p),
position
);

let mut req = Request::post(&url);

req = req.header(CONTENT_LENGTH, size);

if let Some(mime) = args.content_type() {
req = req.header(CONTENT_TYPE, mime);
}

if let Some(pos) = args.content_disposition() {
req = req.header(CONTENT_DISPOSITION, pos);
}

if let Some(cache_control) = args.cache_control() {
req = req.header(CACHE_CONTROL, cache_control)
}

// set sse headers
req = self.insert_sse_headers(req);

let req = req.body(body).map_err(new_request_build_error)?;
Ok(req)
}

pub fn oss_get_object_request(
&self,
path: &str,
Expand Down
1 change: 1 addition & 0 deletions core/src/services/oss/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
mod backend;
pub use backend::OssBuilder as Oss;

mod appender;
mod core;
mod error;
mod pager;
Expand Down
3 changes: 3 additions & 0 deletions core/src/types/capability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@ impl Debug for Capability {
if self.write {
s.push("Write");
}
if self.append {
s.push("Append");
}
if self.create_dir {
s.push("CreateDir");
}
Expand Down
6 changes: 3 additions & 3 deletions core/src/types/operator/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1006,9 +1006,9 @@ impl Operator {
}

let bs = bs.into();
let (_, mut w) = self.inner().append(&path, args).await?;
w.append(bs).await?;
w.close().await?;
let (_, mut a) = self.inner().append(&path, args).await?;
a.append(bs).await?;
a.close().await?;

Ok(())
}
Expand Down
39 changes: 38 additions & 1 deletion core/tests/behavior/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
// under the License.

use anyhow::Result;
use log::warn;
use opendal::ops::OpAppend;
use opendal::EntryMode;
use opendal::ErrorKind;
use opendal::Operator;
use sha2::Digest;
use sha2::Sha256;

use super::utils::*;

Expand Down Expand Up @@ -68,6 +71,7 @@ macro_rules! behavior_append_tests {
test_append_with_content_type,
test_append_with_content_disposition,

test_appender_futures_copy,
test_fuzz_appender,
);
)*
Expand Down Expand Up @@ -101,7 +105,7 @@ pub async fn test_append(op: Operator) -> Result<()> {

/// Test append to a directory path must fail.
pub async fn test_append_with_dir_path(op: Operator) -> Result<()> {
let path = uuid::Uuid::new_v4().to_string();
let path = format!("{}/", uuid::Uuid::new_v4());
let (content, _) = gen_bytes();

let res = op.append(&path, content).await;
Expand Down Expand Up @@ -197,6 +201,39 @@ pub async fn test_append_with_content_disposition(op: Operator) -> Result<()> {
Ok(())
}

/// Copy data from reader to writer
pub async fn test_appender_futures_copy(op: Operator) -> Result<()> {
let path = uuid::Uuid::new_v4().to_string();
let (content, size): (Vec<u8>, usize) =
gen_bytes_with_range(10 * 1024 * 1024..20 * 1024 * 1024);

let mut a = match op.appender(&path).await {
Ok(a) => a,
Err(err) if err.kind() == ErrorKind::Unsupported => {
warn!("service doesn't support write with append");
return Ok(());
}
Err(err) => return Err(err.into()),
};

futures::io::copy(&mut content.as_slice(), &mut a).await?;
a.close().await?;

let meta = op.stat(&path).await.expect("stat must succeed");
assert_eq!(meta.content_length(), size as u64);

let bs = op.read(&path).await?;
assert_eq!(bs.len(), size, "read size");
assert_eq!(
format!("{:x}", Sha256::digest(&bs[..size])),
format!("{:x}", Sha256::digest(content)),
"read content"
);

op.delete(&path).await.expect("delete must succeed");
Ok(())
}

/// Test for fuzzing appender.
pub async fn test_fuzz_appender(op: Operator) -> Result<()> {
let path = uuid::Uuid::new_v4().to_string();
Expand Down

0 comments on commit 01e70a6

Please sign in to comment.