From dc5ef4f73f76e68a60590d7203f82b3ebecc6fb0 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Tue, 17 Nov 2015 11:03:32 +0900 Subject: [PATCH] [SPARK-11694][FOLLOW-UP] Clean up imports, use a common function for metadata and add a test for FIXED_LEN_BYTE_ARRAY --- .../test/resources/dec-in-fixed-len.parquet | Bin 0 -> 460 bytes .../datasources/parquet/ParquetIOSuite.scala | 42 +++++++----------- 2 files changed, 15 insertions(+), 27 deletions(-) create mode 100644 sql/core/src/test/resources/dec-in-fixed-len.parquet diff --git a/sql/core/src/test/resources/dec-in-fixed-len.parquet b/sql/core/src/test/resources/dec-in-fixed-len.parquet new file mode 100644 index 0000000000000000000000000000000000000000..6ad37d5639511cdb430f33fa6165eb70cd9034c0 GIT binary patch literal 460 zcmZuu%SyvQ6rI#oO3~7VwT&8yPVu5U4^0 z3hK5WJW4SNaflr_N}BXrgbo(V<<*j?`)dokQEarvSr7`N-{ZFH k+I`P - val extraMetadata = Map.empty[String, String].asJava - val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, "Spark") val path = new Path(location.getCanonicalPath) - val footer = List( - new Footer(path, new ParquetMetadata(fileMetadata, Collections.emptyList())) - ).asJava - - ParquetFileWriter.writeMetadataFile(sparkContext.hadoopConfiguration, path, footer) - + val conf = sparkContext.hadoopConfiguration + writeMetadata(parquetSchema, path, conf) val errorMessage = intercept[Throwable] { sqlContext.read.parquet(path.toString).printSchema() }.toString @@ -267,20 +259,14 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { |} """.stripMargin) + val expectedSparkTypes = Seq(StringType, BinaryType) + withTempPath { location => - val extraMetadata = Map.empty[String, String].asJava - val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, "Spark") val path = new Path(location.getCanonicalPath) - val footer = List( - new Footer(path, new ParquetMetadata(fileMetadata, Collections.emptyList())) - ).asJava - - ParquetFileWriter.writeMetadataFile(sparkContext.hadoopConfiguration, path, footer) - - val jsonDataType = sqlContext.read.parquet(path.toString).schema(0).dataType - assert(jsonDataType === StringType) - val bsonDataType = sqlContext.read.parquet(path.toString).schema(1).dataType - assert(bsonDataType === BinaryType) + val conf = sparkContext.hadoopConfiguration + writeMetadata(parquetSchema, path, conf) + val sparkTypes = sqlContext.read.parquet(path.toString).schema.map(_.dataType) + assert(sparkTypes === expectedSparkTypes) } } @@ -607,10 +593,12 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { sqlContext.range(1 << 4).select('id % 10 cast DecimalType(10, 2) as 'i64_dec)) } - // TODO Adds test case for reading dictionary encoded decimals written as `FIXED_LEN_BYTE_ARRAY` - // The Parquet writer version Spark 1.6 and prior versions use is `PARQUET_1_0`, which doesn't - // provide dictionary encoding support for `FIXED_LEN_BYTE_ARRAY`. Should add a test here once - // we upgrade to `PARQUET_2_0`. + test("read dictionary encoded decimals written as FIXED_LEN_BYTE_ARRAY") { + checkAnswer( + // Decimal column in this file is encoded using plain dictionary + readResourceParquetFile("dec-in-fixed-len.parquet"), + sqlContext.range(1 << 4).select('id % 10 cast DecimalType(10, 2) as 'fixed_len_dec)) + } } class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)