@@ -19,7 +19,6 @@ package org.apache.spark.sql.avro

import java.io._
import java.net.URI
-import java.util.zip.Deflater

import scala.util.control.NonFatal

@@ -31,18 +30,18 @@ import org.apache.avro.mapreduce.AvroJob
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce.Job
-import org.slf4j.LoggerFactory

import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory, PartitionedFile}
import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration

-private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
-  private val log = LoggerFactory.getLogger(getClass)
+private[avro] class AvroFileFormat extends FileFormat
+  with DataSourceRegister with Logging with Serializable {
Member:
Did you make it Serializable on purpose? If so, what is the reason?

Member (Author):
Looks like it has to be, otherwise it throws an exception about serialization. The same thing is done in ParquetFileFormat too, to allow logging everywhere.
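Note: a minimal, self-contained sketch of the issue described above (`MiniLogging`, `ExampleFileFormat`, and `buildReader` are illustrative names, not Spark's API). The reader closure calls a logging method on the format instance, so the instance itself is captured by the closure and has to be serializable for the task to be shipped to executors:

```scala
// Illustrative stand-in for a logging mixin; not org.apache.spark.internal.Logging.
trait MiniLogging {
  // Transient + lazy so the logger is rebuilt on the executor instead of being serialized.
  @transient private lazy val logger = java.util.logging.Logger.getLogger(getClass.getName)
  protected def logInfo(msg: => String): Unit = logger.info(msg)
}

class ExampleFileFormat extends MiniLogging with Serializable {
  // The returned closure calls logInfo, an instance method, so it captures `this`;
  // without Serializable, serializing the closure for executor-side execution
  // would fail with a NotSerializableException.
  def buildReader(): String => Unit = (path: String) => logInfo(s"opening $path")
}

object ClosureSerializationDemo extends App {
  val reader = new ExampleFileFormat().buildReader()
  // Simulate what Spark does when it serializes the reader function for a task.
  val bytes = new java.io.ByteArrayOutputStream()
  new java.io.ObjectOutputStream(bytes).writeObject(reader)
  println(s"closure serialized OK (${bytes.size} bytes)")
}
```

Dropping `with Serializable` from `ExampleFileFormat` makes the `writeObject` call fail with a `java.io.NotSerializableException`, which mirrors the exception mentioned above.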


override def equals(other: Any): Boolean = other match {
case _: AvroFileFormat => true
@@ -121,23 +120,23 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister

parsedOptions.compression match {
case "uncompressed" =>
log.info("writing uncompressed Avro records")
logInfo("writing uncompressed Avro records")
job.getConfiguration.setBoolean(COMPRESS_KEY, false)

case "snappy" =>
log.info("compressing Avro output using Snappy")
logInfo("compressing Avro output using Snappy")
job.getConfiguration.setBoolean(COMPRESS_KEY, true)
job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.SNAPPY_CODEC)

case "deflate" =>
val deflateLevel = spark.sessionState.conf.avroDeflateLevel
log.info(s"compressing Avro output using deflate (level=$deflateLevel)")
logInfo(s"compressing Avro output using deflate (level=$deflateLevel)")
job.getConfiguration.setBoolean(COMPRESS_KEY, true)
job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.DEFLATE_CODEC)
job.getConfiguration.setInt(AvroOutputFormat.DEFLATE_LEVEL_KEY, deflateLevel)

case unknown: String =>
log.error(s"unsupported compression codec $unknown")
logError(s"unsupported compression codec $unknown")
}

new AvroOutputWriterFactory(dataSchema, outputAvroSchema.toString)
@@ -157,7 +156,6 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister
val parsedOptions = new AvroOptions(options, hadoopConf)

(file: PartitionedFile) => {
-val log = LoggerFactory.getLogger(classOf[AvroFileFormat])
val conf = broadcastedConf.value.value
val userProvidedSchema = parsedOptions.schema.map(new Schema.Parser().parse)

@@ -176,7 +174,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister
DataFileReader.openReader(in, datumReader)
} catch {
case NonFatal(e) =>
log.error("Exception while opening DataFileReader", e)
logError("Exception while opening DataFileReader", e)
in.close()
throw e
}