@@ -118,8 +118,10 @@ case class DataSource(
   private def getOrInferFileFormatSchema(
       format: FileFormat,
       justPartitioning: Boolean = false): (StructType, StructType) = {
-    // the operations below are expensive therefore try not to do them if we don't need to
-    lazy val tempFileCatalog = {
+    // the operations below are expensive therefore try not to do them if we don't need to, e.g.,
+    // in streaming mode, we have already inferred and registered partition columns, we will
+    // never have to materialize the lazy val below
+    lazy val tempFileIndex = {
       val allPaths = caseInsensitiveOptions.get("path") ++ paths
       val hadoopConf = sparkSession.sessionState.newHadoopConf()
       val globbedPaths = allPaths.toSeq.flatMap { path =>
@@ -133,25 +135,25 @@
     val partitionSchema = if (partitionColumns.isEmpty && catalogTable.isEmpty) {
       // Try to infer partitioning, because no DataSource in the read path provides the partitioning
       // columns properly unless it is a Hive DataSource
-      val resolved = tempFileCatalog.partitionSchema.map { partitionField =>
+      val resolved = tempFileIndex.partitionSchema.map { partitionField =>
         val equality = sparkSession.sessionState.conf.resolver
         // SPARK-18510: try to get schema from userSpecifiedSchema, otherwise fallback to inferred
         userSpecifiedSchema.flatMap(_.find(f => equality(f.name, partitionField.name))).getOrElse(
           partitionField)
       }
       StructType(resolved)
     } else {
-      // in streaming mode, we have already inferred and registered partition columns, we will
-      // never have to materialize the lazy val below
-      lazy val inferredPartitions = tempFileCatalog.partitionSchema
       // maintain old behavior before SPARK-18510. If userSpecifiedSchema is empty used inferred
[Member Author] I removed this lazy val so that it's easy to track when tempFileCatalog is materialized.
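
For context on the materialization point being discussed: a Scala lazy val runs its initializer once, on the first access, and caches the result, so knowing every access site tells you exactly where the expensive work can happen. A minimal standalone sketch of that semantics (illustrative only, not the Spark code):

    object LazyValDemo extends App {
      // The initializer below does not run until `expensive` is first read.
      lazy val expensive: Seq[String] = {
        println("listing files...")  // stands in for an expensive filesystem scan
        Seq("part-00000", "part-00001")
      }

      println("before first access")
      println(expensive.size)  // triggers "listing files...", then prints 2
      println(expensive.size)  // cached; the initializer does not run again
    }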

       // partitioning
       if (userSpecifiedSchema.isEmpty) {
+        val inferredPartitions = tempFileIndex.partitionSchema
[Member Author] Still keep the inferredPartitions variable to document the meaning of tempFileIndex.partitionSchema.

         inferredPartitions
       } else {
         val partitionFields = partitionColumns.map { partitionColumn =>
-          userSpecifiedSchema.flatMap(_.find(_.name == partitionColumn)).orElse {
-            val inferredOpt = inferredPartitions.find(_.name == partitionColumn)
+          val equality = sparkSession.sessionState.conf.resolver
+          userSpecifiedSchema.flatMap(_.find(c => equality(c.name, partitionColumn))).orElse {
+            val inferredPartitions = tempFileIndex.partitionSchema
+            val inferredOpt = inferredPartitions.find(p => equality(p.name, partitionColumn))
             if (inferredOpt.isDefined) {
               logDebug(
                 s"""Type of partition column: $partitionColumn not found in specified schema
@@ -163,7 +165,7 @@
                    |Falling back to inferred dataType if it exists.
                  """.stripMargin)
             }
-            inferredPartitions.find(_.name == partitionColumn)
+            inferredOpt
           }.getOrElse {
             throw new AnalysisException(s"Failed to resolve the schema for $format for " +
               s"the partition column: $partitionColumn. It must be specified manually.")
@@ -182,7 +184,7 @@
         format.inferSchema(
           sparkSession,
           caseInsensitiveOptions,
-          tempFileCatalog.allFiles())
+          tempFileIndex.allFiles())
       }.getOrElse {
         throw new AnalysisException(
           s"Unable to infer schema for $format. It must be specified manually.")
@@ -224,8 +226,11 @@
"you may be able to create a static DataFrame on that directory with " +
"'spark.read.load(directory)' and infer schema from it.")
}
val (schema, partCols) = getOrInferFileFormatSchema(format)
SourceInfo(s"FileSource[$path]", StructType(schema ++ partCols), partCols.fieldNames)
val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format)
SourceInfo(
s"FileSource[$path]",
StructType(dataSchema ++ partitionSchema),
partitionSchema.fieldNames)

case _ =>
throw new UnsupportedOperationException(
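
The reworked SourceInfo spells out that a file source's full schema is the data columns followed by the partition columns. A small sketch of that concatenation, using only the public StructType API:

    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    object SchemaConcatDemo extends App {
      val dataSchema = StructType(Seq(StructField("value", StringType)))
      val partitionSchema = StructType(Seq(StructField("date", StringType)))

      // StructType is a Seq[StructField], so ++ appends the partition columns
      // after the data columns, and fieldNames preserves that order.
      val fullSchema = StructType(dataSchema ++ partitionSchema)
      println(fullSchema.fieldNames.mkString(", "))  // prints: value, date
    }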
@@ -379,7 +384,7 @@
         globPath
       }.toArray

-      val (dataSchema, inferredPartitionSchema) = getOrInferFileFormatSchema(format)
+      val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format)

       val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
         catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) {
@@ -388,12 +393,12 @@
           catalogTable.get,
           catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(0L))
       } else {
-        new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(inferredPartitionSchema))
+        new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(partitionSchema))
       }

       HadoopFsRelation(
         fileCatalog,
-        partitionSchema = inferredPartitionSchema,
+        partitionSchema = partitionSchema,
         dataSchema = dataSchema.asNullable,
         bucketSpec = bucketSpec,
         format,