@@ -370,17 +370,14 @@ private[parquet] object ParquetTypesConverter extends Logging {
       throw new IllegalArgumentException(s"Incorrectly formatted Parquet metadata path $origPath")
     }
     val path = origPath.makeQualified(fs)
-    if (!fs.getFileStatus(path).isDir) {
-      throw new IllegalArgumentException(
-        s"Expected $path for be a directory with Parquet files/metadata")
-    }
-    ParquetRelation.enableLogForwarding()
 
     val children = fs.listStatus(path).filterNot { status =>
       val name = status.getPath.getName
       name(0) == '.' || name == FileOutputCommitter.SUCCEEDED_FILE_NAME
     }
 
+    ParquetRelation.enableLogForwarding()
+
     // NOTE (lian): Parquet "_metadata" file can be very slow if the file consists of lots of row
     // groups. Since Parquet schema is replicated among all row groups, we only need to touch a
     // single row group to read schema related metadata. Notice that we are making assumptions that
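The NOTE above explains why the schema can be read by touching a single row group's footer rather than scanning an entire "_metadata" file. As a rough illustration only, not part of this patch, the sketch below reads the schema from one Parquet file's footer using the pre-Apache parquet-mr API (the `parquet.*` package Spark depends on at this point); the helper name readSchemaFromSingleFile is made up for the example.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import parquet.hadoop.ParquetFileReader
import parquet.schema.MessageType

// Hypothetical helper, not part of the patch: obtain the Parquet schema by reading
// only the footer of a single file instead of listing a whole directory.
def readSchemaFromSingleFile(conf: Configuration, file: Path): MessageType = {
  val footer = ParquetFileReader.readFooter(conf, file)
  footer.getFileMetaData.getSchema
}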
@@ -35,7 +35,6 @@ import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.test.TestSQLContext._
 import org.apache.spark.util.Utils
 
-
 case class TestRDDEntry(key: Int, value: String)
 
 case class NullReflectData(
@@ -318,8 +317,30 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
     val rdd_copy = sql("SELECT * FROM tmpx").collect()
     val rdd_orig = rdd.collect()
     for(i <- 0 to 99) {
-      assert(rdd_copy(i).apply(0) === rdd_orig(i).key, s"key error in line $i")
-      assert(rdd_copy(i).apply(1) === rdd_orig(i).value, s"value in line $i")
+      assert(rdd_copy(i).apply(0) === rdd_orig(i).key, s"key error in line $i")
+      assert(rdd_copy(i).apply(1) === rdd_orig(i).value, s"value error in line $i")
     }
     Utils.deleteRecursively(file)
   }
+
+  test("Read a parquet file instead of a directory") {
+    val file = getTempFilePath("parquet")
+    val path = file.toString
+    val fsPath = new Path(path)
+    val fs: FileSystem = fsPath.getFileSystem(TestSQLContext.sparkContext.hadoopConfiguration)
+    val rdd = TestSQLContext.sparkContext.parallelize((1 to 100))
+      .map(i => TestRDDEntry(i, s"val_$i"))
+    rdd.coalesce(1).saveAsParquetFile(path)
+
+    val children = fs.listStatus(fsPath).filter(_.getPath.getName.endsWith(".parquet"))
+    assert(children.length > 0)
+    val readFile = parquetFile(path + "/" + children(0).getPath.getName)
+    readFile.registerTempTable("tmpx")
+    val rdd_copy = sql("SELECT * FROM tmpx").collect()
+    val rdd_orig = rdd.collect()
+    for(i <- 0 to 99) {
+      assert(rdd_copy(i).apply(0) === rdd_orig(i).key, s"key error in line $i")
+      assert(rdd_copy(i).apply(1) === rdd_orig(i).value, s"value error in line $i")
+    }
+    Utils.deleteRecursively(file)
+  }
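For reference, a hedged usage sketch of what the new test exercises: with the directory check removed, parquetFile can be pointed directly at an individual part-file. The file path and table name below are hypothetical, not taken from the patch.

import org.apache.spark.sql.test.TestSQLContext._

// Hypothetical path to one part-file inside a directory written by saveAsParquetFile.
val singlePart = parquetFile("/tmp/people.parquet/part-r-00001.parquet")
singlePart.registerTempTable("people_single_part")
sql("SELECT COUNT(*) FROM people_single_part").collect()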