diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index d1dfdd1f81c58..c9d2021772965 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -3409,7 +3409,9 @@ class Analyzer(override val catalogManager: CatalogManager)
       tableOutput: Seq[Attribute],
       cols: Seq[NamedExpression],
       query: LogicalPlan): LogicalPlan = {
-    if (cols.size != query.output.size) {
+    // No need to check column size when USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES is enabled,
+    // since any omitted columns will be added back to the query automatically during parsing.
+    if (!conf.useNullsForMissingDefaultColumnValues && cols.size != query.output.size) {
       throw QueryCompilationErrors.writeTableWithMismatchedColumnsError(
         cols.size, query.output.size, query)
     }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
index 2cd069e5858da..a3bf476af0689 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
@@ -36,7 +36,9 @@ object TableOutputResolver {
       byName: Boolean,
       conf: SQLConf): LogicalPlan = {
 
-    if (expected.size < query.output.size) {
+    // No need to check column size when USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES is enabled,
+    // since any omitted columns will be added back to the query automatically during parsing.
+    if (!conf.useNullsForMissingDefaultColumnValues && expected.size < query.output.size) {
       throw QueryCompilationErrors.cannotWriteTooManyColumnsToTableError(tableName, expected, query)
     }
 
@@ -44,7 +46,7 @@ object TableOutputResolver {
     val resolved: Seq[NamedExpression] = if (byName) {
       reorderColumnsByName(query.output, expected, conf, errors += _)
     } else {
-      if (expected.size > query.output.size) {
+      if (!conf.useNullsForMissingDefaultColumnValues && expected.size > query.output.size) {
         throw QueryCompilationErrors.cannotWriteNotEnoughColumnsToTableError(
           tableName, expected, query)
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index af43f8d1c1bd8..ddd8bd83f95a7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -381,7 +381,10 @@ object PreprocessTableInsertion extends Rule[LogicalPlan] {
     val staticPartCols = normalizedPartSpec.filter(_._2.isDefined).keySet
     val expectedColumns = insert.table.output.filterNot(a => staticPartCols.contains(a.name))
 
-    if (expectedColumns.length != insert.query.schema.length) {
+    // No need to check column size when USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES is enabled,
+    // since any omitted columns will be added back to the query automatically during parsing.
+    if (!conf.useNullsForMissingDefaultColumnValues &&
+        expectedColumns.length != insert.query.schema.length) {
       throw QueryCompilationErrors.mismatchedInsertedDataColumnNumberError(
         tblName, insert, staticPartCols)
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 2483055880e97..eb0c0082a0fd9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -101,6 +101,26 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     )
   }
 
+  test("SPARK-38707 Allow user to insert into only certain columns of a table") {
+    withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "true") {
+      withTable("t") {
+        sql("create table t(i boolean, s bigint) using parquet")
+        sql("insert into t(i) values(true)")
+        checkAnswer(sql("select i, s from t"), Seq(Row(true, null)))
+      }
+    }
+
+    withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") {
+      withTable("t") {
+        sql("create table t(i boolean, s bigint) using parquet")
+        assert(intercept[AnalysisException] {
+          sql("insert into t(i) values(true)")
+        }.getMessage.contains("requires that the data to be inserted have the same number of " +
+          "columns as the target"))
+      }
+    }
+  }
+
   test("insert into a temp view that does not point to an insertable data source") {
     import testImplicits._
     withTempView("t1", "t2") {
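
Usage sketch (not part of the patch): the snippet below mirrors the new test to show the user-visible effect of the flag from an interactive session. It assumes a running SparkSession named spark, that the SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES entry can be set at runtime via spark.conf.set, and that the Parquet data source is available; the table and column names simply follow the test above.

import org.apache.spark.sql.internal.SQLConf

// Flag on: the omitted column s is filled with NULL instead of failing analysis.
spark.conf.set(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key, "true")
spark.sql("create table t(i boolean, s bigint) using parquet")
spark.sql("insert into t(i) values(true)")
spark.sql("select i, s from t").show()  // expected row: (true, null), per the checkAnswer above

// Flag off: the same INSERT is rejected with the "same number of columns as the target"
// AnalysisException asserted in the second half of the test.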