diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
index b40b8c2e61f3..54299b0ba340 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
@@ -29,6 +29,7 @@ import org.apache.parquet.hadoop.api.WriteSupport
 import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
 import org.apache.parquet.io.api.{Binary, RecordConsumer}
 
+import org.apache.spark.SPARK_VERSION
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
@@ -115,6 +116,10 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit
     }
   }
 
+  override def getName: String = {
+    SPARK_VERSION
+  }
+
   private def writeFields(
       row: InternalRow, schema: StructType, fieldWriters: Array[ValueWriter]): Unit = {
     var i = 0
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala
index 94abf115cef3..7c1591a6a1e8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala
@@ -18,6 +18,8 @@
 package org.apache.spark.sql.execution.datasources.parquet
 
 import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.parquet.format.converter.ParquetMetadataConverter
+import org.apache.parquet.hadoop.ParquetFileReader
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.QueryTest
@@ -56,4 +58,25 @@ class ParquetFileFormatSuite extends QueryTest with ParquetTest with SharedSQLCo
     }.getCause
     assert(exception.getMessage().contains("Could not read footer for file"))
   }
+
+  test("read version from parquet file footer metadata") {
+    withTempDir { dir =>
+      val config = spark.sessionState.newHadoopConf()
+      val fs = FileSystem.get(config)
+      val basePath = dir.getCanonicalPath
+
+      val pathToDirectory = new Path(basePath, "path")
+
+      spark.range(1).toDF("a").coalesce(1).write.save(pathToDirectory.toString)
+
+      val pathToFile = fs.globStatus(new Path(basePath, "path/part*"))(0).getPath
+
+      val fileFooter = ParquetFileReader
+        .readFooter(config, pathToFile, ParquetMetadataConverter.SKIP_ROW_GROUPS)
+      val fileWriterModelName = fileFooter.getFileMetaData
+        .getKeyValueMetaData.get("writer.model.name")
+
+      assert(fileWriterModelName == spark.version)
+    }
+  }
 }
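
Note on the mechanism (not part of the patch): parquet-mr copies WriteSupport.getName() into the footer's key-value metadata under ParquetWriter.OBJECT_MODEL_NAME_PROP, i.e. the "writer.model.name" key, which is what the test above reads back. A minimal spark-shell sketch for inspecting that key by hand; the output path /tmp/parquet-version-demo is an arbitrary example location, not something from the patch:

    // Write a single-row Parquet file, then read the footer key that
    // WriteSupport.getName() populates ("writer.model.name").
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.parquet.format.converter.ParquetMetadataConverter
    import org.apache.parquet.hadoop.ParquetFileReader

    spark.range(1).coalesce(1).write.mode("overwrite").parquet("/tmp/parquet-version-demo")

    val conf = new Configuration()
    val file = FileSystem.get(conf)
      .globStatus(new Path("/tmp/parquet-version-demo/part*"))(0).getPath

    // SKIP_ROW_GROUPS reads only the file metadata, not the row group data.
    val footer = ParquetFileReader.readFooter(conf, file, ParquetMetadataConverter.SKIP_ROW_GROUPS)
    // With this patch applied, this should print the Spark version (SPARK_VERSION).
    println(footer.getFileMetaData.getKeyValueMetaData.get("writer.model.name"))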