Binary file added sql/core/src/test/resources/dec-in-fixed-len.parquet
@@ -17,8 +17,6 @@

package org.apache.spark.sql.execution.datasources.parquet

import java.util.Collections

import org.apache.parquet.column.{Encoding, ParquetProperties}

import scala.collection.JavaConverters._
@@ -33,7 +31,7 @@ import org.apache.parquet.example.data.{Group, GroupWriter}
import org.apache.parquet.hadoop._
import org.apache.parquet.hadoop.api.WriteSupport
import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
import org.apache.parquet.hadoop.metadata.{CompressionCodecName, FileMetaData, ParquetMetadata}
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.parquet.io.api.RecordConsumer
import org.apache.parquet.schema.{MessageType, MessageTypeParser}

@@ -243,15 +241,9 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
""".stripMargin)

withTempPath { location =>
val extraMetadata = Map.empty[String, String].asJava
val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, "Spark")
val path = new Path(location.getCanonicalPath)
val footer = List(
new Footer(path, new ParquetMetadata(fileMetadata, Collections.emptyList()))
).asJava

ParquetFileWriter.writeMetadataFile(sparkContext.hadoopConfiguration, path, footer)

val conf = sparkContext.hadoopConfiguration
writeMetadata(parquetSchema, path, conf)
val errorMessage = intercept[Throwable] {
sqlContext.read.parquet(path.toString).printSchema()
}.toString
@@ -267,20 +259,14 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
|}
""".stripMargin)

val expectedSparkTypes = Seq(StringType, BinaryType)

withTempPath { location =>
val extraMetadata = Map.empty[String, String].asJava
val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, "Spark")
val path = new Path(location.getCanonicalPath)
val footer = List(
new Footer(path, new ParquetMetadata(fileMetadata, Collections.emptyList()))
).asJava

ParquetFileWriter.writeMetadataFile(sparkContext.hadoopConfiguration, path, footer)

val jsonDataType = sqlContext.read.parquet(path.toString).schema(0).dataType
assert(jsonDataType === StringType)
val bsonDataType = sqlContext.read.parquet(path.toString).schema(1).dataType
assert(bsonDataType === BinaryType)
val conf = sparkContext.hadoopConfiguration
writeMetadata(parquetSchema, path, conf)
val sparkTypes = sqlContext.read.parquet(path.toString).schema.map(_.dataType)
assert(sparkTypes === expectedSparkTypes)
}
}
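
For reference, the `writeMetadata` helper that both rewritten tests now call is not shown in this file's diff; it presumably lives in the shared `ParquetTest` trait touched elsewhere in the PR. Below is a minimal sketch of what it plausibly looks like, assuming it simply factors out the `FileMetaData`/`Footer`/`ParquetFileWriter.writeMetadataFile` boilerplate deleted above; the trait name is made up for illustration.

```scala
import java.util.Collections

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.{Footer, ParquetFileWriter}
import org.apache.parquet.hadoop.metadata.{FileMetaData, ParquetMetadata}
import org.apache.parquet.schema.MessageType

// Hypothetical home for the helper; the real definition is outside this file.
trait ParquetMetadataWriteSketch {
  // Writes a Parquet summary (_metadata) file under `path` carrying only the given
  // schema, mirroring the boilerplate removed from the two tests above.
  protected def writeMetadata(
      parquetSchema: MessageType, path: Path, conf: Configuration): Unit = {
    val extraMetadata = Map.empty[String, String].asJava
    val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, "Spark")
    val parquetMetadata = new ParquetMetadata(fileMetadata, Collections.emptyList())
    ParquetFileWriter.writeMetadataFile(
      conf, path, Collections.singletonList(new Footer(path, parquetMetadata)))
  }
}
```

Factoring this out keeps each test focused on the schema under test rather than on parquet-mr footer plumbing.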

@@ -607,10 +593,12 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
sqlContext.range(1 << 4).select('id % 10 cast DecimalType(10, 2) as 'i64_dec))
}

// TODO Adds test case for reading dictionary encoded decimals written as `FIXED_LEN_BYTE_ARRAY`
// The Parquet writer version Spark 1.6 and prior versions use is `PARQUET_1_0`, which doesn't
// provide dictionary encoding support for `FIXED_LEN_BYTE_ARRAY`. Should add a test here once
// we upgrade to `PARQUET_2_0`.
test("read dictionary encoded decimals written as FIXED_LEN_BYTE_ARRAY") {
checkAnswer(
// Decimal column in this file is encoded using plain dictionary
readResourceParquetFile("dec-in-fixed-len.parquet"),
sqlContext.range(1 << 4).select('id % 10 cast DecimalType(10, 2) as 'fixed_len_dec))
}
}
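
The comment block removed above explains why no such test existed before: Spark's writer was still on the `PARQUET_1_0` writer version, which cannot dictionary-encode `FIXED_LEN_BYTE_ARRAY` values, so the new test reads a pre-generated `dec-in-fixed-len.parquet` resource instead of round-tripping through Spark. Purely as an illustration of the encoding being exercised, here is one way a similar fixture could be generated with parquet-mr's example API by forcing the `PARQUET_2_0` writer version; the object name and output path are invented, and this is not necessarily how the checked-in file was produced.

```scala
import java.math.BigInteger

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.column.ParquetProperties
import org.apache.parquet.example.data.Group
import org.apache.parquet.example.data.simple.SimpleGroupFactory
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.example.GroupWriteSupport
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.parquet.io.api.Binary
import org.apache.parquet.schema.MessageTypeParser

// Hypothetical fixture generator, not part of the PR.
object DecInFixedLenFixture {
  def main(args: Array[String]): Unit = {
    // DECIMAL(10, 2) fits in 5 bytes when stored as FIXED_LEN_BYTE_ARRAY.
    val schema = MessageTypeParser.parseMessageType(
      """message root {
        |  required fixed_len_byte_array(5) fixed_len_dec (DECIMAL(10, 2));
        |}
      """.stripMargin)

    val conf = new Configuration()
    GroupWriteSupport.setSchema(schema, conf)

    val writer = new ParquetWriter[Group](
      new Path("dec-in-fixed-len.parquet"),   // invented output path
      new GroupWriteSupport,
      CompressionCodecName.UNCOMPRESSED,
      ParquetWriter.DEFAULT_BLOCK_SIZE,
      ParquetWriter.DEFAULT_PAGE_SIZE,
      ParquetWriter.DEFAULT_PAGE_SIZE,        // dictionary page size
      true,                                   // enable dictionary encoding
      false,                                  // skip schema validation
      ParquetProperties.WriterVersion.PARQUET_2_0,
      conf)

    val factory = new SimpleGroupFactory(schema)
    (0 until (1 << 4)).foreach { i =>
      // The unscaled value is stored big-endian, two's complement, sign-extended
      // to the fixed length of 5 bytes.
      val unscaled = BigInteger.valueOf((i % 10) * 100L).toByteArray
      val padded = new Array[Byte](5)
      val signByte: Byte = if (unscaled.head < 0) 0xFF.toByte else 0x00.toByte
      java.util.Arrays.fill(padded, signByte)
      System.arraycopy(unscaled, 0, padded, padded.length - unscaled.length, unscaled.length)
      writer.write(factory.newGroup().append("fixed_len_dec", Binary.fromByteArray(padded)))
    }
    writer.close()
  }
}
```

With data laid out this way, the `checkAnswer` in the new test matches, since row `i` holds the decimal value `i % 10` at scale 2.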

class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)