From ec9c130a30cb9d5c0ac8d4a11b31b99ab17c7e6c Mon Sep 17 00:00:00 2001
From: npoberezkin
Date: Tue, 28 Aug 2018 16:32:52 +0400
Subject: [PATCH 1/2] [SPARK-25102][Spark Core] Write Spark version
 information to Parquet file footers

---
 .../execution/datasources/parquet/ParquetWriteSupport.scala | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
index b40b8c2e61f3..31e17656e515 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
@@ -29,6 +29,7 @@ import org.apache.parquet.hadoop.api.WriteSupport
 import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
 import org.apache.parquet.io.api.{Binary, RecordConsumer}
 
+import org.apache.spark.{SPARK_VERSION}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
@@ -115,6 +116,10 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit
     }
   }
 
+  override def getName: String = {
+    SPARK_VERSION
+  }
+
   private def writeFields(
       row: InternalRow, schema: StructType, fieldWriters: Array[ValueWriter]): Unit = {
     var i = 0

From 979218552e2fdeab103480bb043b6fcabb3450c6 Mon Sep 17 00:00:00 2001
From: npoberezkin
Date: Thu, 30 Aug 2018 11:02:17 +0400
Subject: [PATCH 2/2] [SPARK-25102][Spark Core] Write Spark version
 information to Parquet file footer (Added test on reading writer.model.name
 from footer metadata)

---
 .../parquet/ParquetWriteSupport.scala         |  2 +-
 .../parquet/ParquetFileFormatSuite.scala      | 23 +++++++++++++++++++
 2 files changed, 24 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
index 31e17656e515..54299b0ba340 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
@@ -29,7 +29,7 @@ import org.apache.parquet.hadoop.api.WriteSupport
 import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
 import org.apache.parquet.io.api.{Binary, RecordConsumer}
 
-import org.apache.spark.{SPARK_VERSION}
+import org.apache.spark.SPARK_VERSION
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala
index 94abf115cef3..7c1591a6a1e8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala
@@ -18,6 +18,8 @@
 package org.apache.spark.sql.execution.datasources.parquet
 
 import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.parquet.format.converter.ParquetMetadataConverter
+import org.apache.parquet.hadoop.ParquetFileReader
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.QueryTest
@@ -56,4 +58,25 @@ class ParquetFileFormatSuite extends QueryTest with ParquetTest with SharedSQLCo
     }.getCause
     assert(exception.getMessage().contains("Could not read footer for file"))
   }
+
+  test("read version from parquet file footer metadata") {
+    withTempDir { dir =>
+      val config = spark.sessionState.newHadoopConf()
+      val fs = FileSystem.get(config)
+      val basePath = dir.getCanonicalPath
+
+      val pathToDirectory = new Path(basePath, "path")
+
+      spark.range(1).toDF("a").coalesce(1).write.save(pathToDirectory.toString)
+
+      val pathToFile = fs.globStatus(new Path(basePath, "path/part*"))(0).getPath
+
+      val fileFooter = ParquetFileReader
+        .readFooter(config, pathToFile, ParquetMetadataConverter.SKIP_ROW_GROUPS)
+      val fileWriterModelName = fileFooter.getFileMetaData
+        .getKeyValueMetaData.get("writer.model.name")
+
+      assert(fileWriterModelName == spark.version)
+    }
+  }
 }
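
Note for reviewers (not part of the patch): below is a minimal standalone sketch of how a downstream tool could read back the stamp this change writes. parquet-mr records WriteSupport.getName() in the footer key-value metadata under the "writer.model.name" key, which is what the test above asserts on. The object name, helper method, and default path are hypothetical illustrations for this note, not Spark APIs.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop.ParquetFileReader

// Hypothetical inspection helper; not part of this patch.
object FooterVersionCheck {
  // Returns the "writer.model.name" footer entry, i.e. the Spark version once
  // this patch is applied, or None if the writer recorded no model name.
  def sparkVersionOf(file: String, conf: Configuration = new Configuration()): Option[String] = {
    // SKIP_ROW_GROUPS reads only the footer, so this stays cheap for large files.
    val footer = ParquetFileReader.readFooter(
      conf, new Path(file), ParquetMetadataConverter.SKIP_ROW_GROUPS)
    Option(footer.getFileMetaData.getKeyValueMetaData.get("writer.model.name"))
  }

  def main(args: Array[String]): Unit = {
    // The path is a placeholder; point it at any part file written by a patched Spark.
    val path = args.headOption.getOrElse("/tmp/example/part-00000.parquet")
    println(sparkVersionOf(path).getOrElse("<no writer.model.name recorded>"))
  }
}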