@@ -91,6 +91,33 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
}
}

test("SPARK-11694 Parquet logical types are not being tested properly") {
val parquetSchema = MessageTypeParser.parseMessageType(
"""message root {
| required int32 a(INT_8);
| required int32 b(INT_16);
| required int32 c(DATE);
| required int32 d(DECIMAL(1,0));
| required int64 e(DECIMAL(10,0));
| required binary f(UTF8);
| required binary g(ENUM);
| required binary h(DECIMAL(32,0));
| required fixed_len_byte_array(32) i(DECIMAL(32,0));
|}
""".stripMargin)
Review comment (Contributor):
Can we add all Parquet logical types that Spark supports here for completeness? For example:

    fixed_len_byte_array f0(DECIMAL(d, s))
    binary f1(UTF8);


    val expectedSparkTypes = Seq(ByteType, ShortType, DateType, DecimalType(1, 0),
      DecimalType(10, 0), StringType, StringType, DecimalType(32, 0), DecimalType(32, 0))

    withTempPath { location =>
      val path = new Path(location.getCanonicalPath)
      val conf = sparkContext.hadoopConfiguration
      writeMetadata(parquetSchema, path, conf)
      val sparkTypes = sqlContext.read.parquet(path.toString).schema.map(_.dataType)
      assert(sparkTypes === expectedSparkTypes)
    }
  }
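
Following up on the review comment above, here is a rough sketch of what an extended schema and expected-type list could look like. It is illustrative only and not part of the change: the extra fields j and k are hypothetical, and the two added expectations assume the INT_32 -> IntegerType and INT_64 -> LongType mappings in CatalystSchemaConverter.

    // Sketch only: extends the schema from the test above with two more
    // logical types; field names j and k are made up for illustration.
    val extendedParquetSchema = MessageTypeParser.parseMessageType(
      """message root {
        |  required int32 a(INT_8);
        |  required int32 b(INT_16);
        |  required int32 c(DATE);
        |  required int32 d(DECIMAL(1,0));
        |  required int64 e(DECIMAL(10,0));
        |  required binary f(UTF8);
        |  required binary g(ENUM);
        |  required binary h(DECIMAL(32,0));
        |  required fixed_len_byte_array(32) i(DECIMAL(32,0));
        |  required int32 j(INT_32);
        |  required int64 k(INT_64);
        |}
      """.stripMargin)

    // Assuming INT_32 -> IntegerType and INT_64 -> LongType.
    val extendedExpectedSparkTypes = Seq(ByteType, ShortType, DateType, DecimalType(1, 0),
      DecimalType(10, 0), StringType, StringType, DecimalType(32, 0), DecimalType(32, 0),
      IntegerType, LongType)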

test("string") {
val data = (1 to 4).map(i => Tuple1(i.toString))
// Property spark.sql.parquet.binaryAsString shouldn't affect Parquet files written by Spark SQL
@@ -350,16 +377,10 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
""".stripMargin)

    withTempPath { location =>
-     val extraMetadata = Collections.singletonMap(
-       CatalystReadSupport.SPARK_METADATA_KEY, sparkSchema.toString)
-     val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, "Spark")
+     val extraMetadata = Map(CatalystReadSupport.SPARK_METADATA_KEY -> sparkSchema.toString)
      val path = new Path(location.getCanonicalPath)

-     ParquetFileWriter.writeMetadataFile(
-       sparkContext.hadoopConfiguration,
-       path,
-       Collections.singletonList(
-         new Footer(path, new ParquetMetadata(fileMetadata, Collections.emptyList()))))
+     val conf = sparkContext.hadoopConfiguration
+     writeMetadata(parquetSchema, path, conf, extraMetadata)

      assertResult(sqlContext.read.parquet(path.toString).schema) {
        StructType(
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources.parquet

import java.io.File

import org.apache.parquet.schema.MessageType

import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag
@@ -117,6 +119,21 @@ private[sql] trait ParquetTest extends SQLTestUtils {
    ParquetFileWriter.writeMetadataFile(configuration, path, Seq(footer).asJava)
  }

  /**
   * This is an overloaded version of `writeMetadata` above to allow writing customized
   * Parquet schema.
   */
  protected def writeMetadata(
      parquetSchema: MessageType, path: Path, configuration: Configuration,
      extraMetadata: Map[String, String] = Map.empty[String, String]): Unit = {
    val extraMetadataAsJava = extraMetadata.asJava
    val createdBy = s"Apache Spark ${org.apache.spark.SPARK_VERSION}"
    val fileMetadata = new FileMetaData(parquetSchema, extraMetadataAsJava, createdBy)
    val parquetMetadata = new ParquetMetadata(fileMetadata, Seq.empty[BlockMetaData].asJava)
    val footer = new Footer(path, parquetMetadata)
    ParquetFileWriter.writeMetadataFile(configuration, path, Seq(footer).asJava)
  }

  protected def readAllFootersWithoutSummaryFiles(
      path: Path, configuration: Configuration): Seq[Footer] = {
    val fs = path.getFileSystem(configuration)
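
For reference, a rough usage sketch of the new writeMetadata overload, mirroring the SPARK-11694 test above. The one-field schema is just an example, and the snippet assumes the calling suite mixes in ParquetTest with SharedSQLContext so that withTempPath, sparkContext and sqlContext are in scope.

    // Sketch only: write a _metadata file carrying just the given Parquet
    // schema, then check how Spark reads the UTF8-annotated column back.
    val schema = MessageTypeParser.parseMessageType(
      "message root { required binary f(UTF8); }")
    withTempPath { location =>
      val path = new Path(location.getCanonicalPath)
      writeMetadata(schema, path, sparkContext.hadoopConfiguration)
      assert(sqlContext.read.parquet(path.toString).schema.head.dataType === StringType)
    }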