Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Implement RFC-3911 Deleter API #5392

Merged
merged 18 commits into from
Dec 5, 2024
2 changes: 1 addition & 1 deletion core/src/docs/rfcs/3911_deleter_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ And the `delete` API will be changed to return a `oio::Delete` instead:

```diff
trait Accessor {
- async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete>;
- async fn delete(&self) -> Result<(RpDelete, Self::Deleter)>;
+ async fn delete(&self, args: OpDelete) -> Result<(RpDelete, Self::Deleter)>;
}
```
Expand Down
49 changes: 38 additions & 11 deletions core/src/layers/async_backtrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ impl<A: Access> LayeredAccess for AsyncBacktraceAccessor<A> {
type BlockingWriter = AsyncBacktraceWrapper<A::BlockingWriter>;
type Lister = AsyncBacktraceWrapper<A::Lister>;
type BlockingLister = AsyncBacktraceWrapper<A::BlockingLister>;
type Deleter = AsyncBacktraceWrapper<A::Deleter>;
type BlockingDeleter = AsyncBacktraceWrapper<A::BlockingDeleter>;

fn inner(&self) -> &Self::Inner {
&self.inner
Expand Down Expand Up @@ -102,8 +104,11 @@ impl<A: Access> LayeredAccess for AsyncBacktraceAccessor<A> {
}

#[async_backtrace::framed]
async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
self.inner.delete(path, args).await
async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
self.inner
.delete()
.await
.map(|(rp, r)| (rp, AsyncBacktraceWrapper::new(r)))
}

#[async_backtrace::framed]
Expand All @@ -114,11 +119,6 @@ impl<A: Access> LayeredAccess for AsyncBacktraceAccessor<A> {
.map(|(rp, r)| (rp, AsyncBacktraceWrapper::new(r)))
}

#[async_backtrace::framed]
async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
self.inner.batch(args).await
}

#[async_backtrace::framed]
async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
self.inner.presign(path, args).await
Expand All @@ -141,6 +141,12 @@ impl<A: Access> LayeredAccess for AsyncBacktraceAccessor<A> {
.blocking_list(path, args)
.map(|(rp, r)| (rp, AsyncBacktraceWrapper::new(r)))
}

fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
self.inner
.blocking_delete()
.map(|(rp, r)| (rp, AsyncBacktraceWrapper::new(r)))
}
}

pub struct AsyncBacktraceWrapper<R> {
Expand Down Expand Up @@ -173,13 +179,13 @@ impl<R: oio::Write> oio::Write for AsyncBacktraceWrapper<R> {
}

#[async_backtrace::framed]
async fn abort(&mut self) -> Result<()> {
self.inner.abort().await
async fn close(&mut self) -> Result<()> {
self.inner.close().await
}

#[async_backtrace::framed]
async fn close(&mut self) -> Result<()> {
self.inner.close().await
async fn abort(&mut self) -> Result<()> {
self.inner.abort().await
}
}

Expand All @@ -205,3 +211,24 @@ impl<R: oio::BlockingList> oio::BlockingList for AsyncBacktraceWrapper<R> {
self.inner.next()
}
}

impl<R: oio::Delete> oio::Delete for AsyncBacktraceWrapper<R> {
fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
self.inner.delete(path, args)
}

#[async_backtrace::framed]
async fn flush(&mut self) -> Result<usize> {
self.inner.flush().await
}
}

impl<R: oio::BlockingDelete> oio::BlockingDelete for AsyncBacktraceWrapper<R> {
fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
self.inner.delete(path, args)
}

