Spark 4.1: Add tests for MERGE INTO schema evolution nested case #15028
Conversation
PTAL @huaxingao @singhpk234 @szehon-ho. I went through the discussions in the Spark PRs related to this feature.
szehon-ho
left a comment
Yea, was going to do this, you beat me to it :)
The only missing part here is that there is a flag that needs to be set (as it's disabled by default in Spark 4.1.x). We will probably turn it on in a later version if there are no complaints about the behavior.
Force-pushed 42a9fe1 to acd524a
@szehon-ho Thanks for the original features and tests. This new test is a small tweak based on your existing one.
Please let me know if any further changes are needed. @huaxingao @singhpk234
PTAL @huaxingao @singhpk234
szehon-ho
left a comment
Looks mostly fine, but I wonder if we should hold the PR until 4.1.1 is supported in Iceberg.
```java
public void testMergeWithSchemaEvolutionNestedStructSourceHasFewerFields() {
  assumeThat(branch).as("Schema evolution does not work for branches currently").isNull();

  spark.conf().set("spark.sql.mergeNestedTypeCoercion.enabled", "true");
```
This should be in a try/catch. Do we have any utility in the Iceberg Spark test cases to wrap code in Spark configs?
Yes, correct. There is withSQLConf at spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestBase.java::186, which applies the Spark configs needed for an action and handles the try block in its implementation.
I took reference from testMergeToWapBranch() at spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java::2914, which implements a similar pattern.
Ready for review.
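For readers outside the Iceberg codebase, the set-run-restore shape that withSQLConf provides can be sketched in plain Java. This is an illustrative, Spark-free sketch of the pattern, not the actual TestBase API: it overrides config values, runs the action, and restores the previous values in a finally block even if the action throws.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical standalone sketch of the try/finally pattern that a helper
// like withSQLConf implements: apply overrides, run the action, then restore
// the previous values even when the action throws.
public class WithConfSketch {
  static final Map<String, String> conf = new HashMap<>();

  static void withConf(Map<String, String> overrides, Runnable action) {
    Map<String, String> previous = new HashMap<>();
    overrides.forEach((k, v) -> {
      previous.put(k, conf.get(k)); // value may be null if the key was unset
      conf.put(k, v);
    });
    try {
      action.run();
    } finally {
      // restore: remove keys that had no prior value, reset the rest
      previous.forEach((k, v) -> {
        if (v == null) {
          conf.remove(k);
        } else {
          conf.put(k, v);
        }
      });
    }
  }

  public static void main(String[] args) {
    withConf(
        Map.of("spark.sql.mergeNestedTypeCoercion.enabled", "true"),
        () -> System.out.println(conf.get("spark.sql.mergeNestedTypeCoercion.enabled")));
    // after the block, the override is gone again
    System.out.println(conf.containsKey("spark.sql.mergeNestedTypeCoercion.enabled"));
  }
}
```

The point of the pattern is that a failing assertion inside the action cannot leak the config override into later tests.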
Sorry, just re-read and see the test is already running against 4.1.1, that's great. Only one comment.
singhpk234
left a comment
LGTM as well, thanks @varun-lakhyani!
```java
        + "{ \"id\": 3, \"s\": { \"c1\": 30, \"c2\": \"c\" } }");

    // Rows should have null for missing c3 nested field from source
    ImmutableList<Object[]> expectedRows =
```
Nit, can we move this block inside the with? It's a bit disorienting to see the expectation before the merge :)
iceberg/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java, lines 2914 to 2946 in 4f57687:

```java
public void testMergeToWapBranch() {
  assumeThat(branch).as("WAP branch only works for table identifier without branch").isNull();
  createAndInitTable("id INT", "{\"id\": -1}");
  ImmutableList<Object[]> originalRows = ImmutableList.of(row(-1));
  sql(
      "ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')",
      tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);
  spark.range(0, 5).coalesce(1).createOrReplaceTempView("source");
  ImmutableList<Object[]> expectedRows =
      ImmutableList.of(row(-1), row(0), row(1), row(2), row(3), row(4));
  withSQLConf(
      ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, "wap"),
      () -> {
        sql(
            "MERGE INTO %s t USING source s ON t.id = s.id "
                + "WHEN MATCHED THEN UPDATE SET *"
                + "WHEN NOT MATCHED THEN INSERT *",
            tableName);
        assertEquals(
            "Should have expected rows when reading table",
            expectedRows,
            sql("SELECT * FROM %s ORDER BY id", tableName));
        assertEquals(
            "Should have expected rows when reading WAP branch",
            expectedRows,
            sql("SELECT * FROM %s.branch_wap ORDER BY id", tableName));
        assertEquals(
            "Should not modify main branch",
            originalRows,
            sql("SELECT * FROM %s.branch_main ORDER BY id", tableName));
      });
```
I felt the same way, but found a lot of places where this pattern was followed, like the one referenced here.
Should I push the change with expectedRows inside withSQLConf, after the MERGE command?
I see, thanks for checking. Well, I did it all this way in the other tests in this file, and we both agree, so I'd say let's just do it?
Done. Updated in the latest commit.
PTAL @szehon-ho.
Merged, thanks @varun-lakhyani, and all for reviews.
Thanks @huaxingao @singhpk234 @szehon-ho for the reviews and merging.
Extends tests for PR #14970
This PR adds a test for MERGE INTO schema evolution in the nested case where the source has fewer fields than the target. By default, the fields missing from the source are kept intact in the target for matched rows and set to null for newly inserted source rows.
Spark updates can be found at https://issues.apache.org/jira/browse/SPARK-54274
This specific feature: https://issues.apache.org/jira/browse/SPARK-54621
It also renames an existing nested test to match the other test names so it can be better understood
(testMergeWithSchemaEvolutionNestedStruct() -> testMergeWithSchemaEvolutionNestedStructSourceHasMoreFields())
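As a rough, Spark-free illustration of the semantics the new test asserts, the sketch below models rows as maps keyed by id, with nested structs as inner maps. All names and the merge helper here are hypothetical, purely for illustration: the target struct has fields c1, c2, c3 while the source struct only has c1 and c2, so matched rows keep the target's c3 and rows inserted from the source get null for c3.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch (not Iceberg/Spark code) of MERGE INTO nested schema
// evolution when the source struct has fewer fields than the target struct:
// the target-only field c3 survives on matched rows and is null on new rows.
public class NestedMergeSketch {

  static Map<Integer, Map<String, Object>> merge(
      Map<Integer, Map<String, Object>> target, Map<Integer, Map<String, Object>> source) {
    Map<Integer, Map<String, Object>> result = new TreeMap<>(target);
    source.forEach((id, srcStruct) -> {
      // start from the existing target struct when the row matches
      Map<String, Object> merged = new HashMap<>(result.getOrDefault(id, new HashMap<>()));
      merged.putAll(srcStruct);        // fields present in the source are updated
      merged.putIfAbsent("c3", null);  // field missing from the source: null for new rows
      result.put(id, merged);
    });
    return result;
  }

  public static void main(String[] args) {
    Map<Integer, Map<String, Object>> target = new TreeMap<>();
    Map<String, Object> t1 = new HashMap<>();
    t1.put("c1", 10);
    t1.put("c2", "a");
    t1.put("c3", "keep-me"); // target-only nested field
    target.put(1, t1);

    Map<Integer, Map<String, Object>> source = new TreeMap<>();
    Map<String, Object> s1 = new HashMap<>();
    s1.put("c1", 11);
    s1.put("c2", "a2"); // no c3 in the source schema
    source.put(1, s1);
    Map<String, Object> s3 = new HashMap<>();
    s3.put("c1", 30);
    s3.put("c2", "c");
    source.put(3, s3);

    Map<Integer, Map<String, Object>> merged = merge(target, source);
    System.out.println(merged.get(1).get("c3")); // prints: keep-me (matched row keeps target c3)
    System.out.println(merged.get(3).get("c3")); // prints: null (inserted row gets null c3)
  }
}
```

This mirrors the test's expectation that the MERGE leaves existing c3 values intact for matched rows while filling null for rows that only exist in the source.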