Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
320 changes: 167 additions & 153 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,9 @@ use datafusion_expr::expr::{
use datafusion_expr::expr_rewriter::unnormalize_cols;
use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
use datafusion_expr::{
DescribeTable, DmlStatement, Extension, FetchType, Filter, JoinType, RecursiveQuery,
SkipType, SortExpr, StringifiedPlan, WindowFrame, WindowFrameBound, WriteOp,
Analyze, DescribeTable, DmlStatement, Explain, Extension, FetchType, Filter,
JoinType, RecursiveQuery, SkipType, SortExpr, StringifiedPlan, WindowFrame,
WindowFrameBound, WriteOp,
};
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
use datafusion_physical_expr::expressions::Literal;
Expand Down Expand Up @@ -177,16 +178,17 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
logical_plan: &LogicalPlan,
session_state: &SessionState,
) -> Result<Arc<dyn ExecutionPlan>> {
match self.handle_explain(logical_plan, session_state).await? {
Some(plan) => Ok(plan),
None => {
let plan = self
.create_initial_plan(logical_plan, session_state)
.await?;

self.optimize_physical_plan(plan, session_state, |_, _| {})
}
if let Some(plan) = self
.handle_explain_or_analyze(logical_plan, session_state)
.await?
{
return Ok(plan);
}
let plan = self
.create_initial_plan(logical_plan, session_state)
.await?;

self.optimize_physical_plan(plan, session_state, |_, _| {})
}

/// Create a physical expression from a logical expression
Expand Down Expand Up @@ -1715,167 +1717,179 @@ impl DefaultPhysicalPlanner {
/// Returns
/// Some(plan) if optimized, and None if logical_plan was not an
/// explain (and thus needs to be optimized as normal)
async fn handle_explain(
async fn handle_explain_or_analyze(
&self,
logical_plan: &LogicalPlan,
session_state: &SessionState,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
if let LogicalPlan::Explain(e) = logical_plan {
use PlanType::*;
let mut stringified_plans = vec![];
let execution_plan = match logical_plan {
LogicalPlan::Explain(e) => self.handle_explain(e, session_state).await?,
LogicalPlan::Analyze(a) => self.handle_analyze(a, session_state).await?,
_ => return Ok(None),
};
Ok(Some(execution_plan))
}

/// Planner for `LogicalPlan::Explain`
async fn handle_explain(
&self,
e: &Explain,
session_state: &SessionState,
) -> Result<Arc<dyn ExecutionPlan>> {
use PlanType::*;
let mut stringified_plans = vec![];

let config = &session_state.config_options().explain;
let explain_format = DisplayFormatType::from_str(&config.format)?;
let config = &session_state.config_options().explain;
let explain_format = DisplayFormatType::from_str(&config.format)?;

let skip_logical_plan = config.physical_plan_only
|| explain_format == DisplayFormatType::TreeRender;
let skip_logical_plan =
config.physical_plan_only || explain_format == DisplayFormatType::TreeRender;

if !skip_logical_plan {
stringified_plans.clone_from(&e.stringified_plans);
if e.logical_optimization_succeeded {
stringified_plans.push(e.plan.to_stringified(FinalLogicalPlan));
}
if !skip_logical_plan {
stringified_plans.clone_from(&e.stringified_plans);
if e.logical_optimization_succeeded {
stringified_plans.push(e.plan.to_stringified(FinalLogicalPlan));
}
}

if !config.logical_plan_only && e.logical_optimization_succeeded {
match self
.create_initial_plan(e.plan.as_ref(), session_state)
.await
{
Ok(input) => {
// Include statistics / schema if enabled
stringified_plans.push(
displayable(input.as_ref())
.set_show_statistics(config.show_statistics)
.set_show_schema(config.show_schema)
.to_stringified(
e.verbose,
InitialPhysicalPlan,
explain_format,
),
);
if !config.logical_plan_only && e.logical_optimization_succeeded {
match self
.create_initial_plan(e.plan.as_ref(), session_state)
.await
{
Ok(input) => {
// Include statistics / schema if enabled
stringified_plans.push(
displayable(input.as_ref())
.set_show_statistics(config.show_statistics)
.set_show_schema(config.show_schema)
.to_stringified(
e.verbose,
InitialPhysicalPlan,
explain_format,
),
);

// Show statistics + schema in verbose output even if not
// explicitly requested
if e.verbose {
if !config.show_statistics {
stringified_plans.push(
displayable(input.as_ref())
.set_show_statistics(true)
.to_stringified(
e.verbose,
InitialPhysicalPlanWithStats,
explain_format,
),
);
}
if !config.show_schema {
stringified_plans.push(
displayable(input.as_ref())
.set_show_schema(true)
.to_stringified(
e.verbose,
InitialPhysicalPlanWithSchema,
explain_format,
),
);
}
// Show statistics + schema in verbose output even if not
// explicitly requested
if e.verbose {
if !config.show_statistics {
stringified_plans.push(
displayable(input.as_ref())
.set_show_statistics(true)
.to_stringified(
e.verbose,
InitialPhysicalPlanWithStats,
explain_format,
),
);
}
if !config.show_schema {
stringified_plans.push(
displayable(input.as_ref())
.set_show_schema(true)
.to_stringified(
e.verbose,
InitialPhysicalPlanWithSchema,
explain_format,
),
);
}
}

let optimized_plan = self.optimize_physical_plan(
input,
session_state,
|plan, optimizer| {
let optimizer_name = optimizer.name().to_string();
let plan_type = OptimizedPhysicalPlan { optimizer_name };
stringified_plans.push(
displayable(plan)
.set_show_statistics(config.show_statistics)
.set_show_schema(config.show_schema)
.to_stringified(
e.verbose,
plan_type,
explain_format,
),
);
},
);
match optimized_plan {
Ok(input) => {
// This plan will includes statistics if show_statistics is on
stringified_plans.push(
displayable(input.as_ref())
.set_show_statistics(config.show_statistics)
.set_show_schema(config.show_schema)
.to_stringified(
e.verbose,
FinalPhysicalPlan,
explain_format,
),
);

// Show statistics + schema in verbose output even if not
// explicitly requested
if e.verbose {
if !config.show_statistics {
stringified_plans.push(
displayable(input.as_ref())
.set_show_statistics(true)
.to_stringified(
e.verbose,
FinalPhysicalPlanWithStats,
explain_format,
),
);
}
if !config.show_schema {
stringified_plans.push(
displayable(input.as_ref())
.set_show_schema(true)
.to_stringified(
e.verbose,
FinalPhysicalPlanWithSchema,
explain_format,
),
);
}
let optimized_plan = self.optimize_physical_plan(
input,
session_state,
|plan, optimizer| {
let optimizer_name = optimizer.name().to_string();
let plan_type = OptimizedPhysicalPlan { optimizer_name };
stringified_plans.push(
displayable(plan)
.set_show_statistics(config.show_statistics)
.set_show_schema(config.show_schema)
.to_stringified(e.verbose, plan_type, explain_format),
);
},
);
match optimized_plan {
Ok(input) => {
// This plan will includes statistics if show_statistics is on
stringified_plans.push(
displayable(input.as_ref())
.set_show_statistics(config.show_statistics)
.set_show_schema(config.show_schema)
.to_stringified(
e.verbose,
FinalPhysicalPlan,
explain_format,
),
);

// Show statistics + schema in verbose output even if not
// explicitly requested
if e.verbose {
if !config.show_statistics {
stringified_plans.push(
displayable(input.as_ref())
.set_show_statistics(true)
.to_stringified(
e.verbose,
FinalPhysicalPlanWithStats,
explain_format,
),
);
}
if !config.show_schema {
stringified_plans.push(
displayable(input.as_ref())
.set_show_schema(true)
.to_stringified(
e.verbose,
FinalPhysicalPlanWithSchema,
explain_format,
),
);
}
}
Err(DataFusionError::Context(optimizer_name, e)) => {
let plan_type = OptimizedPhysicalPlan { optimizer_name };
stringified_plans
.push(StringifiedPlan::new(plan_type, e.to_string()))
}
Err(e) => return Err(e),
}
}
Err(err) => {
stringified_plans.push(StringifiedPlan::new(
PhysicalPlanError,
err.strip_backtrace(),
));
Err(DataFusionError::Context(optimizer_name, e)) => {
let plan_type = OptimizedPhysicalPlan { optimizer_name };
stringified_plans
.push(StringifiedPlan::new(plan_type, e.to_string()))
}
Err(e) => return Err(e),
}
}
Err(err) => {
stringified_plans.push(StringifiedPlan::new(
PhysicalPlanError,
err.strip_backtrace(),
));
}
}

Ok(Some(Arc::new(ExplainExec::new(
SchemaRef::new(e.schema.as_ref().to_owned().into()),
stringified_plans,
e.verbose,
))))
} else if let LogicalPlan::Analyze(a) = logical_plan {
let input = self.create_physical_plan(&a.input, session_state).await?;
let schema = SchemaRef::new((*a.schema).clone().into());
let show_statistics = session_state.config_options().explain.show_statistics;
Ok(Some(Arc::new(AnalyzeExec::new(
a.verbose,
show_statistics,
input,
schema,
))))
} else {
Ok(None)
}

Ok(Arc::new(ExplainExec::new(
Arc::clone(e.schema.inner()),
stringified_plans,
e.verbose,
)))
}

async fn handle_analyze(
&self,
a: &Analyze,
session_state: &SessionState,
) -> Result<Arc<dyn ExecutionPlan>> {
let input = self.create_physical_plan(&a.input, session_state).await?;
let schema = SchemaRef::new((*a.schema).clone().into());
let show_statistics = session_state.config_options().explain.show_statistics;
Ok(Arc::new(AnalyzeExec::new(
a.verbose,
show_statistics,
input,
schema,
)))
}

/// Optimize a physical plan by applying each physical optimizer,
Expand Down