From 870a37a0ed2ce3af0a1507c529620a160e854ece Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Thu, 12 Nov 2015 19:17:53 +0900 Subject: [PATCH 1/7] [SPARK-11694][SQL] Parquet logical types are not being tested properly --- .../datasources/parquet/ParquetIOSuite.scala | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 72744799897b..a84018877ca2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -91,6 +91,32 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { } } + test("SPARK-11694 Parquet logical types are not being tested properly") { + val parquetSchema = MessageTypeParser.parseMessageType( + """message root { + | required int32 a(INT_8); + | required int32 b(INT_16); + | required int32 c(DATE); + | required int32 d(DECIMAL(1,0)); + | required int64 e(DECIMAL(10,0)); + |} + """.stripMargin) + + withTempPath { location => + val extraMetadata = Map.empty[String, String].asJava + val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, "Spark") + val path = new Path(location.getCanonicalPath) + val footer = List( + new Footer(path, new ParquetMetadata(fileMetadata, Collections.emptyList())) + ).asJava + + ParquetFileWriter.writeMetadataFile(sparkContext.hadoopConfiguration, path, footer) + val sparkTypes = sqlContext.read.parquet(path.toString).schema.map(_.dataType) + + assert(sparkTypes == + Seq(ByteType, ShortType, DateType, DecimalType(1, 0), DecimalType(10, 0))) + } + } + test("string") { val data = (1 to 4).map(i => Tuple1(i.toString)) // Property spark.sql.parquet.binaryAsString shouldn't affect Parquet files written by Spark SQL From 9a5c2a303616f809046817d5b90d088d42734239 Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Thu, 12 Nov 2015 22:09:14 +0900 Subject: [PATCH 2/7] [SPARK-11694][SQL] Remove mistakenly added characters. --- .../sql/execution/datasources/parquet/ParquetIOSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index a84018877ca2..3505e0e6dd7b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -99,7 +99,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { | required int32 c(DATE); | required int32 d(DECIMAL(1,0)); | required int64 e(DECIMAL(10,0)); - |} + """.stripMargin) + |} + """.stripMargin) withTempPath { location => val extraMetadata = Map.empty[String, String].asJava From 02f3ef95fd9d6b155291a3516ec826c54be16d30 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Fri, 13 Nov 2015 09:48:26 +0900 Subject: [PATCH 3/7] [SPARK-11694][SQL] Overload writeMetaFile() and Add all the logical types for test --- .../datasources/parquet/ParquetIOSuite.scala | 36 ++++++++----------- .../datasources/parquet/ParquetTest.scala | 17 +++++++++ 2 files changed, 31 insertions(+), 22 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 3505e0e6dd7b..2879ef9325a5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.execution.datasources.parquet -import java.util.Collections - import scala.collection.JavaConverters._ import scala.reflect.ClassTag import scala.reflect.runtime.universe.TypeTag @@ -31,7 +29,7 @@ import org.apache.parquet.example.data.{Group, GroupWriter} import org.apache.parquet.hadoop._ import org.apache.parquet.hadoop.api.WriteSupport import org.apache.parquet.hadoop.api.WriteSupport.WriteContext -import org.apache.parquet.hadoop.metadata.{CompressionCodecName, FileMetaData, ParquetMetadata} +import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.apache.parquet.io.api.RecordConsumer import org.apache.parquet.schema.{MessageType, MessageTypeParser} @@ -99,22 +97,22 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { | required int32 c(DATE); | required int32 d(DECIMAL(1,0)); | required int64 e(DECIMAL(10,0)); + | required binary f(UTF8); + | required binary g(ENUM); + | required binary h(DECIMAL(32,0)); + | required fixed_len_byte_array(32) i(DECIMAL(32,0)); |} """.stripMargin) + val expectedSparkTypes = Seq(ByteType, ShortType, DateType, DecimalType(1, 0), DecimalType(10, 0), + StringType, StringType, DecimalType(32, 0), DecimalType(32, 0)) + withTempPath { location => - val extraMetadata = Map.empty[String, String].asJava - val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, "Spark") val path = new Path(location.getCanonicalPath) - val footer = List( - new Footer(path, new ParquetMetadata(fileMetadata, Collections.emptyList())) - ).asJava - - ParquetFileWriter.writeMetadataFile(sparkContext.hadoopConfiguration, path, footer) + val conf = sparkContext.hadoopConfiguration + writeMetadata(parquetSchema, path, conf) val sparkTypes = sqlContext.read.parquet(path.toString).schema.map(_.dataType) - - assert(sparkTypes == - Seq(ByteType, ShortType, DateType, DecimalType(1, 0), DecimalType(10, 0))) + assert(sparkTypes === expectedSparkTypes) } } @@ -377,16 +375,10 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { """.stripMargin) withTempPath { location => - val extraMetadata = Collections.singletonMap( - CatalystReadSupport.SPARK_METADATA_KEY, sparkSchema.toString) - val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, "Spark") + val extraMetadata = Map(CatalystReadSupport.SPARK_METADATA_KEY -> sparkSchema.toString) val path = new Path(location.getCanonicalPath) - - ParquetFileWriter.writeMetadataFile( - sparkContext.hadoopConfiguration, - path, - Collections.singletonList( - new Footer(path, new ParquetMetadata(fileMetadata, Collections.emptyList())))) + val conf = sparkContext.hadoopConfiguration + writeMetadata(parquetSchema, path, conf, extraMetadata) assertResult(sqlContext.read.parquet(path.toString).schema) { StructType( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala index 8ffb01fc5b58..fdd7697c91f5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources.parquet import java.io.File +import org.apache.parquet.schema.MessageType + import scala.collection.JavaConverters._ import scala.reflect.ClassTag import scala.reflect.runtime.universe.TypeTag @@ -117,6 +119,21 @@ private[sql] trait ParquetTest extends SQLTestUtils { ParquetFileWriter.writeMetadataFile(configuration, path, Seq(footer).asJava) } + /** + * This is an overloaded version of `writeMetadata` above to allow writing customized + * Parquet schema. + */ + protected def writeMetadata( + parquetSchema: MessageType, path: Path, configuration: Configuration, + extraMetadata: Map[String, String] = Map.empty[String, String]): Unit = { + val extraMetadataAsJava = extraMetadata.asJava + val createdBy = s"Apache Spark ${org.apache.spark.SPARK_VERSION}" + val fileMetadata = new FileMetaData(parquetSchema, extraMetadataAsJava, createdBy) + val parquetMetadata = new ParquetMetadata(fileMetadata, Seq.empty[BlockMetaData].asJava) + val footer = new Footer(path, parquetMetadata) + ParquetFileWriter.writeMetadataFile(configuration, path, Seq(footer).asJava) + } + protected def readAllFootersWithoutSummaryFiles( path: Path, configuration: Configuration): Seq[Footer] = { val fs = path.getFileSystem(configuration) From 22e13748299de37c2cffd2a67952930991adeb4a Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Fri, 13 Nov 2015 10:12:45 +0900 Subject: [PATCH 4/7] [SPARK-11694][SQL] Shorten line less than 100 characters --- .../sql/execution/datasources/parquet/ParquetIOSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 2879ef9325a5..8af4d53684ff 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -104,8 +104,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { |} """.stripMargin) - val expectedSparkTypes = Seq(ByteType, ShortType, DateType, DecimalType(1, 0), DecimalType(10, 0), - StringType, StringType, DecimalType(32, 0), DecimalType(32, 0)) + val expectedSparkTypes = Seq(ByteType, ShortType, DateType, DecimalType(1, 0), + DecimalType(10, 0), StringType, StringType, DecimalType(32, 0), DecimalType(32, 0)) withTempPath { location => val path = new Path(location.getCanonicalPath) From 8e3e470d44bdfcba758c6bbfb6a26882ba67fe4b Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Fri, 13 Nov 2015 12:04:12 +0900 Subject: [PATCH 5/7] [SPARK-11694][SQL] Add a import manually --- .../sql/execution/datasources/parquet/ParquetIOSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 8af4d53684ff..49ebc61f0790 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -29,7 +29,7 @@ import org.apache.parquet.example.data.{Group, GroupWriter} import org.apache.parquet.hadoop._ import org.apache.parquet.hadoop.api.WriteSupport import org.apache.parquet.hadoop.api.WriteSupport.WriteContext -import org.apache.parquet.hadoop.metadata.CompressionCodecName +import org.apache.parquet.hadoop.metadata.{CompressionCodecName, FileMetaData} import org.apache.parquet.io.api.RecordConsumer import org.apache.parquet.schema.{MessageType, MessageTypeParser} From 3c7277abe2efce4ed0bcc37cf15aebc04732f1f5 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Fri, 13 Nov 2015 12:38:05 +0900 Subject: [PATCH 6/7] [SPARK-11694][SQL] Add an import for `ParquetMetadata` --- .../sql/execution/datasources/parquet/ParquetIOSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 49ebc61f0790..3748e6cbc4da 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -29,7 +29,7 @@ import org.apache.parquet.example.data.{Group, GroupWriter} import org.apache.parquet.hadoop._ import org.apache.parquet.hadoop.api.WriteSupport import org.apache.parquet.hadoop.api.WriteSupport.WriteContext -import org.apache.parquet.hadoop.metadata.{CompressionCodecName, FileMetaData} +import org.apache.parquet.hadoop.metadata.{CompressionCodecName, FileMetaData, ParquetMetadata} import org.apache.parquet.io.api.RecordConsumer import org.apache.parquet.schema.{MessageType, MessageTypeParser} From ce36be02943b10c9122b35b541d71931171f41aa Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Fri, 13 Nov 2015 13:53:48 +0900 Subject: [PATCH 7/7] [SPARK-11694][SQL] Add an import for `Collections` --- .../sql/execution/datasources/parquet/ParquetIOSuite.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 3748e6cbc4da..280c0afd9f01 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.datasources.parquet +import java.util.Collections + import scala.collection.JavaConverters._ import scala.reflect.ClassTag import scala.reflect.runtime.universe.TypeTag