@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.internal.SQLConf

trait HoodieCatalystPlansUtils {
@@ -111,8 +112,10 @@ trait HoodieCatalystPlansUtils {
/**
* Decomposes [[InsertIntoStatement]] into its arguments, accommodating API
* changes in Spark 3.3
* @return an optional tuple of (table logical plan, userSpecifiedCols, partitionSpec, query, overwrite, ifPartitionNotExists);
*         userSpecifiedCols is only populated on Spark 3.2 and later, and is empty otherwise
*/
def unapplyInsertIntoStatement(plan: LogicalPlan): Option[(LogicalPlan, Map[String, Option[String]], LogicalPlan, Boolean, Boolean)]
def unapplyInsertIntoStatement(plan: LogicalPlan): Option[(LogicalPlan, Seq[String], Map[String, Option[String]], LogicalPlan, Boolean, Boolean)]
Contributor: Can we add a doc for the new param?

Author: done


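For context, a minimal caller-side sketch of destructuring the widened extractor result (a hedged illustration; variable names are not from the PR):

    // Sketch only: on Spark < 3.2, userSpecifiedCols is always Seq.empty,
    // so callers can bind the extra field and ignore it safely.
    plan match {
      case MatchInsertIntoStatement(table, userSpecifiedCols, partitionSpec, query, overwrite, ifPartitionNotExists) =>
        if (userSpecifiedCols.nonEmpty) {
          // the statement was INSERT INTO t (c1, c2, ...) with an explicit column list
        }
      case _ => // not an insert statement
    }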
/**
* Decomposes [[CreateTableLikeCommand]] into its arguments, accommodating API
@@ -147,4 +150,9 @@ trait HoodieCatalystPlansUtils {
* true if both plans produce the same attributes in the same order
*/
def produceSameOutput(a: LogicalPlan, b: LogicalPlan): Boolean

/**
* Adds a Project that realigns the query output with the table's column names for an INSERT INTO query with user-specified columns
*/
def createProjectForByNameQuery(lr: LogicalRelation, plan: LogicalPlan): Option[LogicalPlan]
}
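To make the intent of createProjectForByNameQuery concrete, here is a rough sketch of the realignment it enables. This is not the actual implementation; it assumes the query is already resolved and ignores partition columns, default values, and case sensitivity:

    import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}

    // Hedged sketch: the query's output arrives in user-specified column order;
    // re-emit it in the table's declared order so positional insert semantics hold.
    def alignByName(tableCols: Seq[String], userSpecifiedCols: Seq[String], query: LogicalPlan): LogicalPlan = {
      val byName = userSpecifiedCols.zip(query.output).toMap
      val reordered = tableCols.map { col =>
        byName.getOrElse(col, throw new IllegalArgumentException(s"No value provided for column: $col"))
      }
      Project(reordered, query)
    }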
@@ -17,10 +17,9 @@

package org.apache.spark.sql.hudi.analysis

import org.apache.hudi.common.util.ReflectionUtils
import org.apache.hudi.common.util.{ReflectionUtils, ValidationUtils}
import org.apache.hudi.common.util.ReflectionUtils.loadClass
import org.apache.hudi.{HoodieSparkUtils, SparkAdapterSupport}

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
@@ -37,7 +36,6 @@ import org.apache.spark.sql.hudi.command.procedures.{HoodieProcedures, Procedure
import org.apache.spark.sql.{AnalysisException, SparkSession}

import java.util

import scala.collection.mutable.ListBuffer

object HoodieAnalysis extends SparkAdapterSupport {
@@ -252,7 +250,7 @@

// NOTE: In case of [[InsertIntoStatement]] Hudi tables could be on both sides -- receiving and providing
// the data, as such we have to make sure that we handle both of these cases
case iis @ MatchInsertIntoStatement(targetTable, _, query, _, _) =>
case iis @ MatchInsertIntoStatement(targetTable, _, _, query, _, _) =>
val updatedTargetTable = targetTable match {
// In the receiving side of the IIS, we can't project meta-field attributes out,
// and instead have to explicitly remove them
@@ -355,7 +353,7 @@
}

private[sql] object MatchInsertIntoStatement {
def unapply(plan: LogicalPlan): Option[(LogicalPlan, Map[String, Option[String]], LogicalPlan, Boolean, Boolean)] =
def unapply(plan: LogicalPlan): Option[(LogicalPlan, Seq[String], Map[String, Option[String]], LogicalPlan, Boolean, Boolean)] =
sparkAdapter.getCatalystPlanUtils.unapplyInsertIntoStatement(plan)
}

@@ -408,12 +406,20 @@ case class ResolveImplementationsEarly() extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = {
plan match {
// Convert to InsertIntoHoodieTableCommand
case iis @ MatchInsertIntoStatement(relation @ ResolvesToHudiTable(_), partition, query, overwrite, _) if query.resolved =>
case iis @ MatchInsertIntoStatement(relation @ ResolvesToHudiTable(_), userSpecifiedCols, partition, query, overwrite, _) if query.resolved =>
relation match {
// NOTE: In Spark >= 3.2, Hudi relations will be resolved as [[DataSourceV2Relation]]s by default;
// However, currently, fallback will be applied downgrading them to V1 relations, hence
// we need to check whether we can proceed here, or have to wait until the fallback rule kicks in
case lr: LogicalRelation => new InsertIntoHoodieTableCommand(lr, query, partition, overwrite)
case lr: LogicalRelation =>
// Create a project if this is an INSERT INTO query with specified cols.
val projectByUserSpecified = if (userSpecifiedCols.nonEmpty) {
ValidationUtils.checkState(lr.catalogTable.isDefined, "Missing catalog table")
sparkAdapter.getCatalystPlanUtils.createProjectForByNameQuery(lr, iis)
} else {
None
}
new InsertIntoHoodieTableCommand(lr, projectByUserSpecified.getOrElse(query), partition, overwrite)
case _ => iis
}

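As a usage illustration (table name and values are made up), the effect of the new branch is that a by-name insert such as:

    // With a table t(id bigint, name string, price double), the rule wraps the
    // VALUES query in a Project mapping (id, price, name) back to (id, name, price).
    spark.sql("insert into t (id, price, name) values (1, 9.99, 'x')")

ends up as an InsertIntoHoodieTableCommand whose query already matches the table's declared column order.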
@@ -2826,4 +2826,163 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
spark.sessionState.conf.unsetConf("hoodie.datasource.insert.dup.policy")
spark.sessionState.conf.unsetConf("hoodie.datasource.write.operation")
}

test("Test insert into with special cols") {
Contributor: special -> specified

withTempDir { tmp =>
if (HoodieSparkUtils.gteqSpark3_2) {
val targetTableA = generateTableName
val tablePathA = s"${tmp.getCanonicalPath}/$targetTableA"
if (HoodieSparkUtils.isSpark3_4) {
spark.sql("set spark.sql.defaultColumn.enabled = false")
}

spark.sql(
s"""
|create table if not exists $targetTableA (
| id bigint,
| name string,
| price double
|) using hudi
|tblproperties (
| primaryKey = 'id',
| type = 'mor',
| preCombineField = 'name'
|) location '$tablePathA'
|""".stripMargin)

spark.sql(s"insert into $targetTableA (id, price, name) values (1, 12.1, 'aaa')")

checkAnswer(s"select id, price, name from $targetTableA")(
Seq(1, 12.1, "aaa")
)

val targetTableB = generateTableName
val tablePathB = s"${tmp.getCanonicalPath}/$targetTableB"

spark.sql(
s"""
|create table if not exists $targetTableB (
| id bigint,
| name string,
| price double,
| day string,
| hour string
|) using hudi
|tblproperties (
| primaryKey = 'id',
| type = 'mor',
| preCombineField = 'name'
|) partitioned by (day, hour)
|location '$tablePathB'
|""".stripMargin)

spark.sql(s"insert into $targetTableB (id, day, price, name, hour) " +
s"values (2, '01', 12.2, 'bbb', '02')")

spark.sql(s"insert into $targetTableB (id, day, price, name, hour) " +
s"select id, '01' as dt, price, name, '03' as hour from $targetTableA")

spark.sql(s"insert into $targetTableB partition(day='02', hour) (id, hour, price, name) " +
s"values (3, '01', 12.3, 'ccc')")

spark.sql(s"insert into $targetTableB partition(day='02', hour='02') (id, price, name) " +
s"values (4, 12.4, 'ddd')")

checkAnswer(s"select id, price, name, day, hour from $targetTableB")(
Seq(2, 12.2, "bbb", "01", "02"),
Seq(1, 12.1, "aaa", "01", "03"),
Seq(3, 12.3, "ccc", "02", "01"),
Seq(4, 12.4, "ddd", "02", "02")
)

if (HoodieSparkUtils.isSpark3_4) {
spark.sql("set spark.sql.defaultColumn.enabled = true")
checkExceptionContain(s"insert into $targetTableB (id, day, price, name, hour) " +
s"select id, '01' as dt, price, name, '03' as hour from $targetTableA")(
"hudi not support specified cols when enable default columns")
}
}
}
}

test("Test insert overwrite with special cols") {
Contributor: ditto

withTempDir { tmp =>
if (HoodieSparkUtils.gteqSpark3_2) {
val targetTableA = generateTableName
val tablePathA = s"${tmp.getCanonicalPath}/$targetTableA"
if (HoodieSparkUtils.isSpark3_4) {
spark.sql("set spark.sql.defaultColumn.enabled = false")
}

spark.sql(
s"""
|create table if not exists $targetTableA (
| id bigint,
| name string,
| price double
|) using hudi
|tblproperties (
| primaryKey = 'id',
| type = 'mor',
| preCombineField = 'name'
|) location '$tablePathA'
|""".stripMargin)

spark.sql(s"insert overwrite $targetTableA (id, price, name) values (1, 12.1, 'aaa')")

checkAnswer(s"select id, price, name from $targetTableA")(
Seq(1, 12.1, "aaa")
)

val targetTableB = generateTableName
val tablePathB = s"${tmp.getCanonicalPath}/$targetTableB"

spark.sql(
s"""
|create table if not exists $targetTableB (
| id bigint,
| name string,
| price double,
| day string,
| hour string
|) using hudi
|tblproperties (
| primaryKey = 'id',
| type = 'mor',
| preCombineField = 'name'
|) partitioned by (day, hour)
|location '$tablePathB'
|""".stripMargin)

spark.sql(s"insert overwrite $targetTableB (id, day, price, name, hour) " +
s"values (2, '01', 12.2, 'bbb', '02')")

checkAnswer(s"select id, price, name, day, hour from $targetTableB")(
Seq(2, 12.2, "bbb", "01", "02")
)

spark.sql(s"insert overwrite $targetTableB (id, day, price, name, hour) " +
s"select id, '01' as dt, price, name, '03' as hour from $targetTableA")

spark.sql(s"insert overwrite $targetTableB partition(day='02', hour) (id, hour, price, name) " +
s"values (3, '01', 12.3, 'ccc')")

spark.sql(s"insert overwrite $targetTableB partition(day='02', hour='02') (id, price, name) " +
s"values (4, 12.4, 'ddd')")

checkAnswer(s"select id, price, name, day, hour from $targetTableB")(
Seq(1, 12.1, "aaa", "01", "03"),
Seq(3, 12.3, "ccc", "02", "01"),
Seq(4, 12.4, "ddd", "02", "02")
)

if (HoodieSparkUtils.isSpark3_4) {
spark.sql("set spark.sql.defaultColumn.enabled = true")
checkExceptionContain(s"insert overwrite $targetTableB (id, day, price, name, hour) " +
s"select id, '01' as dt, price, name, '03' as hour from $targetTableA")(
"hudi not support specified cols when enable default columns")
}
}
}
}
}
@@ -61,10 +61,10 @@ object HoodieSpark2CatalystPlanUtils extends HoodieCatalystPlansUtils {
Join(left, right, joinType, None)
}

override def unapplyInsertIntoStatement(plan: LogicalPlan): Option[(LogicalPlan, Map[String, Option[String]], LogicalPlan, Boolean, Boolean)] = {
override def unapplyInsertIntoStatement(plan: LogicalPlan): Option[(LogicalPlan, Seq[String], Map[String, Option[String]], LogicalPlan, Boolean, Boolean)] = {
plan match {
case InsertIntoTable(table, partition, query, overwrite, ifPartitionNotExists) =>
Some((table, partition, query, overwrite, ifPartitionNotExists))
Some((table, Seq.empty, partition, query, overwrite, ifPartitionNotExists))
Contributor: Do you think we should log some msg here?

Author: I think it's unnecessary, because it is not supported at the SQL grammar level.

case _ => None
}
}
@@ -129,4 +129,6 @@ object HoodieSpark2CatalystPlanUtils extends HoodieCatalystPlansUtils {
override def unapplyShowIndexes(plan: LogicalPlan): Option[(LogicalPlan, Seq[Attribute])] = None

override def unapplyRefreshIndex(plan: LogicalPlan): Option[(LogicalPlan, String)] = None

override def createProjectForByNameQuery(lr: LogicalRelation, plan: LogicalPlan): Option[LogicalPlan] = None
}
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join, J
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
import org.apache.spark.sql.execution.{ExtendedMode, SimpleMode}
import org.apache.spark.sql.execution.command.{CreateTableLikeCommand, ExplainCommand}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType

@@ -56,15 +57,6 @@ trait HoodieSpark3CatalystPlanUtils extends HoodieCatalystPlansUtils {
Join(left, right, joinType, None, JoinHint.NONE)
}

override def unapplyInsertIntoStatement(plan: LogicalPlan): Option[(LogicalPlan, Map[String, Option[String]], LogicalPlan, Boolean, Boolean)] = {
plan match {
case insert: InsertIntoStatement =>
Contributor: Why remove this impl?

Author: Every subclass has its own impl, so I removed it.

Some((insert.table, insert.partitionSpec, insert.query, insert.overwrite, insert.ifPartitionNotExists))
case _ =>
None
}
}


override def unapplyCreateTableLikeCommand(plan: LogicalPlan): Option[(TableIdentifier, TableIdentifier, CatalogStorageFormat, Option[String], Map[String, String], Boolean)] = {
plan match {
@@ -84,6 +76,8 @@ trait HoodieSpark3CatalystPlanUtils extends HoodieCatalystPlansUtils {
override def produceSameOutput(a: LogicalPlan, b: LogicalPlan): Boolean = {
a.sameOutput(b)
}

override def createProjectForByNameQuery(lr: LogicalRelation, plan: LogicalPlan): Option[LogicalPlan] = None
}

object HoodieSpark3CatalystPlanUtils extends SparkAdapterSupport {
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.ResolvedTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, ProjectionOverSchema}
import org.apache.spark.sql.catalyst.planning.ScanOperation
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, MergeIntoTable}
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, LogicalPlan, MergeIntoTable}
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
import org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand
import org.apache.spark.sql.execution.datasources.parquet.{HoodieFormatTrait, ParquetFileFormat}
@@ -82,4 +82,13 @@ object HoodieSpark30CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
override def unapplyShowIndexes(plan: LogicalPlan): Option[(LogicalPlan, Seq[Attribute])] = None

override def unapplyRefreshIndex(plan: LogicalPlan): Option[(LogicalPlan, String)] = None

override def unapplyInsertIntoStatement(plan: LogicalPlan): Option[(LogicalPlan, Seq[String], Map[String, Option[String]], LogicalPlan, Boolean, Boolean)] = {
plan match {
case insert: InsertIntoStatement =>
Some((insert.table, Seq.empty, insert.partitionSpec, insert.query, insert.overwrite, insert.ifPartitionNotExists))
case _ =>
None
}
}
}
@@ -18,12 +18,11 @@

package org.apache.spark.sql

import org.apache.hudi.SparkHoodieTableFileIndex
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.ResolvedTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, ProjectionOverSchema}
import org.apache.spark.sql.catalyst.planning.ScanOperation
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, MergeIntoTable, Project}
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, LogicalPlan, MergeIntoTable}
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
import org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand
import org.apache.spark.sql.execution.datasources.parquet.{HoodieFormatTrait, ParquetFileFormat}
@@ -83,4 +82,13 @@ object HoodieSpark31CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
override def unapplyShowIndexes(plan: LogicalPlan): Option[(LogicalPlan, Seq[Attribute])] = None

override def unapplyRefreshIndex(plan: LogicalPlan): Option[(LogicalPlan, String)] = None

override def unapplyInsertIntoStatement(plan: LogicalPlan): Option[(LogicalPlan, Seq[String], Map[String, Option[String]], LogicalPlan, Boolean, Boolean)] = {
plan match {
case insert: InsertIntoStatement =>
Some((insert.table, Seq.empty, insert.partitionSpec, insert.query, insert.overwrite, insert.ifPartitionNotExists))
Contributor: Should we log some msg here?

case _ =>
None
}
}
}
@@ -127,4 +127,22 @@ object HoodieSpark32CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
None
}
}

override def unapplyInsertIntoStatement(plan: LogicalPlan): Option[(LogicalPlan, Seq[String], Map[String, Option[String]], LogicalPlan, Boolean, Boolean)] = {
plan match {
case insert: InsertIntoStatement =>
Some((insert.table, insert.userSpecifiedCols, insert.partitionSpec, insert.query, insert.overwrite, insert.ifPartitionNotExists))
case _ =>
None
}
}

override def createProjectForByNameQuery(lr: LogicalRelation, plan: LogicalPlan): Option[LogicalPlan] = {
plan match {
case insert: InsertIntoStatement =>
Some(ResolveInsertionBase.createProjectForByNameQuery(lr.catalogTable.get.qualifiedName, insert))
case _ =>
None
}
}
}