fn flush(&mut self) -> Result<usize> {
self.inner.flush()
}
}
43 changes: 34 additions & 9 deletions core/src/layers/await_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ impl<A: Access> LayeredAccess for AwaitTreeAccessor<A> {
type BlockingWriter = AwaitTreeWrapper<A::BlockingWriter>;
type Lister = AwaitTreeWrapper<A::Lister>;
type BlockingLister = AwaitTreeWrapper<A::BlockingLister>;
type Deleter = AwaitTreeWrapper<A::Deleter>;
type BlockingDeleter = AwaitTreeWrapper<A::BlockingDeleter>;

fn inner(&self) -> &Self::Inner {
&self.inner
Expand Down Expand Up @@ -118,11 +120,12 @@ impl<A: Access> LayeredAccess for AwaitTreeAccessor<A> {
.await
}

async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
self.inner
.delete(path, args)
.delete()
.instrument_await(format!("opendal::{}", Operation::Delete))
.await
.map(|(rp, r)| (rp, AwaitTreeWrapper::new(r)))
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
Expand All @@ -140,13 +143,6 @@ impl<A: Access> LayeredAccess for AwaitTreeAccessor<A> {
.await
}

async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
self.inner
.batch(args)
.instrument_await(format!("opendal::{}", Operation::Batch))
.await
}

fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
self.inner
.blocking_read(path, args)
Expand All @@ -164,6 +160,12 @@ impl<A: Access> LayeredAccess for AwaitTreeAccessor<A> {
.blocking_list(path, args)
.map(|(rp, r)| (rp, AwaitTreeWrapper::new(r)))
}

fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
self.inner
.blocking_delete()
.map(|(rp, r)| (rp, AwaitTreeWrapper::new(r)))
}
}

pub struct AwaitTreeWrapper<R> {
Expand Down Expand Up @@ -235,3 +237,26 @@ impl<R: oio::BlockingList> oio::BlockingList for AwaitTreeWrapper<R> {
self.inner.next()
}
}

impl<R: oio::Delete> oio::Delete for AwaitTreeWrapper<R> {
fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
self.inner.delete(path, args)
}

async fn flush(&mut self) -> Result<usize> {
self.inner
.flush()
.instrument_await(format!("opendal::{}", Operation::DeleterFlush))
.await
}
}

impl<R: oio::BlockingDelete> oio::BlockingDelete for AwaitTreeWrapper<R> {
fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
self.inner.delete(path, args)
}

fn flush(&mut self) -> Result<usize> {
self.inner.flush()
}
}
28 changes: 20 additions & 8 deletions core/src/layers/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ impl<A: Access> LayeredAccess for BlockingAccessor<A> {
type BlockingWriter = BlockingWrapper<A::Writer>;
type Lister = A::Lister;
type BlockingLister = BlockingWrapper<A::Lister>;
type Deleter = A::Deleter;
type BlockingDeleter = BlockingWrapper<A::Deleter>;

fn inner(&self) -> &Self::Inner {
&self.inner
Expand Down Expand Up @@ -203,8 +205,8 @@ impl<A: Access> LayeredAccess for BlockingAccessor<A> {
self.inner.stat(path, args).await
}

async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
self.inner.delete(path, args).await
async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
self.inner.delete().await
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
Expand All @@ -215,10 +217,6 @@ impl<A: Access> LayeredAccess for BlockingAccessor<A> {
self.inner.presign(path, args).await
}

async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
self.inner.batch(args).await
}

fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
self.handle.block_on(self.inner.create_dir(path, args))
}
Expand Down Expand Up @@ -252,8 +250,12 @@ impl<A: Access> LayeredAccess for BlockingAccessor<A> {
self.handle.block_on(self.inner.stat(path, args))
}

fn blocking_delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
self.handle.block_on(self.inner.delete(path, args))
fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
self.handle.block_on(async {
let (rp, writer) = self.inner.delete().await?;
let blocking_deleter = Self::BlockingDeleter::new(self.handle.clone(), writer);
Ok((rp, blocking_deleter))
})
}

fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
Expand Down Expand Up @@ -298,6 +300,16 @@ impl<I: oio::List> oio::BlockingList for BlockingWrapper<I> {
}
}

impl<I: oio::Delete + 'static> oio::BlockingDelete for BlockingWrapper<I> {
fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
self.inner.delete(path, args)
}

fn flush(&mut self) -> Result<usize> {
self.handle.block_on(self.inner.flush())
}
}

