From 489c0d90502eadb2d5d4c14bbe0c22b4cb84d403 Mon Sep 17 00:00:00 2001 From: j178 <10510431+j178@users.noreply.github.com> Date: Wed, 24 May 2023 21:28:14 +0800 Subject: [PATCH 1/3] refactor(core): Implement RFC-2299 for read_with --- core/src/types/operator/operator.rs | 128 +++++++++----------- core/src/types/operator/operator_futures.rs | 49 ++++++++ 2 files changed, 103 insertions(+), 74 deletions(-) diff --git a/core/src/types/operator/operator.rs b/core/src/types/operator/operator.rs index 3d9a78478ef..1db1d1e6185 100644 --- a/core/src/types/operator/operator.rs +++ b/core/src/types/operator/operator.rs @@ -452,12 +452,62 @@ impl Operator { /// # use futures::TryStreamExt; /// # #[tokio::main] /// # async fn test(op: Operator) -> Result<()> { - /// let bs = op.read_with("path/to/file", OpRead::new()).await?; + /// let bs = op.read_with("path/to/file").await?; /// # Ok(()) /// # } /// ``` - pub async fn read_with(&self, path: &str, args: OpRead) -> Result> { - self.range_read_with(path, .., args).await + pub fn read_with(&self, path: &str) -> FutureRead { + let path = normalize_path(path); + + let fut = FutureRead(OperatorFuture::new( + self.inner().clone(), + path, + OpRead::default(), + |inner, path, args| { + let fut = async move { + if !validate_path(&path, EntryMode::FILE) { + return Err(Error::new( + ErrorKind::IsADirectory, + "read path is a directory", + ) + .with_operation("range_read") + .with_context("service", inner.info().scheme()) + .with_context("path", &path)); + } + + let br = args.range(); + let (rp, mut s) = inner.read(&path, args).await?; + + let length = rp.into_metadata().content_length() as usize; + let mut buffer = Vec::with_capacity(length); + + let dst = buffer.spare_capacity_mut(); + let mut buf = ReadBuf::uninit(dst); + + // Safety: the input buffer is created with_capacity(length). + unsafe { buf.assume_init(length) }; + + // TODO: use native read api + s.read_exact(buf.initialized_mut()).await.map_err(|err| { + Error::new(ErrorKind::Unexpected, "read from storage") + .with_operation("range_read") + .with_context("service", inner.info().scheme().into_static()) + .with_context("path", &path) + .with_context("range", br.to_string()) + .set_source(err) + })?; + + // Safety: read_exact makes sure this buffer has been filled. + unsafe { buffer.set_len(length) } + + Ok(buffer) + }; + + Box::pin(fut) + }, + )); + + fut } /// Read the specified range of path into a bytes. @@ -483,77 +533,7 @@ impl Operator { /// # } /// ``` pub async fn range_read(&self, path: &str, range: impl RangeBounds) -> Result> { - self.range_read_with(path, range, OpRead::new()).await - } - - /// Read the specified range of path into a bytes with extra options.. - /// - /// This function will allocate a new bytes internally. For more precise memory control or - /// reading data lazily, please use [`Operator::range_reader`] - /// - /// # Notes - /// - /// - The returning content's length may be smaller than the range specified. - /// - /// # Examples - /// - /// ``` - /// # use std::io::Result; - /// # use opendal::Operator; - /// # use opendal::ops::OpRead; - /// # use futures::TryStreamExt; - /// # #[tokio::main] - /// # async fn test(op: Operator) -> Result<()> { - /// let bs = op - /// .range_read_with("path/to/file", 1024..2048, OpRead::new()) - /// .await?; - /// # Ok(()) - /// # } - /// ``` - pub async fn range_read_with( - &self, - path: &str, - range: impl RangeBounds, - args: OpRead, - ) -> Result> { - let path = normalize_path(path); - - if !validate_path(&path, EntryMode::FILE) { - return Err( - Error::new(ErrorKind::IsADirectory, "read path is a directory") - .with_operation("range_read") - .with_context("service", self.inner().info().scheme()) - .with_context("path", &path), - ); - } - - let br = BytesRange::from(range); - - let (rp, mut s) = self.inner().read(&path, args.with_range(br)).await?; - - let length = rp.into_metadata().content_length() as usize; - let mut buffer = Vec::with_capacity(length); - - let dst = buffer.spare_capacity_mut(); - let mut buf = ReadBuf::uninit(dst); - - // Safety: the input buffer is created with_capacity(length). - unsafe { buf.assume_init(length) }; - - // TODO: use native read api - s.read_exact(buf.initialized_mut()).await.map_err(|err| { - Error::new(ErrorKind::Unexpected, "read from storage") - .with_operation("range_read") - .with_context("service", self.inner().info().scheme().into_static()) - .with_context("path", &path) - .with_context("range", br.to_string()) - .set_source(err) - })?; - - // Safety: read_exact makes sure this buffer has been filled. - unsafe { buffer.set_len(length) } - - Ok(buffer) + self.read_with(path).range(range.into()).await } /// Create a new reader which can read the whole path. diff --git a/core/src/types/operator/operator_futures.rs b/core/src/types/operator/operator_futures.rs index b794e7dfde5..deb877a7b8a 100644 --- a/core/src/types/operator/operator_futures.rs +++ b/core/src/types/operator/operator_futures.rs @@ -172,3 +172,52 @@ impl Future for FutureAppender { self.0.poll_unpin(cx) } } + +/// Future that generated by [`Operator::read_with`]. +/// +/// Users can add more options by public functions provided by this struct. +pub struct FutureRead(pub(crate) OperatorFuture>); + +impl FutureRead { + /// Set the range header for this operation. + pub fn range(mut self, range: BytesRange) -> Self { + self.0 = self.0.map_args(|args| args.with_range(range)); + self + } + + /// Sets the content-disposition header that should be send back by the remote read operation. + pub fn override_content_disposition(mut self, content_disposition: &str) -> Self { + self.0 = self + .0 + .map_args(|args| args.with_override_content_disposition(content_disposition)); + self + } + + /// Sets the cache-control header that should be send back by the remote read operation. + pub fn override_cache_control(mut self, cache_control: &str) -> Self { + self.0 = self + .0 + .map_args(|args| args.with_override_cache_control(cache_control)); + self + } + + /// Set the If-Match for this operation. + pub fn if_match(mut self, v: &str) -> Self { + self.0 = self.0.map_args(|args| args.with_if_match(v)); + self + } + + /// Set the If-None-Match for this operation. + pub fn if_none_match(mut self, v: &str) -> Self { + self.0 = self.0.map_args(|args| args.with_if_none_match(v)); + self + } +} + +impl Future for FutureRead { + type Output = Result>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.0.poll_unpin(cx) + } +} From c7fc22347455b0e843cf455c9d4348fe63c69663 Mon Sep 17 00:00:00 2001 From: j178 <10510431+j178@users.noreply.github.com> Date: Wed, 24 May 2023 21:53:39 +0800 Subject: [PATCH 2/3] fix tests --- core/src/services/http/backend.rs | 4 +--- core/tests/behavior/read_only.rs | 26 +++++++++----------------- core/tests/behavior/write.rs | 25 +++++++++---------------- 3 files changed, 19 insertions(+), 36 deletions(-) diff --git a/core/src/services/http/backend.rs b/core/src/services/http/backend.rs index e719c42b4b4..d3705f29798 100644 --- a/core/src/services/http/backend.rs +++ b/core/src/services/http/backend.rs @@ -534,9 +534,7 @@ mod tests { builder.root("/"); let op = Operator::new(builder)?.finish(); - let match_bs = op - .read_with("hello", OpRead::new().with_if_none_match("*")) - .await?; + let match_bs = op.read_with("hello").if_none_match("*").await?; assert_eq!(match_bs, b"Hello, World!"); Ok(()) diff --git a/core/tests/behavior/read_only.rs b/core/tests/behavior/read_only.rs index da99322c846..659c72107ad 100644 --- a/core/tests/behavior/read_only.rs +++ b/core/tests/behavior/read_only.rs @@ -17,7 +17,6 @@ use anyhow::Result; use futures::AsyncReadExt; -use opendal::ops::OpRead; use opendal::EntryMode; use opendal::ErrorKind; use opendal::Operator; @@ -310,18 +309,13 @@ pub async fn test_read_with_if_match(op: Operator) -> Result<()> { let meta = op.stat(path).await?; - let mut op_read = OpRead::default(); - op_read = op_read.with_if_match("invalid_etag"); - - let res = op.read_with(path, op_read).await; + let res = op.read_with(path).if_match("invalid_etag").await; assert!(res.is_err()); assert_eq!(res.unwrap_err().kind(), ErrorKind::ConditionNotMatch); - let mut op_read = OpRead::default(); - op_read = op_read.with_if_match(meta.etag().expect("etag must exist")); - let bs = op - .read_with(path, op_read) + .read_with(path) + .if_match(meta.etag().expect("etag must exist")) .await .expect("read must succeed"); assert_eq!(bs.len(), 262144, "read size"); @@ -344,18 +338,16 @@ pub async fn test_read_with_if_none_match(op: Operator) -> Result<()> { let meta = op.stat(path).await?; - let mut op_read = OpRead::default(); - op_read = op_read.with_if_none_match(meta.etag().expect("etag must exist")); - - let res = op.read_with(path, op_read).await; + let res = op + .read_with(path) + .if_none_match(meta.etag().expect("etag must exist")) + .await; assert!(res.is_err()); assert_eq!(res.unwrap_err().kind(), ErrorKind::ConditionNotMatch); - let mut op_read = OpRead::default(); - op_read = op_read.with_if_none_match("invalid_etag"); - let bs = op - .read_with(path, op_read) + .read_with(path) + .if_none_match("invalid_etag") .await .expect("read must succeed"); assert_eq!(bs.len(), 262144, "read size"); diff --git a/core/tests/behavior/write.rs b/core/tests/behavior/write.rs index 74b2366cfa2..6fddd303483 100644 --- a/core/tests/behavior/write.rs +++ b/core/tests/behavior/write.rs @@ -646,18 +646,13 @@ pub async fn test_read_with_if_match(op: Operator) -> Result<()> { let meta = op.stat(&path).await?; - let mut op_read = OpRead::default(); - op_read = op_read.with_if_match("\"invalid_etag\""); - - let res = op.read_with(&path, op_read).await; + let res = op.read_with(&path).if_match("\"invalid_etag\"").await; assert!(res.is_err()); assert_eq!(res.unwrap_err().kind(), ErrorKind::ConditionNotMatch); - let mut op_read = OpRead::default(); - op_read = op_read.with_if_match(meta.etag().expect("etag must exist")); - let bs = op - .read_with(&path, op_read) + .read_with(&path) + .if_match(meta.etag().expect("etag must exist")) .await .expect("read must succeed"); assert_eq!(bs, content); @@ -682,18 +677,16 @@ pub async fn test_read_with_if_none_match(op: Operator) -> Result<()> { let meta = op.stat(&path).await?; - let mut op_read = OpRead::default(); - op_read = op_read.with_if_none_match(meta.etag().expect("etag must exist")); - - let res = op.read_with(&path, op_read).await; + let res = op + .read_with(&path) + .if_none_match(meta.etag().expect("etag must exist")) + .await; assert!(res.is_err()); assert_eq!(res.unwrap_err().kind(), ErrorKind::ConditionNotMatch); - let mut op_read = OpRead::default(); - op_read = op_read.with_if_none_match("\"invalid_etag\""); - let bs = op - .read_with(&path, op_read) + .read_with(&path) + .if_none_match("\"invalid_etag\"") .await .expect("read must succeed"); assert_eq!(bs, content); From d602f9dd5b553cc1707989d16b692ef4748c7e33 Mon Sep 17 00:00:00 2001 From: j178 <10510431+j178@users.noreply.github.com> Date: Wed, 24 May 2023 21:57:36 +0800 Subject: [PATCH 3/3] apply suggestion --- core/src/types/operator/operator.rs | 2 +- core/src/types/operator/operator_futures.rs | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/core/src/types/operator/operator.rs b/core/src/types/operator/operator.rs index 1db1d1e6185..cc7a3265702 100644 --- a/core/src/types/operator/operator.rs +++ b/core/src/types/operator/operator.rs @@ -533,7 +533,7 @@ impl Operator { /// # } /// ``` pub async fn range_read(&self, path: &str, range: impl RangeBounds) -> Result> { - self.read_with(path).range(range.into()).await + self.read_with(path).range(range).await } /// Create a new reader which can read the whole path. diff --git a/core/src/types/operator/operator_futures.rs b/core/src/types/operator/operator_futures.rs index deb877a7b8a..80fbf98b98b 100644 --- a/core/src/types/operator/operator_futures.rs +++ b/core/src/types/operator/operator_futures.rs @@ -20,6 +20,7 @@ //! By using futures, users can add more options for operation. use std::mem; +use std::ops::RangeBounds; use std::pin::Pin; use std::task::{Context, Poll}; @@ -180,8 +181,8 @@ pub struct FutureRead(pub(crate) OperatorFuture>); impl FutureRead { /// Set the range header for this operation. - pub fn range(mut self, range: BytesRange) -> Self { - self.0 = self.0.map_args(|args| args.with_range(range)); + pub fn range(mut self, range: impl RangeBounds) -> Self { + self.0 = self.0.map_args(|args| args.with_range(range.into())); self }