File tree Expand file tree Collapse file tree 1 file changed +17
-14
lines changed
sql/core/src/main/scala/org/apache/spark/sql/execution/joins Expand file tree Collapse file tree 1 file changed +17
-14
lines changed Original file line number Diff line number Diff line change @@ -681,26 +681,29 @@ case class SortMergeJoinExec(
681681 val cond = BindReferences .bindReference(
682682 condition.get, streamedPlan.output ++ bufferedPlan.output).genCode(ctx)
683683 // Evaluate the columns those used by condition before loop
684- val before =
685- s """
686- |boolean $loaded = false;
687- | $streamedBefore
688- """ .stripMargin
689-
690- val loadStreamed =
691- s """
692- |if (! $loaded) {
693- | $loaded = true;
694- | $streamedAfter
695- |}
684+ val before = joinType match {
685+ case LeftAnti =>
686+ // No need to initialize `loaded` variable for Left Anti join.
687+ streamedBefore.trim
688+ case _ =>
689+ s """
690+ |boolean $loaded = false;
691+ | $streamedBefore
696692 """ .stripMargin
693+ }
697694
698695 val loadStreamedAfterCondition = joinType match {
699696 case LeftAnti =>
700697 // No need to evaluate columns not used by condition from streamed side, as for Left Anti
701698 // join, streamed row with match is not outputted.
702699 " "
703- case _ => loadStreamed
700+ case _ =>
701+ s """
702+ |if (! $loaded) {
703+ | $loaded = true;
704+ | $streamedAfter
705+ |}
706+ """ .stripMargin
704707 }
705708
706709 val loadBufferedAfterCondition = joinType match {
@@ -722,7 +725,7 @@ case class SortMergeJoinExec(
722725 | $loadStreamedAfterCondition
723726 | $loadBufferedAfterCondition
724727 """ .stripMargin
725- (before, checking.trim, loadStreamed )
728+ (before, checking.trim, streamedAfter.trim )
726729 } else {
727730 (evaluateVariables(streamedVars), " " , " " )
728731 }
You can’t perform that action at this time.
0 commit comments