From 1db59e07213393e937b12e60cceff8552a69da44 Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Fri, 7 Jul 2023 17:52:06 +0800 Subject: [PATCH 01/11] metric support operator --- analytic_engine/src/instance/read.rs | 2 + .../src/sst/parquet/async_reader.rs | 86 ++++++++++++----- components/trace_metric/src/metric.rs | 18 ++-- components/trace_metric_derive/src/builder.rs | 94 +++++++++++++++---- 4 files changed, 152 insertions(+), 48 deletions(-) diff --git a/analytic_engine/src/instance/read.rs b/analytic_engine/src/instance/read.rs index 77adc112c5..18195a966d 100644 --- a/analytic_engine/src/instance/read.rs +++ b/analytic_engine/src/instance/read.rs @@ -104,6 +104,7 @@ impl Instance { request.metrics_collector.collect(Metric::boolean( MERGE_SORT_METRIC_NAME.to_string(), need_merge_sort, + None, )); if need_merge_sort { @@ -216,6 +217,7 @@ impl Instance { request.metrics_collector.collect(Metric::number( ITER_NUM_METRIC_NAME.to_string(), iters.len(), + None, )); Ok(iters) diff --git a/analytic_engine/src/sst/parquet/async_reader.rs b/analytic_engine/src/sst/parquet/async_reader.rs index b2181f727b..5988dd14b4 100644 --- a/analytic_engine/src/sst/parquet/async_reader.rs +++ b/analytic_engine/src/sst/parquet/async_reader.rs @@ -5,7 +5,10 @@ use std::{ ops::Range, pin::Pin, - sync::Arc, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, task::{Context, Poll}, time::{Duration, Instant}, }; @@ -128,6 +131,7 @@ impl<'a> Reader<'a> { async fn maybe_read_parallelly( &mut self, read_parallelism: usize, + row_mem: Arc, ) -> Result> + Send + Unpin>>> { assert!(read_parallelism > 0); @@ -153,10 +157,11 @@ impl<'a> Reader<'a> { .into_iter() .map(|stream| { Box::new(RecordBatchProjector::new( - self.path.to_string(), stream, row_projector.clone(), sst_meta_data.clone(), + self.metrics.metrics_collector.clone(), + row_mem.clone(), )) as _ }) .collect(); @@ -370,7 +375,8 @@ impl<'a> Reader<'a> { file_path: self.path.to_string(), })?; - MetaData::try_new(&parquet_meta_data, ignore_sst_filter) + //TODO: unstore origin value + MetaData::try_new(&parquet_meta_data, true) .box_err() .context(DecodeSstMeta) } @@ -422,7 +428,7 @@ impl<'a> Reader<'a> { impl<'a> Drop for Reader<'a> { fn drop(&mut self) { - debug!( + log::info!( "Parquet reader dropped, path:{:?}, df_plan_metrics:{}", self.path, self.df_plan_metrics.clone_inner().to_string() @@ -448,45 +454,52 @@ impl<'a> ChunkReader for ChunkReaderAdapter<'a> { } } +#[derive(Default, Debug, Clone, TraceMetricWhenDrop)] +pub(crate) struct ProjectorMetrics { + #[metric(number, add)] + pub row_num: usize, + #[metric(number)] + pub row_mem: usize, + #[metric(duration)] + pub project_record_batch: Duration, + #[metric(collector)] + pub metrics_collector: Option, +} + struct RecordBatchProjector { - path: String, stream: SendableRecordBatchStream, row_projector: ArrowRecordBatchProjector, - row_num: usize, + metrics: ProjectorMetrics, start_time: Instant, sst_meta: ParquetMetaDataRef, + row_mem: Arc, } impl RecordBatchProjector { fn new( - path: String, stream: SendableRecordBatchStream, row_projector: ArrowRecordBatchProjector, sst_meta: ParquetMetaDataRef, + metrics_collector: Option, + row_mem: Arc, ) -> Self { + let metrics = ProjectorMetrics { + metrics_collector, + ..Default::default() + }; + Self { - path, stream, row_projector, - row_num: 0, + metrics, start_time: Instant::now(), sst_meta, + row_mem, } } } -impl Drop for RecordBatchProjector { - fn drop(&mut self) { - debug!( - "RecordBatchProjector dropped, path:{} rows:{}, cost:{}ms.", - self.path, - self.row_num, - self.start_time.saturating_elapsed().as_millis(), - ); - } -} - impl Stream for RecordBatchProjector { type Item = Result; @@ -505,7 +518,13 @@ impl Stream for RecordBatchProjector { .box_err() .context(DecodeRecordBatch)?; - projector.row_num += record_batch.num_rows(); + for col in record_batch.columns() { + projector.metrics.row_mem += col.get_array_memory_size(); + projector + .row_mem + .fetch_add(col.get_array_memory_size() as u64, Ordering::SeqCst); + } + projector.metrics.row_num += record_batch.num_rows(); let projected_batch = projector .row_projector @@ -518,7 +537,10 @@ impl Stream for RecordBatchProjector { } } Poll::Pending => Poll::Pending, - Poll::Ready(None) => Poll::Ready(None), + Poll::Ready(None) => { + projector.metrics.project_record_batch += projector.start_time.saturating_elapsed(); + Poll::Ready(None) + } } } @@ -540,7 +562,9 @@ impl<'a> SstReader for Reader<'a> { async fn read( &mut self, ) -> Result> + Send + Unpin>> { - let mut streams = self.maybe_read_parallelly(1).await?; + let mut streams = self + .maybe_read_parallelly(1, Arc::new(AtomicU64::new(0))) + .await?; assert_eq!(streams.len(), 1); let stream = streams.pop().expect("impossible to fetch no stream"); @@ -553,6 +577,16 @@ struct RecordBatchReceiver { cur_rx_idx: usize, #[allow(dead_code)] drop_helper: AbortOnDropMany<()>, + row_mem: Arc, +} + +impl Drop for RecordBatchReceiver { + fn drop(&mut self) { + log::info!( + "RecordBatchReceiver row_mem:{}", + self.row_mem.load(Ordering::SeqCst) + ); + } } impl Stream for RecordBatchReceiver { @@ -605,6 +639,7 @@ pub struct ThreadedReader<'a> { channel_cap: usize, read_parallelism: usize, + row_mem: Arc, } impl<'a> ThreadedReader<'a> { @@ -624,6 +659,7 @@ impl<'a> ThreadedReader<'a> { runtime, channel_cap, read_parallelism, + row_mem: Arc::new(AtomicU64::new(0)), } } @@ -654,13 +690,14 @@ impl<'a> SstReader for ThreadedReader<'a> { // Get underlying sst readers and channels. let sub_readers = self .inner - .maybe_read_parallelly(self.read_parallelism) + .maybe_read_parallelly(self.read_parallelism, self.row_mem.clone()) .await?; if sub_readers.is_empty() { return Ok(Box::new(RecordBatchReceiver { rx_group: Vec::new(), cur_rx_idx: 0, drop_helper: AbortOnDropMany(Vec::new()), + row_mem: self.row_mem.clone(), }) as _); } @@ -685,6 +722,7 @@ impl<'a> SstReader for ThreadedReader<'a> { rx_group, cur_rx_idx: 0, drop_helper: AbortOnDropMany(handles), + row_mem: self.row_mem.clone(), }) as _) } } diff --git a/components/trace_metric/src/metric.rs b/components/trace_metric/src/metric.rs index cd9e95dffd..0235351aba 100644 --- a/components/trace_metric/src/metric.rs +++ b/components/trace_metric/src/metric.rs @@ -2,10 +2,16 @@ use std::{fmt, time::Duration}; +#[derive(Clone)] +pub enum MetricOp { + Add, +} + #[derive(Clone)] pub struct MetricValue { pub name: String, pub val: T, + pub op: Option, } #[derive(Clone)] @@ -17,18 +23,18 @@ pub enum Metric { impl Metric { #[inline] - pub fn number(name: String, val: usize) -> Self { - Metric::Number(MetricValue { name, val }) + pub fn number(name: String, val: usize, op: Option) -> Self { + Metric::Number(MetricValue { name, val, op }) } #[inline] - pub fn duration(name: String, val: Duration) -> Self { - Metric::Duration(MetricValue { name, val }) + pub fn duration(name: String, val: Duration, op: Option) -> Self { + Metric::Duration(MetricValue { name, val, op }) } #[inline] - pub fn boolean(name: String, val: bool) -> Self { - Metric::Boolean(MetricValue { name, val }) + pub fn boolean(name: String, val: bool, op: Option) -> Self { + Metric::Boolean(MetricValue { name, val, op }) } } diff --git a/components/trace_metric_derive/src/builder.rs b/components/trace_metric_derive/src/builder.rs index 1a96fc65d7..72269ca018 100644 --- a/components/trace_metric_derive/src/builder.rs +++ b/components/trace_metric_derive/src/builder.rs @@ -1,13 +1,24 @@ // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. use proc_macro::TokenStream; -use quote::{quote, ToTokens}; +use quote::{quote, ToTokens, TokenStreamExt}; use syn::{DeriveInput, Field, Generics, Ident}; const COLLECTOR_FIELD_TOKENS: &str = "(collector)"; -const NUMBER_FIELD_TOKENS: &str = "(number)"; -const DURATION_FIELD_TOKENS: &str = "(duration)"; -const BOOLEAN_FIELD_TOKENS: &str = "(boolean)"; +const NUMBER_FIELD_TOKENS: &str = "number"; +const DURATION_FIELD_TOKENS: &str = "duration"; +const BOOLEAN_FIELD_TOKENS: &str = "boolean"; + +#[derive(Debug, Clone)] +enum MetricOp { + Add, +} + +impl ToTokens for MetricOp { + fn to_tokens(&self, tokens: &mut proc_macro2::TokenStream) { + tokens.append_all(format!("{self:?}").as_bytes()); + } +} enum MetricType { Number, @@ -15,22 +26,58 @@ enum MetricType { Boolean, } -impl MetricType { - fn try_from_tokens(s: &str) -> Option { +struct MetricMetadata { + typ: MetricType, + op: Option, +} + +impl MetricMetadata { + fn parse_op(s: &str) -> Option { + match s { + "add" => Some(MetricOp::Add), + _ => None, + } + } + + fn parse_type(s: &str) -> Option { if s == NUMBER_FIELD_TOKENS { - Some(Self::Number) + Some(MetricType::Number) } else if s == DURATION_FIELD_TOKENS { - Some(Self::Duration) + Some(MetricType::Duration) } else if s == BOOLEAN_FIELD_TOKENS { - Some(Self::Boolean) + Some(MetricType::Boolean) } else { None } } + + fn try_from_tokens(tokens: &proc_macro2::TokenStream) -> Option { + for tree in tokens.clone().into_iter() { + if let proc_macro2::TokenTree::Group(group) = tree { + let trees = group.stream().into_iter().collect::>(); + match trees.len() { + // #[metric(number)] + 1 => { + return Self::parse_type(&trees[0].to_string()) + .map(|typ| Self { typ, op: None }) + } + // #[metric(number, add)] + 3 => { + let typ = Self::parse_type(&trees[0].to_string())?; + let op = Self::parse_op(&trees[2].to_string())?; + return Some(Self { typ, op: Some(op) }); + } + _ => return None, + } + } + } + + None + } } struct MetricField { - metric_type: MetricType, + metric_metadata: MetricMetadata, field_name: Ident, } @@ -42,11 +89,10 @@ impl MetricField { } let field_name = field.ident.expect("Metric field must have a name"); - let metric_type_tokens = attr.tokens.to_string(); - let metric_type = - MetricType::try_from_tokens(&metric_type_tokens).expect("Unknown metric type"); + let metric_metadata = + MetricMetadata::try_from_tokens(&attr.tokens).expect("Unknown metric type"); return Some(Self { - metric_type, + metric_metadata, field_name, }); } @@ -124,15 +170,27 @@ impl Builder { let mut collect_statements = Vec::with_capacity(self.metric_fields.len()); for metric_field in self.metric_fields.iter() { let field_name = &metric_field.field_name; - let metric = match metric_field.metric_type { + let md = &metric_field.metric_metadata; + let op = &md.op; + let metric = match md.typ { MetricType::Number => { - quote! { ::trace_metric::Metric::number(stringify!(#field_name).to_string(), self.#field_name) } + quote! { ::trace_metric::Metric::number(stringify!(#field_name).to_string(), + self.#field_name, + ::trace_metric::Metric::MetricOp::stringify!(#op)) + } } MetricType::Duration => { - quote! { ::trace_metric::Metric::duration(stringify!(#field_name).to_string(), self.#field_name) } + quote! { ::trace_metric::Metric::duration(stringify!(#field_name).to_string(), + self.#field_name, + ::trace_metric::Metric::MetricOp::stringify!(#op)) + } } MetricType::Boolean => { - quote! { ::trace_metric::Metric::boolean(stringify!(#field_name).to_string(), self.#field_name) } + quote! { ::trace_metric::Metric::boolean(stringify!(#field_name).to_string(), + self.#field_name, + ::trace_metric::Metric::MetricOp::stringify!(#op)) + + } } }; let statement = quote! { From ee773a208548db0c5372c93eeb95a145704e23f8 Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Mon, 10 Jul 2023 11:15:14 +0800 Subject: [PATCH 02/11] fix marco --- .../src/sst/parquet/async_reader.rs | 3 +- components/trace_metric_derive/src/builder.rs | 47 ++++++++++--------- .../trace_metric_derive_tests/src/lib.rs | 2 +- 3 files changed, 27 insertions(+), 25 deletions(-) diff --git a/analytic_engine/src/sst/parquet/async_reader.rs b/analytic_engine/src/sst/parquet/async_reader.rs index 5988dd14b4..674f69914a 100644 --- a/analytic_engine/src/sst/parquet/async_reader.rs +++ b/analytic_engine/src/sst/parquet/async_reader.rs @@ -375,8 +375,7 @@ impl<'a> Reader<'a> { file_path: self.path.to_string(), })?; - //TODO: unstore origin value - MetaData::try_new(&parquet_meta_data, true) + MetaData::try_new(&parquet_meta_data, ignore_sst_filter) .box_err() .context(DecodeSstMeta) } diff --git a/components/trace_metric_derive/src/builder.rs b/components/trace_metric_derive/src/builder.rs index 72269ca018..abaf7ac8d0 100644 --- a/components/trace_metric_derive/src/builder.rs +++ b/components/trace_metric_derive/src/builder.rs @@ -1,6 +1,7 @@ // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. use proc_macro::TokenStream; +use proc_macro2::Span; use quote::{quote, ToTokens, TokenStreamExt}; use syn::{DeriveInput, Field, Generics, Ident}; @@ -16,16 +17,26 @@ enum MetricOp { impl ToTokens for MetricOp { fn to_tokens(&self, tokens: &mut proc_macro2::TokenStream) { - tokens.append_all(format!("{self:?}").as_bytes()); + tokens.append_all(&[Ident::new(format!("{self:?}").as_str(), Span::call_site())]); } } +#[derive(Debug)] enum MetricType { Number, Duration, Boolean, } +impl ToTokens for MetricType { + fn to_tokens(&self, tokens: &mut proc_macro2::TokenStream) { + tokens.append_all(&[Ident::new( + format!("{self:?}").to_lowercase().as_str(), + Span::call_site(), + )]); + } +} + struct MetricMetadata { typ: MetricType, op: Option, @@ -33,7 +44,7 @@ struct MetricMetadata { impl MetricMetadata { fn parse_op(s: &str) -> Option { - match s { + match s.to_lowercase().as_str() { "add" => Some(MetricOp::Add), _ => None, } @@ -170,29 +181,21 @@ impl Builder { let mut collect_statements = Vec::with_capacity(self.metric_fields.len()); for metric_field in self.metric_fields.iter() { let field_name = &metric_field.field_name; - let md = &metric_field.metric_metadata; - let op = &md.op; - let metric = match md.typ { - MetricType::Number => { - quote! { ::trace_metric::Metric::number(stringify!(#field_name).to_string(), - self.#field_name, - ::trace_metric::Metric::MetricOp::stringify!(#op)) - } + let metadata = &metric_field.metric_metadata; + let metric_op = &metadata.op; + let metric_type = &metadata.typ; + let metric = if let Some(op) = metric_op { + quote! { ::trace_metric::Metric::#metric_type(stringify!(#field_name).to_string(), + self.#field_name, + Some(::trace_metric::metric::MetricOp::#op)) } - MetricType::Duration => { - quote! { ::trace_metric::Metric::duration(stringify!(#field_name).to_string(), - self.#field_name, - ::trace_metric::Metric::MetricOp::stringify!(#op)) - } - } - MetricType::Boolean => { - quote! { ::trace_metric::Metric::boolean(stringify!(#field_name).to_string(), - self.#field_name, - ::trace_metric::Metric::MetricOp::stringify!(#op)) - - } + } else { + quote! { ::trace_metric::Metric::#metric_type(stringify!(#field_name).to_string(), + self.#field_name, + None) } }; + let statement = quote! { collector.collect(#metric); }; diff --git a/components/trace_metric_derive_tests/src/lib.rs b/components/trace_metric_derive_tests/src/lib.rs index 5b1a15ade6..8d9964d3b1 100644 --- a/components/trace_metric_derive_tests/src/lib.rs +++ b/components/trace_metric_derive_tests/src/lib.rs @@ -6,7 +6,7 @@ use trace_metric::{MetricsCollector, TraceMetricWhenDrop}; #[derive(Debug, Clone, TraceMetricWhenDrop)] pub struct ExampleMetrics { - #[metric(number)] + #[metric(number, add)] pub counter: usize, #[metric(duration)] pub elapsed: Duration, From 6dcea14993d5f4187060c1a1ffba69fa6dde81ad Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Mon, 10 Jul 2023 14:24:36 +0800 Subject: [PATCH 03/11] add UT --- .../src/sst/parquet/async_reader.rs | 4 +- components/trace_metric/src/collector.rs | 46 ++++++++++++++++--- components/trace_metric/src/metric.rs | 27 +++++++++++ .../trace_metric_derive_tests/src/lib.rs | 13 +++++- 4 files changed, 80 insertions(+), 10 deletions(-) diff --git a/analytic_engine/src/sst/parquet/async_reader.rs b/analytic_engine/src/sst/parquet/async_reader.rs index 674f69914a..c45ebd3304 100644 --- a/analytic_engine/src/sst/parquet/async_reader.rs +++ b/analytic_engine/src/sst/parquet/async_reader.rs @@ -457,9 +457,9 @@ impl<'a> ChunkReader for ChunkReaderAdapter<'a> { pub(crate) struct ProjectorMetrics { #[metric(number, add)] pub row_num: usize, - #[metric(number)] + #[metric(number, add)] pub row_mem: usize, - #[metric(duration)] + #[metric(duration, add)] pub project_record_batch: Duration, #[metric(collector)] pub metrics_collector: Option, diff --git a/components/trace_metric/src/collector.rs b/components/trace_metric/src/collector.rs index a83d967d23..a825fe28ee 100644 --- a/components/trace_metric/src/collector.rs +++ b/components/trace_metric/src/collector.rs @@ -1,6 +1,9 @@ // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. -use std::sync::{Arc, Mutex}; +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, +}; use crate::metric::Metric; @@ -46,8 +49,37 @@ impl MetricsCollector { /// Calls a closure on each top-level metrics of this collector. pub fn for_each_metric(&self, f: &mut impl FnMut(&Metric)) { let metrics = self.metrics.lock().unwrap(); + let mut metrics_by_name = HashMap::with_capacity(metrics.len()); + for metric in metrics.iter() { - f(metric); + metrics_by_name + .entry(metric.name().to_string()) + .or_insert_with(|| vec![]) + .push(metric); + } + + for metrics in metrics_by_name.values() { + if metrics.len() == 1 { + f(metrics[0]); + continue; + } + + if let Some(op) = metrics[0].op() { + match op { + crate::metric::MetricOp::Add => { + let mut first = metrics[0].clone(); + for m in &metrics[1..] { + first.add(m); + } + // only apply fn to first metric. + f(&first); + } + } + } else { + for metric in metrics { + f(metric); + } + } } } @@ -111,23 +143,25 @@ mod tests { #[test] fn test_metrics_collector() { let collector = MetricsCollector::new("root".to_string()); - collector.collect(Metric::number("counter".to_string(), 1)); + collector.collect(Metric::number("counter".to_string(), 1, None)); collector.collect(Metric::duration( "elapsed".to_string(), Duration::from_millis(100), + None, )); let child_1_0 = collector.span("child_1_0".to_string()); - child_1_0.collect(Metric::boolean("boolean".to_string(), false)); + child_1_0.collect(Metric::boolean("boolean".to_string(), false, None)); let child_2_0 = child_1_0.span("child_2_0".to_string()); - child_2_0.collect(Metric::number("counter".to_string(), 1)); + child_2_0.collect(Metric::number("counter".to_string(), 1, None)); child_2_0.collect(Metric::duration( "elapsed".to_string(), Duration::from_millis(100), + None, )); let child_1_1 = collector.span("child_1_1".to_string()); - child_1_1.collect(Metric::boolean("boolean".to_string(), false)); + child_1_1.collect(Metric::boolean("boolean".to_string(), false, None)); let _child_1_2 = collector.span("child_1_2".to_string()); let mut visitor = FormatCollectorVisitor::default(); diff --git a/components/trace_metric/src/metric.rs b/components/trace_metric/src/metric.rs index 0235351aba..e0d9fc9d6d 100644 --- a/components/trace_metric/src/metric.rs +++ b/components/trace_metric/src/metric.rs @@ -36,6 +36,33 @@ impl Metric { pub fn boolean(name: String, val: bool, op: Option) -> Self { Metric::Boolean(MetricValue { name, val, op }) } + + pub fn name(&self) -> &str { + match self { + Self::Boolean(v) => &v.name, + Self::Number(v) => &v.name, + Self::Duration(v) => &v.name, + } + } + + pub fn op(&self) -> &Option { + match self { + Self::Boolean(v) => &v.op, + Self::Number(v) => &v.op, + Self::Duration(v) => &v.op, + } + } + + // Add performs value add when metrics are same type + // If their types are different, do nothing. + pub fn add(&mut self, rhs: &Self) { + match (self, rhs) { + (Self::Boolean(v), Self::Boolean(v2)) => v.val |= v2.val, + (Self::Number(v), Self::Number(v2)) => v.val += v2.val, + (Self::Duration(v), Self::Duration(v2)) => v.val += v2.val, + _ => {} + } + } } impl fmt::Debug for MetricValue { diff --git a/components/trace_metric_derive_tests/src/lib.rs b/components/trace_metric_derive_tests/src/lib.rs index 8d9964d3b1..8590ee584d 100644 --- a/components/trace_metric_derive_tests/src/lib.rs +++ b/components/trace_metric_derive_tests/src/lib.rs @@ -32,6 +32,13 @@ mod test { counter: 1, elapsed: Duration::from_secs(1), boolean: true, + foo: "foor".to_owned(), + collector: collector.clone(), + }; + let _ = ExampleMetrics { + counter: 10, + elapsed: Duration::from_secs(2), + boolean: false, foo: "bar".to_owned(), collector: collector.clone(), }; @@ -39,9 +46,11 @@ mod test { let mut formatter = FormatCollectorVisitor::default(); collector.visit(&mut formatter); let expect_output = r#"test: - counter=1 - elapsed=1s + counter=11 boolean=true + boolean=false + elapsed=1s + elapsed=2s "#; assert_eq!(expect_output, &formatter.into_string()); From 4c60b435fa4577dbe02d1ed6171bd75a450a5ee1 Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Mon, 10 Jul 2023 14:31:36 +0800 Subject: [PATCH 04/11] fix CI --- .../src/sst/parquet/async_reader.rs | 35 +++---------------- components/trace_metric/src/collector.rs | 8 ++--- 2 files changed, 8 insertions(+), 35 deletions(-) diff --git a/analytic_engine/src/sst/parquet/async_reader.rs b/analytic_engine/src/sst/parquet/async_reader.rs index c45ebd3304..dfb66a6b4e 100644 --- a/analytic_engine/src/sst/parquet/async_reader.rs +++ b/analytic_engine/src/sst/parquet/async_reader.rs @@ -5,10 +5,7 @@ use std::{ ops::Range, pin::Pin, - sync::{ - atomic::{AtomicU64, Ordering}, - Arc, - }, + sync::Arc, task::{Context, Poll}, time::{Duration, Instant}, }; @@ -131,7 +128,6 @@ impl<'a> Reader<'a> { async fn maybe_read_parallelly( &mut self, read_parallelism: usize, - row_mem: Arc, ) -> Result> + Send + Unpin>>> { assert!(read_parallelism > 0); @@ -161,7 +157,6 @@ impl<'a> Reader<'a> { row_projector.clone(), sst_meta_data.clone(), self.metrics.metrics_collector.clone(), - row_mem.clone(), )) as _ }) .collect(); @@ -427,7 +422,7 @@ impl<'a> Reader<'a> { impl<'a> Drop for Reader<'a> { fn drop(&mut self) { - log::info!( + debug!( "Parquet reader dropped, path:{:?}, df_plan_metrics:{}", self.path, self.df_plan_metrics.clone_inner().to_string() @@ -472,7 +467,6 @@ struct RecordBatchProjector { metrics: ProjectorMetrics, start_time: Instant, sst_meta: ParquetMetaDataRef, - row_mem: Arc, } impl RecordBatchProjector { @@ -481,7 +475,6 @@ impl RecordBatchProjector { row_projector: ArrowRecordBatchProjector, sst_meta: ParquetMetaDataRef, metrics_collector: Option, - row_mem: Arc, ) -> Self { let metrics = ProjectorMetrics { metrics_collector, @@ -494,7 +487,6 @@ impl RecordBatchProjector { metrics, start_time: Instant::now(), sst_meta, - row_mem, } } } @@ -519,9 +511,6 @@ impl Stream for RecordBatchProjector { for col in record_batch.columns() { projector.metrics.row_mem += col.get_array_memory_size(); - projector - .row_mem - .fetch_add(col.get_array_memory_size() as u64, Ordering::SeqCst); } projector.metrics.row_num += record_batch.num_rows(); @@ -561,9 +550,7 @@ impl<'a> SstReader for Reader<'a> { async fn read( &mut self, ) -> Result> + Send + Unpin>> { - let mut streams = self - .maybe_read_parallelly(1, Arc::new(AtomicU64::new(0))) - .await?; + let mut streams = self.maybe_read_parallelly(1).await?; assert_eq!(streams.len(), 1); let stream = streams.pop().expect("impossible to fetch no stream"); @@ -576,16 +563,6 @@ struct RecordBatchReceiver { cur_rx_idx: usize, #[allow(dead_code)] drop_helper: AbortOnDropMany<()>, - row_mem: Arc, -} - -impl Drop for RecordBatchReceiver { - fn drop(&mut self) { - log::info!( - "RecordBatchReceiver row_mem:{}", - self.row_mem.load(Ordering::SeqCst) - ); - } } impl Stream for RecordBatchReceiver { @@ -638,7 +615,6 @@ pub struct ThreadedReader<'a> { channel_cap: usize, read_parallelism: usize, - row_mem: Arc, } impl<'a> ThreadedReader<'a> { @@ -658,7 +634,6 @@ impl<'a> ThreadedReader<'a> { runtime, channel_cap, read_parallelism, - row_mem: Arc::new(AtomicU64::new(0)), } } @@ -689,14 +664,13 @@ impl<'a> SstReader for ThreadedReader<'a> { // Get underlying sst readers and channels. let sub_readers = self .inner - .maybe_read_parallelly(self.read_parallelism, self.row_mem.clone()) + .maybe_read_parallelly(self.read_parallelism) .await?; if sub_readers.is_empty() { return Ok(Box::new(RecordBatchReceiver { rx_group: Vec::new(), cur_rx_idx: 0, drop_helper: AbortOnDropMany(Vec::new()), - row_mem: self.row_mem.clone(), }) as _); } @@ -721,7 +695,6 @@ impl<'a> SstReader for ThreadedReader<'a> { rx_group, cur_rx_idx: 0, drop_helper: AbortOnDropMany(handles), - row_mem: self.row_mem.clone(), }) as _) } } diff --git a/components/trace_metric/src/collector.rs b/components/trace_metric/src/collector.rs index a825fe28ee..560e22a38a 100644 --- a/components/trace_metric/src/collector.rs +++ b/components/trace_metric/src/collector.rs @@ -5,7 +5,7 @@ use std::{ sync::{Arc, Mutex}, }; -use crate::metric::Metric; +use crate::metric::{Metric, MetricOp}; /// A collector for metrics of a single read request. /// @@ -49,12 +49,12 @@ impl MetricsCollector { /// Calls a closure on each top-level metrics of this collector. pub fn for_each_metric(&self, f: &mut impl FnMut(&Metric)) { let metrics = self.metrics.lock().unwrap(); - let mut metrics_by_name = HashMap::with_capacity(metrics.len()); + let mut metrics_by_name = HashMap::with_capacity(metrics.len()); for metric in metrics.iter() { metrics_by_name .entry(metric.name().to_string()) - .or_insert_with(|| vec![]) + .or_insert_with(Vec::new) .push(metric); } @@ -66,7 +66,7 @@ impl MetricsCollector { if let Some(op) = metrics[0].op() { match op { - crate::metric::MetricOp::Add => { + MetricOp::Add => { let mut first = metrics[0].clone(); for m in &metrics[1..] { first.add(m); From d993822bb3fdc7c158cf3f4eed94e3ab7902a963 Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Mon, 10 Jul 2023 14:35:55 +0800 Subject: [PATCH 05/11] add inline --- components/trace_metric/src/metric.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/components/trace_metric/src/metric.rs b/components/trace_metric/src/metric.rs index e0d9fc9d6d..2672f1e763 100644 --- a/components/trace_metric/src/metric.rs +++ b/components/trace_metric/src/metric.rs @@ -37,6 +37,7 @@ impl Metric { Metric::Boolean(MetricValue { name, val, op }) } + #[inline] pub fn name(&self) -> &str { match self { Self::Boolean(v) => &v.name, @@ -45,6 +46,7 @@ impl Metric { } } + #[inline] pub fn op(&self) -> &Option { match self { Self::Boolean(v) => &v.op, @@ -53,13 +55,14 @@ impl Metric { } } - // Add performs value add when metrics are same type + // Add performs value add when metrics are same type, // If their types are different, do nothing. + #[inline] pub fn add(&mut self, rhs: &Self) { match (self, rhs) { - (Self::Boolean(v), Self::Boolean(v2)) => v.val |= v2.val, - (Self::Number(v), Self::Number(v2)) => v.val += v2.val, - (Self::Duration(v), Self::Duration(v2)) => v.val += v2.val, + (Self::Boolean(lhs), Self::Boolean(rhs)) => lhs.val |= rhs.val, + (Self::Number(lhs), Self::Number(rhs)) => lhs.val += rhs.val, + (Self::Duration(lhs), Self::Duration(rhs)) => lhs.val += rhs.val, _ => {} } } From c8e953f59d5768525bb8b2f650f0e152898b71f9 Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Mon, 10 Jul 2023 14:46:36 +0800 Subject: [PATCH 06/11] fix random tests --- components/trace_metric_derive/src/builder.rs | 8 +++--- .../trace_metric_derive_tests/src/lib.rs | 27 +++++++++++++++---- 2 files changed, 26 insertions(+), 9 deletions(-) diff --git a/components/trace_metric_derive/src/builder.rs b/components/trace_metric_derive/src/builder.rs index abaf7ac8d0..0af69f51b6 100644 --- a/components/trace_metric_derive/src/builder.rs +++ b/components/trace_metric_derive/src/builder.rs @@ -17,7 +17,7 @@ enum MetricOp { impl ToTokens for MetricOp { fn to_tokens(&self, tokens: &mut proc_macro2::TokenStream) { - tokens.append_all(&[Ident::new(format!("{self:?}").as_str(), Span::call_site())]); + tokens.append(Ident::new(&format!("{self:?}"), Span::call_site())); } } @@ -30,10 +30,10 @@ enum MetricType { impl ToTokens for MetricType { fn to_tokens(&self, tokens: &mut proc_macro2::TokenStream) { - tokens.append_all(&[Ident::new( - format!("{self:?}").to_lowercase().as_str(), + tokens.append(Ident::new( + &format!("{self:?}").to_lowercase(), Span::call_site(), - )]); + )); } } diff --git a/components/trace_metric_derive_tests/src/lib.rs b/components/trace_metric_derive_tests/src/lib.rs index 8590ee584d..457f07ac0d 100644 --- a/components/trace_metric_derive_tests/src/lib.rs +++ b/components/trace_metric_derive_tests/src/lib.rs @@ -45,14 +45,31 @@ mod test { } let mut formatter = FormatCollectorVisitor::default(); collector.visit(&mut formatter); - let expect_output = r#"test: - counter=11 + let actual = formatter.into_string(); + + // The expected output is as follows, but its ordering is not stable, so + // use contains to do check. + // let expect_output = r#"test: + // counter=11 + // boolean=true + // boolean=false + // elapsed=1s + // elapsed=2s + // "#; + + // counter is added together + assert!(actual.contains("counter=11")); + assert!(actual.contains( + r#" boolean=true boolean=false +"# + )); + assert!(actual.contains( + r#" elapsed=1s elapsed=2s -"#; - - assert_eq!(expect_output, &formatter.into_string()); +"# + )); } } From 265b037faadc6bb0379948dce622afb296d4eaaf Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Mon, 10 Jul 2023 14:55:01 +0800 Subject: [PATCH 07/11] remove clone --- components/trace_metric/src/collector.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/components/trace_metric/src/collector.rs b/components/trace_metric/src/collector.rs index 560e22a38a..0ab4d13786 100644 --- a/components/trace_metric/src/collector.rs +++ b/components/trace_metric/src/collector.rs @@ -53,14 +53,13 @@ impl MetricsCollector { let mut metrics_by_name = HashMap::with_capacity(metrics.len()); for metric in metrics.iter() { metrics_by_name - .entry(metric.name().to_string()) + .entry(metric.name()) .or_insert_with(Vec::new) .push(metric); } for metrics in metrics_by_name.values() { - if metrics.len() == 1 { - f(metrics[0]); + if metrics.is_empty() { continue; } From ce937bfa134ee5f8a0b4a07876f3392416205d64 Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Mon, 10 Jul 2023 17:10:55 +0800 Subject: [PATCH 08/11] fix CI --- components/trace_metric/src/collector.rs | 4 ++-- .../trace_metric_derive_tests/src/lib.rs | 24 ++++--------------- 2 files changed, 6 insertions(+), 22 deletions(-) diff --git a/components/trace_metric/src/collector.rs b/components/trace_metric/src/collector.rs index 0ab4d13786..f84135a18e 100644 --- a/components/trace_metric/src/collector.rs +++ b/components/trace_metric/src/collector.rs @@ -1,7 +1,7 @@ // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. use std::{ - collections::HashMap, + collections::BTreeMap, sync::{Arc, Mutex}, }; @@ -50,7 +50,7 @@ impl MetricsCollector { pub fn for_each_metric(&self, f: &mut impl FnMut(&Metric)) { let metrics = self.metrics.lock().unwrap(); - let mut metrics_by_name = HashMap::with_capacity(metrics.len()); + let mut metrics_by_name = BTreeMap::new(); for metric in metrics.iter() { metrics_by_name .entry(metric.name()) diff --git a/components/trace_metric_derive_tests/src/lib.rs b/components/trace_metric_derive_tests/src/lib.rs index 457f07ac0d..40edd1dca1 100644 --- a/components/trace_metric_derive_tests/src/lib.rs +++ b/components/trace_metric_derive_tests/src/lib.rs @@ -47,29 +47,13 @@ mod test { collector.visit(&mut formatter); let actual = formatter.into_string(); - // The expected output is as follows, but its ordering is not stable, so - // use contains to do check. - // let expect_output = r#"test: - // counter=11 - // boolean=true - // boolean=false - // elapsed=1s - // elapsed=2s - // "#; - - // counter is added together - assert!(actual.contains("counter=11")); - assert!(actual.contains( - r#" + let expected = r#"test: boolean=true boolean=false -"# - )); - assert!(actual.contains( - r#" + counter=11 elapsed=1s elapsed=2s -"# - )); +"#; + assert_eq!(expected, actual); } } From 48c28b0af2f6bc8afa8e7ee6bdd2deda1925f58e Mon Sep 17 00:00:00 2001 From: Jiacai Liu Date: Tue, 11 Jul 2023 10:09:41 +0800 Subject: [PATCH 09/11] Update components/trace_metric/src/metric.rs Co-authored-by: WEI Xikai --- components/trace_metric/src/metric.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/components/trace_metric/src/metric.rs b/components/trace_metric/src/metric.rs index 2672f1e763..0b580ce1ac 100644 --- a/components/trace_metric/src/metric.rs +++ b/components/trace_metric/src/metric.rs @@ -3,8 +3,8 @@ use std::{fmt, time::Duration}; #[derive(Clone)] -pub enum MetricOp { - Add, +pub enum MetricAggregator { + Sum, } #[derive(Clone)] From 47eb0bbdf695789384960c8dabf4a24206453645 Mon Sep 17 00:00:00 2001 From: Jiacai Liu Date: Tue, 11 Jul 2023 10:09:52 +0800 Subject: [PATCH 10/11] Update components/trace_metric_derive/src/builder.rs Co-authored-by: WEI Xikai --- components/trace_metric_derive/src/builder.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/components/trace_metric_derive/src/builder.rs b/components/trace_metric_derive/src/builder.rs index 0af69f51b6..4571067083 100644 --- a/components/trace_metric_derive/src/builder.rs +++ b/components/trace_metric_derive/src/builder.rs @@ -11,8 +11,8 @@ const DURATION_FIELD_TOKENS: &str = "duration"; const BOOLEAN_FIELD_TOKENS: &str = "boolean"; #[derive(Debug, Clone)] -enum MetricOp { - Add, +enum MetricAggregator { + Sum, } impl ToTokens for MetricOp { From 54565cc0f7d7d8d680f6922793dc5ec81b56dc87 Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Tue, 11 Jul 2023 10:22:01 +0800 Subject: [PATCH 11/11] fix CR --- .../src/sst/parquet/async_reader.rs | 6 +-- components/trace_metric/src/collector.rs | 8 ++-- components/trace_metric/src/metric.rs | 44 ++++++++++++------- components/trace_metric_derive/src/builder.rs | 27 +++++++----- .../trace_metric_derive_tests/src/lib.rs | 2 +- 5 files changed, 53 insertions(+), 34 deletions(-) diff --git a/analytic_engine/src/sst/parquet/async_reader.rs b/analytic_engine/src/sst/parquet/async_reader.rs index dfb66a6b4e..3092ccd3b2 100644 --- a/analytic_engine/src/sst/parquet/async_reader.rs +++ b/analytic_engine/src/sst/parquet/async_reader.rs @@ -450,11 +450,11 @@ impl<'a> ChunkReader for ChunkReaderAdapter<'a> { #[derive(Default, Debug, Clone, TraceMetricWhenDrop)] pub(crate) struct ProjectorMetrics { - #[metric(number, add)] + #[metric(number, sum)] pub row_num: usize, - #[metric(number, add)] + #[metric(number, sum)] pub row_mem: usize, - #[metric(duration, add)] + #[metric(duration, sum)] pub project_record_batch: Duration, #[metric(collector)] pub metrics_collector: Option, diff --git a/components/trace_metric/src/collector.rs b/components/trace_metric/src/collector.rs index f84135a18e..f91a6dfc92 100644 --- a/components/trace_metric/src/collector.rs +++ b/components/trace_metric/src/collector.rs @@ -5,7 +5,7 @@ use std::{ sync::{Arc, Mutex}, }; -use crate::metric::{Metric, MetricOp}; +use crate::metric::{Metric, MetricAggregator}; /// A collector for metrics of a single read request. /// @@ -63,12 +63,12 @@ impl MetricsCollector { continue; } - if let Some(op) = metrics[0].op() { + if let Some(op) = metrics[0].aggregator() { match op { - MetricOp::Add => { + MetricAggregator::Sum => { let mut first = metrics[0].clone(); for m in &metrics[1..] { - first.add(m); + first.sum(m); } // only apply fn to first metric. f(&first); diff --git a/components/trace_metric/src/metric.rs b/components/trace_metric/src/metric.rs index 0b580ce1ac..8c0f924c2c 100644 --- a/components/trace_metric/src/metric.rs +++ b/components/trace_metric/src/metric.rs @@ -11,7 +11,7 @@ pub enum MetricAggregator { pub struct MetricValue { pub name: String, pub val: T, - pub op: Option, + pub aggregator: Option, } #[derive(Clone)] @@ -23,18 +23,30 @@ pub enum Metric { impl Metric { #[inline] - pub fn number(name: String, val: usize, op: Option) -> Self { - Metric::Number(MetricValue { name, val, op }) + pub fn number(name: String, val: usize, aggregator: Option) -> Self { + Metric::Number(MetricValue { + name, + val, + aggregator, + }) } #[inline] - pub fn duration(name: String, val: Duration, op: Option) -> Self { - Metric::Duration(MetricValue { name, val, op }) + pub fn duration(name: String, val: Duration, aggregator: Option) -> Self { + Metric::Duration(MetricValue { + name, + val, + aggregator, + }) } #[inline] - pub fn boolean(name: String, val: bool, op: Option) -> Self { - Metric::Boolean(MetricValue { name, val, op }) + pub fn boolean(name: String, val: bool, aggregator: Option) -> Self { + Metric::Boolean(MetricValue { + name, + val, + aggregator, + }) } #[inline] @@ -47,23 +59,25 @@ impl Metric { } #[inline] - pub fn op(&self) -> &Option { + pub fn aggregator(&self) -> &Option { match self { - Self::Boolean(v) => &v.op, - Self::Number(v) => &v.op, - Self::Duration(v) => &v.op, + Self::Boolean(v) => &v.aggregator, + Self::Number(v) => &v.aggregator, + Self::Duration(v) => &v.aggregator, } } - // Add performs value add when metrics are same type, - // If their types are different, do nothing. + // Sum metric values together when metrics are same type, + // Panic if their types are different. #[inline] - pub fn add(&mut self, rhs: &Self) { + pub fn sum(&mut self, rhs: &Self) { match (self, rhs) { (Self::Boolean(lhs), Self::Boolean(rhs)) => lhs.val |= rhs.val, (Self::Number(lhs), Self::Number(rhs)) => lhs.val += rhs.val, (Self::Duration(lhs), Self::Duration(rhs)) => lhs.val += rhs.val, - _ => {} + (lhs, rhs) => { + panic!("Only same type metric could be applied, lhs:{lhs:?}, rhs:{rhs:?}") + } } } } diff --git a/components/trace_metric_derive/src/builder.rs b/components/trace_metric_derive/src/builder.rs index 4571067083..1e0ea3392e 100644 --- a/components/trace_metric_derive/src/builder.rs +++ b/components/trace_metric_derive/src/builder.rs @@ -15,7 +15,7 @@ enum MetricAggregator { Sum, } -impl ToTokens for MetricOp { +impl ToTokens for MetricAggregator { fn to_tokens(&self, tokens: &mut proc_macro2::TokenStream) { tokens.append(Ident::new(&format!("{self:?}"), Span::call_site())); } @@ -39,13 +39,13 @@ impl ToTokens for MetricType { struct MetricMetadata { typ: MetricType, - op: Option, + aggregator: Option, } impl MetricMetadata { - fn parse_op(s: &str) -> Option { + fn parse_aggregator(s: &str) -> Option { match s.to_lowercase().as_str() { - "add" => Some(MetricOp::Add), + "sum" => Some(MetricAggregator::Sum), _ => None, } } @@ -69,14 +69,19 @@ impl MetricMetadata { match trees.len() { // #[metric(number)] 1 => { - return Self::parse_type(&trees[0].to_string()) - .map(|typ| Self { typ, op: None }) + return Self::parse_type(&trees[0].to_string()).map(|typ| Self { + typ, + aggregator: None, + }) } // #[metric(number, add)] 3 => { let typ = Self::parse_type(&trees[0].to_string())?; - let op = Self::parse_op(&trees[2].to_string())?; - return Some(Self { typ, op: Some(op) }); + let aggregator = Self::parse_aggregator(&trees[2].to_string())?; + return Some(Self { + typ, + aggregator: Some(aggregator), + }); } _ => return None, } @@ -182,12 +187,12 @@ impl Builder { for metric_field in self.metric_fields.iter() { let field_name = &metric_field.field_name; let metadata = &metric_field.metric_metadata; - let metric_op = &metadata.op; + let aggregator = &metadata.aggregator; let metric_type = &metadata.typ; - let metric = if let Some(op) = metric_op { + let metric = if let Some(aggregator) = aggregator { quote! { ::trace_metric::Metric::#metric_type(stringify!(#field_name).to_string(), self.#field_name, - Some(::trace_metric::metric::MetricOp::#op)) + Some(::trace_metric::metric::MetricAggregator::#aggregator)) } } else { quote! { ::trace_metric::Metric::#metric_type(stringify!(#field_name).to_string(), diff --git a/components/trace_metric_derive_tests/src/lib.rs b/components/trace_metric_derive_tests/src/lib.rs index 40edd1dca1..35821cb6c8 100644 --- a/components/trace_metric_derive_tests/src/lib.rs +++ b/components/trace_metric_derive_tests/src/lib.rs @@ -6,7 +6,7 @@ use trace_metric::{MetricsCollector, TraceMetricWhenDrop}; #[derive(Debug, Clone, TraceMetricWhenDrop)] pub struct ExampleMetrics { - #[metric(number, add)] + #[metric(number, sum)] pub counter: usize, #[metric(duration)] pub elapsed: Duration,