Skip to content

Commit

Permalink
fix join column handling logic for On and Using constraints
Browse files Browse the repository at this point in the history
  • Loading branch information
houqp committed Jun 23, 2021
1 parent d55a105 commit ecf923f
Show file tree
Hide file tree
Showing 20 changed files with 610 additions and 333 deletions.
12 changes: 9 additions & 3 deletions ballista/rust/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -378,12 +378,18 @@ enum JoinType {
ANTI = 5;
}

enum JoinConstraint {
ON = 0;
USING = 1;
}

message JoinNode {
LogicalPlanNode left = 1;
LogicalPlanNode right = 2;
JoinType join_type = 3;
repeated Column left_join_column = 4;
repeated Column right_join_column = 5;
JoinConstraint join_constraint = 4;
repeated Column left_join_column = 5;
repeated Column right_join_column = 6;
}

message LimitNode {
Expand Down Expand Up @@ -570,7 +576,7 @@ message HashJoinExecNode {
PhysicalPlanNode right = 2;
repeated JoinOn on = 3;
JoinType join_type = 4;

JoinConstraint join_constraint = 5;
}

message PhysicalColumn {
Expand Down
43 changes: 27 additions & 16 deletions ballista/rust/core/src/serde/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use datafusion::logical_plan::window_frames::{
};
use datafusion::logical_plan::{
abs, acos, asin, atan, ceil, cos, exp, floor, ln, log10, log2, round, signum, sin,
sqrt, tan, trunc, Column, DFField, DFSchema, Expr, JoinType, LogicalPlan,
LogicalPlanBuilder, Operator,
sqrt, tan, trunc, Column, DFField, DFSchema, Expr, JoinConstraint, JoinType,
LogicalPlan, LogicalPlanBuilder, Operator,
};
use datafusion::physical_plan::aggregates::AggregateFunction;
use datafusion::physical_plan::csv::CsvReadOptions;
Expand Down Expand Up @@ -247,23 +247,34 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
join.join_type
))
})?;
let join_type = match join_type {
protobuf::JoinType::Inner => JoinType::Inner,
protobuf::JoinType::Left => JoinType::Left,
protobuf::JoinType::Right => JoinType::Right,
protobuf::JoinType::Full => JoinType::Full,
protobuf::JoinType::Semi => JoinType::Semi,
protobuf::JoinType::Anti => JoinType::Anti,
};
LogicalPlanBuilder::from(&convert_box_required!(join.left)?)
.join(
let join_constraint = protobuf::JoinConstraint::from_i32(
join.join_constraint,
)
.ok_or_else(|| {
proto_error(format!(
"Received a JoinNode message with unknown JoinConstraint {}",
join.join_constraint
))
})?;

let builder =
LogicalPlanBuilder::from(&convert_box_required!(join.left)?);

let builder = match join_constraint.into() {
JoinConstraint::On => builder.join(
&convert_box_required!(join.right)?,
join_type,
join_type.into(),
left_keys,
right_keys,
)?
.build()
.map_err(|e| e.into())
)?,
JoinConstraint::Using => builder.join_using(
&convert_box_required!(join.right)?,
join_type.into(),
left_keys,
)?,
};

