Commit 0ebe356

parquet support for date type
1 parent a96b727 commit 0ebe356

File tree

4 files changed, +16 -2 lines

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala

Lines changed: 8 additions & 0 deletions

@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.parquet
 
+import java.sql.Date
+
 import org.apache.spark.sql.catalyst.types.decimal.Decimal
 
 import scala.collection.mutable.{Buffer, ArrayBuffer, HashMap}
@@ -181,6 +183,9 @@ private[parquet] abstract class CatalystConverter extends GroupConverter {
   protected[parquet] def updateInt(fieldIndex: Int, value: Int): Unit =
     updateField(fieldIndex, value)
 
+  protected[parquet] def updateDate(fieldIndex: Int, value: Int): Unit =
+    updateField(fieldIndex, new Date(value))
+
   protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit =
     updateField(fieldIndex, value)
 
@@ -368,6 +373,9 @@ private[parquet] class CatalystPrimitiveRowConverter(
   override protected[parquet] def updateInt(fieldIndex: Int, value: Int): Unit =
     current.setInt(fieldIndex, value)
 
+  override protected[parquet] def updateDate(fieldIndex: Int, value: Int): Unit =
+    current.update(fieldIndex, new Date(value))
+
   override protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit =
     current.setLong(fieldIndex, value)
 
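Worth noting for the converter change: updateDate takes the INT32 that parquet-mr hands back and wraps it directly in java.sql.Date, so the stored integer is treated as milliseconds since the epoch. A standalone sketch of just that decode step (the object and method names are mine, not from the patch):

import java.sql.Date

object DateDecodeSketch {
  // Mirrors updateDate in both converters: the INT32 read from the Parquet
  // column is interpreted as epoch milliseconds and wrapped in a java.sql.Date.
  def decodeDate(parquetValue: Int): Date = new Date(parquetValue)
}

The matching write-side narrowing appears in ParquetTableSupport.scala below.
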
sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala

Lines changed: 2 additions & 1 deletion

@@ -190,7 +190,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
     }
   }
 
-  private[parquet] def writePrimitive(schema: PrimitiveType, value: Any): Unit = {
+  private[parquet] def writePrimitive(schema: DataType, value: Any): Unit = {
     if (value != null) {
       schema match {
         case StringType => writer.addBinary(
@@ -207,6 +207,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
         case DoubleType => writer.addDouble(value.asInstanceOf[Double])
         case FloatType => writer.addFloat(value.asInstanceOf[Float])
         case BooleanType => writer.addBoolean(value.asInstanceOf[Boolean])
+        case DateType => writer.addInteger(value.asInstanceOf[java.sql.Date].getTime.toInt)
         case d: DecimalType =>
           if (d.precisionInfo == None || d.precisionInfo.get.precision > 18) {
             sys.error(s"Unsupported datatype $d, cannot write to consumer")
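
Taken together with the converter change above, the on-disk convention this patch settles on is "java.sql.Date as epoch milliseconds, narrowed to an INT32". A minimal round-trip sketch of that convention (object and method names are mine; only the two expressions inside mirror the patch):

import java.sql.Date

object DateInt32RoundTrip {
  // Write side, as in RowWriteSupport.writePrimitive's DateType case.
  def encode(d: Date): Int = d.getTime.toInt

  // Read side, as in CatalystConverter.updateDate.
  def decode(v: Int): Date = new Date(v)

  def main(args: Array[String]): Unit = {
    // A date near the epoch, so its millisecond value still fits in an Int.
    val d = Date.valueOf("1970-01-15")
    println(decode(encode(d)))   // prints 1970-01-15
  }
}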

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala

Lines changed: 4 additions & 0 deletions

@@ -64,6 +64,8 @@ private[parquet] object ParquetTypesConverter extends Logging {
     case ParquetPrimitiveTypeName.BOOLEAN => BooleanType
     case ParquetPrimitiveTypeName.DOUBLE => DoubleType
     case ParquetPrimitiveTypeName.FLOAT => FloatType
+    case ParquetPrimitiveTypeName.INT32
+      if originalType == ParquetOriginalType.DATE => DateType
     case ParquetPrimitiveTypeName.INT32 => IntegerType
     case ParquetPrimitiveTypeName.INT64 => LongType
     case ParquetPrimitiveTypeName.INT96 =>
@@ -209,6 +211,8 @@ private[parquet] object ParquetTypesConverter extends Logging {
       // There is no type for Byte or Short so we promote them to INT32.
       case ShortType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT32))
       case ByteType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT32))
+      case DateType => Some(ParquetTypeInfo(
+        ParquetPrimitiveTypeName.INT32, Some(ParquetOriginalType.DATE)))
      case LongType => Some(ParquetTypeInfo(ParquetPrimitiveTypeName.INT64))
      case DecimalType.Fixed(precision, scale) if precision <= 18 =>
        // TODO: for now, our writer only supports decimals that fit in a Long
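
One detail in the read-side mapping: the guarded INT32 case has to sit before the existing unguarded `case ParquetPrimitiveTypeName.INT32 => IntegerType`, because Scala tries match cases top to bottom. A stripped-down illustration with stand-in enums (these are not the parquet-mr classes):

// Stand-ins only, to show why the DATE-annotated INT32 case must precede the
// plain INT32 case in the mapping above.
sealed trait PrimitiveName
case object INT32 extends PrimitiveName

sealed trait OriginalType
case object DATE extends OriginalType

object CaseOrderSketch {
  def toDataTypeName(prim: PrimitiveName, originalType: Option[OriginalType]): String =
    prim match {
      case INT32 if originalType == Some(DATE) => "DateType"    // must be tried first
      case INT32                               => "IntegerType" // if this came first, the case above would never match
    }

  // toDataTypeName(INT32, Some(DATE)) == "DateType"
  // toDataTypeName(INT32, None)       == "IntegerType"
}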

sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala

Lines changed: 2 additions & 1 deletion

@@ -56,14 +56,15 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
       |}
     """.stripMargin)
 
-  testSchema[(Byte, Short, Int, Long)](
+  testSchema[(Byte, Short, Int, Long, java.sql.Date)](
     "logical integral types",
     """
       |message root {
       |  required int32 _1 (INT_8);
       |  required int32 _2 (INT_16);
       |  required int32 _3 (INT_32);
       |  required int64 _4 (INT_64);
+      |  optional int32 _5 (DATE);
       |}
     """.stripMargin)
 
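End to end, the extended schema test relies on ScalaReflection mapping java.sql.Date to DateType, which also means a date column can be written and read back through the usual SchemaRDD path. A usage sketch against the SQLContext API of this era (createSchemaRDD / saveAsParquetFile / parquetFile); the case class, app name, and path are illustrative, not from the patch:

import java.sql.Date

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical record type; its java.sql.Date field becomes a DateType column.
case class Event(id: Int, day: Date)

object DateParquetRoundTripDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("date-parquet-demo").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.createSchemaRDD   // implicit RDD[Product] -> SchemaRDD

    // A date near the epoch, since the writer narrows getTime to an INT32.
    val events = sc.parallelize(Seq(Event(1, Date.valueOf("1970-01-10"))))
    events.saveAsParquetFile("/tmp/events.parquet")   // day is stored as an INT32 annotated DATE

    // Reading the file back reconstructs java.sql.Date values via updateDate.
    sqlContext.parquetFile("/tmp/events.parquet").collect().foreach(println)

    sc.stop()
  }
}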
