From aa49f15b819f07508853f486b5359515f2734f5a Mon Sep 17 00:00:00 2001 From: Jiaan Geng Date: Thu, 9 Dec 2021 16:00:53 +0800 Subject: [PATCH 1/4] [SPARK-37592][SQL] Improve performance of `JoinSelection` --- .../spark/sql/catalyst/plans/logical/hints.scala | 2 ++ .../org/apache/spark/sql/execution/Columnar.scala | 6 ++---- .../spark/sql/execution/SparkStrategies.scala | 14 +++++++++----- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala index edfe5d5dd032..5dc3eb707f6c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala @@ -62,6 +62,8 @@ case class ResolvedHint(child: LogicalPlan, hints: HintInfo = HintInfo()) */ case class JoinHint(leftHint: Option[HintInfo], rightHint: Option[HintInfo]) { + def isEmpty: Boolean = leftHint.isEmpty && rightHint.isEmpty + override def toString: String = { Seq( leftHint.map("leftHint=" + _), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala index d1e916842a21..f94d4197508d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala @@ -548,11 +548,9 @@ case class ApplyColumnarRulesAndInsertTransitions( def apply(plan: SparkPlan): SparkPlan = { var preInsertPlan: SparkPlan = plan - columnarRules.foreach((r : ColumnarRule) => - preInsertPlan = r.preColumnarTransitions(preInsertPlan)) + columnarRules.foreach( r => preInsertPlan = r.preColumnarTransitions(preInsertPlan)) var postInsertPlan = insertTransitions(preInsertPlan, outputsColumnar) - columnarRules.reverse.foreach((r : ColumnarRule) => - postInsertPlan = r.postColumnarTransitions(postInsertPlan)) + columnarRules.reverse.foreach( r => postInsertPlan = r.postColumnarTransitions(postInsertPlan)) postInsertPlan } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 90c2507a1e11..70d0638c124a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -266,11 +266,15 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } } - createBroadcastHashJoin(true) - .orElse { if (hintToSortMergeJoin(hint)) createSortMergeJoin() else None } - .orElse(createShuffleHashJoin(true)) - .orElse { if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None } - .getOrElse(createJoinWithoutHint()) + if (hint.isEmpty) { + createJoinWithoutHint() + } else { + createBroadcastHashJoin(true) + .orElse { if (hintToSortMergeJoin(hint)) createSortMergeJoin() else None } + .orElse(createShuffleHashJoin(true)) + .orElse { if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None } + .getOrElse(createJoinWithoutHint()) + } case j @ ExtractSingleColumnNullAwareAntiJoin(leftKeys, rightKeys) => Seq(joins.BroadcastHashJoinExec(leftKeys, rightKeys, LeftAnti, BuildRight, From 56919dc23b1ec990322e473dd935514cdfabb886 Mon Sep 17 00:00:00 2001 From: Jiaan Geng Date: Fri, 10 Dec 2021 10:37:29 +0800 Subject: [PATCH 2/4] Update code --- .../apache/spark/sql/execution/SparkStrategies.scala | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 70d0638c124a..9c2195d42786 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -343,10 +343,13 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } } - createBroadcastNLJoin(hintToBroadcastLeft(hint), hintToBroadcastRight(hint)) - .orElse { if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None } - .getOrElse(createJoinWithoutHint()) - + if (hint.isEmpty) { + createJoinWithoutHint() + } else { + createBroadcastNLJoin(hintToBroadcastLeft(hint), hintToBroadcastRight(hint)) + .orElse { if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None } + .getOrElse(createJoinWithoutHint()) + } // --- Cases where this strategy does not apply --------------------------------------------- case _ => Nil From 0e82c06bdc488f4e0694bb38e0026db7a0ed9b66 Mon Sep 17 00:00:00 2001 From: Jiaan Geng Date: Mon, 13 Dec 2021 14:05:55 +0800 Subject: [PATCH 3/4] Update sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala Co-authored-by: Wenchen Fan --- .../main/scala/org/apache/spark/sql/execution/Columnar.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala index f94d4197508d..0e6165fb6d80 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala @@ -548,7 +548,7 @@ case class ApplyColumnarRulesAndInsertTransitions( def apply(plan: SparkPlan): SparkPlan = { var preInsertPlan: SparkPlan = plan - columnarRules.foreach( r => preInsertPlan = r.preColumnarTransitions(preInsertPlan)) + columnarRules.foreach(r => preInsertPlan = r.preColumnarTransitions(preInsertPlan)) var postInsertPlan = insertTransitions(preInsertPlan, outputsColumnar) columnarRules.reverse.foreach( r => postInsertPlan = r.postColumnarTransitions(postInsertPlan)) postInsertPlan From 3ce77ee19851d3b721d849314effa826cf6251d8 Mon Sep 17 00:00:00 2001 From: Jiaan Geng Date: Mon, 13 Dec 2021 14:06:02 +0800 Subject: [PATCH 4/4] Update sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala Co-authored-by: Wenchen Fan --- .../main/scala/org/apache/spark/sql/execution/Columnar.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala index 0e6165fb6d80..147285c31fb4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala @@ -550,7 +550,7 @@ case class ApplyColumnarRulesAndInsertTransitions( var preInsertPlan: SparkPlan = plan columnarRules.foreach(r => preInsertPlan = r.preColumnarTransitions(preInsertPlan)) var postInsertPlan = insertTransitions(preInsertPlan, outputsColumnar) - columnarRules.reverse.foreach( r => postInsertPlan = r.postColumnarTransitions(postInsertPlan)) + columnarRules.reverse.foreach(r => postInsertPlan = r.postColumnarTransitions(postInsertPlan)) postInsertPlan } }