diff --git a/analytic_engine/src/instance/read.rs b/analytic_engine/src/instance/read.rs index 77adc112c5..18195a966d 100644 --- a/analytic_engine/src/instance/read.rs +++ b/analytic_engine/src/instance/read.rs @@ -104,6 +104,7 @@ impl Instance { request.metrics_collector.collect(Metric::boolean( MERGE_SORT_METRIC_NAME.to_string(), need_merge_sort, + None, )); if need_merge_sort { @@ -216,6 +217,7 @@ impl Instance { request.metrics_collector.collect(Metric::number( ITER_NUM_METRIC_NAME.to_string(), iters.len(), + None, )); Ok(iters) diff --git a/analytic_engine/src/sst/parquet/async_reader.rs b/analytic_engine/src/sst/parquet/async_reader.rs index b2181f727b..3092ccd3b2 100644 --- a/analytic_engine/src/sst/parquet/async_reader.rs +++ b/analytic_engine/src/sst/parquet/async_reader.rs @@ -153,10 +153,10 @@ impl<'a> Reader<'a> { .into_iter() .map(|stream| { Box::new(RecordBatchProjector::new( - self.path.to_string(), stream, row_projector.clone(), sst_meta_data.clone(), + self.metrics.metrics_collector.clone(), )) as _ }) .collect(); @@ -448,45 +448,49 @@ impl<'a> ChunkReader for ChunkReaderAdapter<'a> { } } +#[derive(Default, Debug, Clone, TraceMetricWhenDrop)] +pub(crate) struct ProjectorMetrics { + #[metric(number, sum)] + pub row_num: usize, + #[metric(number, sum)] + pub row_mem: usize, + #[metric(duration, sum)] + pub project_record_batch: Duration, + #[metric(collector)] + pub metrics_collector: Option, +} + struct RecordBatchProjector { - path: String, stream: SendableRecordBatchStream, row_projector: ArrowRecordBatchProjector, - row_num: usize, + metrics: ProjectorMetrics, start_time: Instant, sst_meta: ParquetMetaDataRef, } impl RecordBatchProjector { fn new( - path: String, stream: SendableRecordBatchStream, row_projector: ArrowRecordBatchProjector, sst_meta: ParquetMetaDataRef, + metrics_collector: Option, ) -> Self { + let metrics = ProjectorMetrics { + metrics_collector, + ..Default::default() + }; + Self { - path, stream, row_projector, - row_num: 0, + metrics, start_time: Instant::now(), sst_meta, } } } -impl Drop for RecordBatchProjector { - fn drop(&mut self) { - debug!( - "RecordBatchProjector dropped, path:{} rows:{}, cost:{}ms.", - self.path, - self.row_num, - self.start_time.saturating_elapsed().as_millis(), - ); - } -} - impl Stream for RecordBatchProjector { type Item = Result; @@ -505,7 +509,10 @@ impl Stream for RecordBatchProjector { .box_err() .context(DecodeRecordBatch)?; - projector.row_num += record_batch.num_rows(); + for col in record_batch.columns() { + projector.metrics.row_mem += col.get_array_memory_size(); + } + projector.metrics.row_num += record_batch.num_rows(); let projected_batch = projector .row_projector @@ -518,7 +525,10 @@ impl Stream for RecordBatchProjector { } } Poll::Pending => Poll::Pending, - Poll::Ready(None) => Poll::Ready(None), + Poll::Ready(None) => { + projector.metrics.project_record_batch += projector.start_time.saturating_elapsed(); + Poll::Ready(None) + } } } diff --git a/components/trace_metric/src/collector.rs b/components/trace_metric/src/collector.rs index a83d967d23..f91a6dfc92 100644 --- a/components/trace_metric/src/collector.rs +++ b/components/trace_metric/src/collector.rs @@ -1,8 +1,11 @@ // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. -use std::sync::{Arc, Mutex}; +use std::{ + collections::BTreeMap, + sync::{Arc, Mutex}, +}; -use crate::metric::Metric; +use crate::metric::{Metric, MetricAggregator}; /// A collector for metrics of a single read request. /// @@ -46,8 +49,36 @@ impl MetricsCollector { /// Calls a closure on each top-level metrics of this collector. pub fn for_each_metric(&self, f: &mut impl FnMut(&Metric)) { let metrics = self.metrics.lock().unwrap(); + + let mut metrics_by_name = BTreeMap::new(); for metric in metrics.iter() { - f(metric); + metrics_by_name + .entry(metric.name()) + .or_insert_with(Vec::new) + .push(metric); + } + + for metrics in metrics_by_name.values() { + if metrics.is_empty() { + continue; + } + + if let Some(op) = metrics[0].aggregator() { + match op { + MetricAggregator::Sum => { + let mut first = metrics[0].clone(); + for m in &metrics[1..] { + first.sum(m); + } + // only apply fn to first metric. + f(&first); + } + } + } else { + for metric in metrics { + f(metric); + } + } } } @@ -111,23 +142,25 @@ mod tests { #[test] fn test_metrics_collector() { let collector = MetricsCollector::new("root".to_string()); - collector.collect(Metric::number("counter".to_string(), 1)); + collector.collect(Metric::number("counter".to_string(), 1, None)); collector.collect(Metric::duration( "elapsed".to_string(), Duration::from_millis(100), + None, )); let child_1_0 = collector.span("child_1_0".to_string()); - child_1_0.collect(Metric::boolean("boolean".to_string(), false)); + child_1_0.collect(Metric::boolean("boolean".to_string(), false, None)); let child_2_0 = child_1_0.span("child_2_0".to_string()); - child_2_0.collect(Metric::number("counter".to_string(), 1)); + child_2_0.collect(Metric::number("counter".to_string(), 1, None)); child_2_0.collect(Metric::duration( "elapsed".to_string(), Duration::from_millis(100), + None, )); let child_1_1 = collector.span("child_1_1".to_string()); - child_1_1.collect(Metric::boolean("boolean".to_string(), false)); + child_1_1.collect(Metric::boolean("boolean".to_string(), false, None)); let _child_1_2 = collector.span("child_1_2".to_string()); let mut visitor = FormatCollectorVisitor::default(); diff --git a/components/trace_metric/src/metric.rs b/components/trace_metric/src/metric.rs index cd9e95dffd..8c0f924c2c 100644 --- a/components/trace_metric/src/metric.rs +++ b/components/trace_metric/src/metric.rs @@ -2,10 +2,16 @@ use std::{fmt, time::Duration}; +#[derive(Clone)] +pub enum MetricAggregator { + Sum, +} + #[derive(Clone)] pub struct MetricValue { pub name: String, pub val: T, + pub aggregator: Option, } #[derive(Clone)] @@ -17,18 +23,62 @@ pub enum Metric { impl Metric { #[inline] - pub fn number(name: String, val: usize) -> Self { - Metric::Number(MetricValue { name, val }) + pub fn number(name: String, val: usize, aggregator: Option) -> Self { + Metric::Number(MetricValue { + name, + val, + aggregator, + }) } #[inline] - pub fn duration(name: String, val: Duration) -> Self { - Metric::Duration(MetricValue { name, val }) + pub fn duration(name: String, val: Duration, aggregator: Option) -> Self { + Metric::Duration(MetricValue { + name, + val, + aggregator, + }) } #[inline] - pub fn boolean(name: String, val: bool) -> Self { - Metric::Boolean(MetricValue { name, val }) + pub fn boolean(name: String, val: bool, aggregator: Option) -> Self { + Metric::Boolean(MetricValue { + name, + val, + aggregator, + }) + } + + #[inline] + pub fn name(&self) -> &str { + match self { + Self::Boolean(v) => &v.name, + Self::Number(v) => &v.name, + Self::Duration(v) => &v.name, + } + } + + #[inline] + pub fn aggregator(&self) -> &Option { + match self { + Self::Boolean(v) => &v.aggregator, + Self::Number(v) => &v.aggregator, + Self::Duration(v) => &v.aggregator, + } + } + + // Sum metric values together when metrics are same type, + // Panic if their types are different. + #[inline] + pub fn sum(&mut self, rhs: &Self) { + match (self, rhs) { + (Self::Boolean(lhs), Self::Boolean(rhs)) => lhs.val |= rhs.val, + (Self::Number(lhs), Self::Number(rhs)) => lhs.val += rhs.val, + (Self::Duration(lhs), Self::Duration(rhs)) => lhs.val += rhs.val, + (lhs, rhs) => { + panic!("Only same type metric could be applied, lhs:{lhs:?}, rhs:{rhs:?}") + } + } } } diff --git a/components/trace_metric_derive/src/builder.rs b/components/trace_metric_derive/src/builder.rs index 1a96fc65d7..1e0ea3392e 100644 --- a/components/trace_metric_derive/src/builder.rs +++ b/components/trace_metric_derive/src/builder.rs @@ -1,36 +1,99 @@ // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. use proc_macro::TokenStream; -use quote::{quote, ToTokens}; +use proc_macro2::Span; +use quote::{quote, ToTokens, TokenStreamExt}; use syn::{DeriveInput, Field, Generics, Ident}; const COLLECTOR_FIELD_TOKENS: &str = "(collector)"; -const NUMBER_FIELD_TOKENS: &str = "(number)"; -const DURATION_FIELD_TOKENS: &str = "(duration)"; -const BOOLEAN_FIELD_TOKENS: &str = "(boolean)"; +const NUMBER_FIELD_TOKENS: &str = "number"; +const DURATION_FIELD_TOKENS: &str = "duration"; +const BOOLEAN_FIELD_TOKENS: &str = "boolean"; +#[derive(Debug, Clone)] +enum MetricAggregator { + Sum, +} + +impl ToTokens for MetricAggregator { + fn to_tokens(&self, tokens: &mut proc_macro2::TokenStream) { + tokens.append(Ident::new(&format!("{self:?}"), Span::call_site())); + } +} + +#[derive(Debug)] enum MetricType { Number, Duration, Boolean, } -impl MetricType { - fn try_from_tokens(s: &str) -> Option { +impl ToTokens for MetricType { + fn to_tokens(&self, tokens: &mut proc_macro2::TokenStream) { + tokens.append(Ident::new( + &format!("{self:?}").to_lowercase(), + Span::call_site(), + )); + } +} + +struct MetricMetadata { + typ: MetricType, + aggregator: Option, +} + +impl MetricMetadata { + fn parse_aggregator(s: &str) -> Option { + match s.to_lowercase().as_str() { + "sum" => Some(MetricAggregator::Sum), + _ => None, + } + } + + fn parse_type(s: &str) -> Option { if s == NUMBER_FIELD_TOKENS { - Some(Self::Number) + Some(MetricType::Number) } else if s == DURATION_FIELD_TOKENS { - Some(Self::Duration) + Some(MetricType::Duration) } else if s == BOOLEAN_FIELD_TOKENS { - Some(Self::Boolean) + Some(MetricType::Boolean) } else { None } } + + fn try_from_tokens(tokens: &proc_macro2::TokenStream) -> Option { + for tree in tokens.clone().into_iter() { + if let proc_macro2::TokenTree::Group(group) = tree { + let trees = group.stream().into_iter().collect::>(); + match trees.len() { + // #[metric(number)] + 1 => { + return Self::parse_type(&trees[0].to_string()).map(|typ| Self { + typ, + aggregator: None, + }) + } + // #[metric(number, add)] + 3 => { + let typ = Self::parse_type(&trees[0].to_string())?; + let aggregator = Self::parse_aggregator(&trees[2].to_string())?; + return Some(Self { + typ, + aggregator: Some(aggregator), + }); + } + _ => return None, + } + } + } + + None + } } struct MetricField { - metric_type: MetricType, + metric_metadata: MetricMetadata, field_name: Ident, } @@ -42,11 +105,10 @@ impl MetricField { } let field_name = field.ident.expect("Metric field must have a name"); - let metric_type_tokens = attr.tokens.to_string(); - let metric_type = - MetricType::try_from_tokens(&metric_type_tokens).expect("Unknown metric type"); + let metric_metadata = + MetricMetadata::try_from_tokens(&attr.tokens).expect("Unknown metric type"); return Some(Self { - metric_type, + metric_metadata, field_name, }); } @@ -124,17 +186,21 @@ impl Builder { let mut collect_statements = Vec::with_capacity(self.metric_fields.len()); for metric_field in self.metric_fields.iter() { let field_name = &metric_field.field_name; - let metric = match metric_field.metric_type { - MetricType::Number => { - quote! { ::trace_metric::Metric::number(stringify!(#field_name).to_string(), self.#field_name) } - } - MetricType::Duration => { - quote! { ::trace_metric::Metric::duration(stringify!(#field_name).to_string(), self.#field_name) } + let metadata = &metric_field.metric_metadata; + let aggregator = &metadata.aggregator; + let metric_type = &metadata.typ; + let metric = if let Some(aggregator) = aggregator { + quote! { ::trace_metric::Metric::#metric_type(stringify!(#field_name).to_string(), + self.#field_name, + Some(::trace_metric::metric::MetricAggregator::#aggregator)) } - MetricType::Boolean => { - quote! { ::trace_metric::Metric::boolean(stringify!(#field_name).to_string(), self.#field_name) } + } else { + quote! { ::trace_metric::Metric::#metric_type(stringify!(#field_name).to_string(), + self.#field_name, + None) } }; + let statement = quote! { collector.collect(#metric); }; diff --git a/components/trace_metric_derive_tests/src/lib.rs b/components/trace_metric_derive_tests/src/lib.rs index 5b1a15ade6..35821cb6c8 100644 --- a/components/trace_metric_derive_tests/src/lib.rs +++ b/components/trace_metric_derive_tests/src/lib.rs @@ -6,7 +6,7 @@ use trace_metric::{MetricsCollector, TraceMetricWhenDrop}; #[derive(Debug, Clone, TraceMetricWhenDrop)] pub struct ExampleMetrics { - #[metric(number)] + #[metric(number, sum)] pub counter: usize, #[metric(duration)] pub elapsed: Duration, @@ -32,18 +32,28 @@ mod test { counter: 1, elapsed: Duration::from_secs(1), boolean: true, + foo: "foor".to_owned(), + collector: collector.clone(), + }; + let _ = ExampleMetrics { + counter: 10, + elapsed: Duration::from_secs(2), + boolean: false, foo: "bar".to_owned(), collector: collector.clone(), }; } let mut formatter = FormatCollectorVisitor::default(); collector.visit(&mut formatter); - let expect_output = r#"test: - counter=1 - elapsed=1s + let actual = formatter.into_string(); + + let expected = r#"test: boolean=true + boolean=false + counter=11 + elapsed=1s + elapsed=2s "#; - - assert_eq!(expect_output, &formatter.into_string()); + assert_eq!(expected, actual); } }