diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index f200fb0e0f71..6aff9280ffad 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -78,8 +78,9 @@ use datafusion_expr::expr::{ use datafusion_expr::expr_rewriter::unnormalize_cols; use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary; use datafusion_expr::{ - DescribeTable, DmlStatement, Extension, FetchType, Filter, JoinType, RecursiveQuery, - SkipType, SortExpr, StringifiedPlan, WindowFrame, WindowFrameBound, WriteOp, + Analyze, DescribeTable, DmlStatement, Explain, Extension, FetchType, Filter, + JoinType, RecursiveQuery, SkipType, SortExpr, StringifiedPlan, WindowFrame, + WindowFrameBound, WriteOp, }; use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; use datafusion_physical_expr::expressions::Literal; @@ -177,16 +178,17 @@ impl PhysicalPlanner for DefaultPhysicalPlanner { logical_plan: &LogicalPlan, session_state: &SessionState, ) -> Result> { - match self.handle_explain(logical_plan, session_state).await? { - Some(plan) => Ok(plan), - None => { - let plan = self - .create_initial_plan(logical_plan, session_state) - .await?; - - self.optimize_physical_plan(plan, session_state, |_, _| {}) - } + if let Some(plan) = self + .handle_explain_or_analyze(logical_plan, session_state) + .await? + { + return Ok(plan); } + let plan = self + .create_initial_plan(logical_plan, session_state) + .await?; + + self.optimize_physical_plan(plan, session_state, |_, _| {}) } /// Create a physical expression from a logical expression @@ -1715,167 +1717,179 @@ impl DefaultPhysicalPlanner { /// Returns /// Some(plan) if optimized, and None if logical_plan was not an /// explain (and thus needs to be optimized as normal) - async fn handle_explain( + async fn handle_explain_or_analyze( &self, logical_plan: &LogicalPlan, session_state: &SessionState, ) -> Result>> { - if let LogicalPlan::Explain(e) = logical_plan { - use PlanType::*; - let mut stringified_plans = vec![]; + let execution_plan = match logical_plan { + LogicalPlan::Explain(e) => self.handle_explain(e, session_state).await?, + LogicalPlan::Analyze(a) => self.handle_analyze(a, session_state).await?, + _ => return Ok(None), + }; + Ok(Some(execution_plan)) + } + + /// Planner for `LogicalPlan::Explain` + async fn handle_explain( + &self, + e: &Explain, + session_state: &SessionState, + ) -> Result> { + use PlanType::*; + let mut stringified_plans = vec![]; - let config = &session_state.config_options().explain; - let explain_format = DisplayFormatType::from_str(&config.format)?; + let config = &session_state.config_options().explain; + let explain_format = DisplayFormatType::from_str(&config.format)?; - let skip_logical_plan = config.physical_plan_only - || explain_format == DisplayFormatType::TreeRender; + let skip_logical_plan = + config.physical_plan_only || explain_format == DisplayFormatType::TreeRender; - if !skip_logical_plan { - stringified_plans.clone_from(&e.stringified_plans); - if e.logical_optimization_succeeded { - stringified_plans.push(e.plan.to_stringified(FinalLogicalPlan)); - } + if !skip_logical_plan { + stringified_plans.clone_from(&e.stringified_plans); + if e.logical_optimization_succeeded { + stringified_plans.push(e.plan.to_stringified(FinalLogicalPlan)); } + } - if !config.logical_plan_only && e.logical_optimization_succeeded { - match self - .create_initial_plan(e.plan.as_ref(), session_state) - .await - { - Ok(input) => { - // Include statistics / schema if enabled - stringified_plans.push( - displayable(input.as_ref()) - .set_show_statistics(config.show_statistics) - .set_show_schema(config.show_schema) - .to_stringified( - e.verbose, - InitialPhysicalPlan, - explain_format, - ), - ); + if !config.logical_plan_only && e.logical_optimization_succeeded { + match self + .create_initial_plan(e.plan.as_ref(), session_state) + .await + { + Ok(input) => { + // Include statistics / schema if enabled + stringified_plans.push( + displayable(input.as_ref()) + .set_show_statistics(config.show_statistics) + .set_show_schema(config.show_schema) + .to_stringified( + e.verbose, + InitialPhysicalPlan, + explain_format, + ), + ); - // Show statistics + schema in verbose output even if not - // explicitly requested - if e.verbose { - if !config.show_statistics { - stringified_plans.push( - displayable(input.as_ref()) - .set_show_statistics(true) - .to_stringified( - e.verbose, - InitialPhysicalPlanWithStats, - explain_format, - ), - ); - } - if !config.show_schema { - stringified_plans.push( - displayable(input.as_ref()) - .set_show_schema(true) - .to_stringified( - e.verbose, - InitialPhysicalPlanWithSchema, - explain_format, - ), - ); - } + // Show statistics + schema in verbose output even if not + // explicitly requested + if e.verbose { + if !config.show_statistics { + stringified_plans.push( + displayable(input.as_ref()) + .set_show_statistics(true) + .to_stringified( + e.verbose, + InitialPhysicalPlanWithStats, + explain_format, + ), + ); } + if !config.show_schema { + stringified_plans.push( + displayable(input.as_ref()) + .set_show_schema(true) + .to_stringified( + e.verbose, + InitialPhysicalPlanWithSchema, + explain_format, + ), + ); + } + } - let optimized_plan = self.optimize_physical_plan( - input, - session_state, - |plan, optimizer| { - let optimizer_name = optimizer.name().to_string(); - let plan_type = OptimizedPhysicalPlan { optimizer_name }; - stringified_plans.push( - displayable(plan) - .set_show_statistics(config.show_statistics) - .set_show_schema(config.show_schema) - .to_stringified( - e.verbose, - plan_type, - explain_format, - ), - ); - }, - ); - match optimized_plan { - Ok(input) => { - // This plan will includes statistics if show_statistics is on - stringified_plans.push( - displayable(input.as_ref()) - .set_show_statistics(config.show_statistics) - .set_show_schema(config.show_schema) - .to_stringified( - e.verbose, - FinalPhysicalPlan, - explain_format, - ), - ); - - // Show statistics + schema in verbose output even if not - // explicitly requested - if e.verbose { - if !config.show_statistics { - stringified_plans.push( - displayable(input.as_ref()) - .set_show_statistics(true) - .to_stringified( - e.verbose, - FinalPhysicalPlanWithStats, - explain_format, - ), - ); - } - if !config.show_schema { - stringified_plans.push( - displayable(input.as_ref()) - .set_show_schema(true) - .to_stringified( - e.verbose, - FinalPhysicalPlanWithSchema, - explain_format, - ), - ); - } + let optimized_plan = self.optimize_physical_plan( + input, + session_state, + |plan, optimizer| { + let optimizer_name = optimizer.name().to_string(); + let plan_type = OptimizedPhysicalPlan { optimizer_name }; + stringified_plans.push( + displayable(plan) + .set_show_statistics(config.show_statistics) + .set_show_schema(config.show_schema) + .to_stringified(e.verbose, plan_type, explain_format), + ); + }, + ); + match optimized_plan { + Ok(input) => { + // This plan will includes statistics if show_statistics is on + stringified_plans.push( + displayable(input.as_ref()) + .set_show_statistics(config.show_statistics) + .set_show_schema(config.show_schema) + .to_stringified( + e.verbose, + FinalPhysicalPlan, + explain_format, + ), + ); + + // Show statistics + schema in verbose output even if not + // explicitly requested + if e.verbose { + if !config.show_statistics { + stringified_plans.push( + displayable(input.as_ref()) + .set_show_statistics(true) + .to_stringified( + e.verbose, + FinalPhysicalPlanWithStats, + explain_format, + ), + ); + } + if !config.show_schema { + stringified_plans.push( + displayable(input.as_ref()) + .set_show_schema(true) + .to_stringified( + e.verbose, + FinalPhysicalPlanWithSchema, + explain_format, + ), + ); } } - Err(DataFusionError::Context(optimizer_name, e)) => { - let plan_type = OptimizedPhysicalPlan { optimizer_name }; - stringified_plans - .push(StringifiedPlan::new(plan_type, e.to_string())) - } - Err(e) => return Err(e), } - } - Err(err) => { - stringified_plans.push(StringifiedPlan::new( - PhysicalPlanError, - err.strip_backtrace(), - )); + Err(DataFusionError::Context(optimizer_name, e)) => { + let plan_type = OptimizedPhysicalPlan { optimizer_name }; + stringified_plans + .push(StringifiedPlan::new(plan_type, e.to_string())) + } + Err(e) => return Err(e), } } + Err(err) => { + stringified_plans.push(StringifiedPlan::new( + PhysicalPlanError, + err.strip_backtrace(), + )); + } } - - Ok(Some(Arc::new(ExplainExec::new( - SchemaRef::new(e.schema.as_ref().to_owned().into()), - stringified_plans, - e.verbose, - )))) - } else if let LogicalPlan::Analyze(a) = logical_plan { - let input = self.create_physical_plan(&a.input, session_state).await?; - let schema = SchemaRef::new((*a.schema).clone().into()); - let show_statistics = session_state.config_options().explain.show_statistics; - Ok(Some(Arc::new(AnalyzeExec::new( - a.verbose, - show_statistics, - input, - schema, - )))) - } else { - Ok(None) } + + Ok(Arc::new(ExplainExec::new( + Arc::clone(e.schema.inner()), + stringified_plans, + e.verbose, + ))) + } + + async fn handle_analyze( + &self, + a: &Analyze, + session_state: &SessionState, + ) -> Result> { + let input = self.create_physical_plan(&a.input, session_state).await?; + let schema = SchemaRef::new((*a.schema).clone().into()); + let show_statistics = session_state.config_options().explain.show_statistics; + Ok(Arc::new(AnalyzeExec::new( + a.verbose, + show_statistics, + input, + schema, + ))) } /// Optimize a physical plan by applying each physical optimizer,