feat: support metrics for number of bytes fetched from object storage #1363

Merged 3 commits on Dec 11, 2023
Changes from all commits
18 changes: 4 additions & 14 deletions analytic_engine/src/instance/read.rs
@@ -106,16 +106,10 @@ impl Instance {
         let now = current_time_millis() as i64;
 
         let query_time_range = (end_time as f64 - start_time as f64) / 1000.0;
-        table_data
-            .metrics
-            .maybe_table_level_metrics()
-            .query_time_range
-            .observe(query_time_range);
-
+        let table_metrics = table_data.metrics.maybe_table_level_metrics();
+        table_metrics.query_time_range.observe(query_time_range);
         let since_start = (now as f64 - start_time as f64) / 1000.0;
-        table_data
-            .metrics
-            .maybe_table_level_metrics()
+        table_metrics
             .duration_since_query_query_start_time
             .observe(since_start);
 
@@ -132,11 +126,7 @@ impl Instance {
         let sst_read_options = create_sst_read_option(
             ScanType::Query,
             self.scan_options.clone(),
-            table_data
-                .metrics
-                .maybe_table_level_metrics()
-                .sst_metrics
-                .clone(),
+            table_metrics.sst_metrics.clone(),
             table_options.num_rows_per_row_group,
             request.projected_schema.clone(),
             request.predicate.clone(),
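Note on the refactor above: because this PR also changes `maybe_table_level_metrics()` to build a fresh observer on every call (see `analytic_engine/src/table/metrics.rs` below), hoisting the handle into a single `table_metrics` local is more than a style fix; repeated calls would create independent observers, each flushing its own histogram sample on drop. A minimal sketch of the new behavior, assuming a `table_data` like the one in this hunk:

```rust
// Illustrative only, not part of the diff: each call now returns a distinct
// Arc<MaybeTableLevelMetrics> rather than a clone of one shared handle.
let a = table_data.metrics.maybe_table_level_metrics();
let b = table_data.metrics.maybe_table_level_metrics();
assert!(!Arc::ptr_eq(&a, &b)); // two observers, two Drop-time flushes
```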
29 changes: 27 additions & 2 deletions analytic_engine/src/sst/metrics.rs
@@ -12,10 +12,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::sync::atomic::{AtomicU64, Ordering};
+
 use lazy_static::lazy_static;
 use prometheus::{
-    exponential_buckets, register_counter, register_histogram, register_int_counter_vec, Counter,
-    Histogram, IntCounter, IntCounterVec,
+    exponential_buckets, register_counter, register_histogram, register_histogram_vec,
+    register_int_counter_vec, Counter, Histogram, HistogramVec, IntCounter, IntCounterVec,
 };
 
 lazy_static! {
@@ -48,12 +50,21 @@ lazy_static! {
         "The counter for row group after prune",
         &["table"]
     ).unwrap();
+
+    static ref FETCHED_SST_BYTES_HISTOGRAM: HistogramVec = register_histogram_vec!(
+        "fetched_sst_bytes",
+        "Histogram for sst get range length",
+        &["table"],
+        exponential_buckets(100.0, 2.0, 5).unwrap()
+    ).unwrap();
 }
 
 #[derive(Debug)]
 pub struct MaybeTableLevelMetrics {
     pub row_group_before_prune_counter: IntCounter,
     pub row_group_after_prune_counter: IntCounter,
+    pub fetched_sst_bytes_hist: Histogram,
+    pub fetched_sst_bytes: AtomicU64,
 }
 
 impl MaybeTableLevelMetrics {
@@ -63,6 +74,20 @@ impl MaybeTableLevelMetrics {
                 .with_label_values(&[table]),
             row_group_after_prune_counter: ROW_GROUP_AFTER_PRUNE_COUNTER
                 .with_label_values(&[table]),
+            fetched_sst_bytes_hist: FETCHED_SST_BYTES_HISTOGRAM.with_label_values(&[table]),
+            fetched_sst_bytes: AtomicU64::new(0),
         }
     }
+
+    #[inline]
+    pub fn observe_fetched_sst_bytes(&self) {
+        self.fetched_sst_bytes_hist
+            .observe(self.fetched_sst_bytes.load(Ordering::Relaxed) as f64)
+    }
+}
+
+impl Drop for MaybeTableLevelMetrics {
+    fn drop(&mut self) {
+        self.observe_fetched_sst_bytes();
+    }
 }
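The scheme introduced here is accumulate-then-flush: the hot path only does a relaxed `fetch_add` on an `AtomicU64`, and the histogram receives exactly one sample when the observer is dropped. A minimal sketch of that lifecycle, assuming the `MaybeTableLevelMetrics` defined above (the table name is made up):

```rust
use std::sync::{atomic::Ordering, Arc};

fn sketch() {
    let metrics = Arc::new(MaybeTableLevelMetrics::new("demo_table"));

    // Hot path: cheap relaxed adds, no histogram work per fetch.
    metrics.fetched_sst_bytes.fetch_add(4096, Ordering::Relaxed);
    metrics.fetched_sst_bytes.fetch_add(8192, Ordering::Relaxed);

    // Dropping the last Arc runs the Drop impl, which records a single
    // 12288-byte sample in fetched_sst_bytes_hist.
    drop(metrics);
}
```

Relaxed ordering is sufficient because the counter is never used for synchronization; only its final value is read, in `Drop`.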
33 changes: 28 additions & 5 deletions analytic_engine/src/sst/parquet/async_reader.rs
@@ -17,7 +17,7 @@
 use std::{
     ops::Range,
     pin::Pin,
-    sync::Arc,
+    sync::{atomic::Ordering, Arc},
     task::{Context, Poll},
     time::{Duration, Instant},
 };
@@ -43,7 +43,10 @@ use parquet::{
     arrow::{arrow_reader::RowSelection, ParquetRecordBatchStreamBuilder, ProjectionMask},
     file::metadata::RowGroupMetaData,
 };
-use parquet_ext::{meta_data::ChunkReader, reader::ObjectStoreReader};
+use parquet_ext::{
+    meta_data::ChunkReader,
+    reader::{MetricsObserver, ObjectStoreReader},
+};
 use runtime::{AbortOnDropMany, JoinHandle, Runtime};
 use snafu::ResultExt;
 use table_engine::predicate::PredicateRef;
@@ -235,7 +238,6 @@ impl<'a> Reader<'a> {
     }
 
     // TODO: remove it and use the suggested api.
-    #[allow(deprecated)]
     async fn fetch_record_batch_streams(
         &mut self,
         suggested_parallelism: usize,
@@ -307,11 +309,15 @@ impl<'a> Reader<'a> {
         );
 
         let mut streams = Vec::with_capacity(target_row_group_chunks.len());
+        let metrics_collector = ObjectStoreMetricsObserver {
+            table_level_sst_metrics: self.table_level_sst_metrics.clone(),
+        };
         for chunk in target_row_group_chunks {
-            let object_store_reader = ObjectStoreReader::new(
+            let object_store_reader = ObjectStoreReader::with_metrics(
                 self.store.clone(),
                 self.path.clone(),
                 parquet_metadata.clone(),
+                metrics_collector.clone(),
             );
             let mut builder = ParquetRecordBatchStreamBuilder::new(object_store_reader)
                 .await
@@ -323,7 +329,7 @@ impl<'a> Reader<'a> {
             debug!(
                 "Build row selection for file path:{}, result:{row_selection:?}, page indexes:{}",
                 self.path,
-                parquet_metadata.page_indexes().is_some()
+                parquet_metadata.column_index().is_some()
             );
             if let Some(selection) = row_selection {
                 builder = builder.with_row_selection(selection);
@@ -755,6 +761,23 @@ impl<'a> SstReader for ThreadedReader<'a> {
     }
 }
 
+#[derive(Clone)]
+struct ObjectStoreMetricsObserver {
+    table_level_sst_metrics: Arc<MaybeTableLevelMetrics>,
+}
+
+impl MetricsObserver for ObjectStoreMetricsObserver {
+    fn elapsed(&self, path: &Path, elapsed: Duration) {
+        debug!("ObjectStoreReader dropped, path:{path}, elapsed:{elapsed:?}",);
+    }
+
+    fn num_bytes_fetched(&self, _: &Path, num_bytes: usize) {
+        self.table_level_sst_metrics
+            .fetched_sst_bytes
+            .fetch_add(num_bytes as u64, Ordering::Relaxed);
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::{
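`ObjectStoreMetricsObserver` is just one implementation of the `MetricsObserver` trait introduced in `components/parquet_ext/src/reader.rs` below; any other sink can be plugged in the same way. A hypothetical alternative (the `LoggingObserver` name is ours, not the PR's):

```rust
use std::time::Duration;

use object_store::Path;
use parquet_ext::reader::MetricsObserver;

// A toy observer that prints instead of updating Prometheus metrics.
#[derive(Clone)]
struct LoggingObserver;

impl MetricsObserver for LoggingObserver {
    fn elapsed(&self, path: &Path, elapsed: Duration) {
        println!("reader for {path} dropped after {elapsed:?}");
    }

    fn num_bytes_fetched(&self, path: &Path, num_bytes: usize) {
        println!("fetched {num_bytes} bytes from {path}");
    }
}
```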
16 changes: 7 additions & 9 deletions analytic_engine/src/table/metrics.rs
@@ -165,12 +165,11 @@ impl From<&AtomicTableStats> for TableStats {
 /// Now the registered labels won't remove from the metrics vec to avoid panic
 /// on concurrent removal.
 pub struct Metrics {
-    // Stats of a single table.
+    /// The table name used for metric label
+    maybe_table_name: String,
+    /// Stats of a single table.
     stats: Arc<AtomicTableStats>,
 
-    // Maybe table level sst metrics
-    maybe_table_level_metrics: Arc<MaybeTableLevelMetrics>,
-
     compaction_input_sst_size_histogram: Histogram,
     compaction_output_sst_size_histogram: Histogram,
     compaction_input_sst_row_num_histogram: Histogram,
@@ -193,8 +192,8 @@ pub struct Metrics {
 impl Default for Metrics {
     fn default() -> Self {
         Self {
+            maybe_table_name: DEFAULT_METRICS_KEY.to_string(),
             stats: Arc::new(AtomicTableStats::default()),
-            maybe_table_level_metrics: Arc::new(MaybeTableLevelMetrics::new(DEFAULT_METRICS_KEY)),
             compaction_input_sst_size_histogram: TABLE_COMPACTION_SST_SIZE_HISTOGRAM
                 .with_label_values(&["input"]),
             compaction_output_sst_size_histogram: TABLE_COMPACTION_SST_SIZE_HISTOGRAM
@@ -290,16 +289,15 @@ impl<'a> MetricsContext<'a> {
 impl Metrics {
     pub fn new(mut metric_ctx: MetricsContext) -> Self {
         Self {
-            maybe_table_level_metrics: Arc::new(MaybeTableLevelMetrics::new(
-                metric_ctx.maybe_table_name(),
-            )),
+            maybe_table_name: metric_ctx.maybe_table_name().to_string(),
             ..Default::default()
         }
     }
 
+    /// Generate a table-level metric observer.
     #[inline]
     pub fn maybe_table_level_metrics(&self) -> Arc<MaybeTableLevelMetrics> {
-        self.maybe_table_level_metrics.clone()
+        Arc::new(MaybeTableLevelMetrics::new(&self.maybe_table_name))
     }
 
     #[inline]
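This hunk is the behavioral pivot of the PR: `maybe_table_level_metrics()` previously handed out clones of one long-lived `Arc`, under which a `Drop`-based flush would fire only when the table shut down and the byte counter would grow without bound. Constructing a fresh observer per call scopes the accumulation to a single query. A sketch of the intended lifecycle (call sites are hypothetical):

```rust
// `metrics` is the Metrics struct above for some table.
let per_query = metrics.maybe_table_level_metrics(); // fresh observer for this query
// ... the read path clones `per_query` into each SST reader it spawns ...
// When the last clone is dropped, the bytes this query fetched are
// recorded as one histogram sample.
drop(per_query);
```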
4 changes: 2 additions & 2 deletions components/parquet_ext/src/meta_data.rs
@@ -23,7 +23,7 @@ use parquet::{
     file::{footer, metadata::ParquetMetaData},
 };
 
-use crate::reader::ObjectStoreReader;
+use crate::reader::{NoopMetricsObserver, ObjectStoreReader};
 
 #[async_trait]
 pub trait ChunkReader: Sync + Send {
@@ -86,7 +86,7 @@ pub async fn fetch_parquet_metadata(
 /// TODO: Currently there is no method to build page indexes for meta data in
 /// `parquet`, maybe we can write a issue in `arrow-rs` .
 pub async fn meta_with_page_indexes(
-    object_store_reader: ObjectStoreReader,
+    object_store_reader: ObjectStoreReader<NoopMetricsObserver>,
 ) -> Result<Arc<ParquetMetaData>> {
     let read_options = ArrowReaderOptions::new().with_page_index(true);
     let builder =
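Metadata loading has no per-table observer to report into, so `meta_with_page_indexes` pins its argument to `ObjectStoreReader<NoopMetricsObserver>`, which is exactly what the unchanged `ObjectStoreReader::new` constructor returns. A sketch of a call site, assuming `store`, `path`, and `parquet_meta` are in scope:

```rust
// `new` yields the Noop-parameterized reader, so existing callers compile unchanged.
let reader = ObjectStoreReader::new(store.clone(), path.clone(), parquet_meta.clone());
let meta_with_indexes = meta_with_page_indexes(reader).await?;
```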
80 changes: 57 additions & 23 deletions components/parquet_ext/src/reader.rs
@@ -12,60 +12,94 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::{ops::Range, sync::Arc, time::Instant};
+use std::{
+    ops::Range,
+    sync::Arc,
+    time::{Duration, Instant},
+};
 
 use bytes::Bytes;
 use futures::{
     future::{BoxFuture, FutureExt},
     TryFutureExt,
 };
-use logger::debug;
 use object_store::{ObjectStoreRef, Path};
 use parquet::{arrow::async_reader::AsyncFileReader, file::metadata::ParquetMetaData};
 
-/// Implemention AsyncFileReader based on `ObjectStore`
-///
-/// TODO: Perhaps we should avoid importing `object_store` in `parquet_ext` to
-/// keep the crate `parquet_ext` more pure.
+/// The observer for metrics of [ObjectStoreReader].
+pub trait MetricsObserver: Send {
+    fn elapsed(&self, path: &Path, elapsed: Duration);
+    fn num_bytes_fetched(&self, path: &Path, num_bytes: usize);
+}
+
+#[derive(Debug, Clone)]
+pub struct NoopMetricsObserver;
+
+impl MetricsObserver for NoopMetricsObserver {
+    fn elapsed(&self, _: &Path, _: Duration) {}
+
+    fn num_bytes_fetched(&self, _: &Path, _: usize) {}
+}
+
+/// The implementation based on `ObjectStore` for [`AsyncFileReader`].
 #[derive(Clone)]
-pub struct ObjectStoreReader {
+pub struct ObjectStoreReader<T: MetricsObserver> {
     storage: ObjectStoreRef,
     path: Path,
     meta_data: Arc<ParquetMetaData>,
     begin: Instant,
+    metrics: T,
 }
 
-impl ObjectStoreReader {
+impl ObjectStoreReader<NoopMetricsObserver> {
     pub fn new(storage: ObjectStoreRef, path: Path, meta_data: Arc<ParquetMetaData>) -> Self {
+        Self::with_metrics(storage, path, meta_data, NoopMetricsObserver)
+    }
+}
+
+impl<T: MetricsObserver> ObjectStoreReader<T> {
+    pub fn with_metrics(
+        storage: ObjectStoreRef,
+        path: Path,
+        meta_data: Arc<ParquetMetaData>,
+        metrics: T,
+    ) -> Self {
         Self {
             storage,
             path,
             meta_data,
             begin: Instant::now(),
+            metrics,
         }
     }
 }
 
-impl Drop for ObjectStoreReader {
+impl<T: MetricsObserver> Drop for ObjectStoreReader<T> {
     fn drop(&mut self) {
-        debug!(
-            "ObjectStoreReader dropped, path:{}, elapsed:{:?}",
-            &self.path,
-            self.begin.elapsed()
-        );
+        self.metrics.elapsed(&self.path, self.begin.elapsed())
    }
 }
 
-impl AsyncFileReader for ObjectStoreReader {
+impl<T: MetricsObserver> AsyncFileReader for ObjectStoreReader<T> {
     fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
-        self.storage
-            .get_range(&self.path, range)
-            .map_err(|e| {
-                parquet::errors::ParquetError::General(format!(
-                    "Failed to fetch range from object store, err:{e}"
-                ))
-            })
-            .boxed()
+        async move {
+            let get_res = self
+                .storage
+                .get_range(&self.path, range)
+                .map_err(|e| {
+                    parquet::errors::ParquetError::General(format!(
+                        "Failed to fetch range from object store, err:{e}"
+                    ))
+                })
+                .await;
+
+            if let Ok(bytes) = &get_res {
+                self.metrics.num_bytes_fetched(&self.path, bytes.len());
+            }
+
+            get_res
+        }
+        .boxed()
     }
 
     fn get_byte_ranges(
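Putting it together: callers opt into byte accounting by picking a constructor, and because the observer is a generic parameter, the no-op path can be compiled away rather than paid at runtime. A usage sketch, reusing the hypothetical `LoggingObserver` from earlier and assuming `store`, `path`, and `meta` are in scope:

```rust
// Metadata path: no accounting wanted.
let plain = ObjectStoreReader::new(store.clone(), path.clone(), meta.clone());

// Data path: every successful get_range reports its length via
// num_bytes_fetched, and elapsed() fires when the reader is dropped.
let observed = ObjectStoreReader::with_metrics(store, path, meta, LoggingObserver);
```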