diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
index 1d0f40e1ce92..780e4570f697 100755
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
@@ -146,7 +146,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
         log.error(s"unsupported compression codec $unknown")
     }
 
-    new AvroOutputWriterFactory(dataSchema, new SerializableSchema(outputAvroSchema))
+    new AvroOutputWriterFactory(dataSchema, outputAvroSchema.toString)
   }
 
   override def buildReader(
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala
index 18a6d9395140..116020ed5c43 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala
@@ -17,14 +17,22 @@
 
 package org.apache.spark.sql.avro
 
+import org.apache.avro.Schema
 import org.apache.hadoop.mapreduce.TaskAttemptContext
 
 import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
 import org.apache.spark.sql.types.StructType
 
+/**
+ * A factory that produces [[AvroOutputWriter]].
+ * @param catalystSchema Catalyst schema of input data.
+ * @param avroSchemaAsJsonString Avro schema of output result, in JSON string format.
+ */
 private[avro] class AvroOutputWriterFactory(
-    schema: StructType,
-    avroSchema: SerializableSchema) extends OutputWriterFactory {
+    catalystSchema: StructType,
+    avroSchemaAsJsonString: String) extends OutputWriterFactory {
+
+  private lazy val avroSchema = new Schema.Parser().parse(avroSchemaAsJsonString)
 
   override def getFileExtension(context: TaskAttemptContext): String = ".avro"
 
@@ -32,6 +40,6 @@ private[avro] class AvroOutputWriterFactory(
       path: String,
       dataSchema: StructType,
       context: TaskAttemptContext): OutputWriter = {
-    new AvroOutputWriter(path, context, schema, avroSchema.value)
+    new AvroOutputWriter(path, context, catalystSchema, avroSchema)
   }
 }
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/SerializableSchema.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/SerializableSchema.scala
deleted file mode 100644
index ec0ddc778c8f..000000000000
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/SerializableSchema.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.avro
-
-import java.io._
-
-import scala.util.control.NonFatal
-
-import com.esotericsoftware.kryo.{Kryo, KryoSerializable}
-import com.esotericsoftware.kryo.io.{Input, Output}
-import org.apache.avro.Schema
-import org.slf4j.LoggerFactory
-
-class SerializableSchema(@transient var value: Schema)
-  extends Serializable with KryoSerializable {
-
-  @transient private[avro] lazy val log = LoggerFactory.getLogger(getClass)
-
-  private def writeObject(out: ObjectOutputStream): Unit = tryOrIOException {
-    out.defaultWriteObject()
-    out.writeUTF(value.toString())
-    out.flush()
-  }
-
-  private def readObject(in: ObjectInputStream): Unit = tryOrIOException {
-    val json = in.readUTF()
-    value = new Schema.Parser().parse(json)
-  }
-
-  private def tryOrIOException[T](block: => T): T = {
-    try {
-      block
-    } catch {
-      case e: IOException =>
-        log.error("Exception encountered", e)
-        throw e
-      case NonFatal(e) =>
-        log.error("Exception encountered", e)
-        throw new IOException(e)
-    }
-  }
-
-  def write(kryo: Kryo, out: Output): Unit = {
-    val dos = new DataOutputStream(out)
-    dos.writeUTF(value.toString())
-    dos.flush()
-  }
-
-  def read(kryo: Kryo, in: Input): Unit = {
-    val dis = new DataInputStream(in)
-    val json = dis.readUTF()
-    value = new Schema.Parser().parse(json)
-  }
-}
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/SerializableSchemaSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/SerializableSchemaSuite.scala
deleted file mode 100644
index 510bcbdd3192..000000000000
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/SerializableSchemaSuite.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.avro
-
-import org.apache.avro.Schema
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, SerializerInstance}
-
-class SerializableSchemaSuite extends SparkFunSuite {
-
-  private def testSerialization(serializer: SerializerInstance): Unit = {
-    val avroTypeJson =
-      s"""
-        |{
-        |  "type": "string",
-        |  "name": "my_string"
-        |}
-      """.stripMargin
-    val avroSchema = new Schema.Parser().parse(avroTypeJson)
-    val serializableSchema = new SerializableSchema(avroSchema)
-    val serialized = serializer.serialize(serializableSchema)
-
-    serializer.deserialize[Any](serialized) match {
-      case c: SerializableSchema =>
-        assert(c.log != null, "log was null")
-        assert(c.value != null, "value was null")
-        assert(c.value == avroSchema)
-      case other => fail(
-        s"Expecting ${classOf[SerializableSchema]}, but got ${other.getClass}.")
-    }
-  }
-
-  test("serialization with JavaSerializer") {
-    testSerialization(new JavaSerializer(new SparkConf()).newInstance())
-  }
-
-  test("serialization with KryoSerializer") {
-    testSerialization(new KryoSerializer(new SparkConf()).newInstance())
-  }
-}
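For context, a minimal sketch (not part of the patch) of the round trip this change relies on. In the Avro version this module builds against, `org.apache.avro.Schema` is not `java.io.Serializable`, which is why the hand-written `SerializableSchema` wrapper existed with both Java and Kryo hooks. The patch replaces the wrapper with the schema's JSON string (`Schema.toString`), which any serializer can ship, re-parsed lazily on the other side. The object and field names below are illustrative only:

```scala
import org.apache.avro.Schema

object SchemaJsonRoundTrip {
  def main(args: Array[String]): Unit = {
    // A record schema defined by its JSON form.
    val json =
      """
        |{
        |  "type": "record",
        |  "name": "example",
        |  "fields": [
        |    {"name": "id", "type": "long"},
        |    {"name": "name", "type": "string"}
        |  ]
        |}
      """.stripMargin
    val original = new Schema.Parser().parse(json)

    // Schema.toString emits the schema as JSON -- a plain String, so no
    // custom writeObject/readObject or Kryo read/write methods are needed.
    val asJson: String = original.toString

    // On the receiving side, re-parse the JSON back into a Schema. In the
    // patched AvroOutputWriterFactory this is deferred with a `lazy val`,
    // so the parse runs once per deserialized instance, where writers are
    // actually created.
    val reparsed = new Schema.Parser().parse(asJson)

    // Avro schemas compare structurally, so the round trip is lossless.
    assert(reparsed == original)
  }
}
```

The `lazy val` matters: the factory is serialized on the driver and deserialized on executors, and deferring the parse means only the String field travels over the wire while each executor pays the `Schema.Parser` cost once, on first use.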