From 1ed764bad9fa093770ad64a0ea46f51db0a9b208 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Sun, 8 Mar 2015 00:21:50 +0800 Subject: [PATCH 1/7] Initial draft for Parquet converters refactoring --- .../expressions/SpecificMutableRow.scala | 1 + .../apache/spark/sql/types/dataTypes.scala | 10 + .../sql/parquet/ParquetTableSupport.scala | 14 +- .../spark/sql/parquet/readSupport.scala | 398 ++++++++++++++++++ .../spark/sql/parquet/ParquetIOSuite.scala | 14 +- 5 files changed, 422 insertions(+), 15 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/parquet/readSupport.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala index 47b6f358ed1b..acc29207f360 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala @@ -202,6 +202,7 @@ final class SpecificMutableRow(val values: Array[MutableValue]) extends MutableR case DoubleType => new MutableDouble case BooleanType => new MutableBoolean case LongType => new MutableLong + case DateType => new MutableInt case _ => new MutableAny }.toArray) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala index cdf2bc68d9c5..c76958b91a15 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala @@ -24,6 +24,7 @@ import scala.math._ import scala.math.Numeric.{FloatAsIfIntegral, DoubleAsIfIntegral} import scala.reflect.ClassTag import scala.reflect.runtime.universe.{TypeTag, runtimeMirror, typeTag} +import scala.util.Try import scala.util.parsing.combinator.RegexParsers import org.json4s._ @@ -39,6 +40,10 @@ import org.apache.spark.util.Utils object DataType { + private[sql] def fromString(raw: String) = { + Try(DataType.fromJson(raw)).getOrElse(DataType.fromCaseClassString(raw)) + } + def fromJson(json: String): DataType = parseDataType(parse(json)) private object JSortedObject { @@ -887,6 +892,11 @@ case class StructField( object StructType { + private[sql] def fromString(raw: String): StructType = DataType.fromString(raw) match { + case t: StructType => t + case _ => throw new RuntimeException(s"Failed parsing StructType: $raw") + } + protected[sql] def fromAttributes(attributes: Seq[Attribute]): StructType = StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata))) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala index 5a1b15490d27..b2ad5cf8f1d0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala @@ -33,18 +33,18 @@ import org.apache.spark.sql.types._ /** * A `parquet.io.api.RecordMaterializer` for Rows. - * - *@param root The root group converter for the record. 
*/ -private[parquet] class RowRecordMaterializer(root: CatalystConverter) +private[parquet] class RowRecordMaterializer(parquetSchema: MessageType, attributes: Seq[Attribute]) extends RecordMaterializer[Row] { - def this(parquetSchema: MessageType, attributes: Seq[Attribute]) = - this(CatalystConverter.createRootConverter(parquetSchema, attributes)) + private val root = { + val noopUpdater = new ParentContainerUpdater {} + new StructConverter(parquetSchema, StructType.fromAttributes(attributes), noopUpdater) + } - override def getCurrentRecord: Row = root.getCurrentRecord + override def getCurrentRecord: Row = root.currentRow - override def getRootConverter: GroupConverter = root.asInstanceOf[GroupConverter] + override def getRootConverter: GroupConverter = root } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/readSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/readSupport.scala new file mode 100644 index 000000000000..a451b4062584 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/readSupport.scala @@ -0,0 +1,398 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.parquet + +import java.util + +import scala.collection.JavaConversions._ +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.hadoop.conf.Configuration +import parquet.column.Dictionary +import parquet.hadoop.api.ReadSupport.ReadContext +import parquet.hadoop.api.{InitContext, ReadSupport} +import parquet.io.api._ +import parquet.schema.{PrimitiveType => ParquetPrimitiveType, _} + +import org.apache.spark.Logging +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow +import org.apache.spark.sql.types._ + +private[parquet] object SparkRowReadSupport { + val SPARK_ROW_REQUESTED_SCHEMA = "org.apache.spark.sql.parquet.row.requested_schema" + + val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata" + + val SPARK_ROW_SCHEMA: String = "org.apache.spark.sql.parquet.row.attributes" +} + +private[parquet] class SparkRowReadSupport extends ReadSupport[Row] with Logging { + override def init(context: InitContext): ReadContext = { + val conf = context.getConfiguration + val maybeRequestedSchema = Option(conf.get(SparkRowReadSupport.SPARK_ROW_REQUESTED_SCHEMA)) + val maybeRowSchema = Option(conf.get(SparkRowReadSupport.SPARK_ROW_SCHEMA)) + + val fullParquetSchema = context.getFileSchema + val parquetRequestedSchema = + maybeRequestedSchema.map { value => + val requestedFieldNames = StructType.fromString(value).fieldNames.toSet + pruneFields(fullParquetSchema, requestedFieldNames) + }.getOrElse(fullParquetSchema) + + val metadata = + Map.empty[String, String] ++ + maybeRequestedSchema.map(SparkRowReadSupport.SPARK_ROW_REQUESTED_SCHEMA -> _) ++ + maybeRowSchema.map(SparkRowReadSupport.SPARK_ROW_SCHEMA -> _) + + new ReadContext(parquetRequestedSchema, metadata) + } + + override def prepareForRead( + configuration: Configuration, + keyValueMetaData: util.Map[String, String], + fileSchema: MessageType, + readContext: ReadContext): RecordMaterializer[Row] = { + + logDebug(s"Preparing for reading with Parquet file schema $fileSchema") + + val parquetRequestedSchema = readContext.getRequestedSchema + val metadata = readContext.getReadSupportMetadata + + val maybeSparkSchema = Option(metadata.get(SparkRowReadSupport.SPARK_METADATA_KEY)) + val maybeSparkRequestedSchema = Option(metadata.get(SparkRowReadSupport.SPARK_ROW_REQUESTED_SCHEMA)) + + val sparkSchema = + maybeSparkRequestedSchema + .orElse(maybeSparkSchema) + .map(ParquetTypesConverter.convertFromString) + .getOrElse( + ParquetTypesConverter.convertToAttributes( + parquetRequestedSchema, + isBinaryAsString = false, + isInt96AsTimestamp = true)) + + new SparkRowRecordMaterializer(parquetRequestedSchema, StructType.fromAttributes(sparkSchema)) + } + + /** + * Removes those Parquet fields that are not requested. 
+ */ + private def pruneFields( + schema: MessageType, + requestedFieldNames: Set[String]): MessageType = { + val requestedFields = schema.getFields.filter(f => requestedFieldNames.contains(f.getName)) + new MessageType("root", requestedFields: _*) + } +} + +private[parquet] class SparkRowRecordMaterializer(parquetSchema: MessageType, sparkSchema: StructType) + extends RecordMaterializer[Row] { + + private val rootConverter = { + val noopUpdater = new ParentContainerUpdater {} + new StructConverter(parquetSchema, sparkSchema, noopUpdater) + } + + override def getCurrentRecord: Row = rootConverter.currentRow + + override def getRootConverter: GroupConverter = rootConverter +} + +private[parquet] trait ParentContainerUpdater { + def set(value: Any): Unit = () + def setBoolean(value: Boolean): Unit = set(value) + def setByte(value: Byte): Unit = set(value) + def setShort(value: Short): Unit = set(value) + def setInt(value: Int): Unit = set(value) + def setLong(value: Long): Unit = set(value) + def setFloat(value: Float): Unit = set(value) + def setDouble(value: Double): Unit = set(value) +} + +private[parquet] class StructConverter( + parquetType: GroupType, + sparkType: StructType, + updater: ParentContainerUpdater) + extends GroupConverter { + + val currentRow = new SpecificMutableRow(sparkType.map(_.dataType)) + + private val fieldConverters: Array[Converter] = { + parquetType.getFields.zip(sparkType).zipWithIndex.map { + case ((parquetFieldType, sparkField), ordinal) => + newConverter(parquetFieldType, sparkField.dataType, new ParentContainerUpdater { + override def set(value: Any): Unit = currentRow(ordinal) = value + override def setBoolean(value: Boolean): Unit = currentRow.setBoolean(ordinal, value) + override def setByte(value: Byte): Unit = currentRow.setByte(ordinal, value) + override def setShort(value: Short): Unit = currentRow.setShort(ordinal, value) + override def setInt(value: Int): Unit = currentRow.setInt(ordinal, value) + override def setLong(value: Long): Unit = currentRow.setLong(ordinal, value) + override def setDouble(value: Double): Unit = currentRow.setDouble(ordinal, value) + override def setFloat(value: Float): Unit = currentRow.setFloat(ordinal, value) + }) + }.toArray + } + + override def getConverter(fieldIndex: Int): Converter = fieldConverters(fieldIndex) + + override def end(): Unit = updater.set(currentRow) + + override def start(): Unit = { + var i = 0 + while (i < currentRow.length) { + currentRow.setNullAt(i) + i += 1 + } + } + + private def newConverter( + parquetType: Type, + sparkType: DataType, + updater: ParentContainerUpdater): Converter = { + + sparkType match { + case BooleanType | IntegerType | LongType | FloatType | DoubleType | BinaryType => + new SimplePrimitiveConverter(updater) + + case ByteType => + new PrimitiveConverter { + override def addInt(value: Int): Unit = + updater.setByte(value.asInstanceOf[ByteType#JvmType]) + } + + case ShortType => + new PrimitiveConverter { + override def addInt(value: Int): Unit = + updater.setShort(value.asInstanceOf[ShortType#JvmType]) + } + + case t: DecimalType => + new DecimalConverter(t, updater) + + case StringType => + new StringConverter(updater) + + case TimestampType => + new PrimitiveConverter { + override def addBinary(value: Binary): Unit = { + updater.set(CatalystTimestampConverter.convertToTimestamp(value)) + } + } + + case DateType => + new PrimitiveConverter { + override def addInt(value: Int): Unit = { + updater.setInt(value.asInstanceOf[DateType#JvmType]) + } + } + + case t: ArrayType => 
+ new ArrayConverter(parquetType.asGroupType(), t, updater) + + case t: MapType => + new MapConverter(parquetType.asGroupType(), t, updater) + + case t: StructType => + new StructConverter(parquetType.asGroupType(), t, updater) + + case t: UserDefinedType[_] => + val sparkTypeForUDT = t.sqlType + val parquetTypeForUDT = ParquetTypesConverter.fromDataType(sparkTypeForUDT, "") + newConverter(parquetTypeForUDT, sparkTypeForUDT, updater) + + case _ => + throw new RuntimeException( + s"Unable to create Parquet converter for data type ${sparkType.json}") + } + } + + ///////////////////////////////////////////////////////////////////////////// + // Other converters + ///////////////////////////////////////////////////////////////////////////// + + class SimplePrimitiveConverter(updater: ParentContainerUpdater) extends PrimitiveConverter { + override def addBoolean(value: Boolean): Unit = updater.setBoolean(value) + override def addInt(value: Int): Unit = updater.setInt(value) + override def addLong(value: Long): Unit = updater.setLong(value) + override def addFloat(value: Float): Unit = updater.setFloat(value) + override def addDouble(value: Double): Unit = updater.setDouble(value) + override def addBinary(value: Binary): Unit = updater.set(value.getBytes) + } + + class StringConverter(updater: ParentContainerUpdater) extends PrimitiveConverter { + private var expandedDictionary: Array[String] = null + + override def hasDictionarySupport: Boolean = true + + override def setDictionary(dictionary: Dictionary): Unit = { + this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { + dictionary.decodeToBinary(_).toStringUsingUTF8 + } + } + + override def addValueFromDictionary(dictionaryId: Int): Unit = { + updater.set(expandedDictionary(dictionaryId)) + } + + override def addBinary(value: Binary): Unit = updater.set(value.toStringUsingUTF8) + } + + class DecimalConverter(decimalType: DecimalType, updater: ParentContainerUpdater) + extends PrimitiveConverter { + + override def addBinary(value: Binary): Unit = { + updater.set(toDecimal(value)) + } + + private def toDecimal(value: Binary): Decimal = { + val precision = decimalType.precision + val scale = decimalType.scale + val bytes = value.getBytes + + require(bytes.length <= 16, "Decimal field too large to read") + + var unscaled = 0L + var i = 0 + + while (i < bytes.length) { + unscaled = (unscaled << 8) | (bytes(i) & 0xff) + i += 1 + } + + val bits = 8 * bytes.length + unscaled = (unscaled << (64 - bits)) >> (64 - bits) + Decimal(unscaled, precision, scale) + } + } + + class ArrayConverter( + parquetSchema: GroupType, + sparkSchema: ArrayType, + updater: ParentContainerUpdater) + extends GroupConverter { + + // TODO This is slow! Needs specialization. 
+ private val array = ArrayBuffer.empty[Any] + + private val elementConverter: Converter = { + val repeatedType = parquetSchema.getType(0) + val elementType = sparkSchema.elementType + + if (isElementType(repeatedType, elementType)) { + newConverter(repeatedType, elementType, new ParentContainerUpdater { + override def set(value: Any): Unit = array += value + }) + } else { + new ElementConverter(repeatedType.asGroupType().getType(0), elementType) + } + } + + override def getConverter(fieldIndex: Int): Converter = elementConverter + + override def end(): Unit = updater.set(array) + + override def start(): Unit = array.clear() + + private def isElementType(repeatedType: Type, elementType: DataType): Boolean = { + (repeatedType, elementType) match { + case (t: ParquetPrimitiveType, _) => true + case (t: GroupType, _) if t.getFieldCount > 1 => true + case (t: GroupType, StructType(Array(f))) if f.name == t.getFieldName(0) => true + case _ => false + } + } + + /** Array element converter */ + private class ElementConverter( + parquetType: Type, + sparkType: DataType) + extends GroupConverter { + + private var currentElement: Any = _ + + private val converter = newConverter(parquetType, sparkType, new ParentContainerUpdater { + override def set(value: Any): Unit = currentElement = value + }) + + override def getConverter(fieldIndex: Int): Converter = converter + + override def end(): Unit = array += currentElement + + override def start(): Unit = currentElement = null + } + } + + class MapConverter( + parquetType: GroupType, + sparkType: MapType, + updater: ParentContainerUpdater) + extends GroupConverter { + + private val map = mutable.Map.empty[Any, Any] + + private val keyValueConverter = { + val repeatedType = parquetType.getType(0).asGroupType() + new KeyValueConverter( + repeatedType.getType(0), + repeatedType.getType(1), + sparkType.keyType, + sparkType.valueType) + } + + override def getConverter(fieldIndex: Int): Converter = keyValueConverter + + override def end(): Unit = updater.set(map) + + override def start(): Unit = map.clear() + + private class KeyValueConverter( + parquetKeyType: Type, + parquetValueType: Type, + sparkKeyType: DataType, + sparkValueType: DataType) + extends GroupConverter { + + private var currentKey: Any = _ + + private var currentValue: Any = _ + + private val keyConverter = + newConverter(parquetKeyType, sparkKeyType, new ParentContainerUpdater { + override def set(value: Any): Unit = currentKey = value + }) + + private val valueConverter = + newConverter(parquetValueType, sparkValueType, new ParentContainerUpdater { + override def set(value: Any): Unit = currentValue = value + }) + + override def getConverter(fieldIndex: Int): Converter = { + if (fieldIndex == 0) keyConverter else valueConverter + } + + override def end(): Unit = map(currentKey) = currentValue + + override def start(): Unit = { + currentKey = null + currentValue = null + } + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala index 203bc79f153d..4b6a80d325b0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala @@ -28,7 +28,7 @@ import parquet.example.data.simple.SimpleGroup import parquet.example.data.{Group, GroupWriter} import parquet.hadoop.api.WriteSupport import parquet.hadoop.api.WriteSupport.WriteContext -import parquet.hadoop.metadata.{ParquetMetadata, 
FileMetaData, CompressionCodecName} +import parquet.hadoop.metadata.{CompressionCodecName, FileMetaData, ParquetMetadata} import parquet.hadoop.{Footer, ParquetFileWriter, ParquetWriter} import parquet.io.api.RecordConsumer import parquet.schema.{MessageType, MessageTypeParser} @@ -101,10 +101,8 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest { } test("fixed-length decimals") { - - def makeDecimalRDD(decimal: DecimalType): DataFrame = - sparkContext - .parallelize(0 to 1000) + def makeDecimalDataFrame(decimal: DecimalType): DataFrame = + (0 to 1000) .map(i => Tuple1(i / 100.0)) .toDF() // Parquet doesn't allow column names with spaces, have to add an alias here @@ -112,7 +110,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest { for ((precision, scale) <- Seq((5, 2), (1, 0), (1, 1), (18, 10), (18, 17))) { withTempPath { dir => - val data = makeDecimalRDD(DecimalType(precision, scale)) + val data = makeDecimalDataFrame(DecimalType(precision, scale)) data.saveAsParquetFile(dir.getCanonicalPath) checkAnswer(parquetFile(dir.getCanonicalPath), data.collect().toSeq) } @@ -121,7 +119,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest { // Decimals with precision above 18 are not yet supported intercept[RuntimeException] { withTempPath { dir => - makeDecimalRDD(DecimalType(19, 10)).saveAsParquetFile(dir.getCanonicalPath) + makeDecimalDataFrame(DecimalType(19, 10)).saveAsParquetFile(dir.getCanonicalPath) parquetFile(dir.getCanonicalPath).collect() } } @@ -129,7 +127,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest { // Unlimited-length decimals are not yet supported intercept[RuntimeException] { withTempPath { dir => - makeDecimalRDD(DecimalType.Unlimited).saveAsParquetFile(dir.getCanonicalPath) + makeDecimalDataFrame(DecimalType.Unlimited).saveAsParquetFile(dir.getCanonicalPath) parquetFile(dir.getCanonicalPath).collect() } } From 43e2252966a74823fa514d93904c7daa92ed3aef Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Thu, 2 Apr 2015 00:12:22 +0800 Subject: [PATCH 2/7] Removes old converters --- .../spark/sql/parquet/ParquetConverter.scala | 812 +----------------- 1 file changed, 3 insertions(+), 809 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala index 43ca359b5173..66b3954d6cb9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala @@ -18,44 +18,14 @@ package org.apache.spark.sql.parquet import java.sql.Timestamp -import java.util.{TimeZone, Calendar} - -import scala.collection.mutable.{Buffer, ArrayBuffer, HashMap} +import java.util.{Calendar, TimeZone} import jodd.datetime.JDateTime -import parquet.column.Dictionary -import parquet.io.api.{PrimitiveConverter, GroupConverter, Binary, Converter} -import parquet.schema.MessageType +import parquet.io.api.Binary import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.parquet.CatalystConverter.FieldType -import org.apache.spark.sql.types._ import org.apache.spark.sql.parquet.timestamp.NanoTime - -/** - * Collection of converters of Parquet types (group and primitive types) that - * model arrays and maps. The conversions are partly based on the AvroParquet - * converters that are part of Parquet in order to be able to process these - * types. - * - * There are several types of converters: - *
- * <ul>
- *   <li>[[org.apache.spark.sql.parquet.CatalystPrimitiveConverter]] for primitive
- *   (numeric, boolean and String) types</li>
- *   <li>[[org.apache.spark.sql.parquet.CatalystNativeArrayConverter]] for arrays
- *   of native JVM element types; note: currently null values are not supported!</li>
- *   <li>[[org.apache.spark.sql.parquet.CatalystArrayConverter]] for arrays of
- *   arbitrary element types (including nested element types); note: currently
- *   null values are not supported!</li>
- *   <li>[[org.apache.spark.sql.parquet.CatalystStructConverter]] for structs</li>
- *   <li>[[org.apache.spark.sql.parquet.CatalystMapConverter]] for maps; note:
- *   currently null values are not supported!</li>
- *   <li>[[org.apache.spark.sql.parquet.CatalystPrimitiveRowConverter]] for rows
- *   of only primitive element types</li>
- *   <li>[[org.apache.spark.sql.parquet.CatalystGroupConverter]] for other nested
- *   records, including the top-level row record</li>
- * </ul>
- */ +import org.apache.spark.sql.types._ private[sql] object CatalystConverter { // The type internally used for fields @@ -79,419 +49,6 @@ private[sql] object CatalystConverter { type ArrayScalaType[T] = Seq[T] type StructScalaType[T] = Row type MapScalaType[K, V] = Map[K, V] - - protected[parquet] def createConverter( - field: FieldType, - fieldIndex: Int, - parent: CatalystConverter): Converter = { - val fieldType: DataType = field.dataType - fieldType match { - case udt: UserDefinedType[_] => { - createConverter(field.copy(dataType = udt.sqlType), fieldIndex, parent) - } - // For native JVM types we use a converter with native arrays - case ArrayType(elementType: NativeType, false) => { - new CatalystNativeArrayConverter(elementType, fieldIndex, parent) - } - // This is for other types of arrays, including those with nested fields - case ArrayType(elementType: DataType, false) => { - new CatalystArrayConverter(elementType, fieldIndex, parent) - } - case ArrayType(elementType: DataType, true) => { - new CatalystArrayContainsNullConverter(elementType, fieldIndex, parent) - } - case StructType(fields: Array[StructField]) => { - new CatalystStructConverter(fields, fieldIndex, parent) - } - case MapType(keyType: DataType, valueType: DataType, valueContainsNull: Boolean) => { - new CatalystMapConverter( - Array( - new FieldType(MAP_KEY_SCHEMA_NAME, keyType, false), - new FieldType(MAP_VALUE_SCHEMA_NAME, valueType, valueContainsNull)), - fieldIndex, - parent) - } - // Strings, Shorts and Bytes do not have a corresponding type in Parquet - // so we need to treat them separately - case StringType => - new CatalystPrimitiveStringConverter(parent, fieldIndex) - case ShortType => { - new CatalystPrimitiveConverter(parent, fieldIndex) { - override def addInt(value: Int): Unit = - parent.updateShort(fieldIndex, value.asInstanceOf[ShortType.JvmType]) - } - } - case ByteType => { - new CatalystPrimitiveConverter(parent, fieldIndex) { - override def addInt(value: Int): Unit = - parent.updateByte(fieldIndex, value.asInstanceOf[ByteType.JvmType]) - } - } - case DateType => { - new CatalystPrimitiveConverter(parent, fieldIndex) { - override def addInt(value: Int): Unit = - parent.updateDate(fieldIndex, value.asInstanceOf[DateType.JvmType]) - } - } - case d: DecimalType => { - new CatalystPrimitiveConverter(parent, fieldIndex) { - override def addBinary(value: Binary): Unit = - parent.updateDecimal(fieldIndex, value, d) - } - } - case TimestampType => { - new CatalystPrimitiveConverter(parent, fieldIndex) { - override def addBinary(value: Binary): Unit = - parent.updateTimestamp(fieldIndex, value) - } - } - // All other primitive types use the default converter - case ctype: PrimitiveType => { // note: need the type tag here! 
- new CatalystPrimitiveConverter(parent, fieldIndex) - } - case _ => throw new RuntimeException( - s"unable to convert datatype ${field.dataType.toString} in CatalystConverter") - } - } - - protected[parquet] def createRootConverter( - parquetSchema: MessageType, - attributes: Seq[Attribute]): CatalystConverter = { - // For non-nested types we use the optimized Row converter - if (attributes.forall(a => ParquetTypesConverter.isPrimitiveType(a.dataType))) { - new CatalystPrimitiveRowConverter(attributes.toArray) - } else { - new CatalystGroupConverter(attributes.toArray) - } - } -} - -private[parquet] abstract class CatalystConverter extends GroupConverter { - /** - * The number of fields this group has - */ - protected[parquet] val size: Int - - /** - * The index of this converter in the parent - */ - protected[parquet] val index: Int - - /** - * The parent converter - */ - protected[parquet] val parent: CatalystConverter - - /** - * Called by child converters to update their value in its parent (this). - * Note that if possible the more specific update methods below should be used - * to avoid auto-boxing of native JVM types. - * - * @param fieldIndex - * @param value - */ - protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit - - protected[parquet] def updateBoolean(fieldIndex: Int, value: Boolean): Unit = - updateField(fieldIndex, value) - - protected[parquet] def updateInt(fieldIndex: Int, value: Int): Unit = - updateField(fieldIndex, value) - - protected[parquet] def updateDate(fieldIndex: Int, value: Int): Unit = - updateField(fieldIndex, value) - - protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit = - updateField(fieldIndex, value) - - protected[parquet] def updateShort(fieldIndex: Int, value: Short): Unit = - updateField(fieldIndex, value) - - protected[parquet] def updateByte(fieldIndex: Int, value: Byte): Unit = - updateField(fieldIndex, value) - - protected[parquet] def updateDouble(fieldIndex: Int, value: Double): Unit = - updateField(fieldIndex, value) - - protected[parquet] def updateFloat(fieldIndex: Int, value: Float): Unit = - updateField(fieldIndex, value) - - protected[parquet] def updateBinary(fieldIndex: Int, value: Binary): Unit = - updateField(fieldIndex, value.getBytes) - - protected[parquet] def updateString(fieldIndex: Int, value: String): Unit = - updateField(fieldIndex, value) - - protected[parquet] def updateTimestamp(fieldIndex: Int, value: Binary): Unit = - updateField(fieldIndex, readTimestamp(value)) - - protected[parquet] def updateDecimal(fieldIndex: Int, value: Binary, ctype: DecimalType): Unit = - updateField(fieldIndex, readDecimal(new Decimal(), value, ctype)) - - protected[parquet] def isRootConverter: Boolean = parent == null - - protected[parquet] def clearBuffer(): Unit - - /** - * Should only be called in the root (group) converter! - * - * @return - */ - def getCurrentRecord: Row = throw new UnsupportedOperationException - - /** - * Read a decimal value from a Parquet Binary into "dest". Only supports decimals that fit in - * a long (i.e. 
precision <= 18) - */ - protected[parquet] def readDecimal(dest: Decimal, value: Binary, ctype: DecimalType): Unit = { - val precision = ctype.precisionInfo.get.precision - val scale = ctype.precisionInfo.get.scale - val bytes = value.getBytes - require(bytes.length <= 16, "Decimal field too large to read") - var unscaled = 0L - var i = 0 - while (i < bytes.length) { - unscaled = (unscaled << 8) | (bytes(i) & 0xFF) - i += 1 - } - // Make sure unscaled has the right sign, by sign-extending the first bit - val numBits = 8 * bytes.length - unscaled = (unscaled << (64 - numBits)) >> (64 - numBits) - dest.set(unscaled, precision, scale) - } - - /** - * Read a Timestamp value from a Parquet Int96Value - */ - protected[parquet] def readTimestamp(value: Binary): Timestamp = { - CatalystTimestampConverter.convertToTimestamp(value) - } -} - -/** - * A `parquet.io.api.GroupConverter` that is able to convert a Parquet record - * to a [[org.apache.spark.sql.catalyst.expressions.Row]] object. - * - * @param schema The corresponding Catalyst schema in the form of a list of attributes. - */ -private[parquet] class CatalystGroupConverter( - protected[parquet] val schema: Array[FieldType], - protected[parquet] val index: Int, - protected[parquet] val parent: CatalystConverter, - protected[parquet] var current: ArrayBuffer[Any], - protected[parquet] var buffer: ArrayBuffer[Row]) - extends CatalystConverter { - - def this(schema: Array[FieldType], index: Int, parent: CatalystConverter) = - this( - schema, - index, - parent, - current = null, - buffer = new ArrayBuffer[Row]( - CatalystArrayConverter.INITIAL_ARRAY_SIZE)) - - /** - * This constructor is used for the root converter only! - */ - def this(attributes: Array[Attribute]) = - this(attributes.map(a => new FieldType(a.name, a.dataType, a.nullable)), 0, null) - - protected [parquet] val converters: Array[Converter] = - schema.zipWithIndex.map { - case (field, idx) => CatalystConverter.createConverter(field, idx, this) - }.toArray - - override val size = schema.size - - override def getCurrentRecord: Row = { - assert(isRootConverter, "getCurrentRecord should only be called in root group converter!") - // TODO: use iterators if possible - // Note: this will ever only be called in the root converter when the record has been - // fully processed. Therefore it will be difficult to use mutable rows instead, since - // any non-root converter never would be sure when it would be safe to re-use the buffer. - new GenericRow(current.toArray) - } - - override def getConverter(fieldIndex: Int): Converter = converters(fieldIndex) - - // for child converters to update upstream values - override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = { - current.update(fieldIndex, value) - } - - override protected[parquet] def clearBuffer(): Unit = buffer.clear() - - override def start(): Unit = { - current = ArrayBuffer.fill(size)(null) - converters.foreach { - converter => if (!converter.isPrimitive) { - converter.asInstanceOf[CatalystConverter].clearBuffer - } - } - } - - override def end(): Unit = { - if (!isRootConverter) { - assert(current != null) // there should be no empty groups - buffer.append(new GenericRow(current.toArray)) - parent.updateField(index, new GenericRow(buffer.toArray.asInstanceOf[Array[Any]])) - } - } -} - -/** - * A `parquet.io.api.GroupConverter` that is able to convert a Parquet record - * to a [[org.apache.spark.sql.catalyst.expressions.Row]] object. 
Note that his - * converter is optimized for rows of primitive types (non-nested records). - */ -private[parquet] class CatalystPrimitiveRowConverter( - protected[parquet] val schema: Array[FieldType], - protected[parquet] var current: MutableRow) - extends CatalystConverter { - - // This constructor is used for the root converter only - def this(attributes: Array[Attribute]) = - this( - attributes.map(a => new FieldType(a.name, a.dataType, a.nullable)), - new SpecificMutableRow(attributes.map(_.dataType))) - - protected [parquet] val converters: Array[Converter] = - schema.zipWithIndex.map { - case (field, idx) => CatalystConverter.createConverter(field, idx, this) - }.toArray - - override val size = schema.size - - override val index = 0 - - override val parent = null - - // Should be only called in root group converter! - override def getCurrentRecord: Row = current - - override def getConverter(fieldIndex: Int): Converter = converters(fieldIndex) - - // for child converters to update upstream values - override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = { - throw new UnsupportedOperationException // child converters should use the - // specific update methods below - } - - override protected[parquet] def clearBuffer(): Unit = {} - - override def start(): Unit = { - var i = 0 - while (i < size) { - current.setNullAt(i) - i = i + 1 - } - } - - override def end(): Unit = {} - - // Overridden here to avoid auto-boxing for primitive types - override protected[parquet] def updateBoolean(fieldIndex: Int, value: Boolean): Unit = - current.setBoolean(fieldIndex, value) - - override protected[parquet] def updateInt(fieldIndex: Int, value: Int): Unit = - current.setInt(fieldIndex, value) - - override protected[parquet] def updateDate(fieldIndex: Int, value: Int): Unit = - current.update(fieldIndex, value) - - override protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit = - current.setLong(fieldIndex, value) - - override protected[parquet] def updateShort(fieldIndex: Int, value: Short): Unit = - current.setShort(fieldIndex, value) - - override protected[parquet] def updateByte(fieldIndex: Int, value: Byte): Unit = - current.setByte(fieldIndex, value) - - override protected[parquet] def updateDouble(fieldIndex: Int, value: Double): Unit = - current.setDouble(fieldIndex, value) - - override protected[parquet] def updateFloat(fieldIndex: Int, value: Float): Unit = - current.setFloat(fieldIndex, value) - - override protected[parquet] def updateBinary(fieldIndex: Int, value: Binary): Unit = - current.update(fieldIndex, value.getBytes) - - override protected[parquet] def updateString(fieldIndex: Int, value: String): Unit = - current.setString(fieldIndex, value) - - override protected[parquet] def updateTimestamp(fieldIndex: Int, value: Binary): Unit = - current.update(fieldIndex, readTimestamp(value)) - - override protected[parquet] def updateDecimal( - fieldIndex: Int, value: Binary, ctype: DecimalType): Unit = { - var decimal = current(fieldIndex).asInstanceOf[Decimal] - if (decimal == null) { - decimal = new Decimal - current(fieldIndex) = decimal - } - readDecimal(decimal, value, ctype) - } -} - -/** - * A `parquet.io.api.PrimitiveConverter` that converts Parquet types to Catalyst types. - * - * @param parent The parent group converter. - * @param fieldIndex The index inside the record. 
- */ -private[parquet] class CatalystPrimitiveConverter( - parent: CatalystConverter, - fieldIndex: Int) extends PrimitiveConverter { - override def addBinary(value: Binary): Unit = - parent.updateBinary(fieldIndex, value) - - override def addBoolean(value: Boolean): Unit = - parent.updateBoolean(fieldIndex, value) - - override def addDouble(value: Double): Unit = - parent.updateDouble(fieldIndex, value) - - override def addFloat(value: Float): Unit = - parent.updateFloat(fieldIndex, value) - - override def addInt(value: Int): Unit = - parent.updateInt(fieldIndex, value) - - override def addLong(value: Long): Unit = - parent.updateLong(fieldIndex, value) -} - -/** - * A `parquet.io.api.PrimitiveConverter` that converts Parquet Binary to Catalyst String. - * Supports dictionaries to reduce Binary to String conversion overhead. - * - * Follows pattern in Parquet of using dictionaries, where supported, for String conversion. - * - * @param parent The parent group converter. - * @param fieldIndex The index inside the record. - */ -private[parquet] class CatalystPrimitiveStringConverter(parent: CatalystConverter, fieldIndex: Int) - extends CatalystPrimitiveConverter(parent, fieldIndex) { - - private[this] var dict: Array[String] = null - - override def hasDictionarySupport: Boolean = true - - override def setDictionary(dictionary: Dictionary):Unit = - dict = Array.tabulate(dictionary.getMaxId + 1) {dictionary.decodeToBinary(_).toStringUsingUTF8} - - - override def addValueFromDictionary(dictionaryId: Int): Unit = - parent.updateString(fieldIndex, dict(dictionaryId)) - - override def addBinary(value: Binary): Unit = - parent.updateString(fieldIndex, value.toStringUsingUTF8) -} - -private[parquet] object CatalystArrayConverter { - val INITIAL_ARRAY_SIZE = 20 } private[parquet] object CatalystTimestampConverter { @@ -560,366 +117,3 @@ private[parquet] object CatalystTimestampConverter { NanoTime(julianDay, nanosOfDay).toBinary } } - -/** - * A `parquet.io.api.GroupConverter` that converts a single-element groups that - * match the characteristics of an array (see - * [[org.apache.spark.sql.parquet.ParquetTypesConverter]]) into an - * [[org.apache.spark.sql.types.ArrayType]]. 
- * - * @param elementType The type of the array elements (complex or primitive) - * @param index The position of this (array) field inside its parent converter - * @param parent The parent converter - * @param buffer A data buffer - */ -private[parquet] class CatalystArrayConverter( - val elementType: DataType, - val index: Int, - protected[parquet] val parent: CatalystConverter, - protected[parquet] var buffer: Buffer[Any]) - extends CatalystConverter { - - def this(elementType: DataType, index: Int, parent: CatalystConverter) = - this( - elementType, - index, - parent, - new ArrayBuffer[Any](CatalystArrayConverter.INITIAL_ARRAY_SIZE)) - - protected[parquet] val converter: Converter = CatalystConverter.createConverter( - new CatalystConverter.FieldType( - CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, - elementType, - false), - fieldIndex=0, - parent=this) - - override def getConverter(fieldIndex: Int): Converter = converter - - // arrays have only one (repeated) field, which is its elements - override val size = 1 - - override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = { - // fieldIndex is ignored (assumed to be zero but not checked) - if(value == null) { - throw new IllegalArgumentException("Null values inside Parquet arrays are not supported!") - } - buffer += value - } - - override protected[parquet] def clearBuffer(): Unit = { - buffer.clear() - } - - override def start(): Unit = { - if (!converter.isPrimitive) { - converter.asInstanceOf[CatalystConverter].clearBuffer - } - } - - override def end(): Unit = { - assert(parent != null) - // here we need to make sure to use ArrayScalaType - parent.updateField(index, buffer.toArray.toSeq) - clearBuffer() - } -} - -/** - * A `parquet.io.api.GroupConverter` that converts a single-element groups that - * match the characteristics of an array (see - * [[org.apache.spark.sql.parquet.ParquetTypesConverter]]) into an - * [[org.apache.spark.sql.types.ArrayType]]. 
- * - * @param elementType The type of the array elements (native) - * @param index The position of this (array) field inside its parent converter - * @param parent The parent converter - * @param capacity The (initial) capacity of the buffer - */ -private[parquet] class CatalystNativeArrayConverter( - val elementType: NativeType, - val index: Int, - protected[parquet] val parent: CatalystConverter, - protected[parquet] var capacity: Int = CatalystArrayConverter.INITIAL_ARRAY_SIZE) - extends CatalystConverter { - - type NativeType = elementType.JvmType - - private var buffer: Array[NativeType] = elementType.classTag.newArray(capacity) - - private var elements: Int = 0 - - protected[parquet] val converter: Converter = CatalystConverter.createConverter( - new CatalystConverter.FieldType( - CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, - elementType, - false), - fieldIndex=0, - parent=this) - - override def getConverter(fieldIndex: Int): Converter = converter - - // arrays have only one (repeated) field, which is its elements - override val size = 1 - - override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = - throw new UnsupportedOperationException - - // Overridden here to avoid auto-boxing for primitive types - override protected[parquet] def updateBoolean(fieldIndex: Int, value: Boolean): Unit = { - checkGrowBuffer() - buffer(elements) = value.asInstanceOf[NativeType] - elements += 1 - } - - override protected[parquet] def updateInt(fieldIndex: Int, value: Int): Unit = { - checkGrowBuffer() - buffer(elements) = value.asInstanceOf[NativeType] - elements += 1 - } - - override protected[parquet] def updateShort(fieldIndex: Int, value: Short): Unit = { - checkGrowBuffer() - buffer(elements) = value.asInstanceOf[NativeType] - elements += 1 - } - - override protected[parquet] def updateByte(fieldIndex: Int, value: Byte): Unit = { - checkGrowBuffer() - buffer(elements) = value.asInstanceOf[NativeType] - elements += 1 - } - - override protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit = { - checkGrowBuffer() - buffer(elements) = value.asInstanceOf[NativeType] - elements += 1 - } - - override protected[parquet] def updateDouble(fieldIndex: Int, value: Double): Unit = { - checkGrowBuffer() - buffer(elements) = value.asInstanceOf[NativeType] - elements += 1 - } - - override protected[parquet] def updateFloat(fieldIndex: Int, value: Float): Unit = { - checkGrowBuffer() - buffer(elements) = value.asInstanceOf[NativeType] - elements += 1 - } - - override protected[parquet] def updateBinary(fieldIndex: Int, value: Binary): Unit = { - checkGrowBuffer() - buffer(elements) = value.getBytes.asInstanceOf[NativeType] - elements += 1 - } - - override protected[parquet] def updateString(fieldIndex: Int, value: String): Unit = { - checkGrowBuffer() - buffer(elements) = value.asInstanceOf[NativeType] - elements += 1 - } - - override protected[parquet] def clearBuffer(): Unit = { - elements = 0 - } - - override def start(): Unit = {} - - override def end(): Unit = { - assert(parent != null) - // here we need to make sure to use ArrayScalaType - parent.updateField( - index, - buffer.slice(0, elements).toSeq) - clearBuffer() - } - - private def checkGrowBuffer(): Unit = { - if (elements >= capacity) { - val newCapacity = 2 * capacity - val tmp: Array[NativeType] = elementType.classTag.newArray(newCapacity) - Array.copy(buffer, 0, tmp, 0, capacity) - buffer = tmp - capacity = newCapacity - } - } -} - -/** - * A `parquet.io.api.GroupConverter` that converts a single-element 
groups that - * match the characteristics of an array contains null (see - * [[org.apache.spark.sql.parquet.ParquetTypesConverter]]) into an - * [[org.apache.spark.sql.types.ArrayType]]. - * - * @param elementType The type of the array elements (complex or primitive) - * @param index The position of this (array) field inside its parent converter - * @param parent The parent converter - * @param buffer A data buffer - */ -private[parquet] class CatalystArrayContainsNullConverter( - val elementType: DataType, - val index: Int, - protected[parquet] val parent: CatalystConverter, - protected[parquet] var buffer: Buffer[Any]) - extends CatalystConverter { - - def this(elementType: DataType, index: Int, parent: CatalystConverter) = - this( - elementType, - index, - parent, - new ArrayBuffer[Any](CatalystArrayConverter.INITIAL_ARRAY_SIZE)) - - protected[parquet] val converter: Converter = new CatalystConverter { - - private var current: Any = null - - val converter = CatalystConverter.createConverter( - new CatalystConverter.FieldType( - CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, - elementType, - false), - fieldIndex = 0, - parent = this) - - override def getConverter(fieldIndex: Int): Converter = converter - - override def end(): Unit = parent.updateField(index, current) - - override def start(): Unit = { - current = null - } - - override protected[parquet] val size: Int = 1 - override protected[parquet] val index: Int = 0 - override protected[parquet] val parent = CatalystArrayContainsNullConverter.this - - override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = { - current = value - } - - override protected[parquet] def clearBuffer(): Unit = {} - } - - override def getConverter(fieldIndex: Int): Converter = converter - - // arrays have only one (repeated) field, which is its elements - override val size = 1 - - override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = { - buffer += value - } - - override protected[parquet] def clearBuffer(): Unit = { - buffer.clear() - } - - override def start(): Unit = {} - - override def end(): Unit = { - assert(parent != null) - // here we need to make sure to use ArrayScalaType - parent.updateField(index, buffer.toArray.toSeq) - clearBuffer() - } -} - -/** - * This converter is for multi-element groups of primitive or complex types - * that have repetition level optional or required (so struct fields). - * - * @param schema The corresponding Catalyst schema in the form of a list of - * attributes. - * @param index - * @param parent - */ -private[parquet] class CatalystStructConverter( - override protected[parquet] val schema: Array[FieldType], - override protected[parquet] val index: Int, - override protected[parquet] val parent: CatalystConverter) - extends CatalystGroupConverter(schema, index, parent) { - - override protected[parquet] def clearBuffer(): Unit = {} - - // TODO: think about reusing the buffer - override def end(): Unit = { - assert(!isRootConverter) - // here we need to make sure to use StructScalaType - // Note: we need to actually make a copy of the array since we - // may be in a nested field - parent.updateField(index, new GenericRow(current.toArray)) - } -} - -/** - * A `parquet.io.api.GroupConverter` that converts two-element groups that - * match the characteristics of a map (see - * [[org.apache.spark.sql.parquet.ParquetTypesConverter]]) into an - * [[org.apache.spark.sql.types.MapType]]. 
- * - * @param schema - * @param index - * @param parent - */ -private[parquet] class CatalystMapConverter( - protected[parquet] val schema: Array[FieldType], - override protected[parquet] val index: Int, - override protected[parquet] val parent: CatalystConverter) - extends CatalystConverter { - - private val map = new HashMap[Any, Any]() - - private val keyValueConverter = new CatalystConverter { - private var currentKey: Any = null - private var currentValue: Any = null - val keyConverter = CatalystConverter.createConverter(schema(0), 0, this) - val valueConverter = CatalystConverter.createConverter(schema(1), 1, this) - - override def getConverter(fieldIndex: Int): Converter = { - if (fieldIndex == 0) keyConverter else valueConverter - } - - override def end(): Unit = CatalystMapConverter.this.map += currentKey -> currentValue - - override def start(): Unit = { - currentKey = null - currentValue = null - } - - override protected[parquet] val size: Int = 2 - override protected[parquet] val index: Int = 0 - override protected[parquet] val parent: CatalystConverter = CatalystMapConverter.this - - override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = { - fieldIndex match { - case 0 => - currentKey = value - case 1 => - currentValue = value - case _ => - new RuntimePermission(s"trying to update Map with fieldIndex $fieldIndex") - } - } - - override protected[parquet] def clearBuffer(): Unit = {} - } - - override protected[parquet] val size: Int = 1 - - override protected[parquet] def clearBuffer(): Unit = {} - - override def start(): Unit = { - map.clear() - } - - override def end(): Unit = { - // here we need to make sure to use MapScalaType - parent.updateField(index, map.toMap) - } - - override def getConverter(fieldIndex: Int): Converter = keyValueConverter - - override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = - throw new UnsupportedOperationException -} From 184eb278456ce24436b8a5b10020ff3016af14e6 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Thu, 2 Apr 2015 00:36:45 +0800 Subject: [PATCH 3/7] Minor cleanups --- .../sql/parquet/ParquetTableSupport.scala | 10 +- .../spark/sql/parquet/readSupport.scala | 99 ++++++++++--------- 2 files changed, 57 insertions(+), 52 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala index b2ad5cf8f1d0..63ec7d97fb45 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala @@ -37,14 +37,12 @@ import org.apache.spark.sql.types._ private[parquet] class RowRecordMaterializer(parquetSchema: MessageType, attributes: Seq[Attribute]) extends RecordMaterializer[Row] { - private val root = { - val noopUpdater = new ParentContainerUpdater {} - new StructConverter(parquetSchema, StructType.fromAttributes(attributes), noopUpdater) - } + private val rootConverter = + new CatalystStructConverter(parquetSchema, StructType.fromAttributes(attributes), NoopUpdater) - override def getCurrentRecord: Row = root.currentRow + override def getCurrentRecord: Row = rootConverter.currentRow - override def getRootConverter: GroupConverter = root + override def getRootConverter: GroupConverter = rootConverter } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/readSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/readSupport.scala index 
a451b4062584..c5cfd7dbfe6c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/readSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/readSupport.scala @@ -32,7 +32,7 @@ import parquet.schema.{PrimitiveType => ParquetPrimitiveType, _} import org.apache.spark.Logging import org.apache.spark.sql.Row -import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow +import org.apache.spark.sql.catalyst.expressions.{MutableRow, SpecificMutableRow} import org.apache.spark.sql.types._ private[parquet] object SparkRowReadSupport { @@ -102,13 +102,12 @@ private[parquet] class SparkRowReadSupport extends ReadSupport[Row] with Logging } } -private[parquet] class SparkRowRecordMaterializer(parquetSchema: MessageType, sparkSchema: StructType) +private[parquet] class SparkRowRecordMaterializer( + parquetSchema: MessageType, + sparkSchema: StructType) extends RecordMaterializer[Row] { - private val rootConverter = { - val noopUpdater = new ParentContainerUpdater {} - new StructConverter(parquetSchema, sparkSchema, noopUpdater) - } + private val rootConverter = new CatalystStructConverter(parquetSchema, sparkSchema, NoopUpdater) override def getCurrentRecord: Row = rootConverter.currentRow @@ -126,7 +125,20 @@ private[parquet] trait ParentContainerUpdater { def setDouble(value: Double): Unit = set(value) } -private[parquet] class StructConverter( +private[parquet] object NoopUpdater extends ParentContainerUpdater + +private[parquet] class RowUpdater(row: MutableRow, ordinal: Int) extends ParentContainerUpdater { + override def set(value: Any): Unit = row(ordinal) = value + override def setBoolean(value: Boolean): Unit = row.setBoolean(ordinal, value) + override def setByte(value: Byte): Unit = row.setByte(ordinal, value) + override def setShort(value: Short): Unit = row.setShort(ordinal, value) + override def setInt(value: Int): Unit = row.setInt(ordinal, value) + override def setLong(value: Long): Unit = row.setLong(ordinal, value) + override def setDouble(value: Double): Unit = row.setDouble(ordinal, value) + override def setFloat(value: Float): Unit = row.setFloat(ordinal, value) +} + +private[parquet] class CatalystStructConverter( parquetType: GroupType, sparkType: StructType, updater: ParentContainerUpdater) @@ -137,16 +149,7 @@ private[parquet] class StructConverter( private val fieldConverters: Array[Converter] = { parquetType.getFields.zip(sparkType).zipWithIndex.map { case ((parquetFieldType, sparkField), ordinal) => - newConverter(parquetFieldType, sparkField.dataType, new ParentContainerUpdater { - override def set(value: Any): Unit = currentRow(ordinal) = value - override def setBoolean(value: Boolean): Unit = currentRow.setBoolean(ordinal, value) - override def setByte(value: Byte): Unit = currentRow.setByte(ordinal, value) - override def setShort(value: Short): Unit = currentRow.setShort(ordinal, value) - override def setInt(value: Int): Unit = currentRow.setInt(ordinal, value) - override def setLong(value: Long): Unit = currentRow.setLong(ordinal, value) - override def setDouble(value: Double): Unit = currentRow.setDouble(ordinal, value) - override def setFloat(value: Float): Unit = currentRow.setFloat(ordinal, value) - }) + newConverter(parquetFieldType, sparkField.dataType, new RowUpdater(currentRow, ordinal)) }.toArray } @@ -169,7 +172,7 @@ private[parquet] class StructConverter( sparkType match { case BooleanType | IntegerType | LongType | FloatType | DoubleType | BinaryType => - new SimplePrimitiveConverter(updater) + new 
CatalystPrimitiveConverter(updater) case ByteType => new PrimitiveConverter { @@ -184,10 +187,10 @@ private[parquet] class StructConverter( } case t: DecimalType => - new DecimalConverter(t, updater) + new CatalystDecimalConverter(t, updater) case StringType => - new StringConverter(updater) + new CatalystStringConverter(updater) case TimestampType => new PrimitiveConverter { @@ -204,13 +207,13 @@ private[parquet] class StructConverter( } case t: ArrayType => - new ArrayConverter(parquetType.asGroupType(), t, updater) + new CatalystArrayConverter(parquetType.asGroupType(), t, updater) case t: MapType => - new MapConverter(parquetType.asGroupType(), t, updater) + new CatalystMapConverter(parquetType.asGroupType(), t, updater) case t: StructType => - new StructConverter(parquetType.asGroupType(), t, updater) + new CatalystStructConverter(parquetType.asGroupType(), t, updater) case t: UserDefinedType[_] => val sparkTypeForUDT = t.sqlType @@ -227,7 +230,9 @@ private[parquet] class StructConverter( // Other converters ///////////////////////////////////////////////////////////////////////////// - class SimplePrimitiveConverter(updater: ParentContainerUpdater) extends PrimitiveConverter { + private final class CatalystPrimitiveConverter(updater: ParentContainerUpdater) + extends PrimitiveConverter { + override def addBoolean(value: Boolean): Unit = updater.setBoolean(value) override def addInt(value: Int): Unit = updater.setInt(value) override def addLong(value: Long): Unit = updater.setLong(value) @@ -236,7 +241,9 @@ private[parquet] class StructConverter( override def addBinary(value: Binary): Unit = updater.set(value.getBytes) } - class StringConverter(updater: ParentContainerUpdater) extends PrimitiveConverter { + private final class CatalystStringConverter(updater: ParentContainerUpdater) + extends PrimitiveConverter { + private var expandedDictionary: Array[String] = null override def hasDictionarySupport: Boolean = true @@ -254,7 +261,10 @@ private[parquet] class StructConverter( override def addBinary(value: Binary): Unit = updater.set(value.toStringUsingUTF8) } - class DecimalConverter(decimalType: DecimalType, updater: ParentContainerUpdater) + // TODO Handle decimals stored as INT32 and INT64 + private final class CatalystDecimalConverter( + decimalType: DecimalType, + updater: ParentContainerUpdater) extends PrimitiveConverter { override def addBinary(value: Binary): Unit = { @@ -282,14 +292,14 @@ private[parquet] class StructConverter( } } - class ArrayConverter( + private final class CatalystArrayConverter( parquetSchema: GroupType, sparkSchema: ArrayType, updater: ParentContainerUpdater) extends GroupConverter { // TODO This is slow! Needs specialization. 
- private val array = ArrayBuffer.empty[Any] + private val currentArray = ArrayBuffer.empty[Any] private val elementConverter: Converter = { val repeatedType = parquetSchema.getType(0) @@ -297,7 +307,7 @@ private[parquet] class StructConverter( if (isElementType(repeatedType, elementType)) { newConverter(repeatedType, elementType, new ParentContainerUpdater { - override def set(value: Any): Unit = array += value + override def set(value: Any): Unit = currentArray += value }) } else { new ElementConverter(repeatedType.asGroupType().getType(0), elementType) @@ -306,9 +316,9 @@ private[parquet] class StructConverter( override def getConverter(fieldIndex: Int): Converter = elementConverter - override def end(): Unit = updater.set(array) + override def end(): Unit = updater.set(currentArray) - override def start(): Unit = array.clear() + override def start(): Unit = currentArray.clear() private def isElementType(repeatedType: Type, elementType: DataType): Boolean = { (repeatedType, elementType) match { @@ -320,7 +330,7 @@ private[parquet] class StructConverter( } /** Array element converter */ - private class ElementConverter( + private final class ElementConverter( parquetType: Type, sparkType: DataType) extends GroupConverter { @@ -333,19 +343,19 @@ private[parquet] class StructConverter( override def getConverter(fieldIndex: Int): Converter = converter - override def end(): Unit = array += currentElement + override def end(): Unit = currentArray += currentElement override def start(): Unit = currentElement = null } } - class MapConverter( + private final class CatalystMapConverter( parquetType: GroupType, sparkType: MapType, updater: ParentContainerUpdater) extends GroupConverter { - private val map = mutable.Map.empty[Any, Any] + private val currentMap = mutable.Map.empty[Any, Any] private val keyValueConverter = { val repeatedType = parquetType.getType(0).asGroupType() @@ -358,11 +368,11 @@ private[parquet] class StructConverter( override def getConverter(fieldIndex: Int): Converter = keyValueConverter - override def end(): Unit = updater.set(map) + override def end(): Unit = updater.set(currentMap) - override def start(): Unit = map.clear() + override def start(): Unit = currentMap.clear() - private class KeyValueConverter( + private final class KeyValueConverter( parquetKeyType: Type, parquetValueType: Type, sparkKeyType: DataType, @@ -373,21 +383,18 @@ private[parquet] class StructConverter( private var currentValue: Any = _ - private val keyConverter = + private val converters = Array( newConverter(parquetKeyType, sparkKeyType, new ParentContainerUpdater { override def set(value: Any): Unit = currentKey = value - }) + }), - private val valueConverter = newConverter(parquetValueType, sparkValueType, new ParentContainerUpdater { override def set(value: Any): Unit = currentValue = value - }) + })) - override def getConverter(fieldIndex: Int): Converter = { - if (fieldIndex == 0) keyConverter else valueConverter - } + override def getConverter(fieldIndex: Int): Converter = converters(fieldIndex) - override def end(): Unit = map(currentKey) = currentValue + override def end(): Unit = currentMap(currentKey) = currentValue override def start(): Unit = { currentKey = null From 058ad8410485b1365eac077296d10127d2affaf1 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Thu, 2 Apr 2015 18:28:56 +0800 Subject: [PATCH 4/7] More cleanups, and comments --- .../sql/parquet/ParquetTableSupport.scala | 2 +- .../spark/sql/parquet/readSupport.scala | 85 ++++++++++++++----- 2 files changed, 65 insertions(+), 
22 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala index 63ec7d97fb45..6c3329590e44 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala @@ -38,7 +38,7 @@ private[parquet] class RowRecordMaterializer(parquetSchema: MessageType, attribu extends RecordMaterializer[Row] { private val rootConverter = - new CatalystStructConverter(parquetSchema, StructType.fromAttributes(attributes), NoopUpdater) + new CatalystRowConverter(parquetSchema, StructType.fromAttributes(attributes), NoopUpdater) override def getCurrentRecord: Row = rootConverter.currentRow diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/readSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/readSupport.scala index c5cfd7dbfe6c..f7601e9ab2ce 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/readSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/readSupport.scala @@ -44,10 +44,12 @@ private[parquet] object SparkRowReadSupport { } private[parquet] class SparkRowReadSupport extends ReadSupport[Row] with Logging { + import SparkRowReadSupport._ + override def init(context: InitContext): ReadContext = { val conf = context.getConfiguration - val maybeRequestedSchema = Option(conf.get(SparkRowReadSupport.SPARK_ROW_REQUESTED_SCHEMA)) - val maybeRowSchema = Option(conf.get(SparkRowReadSupport.SPARK_ROW_SCHEMA)) + val maybeRequestedSchema = Option(conf.get(SPARK_ROW_REQUESTED_SCHEMA)) + val maybeRowSchema = Option(conf.get(SPARK_ROW_SCHEMA)) val fullParquetSchema = context.getFileSchema val parquetRequestedSchema = @@ -58,8 +60,8 @@ private[parquet] class SparkRowReadSupport extends ReadSupport[Row] with Logging val metadata = Map.empty[String, String] ++ - maybeRequestedSchema.map(SparkRowReadSupport.SPARK_ROW_REQUESTED_SCHEMA -> _) ++ - maybeRowSchema.map(SparkRowReadSupport.SPARK_ROW_SCHEMA -> _) + maybeRequestedSchema.map(SPARK_ROW_REQUESTED_SCHEMA -> _) ++ + maybeRowSchema.map(SPARK_ROW_SCHEMA -> _) new ReadContext(parquetRequestedSchema, metadata) } @@ -75,8 +77,8 @@ private[parquet] class SparkRowReadSupport extends ReadSupport[Row] with Logging val parquetRequestedSchema = readContext.getRequestedSchema val metadata = readContext.getReadSupportMetadata - val maybeSparkSchema = Option(metadata.get(SparkRowReadSupport.SPARK_METADATA_KEY)) - val maybeSparkRequestedSchema = Option(metadata.get(SparkRowReadSupport.SPARK_ROW_REQUESTED_SCHEMA)) + val maybeSparkSchema = Option(metadata.get(SPARK_METADATA_KEY)) + val maybeSparkRequestedSchema = Option(metadata.get(SPARK_ROW_REQUESTED_SCHEMA)) val sparkSchema = maybeSparkRequestedSchema @@ -107,13 +109,19 @@ private[parquet] class SparkRowRecordMaterializer( sparkSchema: StructType) extends RecordMaterializer[Row] { - private val rootConverter = new CatalystStructConverter(parquetSchema, sparkSchema, NoopUpdater) + private val rootConverter = new CatalystRowConverter(parquetSchema, sparkSchema, NoopUpdater) override def getCurrentRecord: Row = rootConverter.currentRow override def getRootConverter: GroupConverter = rootConverter } +/** + * A [[ParentContainerUpdater]] is used by a Parquet converter to set converted values to some + * corresponding parent container. 
For example, a converter for a `Struct` field may set converted + * values to a [[MutableRow]]; or a converter for array element may append converted values to an + * [[ArrayBuffer]]. + */ private[parquet] trait ParentContainerUpdater { def set(value: Any): Unit = () def setBoolean(value: Boolean): Unit = set(value) @@ -125,27 +133,45 @@ private[parquet] trait ParentContainerUpdater { def setDouble(value: Double): Unit = set(value) } +/** A no-op updater used for root converter (who doesn't have a parent). */ private[parquet] object NoopUpdater extends ParentContainerUpdater -private[parquet] class RowUpdater(row: MutableRow, ordinal: Int) extends ParentContainerUpdater { - override def set(value: Any): Unit = row(ordinal) = value - override def setBoolean(value: Boolean): Unit = row.setBoolean(ordinal, value) - override def setByte(value: Byte): Unit = row.setByte(ordinal, value) - override def setShort(value: Short): Unit = row.setShort(ordinal, value) - override def setInt(value: Int): Unit = row.setInt(ordinal, value) - override def setLong(value: Long): Unit = row.setLong(ordinal, value) - override def setDouble(value: Double): Unit = row.setDouble(ordinal, value) - override def setFloat(value: Float): Unit = row.setFloat(ordinal, value) -} - -private[parquet] class CatalystStructConverter( +/** + * This Parquet converter converts Parquet records to Spark SQL rows. + * + * @param parquetType Parquet type of Parquet records + * @param sparkType A Spark SQL struct type that corresponds to the Parquet record type + * @param updater An updater which takes care of the converted row object + */ +private[parquet] class CatalystRowConverter( parquetType: GroupType, sparkType: StructType, updater: ParentContainerUpdater) extends GroupConverter { + /** + * Updater used together with [[CatalystRowConverter]]. + * + * @constructor Constructs a [[RowUpdater]] which sets converted filed values to the `ordinal`-th + * cell in `row`. + */ + private final class RowUpdater(row: MutableRow, ordinal: Int) extends ParentContainerUpdater { + override def set(value: Any): Unit = row(ordinal) = value + override def setBoolean(value: Boolean): Unit = row.setBoolean(ordinal, value) + override def setByte(value: Byte): Unit = row.setByte(ordinal, value) + override def setShort(value: Short): Unit = row.setShort(ordinal, value) + override def setInt(value: Int): Unit = row.setInt(ordinal, value) + override def setLong(value: Long): Unit = row.setLong(ordinal, value) + override def setDouble(value: Double): Unit = row.setDouble(ordinal, value) + override def setFloat(value: Float): Unit = row.setFloat(ordinal, value) + } + + /** + * Represents the converted row object once an entire Parquet record is converted. + */ val currentRow = new SpecificMutableRow(sparkType.map(_.dataType)) + // Converters for each field. private val fieldConverters: Array[Converter] = { parquetType.getFields.zip(sparkType).zipWithIndex.map { case ((parquetFieldType, sparkField), ordinal) => @@ -165,6 +191,10 @@ private[parquet] class CatalystStructConverter( } } + /** + * Creates a converter for the given Parquet type `parquetType` and Spark SQL data type + * `sparkType`. Converted values are handled by `updater`. 
+ */ private def newConverter( parquetType: Type, sparkType: DataType, @@ -213,7 +243,7 @@ private[parquet] class CatalystStructConverter( new CatalystMapConverter(parquetType.asGroupType(), t, updater) case t: StructType => - new CatalystStructConverter(parquetType.asGroupType(), t, updater) + new CatalystRowConverter(parquetType.asGroupType(), t, updater) case t: UserDefinedType[_] => val sparkTypeForUDT = t.sqlType @@ -230,6 +260,7 @@ private[parquet] class CatalystStructConverter( // Other converters ///////////////////////////////////////////////////////////////////////////// + /** Parquet converter for Parquet primitive types. */ private final class CatalystPrimitiveConverter(updater: ParentContainerUpdater) extends PrimitiveConverter { @@ -241,6 +272,9 @@ private[parquet] class CatalystStructConverter( override def addBinary(value: Binary): Unit = updater.set(value.getBytes) } + /** + * Parquet converter for strings. A dictionary is used to avoid unnecessary string decoding cost. + */ private final class CatalystStringConverter(updater: ParentContainerUpdater) extends PrimitiveConverter { @@ -261,7 +295,11 @@ private[parquet] class CatalystStructConverter( override def addBinary(value: Binary): Unit = updater.set(value.toStringUsingUTF8) } - // TODO Handle decimals stored as INT32 and INT64 + /** + * Parquet converter for decimals. + * + * @todo Handle decimals stored as INT32 and INT64 + */ private final class CatalystDecimalConverter( decimalType: DecimalType, updater: ParentContainerUpdater) @@ -292,6 +330,7 @@ private[parquet] class CatalystStructConverter( } } + /** Parquet converter for arrays. */ private final class CatalystArrayConverter( parquetSchema: GroupType, sparkSchema: ArrayType, @@ -349,6 +388,7 @@ private[parquet] class CatalystStructConverter( } } + /** Parquet converter for maps */ private final class CatalystMapConverter( parquetType: GroupType, sparkType: MapType, @@ -372,6 +412,7 @@ private[parquet] class CatalystStructConverter( override def start(): Unit = currentMap.clear() + /** Parquet converter for key-value pairs within the map. 
*/ private final class KeyValueConverter( parquetKeyType: Type, parquetValueType: Type, @@ -384,10 +425,12 @@ private[parquet] class CatalystStructConverter( private var currentValue: Any = _ private val converters = Array( + // Converter for keys newConverter(parquetKeyType, sparkKeyType, new ParentContainerUpdater { override def set(value: Any): Unit = currentKey = value }), + // Converter for values newConverter(parquetValueType, sparkValueType, new ParentContainerUpdater { override def set(value: Any): Unit = currentValue = value })) From 0de2e8b83fb4b616603c2013792db3351c82f324 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Mon, 6 Apr 2015 19:58:57 +0800 Subject: [PATCH 5/7] Moves converter classes back to ParquetConverter.scala --- .../spark/sql/parquet/ParquetConverter.scala | 373 ++++++++++++++- .../spark/sql/parquet/readSupport.scala | 448 ------------------ 2 files changed, 372 insertions(+), 449 deletions(-) delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/parquet/readSupport.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala index 66b3954d6cb9..fdae38782aad 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala @@ -20,13 +20,384 @@ package org.apache.spark.sql.parquet import java.sql.Timestamp import java.util.{Calendar, TimeZone} +import scala.collection.JavaConversions._ +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + import jodd.datetime.JDateTime -import parquet.io.api.Binary +import parquet.column.Dictionary +import parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter} +import parquet.schema.{GroupType, PrimitiveType => ParquetPrimitiveType, Type} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.parquet.timestamp.NanoTime import org.apache.spark.sql.types._ +/** + * A [[ParentContainerUpdater]] is used by a Parquet converter to set converted values to some + * corresponding parent container. For example, a converter for a `Struct` field may set converted + * values to a [[MutableRow]]; or a converter for array element may append converted values to an + * [[ArrayBuffer]]. + */ +private[parquet] trait ParentContainerUpdater { + def set(value: Any): Unit = () + def setBoolean(value: Boolean): Unit = set(value) + def setByte(value: Byte): Unit = set(value) + def setShort(value: Short): Unit = set(value) + def setInt(value: Int): Unit = set(value) + def setLong(value: Long): Unit = set(value) + def setFloat(value: Float): Unit = set(value) + def setDouble(value: Double): Unit = set(value) +} + +/** A no-op updater used for root converter (who doesn't have a parent). */ +private[parquet] object NoopUpdater extends ParentContainerUpdater + +/** + * This Parquet converter converts Parquet records to Spark SQL rows. + * + * @param parquetType Parquet type of Parquet records + * @param sparkType A Spark SQL struct type that corresponds to the Parquet record type + * @param updater An updater which takes care of the converted row object + */ +private[parquet] class CatalystRowConverter( + parquetType: GroupType, + sparkType: StructType, + updater: ParentContainerUpdater) + extends GroupConverter { + + /** + * Updater used together with [[CatalystRowConverter]]. 
+ * + * @constructor Constructs a [[RowUpdater]] which sets converted filed values to the `ordinal`-th + * cell in `row`. + */ + private final class RowUpdater(row: MutableRow, ordinal: Int) extends ParentContainerUpdater { + override def set(value: Any): Unit = row(ordinal) = value + override def setBoolean(value: Boolean): Unit = row.setBoolean(ordinal, value) + override def setByte(value: Byte): Unit = row.setByte(ordinal, value) + override def setShort(value: Short): Unit = row.setShort(ordinal, value) + override def setInt(value: Int): Unit = row.setInt(ordinal, value) + override def setLong(value: Long): Unit = row.setLong(ordinal, value) + override def setDouble(value: Double): Unit = row.setDouble(ordinal, value) + override def setFloat(value: Float): Unit = row.setFloat(ordinal, value) + } + + /** + * Represents the converted row object once an entire Parquet record is converted. + */ + val currentRow = new SpecificMutableRow(sparkType.map(_.dataType)) + + // Converters for each field. + private val fieldConverters: Array[Converter] = { + parquetType.getFields.zip(sparkType).zipWithIndex.map { + case ((parquetFieldType, sparkField), ordinal) => + newConverter(parquetFieldType, sparkField.dataType, new RowUpdater(currentRow, ordinal)) + }.toArray + } + + override def getConverter(fieldIndex: Int): Converter = fieldConverters(fieldIndex) + + override def end(): Unit = updater.set(currentRow) + + override def start(): Unit = { + var i = 0 + while (i < currentRow.length) { + currentRow.setNullAt(i) + i += 1 + } + } + + /** + * Creates a converter for the given Parquet type `parquetType` and Spark SQL data type + * `sparkType`. Converted values are handled by `updater`. + */ + private def newConverter( + parquetType: Type, + sparkType: DataType, + updater: ParentContainerUpdater): Converter = { + + sparkType match { + case BooleanType | IntegerType | LongType | FloatType | DoubleType | BinaryType => + new CatalystPrimitiveConverter(updater) + + case ByteType => + new PrimitiveConverter { + override def addInt(value: Int): Unit = + updater.setByte(value.asInstanceOf[ByteType#JvmType]) + } + + case ShortType => + new PrimitiveConverter { + override def addInt(value: Int): Unit = + updater.setShort(value.asInstanceOf[ShortType#JvmType]) + } + + case t: DecimalType => + new CatalystDecimalConverter(t, updater) + + case StringType => + new CatalystStringConverter(updater) + + case TimestampType => + new PrimitiveConverter { + override def addBinary(value: Binary): Unit = { + updater.set(CatalystTimestampConverter.convertToTimestamp(value)) + } + } + + case DateType => + new PrimitiveConverter { + override def addInt(value: Int): Unit = { + updater.setInt(value.asInstanceOf[DateType#JvmType]) + } + } + + case t: ArrayType => + new CatalystArrayConverter(parquetType.asGroupType(), t, updater) + + case t: MapType => + new CatalystMapConverter(parquetType.asGroupType(), t, updater) + + case t: StructType => + new CatalystRowConverter(parquetType.asGroupType(), t, updater) + + case t: UserDefinedType[_] => + val sparkTypeForUDT = t.sqlType + val parquetTypeForUDT = ParquetTypesConverter.fromDataType(sparkTypeForUDT, "") + newConverter(parquetTypeForUDT, sparkTypeForUDT, updater) + + case _ => + throw new RuntimeException( + s"Unable to create Parquet converter for data type ${sparkType.json}") + } + } + + /** + * Parquet converter for Parquet primitive types. Note that not all Spark SQL primitive types + * are handled by this converter. 
Parquet primitive types are only a subset of those of Spark + * SQL. For example, BYTE, SHORT and INT in Spark SQL are all covered by INT32 in Parquet. + */ + private final class CatalystPrimitiveConverter(updater: ParentContainerUpdater) + extends PrimitiveConverter { + + override def addBoolean(value: Boolean): Unit = updater.setBoolean(value) + override def addInt(value: Int): Unit = updater.setInt(value) + override def addLong(value: Long): Unit = updater.setLong(value) + override def addFloat(value: Float): Unit = updater.setFloat(value) + override def addDouble(value: Double): Unit = updater.setDouble(value) + override def addBinary(value: Binary): Unit = updater.set(value.getBytes) + } + + /** + * Parquet converter for strings. A dictionary is used to avoid minimize string decoding cost. + */ + private final class CatalystStringConverter(updater: ParentContainerUpdater) + extends PrimitiveConverter { + + private var expandedDictionary: Array[String] = null + + override def hasDictionarySupport: Boolean = true + + override def setDictionary(dictionary: Dictionary): Unit = { + this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { + dictionary.decodeToBinary(_).toStringUsingUTF8 + } + } + + override def addValueFromDictionary(dictionaryId: Int): Unit = { + updater.set(expandedDictionary(dictionaryId)) + } + + override def addBinary(value: Binary): Unit = updater.set(value.toStringUsingUTF8) + } + + /** + * Parquet converter for fixed-precision decimals. + * + * @todo Handle fixed-precision decimals stored as INT32 and INT64 + */ + private final class CatalystDecimalConverter( + decimalType: DecimalType, + updater: ParentContainerUpdater) + extends PrimitiveConverter { + + override def addBinary(value: Binary): Unit = { + updater.set(toDecimal(value)) + } + + private def toDecimal(value: Binary): Decimal = { + val precision = decimalType.precision + val scale = decimalType.scale + val bytes = value.getBytes + + require(bytes.length <= 16, "Decimal field too large to read") + + var unscaled = 0L + var i = 0 + + while (i < bytes.length) { + unscaled = (unscaled << 8) | (bytes(i) & 0xff) + i += 1 + } + + val bits = 8 * bytes.length + unscaled = (unscaled << (64 - bits)) >> (64 - bits) + Decimal(unscaled, precision, scale) + } + } + + /** + * Parquet converter for arrays. Spark SQL arrays are represented as Parquet lists. Standard + * Parquet lists are represented as a 3-level group annotated by `LIST`: + * {{{ + * group (LIST) { <-- parquetSchema points here + * repeated group list { + * element; + * } + * } + * }}} + * The `parquetSchema` constructor argument points to the outermost group. + * + * However, before this representation is standardized, some Parquet libraries/tools also use some + * non-standard formats to represent list-like structures. Backwards-compatibility rules for + * handling these cases are described in Parquet format spec. + * + * @see https://github.com/apache/incubator-parquet-format/blob/master/LogicalTypes.md#lists + */ + private final class CatalystArrayConverter( + parquetSchema: GroupType, + sparkSchema: ArrayType, + updater: ParentContainerUpdater) + extends GroupConverter { + + // TODO This is slow! Needs specialization. 
+ private val currentArray = ArrayBuffer.empty[Any] + + private val elementConverter: Converter = { + val repeatedType = parquetSchema.getType(0) + val elementType = sparkSchema.elementType + + if (isElementType(repeatedType, elementType)) { + newConverter(repeatedType, elementType, new ParentContainerUpdater { + override def set(value: Any): Unit = currentArray += value + }) + } else { + new ElementConverter(repeatedType.asGroupType().getType(0), elementType) + } + } + + override def getConverter(fieldIndex: Int): Converter = elementConverter + + override def end(): Unit = updater.set(currentArray) + + override def start(): Unit = currentArray.clear() + + // scalastyle:off + /** + * Returns whether the given type is the element type of a list or is a syntactic group with + * one field that is the element type. This is determined by checking whether the type can be + * a syntactic group and by checking whether a potential syntactic group matches the expected + * schema. + * {{{ + * group (LIST) { + * repeated group list { <-- repeatedType points here + * element; + * } + * } + * }}} + * In short, here we handle Parquet list backwards-compatibility rules on the read path. This + * method is based on `AvroIndexedRecordConverter.isElementType`. + * + * @see https://github.com/apache/incubator-parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules + */ + // scalastyle:on + private def isElementType(repeatedType: Type, elementType: DataType): Boolean = { + (repeatedType, elementType) match { + case (t: ParquetPrimitiveType, _) => true + case (t: GroupType, _) if t.getFieldCount > 1 => true + case (t: GroupType, StructType(Array(f))) if f.name == t.getFieldName(0) => true + case _ => false + } + } + + /** Array element converter */ + private final class ElementConverter(parquetType: Type, sparkType: DataType) + extends GroupConverter { + + private var currentElement: Any = _ + + private val converter = newConverter(parquetType, sparkType, new ParentContainerUpdater { + override def set(value: Any): Unit = currentElement = value + }) + + override def getConverter(fieldIndex: Int): Converter = converter + + override def end(): Unit = currentArray += currentElement + + override def start(): Unit = currentElement = null + } + } + + /** Parquet converter for maps */ + private final class CatalystMapConverter( + parquetType: GroupType, + sparkType: MapType, + updater: ParentContainerUpdater) + extends GroupConverter { + + private val currentMap = mutable.Map.empty[Any, Any] + + private val keyValueConverter = { + val repeatedType = parquetType.getType(0).asGroupType() + new KeyValueConverter( + repeatedType.getType(0), + repeatedType.getType(1), + sparkType.keyType, + sparkType.valueType) + } + + override def getConverter(fieldIndex: Int): Converter = keyValueConverter + + override def end(): Unit = updater.set(currentMap) + + override def start(): Unit = currentMap.clear() + + /** Parquet converter for key-value pairs within the map. 
*/ + private final class KeyValueConverter( + parquetKeyType: Type, + parquetValueType: Type, + sparkKeyType: DataType, + sparkValueType: DataType) + extends GroupConverter { + + private var currentKey: Any = _ + + private var currentValue: Any = _ + + private val converters = Array( + // Converter for keys + newConverter(parquetKeyType, sparkKeyType, new ParentContainerUpdater { + override def set(value: Any): Unit = currentKey = value + }), + + // Converter for values + newConverter(parquetValueType, sparkValueType, new ParentContainerUpdater { + override def set(value: Any): Unit = currentValue = value + })) + + override def getConverter(fieldIndex: Int): Converter = converters(fieldIndex) + + override def end(): Unit = currentMap(currentKey) = currentValue + + override def start(): Unit = { + currentKey = null + currentValue = null + } + } + } +} + private[sql] object CatalystConverter { // The type internally used for fields type FieldType = StructField diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/readSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/readSupport.scala deleted file mode 100644 index f7601e9ab2ce..000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/readSupport.scala +++ /dev/null @@ -1,448 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.parquet - -import java.util - -import scala.collection.JavaConversions._ -import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer - -import org.apache.hadoop.conf.Configuration -import parquet.column.Dictionary -import parquet.hadoop.api.ReadSupport.ReadContext -import parquet.hadoop.api.{InitContext, ReadSupport} -import parquet.io.api._ -import parquet.schema.{PrimitiveType => ParquetPrimitiveType, _} - -import org.apache.spark.Logging -import org.apache.spark.sql.Row -import org.apache.spark.sql.catalyst.expressions.{MutableRow, SpecificMutableRow} -import org.apache.spark.sql.types._ - -private[parquet] object SparkRowReadSupport { - val SPARK_ROW_REQUESTED_SCHEMA = "org.apache.spark.sql.parquet.row.requested_schema" - - val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata" - - val SPARK_ROW_SCHEMA: String = "org.apache.spark.sql.parquet.row.attributes" -} - -private[parquet] class SparkRowReadSupport extends ReadSupport[Row] with Logging { - import SparkRowReadSupport._ - - override def init(context: InitContext): ReadContext = { - val conf = context.getConfiguration - val maybeRequestedSchema = Option(conf.get(SPARK_ROW_REQUESTED_SCHEMA)) - val maybeRowSchema = Option(conf.get(SPARK_ROW_SCHEMA)) - - val fullParquetSchema = context.getFileSchema - val parquetRequestedSchema = - maybeRequestedSchema.map { value => - val requestedFieldNames = StructType.fromString(value).fieldNames.toSet - pruneFields(fullParquetSchema, requestedFieldNames) - }.getOrElse(fullParquetSchema) - - val metadata = - Map.empty[String, String] ++ - maybeRequestedSchema.map(SPARK_ROW_REQUESTED_SCHEMA -> _) ++ - maybeRowSchema.map(SPARK_ROW_SCHEMA -> _) - - new ReadContext(parquetRequestedSchema, metadata) - } - - override def prepareForRead( - configuration: Configuration, - keyValueMetaData: util.Map[String, String], - fileSchema: MessageType, - readContext: ReadContext): RecordMaterializer[Row] = { - - logDebug(s"Preparing for reading with Parquet file schema $fileSchema") - - val parquetRequestedSchema = readContext.getRequestedSchema - val metadata = readContext.getReadSupportMetadata - - val maybeSparkSchema = Option(metadata.get(SPARK_METADATA_KEY)) - val maybeSparkRequestedSchema = Option(metadata.get(SPARK_ROW_REQUESTED_SCHEMA)) - - val sparkSchema = - maybeSparkRequestedSchema - .orElse(maybeSparkSchema) - .map(ParquetTypesConverter.convertFromString) - .getOrElse( - ParquetTypesConverter.convertToAttributes( - parquetRequestedSchema, - isBinaryAsString = false, - isInt96AsTimestamp = true)) - - new SparkRowRecordMaterializer(parquetRequestedSchema, StructType.fromAttributes(sparkSchema)) - } - - /** - * Removes those Parquet fields that are not requested. - */ - private def pruneFields( - schema: MessageType, - requestedFieldNames: Set[String]): MessageType = { - val requestedFields = schema.getFields.filter(f => requestedFieldNames.contains(f.getName)) - new MessageType("root", requestedFields: _*) - } -} - -private[parquet] class SparkRowRecordMaterializer( - parquetSchema: MessageType, - sparkSchema: StructType) - extends RecordMaterializer[Row] { - - private val rootConverter = new CatalystRowConverter(parquetSchema, sparkSchema, NoopUpdater) - - override def getCurrentRecord: Row = rootConverter.currentRow - - override def getRootConverter: GroupConverter = rootConverter -} - -/** - * A [[ParentContainerUpdater]] is used by a Parquet converter to set converted values to some - * corresponding parent container. 
For example, a converter for a `Struct` field may set converted - * values to a [[MutableRow]]; or a converter for array element may append converted values to an - * [[ArrayBuffer]]. - */ -private[parquet] trait ParentContainerUpdater { - def set(value: Any): Unit = () - def setBoolean(value: Boolean): Unit = set(value) - def setByte(value: Byte): Unit = set(value) - def setShort(value: Short): Unit = set(value) - def setInt(value: Int): Unit = set(value) - def setLong(value: Long): Unit = set(value) - def setFloat(value: Float): Unit = set(value) - def setDouble(value: Double): Unit = set(value) -} - -/** A no-op updater used for root converter (who doesn't have a parent). */ -private[parquet] object NoopUpdater extends ParentContainerUpdater - -/** - * This Parquet converter converts Parquet records to Spark SQL rows. - * - * @param parquetType Parquet type of Parquet records - * @param sparkType A Spark SQL struct type that corresponds to the Parquet record type - * @param updater An updater which takes care of the converted row object - */ -private[parquet] class CatalystRowConverter( - parquetType: GroupType, - sparkType: StructType, - updater: ParentContainerUpdater) - extends GroupConverter { - - /** - * Updater used together with [[CatalystRowConverter]]. - * - * @constructor Constructs a [[RowUpdater]] which sets converted filed values to the `ordinal`-th - * cell in `row`. - */ - private final class RowUpdater(row: MutableRow, ordinal: Int) extends ParentContainerUpdater { - override def set(value: Any): Unit = row(ordinal) = value - override def setBoolean(value: Boolean): Unit = row.setBoolean(ordinal, value) - override def setByte(value: Byte): Unit = row.setByte(ordinal, value) - override def setShort(value: Short): Unit = row.setShort(ordinal, value) - override def setInt(value: Int): Unit = row.setInt(ordinal, value) - override def setLong(value: Long): Unit = row.setLong(ordinal, value) - override def setDouble(value: Double): Unit = row.setDouble(ordinal, value) - override def setFloat(value: Float): Unit = row.setFloat(ordinal, value) - } - - /** - * Represents the converted row object once an entire Parquet record is converted. - */ - val currentRow = new SpecificMutableRow(sparkType.map(_.dataType)) - - // Converters for each field. - private val fieldConverters: Array[Converter] = { - parquetType.getFields.zip(sparkType).zipWithIndex.map { - case ((parquetFieldType, sparkField), ordinal) => - newConverter(parquetFieldType, sparkField.dataType, new RowUpdater(currentRow, ordinal)) - }.toArray - } - - override def getConverter(fieldIndex: Int): Converter = fieldConverters(fieldIndex) - - override def end(): Unit = updater.set(currentRow) - - override def start(): Unit = { - var i = 0 - while (i < currentRow.length) { - currentRow.setNullAt(i) - i += 1 - } - } - - /** - * Creates a converter for the given Parquet type `parquetType` and Spark SQL data type - * `sparkType`. Converted values are handled by `updater`. 
- */ - private def newConverter( - parquetType: Type, - sparkType: DataType, - updater: ParentContainerUpdater): Converter = { - - sparkType match { - case BooleanType | IntegerType | LongType | FloatType | DoubleType | BinaryType => - new CatalystPrimitiveConverter(updater) - - case ByteType => - new PrimitiveConverter { - override def addInt(value: Int): Unit = - updater.setByte(value.asInstanceOf[ByteType#JvmType]) - } - - case ShortType => - new PrimitiveConverter { - override def addInt(value: Int): Unit = - updater.setShort(value.asInstanceOf[ShortType#JvmType]) - } - - case t: DecimalType => - new CatalystDecimalConverter(t, updater) - - case StringType => - new CatalystStringConverter(updater) - - case TimestampType => - new PrimitiveConverter { - override def addBinary(value: Binary): Unit = { - updater.set(CatalystTimestampConverter.convertToTimestamp(value)) - } - } - - case DateType => - new PrimitiveConverter { - override def addInt(value: Int): Unit = { - updater.setInt(value.asInstanceOf[DateType#JvmType]) - } - } - - case t: ArrayType => - new CatalystArrayConverter(parquetType.asGroupType(), t, updater) - - case t: MapType => - new CatalystMapConverter(parquetType.asGroupType(), t, updater) - - case t: StructType => - new CatalystRowConverter(parquetType.asGroupType(), t, updater) - - case t: UserDefinedType[_] => - val sparkTypeForUDT = t.sqlType - val parquetTypeForUDT = ParquetTypesConverter.fromDataType(sparkTypeForUDT, "") - newConverter(parquetTypeForUDT, sparkTypeForUDT, updater) - - case _ => - throw new RuntimeException( - s"Unable to create Parquet converter for data type ${sparkType.json}") - } - } - - ///////////////////////////////////////////////////////////////////////////// - // Other converters - ///////////////////////////////////////////////////////////////////////////// - - /** Parquet converter for Parquet primitive types. */ - private final class CatalystPrimitiveConverter(updater: ParentContainerUpdater) - extends PrimitiveConverter { - - override def addBoolean(value: Boolean): Unit = updater.setBoolean(value) - override def addInt(value: Int): Unit = updater.setInt(value) - override def addLong(value: Long): Unit = updater.setLong(value) - override def addFloat(value: Float): Unit = updater.setFloat(value) - override def addDouble(value: Double): Unit = updater.setDouble(value) - override def addBinary(value: Binary): Unit = updater.set(value.getBytes) - } - - /** - * Parquet converter for strings. A dictionary is used to avoid unnecessary string decoding cost. - */ - private final class CatalystStringConverter(updater: ParentContainerUpdater) - extends PrimitiveConverter { - - private var expandedDictionary: Array[String] = null - - override def hasDictionarySupport: Boolean = true - - override def setDictionary(dictionary: Dictionary): Unit = { - this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { - dictionary.decodeToBinary(_).toStringUsingUTF8 - } - } - - override def addValueFromDictionary(dictionaryId: Int): Unit = { - updater.set(expandedDictionary(dictionaryId)) - } - - override def addBinary(value: Binary): Unit = updater.set(value.toStringUsingUTF8) - } - - /** - * Parquet converter for decimals. 
- * - * @todo Handle decimals stored as INT32 and INT64 - */ - private final class CatalystDecimalConverter( - decimalType: DecimalType, - updater: ParentContainerUpdater) - extends PrimitiveConverter { - - override def addBinary(value: Binary): Unit = { - updater.set(toDecimal(value)) - } - - private def toDecimal(value: Binary): Decimal = { - val precision = decimalType.precision - val scale = decimalType.scale - val bytes = value.getBytes - - require(bytes.length <= 16, "Decimal field too large to read") - - var unscaled = 0L - var i = 0 - - while (i < bytes.length) { - unscaled = (unscaled << 8) | (bytes(i) & 0xff) - i += 1 - } - - val bits = 8 * bytes.length - unscaled = (unscaled << (64 - bits)) >> (64 - bits) - Decimal(unscaled, precision, scale) - } - } - - /** Parquet converter for arrays. */ - private final class CatalystArrayConverter( - parquetSchema: GroupType, - sparkSchema: ArrayType, - updater: ParentContainerUpdater) - extends GroupConverter { - - // TODO This is slow! Needs specialization. - private val currentArray = ArrayBuffer.empty[Any] - - private val elementConverter: Converter = { - val repeatedType = parquetSchema.getType(0) - val elementType = sparkSchema.elementType - - if (isElementType(repeatedType, elementType)) { - newConverter(repeatedType, elementType, new ParentContainerUpdater { - override def set(value: Any): Unit = currentArray += value - }) - } else { - new ElementConverter(repeatedType.asGroupType().getType(0), elementType) - } - } - - override def getConverter(fieldIndex: Int): Converter = elementConverter - - override def end(): Unit = updater.set(currentArray) - - override def start(): Unit = currentArray.clear() - - private def isElementType(repeatedType: Type, elementType: DataType): Boolean = { - (repeatedType, elementType) match { - case (t: ParquetPrimitiveType, _) => true - case (t: GroupType, _) if t.getFieldCount > 1 => true - case (t: GroupType, StructType(Array(f))) if f.name == t.getFieldName(0) => true - case _ => false - } - } - - /** Array element converter */ - private final class ElementConverter( - parquetType: Type, - sparkType: DataType) - extends GroupConverter { - - private var currentElement: Any = _ - - private val converter = newConverter(parquetType, sparkType, new ParentContainerUpdater { - override def set(value: Any): Unit = currentElement = value - }) - - override def getConverter(fieldIndex: Int): Converter = converter - - override def end(): Unit = currentArray += currentElement - - override def start(): Unit = currentElement = null - } - } - - /** Parquet converter for maps */ - private final class CatalystMapConverter( - parquetType: GroupType, - sparkType: MapType, - updater: ParentContainerUpdater) - extends GroupConverter { - - private val currentMap = mutable.Map.empty[Any, Any] - - private val keyValueConverter = { - val repeatedType = parquetType.getType(0).asGroupType() - new KeyValueConverter( - repeatedType.getType(0), - repeatedType.getType(1), - sparkType.keyType, - sparkType.valueType) - } - - override def getConverter(fieldIndex: Int): Converter = keyValueConverter - - override def end(): Unit = updater.set(currentMap) - - override def start(): Unit = currentMap.clear() - - /** Parquet converter for key-value pairs within the map. 
*/ - private final class KeyValueConverter( - parquetKeyType: Type, - parquetValueType: Type, - sparkKeyType: DataType, - sparkValueType: DataType) - extends GroupConverter { - - private var currentKey: Any = _ - - private var currentValue: Any = _ - - private val converters = Array( - // Converter for keys - newConverter(parquetKeyType, sparkKeyType, new ParentContainerUpdater { - override def set(value: Any): Unit = currentKey = value - }), - - // Converter for values - newConverter(parquetValueType, sparkValueType, new ParentContainerUpdater { - override def set(value: Any): Unit = currentValue = value - })) - - override def getConverter(fieldIndex: Int): Converter = converters(fieldIndex) - - override def end(): Unit = currentMap(currentKey) = currentValue - - override def start(): Unit = { - currentKey = null - currentValue = null - } - } - } -} From 95229493ec2d3a69762f4dd4fb707a87911980b3 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Wed, 8 Apr 2015 21:34:44 +0800 Subject: [PATCH 6/7] Fixes comments --- .../spark/sql/parquet/ParquetConverter.scala | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala index fdae38782aad..5a7edd077876 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala @@ -57,7 +57,7 @@ private[parquet] object NoopUpdater extends ParentContainerUpdater * This Parquet converter converts Parquet records to Spark SQL rows. * * @param parquetType Parquet type of Parquet records - * @param sparkType A Spark SQL struct type that corresponds to the Parquet record type + * @param sparkType Spark SQL schema that corresponds to the Parquet record type * @param updater An updater which takes care of the converted row object */ private[parquet] class CatalystRowConverter( @@ -67,10 +67,8 @@ private[parquet] class CatalystRowConverter( extends GroupConverter { /** - * Updater used together with [[CatalystRowConverter]]. - * - * @constructor Constructs a [[RowUpdater]] which sets converted filed values to the `ordinal`-th - * cell in `row`. + * Updater used together with [[CatalystRowConverter]]. It sets converted filed values to the + * `ordinal`-th cell in `row`. */ private final class RowUpdater(row: MutableRow, ordinal: Int) extends ParentContainerUpdater { override def set(value: Any): Unit = row(ordinal) = value @@ -83,15 +81,14 @@ private[parquet] class CatalystRowConverter( override def setFloat(value: Float): Unit = row.setFloat(ordinal, value) } - /** - * Represents the converted row object once an entire Parquet record is converted. - */ + /** Represents the converted row object once an entire Parquet record is converted. */ val currentRow = new SpecificMutableRow(sparkType.map(_.dataType)) // Converters for each field. private val fieldConverters: Array[Converter] = { parquetType.getFields.zip(sparkType).zipWithIndex.map { case ((parquetFieldType, sparkField), ordinal) => + // Converted field value should be set to the `ordinal`-th cell of `currentRow` newConverter(parquetFieldType, sparkField.dataType, new RowUpdater(currentRow, ordinal)) }.toArray } @@ -190,7 +187,7 @@ private[parquet] class CatalystRowConverter( } /** - * Parquet converter for strings. A dictionary is used to avoid minimize string decoding cost. + * Parquet converter for strings. 
A dictionary is used to minimize string decoding cost. */ private final class CatalystStringConverter(updater: ParentContainerUpdater) extends PrimitiveConverter { From 2529b76141c068fe69a03e29ddc50ca496cd9dcd Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Thu, 9 Apr 2015 00:17:16 +0800 Subject: [PATCH 7/7] Removes DateType support from SpecificMutableRow --- .../spark/sql/catalyst/expressions/SpecificMutableRow.scala | 1 - .../scala/org/apache/spark/sql/parquet/ParquetConverter.scala | 3 ++- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala index acc29207f360..47b6f358ed1b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala @@ -202,7 +202,6 @@ final class SpecificMutableRow(val values: Array[MutableValue]) extends MutableR case DoubleType => new MutableDouble case BooleanType => new MutableBoolean case LongType => new MutableLong - case DateType => new MutableInt case _ => new MutableAny }.toArray) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala index 5a7edd077876..02defc2a4dcc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala @@ -146,7 +146,8 @@ private[parquet] class CatalystRowConverter( case DateType => new PrimitiveConverter { override def addInt(value: Int): Unit = { - updater.setInt(value.asInstanceOf[DateType#JvmType]) + // DateType is not specialized in `SpecificMutableRow`, have to box it here. + updater.set(value.asInstanceOf[DateType#JvmType]) } }
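For readers new to the converter/updater pattern used throughout the converters above, here is a minimal, self-contained sketch of the idea (illustration only, not part of any patch in this series; the names and the driver object are hypothetical, and Parquet/Spark dependencies are deliberately omitted): every converter hands each converted value to a ParentContainerUpdater, and the updater alone decides where that value lands, e.g. a row cell or an array buffer.

{{{
  import scala.collection.mutable.ArrayBuffer

  // Simplified stand-in for the ParentContainerUpdater trait above: it keeps only
  // the untyped `set`, dropping the specialized setBoolean/setInt/... variants.
  trait SimpleUpdater {
    def set(value: Any): Unit
  }

  // Writes converted values into the `ordinal`-th cell of a mutable "row"
  // (a plain Array here, standing in for MutableRow).
  final class SimpleRowUpdater(row: Array[Any], ordinal: Int) extends SimpleUpdater {
    override def set(value: Any): Unit = row(ordinal) = value
  }

  // Appends every converted value to a buffer, the way the array converter
  // accumulates elements before handing the whole buffer to its own parent.
  final class SimpleArrayUpdater(buffer: ArrayBuffer[Any]) extends SimpleUpdater {
    override def set(value: Any): Unit = buffer += value
  }

  object UpdaterSketch extends App {
    val row = new Array[Any](2)
    val elements = ArrayBuffer.empty[Any]

    new SimpleRowUpdater(row, 0).set("a converted string field")
    new SimpleRowUpdater(row, 1).set(42)

    val elementUpdater = new SimpleArrayUpdater(elements)
    Seq(1, 2, 3).foreach(v => elementUpdater.set(v))

    println(row.toSeq)   // the two converted row cells
    println(elements)    // ArrayBuffer(1, 2, 3)
  }
}}}

The same indirection is what lets a single newConverter factory serve struct fields, array elements, and map keys/values in the patches above: only the ParentContainerUpdater passed in changes, never the converter itself.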