Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(core)!: Make oio::Write always write all given buffer #4880

Merged
merged 4 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/src/layers/async_backtrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for AsyncBacktraceWrapper<R> {

impl<R: oio::Write> oio::Write for AsyncBacktraceWrapper<R> {
#[async_backtrace::framed]
async fn write(&mut self, bs: Buffer) -> Result<usize> {
async fn write(&mut self, bs: Buffer) -> Result<()> {
self.inner.write(bs).await
}

Expand All @@ -185,7 +185,7 @@ impl<R: oio::Write> oio::Write for AsyncBacktraceWrapper<R> {
}

impl<R: oio::BlockingWrite> oio::BlockingWrite for AsyncBacktraceWrapper<R> {
fn write(&mut self, bs: Buffer) -> Result<usize> {
fn write(&mut self, bs: Buffer) -> Result<()> {
self.inner.write(bs)
}

Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/await_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for AwaitTreeWrapper<R> {
}

impl<R: oio::Write> oio::Write for AwaitTreeWrapper<R> {
fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<usize>> + MaybeSend {
fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<()>> + MaybeSend {
self.inner
.write(bs)
.instrument_await(format!("opendal::{}", WriteOperation::Write.into_static()))
Expand All @@ -211,7 +211,7 @@ impl<R: oio::Write> oio::Write for AwaitTreeWrapper<R> {
}

impl<R: oio::BlockingWrite> oio::BlockingWrite for AwaitTreeWrapper<R> {
fn write(&mut self, bs: Buffer) -> Result<usize> {
fn write(&mut self, bs: Buffer) -> Result<()> {
self.inner.write(bs)
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/layers/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ impl<I: oio::Read + 'static> oio::BlockingRead for BlockingWrapper<I> {
}

impl<I: oio::Write + 'static> oio::BlockingWrite for BlockingWrapper<I> {
fn write(&mut self, bs: Buffer) -> Result<usize> {
fn write(&mut self, bs: Buffer) -> Result<()> {
self.handle.block_on(self.inner.write(bs))
}

Expand Down
7 changes: 3 additions & 4 deletions core/src/layers/complete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ impl<W> oio::Write for CompleteWriter<W>
where
W: oio::Write,
{
async fn write(&mut self, bs: Buffer) -> Result<usize> {
async fn write(&mut self, bs: Buffer) -> Result<()> {
let w = self.inner.as_mut().ok_or_else(|| {
Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
})?;
Expand Down Expand Up @@ -689,13 +689,12 @@ impl<W> oio::BlockingWrite for CompleteWriter<W>
where
W: oio::BlockingWrite,
{
fn write(&mut self, bs: Buffer) -> Result<usize> {
fn write(&mut self, bs: Buffer) -> Result<()> {
let w = self.inner.as_mut().ok_or_else(|| {
Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
})?;
let n = w.write(bs)?;

Ok(n)
w.write(bs)
}

fn close(&mut self) -> Result<()> {
Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/concurrent_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for ConcurrentLimitWrapper<R> {
}

impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> {
async fn write(&mut self, bs: Buffer) -> Result<usize> {
async fn write(&mut self, bs: Buffer) -> Result<()> {
self.inner.write(bs).await
}

Expand All @@ -276,7 +276,7 @@ impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> {
}

impl<R: oio::BlockingWrite> oio::BlockingWrite for ConcurrentLimitWrapper<R> {
fn write(&mut self, bs: Buffer) -> Result<usize> {
fn write(&mut self, bs: Buffer) -> Result<()> {
self.inner.write(bs)
}

Expand Down
14 changes: 6 additions & 8 deletions core/src/layers/dtrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,15 +379,14 @@ impl<R: oio::BlockingRead> oio::BlockingRead for DtraceLayerWrapper<R> {
}

impl<R: oio::Write> oio::Write for DtraceLayerWrapper<R> {
async fn write(&mut self, bs: Buffer) -> Result<usize> {
async fn write(&mut self, bs: Buffer) -> Result<()> {
let c_path = CString::new(self.path.clone()).unwrap();
probe_lazy!(opendal, writer_write_start, c_path.as_ptr());
self.inner
.write(bs)
.await
.map(|n| {
probe_lazy!(opendal, writer_write_ok, c_path.as_ptr(), n);
n
.map(|_| {
probe_lazy!(opendal, writer_write_ok, c_path.as_ptr());
})
.map_err(|err| {
probe_lazy!(opendal, writer_write_error, c_path.as_ptr());
Expand Down Expand Up @@ -427,14 +426,13 @@ impl<R: oio::Write> oio::Write for DtraceLayerWrapper<R> {
}

impl<R: oio::BlockingWrite> oio::BlockingWrite for DtraceLayerWrapper<R> {
fn write(&mut self, bs: Buffer) -> Result<usize> {
fn write(&mut self, bs: Buffer) -> Result<()> {
let c_path = CString::new(self.path.clone()).unwrap();
probe_lazy!(opendal, blocking_writer_write_start, c_path.as_ptr());
self.inner
.write(bs)
.map(|n| {
probe_lazy!(opendal, blocking_writer_write_ok, c_path.as_ptr(), n);
n
.map(|_| {
probe_lazy!(opendal, blocking_writer_write_ok, c_path.as_ptr());
})
.map_err(|err| {
probe_lazy!(opendal, blocking_writer_write_error, c_path.as_ptr());
Expand Down
14 changes: 6 additions & 8 deletions core/src/layers/error_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,14 +385,13 @@ impl<T: oio::BlockingRead> oio::BlockingRead for ErrorContextWrapper<T> {
}

impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
async fn write(&mut self, bs: Buffer) -> Result<usize> {
async fn write(&mut self, bs: Buffer) -> Result<()> {
let size = bs.len();
self.inner
.write(bs)
.await
.map(|n| {
self.processed += n as u64;
n
.map(|_| {
self.processed += size as u64;
})
.map_err(|err| {
err.with_operation(WriteOperation::Write)
Expand Down Expand Up @@ -423,13 +422,12 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
}

impl<T: oio::BlockingWrite> oio::BlockingWrite for ErrorContextWrapper<T> {
fn write(&mut self, bs: Buffer) -> Result<usize> {
fn write(&mut self, bs: Buffer) -> Result<()> {
let size = bs.len();
self.inner
.write(bs)
.map(|n| {
self.processed += n as u64;
n
.map(|_| {
self.processed += size as u64;
})
.map_err(|err| {
err.with_operation(WriteOperation::BlockingWrite)
Expand Down
25 changes: 11 additions & 14 deletions core/src/layers/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1072,21 +1072,20 @@ impl<W> LoggingWriter<W> {
}

impl<W: oio::Write> oio::Write for LoggingWriter<W> {
async fn write(&mut self, bs: Buffer) -> Result<usize> {
match self.inner.write(bs.clone()).await {
Ok(n) => {
self.written += n as u64;
async fn write(&mut self, bs: Buffer) -> Result<()> {
let size = bs.len();
match self.inner.write(bs).await {
Ok(_) => {
trace!(
target: LOGGING_TARGET,
"service={} operation={} path={} written={}B -> input data {}B, write {}B",
"service={} operation={} path={} written={}B -> data write {}B",
self.ctx.scheme,
WriteOperation::Write,
self.path,
self.written,
bs.len(),
n,
size,
);
Ok(n)
Ok(())
}
Err(err) => {
if let Some(lvl) = self.ctx.error_level(&err) {
Expand Down Expand Up @@ -1170,21 +1169,19 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
}

impl<W: oio::BlockingWrite> oio::BlockingWrite for LoggingWriter<W> {
fn write(&mut self, bs: Buffer) -> Result<usize> {
fn write(&mut self, bs: Buffer) -> Result<()> {
match self.inner.write(bs.clone()) {
Ok(n) => {
self.written += n as u64;
Ok(_) => {
trace!(
target: LOGGING_TARGET,
"service={} operation={} path={} written={}B -> input data {}B, write {}B",
"service={} operation={} path={} written={}B -> data write {}B",
self.ctx.scheme,
WriteOperation::BlockingWrite,
self.path,
self.written,
bs.len(),
n
);
Ok(n)
Ok(())
}
Err(err) => {
if let Some(lvl) = self.ctx.error_level(&err) {
Expand Down
17 changes: 9 additions & 8 deletions core/src/layers/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -785,17 +785,17 @@ impl<R: oio::BlockingRead> oio::BlockingRead for MetricWrapper<R> {
}

impl<R: oio::Write> oio::Write for MetricWrapper<R> {
async fn write(&mut self, bs: Buffer) -> Result<usize> {
async fn write(&mut self, bs: Buffer) -> Result<()> {
let start = Instant::now();
let size = bs.len();

self.inner
.write(bs)
.await
.map(|n| {
self.bytes_counter.increment(n as u64);
.map(|_| {
self.bytes_counter.increment(size as u64);
self.requests_duration_seconds
.record(start.elapsed().as_secs_f64());
n
})
.map_err(|err| {
self.handle.increment_errors_total(self.op, err.kind());
Expand All @@ -819,12 +819,13 @@ impl<R: oio::Write> oio::Write for MetricWrapper<R> {
}

impl<R: oio::BlockingWrite> oio::BlockingWrite for MetricWrapper<R> {
fn write(&mut self, bs: Buffer) -> Result<usize> {
fn write(&mut self, bs: Buffer) -> Result<()> {
let size = bs.len();

self.inner
.write(bs)
.map(|n| {
self.bytes_counter.increment(n as u64);
n
.map(|_| {
self.bytes_counter.increment(size as u64);
})
.map_err(|err| {
self.handle.increment_errors_total(self.op, err.kind());
Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/minitrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for MinitraceWrapper<R> {
}

impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<usize>> + MaybeSend {
fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<()>> + MaybeSend {
let _g = self.span.set_local_parent();
let _span = LocalSpan::enter_with_local_parent(WriteOperation::Write.into_static());
self.inner.write(bs)
Expand All @@ -326,7 +326,7 @@ impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
}

impl<R: oio::BlockingWrite> oio::BlockingWrite for MinitraceWrapper<R> {
fn write(&mut self, bs: Buffer) -> Result<usize> {
fn write(&mut self, bs: Buffer) -> Result<()> {
let _g = self.span.set_local_parent();
let _span = LocalSpan::enter_with_local_parent(WriteOperation::BlockingWrite.into_static());
self.inner.write(bs)
Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/oteltrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ impl<R: oio::BlockingRead> oio::BlockingRead for OtelTraceWrapper<R> {
}

impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<usize>> + MaybeSend {
fn write(&mut self, bs: Buffer) -> impl Future<Output = Result<()>> + MaybeSend {
self.inner.write(bs)
}

Expand All @@ -298,7 +298,7 @@ impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
}

impl<R: oio::BlockingWrite> oio::BlockingWrite for OtelTraceWrapper<R> {
fn write(&mut self, bs: Buffer) -> Result<usize> {
fn write(&mut self, bs: Buffer) -> Result<()> {
self.inner.write(bs)
}

Expand Down
20 changes: 12 additions & 8 deletions core/src/layers/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,9 @@ impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> {
}

impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
async fn write(&mut self, bs: Buffer) -> Result<usize> {
async fn write(&mut self, bs: Buffer) -> Result<()> {
let size = bs.len();

let labels = self.stats.generate_metric_label(
self.scheme.into_static(),
WriteOperation::Write.into_static(),
Expand All @@ -758,12 +760,12 @@ impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
timer.observe_duration();

match res {
Ok(n) => {
Ok(_) => {
self.stats
.bytes_total
.with_label_values(&labels)
.observe(n as f64);
Ok(n)
.observe(size as f64);
Ok(())
}
Err(err) => {
self.stats.increment_errors_total(self.op, err.kind());
Expand Down Expand Up @@ -822,7 +824,9 @@ impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
}

impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> {
fn write(&mut self, bs: Buffer) -> Result<usize> {
fn write(&mut self, bs: Buffer) -> Result<()> {
let size = bs.len();

let labels = self.stats.generate_metric_label(
self.scheme.into_static(),
Operation::BlockingWrite.into_static(),
Expand All @@ -838,12 +842,12 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> {
timer.observe_duration();

match res {
Ok(n) => {
Ok(_) => {
self.stats
.bytes_total
.with_label_values(&labels)
.observe(n as f64);
Ok(n)
.observe(size as f64);
Ok(())
}
Err(err) => {
self.stats.increment_errors_total(self.op, err.kind());
Expand Down
Loading
Loading