Skip to content

Commit

Permalink
Add support for EXPLAIN ANALYZE
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Aug 11, 2021
1 parent a4f6282 commit 3ff7ffd
Show file tree
Hide file tree
Showing 22 changed files with 506 additions and 105 deletions.
6 changes: 6 additions & 0 deletions ballista/rust/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ message LogicalPlanNode {
RepartitionNode repartition = 9;
EmptyRelationNode empty_relation = 10;
CreateExternalTableNode create_external_table = 11;
AnalyzeNode analyze = 14;
ExplainNode explain = 12;
WindowNode window = 13;
}
Expand Down Expand Up @@ -323,6 +324,11 @@ enum FileType{
CSV = 2;
}

message AnalyzeNode {
LogicalPlanNode input = 1;
bool verbose = 2;
}

message ExplainNode{
LogicalPlanNode input = 1;
bool verbose = 2;
Expand Down
9 changes: 8 additions & 1 deletion ballista/rust/core/src/serde/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,10 +231,17 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
has_header: create_extern_table.has_header,
})
}
LogicalPlanType::Analyze(analyze) => {
let input: LogicalPlan = convert_box_required!(analyze.input)?;
LogicalPlanBuilder::from(input)
.explain(analyze.verbose, true)?
.build()
.map_err(|e| e.into())
}
LogicalPlanType::Explain(explain) => {
let input: LogicalPlan = convert_box_required!(explain.input)?;
LogicalPlanBuilder::from(input)
.explain(explain.verbose)?
.explain(explain.verbose, false)?
.build()
.map_err(|e| e.into())
}
Expand Down
41 changes: 39 additions & 2 deletions ballista/rust/core/src/serde/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,43 @@ mod roundtrip_tests {
Ok(())
}

#[test]
fn roundtrip_analyze() -> Result<()> {
let schema = Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("first_name", DataType::Utf8, false),
Field::new("last_name", DataType::Utf8, false),
Field::new("state", DataType::Utf8, false),
Field::new("salary", DataType::Int32, false),
]);

let verbose_plan = LogicalPlanBuilder::scan_csv(
"employee.csv",
CsvReadOptions::new().schema(&schema).has_header(true),
Some(vec![3, 4]),
)
.and_then(|plan| plan.sort(vec![col("salary")]))
.and_then(|plan| plan.explain(true, true))
.and_then(|plan| plan.build())
.map_err(BallistaError::DataFusionError)?;

let plan = LogicalPlanBuilder::scan_csv(
"employee.csv",
CsvReadOptions::new().schema(&schema).has_header(true),
Some(vec![3, 4]),
)
.and_then(|plan| plan.sort(vec![col("salary")]))
.and_then(|plan| plan.explain(false, true))
.and_then(|plan| plan.build())
.map_err(BallistaError::DataFusionError)?;

roundtrip_test!(plan);

roundtrip_test!(verbose_plan);

Ok(())
}

