Skip to content

Commit

Permalink
fix(layers/prometheus-client): remove duplicated `increment_request_t…
Browse files Browse the repository at this point in the history
…otal` of write operation (#5023)

* fix(layers/prometheus-client): remove duplicated increment_request_total of write opeartion

* unify format

* update doc example
  • Loading branch information
koushiro authored Aug 20, 2024
1 parent d001321 commit a7e1d00
Showing 1 changed file with 34 additions and 25 deletions.
59 changes: 34 additions & 25 deletions core/src/layers/prometheus_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ use crate::*;
///
/// let op = Operator::new(builder)
/// .expect("must init")
/// .layer(PrometheusClientLayer::with_registry(&mut registry))
/// .layer(PrometheusClientLayer::new(&mut registry))
/// .finish();
/// debug!("operator: {op:?}");
///
Expand Down Expand Up @@ -320,6 +320,7 @@ impl<A: Access> LayeredAccess for PrometheusAccessor<A> {
Operation::CreateDir.into_static(),
start_time.elapsed(),
);

create_res.map_err(|e| {
self.metrics
.increment_errors_total(Operation::CreateDir.into_static(), e.kind());
Expand All @@ -328,16 +329,17 @@ impl<A: Access> LayeredAccess for PrometheusAccessor<A> {
}

async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
let start = Instant::now();
self.metrics
.increment_request_total(self.scheme, Operation::Read.into_static());

let start = Instant::now();
let res = self.inner.read(path, args).await;

self.metrics.observe_request_duration(
self.scheme,
Operation::Read.into_static(),
start.elapsed(),
);
self.metrics
.increment_request_total(self.scheme, Operation::Read.into_static());

match res {
Ok((rp, r)) => Ok((
Expand All @@ -353,19 +355,17 @@ impl<A: Access> LayeredAccess for PrometheusAccessor<A> {
}

async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
let start = Instant::now();

self.metrics
.increment_request_total(self.scheme, Operation::Write.into_static());

let start = Instant::now();
let res = self.inner.write(path, args).await;

self.metrics.observe_request_duration(
self.scheme,
Operation::Write.into_static(),
start.elapsed(),
);
self.metrics
.increment_request_total(self.scheme, Operation::Write.into_static());

match res {
Ok((rp, w)) => Ok((
Expand All @@ -383,8 +383,8 @@ impl<A: Access> LayeredAccess for PrometheusAccessor<A> {
async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
self.metrics
.increment_request_total(self.scheme, Operation::Stat.into_static());
let start_time = Instant::now();

let start_time = Instant::now();
let stat_res = self
.inner
.stat(path, args)
Expand All @@ -399,6 +399,7 @@ impl<A: Access> LayeredAccess for PrometheusAccessor<A> {
Operation::Stat.into_static(),
start_time.elapsed(),
);

stat_res.map_err(|e| {
self.metrics
.increment_errors_total(Operation::Stat.into_static(), e.kind());
Expand All @@ -409,15 +410,16 @@ impl<A: Access> LayeredAccess for PrometheusAccessor<A> {
async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
self.metrics
.increment_request_total(self.scheme, Operation::Delete.into_static());
let start_time = Instant::now();

let start_time = Instant::now();
let delete_res = self.inner.delete(path, args).await;

self.metrics.observe_request_duration(
self.scheme,
Operation::Delete.into_static(),
start_time.elapsed(),
);

delete_res.map_err(|e| {
self.metrics
.increment_errors_total(Operation::Delete.into_static(), e.kind());
Expand All @@ -428,15 +430,16 @@ impl<A: Access> LayeredAccess for PrometheusAccessor<A> {
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
self.metrics
.increment_request_total(self.scheme, Operation::List.into_static());
let start_time = Instant::now();

let start_time = Instant::now();
let list_res = self.inner.list(path, args).await;

self.metrics.observe_request_duration(
self.scheme,
Operation::List.into_static(),
start_time.elapsed(),
);

list_res.map_err(|e| {
self.metrics
.increment_errors_total(Operation::List.into_static(), e.kind());
Expand All @@ -447,15 +450,16 @@ impl<A: Access> LayeredAccess for PrometheusAccessor<A> {
async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
self.metrics
.increment_request_total(self.scheme, Operation::Batch.into_static());
let start_time = Instant::now();

let start_time = Instant::now();
let result = self.inner.batch(args).await;

self.metrics.observe_request_duration(
self.scheme,
Operation::Batch.into_static(),
start_time.elapsed(),
);

result.map_err(|e| {
self.metrics
.increment_errors_total(Operation::Batch.into_static(), e.kind());
Expand All @@ -466,15 +470,16 @@ impl<A: Access> LayeredAccess for PrometheusAccessor<A> {
async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
self.metrics
.increment_request_total(self.scheme, Operation::Presign.into_static());
let start_time = Instant::now();

let start_time = Instant::now();
let result = self.inner.presign(path, args).await;

self.metrics.observe_request_duration(
self.scheme,
Operation::Presign.into_static(),
start_time.elapsed(),
);

result.map_err(|e| {
self.metrics
.increment_errors_total(Operation::Presign.into_static(), e.kind());
Expand All @@ -485,15 +490,16 @@ impl<A: Access> LayeredAccess for PrometheusAccessor<A> {
fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
self.metrics
.increment_request_total(self.scheme, Operation::BlockingCreateDir.into_static());
let start_time = Instant::now();

let start_time = Instant::now();
let result = self.inner.blocking_create_dir(path, args);

self.metrics.observe_request_duration(
self.scheme,
Operation::BlockingCreateDir.into_static(),
start_time.elapsed(),
);

result.map_err(|e| {
self.metrics
.increment_errors_total(Operation::BlockingCreateDir.into_static(), e.kind());
Expand Down Expand Up @@ -540,9 +546,10 @@ impl<A: Access> LayeredAccess for PrometheusAccessor<A> {
fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
self.metrics
.increment_request_total(self.scheme, Operation::BlockingStat.into_static());
let start_time = Instant::now();

let start_time = Instant::now();
let result = self.inner.blocking_stat(path, args);

self.metrics.observe_request_duration(
self.scheme,
Operation::BlockingStat.into_static(),
Expand All @@ -559,15 +566,16 @@ impl<A: Access> LayeredAccess for PrometheusAccessor<A> {
fn blocking_delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
self.metrics
.increment_request_total(self.scheme, Operation::BlockingDelete.into_static());
let start_time = Instant::now();

let start_time = Instant::now();
let result = self.inner.blocking_delete(path, args);

self.metrics.observe_request_duration(
self.scheme,
Operation::BlockingDelete.into_static(),
start_time.elapsed(),
);

result.map_err(|e| {
self.metrics
.increment_errors_total(Operation::BlockingDelete.into_static(), e.kind());
Expand All @@ -578,15 +586,16 @@ impl<A: Access> LayeredAccess for PrometheusAccessor<A> {
fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
self.metrics
.increment_request_total(self.scheme, Operation::BlockingList.into_static());
let start_time = Instant::now();

let start_time = Instant::now();
let result = self.inner.blocking_list(path, args);

self.metrics.observe_request_duration(
self.scheme,
Operation::BlockingList.into_static(),
start_time.elapsed(),
);

result.map_err(|e| {
self.metrics
.increment_errors_total(Operation::BlockingList.into_static(), e.kind());
Expand Down Expand Up @@ -692,42 +701,42 @@ impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
})
}

async fn abort(&mut self) -> Result<()> {
async fn close(&mut self) -> Result<()> {
let start = Instant::now();

self.inner
.abort()
.close()
.await
.map(|_| {
self.metrics.observe_request_duration(
self.scheme,
Operation::WriterAbort.into_static(),
Operation::WriterClose.into_static(),
start.elapsed(),
);
})
.map_err(|err| {
self.metrics
.increment_errors_total(Operation::WriterAbort.into_static(), err.kind());
.increment_errors_total(Operation::WriterClose.into_static(), err.kind());
err
})
}

async fn close(&mut self) -> Result<()> {
async fn abort(&mut self) -> Result<()> {
let start = Instant::now();

self.inner
.close()
.abort()
.await
.map(|_| {
self.metrics.observe_request_duration(
self.scheme,
Operation::WriterClose.into_static(),
Operation::WriterAbort.into_static(),
start.elapsed(),
);
})
.map_err(|err| {
self.metrics
.increment_errors_total(Operation::WriterClose.into_static(), err.kind());
.increment_errors_total(Operation::WriterAbort.into_static(), err.kind());
err
})
}
Expand Down

0 comments on commit a7e1d00

Please sign in to comment.