SPARK-7270: consider dynamic partition when comparing table output

HiveMetastoreCatalog.scala

@@ -485,17 +485,19 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
   def castChildOutput(p: InsertIntoTable, table: MetastoreRelation, child: LogicalPlan)
     : LogicalPlan = {
     val childOutputDataTypes = child.output.map(_.dataType)
+    val numDynamicPartitions = p.partition.values.count(_.isEmpty)
     val tableOutputDataTypes =
-      (table.attributes ++ table.partitionKeys).take(child.output.length).map(_.dataType)
+      (table.attributes ++ table.partitionKeys.takeRight(numDynamicPartitions))
+        .take(child.output.length).map(_.dataType)

     if (childOutputDataTypes == tableOutputDataTypes) {
-      p
+      InsertIntoHiveTable(table, p.partition, p.child, p.overwrite, p.ifNotExists)
     } else if (childOutputDataTypes.size == tableOutputDataTypes.size &&
       childOutputDataTypes.zip(tableOutputDataTypes)
         .forall { case (left, right) => left.sameType(right) }) {
       // If both types ignoring nullability of ArrayType, MapType, StructType are the same,
       // use InsertIntoHiveTable instead of InsertIntoTable.
-      InsertIntoHiveTable(p.table, p.partition, p.child, p.overwrite, p.ifNotExists)
+      InsertIntoHiveTable(table, p.partition, p.child, p.overwrite, p.ifNotExists)
     } else {
       // Only do the casting when child output data types differ from table output data types.
       val castedChildOutput = child.output.zip(table.output).map {
@@ -530,7 +532,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
  * because Hive table doesn't have nullability for ARRAY, MAP, STRUCT types.
  */
 private[hive] case class InsertIntoHiveTable(
-    table: LogicalPlan,
+    table: MetastoreRelation,
     partition: Map[String, Option[String]],
     child: LogicalPlan,
     overwrite: Boolean,
@@ -540,7 +542,13 @@ private[hive] case class InsertIntoHiveTable(
   override def children: Seq[LogicalPlan] = child :: Nil
   override def output: Seq[Attribute] = child.output

-  override lazy val resolved: Boolean = childrenResolved && child.output.zip(table.output).forall {
+  val numDynamicPartitions = partition.values.count(_.isEmpty)
+
+  // This is the expected schema of the table prepared to be inserted into,
+  // including dynamic partition columns.
+  val tableOutput = table.attributes ++ table.partitionKeys.takeRight(numDynamicPartitions)
+
+  override lazy val resolved: Boolean = childrenResolved && child.output.zip(tableOutput).forall {
     case (childAttr, tableAttr) => childAttr.dataType.sameType(tableAttr.dataType)
   }
 }
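
Editor's note (not part of the diff): the fix relies on where dynamic partition columns sit in partitionKeys. Hive requires static partition specs to precede dynamic ones in the PARTITION clause, so the dynamic columns are always a suffix of the declared partition keys, which is why takeRight(numDynamicPartitions) selects exactly the columns the SELECT list must supply. A minimal self-contained sketch of that computation, using a hypothetical Column type and the schema from the test below; it can be pasted into a Scala REPL:

// Sketch only; plain Scala standing in for Catalyst attributes.
case class Column(name: String)

val attributes    = Seq(Column("a"))                    // regular columns
val partitionKeys = Seq(Column("b"), Column("c"))       // PARTITIONED BY (b, c)
val partition     = Map("b" -> Some("1"), "c" -> None)  // PARTITION (b=1, c)

// Dynamic partition columns are those without a static value (None).
val numDynamicPartitions = partition.values.count(_.isEmpty)  // 1

// The static key b is dropped; only a and the dynamic key c must come
// from the SELECT list, matching SELECT 'a', 'c' in the test below.
val tableOutput = attributes ++ partitionKeys.takeRight(numDynamicPartitions)
// => Seq(Column("a"), Column("c"))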
HiveQuerySuite.scala

@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars

 import org.apache.spark.{SparkFiles, SparkException}
 import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
+import org.apache.spark.sql.catalyst.expressions.Cast
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.test.TestHive
@@ -418,6 +419,25 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
       |SELECT * FROM createdtable;
     """.stripMargin)

+  test("SPARK-7270: consider dynamic partition when comparing table output") {
+    sql(s"CREATE TABLE test_partition (a STRING) PARTITIONED BY (b BIGINT, c STRING)")
+    sql(s"CREATE TABLE ptest (a STRING, b BIGINT, c STRING)")
+
+    val analyzedPlan = sql(
+      """
+        |INSERT OVERWRITE table test_partition PARTITION (b=1, c)
+        |SELECT 'a', 'c' from ptest
+      """.stripMargin).queryExecution.analyzed
+
+    assertResult(false, "Incorrect cast detected\n" + analyzedPlan) {
+      var hasCast = false
+      analyzedPlan.collect {
+        case p: Project => p.transformExpressionsUp { case c: Cast => hasCast = true; c }
+      }
+      hasCast
+    }
+  }
+
   createQueryTest("transform",
     "SELECT TRANSFORM (key) USING 'cat' AS (tKey) FROM src")

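Editor's note on why the test catches the bug (a sketch, not Spark code): before this change the expected schema took the leading child.output.length columns of attributes ++ partitionKeys, so the static partition column b: BIGINT was compared against the second SELECT expression 'c': STRING and the analyzer inserted a spurious Cast. A REPL-runnable sketch of both computations with the test's schema:

// Sketch only; strings stand in for Catalyst data types.
// Table: test_partition(a STRING) PARTITIONED BY (b BIGINT, c STRING)
// Query: INSERT OVERWRITE ... PARTITION (b=1, c) SELECT 'a', 'c'
val attributes    = Seq("string")            // a
val partitionKeys = Seq("bigint", "string")  // b, c
val childTypes    = Seq("string", "string")  // 'a', 'c'

// Before the fix: the static key b leaks into the expected schema.
val before = (attributes ++ partitionKeys).take(childTypes.length)
// => Seq("string", "bigint") != childTypes, so a Cast to bigint was added.

// After the fix: only the dynamic key c is expected from the SELECT list.
val after = (attributes ++ partitionKeys.takeRight(1)).take(childTypes.length)
// => Seq("string", "string") == childTypes, so no Cast appears in the plan.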