#[test]
fn roundtrip_explain() -> Result<()> {
let schema = Schema::new(vec![
Expand All @@ -677,7 +714,7 @@ mod roundtrip_tests {
Some(vec![3, 4]),
)
.and_then(|plan| plan.sort(vec![col("salary")]))
.and_then(|plan| plan.explain(true))
.and_then(|plan| plan.explain(true, false))
.and_then(|plan| plan.build())
.map_err(BallistaError::DataFusionError)?;

Expand All @@ -687,7 +724,7 @@ mod roundtrip_tests {
Some(vec![3, 4]),
)
.and_then(|plan| plan.sort(vec![col("salary")]))
.and_then(|plan| plan.explain(false))
.and_then(|plan| plan.explain(false, false))
.and_then(|plan| plan.build())
.map_err(BallistaError::DataFusionError)?;

Expand Down
11 changes: 11 additions & 0 deletions ballista/rust/core/src/serde/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -931,6 +931,17 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
)),
})
}
LogicalPlan::Analyze { verbose, input, .. } => {
let input: protobuf::LogicalPlanNode = input.as_ref().try_into()?;
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::Analyze(Box::new(
protobuf::AnalyzeNode {
input: Some(Box::new(input)),
verbose: *verbose,
},
))),
})
}
LogicalPlan::Explain { verbose, plan, .. } => {
let input: protobuf::LogicalPlanNode = plan.as_ref().try_into()?;
Ok(protobuf::LogicalPlanNode {
Expand Down
6 changes: 4 additions & 2 deletions datafusion/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,18 +289,20 @@ pub trait DataFrame: Send + Sync {

/// Return a DataFrame with the explanation of its plan so far.
///
/// if `analyze` is specified, runs the plan and reports metrics
///
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
/// let batches = df.limit(100)?.explain(false)?.collect().await?;
/// let batches = df.limit(100)?.explain(false, false)?.collect().await?;
/// # Ok(())
/// # }
/// ```
fn explain(&self, verbose: bool) -> Result<Arc<dyn DataFrame>>;
fn explain(&self, verbose: bool, analyze: bool) -> Result<Arc<dyn DataFrame>>;

/// Return a `FunctionRegistry` used to plan udf's calls
///
Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -986,7 +986,7 @@ mod tests {

let plan = LogicalPlanBuilder::scan_empty(Some("employee"), &schema, None)
.unwrap()
.explain(true)
.explain(true, false)
.unwrap()
.build()
.unwrap();
Expand Down
6 changes: 3 additions & 3 deletions datafusion/src/execution/dataframe_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,9 @@ impl DataFrame for DataFrameImpl {
self.plan.schema()
}

fn explain(&self, verbose: bool) -> Result<Arc<dyn DataFrame>> {
fn explain(&self, verbose: bool, analyze: bool) -> Result<Arc<dyn DataFrame>> {
let plan = LogicalPlanBuilder::from(self.to_logical_plan())
.explain(verbose)?
.explain(verbose, analyze)?
.build()?;
Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
}
Expand Down Expand Up @@ -318,7 +318,7 @@ mod tests {
let df = df
.select_columns(&["c1", "c2", "c11"])?
.limit(10)?
.explain(false)?;
.explain(false, false)?;
let plan = df.to_logical_plan();

// build query using SQL
Expand Down
36 changes: 25 additions & 11 deletions datafusion/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,18 +455,32 @@ impl LogicalPlanBuilder {
}

/// Create an expression to represent the explanation of the plan
pub fn explain(&self, verbose: bool) -> Result<Self> {
let stringified_plans =
vec![self.plan.to_stringified(PlanType::InitialLogicalPlan)];

///
/// if `analyze` is true, runs the actual plan and produces
/// information about metrics during run.
///
/// if `verbose` is true, prints out additional details.
pub fn explain(&self, verbose: bool, analyze: bool) -> Result<Self> {
let schema = LogicalPlan::explain_schema();

Ok(Self::from(LogicalPlan::Explain {
verbose,
plan: Arc::new(self.plan.clone()),
stringified_plans,
schema: schema.to_dfschema_ref()?,
}))
let schema = schema.to_dfschema_ref()?;

if analyze {
Ok(Self::from(LogicalPlan::Analyze {
verbose,
input: Arc::new(self.plan.clone()),
schema,
}))
} else {
let stringified_plans =
vec![self.plan.to_stringified(PlanType::InitialLogicalPlan)];

Ok(Self::from(LogicalPlan::Explain {
verbose,
plan: Arc::new(self.plan.clone()),
stringified_plans,
schema,
}))
}
}

/// Build the plan
Expand Down
16 changes: 16 additions & 0 deletions datafusion/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,16 @@ pub enum LogicalPlan {
/// The output schema of the explain (2 columns of text)
schema: DFSchemaRef,
},
/// Runs the actual plan, and then prints the physical plan with
/// with execution metrics.
Analyze {
/// Should extra detail be included?
verbose: bool,
/// The logical plan that is being EXPLAIN ANALYZE'd
input: Arc<LogicalPlan>,
/// The output schema of the explain (2 columns of text)
schema: DFSchemaRef,
},
/// Extension operator defined outside of DataFusion
Extension {
/// The runtime extension operator
Expand All @@ -239,6 +249,7 @@ impl LogicalPlan {
LogicalPlan::Limit { input, .. } => input.schema(),
LogicalPlan::CreateExternalTable { schema, .. } => schema,
LogicalPlan::Explain { schema, .. } => schema,
LogicalPlan::Analyze { schema, .. } => schema,
LogicalPlan::Extension { node } => node.schema(),
LogicalPlan::Union { schema, .. } => schema,
}
Expand Down Expand Up @@ -278,6 +289,7 @@ impl LogicalPlan {
}
LogicalPlan::Extension { node } => vec![node.schema()],
LogicalPlan::Explain { schema, .. }
| LogicalPlan::Analyze { schema, .. }
| LogicalPlan::EmptyRelation { schema, .. }
| LogicalPlan::CreateExternalTable { schema, .. } => vec![schema],
LogicalPlan::Limit { input, .. }
Expand Down Expand Up @@ -327,6 +339,7 @@ impl LogicalPlan {
| LogicalPlan::Limit { .. }
| LogicalPlan::CreateExternalTable { .. }
| LogicalPlan::CrossJoin { .. }
| LogicalPlan::Analyze { .. }
| LogicalPlan::Explain { .. }
| LogicalPlan::Union { .. } => {
vec![]
Expand All @@ -350,6 +363,7 @@ impl LogicalPlan {
LogicalPlan::Extension { node } => node.inputs(),
LogicalPlan::Union { inputs, .. } => inputs.iter().collect(),
LogicalPlan::Explain { plan, .. } => vec![plan],
LogicalPlan::Analyze { input: plan, .. } => vec![plan],
// plans without inputs
LogicalPlan::TableScan { .. }
| LogicalPlan::EmptyRelation { .. }
Expand Down Expand Up @@ -495,6 +509,7 @@ impl LogicalPlan {
true
}
LogicalPlan::Explain { plan, .. } => plan.accept(visitor)?,
LogicalPlan::Analyze { input: plan, .. } => plan.accept(visitor)?,
// plans without inputs
LogicalPlan::TableScan { .. }
| LogicalPlan::EmptyRelation { .. }
Expand Down Expand Up @@ -790,6 +805,7 @@ impl LogicalPlan {
write!(f, "CreateExternalTable: {:?}", name)
}
LogicalPlan::Explain { .. } => write!(f, "Explain"),
LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
LogicalPlan::Union { .. } => write!(f, "Union"),
LogicalPlan::Extension { ref node } => node.fmt_for_explain(f),
}
Expand Down
1 change: 1 addition & 0 deletions datafusion/src/optimizer/constant_folding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ impl OptimizerRule for ConstantFolding {
| LogicalPlan::Extension { .. }
| LogicalPlan::Sort { .. }
| LogicalPlan::Explain { .. }
| LogicalPlan::Analyze { .. }
| LogicalPlan::Limit { .. }
| LogicalPlan::Union { .. }
| LogicalPlan::Join { .. }
Expand Down
1 change: 1 addition & 0 deletions datafusion/src/optimizer/filter_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
// push the optimization to the plan of this explain
push_down(&state, plan)
}
LogicalPlan::Analyze { .. } => push_down(&state, plan),
LogicalPlan::Filter { input, predicate } => {
let mut predicates = vec![];
split_members(predicate, &mut predicates);
Expand Down
6 changes: 6 additions & 0 deletions datafusion/src/optimizer/hash_build_probe_order.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ fn get_num_rows(logical_plan: &LogicalPlan) -> Option<usize> {
// we cannot predict how rows will be repartitioned
None
}
LogicalPlan::Analyze { .. } => {
// Analyze produces one row, verbose produces more
// but it should never be used as an input to a Join anyways
None
}
// the following operators are special cases and not querying data
LogicalPlan::CreateExternalTable { .. } => None,
LogicalPlan::Explain { .. } => None,
Expand Down Expand Up @@ -201,6 +206,7 @@ impl OptimizerRule for HashBuildProbeOrder {
| LogicalPlan::Sort { .. }
| LogicalPlan::CreateExternalTable { .. }
| LogicalPlan::Explain { .. }
| LogicalPlan::Analyze { .. }
| LogicalPlan::Union { .. }
| LogicalPlan::Extension { .. } => {
let expr = plan.expressions();
Expand Down
25 changes: 25 additions & 0 deletions datafusion/src/optimizer/projection_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,31 @@ fn optimize_plan(
LogicalPlan::Explain { .. } => Err(DataFusionError::Internal(
"Unsupported logical plan: Explain must be root of the plan".to_string(),
)),
LogicalPlan::Analyze {
input,
verbose,
schema,
} => {
// make sure we keep all the columns from the input plan
let required_columns = input
.schema()
.fields()
.iter()
.map(|f| f.qualified_column())
.collect::<HashSet<Column>>();

Ok(LogicalPlan::Analyze {
input: Arc::new(optimize_plan(
optimizer,
input,
&required_columns,
false,
execution_props,
)?),
verbose: *verbose,
schema: schema.clone(),
})
}
LogicalPlan::Union {
inputs,
schema,
Expand Down
3 changes: 2 additions & 1 deletion datafusion/src/optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,8 @@ pub fn from_plan(
LogicalPlan::EmptyRelation { .. }
| LogicalPlan::TableScan { .. }
| LogicalPlan::CreateExternalTable { .. }
| LogicalPlan::Explain { .. } => Ok(plan.clone()),
| LogicalPlan::Explain { .. }
| LogicalPlan::Analyze { .. } => Ok(plan.clone()),
}
}

Expand Down
Loading

0 comments on commit 3ff7ffd

Please sign in to comment.