From 97fba623260f34fe7b8e928aa3a9c5c11117ed4c Mon Sep 17 00:00:00 2001 From: Joseph Batchik Date: Tue, 23 Jun 2015 09:58:22 -0700 Subject: [PATCH 01/13] Added a custom Kryo serializer for generic Avro records to reduce the network IO involved during a shuffle. This compresses the schema and allows for users to register their schemas ahead of time to further reduce traffic. Changed how the records were serialized to reduce both the time and memory used removed unused variable --- core/pom.xml | 35 ++++ .../scala/org/apache/spark/SparkConf.scala | 12 ++ .../serializer/GenericAvroSerializer.scala | 185 ++++++++++++++++++ .../spark/serializer/KryoSerializer.scala | 5 + .../GenericAvroSerializerSuite.scala | 87 ++++++++ 5 files changed, 324 insertions(+) create mode 100644 core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala create mode 100644 core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala diff --git a/core/pom.xml b/core/pom.xml index 40a64beccdc2..b5919ff982bc 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -398,6 +398,41 @@ py4j 0.8.2.1 + + org.apache.avro + avro + ${avro.version} + ${hadoop.deps.scope} + + + org.apache.avro + avro-mapred + ${avro.version} + ${avro.mapred.classifier} + ${hive.deps.scope} + + + io.netty + netty + + + org.mortbay.jetty + jetty + + + org.mortbay.jetty + jetty-util + + + org.mortbay.jetty + servlet-api + + + org.apache.velocity + velocity + + + target/scala-${scala.binary.version}/classes diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 6cf36fbbd625..7d84b6c07629 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -20,9 +20,12 @@ package org.apache.spark import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicBoolean +import org.apache.avro.{Schema, SchemaNormalization} + import scala.collection.JavaConverters._ import scala.collection.mutable.LinkedHashSet +import org.apache.spark.serializer.GenericAvroSerializer.avroSchemaKey import org.apache.spark.serializer.KryoSerializer import org.apache.spark.util.Utils @@ -161,6 +164,15 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { this } + /** + * Use Kryo serialization and register the given set of Avro schemas so that the generic + * record serializer can decrease network IO + */ + def registerAvroSchema(schemas: Array[Schema]): SparkConf = + schemas.foldLeft(this) { (conf, schema) => + conf.set(avroSchemaKey(SchemaNormalization.parsingFingerprint64(schema)), schema.toString) + } + /** Remove a parameter from the configuration */ def remove(key: String): SparkConf = { settings.remove(key) diff --git a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala new file mode 100644 index 000000000000..319c3b5d675e --- /dev/null +++ b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.serializer + +import java.io.ByteArrayOutputStream +import java.util.zip.{Inflater, Deflater} + +import scala.collection.mutable + +import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer} +import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput} +import org.apache.avro.generic.{GenericData, GenericRecord} +import org.apache.avro.io._ +import org.apache.avro.{Schema, SchemaNormalization} + +import org.apache.spark.SparkConf + +import GenericAvroSerializer._ + +object GenericAvroSerializer { + def avroSchemaKey(implicit fingerprint: Long): String = s"avro.schema.$fingerprint" +} + +/** + * Custom serializer used for generic Avro records. If the user registers the schemas + * ahead of time, then the schema's fingerprint will be sent with each message instead of the actual + * schema, as to reduce network IO. + * Actions like parsing or compressing schemas are computationally expensive so the serializer + * caches all previously seen values as to reduce the amount of work needed to do. + */ +class GenericAvroSerializer(conf: SparkConf) extends KSerializer[GenericRecord] { + + private val serializer = serialize() + private val deserializer = deserialize() + + private def confSchema(implicit fingerprint: Long) = conf.getOption(avroSchemaKey) + + /** + * Used to compress Schemas when they are being sent over the wire. + * The compression results are memoized to reduce the compression time since the + * same schema is compressed many times over + */ + def compressor(): Schema => Array[Byte] = { + val cache = new mutable.HashMap[Schema, Array[Byte]]() + + def compress(schema: Schema): Array[Byte] = cache.getOrElseUpdate(schema, { + val deflater = new Deflater(Deflater.BEST_COMPRESSION) + val schemaBytes = schema.toString.getBytes("UTF-8") + deflater.setInput(schemaBytes) + deflater.finish() + val buffer = Array.ofDim[Byte](schemaBytes.length) + val outputStream = new ByteArrayOutputStream(schemaBytes.length) + while(!deflater.finished()) { + val count = deflater.deflate(buffer) + outputStream.write(buffer, 0, count) + } + outputStream.close() + outputStream.toByteArray + }) + + compress + } + + /** + * Decompresses the schema into the actual in-memory object. Keeps an internal cache of already + * seen values so to limit the number of times that decompression has to be done. + */ + def decompressor(): Array[Byte] => Schema = { + val cache = new mutable.HashMap[Array[Byte], Schema]() + + def decompress(schemaBytes: Array[Byte]): Schema = cache.getOrElseUpdate(schemaBytes, { + val inflater = new Inflater() + inflater.setInput(schemaBytes) + val outputStream = new ByteArrayOutputStream(schemaBytes.length) + val tmpBuffer = Array.ofDim[Byte](1024) + while (!inflater.finished()) { + val count = inflater.inflate(tmpBuffer) + outputStream.write(tmpBuffer, 0, count) + } + inflater.end() + outputStream.close() + new Schema.Parser().parse(new String(outputStream.toByteArray, "UTF-8")) + }) + + decompress + } + + /** + * Serializes generic records into byte buffers. 
It keeps an internal cache of already seen + * schema as to reduce the amount of required work. + */ + def serialize(): (GenericRecord, KryoOutput) => Unit = { + val writerCache = new mutable.HashMap[Schema, DatumWriter[_]]() + val schemaCache = new mutable.HashMap[Schema, Long]() + val compress = compressor() + + def serialize[R <: GenericRecord](datum: R, schema: Schema, output: KryoOutput): Unit = { + val encoder = EncoderFactory.get.binaryEncoder(output, null) + writerCache.getOrElseUpdate(schema, GenericData.get.createDatumWriter(schema)) + .asInstanceOf[DatumWriter[R]] + .write(datum, encoder) + encoder.flush() + } + + def wrapDatum(datum: GenericRecord, output: KryoOutput): Unit = { + val schema = datum.getSchema + val fingerprint = schemaCache.getOrElseUpdate(schema, { + SchemaNormalization.parsingFingerprint64(schema) + }) + confSchema(fingerprint) match { + case Some(_) => { + output.writeBoolean(true) + output.writeLong(fingerprint) + } + case None => { + output.writeBoolean(false) + val compressedSchema = compress(schema) + output.writeInt(compressedSchema.array.length) + output.writeBytes(compressedSchema.array) + } + } + serialize(datum, schema, output) + } + wrapDatum + } + + /** + * Deserializes generic records into their in-memory form. There is internal + * state to keep a cache of already seen schemas and datum readers. + * @return + */ + def deserialize(): KryoInput => GenericRecord = { + val readerCache = new mutable.HashMap[Schema, DatumReader[_]]() + val schemaCache = new mutable.HashMap[Long, Schema]() + val decompress = decompressor() + + def deserialize(input: KryoInput, schema: Schema): GenericRecord = { + val decoder = DecoderFactory.get.directBinaryDecoder(input, null) + readerCache.getOrElseUpdate(schema, GenericData.get.createDatumReader(schema)) + .asInstanceOf[DatumReader[GenericRecord]] + .read(null.asInstanceOf[GenericRecord], decoder) + } + + def unwrapDatum(input: KryoInput): GenericRecord = { + val schema = { + if (input.readBoolean()) { + val fingerprint = input.readLong() + schemaCache.getOrElseUpdate(fingerprint, { + confSchema(fingerprint) match { + case Some(s) => new Schema.Parser().parse(s) + case None => throw new RuntimeException(s"Unknown fingerprint: $fingerprint") + } + }) + } else { + val length = input.readInt() + decompress(input.readBytes(length)) + } + } + deserialize(input, schema) + } + unwrapDatum + } + + override def write(kryo: Kryo, output: KryoOutput, datum: GenericRecord): Unit = + serializer(datum, output) + + override def read(kryo: Kryo, input: KryoInput, datumClass: Class[GenericRecord]): GenericRecord = + deserializer(input) +} + diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index cd8a82347a1e..1edba877a52a 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -21,6 +21,8 @@ import java.io.{EOFException, IOException, InputStream, OutputStream} import java.nio.ByteBuffer import javax.annotation.Nullable +import org.apache.avro.generic.{GenericData, GenericRecord} + import scala.reflect.ClassTag import com.esotericsoftware.kryo.{Kryo, KryoException} @@ -99,6 +101,9 @@ class KryoSerializer(conf: SparkConf) kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer()) kryo.register(classOf[PythonBroadcast], new KryoJavaSerializer()) + kryo.register(classOf[GenericRecord], new GenericAvroSerializer(conf)) + 
kryo.register(classOf[GenericData.Record], new GenericAvroSerializer(conf)) + try { // Use the default classloader when calling the user registrator. Thread.currentThread.setContextClassLoader(classLoader) diff --git a/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala new file mode 100644 index 000000000000..47c38ffa7ee0 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.serializer + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, OutputStream} + +import com.esotericsoftware.kryo.io.{Output, Input} +import org.apache.avro.generic.GenericData.Record +import org.apache.avro.{SchemaBuilder, Schema} +import org.apache.spark.{SparkFunSuite, SharedSparkContext} + +class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext { + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + + val schema : Schema = SchemaBuilder + .record("testRecord").fields() + .requiredString("data") + .endRecord() + val record = new Record(schema) + record.put("data", "test data") + + test("schema compression and decompression") { + val genericSer = new GenericAvroSerializer(conf) + val compressor = genericSer.compressor() + val decompessor = genericSer.decompressor() + + assert(schema === decompessor.compose(compressor)(schema)) + } + + test("record serialization and deserialization") { + val genericSer = new GenericAvroSerializer(conf) + val serializer = genericSer.serialize() + val deserializer = genericSer.deserialize() + + val outputStream = new ByteArrayOutputStream() + val output = new Output(outputStream) + serializer(record, output) + output.flush() + output.close() + + val input = new Input(new ByteArrayInputStream(outputStream.toByteArray)) + assert(deserializer(input) === record) + } + + test("uses schema fingerprint to decrease message size") { + val genericSer = new GenericAvroSerializer(conf) + val serializer = genericSer.serialize() + val output = new Output(new ByteArrayOutputStream()) + + val beginningNormalPosition = output.total() + serializer(record, output) + output.flush() + val normalLength = output.total - beginningNormalPosition + + conf.registerAvroSchema(Array(schema)) + val beginningFingerprintPosition = output.total() + serializer(record, output) + val fingerprintLength = output.total - beginningFingerprintPosition + + assert(fingerprintLength < normalLength) + } + + test("caches previously seen schemas") { + val genericSer = new GenericAvroSerializer(conf) + val compressor = genericSer.compressor() + val decompressor = 
genericSer.decompressor() + val compressedSchema = compressor(schema) + + assert(compressedSchema.eq(compressor(schema))) + assert(decompressor(compressedSchema).eq(decompressor(compressedSchema))) + } +} From 2b545cc8a75876a86bdf9a0db74fae6eac8c1e2f Mon Sep 17 00:00:00 2001 From: Joseph Batchik Date: Thu, 25 Jun 2015 14:28:08 -0700 Subject: [PATCH 02/13] started working on fixes for pr --- core/pom.xml | 5 +- .../scala/org/apache/spark/SparkConf.scala | 5 +- .../serializer/GenericAvroSerializer.scala | 189 ++++++++---------- .../GenericAvroSerializerSuite.scala | 27 +-- 4 files changed, 99 insertions(+), 127 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index b5919ff982bc..26be53a2452d 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -402,14 +402,13 @@ org.apache.avro avro ${avro.version} - ${hadoop.deps.scope} + compile org.apache.avro avro-mapred ${avro.version} - ${avro.mapred.classifier} - ${hive.deps.scope} + compile io.netty diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 7d84b6c07629..d2b0786cf6ab 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -18,13 +18,12 @@ package org.apache.spark import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.atomic.AtomicBoolean - -import org.apache.avro.{Schema, SchemaNormalization} import scala.collection.JavaConverters._ import scala.collection.mutable.LinkedHashSet +import org.apache.avro.{Schema, SchemaNormalization} + import org.apache.spark.serializer.GenericAvroSerializer.avroSchemaKey import org.apache.spark.serializer.KryoSerializer import org.apache.spark.util.Utils diff --git a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala index 319c3b5d675e..7b0bf461ef7e 100644 --- a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala @@ -18,6 +18,7 @@ package org.apache.spark.serializer import java.io.ByteArrayOutputStream +import java.nio.ByteBuffer import java.util.zip.{Inflater, Deflater} import scala.collection.mutable @@ -33,7 +34,7 @@ import org.apache.spark.SparkConf import GenericAvroSerializer._ object GenericAvroSerializer { - def avroSchemaKey(implicit fingerprint: Long): String = s"avro.schema.$fingerprint" + def avroSchemaKey(fingerprint: Long): String = s"avro.schema.$fingerprint" } /** @@ -45,141 +46,119 @@ object GenericAvroSerializer { */ class GenericAvroSerializer(conf: SparkConf) extends KSerializer[GenericRecord] { - private val serializer = serialize() - private val deserializer = deserialize() + /** Used to reduce the amount of effort to compress the schema */ + private val compressCache = new mutable.HashMap[Schema, Array[Byte]]() + private val decompressCache = new mutable.HashMap[ByteBuffer, Schema]() - private def confSchema(implicit fingerprint: Long) = conf.getOption(avroSchemaKey) + /** Reuses the same datum reader/writer since the same schema will be used many times */ + private val writerCache = new mutable.HashMap[Schema, DatumWriter[_]]() + private val readerCache = new mutable.HashMap[Schema, DatumReader[_]]() + + /** Fingerprinting is very expensive to this alleviates most of the work */ + private val fingerprintCache = new mutable.HashMap[Schema, Long]() + private val schemaCache = new mutable.HashMap[Long, Schema]() + + 
private def confSchema(fingerprint: Long) = conf.getOption(avroSchemaKey(fingerprint)) /** * Used to compress Schemas when they are being sent over the wire. * The compression results are memoized to reduce the compression time since the * same schema is compressed many times over */ - def compressor(): Schema => Array[Byte] = { - val cache = new mutable.HashMap[Schema, Array[Byte]]() - - def compress(schema: Schema): Array[Byte] = cache.getOrElseUpdate(schema, { - val deflater = new Deflater(Deflater.BEST_COMPRESSION) - val schemaBytes = schema.toString.getBytes("UTF-8") - deflater.setInput(schemaBytes) - deflater.finish() - val buffer = Array.ofDim[Byte](schemaBytes.length) - val outputStream = new ByteArrayOutputStream(schemaBytes.length) - while(!deflater.finished()) { - val count = deflater.deflate(buffer) - outputStream.write(buffer, 0, count) - } - outputStream.close() - outputStream.toByteArray - }) + def compress(schema: Schema): Array[Byte] = compressCache.getOrElseUpdate(schema, { + val deflater = new Deflater(Deflater.BEST_COMPRESSION) + val schemaBytes = schema.toString.getBytes("UTF-8") + deflater.setInput(schemaBytes) + deflater.finish() + val buffer = Array.ofDim[Byte](schemaBytes.length) + val outputStream = new ByteArrayOutputStream(schemaBytes.length) + while(!deflater.finished()) { + val count = deflater.deflate(buffer) + outputStream.write(buffer, 0, count) + } + outputStream.close() + outputStream.toByteArray + }) - compress - } /** * Decompresses the schema into the actual in-memory object. Keeps an internal cache of already * seen values so to limit the number of times that decompression has to be done. */ - def decompressor(): Array[Byte] => Schema = { - val cache = new mutable.HashMap[Array[Byte], Schema]() - - def decompress(schemaBytes: Array[Byte]): Schema = cache.getOrElseUpdate(schemaBytes, { - val inflater = new Inflater() - inflater.setInput(schemaBytes) - val outputStream = new ByteArrayOutputStream(schemaBytes.length) - val tmpBuffer = Array.ofDim[Byte](1024) - while (!inflater.finished()) { - val count = inflater.inflate(tmpBuffer) - outputStream.write(tmpBuffer, 0, count) - } - inflater.end() - outputStream.close() - new Schema.Parser().parse(new String(outputStream.toByteArray, "UTF-8")) - }) - - decompress - } + def decompress(schemaBytes: ByteBuffer): Schema = decompressCache.getOrElseUpdate(schemaBytes, { + val inflater = new Inflater() + val bytes = schemaBytes.array() + inflater.setInput(bytes) + val outputStream = new ByteArrayOutputStream(bytes.length) + val tmpBuffer = Array.ofDim[Byte](1024) + while (!inflater.finished()) { + val count = inflater.inflate(tmpBuffer) + outputStream.write(tmpBuffer, 0, count) + } + inflater.end() + outputStream.close() + new Schema.Parser().parse(new String(outputStream.toByteArray, "UTF-8")) + }) /** - * Serializes generic records into byte buffers. It keeps an internal cache of already seen - * schema as to reduce the amount of required work. + * Serializes a record to the given output stream. 
It caches a lot of the internal data as + * to not redo work */ - def serialize(): (GenericRecord, KryoOutput) => Unit = { - val writerCache = new mutable.HashMap[Schema, DatumWriter[_]]() - val schemaCache = new mutable.HashMap[Schema, Long]() - val compress = compressor() - - def serialize[R <: GenericRecord](datum: R, schema: Schema, output: KryoOutput): Unit = { - val encoder = EncoderFactory.get.binaryEncoder(output, null) - writerCache.getOrElseUpdate(schema, GenericData.get.createDatumWriter(schema)) - .asInstanceOf[DatumWriter[R]] - .write(datum, encoder) - encoder.flush() - } + def serializeDatum[R <: GenericRecord](datum: R, output: KryoOutput): Unit = { + val encoder = EncoderFactory.get.binaryEncoder(output, null) + val schema = datum.getSchema + val fingerprint = fingerprintCache.getOrElseUpdate(schema, { + SchemaNormalization.parsingFingerprint64(schema) + }) + confSchema(fingerprint) match { + case Some(_) => { + output.writeBoolean(true) + output.writeLong(fingerprint) + } + case None => { + output.writeBoolean(false) + val compressedSchema = compress(schema) + output.writeInt(compressedSchema.length) + output.writeBytes(compressedSchema) - def wrapDatum(datum: GenericRecord, output: KryoOutput): Unit = { - val schema = datum.getSchema - val fingerprint = schemaCache.getOrElseUpdate(schema, { - SchemaNormalization.parsingFingerprint64(schema) - }) - confSchema(fingerprint) match { - case Some(_) => { - output.writeBoolean(true) - output.writeLong(fingerprint) - } - case None => { - output.writeBoolean(false) - val compressedSchema = compress(schema) - output.writeInt(compressedSchema.array.length) - output.writeBytes(compressedSchema.array) - } } - serialize(datum, schema, output) } - wrapDatum + + writerCache.getOrElseUpdate(schema, GenericData.get.createDatumWriter(schema)) + .asInstanceOf[DatumWriter[R]] + .write(datum, encoder) + encoder.flush() } /** * Deserializes generic records into their in-memory form. There is internal * state to keep a cache of already seen schemas and datum readers. 
- * @return */ - def deserialize(): KryoInput => GenericRecord = { - val readerCache = new mutable.HashMap[Schema, DatumReader[_]]() - val schemaCache = new mutable.HashMap[Long, Schema]() - val decompress = decompressor() - - def deserialize(input: KryoInput, schema: Schema): GenericRecord = { - val decoder = DecoderFactory.get.directBinaryDecoder(input, null) - readerCache.getOrElseUpdate(schema, GenericData.get.createDatumReader(schema)) - .asInstanceOf[DatumReader[GenericRecord]] - .read(null.asInstanceOf[GenericRecord], decoder) - } - - def unwrapDatum(input: KryoInput): GenericRecord = { - val schema = { - if (input.readBoolean()) { - val fingerprint = input.readLong() - schemaCache.getOrElseUpdate(fingerprint, { - confSchema(fingerprint) match { - case Some(s) => new Schema.Parser().parse(s) - case None => throw new RuntimeException(s"Unknown fingerprint: $fingerprint") - } - }) - } else { - val length = input.readInt() - decompress(input.readBytes(length)) - } + def deserializeDatum(input: KryoInput): GenericRecord = { + val schema = { + if (input.readBoolean()) { + val fingerprint = input.readLong() + schemaCache.getOrElseUpdate(fingerprint, { + confSchema(fingerprint) match { + case Some(s) => new Schema.Parser().parse(s) + case None => throw new RuntimeException(s"Unknown fingerprint: $fingerprint") + } + }) + } else { + val length = input.readInt() + decompress(ByteBuffer.wrap(input.readBytes(length))) } - deserialize(input, schema) } - unwrapDatum + val decoder = DecoderFactory.get.directBinaryDecoder(input, null) + readerCache.getOrElseUpdate(schema, GenericData.get.createDatumReader(schema)) + .asInstanceOf[DatumReader[GenericRecord]] + .read(null.asInstanceOf[GenericRecord], decoder) } override def write(kryo: Kryo, output: KryoOutput, datum: GenericRecord): Unit = - serializer(datum, output) + serializeDatum(datum, output) override def read(kryo: Kryo, input: KryoInput, datumClass: Class[GenericRecord]): GenericRecord = - deserializer(input) + deserializeDatum(input) } diff --git a/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala index 47c38ffa7ee0..47dfa0eca1b3 100644 --- a/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.serializer import java.io.{ByteArrayInputStream, ByteArrayOutputStream, OutputStream} +import java.nio.ByteBuffer import com.esotericsoftware.kryo.io.{Output, Input} import org.apache.avro.generic.GenericData.Record @@ -36,40 +37,35 @@ class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext { test("schema compression and decompression") { val genericSer = new GenericAvroSerializer(conf) - val compressor = genericSer.compressor() - val decompessor = genericSer.decompressor() - - assert(schema === decompessor.compose(compressor)(schema)) + assert(schema === genericSer.decompress(ByteBuffer.wrap(genericSer.compress(schema)))) } test("record serialization and deserialization") { val genericSer = new GenericAvroSerializer(conf) - val serializer = genericSer.serialize() - val deserializer = genericSer.deserialize() val outputStream = new ByteArrayOutputStream() val output = new Output(outputStream) - serializer(record, output) + genericSer.serializeDatum(record, output) output.flush() output.close() val input = new Input(new 
ByteArrayInputStream(outputStream.toByteArray)) - assert(deserializer(input) === record) + assert(genericSer.deserializeDatum(input) === record) } test("uses schema fingerprint to decrease message size") { val genericSer = new GenericAvroSerializer(conf) - val serializer = genericSer.serialize() + val output = new Output(new ByteArrayOutputStream()) val beginningNormalPosition = output.total() - serializer(record, output) + genericSer.serializeDatum(record, output) output.flush() val normalLength = output.total - beginningNormalPosition conf.registerAvroSchema(Array(schema)) val beginningFingerprintPosition = output.total() - serializer(record, output) + genericSer.serializeDatum(record, output) val fingerprintLength = output.total - beginningFingerprintPosition assert(fingerprintLength < normalLength) @@ -77,11 +73,10 @@ class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext { test("caches previously seen schemas") { val genericSer = new GenericAvroSerializer(conf) - val compressor = genericSer.compressor() - val decompressor = genericSer.decompressor() - val compressedSchema = compressor(schema) + val compressedSchema = genericSer.compress(schema) + val decompressedScheam = genericSer.decompress(ByteBuffer.wrap(compressedSchema)) - assert(compressedSchema.eq(compressor(schema))) - assert(decompressor(compressedSchema).eq(decompressor(compressedSchema))) + assert(compressedSchema.eq(genericSer.compress(schema))) + assert(decompressedScheam.eq(genericSer.decompress(ByteBuffer.wrap(compressedSchema)))) } } From f4ae251283e2418c3adafb26ef5a9b88a4adbb08 Mon Sep 17 00:00:00 2001 From: Joseph Batchik Date: Thu, 25 Jun 2015 17:01:37 -0700 Subject: [PATCH 03/13] fixed serialization error in that SparkConf cannot be serialized --- .../main/scala/org/apache/spark/SparkConf.scala | 10 ++++++++-- .../spark/serializer/GenericAvroSerializer.scala | 15 ++++++--------- .../apache/spark/serializer/KryoSerializer.scala | 7 +++++-- .../serializer/GenericAvroSerializerSuite.scala | 13 +++++++------ 4 files changed, 26 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index d2b0786cf6ab..a08d843dbc8d 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -24,7 +24,7 @@ import scala.collection.mutable.LinkedHashSet import org.apache.avro.{Schema, SchemaNormalization} -import org.apache.spark.serializer.GenericAvroSerializer.avroSchemaKey +import org.apache.spark.serializer.GenericAvroSerializer.{avroSchemaNamespace, avroSchemaKey} import org.apache.spark.serializer.KryoSerializer import org.apache.spark.util.Utils @@ -162,7 +162,6 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { set("spark.serializer", classOf[KryoSerializer].getName) this } - /** * Use Kryo serialization and register the given set of Avro schemas so that the generic * record serializer can decrease network IO @@ -172,6 +171,13 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { conf.set(avroSchemaKey(SchemaNormalization.parsingFingerprint64(schema)), schema.toString) } + /** Gets all the avro schemas in the configuration used in the generic Avro record serializer */ + def getAvroSchema: Map[Long, String] = { + getAll.filter { case (k, v) => k.startsWith(avroSchemaNamespace) } + .map { case (k, v) => (k.substring(avroSchemaNamespace.length).toLong, v) } + .toMap + } + /** Remove a parameter from the 
configuration */ def remove(key: String): SparkConf = { settings.remove(key) diff --git a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala index 7b0bf461ef7e..3071b65f9c70 100644 --- a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala @@ -29,12 +29,9 @@ import org.apache.avro.generic.{GenericData, GenericRecord} import org.apache.avro.io._ import org.apache.avro.{Schema, SchemaNormalization} -import org.apache.spark.SparkConf - -import GenericAvroSerializer._ - object GenericAvroSerializer { - def avroSchemaKey(fingerprint: Long): String = s"avro.schema.$fingerprint" + val avroSchemaNamespace = "avro.schema." + def avroSchemaKey(fingerprint: Long): String = avroSchemaNamespace + fingerprint } /** @@ -44,7 +41,7 @@ object GenericAvroSerializer { * Actions like parsing or compressing schemas are computationally expensive so the serializer * caches all previously seen values as to reduce the amount of work needed to do. */ -class GenericAvroSerializer(conf: SparkConf) extends KSerializer[GenericRecord] { +class GenericAvroSerializer(schemas: Map[Long, String]) extends KSerializer[GenericRecord] { /** Used to reduce the amount of effort to compress the schema */ private val compressCache = new mutable.HashMap[Schema, Array[Byte]]() @@ -58,7 +55,7 @@ class GenericAvroSerializer(conf: SparkConf) extends KSerializer[GenericRecord] private val fingerprintCache = new mutable.HashMap[Schema, Long]() private val schemaCache = new mutable.HashMap[Long, Schema]() - private def confSchema(fingerprint: Long) = conf.getOption(avroSchemaKey(fingerprint)) + private def getSchema(fingerprint: Long): Option[String] = schemas.get(fingerprint) /** * Used to compress Schemas when they are being sent over the wire. 
@@ -110,7 +107,7 @@ class GenericAvroSerializer(conf: SparkConf) extends KSerializer[GenericRecord] val fingerprint = fingerprintCache.getOrElseUpdate(schema, { SchemaNormalization.parsingFingerprint64(schema) }) - confSchema(fingerprint) match { + getSchema(fingerprint) match { case Some(_) => { output.writeBoolean(true) output.writeLong(fingerprint) @@ -139,7 +136,7 @@ class GenericAvroSerializer(conf: SparkConf) extends KSerializer[GenericRecord] if (input.readBoolean()) { val fingerprint = input.readLong() schemaCache.getOrElseUpdate(fingerprint, { - confSchema(fingerprint) match { + getSchema(fingerprint) match { case Some(s) => new Schema.Parser().parse(s) case None => throw new RuntimeException(s"Unknown fingerprint: $fingerprint") } diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 1edba877a52a..a1b7deece399 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -74,6 +74,9 @@ class KryoSerializer(conf: SparkConf) private val classesToRegister = conf.get("spark.kryo.classesToRegister", "") .split(',') .filter(!_.isEmpty) + conf.getExecutorEnv + + private val avroSchemas = conf.getAvroSchema def newKryoOutput(): KryoOutput = new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize)) @@ -101,8 +104,8 @@ class KryoSerializer(conf: SparkConf) kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer()) kryo.register(classOf[PythonBroadcast], new KryoJavaSerializer()) - kryo.register(classOf[GenericRecord], new GenericAvroSerializer(conf)) - kryo.register(classOf[GenericData.Record], new GenericAvroSerializer(conf)) + kryo.register(classOf[GenericRecord], new GenericAvroSerializer(avroSchemas)) + kryo.register(classOf[GenericData.Record], new GenericAvroSerializer(avroSchemas)) try { // Use the default classloader when calling the user registrator. 
diff --git a/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala index 47dfa0eca1b3..ffb05a9c79c6 100644 --- a/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala @@ -36,12 +36,12 @@ class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext { record.put("data", "test data") test("schema compression and decompression") { - val genericSer = new GenericAvroSerializer(conf) + val genericSer = new GenericAvroSerializer(conf.getAvroSchema) assert(schema === genericSer.decompress(ByteBuffer.wrap(genericSer.compress(schema)))) } test("record serialization and deserialization") { - val genericSer = new GenericAvroSerializer(conf) + val genericSer = new GenericAvroSerializer(conf.getAvroSchema) val outputStream = new ByteArrayOutputStream() val output = new Output(outputStream) @@ -54,25 +54,26 @@ class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext { } test("uses schema fingerprint to decrease message size") { - val genericSer = new GenericAvroSerializer(conf) + val genericSerFull = new GenericAvroSerializer(conf.getAvroSchema) val output = new Output(new ByteArrayOutputStream()) val beginningNormalPosition = output.total() - genericSer.serializeDatum(record, output) + genericSerFull.serializeDatum(record, output) output.flush() val normalLength = output.total - beginningNormalPosition conf.registerAvroSchema(Array(schema)) + val genericSerFinger = new GenericAvroSerializer(conf.getAvroSchema) val beginningFingerprintPosition = output.total() - genericSer.serializeDatum(record, output) + genericSerFinger.serializeDatum(record, output) val fingerprintLength = output.total - beginningFingerprintPosition assert(fingerprintLength < normalLength) } test("caches previously seen schemas") { - val genericSer = new GenericAvroSerializer(conf) + val genericSer = new GenericAvroSerializer(conf.getAvroSchema) val compressedSchema = genericSer.compress(schema) val decompressedScheam = genericSer.decompress(ByteBuffer.wrap(compressedSchema)) From ab46d10a22b4d9d81d93111cf8a71abe387cf415 Mon Sep 17 00:00:00 2001 From: Joseph Batchik Date: Mon, 29 Jun 2015 12:16:08 -0700 Subject: [PATCH 04/13] Changed Avro dependency to be similar to parent --- core/pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index 26be53a2452d..eb29dca0c1c2 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -402,13 +402,13 @@ org.apache.avro avro ${avro.version} - compile + ${hadoop.deps.scope} org.apache.avro avro-mapred ${avro.version} - compile + ${hadoop.deps.scope} io.netty From d421bf5e54eb4678c6d21a44d100e84fe19d94ed Mon Sep 17 00:00:00 2001 From: Joseph Batchik Date: Wed, 1 Jul 2015 12:47:18 -0700 Subject: [PATCH 05/13] updated pom to removed versions --- core/pom.xml | 49 ++++++++++--------------------------------------- 1 file changed, 10 insertions(+), 39 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index eb29dca0c1c2..7969cb2454a3 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -34,6 +34,16 @@ Spark Project Core http://spark.apache.org/ + + net.sf.py4j + py4j + 0.8.2.1 + + + org.apache.avro + avro-mapred + ${avro.mapred.classifier} + com.google.guava guava @@ -393,45 +403,6 @@ - - net.sf.py4j - py4j - 0.8.2.1 - - - org.apache.avro - avro - ${avro.version} - ${hadoop.deps.scope} - - - 
org.apache.avro - avro-mapred - ${avro.version} - ${hadoop.deps.scope} - - - io.netty - netty - - - org.mortbay.jetty - jetty - - - org.mortbay.jetty - jetty-util - - - org.mortbay.jetty - servlet-api - - - org.apache.velocity - velocity - - - target/scala-${scala.binary.version}/classes From 6d1925c5ea899e61103d7f3fa332db771b9616b2 Mon Sep 17 00:00:00 2001 From: Joseph Batchik Date: Tue, 14 Jul 2015 10:07:16 -0700 Subject: [PATCH 06/13] fixed to changes suggested by @squito --- core/pom.xml | 10 +++--- .../scala/org/apache/spark/SparkConf.scala | 11 +++---- .../serializer/GenericAvroSerializer.scala | 31 ++++++++++--------- .../spark/serializer/KryoSerializer.scala | 1 - .../GenericAvroSerializerSuite.scala | 2 +- 5 files changed, 28 insertions(+), 27 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index 7969cb2454a3..56c3f9572049 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -34,11 +34,6 @@ Spark Project Core http://spark.apache.org/ - - net.sf.py4j - py4j - 0.8.2.1 - org.apache.avro avro-mapred @@ -403,6 +398,11 @@ + + net.sf.py4j + py4j + 0.8.2.1 + target/scala-${scala.binary.version}/classes diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index a08d843dbc8d..0d7a1378bc9a 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -166,17 +166,16 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { * Use Kryo serialization and register the given set of Avro schemas so that the generic * record serializer can decrease network IO */ - def registerAvroSchema(schemas: Array[Schema]): SparkConf = - schemas.foldLeft(this) { (conf, schema) => - conf.set(avroSchemaKey(SchemaNormalization.parsingFingerprint64(schema)), schema.toString) - } + def registerAvroSchemas(schemas: Schema*): SparkConf = schemas.foldLeft(this) { (conf, schema) => + conf.set(avroSchemaKey(schema), schema.toString) + } /** Gets all the avro schemas in the configuration used in the generic Avro record serializer */ - def getAvroSchema: Map[Long, String] = { + def getAvroSchema: Map[Long, String] = getAll.filter { case (k, v) => k.startsWith(avroSchemaNamespace) } .map { case (k, v) => (k.substring(avroSchemaNamespace.length).toLong, v) } .toMap - } + /** Remove a parameter from the configuration */ def remove(key: String): SparkConf = { diff --git a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala index 3071b65f9c70..6a4a5c2f2395 100644 --- a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala @@ -25,13 +25,17 @@ import scala.collection.mutable import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer} import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput} +import org.apache.avro.{Schema, SchemaNormalization} import org.apache.avro.generic.{GenericData, GenericRecord} import org.apache.avro.io._ -import org.apache.avro.{Schema, SchemaNormalization} -object GenericAvroSerializer { - val avroSchemaNamespace = "avro.schema." - def avroSchemaKey(fingerprint: Long): String = avroSchemaNamespace + fingerprint +/** + * + */ +private[spark] object GenericAvroSerializer { + final val avroSchemaNamespace = "avro.schema." 
+ def avroSchemaKey(schema: Schema): String = + avroSchemaNamespace + SchemaNormalization.parsingFingerprint64(schema) } /** @@ -41,7 +45,8 @@ object GenericAvroSerializer { * Actions like parsing or compressing schemas are computationally expensive so the serializer * caches all previously seen values as to reduce the amount of work needed to do. */ -class GenericAvroSerializer(schemas: Map[Long, String]) extends KSerializer[GenericRecord] { +private[serializer] class GenericAvroSerializer(schemas: Map[Long, String]) + extends KSerializer[GenericRecord] { /** Used to reduce the amount of effort to compress the schema */ private val compressCache = new mutable.HashMap[Schema, Array[Byte]]() @@ -51,12 +56,10 @@ class GenericAvroSerializer(schemas: Map[Long, String]) extends KSerializer[Gene private val writerCache = new mutable.HashMap[Schema, DatumWriter[_]]() private val readerCache = new mutable.HashMap[Schema, DatumReader[_]]() - /** Fingerprinting is very expensive to this alleviates most of the work */ + /** Fingerprinting is very expensive so this alleviates most of the work */ private val fingerprintCache = new mutable.HashMap[Schema, Long]() private val schemaCache = new mutable.HashMap[Long, Schema]() - private def getSchema(fingerprint: Long): Option[String] = schemas.get(fingerprint) - /** * Used to compress Schemas when they are being sent over the wire. * The compression results are memoized to reduce the compression time since the @@ -107,7 +110,7 @@ class GenericAvroSerializer(schemas: Map[Long, String]) extends KSerializer[Gene val fingerprint = fingerprintCache.getOrElseUpdate(schema, { SchemaNormalization.parsingFingerprint64(schema) }) - getSchema(fingerprint) match { + schemas.get(fingerprint) match { case Some(_) => { output.writeBoolean(true) output.writeLong(fingerprint) @@ -122,8 +125,8 @@ class GenericAvroSerializer(schemas: Map[Long, String]) extends KSerializer[Gene } writerCache.getOrElseUpdate(schema, GenericData.get.createDatumWriter(schema)) - .asInstanceOf[DatumWriter[R]] - .write(datum, encoder) + .asInstanceOf[DatumWriter[R]] + .write(datum, encoder) encoder.flush() } @@ -136,7 +139,7 @@ class GenericAvroSerializer(schemas: Map[Long, String]) extends KSerializer[Gene if (input.readBoolean()) { val fingerprint = input.readLong() schemaCache.getOrElseUpdate(fingerprint, { - getSchema(fingerprint) match { + schemas.get(fingerprint) match { case Some(s) => new Schema.Parser().parse(s) case None => throw new RuntimeException(s"Unknown fingerprint: $fingerprint") } @@ -148,8 +151,8 @@ class GenericAvroSerializer(schemas: Map[Long, String]) extends KSerializer[Gene } val decoder = DecoderFactory.get.directBinaryDecoder(input, null) readerCache.getOrElseUpdate(schema, GenericData.get.createDatumReader(schema)) - .asInstanceOf[DatumReader[GenericRecord]] - .read(null.asInstanceOf[GenericRecord], decoder) + .asInstanceOf[DatumReader[GenericRecord]] + .read(null, decoder) } override def write(kryo: Kryo, output: KryoOutput, datum: GenericRecord): Unit = diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index a1b7deece399..0ae3d5ce30ae 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -74,7 +74,6 @@ class KryoSerializer(conf: SparkConf) private val classesToRegister = conf.get("spark.kryo.classesToRegister", "") .split(',') .filter(!_.isEmpty) - 
conf.getExecutorEnv private val avroSchemas = conf.getAvroSchema diff --git a/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala index ffb05a9c79c6..cfc1baf930dd 100644 --- a/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala @@ -63,7 +63,7 @@ class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext { output.flush() val normalLength = output.total - beginningNormalPosition - conf.registerAvroSchema(Array(schema)) + conf.registerAvroSchemas(schema) val genericSerFinger = new GenericAvroSerializer(conf.getAvroSchema) val beginningFingerprintPosition = output.total() genericSerFinger.serializeDatum(record, output) From 0f5471ae7c2a95c38152640053e38b14154125dd Mon Sep 17 00:00:00 2001 From: Joseph Batchik Date: Tue, 14 Jul 2015 12:52:07 -0700 Subject: [PATCH 07/13] implemented @squito suggestion to use a codec that is already in spark --- .../scala/org/apache/spark/SparkConf.scala | 2 +- .../serializer/GenericAvroSerializer.scala | 39 +++++++------------ .../spark/serializer/KryoSerializer.scala | 1 + .../GenericAvroSerializerSuite.scala | 1 + 4 files changed, 16 insertions(+), 27 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 0d7a1378bc9a..a928d0998dff 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -22,7 +22,7 @@ import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ import scala.collection.mutable.LinkedHashSet -import org.apache.avro.{Schema, SchemaNormalization} +import org.apache.avro.Schema import org.apache.spark.serializer.GenericAvroSerializer.{avroSchemaNamespace, avroSchemaKey} import org.apache.spark.serializer.KryoSerializer diff --git a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala index 6a4a5c2f2395..58697178cee0 100644 --- a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala @@ -17,9 +17,12 @@ package org.apache.spark.serializer -import java.io.ByteArrayOutputStream +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} import java.nio.ByteBuffer -import java.util.zip.{Inflater, Deflater} + +import org.apache.commons.io.IOUtils +import org.apache.spark.io.CompressionCodec +import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream} import scala.collection.mutable @@ -66,18 +69,11 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String]) * same schema is compressed many times over */ def compress(schema: Schema): Array[Byte] = compressCache.getOrElseUpdate(schema, { - val deflater = new Deflater(Deflater.BEST_COMPRESSION) - val schemaBytes = schema.toString.getBytes("UTF-8") - deflater.setInput(schemaBytes) - deflater.finish() - val buffer = Array.ofDim[Byte](schemaBytes.length) - val outputStream = new ByteArrayOutputStream(schemaBytes.length) - while(!deflater.finished()) { - val count = deflater.deflate(buffer) - outputStream.write(buffer, 0, count) - } - outputStream.close() - outputStream.toByteArray + val bos = new ByteArrayOutputStream() + val out = new SnappyOutputStream(bos) + 
out.write(schema.toString.getBytes("UTF-8")) + out.close() + bos.toByteArray }) @@ -86,18 +82,9 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String]) * seen values so to limit the number of times that decompression has to be done. */ def decompress(schemaBytes: ByteBuffer): Schema = decompressCache.getOrElseUpdate(schemaBytes, { - val inflater = new Inflater() - val bytes = schemaBytes.array() - inflater.setInput(bytes) - val outputStream = new ByteArrayOutputStream(bytes.length) - val tmpBuffer = Array.ofDim[Byte](1024) - while (!inflater.finished()) { - val count = inflater.inflate(tmpBuffer) - outputStream.write(tmpBuffer, 0, count) - } - inflater.end() - outputStream.close() - new Schema.Parser().parse(new String(outputStream.toByteArray, "UTF-8")) + val bis = new ByteArrayInputStream(schemaBytes.array()) + val bytes = IOUtils.toByteArray(new SnappyInputStream(bis)) + new Schema.Parser().parse(new String(bytes, "UTF-8")) }) /** diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 0ae3d5ce30ae..06271d9c590d 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -34,6 +34,7 @@ import org.roaringbitmap.{ArrayContainer, BitmapContainer, RoaringArray, Roaring import org.apache.spark._ import org.apache.spark.api.python.PythonBroadcast import org.apache.spark.broadcast.HttpBroadcast +import org.apache.spark.io.CompressionCodec import org.apache.spark.network.nio.{GetBlock, GotBlock, PutBlock} import org.apache.spark.network.util.ByteUnit import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus} diff --git a/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala index cfc1baf930dd..00eab599ddee 100644 --- a/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala @@ -23,6 +23,7 @@ import java.nio.ByteBuffer import com.esotericsoftware.kryo.io.{Output, Input} import org.apache.avro.generic.GenericData.Record import org.apache.avro.{SchemaBuilder, Schema} +import org.apache.spark.io.CompressionCodec import org.apache.spark.{SparkFunSuite, SharedSparkContext} class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext { From c5fe79416f2dfd040514f6ef7f0852cd174db466 Mon Sep 17 00:00:00 2001 From: Joseph Batchik Date: Wed, 15 Jul 2015 13:48:04 -0700 Subject: [PATCH 08/13] implemented @squito suggestion --- .../scala/org/apache/spark/SparkConf.scala | 22 ++++++++++-------- .../serializer/GenericAvroSerializer.scala | 23 ++++--------------- .../GenericAvroSerializerSuite.scala | 6 ++--- 3 files changed, 21 insertions(+), 30 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index a928d0998dff..474c03fdd157 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -22,9 +22,8 @@ import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ import scala.collection.mutable.LinkedHashSet -import org.apache.avro.Schema +import org.apache.avro.{SchemaNormalization, Schema} -import 
org.apache.spark.serializer.GenericAvroSerializer.{avroSchemaNamespace, avroSchemaKey} import org.apache.spark.serializer.KryoSerializer import org.apache.spark.util.Utils @@ -162,20 +161,25 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { set("spark.serializer", classOf[KryoSerializer].getName) this } + + private final val avroNamespace = "avro.schema." + /** * Use Kryo serialization and register the given set of Avro schemas so that the generic * record serializer can decrease network IO */ - def registerAvroSchemas(schemas: Schema*): SparkConf = schemas.foldLeft(this) { (conf, schema) => - conf.set(avroSchemaKey(schema), schema.toString) + def registerAvroSchemas(schemas: Schema*): SparkConf = { + schemas.foldLeft(this) { (conf, schema) => + conf.set(avroNamespace + SchemaNormalization.parsingFingerprint64(schema), schema.toString) + } } /** Gets all the avro schemas in the configuration used in the generic Avro record serializer */ - def getAvroSchema: Map[Long, String] = - getAll.filter { case (k, v) => k.startsWith(avroSchemaNamespace) } - .map { case (k, v) => (k.substring(avroSchemaNamespace.length).toLong, v) } - .toMap - + def getAvroSchema: Map[Long, String] = { + getAll.filter { case (k, v) => k.startsWith(avroNamespace) } + .map { case (k, v) => (k.substring(avroNamespace.length).toLong, v) } + .toMap + } /** Remove a parameter from the configuration */ def remove(key: String): SparkConf = { diff --git a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala index 58697178cee0..853b1b53ddfc 100644 --- a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala @@ -20,26 +20,17 @@ package org.apache.spark.serializer import java.io.{ByteArrayInputStream, ByteArrayOutputStream} import java.nio.ByteBuffer -import org.apache.commons.io.IOUtils -import org.apache.spark.io.CompressionCodec -import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream} - import scala.collection.mutable +import org.apache.commons.io.IOUtils import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer} import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput} +import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream} + import org.apache.avro.{Schema, SchemaNormalization} import org.apache.avro.generic.{GenericData, GenericRecord} import org.apache.avro.io._ -/** - * - */ -private[spark] object GenericAvroSerializer { - final val avroSchemaNamespace = "avro.schema." - def avroSchemaKey(schema: Schema): String = - avroSchemaNamespace + SchemaNormalization.parsingFingerprint64(schema) -} /** * Custom serializer used for generic Avro records. 
If the user registers the schemas @@ -98,17 +89,14 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String]) SchemaNormalization.parsingFingerprint64(schema) }) schemas.get(fingerprint) match { - case Some(_) => { + case Some(_) => output.writeBoolean(true) output.writeLong(fingerprint) - } - case None => { + case None => output.writeBoolean(false) val compressedSchema = compress(schema) output.writeInt(compressedSchema.length) output.writeBytes(compressedSchema) - - } } writerCache.getOrElseUpdate(schema, GenericData.get.createDatumWriter(schema)) @@ -148,4 +136,3 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String]) override def read(kryo: Kryo, input: KryoInput, datumClass: Class[GenericRecord]): GenericRecord = deserializeDatum(input) } - diff --git a/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala index 00eab599ddee..bc9f3708ed69 100644 --- a/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala @@ -17,13 +17,13 @@ package org.apache.spark.serializer -import java.io.{ByteArrayInputStream, ByteArrayOutputStream, OutputStream} +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} import java.nio.ByteBuffer import com.esotericsoftware.kryo.io.{Output, Input} -import org.apache.avro.generic.GenericData.Record import org.apache.avro.{SchemaBuilder, Schema} -import org.apache.spark.io.CompressionCodec +import org.apache.avro.generic.GenericData.Record + import org.apache.spark.{SparkFunSuite, SharedSparkContext} class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext { From fa9298b70d2e07b6a0f3df51e758c0fe7e77ba3e Mon Sep 17 00:00:00 2001 From: Joseph Batchik Date: Wed, 15 Jul 2015 13:53:33 -0700 Subject: [PATCH 09/13] forgot a couple of fixes --- .../org/apache/spark/serializer/GenericAvroSerializer.scala | 1 - .../scala/org/apache/spark/serializer/KryoSerializer.scala | 5 ++--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala index 853b1b53ddfc..9f5c11deeb4d 100644 --- a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala @@ -67,7 +67,6 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String]) bos.toByteArray }) - /** * Decompresses the schema into the actual in-memory object. Keeps an internal cache of already * seen values so to limit the number of times that decompression has to be done. 
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 06271d9c590d..3107a735e2e5 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -21,10 +21,10 @@ import java.io.{EOFException, IOException, InputStream, OutputStream} import java.nio.ByteBuffer import javax.annotation.Nullable -import org.apache.avro.generic.{GenericData, GenericRecord} - import scala.reflect.ClassTag +import org.apache.avro.generic.{GenericData, GenericRecord} + import com.esotericsoftware.kryo.{Kryo, KryoException} import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput} import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer} @@ -34,7 +34,6 @@ import org.roaringbitmap.{ArrayContainer, BitmapContainer, RoaringArray, Roaring import org.apache.spark._ import org.apache.spark.api.python.PythonBroadcast import org.apache.spark.broadcast.HttpBroadcast -import org.apache.spark.io.CompressionCodec import org.apache.spark.network.nio.{GetBlock, GotBlock, PutBlock} import org.apache.spark.network.util.ByteUnit import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus} From 1183a483f6fa01dad4e54c0e7274391ec9647fa7 Mon Sep 17 00:00:00 2001 From: Joseph Batchik Date: Wed, 15 Jul 2015 14:48:43 -0700 Subject: [PATCH 10/13] updated codec settings --- .../spark/serializer/GenericAvroSerializer.scala | 8 +++++--- .../apache/spark/serializer/KryoSerializer.scala | 7 +++++-- .../serializer/GenericAvroSerializerSuite.scala | 16 +++++++++++----- 3 files changed, 21 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala index 9f5c11deeb4d..9e152ca472e5 100644 --- a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala @@ -20,6 +20,8 @@ package org.apache.spark.serializer import java.io.{ByteArrayInputStream, ByteArrayOutputStream} import java.nio.ByteBuffer +import org.apache.spark.io.CompressionCodec + import scala.collection.mutable import org.apache.commons.io.IOUtils @@ -39,7 +41,7 @@ import org.apache.avro.io._ * Actions like parsing or compressing schemas are computationally expensive so the serializer * caches all previously seen values as to reduce the amount of work needed to do. 
*/ -private[serializer] class GenericAvroSerializer(schemas: Map[Long, String]) +private[serializer] class GenericAvroSerializer(schemas: Map[Long, String], codec: CompressionCodec) extends KSerializer[GenericRecord] { /** Used to reduce the amount of effort to compress the schema */ @@ -61,7 +63,7 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String]) */ def compress(schema: Schema): Array[Byte] = compressCache.getOrElseUpdate(schema, { val bos = new ByteArrayOutputStream() - val out = new SnappyOutputStream(bos) + val out = codec.compressedOutputStream(bos) out.write(schema.toString.getBytes("UTF-8")) out.close() bos.toByteArray @@ -73,7 +75,7 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String]) */ def decompress(schemaBytes: ByteBuffer): Schema = decompressCache.getOrElseUpdate(schemaBytes, { val bis = new ByteArrayInputStream(schemaBytes.array()) - val bytes = IOUtils.toByteArray(new SnappyInputStream(bis)) + val bytes = IOUtils.toByteArray(codec.compressedInputStream(bis)) new Schema.Parser().parse(new String(bytes, "UTF-8")) }) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 3107a735e2e5..19a607726833 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -21,6 +21,8 @@ import java.io.{EOFException, IOException, InputStream, OutputStream} import java.nio.ByteBuffer import javax.annotation.Nullable +import org.apache.spark.io.CompressionCodec + import scala.reflect.ClassTag import org.apache.avro.generic.{GenericData, GenericRecord} @@ -76,6 +78,7 @@ class KryoSerializer(conf: SparkConf) .filter(!_.isEmpty) private val avroSchemas = conf.getAvroSchema + private val codec = CompressionCodec.createCodec(conf) def newKryoOutput(): KryoOutput = new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize)) @@ -103,8 +106,8 @@ class KryoSerializer(conf: SparkConf) kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer()) kryo.register(classOf[PythonBroadcast], new KryoJavaSerializer()) - kryo.register(classOf[GenericRecord], new GenericAvroSerializer(avroSchemas)) - kryo.register(classOf[GenericData.Record], new GenericAvroSerializer(avroSchemas)) + kryo.register(classOf[GenericRecord], new GenericAvroSerializer(avroSchemas, codec)) + kryo.register(classOf[GenericData.Record], new GenericAvroSerializer(avroSchemas, codec)) try { // Use the default classloader when calling the user registrator. 
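Pulled out of the diff, the new codec-backed round trip amounts to the following. This is a sketch only: it assumes code living under org.apache.spark (CompressionCodec.createCodec is package-private), and the helper names simply mirror compress/decompress in the patch:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.io.CompressionCodec

val codec = CompressionCodec.createCodec(new SparkConf())

// Compress the schema JSON with whatever codec the conf selects.
def compressSchema(schemaJson: String): Array[Byte] = {
  val bos = new ByteArrayOutputStream()
  val out = codec.compressedOutputStream(bos)
  out.write(schemaJson.getBytes("UTF-8"))
  out.close()
  bos.toByteArray
}

// Inverse: inflate the bytes and hand the JSON back to the Avro parser.
def decompressSchema(bytes: Array[Byte]): String = {
  val in = codec.compressedInputStream(new ByteArrayInputStream(bytes))
  new String(IOUtils.toByteArray(in), "UTF-8")
}

The practical difference from the earlier SnappyOutputStream version is that the schema bytes now honor the spark.io.compression.codec setting instead of being hard-wired to Snappy.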
diff --git a/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala index bc9f3708ed69..9dc1df4f1516 100644 --- a/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala @@ -23,6 +23,7 @@ import java.nio.ByteBuffer import com.esotericsoftware.kryo.io.{Output, Input} import org.apache.avro.{SchemaBuilder, Schema} import org.apache.avro.generic.GenericData.Record +import org.apache.spark.io.CompressionCodec import org.apache.spark.{SparkFunSuite, SharedSparkContext} @@ -37,12 +38,14 @@ class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext { record.put("data", "test data") test("schema compression and decompression") { - val genericSer = new GenericAvroSerializer(conf.getAvroSchema) + val genericSer = new GenericAvroSerializer(conf.getAvroSchema, + CompressionCodec.createCodec(conf)) assert(schema === genericSer.decompress(ByteBuffer.wrap(genericSer.compress(schema)))) } test("record serialization and deserialization") { - val genericSer = new GenericAvroSerializer(conf.getAvroSchema) + val genericSer = new GenericAvroSerializer(conf.getAvroSchema, + CompressionCodec.createCodec(conf)) val outputStream = new ByteArrayOutputStream() val output = new Output(outputStream) @@ -55,7 +58,8 @@ class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext { } test("uses schema fingerprint to decrease message size") { - val genericSerFull = new GenericAvroSerializer(conf.getAvroSchema) + val genericSerFull = new GenericAvroSerializer(conf.getAvroSchema, + CompressionCodec.createCodec(conf)) val output = new Output(new ByteArrayOutputStream()) @@ -65,7 +69,8 @@ class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext { val normalLength = output.total - beginningNormalPosition conf.registerAvroSchemas(schema) - val genericSerFinger = new GenericAvroSerializer(conf.getAvroSchema) + val genericSerFinger = new GenericAvroSerializer(conf.getAvroSchema, + CompressionCodec.createCodec(conf)) val beginningFingerprintPosition = output.total() genericSerFinger.serializeDatum(record, output) val fingerprintLength = output.total - beginningFingerprintPosition @@ -74,7 +79,8 @@ class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext { } test("caches previously seen schemas") { - val genericSer = new GenericAvroSerializer(conf.getAvroSchema) + val genericSer = new GenericAvroSerializer(conf.getAvroSchema, + CompressionCodec.createCodec(conf)) val compressedSchema = genericSer.compress(schema) val decompressedScheam = genericSer.decompress(ByteBuffer.wrap(compressedSchema)) From dd71efeb302a2009c448e1c5a030daf38f08b322 Mon Sep 17 00:00:00 2001 From: Joseph Batchik Date: Wed, 15 Jul 2015 15:21:21 -0700 Subject: [PATCH 11/13] fixed bug with serializing --- .../spark/serializer/GenericAvroSerializer.scala | 5 ++++- .../apache/spark/serializer/KryoSerializer.scala | 7 ++----- .../serializer/GenericAvroSerializerSuite.scala | 15 +++++---------- 3 files changed, 11 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala index 9e152ca472e5..5d90ff10ed9c 100644 --- a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala +++ 
b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala @@ -20,6 +20,7 @@ package org.apache.spark.serializer import java.io.{ByteArrayInputStream, ByteArrayOutputStream} import java.nio.ByteBuffer +import org.apache.spark.SparkConf import org.apache.spark.io.CompressionCodec import scala.collection.mutable @@ -41,7 +42,7 @@ import org.apache.avro.io._ * Actions like parsing or compressing schemas are computationally expensive so the serializer * caches all previously seen values as to reduce the amount of work needed to do. */ -private[serializer] class GenericAvroSerializer(schemas: Map[Long, String], codec: CompressionCodec) +private[serializer] class GenericAvroSerializer(schemas: Map[Long, String]) extends KSerializer[GenericRecord] { /** Used to reduce the amount of effort to compress the schema */ @@ -56,6 +57,8 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String], code private val fingerprintCache = new mutable.HashMap[Schema, Long]() private val schemaCache = new mutable.HashMap[Long, Schema]() + private val codec = CompressionCodec.createCodec(new SparkConf()) + /** * Used to compress Schemas when they are being sent over the wire. * The compression results are memoized to reduce the compression time since the diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 19a607726833..3107a735e2e5 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -21,8 +21,6 @@ import java.io.{EOFException, IOException, InputStream, OutputStream} import java.nio.ByteBuffer import javax.annotation.Nullable -import org.apache.spark.io.CompressionCodec - import scala.reflect.ClassTag import org.apache.avro.generic.{GenericData, GenericRecord} @@ -78,7 +76,6 @@ class KryoSerializer(conf: SparkConf) .filter(!_.isEmpty) private val avroSchemas = conf.getAvroSchema - private val codec = CompressionCodec.createCodec(conf) def newKryoOutput(): KryoOutput = new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize)) @@ -106,8 +103,8 @@ class KryoSerializer(conf: SparkConf) kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer()) kryo.register(classOf[PythonBroadcast], new KryoJavaSerializer()) - kryo.register(classOf[GenericRecord], new GenericAvroSerializer(avroSchemas, codec)) - kryo.register(classOf[GenericData.Record], new GenericAvroSerializer(avroSchemas, codec)) + kryo.register(classOf[GenericRecord], new GenericAvroSerializer(avroSchemas)) + kryo.register(classOf[GenericData.Record], new GenericAvroSerializer(avroSchemas)) try { // Use the default classloader when calling the user registrator. 
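Note that none of the churn over where the codec comes from (a constructor argument in the previous patch, a SparkConf built on the spot here, SparkEnv in the next one) changes what actually goes over the wire. The per-datum header is the same in every version; roughly, simplified from serializeDatum with the class's caches elided:

import com.esotericsoftware.kryo.io.{Output => KryoOutput}
import org.apache.avro.{Schema, SchemaNormalization}

// schemas is the fingerprint -> schema-JSON map from conf.getAvroSchema;
// compress is the codec-backed helper sketched earlier.
def writeSchemaHeader(schema: Schema, schemas: Map[Long, String],
    compress: Schema => Array[Byte], output: KryoOutput): Unit = {
  val fingerprint = SchemaNormalization.parsingFingerprint64(schema)
  schemas.get(fingerprint) match {
    case Some(_) =>
      output.writeBoolean(true)        // registered up front: 8-byte fingerprint
      output.writeLong(fingerprint)
    case None =>
      output.writeBoolean(false)       // unregistered: length-prefixed compressed schema
      val compressedSchema = compress(schema)
      output.writeInt(compressedSchema.length)
      output.writeBytes(compressedSchema)
  }
  // The Avro-encoded record bytes follow in either case.
}

The suite's "uses schema fingerprint to decrease message size" test checks exactly this trade-off: serializing with a registered schema produces a shorter message than shipping the compressed schema.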
diff --git a/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala index 9dc1df4f1516..b7f72da666fe 100644 --- a/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala @@ -38,14 +38,12 @@ class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext { record.put("data", "test data") test("schema compression and decompression") { - val genericSer = new GenericAvroSerializer(conf.getAvroSchema, - CompressionCodec.createCodec(conf)) + val genericSer = new GenericAvroSerializer(conf.getAvroSchema) assert(schema === genericSer.decompress(ByteBuffer.wrap(genericSer.compress(schema)))) } test("record serialization and deserialization") { - val genericSer = new GenericAvroSerializer(conf.getAvroSchema, - CompressionCodec.createCodec(conf)) + val genericSer = new GenericAvroSerializer(conf.getAvroSchema) val outputStream = new ByteArrayOutputStream() val output = new Output(outputStream) @@ -58,8 +56,7 @@ class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext { } test("uses schema fingerprint to decrease message size") { - val genericSerFull = new GenericAvroSerializer(conf.getAvroSchema, - CompressionCodec.createCodec(conf)) + val genericSerFull = new GenericAvroSerializer(conf.getAvroSchema) val output = new Output(new ByteArrayOutputStream()) @@ -69,8 +66,7 @@ class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext { val normalLength = output.total - beginningNormalPosition conf.registerAvroSchemas(schema) - val genericSerFinger = new GenericAvroSerializer(conf.getAvroSchema, - CompressionCodec.createCodec(conf)) + val genericSerFinger = new GenericAvroSerializer(conf.getAvroSchema) val beginningFingerprintPosition = output.total() genericSerFinger.serializeDatum(record, output) val fingerprintLength = output.total - beginningFingerprintPosition @@ -79,8 +75,7 @@ class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext { } test("caches previously seen schemas") { - val genericSer = new GenericAvroSerializer(conf.getAvroSchema, - CompressionCodec.createCodec(conf)) + val genericSer = new GenericAvroSerializer(conf.getAvroSchema) val compressedSchema = genericSer.compress(schema) val decompressedScheam = genericSer.decompress(ByteBuffer.wrap(compressedSchema)) From c0cf32988d5c77655c09e9c798bbb49cb8b68250 Mon Sep 17 00:00:00 2001 From: Joseph Batchik Date: Thu, 16 Jul 2015 10:15:21 -0700 Subject: [PATCH 12/13] implemented @squito suggestion for SparkEnv --- .../spark/serializer/GenericAvroSerializer.scala | 12 +++++------- .../serializer/GenericAvroSerializerSuite.scala | 1 - 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala index 5d90ff10ed9c..3105bc7fbf38 100644 --- a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala @@ -20,20 +20,17 @@ package org.apache.spark.serializer import java.io.{ByteArrayInputStream, ByteArrayOutputStream} import java.nio.ByteBuffer -import org.apache.spark.SparkConf -import org.apache.spark.io.CompressionCodec - import scala.collection.mutable -import org.apache.commons.io.IOUtils import 
com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer} import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput} -import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream} - import org.apache.avro.{Schema, SchemaNormalization} import org.apache.avro.generic.{GenericData, GenericRecord} import org.apache.avro.io._ +import org.apache.commons.io.IOUtils +import org.apache.spark.SparkEnv +import org.apache.spark.io.CompressionCodec /** * Custom serializer used for generic Avro records. If the user registers the schemas @@ -57,7 +54,8 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String]) private val fingerprintCache = new mutable.HashMap[Schema, Long]() private val schemaCache = new mutable.HashMap[Long, Schema]() - private val codec = CompressionCodec.createCodec(new SparkConf()) + /** This needs to be lazy since SparkEnv is not initialized yet sometimes when this is called */ + private lazy val codec = CompressionCodec.createCodec(SparkEnv.get.conf) /** * Used to compress Schemas when they are being sent over the wire. diff --git a/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala index b7f72da666fe..bc9f3708ed69 100644 --- a/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala @@ -23,7 +23,6 @@ import java.nio.ByteBuffer import com.esotericsoftware.kryo.io.{Output, Input} import org.apache.avro.{SchemaBuilder, Schema} import org.apache.avro.generic.GenericData.Record -import org.apache.spark.io.CompressionCodec import org.apache.spark.{SparkFunSuite, SharedSparkContext} From 8158d5113a9084e44482ebfe0fbb73fe3d7bddd8 Mon Sep 17 00:00:00 2001 From: Joseph Batchik Date: Wed, 22 Jul 2015 14:40:30 -0700 Subject: [PATCH 13/13] updated per feedback --- core/src/main/scala/org/apache/spark/SparkConf.scala | 5 +++-- .../spark/serializer/GenericAvroSerializer.scala | 12 ++++++++++-- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 474c03fdd157..4161792976c7 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -169,9 +169,10 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { * record serializer can decrease network IO */ def registerAvroSchemas(schemas: Schema*): SparkConf = { - schemas.foldLeft(this) { (conf, schema) => - conf.set(avroNamespace + SchemaNormalization.parsingFingerprint64(schema), schema.toString) + for (schema <- schemas) { + set(avroNamespace + SchemaNormalization.parsingFingerprint64(schema), schema.toString) } + this } /** Gets all the avro schemas in the configuration used in the generic Avro record serializer */ diff --git a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala index 3105bc7fbf38..b8e8fa3be876 100644 --- a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala @@ -29,7 +29,7 @@ import org.apache.avro.generic.{GenericData, GenericRecord} import org.apache.avro.io._ import org.apache.commons.io.IOUtils -import org.apache.spark.SparkEnv +import 
org.apache.spark.{SparkException, SparkEnv} import org.apache.spark.io.CompressionCodec /** @@ -38,6 +38,9 @@ import org.apache.spark.io.CompressionCodec * schema, as to reduce network IO. * Actions like parsing or compressing schemas are computationally expensive so the serializer * caches all previously seen values as to reduce the amount of work needed to do. + * @param schemas a map where the keys are unique IDs for Avro schemas and the values are the + * string representation of the Avro schema, used to decrease the amount of data + * that needs to be serialized. */ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String]) extends KSerializer[GenericRecord] { @@ -118,7 +121,12 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String]) schemaCache.getOrElseUpdate(fingerprint, { schemas.get(fingerprint) match { case Some(s) => new Schema.Parser().parse(s) - case None => throw new RuntimeException(s"Unknown fingerprint: $fingerprint") + case None => + throw new SparkException( + s"""Error attempting to read avro data -- |encountered an unknown fingerprint: $fingerprint, not sure what schema to use. |This could happen if you registered additional schemas after starting your |spark context.""".stripMargin) } }) } else {