SPARK-49095: Add scale comparison to avoid loss of precision in the decimal part

AvroDeserializer.scala
@@ -111,6 +111,9 @@ private[sql] class AvroDeserializer(
   private lazy val preventReadingIncorrectType = !SQLConf.get
     .getConf(SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA)
 
+  private lazy val allowIncompatibleDecimalType = SQLConf.get
+    .getConf(SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_DECIMAL_TYPE)
+
   def deserialize(data: Any): Option[Any] = converter(data)
 
   /**
@@ -238,8 +241,7 @@ private[sql] class AvroDeserializer(
 
       case (FIXED, dt: DecimalType) =>
        val d = avroType.getLogicalType.asInstanceOf[LogicalTypes.Decimal]
-        if (preventReadingIncorrectType &&
-          d.getPrecision - d.getScale > dt.precision - dt.scale) {
+        if (preventReadingIncorrectType && !isDecimalTypeCompatible(d, dt)) {
          throw QueryCompilationErrors.avroIncompatibleReadError(toFieldStr(avroPath),
            toFieldStr(catalystPath), realDataType.catalogString, dt.catalogString)
        }
@@ -251,8 +253,7 @@
 
      case (BYTES, dt: DecimalType) =>
        val d = avroType.getLogicalType.asInstanceOf[LogicalTypes.Decimal]
-        if (preventReadingIncorrectType &&
-          d.getPrecision - d.getScale > dt.precision - dt.scale) {
+        if (preventReadingIncorrectType && !isDecimalTypeCompatible(d, dt)) {
          throw QueryCompilationErrors.avroIncompatibleReadError(toFieldStr(avroPath),
            toFieldStr(catalystPath), realDataType.catalogString, dt.catalogString)
        }
@@ -394,6 +395,16 @@ private[sql] class AvroDeserializer(
     }
   }
 
+  private def isDecimalTypeCompatible(d: LogicalTypes.Decimal, dt: DecimalType): Boolean = {
+    if (allowIncompatibleDecimalType) {
+      dt.precision - dt.scale >= d.getPrecision - d.getScale
+    } else {
+      val precisionIncrease = dt.precision - d.getPrecision
+      val scaleIncrease = dt.scale - d.getScale
+      scaleIncrease >= 0 && precisionIncrease >= scaleIncrease
+    }
+  }
+
   // TODO: move the following method in Decimal object on creating Decimal from BigDecimal?
   private def createDecimal(decimal: BigDecimal, precision: Int, scale: Int): Decimal = {
     if (precision <= Decimal.MAX_LONG_DIGITS) {
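
To see what the new check changes, here is the arithmetic on the decimal shapes exercised in the test suite below (a standalone sketch; `compatible` is a hypothetical helper mirroring the strict branch of `isDecimalTypeCompatible`, not part of `AvroDeserializer`). The old check compared only integer-digit capacity (`precision - scale`), so reading Avro decimal(12,10) as DECIMAL(5,3) passed (2 >= 2) and silently dropped fractional digits; the new check additionally refuses any shrink in scale:

// Strict rule: the scale must not shrink, and the precision must grow at least
// as much as the scale does, so neither fractional nor integer digits are lost.
def compatible(avroPrecision: Int, avroScale: Int, sqlPrecision: Int, sqlScale: Int): Boolean = {
  val precisionIncrease = sqlPrecision - avroPrecision
  val scaleIncrease = sqlScale - avroScale
  scaleIncrease >= 0 && precisionIncrease >= scaleIncrease
}

compatible(12, 10, 5, 3)   // false: scale shrinks 10 -> 3, fractional digits would be lost
compatible(12, 10, 13, 11) // true: scale +1 and precision +1, nothing can be lost
compatible(12, 10, 12, 11) // false: scale +1 with precision +0 would drop an integer digit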
AvroSuite.scala
@@ -902,11 +902,7 @@ abstract class AvroSuite
         matchPVals = true
       )
     }
-    // The following used to work, so it should still work with the flag enabled
-    checkAnswer(
-      spark.read.schema("a DECIMAL(5, 3)").format("avro").load(path.toString),
-      Row(new java.math.BigDecimal("13.123"))
-    )
+
    withSQLConf(confKey -> "true") {
      // With the flag enabled, we return a null silently, which isn't great
      checkAnswer(
@@ -921,6 +917,43 @@
     }
   }
 
+  test("SPARK-49095: Add scale comparison to avoid loss of precision in the decimal part") {
+    withTempPath { path =>
+      val schemaConfKey = SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key
+      val decimalConfKey = SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_DECIMAL_TYPE.key
+      sql("SELECT 13.1234567890 a").write.format("avro").save(path.toString)
+      withSQLConf(schemaConfKey -> "false") {
+        withSQLConf(decimalConfKey -> "false") {
+          val ex = intercept[SparkException] {
+            spark.read.schema("a DECIMAL(5, 3)").format("avro").load(path.toString).collect()
+          }
+          assert(ex.getErrorClass.startsWith("FAILED_READ_FILE"))
+          checkError(
+            exception = ex.getCause.asInstanceOf[AnalysisException],
+            errorClass = "AVRO_INCOMPATIBLE_READ_TYPE",
+            parameters = Map("avroPath" -> "field 'a'",
+              "sqlPath" -> "field 'a'",
+              "avroType" -> "decimal\\(12,10\\)",
+              "sqlType" -> "\"DECIMAL\\(5,3\\)\""),
+            matchPVals = true
+          )
+
+          checkAnswer(
+            spark.read.schema("a DECIMAL(13, 11)").format("avro").load(path.toString),
+            Row(new java.math.BigDecimal("13.1234567890"))
+          )
+        }
+
+        withSQLConf(decimalConfKey -> "true") {
+          checkAnswer(
+            spark.read.schema("a DECIMAL(5, 3)").format("avro").load(path.toString),
+            Row(new java.math.BigDecimal("13.123"))
+          )
+        }
+      }
+    }
+  }
+
   test("SPARK-49082: Widening type promotions in AvroDeserializer") {
     withTempPath { tempPath =>
       // Int -> Long
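
For reference, the reason the expected error above reports `decimal(12,10)`: the literal `13.1234567890` has 12 significant digits, 10 of them fractional, so that is the logical type the writer attaches to the Avro field. A minimal sketch to confirm, reusing the test's `path`:

// Without a user-supplied schema, Spark maps the Avro decimal logical type back
// to the corresponding Catalyst type, DecimalType(12, 10).
val inferred = spark.read.format("avro").load(path.toString).schema("a").dataType
assert(inferred == org.apache.spark.sql.types.DecimalType(12, 10))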
docs/sql-migration-guide.md (2 additions & 0 deletions)
@@ -60,6 +60,8 @@ license: |
 - Since Spark 4.0, by default views tolerate column type changes in the query and compensate with casts. To restore the previous behavior, allowing up-casts only, set `spark.sql.legacy.viewSchemaCompensation` to `false`.
 - Since Spark 4.0, views allow control over how they react to underlying query changes. By default views tolerate column type changes in the query and compensate with casts. To disable this feature set `spark.sql.legacy.viewSchemaBindingMode` to `false`. This also removes the clause from `DESCRIBE EXTENDED` and `SHOW CREATE TABLE`.
 - Since Spark 4.0, the Storage-Partitioned Join feature flag `spark.sql.sources.v2.bucketing.pushPartValues.enabled` is set to `true`. To restore the previous behavior, set `spark.sql.sources.v2.bucketing.pushPartValues.enabled` to `false`.
+- Since Spark 4.0, when reading decimal fields from an Avro data source, Spark requires not only that the `precision - scale` value of the target `DecimalType` be greater than or equal to that of the Avro decimal field, but also that the scale of the `DecimalType` be greater than or equal to the field's scale. Otherwise, an `AnalysisException` is thrown. To restore the legacy behavior, set `spark.sql.legacy.avro.allowIncompatibleDecimalType` to `true`.
 
 
 ## Upgrading from Spark SQL 3.5.1 to 3.5.2
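
A minimal end-to-end illustration of this migration item (a sketch; /tmp/avro_dec is a hypothetical path):

// Write a decimal(12,10) column, then read it back with a narrower scale.
spark.sql("SELECT 13.1234567890 AS a").write.format("avro").save("/tmp/avro_dec")

// Spark 4.0 default: DECIMAL(5, 3) shrinks the scale from 10 to 3, so the read
// fails with AVRO_INCOMPATIBLE_READ_TYPE instead of silently losing digits.
spark.read.schema("a DECIMAL(5, 3)").format("avro").load("/tmp/avro_dec").collect()

// Legacy behavior: the same read succeeds and yields 13.123.
spark.conf.set("spark.sql.legacy.avro.allowIncompatibleDecimalType", "true")
spark.read.schema("a DECIMAL(5, 3)").format("avro").load("/tmp/avro_dec").collect()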
SQLConf.scala
@@ -4841,6 +4841,19 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val LEGACY_AVRO_ALLOW_INCOMPATIBLE_DECIMAL_TYPE =
+    buildConf("spark.sql.legacy.avro.allowIncompatibleDecimalType")
+      .internal()
+      .doc("When set to false, reading decimal fields from an Avro data source requires " +
+        "not only that the `precision - scale` value of the `DecimalType` be greater " +
+        "than or equal to that of the decimal field, but also that the scale of the " +
+        "`DecimalType` be greater than or equal to the scale of the field. When set to " +
+        "true, it restores the legacy behavior of allowing the scale of the `DecimalType` " +
+        "to be less than the scale of the decimal field, which may lose fractional digits.")
+      .version("4.0.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME =
     buildConf("spark.sql.legacy.v1IdentifierNoCatalog")
       .internal()
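
For completeness, how this flag composes with the existing `spark.sql.legacy.avro.allowIncompatibleSchema` flag in `AvroDeserializer`, summarized from the diff above (a sketch, assuming these conf keys):

// allowIncompatibleSchema = true  -> preventReadingIncorrectType is false, so no
//                                    decimal compatibility check runs at all.
// allowIncompatibleSchema = false, allowIncompatibleDecimalType = true
//                                 -> legacy rule: only the integer-digit capacity
//                                    (precision - scale) must not shrink.
// allowIncompatibleSchema = false, allowIncompatibleDecimalType = false
//                                 -> new default: the scale must not shrink either.
spark.conf.set("spark.sql.legacy.avro.allowIncompatibleDecimalType", "true")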