@@ -199,7 +199,7 @@ case class CreateDataSourceTableAsSelectCommand(
catalogTable = if (tableExists) Some(table) else None)

try {
dataSource.write(mode, Dataset.ofRows(session, query))
dataSource.writeAndRead(mode, Dataset.ofRows(session, query))
} catch {
case ex: AnalysisException =>
logError(s"Failed to write to table ${table.identifier.unquotedString}", ex)
@@ -413,10 +413,85 @@ case class DataSource(
relation
}

/** Writes the given [[DataFrame]] out to this [[DataSource]]. */
def write(
mode: SaveMode,
data: DataFrame): BaseRelation = {
/**
* Writes the given [[DataFrame]] out in this [[FileFormat]].
*/
private def writeInFileFormat(format: FileFormat, mode: SaveMode, data: DataFrame): Unit = {
// Don't glob path for the write path. The contracts here are:
// 1. Only one output path can be specified on the write path;
// 2. Output path must be a legal HDFS style file system path;
// 3. It's OK that the output path doesn't exist yet;
val allPaths = paths ++ caseInsensitiveOptions.get("path")
val outputPath = if (allPaths.length == 1) {
val path = new Path(allPaths.head)
val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
path.makeQualified(fs.getUri, fs.getWorkingDirectory)
} else {
throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
s"got: ${allPaths.mkString(", ")}")
}

val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
PartitioningUtils.validatePartitionColumn(data.schema, partitionColumns, caseSensitive)

// If we are appending to a table that already exists, make sure the partitioning matches
// up. If we fail to load the table for whatever reason, ignore the check.
if (mode == SaveMode.Append) {
val existingPartitionColumns = Try {
getOrInferFileFormatSchema(format, justPartitioning = true)._2.fieldNames.toList
}.getOrElse(Seq.empty[String])
// TODO: Case sensitivity.
val sameColumns =
existingPartitionColumns.map(_.toLowerCase()) == partitionColumns.map(_.toLowerCase())
if (existingPartitionColumns.nonEmpty && !sameColumns) {
throw new AnalysisException(
s"""Requested partitioning does not match existing partitioning.
|Existing partitioning columns:
| ${existingPartitionColumns.mkString(", ")}
|Requested partitioning columns:
| ${partitionColumns.mkString(", ")}
|""".stripMargin)
}
}

// SPARK-17230: Resolve the partition columns so InsertIntoHadoopFsRelationCommand does
// not need to have the query as child, to avoid to analyze an optimized query,
// because InsertIntoHadoopFsRelationCommand will be optimized first.
val partitionAttributes = partitionColumns.map { name =>
val plan = data.logicalPlan
plan.resolve(name :: Nil, data.sparkSession.sessionState.analyzer.resolver).getOrElse {
throw new AnalysisException(
s"Unable to resolve $name given [${plan.output.map(_.name).mkString(", ")}]")
}.asInstanceOf[Attribute]
}
val fileIndex = catalogTable.map(_.identifier).map { tableIdent =>
sparkSession.table(tableIdent).queryExecution.analyzed.collect {
case LogicalRelation(t: HadoopFsRelation, _, _) => t.location
}.head
}
// For partitioned relation r, r.schema's column ordering can be different from the column
// ordering of data.logicalPlan (partition columns are all moved after data column). This
// will be adjusted within InsertIntoHadoopFsRelation.
val plan =
InsertIntoHadoopFsRelationCommand(
outputPath = outputPath,
staticPartitions = Map.empty,
partitionColumns = partitionAttributes,
bucketSpec = bucketSpec,
fileFormat = format,
options = options,
query = data.logicalPlan,
mode = mode,
catalogTable = catalogTable,
fileIndex = fileIndex)
sparkSession.sessionState.executePlan(plan).toRdd
[Review comment, @gatorsmile (Member, Author), Jan 12, 2017]
To the reviewers: the code in writeInFileFormat is copied from the case FileFormat branch of the original write function.

}
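
As the review comment above notes, both entry points now share this helper. A hedged, simplified sketch of the resulting shape (stand-in names, not the real Spark classes) is:

trait BaseRelationLike
class DataSourceLike {
  // Shared by both entry points: plan and execute the insert.
  private def writeInFileFormat(rows: Seq[(Long, String)]): Unit = ()

  // save() path: side effect only, nothing is re-resolved or re-inferred.
  def write(rows: Seq[(Long, String)]): Unit = writeInFileFormat(rows)

  // CTAS path: still returns a relation for the follow-up read.
  def writeAndRead(rows: Seq[(Long, String)]): BaseRelationLike = {
    writeInFileFormat(rows)
    new BaseRelationLike {}
  }
}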

/**
* Writes the given [[DataFrame]] out to this [[DataSource]] and returns a [[BaseRelation]] for
* the following reading.
*/
def writeAndRead(mode: SaveMode, data: DataFrame): BaseRelation = {
if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
throw new AnalysisException("Cannot save interval data type into external storage.")
}
@@ -425,78 +500,27 @@ case class DataSource(
case dataSource: CreatableRelationProvider =>
dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data)
case format: FileFormat =>
// Don't glob path for the write path. The contracts here are:
// 1. Only one output path can be specified on the write path;
// 2. Output path must be a legal HDFS style file system path;
// 3. It's OK that the output path doesn't exist yet;
val allPaths = paths ++ caseInsensitiveOptions.get("path")
val outputPath = if (allPaths.length == 1) {
val path = new Path(allPaths.head)
val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
path.makeQualified(fs.getUri, fs.getWorkingDirectory)
} else {
throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
s"got: ${allPaths.mkString(", ")}")
}

val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
PartitioningUtils.validatePartitionColumn(
data.schema, partitionColumns, caseSensitive)

// If we are appending to a table that already exists, make sure the partitioning matches
// up. If we fail to load the table for whatever reason, ignore the check.
if (mode == SaveMode.Append) {
val existingPartitionColumns = Try {
getOrInferFileFormatSchema(format, justPartitioning = true)._2.fieldNames.toList
}.getOrElse(Seq.empty[String])
// TODO: Case sensitivity.
val sameColumns =
existingPartitionColumns.map(_.toLowerCase()) == partitionColumns.map(_.toLowerCase())
if (existingPartitionColumns.nonEmpty && !sameColumns) {
throw new AnalysisException(
s"""Requested partitioning does not match existing partitioning.
|Existing partitioning columns:
| ${existingPartitionColumns.mkString(", ")}
|Requested partitioning columns:
| ${partitionColumns.mkString(", ")}
|""".stripMargin)
}
}

// SPARK-17230: Resolve the partition columns so InsertIntoHadoopFsRelationCommand does
// not need to have the query as child, to avoid to analyze an optimized query,
// because InsertIntoHadoopFsRelationCommand will be optimized first.
val partitionAttributes = partitionColumns.map { name =>
val plan = data.logicalPlan
plan.resolve(name :: Nil, data.sparkSession.sessionState.analyzer.resolver).getOrElse {
throw new AnalysisException(
s"Unable to resolve $name given [${plan.output.map(_.name).mkString(", ")}]")
}.asInstanceOf[Attribute]
}
val fileIndex = catalogTable.map(_.identifier).map { tableIdent =>
sparkSession.table(tableIdent).queryExecution.analyzed.collect {
case LogicalRelation(t: HadoopFsRelation, _, _) => t.location
}.head
}
// For partitioned relation r, r.schema's column ordering can be different from the column
// ordering of data.logicalPlan (partition columns are all moved after data column). This
// will be adjusted within InsertIntoHadoopFsRelation.
val plan =
InsertIntoHadoopFsRelationCommand(
outputPath = outputPath,
staticPartitions = Map.empty,
partitionColumns = partitionAttributes,
bucketSpec = bucketSpec,
fileFormat = format,
options = options,
query = data.logicalPlan,
mode = mode,
catalogTable = catalogTable,
fileIndex = fileIndex)
sparkSession.sessionState.executePlan(plan).toRdd
// Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring it.
writeInFileFormat(format, mode, data)
// Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring
copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
case _ =>
sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
}
}

/**
* Writes the given [[DataFrame]] out to this [[DataSource]].
*/
def write(mode: SaveMode, data: DataFrame): Unit = {
if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
throw new AnalysisException("Cannot save interval data type into external storage.")
}

providingClass.newInstance() match {
case dataSource: CreatableRelationProvider =>
dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data)
case format: FileFormat =>
writeInFileFormat(format, mode, data)
case _ =>
sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
}
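
With the split in place, the two routes into this file diverge: DataFrameWriter-style saves are the callers served by the new Unit-returning write, while CREATE TABLE ... AS SELECT reaches writeAndRead through CreateDataSourceTableAsSelectCommand, as shown in the first hunk. A hedged end-to-end sketch using only public APIs (the local path and table name are illustrative assumptions):

import org.apache.spark.sql.SparkSession

object WritePathSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("write-path-sketch")
      .getOrCreate()
    import spark.implicits._

    val df = Seq((1L, "a"), (2L, "b")).toDF("id", "value")

    // Plain save: after this patch the freshly written files are not
    // listed or schema-inferred again just to build a BaseRelation.
    df.write.mode("overwrite").parquet("/tmp/write_path_sketch")

    // CTAS: the command still consumes the relation returned by writeAndRead.
    df.createOrReplaceTempView("src")
    spark.sql("CREATE TABLE ctas_sketch USING parquet AS SELECT * FROM src")

    spark.stop()
  }
}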
@@ -62,17 +62,12 @@ class PartitionedTablePerfStatsSuite
}

private def setupPartitionedHiveTable(
tableName: String, dir: File, scale: Int,
clearMetricsBeforeCreate: Boolean = false, repair: Boolean = true): Unit = {
tableName: String, dir: File, scale: Int, repair: Boolean = true): Unit = {
spark.range(scale).selectExpr("id as fieldOne", "id as partCol1", "id as partCol2").write
.partitionBy("partCol1", "partCol2")
.mode("overwrite")
.parquet(dir.getAbsolutePath)

if (clearMetricsBeforeCreate) {
HiveCatalogMetrics.reset()
}

spark.sql(s"""
|create external table $tableName (fieldOne long)
|partitioned by (partCol1 int, partCol2 int)
@@ -88,17 +83,12 @@ class PartitionedTablePerfStatsSuite
}

private def setupPartitionedDatasourceTable(
tableName: String, dir: File, scale: Int,
clearMetricsBeforeCreate: Boolean = false, repair: Boolean = true): Unit = {
tableName: String, dir: File, scale: Int, repair: Boolean = true): Unit = {
spark.range(scale).selectExpr("id as fieldOne", "id as partCol1", "id as partCol2").write
.partitionBy("partCol1", "partCol2")
.mode("overwrite")
.parquet(dir.getAbsolutePath)

if (clearMetricsBeforeCreate) {
HiveCatalogMetrics.reset()
}

spark.sql(s"""
|create table $tableName (fieldOne long, partCol1 int, partCol2 int)
|using parquet
@@ -271,8 +261,8 @@ class PartitionedTablePerfStatsSuite
withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
withTable("test") {
withTempDir { dir =>
setupPartitionedDatasourceTable(
"test", dir, scale = 10, clearMetricsBeforeCreate = true, repair = false)
HiveCatalogMetrics.reset()
setupPartitionedDatasourceTable("test", dir, scale = 10, repair = false)
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
}
@@ -285,8 +275,7 @@ class PartitionedTablePerfStatsSuite
withTable("test") {
withTempDir { dir =>
HiveCatalogMetrics.reset()
setupPartitionedHiveTable(
"test", dir, scale = 10, clearMetricsBeforeCreate = true, repair = false)
setupPartitionedHiveTable("test", dir, scale = 10, repair = false)
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
}
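
Both tests above now reset the metrics before the whole setup, instead of threading a clearMetricsBeforeCreate flag through the helpers, and still expect zero file-discovery events, because the write path no longer re-lists what it just wrote. A minimal hedged sketch of that assertion pattern, assuming the suite's existing spark, dir, and HiveCatalogMetrics fixtures:

HiveCatalogMetrics.reset()
spark.range(10)
  .selectExpr("id as fieldOne", "id as partCol1", "id as partCol2")
  .write
  .partitionBy("partCol1", "partCol2")
  .mode("overwrite")
  .parquet(dir.getAbsolutePath)
// After this patch, the write itself should not trigger any file listing:
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)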
@@ -416,12 +405,8 @@ class PartitionedTablePerfStatsSuite
})
executorPool.shutdown()
executorPool.awaitTermination(30, TimeUnit.SECONDS)
// check the cache hit, we use the metric of METRIC_FILES_DISCOVERED and
// METRIC_PARALLEL_LISTING_JOB_COUNT to check this, while the lock take effect,
// only one thread can really do the build, so the listing job count is 2, the other
// one is cache.load func. Also METRIC_FILES_DISCOVERED is $partition_num * 2
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 100)
assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 2)
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 50)
assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 1)
}
}
}