From 7d94924a7e36c2934595ed0043b7887dad797450 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Tue, 1 Sep 2015 21:48:04 +0800 Subject: [PATCH 1/2] Deprecates SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC --- .../scala/org/apache/spark/sql/SQLConf.scala | 21 +- .../parquet/CatalystReadSupport.scala | 2 +- .../parquet/CatalystSchemaConverter.scala | 61 ++-- .../datasources/parquet/ParquetRelation.scala | 14 +- .../parquet/ParquetSchemaSuite.scala | 289 ++++++++++++------ .../apache/spark/sql/hive/parquetSuites.scala | 2 +- 6 files changed, 241 insertions(+), 148 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index 9de75f4c4d08..cf04e1d5e5d2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -290,13 +290,20 @@ private[spark] object SQLConf { defaultValue = Some(true), doc = "Enables Parquet filter push-down optimization when set to true.") - val PARQUET_FOLLOW_PARQUET_FORMAT_SPEC = booleanConf( - key = "spark.sql.parquet.followParquetFormatSpec", - defaultValue = Some(false), + val PARQUET_WRITE_LEGACY_FORMAT = booleanConf( + key = "spark.sql.parquet.writeLegacyFormat", + defaultValue = Some(true), doc = "Whether to follow Parquet's format specification when converting Parquet schema to " + "Spark SQL schema and vice versa.", isPublic = false) + @deprecated(s"use ${PARQUET_WRITE_LEGACY_FORMAT.key} instead", "1.6.0") + val PARQUET_FOLLOW_PARQUET_FORMAT_SPEC = booleanConf( + key = "spark.sql.parquet.followParquetFormatSpec", + defaultValue = Some(!PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get), + doc = s"Deprecated, use ${PARQUET_WRITE_LEGACY_FORMAT.key} instead.", + isPublic = false) + val PARQUET_OUTPUT_COMMITTER_CLASS = stringConf( key = "spark.sql.parquet.output.committer.class", defaultValue = Some(classOf[ParquetOutputCommitter].getName), @@ -304,8 +311,7 @@ private[spark] object SQLConf { "subclass of org.apache.hadoop.mapreduce.OutputCommitter. Typically, it's also a subclass " + "of org.apache.parquet.hadoop.ParquetOutputCommitter. NOTE: 1. Instead of SQLConf, this " + "option must be set in Hadoop Configuration. 2. This option overrides " + - "\"spark.sql.sources.outputCommitterClass\"." - ) + "\"spark.sql.sources.outputCommitterClass\".") val ORC_FILTER_PUSHDOWN_ENABLED = booleanConf("spark.sql.orc.filterPushdown", defaultValue = Some(false), @@ -497,7 +503,10 @@ private[sql] class SQLConf extends Serializable with CatalystConf { private[spark] def isParquetINT96AsTimestamp: Boolean = getConf(PARQUET_INT96_AS_TIMESTAMP) - private[spark] def followParquetFormatSpec: Boolean = getConf(PARQUET_FOLLOW_PARQUET_FORMAT_SPEC) + private[spark] def writeLegacyParquetFormat: Boolean = getConf(PARQUET_WRITE_LEGACY_FORMAT) + + @deprecated("use SQLConf.writeLegacyParquetFormat", "1.6.0") + private[spark] def followParquetFormatSpec: Boolean = !writeLegacyParquetFormat private[spark] def inMemoryPartitionPruning: Boolean = getConf(IN_MEMORY_PARTITION_PRUNING) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala index 8c819f1a48cd..b32897ee0f49 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala @@ -265,7 +265,7 @@ private[parquet] object CatalystReadSupport { private def clipParquetGroupFields( parquetRecord: GroupType, structType: StructType): Seq[Type] = { val parquetFieldMap = parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap - val toParquet = new CatalystSchemaConverter(followParquetFormatSpec = true) + val toParquet = new CatalystSchemaConverter(writeLegacyParquetFormat = false) structType.map { f => parquetFieldMap .get(f.name) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala index 2d237da81c20..cd666f0020af 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala @@ -41,34 +41,31 @@ import org.apache.spark.sql.{AnalysisException, SQLConf} * @constructor * @param assumeBinaryIsString Whether unannotated BINARY fields should be assumed to be Spark SQL * [[StringType]] fields when converting Parquet a [[MessageType]] to Spark SQL - * [[StructType]]. + * [[StructType]]. This argument only affects Parquet read path. * @param assumeInt96IsTimestamp Whether unannotated INT96 fields should be assumed to be Spark SQL * [[TimestampType]] fields when converting Parquet a [[MessageType]] to Spark SQL * [[StructType]]. Note that Spark SQL [[TimestampType]] is similar to Hive timestamp, which * has optional nanosecond precision, but different from `TIME_MILLS` and `TIMESTAMP_MILLIS` - * described in Parquet format spec. - * @param followParquetFormatSpec Whether to generate standard DECIMAL, LIST, and MAP structure when - * converting Spark SQL [[StructType]] to Parquet [[MessageType]]. For Spark 1.4.x and - * prior versions, Spark SQL only supports decimals with a max precision of 18 digits, and - * uses non-standard LIST and MAP structure. Note that the current Parquet format spec is - * backwards-compatible with these settings. If this argument is set to `false`, we fallback - * to old style non-standard behaviors. + * described in Parquet format spec. This argument only affects Parquet read path. + * @param writeLegacyParquetFormat Whether to use legacy Parquet format compatible with Spark 1.4 + * and prior versions when converting a Catalyst [[StructType]] to a Parquet [[MessageType]]. + * When set to false, use standard format defined in parquet-format spec. This argument only + * affects Parquet write path. */ private[parquet] class CatalystSchemaConverter( assumeBinaryIsString: Boolean = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get, assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get, - followParquetFormatSpec: Boolean = SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.defaultValue.get -) { + writeLegacyParquetFormat: Boolean = SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get) { def this(conf: SQLConf) = this( assumeBinaryIsString = conf.isParquetBinaryAsString, assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp, - followParquetFormatSpec = conf.followParquetFormatSpec) + writeLegacyParquetFormat = conf.writeLegacyParquetFormat) def this(conf: Configuration) = this( assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean, assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean, - followParquetFormatSpec = conf.get(SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key).toBoolean) + writeLegacyParquetFormat = conf.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key).toBoolean) /** * Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]]. @@ -371,15 +368,15 @@ private[parquet] class CatalystSchemaConverter( case BinaryType => Types.primitive(BINARY, repetition).named(field.name) - // ===================================== - // Decimals (for Spark version <= 1.4.x) - // ===================================== + // ====================== + // Decimals (legacy mode) + // ====================== // Spark 1.4.x and prior versions only support decimals with a maximum precision of 18 and // always store decimals in fixed-length byte arrays. To keep compatibility with these older // versions, here we convert decimals with all precisions to `FIXED_LEN_BYTE_ARRAY` annotated // by `DECIMAL`. - case DecimalType.Fixed(precision, scale) if !followParquetFormatSpec => + case DecimalType.Fixed(precision, scale) if writeLegacyParquetFormat => Types .primitive(FIXED_LEN_BYTE_ARRAY, repetition) .as(DECIMAL) @@ -388,13 +385,13 @@ private[parquet] class CatalystSchemaConverter( .length(CatalystSchemaConverter.minBytesForPrecision(precision)) .named(field.name) - // ===================================== - // Decimals (follow Parquet format spec) - // ===================================== + // ======================== + // Decimals (standard mode) + // ======================== // Uses INT32 for 1 <= precision <= 9 case DecimalType.Fixed(precision, scale) - if precision <= MAX_PRECISION_FOR_INT32 && followParquetFormatSpec => + if precision <= MAX_PRECISION_FOR_INT32 && !writeLegacyParquetFormat => Types .primitive(INT32, repetition) .as(DECIMAL) @@ -404,7 +401,7 @@ private[parquet] class CatalystSchemaConverter( // Uses INT64 for 1 <= precision <= 18 case DecimalType.Fixed(precision, scale) - if precision <= MAX_PRECISION_FOR_INT64 && followParquetFormatSpec => + if precision <= MAX_PRECISION_FOR_INT64 && !writeLegacyParquetFormat => Types .primitive(INT64, repetition) .as(DECIMAL) @@ -413,7 +410,7 @@ private[parquet] class CatalystSchemaConverter( .named(field.name) // Uses FIXED_LEN_BYTE_ARRAY for all other precisions - case DecimalType.Fixed(precision, scale) if followParquetFormatSpec => + case DecimalType.Fixed(precision, scale) if !writeLegacyParquetFormat => Types .primitive(FIXED_LEN_BYTE_ARRAY, repetition) .as(DECIMAL) @@ -422,15 +419,15 @@ private[parquet] class CatalystSchemaConverter( .length(CatalystSchemaConverter.minBytesForPrecision(precision)) .named(field.name) - // =================================================== - // ArrayType and MapType (for Spark versions <= 1.4.x) - // =================================================== + // =================================== + // ArrayType and MapType (legacy mode) + // =================================== // Spark 1.4.x and prior versions convert `ArrayType` with nullable elements into a 3-level // `LIST` structure. This behavior is somewhat a hybrid of parquet-hive and parquet-avro // (1.6.0rc3): the 3-level structure is similar to parquet-hive while the 3rd level element // field name "array" is borrowed from parquet-avro. - case ArrayType(elementType, nullable @ true) if !followParquetFormatSpec => + case ArrayType(elementType, nullable @ true) if writeLegacyParquetFormat => // group (LIST) { // optional group bag { // repeated array; @@ -448,7 +445,7 @@ private[parquet] class CatalystSchemaConverter( // Spark 1.4.x and prior versions convert ArrayType with non-nullable elements into a 2-level // LIST structure. This behavior mimics parquet-avro (1.6.0rc3). Note that this case is // covered by the backwards-compatibility rules implemented in `isElementType()`. - case ArrayType(elementType, nullable @ false) if !followParquetFormatSpec => + case ArrayType(elementType, nullable @ false) if writeLegacyParquetFormat => // group (LIST) { // repeated element; // } @@ -460,7 +457,7 @@ private[parquet] class CatalystSchemaConverter( // Spark 1.4.x and prior versions convert MapType into a 3-level group annotated by // MAP_KEY_VALUE. This is covered by `convertGroupField(field: GroupType): DataType`. - case MapType(keyType, valueType, valueContainsNull) if !followParquetFormatSpec => + case MapType(keyType, valueType, valueContainsNull) if writeLegacyParquetFormat => // group (MAP) { // repeated group map (MAP_KEY_VALUE) { // required key; @@ -473,11 +470,11 @@ private[parquet] class CatalystSchemaConverter( convertField(StructField("key", keyType, nullable = false)), convertField(StructField("value", valueType, valueContainsNull))) - // ================================================== - // ArrayType and MapType (follow Parquet format spec) - // ================================================== + // ===================================== + // ArrayType and MapType (standard mode) + // ===================================== - case ArrayType(elementType, containsNull) if followParquetFormatSpec => + case ArrayType(elementType, containsNull) if !writeLegacyParquetFormat => // group (LIST) { // repeated group list { // element; diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala index 953fcab12697..8a9c0e733a9a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala @@ -287,7 +287,7 @@ private[sql] class ParquetRelation( val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp - val followParquetFormatSpec = sqlContext.conf.followParquetFormatSpec + val writeLegacyParquetFormat = sqlContext.conf.writeLegacyParquetFormat // Parquet row group size. We will use this value as the value for // mapreduce.input.fileinputformat.split.minsize and mapred.min.split.size if the value @@ -305,7 +305,7 @@ private[sql] class ParquetRelation( parquetFilterPushDown, assumeBinaryIsString, assumeInt96IsTimestamp, - followParquetFormatSpec) _ + writeLegacyParquetFormat) _ // Create the function to set input paths at the driver side. val setInputPaths = @@ -531,7 +531,7 @@ private[sql] object ParquetRelation extends Logging { parquetFilterPushDown: Boolean, assumeBinaryIsString: Boolean, assumeInt96IsTimestamp: Boolean, - followParquetFormatSpec: Boolean)(job: Job): Unit = { + writeLegacyParquetFormat: Boolean)(job: Job): Unit = { val conf = SparkHadoopUtil.get.getConfigurationFromJobContext(job) conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName) @@ -561,7 +561,7 @@ private[sql] object ParquetRelation extends Logging { // Sets flags for Parquet schema conversion conf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, assumeBinaryIsString) conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, assumeInt96IsTimestamp) - conf.setBoolean(SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key, followParquetFormatSpec) + conf.setBoolean(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key, writeLegacyParquetFormat) overrideMinSplitSize(parquetBlockSize, conf) } @@ -586,7 +586,7 @@ private[sql] object ParquetRelation extends Logging { val converter = new CatalystSchemaConverter( sqlContext.conf.isParquetBinaryAsString, sqlContext.conf.isParquetBinaryAsString, - sqlContext.conf.followParquetFormatSpec) + sqlContext.conf.writeLegacyParquetFormat) converter.convert(schema) } @@ -720,7 +720,7 @@ private[sql] object ParquetRelation extends Logging { filesToTouch: Seq[FileStatus], sqlContext: SQLContext): Option[StructType] = { val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp - val followParquetFormatSpec = sqlContext.conf.followParquetFormatSpec + val writeLegacyParquetFormat = sqlContext.conf.writeLegacyParquetFormat val serializedConf = new SerializableConfiguration(sqlContext.sparkContext.hadoopConfiguration) // !! HACK ALERT !! @@ -760,7 +760,7 @@ private[sql] object ParquetRelation extends Logging { new CatalystSchemaConverter( assumeBinaryIsString = assumeBinaryIsString, assumeInt96IsTimestamp = assumeInt96IsTimestamp, - followParquetFormatSpec = followParquetFormatSpec) + writeLegacyParquetFormat = writeLegacyParquetFormat) footers.map { footer => ParquetRelation.readSchemaFromFooter(footer, converter) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala index 5a8f772c3228..f17fb36f25fe 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -22,7 +22,6 @@ import scala.reflect.runtime.universe.TypeTag import org.apache.parquet.schema.MessageTypeParser -import org.apache.spark.sql.SQLConf import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ @@ -35,32 +34,29 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSQLContext { protected def testSchemaInference[T <: Product: ClassTag: TypeTag]( testName: String, messageType: String, - binaryAsString: Boolean = true, - int96AsTimestamp: Boolean = true, - followParquetFormatSpec: Boolean = false, - isThriftDerived: Boolean = false): Unit = { + binaryAsString: Boolean, + int96AsTimestamp: Boolean, + writeLegacyParquetFormat: Boolean): Unit = { testSchema( testName, StructType.fromAttributes(ScalaReflection.attributesFor[T]), messageType, binaryAsString, int96AsTimestamp, - followParquetFormatSpec, - isThriftDerived) + writeLegacyParquetFormat) } protected def testParquetToCatalyst( testName: String, sqlSchema: StructType, parquetSchema: String, - binaryAsString: Boolean = true, - int96AsTimestamp: Boolean = true, - followParquetFormatSpec: Boolean = false, - isThriftDerived: Boolean = false): Unit = { + binaryAsString: Boolean, + int96AsTimestamp: Boolean, + writeLegacyParquetFormat: Boolean): Unit = { val converter = new CatalystSchemaConverter( assumeBinaryIsString = binaryAsString, assumeInt96IsTimestamp = int96AsTimestamp, - followParquetFormatSpec = followParquetFormatSpec) + writeLegacyParquetFormat = writeLegacyParquetFormat) test(s"sql <= parquet: $testName") { val actual = converter.convert(MessageTypeParser.parseMessageType(parquetSchema)) @@ -78,14 +74,13 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSQLContext { testName: String, sqlSchema: StructType, parquetSchema: String, - binaryAsString: Boolean = true, - int96AsTimestamp: Boolean = true, - followParquetFormatSpec: Boolean = false, - isThriftDerived: Boolean = false): Unit = { + binaryAsString: Boolean, + int96AsTimestamp: Boolean, + writeLegacyParquetFormat: Boolean): Unit = { val converter = new CatalystSchemaConverter( assumeBinaryIsString = binaryAsString, assumeInt96IsTimestamp = int96AsTimestamp, - followParquetFormatSpec = followParquetFormatSpec) + writeLegacyParquetFormat = writeLegacyParquetFormat) test(s"sql => parquet: $testName") { val actual = converter.convert(sqlSchema) @@ -99,10 +94,9 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSQLContext { testName: String, sqlSchema: StructType, parquetSchema: String, - binaryAsString: Boolean = true, - int96AsTimestamp: Boolean = true, - followParquetFormatSpec: Boolean = false, - isThriftDerived: Boolean = false): Unit = { + binaryAsString: Boolean, + int96AsTimestamp: Boolean, + writeLegacyParquetFormat: Boolean): Unit = { testCatalystToParquet( testName, @@ -110,8 +104,7 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSQLContext { parquetSchema, binaryAsString, int96AsTimestamp, - followParquetFormatSpec, - isThriftDerived) + writeLegacyParquetFormat) testParquetToCatalyst( testName, @@ -119,8 +112,7 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSQLContext { parquetSchema, binaryAsString, int96AsTimestamp, - followParquetFormatSpec, - isThriftDerived) + writeLegacyParquetFormat) } } @@ -137,7 +129,9 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { | optional binary _6; |} """.stripMargin, - binaryAsString = false) + binaryAsString = false, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testSchemaInference[(Byte, Short, Int, Long, java.sql.Date)]( "logical integral types", @@ -149,7 +143,10 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { | required int64 _4 (INT_64); | optional int32 _5 (DATE); |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testSchemaInference[Tuple1[String]]( "string", @@ -158,7 +155,9 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { | optional binary _1 (UTF8); |} """.stripMargin, - binaryAsString = true) + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testSchemaInference[Tuple1[String]]( "binary enum as string", @@ -166,7 +165,10 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { |message root { | optional binary _1 (ENUM); |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testSchemaInference[Tuple1[Seq[Int]]]( "non-nullable array - non-standard", @@ -176,7 +178,10 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { | repeated int32 array; | } |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testSchemaInference[Tuple1[Seq[Int]]]( "non-nullable array - standard", @@ -189,7 +194,9 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { | } |} """.stripMargin, - followParquetFormatSpec = true) + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = false) testSchemaInference[Tuple1[Seq[Integer]]]( "nullable array - non-standard", @@ -201,7 +208,10 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { | } | } |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testSchemaInference[Tuple1[Seq[Integer]]]( "nullable array - standard", @@ -214,7 +224,9 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { | } |} """.stripMargin, - followParquetFormatSpec = true) + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = false) testSchemaInference[Tuple1[Map[Int, String]]]( "map - standard", @@ -228,7 +240,9 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { | } |} """.stripMargin, - followParquetFormatSpec = true) + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = false) testSchemaInference[Tuple1[Map[Int, String]]]( "map - non-standard", @@ -241,7 +255,10 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { | } | } |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testSchemaInference[Tuple1[Pair[Int, String]]]( "struct", @@ -253,7 +270,9 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { | } |} """.stripMargin, - followParquetFormatSpec = true) + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = false) testSchemaInference[Tuple1[Map[Int, (String, Seq[(Int, Double)])]]]( "deeply nested type - non-standard", @@ -276,7 +295,10 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { | } | } |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testSchemaInference[Tuple1[Map[Int, (String, Seq[(Int, Double)])]]]( "deeply nested type - standard", @@ -300,7 +322,9 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { | } |} """.stripMargin, - followParquetFormatSpec = true) + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = false) testSchemaInference[(Option[Int], Map[Int, Option[Double]])]( "optional types", @@ -315,36 +339,9 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { | } |} """.stripMargin, - followParquetFormatSpec = true) - - // Parquet files generated by parquet-thrift are already handled by the schema converter, but - // let's leave this test here until both read path and write path are all updated. - ignore("thrift generated parquet schema") { - // Test for SPARK-4520 -- ensure that thrift generated parquet schema is generated - // as expected from attributes - testSchemaInference[( - Array[Byte], Array[Byte], Array[Byte], Seq[Int], Map[Array[Byte], Seq[Int]])]( - "thrift generated parquet schema", - """ - |message root { - | optional binary _1 (UTF8); - | optional binary _2 (UTF8); - | optional binary _3 (UTF8); - | optional group _4 (LIST) { - | repeated int32 _4_tuple; - | } - | optional group _5 (MAP) { - | repeated group map (MAP_KEY_VALUE) { - | required binary key (UTF8); - | optional group value (LIST) { - | repeated int32 value_tuple; - | } - | } - | } - |} - """.stripMargin, - isThriftDerived = true) - } + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = false) } class ParquetSchemaSuite extends ParquetSchemaTest { @@ -470,7 +467,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } | } |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testParquetToCatalyst( "Backwards-compatibility: LIST with nullable element type - 2", @@ -486,7 +486,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } | } |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testParquetToCatalyst( "Backwards-compatibility: LIST with non-nullable element type - 1 - standard", @@ -499,7 +502,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } | } |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testParquetToCatalyst( "Backwards-compatibility: LIST with non-nullable element type - 2", @@ -512,7 +518,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } | } |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testParquetToCatalyst( "Backwards-compatibility: LIST with non-nullable element type - 3", @@ -523,7 +532,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | repeated int32 element; | } |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testParquetToCatalyst( "Backwards-compatibility: LIST with non-nullable element type - 4", @@ -544,7 +556,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } | } |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testParquetToCatalyst( "Backwards-compatibility: LIST with non-nullable element type - 5 - parquet-avro style", @@ -563,7 +578,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } | } |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testParquetToCatalyst( "Backwards-compatibility: LIST with non-nullable element type - 6 - parquet-thrift style", @@ -582,7 +600,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } | } |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testParquetToCatalyst( "Backwards-compatibility: LIST with non-nullable element type 7 - " + @@ -592,7 +613,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest { """message root { | repeated int32 f1; |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testParquetToCatalyst( "Backwards-compatibility: LIST with non-nullable element type 8 - " + @@ -612,7 +636,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | required int32 c2; | } |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) // ======================================================= // Tests for converting Catalyst ArrayType to Parquet LIST @@ -633,7 +660,9 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } |} """.stripMargin, - followParquetFormatSpec = true) + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = false) testCatalystToParquet( "Backwards-compatibility: LIST with nullable element type - 2 - prior to 1.4.x", @@ -649,7 +678,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } | } |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testCatalystToParquet( "Backwards-compatibility: LIST with non-nullable element type - 1 - standard", @@ -666,7 +698,9 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } |} """.stripMargin, - followParquetFormatSpec = true) + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = false) testCatalystToParquet( "Backwards-compatibility: LIST with non-nullable element type - 2 - prior to 1.4.x", @@ -680,7 +714,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | repeated int32 array; | } |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) // ==================================================== // Tests for converting Parquet Map to Catalyst MapType @@ -701,7 +738,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } | } |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testParquetToCatalyst( "Backwards-compatibility: MAP with non-nullable value type - 2", @@ -718,7 +758,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } | } |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testParquetToCatalyst( "Backwards-compatibility: MAP with non-nullable value type - 3 - prior to 1.4.x", @@ -735,7 +778,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } | } |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testParquetToCatalyst( "Backwards-compatibility: MAP with nullable value type - 1 - standard", @@ -752,7 +798,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } | } |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testParquetToCatalyst( "Backwards-compatibility: MAP with nullable value type - 2", @@ -769,7 +818,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } | } |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testParquetToCatalyst( "Backwards-compatibility: MAP with nullable value type - 3 - parquet-avro style", @@ -786,7 +838,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } | } |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) // ==================================================== // Tests for converting Catalyst MapType to Parquet Map @@ -808,7 +863,9 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } |} """.stripMargin, - followParquetFormatSpec = true) + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = false) testCatalystToParquet( "Backwards-compatibility: MAP with non-nullable value type - 2 - prior to 1.4.x", @@ -825,7 +882,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } | } |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testCatalystToParquet( "Backwards-compatibility: MAP with nullable value type - 1 - standard", @@ -843,7 +903,9 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } |} """.stripMargin, - followParquetFormatSpec = true) + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = false) testCatalystToParquet( "Backwards-compatibility: MAP with nullable value type - 3 - prior to 1.4.x", @@ -860,7 +922,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | } | } |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) // ================================= // Tests for conversion for decimals @@ -873,7 +938,9 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | optional int32 f1 (DECIMAL(1, 0)); |} """.stripMargin, - followParquetFormatSpec = true) + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = false) testSchema( "DECIMAL(8, 3) - standard", @@ -882,7 +949,9 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | optional int32 f1 (DECIMAL(8, 3)); |} """.stripMargin, - followParquetFormatSpec = true) + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = false) testSchema( "DECIMAL(9, 3) - standard", @@ -891,7 +960,9 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | optional int32 f1 (DECIMAL(9, 3)); |} """.stripMargin, - followParquetFormatSpec = true) + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = false) testSchema( "DECIMAL(18, 3) - standard", @@ -900,7 +971,9 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | optional int64 f1 (DECIMAL(18, 3)); |} """.stripMargin, - followParquetFormatSpec = true) + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = false) testSchema( "DECIMAL(19, 3) - standard", @@ -909,7 +982,9 @@ class ParquetSchemaSuite extends ParquetSchemaTest { | optional fixed_len_byte_array(9) f1 (DECIMAL(19, 3)); |} """.stripMargin, - followParquetFormatSpec = true) + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = false) testSchema( "DECIMAL(1, 0) - prior to 1.4.x", @@ -917,7 +992,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest { """message root { | optional fixed_len_byte_array(1) f1 (DECIMAL(1, 0)); |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testSchema( "DECIMAL(8, 3) - prior to 1.4.x", @@ -925,7 +1003,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest { """message root { | optional fixed_len_byte_array(4) f1 (DECIMAL(8, 3)); |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testSchema( "DECIMAL(9, 3) - prior to 1.4.x", @@ -933,7 +1014,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest { """message root { | optional fixed_len_byte_array(5) f1 (DECIMAL(9, 3)); |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) testSchema( "DECIMAL(18, 3) - prior to 1.4.x", @@ -941,7 +1025,10 @@ class ParquetSchemaSuite extends ParquetSchemaTest { """message root { | optional fixed_len_byte_array(8) f1 (DECIMAL(18, 3)); |} - """.stripMargin) + """.stripMargin, + binaryAsString = true, + int96AsTimestamp = true, + writeLegacyParquetFormat = true) private def testSchemaClipping( testName: String, diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index 6842ec2b5eb3..d09dd6ad5819 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -620,7 +620,7 @@ class ParquetSourceSuite extends ParquetPartitioningTest { val conf = Seq( HiveContext.CONVERT_METASTORE_PARQUET.key -> "false", SQLConf.PARQUET_BINARY_AS_STRING.key -> "true", - SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key -> "true") + SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "false") withSQLConf(conf: _*) { sql( From 85bbfde13f1b7cfdff56e766d1d7ec897adf5d0e Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Wed, 2 Sep 2015 15:11:18 +0800 Subject: [PATCH 2/2] Removes instead of deprecates the old option --- .../src/main/scala/org/apache/spark/sql/SQLConf.scala | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index cf04e1d5e5d2..75f2979951ac 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -297,13 +297,6 @@ private[spark] object SQLConf { "Spark SQL schema and vice versa.", isPublic = false) - @deprecated(s"use ${PARQUET_WRITE_LEGACY_FORMAT.key} instead", "1.6.0") - val PARQUET_FOLLOW_PARQUET_FORMAT_SPEC = booleanConf( - key = "spark.sql.parquet.followParquetFormatSpec", - defaultValue = Some(!PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get), - doc = s"Deprecated, use ${PARQUET_WRITE_LEGACY_FORMAT.key} instead.", - isPublic = false) - val PARQUET_OUTPUT_COMMITTER_CLASS = stringConf( key = "spark.sql.parquet.output.committer.class", defaultValue = Some(classOf[ParquetOutputCommitter].getName), @@ -505,9 +498,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf { private[spark] def writeLegacyParquetFormat: Boolean = getConf(PARQUET_WRITE_LEGACY_FORMAT) - @deprecated("use SQLConf.writeLegacyParquetFormat", "1.6.0") - private[spark] def followParquetFormatSpec: Boolean = !writeLegacyParquetFormat - private[spark] def inMemoryPartitionPruning: Boolean = getConf(IN_MEMORY_PARTITION_PRUNING) private[spark] def columnNameOfCorruptRecord: String = getConf(COLUMN_NAME_OF_CORRUPT_RECORD)