Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,29 +22,43 @@
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.avro.file.DataFileConstants;
import org.apache.avro.file.*;

/**
* A mapper class from Spark supported avro compression codecs to avro compression codecs.
*/
public enum AvroCompressionCodec {
UNCOMPRESSED(DataFileConstants.NULL_CODEC),
DEFLATE(DataFileConstants.DEFLATE_CODEC),
SNAPPY(DataFileConstants.SNAPPY_CODEC),
BZIP2(DataFileConstants.BZIP2_CODEC),
XZ(DataFileConstants.XZ_CODEC),
ZSTANDARD(DataFileConstants.ZSTANDARD_CODEC);
UNCOMPRESSED(DataFileConstants.NULL_CODEC, false, -1),
DEFLATE(DataFileConstants.DEFLATE_CODEC, true, CodecFactory.DEFAULT_DEFLATE_LEVEL),
SNAPPY(DataFileConstants.SNAPPY_CODEC, false, -1),
BZIP2(DataFileConstants.BZIP2_CODEC, false, -1),
XZ(DataFileConstants.XZ_CODEC, true, CodecFactory.DEFAULT_XZ_LEVEL),
ZSTANDARD(DataFileConstants.ZSTANDARD_CODEC, true, CodecFactory.DEFAULT_ZSTANDARD_LEVEL);

private final String codecName;
private final boolean supportCompressionLevel;
private final int defaultCompressionLevel;

AvroCompressionCodec(String codecName) {
AvroCompressionCodec(
String codecName,
boolean supportCompressionLevel, int defaultCompressionLevel) {
this.codecName = codecName;
this.supportCompressionLevel = supportCompressionLevel;
this.defaultCompressionLevel = defaultCompressionLevel;
}

public String getCodecName() {
return this.codecName;
}

public boolean getSupportCompressionLevel() {
return this.supportCompressionLevel;
}

public int getDefaultCompressionLevel() {
return this.defaultCompressionLevel;
}

private static final Map<String, String> codecNameMap =
Arrays.stream(AvroCompressionCodec.values()).collect(
Collectors.toMap(codec -> codec.name(), codec -> codec.name().toLowerCase(Locale.ROOT)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import scala.jdk.CollectionConverters._
import org.apache.avro.Schema
import org.apache.avro.file.{DataFileReader, FileReader}
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.mapred.{AvroOutputFormat, FsInput}
import org.apache.avro.mapred.FsInput
import org.apache.avro.mapreduce.AvroJob
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus
Expand Down Expand Up @@ -110,10 +110,12 @@ private[sql] object AvroUtils extends Logging {
case compressed =>
job.getConfiguration.setBoolean("mapred.output.compress", true)
job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, compressed.getCodecName)
if (compressed == DEFLATE) {
val deflateLevel = sqlConf.avroDeflateLevel
logInfo(s"Compressing Avro output using the $codecName codec at level $deflateLevel")
job.getConfiguration.setInt(AvroOutputFormat.DEFLATE_LEVEL_KEY, deflateLevel)
if (compressed.getSupportCompressionLevel) {
val level = sqlConf.getConfString(s"spark.sql.avro.$codecName.level",
compressed.getDefaultCompressionLevel.toString)
logInfo(s"Compressing Avro output using the $codecName codec at level $level")
val s = if (compressed == ZSTANDARD) "zstd" else codecName
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems we should put logInfo(s"Compressing Avro output using the $codecName codec at level $level") after val s = if (compressed == ZSTANDARD) "zstd" else codecName.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@beliefer May I ask your reason? For me, it's not required because Avro's real codec name is zstandard instead of zstd .

AVRO REPO https://github.com/apache/avro/blob/8d610fb5c7d3958256801848dbd80d6f9d3c556b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileConstants.java#L41

public static final String ZSTANDARD_CODEC = "zstandard";

SPARK REPO

ZSTANDARD(DataFileConstants.ZSTANDARD_CODEC, true, CodecFactory.DEFAULT_ZSTANDARD_LEVEL);

Copy link
Contributor

@beliefer beliefer Jan 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. zstd only used for avro.mapred.zstd.level.
@dongjoon-hyun Thank you for your explanation.

job.getConfiguration.setInt(s"avro.mapred.$s.level", level.toInt)
} else {
logInfo(s"Compressing Avro output using the $codecName codec")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.avro
import java.util.Locale

import org.apache.spark.SparkIllegalArgumentException
import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.datasources.FileSourceCodecSuite
import org.apache.spark.sql.internal.SQLConf

Expand Down Expand Up @@ -58,4 +59,20 @@ class AvroCodecSuite extends FileSourceCodecSuite {
parameters = Map("codecName" -> "unsupported")
)
}

test("SPARK-46759: compression level support for zstandard codec") {
Seq("9", "1").foreach { level =>
withSQLConf(
(SQLConf.AVRO_COMPRESSION_CODEC.key -> "zstandard"),
(SQLConf.AVRO_ZSTANDARD_LEVEL.key -> level)) {
withTable("avro_t") {
sql(
s"""CREATE TABLE avro_t
|USING $format
|AS SELECT 1 as id""".stripMargin)
checkAnswer(spark.table("avro_t"), Seq(Row(1)))
}
}
}
}
}
18 changes: 18 additions & 0 deletions docs/sql-data-sources-avro.md
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,24 @@ Configuration of Avro can be done via `spark.conf.set` or by running `SET key=va
</td>
<td>2.4.0</td>
</tr>
<tr>
<td>spark.sql.avro.xz.level</td>
<td>6</td>
<td>
Compression level for the xz codec used in writing of AVRO files. Valid value must be in
the range of from 1 to 9 inclusive. The default value is 6 in the current implementation.
</td>
<td>4.0.0</td>
</tr>
<tr>
<td>spark.sql.avro.zstandard.level</td>
<td>3</td>
<td>
Compression level for the zstandard codec used in writing of AVRO files.
The default value is 3 in the current implementation.
</td>
<td>4.0.0</td>
</tr>
<tr>
<td>spark.sql.avro.datetimeRebaseModeInRead</td>
<td><code>EXCEPTION</code></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3625,7 +3625,23 @@ object SQLConf {
.version("2.4.0")
.intConf
.checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
.createWithDefault(Deflater.DEFAULT_COMPRESSION)
.createOptional
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to remove the default Deflater.DEFAULT_COMPRESSION here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, nvm. I found it goes to DEFLATE(DataFileConstants.DEFLATE_CODEC, true, CodecFactory.DEFAULT_DEFLATE_LEVEL),.


val AVRO_XZ_LEVEL = buildConf("spark.sql.avro.zx.level")
.doc("Compression level for the xz codec used in writing of AVRO files. " +
"Valid value must be in the range of from 1 to 9 inclusive " +
"The default value is 6.")
.version("4.0.0")
.intConf
.checkValue(v => v > 0 && v <= 9, "The value must be in the range of from 1 to 9 inclusive.")
.createOptional

val AVRO_ZSTANDARD_LEVEL = buildConf("spark.sql.avro.zstandard.level")
.doc("Compression level for the zstandard codec used in writing of AVRO files. " +
"The default value is 3.")
.version("4.0.0")
.intConf
.createOptional

val LEGACY_SIZE_OF_NULL = buildConf("spark.sql.legacy.sizeOfNull")
.internal()
Expand Down Expand Up @@ -5421,8 +5437,6 @@ class SQLConf extends Serializable with Logging with SqlApiConf {

def avroCompressionCodec: String = getConf(SQLConf.AVRO_COMPRESSION_CODEC)

def avroDeflateLevel: Int = getConf(SQLConf.AVRO_DEFLATE_LEVEL)

def replaceDatabricksSparkAvroEnabled: Boolean =
getConf(SQLConf.LEGACY_REPLACE_DATABRICKS_SPARK_AVRO_ENABLED)

Expand Down