diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
index 0a185b847206..a1bb5af1ab72 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
@@ -26,7 +26,6 @@ import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
 import org.apache.spark.sql.execution.datasources.FileFormatWriter
 import org.apache.spark.sql.execution.metric.SQLMetric
-import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
 
 /**
@@ -75,18 +74,4 @@ object DataWritingCommand {
       attr.withName(outputName)
     }
   }
-
-  /**
-   * Returns schema of logical plan with provided names.
-   * The length of provided names should be the same of the length of [[LogicalPlan.schema]].
-   */
-  def logicalPlanSchemaWithNames(
-      query: LogicalPlan,
-      names: Seq[String]): StructType = {
-    assert(query.schema.length == names.length,
-      "The length of provided names doesn't match the length of query schema.")
-    StructType(query.schema.zip(names).map { case (structField, outputName) =>
-      structField.copy(name = outputName)
-    })
-  }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index 1ff680ff9fce..66a8440f3d4c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -69,8 +69,8 @@ case class CreateHiveTableAsSelectCommand(
       // add the relation into catalog, just in case of failure occurs while data
       // processing.
       assert(tableDesc.schema.isEmpty)
-      val schema = DataWritingCommand.logicalPlanSchemaWithNames(query, outputColumnNames)
-      catalog.createTable(tableDesc.copy(schema = schema), ignoreIfExists = false)
+      catalog.createTable(
+        tableDesc.copy(schema = outputColumns.toStructType), ignoreIfExists = false)
 
       try {
         // Read back the metadata of the table which was created just now.
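Note on the replacement above: the renaming helper retained in the hunk (its body ends in `attr.withName(outputName)`) already produces `outputColumns` with the caller-supplied names, so `outputColumns.toStructType` yields the same renamed schema that the deleted `logicalPlanSchemaWithNames` built by copying `StructField`s. A minimal sketch with made-up attribute and name values (illustrative only, not part of the patch):

```scala
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.LongType

// Illustrative stand-ins for query.output and outputColumnNames.
val queryOutput: Seq[Attribute] = Seq(AttributeReference("id", LongType)())
val outputColumnNames = Seq("ID")

// Mirrors the attr.withName renaming visible in the retained helper.
val outputColumns = queryOutput.zip(outputColumnNames).map {
  case (attr, outputName) => attr.withName(outputName)
}

// toStructType (via the implicit AttributeSeq) keeps the new names, so the
// catalog schema records "ID" rather than the underlying plan's "id".
assert(outputColumns.toStructType.fieldNames.toSeq == Seq("ID"))
```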
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
index 0a73aaa94bc7..a24e902074c2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
@@ -66,7 +66,7 @@ case class InsertIntoHiveDirCommand(
       identifier = TableIdentifier(storage.locationUri.get.toString, Some("default")),
       tableType = org.apache.spark.sql.catalyst.catalog.CatalogTableType.VIEW,
       storage = storage,
-      schema = query.schema
+      schema = outputColumns.toStructType
     ))
     hiveTable.getMetadata.put(serdeConstants.SERIALIZATION_LIB,
       storage.serde.getOrElse(classOf[LazySimpleSerDe].getName))
@@ -104,8 +104,7 @@
       plan = child,
       hadoopConf = hadoopConf,
       fileSinkConf = fileSinkConf,
-      outputLocation = tmpPath.toString,
-      allColumns = outputColumns)
+      outputLocation = tmpPath.toString)
 
     val fs = writeToPath.getFileSystem(hadoopConf)
     if (overwrite && fs.exists(writeToPath)) {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 75a0563e72c9..0ed464dad91b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -198,7 +198,6 @@ case class InsertIntoHiveTable(
       hadoopConf = hadoopConf,
       fileSinkConf = fileSinkConf,
       outputLocation = tmpLocation.toString,
-      allColumns = outputColumns,
       partitionAttributes = partitionAttributes)
 
     if (partition.nonEmpty) {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
index e484356906e8..fb221a2d3b86 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
@@ -51,7 +51,6 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
       hadoopConf: Configuration,
       fileSinkConf: FileSinkDesc,
       outputLocation: String,
-      allColumns: Seq[Attribute],
       customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty,
       partitionAttributes: Seq[Attribute] = Nil): Set[String] = {
 
@@ -90,7 +89,7 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
       fileFormat = new HiveFileFormat(fileSinkConf),
       committer = committer,
       outputSpec =
-        FileFormatWriter.OutputSpec(outputLocation, customPartitionLocations, allColumns),
+        FileFormatWriter.OutputSpec(outputLocation, customPartitionLocations, outputColumns),
       hadoopConf = hadoopConf,
       partitionColumns = partitionAttributes,
       bucketSpec = None,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 7eaac851b054..03eb21b5456d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -799,6 +799,25 @@ class HiveDDLSuite
     }
   }
 
+  test("SPARK-25313 Insert overwrite directory should output correct schema") {
+    withSQLConf(CONVERT_METASTORE_PARQUET.key -> "false") {
+      withTable("tbl") {
withView("view1") { + spark.sql("CREATE TABLE tbl(id long)") + spark.sql("INSERT OVERWRITE TABLE tbl VALUES 4") + spark.sql("CREATE VIEW view1 AS SELECT id FROM tbl") + withTempPath { path => + spark.sql(s"INSERT OVERWRITE LOCAL DIRECTORY '${path.getCanonicalPath}' " + + "STORED AS PARQUET SELECT ID FROM view1") + val expectedSchema = StructType(Seq(StructField("ID", LongType, true))) + assert(spark.read.parquet(path.toString).schema == expectedSchema) + checkAnswer(spark.read.parquet(path.toString), Seq(Row(4))) + } + } + } + } + } + test("alter table partition - storage information") { sql("CREATE TABLE boxes (height INT, length INT) PARTITIONED BY (width INT)") sql("INSERT OVERWRITE TABLE boxes PARTITION (width=4) SELECT 4, 4")