
Conversation

@singhpk234 (Contributor)

About the change:

PR to address #5056 (comment).

This PR contains commits in the following order:

  1. Spark: Support Spark 3.3 #5056
    a) move spark/v3.2 to spark/v3.3
    b) changes to make Spark 3.3 work
  2. Copy spark/v3.3 files to spark/v3.2 (done after 1.a)
  3. CI changes to make spark/v3.2 and spark/v3.3 work together

This is based on the proposal by @hililiwei: https://lists.apache.org/thread/3qk85db08rvdon15fyfvyf0y4cd3d8sq

issue:


cc @rdblue @aokolnychyi @jackye1995 @RussellSpitzer @kbendick @amogh-jahagirdar @rajarshisarkar @flyrain @hililiwei @pan3793

@rdblue (Contributor) commented Jun 20, 2022

@singhpk234, can you post a gist of the diff between 3.2 and 3.3? I see there's an addition for metadata columns.

@rdblue (Contributor) commented Jun 20, 2022

Here's a gist with the diff: https://gist.github.com/rdblue/73c575f7ada567c3fb3b997918f3bf9d

// to make sure io.netty.buffer only comes from project(':iceberg-arrow')
exclude group: 'io.netty', module: 'netty-buffer'
exclude group: 'org.roaringbitmap'
exclude group: 'com.fasterxml'
Contributor

Why was this added?

@singhpk234 (Contributor Author) commented Jun 21, 2022

In my fork, when I was running the UTs against the Spark 3.3 snapshot, they were failing due to jar conflicts; I was getting: https://github.com/singhpk234/iceberg/runs/6792894997?check_suite_focus=true

org.apache.iceberg.spark.sql.TestTimestampWithoutZone > testCreateNewTableShouldHaveTimestampWithoutZoneIcebergType[catalogName = spark_catalog, implementation = org.apache.iceberg.spark.SparkSessionCatalog, config = {type=hive, default-namespace=default, parquet-enabled=true, cache-enabled=false}] FAILED
java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.sql.catalyst.util.RebaseDateTime$

even after forcing the com.fasterxml version to 2.12.3.

After moving to the released 3.3 I removed these exclusions and the UTs that were failing earlier now succeed; the problem seems to have been in the unpublished 3.3 snapshot, so I removed them from this PR.

public SparkTable loadTable(Identifier ident, long timestamp) throws NoSuchTableException {
try {
Pair<Table, Long> icebergTable = load(ident);
return new SparkTable(icebergTable.first(), SnapshotUtil.snapshotIdAsOfTime(icebergTable.first(), timestamp),
Contributor

Can you move snapshotIdAsOfTime to a separate line? This makes line wrapping weird so it would be better to save it in a local variable.
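
A minimal sketch of what that refactor could look like, assuming the rest of the method (the exception handling and the remaining SparkTable constructor argument, marked below) stays as in the existing SparkCatalog code:

  public SparkTable loadTable(Identifier ident, long timestamp) throws NoSuchTableException {
    try {
      Pair<Table, Long> icebergTable = load(ident);
      Table table = icebergTable.first();
      // resolve the snapshot id on its own line instead of inlining the call
      long snapshotId = SnapshotUtil.snapshotIdAsOfTime(table, timestamp);
      return new SparkTable(table, snapshotId, !cacheEnabled /* assumed existing flag */);
    } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
      // assumed to mirror the existing loadTable error handling
      throw new NoSuchTableException(ident);
    }
  }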

Contributor

I think we also want to follow up with tests for the new load table clauses.

Member

It may make sense to move these sorts of changes out into another PR? Try to keep this one to just getting all tests passing and the build working.

Contributor Author

+1, I can add it in a follow-up PR if everyone is on board.
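
For reference, a rough sketch of the kind of follow-up test meant here, assuming the sql(), row(), assertEquals(...) and validationCatalog helpers from the existing Spark test base; the Spark 3.3 VERSION AS OF / TIMESTAMP AS OF clauses are what exercise the new loadTable overloads:

  @Test
  public void testTimeTravelClauses() {
    sql("CREATE TABLE %s (id bigint, data string) USING iceberg", tableName);
    sql("INSERT INTO %s VALUES (1, 'a')", tableName);

    long snapshotId = validationCatalog.loadTable(tableIdent).currentSnapshot().snapshotId();

    // VERSION AS OF goes through the new loadTable(Identifier, String) path;
    // a similar query with TIMESTAMP AS OF would cover loadTable(Identifier, long)
    assertEquals("Should read the table at the given snapshot",
        ImmutableList.of(row(1L, "a")),
        sql("SELECT * FROM %s VERSION AS OF %d", tableName, snapshotId));
  }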

try {
return getSessionCatalog().loadNamespaceMetadata(namespace);
} catch (Exception e) {
// work around because it's now throwing NoSuchDbException
Contributor

We avoid catching Exception because it is too broad. Can you make this catch clause more specific?

@singhpk234 (Contributor Author) commented Jun 21, 2022

I tried specifying NoSuchDatabaseException, but since loadNamespaceMetadata doesn't declare NoSuchDatabaseException in its signature, my IDE started complaining, and hence I had to switch to catching the generic exception.

I think Spark needs an additional catalog.databaseExists(db) check here before loading the db: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala#L246

I thought of filing an issue in Spark, but didn't, since it can be worked around in Iceberg, though it's not ideal.

Your thoughts?

@singhpk234 (Contributor Author) commented Jun 21, 2022

I did some digging and found we can work around this if we override namespaceExists and delegate it to V2SessionCatalog#namespaceExists; the default implementation in SupportsNamespaces uses loadNamespaceMetadata, which is what causes this issue.

Nevertheless, the behavior of V2SessionCatalog#loadNamespaceMetadata seemed off compared to other existing catalogs in Spark, e.g. the JDBC catalog; I have filed a JIRA and a PR in Spark for this as well.
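
A rough sketch of the override described above, assuming SparkSessionCatalog's existing getSessionCatalog() accessor; not necessarily the exact change in this PR:

  @Override
  public boolean namespaceExists(String[] namespace) {
    // delegate to V2SessionCatalog#namespaceExists instead of relying on the
    // SupportsNamespaces default, which goes through loadNamespaceMetadata
    // and now surfaces NoSuchDatabaseException
    return getSessionCatalog().namespaceExists(namespace);
  }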

@singhpk234 (Contributor Author) commented Jun 23, 2022

V2SessionCatalog#loadNamespaceMetadata seemed off compared to other existing catalogs in Spark, e.g. the JDBC catalog

The fix for this has been merged upstream; nevertheless the workaround of overriding namespaceExists seems reasonable to me. I will mark this comment resolved if everyone is on board.

.set("org.apache.spark.sql.parquet.row.attributes", sparkSchema.json())
.set("spark.sql.parquet.writeLegacyFormat", "false")
.set("spark.sql.parquet.outputTimestampType", "INT96")
.set("spark.sql.parquet.fieldId.write.enabled", "true")
Contributor

Why was this needed?

@singhpk234 (Contributor Author) commented Jun 21, 2022

This is a newly introduced conf in Spark 3.3. Since we pass the conf directly from here to ParquetWriter -> ParquetWriteSupport -> SparkToParquetSchemaConverter, the converter then tries to get the value of this conf:

  def this(conf: Configuration) = this(
    writeLegacyParquetFormat = conf.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key).toBoolean,
    outputTimestampType = SQLConf.ParquetOutputTimestampType.withName(
      conf.get(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key)),
    useFieldId = conf.get(SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.key).toBoolean)

Now, if this conf is not set in the config we pass, it fails with an NPE while trying to call toBoolean on a null value. Hence setting this in the conf we pass is required.

Contributor

Okay. Sounds like a Spark bug, but I think it's reasonable to set it in our tests. Also, since this is writing extra data it is good to ensure that we can consume the files that were produced.


EliminateSubqueryAliases(aliasedTable) match {
case r @ DataSourceV2Relation(tbl: SupportsRowLevelOperations, _, _, _, _) =>
val table = buildOperationTable(tbl, MERGE, CaseInsensitiveStringMap.empty())
Contributor

This should use r.options

@singhpk234 (Contributor Author) commented Jun 21, 2022

I passed empty options based on this discussion in the PR adding DELETE support in Spark: apache/spark#35395 (comment)

These are options passed into newRowLevelOperationBuilder and I thought they should come from the SQL operation. For example, if Spark adds a clause OPTIONS to its SQL for DELETE, UPDATE, MERGE, then these values will be propagated here.

Based on the rationale above by @aokolnychyi, I decided to keep them the same here as well.

@aokolnychyi (Contributor) commented Jun 22, 2022

Using empty options when building row-level ops seems correct to me.

SortOrder(catalystChild, toCatalyst(direction), toCatalyst(nullOrdering), Seq.empty)
case IdentityTransform(ref) =>
resolveRef[NamedExpression](ref, query)
case t :Transform if BucketTransform.unapply(t).isDefined =>
Contributor

Why not update to use the matcher? It looks like Spark added an argument?

@singhpk234 (Contributor Author) commented Jun 21, 2022

The BucketTransform extractor now expects the unapply argument to be of type Transform after this 3.3 commit. Since expr is of type V2Expression, the compiler complains with no corresponding unapply() found when the matcher is specified directly. Hence I had to do it this way.

.config("spark.hadoop." + METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname))
.config("spark.sql.shuffle.partitions", "4")
.config("spark.sql.hive.metastorePartitionPruningFallbackOnException", "true")
.config("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true")
Contributor

Why was adding this needed?

Contributor Author

This was added because in our UTs we provide the schema via DataFrameReader.schema(schema).json(jsonDataset), which no longer respects the nullability of the provided schema, and hence a UT in TestMerge was failing: #5056 (comment)

Hence, to preserve this behaviour in our helper, I turned this on. Do you recommend refactoring our test utils?

This was a breaking change and Spark has documented it here as well: https://spark.apache.org/docs/latest/sql-migration-guide.html#upgrading-from-spark-sql-32-to-33
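
For context, a hedged illustration of the kind of helper affected (names are illustrative, not the exact Iceberg test util); without the legacy flag, Spark 3.3 imposes its own nullability on the schema supplied here:

  // illustrative helper: builds a DataFrame from inline JSON rows with an explicit schema;
  // spark.sql.legacy.respectNullabilityInTextDatasetConversion=true keeps the declared nullability
  private Dataset<Row> jsonToDF(String schema, String... jsonRows) {
    Dataset<String> jsonDS = spark.createDataset(Arrays.asList(jsonRows), Encoders.STRING());
    return spark.read().schema(schema).json(jsonDS);
  }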

Contributor

There's probably a cleanup, but this seems fine for now.

@rdblue (Contributor) commented Jun 20, 2022

@singhpk234, thanks for putting this together! I did a round of review. Hopefully we can get this in soon!

It would be great if @aokolnychyi could take a look as well.

@singhpk234 (Contributor Author)

Many thanks @rdblue for the feedback; I have addressed most of the comments and replied to the rest.

It would be great if @aokolnychyi could take a look as well.

+1, it would be awesome :) !!!

Here is a gist after addressing the feedback: https://gist.github.com/singhpk234/640953c2658a6b16fa94c60c8066c4c0

@hililiwei (Contributor)

Just as a reminder:

Moving spark/v3.2 to spark/v3.3 requires a separate commit history, so don't use squash when merging this PR.

@singhpk234 force-pushed the feature/3.2_on_top_of_3.3 branch from 44cdca9 to 13be405 on June 21, 2022 18:57
@aokolnychyi (Contributor)

I'll be able to take a look today. Sorry for the delay!

@aokolnychyi (Contributor) left a comment

I did a quick look over the Iceberg code. I'll take a look at the extensions tomorrow.

}
}

private Metadata fieldMetadata(int fieldId) {
Contributor

Was this a necessary change?

Contributor Author

Yes, we require this for the deletes that don't use extensions.
Basically, when we filter out attributes from the query of ReplaceData, the query now contains attributes created here, and the filtering checks whether an attribute is a MetadataAttribute; without this change they would not be filtered out. Hence we face an issue like #5056 (comment) without this change.
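
A hedged sketch of what the schema-conversion change amounts to; the metadata-field check is a hypothetical stand-in for the actual Iceberg helper:

  private Metadata fieldMetadata(int fieldId) {
    if (isMetadataField(fieldId)) { // hypothetical check for reserved Iceberg metadata field ids
      // "__metadata_col" is the key Spark 3.3's MetadataAttribute extractor looks for,
      // so ReplaceData's attribute filtering can recognize these columns
      return new MetadataBuilder().putBoolean("__metadata_col", true).build();
    }
    return Metadata.empty();
  }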


@Override
public Identifier[] listFunctions(String[] namespace) {
return new Identifier[0];
Contributor

@rdblue, should we delegate to the Spark catalog in new function-related methods?

Contributor

After this is in, yes.
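
For that follow-up, a possible shape of the delegation, assuming the wrapped session catalog also implements Spark 3.3's FunctionCatalog API:

  @Override
  public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException {
    // sketch only: delegate function listing to the underlying Spark session catalog
    // instead of returning an empty array
    return getSessionCatalog().listFunctions(namespace);
  }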

case t: Transform if BucketTransform.unapply(t).isDefined =>
t match {
// sort columns will be empty for bucket.
case BucketTransform(numBuckets, cols, _) =>
Contributor

Why can't we use the extractor right away?

@singhpk234 (Contributor Author) commented Jun 22, 2022

The BucketTransform extractor now expects the unapply argument to be of type Transform after this 3.3 commit. Since expr is of type V2Expression, the compiler complains with no corresponding unapply() found when it is specified in the matcher. Hence I had to do it this way.

@singhpk234 (Contributor Author) commented Jun 22, 2022

I am a little bit confused. Shouldn't #5056 be reviewed and merged first?

That is what I had in mind originally as well:

  1. We would first merge Spark: Support Spark 3.3 #5056, which would leave master with only 3.3 and not 3.2.
  2. Then we would merge this PR, adding back 3.2 and making CI work with 3.3.

Since we started with this PR, I think once the 3.3 changes are finalized we will make this PR 3.3-only and move 3.2 to #5056.

This is what @aokolnychyi suggested as well.

@singhpk234 (Contributor Author) commented Jun 22, 2022

Thanks @aokolnychyi for reviewing this PR; I have addressed the feedback from round 1 of your review.

Here is the updated gist after addressing the feedback: https://gist.github.com/singhpk234/640953c2658a6b16fa94c60c8066c4c0

@singhpk234 (Contributor Author)

Thanks @RussellSpitzer for the feedback; I have addressed most of the comments and replied to the rest.

Here is the updated gist after addressing the feedback: https://gist.github.com/singhpk234/640953c2658a6b16fa94c60c8066c4c0

@singhpk234 force-pushed the feature/3.2_on_top_of_3.3 branch from 0400057 to d7c9b8a on June 25, 2022 15:43
@rdblue (Contributor) left a comment

@singhpk234, I think this is ready to commit. What PR should we commit first? This one?

@rdblue (Contributor) commented Jun 28, 2022

Actually, I think the idea is to rebase and merge this rather than squash and merge. Unfortunately, rebase and merge doesn't work. @singhpk234, can you rebase and fix the problems and then I can rebase & merge without conflicts tomorrow?

@singhpk234 force-pushed the feature/3.2_on_top_of_3.3 branch from d7c9b8a to 697cf26 on June 28, 2022 05:05
@singhpk234 (Contributor Author)

@singhpk234, I think this is ready to commit. What PR should we commit first? This one?

I was initially thinking this could be the first one to get in, containing just 3.3 without 3.2, and immediately afterwards we would merge a follow-up PR to add back 3.2 and make the 3.3 and 3.2 CI work together.

I think the idea is to rebase and merge this rather than squash and merge

+1, this is a good idea; if we don't squash we will be able to preserve the history.

@singhpk234, can you rebase and fix the problems and then

Thanks @rdblue, I have rebased the changes; here is the updated gist after the rebase: https://gist.github.com/singhpk234/640953c2658a6b16fa94c60c8066c4c0

@singhpk234 force-pushed the feature/3.2_on_top_of_3.3 branch from 697cf26 to 75be0a3 on June 28, 2022 13:33
@rdblue merged commit b9dfdc2 into apache:master on Jun 28, 2022
@rdblue (Contributor) commented Jun 28, 2022

Thanks, @singhpk234! Great to have Spark 3.3 support!

Can you check for any updates to Spark 3.2 that have gone in since this was started so we can keep 3.3 up to date?

@singhpk234 (Contributor Author) commented Jun 28, 2022

Can you check for any updates to Spark 3.2 that have gone in since this was started so we can keep 3.3 up to date?

Confirmed that the last 3 commits that went into 3.2 (master) are also present in both 3.2 and 3.3:

  1. revert - 612fa2d
  2. Spark: Validate HMS uri in SparkSessionCatalog #5134
  3. Spark: Add __metadata_col metadata in metadata columns when doing Schema Conversion #5075

I did the following as well (before pushing the final changes to the PR):

  1. Reset this branch to master
  2. Re-did the mv & copy and applied/squashed the review comments into the "make spark 3.3 work" commit
  3. Verified the diff of v3.2 with v3.3 against the approved diff gists

@singhpk234 (Contributor Author)

Thank you everyone for your awesome reviews :) !!!

I learned a lot from this.
