Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(services/oss): add append support #2279

Merged
merged 6 commits into from
May 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 143 additions & 0 deletions core/src/services/oss/appender.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use async_trait::async_trait;
use bytes::Bytes;
use http::StatusCode;

use super::core::*;
use super::error::parse_error;
use crate::ops::OpAppend;
use crate::raw::*;
use crate::*;

pub const X_OSS_NEXT_APPEND_POSITION: &str = "x-oss-next-append-position";

pub struct OssAppender {
core: Arc<OssCore>,

op: OpAppend,
path: String,

position: Option<u64>,
}

impl OssAppender {
pub fn new(core: Arc<OssCore>, path: &str, op: OpAppend) -> Self {
Self {
core,
op,
path: path.to_string(),
position: None,
}
}
}

#[async_trait]
impl oio::Append for OssAppender {
async fn append(&mut self, bs: Bytes) -> Result<()> {
// If the position is not set, we need to get the current position.
if self.position.is_none() {
let resp = self.core.oss_head_object(&self.path, None, None).await?;

let status = resp.status();
match status {
StatusCode::OK => {
let position = resp
.headers()
.get(X_OSS_NEXT_APPEND_POSITION)
.and_then(|v| v.to_str().ok())
.and_then(|v| v.parse::<u64>().ok())
.ok_or_else(|| {
Error::new(
ErrorKind::Unexpected,
"missing x-oss-next-append-position, the object may not be appendable",
)
})?;
self.position = Some(position);
}
StatusCode::NOT_FOUND => {
self.position = Some(0);
}
_ => {
return Err(parse_error(resp).await?);
}
}
}

let mut req = self.core.oss_append_object_request(
&self.path,
self.position.expect("position is not set"),
bs.len(),
&self.op,
AsyncBody::Bytes(bs),
)?;

self.core.sign(&mut req).await?;

let resp = self.core.send(req).await?;

let status = resp.status();

match status {
StatusCode::OK => {
let position = resp
.headers()
.get(X_OSS_NEXT_APPEND_POSITION)
.and_then(|v| v.to_str().ok())
.and_then(|v| v.parse::<u64>().ok())
.ok_or_else(|| {
Error::new(
ErrorKind::Unexpected,
"missing x-oss-next-append-position, the object may not be appendable",
)
})?;
self.position = Some(position);
Ok(())
}
StatusCode::CONFLICT => {
// The object is not appendable or the position is not match with the object's length.
// If the position is not match, we could get the current position and retry.
let position = resp
.headers()
.get(X_OSS_NEXT_APPEND_POSITION)
.and_then(|v| v.to_str().ok())
.and_then(|v| v.parse::<u64>().ok())
.ok_or_else(|| {
Error::new(
ErrorKind::Unexpected,
"missing x-oss-next-append-position, the object may not be appendable",
)
})?;
self.position = Some(position);

// Then return the error to the caller, so the caller could retry.
Err(Error::new(
ErrorKind::ConditionNotMatch,
"the position is not match with the object's length. position has been updated.",
))
}
_ => Err(parse_error(resp).await?),
}
}

async fn close(&mut self) -> Result<()> {
Ok(())
}
}
15 changes: 14 additions & 1 deletion core/src/services/oss/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use reqsign::AliyunConfig;
use reqsign::AliyunLoader;
use reqsign::AliyunOssSigner;

use super::appender::OssAppender;
use super::core::*;
use super::error::parse_error;
use super::pager::OssPager;
Expand Down Expand Up @@ -441,7 +442,7 @@ impl Accessor for OssBackend {
type BlockingReader = ();
type Writer = OssWriter;
type BlockingWriter = ();
type Appender = ();
type Appender = OssAppender;
type Pager = OssPager;
type BlockingPager = ();

Expand Down Expand Up @@ -469,6 +470,11 @@ impl Accessor for OssBackend {
create_dir: true,
copy: true,

append: true,
append_with_cache_control: true,
append_with_content_type: true,
append_with_content_disposition: true,

list: true,
list_with_delimiter_slash: true,
list_without_delimiter: true,
Expand Down Expand Up @@ -533,6 +539,13 @@ impl Accessor for OssBackend {
))
}

async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> {
Ok((
RpAppend::default(),
OssAppender::new(self.core.clone(), path, args),
))
}

async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
let resp = self.core.oss_copy_object(from, to).await?;
let status = resp.status();
Expand Down
49 changes: 49 additions & 0 deletions core/src/services/oss/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use reqsign::AliyunOssSigner;
use serde::Deserialize;
use serde::Serialize;

