diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index b3896484da178..fd31c56c20e0d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -198,6 +198,16 @@ case class InsertIntoHiveTable(
       if (isDynamic.init.zip(isDynamic.tail).contains((true, false))) {
         throw new SparkException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg)
       }
+
+      // For Hive tables, table.outputSet includes both data columns and partition columns
+      assert(table.output.size > partition.size)
+
+      val numDataCols = table.output.size - partition.size
+      if (numDataCols + numDynamicPartitions > child.output.size) {
+        throw new SparkException("Cannot insert into target table because column numbers are " +
+          s"different: Table requires `${numDataCols + numDynamicPartitions}` columns, but the " +
+          s"input has `${child.output.size}` columns")
+      }
     }
 
     val jobConf = new JobConf(hadoopConf)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index fae59001b98e1..3ea66a97416d9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -257,6 +257,31 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
     }
   }
 
+  test("Dynamic partitioning - not enough input columns") {
+    withTempDir { tmpDir =>
+      val table = "table_with_partition"
+      withTable(table) {
+        sql(
+          s"""
+             |CREATE TABLE $table(c1 string)
+             |PARTITIONED by (p1 string,p2 string)
+             |location '${tmpDir.toURI.toString}'
+           """.stripMargin)
+
+        val e = intercept[SparkException] {
+          sql(
+            s"""
+               |INSERT OVERWRITE TABLE $table
+               |partition (p1='a',p2)
+               |SELECT 'blarr1'
+             """.stripMargin)
+        }.getMessage
+        assert(e.contains("Cannot insert into target table because column numbers are " +
+          "different: Table requires `2` columns, but the input has `1` columns"))
+      }
+    }
+  }
+
   test("Detect table partitioning with correct partition order") {
     withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
       sql("CREATE TABLE source (id bigint, part2 string, part1 string, data string)")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 0a2bab4f5d1e1..fcb082e4c7662 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -1057,8 +1057,11 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
       sql("SET hive.exec.dynamic.partition.mode=nonstrict")
       sql("CREATE TABLE IF NOT EXISTS withparts LIKE srcpart")
-      sql("INSERT INTO TABLE withparts PARTITION(ds, hr) SELECT key, value FROM src")
-        .queryExecution.analyzed
+      sql(
+        s"""
+           |INSERT INTO TABLE withparts PARTITION(ds, hr)
+           |SELECT key, value, '2008-04-09', '12' FROM src
+         """.stripMargin).queryExecution.analyzed
     }
 
     assertResult(2, "Duplicated project detected\n" + analyzedPlan) {
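
Reviewer note (illustrative only, not part of the patch): the arithmetic behind the new guard, as a self-contained Scala sketch. `ColumnCountSketch` and `requiredColumns` are hypothetical names invented here; in the patch the check lives inline in `InsertIntoHiveTable`.

// Sketch of the column-count rule the guard enforces, under the patch's
// assumption that table.output covers data columns AND partition columns.
object ColumnCountSketch extends App {
  // Data columns = all output columns minus partition columns; the SELECT
  // must supply one value per data column plus one per *dynamic* partition
  // column (static partition values come from the PARTITION clause instead).
  def requiredColumns(
      tableOutputSize: Int,
      numPartitionCols: Int,
      numDynamicPartitions: Int): Int =
    (tableOutputSize - numPartitionCols) + numDynamicPartitions

  // The new test case: CREATE TABLE t(c1 string) PARTITIONED BY (p1, p2)
  // gives tableOutputSize = 3 and numPartitionCols = 2; PARTITION (p1='a', p2)
  // leaves one dynamic partition. 1 data column + 1 dynamic partition = 2
  // required columns, but SELECT 'blarr1' supplies only 1, so the insert
  // now fails with the SparkException asserted above.
  assert(requiredColumns(tableOutputSize = 3, numPartitionCols = 2,
    numDynamicPartitions = 1) == 2)
}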