Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add record batch mem stats #1058

Merged
merged 11 commits into from
Jul 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions analytic_engine/src/instance/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ impl Instance {
request.metrics_collector.collect(Metric::boolean(
MERGE_SORT_METRIC_NAME.to_string(),
need_merge_sort,
None,
));

if need_merge_sort {
Expand Down Expand Up @@ -216,6 +217,7 @@ impl Instance {
request.metrics_collector.collect(Metric::number(
ITER_NUM_METRIC_NAME.to_string(),
iters.len(),
None,
));

Ok(iters)
Expand Down
48 changes: 29 additions & 19 deletions analytic_engine/src/sst/parquet/async_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,10 @@ impl<'a> Reader<'a> {
.into_iter()
.map(|stream| {
Box::new(RecordBatchProjector::new(
self.path.to_string(),
stream,
row_projector.clone(),
sst_meta_data.clone(),
self.metrics.metrics_collector.clone(),
)) as _
})
.collect();
Expand Down Expand Up @@ -448,45 +448,49 @@ impl<'a> ChunkReader for ChunkReaderAdapter<'a> {
}
}

/// Metrics accumulated by a `RecordBatchProjector` while it drains its stream.
///
/// NOTE(review): the `TraceMetricWhenDrop` derive presumably reports these
/// fields to `metrics_collector` when the struct is dropped, and the second
/// argument of `#[metric(..)]` presumably selects the aggregator — confirm
/// against the macro definition.
#[derive(Default, Debug, Clone, TraceMetricWhenDrop)]
pub(crate) struct ProjectorMetrics {
/// Running total of `num_rows()` over every decoded record batch.
#[metric(number, sum)]
pub row_num: usize,
/// Running total of `get_array_memory_size()` over every column of every
/// decoded record batch (presumably bytes — TODO confirm against arrow docs).
#[metric(number, sum)]
pub row_mem: usize,
/// Time elapsed since the projector was created, added once the underlying
/// stream reports that it is exhausted.
#[metric(duration, sum)]
pub project_record_batch: Duration,
/// Collector these metrics belong to; `None` when no collector was supplied.
#[metric(collector)]
pub metrics_collector: Option<MetricsCollector>,
}

/// Stream adapter that pulls record batches from `stream`, projects them with
/// `row_projector` and yields `RecordBatchWithKey` items, keeping statistics
/// about the rows it has seen along the way.
struct RecordBatchProjector {
// Path reported in the debug log emitted when the projector is dropped.
path: String,
// Source of the record batches to project.
stream: SendableRecordBatchStream,
// Projection applied to each decoded record batch.
row_projector: ArrowRecordBatchProjector,

// Number of rows seen so far; reported in the drop log.
row_num: usize,
// Row count / memory / duration metrics for this projector.
metrics: ProjectorMetrics,
// Creation time, used to compute the elapsed cost.
start_time: Instant,
// Parquet metadata handle of the SST being read
// (NOTE(review): its use is outside this view — confirm).
sst_meta: ParquetMetaDataRef,
}

impl RecordBatchProjector {
    /// Creates a projector over `stream`.
    ///
    /// The row counter starts at zero, the metrics start from their defaults
    /// with only `metrics_collector` filled in, and the start time is taken
    /// at construction.
    fn new(
        path: String,
        stream: SendableRecordBatchStream,
        row_projector: ArrowRecordBatchProjector,
        sst_meta: ParquetMetaDataRef,
        metrics_collector: Option<MetricsCollector>,
    ) -> Self {
        Self {
            path,
            stream,
            row_projector,
            row_num: 0,
            // Every metric except the collector handle starts at its default.
            metrics: ProjectorMetrics {
                metrics_collector,
                ..Default::default()
            },
            start_time: Instant::now(),
            sst_meta,
        }
    }
}

/// Emits a debug log summarising the projector's work when it is dropped.
impl Drop for RecordBatchProjector {
fn drop(&mut self) {
// Reports the path, the number of rows seen and the milliseconds elapsed
// since `start_time`.
debug!(
"RecordBatchProjector dropped, path:{} rows:{}, cost:{}ms.",
self.path,
self.row_num,
self.start_time.saturating_elapsed().as_millis(),
);
}
}

impl Stream for RecordBatchProjector {
type Item = Result<RecordBatchWithKey>;

Expand All @@ -505,7 +509,10 @@ impl Stream for RecordBatchProjector {
.box_err()
.context(DecodeRecordBatch)?;

projector.row_num += record_batch.num_rows();
for col in record_batch.columns() {
projector.metrics.row_mem += col.get_array_memory_size();
}
projector.metrics.row_num += record_batch.num_rows();

let projected_batch = projector
.row_projector
Expand All @@ -518,7 +525,10 @@ impl Stream for RecordBatchProjector {
}
}
Poll::Pending => Poll::Pending,
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(None) => {
projector.metrics.project_record_batch += projector.start_time.saturating_elapsed();
Poll::Ready(None)
}
}
}

Expand Down
47 changes: 40 additions & 7 deletions components/trace_metric/src/collector.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.

use std::sync::{Arc, Mutex};
use std::{
collections::BTreeMap,
sync::{Arc, Mutex},
};

use crate::metric::Metric;
use crate::metric::{Metric, MetricAggregator};

/// A collector for metrics of a single read request.
///
Expand Down Expand Up @@ -46,8 +49,36 @@ impl MetricsCollector {
/// Calls a closure on each top-level metrics of this collector.
pub fn for_each_metric(&self, f: &mut impl FnMut(&Metric)) {
let metrics = self.metrics.lock().unwrap();

let mut metrics_by_name = BTreeMap::new();
for metric in metrics.iter() {
f(metric);
metrics_by_name
.entry(metric.name())
.or_insert_with(Vec::new)
.push(metric);
}

for metrics in metrics_by_name.values() {
if metrics.is_empty() {
continue;
}

if let Some(op) = metrics[0].aggregator() {
match op {
MetricAggregator::Sum => {
let mut first = metrics[0].clone();
for m in &metrics[1..] {
first.sum(m);
}
// only apply fn to first metric.
f(&first);
}
}
} else {
for metric in metrics {
f(metric);
}
}
}
}

Expand Down Expand Up @@ -111,23 +142,25 @@ mod tests {
#[test]
fn test_metrics_collector() {
let collector = MetricsCollector::new("root".to_string());
collector.collect(Metric::number("counter".to_string(), 1));
collector.collect(Metric::number("counter".to_string(), 1, None));
collector.collect(Metric::duration(
"elapsed".to_string(),
Duration::from_millis(100),
None,
));
let child_1_0 = collector.span("child_1_0".to_string());
child_1_0.collect(Metric::boolean("boolean".to_string(), false));
child_1_0.collect(Metric::boolean("boolean".to_string(), false, None));

let child_2_0 = child_1_0.span("child_2_0".to_string());
child_2_0.collect(Metric::number("counter".to_string(), 1));
child_2_0.collect(Metric::number("counter".to_string(), 1, None));
child_2_0.collect(Metric::duration(
"elapsed".to_string(),
Duration::from_millis(100),
None,
));

let child_1_1 = collector.span("child_1_1".to_string());
child_1_1.collect(Metric::boolean("boolean".to_string(), false));
child_1_1.collect(Metric::boolean("boolean".to_string(), false, None));
let _child_1_2 = collector.span("child_1_2".to_string());

let mut visitor = FormatCollectorVisitor::default();
Expand Down
62 changes: 56 additions & 6 deletions components/trace_metric/src/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,16 @@

use std::{fmt, time::Duration};

/// How metrics that share a name are combined when a collector is visited.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MetricAggregator {
    /// Combine the values of same-named metrics into a single metric by
    /// summing them.
    Sum,
}

/// A named metric value together with its optional aggregation rule.
#[derive(Clone)]
pub struct MetricValue<T>
where
    T: Clone + fmt::Debug,
{
    /// Name under which the metric is reported.
    pub name: String,
    /// The measured value.
    pub val: T,
    /// How same-named metrics are combined; `None` means no aggregation.
    pub aggregator: Option<MetricAggregator>,
}

#[derive(Clone)]
Expand All @@ -17,18 +23,62 @@ pub enum Metric {

impl Metric {
#[inline]
pub fn number(name: String, val: usize) -> Self {
Metric::Number(MetricValue { name, val })
pub fn number(name: String, val: usize, aggregator: Option<MetricAggregator>) -> Self {
Metric::Number(MetricValue {
name,
val,
aggregator,
})
}

#[inline]
pub fn duration(name: String, val: Duration) -> Self {
Metric::Duration(MetricValue { name, val })
pub fn duration(name: String, val: Duration, aggregator: Option<MetricAggregator>) -> Self {
Metric::Duration(MetricValue {
name,
val,
aggregator,
})
}

#[inline]
pub fn boolean(name: String, val: bool) -> Self {
Metric::Boolean(MetricValue { name, val })
pub fn boolean(name: String, val: bool, aggregator: Option<MetricAggregator>) -> Self {
Metric::Boolean(MetricValue {
name,
val,
aggregator,
})
}

#[inline]
pub fn name(&self) -> &str {
match self {
Self::Boolean(v) => &v.name,
Self::Number(v) => &v.name,
Self::Duration(v) => &v.name,
}
}

#[inline]
pub fn aggregator(&self) -> &Option<MetricAggregator> {
match self {
Self::Boolean(v) => &v.aggregator,
Self::Number(v) => &v.aggregator,
Self::Duration(v) => &v.aggregator,
}
}

/// Folds the value of `rhs` into `self`.
///
/// Booleans are OR-ed together; numbers and durations are added.
///
/// # Panics
///
/// Panics if `self` and `rhs` are not the same kind of metric.
#[inline]
pub fn sum(&mut self, rhs: &Self) {
    match (self, rhs) {
        (Self::Boolean(acc), Self::Boolean(other)) => {
            acc.val |= other.val;
        }
        (Self::Number(acc), Self::Number(other)) => {
            acc.val += other.val;
        }
        (Self::Duration(acc), Self::Duration(other)) => {
            acc.val += other.val;
        }
        // Any remaining combination mixes two different metric kinds.
        (lhs, rhs) => {
            panic!("Only same type metric could be applied, lhs:{lhs:?}, rhs:{rhs:?}")
        }
    }
}
}

Expand Down
Loading