diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala index c0155eeb450a..6754c4713830 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala @@ -106,7 +106,9 @@ final class Decimal extends Ordered[Decimal] with Serializable { */ def set(decimal: BigDecimal, precision: Int, scale: Int): Decimal = { this.decimalVal = decimal.setScale(scale, ROUNDING_MODE) - require(decimalVal.precision <= precision, "Overflowed precision") + require( + decimalVal.precision <= precision, + s"Precision overflow. Max precision: $precision, got: ${decimalVal.precision}") this.longVal = 0L this._precision = precision this._scale = scale diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index 6644e85d4a03..290ce472c6d9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -283,12 +283,12 @@ private[spark] object SQLConf { defaultValue = Some(true), doc = "Enables Parquet filter push-down optimization when set to true.") - val PARQUET_FOLLOW_PARQUET_FORMAT_SPEC = booleanConf( - key = "spark.sql.parquet.followParquetFormatSpec", + val PARQUET_WRITE_LEGACY_FORMAT = booleanConf( + key = "spark.sql.parquet.writeLegacyParquetFormat", defaultValue = Some(false), - doc = "Whether to follow Parquet's format specification when converting Parquet schema to " + - "Spark SQL schema and vice versa.", - isPublic = false) + doc = "When true, writes Parquet data in legacy format compatible with Spark 1.4.0 and prior " + + "versions, instead of the standard one defined in parquet-format spec.", + isPublic = true) val PARQUET_OUTPUT_COMMITTER_CLASS = stringConf( key = "spark.sql.parquet.output.committer.class", @@ -493,7 +493,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf { private[spark] def isParquetINT96AsTimestamp: Boolean = getConf(PARQUET_INT96_AS_TIMESTAMP) - private[spark] def followParquetFormatSpec: Boolean = getConf(PARQUET_FOLLOW_PARQUET_FORMAT_SPEC) + private[spark] def writeLegacyParquetFormat: Boolean = getConf(PARQUET_WRITE_LEGACY_FORMAT) private[spark] def inMemoryPartitionPruning: Boolean = getConf(IN_MEMORY_PARTITION_PRUNING) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala index 1551afd7b7bf..46cfa9dc0a0b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala @@ -39,7 +39,7 @@ import org.apache.parquet.hadoop.{ParquetFileReader, ParquetFileWriter, ParquetO * * NEVER use [[DirectParquetOutputCommitter]] when appending data, because currently there's * no safe way undo a failed appending job (that's why both `abortTask()` and `abortJob()` are - * left * empty). + * left empty). 
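// Aside (illustrative sketch, not patch content): how the renamed option above is expected to
// be used from a user session. The key and its default come from PARQUET_WRITE_LEGACY_FORMAT;
// the session, table, and output path here are placeholders.
import org.apache.spark.sql.SQLContext

def writeForSpark14Readers(sqlContext: SQLContext, path: String): Unit = {
  // "true" writes the Spark 1.4-compatible legacy layout; "false" (the default) follows the
  // standard layout defined in the parquet-format spec.
  sqlContext.setConf("spark.sql.parquet.writeLegacyParquetFormat", "true")
  sqlContext.range(10).write.parquet(path)  // placeholder DataFrame
}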
*/ private[parquet] class DirectParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext) extends ParquetOutputCommitter(outputPath, context) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala deleted file mode 100644 index 6ed3580af072..000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.parquet - -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.types.{MapData, ArrayData} - -// TODO Removes this while fixing SPARK-8848 -private[sql] object CatalystConverter { - // This is mostly Parquet convention (see, e.g., `ConversionPatterns`). - // Note that "array" for the array elements is chosen by ParquetAvro. - // Using a different value will result in Parquet silently dropping columns. - val ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME = "bag" - val ARRAY_ELEMENTS_SCHEMA_NAME = "array" - - val MAP_KEY_SCHEMA_NAME = "key" - val MAP_VALUE_SCHEMA_NAME = "value" - val MAP_SCHEMA_NAME = "map" - - // TODO: consider using Array[T] for arrays to avoid boxing of primitive types - type ArrayScalaType = ArrayData - type StructScalaType = InternalRow - type MapScalaType = MapData -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetReadSupport.scala similarity index 85% rename from sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystReadSupport.scala rename to sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetReadSupport.scala index 975fec101d9c..46418a4a8dcd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystReadSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetReadSupport.scala @@ -31,7 +31,7 @@ import org.apache.spark.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.types.StructType -private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with Logging { +private[parquet] class ParquetReadSupport extends ReadSupport[InternalRow] with Logging { override def prepareForRead( conf: Configuration, keyValueMetaData: JMap[String, String], @@ -39,24 +39,24 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with readContext: ReadContext): RecordMaterializer[InternalRow] = { log.debug(s"Preparing for read Parquet file with message type: $fileSchema") - val toCatalyst = new CatalystSchemaConverter(conf) + val toCatalyst = new ParquetSchemaConverter(conf) val parquetRequestedSchema = 
readContext.getRequestedSchema val catalystRequestedSchema = Option(readContext.getReadSupportMetadata).map(_.toMap).flatMap { metadata => metadata // First tries to read requested schema, which may result from projections - .get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA) + .get(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA) // If not available, tries to read Catalyst schema from file metadata. It's only // available if the target file is written by Spark SQL. - .orElse(metadata.get(CatalystReadSupport.SPARK_METADATA_KEY)) + .orElse(metadata.get(ParquetReadSupport.SPARK_METADATA_KEY)) }.map(StructType.fromString).getOrElse { logDebug("Catalyst schema not available, falling back to Parquet schema") toCatalyst.convert(parquetRequestedSchema) } logDebug(s"Catalyst schema used to read Parquet files: $catalystRequestedSchema") - new CatalystRecordMaterializer(parquetRequestedSchema, catalystRequestedSchema) + new ParquetRecordMaterializer(parquetRequestedSchema, catalystRequestedSchema) } override def init(context: InitContext): ReadContext = { @@ -64,11 +64,11 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with // If the target file was written by Spark SQL, we should be able to find a serialized Catalyst // schema of this file from its the metadata. - val maybeRowSchema = Option(conf.get(RowWriteSupport.SPARK_ROW_SCHEMA)) + val maybeRowSchema = Option(conf.get(ParquetWriteSupport.SPARK_ROW_SCHEMA)) // Optional schema of requested columns, in the form of a string serialized from a Catalyst // `StructType` containing all requested columns. - val maybeRequestedSchema = Option(conf.get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA)) + val maybeRequestedSchema = Option(conf.get(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA)) // Below we construct a Parquet schema containing all requested columns. This schema tells // Parquet which columns to read. @@ -110,7 +110,7 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with // different physical structures. val parquetRequestedSchema = maybeRequestedSchema.fold(context.getFileSchema) { schemaString => - val toParquet = new CatalystSchemaConverter(conf) + val toParquet = new ParquetSchemaConverter(conf) val fileSchema = context.getFileSchema.asGroupType() val fileFieldNames = fileSchema.getFields.map(_.getName).toSet @@ -121,7 +121,9 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with if (fileFieldNames.contains(field.name)) { // If the field exists in the target Parquet file, extracts the field type from the // full file schema and makes a single-field Parquet schema - new MessageType("root", fileSchema.getType(field.name)) + new MessageType( + ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME, + fileSchema.getType(field.name)) } else { // Otherwise, just resorts to `CatalystSchemaConverter` toParquet.convert(StructType(Array(field))) @@ -131,22 +133,22 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with // columns. Note that it's possible that no columns are requested at all (e.g., count // some partition column of a partitioned Parquet table). That's why `fold` is used here // and always fallback to an empty Parquet schema. 
- .fold(new MessageType("root")) { + .fold(new MessageType(ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)) { _ union _ } } val metadata = Map.empty[String, String] ++ - maybeRequestedSchema.map(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA -> _) ++ - maybeRowSchema.map(RowWriteSupport.SPARK_ROW_SCHEMA -> _) + maybeRequestedSchema.map(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA -> _) ++ + maybeRowSchema.map(ParquetWriteSupport.SPARK_ROW_SCHEMA -> _) logInfo(s"Going to read Parquet file with these requested columns: $parquetRequestedSchema") new ReadContext(parquetRequestedSchema, metadata) } } -private[parquet] object CatalystReadSupport { +private[parquet] object ParquetReadSupport { val SPARK_ROW_REQUESTED_SCHEMA = "org.apache.spark.sql.parquet.row.requested_schema" val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystRecordMaterializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRecordMaterializer.scala similarity index 90% rename from sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystRecordMaterializer.scala rename to sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRecordMaterializer.scala index 84f1dccfeb78..8c1971dbe4d6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystRecordMaterializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRecordMaterializer.scala @@ -29,11 +29,11 @@ import org.apache.spark.sql.types.StructType * @param parquetSchema Parquet schema of the records to be read * @param catalystSchema Catalyst schema of the rows to be constructed */ -private[parquet] class CatalystRecordMaterializer( +private[parquet] class ParquetRecordMaterializer( parquetSchema: MessageType, catalystSchema: StructType) extends RecordMaterializer[InternalRow] { - private val rootConverter = new CatalystRowConverter(parquetSchema, catalystSchema, NoopUpdater) + private val rootConverter = new ParquetRowConverter(parquetSchema, catalystSchema, NoopUpdater) override def getCurrentRecord: InternalRow = rootConverter.currentRow diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala index b4337a48dbd8..af5485134699 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala @@ -37,16 +37,16 @@ import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetRecordReader, _ import org.apache.parquet.schema.MessageType import org.apache.parquet.{Log => ParquetLog} -import org.apache.spark.{Logging, Partition => SparkPartition, SparkException} import org.apache.spark.broadcast.Broadcast -import org.apache.spark.rdd.{SqlNewHadoopPartition, SqlNewHadoopRDD, RDD} import org.apache.spark.rdd.RDD._ +import org.apache.spark.rdd.{RDD, SqlNewHadoopPartition, SqlNewHadoopRDD} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.datasources.PartitionSpec import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.util.{SerializableConfiguration, Utils} +import org.apache.spark.{Logging, Partition => SparkPartition, SparkException} private[sql] class DefaultSource extends HadoopFsRelationProvider { @@ -228,18 +228,13 @@ private[sql] class ParquetRelation( // bundled with 
`ParquetOutputFormat[Row]`. job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]]) - // TODO There's no need to use two kinds of WriteSupport - // We should unify them. `SpecificMutableRow` can process both atomic (primitive) types and - // complex types. - val writeSupportClass = - if (dataSchema.map(_.dataType).forall(ParquetTypesConverter.isPrimitiveType)) { - classOf[MutableRowWriteSupport] - } else { - classOf[RowWriteSupport] - } + ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport]) + ParquetWriteSupport.setSchema(dataSchema, conf) - ParquetOutputFormat.setWriteSupportClass(job, writeSupportClass) - RowWriteSupport.setSchema(dataSchema.toAttributes, conf) + // Sets flag for Parquet schema converter (converting Catalyst schema to Parquet schema) + conf.set( + SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key, + sqlContext.conf.writeLegacyParquetFormat.toString) // Sets compression scheme conf.set( @@ -267,7 +262,6 @@ private[sql] class ParquetRelation( val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp - val followParquetFormatSpec = sqlContext.conf.followParquetFormatSpec // Create the function to set variable Parquet confs at both driver and executor side. val initLocalJobFuncOpt = @@ -278,8 +272,7 @@ private[sql] class ParquetRelation( useMetadataCache, parquetFilterPushDown, assumeBinaryIsString, - assumeInt96IsTimestamp, - followParquetFormatSpec) _ + assumeInt96IsTimestamp) _ // Create the function to set input paths at the driver side. val setInputPaths = ParquetRelation.initializeDriverSideJobFunc(inputFiles) _ @@ -479,10 +472,9 @@ private[sql] object ParquetRelation extends Logging { useMetadataCache: Boolean, parquetFilterPushDown: Boolean, assumeBinaryIsString: Boolean, - assumeInt96IsTimestamp: Boolean, - followParquetFormatSpec: Boolean)(job: Job): Unit = { + assumeInt96IsTimestamp: Boolean)(job: Job): Unit = { val conf = job.getConfiguration - conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName) + conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName) // Try to push down filters when filter push-down is enabled. 
if (parquetFilterPushDown) { @@ -495,22 +487,21 @@ private[sql] object ParquetRelation extends Logging { .foreach(ParquetInputFormat.setFilterPredicate(conf, _)) } - conf.set(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA, { + conf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, { val requestedSchema = StructType(requiredColumns.map(dataSchema(_))) - CatalystSchemaConverter.checkFieldNames(requestedSchema).json + ParquetSchemaConverter.checkFieldNames(requestedSchema).json }) conf.set( - RowWriteSupport.SPARK_ROW_SCHEMA, - CatalystSchemaConverter.checkFieldNames(dataSchema).json) + ParquetWriteSupport.SPARK_ROW_SCHEMA, + ParquetSchemaConverter.checkFieldNames(dataSchema).json) // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata conf.setBoolean(SQLConf.PARQUET_CACHE_METADATA.key, useMetadataCache) - // Sets flags for Parquet schema conversion + // Sets flags for Parquet schema converter (converting Parquet schema to Catalyst schema) conf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, assumeBinaryIsString) conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, assumeInt96IsTimestamp) - conf.setBoolean(SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key, followParquetFormatSpec) } /** This closure sets input paths at the driver side. */ @@ -527,10 +518,10 @@ private[sql] object ParquetRelation extends Logging { footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = { def parseParquetSchema(schema: MessageType): StructType = { - val converter = new CatalystSchemaConverter( + val converter = new ParquetSchemaConverter( sqlContext.conf.isParquetBinaryAsString, sqlContext.conf.isParquetBinaryAsString, - sqlContext.conf.followParquetFormatSpec) + sqlContext.conf.writeLegacyParquetFormat) converter.convert(schema) } @@ -541,7 +532,7 @@ private[sql] object ParquetRelation extends Logging { val serializedSchema = metadata .getKeyValueMetaData .toMap - .get(CatalystReadSupport.SPARK_METADATA_KEY) + .get(ParquetReadSupport.SPARK_METADATA_KEY) if (serializedSchema.isEmpty) { // Falls back to Parquet schema if no Spark SQL schema found. Some(parseParquetSchema(metadata.getSchema)) @@ -664,7 +655,7 @@ private[sql] object ParquetRelation extends Logging { filesToTouch: Seq[FileStatus], sqlContext: SQLContext): Option[StructType] = { val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp - val followParquetFormatSpec = sqlContext.conf.followParquetFormatSpec + val writeLegacyParquetFormat = sqlContext.conf.writeLegacyParquetFormat val serializedConf = new SerializableConfiguration(sqlContext.sparkContext.hadoopConfiguration) // HACK ALERT: @@ -701,10 +692,10 @@ private[sql] object ParquetRelation extends Logging { // Converter used to convert Parquet `MessageType` to Spark SQL `StructType` val converter = - new CatalystSchemaConverter( + new ParquetSchemaConverter( assumeBinaryIsString = assumeBinaryIsString, assumeInt96IsTimestamp = assumeInt96IsTimestamp, - followParquetFormatSpec = followParquetFormatSpec) + writeLegacyParquetFormat = writeLegacyParquetFormat) footers.map { footer => ParquetRelation.readSchemaFromFooter(footer, converter) @@ -720,12 +711,12 @@ private[sql] object ParquetRelation extends Logging { * a [[StructType]] converted from the [[MessageType]] stored in this footer. 
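   *
   * A minimal usage sketch (illustrative; assumes a Parquet `Footer` and an active `SQLContext`
   * are already in scope):
   * {{{
   *   val converter = new ParquetSchemaConverter(sqlContext.conf)
   *   val schema: StructType = ParquetRelation.readSchemaFromFooter(footer, converter)
   * }}}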
*/ def readSchemaFromFooter( - footer: Footer, converter: CatalystSchemaConverter): StructType = { + footer: Footer, converter: ParquetSchemaConverter): StructType = { val fileMetaData = footer.getParquetMetadata.getFileMetaData fileMetaData .getKeyValueMetaData .toMap - .get(CatalystReadSupport.SPARK_METADATA_KEY) + .get(ParquetReadSupport.SPARK_METADATA_KEY) .flatMap(deserializeSchemaString) .getOrElse(converter.convert(fileMetaData.getSchema)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRowConverter.scala similarity index 91% rename from sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystRowConverter.scala rename to sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRowConverter.scala index 6938b071065c..48ab2f384867 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRowConverter.scala @@ -56,7 +56,7 @@ private[parquet] trait ParentContainerUpdater { private[parquet] object NoopUpdater extends ParentContainerUpdater /** - * A [[CatalystRowConverter]] is used to convert Parquet "structs" into Spark SQL [[InternalRow]]s. + * A [[ParquetRowConverter]] is used to convert Parquet records into Spark SQL [[InternalRow]]s. * Since any Parquet record is also a struct, this converter can also be used as root converter. * * When used as a root converter, [[NoopUpdater]] should be used since root converters don't have @@ -66,14 +66,14 @@ private[parquet] object NoopUpdater extends ParentContainerUpdater * @param catalystType Spark SQL schema that corresponds to the Parquet record type * @param updater An updater which propagates converted field values to the parent container */ -private[parquet] class CatalystRowConverter( +private[parquet] class ParquetRowConverter( parquetType: GroupType, catalystType: StructType, updater: ParentContainerUpdater) extends GroupConverter { /** - * Updater used together with field converters within a [[CatalystRowConverter]]. It propagates + * Updater used together with field converters within a [[ParquetRowConverter]]. It propagates * converted filed values to the `ordinal`-th cell in `currentRow`. */ private final class RowUpdater(row: MutableRow, ordinal: Int) extends ParentContainerUpdater { @@ -126,7 +126,7 @@ private[parquet] class CatalystRowConverter( catalystType match { case BooleanType | IntegerType | LongType | FloatType | DoubleType | BinaryType => - new CatalystPrimitiveConverter(updater) + new ParquetPrimitiveConverter(updater) case ByteType => new PrimitiveConverter { @@ -141,10 +141,10 @@ private[parquet] class CatalystRowConverter( } case t: DecimalType => - new CatalystDecimalConverter(t, updater) + new ParquetDecimalConverter(t, updater) case StringType => - new CatalystStringConverter(updater) + new ParquetStringConverter(updater) case TimestampType => // TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that. 
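      // Aside (illustrative sketch, not patch content): the INT96 value this case has to handle
      // is a 12-byte little-endian binary holding nanoseconds-of-day followed by a Julian day
      // number, mirroring `writeTimestamp` in the removed RowWriteSupport further below.
      //
      //   import java.nio.{ByteBuffer, ByteOrder}
      //
      //   def decodeInt96(bytes: Array[Byte]): (Int, Long) = {
      //     require(bytes.length == 12, "INT96 timestamps are fixed 12-byte binaries")
      //     val buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN)
      //     val timeOfDayNanos = buf.getLong   // first 8 bytes
      //     val julianDay = buf.getInt         // last 4 bytes
      //     (julianDay, timeOfDayNanos)
      //   }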
@@ -172,13 +172,13 @@ private[parquet] class CatalystRowConverter( } case t: ArrayType => - new CatalystArrayConverter(parquetType.asGroupType(), t, updater) + new ParquetArrayConverter(parquetType.asGroupType(), t, updater) case t: MapType => - new CatalystMapConverter(parquetType.asGroupType(), t, updater) + new ParquetMapConverter(parquetType.asGroupType(), t, updater) case t: StructType => - new CatalystRowConverter(parquetType.asGroupType(), t, new ParentContainerUpdater { + new ParquetRowConverter(parquetType.asGroupType(), t, new ParentContainerUpdater { override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy()) }) @@ -186,7 +186,7 @@ private[parquet] class CatalystRowConverter( val catalystTypeForUDT = t.sqlType val nullable = parquetType.isRepetition(Repetition.OPTIONAL) val field = StructField("udt", catalystTypeForUDT, nullable) - val parquetTypeForUDT = new CatalystSchemaConverter().convertField(field) + val parquetTypeForUDT = new ParquetSchemaConverter().convertField(field) newConverter(parquetTypeForUDT, catalystTypeForUDT, updater) case _ => @@ -200,7 +200,7 @@ private[parquet] class CatalystRowConverter( * are handled by this converter. Parquet primitive types are only a subset of those of Spark * SQL. For example, BYTE, SHORT, and INT in Spark SQL are all covered by INT32 in Parquet. */ - private final class CatalystPrimitiveConverter(updater: ParentContainerUpdater) + private final class ParquetPrimitiveConverter(updater: ParentContainerUpdater) extends PrimitiveConverter { override def addBoolean(value: Boolean): Unit = updater.setBoolean(value) @@ -214,7 +214,7 @@ private[parquet] class CatalystRowConverter( /** * Parquet converter for strings. A dictionary is used to minimize string decoding cost. */ - private final class CatalystStringConverter(updater: ParentContainerUpdater) + private final class ParquetStringConverter(updater: ParentContainerUpdater) extends PrimitiveConverter { private var expandedDictionary: Array[UTF8String] = null @@ -239,7 +239,7 @@ private[parquet] class CatalystRowConverter( /** * Parquet converter for fixed-precision decimals. */ - private final class CatalystDecimalConverter( + private final class ParquetDecimalConverter( decimalType: DecimalType, updater: ParentContainerUpdater) extends PrimitiveConverter { @@ -264,7 +264,7 @@ private[parquet] class CatalystRowConverter( val scale = decimalType.scale val bytes = value.getBytes - if (precision <= 8) { + def bytesToUnscaledLong(bytes: Array[Byte]): Long = { // Constructs a `Decimal` with an unscaled `Long` value if possible. var unscaled = 0L var i = 0 @@ -275,11 +275,17 @@ private[parquet] class CatalystRowConverter( } val bits = 8 * bytes.length - unscaled = (unscaled << (64 - bits)) >> (64 - bits) + (unscaled << (64 - bits)) >> (64 - bits) + } + + if (precision <= ParquetSchemaConverter.MAX_PRECISION_FOR_INT64) { + // Constructs a `Decimal` with an unscaled `Long` value if possible. + val unscaled = bytesToUnscaledLong(bytes) Decimal(unscaled, precision, scale) } else { // Otherwise, resorts to an unscaled `BigInteger` instead. 
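        // Aside (illustrative sketch, not patch content): `bytesToUnscaledLong` above relies on
        // the DECIMAL binary being a big-endian two's-complement unscaled value. Packing the
        // bytes into the low end of a Long and then shifting left and arithmetically right by
        // (64 - bits) propagates the sign bit. The accumulation loop below is an assumption for
        // the hunk-elided body; the expected values are hand-checked.
        //
        //   def bytesToUnscaledLongSketch(bytes: Array[Byte]): Long = {
        //     var unscaled = 0L
        //     var i = 0
        //     while (i < bytes.length) {
        //       unscaled = (unscaled << 8) | (bytes(i) & 0xff)  // big-endian accumulation
        //       i += 1
        //     }
        //     val bits = 8 * bytes.length
        //     (unscaled << (64 - bits)) >> (64 - bits)          // arithmetic shift sign-extends
        //   }
        //   assert(bytesToUnscaledLongSketch(Array(0x01.toByte, 0x2c.toByte)) == 300L)   // 0x012C
        //   assert(bytesToUnscaledLongSketch(Array(0xff.toByte, 0x38.toByte)) == -200L)  // 0xFF38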
- Decimal(new BigDecimal(new BigInteger(bytes), scale), precision, scale) + val unscaled = new BigInteger(bytes) + Decimal(new BigDecimal(unscaled, scale), precision, scale) } } } @@ -302,7 +308,7 @@ private[parquet] class CatalystRowConverter( * * @see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists */ - private final class CatalystArrayConverter( + private final class ParquetArrayConverter( parquetSchema: GroupType, catalystSchema: ArrayType, updater: ParentContainerUpdater) @@ -379,7 +385,7 @@ private[parquet] class CatalystRowConverter( } /** Parquet converter for maps */ - private final class CatalystMapConverter( + private final class ParquetMapConverter( parquetType: GroupType, catalystType: MapType, updater: ParentContainerUpdater) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetSchemaConverter.scala similarity index 59% rename from sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystSchemaConverter.scala rename to sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetSchemaConverter.scala index d43ca95b4eea..26ca6b6cc594 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystSchemaConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetSchemaConverter.scala @@ -25,6 +25,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._ import org.apache.parquet.schema.Type.Repetition._ import org.apache.parquet.schema._ +import org.apache.spark.sql.parquet.ParquetSchemaConverter.{MAX_PRECISION_FOR_INT32, MAX_PRECISION_FOR_INT64, maxPrecisionForBytes, minBytesForPrecision} import org.apache.spark.sql.types._ import org.apache.spark.sql.{AnalysisException, SQLConf} @@ -40,35 +41,33 @@ import org.apache.spark.sql.{AnalysisException, SQLConf} * @constructor * @param assumeBinaryIsString Whether unannotated BINARY fields should be assumed to be Spark SQL * [[StringType]] fields when converting Parquet a [[MessageType]] to Spark SQL - * [[StructType]]. + * [[StructType]]. This argument only affects Parquet read path. * @param assumeInt96IsTimestamp Whether unannotated INT96 fields should be assumed to be Spark SQL * [[TimestampType]] fields when converting Parquet a [[MessageType]] to Spark SQL * [[StructType]]. Note that Spark SQL [[TimestampType]] is similar to Hive timestamp, which * has optional nanosecond precision, but different from `TIME_MILLS` and `TIMESTAMP_MILLIS` - * described in Parquet format spec. - * @param followParquetFormatSpec Whether to generate standard DECIMAL, LIST, and MAP structure when - * converting Spark SQL [[StructType]] to Parquet [[MessageType]]. For Spark 1.4.x and - * prior versions, Spark SQL only supports decimals with a max precision of 18 digits, and - * uses non-standard LIST and MAP structure. Note that the current Parquet format spec is - * backwards-compatible with these settings. If this argument is set to `false`, we fallback - * to old style non-standard behaviors. + * described in Parquet format spec. This argument only affects Parquet read path. + * @param writeLegacyParquetFormat Whether to use legacy Parquet format compatible with Spark 1.4 + * and prior versions when converting a Catalyst [[StructType]] to a Parquet [[MessageType]]. + * When set to false, use standard format defined in parquet-format spec. This argument only + * affects Parquet write path. 
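 *
 * An illustrative construction (flag values are arbitrary):
 * {{{
 *   val converter = new ParquetSchemaConverter(
 *     assumeBinaryIsString = false,      // read path only
 *     assumeInt96IsTimestamp = true,     // read path only
 *     writeLegacyParquetFormat = false)  // write path only
 * }}}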
*/ -private[parquet] class CatalystSchemaConverter( +private[parquet] class ParquetSchemaConverter( private val assumeBinaryIsString: Boolean, private val assumeInt96IsTimestamp: Boolean, - private val followParquetFormatSpec: Boolean) { + private val writeLegacyParquetFormat: Boolean) { // Only used when constructing converter for converting Spark SQL schema to Parquet schema, in // which case `assumeInt96IsTimestamp` and `assumeBinaryIsString` are irrelevant. def this() = this( assumeBinaryIsString = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get, assumeInt96IsTimestamp = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get, - followParquetFormatSpec = SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.defaultValue.get) + writeLegacyParquetFormat = SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get) def this(conf: SQLConf) = this( assumeBinaryIsString = conf.isParquetBinaryAsString, assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp, - followParquetFormatSpec = conf.followParquetFormatSpec) + writeLegacyParquetFormat = conf.writeLegacyParquetFormat) def this(conf: Configuration) = this( assumeBinaryIsString = @@ -79,10 +78,10 @@ private[parquet] class CatalystSchemaConverter( conf.getBoolean( SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get), - followParquetFormatSpec = + writeLegacyParquetFormat = conf.getBoolean( - SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key, - SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.defaultValue.get)) + SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key, + SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get)) /** * Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]]. @@ -135,7 +134,7 @@ private[parquet] class CatalystSchemaConverter( val precision = field.getDecimalMetadata.getPrecision val scale = field.getDecimalMetadata.getScale - CatalystSchemaConverter.analysisRequire( + ParquetSchemaConverter.checkConversionRequirement( maxPrecision == -1 || 1 <= precision && precision <= maxPrecision, s"Invalid decimal precision: $typeName cannot store $precision digits (max $maxPrecision)") @@ -155,7 +154,7 @@ private[parquet] class CatalystSchemaConverter( case INT_16 => ShortType case INT_32 | null => IntegerType case DATE => DateType - case DECIMAL => makeDecimalType(maxPrecisionForBytes(4)) + case DECIMAL => makeDecimalType(MAX_PRECISION_FOR_INT32) case TIME_MILLIS => typeNotImplemented() case _ => illegalType() } @@ -163,13 +162,13 @@ private[parquet] class CatalystSchemaConverter( case INT64 => originalType match { case INT_64 | null => LongType - case DECIMAL => makeDecimalType(maxPrecisionForBytes(8)) + case DECIMAL => makeDecimalType(MAX_PRECISION_FOR_INT64) case TIMESTAMP_MILLIS => typeNotImplemented() case _ => illegalType() } case INT96 => - CatalystSchemaConverter.analysisRequire( + ParquetSchemaConverter.checkConversionRequirement( assumeInt96IsTimestamp, "INT96 is not supported unless it's interpreted as timestamp. 
" + s"Please try to set ${SQLConf.PARQUET_INT96_AS_TIMESTAMP.key} to true.") @@ -211,11 +210,11 @@ private[parquet] class CatalystSchemaConverter( // // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists case LIST => - CatalystSchemaConverter.analysisRequire( + ParquetSchemaConverter.checkConversionRequirement( field.getFieldCount == 1, s"Invalid list type $field") val repeatedType = field.getType(0) - CatalystSchemaConverter.analysisRequire( + ParquetSchemaConverter.checkConversionRequirement( repeatedType.isRepetition(REPEATED), s"Invalid list type $field") if (isElementType(repeatedType, field.getName)) { @@ -231,17 +230,17 @@ private[parquet] class CatalystSchemaConverter( // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules-1 // scalastyle:on case MAP | MAP_KEY_VALUE => - CatalystSchemaConverter.analysisRequire( + ParquetSchemaConverter.checkConversionRequirement( field.getFieldCount == 1 && !field.getType(0).isPrimitive, s"Invalid map type: $field") val keyValueType = field.getType(0).asGroupType() - CatalystSchemaConverter.analysisRequire( + ParquetSchemaConverter.checkConversionRequirement( keyValueType.isRepetition(REPEATED) && keyValueType.getFieldCount == 2, s"Invalid map type: $field") val keyType = keyValueType.getType(0) - CatalystSchemaConverter.analysisRequire( + ParquetSchemaConverter.checkConversionRequirement( keyType.isPrimitive, s"Map key type is expected to be a primitive type, but found: $keyType") @@ -313,7 +312,10 @@ private[parquet] class CatalystSchemaConverter( * Converts a Spark SQL [[StructType]] to a Parquet [[MessageType]]. */ def convert(catalystSchema: StructType): MessageType = { - Types.buildMessage().addFields(catalystSchema.map(convertField): _*).named("root") + Types + .buildMessage() + .addFields(catalystSchema.map(convertField): _*) + .named(ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME) } /** @@ -324,13 +326,9 @@ private[parquet] class CatalystSchemaConverter( } private def convertField(field: StructField, repetition: Type.Repetition): Type = { - CatalystSchemaConverter.checkFieldName(field.name) + ParquetSchemaConverter.checkFieldName(field.name) field.dataType match { - // =================== - // Simple atomic types - // =================== - case BooleanType => Types.primitive(BOOLEAN, repetition).named(field.name) @@ -361,171 +359,123 @@ private[parquet] class CatalystSchemaConverter( // NOTE: Spark SQL TimestampType is NOT a well defined type in Parquet format spec. // // As stated in PARQUET-323, Parquet `INT96` was originally introduced to represent nanosecond - // timestamp in Impala for some historical reasons, it's not recommended to be used for any - // other types and will probably be deprecated in future Parquet format spec. That's the - // reason why Parquet format spec only defines `TIMESTAMP_MILLIS` and `TIMESTAMP_MICROS` which - // are both logical types annotating `INT64`. + // timestamp in Impala for some historical reasons. It's not recommended to be used for any + // other types and will probably be deprecated in some future version of parquet-format spec. + // That's the reason why parquet-format spec only defines `TIMESTAMP_MILLIS` and + // `TIMESTAMP_MICROS` which are both logical types annotating `INT64`. // // Originally, Spark SQL uses the same nanosecond timestamp type as Impala and Hive. Starting - // from Spark 1.5.0, we resort to a timestamp type with 100 ns precision so that we can store - // a timestamp into a `Long`. 
This design decision is subject to change though, for example, - // we may resort to microsecond precision in the future. + // from Spark 1.5.0, we resort to microsecond timestamp type. // - // For Parquet, we plan to write all `TimestampType` value as `TIMESTAMP_MICROS`, but it's - // currently not implemented yet because parquet-mr 1.7.0 (the version we're currently using) - // hasn't implemented `TIMESTAMP_MICROS` yet. + // We plan to write all `TimestampType` values as `TIMESTAMP_MICROS`, but up to writing, the + // most recent version of parquet-mr (1.8.1) hasn't implemented `TIMESTAMP_MICROS` yet. // - // TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that. + // TODO Converts to `TIMESTAMP_MICROS` once parquet-mr implements that. case TimestampType => Types.primitive(INT96, repetition).named(field.name) case BinaryType => Types.primitive(BINARY, repetition).named(field.name) - // ===================================== - // Decimals (for Spark version <= 1.4.x) - // ===================================== - - // Spark 1.4.x and prior versions only support decimals with a maximum precision of 18 and - // always store decimals in fixed-length byte arrays. To keep compatibility with these older - // versions, here we convert decimals with all precisions to `FIXED_LEN_BYTE_ARRAY` annotated - // by `DECIMAL`. - case DecimalType.Fixed(precision, scale) if !followParquetFormatSpec => - Types - .primitive(FIXED_LEN_BYTE_ARRAY, repetition) - .as(DECIMAL) - .precision(precision) - .scale(scale) - .length(CatalystSchemaConverter.minBytesForPrecision(precision)) - .named(field.name) - - // ===================================== - // Decimals (follow Parquet format spec) - // ===================================== - - // Uses INT32 for 1 <= precision <= 9 - case DecimalType.Fixed(precision, scale) - if precision <= maxPrecisionForBytes(4) && followParquetFormatSpec => - Types - .primitive(INT32, repetition) - .as(DECIMAL) - .precision(precision) - .scale(scale) - .named(field.name) - - // Uses INT64 for 1 <= precision <= 18 - case DecimalType.Fixed(precision, scale) - if precision <= maxPrecisionForBytes(8) && followParquetFormatSpec => - Types - .primitive(INT64, repetition) - .as(DECIMAL) - .precision(precision) - .scale(scale) - .named(field.name) - - // Uses FIXED_LEN_BYTE_ARRAY for all other precisions - case DecimalType.Fixed(precision, scale) if followParquetFormatSpec => - Types - .primitive(FIXED_LEN_BYTE_ARRAY, repetition) - .as(DECIMAL) - .precision(precision) - .scale(scale) - .length(CatalystSchemaConverter.minBytesForPrecision(precision)) - .named(field.name) - - // =================================================== - // ArrayType and MapType (for Spark versions <= 1.4.x) - // =================================================== - - // Spark 1.4.x and prior versions convert ArrayType with nullable elements into a 3-level - // LIST structure. This behavior mimics parquet-hive (1.6.0rc3). Note that this case is - // covered by the backwards-compatibility rules implemented in `isElementType()`. 
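    // Aside (illustrative, not patch content): the decimal cases removed above and re-introduced
    // in unified form below map precisions to physical types as follows. Byte lengths are
    // hand-derived from minBytesForPrecision; MAX_PRECISION_FOR_INT32 = 9 and
    // MAX_PRECISION_FOR_INT64 = 18.
    //
    //   DecimalType(9, 2),  standard mode -> INT32-backed DECIMAL(9, 2)
    //   DecimalType(18, 4), standard mode -> INT64-backed DECIMAL(18, 4)
    //   DecimalType(20, 2), either mode   -> FIXED_LEN_BYTE_ARRAY(9)-backed DECIMAL(20, 2)
    //   DecimalType(9, 2),  legacy mode   -> FIXED_LEN_BYTE_ARRAY(4)-backed DECIMAL(9, 2)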
- case ArrayType(elementType, nullable @ true) if !followParquetFormatSpec => - // group (LIST) { - // optional group bag { - // repeated element; - // } - // } - ConversionPatterns.listType( - repetition, - field.name, - Types - .buildGroup(REPEATED) - // "array_element" is the name chosen by parquet-hive (1.7.0 and prior version) - .addField(convertField(StructField("array_element", elementType, nullable))) - .named(CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME)) - - // Spark 1.4.x and prior versions convert ArrayType with non-nullable elements into a 2-level - // LIST structure. This behavior mimics parquet-avro (1.6.0rc3). Note that this case is - // covered by the backwards-compatibility rules implemented in `isElementType()`. - case ArrayType(elementType, nullable @ false) if !followParquetFormatSpec => - // group (LIST) { - // repeated element; - // } - ConversionPatterns.listType( - repetition, - field.name, - // "array" is the name chosen by parquet-avro (1.7.0 and prior version) - convertField(StructField("array", elementType, nullable), REPEATED)) - - // Spark 1.4.x and prior versions convert MapType into a 3-level group annotated by - // MAP_KEY_VALUE. This is covered by `convertGroupField(field: GroupType): DataType`. - case MapType(keyType, valueType, valueContainsNull) if !followParquetFormatSpec => - // group (MAP) { - // repeated group map (MAP_KEY_VALUE) { - // required key; - // value; - // } - // } - ConversionPatterns.mapType( - repetition, - field.name, - convertField(StructField("key", keyType, nullable = false)), - convertField(StructField("value", valueType, valueContainsNull))) - - // ================================================== - // ArrayType and MapType (follow Parquet format spec) - // ================================================== - - case ArrayType(elementType, containsNull) if followParquetFormatSpec => - // group (LIST) { - // repeated group list { - // element; - // } - // } - Types - .buildGroup(repetition).as(LIST) - .addField( - Types.repeatedGroup() - .addField(convertField(StructField("element", elementType, containsNull))) - .named("list")) - .named(field.name) - - case MapType(keyType, valueType, valueContainsNull) => - // group (MAP) { - // repeated group key_value { - // required key; - // value; - // } - // } - Types - .buildGroup(repetition).as(MAP) - .addField( + case DecimalType.Fixed(precision, scale) => + val builder = writeLegacyParquetFormat match { + // Standard mode, 1 <= precision <= 9, converts to INT32 based DECIMAL + case false if precision <= MAX_PRECISION_FOR_INT32 => + Types.primitive(INT32, repetition) + + // Standard mode, 10 <= precision <= 18, converts to INT64 based DECIMAL + case false if precision <= MAX_PRECISION_FOR_INT64 => + Types.primitive(INT64, repetition) + + // All other cases: + // - Standard mode, 19 <= precision <= 38, converts to FIXED_LEN_BYTE_ARRAY based DECIMAL + // - Legacy mode, 1 <= precision <= 38, converts to FIXED_LEN_BYTE_ARRAY based DECIMAL + case _ => + val numBytes = minBytesForPrecision(precision) + Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition).length(numBytes) + } + + builder.as(DECIMAL).precision(precision).scale(scale).named(field.name) + + case t: ArrayType => + val repeatedType = (writeLegacyParquetFormat, t.containsNull) match { + case (true, true) => + // Legacy mode: Spark 1.4.x and prior versions convert `ArrayType` with nullable + // elements into a 3-level `LIST` structure. 
This behavior is somewhat a hybrid of + // parquet-hive and parquet-avro (1.6.0rc3): the 3-level structure is similar to + // parquet-hive while the 3rd level anonymous field name "array" is from parquet-avro. + // + // group (LIST) { + // repeated group bag { | + // optional array; |- repeatedType + // } | + // } Types .repeatedGroup() - .addField(convertField(StructField("key", keyType, nullable = false))) - .addField(convertField(StructField("value", valueType, valueContainsNull))) - .named("key_value")) - .named(field.name) + .addField(convertField(StructField("array", t.elementType, t.containsNull))) + .named("bag") + + case (true, false) => + // Legacy mode: Spark 1.4.x and prior versions convert `ArrayType` with non-nullable + // elements into a 2-level `LIST` structure. This behavior mimics parquet-avro + // (1.6.0rc3). + // + // group (LIST) { + // repeated array; <- repeatedType + // } + convertField(StructField("array", t.elementType, t.containsNull), REPEATED) + + case (false, _) => + // Standard mode: + // + // group (LIST) { + // repeated group list { | + // element; |- repeatedType + // } | + // } + Types + .repeatedGroup() + .addField(convertField(StructField("element", t.elementType, t.containsNull))) + .named("list") + } - // =========== - // Other types - // =========== + Types.buildGroup(repetition).as(LIST).addField(repeatedType).named(field.name) - case StructType(fields) => - fields.foldLeft(Types.buildGroup(repetition)) { (builder, field) => - builder.addField(convertField(field)) - }.named(field.name) + case t: MapType => + val repeatedGroupBuilder = + Types + .repeatedGroup() + .addField(convertField(StructField("key", t.keyType, nullable = false))) + .addField(convertField(StructField("value", t.valueType, t.valueContainsNull))) + + val repeatedGroup = if (writeLegacyParquetFormat) { + // Legacy mode: Spark 1.4.x and prior versions convert MapType into a 3-level group + // annotated by MAP_KEY_VALUE. 
+ // + // group (MAP) { + // repeated group map (MAP_KEY_VALUE) { | + // required key; |- repeatedGroup + // value; | + // } | + // } + repeatedGroupBuilder.as(MAP_KEY_VALUE).named("map") + } else { + // Standard mode: + // + // group (MAP) { + // repeated group key_value { | + // required key; |- repeatedGroup + // value; | + // } | + // } + repeatedGroupBuilder.named("key_value") + } + + Types.buildGroup(repetition).as(MAP).addField(repeatedGroup).named(field.name) + + case t: StructType => + val parquetFields = t.fields.map(convertField) + Types.buildGroup(repetition).addFields(parquetFields: _*).named(field.name) case udt: UserDefinedType[_] => convertField(field.copy(dataType = udt.sqlType)) @@ -534,21 +484,15 @@ private[parquet] class CatalystSchemaConverter( throw new AnalysisException(s"Unsupported data type $field.dataType") } } - - // Max precision of a decimal value stored in `numBytes` bytes - private def maxPrecisionForBytes(numBytes: Int): Int = { - Math.round( // convert double to long - Math.floor(Math.log10( // number of base-10 digits - Math.pow(2, 8 * numBytes - 1) - 1))) // max value stored in numBytes - .asInstanceOf[Int] - } } -private[parquet] object CatalystSchemaConverter { +private[parquet] object ParquetSchemaConverter { + val SPARK_PARQUET_SCHEMA_NAME = "spark_schema" + def checkFieldName(name: String): Unit = { // ,;{}()\n\t= and space are special characters in Parquet schema - analysisRequire( + checkConversionRequirement( !name.matches(".*[ ,;{}()\n\t=].*"), s"""Attribute name "$name" contains invalid character(s) among " ,;{}()\\n\\t=". |Please use alias to rename it. @@ -560,13 +504,14 @@ private[parquet] object CatalystSchemaConverter { schema } - def analysisRequire(f: => Boolean, message: String): Unit = { + def checkConversionRequirement(f: => Boolean, message: String): Unit = { if (!f) { throw new AnalysisException(message) } } - private def computeMinBytesForPrecision(precision : Int) : Int = { + // The minimum number of bytes needed to store a decimal with a given `precision`. + val minBytesForPrecision = Array.tabulate[Int](DecimalType.MAX_PRECISION + 1) { precision => var numBytes = 1 while (math.pow(2.0, 8 * numBytes - 1) < math.pow(10.0, precision)) { numBytes += 1 @@ -574,14 +519,15 @@ private[parquet] object CatalystSchemaConverter { numBytes } - private val MIN_BYTES_FOR_PRECISION = Array.tabulate[Int](39)(computeMinBytesForPrecision) + val MAX_PRECISION_FOR_INT32 = maxPrecisionForBytes(4) - // Returns the minimum number of bytes needed to store a decimal with a given `precision`. 
- def minBytesForPrecision(precision : Int) : Int = { - if (precision < MIN_BYTES_FOR_PRECISION.length) { - MIN_BYTES_FOR_PRECISION(precision) - } else { - computeMinBytesForPrecision(precision) - } + val MAX_PRECISION_FOR_INT64 = maxPrecisionForBytes(8) + + // Max precision of a decimal value stored in `numBytes` bytes + def maxPrecisionForBytes(numBytes: Int): Int = { + Math.round( // convert double to long + Math.floor(Math.log10( // number of base-10 digits + Math.pow(2, 8 * numBytes - 1) - 1))) // max value stored in numBytes + .asInstanceOf[Int] } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala deleted file mode 100644 index 9cd0250f9c51..000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala +++ /dev/null @@ -1,322 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.parquet - -import java.math.BigInteger -import java.nio.{ByteBuffer, ByteOrder} -import java.util.{HashMap => JHashMap} - -import org.apache.hadoop.conf.Configuration -import org.apache.parquet.column.ParquetProperties -import org.apache.parquet.hadoop.ParquetOutputFormat -import org.apache.parquet.hadoop.api.WriteSupport -import org.apache.parquet.io.api._ - -import org.apache.spark.Logging -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.types._ -import org.apache.spark.unsafe.types.UTF8String - -/** - * A `parquet.hadoop.api.WriteSupport` for Row objects. 
- */ -private[parquet] class RowWriteSupport extends WriteSupport[InternalRow] with Logging { - - private[parquet] var writer: RecordConsumer = null - private[parquet] var attributes: Array[Attribute] = null - - override def init(configuration: Configuration): WriteSupport.WriteContext = { - val origAttributesStr: String = configuration.get(RowWriteSupport.SPARK_ROW_SCHEMA) - val metadata = new JHashMap[String, String]() - metadata.put(CatalystReadSupport.SPARK_METADATA_KEY, origAttributesStr) - - if (attributes == null) { - attributes = ParquetTypesConverter.convertFromString(origAttributesStr).toArray - } - - log.debug(s"write support initialized for requested schema $attributes") - ParquetRelation.enableLogForwarding() - new WriteSupport.WriteContext(ParquetTypesConverter.convertFromAttributes(attributes), metadata) - } - - override def prepareForWrite(recordConsumer: RecordConsumer): Unit = { - writer = recordConsumer - log.debug(s"preparing for write with schema $attributes") - } - - override def write(record: InternalRow): Unit = { - val attributesSize = attributes.size - if (attributesSize > record.numFields) { - throw new IndexOutOfBoundsException("Trying to write more fields than contained in row " + - s"($attributesSize > ${record.numFields})") - } - - var index = 0 - writer.startMessage() - while(index < attributesSize) { - // null values indicate optional fields but we do not check currently - if (!record.isNullAt(index)) { - writer.startField(attributes(index).name, index) - writeValue(attributes(index).dataType, record.get(index, attributes(index).dataType)) - writer.endField(attributes(index).name, index) - } - index = index + 1 - } - writer.endMessage() - } - - private[parquet] def writeValue(schema: DataType, value: Any): Unit = { - if (value != null) { - schema match { - case t: UserDefinedType[_] => writeValue(t.sqlType, value) - case t @ ArrayType(_, _) => writeArray( - t, - value.asInstanceOf[CatalystConverter.ArrayScalaType]) - case t @ MapType(_, _, _) => writeMap( - t, - value.asInstanceOf[CatalystConverter.MapScalaType]) - case t @ StructType(_) => writeStruct( - t, - value.asInstanceOf[CatalystConverter.StructScalaType]) - case _ => writePrimitive(schema.asInstanceOf[AtomicType], value) - } - } - } - - private[parquet] def writePrimitive(schema: DataType, value: Any): Unit = { - if (value != null) { - schema match { - case BooleanType => writer.addBoolean(value.asInstanceOf[Boolean]) - case ByteType => writer.addInteger(value.asInstanceOf[Byte]) - case ShortType => writer.addInteger(value.asInstanceOf[Short]) - case IntegerType | DateType => writer.addInteger(value.asInstanceOf[Int]) - case LongType => writer.addLong(value.asInstanceOf[Long]) - case TimestampType => writeTimestamp(value.asInstanceOf[Long]) - case FloatType => writer.addFloat(value.asInstanceOf[Float]) - case DoubleType => writer.addDouble(value.asInstanceOf[Double]) - case StringType => writer.addBinary( - Binary.fromByteArray(value.asInstanceOf[UTF8String].getBytes)) - case BinaryType => writer.addBinary( - Binary.fromByteArray(value.asInstanceOf[Array[Byte]])) - case DecimalType.Fixed(precision, _) => - writeDecimal(value.asInstanceOf[Decimal], precision) - case _ => sys.error(s"Do not know how to writer $schema to consumer") - } - } - } - - private[parquet] def writeStruct( - schema: StructType, - struct: CatalystConverter.StructScalaType): Unit = { - if (struct != null) { - val fields = schema.fields.toArray - writer.startGroup() - var i = 0 - while(i < fields.length) { - if 
(!struct.isNullAt(i)) { - writer.startField(fields(i).name, i) - writeValue(fields(i).dataType, struct.get(i, fields(i).dataType)) - writer.endField(fields(i).name, i) - } - i = i + 1 - } - writer.endGroup() - } - } - - private[parquet] def writeArray( - schema: ArrayType, - array: CatalystConverter.ArrayScalaType): Unit = { - val elementType = schema.elementType - writer.startGroup() - if (array.numElements() > 0) { - if (schema.containsNull) { - writer.startField(CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME, 0) - var i = 0 - while (i < array.numElements()) { - writer.startGroup() - if (!array.isNullAt(i)) { - writer.startField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0) - writeValue(elementType, array.get(i, elementType)) - writer.endField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0) - } - writer.endGroup() - i = i + 1 - } - writer.endField(CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME, 0) - } else { - writer.startField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0) - var i = 0 - while (i < array.numElements()) { - writeValue(elementType, array.get(i, elementType)) - i = i + 1 - } - writer.endField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0) - } - } - writer.endGroup() - } - - private[parquet] def writeMap( - schema: MapType, - map: CatalystConverter.MapScalaType): Unit = { - writer.startGroup() - val length = map.numElements() - if (length > 0) { - writer.startField(CatalystConverter.MAP_SCHEMA_NAME, 0) - map.foreach(schema.keyType, schema.valueType, (key, value) => { - writer.startGroup() - writer.startField(CatalystConverter.MAP_KEY_SCHEMA_NAME, 0) - writeValue(schema.keyType, key) - writer.endField(CatalystConverter.MAP_KEY_SCHEMA_NAME, 0) - if (value != null) { - writer.startField(CatalystConverter.MAP_VALUE_SCHEMA_NAME, 1) - writeValue(schema.valueType, value) - writer.endField(CatalystConverter.MAP_VALUE_SCHEMA_NAME, 1) - } - writer.endGroup() - }) - writer.endField(CatalystConverter.MAP_SCHEMA_NAME, 0) - } - writer.endGroup() - } - - // Scratch array used to write decimals as fixed-length byte array - private[this] var reusableDecimalBytes = new Array[Byte](16) - - private[parquet] def writeDecimal(decimal: Decimal, precision: Int): Unit = { - val numBytes = CatalystSchemaConverter.minBytesForPrecision(precision) - - def longToBinary(unscaled: Long): Binary = { - var i = 0 - var shift = 8 * (numBytes - 1) - while (i < numBytes) { - reusableDecimalBytes(i) = (unscaled >> shift).toByte - i += 1 - shift -= 8 - } - Binary.fromByteArray(reusableDecimalBytes, 0, numBytes) - } - - def bigIntegerToBinary(unscaled: BigInteger): Binary = { - unscaled.toByteArray match { - case bytes if bytes.length == numBytes => - Binary.fromByteArray(bytes) - - case bytes if bytes.length <= reusableDecimalBytes.length => - val signedByte = (if (bytes.head < 0) -1 else 0).toByte - java.util.Arrays.fill(reusableDecimalBytes, 0, numBytes - bytes.length, signedByte) - System.arraycopy(bytes, 0, reusableDecimalBytes, numBytes - bytes.length, bytes.length) - Binary.fromByteArray(reusableDecimalBytes, 0, numBytes) - - case bytes => - reusableDecimalBytes = new Array[Byte](bytes.length) - bigIntegerToBinary(unscaled) - } - } - - val binary = if (numBytes <= 8) { - longToBinary(decimal.toUnscaledLong) - } else { - bigIntegerToBinary(decimal.toJavaBigDecimal.unscaledValue()) - } - - writer.addBinary(binary) - } - - // array used to write Timestamp as Int96 (fixed-length binary) - private[this] val int96buf = new Array[Byte](12) - - private[parquet] def writeTimestamp(ts: Long): 
Unit = { - val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(ts) - val buf = ByteBuffer.wrap(int96buf) - buf.order(ByteOrder.LITTLE_ENDIAN) - buf.putLong(timeOfDayNanos) - buf.putInt(julianDay) - writer.addBinary(Binary.fromByteArray(int96buf)) - } -} - -// Optimized for non-nested rows -private[parquet] class MutableRowWriteSupport extends RowWriteSupport { - override def write(record: InternalRow): Unit = { - val attributesSize = attributes.size - if (attributesSize > record.numFields) { - throw new IndexOutOfBoundsException("Trying to write more fields than contained in row " + - s"($attributesSize > ${record.numFields})") - } - - var index = 0 - writer.startMessage() - while(index < attributesSize) { - // null values indicate optional fields but we do not check currently - if (!record.isNullAt(index) && !record.isNullAt(index)) { - writer.startField(attributes(index).name, index) - consumeType(attributes(index).dataType, record, index) - writer.endField(attributes(index).name, index) - } - index = index + 1 - } - writer.endMessage() - } - - private def consumeType( - ctype: DataType, - record: InternalRow, - index: Int): Unit = { - ctype match { - case BooleanType => writer.addBoolean(record.getBoolean(index)) - case ByteType => writer.addInteger(record.getByte(index)) - case ShortType => writer.addInteger(record.getShort(index)) - case IntegerType | DateType => writer.addInteger(record.getInt(index)) - case LongType => writer.addLong(record.getLong(index)) - case TimestampType => writeTimestamp(record.getLong(index)) - case FloatType => writer.addFloat(record.getFloat(index)) - case DoubleType => writer.addDouble(record.getDouble(index)) - case StringType => - writer.addBinary(Binary.fromByteArray(record.getUTF8String(index).getBytes)) - case BinaryType => - writer.addBinary(Binary.fromByteArray(record.getBinary(index))) - case DecimalType.Fixed(precision, scale) => - writeDecimal(record.getDecimal(index, precision, scale), precision) - case _ => sys.error(s"Unsupported datatype $ctype, cannot write to consumer") - } - } -} - -private[parquet] object RowWriteSupport { - val SPARK_ROW_SCHEMA: String = "org.apache.spark.sql.parquet.row.attributes" - - def getSchema(configuration: Configuration): Seq[Attribute] = { - val schemaString = configuration.get(RowWriteSupport.SPARK_ROW_SCHEMA) - if (schemaString == null) { - throw new RuntimeException("Missing schema!") - } - ParquetTypesConverter.convertFromString(schemaString) - } - - def setSchema(schema: Seq[Attribute], configuration: Configuration) { - val encoded = ParquetTypesConverter.convertToString(schema) - configuration.set(SPARK_ROW_SCHEMA, encoded) - configuration.set( - ParquetOutputFormat.WRITER_VERSION, - ParquetProperties.WriterVersion.PARQUET_1_0.toString) - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala deleted file mode 100644 index 3854f5bd39fb..000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala +++ /dev/null @@ -1,159 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.parquet - -import java.io.IOException - -import scala.collection.JavaConversions._ -import scala.util.Try - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hadoop.mapreduce.Job -import org.apache.parquet.format.converter.ParquetMetadataConverter -import org.apache.parquet.hadoop.metadata.{FileMetaData, ParquetMetadata} -import org.apache.parquet.hadoop.util.ContextUtil -import org.apache.parquet.hadoop.{Footer, ParquetFileReader, ParquetFileWriter} -import org.apache.parquet.schema.MessageType - -import org.apache.spark.Logging -import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.types._ - - -private[parquet] object ParquetTypesConverter extends Logging { - def isPrimitiveType(ctype: DataType): Boolean = ctype match { - case _: NumericType | BooleanType | DateType | TimestampType | StringType | BinaryType => true - case _ => false - } - - /** - * Compute the FIXED_LEN_BYTE_ARRAY length needed to represent a given DECIMAL precision. - */ - private[parquet] val BYTES_FOR_PRECISION = Array.tabulate[Int](38) { precision => - var length = 1 - while (math.pow(2.0, 8 * length - 1) < math.pow(10.0, precision)) { - length += 1 - } - length - } - - def convertFromAttributes(attributes: Seq[Attribute]): MessageType = { - val converter = new CatalystSchemaConverter() - converter.convert(StructType.fromAttributes(attributes)) - } - - def convertFromString(string: String): Seq[Attribute] = { - Try(DataType.fromJson(string)).getOrElse(DataType.fromCaseClassString(string)) match { - case s: StructType => s.toAttributes - case other => sys.error(s"Can convert $string to row") - } - } - - def convertToString(schema: Seq[Attribute]): String = { - schema.map(_.name).foreach(CatalystSchemaConverter.checkFieldName) - StructType.fromAttributes(schema).json - } - - def writeMetaData(attributes: Seq[Attribute], origPath: Path, conf: Configuration): Unit = { - if (origPath == null) { - throw new IllegalArgumentException("Unable to write Parquet metadata: path is null") - } - val fs = origPath.getFileSystem(conf) - if (fs == null) { - throw new IllegalArgumentException( - s"Unable to write Parquet metadata: path $origPath is incorrectly formatted") - } - val path = origPath.makeQualified(fs) - if (fs.exists(path) && !fs.getFileStatus(path).isDir) { - throw new IllegalArgumentException(s"Expected to write to directory $path but found file") - } - val metadataPath = new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE) - if (fs.exists(metadataPath)) { - try { - fs.delete(metadataPath, true) - } catch { - case e: IOException => - throw new IOException(s"Unable to delete previous PARQUET_METADATA_FILE at $metadataPath") - } - } - val extraMetadata = new java.util.HashMap[String, String]() - extraMetadata.put( - CatalystReadSupport.SPARK_METADATA_KEY, - ParquetTypesConverter.convertToString(attributes)) - // TODO: add extra data, e.g., table name, date, etc.? 
- - val parquetSchema: MessageType = ParquetTypesConverter.convertFromAttributes(attributes) - val metaData: FileMetaData = new FileMetaData( - parquetSchema, - extraMetadata, - "Spark") - - ParquetRelation.enableLogForwarding() - ParquetFileWriter.writeMetadataFile( - conf, - path, - new Footer(path, new ParquetMetadata(metaData, Nil)) :: Nil) - } - - /** - * Try to read Parquet metadata at the given Path. We first see if there is a summary file - * in the parent directory. If so, this is used. Else we read the actual footer at the given - * location. - * @param origPath The path at which we expect one (or more) Parquet files. - * @param configuration The Hadoop configuration to use. - * @return The `ParquetMetadata` containing among other things the schema. - */ - def readMetaData(origPath: Path, configuration: Option[Configuration]): ParquetMetadata = { - if (origPath == null) { - throw new IllegalArgumentException("Unable to read Parquet metadata: path is null") - } - val job = new Job() - val conf = configuration.getOrElse(ContextUtil.getConfiguration(job)) - val fs: FileSystem = origPath.getFileSystem(conf) - if (fs == null) { - throw new IllegalArgumentException(s"Incorrectly formatted Parquet metadata path $origPath") - } - val path = origPath.makeQualified(fs) - - val children = - fs - .globStatus(path) - .flatMap { status => if (status.isDir) fs.listStatus(status.getPath) else List(status) } - .filterNot { status => - val name = status.getPath.getName - (name(0) == '.' || name(0) == '_') && name != ParquetFileWriter.PARQUET_METADATA_FILE - } - - ParquetRelation.enableLogForwarding() - - // NOTE (lian): Parquet "_metadata" file can be very slow if the file consists of lots of row - // groups. Since Parquet schema is replicated among all row groups, we only need to touch a - // single row group to read schema related metadata. Notice that we are making assumptions that - // all data in a single Parquet file have the same schema, which is normally true. - children - // Try any non-"_metadata" file first... - .find(_.getPath.getName != ParquetFileWriter.PARQUET_METADATA_FILE) - // ... and fallback to "_metadata" if no such file exists (which implies the Parquet file is - // empty, thus normally the "_metadata" file is expected to be fairly small). - .orElse(children.find(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE)) - .map(ParquetFileReader.readFooter(conf, _, ParquetMetadataConverter.NO_FILTER)) - .getOrElse( - throw new IllegalArgumentException(s"Could not find Parquet metadata at path $path")) - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetWriteSupport.scala new file mode 100644 index 000000000000..17e0aaa360fb --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetWriteSupport.scala @@ -0,0 +1,433 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.parquet + +import java.nio.{ByteBuffer, ByteOrder} +import java.util + +import scala.collection.JavaConverters.mapAsJavaMapConverter + +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.column.ParquetProperties +import org.apache.parquet.hadoop.ParquetOutputFormat +import org.apache.parquet.hadoop.api.WriteSupport +import org.apache.parquet.hadoop.api.WriteSupport.WriteContext +import org.apache.parquet.io.api.{Binary, RecordConsumer} + +import org.apache.spark.Logging +import org.apache.spark.sql.SQLConf +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{SpecializedGetters, SpecificMutableRow} +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.parquet.ParquetSchemaConverter.{MAX_PRECISION_FOR_INT32, MAX_PRECISION_FOR_INT64, minBytesForPrecision} +import org.apache.spark.sql.types._ + +/** + * A Parquet [[WriteSupport]] implementation that writes Catalyst [[InternalRow]]s as Parquet + * messages. This class can write Parquet data in two modes: + * + * - Standard mode: Parquet data are written in standard format defined in parquet-format spec. + * - Legacy mode: Parquet data are written in legacy format compatible with Spark 1.4 and prior. + * + * This behavior can be controlled by SQL option `spark.sql.parquet.writeLegacyParquetFormat`. The + * value of the option is propagated to this class by the `init()` method and its Hadoop + * configuration argument. + */ +private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging { + // A `ValueWriter` is responsible for writing a field of an `InternalRow` to the record consumer. + // Here we are using `SpecializedGetters` rather than `InternalRow` so that we can directly access + // data in `ArrayData` without the help of `SpecificMutableRow`. 
+ private type ValueWriter = (SpecializedGetters, Int) => Unit + + // Schema of the `InternalRow`s to be written + private var schema: StructType = _ + + // `ValueWriter`s for all fields of the schema + private var rootFieldWriters: Seq[ValueWriter] = _ + + // The Parquet `RecordConsumer` to which all `InternalRow`s are written + private var recordConsumer: RecordConsumer = _ + + // Whether to write data in legacy Parquet format compatible with Spark 1.4 and prior versions + private var writeLegacyParquetFormat: Boolean = _ + + // Reusable byte array used to write timestamps as Parquet INT96 values + private val timestampBuffer = new Array[Byte](12) + + // Reusable byte array used to write decimal values + private val decimalBuffer = new Array[Byte](minBytesForPrecision(DecimalType.MAX_PRECISION)) + + override def init(configuration: Configuration): WriteContext = { + val schemaString = configuration.get(ParquetWriteSupport.SPARK_ROW_SCHEMA) + schema = StructType.fromString(schemaString) + rootFieldWriters = schema.map(_.dataType).map(makeWriter) + writeLegacyParquetFormat = { + assert(configuration.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key) != null) + configuration.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key).toBoolean + } + + val messageType = new ParquetSchemaConverter(configuration).convert(schema) + val metadata = Map(ParquetReadSupport.SPARK_METADATA_KEY -> schemaString).asJava + + logDebug( + s"""Initialized Parquet WriteSupport with Catalyst schema: + |${schema.prettyJson} + |and corresponding Parquet message type: + |$messageType + """.stripMargin) + + new WriteContext(messageType, metadata) + } + + override def prepareForWrite(recordConsumer: RecordConsumer): Unit = { + this.recordConsumer = recordConsumer + } + + override def write(row: InternalRow): Unit = { + consumeMessage(writeFields(row, schema, rootFieldWriters)) + } + + private def writeFields( + row: InternalRow, schema: StructType, fieldWriters: Seq[ValueWriter]): Unit = { + var i = 0 + while (i < row.numFields) { + if (!row.isNullAt(i)) { + consumeField(schema(i).name, i) { + fieldWriters(i).apply(row, i) + } + } + i += 1 + } + } + + private def makeWriter(dataType: DataType): ValueWriter = { + dataType match { + case BooleanType => + (row: SpecializedGetters, ordinal: Int) => + recordConsumer.addBoolean(row.getBoolean(ordinal)) + + case ByteType => + (row: SpecializedGetters, ordinal: Int) => + recordConsumer.addInteger(row.getByte(ordinal)) + + case ShortType => + (row: SpecializedGetters, ordinal: Int) => + recordConsumer.addInteger(row.getShort(ordinal)) + + case IntegerType | DateType => + (row: SpecializedGetters, ordinal: Int) => + recordConsumer.addInteger(row.getInt(ordinal)) + + case LongType => + (row: SpecializedGetters, ordinal: Int) => + recordConsumer.addLong(row.getLong(ordinal)) + + case FloatType => + (row: SpecializedGetters, ordinal: Int) => + recordConsumer.addFloat(row.getFloat(ordinal)) + + case DoubleType => + (row: SpecializedGetters, ordinal: Int) => + recordConsumer.addDouble(row.getDouble(ordinal)) + + case StringType => + (row: SpecializedGetters, ordinal: Int) => + recordConsumer.addBinary(Binary.fromByteArray(row.getUTF8String(ordinal).getBytes)) + + case TimestampType => + (row: SpecializedGetters, ordinal: Int) => { + // TODO Writes `TimestampType` values as `TIMESTAMP_MICROS` once parquet-mr implements it + // Currently we only support timestamps stored as INT96, which is compatible with Hive + // and Impala. However, INT96 is to be deprecated. 
We plan to support `TIMESTAMP_MICROS` + // defined in the parquet-format spec. But up until writing, the most recent parquet-mr + // version (1.8.1) hasn't implemented it yet. + + // NOTE: Starting from Spark 1.5, Spark SQL `TimestampType` only has microsecond + // precision. Nanosecond parts of timestamp values read from INT96 are simply stripped. + val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(row.getLong(ordinal)) + val buf = ByteBuffer.wrap(timestampBuffer) + buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay) + recordConsumer.addBinary(Binary.fromByteArray(timestampBuffer)) + } + + case BinaryType => + (row: SpecializedGetters, ordinal: Int) => + recordConsumer.addBinary(Binary.fromByteArray(row.getBinary(ordinal))) + + case DecimalType.Fixed(precision, scale) => + makeDecimalWriter(precision, scale) + + case t: StructType => + val fieldWriters = t.map(_.dataType).map(makeWriter) + (row: SpecializedGetters, ordinal: Int) => + consumeGroup(writeFields(row.getStruct(ordinal, t.length), t, fieldWriters)) + + case t: ArrayType => makeArrayWriter(t) + + case t: MapType => makeMapWriter(t) + + case t: UserDefinedType[_] => makeWriter(t.sqlType) + + // TODO Adds IntervalType support + case _ => sys.error(s"Unsupported data type $dataType.") + } + } + + private def makeDecimalWriter(precision: Int, scale: Int): ValueWriter = { + assert( + precision <= DecimalType.MAX_PRECISION, + s"Precision overflow: $precision is greater than ${DecimalType.MAX_PRECISION}") + + val numBytes = minBytesForPrecision(precision) + + val int32Writer = + (row: SpecializedGetters, ordinal: Int) => + recordConsumer.addInteger(row.getDecimal(ordinal, precision, scale).toUnscaledLong.toInt) + + val int64Writer = + (row: SpecializedGetters, ordinal: Int) => + recordConsumer.addLong(row.getDecimal(ordinal, precision, scale).toUnscaledLong) + + val binaryWriterUsingUnscaledLong = + (row: SpecializedGetters, ordinal: Int) => { + // When the precision is low enough (<= 18) to squeeze the decimal value into a `Long`, we + // can build a fixed-length byte array with length `numBytes` using the unscaled `Long` + // value and the `decimalBuffer` for better performance. + val unscaled = row.getDecimal(ordinal, precision, scale).toUnscaledLong + var i = 0 + var shift = 8 * (numBytes - 1) + + while (i < numBytes) { + decimalBuffer(i) = (unscaled >> shift).toByte + i += 1 + shift -= 8 + } + + recordConsumer.addBinary(Binary.fromByteArray(decimalBuffer, 0, numBytes)) + } + + val binaryWriterUsingUnscaledBytes = + (row: SpecializedGetters, ordinal: Int) => { + val decimal = row.getDecimal(ordinal, precision, scale) + val bytes = decimal.toJavaBigDecimal.unscaledValue().toByteArray + val fixedLengthBytes = if (bytes.length == numBytes) { + // If the length of the underlying byte array of the unscaled `BigInteger` happens to be + // `numBytes`, just reuse it, so that we don't bother copying it to `decimalBuffer`. + bytes + } else { + // Otherwise, the length must be less than `numBytes`. In this case we copy contents of + // the underlying bytes with padding sign bytes to `decimalBuffer` to form the result + // fixed-length byte array. 
+ val signByte = if (bytes.head < 0) -1: Byte else 0: Byte + util.Arrays.fill(decimalBuffer, 0, numBytes - bytes.length, signByte) + System.arraycopy(bytes, 0, decimalBuffer, numBytes - bytes.length, bytes.length) + decimalBuffer + } + + recordConsumer.addBinary(Binary.fromByteArray(fixedLengthBytes, 0, numBytes)) + } + + writeLegacyParquetFormat match { + // Standard mode, 1 <= precision <= 9, writes as INT32 + case false if precision <= MAX_PRECISION_FOR_INT32 => int32Writer + + // Standard mode, 10 <= precision <= 18, writes as INT64 + case false if precision <= MAX_PRECISION_FOR_INT64 => int64Writer + + // Legacy mode, 1 <= precision <= 18, writes as FIXED_LEN_BYTE_ARRAY + case true if precision <= MAX_PRECISION_FOR_INT64 => binaryWriterUsingUnscaledLong + + // Either standard or legacy mode, 19 <= precision <= 38, writes as FIXED_LEN_BYTE_ARRAY + case _ => binaryWriterUsingUnscaledBytes + } + } + + def makeArrayWriter(arrayType: ArrayType): ValueWriter = { + val elementWriter = makeWriter(arrayType.elementType) + + def threeLevelArrayWriter(repeatedGroupName: String, elementFieldName: String): ValueWriter = + (row: SpecializedGetters, ordinal: Int) => { + val array = row.getArray(ordinal) + consumeGroup { + // Only creates the repeated field if the array is non-empty. + if (array.numElements() > 0) { + consumeField(repeatedGroupName, 0) { + var i = 0 + while (i < array.numElements()) { + consumeGroup { + // Only creates the element field if the current array element is not null. + if (!array.isNullAt(i)) { + consumeField(elementFieldName, 0)(elementWriter.apply(array, i)) + } + } + i += 1 + } + } + } + } + } + + def twoLevelArrayWriter(repeatedFieldName: String): ValueWriter = + (row: SpecializedGetters, ordinal: Int) => { + val array = row.getArray(ordinal) + consumeGroup { + // Only creates the repeated field if the array is non-empty. 
+          if (array.numElements() > 0) {
+            consumeField(repeatedFieldName, 0) {
+              var i = 0
+              while (i < array.numElements()) {
+                elementWriter.apply(array, i)
+                i += 1
+              }
+            }
+          }
+        }
+      }
+
+    (writeLegacyParquetFormat, arrayType.containsNull) match {
+      case (false, _) =>
+        // Standard mode:
+        //
+        //   group (LIST) {
+        //     repeated group list {
+        //                    ^~~~  repeatedGroupName
+        //       element;
+        //       ^~~~~~~  elementFieldName
+        //     }
+        //   }
+        threeLevelArrayWriter(repeatedGroupName = "list", elementFieldName = "element")
+
+      case (true, true) =>
+        // Legacy mode, with nullable elements:
+        //
+        //   group (LIST) {
+        //     optional group bag {
+        //                    ^~~  repeatedGroupName
+        //       repeated array;
+        //                ^~~~~  elementFieldName
+        //     }
+        //   }
+        threeLevelArrayWriter(repeatedGroupName = "bag", elementFieldName = "array")
+
+      case (true, false) =>
+        // Legacy mode, with non-nullable elements:
+        //
+        //   group (LIST) {
+        //     repeated array;
+        //              ^~~~~  repeatedFieldName
+        //   }
+        twoLevelArrayWriter(repeatedFieldName = "array")
+    }
+  }
+
+  private def makeMapWriter(mapType: MapType): ValueWriter = {
+    val keyType = mapType.keyType
+    val valueType = mapType.valueType
+    val keyWriter = makeWriter(keyType)
+    val valueWriter = makeWriter(valueType)
+    val mutableRow = new SpecificMutableRow(keyType :: valueType :: Nil)
+    val repeatedGroupName = if (writeLegacyParquetFormat) {
+      // Legacy mode:
+      //
+      //   group (MAP) {
+      //     repeated group map (MAP_KEY_VALUE) {
+      //                    ^~~  repeatedGroupName
+      //       required key;
+      //       value;
+      //     }
+      //   }
+      "map"
+    } else {
+      // Standard mode:
+      //
+      //   group (MAP) {
+      //     repeated group key_value {
+      //                    ^~~~~~~~~  repeatedGroupName
+      //       required key;
+      //       value;
+      //     }
+      //   }
+      "key_value"
+    }
+
+    (row: SpecializedGetters, ordinal: Int) => {
+      val map = row.getMap(ordinal)
+      consumeGroup {
+        // Only creates the repeated field if the map is non-empty.
+        if (map.numElements() > 0) {
+          consumeField(repeatedGroupName, 0) {
+            var i = 0
+            while (i < map.numElements()) {
+              consumeGroup {
+                mutableRow.update(0, map.keyArray().get(i, keyType))
+                consumeField("key", 0)(keyWriter.apply(mutableRow, 0))
+                // Only creates the "value" field if the value is not null
+                if (!map.valueArray().isNullAt(i)) {
+                  mutableRow.update(1, map.valueArray().get(i, valueType))
+                  consumeField("value", 1)(valueWriter.apply(mutableRow, 1))
+                }
+              }
+              i += 1
+            }
+          }
+        }
+      }
+    }
+  }
+
+  private def consumeMessage(f: => Unit): Unit = {
+    recordConsumer.startMessage()
+    f
+    recordConsumer.endMessage()
+  }
+
+  private def consumeGroup(f: => Unit): Unit = {
+    recordConsumer.startGroup()
+    f
+    recordConsumer.endGroup()
+  }
+
+  private def consumeField(field: String, index: Int)(f: => Unit): Unit = {
+    recordConsumer.startField(field, index)
+    f
+    recordConsumer.endField(field, index)
+  }
+}
+
+private[parquet] object ParquetWriteSupport {
+  val SPARK_ROW_SCHEMA: String = "org.apache.spark.sql.parquet.row.attributes"
+
+  def setSchema(schema: StructType, configuration: Configuration): Unit = {
+    schema.map(_.name).foreach(ParquetSchemaConverter.checkFieldName)
+    configuration.set(SPARK_ROW_SCHEMA, schema.json)
+    configuration.set(
+      ParquetOutputFormat.WRITER_VERSION,
+      ParquetProperties.WriterVersion.PARQUET_1_0.toString)
+  }
+
+  /**
+   * Compute the FIXED_LEN_BYTE_ARRAY length needed to represent a given DECIMAL precision.
+ */ + private[parquet] val BYTES_FOR_PRECISION = Array.tabulate[Int](38) { precision => + var length = 1 + while (math.pow(2.0, 8 * length - 1) < math.pow(10.0, precision)) { + length += 1 + } + length + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetCompatibilityTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetCompatibilityTest.scala index b4cdfd9e98f6..0238f0f0a9a1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetCompatibilityTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetCompatibilityTest.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.sql.parquet + import java.io.File import scala.collection.JavaConversions._ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala index b415da5b8c13..f340277a9087 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala @@ -29,7 +29,7 @@ import org.apache.parquet.example.data.{Group, GroupWriter} import org.apache.parquet.hadoop.api.WriteSupport import org.apache.parquet.hadoop.api.WriteSupport.WriteContext import org.apache.parquet.hadoop.metadata.{CompressionCodecName, FileMetaData, ParquetMetadata} -import org.apache.parquet.hadoop.{Footer, ParquetFileWriter, ParquetOutputCommitter, ParquetWriter} +import org.apache.parquet.hadoop._ import org.apache.parquet.io.api.RecordConsumer import org.apache.parquet.schema.{MessageType, MessageTypeParser} @@ -101,7 +101,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest { def makeDecimalRDD(decimal: DecimalType): DataFrame = sqlContext.sparkContext .parallelize(0 to 1000) - .map(i => Tuple1(i / 100.0)) + .map(i => Tuple1((i - 500) / 100.0)) .toDF() // Parquet doesn't allow column names with spaces, have to add an alias here .select($"_1" cast decimal as "dec") @@ -203,15 +203,14 @@ class ParquetIOSuite extends QueryTest with ParquetTest { } test("compression codec") { - def compressionCodecFor(path: String): String = { - val codecs = ParquetTypesConverter - .readMetaData(new Path(path), Some(configuration)) - .getBlocks - .flatMap(_.getColumns) - .map(_.getCodec.name()) - .distinct - - assert(codecs.size === 1) + def compressionCodecFor(path: String, codecName: String): String = { + val codecs = for { + footer <- readAllFootersWithoutSummaryFiles(new Path(path), configuration) + block <- footer.getParquetMetadata.getBlocks + column <- block.getColumns + } yield column.getCodec.name() + + assert(codecs.distinct === Seq(codecName)) codecs.head } @@ -221,7 +220,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest { withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> codec.name()) { withParquetFile(data) { path => assertResult(sqlContext.conf.parquetCompressionCodec.toUpperCase) { - compressionCodecFor(path) + compressionCodecFor(path, codec.name()) } } } @@ -277,15 +276,14 @@ class ParquetIOSuite extends QueryTest with ParquetTest { withTempPath { file => val path = new Path(file.toURI.toString) val fs = FileSystem.getLocal(configuration) - val attributes = ScalaReflection.attributesFor[(Int, String)] - ParquetTypesConverter.writeMetaData(attributes, path, configuration) + val schema = StructType.fromAttributes(ScalaReflection.attributesFor[(Int, String)]) + writeMetadata(schema, path, configuration) assert(fs.exists(new Path(path, 
ParquetFileWriter.PARQUET_COMMON_METADATA_FILE))) assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE))) - val metaData = ParquetTypesConverter.readMetaData(path, Some(configuration)) - val actualSchema = metaData.getFileMetaData.getSchema - val expectedSchema = ParquetTypesConverter.convertFromAttributes(attributes) + val expectedSchema = new ParquetSchemaConverter(configuration).convert(schema) + val actualSchema = readFooter(path, configuration).getFileMetaData.getSchema actualSchema.checkContains(expectedSchema) expectedSchema.checkContains(actualSchema) @@ -348,7 +346,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest { """.stripMargin) withTempPath { location => - val extraMetadata = Map(CatalystReadSupport.SPARK_METADATA_KEY -> sparkSchema.toString) + val extraMetadata = Map(ParquetReadSupport.SPARK_METADATA_KEY -> sparkSchema.toString) val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, "Spark") val path = new Path(location.getCanonicalPath) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala index 4a0b3b60f419..230983b24377 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala @@ -36,32 +36,29 @@ abstract class ParquetSchemaTest extends SparkFunSuite with ParquetTest { protected def testSchemaInference[T <: Product: ClassTag: TypeTag]( testName: String, messageType: String, - binaryAsString: Boolean = true, - int96AsTimestamp: Boolean = true, - followParquetFormatSpec: Boolean = false, - isThriftDerived: Boolean = false): Unit = { + binaryAsString: Boolean, + int96AsTimestamp: Boolean, + writeLegacyParquetFormat: Boolean): Unit = { testSchema( testName, StructType.fromAttributes(ScalaReflection.attributesFor[T]), messageType, binaryAsString, int96AsTimestamp, - followParquetFormatSpec, - isThriftDerived) + writeLegacyParquetFormat) } protected def testParquetToCatalyst( testName: String, sqlSchema: StructType, parquetSchema: String, - binaryAsString: Boolean = true, - int96AsTimestamp: Boolean = true, - followParquetFormatSpec: Boolean = false, - isThriftDerived: Boolean = false): Unit = { - val converter = new CatalystSchemaConverter( + binaryAsString: Boolean, + int96AsTimestamp: Boolean, + writeLegacyParquetFormat: Boolean): Unit = { + val converter = new ParquetSchemaConverter( assumeBinaryIsString = binaryAsString, assumeInt96IsTimestamp = int96AsTimestamp, - followParquetFormatSpec = followParquetFormatSpec) + writeLegacyParquetFormat = writeLegacyParquetFormat) test(s"sql <= parquet: $testName") { val actual = converter.convert(MessageTypeParser.parseMessageType(parquetSchema)) @@ -79,14 +76,13 @@ abstract class ParquetSchemaTest extends SparkFunSuite with ParquetTest { testName: String, sqlSchema: StructType, parquetSchema: String, - binaryAsString: Boolean = true, - int96AsTimestamp: Boolean = true, - followParquetFormatSpec: Boolean = false, - isThriftDerived: Boolean = false): Unit = { - val converter = new CatalystSchemaConverter( + binaryAsString: Boolean, + int96AsTimestamp: Boolean, + writeLegacyParquetFormat: Boolean): Unit = { + val converter = new ParquetSchemaConverter( assumeBinaryIsString = binaryAsString, assumeInt96IsTimestamp = int96AsTimestamp, - followParquetFormatSpec = followParquetFormatSpec) + writeLegacyParquetFormat = writeLegacyParquetFormat) test(s"sql => parquet: 
$testName") { val actual = converter.convert(sqlSchema) @@ -100,10 +96,9 @@ abstract class ParquetSchemaTest extends SparkFunSuite with ParquetTest { testName: String, sqlSchema: StructType, parquetSchema: String, - binaryAsString: Boolean = true, - int96AsTimestamp: Boolean = true, - followParquetFormatSpec: Boolean = false, - isThriftDerived: Boolean = false): Unit = { + binaryAsString: Boolean, + int96AsTimestamp: Boolean, + writeLegacyParquetFormat: Boolean): Unit = { testCatalystToParquet( testName, @@ -111,8 +106,7 @@ abstract class ParquetSchemaTest extends SparkFunSuite with ParquetTest { parquetSchema, binaryAsString, int96AsTimestamp, - followParquetFormatSpec, - isThriftDerived) + writeLegacyParquetFormat) testParquetToCatalyst( testName, @@ -120,8 +114,7 @@ abstract class ParquetSchemaTest extends SparkFunSuite with ParquetTest { parquetSchema, binaryAsString, int96AsTimestamp, - followParquetFormatSpec, - isThriftDerived) + writeLegacyParquetFormat) } } @@ -138,7 +131,9 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { | optional binary _6; |} """.stripMargin, - binaryAsString = false) + binaryAsString = false, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testSchemaInference[(Byte, Short, Int, Long, java.sql.Date)]( "logical integral types", @@ -150,7 +145,10 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { | required int64 _4 (INT_64); | optional int32 _5 (DATE); |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testSchemaInference[Tuple1[String]]( "string", @@ -159,7 +157,9 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { | optional binary _1 (UTF8); |} """.stripMargin, - binaryAsString = true) + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testSchemaInference[Tuple1[String]]( "binary enum as string", @@ -167,7 +167,10 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { |message root { | optional binary _1 (ENUM); |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testSchemaInference[Tuple1[Seq[Int]]]( "non-nullable array - non-standard", @@ -177,7 +180,10 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { | repeated int32 array; | } |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testSchemaInference[Tuple1[Seq[Int]]]( "non-nullable array - standard", @@ -190,7 +196,9 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { | } |} """.stripMargin, - followParquetFormatSpec = true) + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = false) testSchemaInference[Tuple1[Seq[Integer]]]( "nullable array - non-standard", @@ -198,11 +206,14 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { |message root { | optional group _1 (LIST) { | repeated group bag { - | optional int32 array_element; + | optional int32 array; | } | } |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testSchemaInference[Tuple1[Seq[Integer]]]( "nullable array - standard", @@ -215,7 +226,9 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { | } |} """.stripMargin, - followParquetFormatSpec = true) + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = false) 
testSchemaInference[Tuple1[Map[Int, String]]]( "map - standard", @@ -229,7 +242,9 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { | } |} """.stripMargin, - followParquetFormatSpec = true) + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = false) testSchemaInference[Tuple1[Map[Int, String]]]( "map - non-standard", @@ -242,7 +257,10 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { | } | } |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testSchemaInference[Tuple1[Pair[Int, String]]]( "struct", @@ -254,7 +272,9 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { | } |} """.stripMargin, - followParquetFormatSpec = true) + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = false) testSchemaInference[Tuple1[Map[Int, (String, Seq[(Int, Double)])]]]( "deeply nested type - non-standard", @@ -267,7 +287,7 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { | optional binary _1 (UTF8); | optional group _2 (LIST) { | repeated group bag { - | optional group array_element { + | optional group array { | required int32 _1; | required double _2; | } @@ -277,7 +297,10 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { | } | } |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testSchemaInference[Tuple1[Map[Int, (String, Seq[(Int, Double)])]]]( "deeply nested type - standard", @@ -301,7 +324,9 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { | } |} """.stripMargin, - followParquetFormatSpec = true) + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = false) testSchemaInference[(Option[Int], Map[Int, Option[Double]])]( "optional types", @@ -316,7 +341,9 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { | } |} """.stripMargin, - followParquetFormatSpec = true) + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = false) // Parquet files generated by parquet-thrift are already handled by the schema converter, but // let's leave this test here until both read path and write path are all updated. 
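To see which layout a given file actually uses, one can read its Parquet footer directly, the same way the test helpers later in this patch do. A minimal sketch, assuming `conf` is a Hadoop `Configuration` and `path` names a single Parquet data file:

    import org.apache.hadoop.fs.Path
    import org.apache.parquet.format.converter.ParquetMetadataConverter
    import org.apache.parquet.hadoop.ParquetFileReader

    // Legacy LIST layouts show "bag"/"array" groups; standard ones show "list"/"element".
    val footer = ParquetFileReader.readFooter(conf, new Path(path), ParquetMetadataConverter.NO_FILTER)
    println(footer.getFileMetaData.getSchema)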
@@ -325,26 +352,28 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { // as expected from attributes testSchemaInference[( Array[Byte], Array[Byte], Array[Byte], Seq[Int], Map[Array[Byte], Seq[Int]])]( - "thrift generated parquet schema", - """ - |message root { - | optional binary _1 (UTF8); - | optional binary _2 (UTF8); - | optional binary _3 (UTF8); - | optional group _4 (LIST) { - | repeated int32 _4_tuple; - | } - | optional group _5 (MAP) { - | repeated group map (MAP_KEY_VALUE) { - | required binary key (UTF8); - | optional group value (LIST) { - | repeated int32 value_tuple; - | } - | } - | } - |} - """.stripMargin, - isThriftDerived = true) + "thrift generated parquet schema", + """ + |message root { + | optional binary _1 (UTF8); + | optional binary _2 (UTF8); + | optional binary _3 (UTF8); + | optional group _4 (LIST) { + | repeated int32 _4_tuple; + | } + | optional group _5 (MAP) { + | repeated group map (MAP_KEY_VALUE) { + | required binary key (UTF8); + | optional group value (LIST) { + | repeated int32 value_tuple; + | } + | } + | } + |} + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) } } @@ -361,8 +390,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest { val jsonString = """{"type":"struct","fields":[{"name":"c1","type":"integer","nullable":false,"metadata":{}},{"name":"c2","type":"binary","nullable":true,"metadata":{}}]}""" // scalastyle:on - val fromCaseClassString = ParquetTypesConverter.convertFromString(caseClassString) - val fromJson = ParquetTypesConverter.convertFromString(jsonString) + val fromCaseClassString = StructType.fromString(caseClassString) + val fromJson = StructType.fromString(jsonString) (fromCaseClassString, fromJson).zipped.foreach { (a, b) => assert(a.name == b.name) @@ -471,7 +500,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } | } |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testParquetToCatalyst( "Backwards-compatibility: LIST with nullable element type - 2", @@ -487,7 +519,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } | } |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testParquetToCatalyst( "Backwards-compatibility: LIST with non-nullable element type - 1 - standard", @@ -500,7 +535,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } | } |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testParquetToCatalyst( "Backwards-compatibility: LIST with non-nullable element type - 2", @@ -513,7 +551,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } | } |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testParquetToCatalyst( "Backwards-compatibility: LIST with non-nullable element type - 3", @@ -524,7 +565,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | repeated int32 element; | } |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testParquetToCatalyst( "Backwards-compatibility: LIST with non-nullable element type - 4", @@ -545,7 +589,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } | } |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = 
true) testParquetToCatalyst( "Backwards-compatibility: LIST with non-nullable element type - 5 - parquet-avro style", @@ -564,7 +611,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } | } |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testParquetToCatalyst( "Backwards-compatibility: LIST with non-nullable element type - 6 - parquet-thrift style", @@ -583,7 +633,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } | } |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) // ======================================================= // Tests for converting Catalyst ArrayType to Parquet LIST @@ -604,7 +657,9 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } |} """.stripMargin, - followParquetFormatSpec = true) + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = false) testCatalystToParquet( "Backwards-compatibility: LIST with nullable element type - 2 - prior to 1.4.x", @@ -616,11 +671,14 @@ class ParquetSchemaSuite extends ParquetSchemaTest { """message root { | optional group f1 (LIST) { | repeated group bag { - | optional int32 array_element; + | optional int32 array; | } | } |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testCatalystToParquet( "Backwards-compatibility: LIST with non-nullable element type - 1 - standard", @@ -637,7 +695,9 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } |} """.stripMargin, - followParquetFormatSpec = true) + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = false) testCatalystToParquet( "Backwards-compatibility: LIST with non-nullable element type - 2 - prior to 1.4.x", @@ -651,7 +711,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | repeated int32 array; | } |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) // ==================================================== // Tests for converting Parquet Map to Catalyst MapType @@ -672,7 +735,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } | } |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testParquetToCatalyst( "Backwards-compatibility: MAP with non-nullable value type - 2", @@ -689,7 +755,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } | } |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testParquetToCatalyst( "Backwards-compatibility: MAP with non-nullable value type - 3 - prior to 1.4.x", @@ -706,7 +775,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } | } |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testParquetToCatalyst( "Backwards-compatibility: MAP with nullable value type - 1 - standard", @@ -723,7 +795,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } | } |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testParquetToCatalyst( "Backwards-compatibility: MAP with nullable value type - 2", @@ -740,7 +815,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } | } |} - """.stripMargin) + """.stripMargin, + 
binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testParquetToCatalyst( "Backwards-compatibility: MAP with nullable value type - 3 - parquet-avro style", @@ -757,7 +835,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } | } |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) // ==================================================== // Tests for converting Catalyst MapType to Parquet Map @@ -779,7 +860,9 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } |} """.stripMargin, - followParquetFormatSpec = true) + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = false) testCatalystToParquet( "Backwards-compatibility: MAP with non-nullable value type - 2 - prior to 1.4.x", @@ -796,7 +879,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } | } |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testCatalystToParquet( "Backwards-compatibility: MAP with nullable value type - 1 - standard", @@ -814,7 +900,9 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } |} """.stripMargin, - followParquetFormatSpec = true) + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = false) testCatalystToParquet( "Backwards-compatibility: MAP with nullable value type - 3 - prior to 1.4.x", @@ -831,7 +919,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } | } |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) // ================================= // Tests for conversion for decimals @@ -844,7 +935,9 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | optional int32 f1 (DECIMAL(1, 0)); |} """.stripMargin, - followParquetFormatSpec = true) + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = false) testSchema( "DECIMAL(8, 3) - standard", @@ -853,7 +946,9 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | optional int32 f1 (DECIMAL(8, 3)); |} """.stripMargin, - followParquetFormatSpec = true) + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = false) testSchema( "DECIMAL(9, 3) - standard", @@ -862,7 +957,9 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | optional int32 f1 (DECIMAL(9, 3)); |} """.stripMargin, - followParquetFormatSpec = true) + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = false) testSchema( "DECIMAL(18, 3) - standard", @@ -871,7 +968,9 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | optional int64 f1 (DECIMAL(18, 3)); |} """.stripMargin, - followParquetFormatSpec = true) + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = false) testSchema( "DECIMAL(19, 3) - standard", @@ -880,7 +979,9 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | optional fixed_len_byte_array(9) f1 (DECIMAL(19, 3)); |} """.stripMargin, - followParquetFormatSpec = true) + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = false) testSchema( "DECIMAL(1, 0) - prior to 1.4.x", @@ -888,7 +989,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest { """message root { | optional fixed_len_byte_array(1) f1 (DECIMAL(1, 0)); |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testSchema( "DECIMAL(8, 3) - prior to 1.4.x", 
@@ -896,7 +1000,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest { """message root { | optional fixed_len_byte_array(4) f1 (DECIMAL(8, 3)); |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testSchema( "DECIMAL(9, 3) - prior to 1.4.x", @@ -904,7 +1011,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest { """message root { | optional fixed_len_byte_array(5) f1 (DECIMAL(9, 3)); |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testSchema( "DECIMAL(18, 3) - prior to 1.4.x", @@ -912,5 +1022,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest { """message root { | optional fixed_len_byte_array(8) f1 (DECIMAL(18, 3)); |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetTest.scala index 64e94056f209..58cf28794143 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetTest.scala @@ -19,11 +19,19 @@ package org.apache.spark.sql.parquet import java.io.File +import scala.collection.JavaConverters.{iterableAsScalaIterableConverter, mapAsJavaMapConverter, seqAsJavaListConverter} import scala.reflect.ClassTag import scala.reflect.runtime.universe.TypeTag +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.parquet.format.converter.ParquetMetadataConverter +import org.apache.parquet.hadoop.metadata.{BlockMetaData, FileMetaData, ParquetMetadata} +import org.apache.parquet.hadoop.{Footer, ParquetFileReader, ParquetFileWriter} + import org.apache.spark.SparkFunSuite import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{DataFrame, SaveMode} /** @@ -97,4 +105,27 @@ private[sql] trait ParquetTest extends SQLTestUtils { this: SparkFunSuite => assert(partDir.mkdirs(), s"Couldn't create directory $partDir") partDir } + + def writeMetadata(schema: StructType, path: Path, configuration: Configuration): Unit = { + val parquetSchema = new ParquetSchemaConverter(configuration).convert(schema) + val extraMetadata = Map(ParquetReadSupport.SPARK_METADATA_KEY -> schema.json).asJava + val createdBy = s"Apache Spark ${org.apache.spark.SPARK_VERSION}" + val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, createdBy) + val parquetMetadata = new ParquetMetadata(fileMetadata, Seq.empty[BlockMetaData].asJava) + val footer = new Footer(path, parquetMetadata) + ParquetFileWriter.writeMetadataFile(configuration, path, Seq(footer).asJava) + } + + def readAllFootersWithoutSummaryFiles( + path: Path, configuration: Configuration): Seq[Footer] = { + val fs = path.getFileSystem(configuration) + ParquetFileReader.readAllFootersInParallel(configuration, fs.getFileStatus(path)).asScala.toSeq + } + + def readFooter(path: Path, configuration: Configuration): ParquetMetadata = { + ParquetFileReader.readFooter( + configuration, + new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE), + ParquetMetadataConverter.NO_FILTER) + } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index f56fb96c52d3..0cb9f048c9fc 100644 --- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -608,7 +608,7 @@ class ParquetSourceSuite extends ParquetPartitioningTest { val conf = Seq( HiveContext.CONVERT_METASTORE_PARQUET.key -> "false", SQLConf.PARQUET_BINARY_AS_STRING.key -> "true", - SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key -> "true") + SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "false") withSQLConf(conf: _*) { sql(