diff --git a/analytic_engine/src/sst/metrics.rs b/analytic_engine/src/sst/metrics.rs new file mode 100644 index 0000000000..35be0545b7 --- /dev/null +++ b/analytic_engine/src/sst/metrics.rs @@ -0,0 +1,14 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. + +use lazy_static::lazy_static; +use prometheus::{exponential_buckets, register_histogram, Histogram}; + +lazy_static! { + // Histogram: + // Buckets: 100B,200B,400B,...,2KB + pub static ref SST_GET_RANGE_HISTOGRAM: Histogram = register_histogram!( + "sst_get_range_length", + "Histogram for sst get range length", + exponential_buckets(100.0, 2.0, 5).unwrap() + ).unwrap(); +} diff --git a/analytic_engine/src/sst/mod.rs b/analytic_engine/src/sst/mod.rs index 71f7d3c037..76797fb342 100644 --- a/analytic_engine/src/sst/mod.rs +++ b/analytic_engine/src/sst/mod.rs @@ -7,5 +7,6 @@ pub mod factory; pub mod file; pub mod manager; pub mod meta_cache; +pub mod metrics; pub mod parquet; pub mod reader; diff --git a/analytic_engine/src/sst/parquet/async_reader.rs b/analytic_engine/src/sst/parquet/async_reader.rs index a801dfcfb4..7c59d34c04 100644 --- a/analytic_engine/src/sst/parquet/async_reader.rs +++ b/analytic_engine/src/sst/parquet/async_reader.rs @@ -27,6 +27,7 @@ use parquet::{ file::metadata::RowGroupMetaData, }; use parquet_ext::ParquetMetaDataRef; +use prometheus::local::LocalHistogram; use snafu::ResultExt; use table_engine::predicate::PredicateRef; use tokio::sync::mpsc::{self, Receiver, Sender}; @@ -36,6 +37,7 @@ use crate::{ factory::SstReaderOptions, file::{BloomFilter, SstMetaData}, meta_cache::{MetaCacheRef, MetaData}, + metrics, parquet::{encoding::ParquetDecoder, row_group_filter::RowGroupFilter}, reader::{error::*, Result, SstReader}, }, @@ -196,9 +198,10 @@ impl<'a> Reader<'a> { } } -#[derive(Debug, Default)] +#[derive(Debug)] struct ReaderMetrics { bytes_scanned: usize, + sst_get_range_length_histogram: LocalHistogram, } struct ObjectStoreReader { @@ -214,7 +217,10 @@ impl ObjectStoreReader { storage, path, meta_data, - metrics: Default::default(), + metrics: ReaderMetrics { + bytes_scanned: 0, + sst_get_range_length_histogram: metrics::SST_GET_RANGE_HISTOGRAM.local(), + }, } } } @@ -228,17 +234,43 @@ impl Drop for ObjectStoreReader { impl AsyncFileReader for ObjectStoreReader { fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, parquet::errors::Result> { self.metrics.bytes_scanned += range.end - range.start; + self.metrics + .sst_get_range_length_histogram + .observe((range.end - range.start) as f64); self.storage .get_range(&self.path, range) .map_err(|e| { parquet::errors::ParquetError::General(format!( - "ObjectStoreReader::get_bytes error: {}", + "Failed to fetch range from object store, err:{}", e )) }) .boxed() } + fn get_byte_ranges( + &mut self, + ranges: Vec>, + ) -> BoxFuture<'_, parquet::errors::Result>> { + for range in &ranges { + self.metrics + .sst_get_range_length_histogram + .observe((range.end - range.start) as f64); + } + async move { + self.storage + .get_ranges(&self.path, &ranges) + .map_err(|e| { + parquet::errors::ParquetError::General(format!( + "Failed to fetch ranges from object store, err:{}", + e + )) + }) + .await + } + .boxed() + } + fn get_metadata( &mut self, ) -> BoxFuture<'_, parquet::errors::Result>> {