[HUDI-7949] insert into hudi table with columns specified #11568
Conversation
```diff
   * changes in Spark 3.3
   */
-  def unapplyInsertIntoStatement(plan: LogicalPlan): Option[(LogicalPlan, Map[String, Option[String]], LogicalPlan, Boolean, Boolean)]
+  def unapplyInsertIntoStatement(plan: LogicalPlan): Option[(LogicalPlan, Seq[String], Map[String, Option[String]], LogicalPlan, Boolean, Boolean)]
```
Can we add a doc for the new param?
done
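To make the new tuple shape concrete, here is a self-contained, simplified sketch of the extractor pattern the PR uses. The plan node types below are hypothetical stand-ins, not Spark's real classes; the point is only that the second tuple element now carries the user-specified column list (empty when the SQL statement had no column list):

```scala
// Hypothetical, minimal stand-ins for Spark's logical plan nodes.
sealed trait LogicalPlan
case class Table(name: String) extends LogicalPlan
case class Insert(table: LogicalPlan,
                  userSpecifiedCols: Seq[String],
                  partitionSpec: Map[String, Option[String]],
                  query: LogicalPlan,
                  overwrite: Boolean,
                  ifPartitionNotExists: Boolean) extends LogicalPlan

object UnapplyInsert {
  // Mirrors the PR's new signature: the second element is the
  // user-specified column list from `INSERT INTO t (c1, c2, ...)`.
  def unapply(plan: LogicalPlan): Option[(LogicalPlan, Seq[String],
      Map[String, Option[String]], LogicalPlan, Boolean, Boolean)] =
    plan match {
      case i: Insert =>
        Some((i.table, i.userSpecifiedCols, i.partitionSpec,
              i.query, i.overwrite, i.ifPartitionNotExists))
      case _ => None
    }
}
```

Spark versions whose `InsertIntoStatement` has no column-list field would return `Seq.empty` in that position, which is exactly what the changed call sites below do.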
```scala
      case lr: LogicalRelation =>
        // Create a project if this is an INSERT INTO query with specified cols.
        val projectByUserSpecified = if (userSpecifiedCols.nonEmpty) {
          assert(lr.catalogTable.isDefined, "Missing catalog table")
```
Use ValidationUtils.checkState instead.
done
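For context, the difference between the two checks: Scala's `assert` can be disabled at runtime (`-Xdisable-assertions`) and throws `AssertionError`, while a `checkState`-style validation always runs and throws `IllegalStateException`. A minimal stand-in for Hudi's `ValidationUtils.checkState` (the real utility lives in `org.apache.hudi.common.util.ValidationUtils`; this sketch only illustrates the contract):

```scala
// Simplified stand-in for org.apache.hudi.common.util.ValidationUtils.
object ValidationUtils {
  // Always evaluated, unlike Scala's assert; fails with IllegalStateException.
  def checkState(condition: Boolean, message: String): Unit =
    if (!condition) throw new IllegalStateException(message)
}

// Usage at the call site discussed above:
//   ValidationUtils.checkState(lr.catalogTable.isDefined, "Missing catalog table")
```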
```diff
     plan match {
       case InsertIntoTable(table, partition, query, overwrite, ifPartitionNotExists) =>
-        Some((table, partition, query, overwrite, ifPartitionNotExists))
+        Some((table, Seq.empty, partition, query, overwrite, ifPartitionNotExists))
```
Do you think we should log some msg here?
I think it's unnecessary, because this is not supported at the SQL grammar level.
```scala
  override def unapplyInsertIntoStatement(plan: LogicalPlan): Option[(LogicalPlan, Map[String, Option[String]], LogicalPlan, Boolean, Boolean)] = {
    plan match {
      case insert: InsertIntoStatement =>
```
Why remove this impl?
Every subclass has its own impl, so I removed it.
```scala
  override def unapplyInsertIntoStatement(plan: LogicalPlan): Option[(LogicalPlan, Seq[String], Map[String, Option[String]], LogicalPlan, Boolean, Boolean)] = {
    plan match {
      case insert: InsertIntoStatement =>
        Some((insert.table, Seq.empty, insert.partitionSpec, insert.query, insert.overwrite, insert.ifPartitionNotExists))
```
Should we log some msg here?
@leesf Would you be interested in reviewing this PR?
```scala
      spark.sessionState.conf.unsetConf("hoodie.datasource.write.operation")
    }

    test("Test insert into with special cols") {
```
special -> specified
```scala
    }
  }

  test("Test insert overwrite with special cols") {
```
ditto
LGTM, with minor comments
@KnightChess Looks like this PR does not handle partition spec? Can you please check HUDI-7964?
@codope hi, it needs to be specified like:
@KnightChess Yes, it is not due to this PR. I just tested by creating a parquet table and it's still the same behavior. So, the issue is something unrelated to Hudi. You could try as well. This is what the directory structure looks like under the base path:
@codope got it o( ̄▽ ̄)o
@codope if specified, it looks like it works well.


As #11552 describes, Hudi's INSERT does not support a specified column list; this PR supports that case:
Change Logs
Only Spark versions >= 3.2 are supported.
In Spark 3.4, adding a Project does not work when the default column value feature (apache/spark#36077) is enabled. Default column value resolution was refactored in Spark 3.5 ([SPARK-43742][SQL] Refactor default column value resolution, spark#41262), so there is no need to handle it on the Hudi side. If users want to specify cols in Spark 3.4, they need to disable the default value feature.
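To illustrate what the column list implies for the writer, here is a minimal, hypothetical sketch (not Hudi's actual code): the incoming values arrive in the user's column order, so they must be reordered to the table schema, and columns the user omitted are padded with null before writing:

```scala
// Hypothetical sketch: reorder values given in user-specified column order
// to match the full table schema, padding omitted columns with null.
def projectToSchema(schema: Seq[String],
                    userCols: Seq[String],
                    values: Seq[Any]): Seq[Any] = {
  require(userCols.length == values.length, "column list and VALUES length differ")
  val byCol = userCols.zip(values).toMap
  // For each schema column, take the user's value if given, else null.
  schema.map(col => byCol.getOrElse(col, null))
}
```

For example, with schema `(id, name, price)`, the statement `INSERT INTO t (name, id) VALUES ('a1', 1)` would be projected to `(1, 'a1', null)`.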
Impact
none
Risk level (write none, low medium or high below)
none
Documentation Update
none
Contributor's checklist