Skip to content

Commit

Permalink
feat(core): add version(bool) for List operation to include version d…
Browse files Browse the repository at this point in the history
…uring list or not
  • Loading branch information
meteorgan committed Sep 9, 2024
1 parent f6110ac commit 90b3895
Show file tree
Hide file tree
Showing 10 changed files with 433 additions and 17 deletions.
9 changes: 9 additions & 0 deletions core/src/raw/enum_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,15 @@ impl<ONE: oio::Write, TWO: oio::Write> oio::Write for TwoWays<ONE, TWO> {
}
}

impl<ONE: oio::List, TWO: oio::List> oio::List for TwoWays<ONE, TWO> {
async fn next(&mut self) -> Result<Option<oio::Entry>> {
match self {
Self::One(v) => v.next().await,
Self::Two(v) => v.next().await,
}
}
}

/// ThreeWays is used to implement traits that based on three ways.
///
/// Users can wrap three different trait types together.
Expand Down
14 changes: 11 additions & 3 deletions core/src/raw/oio/list/page_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::VecDeque;
use std::future::Future;

use crate::raw::*;
use crate::*;
use std::collections::VecDeque;
use std::future::Future;

/// PageList is used to implement [`oio::List`] based on API supporting pagination. By implementing
/// PageList, services don't need to care about the details of page list.
Expand All @@ -46,6 +45,8 @@ pub trait PageList: Send + Sync + Unpin + 'static {
///
/// - Set `done` to `true` if all page have been fetched.
/// - Update `token` if there is more page to fetch. `token` is not exposed to users, it's internal used only.
/// - Update `key_marker` and `version_id_marker` if object version is enabled and there are more page to fetch.
/// similar to `token`, they should only be internal used
/// - Push back into the entries for each entry fetched from underlying storage.
///
/// NOTE: `entries` is a `VecDeque` to avoid unnecessary memory allocation. Only `push_back` is allowed.
Expand All @@ -54,6 +55,11 @@ pub struct PageContext {
pub done: bool,
/// token is used by underlying storage services to fetch next page.
pub token: String,
/// key_marker and version_id_marker are used together by underlying storage services to fetch
/// next page when object versioning is enabled
pub key_marker: String,
/// version_id_marker is used with key_marker
pub version_id_marker: String,
/// entries are used to store entries fetched from underlying storage.
///
/// Please always reuse the same `VecDeque` to avoid unnecessary memory allocation.
Expand All @@ -79,6 +85,8 @@ where
ctx: PageContext {
done: false,
token: "".to_string(),
key_marker: "".to_string(),
version_id_marker: "".to_string(),
entries: VecDeque::new(),
},
}
Expand Down
22 changes: 20 additions & 2 deletions core/src/raw/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@
use std::collections::HashMap;
use std::time::Duration;

use flagset::FlagSet;

use crate::raw::*;
use crate::*;
use flagset::FlagSet;

/// Args for `create` operation.
///
Expand Down Expand Up @@ -100,6 +99,13 @@ pub struct OpList {
/// - If this is set to > 1, the list operation will be concurrent,
/// and the maximum number of concurrent operations will be determined by this value.
concurrent: usize,
/// The version is used to control whether the object versions should be returned.
///
/// - If `false`, list operation will not return with object versions
/// - If `true`, list operation will return with object versions if the underlying service is supported
///
/// Default to `false`
version: bool,
}

impl Default for OpList {
Expand All @@ -111,6 +117,7 @@ impl Default for OpList {
// By default, we want to know what's the mode of this entry.
metakey: Metakey::Mode.into(),
concurrent: 1,
version: false,
}
}
}
Expand Down Expand Up @@ -184,6 +191,17 @@ impl OpList {
pub fn concurrent(&self) -> usize {
self.concurrent
}

/// Change the version of this list operation
pub fn with_version(mut self, version: bool) -> Self {
self.version = version;
self
}

/// Get the version of this list operation
pub fn version(&self) -> bool {
self.version
}
}

/// Args for `presign` operation.
Expand Down
40 changes: 30 additions & 10 deletions core/src/services/s3/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ use reqwest::Url;
use super::core::*;
use super::error::parse_error;
use super::error::parse_s3_error_code;
use super::lister::S3Lister;
use super::lister::{S3Lister, S3Listers, S3ObjectVersionsLister};
use super::writer::S3Writer;
use super::writer::S3Writers;
use crate::raw::oio::PageLister;
use crate::raw::*;
use crate::services::S3Config;
use crate::*;
Expand Down Expand Up @@ -116,6 +117,13 @@ impl S3Builder {
self
}