#[cfg(test)]
mod tests {
use once_cell::sync::Lazy;
Expand Down
16 changes: 14 additions & 2 deletions core/src/layers/capability_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,13 @@ impl<A: Access> Debug for CapabilityAccessor<A> {
impl<A: Access> LayeredAccess for CapabilityAccessor<A> {
type Inner = A;
type Reader = A::Reader;
type BlockingReader = A::BlockingReader;
type Writer = A::Writer;
type BlockingWriter = A::BlockingWriter;
type Lister = A::Lister;
type Deleter = A::Deleter;
type BlockingReader = A::BlockingReader;
type BlockingWriter = A::BlockingWriter;
type BlockingLister = A::BlockingLister;
type BlockingDeleter = A::BlockingDeleter;

fn inner(&self) -> &Self::Inner {
&self.inner
Expand Down Expand Up @@ -123,6 +125,10 @@ impl<A: Access> LayeredAccess for CapabilityAccessor<A> {
self.inner.write(path, args).await
}

async fn delete(&self) -> crate::Result<(RpDelete, Self::Deleter)> {
self.inner.delete().await
}

async fn list(&self, path: &str, args: OpList) -> crate::Result<(RpList, Self::Lister)> {
let capability = self.info.full_capability();
if !capability.list_with_version && args.version() {
Expand Down Expand Up @@ -175,6 +181,10 @@ impl<A: Access> LayeredAccess for CapabilityAccessor<A> {
self.inner.blocking_write(path, args)
}

fn blocking_delete(&self) -> crate::Result<(RpDelete, Self::BlockingDeleter)> {
self.inner.blocking_delete()
}

fn blocking_list(
&self,
path: &str,
Expand Down Expand Up @@ -207,9 +217,11 @@ mod tests {
type Reader = oio::Reader;
type Writer = oio::Writer;
type Lister = oio::Lister;
type Deleter = oio::Deleter;
type BlockingReader = oio::BlockingReader;
type BlockingWriter = oio::BlockingWriter;
type BlockingLister = oio::BlockingLister;
type BlockingDeleter = oio::BlockingDeleter;

fn info(&self) -> Arc<AccessorInfo> {
let mut info = AccessorInfo::default();
Expand Down
10 changes: 10 additions & 0 deletions core/src/layers/chaos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ impl<A: Access> LayeredAccess for ChaosAccessor<A> {
type BlockingWriter = A::BlockingWriter;
type Lister = A::Lister;
type BlockingLister = A::BlockingLister;
type Deleter = A::Deleter;
type BlockingDeleter = A::BlockingDeleter;

fn inner(&self) -> &Self::Inner {
&self.inner
Expand Down Expand Up @@ -138,6 +140,14 @@ impl<A: Access> LayeredAccess for ChaosAccessor<A> {
fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
self.inner.blocking_list(path, args)
}

async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
self.inner.delete().await
}

fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
self.inner.blocking_delete()
}
}

/// ChaosReader will inject error into read operations.
Expand Down
14 changes: 14 additions & 0 deletions core/src/layers/complete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,8 @@ impl<A: Access> LayeredAccess for CompleteAccessor<A> {
type BlockingWriter = CompleteWriter<A::BlockingWriter>;
type Lister = CompleteLister<A, A::Lister>;
type BlockingLister = CompleteLister<A, A::BlockingLister>;
type Deleter = A::Deleter;
type BlockingDeleter = A::BlockingDeleter;

fn inner(&self) -> &Self::Inner {
&self.inner
Expand Down Expand Up @@ -373,10 +375,18 @@ impl<A: Access> LayeredAccess for CompleteAccessor<A> {
self.complete_stat(path, args).await
}

async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
self.inner().delete().await
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
self.complete_list(path, args).await
}

async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
self.inner.presign(path, args).await
}

fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
self.complete_blocking_create_dir(path, args)
}
Expand All @@ -398,6 +408,10 @@ impl<A: Access> LayeredAccess for CompleteAccessor<A> {
self.complete_blocking_stat(path, args)
}

fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
self.inner().blocking_delete()
}

fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
self.complete_blocking_list(path, args)
}
Expand Down
Loading
Loading