Skip to content

Commit 5fbf5f9

Browse files
ericlrxin
authored andcommitted
[SPARK-16818] Exchange reuse incorrectly reuses scans over different sets of partitions
#14425 rebased for branch-2.0 Author: Eric Liang <ekl@databricks.com> Closes #14427 from ericl/spark-16818-br-2.
1 parent 1813bbd commit 5fbf5f9

File tree

2 files changed

+36
-1
lines changed

2 files changed

+36
-1
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,9 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
202202
partitions
203203
}
204204

205+
// These metadata values make scan plans uniquely identifiable for equality checking.
205206
val meta = Map(
207+
"PartitionFilters" -> partitionKeyFilters.mkString("[", ", ", "]"),
206208
"Format" -> files.fileFormat.toString,
207209
"ReadSchema" -> prunedDataSchema.simpleString,
208210
PUSHED_FILTERS -> pushedDownFilters.mkString("[", ", ", "]"),

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import org.apache.spark.sql._
2929
import org.apache.spark.sql.catalyst.InternalRow
3030
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionSet, PredicateHelper}
3131
import org.apache.spark.sql.catalyst.util
32-
import org.apache.spark.sql.execution.DataSourceScanExec
32+
import org.apache.spark.sql.execution.{DataSourceScanExec, SparkPlan}
3333
import org.apache.spark.sql.functions._
3434
import org.apache.spark.sql.internal.SQLConf
3535
import org.apache.spark.sql.sources._
@@ -407,6 +407,39 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
407407
}
408408
}
409409

410+
test("[SPARK-16818] partition pruned file scans implement sameResult correctly") {
411+
withTempPath { path =>
412+
val tempDir = path.getCanonicalPath
413+
spark.range(100)
414+
.selectExpr("id", "id as b")
415+
.write
416+
.partitionBy("id")
417+
.parquet(tempDir)
418+
val df = spark.read.parquet(tempDir)
419+
def getPlan(df: DataFrame): SparkPlan = {
420+
df.queryExecution.executedPlan
421+
}
422+
assert(getPlan(df.where("id = 2")).sameResult(getPlan(df.where("id = 2"))))
423+
assert(!getPlan(df.where("id = 2")).sameResult(getPlan(df.where("id = 3"))))
424+
}
425+
}
426+
427+
test("[SPARK-16818] exchange reuse respects differences in partition pruning") {
428+
spark.conf.set("spark.sql.exchange.reuse", true)
429+
withTempPath { path =>
430+
val tempDir = path.getCanonicalPath
431+
spark.range(10)
432+
.selectExpr("id % 2 as a", "id % 3 as b", "id as c")
433+
.write
434+
.partitionBy("a")
435+
.parquet(tempDir)
436+
val df = spark.read.parquet(tempDir)
437+
val df1 = df.where("a = 0").groupBy("b").agg("c" -> "sum")
438+
val df2 = df.where("a = 1").groupBy("b").agg("c" -> "sum")
439+
checkAnswer(df1.join(df2, "b"), Row(0, 6, 12) :: Row(1, 4, 8) :: Row(2, 10, 5) :: Nil)
440+
}
441+
}
442+
410443
// Helpers for checking the arguments passed to the FileFormat.
411444

412445
protected val checkPartitionSchema =

0 commit comments

Comments
 (0)