Closed
25 commits
0a62098
Optimize cartesian order
Sephiroth-Lin Jul 15, 2015
61d1a7e
Fix code sytle
Sephiroth-Lin Jul 15, 2015
23deb4b
Update
Sephiroth-Lin Jul 16, 2015
eb9d155
Fix code style
Sephiroth-Lin Jul 16, 2015
8198648
Fix unit test failed
Sephiroth-Lin Jul 16, 2015
1006d46
Fix code style
Sephiroth-Lin Jul 17, 2015
f0ce447
Update
Sephiroth-Lin Jul 17, 2015
2bc0991
Update code style
Sephiroth-Lin Jul 17, 2015
547242e
code style
Sephiroth-Lin Jul 17, 2015
bca7a07
Fix unit test failed
Sephiroth-Lin Jul 17, 2015
a168900
Fix NullPointerException
Sephiroth-Lin Jul 20, 2015
99bcde7
Update thread pool name
Sephiroth-Lin Jul 22, 2015
4310536
Update
Sephiroth-Lin Aug 1, 2015
b2a0ae8
Update
Sephiroth-Lin Aug 1, 2015
5ca1d26
Use BroadcastNestedLoopJoin replace BroadcastCartesianProduct
Sephiroth-Lin Aug 1, 2015
04678d1
Fix unit test failed
Sephiroth-Lin Aug 1, 2015
f1cebae
Merge branch 'master' into SPARK-9066
Sephiroth-Lin Sep 8, 2015
8a8658c
Add Inner for do cartesian broadcast, SPARK-10484 point out this
Sephiroth-Lin Sep 8, 2015
60f2102
Update
Sephiroth-Lin Sep 9, 2015
e01c8f0
fix error
Sephiroth-Lin Sep 9, 2015
dd77444
Merge branch 'master' of https://github.com/apache/spark into SPARK-9066
Sephiroth-Lin Sep 12, 2015
d9aef91
Merge branch 'master' into SPARK-9066
Sephiroth-Lin Sep 29, 2015
a66f475
Add some unit test which PR#8652 have done, and fix unit test error
Sephiroth-Lin Sep 29, 2015
9812242
Fix unit test error
Sephiroth-Lin Sep 29, 2015
ce6ad25
Delete unused unit test
Sephiroth-Lin Sep 29, 2015
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{BroadcastHint, LogicalPlan}
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
 import org.apache.spark.sql.execution.datasources.{CreateTableUsing, CreateTempTableUsing, DescribeCommand => LogicalDescribeCommand, _}
+import org.apache.spark.sql.execution.joins.BuildSide
 import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{SQLContext, Strategy, execution}
@@ -274,12 +275,30 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   }

   object CartesianProduct extends Strategy {
+    def getSmallSide(left: LogicalPlan, right: LogicalPlan): BuildSide = {
+      if (right.statistics.sizeInBytes < left.statistics.sizeInBytes) {
+        joins.BuildRight
+      } else {
+        joins.BuildLeft
+      }
+    }
Contributor

I am not sure that comparing total sizes gives us a useful heuristic, because the size of a table says nothing about the size of its partitions. Also, I am inclined to get https://github.com/apache/spark/pull/8652/files merged first and then add any other optimization in a follow-up PR. What do you think?

Contributor Author

OK, no problem.

Contributor

Agree with @yhuai and @chenghao-intel. To give a concrete example: the compression ratio of Parquet can easily be ten times that of JSON. When joining a Parquet table with a JSON table, you may even end up in a much worse situation.


     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      // If plan can broadcast we use BroadcastNestedLoopJoin, as we know for inner join with true
+      // condition is same as Cartesian.
+      case logical.Join(CanBroadcast(left), right, joinType, condition) =>
+        execution.joins.BroadcastNestedLoopJoin(
+          planLater(left), planLater(right), joins.BuildLeft, joinType, condition) :: Nil
+      case logical.Join(left, CanBroadcast(right), joinType, condition) =>
+        execution.joins.BroadcastNestedLoopJoin(
+          planLater(left), planLater(right), joins.BuildRight, joinType, condition) :: Nil
       case logical.Join(left, right, _, None) =>
-        execution.joins.CartesianProduct(planLater(left), planLater(right)) :: Nil
+        execution.joins.CartesianProduct(planLater(left), planLater(right),
+          getSmallSide(left, right)) :: Nil
       case logical.Join(left, right, Inner, Some(condition)) =>
         execution.Filter(condition,
-          execution.joins.CartesianProduct(planLater(left), planLater(right))) :: Nil
+          execution.joins.CartesianProduct(planLater(left), planLater(right),
+            getSmallSide(left, right))) :: Nil
Contributor

Instead of passing a BuildSide to CartesianProduct, why not just change the parameter order according to the data size? Like:

if (left < right) {
  CartesianProduct(left, right)
} else {
  CartesianProduct(right, left)
}
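
A minimal sketch of the swap suggested above, on plain Scala collections rather than Spark plans. `SwapSides.cross` and the use of `Seq.size` as the size statistic are illustrative assumptions, not Spark APIs. Swapping the iteration order alone would flip the column order, so the sketch still emits pairs as (left, right):

```scala
object SwapSides {
  // Cross product that iterates the smaller collection in the outer loop
  // but always emits pairs in (left, right) order.
  // Seq.size stands in for a size statistic (assumption for illustration).
  def cross[A, B](left: Seq[A], right: Seq[B]): Seq[(A, B)] =
    if (left.size <= right.size) {
      for (l <- left; r <- right) yield (l, r)
    } else {
      // Right side drives the outer loop; the tuple order is unchanged.
      for (r <- right; l <- left) yield (l, r)
    }
}
```

With `left = Seq(1, 2, 3)` and `right = Seq("a")`, the single-element side drives the outer loop and the result is still a `Seq[(Int, String)]`.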

Contributor

Actually, I am a little concerned about switching sides based on the statistics, as I commented previously. Also, as @cloud-fan pointed out:

for (x <- rdd1.iterator(currSplit.s1, context);
     y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)

What we actually care about is the average number of records per partition on each side, and I don't think we can say that the side with the larger size in the statistics will also have more records per partition on average (most likely the average number of records per partition is about the same).

We should probably add more statistics, such as the number of partitions of the logical plan or the average file size of each partition. To avoid confusion in future improvements, I think we'd better remove this optimization rule for cartesian join. That's why I didn't do it in #8652.

What do you think?
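
To illustrate the nested iteration quoted above, here is a small sketch in plain Scala (no Spark). `NestedLoopDemo.innerOpens` is a hypothetical helper: it treats two sequences as one partition from each side and counts how many times the inner iterator is re-created, which is once per outer record:

```scala
object NestedLoopDemo {
  // Returns (number of times the inner iterator was opened, number of pairs produced).
  // `outer` and `inner` stand in for one partition of each side (hypothetical data).
  def innerOpens(outer: Seq[Int], inner: Seq[Int]): (Int, Int) = {
    var opens = 0
    // Every call models re-reading the inner partition from its source.
    def innerIterator(): Iterator[Int] = { opens += 1; inner.iterator }
    val pairs = for (x <- outer.iterator; y <- innerIterator()) yield (x, y)
    val produced = pairs.size // forces the lazy iterator to run
    (opens, produced)
  }
}
```

The number of pairs is the same either way; only the number of inner re-reads depends on which side is the outer one.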

Contributor

Good point! This optimization should depend on record numbers, not data size.

       case _ => Nil
     }
   }
@@ -22,7 +22,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.catalyst.plans.{FullOuter, JoinType, LeftOuter, RightOuter}
+import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.util.collection.CompactBuffer
@@ -71,6 +71,7 @@ case class BroadcastNestedLoopJoin(
         left.output.map(_.withNullability(true)) ++ right.output
       case FullOuter =>
         left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true))
+      case Inner => left.output ++ right.output
       case x =>
         throw new IllegalArgumentException(
           s"BroadcastNestedLoopJoin should not take $x as the JoinType")
@@ -28,7 +28,10 @@ import org.apache.spark.sql.execution.metric.SQLMetrics
  * :: DeveloperApi ::
  */
 @DeveloperApi
-case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends BinaryNode {
+case class CartesianProduct(
+    left: SparkPlan,
+    right: SparkPlan,
+    buildSide: BuildSide) extends BinaryNode {
Contributor

I am not sure this change is necessary: if either side is small enough, we fall back to BroadcastNestedLoopJoin, and when both sides are big, the change may not improve performance as much as we expect.
BTW, we don't build a hash map in CartesianProduct, so using buildSide seems a little confusing.
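
The fallback order described above can be sketched roughly as follows. The names `JoinChoice` and `choose`, and the `<=` comparison against the threshold, are assumptions for illustration; the strategy in this PR matches on logical plans via `CanBroadcast` instead:

```scala
object JoinChoice {
  sealed trait Choice
  case object BroadcastLeft extends Choice  // BroadcastNestedLoopJoin, building the left side
  case object BroadcastRight extends Choice // BroadcastNestedLoopJoin, building the right side
  case object Cartesian extends Choice      // both sides too large to broadcast

  // Hypothetical decision function: the left side is checked first,
  // mirroring the order of the cases in the strategy.
  def choose(leftBytes: BigInt, rightBytes: BigInt, threshold: BigInt): Choice =
    if (leftBytes <= threshold) BroadcastLeft
    else if (rightBytes <= threshold) BroadcastRight
    else Cartesian
}
```

Under this sketch, the cartesian path is only reached when both sides exceed the threshold, which is the case the comment argues may not benefit from reordering.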

Contributor

@Sephiroth-Lin Can you explain the reason that we need buildSide?

Contributor Author

@yhuai buildSide is only used to tell which side is smaller, so we can decide whether to swap the order.

   override def output: Seq[Attribute] = left.output ++ right.output

   override private[sql] lazy val metrics = Map(
@@ -50,11 +53,25 @@ case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends BinaryNod
       row.copy()
     }

-    leftResults.cartesian(rightResults).mapPartitions { iter =>
+    val (smallResults, bigResults) = buildSide match {
+      case BuildRight => (rightResults, leftResults)
+      case BuildLeft => (leftResults, rightResults)
+    }
+
+    // Use the small size rdd as cartesian left rdd.
+    smallResults.cartesian(bigResults).mapPartitions { iter =>
       val joinedRow = new JoinedRow
Contributor

Quick question. Why not use sizeInBytes? I assume we want to move as little data as possible? Using sizeInBytes would be a bit more involved, since this would involve the planner, and (probably) adding a BuildSide parameter to CartesianProduct...

Contributor

Yes, using the number of partitions here is not accurate. Consider an RDD with 100 partitions of one record each and an RDD with 10 partitions of 100 million records each: the method above will cause more scans from HDFS.
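
The scenario above can be worked through with a simplified cost model. This is only a sketch under an assumption that is not stated in the PR: the cost of `outer.cartesian(inner)` is taken as the number of times an inner partition is re-opened, which is (total outer records) × (inner partition count). `Side` and `innerOpens` are hypothetical names:

```scala
object CartesianCost {
  // Hypothetical summary of one side of the join.
  final case class Side(partitions: Long, recordsPerPartition: Long) {
    def totalRecords: Long = partitions * recordsPerPartition
  }

  // Simplified cost: for each inner partition, every outer record
  // re-opens that inner partition once.
  def innerOpens(outer: Side, inner: Side): Long =
    outer.totalRecords * inner.partitions

  val manyTinyPartitions: Side = Side(partitions = 100L, recordsPerPartition = 1L)
  val fewHugePartitions: Side = Side(partitions = 10L, recordsPerPartition = 100000000L)
}
```

Under this model, putting the 100-record RDD on the outside gives 1,000 re-opens, versus 100,000,000,000 with the sides swapped, even though the 100-record RDD has more partitions.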

Contributor Author

@hvanhovell Yes, using sizeInBytes is better, but it also has a problem: if leftResults has only one record and that record is big, while rightResults has many records whose total size is small, performance will be worse in this scenario. The best approach would be to check the total number of records per partition, but we cannot get that now.

-      iter.map { r =>
-        numOutputRows += 1
-        joinedRow(r._1, r._2)
+      buildSide match {
+        case BuildLeft =>
+          iter.map { r =>
+            numOutputRows += 1
+            joinedRow(r._1, r._2)
+          }
+        case BuildRight =>
+          iter.map { r =>
+            numOutputRows += 1
+            joinedRow(r._2, r._1)
+          }
       }
     }
   }
80 changes: 80 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -27,6 +27,10 @@ class JoinSuite extends QueryTest with SharedSQLContext {

   setupTestData()

+  def statisticSizeInByte(df: DataFrame): BigInt = {
+    df.queryExecution.optimizedPlan.statistics.sizeInBytes
+  }
+
   test("equi-join is hash-join") {
     val x = testData2.as("x")
     val y = testData2.as("y")
@@ -465,6 +469,82 @@ class JoinSuite extends QueryTest with SharedSQLContext {
     sql("UNCACHE TABLE testData")
   }

+  test("cross join with broadcast") {
+    sql("CACHE TABLE testData")
+
+    val sizeInByteOfTestData = statisticSizeInByte(sqlContext.table("testData"))
+
+    // we set the threshold is greater than statistic of the cached table testData
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> (sizeInByteOfTestData + 1).toString()) {
+
+      assert(statisticSizeInByte(sqlContext.table("testData2")) >
+        sqlContext.conf.autoBroadcastJoinThreshold)
+
+      assert(statisticSizeInByte(sqlContext.table("testData")) <
+        sqlContext.conf.autoBroadcastJoinThreshold)
+
+      Seq(
+        ("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a",
+          classOf[LeftSemiJoinHash]),
+        ("SELECT * FROM testData LEFT SEMI JOIN testData2",
+          classOf[LeftSemiJoinBNL]),
+        ("SELECT * FROM testData JOIN testData2",
+          classOf[BroadcastNestedLoopJoin]),
+        ("SELECT * FROM testData JOIN testData2 WHERE key = 2",
+          classOf[BroadcastNestedLoopJoin]),
+        ("SELECT * FROM testData LEFT JOIN testData2",
+          classOf[BroadcastNestedLoopJoin]),
+        ("SELECT * FROM testData RIGHT JOIN testData2",
+          classOf[BroadcastNestedLoopJoin]),
+        ("SELECT * FROM testData FULL OUTER JOIN testData2",
+          classOf[BroadcastNestedLoopJoin]),
+        ("SELECT * FROM testData LEFT JOIN testData2 WHERE key = 2",
+          classOf[BroadcastNestedLoopJoin]),
+        ("SELECT * FROM testData RIGHT JOIN testData2 WHERE key = 2",
+          classOf[BroadcastNestedLoopJoin]),
+        ("SELECT * FROM testData FULL OUTER JOIN testData2 WHERE key = 2",
+          classOf[BroadcastNestedLoopJoin]),
+        ("SELECT * FROM testData JOIN testData2 WHERE key > a",
+          classOf[BroadcastNestedLoopJoin]),
+        ("SELECT * FROM testData FULL OUTER JOIN testData2 WHERE key > a",
+          classOf[BroadcastNestedLoopJoin]),
+        ("SELECT * FROM testData left JOIN testData2 ON (key * a != key + a)",
+          classOf[BroadcastNestedLoopJoin]),
+        ("SELECT * FROM testData right JOIN testData2 ON (key * a != key + a)",
+          classOf[BroadcastNestedLoopJoin]),
+        ("SELECT * FROM testData full JOIN testData2 ON (key * a != key + a)",
+          classOf[BroadcastNestedLoopJoin])
+      ).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
+
+      checkAnswer(
+        sql(
+          """
+            SELECT x.value, y.a, y.b FROM testData x JOIN testData2 y WHERE x.key = 2
+          """.stripMargin),
+        Row("2", 1, 1) ::
+          Row("2", 1, 2) ::
+          Row("2", 2, 1) ::
+          Row("2", 2, 2) ::
+          Row("2", 3, 1) ::
+          Row("2", 3, 2) :: Nil)
+
+      checkAnswer(
+        sql(
+          """
+            SELECT x.value, y.a, y.b FROM testData x JOIN testData2 y WHERE x.key < y.a
+          """.stripMargin),
+        Row("1", 2, 1) ::
+          Row("1", 2, 2) ::
+          Row("1", 3, 1) ::
+          Row("1", 3, 2) ::
+          Row("2", 3, 1) ::
+          Row("2", 3, 2) :: Nil)
+    }
+
+    sql("UNCACHE TABLE testData")
+  }
+
   test("left semi join") {
     val df = sql("SELECT * FROM testData2 LEFT SEMI JOIN testData ON key = a")
     checkAnswer(df,