From 5e72fbc93ec0783d5a440f8f70c7653f8fc39d9a Mon Sep 17 00:00:00 2001
From: HyukjinKwon
Date: Sat, 10 Oct 2015 15:59:52 +0900
Subject: [PATCH 1/6] [SPARK-11044][SQL] Apply the writer version if given.

---
 .../execution/datasources/parquet/CatalystWriteSupport.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
index 483363d2c1a2..f3ebf34d7e4d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
@@ -431,6 +431,7 @@ private[parquet] object CatalystWriteSupport {
     configuration.set(SPARK_ROW_SCHEMA, schema.json)
     configuration.set(
       ParquetOutputFormat.WRITER_VERSION,
-      ParquetProperties.WriterVersion.PARQUET_1_0.toString)
+      configuration.get(ParquetOutputFormat.WRITER_VERSION,
+        ParquetProperties.WriterVersion.PARQUET_1_0.toString))
   }
 }

From 2eee7e37b6f366336cbe19bd9545f07abb13f7db Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Sun, 11 Oct 2015 13:20:52 +0900
Subject: [PATCH 2/6] [SPARK-11044][SQL] Use function that sets if unset.

---
 .../execution/datasources/parquet/CatalystWriteSupport.scala | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
index f3ebf34d7e4d..6862dea5e6c3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
@@ -429,9 +429,8 @@ private[parquet] object CatalystWriteSupport {
   def setSchema(schema: StructType, configuration: Configuration): Unit = {
     schema.map(_.name).foreach(CatalystSchemaConverter.checkFieldName)
     configuration.set(SPARK_ROW_SCHEMA, schema.json)
-    configuration.set(
+    configuration.setIfUnset(
       ParquetOutputFormat.WRITER_VERSION,
-      configuration.get(ParquetOutputFormat.WRITER_VERSION,
-        ParquetProperties.WriterVersion.PARQUET_1_0.toString))
+      ParquetProperties.WriterVersion.PARQUET_1_0.toString)
   }
 }
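Note: patches 1 and 2 have the same effect; Configuration.setIfUnset is simply the idiomatic form of "keep a user-supplied value, otherwise default to PARQUET_1_0". A minimal sketch of what this enables from user code, assuming a spark-shell session against this branch (the output path is hypothetical, not from the series):

    import org.apache.parquet.column.ParquetProperties
    import org.apache.parquet.hadoop.ParquetOutputFormat

    // Opt in to the version2 writer; setSchema no longer overwrites this value.
    sqlContext.sparkContext.hadoopConfiguration.set(
      ParquetOutputFormat.WRITER_VERSION,
      ParquetProperties.WriterVersion.PARQUET_2_0.toString)
    sqlContext.range(10).write.parquet("/tmp/spark-11044-example")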
From 2d1d343ab4a0218cfcbc6aaaa21c6fccb77397e7 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Fri, 13 Nov 2015 11:44:23 +0900
Subject: [PATCH 3/6] [SPARK-11044][SQL] Add a test checking encoding types.

---
 .../datasources/parquet/ParquetIOSuite.scala | 34 +++++++++++++++++++
 1 file changed, 34 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 599cf948e76a..3aa2ef153913 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import java.util.Collections
 
+import org.apache.parquet.column.{Encoding, ParquetProperties}
+
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.TypeTag
@@ -488,6 +490,38 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
       clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
     }
   }
+
+  test("SPARK-11044 Parquet writer version fixed as version1 ") {
+
+    // For dictionary encoding, Parquet changes the encoding types according to its writer version
+    // So, this test checks the encoding types in order to ensure that the file is written with
+    // writer version2.
+    withTempPath { dir =>
+      val clonedConf = new Configuration(hadoopConfiguration)
+      try {
+
+        // Write a Parquet file with writer version 2
+        hadoopConfiguration.set(ParquetOutputFormat.WRITER_VERSION,
+          ParquetProperties.WriterVersion.PARQUET_2_0.toString)
+        hadoopConfiguration.setBoolean(ParquetOutputFormat.ENABLE_DICTIONARY, true)
+        val path = s"${dir.getCanonicalPath}/part-r-0.parquet"
+        sqlContext.range(1 << 16).selectExpr("(id % 4) AS i")
+          .coalesce(1).write.mode("overwrite").parquet(path)
+
+        val blockMetadata = readFooter(new Path(path), hadoopConfiguration).getBlocks.asScala.head
+        val columnChunkMetadata = blockMetadata.getColumns.asScala.head
+
+        // If the file is written with version 2, this should include
+        // [[Encoding.RLE_DICTIONARY]] type. For version 1, it is Encoding.PLAIN_DICTIONARY
+        assert(columnChunkMetadata.getEncodings.contains(Encoding.RLE_DICTIONARY))
+      } finally {
+
+        // Manually clear the hadoop configuration for other tests.
+        hadoopConfiguration.clear()
+        clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
+      }
+    }
+  }
 }
 
 class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
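Note: the test infers the writer version from the encodings recorded in the footer metadata, as its comment explains. A complementary sketch, reusing the same names from the test (readFooter, path, hadoopConfiguration) and assuming a file written without touching WRITER_VERSION, would pin the version1 default instead:

    // With no WRITER_VERSION set, setIfUnset's PARQUET_1_0 default applies,
    // so the dictionary pages are recorded as PLAIN_DICTIONARY.
    val encodings = readFooter(new Path(path), hadoopConfiguration)
      .getBlocks.asScala.head.getColumns.asScala.head.getEncodings
    assert(encodings.contains(Encoding.PLAIN_DICTIONARY))
    assert(!encodings.contains(Encoding.RLE_DICTIONARY))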
From 7e80ad6082a9f5b53f08800bfb519a2a80632ec8 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Fri, 13 Nov 2015 11:59:40 +0900
Subject: [PATCH 4/6] [SPARK-11044][SQL] Add comments

---
 .../sql/execution/datasources/parquet/ParquetIOSuite.scala | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index ebf77b690489..ebc2f0eb99dc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -527,6 +527,9 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
         // Write a Parquet file with writer version 2
         hadoopConfiguration.set(ParquetOutputFormat.WRITER_VERSION,
           ParquetProperties.WriterVersion.PARQUET_2_0.toString)
+
+        // By default, dictionary encoding is enabled from Parquet 1.2.0 but
+        // it is enabled just in case.
         hadoopConfiguration.setBoolean(ParquetOutputFormat.ENABLE_DICTIONARY, true)
         val path = s"${dir.getCanonicalPath}/part-r-0.parquet"
         sqlContext.range(1 << 16).selectExpr("(id % 4) AS i")
@@ -540,7 +543,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
         assert(columnChunkMetadata.getEncodings.contains(Encoding.RLE_DICTIONARY))
       } finally {
 
-        // Manually clear the hadoop configuration for other tests.
+        // Manually clear the hadoop configuration for other tests.git
         hadoopConfiguration.clear()
         clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
       }

From 78449ec530007bbebf729c19e74364dd0e001b81 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Fri, 13 Nov 2015 14:21:45 +0900
Subject: [PATCH 5/6] [SPARK-11044][SQL] Fix typos

---
 .../sql/execution/datasources/parquet/ParquetIOSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index ebc2f0eb99dc..6684ef65e463 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -543,7 +543,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
         assert(columnChunkMetadata.getEncodings.contains(Encoding.RLE_DICTIONARY))
       } finally {
 
-        // Manually clear the hadoop configuration for other tests.git
+        // Manually clear the hadoop configuration for other tests.
        hadoopConfiguration.clear()
         clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
       }
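Note: the comment added in patch 4 (and its typo fixed in patch 5) relies on dictionary encoding being on by default since Parquet 1.2.0, so the setBoolean call is only a guard. For reference, a sketch of the two ParquetOutputFormat constants used throughout the series as plain Hadoop configuration keys; the key names in the comments are stated from parquet-mr, not taken from this series:

    import org.apache.hadoop.conf.Configuration
    import org.apache.parquet.hadoop.ParquetOutputFormat

    val conf = new Configuration()
    conf.setBoolean(ParquetOutputFormat.ENABLE_DICTIONARY, true) // key: parquet.enable.dictionary
    conf.set(ParquetOutputFormat.WRITER_VERSION, "PARQUET_2_0")  // key: parquet.writer.version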
From cea50348da091e5d83c14474a76d4f49e1ff3c9b Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Mon, 16 Nov 2015 09:33:28 +0900
Subject: [PATCH 6/6] [SPARK-11044][SQL] Remove empty newlines and correct comments.

---
 .../datasources/parquet/ParquetIOSuite.scala | 15 ++++++---------
 1 file changed, 6 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 6684ef65e463..aaf2d193f9e1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -516,15 +516,13 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
   }
 
   test("SPARK-11044 Parquet writer version fixed as version1 ") {
-
-    // For dictionary encoding, Parquet changes the encoding types according to its writer version
-    // So, this test checks the encoding types in order to ensure that the file is written with
-    // writer version2.
+    // For dictionary encoding, Parquet changes the encoding types according to its writer
+    // version. So, this test checks one of the encoding types in order to ensure that
+    // the file is written with writer version2.
     withTempPath { dir =>
       val clonedConf = new Configuration(hadoopConfiguration)
       try {
-
-        // Write a Parquet file with writer version 2
+        // Write a Parquet file with writer version2.
         hadoopConfiguration.set(ParquetOutputFormat.WRITER_VERSION,
           ParquetProperties.WriterVersion.PARQUET_2_0.toString)
 
@@ -538,11 +536,10 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
         val blockMetadata = readFooter(new Path(path), hadoopConfiguration).getBlocks.asScala.head
         val columnChunkMetadata = blockMetadata.getColumns.asScala.head
 
-        // If the file is written with version 2, this should include
-        // [[Encoding.RLE_DICTIONARY]] type. For version 1, it is Encoding.PLAIN_DICTIONARY
+        // If the file is written with version2, this should include
+        // Encoding.RLE_DICTIONARY type. For version1, it is Encoding.PLAIN_DICTIONARY
         assert(columnChunkMetadata.getEncodings.contains(Encoding.RLE_DICTIONARY))
       } finally {
-
         // Manually clear the hadoop configuration for other tests.
         hadoopConfiguration.clear()
         clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
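Note: taken together, the series keeps PARQUET_1_0 as the default while letting callers opt in to the version2 writer. A sketch of the end-to-end flow in a spark-shell session against this branch (the output path is hypothetical; the footer verification itself is what the test above performs):

    import org.apache.parquet.column.ParquetProperties
    import org.apache.parquet.hadoop.ParquetOutputFormat

    val conf = sqlContext.sparkContext.hadoopConfiguration
    conf.set(ParquetOutputFormat.WRITER_VERSION,
      ParquetProperties.WriterVersion.PARQUET_2_0.toString)

    // Low-cardinality column (4 distinct values over 2^16 rows), so the writer
    // dictionary-encodes it and the chosen writer version becomes observable in
    // the column chunk metadata as RLE_DICTIONARY rather than PLAIN_DICTIONARY.
    sqlContext.range(1 << 16).selectExpr("(id % 4) AS i")
      .coalesce(1).write.mode("overwrite").parquet("/tmp/spark-11044-v2")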