From 523ced2628a987d3eec2f62681547fbf2e5661f5 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Thu, 6 Oct 2016 00:13:32 -0700
Subject: [PATCH 1/4] [SPARK-17796][SQL] Support wildcard character in filename for LOAD DATA LOCAL INPATH

---
 .../spark/sql/execution/command/tables.scala  | 23 ++++++++++++-
 .../sql/hive/execution/SQLQuerySuite.scala    | 32 +++++++++++++++++++
 2 files changed, 54 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 424ef58d76c5..80e350a58026 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.command
 
 import java.io.File
 import java.net.URI
+import java.nio.file.FileSystems
 import java.util.Date
 
 import scala.collection.mutable.ArrayBuffer
@@ -246,7 +247,27 @@ case class LoadDataCommand(
     val loadPath =
       if (isLocal) {
         val uri = Utils.resolveURI(path)
-        if (!new File(uri.getPath()).exists()) {
+        val filePath = uri.getPath()
+        val exists = if (filePath.contains("*")) {
+          val splitPath = filePath.split(File.separator)
+          val filePattern = splitPath.last
+          val dir = splitPath.dropRight(1).mkString(File.separator)
+          if (dir.contains("*")) {
+            throw new AnalysisException(
+              s"LOAD DATA input path allows only filename wildcard: $path")
+          }
+
+          val files = new File(dir).listFiles()
+          if (files == null) {
+            false
+          } else {
+            val matcher = FileSystems.getDefault.getPathMatcher("glob:" + filePattern)
+            files.exists(f => matcher.matches(FileSystems.getDefault.getPath(f.getName)))
+          }
+        } else {
+          new File(filePath).exists()
+        }
+        if (!exists) {
           throw new AnalysisException(s"LOAD DATA input path does not exist: $path")
         }
         uri
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 6f2a16662bf1..a4d604f5e43c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hive.execution
 
+import java.io.{File, PrintWriter}
 import java.sql.{Date, Timestamp}
 
 import scala.sys.process.{Process, ProcessLogger}
@@ -1886,6 +1887,37 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }
 
+  test("SPARK-17796 Support wildcard character in filename for LOAD DATA LOCAL INPATH") {
+    withTempDir { dir =>
+      for (i <- 1 to 3) {
+        val writer = new PrintWriter(new File(s"$dir/part-r-0000$i"))
+        writer.write(s"$i")
+        writer.close()
+      }
+      for (i <- 5 to 7) {
+        val writer = new PrintWriter(new File(s"$dir/part-s-0000$i"))
+        writer.write(s"$i")
+        writer.close()
+      }
+
+      withTable("load_t") {
+        sql("CREATE TABLE load_t (a STRING)")
+        sql(s"LOAD DATA LOCAL INPATH '$dir/*part-r*' INTO TABLE load_t")
+        checkAnswer(sql("SELECT * FROM load_t"), Seq(Row("1"), Row("2"), Row("3")))
+
+        val m = intercept[AnalysisException] {
+          sql("LOAD DATA LOCAL INPATH '/non-exist-folder/*part*' INTO TABLE load_t")
+        }.getMessage
+        assert(m.contains("LOAD DATA input path does not exist"))
+
+        val m2 = intercept[AnalysisException] {
+          sql(s"LOAD DATA LOCAL INPATH '$dir*/*part*' INTO TABLE load_t")
+        }.getMessage
+        assert(m2.contains("LOAD DATA input path allows only filename wildcard"))
+      }
+    }
+  }
+
   def testCommandAvailable(command: String): Boolean = {
     val attempt = Try(Process(command).run(ProcessLogger(_ => ())).exitValue())
     attempt.isSuccess && attempt.get == 0
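
Note on patch 1/4: the new existence check builds a java.nio glob matcher from the filename part of the input path and tests it against each entry of the parent directory. A minimal standalone sketch of that check follows; the directory and pattern are hypothetical and only illustrate the PathMatcher usage, they are not taken from the patch.

    import java.io.File
    import java.nio.file.FileSystems

    object FilenameGlobSketch {
      // True if any file directly under `dir` has a name matching the glob `pattern`,
      // mirroring the listFiles + PathMatcher check introduced in this patch.
      def anyMatch(dir: String, pattern: String): Boolean = {
        val files = new File(dir).listFiles()   // null when `dir` does not exist
        if (files == null) {
          false
        } else {
          val matcher = FileSystems.getDefault.getPathMatcher("glob:" + pattern)
          // Only the file name is matched here, not the full path.
          files.exists(f => matcher.matches(FileSystems.getDefault.getPath(f.getName)))
        }
      }

      def main(args: Array[String]): Unit = {
        println(anyMatch("/tmp", "part-r*"))   // hypothetical directory and pattern
      }
    }
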
From 401c4eea6aa40f09280aadf300eb74c5a847f638 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Sun, 16 Oct 2016 22:32:17 -0700
Subject: [PATCH 2/4] Address comments.

---
 .../apache/spark/sql/execution/command/tables.scala | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 80e350a58026..42cbd416a5c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -249,9 +249,10 @@ case class LoadDataCommand(
         val uri = Utils.resolveURI(path)
         val filePath = uri.getPath()
         val exists = if (filePath.contains("*")) {
-          val splitPath = filePath.split(File.separator)
-          val filePattern = splitPath.last
-          val dir = splitPath.dropRight(1).mkString(File.separator)
+          val fileSystem = FileSystems.getDefault
+          val pathPattern = fileSystem.getPath(filePath)
+          val dir = pathPattern.getParent.toString
+          val filePattern = pathPattern.getName(pathPattern.getNameCount - 1).toString
           if (dir.contains("*")) {
             throw new AnalysisException(
               s"LOAD DATA input path allows only filename wildcard: $path")
@@ -261,8 +262,8 @@
           if (files == null) {
             false
           } else {
-            val matcher = FileSystems.getDefault.getPathMatcher("glob:" + filePattern)
-            files.exists(f => matcher.matches(FileSystems.getDefault.getPath(f.getName)))
+            val matcher = fileSystem.getPathMatcher("glob:" + filePattern)
+            files.exists(f => matcher.matches(fileSystem.getPath(f.getName)))
           }
         } else {
           new File(filePath).exists()
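
Note on patch 2/4: the review feedback replaces the manual File.separator split with java.nio.file.Path, which yields the parent directory and the last name component directly. A small sketch of that decomposition, using a made-up input path rather than anything from the patch:

    import java.nio.file.FileSystems

    object PathSplitSketch {
      def main(args: Array[String]): Unit = {
        val fileSystem = FileSystems.getDefault
        // Hypothetical LOAD DATA input path with a filename wildcard.
        val pathPattern = fileSystem.getPath("/tmp/data/part-r*")
        val dir = pathPattern.getParent.toString                                      // "/tmp/data"
        val filePattern = pathPattern.getName(pathPattern.getNameCount - 1).toString  // "part-r*"
        val wildcardInDir = dir.contains("*")  // must be false, otherwise the command is rejected
        println(s"dir=$dir filePattern=$filePattern wildcardInDir=$wildcardInDir")
      }
    }

getName(getNameCount - 1) returns the last path element; patch 3/4 below switches to the equivalent but shorter getFileName.
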
From c74191ba1a3c0867b92953f3320716f93853db56 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Mon, 17 Oct 2016 12:01:28 -0700
Subject: [PATCH 3/4] Use Guava Files.write and `getFileName`.

---
 .../apache/spark/sql/execution/command/tables.scala |  2 +-
 .../spark/sql/hive/execution/SQLQuerySuite.scala    | 10 ++++------
 2 files changed, 5 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 42cbd416a5c6..e31c8baaeeef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -252,7 +252,7 @@ case class LoadDataCommand(
           val fileSystem = FileSystems.getDefault
           val pathPattern = fileSystem.getPath(filePath)
           val dir = pathPattern.getParent.toString
-          val filePattern = pathPattern.getName(pathPattern.getNameCount - 1).toString
+          val filePattern = pathPattern.getFileName.toString
           if (dir.contains("*")) {
             throw new AnalysisException(
               s"LOAD DATA input path allows only filename wildcard: $path")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index a4d604f5e43c..130de024c00e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -18,11 +18,13 @@
 package org.apache.spark.sql.hive.execution
 
 import java.io.{File, PrintWriter}
+import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
 
 import scala.sys.process.{Process, ProcessLogger}
 import scala.util.Try
 
+import com.google.common.io.Files
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql._
@@ -1890,14 +1892,10 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
   test("SPARK-17796 Support wildcard character in filename for LOAD DATA LOCAL INPATH") {
     withTempDir { dir =>
       for (i <- 1 to 3) {
-        val writer = new PrintWriter(new File(s"$dir/part-r-0000$i"))
-        writer.write(s"$i")
-        writer.close()
+        Files.write(s"$i", new File(s"$dir/part-r-0000$i"), StandardCharsets.UTF_8)
       }
       for (i <- 5 to 7) {
-        val writer = new PrintWriter(new File(s"$dir/part-s-0000$i"))
-        writer.write(s"$i")
-        writer.close()
+        Files.write(s"$i", new File(s"$dir/part-s-0000$i"), StandardCharsets.UTF_8)
       }
 
       withTable("load_t") {
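
Note on patch 3/4: the test setup is shortened with Guava's Files.write, which creates the file, writes the string, and closes the stream in one call. A sketch of the idiom, assuming Guava is on the classpath (as it is in Spark's build) and using a hypothetical target directory:

    import java.io.File
    import java.nio.charset.StandardCharsets

    import com.google.common.io.Files

    object GuavaWriteSketch {
      def main(args: Array[String]): Unit = {
        // Hypothetical directory; the suite gets one from withTempDir instead.
        val dir = new File(System.getProperty("java.io.tmpdir"))
        for (i <- 1 to 3) {
          // One call replaces the three-line PrintWriter create/write/close sequence.
          Files.write(s"$i", new File(dir, s"part-r-0000$i"), StandardCharsets.UTF_8)
        }
      }
    }
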
From 933ad856c7fb3712a39557e09da4bc22b75b905c Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Mon, 17 Oct 2016 12:20:03 -0700
Subject: [PATCH 4/4] Use absolute path pattern match.

---
 .../org/apache/spark/sql/execution/command/tables.scala | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index e31c8baaeeef..5cc78914cf49 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -252,7 +252,6 @@ case class LoadDataCommand(
           val fileSystem = FileSystems.getDefault
           val pathPattern = fileSystem.getPath(filePath)
           val dir = pathPattern.getParent.toString
-          val filePattern = pathPattern.getFileName.toString
           if (dir.contains("*")) {
             throw new AnalysisException(
               s"LOAD DATA input path allows only filename wildcard: $path")
@@ -262,8 +261,8 @@
           if (files == null) {
             false
          } else {
-            val matcher = fileSystem.getPathMatcher("glob:" + filePattern)
-            files.exists(f => matcher.matches(fileSystem.getPath(f.getName)))
+            val matcher = fileSystem.getPathMatcher("glob:" + pathPattern.toAbsolutePath)
+            files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath)))
           }
         } else {
           new File(filePath).exists()
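
Note on patch 4/4: instead of matching bare file names, the final revision compiles the glob from the whole absolute input pattern and matches each candidate's absolute path. Because '*' in glob syntax does not cross directory separators, the wildcard still applies only to the filename component. A sketch of the final check with a hypothetical pattern, not taken from the patch:

    import java.io.File
    import java.nio.file.FileSystems

    object AbsoluteGlobSketch {
      def main(args: Array[String]): Unit = {
        val fileSystem = FileSystems.getDefault
        // Hypothetical wildcard path, e.g. what LOAD DATA LOCAL INPATH might receive.
        val pathPattern = fileSystem.getPath("/tmp/data/part-r*")
        val dir = pathPattern.getParent.toString

        val files = new File(dir).listFiles()
        val exists = if (files == null) {
          false
        } else {
          // Glob over the absolute pattern; '*' never matches '/', so only the
          // last path element is effectively wildcarded.
          val matcher = fileSystem.getPathMatcher("glob:" + pathPattern.toAbsolutePath)
          files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath)))
        }
        println(exists)
      }
    }
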