Skip to content

Commit

Permalink
feat: add record batch mem stats (#1058)
Browse files Browse the repository at this point in the history
## Rationale
When some metrics contains more than one subtask, it's helpful to
collect them together for output.

## Detailed Changes
- Update metric macro, add metric operator.

## Test Plan

Update `basic` UT to test this. This is what I get in my test env:
```
# Before
        scan_sst_6655:                                                                                                                                                                            
            project_record_batch=847.03µs                                                                                                                                                         
            project_record_batch=1.182922ms                                                                                                                                                       
            project_record_batch=1.208636ms                                                                                                                                                       
            project_record_batch=1.246589ms                                                                                                                                                       
            project_record_batch=1.296161ms                                                                                                                                                       
            project_record_batch=1.323357ms                                                                                                                                                       
            project_record_batch=1.4293ms                                                                                                                                                                      

# After
        scan_sst_6655:                                                                                                                                                                            
            project_record_batch=10.170503ms 

```

---------

Co-authored-by: WEI Xikai <ShiKaiWi@users.noreply.github.com>
  • Loading branch information
jiacai2050 and ShiKaiWi authored Jul 11, 2023
1 parent 6478a83 commit aa00ef2
Show file tree
Hide file tree
Showing 6 changed files with 231 additions and 60 deletions.
2 changes: 2 additions & 0 deletions analytic_engine/src/instance/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ impl Instance {
request.metrics_collector.collect(Metric::boolean(
MERGE_SORT_METRIC_NAME.to_string(),
need_merge_sort,
None,
));

if need_merge_sort {
Expand Down Expand Up @@ -216,6 +217,7 @@ impl Instance {
request.metrics_collector.collect(Metric::number(
ITER_NUM_METRIC_NAME.to_string(),
iters.len(),
None,
));

Ok(iters)
Expand Down
48 changes: 29 additions & 19 deletions analytic_engine/src/sst/parquet/async_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,10 @@ impl<'a> Reader<'a> {
.into_iter()
.map(|stream| {
Box::new(RecordBatchProjector::new(
self.path.to_string(),
stream,
row_projector.clone(),
sst_meta_data.clone(),
self.metrics.metrics_collector.clone(),
)) as _
})
.collect();
Expand Down Expand Up @@ -448,45 +448,49 @@ impl<'a> ChunkReader for ChunkReaderAdapter<'a> {
}
}

#[derive(Default, Debug, Clone, TraceMetricWhenDrop)]
pub(crate) struct ProjectorMetrics {
#[metric(number, sum)]
pub row_num: usize,
#[metric(number, sum)]
pub row_mem: usize,
#[metric(duration, sum)]
pub project_record_batch: Duration,
#[metric(collector)]
pub metrics_collector: Option<MetricsCollector>,
}

struct RecordBatchProjector {
path: String,
stream: SendableRecordBatchStream,
row_projector: ArrowRecordBatchProjector,

row_num: usize,
metrics: ProjectorMetrics,
start_time: Instant,
sst_meta: ParquetMetaDataRef,
}

impl RecordBatchProjector {
fn new(
path: String,
stream: SendableRecordBatchStream,
row_projector: ArrowRecordBatchProjector,
sst_meta: ParquetMetaDataRef,
metrics_collector: Option<MetricsCollector>,
) -> Self {
let metrics = ProjectorMetrics {
metrics_collector,
..Default::default()
};

Self {
path,
stream,
row_projector,
row_num: 0,
metrics,
start_time: Instant::now(),
sst_meta,
}
}
}

impl Drop for RecordBatchProjector {
fn drop(&mut self) {
debug!(
"RecordBatchProjector dropped, path:{} rows:{}, cost:{}ms.",
self.path,
self.row_num,
self.start_time.saturating_elapsed().as_millis(),
);
}
}

impl Stream for RecordBatchProjector {
type Item = Result<RecordBatchWithKey>;

Expand All @@ -505,7 +509,10 @@ impl Stream for RecordBatchProjector {
.box_err()
.context(DecodeRecordBatch)?;

projector.row_num += record_batch.num_rows();
for col in record_batch.columns() {
projector.metrics.row_mem += col.get_array_memory_size();
}
projector.metrics.row_num += record_batch.num_rows();

let projected_batch = projector
.row_projector
Expand All @@ -518,7 +525,10 @@ impl Stream for RecordBatchProjector {
}
}
Poll::Pending => Poll::Pending,
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(None) => {
projector.metrics.project_record_batch += projector.start_time.saturating_elapsed();
Poll::Ready(None)
}
}
}

Expand Down
47 changes: 40 additions & 7 deletions components/trace_metric/src/collector.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.

use std::sync::{Arc, Mutex};
use std::{
collections::BTreeMap,
sync::{Arc, Mutex},
};

use crate::metric::Metric;
use crate::metric::{Metric, MetricAggregator};

/// A collector for metrics of a single read request.
///
Expand Down Expand Up @@ -46,8 +49,36 @@ impl MetricsCollector {
/// Calls a closure on each top-level metrics of this collector.
pub fn for_each_metric(&self, f: &mut impl FnMut(&Metric)) {
let metrics = self.metrics.lock().unwrap();

let mut metrics_by_name = BTreeMap::new();
for metric in metrics.iter() {
f(metric);
metrics_by_name
.entry(metric.name())
.or_insert_with(Vec::new)
.push(metric);
}

for metrics in metrics_by_name.values() {
if metrics.is_empty() {
continue;
}

if let Some(op) = metrics[0].aggregator() {
match op {
MetricAggregator::Sum => {
let mut first = metrics[0].clone();
for m in &metrics[1..] {
first.sum(m);
}
// only apply fn to first metric.
f(&first);
}
}
} else {
for metric in metrics {
f(metric);
}
}
}
}

Expand Down Expand Up @@ -111,23 +142,25 @@ mod tests {
#[test]
fn test_metrics_collector() {
let collector = MetricsCollector::new("root".to_string());
collector.collect(Metric::number("counter".to_string(), 1));
collector.collect(Metric::number("counter".to_string(), 1, None));
collector.collect(Metric::duration(
"elapsed".to_string(),
Duration::from_millis(100),
None,
));
let child_1_0 = collector.span("child_1_0".to_string());
child_1_0.collect(Metric::boolean("boolean".to_string(), false));
child_1_0.collect(Metric::boolean("boolean".to_string(), false, None));

let child_2_0 = child_1_0.span("child_2_0".to_string());
child_2_0.collect(Metric::number("counter".to_string(), 1));
child_2_0.collect(Metric::number("counter".to_string(), 1, None));
child_2_0.collect(Metric::duration(
"elapsed".to_string(),
Duration::from_millis(100),
None,
));

let child_1_1 = collector.span("child_1_1".to_string());
child_1_1.collect(Metric::boolean("boolean".to_string(), false));
child_1_1.collect(Metric::boolean("boolean".to_string(), false, None));
let _child_1_2 = collector.span("child_1_2".to_string());

let mut visitor = FormatCollectorVisitor::default();
Expand Down
62 changes: 56 additions & 6 deletions components/trace_metric/src/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,16 @@

use std::{fmt, time::Duration};

#[derive(Clone)]
pub enum MetricAggregator {
Sum,
}

#[derive(Clone)]
pub struct MetricValue<T: Clone + fmt::Debug> {
pub name: String,
pub val: T,
pub aggregator: Option<MetricAggregator>,
}

#[derive(Clone)]
Expand All @@ -17,18 +23,62 @@ pub enum Metric {

impl Metric {
#[inline]
pub fn number(name: String, val: usize) -> Self {
Metric::Number(MetricValue { name, val })
pub fn number(name: String, val: usize, aggregator: Option<MetricAggregator>) -> Self {
Metric::Number(MetricValue {
name,
val,
aggregator,
})
}

#[inline]
pub fn duration(name: String, val: Duration) -> Self {
Metric::Duration(MetricValue { name, val })
pub fn duration(name: String, val: Duration, aggregator: Option<MetricAggregator>) -> Self {
Metric::Duration(MetricValue {
name,
val,
aggregator,
})
}

#[inline]
pub fn boolean(name: String, val: bool) -> Self {
Metric::Boolean(MetricValue { name, val })
pub fn boolean(name: String, val: bool, aggregator: Option<MetricAggregator>) -> Self {
Metric::Boolean(MetricValue {
name,
val,
aggregator,
})
}

#[inline]
pub fn name(&self) -> &str {
match self {
Self::Boolean(v) => &v.name,
Self::Number(v) => &v.name,
Self::Duration(v) => &v.name,
}
}

#[inline]
pub fn aggregator(&self) -> &Option<MetricAggregator> {
match self {
Self::Boolean(v) => &v.aggregator,
Self::Number(v) => &v.aggregator,
Self::Duration(v) => &v.aggregator,
}
}

// Sum metric values together when metrics are same type,
// Panic if their types are different.
#[inline]
pub fn sum(&mut self, rhs: &Self) {
match (self, rhs) {
(Self::Boolean(lhs), Self::Boolean(rhs)) => lhs.val |= rhs.val,
(Self::Number(lhs), Self::Number(rhs)) => lhs.val += rhs.val,
(Self::Duration(lhs), Self::Duration(rhs)) => lhs.val += rhs.val,
(lhs, rhs) => {
panic!("Only same type metric could be applied, lhs:{lhs:?}, rhs:{rhs:?}")
}
}
}
}

Expand Down
Loading

0 comments on commit aa00ef2

Please sign in to comment.