Initial Support for Spark 4.0 preview #11257

Closed
huaxingao wants to merge 10 commits from the spark_4.0_preview2 branch

Conversation

huaxingao (Contributor)

No description provided.

@huaxingao huaxingao marked this pull request as draft October 4, 2024 16:44
@huaxingao huaxingao force-pushed the spark_4.0_preview2 branch 4 times, most recently from bf0feef to 3d99252 Compare October 4, 2024 18:16
@huaxingao huaxingao force-pushed the spark_4.0_preview2 branch 2 times, most recently from d0dd067 to f2d9c5e Compare October 8, 2024 16:18
@huaxingao huaxingao changed the title from "Initial Support for Spark 4.0 preview2" to "Initial Support for Spark 4.0 preview" on Oct 8, 2024
@@ -95,7 +95,7 @@ jobs:
runs-on: ubuntu-22.04
strategy:
matrix:
jvm: [11, 17, 21]
jvm: [17, 21]
Contributor Author (huaxingao):

This is for build-checks. We can't run build-checks on Java 11 because Spark 4.0 is built with Java 17.

@@ -108,7 +108,7 @@ jobs:
runs-on: ubuntu-22.04
strategy:
matrix:
jvm: [11, 17, 21]
jvm: [17, 21]
Contributor Author (huaxingao):

This is for build-javadoc. We can't run build-javadoc on Java 11 because Spark 4.0 is built with Java 17.

Member:

Is there a reason to do build-javadoc on more than one JVM anyway?

@huaxingao (Contributor, Author):

CI for preview1 passed.
CI for preview2 failed.
Trying SNAPSHOT to see if some of the Spark issues in preview2 have been fixed in SNAPSHOT.

@@ -137,6 +139,7 @@ hadoop2-common = { module = "org.apache.hadoop:hadoop-common", version.ref = "ha
hadoop2-hdfs = { module = "org.apache.hadoop:hadoop-hdfs", version.ref = "hadoop2" }
hadoop2-mapreduce-client-core = { module = "org.apache.hadoop:hadoop-mapreduce-client-core", version.ref = "hadoop2" }
hadoop2-minicluster = { module = "org.apache.hadoop:hadoop-minicluster", version.ref = "hadoop2" }
hadoop34-minicluster = { module = "org.apache.hadoop:hadoop-minicluster", version.ref = "hadoop34" }
Member:

after hadoop3

Contributor Author (huaxingao):

Changed

@@ -30,7 +30,7 @@ import org.apache.iceberg.NullOrder
import org.apache.iceberg.SortDirection
import org.apache.iceberg.expressions.Term
import org.apache.iceberg.spark.Spark3Util
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.IcebergAnalysisException
Member:

Why are we switching to our internal Exception class here?

Contributor Author (huaxingao):

Because Spark 4.0 no longer allows the construction of a new AnalysisException with just a string message. I actually have a separate PR for this. We can probably merge this change in Spark 3.5, although it's not strictly required there.
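
For illustration, a minimal sketch of the kind of call-site change involved, assuming IcebergAnalysisException accepts a plain message string (the surrounding class, method, and message below are hypothetical, not the PR's code):

    import org.apache.spark.sql.catalyst.analysis.IcebergAnalysisException;

    class AnalysisErrors {
      // In Spark 3.5 this could be `throw new AnalysisException("...")`, but the
      // string-only constructor is no longer available in Spark 4.0, so the
      // Iceberg-owned exception type carries the message instead.
      static void failUnresolved(String name) throws IcebergAnalysisException {
        throw new IcebergAnalysisException("Cannot resolve: " + name);
      }
    }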

@@ -260,7 +267,12 @@ private void checkUpdate(RowLevelOperationMode mode, String cond) {
DistributionMode.NONE.modeName());

Dataset<Row> changeDF = spark.table(tableName).where(cond).limit(2).select("id");
changeDF.coalesce(1).writeTo(tableName(CHANGES_TABLE_NAME)).create();
Member:

Why are we rethrowing here?

Contributor Author (huaxingao):

I added the catch block to make the code compile for preview2. Preview1 doesn't need this. After switching to Preview2, I actually got a bunch of CI failures due to TableAlreadyExistsException in a few test suites. Preview1 works fine. I am still trying to figure out which change between Preview1 and Preview2 caused the behavior changes for TableAlreadyExistsException.
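
As a rough sketch of the shape of that compile fix (not the exact PR code), assuming preview2 surfaces TableAlreadyExistsException as a checked exception from create(); the helper method, wrapper exception, and message are illustrative:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;

    // Fragment meant to live inside the existing test suite; spark, tableName,
    // and the tableName(...)/CHANGES_TABLE_NAME helpers come from that suite.
    void createChangesTable(String cond) {
      Dataset<Row> changeDF = spark.table(tableName).where(cond).limit(2).select("id");
      try {
        changeDF.coalesce(1).writeTo(tableName(CHANGES_TABLE_NAME)).create();
      } catch (TableAlreadyExistsException e) {
        throw new RuntimeException("Changes table already exists", e);
      }
    }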

def expr(node: ColumnNode): Expression = {
  node match {
    case ExpressionColumnNode(expression, _) => expression
    case node => throw SparkException.internalError("Unsupported ColumnNode: " + node)
Member:

Should we be throwing a Spark internal error here? Seems like this is our issue?

Contributor Author (huaxingao):

I will change this to an Iceberg Exception. I am not making the change in this round because I want to try Preview 1 to see if the other changes can pass the CI. I will fix this later when I try Preview 2.

@@ -108,14 +108,14 @@ public static Object[][] parameters() {
SparkCatalogConfig.SPARK.implementation(),
SparkCatalogConfig.SPARK.properties(),
PARQUET,
ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_LEVEL, "1")
Member:

nit: We are only partially alphabetizing here

This is the kind of thing I do love, but it should be:

gzip
snappy
zstd

I would probably just skip this change for now and do it in another PR

Contributor Author (huaxingao):

The reason I wanted to move ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_LEVEL, "1") after gzip is that the new Hadoop version uses CompressionLevel to initialize a GzipCompressor, and this COMPRESSION_LEVEL of "1" is carried over to gzip. However, "1" is not a valid compression level for gzip, so it throws an exception:

    org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 6.0 failed 1 times, most recent failure: Lost task 1.0 in stage 6.0 (TID 7) (192.168.50.141 executor driver): java.lang.IllegalArgumentException: No enum constant org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel.1
    	at java.base/java.lang.Enum.valueOf(Enum.java:273)
    	at org.apache.hadoop.conf.Configuration.getEnum(Configuration.java:1786)
    	at org.apache.hadoop.io.compress.zlib.ZlibFactory.getCompressionLevel(ZlibFactory.java:165)
    	at org.apache.hadoop.io.compress.zlib.BuiltInGzipCompressor.init(BuiltInGzipCompressor.java:157)
    	at org.apache.hadoop.io.compress.zlib.BuiltInGzipCompressor.<init>(BuiltInGzipCompressor.java:67)
    	at org.apache.hadoop.io.compress.GzipCodec.createCompressor(GzipCodec.java:64)
    	at org.apache.hadoop.io.compress.CodecPool.getCompressor(CodecPool.java:152)
    	at org.apache.hadoop.io.compress.CodecPool.getCompressor(CodecPool.java:168)
    	at org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.<init>(CodecFactory.java:157)
    	at org.apache.parquet.hadoop.CodecFactory.createCompressor(CodecFactory.java:219)
    	at org.apache.parquet.hadoop.CodecFactory.getCompressor(CodecFactory.java:202)
    	at org.apache.iceberg.parquet.ParquetWriter.<init>(ParquetWriter.java:90)
    	at org.apache.iceberg.parquet.Parquet$WriteBuilder.build(Parquet.java:360)

I thought this over; rather than switching the order, it's better to unset COMPRESSION_CODEC and COMPRESSION_LEVEL for each test.
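
A minimal sketch of that per-test cleanup, assuming the codec and level end up as ordinary table properties and the suite's sql(...) helper is available (the hook name and the literal write.parquet.* keys below are illustrative):

    import org.junit.jupiter.api.AfterEach;

    // Hypothetical cleanup hook: clears any compression settings left on the table
    // so a level meant for zstd does not leak into a later gzip-parameterized run.
    @AfterEach
    public void unsetCompressionProperties() {
      sql(
          "ALTER TABLE %s UNSET TBLPROPERTIES IF EXISTS ('%s', '%s')",
          tableName, "write.parquet.compression-codec", "write.parquet.compression-level");
    }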

Contributor Author (huaxingao):

I should probably open a separate PR and fix this in Spark 3.5 too. WDYT?

Member:

Yep sorry, didn't mean to bikeshed on this, obviously it's not important to this PR :)

@huaxingao (Contributor, Author):

@RussellSpitzer Thanks for your review! I have addressed the comments and switched back to Preview1, along with reverting a few changes I made for Preview2/snapshot. I switched back to Preview1 to test if my changes can pass the CI, since I haven't made Preview2 work yet. Could you please take another look when you have time? Thanks!

@RussellSpitzer (Member):

I'm ready for V2 :) Let me know when you have those changes up. I'm trying to review this on a per commit basis because the diff is so large :)

@huaxingao (Contributor, Author):

@RussellSpitzer There are some conflicting files. If I rebase, it will also pick up changes for Spark 3.5, so I opened a new PR. I will ping you for review after I resolve all the test failures in the new PR. Thanks!

@huaxingao huaxingao closed this Nov 19, 2024