Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import java.util.zip.Deflater

import scala.util.control.NonFatal

import com.esotericsoftware.kryo.{Kryo, KryoSerializable}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.avro.Schema
import org.apache.avro.file.{DataFileConstants, DataFileReader}
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
Expand All @@ -41,6 +39,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory, PartitionedFile}
import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration

private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
private val log = LoggerFactory.getLogger(getClass)
Expand Down Expand Up @@ -157,7 +156,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {

val broadcastedConf =
spark.sparkContext.broadcast(new AvroFileFormat.SerializableConfiguration(hadoopConf))
spark.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
val parsedOptions = new AvroOptions(options, hadoopConf)

(file: PartitionedFile) => {
Expand Down Expand Up @@ -233,43 +232,4 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {

private[avro] object AvroFileFormat {
val IgnoreFilesWithoutExtensionProperty = "avro.mapred.ignore.inputs.without.extension"

class SerializableConfiguration(@transient var value: Configuration)
extends Serializable with KryoSerializable {
@transient private[avro] lazy val log = LoggerFactory.getLogger(getClass)

private def writeObject(out: ObjectOutputStream): Unit = tryOrIOException {
out.defaultWriteObject()
value.write(out)
}

private def readObject(in: ObjectInputStream): Unit = tryOrIOException {
value = new Configuration(false)
value.readFields(in)
}

private def tryOrIOException[T](block: => T): T = {
try {
block
} catch {
case e: IOException =>
log.error("Exception encountered", e)
throw e
case NonFatal(e) =>
log.error("Exception encountered", e)
throw new IOException(e)
}
}

def write(kryo: Kryo, out: Output): Unit = {
val dos = new DataOutputStream(out)
value.write(dos)
dos.flush()
}

def read(kryo: Kryo, in: Input): Unit = {
value = new Configuration(false)
value.readFields(new DataInputStream(in))
}
}
}

This file was deleted.