diff --git a/core/src/layers/prometheus_client.rs b/core/src/layers/prometheus_client.rs index 3410704061f5..ab12badaef95 100644 --- a/core/src/layers/prometheus_client.rs +++ b/core/src/layers/prometheus_client.rs @@ -15,16 +15,16 @@ // specific language governing permissions and limitations // under the License. +use std::fmt; use std::fmt::Debug; -use std::fmt::Formatter; use std::sync::Arc; use std::time::Duration; use std::time::Instant; use bytes::Buf; -use futures::TryFutureExt; use prometheus_client::encoding::EncodeLabel; use prometheus_client::encoding::EncodeLabelSet; +use prometheus_client::encoding::LabelSetEncoder; use prometheus_client::metrics::counter::Counter; use prometheus_client::metrics::family::Family; use prometheus_client::metrics::histogram; @@ -79,7 +79,7 @@ use crate::*; /// Ok(()) /// } /// ``` -#[derive(Debug, Clone)] +#[derive(Clone, Debug)] pub struct PrometheusClientLayer { metrics: PrometheusClientMetricDefinitions, } @@ -95,7 +95,7 @@ impl PrometheusClientLayer { } impl Layer for PrometheusClientLayer { - type LayeredAccess = PrometheusAccessor; + type LayeredAccess = PrometheusClientAccessor; fn layer(&self, inner: A) -> Self::LayeredAccess { let meta = inner.info(); @@ -105,11 +105,11 @@ impl Layer for PrometheusClientLayer { let defs = Arc::new(self.metrics.clone()); let metrics = PrometheusClientMetrics::new(defs, scheme, root, name); - PrometheusAccessor { inner, metrics } + PrometheusClientAccessor { inner, metrics } } } -#[derive(Debug, Clone, Hash, Eq, PartialEq)] +#[derive(Clone, Debug, Eq, PartialEq, Hash)] struct OperationLabels { op: &'static str, scheme: &'static str, @@ -118,37 +118,31 @@ struct OperationLabels { } impl EncodeLabelSet for OperationLabels { - fn encode( - &self, - mut encoder: prometheus_client::encoding::LabelSetEncoder, - ) -> std::result::Result<(), std::fmt::Error> { + fn encode(&self, mut encoder: LabelSetEncoder) -> Result<(), fmt::Error> { ("op", self.op).encode(encoder.encode_label())?; ("scheme", self.scheme).encode(encoder.encode_label())?; - ("namespace", self.namespace.as_str()).encode(encoder.encode_label())?; ("root", self.root.as_str()).encode(encoder.encode_label())?; + ("namespace", self.namespace.as_str()).encode(encoder.encode_label())?; Ok(()) } } -#[derive(Debug, Clone, Hash, Eq, PartialEq)] +#[derive(Clone, Debug, Eq, PartialEq, Hash)] struct ErrorLabels { op: &'static str, scheme: &'static str, - err: &'static str, root: Arc, namespace: Arc, + error: &'static str, } impl EncodeLabelSet for ErrorLabels { - fn encode( - &self, - mut encoder: prometheus_client::encoding::LabelSetEncoder, - ) -> std::result::Result<(), std::fmt::Error> { + fn encode(&self, mut encoder: LabelSetEncoder) -> Result<(), fmt::Error> { ("op", self.op).encode(encoder.encode_label())?; ("scheme", self.scheme).encode(encoder.encode_label())?; - ("error", self.err).encode(encoder.encode_label())?; - ("namespace", self.namespace.as_str()).encode(encoder.encode_label())?; ("root", self.root.as_str()).encode(encoder.encode_label())?; + ("namespace", self.namespace.as_str()).encode(encoder.encode_label())?; + ("error", self.error).encode(encoder.encode_label())?; Ok(()) } } @@ -201,11 +195,7 @@ impl PrometheusClientMetricDefinitions { ); registry.register("opendal_bytes", "Total size of bytes", bytes_total.clone()); - registry.register( - "opendal_bytes_histogram", - "The size of bytes", - bytes.clone(), - ); + registry.register("opendal_bytes", "The size of bytes", bytes.clone()); Self { requests_total, @@ -240,78 +230,73 @@ impl PrometheusClientMetrics { } } + fn increment_request_total(&self, labels: &OperationLabels) { + self.metrics.requests_total.get_or_create(labels).inc(); + } + fn increment_errors_total(&self, op: Operation, err: ErrorKind) { - let labels = ErrorLabels { - op: op.into_static(), - scheme: self.scheme.into_static(), - err: err.into_static(), - root: self.root.clone(), - namespace: self.name.clone(), - }; + let labels = self.gen_error_labels(op, err); self.metrics.errors_total.get_or_create(&labels).inc(); } - fn increment_request_total(&self, op: Operation) { - let labels = OperationLabels { - op: op.into_static(), - scheme: self.scheme.into_static(), - root: self.root.clone(), - namespace: self.name.clone(), - }; - self.metrics.requests_total.get_or_create(&labels).inc(); + fn observe_request_duration(&self, labels: &OperationLabels, duration: Duration) { + self.metrics + .request_duration_seconds + .get_or_create(labels) + .observe(duration.as_secs_f64()); + } + + fn observe_bytes_total(&self, labels: &OperationLabels, bytes: usize) { + self.metrics + .bytes_total + .get_or_create(labels) + .inc_by(bytes as u64); + self.metrics + .bytes + .get_or_create(labels) + .observe(bytes as f64); } - fn observe_bytes_total(&self, op: Operation, bytes: usize) { - let labels = OperationLabels { + fn gen_operation_labels(&self, op: Operation) -> OperationLabels { + OperationLabels { op: op.into_static(), scheme: self.scheme.into_static(), root: self.root.clone(), namespace: self.name.clone(), - }; - self.metrics - .bytes - .get_or_create(&labels) - .observe(bytes as f64); - self.metrics - .bytes_total - .get_or_create(&labels) - .inc_by(bytes as u64); + } } - fn observe_request_duration(&self, op: Operation, duration: Duration) { - let labels = OperationLabels { + fn gen_error_labels(&self, op: Operation, err: ErrorKind) -> ErrorLabels { + ErrorLabels { op: op.into_static(), scheme: self.scheme.into_static(), root: self.root.clone(), namespace: self.name.clone(), - }; - self.metrics - .request_duration_seconds - .get_or_create(&labels) - .observe(duration.as_secs_f64()); + error: err.into_static(), + } } } #[derive(Clone)] -pub struct PrometheusAccessor { +pub struct PrometheusClientAccessor { inner: A, metrics: PrometheusClientMetrics, } -impl Debug for PrometheusAccessor { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { +impl Debug for PrometheusClientAccessor { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("PrometheusAccessor") .field("inner", &self.inner) .finish_non_exhaustive() } } -impl LayeredAccess for PrometheusAccessor { +impl LayeredAccess for PrometheusClientAccessor { type Inner = A; - type Reader = PrometheusMetricWrapper; - type BlockingReader = PrometheusMetricWrapper; - type Writer = PrometheusMetricWrapper; - type BlockingWriter = PrometheusMetricWrapper; + type Reader = PrometheusClientMetricWrapper; + type BlockingReader = PrometheusClientMetricWrapper; + type Writer = PrometheusClientMetricWrapper; + type BlockingWriter = PrometheusClientMetricWrapper; type Lister = A::Lister; type BlockingLister = A::BlockingLister; @@ -320,301 +305,320 @@ impl LayeredAccess for PrometheusAccessor { } async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result { - self.metrics.increment_request_total(Operation::CreateDir); + let op = Operation::CreateDir; + let labels = self.metrics.gen_operation_labels(op); - let start_time = Instant::now(); - let create_res = self.inner.create_dir(path, args).await; + self.metrics.increment_request_total(&labels); + let start = Instant::now(); + let result = self.inner.create_dir(path, args).await; self.metrics - .observe_request_duration(Operation::CreateDir, start_time.elapsed()); + .observe_request_duration(&labels, start.elapsed()); - create_res.map_err(|e| { - self.metrics - .increment_errors_total(Operation::CreateDir, e.kind()); - e + result.map_err(|err| { + self.metrics.increment_errors_total(op, err.kind()); + err }) } async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - self.metrics.increment_request_total(Operation::Read); + let op = Operation::Read; + let labels = self.metrics.gen_operation_labels(op); - let start = Instant::now(); - let res = self.inner.read(path, args).await; + self.metrics.increment_request_total(&labels); + let start = Instant::now(); + let result = self.inner.read(path, args).await; self.metrics - .observe_request_duration(Operation::Read, start.elapsed()); + .observe_request_duration(&labels, start.elapsed()); - match res { - Ok((rp, r)) => Ok((rp, PrometheusMetricWrapper::new(r, self.metrics.clone()))), + match result { + Ok((rp, r)) => Ok(( + rp, + PrometheusClientMetricWrapper::new(r, self.metrics.clone()), + )), Err(err) => { - self.metrics - .increment_errors_total(Operation::Read, err.kind()); + self.metrics.increment_errors_total(op, err.kind()); Err(err) } } } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - self.metrics.increment_request_total(Operation::Write); + let op = Operation::Write; + let labels = self.metrics.gen_operation_labels(op); - let start = Instant::now(); - let res = self.inner.write(path, args).await; + self.metrics.increment_request_total(&labels); + let start = Instant::now(); + let result = self.inner.write(path, args).await; self.metrics - .observe_request_duration(Operation::Write, start.elapsed()); + .observe_request_duration(&labels, start.elapsed()); - match res { - Ok((rp, w)) => Ok((rp, PrometheusMetricWrapper::new(w, self.metrics.clone()))), + match result { + Ok((rp, w)) => Ok(( + rp, + PrometheusClientMetricWrapper::new(w, self.metrics.clone()), + )), Err(err) => { - self.metrics - .increment_errors_total(Operation::Write, err.kind()); + self.metrics.increment_errors_total(op, err.kind()); Err(err) } } } async fn stat(&self, path: &str, args: OpStat) -> Result { - self.metrics.increment_request_total(Operation::Stat); + let op = Operation::Stat; + let labels = self.metrics.gen_operation_labels(op); - let start_time = Instant::now(); - let stat_res = self - .inner - .stat(path, args) - .inspect_err(|e| { - self.metrics - .increment_errors_total(Operation::Stat, e.kind()); - }) - .await; + self.metrics.increment_request_total(&labels); + let start = Instant::now(); + let result = self.inner.stat(path, args).await; self.metrics - .observe_request_duration(Operation::Stat, start_time.elapsed()); + .observe_request_duration(&labels, start.elapsed()); - stat_res.map_err(|e| { - self.metrics - .increment_errors_total(Operation::Stat, e.kind()); - e + result.map_err(|err| { + self.metrics.increment_errors_total(op, err.kind()); + err }) } async fn delete(&self, path: &str, args: OpDelete) -> Result { - self.metrics.increment_request_total(Operation::Delete); + let op = Operation::Delete; + let labels = self.metrics.gen_operation_labels(op); - let start_time = Instant::now(); - let delete_res = self.inner.delete(path, args).await; + self.metrics.increment_request_total(&labels); + let start = Instant::now(); + let result = self.inner.delete(path, args).await; self.metrics - .observe_request_duration(Operation::Delete, start_time.elapsed()); + .observe_request_duration(&labels, start.elapsed()); - delete_res.map_err(|e| { - self.metrics - .increment_errors_total(Operation::Delete, e.kind()); - e + result.map_err(|err| { + self.metrics.increment_errors_total(op, err.kind()); + err }) } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { - self.metrics.increment_request_total(Operation::List); + let op = Operation::List; + let labels = self.metrics.gen_operation_labels(op); - let start_time = Instant::now(); - let list_res = self.inner.list(path, args).await; + self.metrics.increment_request_total(&labels); + let start = Instant::now(); + let result = self.inner.list(path, args).await; self.metrics - .observe_request_duration(Operation::List, start_time.elapsed()); + .observe_request_duration(&labels, start.elapsed()); - list_res.map_err(|e| { - self.metrics - .increment_errors_total(Operation::List, e.kind()); - e + result.map_err(|err| { + self.metrics.increment_errors_total(op, err.kind()); + err }) } async fn batch(&self, args: OpBatch) -> Result { - self.metrics.increment_request_total(Operation::Batch); + let op = Operation::Batch; + let labels = self.metrics.gen_operation_labels(op); - let start_time = Instant::now(); - let result = self.inner.batch(args).await; + self.metrics.increment_request_total(&labels); + let start = Instant::now(); + let result = self.inner.batch(args).await; self.metrics - .observe_request_duration(Operation::Batch, start_time.elapsed()); + .observe_request_duration(&labels, start.elapsed()); - result.map_err(|e| { - self.metrics - .increment_errors_total(Operation::Batch, e.kind()); - e + result.map_err(|err| { + self.metrics.increment_errors_total(op, err.kind()); + err }) } async fn presign(&self, path: &str, args: OpPresign) -> Result { - self.metrics.increment_request_total(Operation::Presign); + let op = Operation::Presign; + let labels = self.metrics.gen_operation_labels(op); - let start_time = Instant::now(); - let result = self.inner.presign(path, args).await; + self.metrics.increment_request_total(&labels); + let start = Instant::now(); + let result = self.inner.presign(path, args).await; self.metrics - .observe_request_duration(Operation::Presign, start_time.elapsed()); + .observe_request_duration(&labels, start.elapsed()); - result.map_err(|e| { - self.metrics - .increment_errors_total(Operation::Presign, e.kind()); - e + result.map_err(|err| { + self.metrics.increment_errors_total(op, err.kind()); + err }) } fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result { - self.metrics - .increment_request_total(Operation::BlockingCreateDir); + let op = Operation::BlockingCreateDir; + let labels = self.metrics.gen_operation_labels(op); - let start_time = Instant::now(); - let result = self.inner.blocking_create_dir(path, args); + self.metrics.increment_request_total(&labels); + let start = Instant::now(); + let result = self.inner.blocking_create_dir(path, args); self.metrics - .observe_request_duration(Operation::BlockingCreateDir, start_time.elapsed()); + .observe_request_duration(&labels, start.elapsed()); - result.map_err(|e| { - self.metrics - .increment_errors_total(Operation::BlockingCreateDir, e.kind()); - e + result.map_err(|err| { + self.metrics.increment_errors_total(op, err.kind()); + err }) } fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { - self.metrics - .increment_request_total(Operation::BlockingRead); + let op = Operation::BlockingRead; + let labels = self.metrics.gen_operation_labels(op); + + self.metrics.increment_request_total(&labels); - let result = self - .inner - .blocking_read(path, args) - .map(|(rp, r)| (rp, PrometheusMetricWrapper::new(r, self.metrics.clone()))); + let result = self.inner.blocking_read(path, args).map(|(rp, r)| { + ( + rp, + PrometheusClientMetricWrapper::new(r, self.metrics.clone()), + ) + }); - result.map_err(|e| { - self.metrics - .increment_errors_total(Operation::BlockingRead, e.kind()); - e + result.map_err(|err| { + self.metrics.increment_errors_total(op, err.kind()); + err }) } fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { - self.metrics - .increment_request_total(Operation::BlockingWrite); + let op = Operation::BlockingWrite; + let labels = self.metrics.gen_operation_labels(op); + + self.metrics.increment_request_total(&labels); - let result = self - .inner - .blocking_write(path, args) - .map(|(rp, r)| (rp, PrometheusMetricWrapper::new(r, self.metrics.clone()))); + let result = self.inner.blocking_write(path, args).map(|(rp, r)| { + ( + rp, + PrometheusClientMetricWrapper::new(r, self.metrics.clone()), + ) + }); - result.map_err(|e| { - self.metrics - .increment_errors_total(Operation::BlockingWrite, e.kind()); - e + result.map_err(|err| { + self.metrics.increment_errors_total(op, err.kind()); + err }) } fn blocking_stat(&self, path: &str, args: OpStat) -> Result { - self.metrics - .increment_request_total(Operation::BlockingStat); + let op = Operation::BlockingList; + let labels = self.metrics.gen_operation_labels(op); - let start_time = Instant::now(); - let result = self.inner.blocking_stat(path, args); + self.metrics.increment_request_total(&labels); + let start = Instant::now(); + let result = self.inner.blocking_stat(path, args); self.metrics - .observe_request_duration(Operation::BlockingStat, start_time.elapsed()); + .observe_request_duration(&labels, start.elapsed()); - result.map_err(|e| { - self.metrics - .increment_errors_total(Operation::BlockingStat, e.kind()); - e + result.map_err(|err| { + self.metrics.increment_errors_total(op, err.kind()); + err }) } fn blocking_delete(&self, path: &str, args: OpDelete) -> Result { - self.metrics - .increment_request_total(Operation::BlockingDelete); + let op = Operation::BlockingDelete; + let labels = self.metrics.gen_operation_labels(op); - let start_time = Instant::now(); - let result = self.inner.blocking_delete(path, args); + self.metrics.increment_request_total(&labels); + let start = Instant::now(); + let result = self.inner.blocking_delete(path, args); self.metrics - .observe_request_duration(Operation::BlockingDelete, start_time.elapsed()); + .observe_request_duration(&labels, start.elapsed()); - result.map_err(|e| { - self.metrics - .increment_errors_total(Operation::BlockingDelete, e.kind()); - e + result.map_err(|err| { + self.metrics.increment_errors_total(op, err.kind()); + err }) } fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> { - self.metrics - .increment_request_total(Operation::BlockingList); + let op = Operation::BlockingList; + let labels = self.metrics.gen_operation_labels(op); - let start_time = Instant::now(); - let result = self.inner.blocking_list(path, args); + self.metrics.increment_request_total(&labels); + let start = Instant::now(); + let result = self.inner.blocking_list(path, args); self.metrics - .observe_request_duration(Operation::BlockingList, start_time.elapsed()); + .observe_request_duration(&labels, start.elapsed()); - result.map_err(|e| { - self.metrics - .increment_errors_total(Operation::BlockingList, e.kind()); - e + result.map_err(|err| { + self.metrics.increment_errors_total(op, err.kind()); + err }) } } -pub struct PrometheusMetricWrapper { +pub struct PrometheusClientMetricWrapper { inner: R, metrics: PrometheusClientMetrics, } -impl PrometheusMetricWrapper { +impl PrometheusClientMetricWrapper { fn new(inner: R, metrics: PrometheusClientMetrics) -> Self { Self { inner, metrics } } } -impl oio::Read for PrometheusMetricWrapper { +impl oio::Read for PrometheusClientMetricWrapper { async fn read(&mut self) -> Result { + let op = Operation::ReaderRead; + let labels = self.metrics.gen_operation_labels(op); + let start = Instant::now(); match self.inner.read().await { Ok(bs) => { + self.metrics.observe_bytes_total(&labels, bs.remaining()); self.metrics - .observe_bytes_total(Operation::ReaderRead, bs.remaining()); - self.metrics - .observe_request_duration(Operation::ReaderRead, start.elapsed()); + .observe_request_duration(&labels, start.elapsed()); Ok(bs) } - Err(e) => { - self.metrics - .increment_errors_total(Operation::ReaderRead, e.kind()); - Err(e) + Err(err) => { + self.metrics.increment_errors_total(op, err.kind()); + Err(err) } } } } -impl oio::BlockingRead for PrometheusMetricWrapper { +impl oio::BlockingRead for PrometheusClientMetricWrapper { fn read(&mut self) -> Result { + let op = Operation::BlockingReaderRead; + let labels = self.metrics.gen_operation_labels(op); + let start = Instant::now(); + self.inner .read() .map(|bs| { + self.metrics.observe_bytes_total(&labels, bs.remaining()); self.metrics - .observe_bytes_total(Operation::BlockingReaderRead, bs.remaining()); - self.metrics - .observe_request_duration(Operation::BlockingReaderRead, start.elapsed()); + .observe_request_duration(&labels, start.elapsed()); bs }) - .map_err(|e| { - self.metrics - .increment_errors_total(Operation::BlockingReaderRead, e.kind()); - e + .map_err(|err| { + self.metrics.increment_errors_total(op, err.kind()); + err }) } } -impl oio::Write for PrometheusMetricWrapper { +impl oio::Write for PrometheusClientMetricWrapper { async fn write(&mut self, bs: Buffer) -> Result<()> { + let op = Operation::WriterWrite; + let labels = self.metrics.gen_operation_labels(op); + let start = Instant::now(); let size = bs.len(); @@ -622,19 +626,20 @@ impl oio::Write for PrometheusMetricWrapper { .write(bs) .await .map(|_| { + self.metrics.observe_bytes_total(&labels, size); self.metrics - .observe_bytes_total(Operation::WriterWrite, size); - self.metrics - .observe_request_duration(Operation::WriterWrite, start.elapsed()); + .observe_request_duration(&labels, start.elapsed()); }) .map_err(|err| { - self.metrics - .increment_errors_total(Operation::WriterWrite, err.kind()); + self.metrics.increment_errors_total(op, err.kind()); err }) } async fn close(&mut self) -> Result<()> { + let op = Operation::WriterClose; + let labels = self.metrics.gen_operation_labels(op); + let start = Instant::now(); self.inner @@ -642,16 +647,18 @@ impl oio::Write for PrometheusMetricWrapper { .await .map(|_| { self.metrics - .observe_request_duration(Operation::WriterClose, start.elapsed()); + .observe_request_duration(&labels, start.elapsed()); }) .map_err(|err| { - self.metrics - .increment_errors_total(Operation::WriterClose, err.kind()); + self.metrics.increment_errors_total(op, err.kind()); err }) } async fn abort(&mut self) -> Result<()> { + let op = Operation::WriterAbort; + let labels = self.metrics.gen_operation_labels(op); + let start = Instant::now(); self.inner @@ -659,48 +666,50 @@ impl oio::Write for PrometheusMetricWrapper { .await .map(|_| { self.metrics - .observe_request_duration(Operation::WriterAbort, start.elapsed()); + .observe_request_duration(&labels, start.elapsed()); }) .map_err(|err| { - self.metrics - .increment_errors_total(Operation::WriterAbort, err.kind()); + self.metrics.increment_errors_total(op, err.kind()); err }) } } -impl oio::BlockingWrite for PrometheusMetricWrapper { +impl oio::BlockingWrite for PrometheusClientMetricWrapper { fn write(&mut self, bs: Buffer) -> Result<()> { + let op = Operation::BlockingWriterWrite; + let labels = self.metrics.gen_operation_labels(op); + let start = Instant::now(); let size = bs.len(); self.inner .write(bs) .map(|_| { + self.metrics.observe_bytes_total(&labels, size); self.metrics - .observe_bytes_total(Operation::BlockingWriterWrite, size); - self.metrics - .observe_request_duration(Operation::BlockingWriterWrite, start.elapsed()); + .observe_request_duration(&labels, start.elapsed()); }) .map_err(|err| { - self.metrics - .increment_errors_total(Operation::BlockingWriterWrite, err.kind()); + self.metrics.increment_errors_total(op, err.kind()); err }) } fn close(&mut self) -> Result<()> { + let op = Operation::BlockingWriterClose; + let labels = self.metrics.gen_operation_labels(op); + let start = Instant::now(); self.inner .close() .map(|_| { self.metrics - .observe_request_duration(Operation::BlockingWriterClose, start.elapsed()); + .observe_request_duration(&labels, start.elapsed()); }) .map_err(|err| { - self.metrics - .increment_errors_total(Operation::BlockingWriterClose, err.kind()); + self.metrics.increment_errors_total(op, err.kind()); err }) }