diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
index 40e62ddd0efc0..42dcc02cca439 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.internal.SQLConf
 
 trait HoodieCatalystPlansUtils {
@@ -111,8 +112,10 @@ trait HoodieCatalystPlansUtils {
   /**
    * Decomposes [[InsertIntoStatement]] into its arguments allowing to accommodate for API
    * changes in Spark 3.3
+   * @return an Option holding the tuple (table logical plan, userSpecifiedCols, partitionSpec, query, overwrite, ifPartitionNotExists);
+   *         userSpecifiedCols is only populated on Spark 3.2 and above, and is empty on older Spark versions
    */
-  def unapplyInsertIntoStatement(plan: LogicalPlan): Option[(LogicalPlan, Map[String, Option[String]], LogicalPlan, Boolean, Boolean)]
+  def unapplyInsertIntoStatement(plan: LogicalPlan): Option[(LogicalPlan, Seq[String], Map[String, Option[String]], LogicalPlan, Boolean, Boolean)]
 
   /**
    * Decomposes [[CreateTableLikeCommand]] into its arguments allowing to accommodate for API
@@ -147,4 +150,9 @@ trait HoodieCatalystPlansUtils {
    * true if both plans produce the same attributes in the same order
    */
  def produceSameOutput(a: LogicalPlan, b: LogicalPlan): Boolean
+
+  /**
+   * Adds a Project that maps the query output onto the table column names for an INSERT INTO query with user-specified columns
+   */
+  def createProjectForByNameQuery(lr: LogicalRelation, plan: LogicalPlan): Option[LogicalPlan]
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index aef98b4e91d86..e6c7b8c1ab073 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -17,10 +17,9 @@
 
 package org.apache.spark.sql.hudi.analysis
 
-import org.apache.hudi.common.util.ReflectionUtils
+import org.apache.hudi.common.util.{ReflectionUtils, ValidationUtils}
 import org.apache.hudi.common.util.ReflectionUtils.loadClass
 import org.apache.hudi.{HoodieSparkUtils, SparkAdapterSupport}
-
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
@@ -37,7 +36,6 @@ import org.apache.spark.sql.hudi.command.procedures.{HoodieProcedures, Procedure
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 
 import java.util
-
 import scala.collection.mutable.ListBuffer
 
 object HoodieAnalysis extends SparkAdapterSupport {
@@ -252,7 +250,7 @@ object HoodieAnalysis extends SparkAdapterSupport {
       // NOTE: In case of [[InsertIntoStatement]] Hudi tables could be on both sides -- receiving and providing
       //       the data, as such we have to make sure that we handle both of these
cases - case iis @ MatchInsertIntoStatement(targetTable, _, query, _, _) => + case iis @ MatchInsertIntoStatement(targetTable, _, _, query, _, _) => val updatedTargetTable = targetTable match { // In the receiving side of the IIS, we can't project meta-field attributes out, // and instead have to explicitly remove them @@ -355,7 +353,7 @@ object HoodieAnalysis extends SparkAdapterSupport { } private[sql] object MatchInsertIntoStatement { - def unapply(plan: LogicalPlan): Option[(LogicalPlan, Map[String, Option[String]], LogicalPlan, Boolean, Boolean)] = + def unapply(plan: LogicalPlan): Option[(LogicalPlan, Seq[String], Map[String, Option[String]], LogicalPlan, Boolean, Boolean)] = sparkAdapter.getCatalystPlanUtils.unapplyInsertIntoStatement(plan) } @@ -408,12 +406,20 @@ case class ResolveImplementationsEarly() extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = { plan match { // Convert to InsertIntoHoodieTableCommand - case iis @ MatchInsertIntoStatement(relation @ ResolvesToHudiTable(_), partition, query, overwrite, _) if query.resolved => + case iis @ MatchInsertIntoStatement(relation @ ResolvesToHudiTable(_), userSpecifiedCols, partition, query, overwrite, _) if query.resolved => relation match { // NOTE: In Spark >= 3.2, Hudi relations will be resolved as [[DataSourceV2Relation]]s by default; // However, currently, fallback will be applied downgrading them to V1 relations, hence // we need to check whether we could proceed here, or has to wait until fallback rule kicks in - case lr: LogicalRelation => new InsertIntoHoodieTableCommand(lr, query, partition, overwrite) + case lr: LogicalRelation => + // Create a project if this is an INSERT INTO query with specified cols. + val projectByUserSpecified = if (userSpecifiedCols.nonEmpty) { + ValidationUtils.checkState(lr.catalogTable.isDefined, "Missing catalog table") + sparkAdapter.getCatalystPlanUtils.createProjectForByNameQuery(lr, iis) + } else { + None + } + new InsertIntoHoodieTableCommand(lr, projectByUserSpecified.getOrElse(query), partition, overwrite) case _ => iis } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala index 0b1d4ca8999f0..f53db8e58194d 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala @@ -2826,4 +2826,163 @@ class TestInsertTable extends HoodieSparkSqlTestBase { spark.sessionState.conf.unsetConf("hoodie.datasource.insert.dup.policy") spark.sessionState.conf.unsetConf("hoodie.datasource.write.operation") } + + test("Test insert into with special cols") { + withTempDir { tmp => + if (HoodieSparkUtils.gteqSpark3_2) { + val targetTableA = generateTableName + val tablePathA = s"${tmp.getCanonicalPath}/$targetTableA" + if (HoodieSparkUtils.isSpark3_4) { + spark.sql("set spark.sql.defaultColumn.enabled = false") + } + + spark.sql( + s""" + |create table if not exists $targetTableA ( + | id bigint, + | name string, + | price double + |) using hudi + |tblproperties ( + | primaryKey = 'id', + | type = 'mor', + | preCombineField = 'name' + |) location '$tablePathA' + |""".stripMargin) + + spark.sql(s"insert into $targetTableA (id, price, name) values (1, 12.1, 'aaa')") + + checkAnswer(s"select id, price, name from $targetTableA")( + Seq(1, 12.1, "aaa") + ) + + val 
targetTableB = generateTableName + val tablePathB = s"${tmp.getCanonicalPath}/$targetTableB" + + spark.sql( + s""" + |create table if not exists $targetTableB ( + | id bigint, + | name string, + | price double, + | day string, + | hour string + |) using hudi + |tblproperties ( + | primaryKey = 'id', + | type = 'mor', + | preCombineField = 'name' + |) partitioned by (day, hour) + |location '$tablePathB' + |""".stripMargin) + + spark.sql(s"insert into $targetTableB (id, day, price, name, hour) " + + s"values (2, '01', 12.2, 'bbb', '02')") + + spark.sql(s"insert into $targetTableB (id, day, price, name, hour) " + + s"select id, '01' as dt, price, name, '03' as hour from $targetTableA") + + spark.sql(s"insert into $targetTableB partition(day='02', hour) (id, hour, price, name) " + + s"values (3, '01', 12.3, 'ccc')") + + spark.sql(s"insert into $targetTableB partition(day='02', hour='02') (id, price, name) " + + s"values (4, 12.4, 'ddd')") + + checkAnswer(s"select id, price, name, day, hour from $targetTableB")( + Seq(2, 12.2, "bbb", "01", "02"), + Seq(1, 12.1, "aaa", "01", "03"), + Seq(3, 12.3, "ccc", "02", "01"), + Seq(4, 12.4, "ddd", "02", "02") + ) + + if (HoodieSparkUtils.isSpark3_4) { + spark.sql("set spark.sql.defaultColumn.enabled = true") + checkExceptionContain(s"insert into $targetTableB (id, day, price, name, hour) " + + s"select id, '01' as dt, price, name, '03' as hour from $targetTableA")( + "hudi not support specified cols when enable default columns") + } + } + } + } + + test("Test insert overwrite with special cols") { + withTempDir { tmp => + if (HoodieSparkUtils.gteqSpark3_2) { + val targetTableA = generateTableName + val tablePathA = s"${tmp.getCanonicalPath}/$targetTableA" + if (HoodieSparkUtils.isSpark3_4) { + spark.sql("set spark.sql.defaultColumn.enabled = false") + } + + spark.sql( + s""" + |create table if not exists $targetTableA ( + | id bigint, + | name string, + | price double + |) using hudi + |tblproperties ( + | primaryKey = 'id', + | type = 'mor', + | preCombineField = 'name' + |) location '$tablePathA' + |""".stripMargin) + + spark.sql(s"insert overwrite $targetTableA (id, price, name) values (1, 12.1, 'aaa')") + + checkAnswer(s"select id, price, name from $targetTableA")( + Seq(1, 12.1, "aaa") + ) + + val targetTableB = generateTableName + val tablePathB = s"${tmp.getCanonicalPath}/$targetTableB" + + spark.sql( + s""" + |create table if not exists $targetTableB ( + | id bigint, + | name string, + | price double, + | day string, + | hour string + |) using hudi + |tblproperties ( + | primaryKey = 'id', + | type = 'mor', + | preCombineField = 'name' + |) partitioned by (day, hour) + |location '$tablePathB' + |""".stripMargin) + + spark.sql(s"insert overwrite $targetTableB (id, day, price, name, hour) " + + s"values (2, '01', 12.2, 'bbb', '02')") + + checkAnswer(s"select id, price, name, day, hour from $targetTableB")( + Seq(2, 12.2, "bbb", "01", "02") + ) + + spark.sql(s"insert overwrite $targetTableB (id, day, price, name, hour) " + + s"select id, '01' as dt, price, name, '03' as hour from $targetTableA") + + spark.sql(s"insert overwrite $targetTableB partition(day='02', hour) (id, hour, price, name) " + + s"values (3, '01', 12.3, 'ccc')") + + spark.sql(s"insert overwrite $targetTableB partition(day='02', hour='02') (id, price, name) " + + s"values (4, 12.4, 'ddd')") + + checkAnswer(s"select id, price, name, day, hour from $targetTableB")( + Seq(1, 12.1, "aaa", "01", "03"), + Seq(3, 12.3, "ccc", "02", "01"), + Seq(4, 12.4, "ddd", "02", "02") + ) + + if 
(HoodieSparkUtils.isSpark3_4) { + spark.sql("set spark.sql.defaultColumn.enabled = true") + checkExceptionContain(s"insert overwrite $targetTableB (id, day, price, name, hour) " + + s"select id, '01' as dt, price, name, '03' as hour from $targetTableA")( + "hudi not support specified cols when enable default columns") + } + } + } + } } diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala index 9f3a5ce03a808..c7ed7928f0564 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala +++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala @@ -61,10 +61,10 @@ object HoodieSpark2CatalystPlanUtils extends HoodieCatalystPlansUtils { Join(left, right, joinType, None) } - override def unapplyInsertIntoStatement(plan: LogicalPlan): Option[(LogicalPlan, Map[String, Option[String]], LogicalPlan, Boolean, Boolean)] = { + override def unapplyInsertIntoStatement(plan: LogicalPlan): Option[(LogicalPlan, Seq[String], Map[String, Option[String]], LogicalPlan, Boolean, Boolean)] = { plan match { case InsertIntoTable(table, partition, query, overwrite, ifPartitionNotExists) => - Some((table, partition, query, overwrite, ifPartitionNotExists)) + Some((table, Seq.empty, partition, query, overwrite, ifPartitionNotExists)) case _ => None } } @@ -129,4 +129,6 @@ object HoodieSpark2CatalystPlanUtils extends HoodieCatalystPlansUtils { override def unapplyShowIndexes(plan: LogicalPlan): Option[(LogicalPlan, Seq[Attribute])] = None override def unapplyRefreshIndex(plan: LogicalPlan): Option[(LogicalPlan, String)] = None + + override def createProjectForByNameQuery(lr: LogicalRelation, plan: LogicalPlan): Option[LogicalPlan] = None } diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala index a0938b94671b5..4faf1437b521d 100644 --- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala +++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join, J import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog} import org.apache.spark.sql.execution.{ExtendedMode, SimpleMode} import org.apache.spark.sql.execution.command.{CreateTableLikeCommand, ExplainCommand} +import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType @@ -56,15 +57,6 @@ trait HoodieSpark3CatalystPlanUtils extends HoodieCatalystPlansUtils { Join(left, right, joinType, None, JoinHint.NONE) } - override def unapplyInsertIntoStatement(plan: LogicalPlan): Option[(LogicalPlan, Map[String, Option[String]], LogicalPlan, Boolean, Boolean)] = { - plan match { - case insert: InsertIntoStatement => - Some((insert.table, insert.partitionSpec, insert.query, insert.overwrite, insert.ifPartitionNotExists)) - case _ => - None - } - } - override def unapplyCreateTableLikeCommand(plan: LogicalPlan): Option[(TableIdentifier, TableIdentifier, CatalogStorageFormat, Option[String], 
Map[String, String], Boolean)] = { plan match { @@ -84,6 +76,8 @@ trait HoodieSpark3CatalystPlanUtils extends HoodieCatalystPlansUtils { override def produceSameOutput(a: LogicalPlan, b: LogicalPlan): Boolean = { a.sameOutput(b) } + + override def createProjectForByNameQuery(lr: LogicalRelation, plan: LogicalPlan): Option[LogicalPlan] = None } object HoodieSpark3CatalystPlanUtils extends SparkAdapterSupport { diff --git a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala index dbe68bb33e5fb..1ea04b512e635 100644 --- a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala +++ b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/HoodieSpark30CatalystPlanUtils.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.ResolvedTable import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, ProjectionOverSchema} import org.apache.spark.sql.catalyst.planning.ScanOperation -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, MergeIntoTable} +import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, LogicalPlan, MergeIntoTable} import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog} import org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand import org.apache.spark.sql.execution.datasources.parquet.{HoodieFormatTrait, ParquetFileFormat} @@ -82,4 +82,13 @@ object HoodieSpark30CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils { override def unapplyShowIndexes(plan: LogicalPlan): Option[(LogicalPlan, Seq[Attribute])] = None override def unapplyRefreshIndex(plan: LogicalPlan): Option[(LogicalPlan, String)] = None + + override def unapplyInsertIntoStatement(plan: LogicalPlan): Option[(LogicalPlan, Seq[String], Map[String, Option[String]], LogicalPlan, Boolean, Boolean)] = { + plan match { + case insert: InsertIntoStatement => + Some((insert.table, Seq.empty, insert.partitionSpec, insert.query, insert.overwrite, insert.ifPartitionNotExists)) + case _ => + None + } + } } diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala index 765a9b06de52d..7462d41d299ab 100644 --- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala +++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala @@ -18,12 +18,11 @@ package org.apache.spark.sql -import org.apache.hudi.SparkHoodieTableFileIndex import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.ResolvedTable import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, ProjectionOverSchema} import org.apache.spark.sql.catalyst.planning.ScanOperation -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, MergeIntoTable, Project} +import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, LogicalPlan, MergeIntoTable} import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog} import 
org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand import org.apache.spark.sql.execution.datasources.parquet.{HoodieFormatTrait, ParquetFileFormat} @@ -83,4 +82,13 @@ object HoodieSpark31CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils { override def unapplyShowIndexes(plan: LogicalPlan): Option[(LogicalPlan, Seq[Attribute])] = None override def unapplyRefreshIndex(plan: LogicalPlan): Option[(LogicalPlan, String)] = None + + override def unapplyInsertIntoStatement(plan: LogicalPlan): Option[(LogicalPlan, Seq[String], Map[String, Option[String]], LogicalPlan, Boolean, Boolean)] = { + plan match { + case insert: InsertIntoStatement => + Some((insert.table, Seq.empty, insert.partitionSpec, insert.query, insert.overwrite, insert.ifPartitionNotExists)) + case _ => + None + } + } } diff --git a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala index 8562ca14d3e8b..2f147b89b80d1 100644 --- a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala +++ b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala @@ -127,4 +127,22 @@ object HoodieSpark32CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils { None } } + + override def unapplyInsertIntoStatement(plan: LogicalPlan): Option[(LogicalPlan, Seq[String], Map[String, Option[String]], LogicalPlan, Boolean, Boolean)] = { + plan match { + case insert: InsertIntoStatement => + Some((insert.table, insert.userSpecifiedCols, insert.partitionSpec, insert.query, insert.overwrite, insert.ifPartitionNotExists)) + case _ => + None + } + } + + override def createProjectForByNameQuery(lr: LogicalRelation, plan: LogicalPlan): Option[LogicalPlan] = { + plan match { + case insert: InsertIntoStatement => + Some(ResolveInsertionBase.createProjectForByNameQuery(lr.catalogTable.get.qualifiedName, insert)) + case _ => + None + } + } } diff --git a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/ResolveInsertionBase.scala b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/ResolveInsertionBase.scala new file mode 100644 index 0000000000000..868e225476d0e --- /dev/null +++ b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/ResolveInsertionBase.scala @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.sql
+
+import org.apache.hudi.SparkAdapterSupport
+import org.apache.spark.sql.catalyst.analysis.Resolver
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, LogicalPlan, Project}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+/**
+ * This object is similar to Spark's ResolveInsertionBase rule introduced in Spark 3.4.
+ */
+object ResolveInsertionBase extends SparkAdapterSupport {
+  def resolver: Resolver = SQLConf.get.resolver
+
+  /** Add a project to use the table column names for INSERT INTO BY NAME */
+  def createProjectForByNameQuery(tblName: String,
+                                  i: InsertIntoStatement): LogicalPlan = {
+    sparkAdapter.getSchemaUtils.checkColumnNameDuplication(i.userSpecifiedCols,
+      "in the table definition of " + tblName, SQLConf.get.caseSensitiveAnalysis)
+
+    if (i.userSpecifiedCols.size != i.query.output.size) {
+      if (i.userSpecifiedCols.size > i.query.output.size) {
+        throw cannotWriteNotEnoughColumnsToTableError(
+          tblName, i.userSpecifiedCols, i.query.output)
+      } else {
+        throw cannotWriteNotEnoughColumnsToTableError(
+          tblName, i.userSpecifiedCols, i.query.output)
+      }
+    }
+    val projectByName = i.userSpecifiedCols.zip(i.query.output)
+      .map { case (userSpecifiedCol, queryOutputCol) =>
+        val resolvedCol = i.table.resolve(Seq(userSpecifiedCol), resolver)
+          .getOrElse(
+            throw unresolvedAttributeError(
+              "UNRESOLVED_COLUMN", userSpecifiedCol, i.table.output.map(_.name)))
+        (queryOutputCol.dataType, resolvedCol.dataType) match {
+          case (input: StructType, expected: StructType) =>
+            // Rename inner fields of the input column to pass the by-name INSERT analysis.
+            Alias(Cast(queryOutputCol, renameFieldsInStruct(input, expected)), resolvedCol.name)()
+          case _ =>
+            Alias(queryOutputCol, resolvedCol.name)()
+        }
+      }
+    Project(projectByName, i.query)
+  }
+
+  private def renameFieldsInStruct(input: StructType, expected: StructType): StructType = {
+    if (input.length == expected.length) {
+      val newFields = input.zip(expected).map { case (f1, f2) =>
+        (f1.dataType, f2.dataType) match {
+          case (s1: StructType, s2: StructType) =>
+            f1.copy(name = f2.name, dataType = renameFieldsInStruct(s1, s2))
+          case _ =>
+            f1.copy(name = f2.name)
+        }
+      }
+      StructType(newFields)
+    } else {
+      input
+    }
+  }
+
+  def cannotWriteNotEnoughColumnsToTableError(tableName: String,
+                                              expected: Seq[String],
+                                              queryOutput: Seq[Attribute]): Throwable = {
+    new AnalysisException("table: " + tableName + ", INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS, expected: " +
+      expected.mkString(",") + ", query output: " + queryOutput.map(attr => attr.name).mkString(","))
+  }
+
+  def unresolvedAttributeError(errorClass: String,
+                               colName: String,
+                               candidates: Seq[String]): Throwable = {
+    new AnalysisException(errorClass + ": cannot resolve column " + colName +
+      "; table output columns are: " + candidates.mkString(","))
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
index 2d9250bbe9eab..651059212cf6f 100644
--- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
+++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
@@ -111,4 +111,22 @@ object HoodieSpark33CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
       None
     }
   }
+
+  override def unapplyInsertIntoStatement(plan: LogicalPlan): Option[(LogicalPlan, Seq[String], Map[String, Option[String]], LogicalPlan, Boolean, Boolean)] = {
+    plan match {
+      case insert: InsertIntoStatement =>
+        Some((insert.table, insert.userSpecifiedCols, insert.partitionSpec, insert.query, insert.overwrite, insert.ifPartitionNotExists))
+      case _ =>
+        None
+    }
+  }
+
+  override def createProjectForByNameQuery(lr: LogicalRelation, plan: LogicalPlan): Option[LogicalPlan] = {
+    plan match {
+      case insert: InsertIntoStatement =>
+        Some(ResolveInsertionBase.createProjectForByNameQuery(lr.catalogTable.get.qualifiedName, insert))
+      case _ =>
+        None
+    }
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala
index 2c50d21cbe589..890a7cc821dea 100644
--- a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala
+++ b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/HoodieSpark34CatalystPlanUtils.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
 import org.apache.spark.sql.execution.command.RepairTableCommand
 import org.apache.spark.sql.execution.datasources.parquet.{HoodieFormatTrait, ParquetFileFormat}
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 
 object HoodieSpark34CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
@@ -115,4 +116,35 @@ object HoodieSpark34CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
       None
     }
   }
+
+  override def unapplyInsertIntoStatement(plan: LogicalPlan): Option[(LogicalPlan, Seq[String], Map[String, Option[String]], LogicalPlan, Boolean, Boolean)] = {
+    plan match {
+      case insert: InsertIntoStatement =>
+        // https://github.com/apache/spark/pull/36077
+        // First: with that PR, Spark 3.4 supports default values for INSERT INTO and rewrites the
+        // user-specified columns itself, so Hudi does not need to handle them here.
+        // Second: the same PR also appends the Hudi meta fields with default values, which is buggy; this
+        // appears to be fixed in Spark 3.5 (https://github.com/apache/spark/pull/41262), so for now users who want to specify columns must disable the default-column feature.
+        if (SQLConf.get.enableDefaultColumns) {
+          if (insert.userSpecifiedCols.nonEmpty) {
+            throw new AnalysisException("hudi not support specified cols when enable default columns, " +
+              "please disable 'spark.sql.defaultColumn.enabled'")
+          }
+          Some((insert.table, Seq.empty, insert.partitionSpec, insert.query, insert.overwrite, insert.ifPartitionNotExists))
+        } else {
+          Some((insert.table, insert.userSpecifiedCols, insert.partitionSpec, insert.query, insert.overwrite, insert.ifPartitionNotExists))
+        }
+      case _ =>
+        None
+    }
+  }
+
+  override def createProjectForByNameQuery(lr: LogicalRelation, plan: LogicalPlan): Option[LogicalPlan] = {
+    plan match {
+      case insert: InsertIntoStatement =>
+        Some(ResolveInsertionBase.createProjectForByNameQuery(lr.catalogTable.get.qualifiedName, insert))
+      case _ =>
+        None
+    }
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala
index b95ee94e4826a..22316ddacaca0 100644
--- a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala
+++ b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/HoodieSpark35CatalystPlanUtils.scala
@@ -115,4 +115,22 @@ object HoodieSpark35CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
       None
     }
   }
+
+  override def unapplyInsertIntoStatement(plan: LogicalPlan): Option[(LogicalPlan, Seq[String], Map[String, Option[String]], LogicalPlan, Boolean, Boolean)] = {
+    plan match {
+      case insert: InsertIntoStatement =>
+        Some((insert.table, insert.userSpecifiedCols, insert.partitionSpec, insert.query, insert.overwrite, insert.ifPartitionNotExists))
+      case _ =>
+        None
+    }
+  }
+
+  override def createProjectForByNameQuery(lr: LogicalRelation, plan: LogicalPlan): Option[LogicalPlan] = {
+    plan match {
+      case insert: InsertIntoStatement =>
+        Some(ResolveInsertionBase.createProjectForByNameQuery(lr.catalogTable.get.qualifiedName, insert))
+      case _ =>
+        None
+    }
+  }
 }
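Note on the user-visible semantics asserted by the tests above: when a column list is given, the VALUES/SELECT output is matched to the listed column names and the resulting row lands in the table's declared column order (partition columns included). The small, self-contained Scala sketch below only models that observable behaviour; it is not Hudi or Spark code, and the object and value names in it are made up for illustration.

object InsertByNameSketch extends App {
  // Table schema in declaration order, e.g. CREATE TABLE t (id BIGINT, name STRING, price DOUBLE)
  val tableColumns = Seq("id", "name", "price")

  // INSERT INTO t (id, price, name) VALUES (1, 12.1, 'aaa')
  val userSpecifiedCols = Seq("id", "price", "name")
  val queryOutput = Seq[Any](1L, 12.1, "aaa")

  // Pair each query output column with the user-specified column it targets
  // (roughly what the injected Project's aliases express) ...
  val byName: Map[String, Any] = userSpecifiedCols.zip(queryOutput).toMap

  // ... then lay the row out in table-schema order before it is written.
  val row = tableColumns.map(byName)

  println(row) // List(1, aaa, 12.1), i.e. id = 1, name = 'aaa', price = 12.1
}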