[SPARK-46759][SQL][AVRO] Codec xz and zstandard support compression level for avro files #44786
    .intConf
    .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
    .createWithDefault(Deflater.DEFAULT_COMPRESSION)
    .createOptional
Why do we need to remove the default Deflater.DEFAULT_COMPRESSION here?
Oh, never mind. I found that it falls back to DEFLATE(DataFileConstants.DEFLATE_CODEC, true, CodecFactory.DEFAULT_DEFLATE_LEVEL).
dongjoon-hyun
left a comment
This is a really nice improvement. Thank you, @yaooqinn.
Merged to master for Apache Spark 4.0.0.
Thank you very much @dongjoon-hyun
    val level = sqlConf.getConfString(s"spark.sql.avro.$codecName.level",
      compressed.getDefaultCompressionLevel.toString)
    logInfo(s"Compressing Avro output using the $codecName codec at level $level")
    val s = if (compressed == ZSTANDARD) "zstd" else codecName
It seems we should put logInfo(s"Compressing Avro output using the $codecName codec at level $level") after val s = if (compressed == ZSTANDARD) "zstd" else codecName.
@beliefer May I ask your reason? To me, it's not required because Avro's real codec name is zstandard, not zstd.
public static final String ZSTANDARD_CODEC = "zstandard";
SPARK REPO
spark/connector/avro/src/main/java/org/apache/spark/sql/avro/AvroCompressionCodec.java
Line 36 in 39f8e1a
    ZSTANDARD(DataFileConstants.ZSTANDARD_CODEC, true, CodecFactory.DEFAULT_ZSTANDARD_LEVEL);
Got it. zstd is only used for avro.mapred.zstd.level.
@dongjoon-hyun Thank you for your explanation.
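The naming point settled in this thread can be illustrated with a minimal, self-contained sketch. It is not the code from the PR: `Codec`, `resolve`, and the plain `Map` standing in for the SQL conf are hypothetical, and the default levels (-1, 6, 3) are assumed values for Avro's `CodecFactory` defaults. It only shows the idea that the Spark-side key and the log message use the Avro codec name (`zstandard`), while the `avro.mapred.*` key uses the short name `zstd`.

```scala
object AvroLevelSketch {
  // Hypothetical stand-in for AvroCompressionCodec; default levels are assumptions.
  final case class Codec(name: String, defaultLevel: Int)

  val Deflate: Codec = Codec("deflate", -1)
  val Xz: Codec = Codec("xz", 6)
  val Zstandard: Codec = Codec("zstandard", 3)

  // Returns the avro.mapred key and the level that would be written to it.
  // `conf` stands in for the SQL conf: an unset key falls back to the codec default.
  def resolve(conf: Map[String, String], codec: Codec): (String, Int) = {
    val level = conf
      .getOrElse(s"spark.sql.avro.${codec.name}.level", codec.defaultLevel.toString)
      .toInt
    // Only the mapred-side key uses the short name "zstd".
    val shortName = if (codec == Zstandard) "zstd" else codec.name
    (s"avro.mapred.$shortName.level", level)
  }
}
```

With these assumptions, `AvroLevelSketch.resolve(Map("spark.sql.avro.zstandard.level" -> "9"), AvroLevelSketch.Zstandard)` yields `("avro.mapred.zstd.level", 9)`, and an empty map with `AvroLevelSketch.Xz` yields `("avro.mapred.xz.level", 6)`.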
What changes were proposed in this pull request?
This PR introduces 2 keys in the form of 'spark.sql.avro.$codecName.level', just like the existing 'spark.sql.avro.deflate.level', for the zstandard and xz codecs. With this patch, users can trade off speed against compression ratio when they write Avro files compressed with zstandard or xz.
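As a usage sketch, the new keys could be set on a session before writing Avro output. This assumes a Spark build that includes this change with the spark-avro module on the classpath; the application name, the level value, and the output path are illustrative only.

```scala
import org.apache.spark.sql.SparkSession

object AvroCompressionLevelExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("avro-level-sketch") // hypothetical application name
      .master("local[*]")
      .getOrCreate()

    // Select the codec, then its level through the key form described in this PR.
    spark.conf.set("spark.sql.avro.compression.codec", "zstandard")
    spark.conf.set("spark.sql.avro.zstandard.level", "9")

    // Hypothetical output path; any writable location works.
    spark.range(1000).write.format("avro").mode("overwrite").save("/tmp/avro-zstd-level-9")

    spark.stop()
  }
}
```

For xz, the corresponding pair would be `spark.sql.avro.compression.codec=xz` and `spark.sql.avro.xz.level`.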
Why are the changes needed?
Avro supports compression levels for deflate, xz, and zstandard, but Spark has so far only supported the level for deflate.
Does this PR introduce any user-facing change?
yes, new configurations added
How was this patch tested?
new tests
Was this patch authored or co-authored using generative AI tooling?
no