@@ -129,6 +129,8 @@ private[sql] class ParquetFileFormat
       conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
     }
 
+    ParquetFileFormat.redirectParquetLogs()
+
     new OutputWriterFactory {
       override def newInstance(
           path: String,
@@ -468,9 +470,9 @@ private[sql] class ParquetOutputWriterFactory(
   override private[sql] def newWriter(path: String): OutputWriter = new OutputWriter {
 
     // Create TaskAttemptContext that is used to pass on Configuration to the ParquetRecordWriter
-    private val hadoopTaskAttempId = new TaskAttemptID(new TaskID(new JobID, TaskType.MAP, 0), 0)
+    private val hadoopTaskAttemptId = new TaskAttemptID(new TaskID(new JobID, TaskType.MAP, 0), 0)
     private val hadoopAttemptContext = new TaskAttemptContextImpl(
-      serializableConf.value, hadoopTaskAttempId)
+      serializableConf.value, hadoopTaskAttemptId)
 
     // Instance of ParquetRecordWriter that does not use OutputCommitter
     private val recordWriter = createNoCommitterRecordWriter(path, hadoopAttemptContext)
@@ -505,7 +507,7 @@ private[sql] class ParquetOutputWriterFactory(
       dataSchema: StructType,
       context: TaskAttemptContext): OutputWriter = {
     throw new UnsupportedOperationException(
-      "this verison of newInstance not supported for " +
+      "this version of newInstance not supported for " +
         "ParquetOutputWriterFactory")
   }
 }
@@ -665,7 +667,7 @@ private[sql] object ParquetFileFormat extends Logging {
       Some(Try(DataType.fromJson(serializedSchema.get))
         .recover { case _: Throwable =>
           logInfo(
-            s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
+            "Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
               "falling back to the deprecated DataType.fromCaseClassString parser.")
           LegacyTypeStringParser.parse(serializedSchema.get)
         }
@@ -880,7 +882,7 @@ private[sql] object ParquetFileFormat extends Logging {
     Try(DataType.fromJson(schemaString).asInstanceOf[StructType]).recover {
       case _: Throwable =>
         logInfo(
-          s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
+          "Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
             "falling back to the deprecated DataType.fromCaseClassString parser.")
         LegacyTypeStringParser.parse(schemaString).asInstanceOf[StructType]
     }.recoverWith {
@@ -926,4 +928,9 @@ private[sql] object ParquetFileFormat extends Logging {
       // should be removed after this issue is fixed.
     }
   }
+
+  /**
+   * ParquetFileFormat.prepareWrite calls this function to initialize `redirectParquetLogsViaSLF4J`.
+   */
+  def redirectParquetLogs(): Unit = {}
 }
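
Note on the last hunk: `redirectParquetLogs()` has an empty body on purpose. In Scala, a singleton object's body runs once, on first reference, so calling any method on `ParquetFileFormat` from `prepareWrite` forces the object initializer to run, which evaluates the `redirectParquetLogsViaSLF4J` val mentioned in the doc comment. Below is a minimal sketch of that idiom; the names `Logs`, `setupOnce`, and `touch` are illustrative, not from this PR.

    // Sketch of the lazy-object-initialization idiom the patch relies on.
    object Logs {
      // Evaluated exactly once, when the object is first referenced.
      val setupOnce: Unit = {
        println("redirecting java.util.logging to SLF4J ...")
      }

      // Empty method: calling it forces the object initializer above to run.
      def touch(): Unit = {}
    }

    object Demo extends App {
      // Nothing has printed yet: Logs has not been referenced.
      Logs.touch()   // first reference initializes Logs, evaluating setupOnce
      Logs.touch()   // a no-op; the initializer already ran
    }

Calling the empty method rather than referencing the val directly keeps the call site free of an unused-value expression while still guaranteeing the one-time side effect happens before any Parquet write.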