Skip to content

Commit

Permalink
enable explain for ballista (#2163)
Browse files Browse the repository at this point in the history
* explain

* fmt
  • Loading branch information
doki23 authored Apr 7, 2022
1 parent 0da1f37 commit 6504d2a
Show file tree
Hide file tree
Showing 9 changed files with 225 additions and 58 deletions.
9 changes: 8 additions & 1 deletion ballista/rust/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ message AnalyzeNode {
bool verbose = 2;
}

message ExplainNode{
message ExplainNode {
LogicalPlanNode input = 1;
bool verbose = 2;
}
Expand Down Expand Up @@ -269,6 +269,7 @@ message PhysicalPlanNode {
AvroScanExecNode avro_scan = 20;
PhysicalExtensionNode extension = 21;
UnionExecNode union = 22;
ExplainExecNode explain = 23;
}
}

Expand Down Expand Up @@ -461,6 +462,12 @@ message UnionExecNode {
repeated PhysicalPlanNode inputs = 1;
}

// Serialized form of a physical explain operator; carried in the `explain`
// variant of PhysicalPlanNode.
message ExplainExecNode {
// Schema reported by the operator (taken from `schema()` on the Rust side).
datafusion.Schema schema = 1;
// Plans already rendered to text, each tagged with its plan type.
repeated datafusion.StringifiedPlan stringified_plans = 2;
// The operator's `verbose` flag, round-tripped unchanged.
bool verbose = 3;
}

message CrossJoinExecNode {
PhysicalPlanNode left = 1;
PhysicalPlanNode right = 2;
Expand Down
24 changes: 24 additions & 0 deletions ballista/rust/core/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -515,3 +515,27 @@ message ArrowType{
// }
//}
message EmptyMessage{}

// Payload for the `OptimizedLogicalPlan` variant of PlanType.
message OptimizedLogicalPlanType {
// Name of the optimizer associated with the plan.
// NOTE(review): presumably the pass that produced it — confirm against the Rust PlanType enum.
string optimizer_name = 1;
}

// Payload for the `OptimizedPhysicalPlan` variant of PlanType.
message OptimizedPhysicalPlanType {
// Name of the optimizer associated with the plan.
// NOTE(review): presumably the pass that produced it — confirm against the Rust PlanType enum.
string optimizer_name = 1;
}

// Tag describing what kind of plan a StringifiedPlan holds.
// Modeled as a oneof so that at most one variant is set; variants that carry
// no extra data use EmptyMessage, the "optimized" variants carry the
// optimizer name.
message PlanType {
oneof plan_type_enum {
EmptyMessage InitialLogicalPlan = 1;
OptimizedLogicalPlanType OptimizedLogicalPlan = 2;
EmptyMessage FinalLogicalPlan = 3;
EmptyMessage InitialPhysicalPlan = 4;
OptimizedPhysicalPlanType OptimizedPhysicalPlan = 5;
EmptyMessage FinalPhysicalPlan = 6;
}
}

// A plan rendered as text, together with a tag saying what kind of plan it is.
message StringifiedPlan {
// Kind of plan held in `plan`.
PlanType plan_type = 1;
// Textual rendering of the plan.
string plan = 2;
}
77 changes: 52 additions & 25 deletions ballista/rust/core/src/serde/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,12 @@
// specific language governing permissions and limitations
// under the License.

use crate::error::BallistaError;
use crate::execution_plans::{
ShuffleReaderExec, ShuffleWriterExec, UnresolvedShuffleExec,
};
use crate::serde::physical_plan::from_proto::{
parse_physical_expr, parse_protobuf_hash_partitioning,
};
use crate::serde::protobuf::physical_expr_node::ExprType;
use crate::serde::protobuf::physical_plan_node::PhysicalPlanType;
use crate::serde::protobuf::repartition_exec_node::PartitionMethod;
use std::convert::TryInto;
use std::sync::Arc;

use prost::bytes::BufMut;
use prost::Message;

use crate::serde::protobuf::{PhysicalExtensionNode, PhysicalPlanNode};
use crate::serde::scheduler::PartitionLocation;
use crate::serde::{
byte_to_string, proto_error, protobuf, str_to_byte, AsExecutionPlan,
PhysicalExtensionCodec,
};
use crate::{convert_required, into_physical_plan, into_required};
use datafusion::arrow::compute::SortOptions;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::datafusion_data_access::object_store::local::LocalFileSystem;
Expand All @@ -45,6 +33,7 @@ use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::cross_join::CrossJoinExec;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::explain::ExplainExec;
use datafusion::physical_plan::expressions::{Column, PhysicalSortExpr};
use datafusion::physical_plan::file_format::{
AvroExec, CsvExec, FileScanConfig, ParquetExec,
Expand All @@ -62,10 +51,24 @@ use datafusion::physical_plan::{
AggregateExpr, ExecutionPlan, Partitioning, PhysicalExpr, WindowExpr,
};
use datafusion_proto::from_proto::parse_expr;
use prost::bytes::BufMut;
use prost::Message;
use std::convert::TryInto;
use std::sync::Arc;

use crate::error::BallistaError;
use crate::execution_plans::{
ShuffleReaderExec, ShuffleWriterExec, UnresolvedShuffleExec,
};
use crate::serde::physical_plan::from_proto::{
parse_physical_expr, parse_protobuf_hash_partitioning,
};
use crate::serde::protobuf::physical_expr_node::ExprType;
use crate::serde::protobuf::physical_plan_node::PhysicalPlanType;
use crate::serde::protobuf::repartition_exec_node::PartitionMethod;
use crate::serde::protobuf::{PhysicalExtensionNode, PhysicalPlanNode};
use crate::serde::scheduler::PartitionLocation;
use crate::serde::{
byte_to_string, proto_error, protobuf, str_to_byte, AsExecutionPlan,
PhysicalExtensionCodec,
};
use crate::{convert_required, into_physical_plan, into_required};

pub mod from_proto;
pub mod to_proto;
Expand Down Expand Up @@ -103,6 +106,15 @@ impl AsExecutionPlan for PhysicalPlanNode {
))
})?;
match plan {
PhysicalPlanType::Explain(explain) => Ok(Arc::new(ExplainExec::new(
Arc::new(explain.schema.as_ref().unwrap().try_into()?),
explain
.stringified_plans
.iter()
.map(|plan| plan.into())
.collect(),
explain.verbose,
))),
PhysicalPlanType::Projection(projection) => {
let input: Arc<dyn ExecutionPlan> = into_physical_plan!(
projection.input,
Expand Down Expand Up @@ -587,7 +599,21 @@ impl AsExecutionPlan for PhysicalPlanNode {
let plan_clone = plan.clone();
let plan = plan.as_any();

if let Some(exec) = plan.downcast_ref::<ProjectionExec>() {
if let Some(exec) = plan.downcast_ref::<ExplainExec>() {
Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::Explain(
protobuf::ExplainExecNode {
schema: Some(exec.schema().as_ref().into()),
stringified_plans: exec
.stringified_plans()
.iter()
.map(|plan| plan.into())
.collect(),
verbose: exec.verbose(),
},
)),
})
} else if let Some(exec) = plan.downcast_ref::<ProjectionExec>() {
let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
exec.input().to_owned(),
extension_codec,
Expand Down Expand Up @@ -1038,7 +1064,6 @@ mod roundtrip_tests {
use std::ops::Deref;
use std::sync::Arc;

use crate::serde::{AsExecutionPlan, BallistaCodec};
use datafusion::arrow::array::ArrayRef;
use datafusion::execution::context::ExecutionProps;
use datafusion::logical_plan::create_udf;
Expand Down Expand Up @@ -1071,10 +1096,12 @@ mod roundtrip_tests {
scalar::ScalarValue,
};

use super::super::super::error::Result;
use super::super::protobuf;
use crate::execution_plans::ShuffleWriterExec;
use crate::serde::protobuf::{LogicalPlanNode, PhysicalPlanNode};
use crate::serde::{AsExecutionPlan, BallistaCodec};

use super::super::super::error::Result;
use super::super::protobuf;

fn roundtrip_test(exec_plan: Arc<dyn ExecutionPlan>) -> Result<()> {
let ctx = SessionContext::new();
Expand Down
18 changes: 9 additions & 9 deletions datafusion/core/src/optimizer/limit_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ impl LimitPushDown {
}

fn limit_push_down(
optimizer: &LimitPushDown,
_optimizer: &LimitPushDown,
upper_limit: Option<usize>,
plan: &LogicalPlan,
execution_props: &ExecutionProps,
_execution_props: &ExecutionProps,
) -> Result<LogicalPlan> {
match (plan, upper_limit) {
(LogicalPlan::Limit(Limit { n, input }), upper_limit) => {
Expand All @@ -51,10 +51,10 @@ fn limit_push_down(
n: smallest,
// push down limit to plan (minimum of upper limit and current limit)
input: Arc::new(limit_push_down(
optimizer,
_optimizer,
Some(smallest),
input.as_ref(),
execution_props,
_execution_props,
)?),
}))
}
Expand Down Expand Up @@ -91,10 +91,10 @@ fn limit_push_down(
Ok(LogicalPlan::Projection(Projection {
expr: expr.clone(),
input: Arc::new(limit_push_down(
optimizer,
_optimizer,
upper_limit,
input.as_ref(),
execution_props,
_execution_props,
)?),
schema: schema.clone(),
alias: alias.clone(),
Expand All @@ -115,10 +115,10 @@ fn limit_push_down(
Ok(LogicalPlan::Limit(Limit {
n: upper_limit,
input: Arc::new(limit_push_down(
optimizer,
_optimizer,
Some(upper_limit),
x,
execution_props,
_execution_props,
)?),
}))
})
Expand All @@ -138,7 +138,7 @@ fn limit_push_down(
let inputs = plan.inputs();
let new_inputs = inputs
.iter()
.map(|plan| limit_push_down(optimizer, None, plan, execution_props))
.map(|plan| limit_push_down(_optimizer, None, plan, _execution_props))
.collect::<Result<Vec<_>>>()?;

utils::from_plan(plan, &expr, &new_inputs)
Expand Down
36 changes: 18 additions & 18 deletions datafusion/core/src/optimizer/projection_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,11 @@ fn get_projected_schema(

/// Recursively traverses the logical plan, removing expressions that are not needed.
fn optimize_plan(
optimizer: &ProjectionPushDown,
_optimizer: &ProjectionPushDown,
plan: &LogicalPlan,
required_columns: &HashSet<Column>, // set of columns required up to this step
has_projection: bool,
execution_props: &ExecutionProps,
_execution_props: &ExecutionProps,
) -> Result<LogicalPlan> {
let mut new_required_columns = required_columns.clone();
match plan {
Expand Down Expand Up @@ -165,11 +165,11 @@ fn optimize_plan(
})?;

let new_input = optimize_plan(
optimizer,
_optimizer,
input,
&new_required_columns,
true,
execution_props,
_execution_props,
)?;

let new_required_columns_optimized = new_input
Expand Down Expand Up @@ -211,19 +211,19 @@ fn optimize_plan(
}

let optimized_left = Arc::new(optimize_plan(
optimizer,
_optimizer,
left,
&new_required_columns,
true,
execution_props,
_execution_props,
)?);

let optimized_right = Arc::new(optimize_plan(
optimizer,
_optimizer,
right,
&new_required_columns,
true,
execution_props,
_execution_props,
)?);

let schema = build_join_schema(
Expand Down Expand Up @@ -272,11 +272,11 @@ fn optimize_plan(
)?;

LogicalPlanBuilder::from(optimize_plan(
optimizer,
_optimizer,
input,
&new_required_columns,
true,
execution_props,
_execution_props,
)?)
.window(new_window_expr)?
.build()
Expand Down Expand Up @@ -324,11 +324,11 @@ fn optimize_plan(
group_expr: group_expr.clone(),
aggr_expr: new_aggr_expr,
input: Arc::new(optimize_plan(
optimizer,
_optimizer,
input,
&new_required_columns,
true,
execution_props,
_execution_props,
)?),
schema: DFSchemaRef::new(new_schema),
}))
Expand Down Expand Up @@ -373,11 +373,11 @@ fn optimize_plan(

Ok(LogicalPlan::Analyze(Analyze {
input: Arc::new(optimize_plan(
optimizer,
_optimizer,
&a.input,
&required_columns,
false,
execution_props,
_execution_props,
)?),
verbose: a.verbose,
schema: a.schema.clone(),
Expand Down Expand Up @@ -409,11 +409,11 @@ fn optimize_plan(
new_required_columns.insert(f.qualified_column());
});
optimize_plan(
optimizer,
_optimizer,
input_plan,
&new_required_columns,
has_projection,
execution_props,
_execution_props,
)
})
.collect::<Result<Vec<_>>>()?;
Expand Down Expand Up @@ -457,11 +457,11 @@ fn optimize_plan(
.iter()
.map(|input_plan| {
optimize_plan(
optimizer,
_optimizer,
input_plan,
&new_required_columns,
has_projection,
execution_props,
_execution_props,
)
})
.collect::<Result<Vec<_>>>()?;
Expand Down
5 changes: 5 additions & 0 deletions datafusion/core/src/physical_plan/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ impl ExplainExec {
pub fn stringified_plans(&self) -> &[StringifiedPlan] {
&self.stringified_plans
}

/// Returns the `verbose` flag stored on this `ExplainExec`.
///
/// Exposed so code outside this type can read the flag, for example when
/// serializing the node.
pub fn verbose(&self) -> bool {
self.verbose
}
}

#[async_trait]
Expand Down
Loading

0 comments on commit 6504d2a

Please sign in to comment.