@@ -122,7 +122,7 @@ private[sql] class ParquetRelation2(
.map(DataType.fromJson(_).asInstanceOf[StructType])

private lazy val metadataCache: MetadataCache = {
val meta = new MetadataCache
val meta = new MetadataCache()
meta.refresh()
meta
}
@@ -249,8 +249,6 @@ private[sql] class ParquetRelation2(
// Create the function to set input paths at the driver side.
val setInputPaths = ParquetRelation2.initializeDriverSideJobFunc(inputFiles) _

val footers = inputFiles.map(f => metadataCache.footers(f.getPath))

Utils.withDummyCallSite(sqlContext.sparkContext) {
// TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`.
// After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects
@@ -277,12 +275,6 @@
f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithEscapedAuthority)
}.toSeq

@transient val cachedFooters = footers.map { f =>
// In order to encode the authority of a Path containing special characters such as /,
// we need to use the string returned by the URI of the path to create a new Path.
new Footer(escapePathUserInfo(f.getFile), f.getParquetMetadata)
}.toSeq

private def escapePathUserInfo(path: Path): Path = {
val uri = path.toUri
new Path(new URI(
@@ -295,7 +287,6 @@
val inputFormat = if (cacheMetadata) {
new FilteringParquetRowInputFormat {
override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatuses
override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
}
} else {
new FilteringParquetRowInputFormat
@@ -313,14 +304,16 @@
}

private class MetadataCache {
import ParquetRelation2.RelationMemo

// `FileStatus` objects of all "_metadata" files.
private var metadataStatuses: Array[FileStatus] = _

// `FileStatus` objects of all "_common_metadata" files.
private var commonMetadataStatuses: Array[FileStatus] = _

// Parquet footer cache.
var footers: Map[Path, Footer] = _
var footers: RelationMemo[Map[Path, Footer]] = _

// `FileStatus` objects of all data files (Parquet part-files).
var dataStatuses: Array[FileStatus] = _
@@ -347,8 +340,9 @@
commonMetadataStatuses =
leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)

footers = {
val conf = SparkHadoopUtil.get.conf
val conf = SparkHadoopUtil.get.conf

def getFooters() = {
val taskSideMetaData = conf.getBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true)
val rawFooters = if (shouldMergeSchemas) {
ParquetFileReader.readAllFootersInParallel(
Expand All @@ -361,13 +355,15 @@ private[sql] class ParquetRelation2(
rawFooters.map(footer => footer.getFile -> footer).toMap
}

footers = new RelationMemo(getFooters())
Author: Lazy to prevent reading all the footers in the referenced table.
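For context, the laziness comes from RelationMemo taking its argument by name and caching it in a lazy val, so the footer map is only computed on first access. A minimal standalone sketch of that pattern; the LazyMemo wrapper and the expensiveFooterScan stand-in below are illustrative, not the exact code in this PR:

```scala
// Minimal sketch of the memoization pattern behind RelationMemo: the by-name
// parameter `memo` is not evaluated at construction time, only on the first
// access of `value`, and the result is cached afterwards.
class LazyMemo[T](memo: => T) {
  lazy val value: T = memo
}

// Hypothetical stand-in for reading all Parquet footers.
def expensiveFooterScan(): Map[String, Long] = {
  println("reading footers...") // printed exactly once
  Map("part-00000.parquet" -> 1024L)
}

val footers = new LazyMemo(expensiveFooterScan())
// Nothing has been read yet; the scan runs here, on first access:
val sizes = footers.value
```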

Contributor: We've recently upgraded Parquet from 1.6.0rc3 to 1.7.0, which no longer requires reading footers on the driver side if a global arbitrative schema is already available.

I'm working on refactoring this part by:

  1. Don't read footers on the driver side to compute Parquet file splits.
  2. When no schema is provided and schema merging is not enabled, read the footer of a single Parquet file to figure out the schema. If a summary file is available, read it; otherwise, pick a random data file.
  3. When no schema is provided and schema merging is enabled, read footers from all data files with a Spark job to figure out the schema (a rough sketch of this step follows the list).
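A rough sketch of what step 3 could look like, assuming Parquet 1.7.0's ParquetFileReader.readFooter API. The function name, the partition count, and the per-task new Configuration() are simplifying assumptions, and converting/merging the returned schemas into a Spark SQL StructType is left out:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.spark.SparkContext

// Reads Parquet footers inside a Spark job instead of on the driver. Paths are
// shipped as strings because Hadoop Path/Configuration are not serializable.
def readParquetSchemasWithSparkJob(sc: SparkContext, dataFiles: Seq[String]): Seq[String] = {
  sc.parallelize(dataFiles, math.max(1, dataFiles.size / 64)).map { file =>
    val conf = new Configuration() // simplification: real code would reuse the job's Hadoop conf
    val footer = ParquetFileReader.readFooter(
      conf, new Path(file), ParquetMetadataConverter.NO_FILTER)
    // Return the Parquet schema as a string; converting it to a StructType and
    // merging across files is left out of this sketch.
    footer.getFileMetaData.getSchema.toString
  }.collect().toSeq
}
```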


// If we already get the schema, don't need to re-compute it since the schema merging is
// time-consuming.
if (dataSchema == null) {
dataSchema = {
val dataSchema0 = maybeDataSchema
.orElse(readSchema())
.orElse(maybeMetastoreSchema)
.orElse(readSchema())
.getOrElse(throw new AnalysisException(
s"Failed to discover schema of Parquet file(s) in the following location(s):\n" +
paths.mkString("\n\t")))
Author: In Metastore we trust. If the Metastore schema is available, we bypass reading the footers.
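A tiny illustration of why this ordering bypasses the footers, with hypothetical stand-ins for the schema sources: Option.orElse takes its alternative by name, so readSchema() is never evaluated when the metastore schema is already defined.

```scala
// Hypothetical stand-in for reading the schema from Parquet footers.
def readSchemaFromFooters(): Option[String] = {
  println("reading footers...") // never printed when the metastore schema wins
  Some("schema-from-footers")
}

val maybeMetastoreSchema: Option[String] = Some("schema-from-metastore")

// Because orElse's argument is by-name, the footer read is skipped entirely.
val schema = maybeMetastoreSchema.orElse(readSchemaFromFooters()).get
// schema == "schema-from-metastore"
```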

Contributor: The reason we put readSchema() in front of maybeMetastoreSchema is that Parquet is case sensitive and case preserving, while Hive is neither. Also, table fields in Hive are always nullable, while Parquet fields are not.

This change is OK for Parquet files that were written by Hive in the first place, but for use cases where Parquet files written by other systems/tools are registered as an external Hive table, it may fail if uppercase letters and/or non-nullable fields appear in the Parquet schema. That's why we must read the Parquet schema first and reconcile possible schema conflicts with ParquetRelation2.mergeMetastoreParquetSchema.

I think one thing we could do here is add a SQLConf option, spark.sql.parquet.trustMetastoreSchema, which is set to false by default.
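For readers unfamiliar with the reconciliation step, here is a minimal sketch of the kind of merge mergeMetastoreParquetSchema is described as doing: pair fields case-insensitively, keep the metastore's field names and nullability, and take data types from Parquet. The helper name and its error handling are illustrative assumptions, not Spark's actual implementation:

```scala
import org.apache.spark.sql.types.{StructField, StructType}

// Illustrative reconciliation: match fields case-insensitively, keep the
// metastore's field names and (always-true) nullability, and take the data
// types discovered in the Parquet footers.
def reconcileSchemas(metastoreSchema: StructType, parquetSchema: StructType): StructType = {
  val parquetFieldsByName = parquetSchema.fields.map(f => f.name.toLowerCase -> f).toMap
  StructType(metastoreSchema.fields.map { hiveField =>
    parquetFieldsByName.get(hiveField.name.toLowerCase) match {
      case Some(parquetField) =>
        StructField(hiveField.name, parquetField.dataType, nullable = true)
      case None =>
        throw new IllegalArgumentException(
          s"Field ${hiveField.name} is in the metastore schema but missing from the Parquet schema")
    }
  })
}
```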

@@ -431,12 +427,20 @@
"No schema defined, " +
s"and no Parquet data file or summary file found under ${paths.mkString(", ")}.")

ParquetRelation2.readSchema(filesToTouch.map(f => footers.apply(f.getPath)), sqlContext)
ParquetRelation2.readSchema(filesToTouch.map(f => footers.value.apply(f.getPath)), sqlContext)
}
}
}

private[sql] object ParquetRelation2 extends Logging {

// TODO: Move to utils
private[sql] class RelationMemo[T](memo: => T) {
lazy val value = {
memo
}
}

// Whether we should merge schemas collected from all Parquet part-files.
private[sql] val MERGE_SCHEMA = "mergeSchema"

@@ -17,7 +17,7 @@

package org.apache.spark.sql.sources

import scala.collection.mutable
import scala.collection.{AbstractIterator, mutable}
import scala.util.Try

import org.apache.hadoop.conf.Configuration
@@ -379,31 +379,25 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]]

def refresh(): Unit = {
// We don't filter files/directories whose name start with "_" except "_temporary" here, as
// specific data sources may take advantages over them (e.g. Parquet _metadata and
// _common_metadata files). "_temporary" directories are explicitly ignored since failed
// tasks/jobs may leave partial/corrupted data files there.
def listLeafFilesAndDirs(fs: FileSystem, status: FileStatus): Set[FileStatus] = {
if (status.getPath.getName.toLowerCase == "_temporary") {
Set.empty
} else {
val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir)
val leafDirs = if (dirs.isEmpty) Set(status) else Set.empty[FileStatus]
files.toSet ++ leafDirs ++ dirs.flatMap(dir => listLeafFilesAndDirs(fs, dir))
}
}

leafFiles.clear()

val statuses = paths.flatMap { path =>
val statuses = paths.par.flatMap { path =>
val hdfsPath = new Path(path)
val fs = hdfsPath.getFileSystem(hadoopConf)
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
Try(fs.getFileStatus(qualified)).toOption.toArray.flatMap(listLeafFilesAndDirs(fs, _))
val it = fs.listFiles(qualified, true)
Author: Using the Hadoop API to list all files for a partition instead of Spark's custom recursive code, which could poke the namenode more than necessary.

Contributor: Unfortunately this API doesn't exist in Hadoop 1...
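Since FileSystem.listFiles(path, recursive) only exists in Hadoop 2, a Hadoop-1-compatible path would presumably have to keep recursing with listStatus. A rough, illustrative fallback (not code from this PR) might look like:

```scala
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

// Recursive fallback built on listStatus, which is available in Hadoop 1 as
// well: walks the directory tree and returns only files, skipping hidden
// entries the same way the new code does.
def listLeafFiles(fs: FileSystem, path: Path): Seq[FileStatus] = {
  val (dirs, files) = fs.listStatus(path).partition(_.isDir)
  def isHidden(s: FileStatus) = s.getPath.getName.startsWith(".")
  files.filterNot(isHidden) ++ dirs.filterNot(isHidden).flatMap(d => listLeafFiles(fs, d.getPath))
}
```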

val its = new AbstractIterator[FileStatus] {
def hasNext: Boolean = it.hasNext
def next(): FileStatus = {
it.next()
}
}
its
}.filterNot { status =>
// SPARK-8037: Ignores files like ".DS_Store" and other hidden files/directories
status.getPath.getName.startsWith(".")
}
}.toArray

val files = statuses.filterNot(_.isDir)
leafFiles ++= files.map(f => f.getPath -> f).toMap