diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
index 1df1c8b4af2e9..e0159b9320276 100755
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.avro
 
 import java.io._
 import java.net.URI
-import java.util.zip.Deflater
 
 import scala.util.control.NonFatal
 
@@ -31,9 +30,9 @@ import org.apache.avro.mapreduce.AvroJob
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapreduce.Job
-import org.slf4j.LoggerFactory
 
 import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory, PartitionedFile}
@@ -41,8 +40,8 @@ import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
 
-private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
-  private val log = LoggerFactory.getLogger(getClass)
+private[avro] class AvroFileFormat extends FileFormat
+  with DataSourceRegister with Logging with Serializable {
 
   override def equals(other: Any): Boolean = other match {
     case _: AvroFileFormat => true
@@ -121,23 +120,23 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
 
     parsedOptions.compression match {
       case "uncompressed" =>
-        log.info("writing uncompressed Avro records")
+        logInfo("writing uncompressed Avro records")
         job.getConfiguration.setBoolean(COMPRESS_KEY, false)
 
       case "snappy" =>
-        log.info("compressing Avro output using Snappy")
+        logInfo("compressing Avro output using Snappy")
         job.getConfiguration.setBoolean(COMPRESS_KEY, true)
         job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.SNAPPY_CODEC)
 
       case "deflate" =>
         val deflateLevel = spark.sessionState.conf.avroDeflateLevel
-        log.info(s"compressing Avro output using deflate (level=$deflateLevel)")
+        logInfo(s"compressing Avro output using deflate (level=$deflateLevel)")
         job.getConfiguration.setBoolean(COMPRESS_KEY, true)
         job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.DEFLATE_CODEC)
         job.getConfiguration.setInt(AvroOutputFormat.DEFLATE_LEVEL_KEY, deflateLevel)
 
       case unknown: String =>
-        log.error(s"unsupported compression codec $unknown")
+        logError(s"unsupported compression codec $unknown")
     }
 
     new AvroOutputWriterFactory(dataSchema, outputAvroSchema.toString)
@@ -157,7 +156,6 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
     val parsedOptions = new AvroOptions(options, hadoopConf)
 
     (file: PartitionedFile) => {
-      val log = LoggerFactory.getLogger(classOf[AvroFileFormat])
       val conf = broadcastedConf.value.value
       val userProvidedSchema = parsedOptions.schema.map(new Schema.Parser().parse)
 
@@ -176,7 +174,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
         DataFileReader.openReader(in, datumReader)
       } catch {
         case NonFatal(e) =>
-          log.error("Exception while opening DataFileReader", e)
+          logError("Exception while opening DataFileReader", e)
          in.close()
          throw e
      }
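
A note on the pattern this patch adopts: Spark's org.apache.spark.internal.Logging trait keeps its underlying slf4j Logger in a transient, lazily initialized field, so a class mixing it in stays serializable and can be captured in task closures without constructing a logger locally, which is why the per-file LoggerFactory call inside buildReader can simply be dropped. Below is a minimal sketch of the same pattern, not part of the patch; it assumes placement in the same org.apache.spark.sql.avro package as the patched file (Logging is Spark-internal API), and MyFormat/describeCodec are hypothetical names.

package org.apache.spark.sql.avro

import org.apache.spark.internal.Logging

// Mixing in Logging provides logInfo/logWarning/logError with by-name
// message arguments; the slf4j Logger behind them is transient and
// created on first use, so instances of this class remain serializable.
private[avro] class MyFormat extends Logging with Serializable {
  def describeCodec(codec: String): Unit = codec match {
    case "uncompressed" => logInfo("writing uncompressed records")
    case other => logError(s"unsupported compression codec $other")
  }
}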