Skip to content

Commit

Permalink
feat: cos list-with-deleted
Browse files Browse the repository at this point in the history
  • Loading branch information
hoslo committed Jan 8, 2025
1 parent 6ca3eab commit 3b184e8
Show file tree
Hide file tree
Showing 5 changed files with 306 additions and 10 deletions.
37 changes: 33 additions & 4 deletions core/src/services/cos/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ use reqsign::TencentCosSigner;
use super::core::*;
use super::delete::CosDeleter;
use super::error::parse_error;
use super::lister::CosLister;
use super::lister::{CosLister, CosListers, CosObjectVersionsLister};
use super::writer::CosWriter;
use super::writer::CosWriters;
use crate::raw::oio::PageLister;
use crate::raw::*;
use crate::services::CosConfig;
use crate::*;
Expand Down Expand Up @@ -123,6 +124,13 @@ impl CosBuilder {
self
}

/// Set bucket versioning status for this backend
pub fn enable_versioning(mut self, enabled: bool) -> Self {
self.config.enable_versioning = enabled;

self
}

/// Disable config load so that opendal will not load config from
/// environment.
///
Expand Down Expand Up @@ -215,6 +223,7 @@ impl Builder for CosBuilder {
bucket: bucket.clone(),
root,
endpoint: format!("{}://{}.{}", &scheme, &bucket, &endpoint),
enable_versioning: self.config.enable_versioning,
signer,
loader: cred_loader,
client,
Expand All @@ -232,7 +241,7 @@ pub struct CosBackend {
impl Access for CosBackend {
type Reader = HttpBody;
type Writer = CosWriters;
type Lister = oio::PageLister<CosLister>;
type Lister = CosListers;
type Deleter = oio::OneShotDeleter<CosDeleter>;
type BlockingReader = ();
type BlockingWriter = ();
Expand All @@ -253,15 +262,18 @@ impl Access for CosBackend {
stat_has_content_type: true,
stat_has_content_encoding: true,
stat_has_content_range: true,
stat_with_version: self.core.enable_versioning,
stat_has_etag: true,
stat_has_content_md5: true,
stat_has_last_modified: true,
stat_has_content_disposition: true,
stat_has_version: true,

read: true,

read_with_if_match: true,
read_with_if_none_match: true,
read_with_version: self.core.enable_versioning,

write: true,
write_can_empty: true,
Expand All @@ -286,10 +298,13 @@ impl Access for CosBackend {
},

delete: true,
delete_with_version: self.core.enable_versioning,
copy: true,

list: true,
list_with_recursive: true,
list_with_versions: self.core.enable_versioning,
list_with_deleted: self.core.enable_versioning,
list_has_content_length: true,

presign: true,
Expand Down Expand Up @@ -357,8 +372,22 @@ impl Access for CosBackend {
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
let l = CosLister::new(self.core.clone(), path, args.recursive(), args.limit());
Ok((RpList::default(), oio::PageLister::new(l)))
let l = if args.versions() || args.deleted() {
TwoWays::Two(PageLister::new(CosObjectVersionsLister::new(
self.core.clone(),
path,
args,
)))
} else {
TwoWays::One(PageLister::new(CosLister::new(
self.core.clone(),
path,
args.recursive(),
args.limit(),
)))
};

Ok((RpList::default(), l))
}

async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
Expand Down
2 changes: 2 additions & 0 deletions core/src/services/cos/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ pub struct CosConfig {
pub secret_key: Option<String>,
/// Bucket of this backend.
pub bucket: Option<String>,
/// is bucket versioning enabled for this bucket
pub enable_versioning: bool,
/// Disable config load so that opendal will not load config from
pub disable_config_load: bool,
}
Expand Down
133 changes: 129 additions & 4 deletions core/src/services/cos/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use std::fmt::Debug;
use std::fmt::Formatter;
use std::fmt::Write;
use std::time::Duration;

use bytes::Bytes;
Expand All @@ -37,10 +38,15 @@ use serde::Serialize;
use crate::raw::*;
use crate::*;

pub mod constants {
pub const COS_QUERY_VERSION_ID: &str = "versionId";
}

pub struct CosCore {
pub bucket: String,
pub root: String,
pub endpoint: String,
pub enable_versioning: bool,

pub signer: TencentCosSigner,
pub loader: TencentCosCredentialLoader,
Expand Down Expand Up @@ -125,7 +131,19 @@ impl CosCore {
) -> Result<Request<Buffer>> {
let p = build_abs_path(&self.root, path);

let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));

let mut query_args = Vec::new();
if let Some(version) = args.version() {
query_args.push(format!(
"{}={}",
constants::COS_QUERY_VERSION_ID,
percent_decode_path(version)
))
}
if !query_args.is_empty() {
url.push_str(&format!("?{}", query_args.join("&")));
}

let mut req = Request::get(&url);

Expand Down Expand Up @@ -200,7 +218,19 @@ impl CosCore {
pub fn cos_head_object_request(&self, path: &str, args: &OpStat) -> Result<Request<Buffer>> {
let p = build_abs_path(&self.root, path);

let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));

let mut query_args = Vec::new();
if let Some(version) = args.version() {
query_args.push(format!(
"{}={}",
constants::COS_QUERY_VERSION_ID,
percent_decode_path(version)
))
}
if !query_args.is_empty() {
url.push_str(&format!("?{}", query_args.join("&")));
}

let mut req = Request::head(&url);

Expand All @@ -217,10 +247,22 @@ impl CosCore {
Ok(req)
}

pub async fn cos_delete_object(&self, path: &str) -> Result<Response<Buffer>> {
pub async fn cos_delete_object(&self, path: &str, args: &OpDelete) -> Result<Response<Buffer>> {
let p = build_abs_path(&self.root, path);

let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));

let mut query_args = Vec::new();
if let Some(version) = args.version() {
query_args.push(format!(
"{}={}",
constants::COS_QUERY_VERSION_ID,
percent_decode_path(version)
))
}
if !query_args.is_empty() {
url.push_str(&format!("?{}", query_args.join("&")));
}

let req = Request::delete(&url);

Expand Down Expand Up @@ -434,6 +476,50 @@ impl CosCore {
self.sign(&mut req).await?;
self.send(req).await
}

pub async fn cos_list_object_versions(
&self,
prefix: &str,
delimiter: &str,
limit: Option<usize>,
key_marker: &str,
version_id_marker: &str,
) -> Result<Response<Buffer>> {
let p = build_abs_path(&self.root, prefix);

let mut url = format!("{}?versions", self.endpoint);
if !p.is_empty() {
write!(url, "&prefix={}", percent_encode_path(p.as_str()))
.expect("write into string must succeed");
}
if !delimiter.is_empty() {
write!(url, "&delimiter={}", delimiter).expect("write into string must succeed");
}

if let Some(limit) = limit {
write!(url, "&max-keys={}", limit).expect("write into string must succeed");
}
if !key_marker.is_empty() {
write!(url, "&key-marker={}", percent_encode_path(key_marker))
.expect("write into string must succeed");
}
if !version_id_marker.is_empty() {
write!(
url,
"&version-id-marker={}",
percent_encode_path(version_id_marker)
)
.expect("write into string must succeed");
}

let mut req = Request::get(&url)
.body(Buffer::new())
.map_err(new_request_build_error)?;

self.sign(&mut req).await?;

self.send(req).await
}
}

/// Result of CreateMultipartUpload
Expand Down Expand Up @@ -511,6 +597,45 @@ pub struct ListObjectsOutputContent {
pub size: u64,
}

#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct OutputCommonPrefix {
pub prefix: String,
}

/// Output of ListObjectVersions
#[derive(Default, Debug, Deserialize)]
#[serde(default, rename_all = "PascalCase")]
pub struct ListObjectVersionsOutput {
pub is_truncated: Option<bool>,
pub next_key_marker: Option<String>,
pub next_version_id_marker: Option<String>,
pub common_prefixes: Vec<OutputCommonPrefix>,
pub version: Vec<ListObjectVersionsOutputVersion>,
pub delete_marker: Vec<ListObjectVersionsOutputDeleteMarker>,
}

#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct ListObjectVersionsOutputVersion {
pub key: String,
pub version_id: String,
pub is_latest: bool,
pub size: u64,
pub last_modified: String,
#[serde(rename = "ETag")]
pub etag: Option<String>,
}

#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct ListObjectVersionsOutputDeleteMarker {
pub key: String,
pub version_id: String,
pub is_latest: bool,
pub last_modified: String,
}

#[cfg(test)]
mod tests {
use bytes::Buf;
Expand Down
4 changes: 2 additions & 2 deletions core/src/services/cos/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ impl CosDeleter {
}

impl oio::OneShotDelete for CosDeleter {
async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> {
let resp = self.core.cos_delete_object(&path).await?;
async fn delete_once(&self, path: String, args: OpDelete) -> Result<()> {
let resp = self.core.cos_delete_object(&path, &args).await?;

let status = resp.status();

Expand Down
Loading

0 comments on commit 3b184e8

Please sign in to comment.