builder.build().map_err(|e| e.into())
}
}
}
Expand Down
15 changes: 6 additions & 9 deletions ballista/rust/core/src/serde/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use datafusion::arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUn
use datafusion::datasource::CsvFile;
use datafusion::logical_plan::{
window_frames::{WindowFrame, WindowFrameBound, WindowFrameUnits},
Column, Expr, JoinType, LogicalPlan,
Column, Expr, JoinConstraint, JoinType, LogicalPlan,
};
use datafusion::physical_plan::aggregates::AggregateFunction;
use datafusion::physical_plan::functions::BuiltinScalarFunction;
Expand Down Expand Up @@ -804,26 +804,23 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
right,
on,
join_type,
join_constraint,
..
} => {
let left: protobuf::LogicalPlanNode = left.as_ref().try_into()?;
let right: protobuf::LogicalPlanNode = right.as_ref().try_into()?;
let join_type = match join_type {
JoinType::Inner => protobuf::JoinType::Inner,
JoinType::Left => protobuf::JoinType::Left,
JoinType::Right => protobuf::JoinType::Right,
JoinType::Full => protobuf::JoinType::Full,
JoinType::Semi => protobuf::JoinType::Semi,
JoinType::Anti => protobuf::JoinType::Anti,
};
let (left_join_column, right_join_column) =
on.iter().map(|(l, r)| (l.into(), r.into())).unzip();
let join_type: protobuf::JoinType = join_type.to_owned().into();
let join_constraint: protobuf::JoinConstraint =
join_constraint.to_owned().into();
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::Join(Box::new(
protobuf::JoinNode {
left: Some(Box::new(left)),
right: Some(Box::new(right)),
join_type: join_type.into(),
join_constraint: join_constraint.into(),
left_join_column,
right_join_column,
},
Expand Down
46 changes: 45 additions & 1 deletion ballista/rust/core/src/serde/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use std::{convert::TryInto, io::Cursor};

use datafusion::logical_plan::Operator;
use datafusion::logical_plan::{JoinConstraint, JoinType, Operator};
use datafusion::physical_plan::aggregates::AggregateFunction;
use datafusion::physical_plan::window_functions::BuiltInWindowFunction;

Expand Down Expand Up @@ -291,3 +291,47 @@ impl Into<datafusion::arrow::datatypes::DataType> for protobuf::PrimitiveScalarT
}
}
}

impl From<protobuf::JoinType> for JoinType {
fn from(t: protobuf::JoinType) -> Self {
match t {
protobuf::JoinType::Inner => JoinType::Inner,
protobuf::JoinType::Left => JoinType::Left,
protobuf::JoinType::Right => JoinType::Right,
protobuf::JoinType::Full => JoinType::Full,
protobuf::JoinType::Semi => JoinType::Semi,
protobuf::JoinType::Anti => JoinType::Anti,
}
}
}

impl From<JoinType> for protobuf::JoinType {
fn from(t: JoinType) -> Self {
match t {
JoinType::Inner => protobuf::JoinType::Inner,
JoinType::Left => protobuf::JoinType::Left,
JoinType::Right => protobuf::JoinType::Right,
JoinType::Full => protobuf::JoinType::Full,
JoinType::Semi => protobuf::JoinType::Semi,
JoinType::Anti => protobuf::JoinType::Anti,
}
}
}

impl From<protobuf::JoinConstraint> for JoinConstraint {
fn from(t: protobuf::JoinConstraint) -> Self {
match t {
protobuf::JoinConstraint::On => JoinConstraint::On,
protobuf::JoinConstraint::Using => JoinConstraint::Using,
}
}
}

impl From<JoinConstraint> for protobuf::JoinConstraint {
fn from(t: JoinConstraint) -> Self {
match t {
JoinConstraint::On => protobuf::JoinConstraint::On,
JoinConstraint::Using => protobuf::JoinConstraint::Using,
}
}
}
26 changes: 15 additions & 11 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ use datafusion::catalog::catalog::{
use datafusion::execution::context::{
ExecutionConfig, ExecutionContextState, ExecutionProps,
};
use datafusion::logical_plan::{window_frames::WindowFrame, DFSchema, Expr};
use datafusion::logical_plan::{
window_frames::WindowFrame, DFSchema, Expr, JoinConstraint, JoinType,
};
use datafusion::physical_plan::aggregates::{create_aggregate_expr, AggregateFunction};
use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
use datafusion::physical_plan::hash_join::PartitionMode;
Expand All @@ -57,7 +59,6 @@ use datafusion::physical_plan::{
filter::FilterExec,
functions::{self, BuiltinScalarFunction, ScalarFunctionExpr},
hash_join::HashJoinExec,
hash_utils::JoinType,
limit::{GlobalLimitExec, LocalLimitExec},
parquet::ParquetExec,
projection::ProjectionExec,
Expand Down Expand Up @@ -348,19 +349,22 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
hashjoin.join_type
))
})?;
let join_type = match join_type {
protobuf::JoinType::Inner => JoinType::Inner,
protobuf::JoinType::Left => JoinType::Left,
protobuf::JoinType::Right => JoinType::Right,
protobuf::JoinType::Full => JoinType::Full,
protobuf::JoinType::Semi => JoinType::Semi,
protobuf::JoinType::Anti => JoinType::Anti,
};

