enable explain for ballista #2163

Merged · 3 commits · Apr 7, 2022
9 changes: 8 additions & 1 deletion ballista/rust/core/proto/ballista.proto
@@ -183,7 +183,7 @@ message AnalyzeNode {
bool verbose = 2;
}

message ExplainNode{
message ExplainNode {
LogicalPlanNode input = 1;
bool verbose = 2;
}
@@ -269,6 +269,7 @@ message PhysicalPlanNode {
AvroScanExecNode avro_scan = 20;
PhysicalExtensionNode extension = 21;
UnionExecNode union = 22;
ExplainExecNode explain = 23;
}
}

@@ -461,6 +462,12 @@ message UnionExecNode {
repeated PhysicalPlanNode inputs = 1;
}

message ExplainExecNode {
datafusion.Schema schema = 1;
repeated datafusion.StringifiedPlan stringified_plans = 2;
bool verbose = 3;
}

message CrossJoinExecNode {
PhysicalPlanNode left = 1;
PhysicalPlanNode right = 2;
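Note (not part of the diff): with the new `explain = 23` variant, an EXPLAIN node round-trips through the prost-generated types like any other physical plan node. A minimal sketch, assuming prost's default code generation for the messages above and the same `protobuf` / `PhysicalPlanType` imports used in mod.rs further down:

use prost::Message;

use crate::serde::protobuf;
use crate::serde::protobuf::physical_plan_node::PhysicalPlanType;

fn explain_proto_roundtrip_sketch() {
    // Build the new oneof variant; a real plan would carry the two-column
    // EXPLAIN schema and its stringified plans instead of empty placeholders.
    let node = protobuf::PhysicalPlanNode {
        physical_plan_type: Some(PhysicalPlanType::Explain(protobuf::ExplainExecNode {
            schema: None,
            stringified_plans: vec![],
            verbose: true,
        })),
    };

    // Encode to bytes and decode back with prost.
    let mut buf = Vec::new();
    node.encode(&mut buf).expect("encode");
    let decoded = protobuf::PhysicalPlanNode::decode(buf.as_slice()).expect("decode");
    assert_eq!(node, decoded);
}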
24 changes: 24 additions & 0 deletions ballista/rust/core/proto/datafusion.proto
@@ -515,3 +515,27 @@ message ArrowType{
// }
//}
message EmptyMessage{}

message OptimizedLogicalPlanType {
string optimizer_name = 1;
}

message OptimizedPhysicalPlanType {
string optimizer_name = 1;
}

message PlanType {
oneof plan_type_enum {
EmptyMessage InitialLogicalPlan = 1;
OptimizedLogicalPlanType OptimizedLogicalPlan = 2;
EmptyMessage FinalLogicalPlan = 3;
EmptyMessage InitialPhysicalPlan = 4;
OptimizedPhysicalPlanType OptimizedPhysicalPlan = 5;
EmptyMessage FinalPhysicalPlan = 6;
}
}

message StringifiedPlan {
PlanType plan_type = 1;
string plan = 2;
}
77 changes: 52 additions & 25 deletions ballista/rust/core/src/serde/physical_plan/mod.rs
@@ -15,24 +15,12 @@
// specific language governing permissions and limitations
// under the License.

use crate::error::BallistaError;
use crate::execution_plans::{
ShuffleReaderExec, ShuffleWriterExec, UnresolvedShuffleExec,
};
use crate::serde::physical_plan::from_proto::{
parse_physical_expr, parse_protobuf_hash_partitioning,
};
use crate::serde::protobuf::physical_expr_node::ExprType;
use crate::serde::protobuf::physical_plan_node::PhysicalPlanType;
use crate::serde::protobuf::repartition_exec_node::PartitionMethod;
use std::convert::TryInto;
use std::sync::Arc;

use prost::bytes::BufMut;
use prost::Message;

use crate::serde::protobuf::{PhysicalExtensionNode, PhysicalPlanNode};
use crate::serde::scheduler::PartitionLocation;
use crate::serde::{
byte_to_string, proto_error, protobuf, str_to_byte, AsExecutionPlan,
PhysicalExtensionCodec,
};
use crate::{convert_required, into_physical_plan, into_required};
use datafusion::arrow::compute::SortOptions;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::datafusion_data_access::object_store::local::LocalFileSystem;
@@ -45,6 +33,7 @@ use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::cross_join::CrossJoinExec;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::explain::ExplainExec;
use datafusion::physical_plan::expressions::{Column, PhysicalSortExpr};
use datafusion::physical_plan::file_format::{
AvroExec, CsvExec, FileScanConfig, ParquetExec,
@@ -62,10 +51,24 @@ use datafusion::physical_plan::{
AggregateExpr, ExecutionPlan, Partitioning, PhysicalExpr, WindowExpr,
};
use datafusion_proto::from_proto::parse_expr;
use prost::bytes::BufMut;
use prost::Message;
use std::convert::TryInto;
use std::sync::Arc;

use crate::error::BallistaError;
use crate::execution_plans::{
ShuffleReaderExec, ShuffleWriterExec, UnresolvedShuffleExec,
};
use crate::serde::physical_plan::from_proto::{
parse_physical_expr, parse_protobuf_hash_partitioning,
};
use crate::serde::protobuf::physical_expr_node::ExprType;
use crate::serde::protobuf::physical_plan_node::PhysicalPlanType;
use crate::serde::protobuf::repartition_exec_node::PartitionMethod;
use crate::serde::protobuf::{PhysicalExtensionNode, PhysicalPlanNode};
use crate::serde::scheduler::PartitionLocation;
use crate::serde::{
byte_to_string, proto_error, protobuf, str_to_byte, AsExecutionPlan,
PhysicalExtensionCodec,
};
use crate::{convert_required, into_physical_plan, into_required};

pub mod from_proto;
pub mod to_proto;
@@ -103,6 +106,15 @@ impl AsExecutionPlan for PhysicalPlanNode {
))
})?;
match plan {
PhysicalPlanType::Explain(explain) => Ok(Arc::new(ExplainExec::new(
Arc::new(explain.schema.as_ref().unwrap().try_into()?),
explain
.stringified_plans
.iter()
.map(|plan| plan.into())
.collect(),
explain.verbose,
))),
PhysicalPlanType::Projection(projection) => {
let input: Arc<dyn ExecutionPlan> = into_physical_plan!(
projection.input,
@@ -587,7 +599,21 @@
let plan_clone = plan.clone();
let plan = plan.as_any();

if let Some(exec) = plan.downcast_ref::<ProjectionExec>() {
if let Some(exec) = plan.downcast_ref::<ExplainExec>() {
Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::Explain(
protobuf::ExplainExecNode {
schema: Some(exec.schema().as_ref().into()),
stringified_plans: exec
.stringified_plans()
.iter()
.map(|plan| plan.into())
.collect(),
verbose: exec.verbose(),
},
)),
})
} else if let Some(exec) = plan.downcast_ref::<ProjectionExec>() {
let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
exec.input().to_owned(),
extension_codec,
@@ -1038,7 +1064,6 @@ mod roundtrip_tests {
use std::ops::Deref;
use std::sync::Arc;

use crate::serde::{AsExecutionPlan, BallistaCodec};
use datafusion::arrow::array::ArrayRef;
use datafusion::execution::context::ExecutionProps;
use datafusion::logical_plan::create_udf;
@@ -1071,10 +1096,12 @@
scalar::ScalarValue,
};

use super::super::super::error::Result;
use super::super::protobuf;
use crate::execution_plans::ShuffleWriterExec;
use crate::serde::protobuf::{LogicalPlanNode, PhysicalPlanNode};
use crate::serde::{AsExecutionPlan, BallistaCodec};

use super::super::super::error::Result;
use super::super::protobuf;

fn roundtrip_test(exec_plan: Arc<dyn ExecutionPlan>) -> Result<()> {
let ctx = SessionContext::new();
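A natural companion (not shown in this diff) is a roundtrip test for the new Explain support, in the style of the existing roundtrip_tests module. A sketch, assuming the module's `roundtrip_test` helper and `Result` alias; the DataFusion import paths are assumed and the schema/plan values are purely illustrative:

// Some of these may already be imported by the roundtrip_tests module.
use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::logical_plan::{PlanType, StringifiedPlan};
use datafusion::physical_plan::explain::ExplainExec;

#[test]
fn roundtrip_explain() -> Result<()> {
    // EXPLAIN output is a two-column table of (plan_type, plan).
    let schema = Schema::new(vec![
        Field::new("plan_type", DataType::Utf8, false),
        Field::new("plan", DataType::Utf8, false),
    ]);
    let stringified_plans =
        vec![StringifiedPlan::new(PlanType::InitialLogicalPlan, "initial plan")];
    roundtrip_test(Arc::new(ExplainExec::new(
        Arc::new(schema),
        stringified_plans,
        true, // verbose
    )))
}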
18 changes: 9 additions & 9 deletions datafusion/core/src/optimizer/limit_push_down.rs
@@ -39,10 +39,10 @@ impl LimitPushDown {
}

fn limit_push_down(
optimizer: &LimitPushDown,
_optimizer: &LimitPushDown,
upper_limit: Option<usize>,
plan: &LogicalPlan,
execution_props: &ExecutionProps,
_execution_props: &ExecutionProps,
) -> Result<LogicalPlan> {
match (plan, upper_limit) {
(LogicalPlan::Limit(Limit { n, input }), upper_limit) => {
@@ -51,10 +51,10 @@ fn limit_push_down(
n: smallest,
// push down limit to plan (minimum of upper limit and current limit)
input: Arc::new(limit_push_down(
optimizer,
_optimizer,
Some(smallest),
input.as_ref(),
execution_props,
_execution_props,
)?),
}))
}
@@ -91,10 +91,10 @@ fn limit_push_down(
Ok(LogicalPlan::Projection(Projection {
expr: expr.clone(),
input: Arc::new(limit_push_down(
optimizer,
_optimizer,
upper_limit,
input.as_ref(),
execution_props,
_execution_props,
)?),
schema: schema.clone(),
alias: alias.clone(),
@@ -115,10 +115,10 @@ fn limit_push_down(
Ok(LogicalPlan::Limit(Limit {
n: upper_limit,
input: Arc::new(limit_push_down(
optimizer,
_optimizer,
Some(upper_limit),
x,
execution_props,
_execution_props,
)?),
}))
})
@@ -138,7 +138,7 @@ fn limit_push_down(
let inputs = plan.inputs();
let new_inputs = inputs
.iter()
.map(|plan| limit_push_down(optimizer, None, plan, execution_props))
.map(|plan| limit_push_down(_optimizer, None, plan, _execution_props))
.collect::<Result<Vec<_>>>()?;

utils::from_plan(plan, &expr, &new_inputs)
36 changes: 18 additions & 18 deletions datafusion/core/src/optimizer/projection_push_down.rs
@@ -126,11 +126,11 @@ fn get_projected_schema(

/// Recursively traverses the logical plan, removing expressions that are not needed.
fn optimize_plan(
optimizer: &ProjectionPushDown,
_optimizer: &ProjectionPushDown,
plan: &LogicalPlan,
required_columns: &HashSet<Column>, // set of columns required up to this step
has_projection: bool,
execution_props: &ExecutionProps,
_execution_props: &ExecutionProps,
) -> Result<LogicalPlan> {
let mut new_required_columns = required_columns.clone();
match plan {
@@ -165,11 +165,11 @@ fn optimize_plan(
})?;

let new_input = optimize_plan(
optimizer,
_optimizer,
input,
&new_required_columns,
true,
execution_props,
_execution_props,
)?;

let new_required_columns_optimized = new_input
@@ -211,19 +211,19 @@ fn optimize_plan(
}

let optimized_left = Arc::new(optimize_plan(
optimizer,
_optimizer,
left,
&new_required_columns,
true,
execution_props,
_execution_props,
)?);

let optimized_right = Arc::new(optimize_plan(
optimizer,
_optimizer,
right,
&new_required_columns,
true,
execution_props,
_execution_props,
)?);

let schema = build_join_schema(
@@ -272,11 +272,11 @@ fn optimize_plan(
)?;

LogicalPlanBuilder::from(optimize_plan(
optimizer,
_optimizer,
input,
&new_required_columns,
true,
execution_props,
_execution_props,
)?)
.window(new_window_expr)?
.build()
@@ -324,11 +324,11 @@ fn optimize_plan(
group_expr: group_expr.clone(),
aggr_expr: new_aggr_expr,
input: Arc::new(optimize_plan(
optimizer,
_optimizer,
input,
&new_required_columns,
true,
execution_props,
_execution_props,
)?),
schema: DFSchemaRef::new(new_schema),
}))
@@ -373,11 +373,11 @@ fn optimize_plan(

Ok(LogicalPlan::Analyze(Analyze {
input: Arc::new(optimize_plan(
optimizer,
_optimizer,
&a.input,
&required_columns,
false,
execution_props,
_execution_props,
)?),
verbose: a.verbose,
schema: a.schema.clone(),
@@ -409,11 +409,11 @@ fn optimize_plan(
new_required_columns.insert(f.qualified_column());
});
optimize_plan(
optimizer,
_optimizer,
input_plan,
&new_required_columns,
has_projection,
execution_props,
_execution_props,
)
})
.collect::<Result<Vec<_>>>()?;
@@ -457,11 +457,11 @@ fn optimize_plan(
.iter()
.map(|input_plan| {
optimize_plan(
optimizer,
_optimizer,
input_plan,
&new_required_columns,
has_projection,
execution_props,
_execution_props,
)
})
.collect::<Result<Vec<_>>>()?;
5 changes: 5 additions & 0 deletions datafusion/core/src/physical_plan/explain.rs
@@ -66,6 +66,11 @@ impl ExplainExec {
pub fn stringified_plans(&self) -> &[StringifiedPlan] {
&self.stringified_plans
}

/// access to verbose
pub fn verbose(&self) -> bool {
self.verbose
}
}

#[async_trait]