@@ -99,13 +99,13 @@ private[sql] class ParquetFileFormat
// bundled with `ParquetOutputFormat[Row]`.
job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]])

-ParquetOutputFormat.setWriteSupportClass(job, classOf[CatalystWriteSupport])
+ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport])

// We want to clear this temporary metadata from saving into Parquet file.
// This metadata is only useful for detecting optional columns when pushdowning filters.
val dataSchemaToWrite = StructType.removeMetadata(StructType.metadataKeyForOptionalField,
dataSchema).asInstanceOf[StructType]
-CatalystWriteSupport.setSchema(dataSchemaToWrite, conf)
+ParquetWriteSupport.setSchema(dataSchemaToWrite, conf)

// Sets flags for `CatalystSchemaConverter` (which converts Catalyst schema to Parquet schema)
// and `CatalystWriteSupport` (writing actual rows to Parquet files).
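
Note: this hunk (and the near-identical one in `ParquetOutputWriterFactory` below) wires the renamed write support into Parquet's Hadoop output format. A minimal sketch of that configuration pattern, not the actual Spark method, and compilable only next to `ParquetWriteSupport` since the class and its companion are `private[parquet]`:

import org.apache.hadoop.mapreduce.Job
import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType

// Sketch only: register the WriteSupport that turns rows into Parquet records,
// then publish the Catalyst schema through the Hadoop Configuration so executors
// can read it back in ParquetWriteSupport.init().
def configureParquetWriteSketch(job: Job, dataSchema: StructType): Unit = {
  job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]])
  ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport])
  ParquetWriteSupport.setSchema(dataSchema, job.getConfiguration)
}
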
@@ -295,14 +295,14 @@ private[sql] class ParquetFileFormat
ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
ParquetSchemaConverter.checkFieldNames(requiredSchema).json)
hadoopConf.set(
-CatalystWriteSupport.SPARK_ROW_SCHEMA,
+ParquetWriteSupport.SPARK_ROW_SCHEMA,
ParquetSchemaConverter.checkFieldNames(requiredSchema).json)

// We want to clear this temporary metadata from saving into Parquet file.
// This metadata is only useful for detecting optional columns when pushdowning filters.
val dataSchemaToWrite = StructType.removeMetadata(StructType.metadataKeyForOptionalField,
requiredSchema).asInstanceOf[StructType]
-CatalystWriteSupport.setSchema(dataSchemaToWrite, hadoopConf)
+ParquetWriteSupport.setSchema(dataSchemaToWrite, hadoopConf)

// Sets flags for `CatalystSchemaConverter`
hadoopConf.setBoolean(
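
Note: both the write path and the read path strip the temporary optional-field metadata before the schema JSON is stored, so the hint used for filter push-down never lands in the Parquet footer. A rough illustration of that cleanup using the public Metadata API; the helper name is hypothetical, the real code calls the private `StructType.removeMetadata`, and `MetadataBuilder.remove(key)` is assumed available:

import org.apache.spark.sql.types.{MetadataBuilder, StructType}

// Hypothetical helper: rebuild each top-level field's metadata without the given
// key so the temporary marker is not persisted into the Parquet file.
def stripTopLevelMetadata(key: String, schema: StructType): StructType = {
  StructType(schema.fields.map { field =>
    val cleaned = new MetadataBuilder().withMetadata(field.metadata).remove(key).build()
    field.copy(metadata = cleaned)
  })
}
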
@@ -435,14 +435,14 @@ private[sql] class ParquetOutputWriterFactory(
// bundled with `ParquetOutputFormat[Row]`.
job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]])

-ParquetOutputFormat.setWriteSupportClass(job, classOf[CatalystWriteSupport])
+ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport])

// We want to clear this temporary metadata from saving into Parquet file.
// This metadata is only useful for detecting optional columns when pushdowning filters.
val dataSchemaToWrite = StructType.removeMetadata(
StructType.metadataKeyForOptionalField,
dataSchema).asInstanceOf[StructType]
-CatalystWriteSupport.setSchema(dataSchemaToWrite, conf)
+ParquetWriteSupport.setSchema(dataSchemaToWrite, conf)

// Sets flags for `CatalystSchemaConverter` (which converts Catalyst schema to Parquet schema)
// and `CatalystWriteSupport` (writing actual rows to Parquet files).
@@ -611,7 +611,7 @@ private[sql] object ParquetFileFormat extends Logging {
})

conf.set(
-CatalystWriteSupport.SPARK_ROW_SCHEMA,
+ParquetWriteSupport.SPARK_ROW_SCHEMA,
ParquetSchemaConverter.checkFieldNames(dataSchema).json)

// Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
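
Note: `ParquetSchemaConverter.checkFieldNames` is applied before the schema JSON is stored because Parquet rejects certain characters in column names. A hedged, hypothetical stand-in for that check, assuming the usual restriction that names may not contain the characters " ,;{}()\n\t=" (an assumption, not quoted from this diff). The remaining hunks below are the write-support class itself:

import org.apache.spark.sql.types.StructType

// Hypothetical sketch of the field-name validation: reject top-level column names
// containing characters Parquet's schema cannot represent, then return the schema.
def checkFieldNamesSketch(schema: StructType): StructType = {
  val forbidden: Set[Char] = " ,;{}()\n\t=".toSet
  schema.fieldNames.foreach { name =>
    require(!name.exists(forbidden), s"Attribute name \"$name\" contains invalid character(s)")
  }
  schema
}
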
@@ -48,7 +48,7 @@ import org.apache.spark.sql.types._
* of this option is propagated to this class by the `init()` method and its Hadoop configuration
* argument.
*/
-private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] with Logging {
+private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
// A `ValueWriter` is responsible for writing a field of an `InternalRow` to the record consumer.
// Here we are using `SpecializedGetters` rather than `InternalRow` so that we can directly access
// data in `ArrayData` without the help of `SpecificMutableRow`.
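
Note: the renamed class plugs into parquet-mr through the WriteSupport contract this comment describes. A stripped-down, self-contained example of that lifecycle, unrelated to Spark's row writers: init() declares the Parquet schema, prepareForWrite() hands over the RecordConsumer, and write() streams one record at a time.

import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.api.WriteSupport
import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
import org.apache.parquet.io.api.{Binary, RecordConsumer}
import org.apache.parquet.schema.MessageTypeParser

// Toy WriteSupport for a single UTF8 column, showing the shape that
// ParquetWriteSupport fills in for InternalRow with per-field ValueWriters.
class StringWriteSupport extends WriteSupport[String] {
  private var consumer: RecordConsumer = _

  override def init(configuration: Configuration): WriteContext = {
    val schema = MessageTypeParser.parseMessageType(
      "message example { required binary value (UTF8); }")
    new WriteContext(schema, new java.util.HashMap[String, String]())
  }

  override def prepareForWrite(recordConsumer: RecordConsumer): Unit = {
    consumer = recordConsumer
  }

  override def write(record: String): Unit = {
    consumer.startMessage()
    consumer.startField("value", 0)
    consumer.addBinary(Binary.fromString(record))
    consumer.endField("value", 0)
    consumer.endMessage()
  }
}
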
@@ -73,7 +73,7 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] with Logging {
private val decimalBuffer = new Array[Byte](minBytesForPrecision(DecimalType.MAX_PRECISION))

override def init(configuration: Configuration): WriteContext = {
-val schemaString = configuration.get(CatalystWriteSupport.SPARK_ROW_SCHEMA)
+val schemaString = configuration.get(ParquetWriteSupport.SPARK_ROW_SCHEMA)
this.schema = StructType.fromString(schemaString)
this.writeLegacyParquetFormat = {
// `SQLConf.PARQUET_WRITE_LEGACY_FORMAT` should always be explicitly set in ParquetRelation
@@ -424,7 +424,7 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] with Logging {
}
}

-private[parquet] object CatalystWriteSupport {
+private[parquet] object ParquetWriteSupport {
val SPARK_ROW_SCHEMA: String = "org.apache.spark.sql.parquet.row.attributes"

def setSchema(schema: StructType, configuration: Configuration): Unit = {
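
Note: the companion object is the other half of the handshake seen in the ParquetFileFormat hunks: setSchema() serializes the Catalyst schema to JSON under SPARK_ROW_SCHEMA, and init() above parses it back on the executor. A small sketch of that round-trip using the public DataType API; StructType.fromString is private to Spark, and DataType.fromJson is assumed here as its public counterpart:

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.types.{DataType, StructType}

object SchemaRoundTripSketch {
  // Same key as ParquetWriteSupport.SPARK_ROW_SCHEMA in the hunk above.
  val SparkRowSchema = "org.apache.spark.sql.parquet.row.attributes"

  // Driver side: publish the schema as JSON in the Hadoop configuration.
  def setSchema(schema: StructType, configuration: Configuration): Unit =
    configuration.set(SparkRowSchema, schema.json)

  // Executor side (what init() does): parse the JSON back into a StructType.
  def readSchema(configuration: Configuration): StructType =
    DataType.fromJson(configuration.get(SparkRowSchema)).asInstanceOf[StructType]
}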