Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ message Join {
Relation right = 2;
Expression join_condition = 3;
JoinType join_type = 4;
// Optional. using_columns provides a list of columns that should be present on both sides of
// the join inputs and that this Join will join on. For example, A JOIN B USING col_name is
// equivalent to A JOIN B ON A.col_name = B.col_name.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's also mention that this can't co-exist with join_condition

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

//
// This field does not co-exist with join_condition.
repeated string using_columns = 5;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can join_condition and using_columns co-exist?

Copy link
Contributor Author

@amaliujia amaliujia Oct 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on the current Catalyst implementation, my read is yes.

My understanding is that the current Catalyst implementation does not support JOIN USING col_name, ... through the join_condition; it is a separate code path. Is that true?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at the API layer (DF and SQL), I don't think they can co-exist. E.g. the parser rule is

joinCriteria
    : ON booleanExpression
    | USING identifierList
    ;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. To match our API (though not the implementation), I added a check to make sure that only one of the two is set.


enum JoinType {
JOIN_TYPE_UNSPECIFIED = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,15 +236,45 @@ package object dsl {
.build()

/**
 * Join overload taking a join type and an optional join condition.
 *
 * Forwards to the four-argument `join` with an empty USING-column list.
 */
def join(
    otherPlan: proto.Relation,
    joinType: JoinType,
    condition: Option[proto.Expression]): proto.Relation =
  join(otherPlan, joinType, usingColumns = Seq.empty[String], condition = condition)

/**
 * Join overload taking only an optional join condition.
 *
 * Forwards to the four-argument `join` with `JOIN_TYPE_INNER` and an empty USING-column list.
 */
def join(otherPlan: proto.Relation, condition: Option[proto.Expression]): proto.Relation =
  join(
    otherPlan,
    JoinType.JOIN_TYPE_INNER,
    usingColumns = Seq.empty[String],
    condition = condition)

/**
 * Join overload taking only the other relation.
 *
 * Forwards to the four-argument `join` with `JOIN_TYPE_INNER`, an empty USING-column list and
 * no join condition.
 */
def join(otherPlan: proto.Relation): proto.Relation =
  join(
    otherPlan,
    JoinType.JOIN_TYPE_INNER,
    usingColumns = Seq.empty[String],
    condition = None)

/**
 * Join overload taking only a join type.
 *
 * Forwards to the four-argument `join` with an empty USING-column list and no join condition.
 */
def join(otherPlan: proto.Relation, joinType: JoinType): proto.Relation =
  join(otherPlan, joinType, usingColumns = Seq.empty[String], condition = None)

/**
 * Join overload taking a join type and a list of USING columns.
 *
 * Forwards to the four-argument `join` with the given columns and no join condition.
 */
def join(
    otherPlan: proto.Relation,
    joinType: JoinType,
    usingColumns: Seq[String]): proto.Relation =
  join(otherPlan, joinType, usingColumns, condition = None)

private def join(
otherPlan: proto.Relation,
joinType: JoinType = JoinType.JOIN_TYPE_INNER,
condition: Option[proto.Expression] = None): proto.Relation = {
usingColumns: Seq[String],
condition: Option[proto.Expression]): proto.Relation = {
val relation = proto.Relation.newBuilder()
val join = proto.Join.newBuilder()
join
.setLeft(logicalPlan)
.setRight(otherPlan)
.setJoinType(joinType)
if (usingColumns.nonEmpty) {
join.addAllUsingColumns(usingColumns.asJava)
}
if (condition.isDefined) {
join.setJoinCondition(condition.get)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttrib
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Expression}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.{logical, FullOuter, Inner, JoinType, LeftAnti, LeftOuter, LeftSemi, RightOuter}
import org.apache.spark.sql.catalyst.plans.{logical, FullOuter, Inner, JoinType, LeftAnti, LeftOuter, LeftSemi, RightOuter, UsingJoin}
import org.apache.spark.sql.catalyst.plans.logical.{Deduplicate, LogicalPlan, Sample, SubqueryAlias}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.QueryExecution
Expand Down Expand Up @@ -292,14 +292,23 @@ class SparkConnectPlanner(plan: proto.Relation, session: SparkSession) {

/**
 * Translates a proto Join into a Catalyst `logical.Join`.
 *
 * Both inputs must be present. `join_condition` and `using_columns` are mutually exclusive:
 * setting both raises `InvalidPlanInput`. When USING columns are supplied, the translated
 * join type is wrapped in `UsingJoin`.
 *
 * @param rel the proto Join relation to translate
 * @return a `logical.Join` built with `JoinHint.NONE`
 */
private def transformJoin(rel: proto.Join): LogicalPlan = {
  assert(rel.hasLeft && rel.hasRight, "Both join sides must be present")
  if (rel.hasJoinCondition && rel.getUsingColumnsCount > 0) {
    throw InvalidPlanInput(
      "Using columns or join conditions cannot be set at the same time in Join")
  }
  val joinCondition =
    if (rel.hasJoinCondition) Some(transformExpression(rel.getJoinCondition)) else None

  // Generated protobuf getters never return null, so the join type is passed through as is.
  val catalystJoinType = transformJoinType(rel.getJoinType)
  val joinType =
    if (rel.getUsingColumnsCount > 0) {
      UsingJoin(catalystJoinType, rel.getUsingColumnsList.asScala.toSeq)
    } else {
      catalystJoinType
    }
  // `joinType` is passed exactly once; a second named `joinType` argument would not compile.
  logical.Join(
    left = transformRelation(rel.getLeft),
    right = transformRelation(rel.getRight),
    joinType = joinType,
    condition = joinCondition,
    hint = logical.JoinHint.NONE)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,20 @@ class SparkConnectPlannerSuite extends SparkFunSuite with SparkConnectPlanTest {
assert(res.nodeName == "Join")
assert(res != null)

val e = intercept[InvalidPlanInput] {
val simpleJoin = proto.Relation.newBuilder
.setJoin(
proto.Join.newBuilder
.setLeft(readRel)
.setRight(readRel)
.addUsingColumns("test_col")
.setJoinCondition(joinCondition))
.build()
transform(simpleJoin)
}
assert(
e.getMessage.contains(
"Using columns or join conditions cannot be set at the same time in Join"))
}

test("Simple Projection") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import org.apache.spark.connect.proto
import org.apache.spark.connect.proto.Join.JoinType
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftAnti, LeftOuter, LeftSemi, PlanTest, RightOuter}
import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftAnti, LeftOuter, LeftSemi, PlanTest, RightOuter, UsingJoin}
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

/**
Expand All @@ -32,11 +32,13 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest {

// Connect-side fixture with columns (id: int, name: string).
lazy val connectTestRelation = createLocalRelationProto(Seq($"id".int, $"name".string))

// Connect-side fixture with columns (key: int, value: int, name: string). It has a `name`
// column in common with connectTestRelation.
lazy val connectTestRelation2 = createLocalRelationProto(
  Seq($"key".int, $"value".int, $"name".string))

// Catalyst-side fixture with the same columns as connectTestRelation.
lazy val sparkTestRelation: LocalRelation = LocalRelation($"id".int, $"name".string)

// Catalyst-side fixture with the same columns as connectTestRelation2.
lazy val sparkTestRelation2: LocalRelation =
  LocalRelation($"key".int, $"value".int, $"name".string)

test("Basic select") {
val connectPlan = {
Expand Down Expand Up @@ -117,6 +119,14 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest {
val sparkPlan3 = sparkTestRelation.join(sparkTestRelation2, y)
comparePlans(connectPlan3.analyze, sparkPlan3.analyze, false)
}

val connectPlan4 = {
import org.apache.spark.sql.connect.dsl.plans._
transform(
connectTestRelation.join(connectTestRelation2, JoinType.JOIN_TYPE_INNER, Seq("name")))
}
val sparkPlan4 = sparkTestRelation.join(sparkTestRelation2, UsingJoin(Inner, Seq("name")))
comparePlans(connectPlan4.analyze, sparkPlan4.analyze, false)
}

test("Test sample") {
Expand Down
Loading