From 0e8042f80bf0b924ead4d22d4ef09b0be846fe3e Mon Sep 17 00:00:00 2001 From: Wei Guo Date: Fri, 2 Aug 2024 17:43:47 +0800 Subject: [PATCH 1/3] update --- .../spark/sql/avro/AvroDeserializer.scala | 9 ++++++ .../org/apache/spark/sql/avro/AvroSuite.scala | 28 +++++++++++++++++++ 2 files changed, 37 insertions(+) diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala index 139c45adb442..877c3f89e88c 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala @@ -141,6 +141,12 @@ private[sql] class AvroDeserializer( case (INT, IntegerType) => (updater, ordinal, value) => updater.setInt(ordinal, value.asInstanceOf[Int]) + case (INT, LongType) => (updater, ordinal, value) => + updater.setLong(ordinal, value.asInstanceOf[Int]) + + case (INT, DoubleType) => (updater, ordinal, value) => + updater.setDouble(ordinal, value.asInstanceOf[Int]) + case (INT, dt: DatetimeType) if preventReadingIncorrectType && realDataType.isInstanceOf[YearMonthIntervalType] => throw QueryCompilationErrors.avroIncompatibleReadError(toFieldStr(avroPath), @@ -194,6 +200,9 @@ private[sql] class AvroDeserializer( case (FLOAT, FloatType) => (updater, ordinal, value) => updater.setFloat(ordinal, value.asInstanceOf[Float]) + case (FLOAT, DoubleType) => (updater, ordinal, value) => + updater.setDouble(ordinal, value.asInstanceOf[Float]) + case (DOUBLE, DoubleType) => (updater, ordinal, value) => updater.setDouble(ordinal, value.asInstanceOf[Double]) diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index ce38ada7c9e4..976dcb440c6d 100644 --- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -921,6 +921,34 @@ abstract class AvroSuite } } + test("SPARK-49082: Widening type promotions in AvroDeserializer") { + withTempPath { tempPath => + // Int -> Long + val intPath = s"$tempPath/int_data" + val intDf = Seq(1, Int.MinValue, Int.MaxValue).toDF("col") + intDf.write.format("avro").save(intPath) + checkAnswer( + spark.read.schema("col Long").format("avro").load(intPath), + Seq(Row(1L), Row(-2147483648L), Row(2147483647L)) + ) + + // Int -> Double + checkAnswer( + spark.read.schema("col Double").format("avro").load(intPath), + Seq(Row(1D), Row(-2147483648D), Row(2147483647D)) + ) + + // Float -> Double + val floatPath = s"$tempPath/float_data1" + val floatDf = Seq(1, -1f, 2f).toDF("col") + floatDf.write.format("avro").save(floatPath) + checkAnswer( + spark.read.schema("col Double").format("avro").load(floatPath), + Seq(Row(1D), Row(-1D), Row(2D)) + ) + } + } + test("SPARK-43380: Fix Avro data type conversion" + " of DayTimeIntervalType to avoid producing incorrect results") { withTempPath { path => From 418170e81067a34212c0a7d06be0d013348bd0c2 Mon Sep 17 00:00:00 2001 From: Wei Guo Date: Mon, 5 Aug 2024 18:55:43 +0800 Subject: [PATCH 2/3] update --- .../org/apache/spark/sql/avro/AvroDeserializer.scala | 2 +- .../scala/org/apache/spark/sql/avro/AvroSuite.scala | 11 ++++++++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala index 877c3f89e88c..2a18db6ca91f 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala @@ -201,7 +201,7 @@ private[sql] class AvroDeserializer( updater.setFloat(ordinal, value.asInstanceOf[Float]) case (FLOAT, DoubleType) => (updater, ordinal, value) => - updater.setDouble(ordinal, value.asInstanceOf[Float]) + updater.setDouble(ordinal, value.asInstanceOf[Float].toString.toDouble) case (DOUBLE, DoubleType) => (updater, ordinal, value) => updater.setDouble(ordinal, value.asInstanceOf[Double]) diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index 976dcb440c6d..dac8ebbd03a4 100644 --- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -939,12 +939,17 @@ abstract class AvroSuite ) // Float -> Double - val floatPath = s"$tempPath/float_data1" - val floatDf = Seq(1, -1f, 2f).toDF("col") + val floatPath = s"$tempPath/float_data" + val floatDf = Seq(1.34F, + Float.MinValue, Float.MinPositiveValue, Float.MaxValue, + Float.NaN, Float.NegativeInfinity, Float.PositiveInfinity + ).toDF("col") floatDf.write.format("avro").save(floatPath) checkAnswer( spark.read.schema("col Double").format("avro").load(floatPath), - Seq(Row(1D), Row(-1D), Row(2D)) + Seq(Row(1.34D), + Row(-3.4028235E38D), Row(1.4E-45D), Row(3.4028235E38D), + Row(Double.NaN), Row(Double.NegativeInfinity), Row(Double.PositiveInfinity)) ) } } From a7b7793898fe7742dcc2f8f6ea4526eafcd0c95c Mon Sep 17 00:00:00 2001 From: Wei Guo Date: Tue, 6 Aug 2024 22:24:58 +0800 Subject: [PATCH 3/3] update --- .../scala/org/apache/spark/sql/avro/AvroDeserializer.scala | 2 +- .../test/scala/org/apache/spark/sql/avro/AvroSuite.scala | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala index 2a18db6ca91f..877c3f89e88c 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala @@ -201,7 +201,7 @@ private[sql] class AvroDeserializer( updater.setFloat(ordinal, value.asInstanceOf[Float]) case (FLOAT, DoubleType) => (updater, ordinal, value) => - updater.setDouble(ordinal, value.asInstanceOf[Float].toString.toDouble) + updater.setDouble(ordinal, value.asInstanceOf[Float]) case (DOUBLE, DoubleType) => (updater, ordinal, value) => updater.setDouble(ordinal, value.asInstanceOf[Double]) diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index dac8ebbd03a4..1f00392420be 100644 --- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -940,15 +940,15 @@ abstract class AvroSuite // Float -> Double val floatPath = s"$tempPath/float_data" - val floatDf = Seq(1.34F, + val floatDf = Seq(1F, Float.MinValue, Float.MinPositiveValue, Float.MaxValue, Float.NaN, Float.NegativeInfinity, Float.PositiveInfinity ).toDF("col") floatDf.write.format("avro").save(floatPath) checkAnswer( spark.read.schema("col Double").format("avro").load(floatPath), - Seq(Row(1.34D), - Row(-3.4028235E38D), Row(1.4E-45D), Row(3.4028235E38D), + Seq(Row(1D), + Row(-3.4028234663852886E38D), Row(1.401298464324817E-45D), Row(3.4028234663852886E38D), Row(Double.NaN), Row(Double.NegativeInfinity), Row(Double.PositiveInfinity)) ) }