Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): Streaming reading while chunk is not set #4658

Merged
merged 65 commits into from
Jun 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
83dd50d
Save current work
Xuanwo May 30, 2024
cdee6e7
Build pass
Xuanwo May 31, 2024
bcc5200
Ready for s3 tests
Xuanwo May 31, 2024
b6d6c20
Fix tests
Xuanwo May 31, 2024
5af2da8
Fix build
Xuanwo May 31, 2024
b2a09bf
Build pass
Xuanwo May 31, 2024
f334eb1
introduce HttpBody
Xuanwo Jun 1, 2024
92bd6d7
Add some comments
Xuanwo Jun 1, 2024
47c17f8
Remove from_stream
Xuanwo Jun 1, 2024
978650d
Address check
Xuanwo Jun 1, 2024
b9a88ee
Fix clippy
Xuanwo Jun 1, 2024
7cc22c6
Fix s3
Xuanwo Jun 1, 2024
3f26833
Fix aliyun drive
Xuanwo Jun 1, 2024
a19a084
Fix alluxio
Xuanwo Jun 1, 2024
ab33002
Fix azblob
Xuanwo Jun 1, 2024
347da0a
Fix azdls
Xuanwo Jun 1, 2024
43f8886
Fix azfile
Xuanwo Jun 1, 2024
1489e3e
Fix B2
Xuanwo Jun 1, 2024
0205fa8
Fix chainsafe
Xuanwo Jun 1, 2024
6c6cb3e
Fix cos
Xuanwo Jun 1, 2024
1d6b234
Fix dbfs
Xuanwo Jun 1, 2024
4e0c38d
Fix dropbox
Xuanwo Jun 1, 2024
869f047
Fix gcs
Xuanwo Jun 1, 2024
9a75a6a
Fix gdrive
Xuanwo Jun 1, 2024
84b8200
Fix ghac
Xuanwo Jun 1, 2024
5a0af34
Fix github
Xuanwo Jun 1, 2024
85aa872
Fix http
Xuanwo Jun 1, 2024
0ba8f5f
Fix hugging face
Xuanwo Jun 1, 2024
4e5cbd1
Fix icloud
Xuanwo Jun 1, 2024
ab6823d
Fix ipfs
Xuanwo Jun 1, 2024
8c0c35a
Fix ipmfs
Xuanwo Jun 1, 2024
b1a48ed
Fix koofr
Xuanwo Jun 1, 2024
c19bbdc
Fix obs
Xuanwo Jun 1, 2024
be7adbf
Fix onedrive
Xuanwo Jun 1, 2024
fdfe4ed
Fix oss
Xuanwo Jun 1, 2024
5566f30
Fix pcloud
Xuanwo Jun 1, 2024
2941c88
Fix seafile
Xuanwo Jun 1, 2024
9a31282
Fix supabase
Xuanwo Jun 1, 2024
8c34479
Fix swift
Xuanwo Jun 1, 2024
0c4a385
Fix upyun
Xuanwo Jun 1, 2024
504e634
Fix vercel blob
Xuanwo Jun 1, 2024
8396a52
Fix webdav
Xuanwo Jun 1, 2024
44d234f
Fix webhdfs
Xuanwo Jun 1, 2024
d7dabf9
Fix yandex
Xuanwo Jun 1, 2024
d00852d
Fix vercel_artifacts
Xuanwo Jun 1, 2024
6aa492b
Fix ftp
Xuanwo Jun 1, 2024
5d3f16a
Refactor blocking
Xuanwo Jun 1, 2024
c5f7f14
Fs build passing
Xuanwo Jun 2, 2024
e5b0495
Fix hdfs
Xuanwo Jun 2, 2024
38446ed
Fix sftp
Xuanwo Jun 2, 2024
59834b2
Build pass!
Xuanwo Jun 2, 2024
c223cb6
Check passed
Xuanwo Jun 2, 2024
0f51d36
Clippy pass
Xuanwo Jun 2, 2024
23d45ed
Bring blocking back
Xuanwo Jun 2, 2024
96d1646
Fix range
Xuanwo Jun 2, 2024
6ff7bfa
Fix hdfs
Xuanwo Jun 2, 2024
2123e94
Make http body happy in wasm32
Xuanwo Jun 2, 2024
4389cfa
Add tests
Xuanwo Jun 2, 2024
164d516
Fix tests
Xuanwo Jun 2, 2024
39b1e25
Fix tests for timeout
Xuanwo Jun 2, 2024
0156fd0
Fix bytes stream
Xuanwo Jun 2, 2024
422b22f
Fix tests for reader
Xuanwo Jun 2, 2024
a566973
Fix retry
Xuanwo Jun 2, 2024
c01c60c
Fix bytes
Xuanwo Jun 2, 2024
ea3301c
Merge branch 'main' into streaming-read
Xuanwo Jun 2, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions bin/ofs/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions bindings/c/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ impl opendal_bytes {
}

