diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 877c3f89e88c..d76846120b6b 100644
--- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -111,6 +111,9 @@ private[sql] class AvroDeserializer(
   private lazy val preventReadingIncorrectType = !SQLConf.get
     .getConf(SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA)
 
+  private lazy val allowIncompatibleDecimalType = SQLConf.get
+    .getConf(SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_DECIMAL_TYPE)
+
   def deserialize(data: Any): Option[Any] = converter(data)
 
   /**
@@ -238,8 +241,7 @@ private[sql] class AvroDeserializer(
 
       case (FIXED, dt: DecimalType) =>
         val d = avroType.getLogicalType.asInstanceOf[LogicalTypes.Decimal]
-        if (preventReadingIncorrectType &&
-          d.getPrecision - d.getScale > dt.precision - dt.scale) {
+        if (preventReadingIncorrectType && !isDecimalTypeCompatible(d, dt)) {
           throw QueryCompilationErrors.avroIncompatibleReadError(toFieldStr(avroPath),
             toFieldStr(catalystPath), realDataType.catalogString, dt.catalogString)
         }
@@ -251,8 +253,7 @@ private[sql] class AvroDeserializer(
 
       case (BYTES, dt: DecimalType) =>
        val d = avroType.getLogicalType.asInstanceOf[LogicalTypes.Decimal]
-        if (preventReadingIncorrectType &&
-          d.getPrecision - d.getScale > dt.precision - dt.scale) {
+        if (preventReadingIncorrectType && !isDecimalTypeCompatible(d, dt)) {
           throw QueryCompilationErrors.avroIncompatibleReadError(toFieldStr(avroPath),
             toFieldStr(catalystPath), realDataType.catalogString, dt.catalogString)
         }
@@ -394,6 +395,16 @@ private[sql] class AvroDeserializer(
     }
   }
 
+  private def isDecimalTypeCompatible(d: LogicalTypes.Decimal, dt: DecimalType): Boolean = {
+    if (allowIncompatibleDecimalType) {
+      dt.precision - dt.scale >= d.getPrecision - d.getScale
+    } else {
+      val precisionIncrease = dt.precision - d.getPrecision
+      val scaleIncrease = dt.scale - d.getScale
+      scaleIncrease >= 0 && precisionIncrease >= scaleIncrease
+    }
+  }
+
   // TODO: move the following method in Decimal object on creating Decimal from BigDecimal?
   private def createDecimal(decimal: BigDecimal, precision: Int, scale: Int): Decimal = {
     if (precision <= Decimal.MAX_LONG_DIGITS) {
diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 1f00392420be..615e4afa88d6 100644
--- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -902,11 +902,7 @@ abstract class AvroSuite
           matchPVals = true
         )
       }
-      // The following used to work, so it should still work with the flag enabled
-      checkAnswer(
-        spark.read.schema("a DECIMAL(5, 3)").format("avro").load(path.toString),
-        Row(new java.math.BigDecimal("13.123"))
-      )
+
       withSQLConf(confKey -> "true") {
         // With the flag enabled, we return a null silently, which isn't great
         checkAnswer(
@@ -921,6 +917,43 @@ abstract class AvroSuite
     }
   }
 
+  test("SPARK-49095: Add scale comparison to avoid loss of precision in the decimal part") {
+    withTempPath { path =>
+      val schemaConfKey = SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key
+      val decimalConfKey = SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_DECIMAL_TYPE.key
+      sql("SELECT 13.1234567890 a").write.format("avro").save(path.toString)
+      withSQLConf(schemaConfKey -> "false") {
+        withSQLConf(decimalConfKey -> "false") {
+          val ex = intercept[SparkException] {
+            spark.read.schema("a DECIMAL(5, 3)").format("avro").load(path.toString).collect()
+          }
+          assert(ex.getErrorClass.startsWith("FAILED_READ_FILE"))
+          checkError(
+            exception = ex.getCause.asInstanceOf[AnalysisException],
+            errorClass = "AVRO_INCOMPATIBLE_READ_TYPE",
+            parameters = Map("avroPath" -> "field 'a'",
+              "sqlPath" -> "field 'a'",
+              "avroType" -> "decimal\\(12,10\\)",
+              "sqlType" -> "\"DECIMAL\\(5,3\\)\""),
+            matchPVals = true
+          )
+
+          checkAnswer(
+            spark.read.schema("a DECIMAL(13, 11)").format("avro").load(path.toString),
+            Row(new java.math.BigDecimal("13.1234567890"))
+          )
+        }
+
+        withSQLConf(decimalConfKey -> "true") {
+          checkAnswer(
+            spark.read.schema("a DECIMAL(5, 3)").format("avro").load(path.toString),
+            Row(new java.math.BigDecimal("13.123"))
+          )
+        }
+      }
+    }
+  }
+
   test("SPARK-49082: Widening type promotions in AvroDeserializer") {
     withTempPath { tempPath =>
       // Int -> Long
diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index ad678c44657e..93c320b71a75 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -60,6 +60,8 @@ license: |
 - Since Spark 4.0, By default views tolerate column type changes in the query and compensate with casts. To restore the previous behavior, allowing up-casts only, set `spark.sql.legacy.viewSchemaCompensation` to `false`.
 - Since Spark 4.0, Views allow control over how they react to underlying query changes. By default views tolerate column type changes in the query and compensate with casts. To disable this feature set `spark.sql.legacy.viewSchemaBindingMode` to `false`. This also removes the clause from `DESCRIBE EXTENDED` and `SHOW CREATE TABLE`.
 - Since Spark 4.0, The Storage-Partitioned Join feature flag `spark.sql.sources.v2.bucketing.pushPartValues.enabled` is set to `true`. To restore the previous behavior, set `spark.sql.sources.v2.bucketing.pushPartValues.enabled` to `false`.
+- Since Spark 4.0, when reading decimal fields from the Avro data source, not only must the `precision - scale` value of the `DecimalType` be greater than or equal to that of the decimal fields, but the scale of the `DecimalType` must also be greater than or equal to the scale of the decimal fields. Otherwise, an `AnalysisException` will be thrown. To restore the legacy behavior, set `spark.sql.legacy.avro.allowIncompatibleDecimalType` to `true`.
+
 ## Upgrading from Spark SQL 3.5.1 to 3.5.2
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 4202f2453c92..d3f10e00df9a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -4841,6 +4841,19 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val LEGACY_AVRO_ALLOW_INCOMPATIBLE_DECIMAL_TYPE =
+    buildConf("spark.sql.legacy.avro.allowIncompatibleDecimalType")
+      .internal()
+      .doc("When set to false, when reading decimal fields from Avro data source, it requires " +
+        "that not only the `precision - scale` value of the `DecimalType` should be greater " +
+        "than or equal to that of decimal fields, but also the scale of the `DecimalType` " +
+        "should be greater than or equal to the scale of decimal fields. When set to true, it " +
+        "restores the legacy behavior, allowing the scale of `DecimalType` to be less than " +
+        "the scale of decimal fields, which may cause a loss of precision in the decimal part.")
+      .version("4.0.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME =
     buildConf("spark.sql.legacy.v1IdentifierNoCatalog")
       .internal()
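
For reference, the compatibility rule described in the migration note and config doc above can be illustrated with a minimal standalone sketch. The `DecimalCompatibilityCheck` object and `compatible` helper below are illustrative only and are not part of the patch; they mirror the `isDecimalTypeCompatible` check added to AvroDeserializer, with no Spark dependencies.

// Illustrative sketch only (not part of the patch): mirrors the isDecimalTypeCompatible
// check added to AvroDeserializer, without any Spark dependencies.
object DecimalCompatibilityCheck {
  // avroPrecision/avroScale describe the Avro decimal logical type;
  // sqlPrecision/sqlScale describe the requested Catalyst DecimalType;
  // legacyAllowIncompatible stands in for spark.sql.legacy.avro.allowIncompatibleDecimalType.
  def compatible(
      avroPrecision: Int, avroScale: Int,
      sqlPrecision: Int, sqlScale: Int,
      legacyAllowIncompatible: Boolean): Boolean = {
    if (legacyAllowIncompatible) {
      // Legacy behavior: only the integer part (precision - scale) is compared,
      // so the fractional part may silently lose precision.
      sqlPrecision - sqlScale >= avroPrecision - avroScale
    } else {
      // New behavior: the SQL scale must also cover the Avro scale.
      val precisionIncrease = sqlPrecision - avroPrecision
      val scaleIncrease = sqlScale - avroScale
      scaleIncrease >= 0 && precisionIncrease >= scaleIncrease
    }
  }

  def main(args: Array[String]): Unit = {
    // Avro decimal(12,10) read as DECIMAL(5,3): rejected by the new rule,
    // allowed (with truncation to 13.123) under the legacy flag.
    assert(!compatible(12, 10, 5, 3, legacyAllowIncompatible = false))
    assert(compatible(12, 10, 5, 3, legacyAllowIncompatible = true))
    // Avro decimal(12,10) read as DECIMAL(13,11): widens both parts, always allowed.
    assert(compatible(12, 10, 13, 11, legacyAllowIncompatible = false))
  }
}

This matches the new test above: the Avro decimal(12,10) value 13.1234567890 can no longer be read as DECIMAL(5, 3) by default (it fails with AVRO_INCOMPATIBLE_READ_TYPE), but is truncated to 13.123 when the legacy flag is enabled.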