Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Cleanup of oio::Read, docs, comments, naming #4345

Merged
merged 3 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/src/layers/chaos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,9 @@ impl<R> ChaosReader<R> {
}

impl<R: oio::Read> oio::Read for ChaosReader<R> {
async fn read(&mut self, size: usize) -> Result<Bytes> {
async fn read(&mut self, limit: usize) -> Result<Bytes> {
if self.i_feel_lucky() {
self.inner.read(size).await
self.inner.read(limit).await
} else {
Err(Self::unexpected_eof())
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/concurrent_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,8 @@ impl<R: oio::Read> oio::Read for ConcurrentLimitWrapper<R> {
self.inner.seek(pos).await
}

async fn read(&mut self, size: usize) -> Result<Bytes> {
self.inner.read(size).await
async fn read(&mut self, limit: usize) -> Result<Bytes> {
self.inner.read(limit).await
}
}

Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/dtrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,10 +344,10 @@ impl<R> DtraceLayerWrapper<R> {
}

impl<R: oio::Read> oio::Read for DtraceLayerWrapper<R> {
async fn read(&mut self, size: usize) -> Result<Bytes> {
async fn read(&mut self, limit: usize) -> Result<Bytes> {
let c_path = CString::new(self.path.clone()).unwrap();
probe_lazy!(opendal, reader_read_start, c_path.as_ptr());
match self.inner.read(size).await {
match self.inner.read(limit).await {
Ok(bs) => {
probe_lazy!(opendal, reader_read_ok, c_path.as_ptr(), bs.len());
Ok(bs)
Expand Down
12 changes: 6 additions & 6 deletions core/src/layers/error_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,17 +349,17 @@ pub struct ErrorContextWrapper<T> {
}

impl<T: oio::Read> oio::Read for ErrorContextWrapper<T> {
async fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
self.inner.seek(pos).await.map_err(|err| {
err.with_operation(ReadOperation::Seek)
async fn read(&mut self, limit: usize) -> Result<Bytes> {
self.inner.read(limit).await.map_err(|err| {
err.with_operation(ReadOperation::Read)
.with_context("service", self.scheme)
.with_context("path", &self.path)
})
}

async fn read(&mut self, size: usize) -> Result<Bytes> {
self.inner.read(size).await.map_err(|err| {
err.with_operation(ReadOperation::Next)
async fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
self.inner.seek(pos).await.map_err(|err| {
err.with_operation(ReadOperation::Seek)
.with_context("service", self.scheme)
.with_context("path", &self.path)
})
Expand Down
36 changes: 18 additions & 18 deletions core/src/layers/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -991,27 +991,29 @@ impl<R> Drop for LoggingReader<R> {
}

impl<R: oio::Read> oio::Read for LoggingReader<R> {
async fn seek(&mut self, pos: io::SeekFrom) -> Result<u64> {
match self.inner.seek(pos).await {
Ok(n) => {
async fn read(&mut self, limit: usize) -> Result<Bytes> {
match self.inner.read(limit).await {
Ok(bs) => {
self.read += bs.len() as u64;
trace!(
target: LOGGING_TARGET,
"service={} operation={} path={} read={} -> seek to {pos:?}, current offset {n}",
"service={} operation={} path={} read={} -> next returns {}B",
self.ctx.scheme,
ReadOperation::Seek,
ReadOperation::Read,
self.path,
self.read,
bs.len()
);
Ok(n)
Ok(bs)
}
Err(err) => {
if let Some(lvl) = self.ctx.error_level(&err) {
log!(
target: LOGGING_TARGET,
lvl,
"service={} operation={} path={} read={} -> seek to {pos:?} failed: {}",
"service={} operation={} path={} read={} -> next failed: {}",
self.ctx.scheme,
ReadOperation::Seek,
ReadOperation::Read,
self.path,
self.read,
self.ctx.error_print(&err),
Expand All @@ -1022,29 +1024,27 @@ impl<R: oio::Read> oio::Read for LoggingReader<R> {
}
}

async fn read(&mut self, size: usize) -> Result<Bytes> {
match self.inner.read(size).await {
Ok(bs) => {
self.read += bs.len() as u64;
async fn seek(&mut self, pos: io::SeekFrom) -> Result<u64> {
match self.inner.seek(pos).await {
Ok(n) => {
trace!(
target: LOGGING_TARGET,
"service={} operation={} path={} read={} -> next returns {}B",
"service={} operation={} path={} read={} -> seek to {pos:?}, current offset {n}",
self.ctx.scheme,
ReadOperation::Next,
ReadOperation::Seek,
self.path,
self.read,
bs.len()
);
Ok(bs)
Ok(n)
}
Err(err) => {
if let Some(lvl) = self.ctx.error_level(&err) {
log!(
target: LOGGING_TARGET,
lvl,
"service={} operation={} path={} read={} -> next failed: {}",
"service={} operation={} path={} read={} -> seek to {pos:?} failed: {}",
self.ctx.scheme,
ReadOperation::Next,
ReadOperation::Seek,
self.path,
self.read,
self.ctx.error_print(&err),
Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -773,8 +773,8 @@ impl<R> Drop for MetricWrapper<R> {
}

impl<R: oio::Read> oio::Read for MetricWrapper<R> {
async fn read(&mut self, size: usize) -> Result<Bytes> {
match self.inner.read(size).await {
async fn read(&mut self, limit: usize) -> Result<Bytes> {
match self.inner.read(limit).await {
Ok(bytes) => {
self.bytes += bytes.len() as u64;
Ok(bytes)
Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/minitrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,8 @@ impl<R> MinitraceWrapper<R> {

impl<R: oio::Read> oio::Read for MinitraceWrapper<R> {
#[trace(enter_on_poll = true)]
async fn read(&mut self, size: usize) -> Result<Bytes> {
self.inner.read(size).await
async fn read(&mut self, limit: usize) -> Result<Bytes> {
self.inner.read(limit).await
}

#[trace(enter_on_poll = true)]
Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/oteltrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,8 @@ impl<R> OtelTraceWrapper<R> {
}

impl<R: oio::Read> oio::Read for OtelTraceWrapper<R> {
async fn read(&mut self, size: usize) -> Result<Bytes> {
self.inner.read(size).await
async fn read(&mut self, limit: usize) -> Result<Bytes> {
self.inner.read(limit).await
}

async fn seek(&mut self, pos: io::SeekFrom) -> Result<u64> {
Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -686,13 +686,13 @@ impl<R> PrometheusMetricWrapper<R> {
}

impl<R: oio::Read> oio::Read for PrometheusMetricWrapper<R> {
async fn read(&mut self, size: usize) -> Result<Bytes> {
async fn read(&mut self, limit: usize) -> Result<Bytes> {
let labels = self.stats.generate_metric_label(
self.scheme.into_static(),
Operation::Read.into_static(),
&self.path,
);
match self.inner.read(size).await {
match self.inner.read(limit).await {
Ok(bytes) => {
self.stats
.bytes_total
Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/prometheus_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -539,8 +539,8 @@ impl<R> PrometheusMetricWrapper<R> {
}

impl<R: oio::Read> oio::Read for PrometheusMetricWrapper<R> {
async fn read(&mut self, size: usize) -> Result<Bytes> {
match self.inner.read(size).await {
async fn read(&mut self, limit: usize) -> Result<Bytes> {
match self.inner.read(limit).await {
Ok(bytes) => {
self.bytes_total += bytes.len();
Ok(bytes)
Expand Down
6 changes: 3 additions & 3 deletions core/src/layers/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -703,14 +703,14 @@ impl<R: oio::Read, I: RetryInterceptor> oio::Read for RetryWrapper<R, I> {
res
}

async fn read(&mut self, size: usize) -> Result<Bytes> {
async fn read(&mut self, limit: usize) -> Result<Bytes> {
use backon::RetryableWithContext;

let inner = self.inner.take().expect("inner must be valid");

let (inner, res) = {
|mut r: R| async move {
let res = r.read(size).await;
let res = r.read(limit).await;

(r, res)
}
Expand All @@ -723,7 +723,7 @@ impl<R: oio::Read, I: RetryInterceptor> oio::Read for RetryWrapper<R, I> {
err,
dur,
&[
("operation", ReadOperation::Next.into_static()),
("operation", ReadOperation::Read.into_static()),
("path", &self.path),
],
)
Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/throttle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,9 @@ impl<R> ThrottleWrapper<R> {
}

impl<R: oio::Read> oio::Read for ThrottleWrapper<R> {
async fn read(&mut self, size: usize) -> Result<Bytes> {
async fn read(&mut self, limit: usize) -> Result<Bytes> {
// TODO: How can we handle buffer reads with a limiter?
self.inner.read(size).await
self.inner.read(limit).await
}

async fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
Expand Down
10 changes: 5 additions & 5 deletions core/src/layers/timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,15 +322,15 @@ impl<R> TimeoutWrapper<R> {
}

impl<R: oio::Read> oio::Read for TimeoutWrapper<R> {
async fn read(&mut self, limit: usize) -> Result<Bytes> {
let fut = self.inner.read(limit);
Self::io_timeout(self.timeout, ReadOperation::Read.into_static(), fut).await
}

async fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
let fut = self.inner.seek(pos);
Self::io_timeout(self.timeout, ReadOperation::Seek.into_static(), fut).await
}

async fn read(&mut self, size: usize) -> Result<Bytes> {
let fut = self.inner.read(size);
Self::io_timeout(self.timeout, ReadOperation::Next.into_static(), fut).await
}
}

impl<R: oio::Write> oio::Write for TimeoutWrapper<R> {
Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,8 @@ impl<R: oio::Read> oio::Read for TracingWrapper<R> {
parent = &self.span,
level = "trace",
skip_all)]
async fn read(&mut self, size: usize) -> Result<Bytes> {
self.inner.read(size).await
async fn read(&mut self, limit: usize) -> Result<Bytes> {
self.inner.read(limit).await
}

#[tracing::instrument(
Expand Down
24 changes: 12 additions & 12 deletions core/src/raw/enum_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ impl<ONE: oio::Read, TWO: oio::Read> oio::Read for TwoWays<ONE, TWO> {
}
}

async fn read(&mut self, size: usize) -> Result<Bytes> {
async fn read(&mut self, limit: usize) -> Result<Bytes> {
match self {
Self::One(v) => v.read(size).await,
Self::Two(v) => v.read(size).await,
Self::One(v) => v.read(limit).await,
Self::Two(v) => v.read(limit).await,
}
}
}
Expand Down Expand Up @@ -140,11 +140,11 @@ impl<ONE: oio::Read, TWO: oio::Read, THREE: oio::Read> oio::Read for ThreeWays<O
}
}

async fn read(&mut self, size: usize) -> Result<Bytes> {
async fn read(&mut self, limit: usize) -> Result<Bytes> {
match self {
Self::One(v) => v.read(size).await,
Self::Two(v) => v.read(size).await,
Self::Three(v) => v.read(size).await,
Self::One(v) => v.read(limit).await,
Self::Two(v) => v.read(limit).await,
Self::Three(v) => v.read(limit).await,
}
}
}
Expand Down Expand Up @@ -235,12 +235,12 @@ where
}
}

async fn read(&mut self, size: usize) -> Result<Bytes> {
async fn read(&mut self, limit: usize) -> Result<Bytes> {
match self {
Self::One(v) => v.read(size).await,
Self::Two(v) => v.read(size).await,
Self::Three(v) => v.read(size).await,
Self::Four(v) => v.read(size).await,
Self::One(v) => v.read(limit).await,
Self::Two(v) => v.read(limit).await,
Self::Three(v) => v.read(limit).await,
Self::Four(v) => v.read(limit).await,
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/raw/http_util/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ impl IncomingAsyncBody {
}

impl oio::Read for IncomingAsyncBody {
async fn read(&mut self, size: usize) -> Result<Bytes> {
async fn read(&mut self, limit: usize) -> Result<Bytes> {
if self.size == Some(0) {
return Ok(Bytes::new());
}
Expand All @@ -182,7 +182,7 @@ impl oio::Read for IncomingAsyncBody {
};
}

let size = min(size, self.chunk.len());
let size = min(limit, self.chunk.len());
self.consumed += size as u64;
let bs = self.chunk.split_to(size);
Ok(bs)
Expand Down
4 changes: 2 additions & 2 deletions core/src/raw/oio/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,13 @@ impl oio::Read for Cursor {
Ok(n)
}

async fn read(&mut self, size: usize) -> Result<Bytes> {
async fn read(&mut self, limit: usize) -> Result<Bytes> {
if self.is_empty() {
Ok(Bytes::new())
} else {
// The clone here is required as we don't want to change it.
let mut bs = self.inner.clone().split_off(self.pos as usize);
let bs = bs.split_to(min(bs.len(), size));
let bs = bs.split_to(min(bs.len(), limit));
self.pos += bs.len() as u64;
Ok(bs)
}
Expand Down
Loading
Loading