diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index e2cfda6da42d..71205f967b18 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -210,7 +210,7 @@ class ParquetFilters( longColumn(n), Option(v).map(d => decimalToInt64(d.asInstanceOf[JBigDecimal])).orNull) case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, FIXED_LEN_BYTE_ARRAY, length) - if pushDownDecimal => + if pushDownDecimal => (n: Array[String], v: Any) => FilterApi.eq( binaryColumn(n), Option(v).map(d => decimalToByteArray(d.asInstanceOf[JBigDecimal], length)).orNull) @@ -261,7 +261,7 @@ class ParquetFilters( longColumn(n), Option(v).map(d => decimalToInt64(d.asInstanceOf[JBigDecimal])).orNull) case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, FIXED_LEN_BYTE_ARRAY, length) - if pushDownDecimal => + if pushDownDecimal => (n: Array[String], v: Any) => FilterApi.notEq( binaryColumn(n), Option(v).map(d => decimalToByteArray(d.asInstanceOf[JBigDecimal], length)).orNull) @@ -300,7 +300,7 @@ class ParquetFilters( (n: Array[String], v: Any) => FilterApi.lt(longColumn(n), decimalToInt64(v.asInstanceOf[JBigDecimal])) case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, FIXED_LEN_BYTE_ARRAY, length) - if pushDownDecimal => + if pushDownDecimal => (n: Array[String], v: Any) => FilterApi.lt(binaryColumn(n), decimalToByteArray(v.asInstanceOf[JBigDecimal], length)) } @@ -338,7 +338,7 @@ class ParquetFilters( (n: Array[String], v: Any) => FilterApi.ltEq(longColumn(n), decimalToInt64(v.asInstanceOf[JBigDecimal])) case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, FIXED_LEN_BYTE_ARRAY, length) - if pushDownDecimal => + if pushDownDecimal => (n: Array[String], v: Any) => FilterApi.ltEq(binaryColumn(n), decimalToByteArray(v.asInstanceOf[JBigDecimal], length)) } @@ -376,7 +376,7 @@ class ParquetFilters( (n: Array[String], v: Any) => FilterApi.gt(longColumn(n), decimalToInt64(v.asInstanceOf[JBigDecimal])) case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, FIXED_LEN_BYTE_ARRAY, length) - if pushDownDecimal => + if pushDownDecimal => (n: Array[String], v: Any) => FilterApi.gt(binaryColumn(n), decimalToByteArray(v.asInstanceOf[JBigDecimal], length)) } @@ -414,7 +414,7 @@ class ParquetFilters( (n: Array[String], v: Any) => FilterApi.gtEq(longColumn(n), decimalToInt64(v.asInstanceOf[JBigDecimal])) case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, FIXED_LEN_BYTE_ARRAY, length) - if pushDownDecimal => + if pushDownDecimal => (n: Array[String], v: Any) => FilterApi.gtEq(binaryColumn(n), decimalToByteArray(v.asInstanceOf[JBigDecimal], length)) } @@ -486,21 +486,22 @@ class ParquetFilters( FilterApi.gtEq(longColumn(n), statistics.genericGetMin().asInstanceOf[JLong]), FilterApi.ltEq(longColumn(n), statistics.genericGetMax().asInstanceOf[JLong])) - case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal => + case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, INT32, _) if pushDownDecimal => (n: Array[String], v: Array[Any], statistics: ParquetStatistics[_]) => v.map(_.asInstanceOf[JBigDecimal]).map(decimalToInt32).foreach(statistics.updateStats(_)) FilterApi.and( FilterApi.gtEq(intColumn(n), statistics.genericGetMin().asInstanceOf[Integer]), FilterApi.ltEq(intColumn(n), statistics.genericGetMax().asInstanceOf[Integer])) - case ParquetSchemaType(DECIMAL, INT64, _, _) if pushDownDecimal => + case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, INT64, _) if pushDownDecimal => (n: Array[String], v: Array[Any], statistics: ParquetStatistics[_]) => v.map(_.asInstanceOf[JBigDecimal]).map(decimalToInt64).foreach(statistics.updateStats(_)) FilterApi.and( FilterApi.gtEq(longColumn(n), statistics.genericGetMin().asInstanceOf[JLong]), FilterApi.ltEq(longColumn(n), statistics.genericGetMax().asInstanceOf[JLong])) - case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, length, _) if pushDownDecimal => + case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, FIXED_LEN_BYTE_ARRAY, length) + if pushDownDecimal => (path: Array[String], v: Array[Any], statistics: ParquetStatistics[_]) => v.map(d => decimalToByteArray(d.asInstanceOf[JBigDecimal], length)) .foreach(statistics.updateStats)