From 552955a5293bc22da1cd644ffde90b883fc560e8 Mon Sep 17 00:00:00 2001 From: windpiger Date: Mon, 27 Feb 2017 15:46:18 +0800 Subject: [PATCH 1/4] [SPARK-19748][SQL]refresh function has a wrong order to do cache invalidate and regenerate the inmemory var for InMemoryFileIndex with FileStatusCache --- .../datasources/InMemoryFileIndex.scala | 2 +- .../datasources/FileIndexSuite.scala | 26 +++++++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala index 7531f0ae02e7..ee4d0863d977 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala @@ -66,8 +66,8 @@ class InMemoryFileIndex( } override def refresh(): Unit = { - refresh0() fileStatusCache.invalidateAll() + refresh0() } private def refresh0(): Unit = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala index 2b4c9f3ed327..5ed5ad17136d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala @@ -178,6 +178,32 @@ class FileIndexSuite extends SharedSQLContext { assert(catalog2.allFiles().nonEmpty) } } + + test("refresh for InMemoryFileIndex with FileStatusCache") { + withTempDir { dir => + val fileStatusCache = FileStatusCache.getOrCreate(spark) + val dirPath = new Path(dir.getCanonicalPath) + val catalog = new InMemoryFileIndex(spark, Seq(dirPath), Map.empty, + None, fileStatusCache) { + def leafFilePaths: Seq[Path] = leafFiles.keys.toSeq + def leafDirPaths: Seq[Path] = 
leafDirToChildrenFiles.keys.toSeq + } + + assert(catalog.leafDirPaths.isEmpty) + assert(catalog.leafFilePaths.isEmpty) + + val file = new File(dir, "text.txt") + stringToFile(file, "text") + + catalog.refresh() + + val path = new Path(file.getCanonicalPath) + assert(catalog.leafFilePaths.nonEmpty && catalog + .leafFilePaths.forall(p => p.toString.startsWith("file:/"))) + assert(catalog.leafDirPaths.nonEmpty && catalog + .leafDirPaths.forall(p => p.toString.startsWith("file:/"))) + } + } } class FakeParentPathFileSystem extends RawLocalFileSystem { From fd3bb21597809409e7f33796589c9178744063c5 Mon Sep 17 00:00:00 2001 From: windpiger Date: Mon, 27 Feb 2017 15:53:46 +0800 Subject: [PATCH 2/4] modify the test case --- .../sql/execution/datasources/FileIndexSuite.scala | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala index 5ed5ad17136d..e4de77d40167 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala @@ -182,7 +182,7 @@ class FileIndexSuite extends SharedSQLContext { test("refresh for InMemoryFileIndex with FileStatusCache") { withTempDir { dir => val fileStatusCache = FileStatusCache.getOrCreate(spark) - val dirPath = new Path(dir.getCanonicalPath) + val dirPath = new Path(dir.getAbsolutePath) val catalog = new InMemoryFileIndex(spark, Seq(dirPath), Map.empty, None, fileStatusCache) { def leafFilePaths: Seq[Path] = leafFiles.keys.toSeq @@ -197,11 +197,13 @@ class FileIndexSuite extends SharedSQLContext { catalog.refresh() - val path = new Path(file.getCanonicalPath) - assert(catalog.leafFilePaths.nonEmpty && catalog - .leafFilePaths.forall(p => p.toString.startsWith("file:/"))) - assert(catalog.leafDirPaths.nonEmpty && 
catalog - .leafDirPaths.forall(p => p.toString.startsWith("file:/"))) + assert(catalog.leafFilePaths.size == 1) + assert(catalog.leafFilePaths.head.toString.stripSuffix("/") == + s"file:${file.getAbsolutePath.stripSuffix("/")}") + + assert(catalog.leafDirPaths.size == 1) + assert(catalog.leafDirPaths.head.toString.stripSuffix("/") == + s"file:${dir.getAbsolutePath.stripSuffix("/")}") } } } From 1ec20a5146e7e7c79386d0c0e9fdffad3254cdc6 Mon Sep 17 00:00:00 2001 From: windpiger Date: Tue, 28 Feb 2017 09:34:17 +0800 Subject: [PATCH 3/4] fix some code style --- .../sql/execution/datasources/FileIndexSuite.scala | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala index e4de77d40167..609d97045f95 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala @@ -183,8 +183,9 @@ class FileIndexSuite extends SharedSQLContext { withTempDir { dir => val fileStatusCache = FileStatusCache.getOrCreate(spark) val dirPath = new Path(dir.getAbsolutePath) - val catalog = new InMemoryFileIndex(spark, Seq(dirPath), Map.empty, - None, fileStatusCache) { + val fs = dirPath.getFileSystem(spark.sessionState.newHadoopConf()) + val catalog = + new InMemoryFileIndex(spark, Seq(dirPath), Map.empty, None, fileStatusCache) { def leafFilePaths: Seq[Path] = leafFiles.keys.toSeq def leafDirPaths: Seq[Path] = leafDirToChildrenFiles.keys.toSeq } @@ -198,12 +199,10 @@ class FileIndexSuite extends SharedSQLContext { catalog.refresh() assert(catalog.leafFilePaths.size == 1) - assert(catalog.leafFilePaths.head.toString.stripSuffix("/") == - s"file:${file.getAbsolutePath.stripSuffix("/")}") + assert(catalog.leafFilePaths.head == fs.makeQualified(new 
Path(file.getAbsolutePath))) assert(catalog.leafDirPaths.size == 1) - assert(catalog.leafDirPaths.head.toString.stripSuffix("/") == - s"file:${dir.getAbsolutePath.stripSuffix("/")}") + assert(catalog.leafDirPaths.head == fs.makeQualified(dirPath)) } } } From 94879a8528a3d50cf67c8952b05ac9e408f5ecdd Mon Sep 17 00:00:00 2001 From: windpiger Date: Tue, 28 Feb 2017 12:15:23 +0800 Subject: [PATCH 4/4] fix some code style --- .../sql/execution/datasources/FileIndexSuite.scala | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala index 609d97045f95..efbfc2417dfe 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala @@ -186,15 +186,14 @@ class FileIndexSuite extends SharedSQLContext { val fs = dirPath.getFileSystem(spark.sessionState.newHadoopConf()) val catalog = new InMemoryFileIndex(spark, Seq(dirPath), Map.empty, None, fileStatusCache) { - def leafFilePaths: Seq[Path] = leafFiles.keys.toSeq - def leafDirPaths: Seq[Path] = leafDirToChildrenFiles.keys.toSeq - } - - assert(catalog.leafDirPaths.isEmpty) - assert(catalog.leafFilePaths.isEmpty) + def leafFilePaths: Seq[Path] = leafFiles.keys.toSeq + def leafDirPaths: Seq[Path] = leafDirToChildrenFiles.keys.toSeq + } val file = new File(dir, "text.txt") stringToFile(file, "text") + assert(catalog.leafDirPaths.isEmpty) + assert(catalog.leafFilePaths.isEmpty) catalog.refresh()