let join_constraint =
protobuf::JoinConstraint::from_i32(hashjoin.join_constraint)
.ok_or_else(|| {
proto_error(format!(
"Received a HashJoinNode message with unknown JoinConstraint {}",
hashjoin.join_constraint,
))
})?;

Ok(Arc::new(HashJoinExec::try_new(
left,
right,
on,
&join_type,
&join_type.into(),
join_constraint.into(),
PartitionMode::CollectLeft,
)?))
}
Expand Down
4 changes: 2 additions & 2 deletions ballista/rust/core/src/serde/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,14 @@ mod roundtrip_tests {
compute::kernels::sort::SortOptions,
datatypes::{DataType, Field, Schema},
},
logical_plan::Operator,
logical_plan::{JoinConstraint, JoinType, Operator},
physical_plan::{
empty::EmptyExec,
expressions::{binary, col, lit, InListExpr, NotExpr},
expressions::{Avg, Column, PhysicalSortExpr},
filter::FilterExec,
hash_aggregate::{AggregateMode, HashAggregateExec},
hash_join::{HashJoinExec, PartitionMode},
hash_utils::JoinType,
limit::{GlobalLimitExec, LocalLimitExec},
sort::SortExec,
AggregateExpr, ColumnarValue, Distribution, ExecutionPlan, Partitioning,
Expand Down Expand Up @@ -93,6 +92,7 @@ mod roundtrip_tests {
Arc::new(EmptyExec::new(false, Arc::new(schema_right))),
on,
&JoinType::Inner,
JoinConstraint::On,
PartitionMode::CollectLeft,
)?))
}
Expand Down
14 changes: 5 additions & 9 deletions ballista/rust/core/src/serde/physical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use std::{
sync::Arc,
};

use datafusion::logical_plan::{JoinConstraint, JoinType};
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::csv::CsvExec;
use datafusion::physical_plan::expressions::{
Expand All @@ -35,7 +36,6 @@ use datafusion::physical_plan::expressions::{CastExpr, TryCastExpr};
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::hash_aggregate::AggregateMode;
use datafusion::physical_plan::hash_join::HashJoinExec;
use datafusion::physical_plan::hash_utils::JoinType;
use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion::physical_plan::parquet::ParquetExec;
use datafusion::physical_plan::projection::ProjectionExec;
Expand Down Expand Up @@ -135,21 +135,17 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
}),
})
.collect();
let join_type = match exec.join_type() {
JoinType::Inner => protobuf::JoinType::Inner,
JoinType::Left => protobuf::JoinType::Left,
JoinType::Right => protobuf::JoinType::Right,
JoinType::Full => protobuf::JoinType::Full,
JoinType::Semi => protobuf::JoinType::Semi,
JoinType::Anti => protobuf::JoinType::Anti,
};
let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
let join_constraint: protobuf::JoinConstraint = exec.join_constraint().into();

Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::HashJoin(Box::new(
protobuf::HashJoinExecNode {
left: Some(Box::new(left)),
right: Some(Box::new(right)),
on,
join_type: join_type.into(),
join_constraint: join_constraint.into(),
},
))),
})
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/queries/q7.sql
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,4 @@ group by
order by
supp_nation,
cust_nation,
l_year;
l_year;
Loading

0 comments on commit ecf923f

Please sign in to comment.