use crate::ops::OpAppend;
use crate::ops::OpWrite;
use crate::raw::*;
use crate::*;
Expand Down Expand Up @@ -186,6 +187,54 @@ impl OssCore {
Ok(req)
}

/// Oss append object request
///
/// # Note
///
/// This request is used to append data to an existing object or create an appendable object.
/// So we must set the `append` and `position` header.
///
/// https://www.alibabacloud.com/help/object-storage-service/latest/appendobject
pub fn oss_append_object_request(
&self,
path: &str,
position: u64,
size: usize,
args: &OpAppend,
body: AsyncBody,
) -> Result<Request<AsyncBody>> {
let p = build_abs_path(&self.root, path);
let endpoint = self.get_endpoint(false);
let url = format!(
"{}/{}?append&position={}",
endpoint,
percent_encode_path(&p),
position
);

let mut req = Request::post(&url);

req = req.header(CONTENT_LENGTH, size);

if let Some(mime) = args.content_type() {
req = req.header(CONTENT_TYPE, mime);
}

if let Some(pos) = args.content_disposition() {
req = req.header(CONTENT_DISPOSITION, pos);
}

if let Some(cache_control) = args.cache_control() {
req = req.header(CACHE_CONTROL, cache_control)
}

// set sse headers
req = self.insert_sse_headers(req);

let req = req.body(body).map_err(new_request_build_error)?;
Ok(req)
}

pub fn oss_get_object_request(
&self,
path: &str,
Expand Down
1 change: 1 addition & 0 deletions core/src/services/oss/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
mod backend;
pub use backend::OssBuilder as Oss;

mod appender;
mod core;
mod error;
mod pager;
Expand Down
3 changes: 3 additions & 0 deletions core/src/types/capability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@ impl Debug for Capability {
if self.write {
s.push("Write");
}
if self.append {
s.push("Append");
}
if self.create_dir {
s.push("CreateDir");
}
Expand Down
6 changes: 3 additions & 3 deletions core/src/types/operator/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1006,9 +1006,9 @@ impl Operator {
}

let bs = bs.into();
let (_, mut w) = self.inner().append(&path, args).await?;
w.append(bs).await?;
w.close().await?;
let (_, mut a) = self.inner().append(&path, args).await?;
a.append(bs).await?;
a.close().await?;

Ok(())
}
Expand Down
39 changes: 38 additions & 1 deletion core/tests/behavior/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
// under the License.

use anyhow::Result;
use log::warn;
use opendal::ops::OpAppend;
use opendal::EntryMode;
use opendal::ErrorKind;
use opendal::Operator;
use sha2::Digest;
use sha2::Sha256;

use super::utils::*;

Expand Down Expand Up @@ -68,6 +71,7 @@ macro_rules! behavior_append_tests {
test_append_with_content_type,
test_append_with_content_disposition,

test_appender_futures_copy,
test_fuzz_appender,
);
)*
Expand Down Expand Up @@ -101,7 +105,7 @@ pub async fn test_append(op: Operator) -> Result<()> {

/// Test append to a directory path must fail.
pub async fn test_append_with_dir_path(op: Operator) -> Result<()> {
let path = uuid::Uuid::new_v4().to_string();
let path = format!("{}/", uuid::Uuid::new_v4());
let (content, _) = gen_bytes();

let res = op.append(&path, content).await;
Expand Down Expand Up @@ -197,6 +201,39 @@ pub async fn test_append_with_content_disposition(op: Operator) -> Result<()> {
Ok(())
}

/// Copy data from reader to writer
pub async fn test_appender_futures_copy(op: Operator) -> Result<()> {
let path = uuid::Uuid::new_v4().to_string();
let (content, size): (Vec<u8>, usize) =
gen_bytes_with_range(10 * 1024 * 1024..20 * 1024 * 1024);

let mut a = match op.appender(&path).await {
Ok(a) => a,
Err(err) if err.kind() == ErrorKind::Unsupported => {
warn!("service doesn't support write with append");
return Ok(());
}
Err(err) => return Err(err.into()),
};

futures::io::copy(&mut content.as_slice(), &mut a).await?;
a.close().await?;

let meta = op.stat(&path).await.expect("stat must succeed");
assert_eq!(meta.content_length(), size as u64);

let bs = op.read(&path).await?;
assert_eq!(bs.len(), size, "read size");
assert_eq!(
format!("{:x}", Sha256::digest(&bs[..size])),
format!("{:x}", Sha256::digest(content)),
"read content"
);

op.delete(&path).await.expect("delete must succeed");
Ok(())
}

/// Test for fuzzing appender.
pub async fn test_fuzz_appender(op: Operator) -> Result<()> {
let path = uuid::Uuid::new_v4().to_string();
Expand Down