Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(core): Implement RFC-2299 for list_with #2323

Merged
merged 5 commits into from
May 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions bin/oay/src/services/s3/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use axum::response::Response;
use axum::routing::get;
use axum::Router;
use chrono::SecondsFormat;
use opendal::ops::OpList;
use opendal::Metakey;
use opendal::Operator;
use serde::Deserialize;
Expand Down Expand Up @@ -102,10 +101,8 @@ async fn handle_list_objects(

let mut lister = state
.op
.list_with(
&params.prefix,
OpList::new().with_start_after(&params.start_after),
)
.list_with(&params.prefix)
.start_after(&params.start_after)
.await?;

let page = lister.next_page().await?.unwrap_or_default();
Expand Down
65 changes: 28 additions & 37 deletions core/src/types/operator/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1264,7 +1264,7 @@ impl Operator {
/// # }
/// ```
pub async fn list(&self, path: &str) -> Result<Lister> {
self.list_with(path, OpList::new()).await
self.list_with(path).await
}

/// List given path with OpList.
Expand All @@ -1287,8 +1287,7 @@ impl Operator {
/// use opendal::Operator;
/// # #[tokio::main]
/// # async fn test(op: Operator) -> Result<()> {
/// let option = OpList::new().with_limit(10).with_start_after("start");
/// let mut ds = op.list_with("path/to/dir/", option).await?;
/// let mut ds = op.list_with("path/to/dir/").limit(10).start_after("start").await?;
/// while let Some(mut de) = ds.try_next().await? {
/// let meta = op.metadata(&de, Metakey::Mode).await?;
/// match meta.mode() {
Expand Down Expand Up @@ -1319,8 +1318,7 @@ impl Operator {
/// use opendal::Operator;
/// # #[tokio::main]
/// # async fn test(op: Operator) -> Result<()> {
/// let option = OpList::new().with_delimiter("");
/// let mut ds = op.list_with("path/to/dir/", option).await?;
/// let mut ds = op.list_with("path/to/dir/").delimiter("").await?;
/// while let Some(mut de) = ds.try_next().await? {
/// let meta = op.metadata(&de, Metakey::Mode).await?;
/// match meta.mode() {
Expand All @@ -1336,22 +1334,33 @@ impl Operator {
/// # Ok(())
/// # }
/// ```
pub async fn list_with(&self, path: &str, op: OpList) -> Result<Lister> {
pub fn list_with(&self, path: &str) -> FutureList {
let path = normalize_path(path);

if !validate_path(&path, EntryMode::DIR) {
return Err(Error::new(
ErrorKind::NotADirectory,
"the path trying to list should end with `/`",
)
.with_operation("Operator::list")
.with_context("service", self.info().scheme().into_static())
.with_context("path", &path));
}
let fut = FutureList(OperatorFuture::new(
self.inner().clone(),
path,
OpList::default(),
|inner, path, args| {
let fut = async move {
if !validate_path(&path, EntryMode::DIR) {
return Err(Error::new(
ErrorKind::NotADirectory,
"the path trying to list should end with `/`",
)
.with_operation("Operator::list")
.with_context("service", inner.info().scheme().into_static())
.with_context("path", &path));
}

let (_, pager) = self.inner().list(&path, op).await?;
let (_, pager) = inner.list(&path, args).await?;

Ok(Lister::new(pager))
Ok(Lister::new(pager))
};
Box::pin(fut)
},
));
fut
}

/// List dir in flat way.
Expand All @@ -1363,7 +1372,7 @@ impl Operator {
/// # Notes
///
/// - `scan` will not return the prefix itself.
/// - `scan` is an alias of `list_with(OpList::new().with_delimiter(""))`
/// - `scan` is an alias of `list_with(path).delimiter("")`
///
/// # Examples
///
Expand Down Expand Up @@ -1394,27 +1403,9 @@ impl Operator {
/// # }
/// ```
pub async fn scan(&self, path: &str) -> Result<Lister> {
Xuanwo marked this conversation as resolved.
Show resolved Hide resolved
let path = normalize_path(path);

if !validate_path(&path, EntryMode::DIR) {
return Err(Error::new(
ErrorKind::NotADirectory,
"the path trying to scan should end with `/`",
Xuanwo marked this conversation as resolved.
Show resolved Hide resolved
)
.with_operation("list")
.with_context("service", self.info().scheme().into_static())
.with_context("path", &path));
}

let (_, pager) = self
.inner()
.list(&path, OpList::new().with_delimiter(""))
.await?;

Ok(Lister::new(pager))
self.list_with(path).delimiter("").await
}
}

/// Operator presign API.
impl Operator {
/// Presign an operation for stat(head).
Expand Down
33 changes: 33 additions & 0 deletions core/src/types/operator/operator_futures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,3 +509,36 @@ impl Future for FutureWriter {
self.0.poll_unpin(cx)
}
}

/// Future that generated by [`Operator::list_with`].
///
/// Users can add more options by public functions provided by this struct.
pub struct FutureList(pub(crate) OperatorFuture<OpList, Lister>);

impl FutureList {
/// Change the limit of this list operation.
pub fn limit(mut self, v: usize) -> Self {
self.0 = self.0.map_args(|args| args.with_limit(v));
self
}

/// Change the start_after of this list operation.
pub fn start_after(mut self, v: &str) -> Self {
self.0 = self.0.map_args(|args| args.with_start_after(v));
self
}

/// Change the delimiter. The default delimiter is "/"
pub fn delimiter(mut self, v: &str) -> Self {
self.0 = self.0.map_args(|args| args.with_delimiter(v));
self
}
}

impl Future for FutureList {
type Output = Result<Lister>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.0.poll_unpin(cx)
}
}
4 changes: 1 addition & 3 deletions core/tests/behavior/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use futures::stream::FuturesUnordered;
use futures::StreamExt;
use futures::TryStreamExt;
use log::debug;
use opendal::ops::OpList;
use opendal::EntryMode;
use opendal::ErrorKind;
use opendal::Operator;
Expand Down Expand Up @@ -304,8 +303,7 @@ pub async fn test_list_with_start_after(op: Operator) -> Result<()> {
.collect::<Vec<_>>()
.await;

let option = OpList::new().with_start_after(&given[2]);
let mut objects = op.list_with(dir, option).await?;
let mut objects = op.list_with(dir).start_after(&given[2]).await?;
let mut actual = vec![];
while let Some(o) = objects.try_next().await? {
let path = o.path().to_string();
Expand Down