Skip to content

Commit

Permalink
feat(services/oss): add append support
Browse files Browse the repository at this point in the history
Signed-off-by: suyanhanx <suyanhanx@gmail.com>
  • Loading branch information
suyanhanx committed May 22, 2023
1 parent e2f3a2a commit ac351e1
Show file tree
Hide file tree
Showing 7 changed files with 258 additions and 1 deletion.
144 changes: 144 additions & 0 deletions core/src/services/oss/appender.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use async_trait::async_trait;
use bytes::Bytes;
use http::StatusCode;

use super::core::*;
use super::error::parse_error;
use crate::ops::OpAppend;
use crate::raw::*;
use crate::*;

pub const X_OSS_NEXT_APPEND_POSITION: &str = "x-oss-next-append-position";

pub struct OssAppender {
core: Arc<OssCore>,

op: OpAppend,
path: String,

position: Option<u64>,
}

impl OssAppender {
#[allow(dead_code)]
pub fn new(core: Arc<OssCore>, path: &str, op: OpAppend) -> Self {
Self {
core,
op,
path: path.to_string(),
position: None,
}
}
}

#[async_trait]
impl oio::Append for OssAppender {
async fn append(&mut self, bs: Bytes) -> Result<()> {
// If the position is not set, we need to get the current position.
if self.position.is_none() {
let resp = self.core.oss_head_object(&self.path, None, None).await?;

let status = resp.status();
match status {
StatusCode::OK => {
let position = resp
.headers()
.get(X_OSS_NEXT_APPEND_POSITION)
.and_then(|v| v.to_str().ok())
.and_then(|v| v.parse::<u64>().ok())
.ok_or_else(|| {
Error::new(
ErrorKind::Unexpected,
"missing x-oss-next-append-position, the object may not be appendable",
)
})?;
self.position = Some(position);
}
StatusCode::NOT_FOUND => {
self.position = Some(0);
}
_ => {
return Err(parse_error(resp).await?);
}
}
}

let mut req = self.core.oss_append_object_request(
&self.path,
self.position.expect("position is not set"),
bs.len(),
&self.op,
AsyncBody::Bytes(bs),
)?;

self.core.sign(&mut req).await?;

let resp = self.core.send(req).await?;

let status = resp.status();

match status {
StatusCode::OK => {
let position = resp
.headers()
.get(X_OSS_NEXT_APPEND_POSITION)
.and_then(|v| v.to_str().ok())
.and_then(|v| v.parse::<u64>().ok())
.ok_or_else(|| {
Error::new(
ErrorKind::Unexpected,
"missing x-oss-next-append-position, the object may not be appendable",
)
})?;
self.position = Some(position);
Ok(())
}
StatusCode::CONFLICT => {
// The object is not appendable or the position is not match with the object's length.
// If the position is not match, we could get the current position and retry.
let position = resp
.headers()
.get(X_OSS_NEXT_APPEND_POSITION)
.and_then(|v| v.to_str().ok())
.and_then(|v| v.parse::<u64>().ok())
.ok_or_else(|| {
Error::new(
ErrorKind::Unexpected,
"missing x-oss-next-append-position, the object may not be appendable",
)
})?;
self.position = Some(position);

// Then return the error to the caller, so the caller could retry.
Err(Error::new(
ErrorKind::ConditionNotMatch,
"the position is not match with the object's length. position has been updated.",
))
}
_ => Err(parse_error(resp).await?),
}
}

async fn close(&mut self) -> Result<()> {
Ok(())
}
}
2 changes: 2 additions & 0 deletions core/src/services/oss/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,8 @@ impl Accessor for OssBackend {
create_dir: true,
copy: true,

append: true,

list: true,
list_with_delimiter_slash: true,
list_without_delimiter: true,
Expand Down
48 changes: 48 additions & 0 deletions core/src/services/oss/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use reqsign::AliyunOssSigner;
use serde::Deserialize;
use serde::Serialize;

use crate::ops::OpAppend;
use crate::ops::OpWrite;
use crate::raw::*;
use crate::*;
Expand Down Expand Up @@ -186,6 +187,53 @@ impl OssCore {
Ok(req)
}

/// Oss append object request
///
/// # Note
///
/// This request is used to append data to an existing object or create an appendable object.
/// So you must set the `append` and `position` header.
///
/// https://www.alibabacloud.com/help/object-storage-service/latest/appendobject
pub fn oss_append_object_request(
&self,
path: &str,
position: u64,
size: usize,
args: &OpAppend,
body: AsyncBody,
) -> Result<Request<AsyncBody>> {
let p = build_abs_path(&self.root, path);
let endpoint = self.get_endpoint(false);
let url = format!("{}/{}", endpoint, percent_encode_path(&p));

let mut req = Request::post(&url);

// The header `append` does not need a value.
req = req.header(HeaderName::from_static("append"), "");
req = req.header(HeaderName::from_static("position"), position);

req = req.header(CONTENT_LENGTH, size);

if let Some(mime) = args.content_type() {
req = req.header(CONTENT_TYPE, mime);
}

if let Some(pos) = args.content_disposition() {
req = req.header(CONTENT_DISPOSITION, pos);
}

if let Some(cache_control) = args.cache_control() {
req = req.header(CACHE_CONTROL, cache_control)
}

// set sse headers
req = self.insert_sse_headers(req);

let req = req.body(body).map_err(new_request_build_error)?;
Ok(req)
}

pub fn oss_get_object_request(
&self,
path: &str,
Expand Down
1 change: 1 addition & 0 deletions core/src/services/oss/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
mod backend;
pub use backend::OssBuilder as Oss;

mod appender;
mod core;
mod error;
mod pager;
Expand Down
3 changes: 3 additions & 0 deletions core/src/types/capability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ impl Debug for Capability {
if self.write {
s.push("Write");
}
if self.append {
s.push("Append");
}
if self.create_dir {
s.push("CreateDir");
}
Expand Down
5 changes: 5 additions & 0 deletions core/src/types/operator/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ impl OperatorInfo {
self.0.capability().write
}

/// Check if current backend supports [`Accessor::append`] or not.
pub fn can_append(&self) -> bool {
self.0.capability().append
}

/// Check if current backend supports [`Accessor::copy`] or not.
pub fn can_copy(&self) -> bool {
self.0.capability().copy
Expand Down
56 changes: 55 additions & 1 deletion core/src/types/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,13 +399,67 @@ impl OpWrite {

/// Args for `append` operation.
#[derive(Debug, Clone, Default)]
pub struct OpAppend {}
pub struct OpAppend {
content_length: Option<u64>,
content_type: Option<String>,
content_disposition: Option<String>,
cache_control: Option<String>,
}

impl OpAppend {
/// Create a new `OpAppend`.
pub fn new() -> Self {
Self::default()
}

/// Get the content length from op.
///
/// The content length is the total length of the data to be written.
pub fn content_length(&self) -> Option<u64> {
self.content_length
}

/// Set the content length of op.
///
/// If the content length is not set, the content length will be
/// calculated automatically by buffering part of data.
pub fn with_content_length(mut self, content_length: u64) -> Self {
self.content_length = Some(content_length);
self
}

/// Get the content type from option
pub fn content_type(&self) -> Option<&str> {
self.content_type.as_deref()
}

/// Set the content type of option
pub fn with_content_type(mut self, content_type: &str) -> Self {
self.content_type = Some(content_type.to_string());
self
}

/// Get the content disposition from option
pub fn content_disposition(&self) -> Option<&str> {
self.content_disposition.as_deref()
}

/// Set the content disposition of option
pub fn with_content_disposition(mut self, content_disposition: &str) -> Self {
self.content_disposition = Some(content_disposition.to_string());
self
}

/// Get the cache control from option
pub fn cache_control(&self) -> Option<&str> {
self.cache_control.as_deref()
}

/// Set the content type of option
pub fn with_cache_control(mut self, cache_control: &str) -> Self {
self.cache_control = Some(cache_control.to_string());
self
}
}

/// Args for `copy` operation.
Expand Down

0 comments on commit ac351e1

Please sign in to comment.