diff --git a/analytic_engine/src/instance/read.rs b/analytic_engine/src/instance/read.rs
index 09f261fd18..f769ec689d 100644
--- a/analytic_engine/src/instance/read.rs
+++ b/analytic_engine/src/instance/read.rs
@@ -106,16 +106,10 @@ impl Instance {
 
         let now = current_time_millis() as i64;
         let query_time_range = (end_time as f64 - start_time as f64) / 1000.0;
-        table_data
-            .metrics
-            .maybe_table_level_metrics()
-            .query_time_range
-            .observe(query_time_range);
-
+        let table_metrics = table_data.metrics.maybe_table_level_metrics();
+        table_metrics.query_time_range.observe(query_time_range);
         let since_start = (now as f64 - start_time as f64) / 1000.0;
-        table_data
-            .metrics
-            .maybe_table_level_metrics()
+        table_metrics
             .duration_since_query_query_start_time
             .observe(since_start);
 
@@ -132,11 +126,7 @@ impl Instance {
         let sst_read_options = create_sst_read_option(
             ScanType::Query,
             self.scan_options.clone(),
-            table_data
-                .metrics
-                .maybe_table_level_metrics()
-                .sst_metrics
-                .clone(),
+            table_metrics.sst_metrics.clone(),
             table_options.num_rows_per_row_group,
             request.projected_schema.clone(),
             request.predicate.clone(),
diff --git a/analytic_engine/src/sst/metrics.rs b/analytic_engine/src/sst/metrics.rs
index 3ae4e1f9fc..5200181d75 100644
--- a/analytic_engine/src/sst/metrics.rs
+++ b/analytic_engine/src/sst/metrics.rs
@@ -12,10 +12,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::sync::atomic::{AtomicU64, Ordering};
+
 use lazy_static::lazy_static;
 use prometheus::{
-    exponential_buckets, register_counter, register_histogram, register_int_counter_vec, Counter,
-    Histogram, IntCounter, IntCounterVec,
+    exponential_buckets, register_counter, register_histogram, register_histogram_vec,
+    register_int_counter_vec, Counter, Histogram, HistogramVec, IntCounter, IntCounterVec,
 };
 
 lazy_static! {
@@ -48,12 +50,21 @@ lazy_static! {
         "The counter for row group after prune",
         &["table"]
     ).unwrap();
+
+    static ref FETCHED_SST_BYTES_HISTOGRAM: HistogramVec = register_histogram_vec!(
+        "fetched_sst_bytes",
+        "Histogram for sst get range length",
+        &["table"],
+        exponential_buckets(100.0, 2.0, 5).unwrap()
+    ).unwrap();
 }
 
 #[derive(Debug)]
 pub struct MaybeTableLevelMetrics {
     pub row_group_before_prune_counter: IntCounter,
     pub row_group_after_prune_counter: IntCounter,
+    pub fetched_sst_bytes_hist: Histogram,
+    pub fetched_sst_bytes: AtomicU64,
 }
 
 impl MaybeTableLevelMetrics {
@@ -63,6 +74,20 @@ impl MaybeTableLevelMetrics {
                 .with_label_values(&[table]),
             row_group_after_prune_counter: ROW_GROUP_AFTER_PRUNE_COUNTER
                 .with_label_values(&[table]),
+            fetched_sst_bytes_hist: FETCHED_SST_BYTES_HISTOGRAM.with_label_values(&[table]),
+            fetched_sst_bytes: AtomicU64::new(0),
         }
     }
+
+    #[inline]
+    pub fn observe_fetched_sst_bytes(&self) {
+        self.fetched_sst_bytes_hist
+            .observe(self.fetched_sst_bytes.load(Ordering::Relaxed) as f64)
+    }
+}
+
+impl Drop for MaybeTableLevelMetrics {
+    fn drop(&mut self) {
+        self.observe_fetched_sst_bytes();
+    }
 }
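The two `sst/metrics.rs` additions combine into an accumulate-then-flush pattern: readers bump `fetched_sst_bytes` with relaxed atomic adds while a query runs, and the `Drop` impl turns the final sum into a single histogram sample. A minimal standalone sketch of that pattern, with illustrative stand-in names rather than the engine's types:

```rust
use std::{
    sync::{
        atomic::{AtomicU64, Ordering},
        Arc,
    },
    thread,
};

// Stand-in for `MaybeTableLevelMetrics`; the real code calls
// `fetched_sst_bytes_hist.observe(total as f64)` instead of printing.
struct FetchedBytes {
    total: AtomicU64,
}

impl FetchedBytes {
    fn record(&self, n: u64) {
        // Relaxed is enough: only the final sum matters, not the ordering
        // of the adds relative to other memory operations.
        self.total.fetch_add(n, Ordering::Relaxed);
    }
}

impl Drop for FetchedBytes {
    fn drop(&mut self) {
        // Flushed exactly once, when the last `Arc` owner goes away.
        println!("observe: {} bytes", self.total.load(Ordering::Relaxed));
    }
}

fn main() {
    let metrics = Arc::new(FetchedBytes {
        total: AtomicU64::new(0),
    });
    let cloned = metrics.clone();
    let handle = thread::spawn(move || cloned.record(4096));
    metrics.record(1024);
    handle.join().unwrap();
    // Dropping `metrics` here observes 5120 in one histogram sample.
}
```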
{ "The counter for row group after prune", &["table"] ).unwrap(); + + static ref FETCHED_SST_BYTES_HISTOGRAM: HistogramVec = register_histogram_vec!( + "fetched_sst_bytes", + "Histogram for sst get range length", + &["table"], + exponential_buckets(100.0, 2.0, 5).unwrap() + ).unwrap(); } #[derive(Debug)] pub struct MaybeTableLevelMetrics { pub row_group_before_prune_counter: IntCounter, pub row_group_after_prune_counter: IntCounter, + pub fetched_sst_bytes_hist: Histogram, + pub fetched_sst_bytes: AtomicU64, } impl MaybeTableLevelMetrics { @@ -63,6 +74,20 @@ impl MaybeTableLevelMetrics { .with_label_values(&[table]), row_group_after_prune_counter: ROW_GROUP_AFTER_PRUNE_COUNTER .with_label_values(&[table]), + fetched_sst_bytes_hist: FETCHED_SST_BYTES_HISTOGRAM.with_label_values(&[table]), + fetched_sst_bytes: AtomicU64::new(0), } } + + #[inline] + pub fn observe_fetched_sst_bytes(&self) { + self.fetched_sst_bytes_hist + .observe(self.fetched_sst_bytes.load(Ordering::Relaxed) as f64) + } +} + +impl Drop for MaybeTableLevelMetrics { + fn drop(&mut self) { + self.observe_fetched_sst_bytes(); + } } diff --git a/analytic_engine/src/sst/parquet/async_reader.rs b/analytic_engine/src/sst/parquet/async_reader.rs index d4d378adef..8e34c125d5 100644 --- a/analytic_engine/src/sst/parquet/async_reader.rs +++ b/analytic_engine/src/sst/parquet/async_reader.rs @@ -17,7 +17,7 @@ use std::{ ops::Range, pin::Pin, - sync::Arc, + sync::{atomic::Ordering, Arc}, task::{Context, Poll}, time::{Duration, Instant}, }; @@ -43,7 +43,10 @@ use parquet::{ arrow::{arrow_reader::RowSelection, ParquetRecordBatchStreamBuilder, ProjectionMask}, file::metadata::RowGroupMetaData, }; -use parquet_ext::{meta_data::ChunkReader, reader::ObjectStoreReader}; +use parquet_ext::{ + meta_data::ChunkReader, + reader::{MetricsObserver, ObjectStoreReader}, +}; use runtime::{AbortOnDropMany, JoinHandle, Runtime}; use snafu::ResultExt; use table_engine::predicate::PredicateRef; @@ -235,7 +238,6 @@ impl<'a> Reader<'a> { } // TODO: remove it and use the suggested api. 
diff --git a/analytic_engine/src/table/metrics.rs b/analytic_engine/src/table/metrics.rs
index ea3b0c526c..c6b85917d0 100644
--- a/analytic_engine/src/table/metrics.rs
+++ b/analytic_engine/src/table/metrics.rs
@@ -165,12 +165,11 @@ impl From<&AtomicTableStats> for TableStats {
 
 /// Now the registered labels won't remove from the metrics vec to avoid panic
 /// on concurrent removal.
 pub struct Metrics {
-    // Stats of a single table.
+    /// The table name used for metric label
+    maybe_table_name: String,
+    /// Stats of a single table.
     stats: Arc<AtomicTableStats>,
-    // Maybe table level sst metrics
-    maybe_table_level_metrics: Arc<MaybeTableLevelMetrics>,
-
     compaction_input_sst_size_histogram: Histogram,
     compaction_output_sst_size_histogram: Histogram,
     compaction_input_sst_row_num_histogram: Histogram,
@@ -193,8 +192,8 @@ pub struct Metrics {
 impl Default for Metrics {
     fn default() -> Self {
         Self {
+            maybe_table_name: DEFAULT_METRICS_KEY.to_string(),
             stats: Arc::new(AtomicTableStats::default()),
-            maybe_table_level_metrics: Arc::new(MaybeTableLevelMetrics::new(DEFAULT_METRICS_KEY)),
             compaction_input_sst_size_histogram: TABLE_COMPACTION_SST_SIZE_HISTOGRAM
                 .with_label_values(&["input"]),
             compaction_output_sst_size_histogram: TABLE_COMPACTION_SST_SIZE_HISTOGRAM
@@ -290,16 +289,15 @@ impl<'a> MetricsContext<'a> {
 impl Metrics {
     pub fn new(mut metric_ctx: MetricsContext) -> Self {
         Self {
-            maybe_table_level_metrics: Arc::new(MaybeTableLevelMetrics::new(
-                metric_ctx.maybe_table_name(),
-            )),
+            maybe_table_name: metric_ctx.maybe_table_name().to_string(),
             ..Default::default()
         }
     }
 
+    /// Generate a table-level metric observer.
     #[inline]
     pub fn maybe_table_level_metrics(&self) -> Arc<MaybeTableLevelMetrics> {
-        self.maybe_table_level_metrics.clone()
+        Arc::new(MaybeTableLevelMetrics::new(&self.maybe_table_name))
     }
 
     #[inline]
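The `table/metrics.rs` change flips `maybe_table_level_metrics()` from handing out one long-lived `Arc` to minting a fresh `MaybeTableLevelMetrics` per call; together with the `Drop` impl above, each query flushes its own byte total instead of one table-lifetime total. A sketch of the resulting per-query lifetime (the `run_query` helper is hypothetical, and the module path is assumed from the file layout in this patch):

```rust
use std::sync::Arc;

use analytic_engine::sst::metrics::MaybeTableLevelMetrics;

fn run_query(table: &str) {
    // Fresh accumulator per query, mirroring `maybe_table_level_metrics()`.
    let metrics = Arc::new(MaybeTableLevelMetrics::new(table));
    let for_reader = metrics.clone();
    // ... hand `for_reader` clones to this query's sst readers ...
    drop(for_reader);
    drop(metrics); // last owner: the `Drop` impl observes the byte total here
}
```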
diff --git a/components/parquet_ext/src/meta_data.rs b/components/parquet_ext/src/meta_data.rs
index fa2dd6b01d..edae1e01e5 100644
--- a/components/parquet_ext/src/meta_data.rs
+++ b/components/parquet_ext/src/meta_data.rs
@@ -23,7 +23,7 @@ use parquet::{
     file::{footer, metadata::ParquetMetaData},
 };
 
-use crate::reader::ObjectStoreReader;
+use crate::reader::{NoopMetricsObserver, ObjectStoreReader};
 
 #[async_trait]
 pub trait ChunkReader: Sync + Send {
@@ -86,7 +86,7 @@ pub async fn fetch_parquet_metadata(
 /// TODO: Currently there is no method to build page indexes for meta data in
 /// `parquet`, maybe we can write a issue in `arrow-rs` .
 pub async fn meta_with_page_indexes(
-    object_store_reader: ObjectStoreReader,
+    object_store_reader: ObjectStoreReader<NoopMetricsObserver>,
 ) -> Result<Arc<ParquetMetaData>> {
     let read_options = ArrowReaderOptions::new().with_page_index(true);
     let builder =
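`meta_with_page_indexes` now names `ObjectStoreReader<NoopMetricsObserver>` explicitly, while other callers can keep relying on the default type parameter introduced in `reader.rs` below. A minimal sketch of those ergonomics, assuming the signatures in this patch (`metadata_reader` is a hypothetical helper):

```rust
use std::sync::Arc;

use object_store::{ObjectStoreRef, Path};
use parquet::file::metadata::ParquetMetaData;
use parquet_ext::reader::ObjectStoreReader;

// `ObjectStoreReader` with no type argument means
// `ObjectStoreReader<NoopMetricsObserver>`, so metrics-free callers
// like the metadata path stay unchanged.
fn metadata_reader(
    store: ObjectStoreRef,
    path: Path,
    meta_data: Arc<ParquetMetaData>,
) -> ObjectStoreReader {
    // Equivalent to `with_metrics(store, path, meta_data, NoopMetricsObserver)`.
    ObjectStoreReader::new(store, path, meta_data)
}
```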
diff --git a/components/parquet_ext/src/reader.rs b/components/parquet_ext/src/reader.rs
index 389779d792..57d58671f0 100644
--- a/components/parquet_ext/src/reader.rs
+++ b/components/parquet_ext/src/reader.rs
@@ -12,60 +12,94 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::{ops::Range, sync::Arc, time::Instant};
+use std::{
+    ops::Range,
+    sync::Arc,
+    time::{Duration, Instant},
+};
 
 use bytes::Bytes;
 use futures::{
     future::{BoxFuture, FutureExt},
     TryFutureExt,
 };
-use logger::debug;
 use object_store::{ObjectStoreRef, Path};
 use parquet::{arrow::async_reader::AsyncFileReader, file::metadata::ParquetMetaData};
 
-/// Implemention AsyncFileReader based on `ObjectStore`
-///
-/// TODO: Perhaps we should avoid importing `object_store` in `parquet_ext` to
-/// keep the crate `parquet_ext` more pure.
+/// The observer for metrics of [ObjectStoreReader].
+pub trait MetricsObserver: Send {
+    fn elapsed(&self, path: &Path, elapsed: Duration);
+    fn num_bytes_fetched(&self, path: &Path, num_bytes: usize);
+}
+
+#[derive(Debug, Clone)]
+pub struct NoopMetricsObserver;
+
+impl MetricsObserver for NoopMetricsObserver {
+    fn elapsed(&self, _: &Path, _: Duration) {}
+
+    fn num_bytes_fetched(&self, _: &Path, _: usize) {}
+}
+
+/// The implementation based on `ObjectStore` for [`AsyncFileReader`].
 #[derive(Clone)]
-pub struct ObjectStoreReader {
+pub struct ObjectStoreReader<T: MetricsObserver = NoopMetricsObserver> {
     storage: ObjectStoreRef,
     path: Path,
     meta_data: Arc<ParquetMetaData>,
     begin: Instant,
+    metrics: T,
 }
 
-impl ObjectStoreReader {
+impl ObjectStoreReader<NoopMetricsObserver> {
     pub fn new(storage: ObjectStoreRef, path: Path, meta_data: Arc<ParquetMetaData>) -> Self {
+        Self::with_metrics(storage, path, meta_data, NoopMetricsObserver)
+    }
+}
+
+impl<T: MetricsObserver> ObjectStoreReader<T> {
+    pub fn with_metrics(
+        storage: ObjectStoreRef,
+        path: Path,
+        meta_data: Arc<ParquetMetaData>,
+        metrics: T,
+    ) -> Self {
         Self {
             storage,
             path,
             meta_data,
             begin: Instant::now(),
+            metrics,
         }
     }
 }
 
-impl Drop for ObjectStoreReader {
+impl<T: MetricsObserver> Drop for ObjectStoreReader<T> {
     fn drop(&mut self) {
-        debug!(
-            "ObjectStoreReader dropped, path:{}, elapsed:{:?}",
-            &self.path,
-            self.begin.elapsed()
-        );
+        self.metrics.elapsed(&self.path, self.begin.elapsed())
     }
 }
 
-impl AsyncFileReader for ObjectStoreReader {
+impl<T: MetricsObserver> AsyncFileReader for ObjectStoreReader<T> {
     fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
-        self.storage
-            .get_range(&self.path, range)
-            .map_err(|e| {
-                parquet::errors::ParquetError::General(format!(
-                    "Failed to fetch range from object store, err:{e}"
-                ))
-            })
-            .boxed()
+        async move {
+            let get_res = self
+                .storage
+                .get_range(&self.path, range)
+                .map_err(|e| {
+                    parquet::errors::ParquetError::General(format!(
+                        "Failed to fetch range from object store, err:{e}"
+                    ))
+                })
+                .await;
+
+            if let Ok(bytes) = &get_res {
+                self.metrics.num_bytes_fetched(&self.path, bytes.len());
+            }
+
+            get_res
+        }
+        .boxed()
     }
 
     fn get_byte_ranges(
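The rewritten `get_bytes` wraps the fetch in an `async move` block because the byte count only exists after the future resolves; measuring on the success path keeps failed fetches out of the accounting. The same wrap-and-measure shape, reduced to a standalone sketch, where `fetch` and `record` are hypothetical stand-ins for `ObjectStore::get_range` and `MetricsObserver::num_bytes_fetched`:

```rust
use futures::future::{BoxFuture, FutureExt};

fn instrumented<'a>(
    fetch: BoxFuture<'a, Result<Vec<u8>, String>>,
    record: impl Fn(usize) + Send + 'a,
) -> BoxFuture<'a, Result<Vec<u8>, String>> {
    async move {
        let res = fetch.await;
        if let Ok(bytes) = &res {
            // Only successful fetches contribute to the byte count.
            record(bytes.len());
        }
        res
    }
    .boxed()
}
```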