diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
index c79a9ac2dad81..260c0d7c0a037 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
@@ -370,17 +370,14 @@ private[parquet] object ParquetTypesConverter extends Logging {
       throw new IllegalArgumentException(s"Incorrectly formatted Parquet metadata path $origPath")
     }
     val path = origPath.makeQualified(fs)
-    if (!fs.getFileStatus(path).isDir) {
-      throw new IllegalArgumentException(
-        s"Expected $path for be a directory with Parquet files/metadata")
-    }
-    ParquetRelation.enableLogForwarding()
 
     val children = fs.listStatus(path).filterNot { status =>
       val name = status.getPath.getName
       name(0) == '.' || name == FileOutputCommitter.SUCCEEDED_FILE_NAME
     }
 
+    ParquetRelation.enableLogForwarding()
+
     // NOTE (lian): Parquet "_metadata" file can be very slow if the file consists of lots of row
     // groups. Since Parquet schema is replicated among all row groups, we only need to touch a
     // single row group to read schema related metadata. Notice that we are making assumptions that
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 172dcd6aa0ee3..2667ef1108c78 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -35,7 +35,6 @@ import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.test.TestSQLContext._
 import org.apache.spark.util.Utils
 
-
 case class TestRDDEntry(key: Int, value: String)
 
 case class NullReflectData(
@@ -318,8 +317,30 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
     val rdd_copy = sql("SELECT * FROM tmpx").collect()
     val rdd_orig = rdd.collect()
     for(i <- 0 to 99) {
-      assert(rdd_copy(i).apply(0) === rdd_orig(i).key,  s"key error in line $i")
-      assert(rdd_copy(i).apply(1) === rdd_orig(i).value, s"value in line $i")
+      assert(rdd_copy(i).apply(0) === rdd_orig(i).key, s"key error in line $i")
+      assert(rdd_copy(i).apply(1) === rdd_orig(i).value, s"value error in line $i")
+    }
+    Utils.deleteRecursively(file)
+  }
+
+  test("Read a parquet file instead of a directory") {
+    val file = getTempFilePath("parquet")
+    val path = file.toString
+    val fsPath = new Path(path)
+    val fs: FileSystem = fsPath.getFileSystem(TestSQLContext.sparkContext.hadoopConfiguration)
+    val rdd = TestSQLContext.sparkContext.parallelize((1 to 100))
+      .map(i => TestRDDEntry(i, s"val_$i"))
+    rdd.coalesce(1).saveAsParquetFile(path)
+
+    val children = fs.listStatus(fsPath).filter(_.getPath.getName.endsWith(".parquet"))
+    assert(children.length > 0)
+    val readFile = parquetFile(path + "/" + children(0).getPath.getName)
+    readFile.registerTempTable("tmpx")
+    val rdd_copy = sql("SELECT * FROM tmpx").collect()
+    val rdd_orig = rdd.collect()
+    for(i <- 0 to 99) {
+      assert(rdd_copy(i).apply(0) === rdd_orig(i).key, s"key error in line $i")
+      assert(rdd_copy(i).apply(1) === rdd_orig(i).value, s"value error in line $i")
+    }
     Utils.deleteRecursively(file)
   }
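
Removing the directory check means `ParquetTypesConverter.readMetaData` no longer rejects a path that points at a single Parquet file, so `parquetFile` can be called on either a Parquet output directory or one part file inside it, as the new test exercises. A minimal usage sketch under that assumption; the paths, part-file name, and context names below are illustrative, not part of the patch:

```scala
// Sketch only: assumes a Spark 1.1-era SQLContext; the paths and the
// part-file name are made up for illustration.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("parquet-single-file"))
val sqlContext = new SQLContext(sc)

// Passing the enclosing directory still works as before:
val fromDir = sqlContext.parquetFile("/tmp/people.parquet")

// With this change, a single part file inside that directory is also accepted:
val fromFile = sqlContext.parquetFile("/tmp/people.parquet/part-r-00001.parquet")
fromFile.registerTempTable("people")
sqlContext.sql("SELECT * FROM people").collect()
```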