Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enable explain for ballista #2163

Merged
merged 3 commits into from
Apr 7, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion ballista/rust/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ message AnalyzeNode {
bool verbose = 2;
}

message ExplainNode{
message ExplainNode {
LogicalPlanNode input = 1;
bool verbose = 2;
}
Expand Down Expand Up @@ -259,6 +259,7 @@ message PhysicalPlanNode {
AvroScanExecNode avro_scan = 20;
PhysicalExtensionNode extension = 21;
UnionExecNode union = 22;
ExplainExecNode explain = 23;
}
}

Expand Down Expand Up @@ -443,6 +444,12 @@ message UnionExecNode {
repeated PhysicalPlanNode inputs = 1;
}

// Serialized form of an explain physical plan node: the node's output
// schema, the list of already-stringified plans it reports, and its
// verbose flag.
message ExplainExecNode {
// Output schema of the explain node.
datafusion.Schema schema = 1;
// Plans rendered to text, each tagged with the planning stage it came from.
repeated datafusion.StringifiedPlan stringified_plans = 2;
// Verbose flag carried alongside the plans.
bool verbose = 3;
}

message CrossJoinExecNode {
PhysicalPlanNode left = 1;
PhysicalPlanNode right = 2;
Expand Down
24 changes: 24 additions & 0 deletions ballista/rust/core/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -514,3 +514,27 @@ message ArrowType{
// }
//}
message EmptyMessage{}

// Payload of the PlanType.OptimizedLogicalPlan variant.
message OptimizedLogicalPlanType {
// Name of the optimizer associated with this logical plan.
string optimizer_name = 1;
}

// Payload of the PlanType.OptimizedPhysicalPlan variant.
message OptimizedPhysicalPlanType {
// Name of the optimizer associated with this physical plan.
string optimizer_name = 1;
}

// Identifies which planning stage a StringifiedPlan represents.
// Exactly one variant of the oneof is expected to be set; variants that
// carry no extra data use EmptyMessage as a placeholder payload.
message PlanType {
oneof plan_type_enum {
EmptyMessage InitialLogicalPlan = 1;
// Carries the optimizer name.
OptimizedLogicalPlanType OptimizedLogicalPlan = 2;
EmptyMessage FinalLogicalPlan = 3;
EmptyMessage InitialPhysicalPlan = 4;
// Carries the optimizer name.
OptimizedPhysicalPlanType OptimizedPhysicalPlan = 5;
EmptyMessage FinalPhysicalPlan = 6;
}
}

// A plan rendered to text together with the stage it was captured at.
message StringifiedPlan {
// Planning stage this text belongs to.
PlanType plan_type = 1;
// The plan as text.
string plan = 2;
}
74 changes: 51 additions & 23 deletions ballista/rust/core/src/serde/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,12 @@
// specific language governing permissions and limitations
// under the License.

use crate::error::BallistaError;
use crate::execution_plans::{
ShuffleReaderExec, ShuffleWriterExec, UnresolvedShuffleExec,
};
use crate::serde::physical_plan::from_proto::parse_protobuf_hash_partitioning;
use crate::serde::protobuf::physical_expr_node::ExprType;
use crate::serde::protobuf::physical_plan_node::PhysicalPlanType;
use crate::serde::protobuf::repartition_exec_node::PartitionMethod;
use std::convert::TryInto;
use std::sync::Arc;

use prost::bytes::BufMut;
use prost::Message;

use crate::serde::protobuf::{PhysicalExtensionNode, PhysicalPlanNode};
use crate::serde::scheduler::PartitionLocation;
use crate::serde::{
byte_to_string, proto_error, protobuf, str_to_byte, AsExecutionPlan,
PhysicalExtensionCodec,
};
use crate::{convert_box_required, convert_required, into_physical_plan, into_required};
use datafusion::arrow::compute::SortOptions;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::datafusion_data_access::object_store::local::LocalFileSystem;
Expand All @@ -43,6 +33,7 @@ use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::cross_join::CrossJoinExec;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::explain::ExplainExec;
use datafusion::physical_plan::expressions::{Column, PhysicalSortExpr};
use datafusion::physical_plan::file_format::{
AvroExec, CsvExec, FileScanConfig, ParquetExec,
Expand All @@ -60,10 +51,23 @@ use datafusion::physical_plan::{
AggregateExpr, ExecutionPlan, Partitioning, PhysicalExpr, WindowExpr,
};
use datafusion_proto::from_proto::parse_expr;
use prost::bytes::BufMut;
use prost::Message;
use std::convert::TryInto;
use std::sync::Arc;

use crate::error::BallistaError;
use crate::execution_plans::{
ShuffleReaderExec, ShuffleWriterExec, UnresolvedShuffleExec,
};

use crate::serde::physical_plan::from_proto::parse_protobuf_hash_partitioning;
use crate::serde::protobuf::physical_expr_node::ExprType;
use crate::serde::protobuf::physical_plan_node::PhysicalPlanType;
use crate::serde::protobuf::repartition_exec_node::PartitionMethod;
use crate::serde::protobuf::{PhysicalExtensionNode, PhysicalPlanNode};
use crate::serde::scheduler::PartitionLocation;
use crate::serde::{
byte_to_string, proto_error, protobuf, str_to_byte, AsExecutionPlan,
PhysicalExtensionCodec,
};
use crate::{convert_box_required, convert_required, into_physical_plan, into_required};

pub mod from_proto;
pub mod to_proto;
Expand Down Expand Up @@ -101,6 +105,15 @@ impl AsExecutionPlan for PhysicalPlanNode {
))
})?;
match plan {
PhysicalPlanType::Explain(explain) => Ok(Arc::new(ExplainExec::new(
Arc::new(explain.schema.as_ref().unwrap().try_into()?),
explain
.stringified_plans
.iter()
.map(|plan| plan.into())
.collect(),
explain.verbose,
))),
PhysicalPlanType::Projection(projection) => {
let input: Arc<dyn ExecutionPlan> = into_physical_plan!(
projection.input,
Expand Down Expand Up @@ -563,7 +576,21 @@ impl AsExecutionPlan for PhysicalPlanNode {
let plan_clone = plan.clone();
let plan = plan.as_any();

if let Some(exec) = plan.downcast_ref::<ProjectionExec>() {
if let Some(exec) = plan.downcast_ref::<ExplainExec>() {
Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::Explain(
protobuf::ExplainExecNode {
schema: Some(exec.schema().as_ref().into()),
stringified_plans: exec
.stringified_plans()
.iter()
.map(|plan| plan.into())
.collect(),
verbose: exec.verbose(),
},
)),
})
} else if let Some(exec) = plan.downcast_ref::<ProjectionExec>() {
let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
exec.input().to_owned(),
extension_codec,
Expand Down Expand Up @@ -1014,7 +1041,6 @@ mod roundtrip_tests {
use std::ops::Deref;
use std::sync::Arc;

use crate::serde::{AsExecutionPlan, BallistaCodec};
use datafusion::{
arrow::{
compute::kernels::sort::SortOptions,
Expand All @@ -1039,10 +1065,12 @@ mod roundtrip_tests {
scalar::ScalarValue,
};

use super::super::super::error::Result;
use super::super::protobuf;
use crate::execution_plans::ShuffleWriterExec;
use crate::serde::protobuf::{LogicalPlanNode, PhysicalPlanNode};
use crate::serde::{AsExecutionPlan, BallistaCodec};

use super::super::super::error::Result;
use super::super::protobuf;

fn roundtrip_test(exec_plan: Arc<dyn ExecutionPlan>) -> Result<()> {
let ctx = SessionContext::new();
Expand Down
5 changes: 5 additions & 0 deletions datafusion/core/src/physical_plan/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ impl ExplainExec {
pub fn stringified_plans(&self) -> &[StringifiedPlan] {
&self.stringified_plans
}

/// Returns the value of this node's `verbose` flag.
pub fn verbose(&self) -> bool {
self.verbose
}
}

#[async_trait]
Expand Down
24 changes: 24 additions & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -539,3 +539,27 @@ message ArrowType{
// }
//}
message EmptyMessage{}

// Payload of the PlanType.OptimizedLogicalPlan variant.
message OptimizedLogicalPlanType {
// Name of the optimizer associated with this logical plan.
string optimizer_name = 1;
}

// Payload of the PlanType.OptimizedPhysicalPlan variant.
message OptimizedPhysicalPlanType {
// Name of the optimizer associated with this physical plan.
string optimizer_name = 1;
}

// Identifies which planning stage a StringifiedPlan represents.
// Exactly one variant of the oneof is expected to be set; variants that
// carry no extra data use EmptyMessage as a placeholder payload.
message PlanType {
oneof plan_type_enum {
EmptyMessage InitialLogicalPlan = 1;
// Carries the optimizer name.
OptimizedLogicalPlanType OptimizedLogicalPlan = 2;
EmptyMessage FinalLogicalPlan = 3;
EmptyMessage InitialPhysicalPlan = 4;
// Carries the optimizer name.
OptimizedPhysicalPlanType OptimizedPhysicalPlan = 5;
EmptyMessage FinalPhysicalPlan = 6;
}
}

// A plan rendered to text together with the stage it was captured at.
message StringifiedPlan {
// Planning stage this text belongs to.
PlanType plan_type = 1;
// The plan as text.
string plan = 2;
}
43 changes: 40 additions & 3 deletions datafusion/proto/src/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,13 @@
// under the License.

use crate::protobuf;
use datafusion::logical_plan::FunctionRegistry;
use crate::protobuf::plan_type::PlanTypeEnum::{
FinalLogicalPlan, FinalPhysicalPlan, InitialLogicalPlan, InitialPhysicalPlan,
OptimizedLogicalPlan, OptimizedPhysicalPlan,
};
use crate::protobuf::{OptimizedLogicalPlanType, OptimizedPhysicalPlanType};
use datafusion::logical_plan::plan::StringifiedPlan;
use datafusion::logical_plan::{FunctionRegistry, PlanType};
use datafusion::prelude::bit_length;
use datafusion::{
arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit, UnionMode},
Expand Down Expand Up @@ -362,6 +368,37 @@ impl TryFrom<&protobuf::Field> for Field {
}
}

/// Builds a `StringifiedPlan` from its protobuf representation.
///
/// # Panics
///
/// Panics if the message's `plan_type` field, or the `plan_type_enum`
/// inside it, is `None`.
impl From<&protobuf::StringifiedPlan> for StringifiedPlan {
    fn from(stringified_plan: &protobuf::StringifiedPlan) -> Self {
        // Both layers are optional in the generated protobuf types.
        let proto_plan_type = stringified_plan.plan_type.as_ref().unwrap();
        let proto_variant = proto_plan_type.plan_type_enum.as_ref().unwrap();

        let plan_type = match proto_variant {
            InitialLogicalPlan(_) => PlanType::InitialLogicalPlan,
            OptimizedLogicalPlan(logical) => PlanType::OptimizedLogicalPlan {
                optimizer_name: logical.optimizer_name.clone(),
            },
            FinalLogicalPlan(_) => PlanType::FinalLogicalPlan,
            InitialPhysicalPlan(_) => PlanType::InitialPhysicalPlan,
            OptimizedPhysicalPlan(physical) => PlanType::OptimizedPhysicalPlan {
                optimizer_name: physical.optimizer_name.clone(),
            },
            FinalPhysicalPlan(_) => PlanType::FinalPhysicalPlan,
        };

        // The plan text is copied into a fresh `Arc`.
        let plan = Arc::new(stringified_plan.plan.clone());

        Self { plan_type, plan }
    }
}

impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction {
fn from(f: &protobuf::ScalarFunction) -> Self {
use protobuf::ScalarFunction;
Expand Down Expand Up @@ -719,7 +756,7 @@ impl TryFrom<&protobuf::PrimitiveScalarType> for ScalarValue {

Ok(match scalar {
PrimitiveScalarType::Null => {
return Err(proto_error("Untyped null is an invalid scalar value"))
return Err(proto_error("Untyped null is an invalid scalar value"));
}
PrimitiveScalarType::Bool => Self::Boolean(None),
PrimitiveScalarType::Uint8 => Self::UInt8(None),
Expand Down Expand Up @@ -1442,7 +1479,7 @@ fn typechecked_scalar_value_conversion(
PrimitiveScalarType::Null => {
return Err(proto_error(
"Untyped scalar null is not a valid scalar value",
))
));
}
PrimitiveScalarType::Decimal128 => {
ScalarValue::Decimal128(None, 0, 0)
Expand Down
47 changes: 45 additions & 2 deletions datafusion/proto/src/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,16 @@
//! processes.

use crate::protobuf;
use crate::protobuf::plan_type::PlanTypeEnum::{
FinalLogicalPlan, FinalPhysicalPlan, InitialLogicalPlan, InitialPhysicalPlan,
OptimizedLogicalPlan, OptimizedPhysicalPlan,
};
use crate::protobuf::{
EmptyMessage, OptimizedLogicalPlanType, OptimizedPhysicalPlanType,
};

use datafusion::logical_plan::plan::StringifiedPlan;
use datafusion::logical_plan::PlanType;
use datafusion::{
arrow::datatypes::{
DataType, Field, IntervalUnit, Schema, SchemaRef, TimeUnit, UnionMode,
Expand Down Expand Up @@ -143,8 +152,6 @@ impl From<&DataType> for protobuf::ArrowType {

impl From<&DataType> for protobuf::arrow_type::ArrowTypeEnum {
fn from(val: &DataType) -> Self {
use protobuf::EmptyMessage;

match val {
DataType::Null => Self::None(EmptyMessage {}),
DataType::Boolean => Self::Bool(EmptyMessage {}),
Expand Down Expand Up @@ -294,6 +301,42 @@ impl From<&DFSchemaRef> for protobuf::DfSchema {
}
}

/// Serializes a `StringifiedPlan` into its protobuf representation.
///
/// The resulting message always has `plan_type` and the nested
/// `plan_type_enum` populated.
impl From<&StringifiedPlan> for protobuf::StringifiedPlan {
    fn from(stringified_plan: &StringifiedPlan) -> Self {
        // Match by reference so only the optimizer name (when present) is
        // cloned, rather than cloning the whole `StringifiedPlan` up front.
        let plan_type_enum = match &stringified_plan.plan_type {
            PlanType::InitialLogicalPlan => InitialLogicalPlan(EmptyMessage {}),
            PlanType::OptimizedLogicalPlan { optimizer_name } => {
                OptimizedLogicalPlan(OptimizedLogicalPlanType {
                    optimizer_name: optimizer_name.clone(),
                })
            }
            PlanType::FinalLogicalPlan => FinalLogicalPlan(EmptyMessage {}),
            PlanType::InitialPhysicalPlan => InitialPhysicalPlan(EmptyMessage {}),
            PlanType::OptimizedPhysicalPlan { optimizer_name } => {
                OptimizedPhysicalPlan(OptimizedPhysicalPlanType {
                    optimizer_name: optimizer_name.clone(),
                })
            }
            PlanType::FinalPhysicalPlan => FinalPhysicalPlan(EmptyMessage {}),
        };

        Self {
            // Every variant is wrapped the same way, so do it once here.
            plan_type: Some(protobuf::PlanType {
                plan_type_enum: Some(plan_type_enum),
            }),
            plan: stringified_plan.plan.to_string(),
        }
    }
}

impl From<&AggregateFunction> for protobuf::AggregateFunction {
fn from(value: &AggregateFunction) -> Self {
match value {
Expand Down