From b4a7b72042c432ac05621d38e875244110bca9e7 Mon Sep 17 00:00:00 2001 From: Bruce Robbins Date: Mon, 23 Jun 2025 14:21:00 +0800 Subject: [PATCH] [SPARK-52339][SQL] Fix comparison of `InMemoryFileIndex` instances ### What changes were proposed in this pull request? This PR changes `InMemoryFileIndex#equals` to compare a non-distinct collection of root paths rather than a distinct set of root paths. Without this change, `InMemoryFileIndex#equals` considers the following two collections of root paths to be equal, even though they represent a different number of rows: ``` ["/tmp/test", "/tmp/test"] ["/tmp/test", "/tmp/test", "/tmp/test"] ``` ### Why are the changes needed? The bug can cause correctness issues, e.g. ``` // create test data val data = Seq((1, 2), (2, 3)).toDF("a", "b") data.write.mode("overwrite").csv("/tmp/test") val fileList1 = List.fill(2)("/tmp/test") val fileList2 = List.fill(3)("/tmp/test") val df1 = spark.read.schema("a int, b int").csv(fileList1: _*) val df2 = spark.read.schema("a int, b int").csv(fileList2: _*) df1.count() // correctly returns 4 df2.count() // correctly returns 6 // the following is the same as above, except df1 is persisted val df1 = spark.read.schema("a int, b int").csv(fileList1: _*).persist val df2 = spark.read.schema("a int, b int").csv(fileList2: _*) df1.count() // correctly returns 4 df2.count() // incorrectly returns 4!! ``` In the above example, df1 and df2 were created with a different number of paths: df1 has 2, and df2 has 3. But since the distinct set of root paths is the same (e.g., `Set("/tmp/test") == Set("/tmp/test"))`, the two dataframes are considered equal. Thus, when df1 is persisted, df2 uses df1's cached plan. The same bug also causes inappropriate exchange reuse. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? New unit test. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #51043 from bersprockets/multi_path_issue. Authored-by: Bruce Robbins Signed-off-by: Wenchen Fan --- .../datasources/InMemoryFileIndex.scala | 2 +- .../datasources/FileIndexSuite.scala | 24 +++++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala index 44d31131e9c6d..8920ff88be519 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala @@ -101,7 +101,7 @@ class InMemoryFileIndex( } override def equals(other: Any): Boolean = other match { - case hdfs: InMemoryFileIndex => rootPaths.toSet == hdfs.rootPaths.toSet + case hdfs: InMemoryFileIndex => rootPaths.sorted == hdfs.rootPaths.sorted case _ => false } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala index 9ac61f0cee5fc..54403ea99c813 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala @@ -554,6 +554,30 @@ class FileIndexSuite extends SharedSparkSession { assert(FileIndexOptions.isValidOption("modifiedafter")) assert(FileIndexOptions.isValidOption("pathglobfilter")) } + + test("SPARK-52339: Correctly compare root paths") { + withTempDir { dir => + val file1 = new File(dir, "text1.txt") + stringToFile(file1, "text1") + val file2 = new File(dir, "text2.txt") + stringToFile(file2, "text2") + val path1 = new Path(file1.getCanonicalPath) + val path2 = new Path(file2.getCanonicalPath) + + val schema = StructType(Seq(StructField("a", StringType, false))) + + // Verify that the order of paths doesn't matter + val fileIndex1a = new InMemoryFileIndex(spark, Seq(path1, path2), Map.empty, Some(schema)) + val fileIndex1b = new InMemoryFileIndex(spark, Seq(path2, path1), Map.empty, Some(schema)) + assert(fileIndex1a == fileIndex1b) + + // Verify that a different number of paths does matter + val fileIndex2a = new InMemoryFileIndex(spark, Seq(path1, path1), Map.empty, Some(schema)) + val fileIndex2b = new InMemoryFileIndex(spark, Seq(path1, path1, path1), + Map.empty, Some(schema)) + assert(fileIndex2a != fileIndex2b) + } + } } object DeletionRaceFileSystem {