
Commit ff78fdb

Fix InsertIntoHiveDirCommand output schema
1 parent ed249db commit ff78fdb

Showing 6 changed files with 22 additions and 23 deletions.


sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala

Lines changed: 0 additions & 15 deletions

@@ -26,7 +26,6 @@ import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
 import org.apache.spark.sql.execution.datasources.FileFormatWriter
 import org.apache.spark.sql.execution.metric.SQLMetric
-import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration

 /**
@@ -75,18 +74,4 @@ object DataWritingCommand
       attr.withName(outputName)
     }
   }
-
-  /**
-   * Returns schema of logical plan with provided names.
-   * The length of provided names should be the same of the length of [[LogicalPlan.schema]].
-   */
-  def logicalPlanSchemaWithNames(
-      query: LogicalPlan,
-      names: Seq[String]): StructType = {
-    assert(query.schema.length == names.length,
-      "The length of provided names doesn't match the length of query schema.")
-    StructType(query.schema.zip(names).map { case (structField, outputName) =>
-      structField.copy(name = outputName)
-    })
-  }
 }
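
The removed logicalPlanSchemaWithNames helper only copied query.schema with new field names; the same schema falls out of outputColumns (the query's output attributes renamed with outputColumnNames, see attr.withName(outputName) above) followed by toStructType. A minimal sketch of that equivalence, assuming spark-catalyst is on the classpath; the column names below are illustrative, not taken from the patch:

// Sketch only: mirrors what Seq[Attribute].toStructType does for a renamed output.
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

object OutputSchemaSketch {
  def main(args: Array[String]): Unit = {
    // The analyzed plan resolves the column as "id" ...
    val analyzed: Attribute = AttributeReference("id", LongType, nullable = true)()
    // ... while outputColumnNames keeps the name the user wrote, e.g. "ID".
    val outputColumnNames = Seq("ID")

    // What DataWritingCommand.outputColumns amounts to: rename each output attribute.
    val outputColumns: Seq[Attribute] =
      Seq(analyzed).zip(outputColumnNames).map { case (attr, name) => attr.withName(name) }

    // Equivalent of outputColumns.toStructType: one StructField per attribute.
    val schema = StructType(outputColumns.map(a => StructField(a.name, a.dataType, a.nullable)))
    println(schema) // StructType(StructField(ID,LongType,true))
  }
}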

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala

Lines changed: 2 additions & 2 deletions

@@ -69,8 +69,8 @@ case class CreateHiveTableAsSelectCommand(
       // add the relation into catalog, just in case of failure occurs while data
       // processing.
       assert(tableDesc.schema.isEmpty)
-      val schema = DataWritingCommand.logicalPlanSchemaWithNames(query, outputColumnNames)
-      catalog.createTable(tableDesc.copy(schema = schema), ignoreIfExists = false)
+      catalog.createTable(
+        tableDesc.copy(schema = outputColumns.toStructType), ignoreIfExists = false)

       try {
         // Read back the metadata of the table which was created just now.

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala

Lines changed: 2 additions & 3 deletions

@@ -66,7 +66,7 @@ case class InsertIntoHiveDirCommand(
       identifier = TableIdentifier(storage.locationUri.get.toString, Some("default")),
       tableType = org.apache.spark.sql.catalyst.catalog.CatalogTableType.VIEW,
       storage = storage,
-      schema = query.schema
+      schema = outputColumns.toStructType
     ))
     hiveTable.getMetadata.put(serdeConstants.SERIALIZATION_LIB,
       storage.serde.getOrElse(classOf[LazySimpleSerDe].getName))
@@ -104,8 +104,7 @@ case class InsertIntoHiveDirCommand(
         plan = child,
         hadoopConf = hadoopConf,
         fileSinkConf = fileSinkConf,
-        outputLocation = tmpPath.toString,
-        allColumns = outputColumns)
+        outputLocation = tmpPath.toString)

       val fs = writeToPath.getFileSystem(hadoopConf)
       if (overwrite && fs.exists(writeToPath)) {
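
For context, a hedged end-to-end sketch of the user-visible effect of this change, assuming a Hive-enabled local SparkSession; the table name and output path are illustrative only:

import org.apache.spark.sql.SparkSession

object InsertOverwriteDirSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("CREATE TABLE IF NOT EXISTS tbl (id BIGINT)")
    spark.sql("INSERT OVERWRITE TABLE tbl VALUES 4")
    // The SELECT list uses upper-case "ID"; with this change the written
    // Parquet schema is built from outputColumns.toStructType, so the files
    // carry "ID" instead of the analyzed lower-case "id" from query.schema.
    spark.sql("INSERT OVERWRITE LOCAL DIRECTORY '/tmp/hive_dir_output' " +
      "STORED AS PARQUET SELECT ID FROM tbl")

    spark.read.parquet("/tmp/hive_dir_output").printSchema()
    spark.stop()
  }
}

The new test added to HiveDDLSuite below asserts the same behavior through a view.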

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala

Lines changed: 0 additions & 1 deletion

@@ -198,7 +198,6 @@ case class InsertIntoHiveTable(
       hadoopConf = hadoopConf,
       fileSinkConf = fileSinkConf,
       outputLocation = tmpLocation.toString,
-      allColumns = outputColumns,
       partitionAttributes = partitionAttributes)

     if (partition.nonEmpty) {

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala

Lines changed: 1 addition & 2 deletions

@@ -51,7 +51,6 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
       hadoopConf: Configuration,
       fileSinkConf: FileSinkDesc,
       outputLocation: String,
-      allColumns: Seq[Attribute],
       customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty,
       partitionAttributes: Seq[Attribute] = Nil): Set[String] = {

@@ -90,7 +89,7 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
       fileFormat = new HiveFileFormat(fileSinkConf),
       committer = committer,
       outputSpec =
-        FileFormatWriter.OutputSpec(outputLocation, customPartitionLocations, allColumns),
+        FileFormatWriter.OutputSpec(outputLocation, customPartitionLocations, outputColumns),
       hadoopConf = hadoopConf,
       partitionColumns = partitionAttributes,
       bucketSpec = None,
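
Dropping the allColumns parameter is safe because SaveAsHiveFile extends DataWritingCommand and both call sites above passed the trait's own outputColumns. A simplified, self-contained sketch of that refactor; the trait and member names below are stand-ins, not Spark code:

// Stand-in types only: illustrates replacing a pass-through parameter with a trait member.
trait WritingCommand {
  def outputColumns: Seq[String]

  // Before: saveFile(allColumns: Seq[String], location: String); every caller
  // passed outputColumns, so the parameter was redundant.
  def saveFile(location: String): Unit =
    println(s"writing columns [${outputColumns.mkString(", ")}] to $location")
}

object SaveFileSketch {
  def main(args: Array[String]): Unit = {
    val cmd = new WritingCommand { val outputColumns = Seq("ID") }
    cmd.saveFile("/tmp/out") // reads outputColumns from the trait, no extra argument
  }
}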

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala

Lines changed: 17 additions & 0 deletions

@@ -803,6 +803,23 @@ class HiveDDLSuite
     }
   }

+  test("Insert overwrite directory should output correct schema") {
+    withSQLConf(CONVERT_METASTORE_PARQUET.key -> "false") {
+      withView("view1") {
+        spark.sql("CREATE TABLE tbl(id long)")
+        spark.sql("INSERT OVERWRITE TABLE tbl VALUES 4")
+        spark.sql("CREATE VIEW view1 AS SELECT id FROM tbl")
+        withTempPath { path =>
+          spark.sql(s"INSERT OVERWRITE LOCAL DIRECTORY '${path.getCanonicalPath}' " +
+            "STORED AS PARQUET SELECT ID FROM view1")
+          val expectedSchema = StructType(Seq(StructField("ID", LongType, true)))
+          assert(spark.read.parquet(path.toString).schema == expectedSchema)
+          checkAnswer(spark.read.parquet(path.toString), Seq(Row(4)))
+        }
+      }
+    }
+  }
+
   test("alter table partition - storage information") {
     sql("CREATE TABLE boxes (height INT, length INT) PARTITIONED BY (width INT)")
     sql("INSERT OVERWRITE TABLE boxes PARTITION (width=4) SELECT 4, 4")
