Skip to content

Commit

Permalink
fix(core/prometheus): Fix metrics from prometheus not correct for rea…
Browse files Browse the repository at this point in the history
…der (#4691)

Signed-off-by: Xuanwo <github@xuanwo.io>
  • Loading branch information
Xuanwo authored Jun 5, 2024
1 parent e948ec8 commit 3fb31a4
Show file tree
Hide file tree
Showing 2 changed files with 438 additions and 244 deletions.
235 changes: 159 additions & 76 deletions core/src/layers/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use std::fmt::Formatter;
use std::sync::Arc;

use bytes::Buf;
use futures::FutureExt;
use futures::TryFutureExt;
use log::debug;
use prometheus::core::AtomicU64;
Expand All @@ -32,6 +31,7 @@ use prometheus::register_int_counter_vec_with_registry;
use prometheus::HistogramVec;
use prometheus::Registry;

use crate::raw::oio::{ReadOperation, WriteOperation};
use crate::raw::Access;
use crate::raw::*;
use crate::*;
Expand Down Expand Up @@ -314,9 +314,11 @@ impl<A: Access> LayeredAccess for PrometheusAccessor<A> {
.requests_duration_seconds
.with_label_values(&labels)
.start_timer();
let res = self.inner.read(path, args).await;
timer.observe_duration();

let read_res = self.inner.read(path, args).await.map(|(rp, r)| {
(
match res {
Ok((rp, r)) => Ok((
rp,
PrometheusMetricWrapper::new(
r,
Expand All @@ -325,13 +327,13 @@ impl<A: Access> LayeredAccess for PrometheusAccessor<A> {
self.scheme,
&path.to_string(),
),
)
});
timer.observe_duration();
read_res.map_err(|e| {
self.stats.increment_errors_total(Operation::Read, e.kind());
e
})
)),
Err(err) => {
self.stats
.increment_errors_total(Operation::Read, err.kind());
Err(err)
}
}
}

async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
Expand All @@ -347,31 +349,26 @@ impl<A: Access> LayeredAccess for PrometheusAccessor<A> {
.requests_duration_seconds
.with_label_values(&labels)
.start_timer();

let write_res = self
.inner
.write(path, args)
.map(|v| {
v.map(|(rp, r)| {
(
rp,
PrometheusMetricWrapper::new(
r,
Operation::Write,
self.stats.clone(),
self.scheme,
&path.to_string(),
),
)
})
})
.await;
let res = self.inner.write(path, args).await;
timer.observe_duration();
write_res.map_err(|e| {
self.stats
.increment_errors_total(Operation::Write, e.kind());
e
})

match res {
Ok((rp, w)) => Ok((
rp,
PrometheusMetricWrapper::new(
w,
Operation::Write,
self.stats.clone(),
self.scheme,
&path.to_string(),
),
)),
Err(err) => {
self.stats
.increment_errors_total(Operation::Write, err.kind());
Err(err)
}
}
}

async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
Expand Down Expand Up @@ -683,10 +680,19 @@ impl<R: oio::Read> oio::Read for PrometheusMetricWrapper<R> {
async fn read(&mut self) -> Result<Buffer> {
let labels = self.stats.generate_metric_label(
self.scheme.into_static(),
Operation::Read.into_static(),
ReadOperation::Read.into_static(),
&self.path,
);
match self.inner.read().await {

let timer = self
.stats
.requests_duration_seconds
.with_label_values(&labels)
.start_timer();
let res = self.inner.read().await;
timer.observe_duration();

match res {
Ok(bytes) => {
self.stats
.bytes_total
Expand All @@ -706,60 +712,111 @@ impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> {
fn read(&mut self) -> Result<Buffer> {
let labels = self.stats.generate_metric_label(
self.scheme.into_static(),
Operation::BlockingRead.into_static(),
ReadOperation::BlockingRead.into_static(),
&self.path,
);
self.inner
.read()
.map(|bs| {

let timer = self
.stats
.requests_duration_seconds
.with_label_values(&labels)
.start_timer();
let res = self.inner.read();
timer.observe_duration();

match res {
Ok(bs) => {
self.stats
.bytes_total
.with_label_values(&labels)
.observe(bs.remaining() as f64);
bs
})
.map_err(|e| {
self.stats.increment_errors_total(self.op, e.kind());
e
})
Ok(bs)
}
Err(err) => {
self.stats.increment_errors_total(self.op, err.kind());
Err(err)
}
}
}
}

impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
async fn write(&mut self, bs: Buffer) -> Result<usize> {
let labels = self.stats.generate_metric_label(
self.scheme.into_static(),
Operation::Write.into_static(),
WriteOperation::Write.into_static(),
&self.path,
);
self.inner
.write(bs)
.await
.map(|n| {

let timer = self
.stats
.requests_duration_seconds
.with_label_values(&labels)
.start_timer();
let res = self.inner.write(bs).await;
timer.observe_duration();

match res {
Ok(n) => {
self.stats
.bytes_total
.with_label_values(&labels)
.observe(n as f64);
n
})
.map_err(|err| {
Ok(n)
}
Err(err) => {
self.stats.increment_errors_total(self.op, err.kind());
err
})
Err(err)
}
}
}

async fn abort(&mut self) -> Result<()> {
self.inner.abort().await.map_err(|err| {
self.stats.increment_errors_total(self.op, err.kind());
err
})
let labels = self.stats.generate_metric_label(
self.scheme.into_static(),
WriteOperation::Abort.into_static(),
&self.path,
);

let timer = self
.stats
.requests_duration_seconds
.with_label_values(&labels)
.start_timer();
let res = self.inner.abort().await;
timer.observe_duration();

match res {
Ok(()) => Ok(()),
Err(err) => {
self.stats.increment_errors_total(self.op, err.kind());
Err(err)
}
}
}

async fn close(&mut self) -> Result<()> {
self.inner.close().await.map_err(|err| {
self.stats.increment_errors_total(self.op, err.kind());
err
})
let labels = self.stats.generate_metric_label(
self.scheme.into_static(),
WriteOperation::Close.into_static(),
&self.path,
);

let timer = self
.stats
.requests_duration_seconds
.with_label_values(&labels)
.start_timer();
let res = self.inner.close().await;
timer.observe_duration();

match res {
Ok(()) => Ok(()),
Err(err) => {
self.stats.increment_errors_total(self.op, err.kind());
Err(err)
}
}
}
}

Expand All @@ -770,26 +827,52 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> {
Operation::BlockingWrite.into_static(),
&self.path,
);
self.inner
.write(bs)
.map(|n| {

let timer = self
.stats
.requests_duration_seconds
.with_label_values(&labels)
.start_timer();
let res = self.inner.write(bs);
timer.observe_duration();

match res {
Ok(n) => {
self.stats
.bytes_total
.with_label_values(&labels)
.observe(n as f64);
n
})
.map_err(|err| {
Ok(n)
}
Err(err) => {
self.stats.increment_errors_total(self.op, err.kind());
err
})
Err(err)
}
}
}

fn close(&mut self) -> Result<()> {
self.inner.close().map_err(|err| {
self.stats.increment_errors_total(self.op, err.kind());
err
})
let labels = self.stats.generate_metric_label(
self.scheme.into_static(),
WriteOperation::BlockingClose.into_static(),
&self.path,
);

let timer = self
.stats
.requests_duration_seconds
.with_label_values(&labels)
.start_timer();
let res = self.inner.close();
timer.observe_duration();

match res {
Ok(()) => Ok(()),
Err(err) => {
self.stats.increment_errors_total(self.op, err.kind());
Err(err)
}
}
}
}

Expand Down
Loading

0 comments on commit 3fb31a4

Please sign in to comment.