From ca22dd49ee52b7683d59d55f95c9c6607ae11299 Mon Sep 17 00:00:00 2001 From: leiysky Date: Tue, 27 Jun 2023 15:38:39 +0800 Subject: [PATCH] add more execution information --- .../pipeline/transforms/src/processors/mod.rs | 3 +- .../src/processors/profile_wrapper.rs | 101 +++++++++++++-- .../processors/transforms/transform_sort.rs | 6 +- src/query/profile/src/proc.rs | 12 ++ src/query/profile/src/prof.rs | 33 ++++- .../service/src/pipelines/pipeline_builder.rs | 76 ++++++------ .../aggregator/transform_partition_bucket.rs | 4 +- .../it/storages/testdata/columns_table.txt | 2 +- src/query/sql/src/evaluator/block_operator.rs | 9 ++ src/query/sql/src/executor/profile.rs | 117 ++++++------------ .../system/src/query_profile_table.rs | 30 +++-- 11 files changed, 247 insertions(+), 146 deletions(-) diff --git a/src/query/pipeline/transforms/src/processors/mod.rs b/src/query/pipeline/transforms/src/processors/mod.rs index eeb264432828b..b09105d82b247 100644 --- a/src/query/pipeline/transforms/src/processors/mod.rs +++ b/src/query/pipeline/transforms/src/processors/mod.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod profile_wrapper; +pub mod profile_wrapper; pub mod transforms; -pub use profile_wrapper::ProfileWrapper; pub use transforms::Aborting; diff --git a/src/query/pipeline/transforms/src/processors/profile_wrapper.rs b/src/query/pipeline/transforms/src/processors/profile_wrapper.rs index 6eac56c7c3cb3..74b4fcf8eb5a0 100644 --- a/src/query/pipeline/transforms/src/processors/profile_wrapper.rs +++ b/src/query/pipeline/transforms/src/processors/profile_wrapper.rs @@ -12,41 +12,52 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::sync::Arc; use std::time::Instant; use common_exception::Result; +use common_expression::DataBlock; +use common_pipeline_core::processors::port::InputPort; +use common_pipeline_core::processors::port::OutputPort; use common_pipeline_core::processors::processor::Event; use common_pipeline_core::processors::Processor; use common_profile::ProcessorProfile; use common_profile::SharedProcessorProfiles; -pub struct ProfileWrapper { +use crate::processors::transforms::Transform; +use crate::processors::transforms::Transformer; + +/// A profile wrapper for `Processor` trait. +/// This wrapper will record the time cost of each processor. +/// But because of the limitation of `Processor` trait, +/// we can't get the number of rows processed by the processor. +pub struct ProcessorProfileWrapper { inner: T, - prof_span_id: u32, - prof_span_set: SharedProcessorProfiles, + prof_id: u32, + proc_profs: SharedProcessorProfiles, prof: ProcessorProfile, } -impl ProfileWrapper +impl ProcessorProfileWrapper where T: Processor + 'static { pub fn create( inner: T, - prof_span_id: u32, - prof_span_set: SharedProcessorProfiles, + prof_id: u32, + proc_profs: SharedProcessorProfiles, ) -> Box { Box::new(Self { inner, - prof_span_id, - prof_span_set, + prof_id, + proc_profs, prof: ProcessorProfile::default(), }) } } #[async_trait::async_trait] -impl Processor for ProfileWrapper +impl Processor for ProcessorProfileWrapper where T: Processor + 'static { fn name(&self) -> String { @@ -60,10 +71,10 @@ where T: Processor + 'static fn event(&mut self) -> Result { match self.inner.event()? 
{ Event::Finished => { - self.prof_span_set + self.proc_profs .lock() .unwrap() - .update(self.prof_span_id, self.prof); + .update(self.prof_id, self.prof); Ok(Event::Finished) } v => Ok(v), @@ -74,7 +85,11 @@ where T: Processor + 'static let instant = Instant::now(); self.inner.process()?; let elapsed = instant.elapsed(); - self.prof = self.prof + ProcessorProfile { cpu_time: elapsed }; + self.prof = self.prof + + ProcessorProfile { + cpu_time: elapsed, + ..Default::default() + }; Ok(()) } @@ -84,3 +99,65 @@ where T: Processor + 'static self.inner.async_process().await } } + +/// A profile wrapper for `Transform` trait. +/// This wrapper records the time cost plus the rows and bytes entering and leaving `transform`; +/// each call adds only its own delta, and `on_finish` hands the accumulated profile to `proc_profs`. +pub struct TransformProfileWrapper { + inner: T, + prof_id: u32, + proc_profs: SharedProcessorProfiles, + + prof: ProcessorProfile, +} + +impl TransformProfileWrapper +where T: Transform + 'static +{ + pub fn create( + inner: T, + input_port: Arc, + output_port: Arc, + prof_id: u32, + proc_profs: SharedProcessorProfiles, + ) -> Box { + Box::new(Transformer::create(input_port, output_port, Self { + inner, + prof_id, + proc_profs, + prof: ProcessorProfile::default(), + })) + } +} + +impl Transform for TransformProfileWrapper +where T: Transform + 'static +{ + const NAME: &'static str = "TransformProfileWrapper"; + + fn transform(&mut self, data: DataBlock) -> Result { + let input_rows = data.num_rows(); + let input_bytes = data.memory_size(); + + let instant = Instant::now(); + let res = self.inner.transform(data)?; + let elapsed = instant.elapsed(); + self.prof = self.prof + + ProcessorProfile { + cpu_time: elapsed, + input_rows, + input_bytes, + output_rows: res.num_rows(), + output_bytes: res.memory_size(), + }; + Ok(res) + } + + fn on_finish(&mut self) -> Result<()> { + self.proc_profs + .lock() + .unwrap() 
+ .update(self.prof_id, self.prof); + Ok(()) + } +} diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_sort.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_sort.rs index f0360d5a12404..28889ee1a814d 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_sort.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_sort.rs @@ -23,7 +23,7 @@ use super::transform_multi_sort_merge::try_add_multi_sort_merge; use super::transform_sort_merge::try_create_transform_sort_merge; use super::transform_sort_merge_limit::try_create_transform_sort_merge_limit; use super::TransformSortPartial; -use crate::processors::ProfileWrapper; +use crate::processors::profile_wrapper::ProcessorProfileWrapper; #[allow(clippy::too_many_arguments)] pub fn build_full_sort_pipeline( @@ -43,7 +43,7 @@ pub fn build_full_sort_pipeline( let transform = TransformSortPartial::try_create(input, output, limit, sort_desc.clone())?; if let Some((plan_id, prof)) = &prof_info { - Ok(ProcessorPtr::create(ProfileWrapper::create( + Ok(ProcessorPtr::create(ProcessorProfileWrapper::create( transform, *plan_id, prof.clone(), @@ -78,7 +78,7 @@ pub fn build_full_sort_pipeline( }; if let Some((plan_id, prof)) = &prof_info { - Ok(ProcessorPtr::create(ProfileWrapper::create( + Ok(ProcessorPtr::create(ProcessorProfileWrapper::create( transform, *plan_id, prof.clone(), diff --git a/src/query/profile/src/proc.rs b/src/query/profile/src/proc.rs index cff15c389264c..e2e474723e148 100644 --- a/src/query/profile/src/proc.rs +++ b/src/query/profile/src/proc.rs @@ -34,6 +34,14 @@ pub type SharedProcessorProfiles = Arc>>; pub struct ProcessorProfile { /// The time spent to process in nanoseconds pub cpu_time: Duration, + /// Row count of the input data + pub input_rows: usize, + /// Byte size of the input data + pub input_bytes: usize, + /// Row count of the output data + pub output_rows: usize, + /// Byte size of the output data + pub 
output_bytes: usize, } impl std::ops::Add for ProcessorProfile { @@ -42,6 +50,10 @@ impl std::ops::Add for ProcessorProfile { fn add(self, rhs: Self) -> Self::Output { Self { cpu_time: self.cpu_time + rhs.cpu_time, + input_rows: self.input_rows + rhs.input_rows, + input_bytes: self.input_bytes + rhs.input_bytes, + output_rows: self.output_rows + rhs.output_rows, + output_bytes: self.output_bytes + rhs.output_bytes, } } } diff --git a/src/query/profile/src/prof.rs b/src/query/profile/src/prof.rs index 11baf2764ad58..8ef7e0f523641 100644 --- a/src/query/profile/src/prof.rs +++ b/src/query/profile/src/prof.rs @@ -16,6 +16,8 @@ use std::fmt::Display; use std::fmt::Formatter; use std::time::Duration; +use crate::ProcessorProfile; + #[derive(Debug, Clone)] pub struct QueryProfile { /// Query ID of the query profile @@ -45,8 +47,8 @@ pub struct OperatorProfile { /// IDs of the children plan nodes pub children: Vec, - /// The time spent to process data - pub cpu_time: Duration, + /// The execution information of the plan operator + pub execution_info: OperatorExecutionInfo, /// Attribute of the plan operator pub attribute: OperatorAttribute, @@ -95,6 +97,33 @@ impl Display for OperatorType { } } +#[derive(Debug, Clone, Default)] +pub struct OperatorExecutionInfo { + pub process_time: Duration, + pub input_rows: usize, + pub input_bytes: usize, + pub output_rows: usize, + pub output_bytes: usize, +} + +impl From for OperatorExecutionInfo { + fn from(value: ProcessorProfile) -> Self { + (&value).into() + } +} + +impl From<&ProcessorProfile> for OperatorExecutionInfo { + fn from(value: &ProcessorProfile) -> Self { + OperatorExecutionInfo { + process_time: value.cpu_time, + input_rows: value.input_rows, + input_bytes: value.input_bytes, + output_rows: value.output_rows, + output_bytes: value.output_bytes, + } + } +} + #[derive(Debug, Clone)] pub enum OperatorAttribute { Join(JoinAttribute), diff --git a/src/query/service/src/pipelines/pipeline_builder.rs 
b/src/query/service/src/pipelines/pipeline_builder.rs index ed9d1dd66de32..1ff2b1ac908f8 100644 --- a/src/query/service/src/pipelines/pipeline_builder.rs +++ b/src/query/service/src/pipelines/pipeline_builder.rs @@ -42,8 +42,10 @@ use common_pipeline_core::processors::Processor; use common_pipeline_sinks::EmptySink; use common_pipeline_sinks::Sinker; use common_pipeline_sinks::UnionReceiveSink; +use common_pipeline_transforms::processors::profile_wrapper::ProcessorProfileWrapper; +use common_pipeline_transforms::processors::profile_wrapper::TransformProfileWrapper; use common_pipeline_transforms::processors::transforms::build_full_sort_pipeline; -use common_pipeline_transforms::processors::ProfileWrapper; +use common_pipeline_transforms::processors::transforms::Transformer; use common_profile::SharedProcessorProfiles; use common_sql::evaluator::BlockOperator; use common_sql::evaluator::CompoundBlockOperator; @@ -273,7 +275,7 @@ impl PipelineBuilder { self.main_pipeline.add_transform(|input, output| { let transform = TransformRangeJoinLeft::create(input, output, state.clone()); if self.enable_profiling { - Ok(ProcessorPtr::create(ProfileWrapper::create( + Ok(ProcessorPtr::create(ProcessorProfileWrapper::create( transform, range_join.plan_id, self.prof_span_set.clone(), @@ -303,7 +305,7 @@ impl PipelineBuilder { TransformRangeJoinRight::create(state.clone()), ); if self.enable_profiling { - Ok(ProcessorPtr::create(ProfileWrapper::create( + Ok(ProcessorPtr::create(ProcessorProfileWrapper::create( transform, range_join.plan_id, self.prof_span_set.clone(), @@ -356,7 +358,7 @@ impl PipelineBuilder { ); if self.enable_profiling { - Ok(ProcessorPtr::create(ProfileWrapper::create( + Ok(ProcessorPtr::create(ProcessorProfileWrapper::create( transform, hash_join_plan.plan_id, self.prof_span_set.clone(), @@ -483,24 +485,26 @@ impl PipelineBuilder { let num_input_columns = filter.input.output_schema()?.num_fields(); self.main_pipeline.add_transform(|input, output| { - let 
transform = CompoundBlockOperator::create( - input, - output, - num_input_columns, - self.ctx.get_function_context()?, + let transform = CompoundBlockOperator::new( vec![BlockOperator::Filter { expr: predicate.clone(), }], + self.ctx.get_function_context()?, + num_input_columns, ); if self.enable_profiling { - Ok(ProcessorPtr::create(ProfileWrapper::create( + Ok(ProcessorPtr::create(TransformProfileWrapper::create( transform, + input, + output, filter.plan_id, self.prof_span_set.clone(), ))) } else { - Ok(ProcessorPtr::create(transform)) + Ok(ProcessorPtr::create(Transformer::create( + input, output, transform, + ))) } })?; @@ -553,22 +557,21 @@ impl PipelineBuilder { let num_input_columns = input_schema.num_fields(); self.main_pipeline.add_transform(|input, output| { - let transform = CompoundBlockOperator::create( - input, - output, - num_input_columns, - func_ctx.clone(), - vec![op.clone()], - ); + let transform = + CompoundBlockOperator::new(vec![op.clone()], func_ctx.clone(), num_input_columns); if self.enable_profiling { - Ok(ProcessorPtr::create(ProfileWrapper::create( + Ok(ProcessorPtr::create(TransformProfileWrapper::create( transform, + input, + output, eval_scalar.plan_id, self.prof_span_set.clone(), ))) } else { - Ok(ProcessorPtr::create(transform)) + Ok(ProcessorPtr::create(Transformer::create( + input, output, transform, + ))) } })?; @@ -591,22 +594,21 @@ impl PipelineBuilder { let num_input_columns = project_set.input.output_schema()?.num_fields(); self.main_pipeline.add_transform(|input, output| { - let transform = CompoundBlockOperator::create( - input, - output, - num_input_columns, - func_ctx.clone(), - vec![op.clone()], - ); + let transform = + CompoundBlockOperator::new(vec![op.clone()], func_ctx.clone(), num_input_columns); if self.enable_profiling { - Ok(ProcessorPtr::create(ProfileWrapper::create( + Ok(ProcessorPtr::create(TransformProfileWrapper::create( transform, + input, + output, project_set.plan_id, self.prof_span_set.clone(), ))) } 
+ else { - Ok(ProcessorPtr::create(transform)) + Ok(ProcessorPtr::create(Transformer::create( + input, output, transform, + ))) } }) } @@ -678,7 +680,7 @@ impl PipelineBuilder { let transform = PartialSingleStateAggregator::try_create(input, output, &params)?; if self.enable_profiling { - Ok(ProcessorPtr::create(ProfileWrapper::create( + Ok(ProcessorPtr::create(ProcessorProfileWrapper::create( transform, aggregate.plan_id, self.prof_span_set.clone(), @@ -721,7 +723,7 @@ impl PipelineBuilder { }?; if self.enable_profiling { - Ok(ProcessorPtr::create(ProfileWrapper::create( + Ok(ProcessorPtr::create(ProcessorProfileWrapper::create( transform, aggregate.plan_id, self.prof_span_set.clone(), @@ -765,7 +767,7 @@ impl PipelineBuilder { }; if self.enable_profiling { - Ok(ProcessorPtr::create(ProfileWrapper::create( + Ok(ProcessorPtr::create(ProcessorProfileWrapper::create( transform, aggregate.plan_id, self.prof_span_set.clone(), @@ -814,7 +816,7 @@ impl PipelineBuilder { let transform = FinalSingleStateAggregator::try_create(input, output, &params)?; if self.enable_profiling { - Ok(ProcessorPtr::create(ProfileWrapper::create( + Ok(ProcessorPtr::create(ProcessorProfileWrapper::create( transform, aggregate.plan_id, self.prof_span_set.clone(), @@ -1109,7 +1111,7 @@ impl PipelineBuilder { let transform = TransformLimit::try_create(limit.limit, limit.offset, input, output)?; if self.enable_profiling { - Ok(ProcessorPtr::create(ProfileWrapper::create( + Ok(ProcessorPtr::create(ProcessorProfileWrapper::create( transform, limit.plan_id, self.prof_span_set.clone(), @@ -1146,7 +1148,7 @@ impl PipelineBuilder { )?; if self.enable_profiling { - Ok(ProcessorPtr::create(ProfileWrapper::create( + Ok(ProcessorPtr::create(ProcessorProfileWrapper::create( transform, join.plan_id, self.prof_span_set.clone(), @@ -1195,7 +1197,7 @@ impl PipelineBuilder { let transform = UnionReceiveSink::create(Some(tx.clone()), input_port); if self.enable_profiling { - Ok(ProcessorPtr::create(ProfileWrapper::create( + 
Ok(ProcessorPtr::create(ProcessorProfileWrapper::create( transform, union_plan.plan_id, self.prof_span_set.clone(), @@ -1226,7 +1228,7 @@ impl PipelineBuilder { )?; if self.enable_profiling { - Ok(ProcessorPtr::create(ProfileWrapper::create( + Ok(ProcessorPtr::create(ProcessorProfileWrapper::create( transform, union_all.plan_id, self.prof_span_set.clone(), diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_partition_bucket.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_partition_bucket.rs index 984e3424e41a7..f1991a691c8ea 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_partition_bucket.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_partition_bucket.rs @@ -34,7 +34,7 @@ use common_pipeline_core::processors::processor::Event; use common_pipeline_core::processors::processor::ProcessorPtr; use common_pipeline_core::processors::Processor; use common_pipeline_core::Pipeline; -use common_pipeline_transforms::processors::ProfileWrapper; +use common_pipeline_transforms::processors::profile_wrapper::ProcessorProfileWrapper; use common_profile::SharedProcessorProfiles; use common_storage::DataOperator; use petgraph::matrix_graph::Zero; @@ -452,7 +452,7 @@ pub fn build_partition_bucket, + ctx: FunctionContext, + input_num_columns: usize, + ) -> Self { + let operators = Self::compact_map(operators, input_num_columns); + Self { operators, ctx } + } + pub fn create( input_port: Arc, output_port: Arc, diff --git a/src/query/sql/src/executor/profile.rs b/src/query/sql/src/executor/profile.rs index 5991351c78f45..2debfa3260533 100644 --- a/src/query/sql/src/executor/profile.rs +++ b/src/query/sql/src/executor/profile.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use common_exception::ErrorCode; use common_exception::Result; use common_functions::BUILTIN_FUNCTIONS; use common_profile::AggregateAttribute; @@ -71,21 +70,19 @@ fn flatten_plan_node_profile( operator_type: OperatorType::TableScan, // We don't record the time spent on table scan for now children: vec![], - cpu_time: Default::default(), + execution_info: Default::default(), attribute: OperatorAttribute::TableScan(TableScanAttribute { qualified_name }), }; plan_node_profs.push(prof); } PhysicalPlan::Filter(filter) => { flatten_plan_node_profile(metadata, &filter.input, profs, plan_node_profs)?; - let proc_prof = profs - .get(&filter.plan_id) - .ok_or_else(|| ErrorCode::Internal("Plan node profile not found"))?; + let proc_prof = profs.get(&filter.plan_id).copied().unwrap_or_default(); let prof = OperatorProfile { id: filter.plan_id, operator_type: OperatorType::Filter, children: vec![filter.input.get_id()], - cpu_time: proc_prof.cpu_time, + execution_info: proc_prof.into(), attribute: OperatorAttribute::Filter(FilterAttribute { predicate: filter .predicates @@ -98,27 +95,23 @@ fn flatten_plan_node_profile( } PhysicalPlan::Project(project) => { flatten_plan_node_profile(metadata, &project.input, profs, plan_node_profs)?; - let proc_prof = profs - .get(&project.plan_id) - .ok_or_else(|| ErrorCode::Internal("Plan node profile not found"))?; + let proc_prof = profs.get(&project.plan_id).copied().unwrap_or_default(); let prof = OperatorProfile { id: project.plan_id, operator_type: OperatorType::Project, children: vec![project.input.get_id()], - cpu_time: proc_prof.cpu_time, + execution_info: proc_prof.into(), attribute: OperatorAttribute::Empty, }; plan_node_profs.push(prof); } PhysicalPlan::EvalScalar(eval) => { flatten_plan_node_profile(metadata, &eval.input, profs, plan_node_profs)?; - let proc_prof = profs - .get(&eval.plan_id) - .ok_or_else(|| ErrorCode::Internal("Plan node profile not found"))?; + let proc_prof = 
profs.get(&eval.plan_id).copied().unwrap_or_default(); let prof = OperatorProfile { id: eval.plan_id, operator_type: OperatorType::EvalScalar, - cpu_time: proc_prof.cpu_time, + execution_info: proc_prof.into(), children: vec![eval.input.get_id()], attribute: OperatorAttribute::EvalScalar(EvalScalarAttribute { scalars: eval @@ -132,13 +125,11 @@ fn flatten_plan_node_profile( } PhysicalPlan::ProjectSet(project_set) => { flatten_plan_node_profile(metadata, &project_set.input, profs, plan_node_profs)?; - let proc_prof = profs - .get(&project_set.plan_id) - .ok_or_else(|| ErrorCode::Internal("Plan node profile not found"))?; + let proc_prof = profs.get(&project_set.plan_id).copied().unwrap_or_default(); let prof = OperatorProfile { id: project_set.plan_id, operator_type: OperatorType::ProjectSet, - cpu_time: proc_prof.cpu_time, + execution_info: proc_prof.into(), children: vec![project_set.input.get_id()], attribute: OperatorAttribute::ProjectSet(ProjectSetAttribute { functions: project_set @@ -152,13 +143,11 @@ fn flatten_plan_node_profile( } PhysicalPlan::AggregateExpand(expand) => { flatten_plan_node_profile(metadata, &expand.input, profs, plan_node_profs)?; - let proc_prof = profs - .get(&expand.plan_id) - .ok_or_else(|| ErrorCode::Internal("Plan node profile not found"))?; + let proc_prof = profs.get(&expand.plan_id).copied().unwrap_or_default(); let prof = OperatorProfile { id: expand.plan_id, operator_type: OperatorType::AggregateExpand, - cpu_time: proc_prof.cpu_time, + execution_info: proc_prof.into(), children: vec![expand.input.get_id()], attribute: OperatorAttribute::AggregateExpand(AggregateExpandAttribute { group_keys: expand @@ -181,13 +170,11 @@ fn flatten_plan_node_profile( } PhysicalPlan::AggregatePartial(agg_partial) => { flatten_plan_node_profile(metadata, &agg_partial.input, profs, plan_node_profs)?; - let proc_prof = profs - .get(&agg_partial.plan_id) - .ok_or_else(|| ErrorCode::Internal("Plan node profile not found"))?; + let proc_prof = 
profs.get(&agg_partial.plan_id).copied().unwrap_or_default(); let prof = OperatorProfile { id: agg_partial.plan_id, operator_type: OperatorType::Aggregate, - cpu_time: proc_prof.cpu_time, + execution_info: proc_prof.into(), children: vec![agg_partial.input.get_id()], attribute: OperatorAttribute::Aggregate(AggregateAttribute { group_keys: agg_partial @@ -206,13 +193,11 @@ fn flatten_plan_node_profile( } PhysicalPlan::AggregateFinal(agg_final) => { flatten_plan_node_profile(metadata, &agg_final.input, profs, plan_node_profs)?; - let proc_prof = profs - .get(&agg_final.plan_id) - .ok_or_else(|| ErrorCode::Internal("Plan node profile not found"))?; + let proc_prof = profs.get(&agg_final.plan_id).copied().unwrap_or_default(); let prof = OperatorProfile { id: agg_final.plan_id, operator_type: OperatorType::Aggregate, - cpu_time: proc_prof.cpu_time, + execution_info: proc_prof.into(), children: vec![agg_final.input.get_id()], attribute: OperatorAttribute::Aggregate(AggregateAttribute { group_keys: agg_final @@ -231,9 +216,7 @@ fn flatten_plan_node_profile( } PhysicalPlan::Window(window) => { flatten_plan_node_profile(metadata, &window.input, profs, plan_node_profs)?; - let proc_prof = profs - .get(&window.plan_id) - .ok_or_else(|| ErrorCode::Internal("Plan node profile not found"))?; + let proc_prof = profs.get(&window.plan_id).copied().unwrap_or_default(); let partition_by = window .partition_by .iter() @@ -264,7 +247,7 @@ fn flatten_plan_node_profile( id: window.plan_id, operator_type: OperatorType::Window, children: vec![window.input.get_id()], - cpu_time: proc_prof.cpu_time, + execution_info: proc_prof.into(), attribute: OperatorAttribute::Window(WindowAttribute { functions: format!( "{} OVER (PARTITION BY {} ORDER BY {} {})", @@ -276,13 +259,11 @@ fn flatten_plan_node_profile( } PhysicalPlan::Sort(sort) => { flatten_plan_node_profile(metadata, &sort.input, profs, plan_node_profs)?; - let proc_prof = profs - .get(&sort.plan_id) - .ok_or_else(|| 
ErrorCode::Internal("Plan node profile not found"))?; + let proc_prof = profs.get(&sort.plan_id).copied().unwrap_or_default(); let prof = OperatorProfile { id: sort.plan_id, operator_type: OperatorType::Sort, - cpu_time: proc_prof.cpu_time, + execution_info: proc_prof.into(), children: vec![sort.input.get_id()], attribute: OperatorAttribute::Sort(SortAttribute { sort_keys: sort @@ -302,13 +283,11 @@ fn flatten_plan_node_profile( } PhysicalPlan::Limit(limit) => { flatten_plan_node_profile(metadata, &limit.input, profs, plan_node_profs)?; - let proc_prof = profs - .get(&limit.plan_id) - .ok_or_else(|| ErrorCode::Internal("Plan node profile not found"))?; + let proc_prof = profs.get(&limit.plan_id).copied().unwrap_or_default(); let prof = OperatorProfile { id: limit.plan_id, operator_type: OperatorType::Limit, - cpu_time: proc_prof.cpu_time, + execution_info: proc_prof.into(), children: vec![limit.input.get_id()], attribute: OperatorAttribute::Limit(LimitAttribute { limit: limit.limit.unwrap_or_default(), @@ -319,13 +298,11 @@ fn flatten_plan_node_profile( } PhysicalPlan::RowFetch(fetch) => { flatten_plan_node_profile(metadata, &fetch.input, profs, plan_node_profs)?; - let proc_prof = profs - .get(&fetch.plan_id) - .ok_or_else(|| ErrorCode::Internal("Plan node profile not found"))?; + let proc_prof = profs.get(&fetch.plan_id).copied().unwrap_or_default(); let prof = OperatorProfile { id: fetch.plan_id, operator_type: OperatorType::RowFetch, - cpu_time: proc_prof.cpu_time, + execution_info: proc_prof.into(), children: vec![fetch.input.get_id()], attribute: OperatorAttribute::Empty, }; @@ -334,13 +311,11 @@ fn flatten_plan_node_profile( PhysicalPlan::HashJoin(hash_join) => { flatten_plan_node_profile(metadata, &hash_join.probe, profs, plan_node_profs)?; flatten_plan_node_profile(metadata, &hash_join.build, profs, plan_node_profs)?; - let proc_prof = profs - .get(&hash_join.plan_id) - .ok_or_else(|| ErrorCode::Internal("Plan node profile not found"))?; + let proc_prof = 
profs.get(&hash_join.plan_id).copied().unwrap_or_default(); let prof = OperatorProfile { id: hash_join.plan_id, operator_type: OperatorType::Join, - cpu_time: proc_prof.cpu_time, + execution_info: proc_prof.into(), children: vec![hash_join.probe.get_id(), hash_join.build.get_id()], attribute: OperatorAttribute::Join(JoinAttribute { join_type: hash_join.join_type.to_string(), @@ -368,14 +343,12 @@ fn flatten_plan_node_profile( PhysicalPlan::RangeJoin(range_join) => { flatten_plan_node_profile(metadata, &range_join.left, profs, plan_node_profs)?; flatten_plan_node_profile(metadata, &range_join.right, profs, plan_node_profs)?; - let proc_prof = profs - .get(&range_join.plan_id) - .ok_or_else(|| ErrorCode::Internal("Plan node profile not found"))?; + let proc_prof = profs.get(&range_join.plan_id).copied().unwrap_or_default(); let prof = OperatorProfile { id: range_join.plan_id, operator_type: OperatorType::Join, children: vec![range_join.left.get_id(), range_join.right.get_id()], - cpu_time: proc_prof.cpu_time, + execution_info: proc_prof.into(), attribute: OperatorAttribute::Join(JoinAttribute { join_type: range_join.join_type.to_string(), equi_conditions: range_join @@ -401,13 +374,11 @@ fn flatten_plan_node_profile( } PhysicalPlan::Exchange(exchange) => { flatten_plan_node_profile(metadata, &exchange.input, profs, plan_node_profs)?; - let proc_prof = profs - .get(&exchange.plan_id) - .ok_or_else(|| ErrorCode::Internal("Plan node profile not found"))?; + let proc_prof = profs.get(&exchange.plan_id).copied().unwrap_or_default(); let prof = OperatorProfile { id: exchange.plan_id, operator_type: OperatorType::Exchange, - cpu_time: proc_prof.cpu_time, + execution_info: proc_prof.into(), children: vec![exchange.input.get_id()], attribute: OperatorAttribute::Exchange(ExchangeAttribute { exchange_mode: match exchange.kind { @@ -423,26 +394,22 @@ fn flatten_plan_node_profile( PhysicalPlan::UnionAll(union) => { flatten_plan_node_profile(metadata, &union.left, profs, 
plan_node_profs)?; flatten_plan_node_profile(metadata, &union.right, profs, plan_node_profs)?; - let proc_prof = profs - .get(&union.plan_id) - .ok_or_else(|| ErrorCode::Internal("Plan node profile not found"))?; + let proc_prof = profs.get(&union.plan_id).copied().unwrap_or_default(); let prof = OperatorProfile { id: union.plan_id, operator_type: OperatorType::UnionAll, - cpu_time: proc_prof.cpu_time, + execution_info: proc_prof.into(), children: vec![union.left.get_id(), union.right.get_id()], attribute: OperatorAttribute::Empty, }; plan_node_profs.push(prof); } PhysicalPlan::RuntimeFilterSource(source) => { - let proc_prof = profs - .get(&source.plan_id) - .ok_or_else(|| ErrorCode::Internal("Plan node profile not found"))?; + let proc_prof = profs.get(&source.plan_id).copied().unwrap_or_default(); let prof = OperatorProfile { id: source.plan_id, operator_type: OperatorType::RuntimeFilter, - cpu_time: proc_prof.cpu_time, + execution_info: proc_prof.into(), children: vec![], attribute: OperatorAttribute::Empty, }; @@ -450,26 +417,22 @@ fn flatten_plan_node_profile( } PhysicalPlan::DistributedInsertSelect(select) => { flatten_plan_node_profile(metadata, &select.input, profs, plan_node_profs)?; - let proc_prof = profs - .get(&select.plan_id) - .ok_or_else(|| ErrorCode::Internal("Plan node profile not found"))?; + let proc_prof = profs.get(&select.plan_id).copied().unwrap_or_default(); let prof = OperatorProfile { id: select.plan_id, operator_type: OperatorType::Insert, - cpu_time: proc_prof.cpu_time, + execution_info: proc_prof.into(), children: vec![], attribute: OperatorAttribute::Empty, }; plan_node_profs.push(prof); } PhysicalPlan::ExchangeSource(source) => { - let proc_prof = profs - .get(&source.plan_id) - .ok_or_else(|| ErrorCode::Internal("Plan node profile not found"))?; + let proc_prof = profs.get(&source.plan_id).copied().unwrap_or_default(); let prof = OperatorProfile { id: source.plan_id, operator_type: OperatorType::Exchange, - cpu_time: 
proc_prof.cpu_time, + execution_info: proc_prof.into(), children: vec![], attribute: OperatorAttribute::Empty, }; @@ -477,13 +440,11 @@ fn flatten_plan_node_profile( } PhysicalPlan::ExchangeSink(sink) => { flatten_plan_node_profile(metadata, &sink.input, profs, plan_node_profs)?; - let proc_prof = profs - .get(&sink.plan_id) - .ok_or_else(|| ErrorCode::Internal("Plan node profile not found"))?; + let proc_prof = profs.get(&sink.plan_id).copied().unwrap_or_default(); let prof = OperatorProfile { id: sink.plan_id, operator_type: OperatorType::Exchange, - cpu_time: proc_prof.cpu_time, + execution_info: proc_prof.into(), children: vec![], attribute: OperatorAttribute::Empty, }; diff --git a/src/query/storages/system/src/query_profile_table.rs b/src/query/storages/system/src/query_profile_table.rs index 67e1925c2e3ec..b08c6ac29eaa8 100644 --- a/src/query/storages/system/src/query_profile_table.rs +++ b/src/query/storages/system/src/query_profile_table.rs @@ -21,7 +21,6 @@ use common_expression::types::ArrayType; use common_expression::types::NumberDataType; use common_expression::types::StringType; use common_expression::types::UInt32Type; -use common_expression::types::UInt64Type; use common_expression::types::ValueType; use common_expression::types::VariantType; use common_expression::DataBlock; @@ -33,6 +32,7 @@ use common_meta_app::schema::TableIdent; use common_meta_app::schema::TableInfo; use common_meta_app::schema::TableMeta; use common_profile::OperatorAttribute; +use common_profile::OperatorExecutionInfo; use common_profile::QueryProfileManager; use crate::SyncOneBlockSystemTable; @@ -87,6 +87,19 @@ fn encode_operator_attribute(attr: &OperatorAttribute) -> jsonb::Value { } } +fn encode_operator_execution_info(info: &OperatorExecutionInfo) -> jsonb::Value { + // Process time is represented in milliseconds (fractional). 
+ let process_time = info.process_time.as_nanos() as f64 / 1e6; + (&serde_json::json!({ + "process_time": process_time, + "input_rows": info.input_rows, + "input_bytes": info.input_bytes, + "output_rows": info.output_rows, + "output_bytes": info.output_bytes, + })) + .into() +} + pub struct QueryProfileTable { table_info: TableInfo, } @@ -101,10 +114,7 @@ impl QueryProfileTable { "operator_children", TableDataType::Array(Box::new(TableDataType::Number(NumberDataType::UInt32))), ), - TableField::new( - "process_time", - TableDataType::Number(NumberDataType::UInt64), - ), + TableField::new("execution_info", TableDataType::Variant), TableField::new("operator_attribute", TableDataType::Variant), ]); @@ -139,7 +149,7 @@ impl SyncSystemTable for QueryProfileTable { let mut operator_ids: Vec = Vec::with_capacity(query_profs.len()); let mut operator_types: Vec> = Vec::with_capacity(query_profs.len()); let mut operator_childrens: Vec> = Vec::with_capacity(query_profs.len()); - let mut process_times: Vec = Vec::with_capacity(query_profs.len()); + let mut execution_infos: Vec> = Vec::with_capacity(query_profs.len()); let mut operator_attributes: Vec> = Vec::with_capacity(query_profs.len()); for prof in query_profs.iter() { @@ -148,7 +158,9 @@ impl SyncSystemTable for QueryProfileTable { operator_ids.push(plan_prof.id); operator_types.push(plan_prof.operator_type.to_string().into_bytes()); operator_childrens.push(plan_prof.children.clone()); - process_times.push(plan_prof.cpu_time.as_nanos() as u64); + + let execution_info = encode_operator_execution_info(&plan_prof.execution_info); + execution_infos.push(execution_info.to_vec()); let attribute_value = encode_operator_attribute(&plan_prof.attribute); operator_attributes.push(attribute_value.to_vec()); @@ -169,8 +181,8 @@ impl SyncSystemTable for QueryProfileTable { .map(|children| UInt32Type::column_from_iter(children.into_iter(), &[])), &[], )), - // process_time - UInt64Type::from_data(process_times), + // execution_info + 
VariantType::from_data(execution_infos), // operator_attribute VariantType::from_data(operator_attributes), ]);