Spark: SQL extension to update partition field atomically #2365
Conversation
I considered adding similar syntax when I added the ADD and DROP DDL. I think the problem with this is that it implies that the original field is changed, rather than dropped and added. In format v2, we can sort of do that by replacing the field and using the previous field's name, but even then the values from the old field will no longer appear. In v1, a drop actually replaces the field with a void transform. One option to fix this is to use REPLACE instead.

@rdblue yes, I agree CHANGE might cause some wrong interpretation, but I don't think it is a big issue for end users, especially people who are only interacting through SQL. I am good with REPLACE; it is also a part of HiveQL, and Athena also supports it for column updates.
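For context, a minimal sketch of the difference (assuming a SparkSession `spark` with the Iceberg SQL extensions enabled and a hypothetical table `db.sample` partitioned by `days(ts)`):

```scala
// Before this PR: changing granularity requires two separate commits, leaving a
// window between them in which a writer could pick up the intermediate spec.
spark.sql("ALTER TABLE db.sample DROP PARTITION FIELD days(ts)")
spark.sql("ALTER TABLE db.sample ADD PARTITION FIELD hours(ts)")

// With this PR (the TO form shown in the grammar below): one atomic commit.
spark.sql("ALTER TABLE db.sample REPLACE PARTITION FIELD days(ts) TO hours(ts)")
```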
```antlr
: CALL multipartIdentifier '(' (callArgument (',' callArgument)*)? ')' #call
| ALTER TABLE multipartIdentifier ADD PARTITION FIELD transform (AS name=identifier)? #addPartitionField
| ALTER TABLE multipartIdentifier DROP PARTITION FIELD transform #dropPartitionField
| ALTER TABLE multipartIdentifier REPLACE PARTITION FIELD transform TO transform (AS name=identifier)? #replacePartitionField
```
REPLACE ... TO doesn't make sense. What about REPLACE ... WITH instead?
Also, why support AS? Shouldn't the new partition field use the same name as the old partition field?
I would also expect using a partition field name to work. So if I used bucket(16, id) AS shard to create the partition, then I should also be able to use REPLACE shard WITH bucket(32, id).
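For illustration, a sketch of that expectation (hypothetical table name, assuming a SparkSession `spark` with the extensions enabled):

```scala
// Create the partition field under a custom name, then address it by that name
// instead of repeating the transform:
spark.sql("ALTER TABLE db.sample ADD PARTITION FIELD bucket(16, id) AS shard")
spark.sql("ALTER TABLE db.sample REPLACE PARTITION FIELD shard WITH bucket(32, id)")
```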
Looks like the implementation supports looking up the field by name, and AS can support rename, like REPLACE ts_day WITH hour(ts) AS ts_hour.
I think there are 2 use cases with contradicting behaviors:

1. ADD PARTITION FIELD bucket(id, 16) AS shard, then REPLACE PARTITION FIELD shard WITH bucket(id, 32)
2. ADD PARTITION FIELD days(ts) AS days_col, then REPLACE PARTITION FIELD days_col WITH hours(ts)

For case 1, we do want the bucket(id, 32) field to also be called shard, but we don't really want to call the hours(ts) partition days_col.

So here are a couple of observations for REPLACE transformFrom WITH transformTo:

- if transformFrom is an expression, the default partition field name has a very specific meaning, such as ts_days or id_bucket_16, and the replacement partition field transformTo should not inherit that name
- if there is a custom name for the transformFrom partition field, the behavior really depends; the 2 examples above show these contradicting expectations
So I think the safest approach is to not infer the behavior for the custom partition name. If the caller wants to use the same name, just use the AS clause to specify it again, such as REPLACE PARTITION FIELD shard WITH bucket(id, 32) AS shard.
What do you think?
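A sketch of that proposal (hypothetical table name): the custom name is never inherited automatically, and the caller restates it with AS when it should be kept:

```scala
// Case 1: keep the custom name by repeating it explicitly.
spark.sql("ALTER TABLE db.sample REPLACE PARTITION FIELD shard WITH bucket(id, 32) AS shard")

// Case 2: omit AS, and the new field falls back to its default derived name
// (ts_hour here, under Iceberg's usual naming convention), not days_col.
spark.sql("ALTER TABLE db.sample REPLACE PARTITION FIELD days_col WITH hours(ts)")
```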
Currently we strictly disallow rename -> delete renamed field -> add field with the same name in a single commit, which blocks use case 1, but that sounds like something we can support in BaseUpdatePartitionSpec. I am thinking through the consequences and will update later.
I think you make a good argument that names should not be reused automatically. If the caller wants to use the same name, then it isn't hard to put the name twice: REPLACE shard WITH bucket(32, id) AS shard. Then all we need to support is drop and add with the same name. I think that's simpler to implement and works well.
```scala
import org.apache.spark.sql.connector.expressions.Transform

case class ReplacePartitionField(
  table: Seq[String],
```
Don't we normally use 2 indents for arguments in Scala?
Yeah, sorry, my IDE was configured wrongly for indentation; let me fix it. We should probably add checkstyle rules for this.
```scala
case iceberg: SparkTable =>
  val schema = iceberg.table.schema
  transformFrom match {
    case IdentityTransform(FieldReference(parts)) if parts.size == 1 && schema.findField(parts.head) == null =>
```
Does it work to match on FieldReference(Seq(name)) instead of checking parts?
The current approach is used to be consistent with https://github.com/apache/iceberg/blob/master/spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropPartitionFieldExec.scala#L45; is there any consideration to deviate from that logic?
I just thought it might be shorter. Not a problem to have it this way.
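For reference, a minimal sketch of the suggested pattern. Matching the Seq directly both binds the single element and rejects multi-part references, so the explicit size check becomes unnecessary. These extractors are package-private to org.apache.spark.sql, so the sketch assumes it lives under that package, as Iceberg's exec nodes do:

```scala
package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform, Transform}

object TransformNames {
  // Extract the column name of a single-part identity transform, if any.
  def singlePartName(transform: Transform): Option[String] = transform match {
    case IdentityTransform(FieldReference(Seq(name))) => Some(name)
    case _ => None
  }
}
```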
```scala
case _ =>
  iceberg.table.updateSpec()
    .addField(name.orNull, Spark3Util.toIcebergTerm(transformTo))
    .removeField(Spark3Util.toIcebergTerm(transformFrom))
```
Wouldn't adding and then removing cause a duplicate in the intermediate state? The result of multiple API calls should be the same as the result of multiple commits with a single call. So I think it is safer to remove the term and then add it after.
Yes, you are correct; I added a test for it.
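At the API level, a hedged sketch of the corrected ordering (assuming `table` is an org.apache.iceberg.Table handle; the field names are hypothetical):

```scala
import org.apache.iceberg.Table
import org.apache.iceberg.expressions.Expressions

// Remove the old field first, then add the replacement, so the intermediate
// state never holds two fields with the same name; both changes land in one commit.
def replaceShard(table: Table): Unit = {
  table.updateSpec()
    .removeField("shard")
    .addField("shard", Expressions.bucket("id", 32))
    .commit()
}
```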
```java
    .build();
Assert.assertEquals("Should have new spec field", expected, table.spec());

sql("ALTER TABLE %s REPLACE PARTITION FIELD days(ts) TO hours(ts) AS hour_col", tableName);
```
Good to see a test for the AS use case.
```scala
override def simpleString(maxFields: Int): String = {
  s"ReplacePartitionField ${table.quoted} ${transformFrom.describe} " +
    s"to ${name.map(n => s"$n=").getOrElse("")}${transformTo.describe}"
```
Should this also be "with" instead of "to"?
This looks ready to go to me. I think there's a minor typo in the simpleString output.

@rdblue thanks for the review, I updated the simple string.

Merged. Thanks @jackye1995!
I received some feedback from users that the current Spark SQL extensions are not able to directly update a partition field. Currently one has to first drop the old field and then add the new one, which (1) is not straightforward for the common use case of updating the granularity of a timestamp or bucket transform, and (2) creates a window between the 2 commits that is not locked and might cause a writer to write data with the wrong partition spec.
This PR introduces the syntax ALTER TABLE table CHANGE PARTITION FIELD transform TO transform, which drops the old transform and adds the new one in a single commit to solve the issue above. There is no similar syntax to reference in other systems; Delta Lake took the route of directly adding or dropping the entire partition spec, so I could not use that as a basis. I chose the current syntax for the following reasons:

- CHANGE is chosen based on the Hive syntax CHANGE COLUMN col ...; I think we might be able to reuse this keyword in the future for column DDL extensions.
- TO is chosen to be consistent with the similar syntax RENAME col TO col.
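A sketch of the statement as originally proposed in this description (hypothetical table name; note that the review above later settled on REPLACE ... WITH in place of CHANGE ... TO):

```scala
spark.sql("ALTER TABLE db.sample CHANGE PARTITION FIELD days(ts) TO hours(ts)")
```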