
Commit 60e56bc

wangyum authored and dongjoon-hyun committed
[SPARK-25313][SQL][FOLLOW-UP][BACKPORT-2.3] Fix InsertIntoHiveDirCommand output schema in Parquet issue
[SPARK-25313][SQL][FOLLOW-UP][BACKPORT-2.3] Fix InsertIntoHiveDirCommand output schema in Parquet issue

## What changes were proposed in this pull request?

Backport #22359 to branch-2.3.

## How was this patch tested?

Unit tests.

Closes #22387 from wangyum/SPARK-25313-FOLLOW-UP-branch-2.3.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
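For context, a minimal sketch of the symptom this backport addresses, assuming a Hive-enabled `SparkSession` (`spark`) with `spark.sql.hive.convertMetastoreParquet=false`, mirroring the regression test added below; the output path is illustrative:

```scala
spark.sql("CREATE TABLE tbl(id long)")
spark.sql("INSERT OVERWRITE TABLE tbl VALUES 4")
spark.sql("CREATE VIEW view1 AS SELECT id FROM tbl")

spark.sql(
  "INSERT OVERWRITE LOCAL DIRECTORY '/tmp/spark-25313' " +
    "STORED AS PARQUET SELECT ID FROM view1")

// Before this patch the directory's Parquet schema could carry the plan's
// original column name ("id"); with the fix it follows the SELECT's
// output name ("ID").
spark.read.parquet("/tmp/spark-25313").printSchema()
```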
1 parent: 4b57818

6 files changed: +24 -23 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala

Lines changed: 0 additions & 15 deletions

```diff
@@ -26,7 +26,6 @@ import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
 import org.apache.spark.sql.execution.datasources.FileFormatWriter
 import org.apache.spark.sql.execution.metric.SQLMetric
-import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
 
 /**
@@ -75,18 +74,4 @@ object DataWritingCommand {
       attr.withName(outputName)
     }
   }
-
-  /**
-   * Returns schema of logical plan with provided names.
-   * The length of provided names should be the same of the length of [[LogicalPlan.schema]].
-   */
-  def logicalPlanSchemaWithNames(
-      query: LogicalPlan,
-      names: Seq[String]): StructType = {
-    assert(query.schema.length == names.length,
-      "The length of provided names doesn't match the length of query schema.")
-    StructType(query.schema.zip(names).map { case (structField, outputName) =>
-      structField.copy(name = outputName)
-    })
-  }
 }
```
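The deleted helper renamed the query schema's `StructField`s directly; callers now rename the output attributes once and convert them to a `StructType`. A rough sketch of the equivalence, using internal Catalyst APIs, where `outputColumns` and `toStructType` stand in for the members the diffs below rely on:

```scala
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Stand-ins for the command's analyzed query output and requested names.
val queryOutput: Seq[Attribute] = Seq(AttributeReference("id", LongType)())
val outputColumnNames = Seq("ID")

// What `outputColumns` amounts to: the attributes renamed with
// attr.withName, as in the helper retained above.
val outputColumns: Seq[Attribute] =
  queryOutput.zip(outputColumnNames).map { case (attr, name) => attr.withName(name) }

// What `outputColumns.toStructType` amounts to: the same schema the deleted
// logicalPlanSchemaWithNames produced by copying StructFields.
val schema: StructType =
  StructType(outputColumns.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata)))
// schema == StructType(Seq(StructField("ID", LongType, nullable = true)))
```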

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala

Lines changed: 2 additions & 2 deletions

```diff
@@ -69,8 +69,8 @@ case class CreateHiveTableAsSelectCommand(
       // add the relation into catalog, just in case of failure occurs while data
       // processing.
       assert(tableDesc.schema.isEmpty)
-      val schema = DataWritingCommand.logicalPlanSchemaWithNames(query, outputColumnNames)
-      catalog.createTable(tableDesc.copy(schema = schema), ignoreIfExists = false)
+      catalog.createTable(
+        tableDesc.copy(schema = outputColumns.toStructType), ignoreIfExists = false)
 
       try {
         // Read back the metadata of the table which was created just now.
```

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala

Lines changed: 2 additions & 3 deletions

```diff
@@ -66,7 +66,7 @@ case class InsertIntoHiveDirCommand(
       identifier = TableIdentifier(storage.locationUri.get.toString, Some("default")),
       tableType = org.apache.spark.sql.catalyst.catalog.CatalogTableType.VIEW,
       storage = storage,
-      schema = query.schema
+      schema = outputColumns.toStructType
     ))
     hiveTable.getMetadata.put(serdeConstants.SERIALIZATION_LIB,
       storage.serde.getOrElse(classOf[LazySimpleSerDe].getName))
@@ -104,8 +104,7 @@
       plan = child,
       hadoopConf = hadoopConf,
       fileSinkConf = fileSinkConf,
-      outputLocation = tmpPath.toString,
-      allColumns = outputColumns)
+      outputLocation = tmpPath.toString)
 
     val fs = writeToPath.getFileSystem(hadoopConf)
     if (overwrite && fs.exists(writeToPath)) {
```

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala

Lines changed: 0 additions & 1 deletion

```diff
@@ -198,7 +198,6 @@ case class InsertIntoHiveTable(
       hadoopConf = hadoopConf,
       fileSinkConf = fileSinkConf,
       outputLocation = tmpLocation.toString,
-      allColumns = outputColumns,
       partitionAttributes = partitionAttributes)
 
     if (partition.nonEmpty) {
```

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala

Lines changed: 1 addition & 2 deletions

```diff
@@ -51,7 +51,6 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
       hadoopConf: Configuration,
       fileSinkConf: FileSinkDesc,
       outputLocation: String,
-      allColumns: Seq[Attribute],
       customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty,
       partitionAttributes: Seq[Attribute] = Nil): Set[String] = {
 
@@ -90,7 +89,7 @@
       fileFormat = new HiveFileFormat(fileSinkConf),
       committer = committer,
       outputSpec =
-        FileFormatWriter.OutputSpec(outputLocation, customPartitionLocations, allColumns),
+        FileFormatWriter.OutputSpec(outputLocation, customPartitionLocations, outputColumns),
       hadoopConf = hadoopConf,
       partitionColumns = partitionAttributes,
       bucketSpec = None,
```
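Dropping the `allColumns` parameter works because `SaveAsHiveFile` extends `DataWritingCommand`, which already exposes the renamed output attributes to every write path. A simplified sketch of that wiring, not the full trait, with member names taken from the diffs above:

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Simplified stand-in for the relevant DataWritingCommand members.
trait DataWritingCommandSketch {
  def query: LogicalPlan
  def outputColumnNames: Seq[String]

  // Each write path can read the renamed attributes from here, so
  // SaveAsHiveFile no longer needs them passed in as `allColumns`.
  def outputColumns: Seq[Attribute] =
    query.output.zip(outputColumnNames).map { case (attr, name) =>
      attr.withName(name)
    }
}
```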

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala

Lines changed: 19 additions & 0 deletions

```diff
@@ -799,6 +799,25 @@ class HiveDDLSuite
     }
   }
 
+  test("SPARK-25313 Insert overwrite directory should output correct schema") {
+    withSQLConf(CONVERT_METASTORE_PARQUET.key -> "false") {
+      withTable("tbl") {
+        withView("view1") {
+          spark.sql("CREATE TABLE tbl(id long)")
+          spark.sql("INSERT OVERWRITE TABLE tbl VALUES 4")
+          spark.sql("CREATE VIEW view1 AS SELECT id FROM tbl")
+          withTempPath { path =>
+            spark.sql(s"INSERT OVERWRITE LOCAL DIRECTORY '${path.getCanonicalPath}' " +
+              "STORED AS PARQUET SELECT ID FROM view1")
+            val expectedSchema = StructType(Seq(StructField("ID", LongType, true)))
+            assert(spark.read.parquet(path.toString).schema == expectedSchema)
+            checkAnswer(spark.read.parquet(path.toString), Seq(Row(4)))
+          }
+        }
+      }
+    }
+  }
+
   test("alter table partition - storage information") {
     sql("CREATE TABLE boxes (height INT, length INT) PARTITIONED BY (width INT)")
     sql("INSERT OVERWRITE TABLE boxes PARTITION (width=4) SELECT 4, 4")
```