#[allow(clippy::from_over_into)]
impl Into<bytes::Bytes> for opendal_bytes {
fn into(self) -> bytes::Bytes {
impl Into<Buffer> for opendal_bytes {
fn into(self) -> Buffer {
let slice = unsafe { std::slice::from_raw_parts(self.data, self.len) };
bytes::Bytes::copy_from_slice(slice)
Buffer::from(bytes::Bytes::copy_from_slice(slice))
}
}

Expand Down
1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ tests = [
"services-fs",
"services-http",
"services-memory",
"internal-tokio-rt",
"services-s3",
]

Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,8 +282,8 @@ impl<I> BlockingWrapper<I> {
}

impl<I: oio::Read + 'static> oio::BlockingRead for BlockingWrapper<I> {
fn read_at(&self, offset: u64, size: usize) -> Result<Buffer> {
self.handle.block_on(self.inner.read_at(offset, size))
fn read(&mut self) -> Result<Buffer> {
self.handle.block_on(self.inner.read())
}
}

Expand Down
8 changes: 4 additions & 4 deletions core/src/layers/chaos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,19 +171,19 @@ impl<R> ChaosReader<R> {
}

impl<R: oio::Read> oio::Read for ChaosReader<R> {
async fn read_at(&self, offset: u64, size: usize) -> Result<Buffer> {
async fn read(&mut self) -> Result<Buffer> {
if self.i_feel_lucky() {
self.inner.read_at(offset, size).await
self.inner.read().await
} else {
Err(Self::unexpected_eof())
}
}
}

impl<R: oio::BlockingRead> oio::BlockingRead for ChaosReader<R> {
fn read_at(&self, offset: u64, size: usize) -> Result<Buffer> {
fn read(&mut self) -> Result<Buffer> {
if self.i_feel_lucky() {
self.inner.read_at(offset, size)
self.inner.read()
} else {
Err(Self::unexpected_eof())
}
Expand Down
34 changes: 5 additions & 29 deletions core/src/layers/complete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -587,39 +587,15 @@ pub type CompleteLister<A, P> =
pub struct CompleteReader<R>(R);

impl<R: oio::Read> oio::Read for CompleteReader<R> {
async fn read_at(&self, offset: u64, size: usize) -> Result<Buffer> {
if size == 0 {
return Ok(Buffer::new());
}

let buf = self.0.read_at(offset, size).await?;
if buf.len() != size {
return Err(Error::new(
ErrorKind::RangeNotSatisfied,
"service didn't return the expected size",
)
.with_context("expect", size.to_string())
.with_context("actual", buf.len().to_string()));
}
async fn read(&mut self) -> Result<Buffer> {
let buf = self.0.read().await?;
Ok(buf)
}
}

impl<R: oio::BlockingRead> oio::BlockingRead for CompleteReader<R> {
fn read_at(&self, offset: u64, size: usize) -> Result<Buffer> {
if size == 0 {
return Ok(Buffer::new());
}

let buf = self.0.read_at(offset, size)?;
if buf.len() != size {
return Err(Error::new(
ErrorKind::RangeNotSatisfied,
"service didn't return the expected size",
)
.with_context("expect", size.to_string())
.with_context("actual", buf.len().to_string()));
}
fn read(&mut self) -> Result<Buffer> {
let buf = self.0.read()?;
Ok(buf)
}
}
Expand Down Expand Up @@ -742,7 +718,7 @@ mod tests {
}

async fn read(&self, _: &str, _: OpRead) -> Result<(RpRead, Self::Reader)> {
Ok((RpRead::new(), Arc::new(bytes::Bytes::new())))
Ok((RpRead::new(), Box::new(bytes::Bytes::new())))
}

async fn write(&self, _: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
Expand Down
8 changes: 4 additions & 4 deletions core/src/layers/concurrent_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,14 +250,14 @@ impl<R> ConcurrentLimitWrapper<R> {
}

impl<R: oio::Read> oio::Read for ConcurrentLimitWrapper<R> {
async fn read_at(&self, offset: u64, size: usize) -> Result<Buffer> {
self.inner.read_at(offset, size).await
async fn read(&mut self) -> Result<Buffer> {
self.inner.read().await
}
}

impl<R: oio::BlockingRead> oio::BlockingRead for ConcurrentLimitWrapper<R> {
fn read_at(&self, offset: u64, size: usize) -> Result<Buffer> {
self.inner.read_at(offset, size)
fn read(&mut self) -> Result<Buffer> {
self.inner.read()
}
}

Expand Down
8 changes: 4 additions & 4 deletions core/src/layers/dtrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,10 +340,10 @@ impl<R> DtraceLayerWrapper<R> {
}

impl<R: oio::Read> oio::Read for DtraceLayerWrapper<R> {
async fn read_at(&self, offset: u64, size: usize) -> Result<Buffer> {
async fn read(&mut self) -> Result<Buffer> {
let c_path = CString::new(self.path.clone()).unwrap();
probe_lazy!(opendal, reader_read_start, c_path.as_ptr());
match self.inner.read_at(offset, size).await {
match self.inner.read().await {
Ok(bs) => {
probe_lazy!(opendal, reader_read_ok, c_path.as_ptr(), bs.remaining());
Ok(bs)
Expand All @@ -357,11 +357,11 @@ impl<R: oio::Read> oio::Read for DtraceLayerWrapper<R> {
}

impl<R: oio::BlockingRead> oio::BlockingRead for DtraceLayerWrapper<R> {
fn read_at(&self, offset: u64, size: usize) -> Result<Buffer> {
fn read(&mut self) -> Result<Buffer> {
let c_path = CString::new(self.path.clone()).unwrap();
probe_lazy!(opendal, blocking_reader_read_start, c_path.as_ptr());
self.inner
.read_at(offset, size)
.read()
.map(|bs| {
probe_lazy!(
opendal,
Expand Down
12 changes: 4 additions & 8 deletions core/src/layers/error_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,25 +339,21 @@ pub struct ErrorContextWrapper<T> {
}

impl<T: oio::Read> oio::Read for ErrorContextWrapper<T> {
async fn read_at(&self, offset: u64, size: usize) -> Result<Buffer> {
self.inner.read_at(offset, size).await.map_err(|err| {
async fn read(&mut self) -> Result<Buffer> {
self.inner.read().await.map_err(|err| {
err.with_operation(ReadOperation::Read)
.with_context("service", self.scheme)
.with_context("path", &self.path)
.with_context("offset", offset.to_string())
.with_context("size", size.to_string())
})
}
}

impl<T: oio::BlockingRead> oio::BlockingRead for ErrorContextWrapper<T> {
fn read_at(&self, offset: u64, size: usize) -> Result<Buffer> {
self.inner.read_at(offset, size).map_err(|err| {
fn read(&mut self) -> Result<Buffer> {
self.inner.read().map_err(|err| {
err.with_operation(ReadOperation::BlockingRead)
.with_context("service", self.scheme)
.with_context("path", &self.path)
.with_context("offset", offset.to_string())
.with_context("size", size.to_string())
})
}
}
Expand Down
8 changes: 4 additions & 4 deletions core/src/layers/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -978,8 +978,8 @@ impl<R> Drop for LoggingReader<R> {
}

impl<R: oio::Read> oio::Read for LoggingReader<R> {
async fn read_at(&self, offset: u64, size: usize) -> Result<Buffer> {
match self.inner.read_at(offset, size).await {
async fn read(&mut self) -> Result<Buffer> {
match self.inner.read().await {
Ok(bs) => {
self.read
.fetch_add(bs.remaining() as u64, Ordering::Relaxed);
Expand Down Expand Up @@ -1014,8 +1014,8 @@ impl<R: oio::Read> oio::Read for LoggingReader<R> {
}

impl<R: oio::BlockingRead> oio::BlockingRead for LoggingReader<R> {
fn read_at(&self, offset: u64, size: usize) -> Result<Buffer> {
match self.inner.read_at(offset, size) {
fn read(&mut self) -> Result<Buffer> {
match self.inner.read() {
Ok(bs) => {
self.read
.fetch_add(bs.remaining() as u64, Ordering::Relaxed);
Expand Down
7 changes: 3 additions & 4 deletions core/src/layers/madsim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,10 +261,9 @@ pub struct MadsimReader {
}

impl oio::Read for MadsimReader {
async fn read_at(&self, offset: u64, size: usize) -> crate::Result<Buffer> {
if let Some(ref data) = self.data {
let size = min(size, data.len());
Ok(data.clone().split_to(size).into())
async fn read(&mut self) -> crate::Result<Buffer> {
if let Some(data) = self.data.take() {
Ok(Buffer::from(data))
} else {
Ok(Buffer::new())
}
Expand Down
8 changes: 4 additions & 4 deletions core/src/layers/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -747,10 +747,10 @@ impl<R> MetricWrapper<R> {
}

impl<R: oio::Read> oio::Read for MetricWrapper<R> {
async fn read_at(&self, offset: u64, size: usize) -> Result<Buffer> {
async fn read(&mut self) -> Result<Buffer> {
let start = Instant::now();

match self.inner.read_at(offset, size).await {
match self.inner.read().await {
Ok(bs) => {
self.bytes_counter.increment(bs.remaining() as u64);
self.requests_duration_seconds
Expand All @@ -766,11 +766,11 @@ impl<R: oio::Read> oio::Read for MetricWrapper<R> {
}

impl<R: oio::BlockingRead> oio::BlockingRead for MetricWrapper<R> {
fn read_at(&self, offset: u64, size: usize) -> Result<Buffer> {
fn read(&mut self) -> Result<Buffer> {
let start = Instant::now();

self.inner
.read_at(offset, size)
.read()
.map(|bs| {
self.bytes_counter.increment(bs.remaining() as u64);
self.requests_duration_seconds
Expand Down
8 changes: 4 additions & 4 deletions core/src/layers/minitrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,16 +292,16 @@ impl<R> MinitraceWrapper<R> {

impl<R: oio::Read> oio::Read for MinitraceWrapper<R> {
#[trace(enter_on_poll = true)]
async fn read_at(&self, offset: u64, size: usize) -> Result<Buffer> {
self.inner.read_at(offset, size).await
async fn read(&mut self) -> Result<Buffer> {
self.inner.read().await
}
}

impl<R: oio::BlockingRead> oio::BlockingRead for MinitraceWrapper<R> {
fn read_at(&self, offset: u64, size: usize) -> Result<Buffer> {
fn read(&mut self) -> Result<Buffer> {
let _g = self.span.set_local_parent();
let _span = LocalSpan::enter_with_local_parent(ReadOperation::BlockingRead.into_static());
self.inner.read_at(offset, size)
self.inner.read()
}
}

Expand Down
8 changes: 4 additions & 4 deletions core/src/layers/oteltrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,14 +272,14 @@ impl<R> OtelTraceWrapper<R> {
}

impl<R: oio::Read> oio::Read for OtelTraceWrapper<R> {
async fn read_at(&self, offset: u64, size: usize) -> Result<Buffer> {
self.inner.read_at(offset, size).await
async fn read(&mut self) -> Result<Buffer> {
self.inner.read().await
}
}

impl<R: oio::BlockingRead> oio::BlockingRead for OtelTraceWrapper<R> {
fn read_at(&self, offset: u64, size: usize) -> Result<Buffer> {
self.inner.read_at(offset, size)
fn read(&mut self) -> Result<Buffer> {
self.inner.read()
}
}

Expand Down
8 changes: 4 additions & 4 deletions core/src/layers/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -680,13 +680,13 @@ impl<R> PrometheusMetricWrapper<R> {
}

impl<R: oio::Read> oio::Read for PrometheusMetricWrapper<R> {
async fn read_at(&self, offset: u64, size: usize) -> Result<Buffer> {
async fn read(&mut self) -> Result<Buffer> {
let labels = self.stats.generate_metric_label(
self.scheme.into_static(),
Operation::Read.into_static(),
&self.path,
);
match self.inner.read_at(offset, size).await {
match self.inner.read().await {
Ok(bytes) => {
self.stats
.bytes_total
Expand All @@ -703,14 +703,14 @@ impl<R: oio::Read> oio::Read for PrometheusMetricWrapper<R> {
}

impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> {
fn read_at(&self, offset: u64, size: usize) -> Result<Buffer> {
fn read(&mut self) -> Result<Buffer> {
let labels = self.stats.generate_metric_label(
self.scheme.into_static(),
Operation::BlockingRead.into_static(),
&self.path,
);
self.inner
.read_at(offset, size)
.read()
.map(|bs| {
self.stats
.bytes_total
Expand Down
8 changes: 4 additions & 4 deletions core/src/layers/prometheus_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -533,10 +533,10 @@ impl<R> PrometheusMetricWrapper<R> {
}

impl<R: oio::Read> oio::Read for PrometheusMetricWrapper<R> {
async fn read_at(&self, offset: u64, size: usize) -> Result<Buffer> {
async fn read(&mut self) -> Result<Buffer> {
let start = Instant::now();

match self.inner.read_at(offset, size).await {
match self.inner.read().await {
Ok(bs) => {
self.metrics
.observe_bytes_total(self.scheme, self.op, bs.remaining());
Expand All @@ -554,10 +554,10 @@ impl<R: oio::Read> oio::Read for PrometheusMetricWrapper<R> {
}

impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> {
fn read_at(&self, offset: u64, size: usize) -> Result<Buffer> {
fn read(&mut self) -> Result<Buffer> {
let start = Instant::now();
self.inner
.read_at(offset, size)
.read()
.map(|bs| {
self.metrics
.observe_bytes_total(self.scheme, self.op, bs.remaining());
Expand Down
Loading
Loading