@@ -1 +1 @@
-org.apache.spark.sql.avro.AvroFileFormat
+org.apache.spark.sql.v2.avro.AvroDataSourceV2
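This one-line hunk looks like the META-INF services registration for DataSourceRegister (the file path isn't shown in this view), so the "avro" short name now resolves to the V2 implementation. A minimal sketch of that lookup, assuming spark-avro is on the classpath:

import java.util.ServiceLoader
import scala.collection.JavaConverters._
import org.apache.spark.sql.sources.DataSourceRegister

// Spark discovers sources by scanning service registrations and matching on
// shortName(); with this change the "avro" entry is AvroDataSourceV2.
val avroSource = ServiceLoader.load(classOf[DataSourceRegister])
  .asScala.find(_.shortName() == "avro")
println(avroSource.map(_.getClass.getName)) // expect org.apache.spark.sql.v2.avro.AvroDataSourceV2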
@@ -23,25 +23,23 @@ import java.net.URI
import scala.util.control.NonFatal

import org.apache.avro.Schema
-import org.apache.avro.file.DataFileConstants._
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
-import org.apache.avro.mapred.{AvroOutputFormat, FsInput}
-import org.apache.avro.mapreduce.AvroJob
+import org.apache.avro.mapred.FsInput
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce.Job

-import org.apache.spark.{SparkException, TaskContext}
+import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory, PartitionedFile}
import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
import org.apache.spark.sql.types._
-import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.util.SerializableConfiguration

-private[avro] class AvroFileFormat extends FileFormat
+private[sql] class AvroFileFormat extends FileFormat
with DataSourceRegister with Logging with Serializable {

override def equals(other: Any): Boolean = other match {
@@ -56,74 +54,7 @@ private[avro] class AvroFileFormat extends FileFormat
spark: SparkSession,
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
-val conf = spark.sessionState.newHadoopConf()
-if (options.contains("ignoreExtension")) {
-logWarning(s"Option ${AvroOptions.ignoreExtensionKey} is deprecated. Please use the " +
-"general data source option pathGlobFilter for filtering file names.")
-}
-val parsedOptions = new AvroOptions(options, conf)
-
-// User can specify an optional avro json schema.
-val avroSchema = parsedOptions.schema
-.map(new Schema.Parser().parse)
-.getOrElse {
-inferAvroSchemaFromFiles(files, conf, parsedOptions.ignoreExtension,
-spark.sessionState.conf.ignoreCorruptFiles)
-}
-
-SchemaConverters.toSqlType(avroSchema).dataType match {
-case t: StructType => Some(t)
-case _ => throw new RuntimeException(
-s"""Avro schema cannot be converted to a Spark SQL StructType:
-|
-|${avroSchema.toString(true)}
-|""".stripMargin)
-}
-}
-
-private def inferAvroSchemaFromFiles(
-files: Seq[FileStatus],
-conf: Configuration,
-ignoreExtension: Boolean,
-ignoreCorruptFiles: Boolean): Schema = {
-// Schema evolution is not supported yet. Here we only pick first random readable sample file to
-// figure out the schema of the whole dataset.
-val avroReader = files.iterator.map { f =>
-val path = f.getPath
-if (!ignoreExtension && !path.getName.endsWith(".avro")) {
-None
-} else {
-Utils.tryWithResource {
-new FsInput(path, conf)
-} { in =>
-try {
-Some(DataFileReader.openReader(in, new GenericDatumReader[GenericRecord]()))
-} catch {
-case e: IOException =>
-if (ignoreCorruptFiles) {
-logWarning(s"Skipped the footer in the corrupted file: $path", e)
-None
-} else {
-throw new SparkException(s"Could not read file: $path", e)
-}
-}
-}
-}
-}.collectFirst {
-case Some(reader) => reader
-}
-
-avroReader match {
-case Some(reader) =>
-try {
-reader.getSchema
-} finally {
-reader.close()
-}
-case None =>
-throw new FileNotFoundException(
-"No Avro files found. If files don't have .avro extension, set ignoreExtension to true")
-}
+AvroUtils.inferSchema(spark, options, files)
}

override def shortName(): String = "avro"
@@ -140,32 +71,7 @@ private[avro] class AvroFileFormat extends FileFormat
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
-val parsedOptions = new AvroOptions(options, spark.sessionState.newHadoopConf())
-val outputAvroSchema: Schema = parsedOptions.schema
-.map(new Schema.Parser().parse)
-.getOrElse(SchemaConverters.toAvroType(dataSchema, nullable = false,
-parsedOptions.recordName, parsedOptions.recordNamespace))
-
-AvroJob.setOutputKeySchema(job, outputAvroSchema)
-
-if (parsedOptions.compression == "uncompressed") {
-job.getConfiguration.setBoolean("mapred.output.compress", false)
-} else {
-job.getConfiguration.setBoolean("mapred.output.compress", true)
-logInfo(s"Compressing Avro output using the ${parsedOptions.compression} codec")
-val codec = parsedOptions.compression match {
-case DEFLATE_CODEC =>
-val deflateLevel = spark.sessionState.conf.avroDeflateLevel
-logInfo(s"Avro compression level $deflateLevel will be used for $DEFLATE_CODEC codec.")
-job.getConfiguration.setInt(AvroOutputFormat.DEFLATE_LEVEL_KEY, deflateLevel)
-DEFLATE_CODEC
-case codec @ (SNAPPY_CODEC | BZIP2_CODEC | XZ_CODEC) => codec
-case unknown => throw new IllegalArgumentException(s"Invalid compression codec: $unknown")
-}
-job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, codec)
-}
-
-new AvroOutputWriterFactory(dataSchema, outputAvroSchema.toString)
+AvroUtils.prepareWrite(spark.sessionState.conf, job, options, dataSchema)
}

override def buildReader(
@@ -250,22 +156,7 @@ private[avro] class AvroFileFormat extends FileFormat
}
}

-override def supportDataType(dataType: DataType): Boolean = dataType match {
-case _: AtomicType => true
-
-case st: StructType => st.forall { f => supportDataType(f.dataType) }
-
-case ArrayType(elementType, _) => supportDataType(elementType)
-
-case MapType(keyType, valueType, _) =>
-supportDataType(keyType) && supportDataType(valueType)
-
-case udt: UserDefinedType[_] => supportDataType(udt.sqlType)
-
-case _: NullType => true
-
-case _ => false
-}
+override def supportDataType(dataType: DataType): Boolean = AvroUtils.supportsDataType(dataType)
}

private[avro] object AvroFileFormat {
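With the three overrides above reduced to one-line delegations, the shared helper can be exercised directly. A rough sketch, assuming a live SparkSession named spark and an illustrative directory of .avro files (AvroUtils is an internal object, so treat this as illustration rather than a stable API):

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.avro.AvroUtils

val dir = new Path("/tmp/events") // illustrative path
val fs = dir.getFileSystem(spark.sparkContext.hadoopConfiguration)
val files = fs.listStatus(dir).toSeq
// The same call the v1 format now makes in inferSchema:
val inferred = AvroUtils.inferSchema(spark, Map.empty, files)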
@@ -28,7 +28,7 @@ import org.apache.spark.sql.types.StructType
* @param catalystSchema Catalyst schema of input data.
* @param avroSchemaAsJsonString Avro schema of output result, in JSON string format.
*/
-private[avro] class AvroOutputWriterFactory(
+private[sql] class AvroOutputWriterFactory(
catalystSchema: StructType,
avroSchemaAsJsonString: String) extends OutputWriterFactory {

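For context on avroSchemaAsJsonString: prepareWrite produces it by converting the Catalyst schema and serializing the result. A small sketch of that conversion (the schema here is made up; record name and namespace fall back to the same defaults AvroOptions would supply):

import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types._

val catalystSchema = new StructType().add("id", LongType).add("name", StringType)
// recordName defaults to "topLevelRecord" and namespace to ""
val avroSchema = SchemaConverters.toAvroType(catalystSchema, nullable = false)
val asJson = avroSchema.toString // the JSON string handed to AvroOutputWriterFactory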
external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala (new file: 163 additions, 0 deletions)
@@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.avro

import java.io.{FileNotFoundException, IOException}

import org.apache.avro.Schema
import org.apache.avro.file.DataFileConstants.{BZIP2_CODEC, DEFLATE_CODEC, SNAPPY_CODEC, XZ_CODEC}
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.mapred.{AvroOutputFormat, FsInput}
import org.apache.avro.mapreduce.AvroJob
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.mapreduce.Job

import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.OutputWriterFactory
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

object AvroUtils extends Logging {
def inferSchema(
spark: SparkSession,
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
val conf = spark.sessionState.newHadoopConf()
if (options.contains("ignoreExtension")) {
logWarning(s"Option ${AvroOptions.ignoreExtensionKey} is deprecated. Please use the " +
"general data source option pathGlobFilter for filtering file names.")
}
val parsedOptions = new AvroOptions(options, conf)

// User can specify an optional avro json schema.
val avroSchema = parsedOptions.schema
.map(new Schema.Parser().parse)
.getOrElse {
inferAvroSchemaFromFiles(files, conf, parsedOptions.ignoreExtension,
spark.sessionState.conf.ignoreCorruptFiles)
}

SchemaConverters.toSqlType(avroSchema).dataType match {
case t: StructType => Some(t)
case _ => throw new RuntimeException(
s"""Avro schema cannot be converted to a Spark SQL StructType:
|
|${avroSchema.toString(true)}
|""".stripMargin)
}
}

def supportsDataType(dataType: DataType): Boolean = dataType match {
case _: AtomicType => true

case st: StructType => st.forall { f => supportsDataType(f.dataType) }

case ArrayType(elementType, _) => supportsDataType(elementType)

case MapType(keyType, valueType, _) =>
supportsDataType(keyType) && supportsDataType(valueType)

case udt: UserDefinedType[_] => supportsDataType(udt.sqlType)

case _: NullType => true

case _ => false
}

def prepareWrite(
sqlConf: SQLConf,
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
val parsedOptions = new AvroOptions(options, job.getConfiguration)
Member commented: Previously, this was the following (sharedState.sparkContext.hadoopConfiguration + SQLConf). Is job.getConfiguration enough for Avro?

    val parsedOptions = new AvroOptions(options, spark.sessionState.newHadoopConf())

Member (author) replied: Yes, it is enough. Orc/Parquet also use the configuration from job.

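The reply is consistent with how Hadoop's Job behaves: a Job copies the configuration it is constructed with, and Spark's write path builds the Job from the session's Hadoop conf. A tiny sketch with a made-up key, assuming a live SparkSession spark:

import org.apache.hadoop.mapreduce.Job

val hadoopConf = spark.sparkContext.hadoopConfiguration
hadoopConf.set("demo.key", "demo-value") // hypothetical key
val job = Job.getInstance(hadoopConf)    // copies hadoopConf into the job
assert(job.getConfiguration.get("demo.key") == "demo-value")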
val outputAvroSchema: Schema = parsedOptions.schema
.map(new Schema.Parser().parse)
.getOrElse(SchemaConverters.toAvroType(dataSchema, nullable = false,
parsedOptions.recordName, parsedOptions.recordNamespace))

AvroJob.setOutputKeySchema(job, outputAvroSchema)

if (parsedOptions.compression == "uncompressed") {
job.getConfiguration.setBoolean("mapred.output.compress", false)
} else {
job.getConfiguration.setBoolean("mapred.output.compress", true)
logInfo(s"Compressing Avro output using the ${parsedOptions.compression} codec")
val codec = parsedOptions.compression match {
case DEFLATE_CODEC =>
val deflateLevel = sqlConf.avroDeflateLevel
logInfo(s"Avro compression level $deflateLevel will be used for $DEFLATE_CODEC codec.")
job.getConfiguration.setInt(AvroOutputFormat.DEFLATE_LEVEL_KEY, deflateLevel)
DEFLATE_CODEC
case codec @ (SNAPPY_CODEC | BZIP2_CODEC | XZ_CODEC) => codec
case unknown => throw new IllegalArgumentException(s"Invalid compression codec: $unknown")
}
job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, codec)
}

new AvroOutputWriterFactory(dataSchema, outputAvroSchema.toString)
}

private def inferAvroSchemaFromFiles(
files: Seq[FileStatus],
conf: Configuration,
ignoreExtension: Boolean,
ignoreCorruptFiles: Boolean): Schema = {
// Schema evolution is not supported yet. Here we only pick first random readable sample file to
// figure out the schema of the whole dataset.
val avroReader = files.iterator.map { f =>
val path = f.getPath
if (!ignoreExtension && !path.getName.endsWith(".avro")) {
None
} else {
Utils.tryWithResource {
new FsInput(path, conf)
} { in =>
try {
Some(DataFileReader.openReader(in, new GenericDatumReader[GenericRecord]()))
} catch {
case e: IOException =>
if (ignoreCorruptFiles) {
logWarning(s"Skipped the footer in the corrupted file: $path", e)
None
} else {
throw new SparkException(s"Could not read file: $path", e)
}
}
}
}
}.collectFirst {
case Some(reader) => reader
}

avroReader match {
case Some(reader) =>
try {
reader.getSchema
} finally {
reader.close()
}
case None =>
throw new FileNotFoundException(
"No Avro files found. If files don't have .avro extension, set ignoreExtension to true")
}
}
}
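A few illustrative probes of supportsDataType; the expected results follow directly from the match above (again, AvroUtils is internal, so this is illustration, not a supported API):

import org.apache.spark.sql.avro.AvroUtils
import org.apache.spark.sql.types._

AvroUtils.supportsDataType(StringType)                       // true: atomic type
AvroUtils.supportsDataType(ArrayType(IntegerType))           // true: supported element type
AvroUtils.supportsDataType(MapType(StringType, IntegerType)) // true: key and value both supported
AvroUtils.supportsDataType(NullType)                         // true: explicit case
AvroUtils.supportsDataType(CalendarIntervalType)             // false: hits the default case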
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.v2.avro

import org.apache.spark.sql.avro.AvroFileFormat
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
import org.apache.spark.sql.sources.v2.Table
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class AvroDataSourceV2 extends FileDataSourceV2 {

override def fallbackFileFormat: Class[_ <: FileFormat] = classOf[AvroFileFormat]

override def shortName(): String = "avro"

override def getTable(options: CaseInsensitiveStringMap): Table = {
val paths = getPaths(options)
val tableName = getTableName(paths)
AvroTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
}

override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
val paths = getPaths(options)
val tableName = getTableName(paths)
AvroTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
}
}
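For callers nothing changes: format("avro") now routes through AvroDataSourceV2 via the service registration updated above, and anything the V2 path cannot serve falls back to AvroFileFormat through fallbackFileFormat. A usage sketch with illustrative paths, assuming a live SparkSession spark:

val df = spark.read.format("avro").load("/tmp/events")
df.write.format("avro").option("compression", "deflate").save("/tmp/events_copy")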