/// Set bucket versioning status of this backend
pub fn bucket_versioning_enabled(mut self, enabled: bool) -> Self {
self.config.bucket_versioning_enabled = enabled;

self
}

/// Set endpoint of this backend.
///
/// Endpoint must be full uri, e.g.
Expand Down Expand Up @@ -856,6 +864,7 @@ impl Builder for S3Builder {
default_storage_class,
allow_anonymous: self.config.allow_anonymous,
disable_stat_with_override: self.config.disable_stat_with_override,
bucket_versioning_enabled: self.config.bucket_versioning_enabled,
signer,
loader,
credential_loaded: AtomicBool::new(false),
Expand All @@ -876,7 +885,7 @@ pub struct S3Backend {
impl Access for S3Backend {
type Reader = HttpBody;
type Writer = S3Writers;
type Lister = oio::PageLister<S3Lister>;
type Lister = S3Listers;
type BlockingReader = ();
type BlockingWriter = ();
type BlockingLister = ();
Expand Down Expand Up @@ -929,6 +938,7 @@ impl Access for S3Backend {
list_with_limit: true,
list_with_start_after: true,
list_with_recursive: true,
list_with_version: self.core.bucket_versioning_enabled,

presign: true,
presign_stat: true,
Expand Down Expand Up @@ -1027,14 +1037,24 @@ impl Access for S3Backend {
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
let l = S3Lister::new(
self.core.clone(),
path,
args.recursive(),
args.limit(),
args.start_after(),
);
Ok((RpList::default(), oio::PageLister::new(l)))
let l = if self.core.bucket_versioning_enabled && args.version() {
TwoWays::Two(PageLister::new(S3ObjectVersionsLister::new(
self.core.clone(),
path,
args.recursive(),
args.limit(),
)))
} else {
TwoWays::One(PageLister::new(S3Lister::new(
self.core.clone(),
path,
args.recursive(),
args.limit(),
args.start_after(),
)))
};

Ok((RpList::default(), l))
}

async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
Expand Down
2 changes: 2 additions & 0 deletions core/src/services/s3/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ pub struct S3Config {
///
/// required.
pub bucket: String,
/// is bucket versioning enabled for this bucket
pub bucket_versioning_enabled: bool,
/// endpoint of this backend.
///
/// Endpoint must be full uri, e.g.
Expand Down
157 changes: 157 additions & 0 deletions core/src/services/s3/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ pub struct S3Core {
pub default_storage_class: Option<HeaderValue>,
pub allow_anonymous: bool,
pub disable_stat_with_override: bool,
pub bucket_versioning_enabled: bool,

pub signer: AwsV4Signer,
pub loader: Box<dyn AwsCredentialLoad>,
Expand Down Expand Up @@ -788,6 +789,49 @@ impl S3Core {

self.send(req).await
}

pub async fn s3_list_object_versions(
&self,
prefix: &str,
delimiter: &str,
limit: Option<usize>,
key_marker: &str,
version_id_marker: &str,
) -> Result<Response<Buffer>> {
let p = build_abs_path(&self.root, prefix);

let mut url = format!("{}?versions", self.endpoint);
if !p.is_empty() {
write!(url, "&prefix={}", percent_encode_path(p.as_str()))
.expect("write into string must succeed");
}
if !delimiter.is_empty() {
write!(url, "&delimiter={}", delimiter).expect("write into string must succeed");
}

if let Some(limit) = limit {
write!(url, "&max-keys={}", limit).expect("write into string must succeed");
}
if !key_marker.is_empty() {
write!(url, "&key-marker={}", key_marker).expect("write into string must succeed");
}
if !version_id_marker.is_empty() {
write!(
url,
"&version-id-marker={}",
percent_encode_path(&version_id_marker)
)
.expect("write into string must succeed");
}

let mut req = Request::get(&url)
.body(Buffer::new())
.map_err(new_request_build_error)?;

self.sign(&mut req).await?;

self.send(req).await
}
}

/// Result of CreateMultipartUpload
Expand Down Expand Up @@ -912,6 +956,29 @@ pub struct OutputCommonPrefix {
pub prefix: String,
}

/// Output of ListObjectVersions
#[derive(Default, Debug, Deserialize)]
#[serde(default, rename_all = "PascalCase")]
pub struct ListObjectVersionsOutput {
pub is_truncated: Option<bool>,
pub next_key_marker: Option<String>,
pub next_version_id_marker: Option<String>,
pub common_prefixes: Vec<OutputCommonPrefix>,
pub version: Vec<ListObjectVersionsOutputVersion>,
}

#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct ListObjectVersionsOutputVersion {
pub key: String,
pub version_id: String,
pub is_latest: bool,
pub size: u64,
pub last_modified: String,
#[serde(rename = "ETag")]
pub etag: Option<String>,
}

pub enum ChecksumAlgorithm {
Crc32c,
}
Expand Down Expand Up @@ -1141,4 +1208,94 @@ mod tests {
]
)
}

#[test]
fn test_parse_list_object_versions() {
let bs = bytes::Bytes::from(
r#"<?xml version="1.0" encoding="UTF-8"?>
<ListVersionsResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<Name>mtp-versioning-fresh</Name>
<Prefix/>
<KeyMarker>key3</KeyMarker>
<VersionIdMarker>null</VersionIdMarker>
<NextKeyMarker>key3</NextKeyMarker>
<NextVersionIdMarker>d-d309mfjFrUmoQ0DBsVqmcMV15OI.</NextVersionIdMarker>
<MaxKeys>3</MaxKeys>
<IsTruncated>true</IsTruncated>
<Version>
<Key>key3</Key>
<VersionId>8XECiENpj8pydEDJdd-_VRrvaGKAHOaGMNW7tg6UViI.</VersionId>
<IsLatest>true</IsLatest>
<LastModified>2009-12-09T00:18:23.000Z</LastModified>
<ETag>"396fefef536d5ce46c7537ecf978a360"</ETag>
<Size>217</Size>
<Owner>
<ID>75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a</ID>
</Owner>
<StorageClass>STANDARD</StorageClass>
</Version>
<Version>
<Key>key3</Key>
<VersionId>d-d309mfjFri40QYukDozqBt3UmoQ0DBsVqmcMV15OI.</VersionId>
<IsLatest>false</IsLatest>
<LastModified>2009-12-09T00:18:08.000Z</LastModified>
<ETag>"396fefef536d5ce46c7537ecf978a360"</ETag>
<Size>217</Size>
<Owner>
<ID>75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a</ID>
</Owner>
<StorageClass>STANDARD</StorageClass>
</Version>
<CommonPrefixes>
<Prefix>photos/</Prefix>
</CommonPrefixes>
<CommonPrefixes>
<Prefix>videos/</Prefix>
</CommonPrefixes>
</ListVersionsResult>"#,
);

let output: ListObjectVersionsOutput =
quick_xml::de::from_reader(bs.reader()).expect("must succeed");

assert!(output.is_truncated.unwrap());
assert_eq!(output.next_key_marker, Some("key3".to_owned()));
assert_eq!(
output.next_version_id_marker,
Some("d-d309mfjFrUmoQ0DBsVqmcMV15OI.".to_owned())
);
assert_eq!(
output.common_prefixes,
vec![
OutputCommonPrefix {
prefix: "photos/".to_owned()
},
OutputCommonPrefix {
prefix: "videos/".to_owned()
}
]
);

assert_eq!(
output.version,
vec![
ListObjectVersionsOutputVersion {
key: "key3".to_owned(),
version_id: "8XECiENpj8pydEDJdd-_VRrvaGKAHOaGMNW7tg6UViI.".to_owned(),
is_latest: true,
size: 217,
last_modified: "2009-12-09T00:18:23.000Z".to_owned(),
etag: Some("\"396fefef536d5ce46c7537ecf978a360\"".to_owned()),
},
ListObjectVersionsOutputVersion {
key: "key3".to_owned(),
version_id: "d-d309mfjFri40QYukDozqBt3UmoQ0DBsVqmcMV15OI.".to_owned(),
is_latest: false,
size: 217,
last_modified: "2009-12-09T00:18:08.000Z".to_owned(),
etag: Some("\"396fefef536d5ce46c7537ecf978a360\"".to_owned()),
}
]
);
}
}
Loading

0 comments on commit 90b3895

Please sign in to comment.