Skip to content

Commit ab457ca

Browse files
committed
Sketch a loan/release based solution.
This makes it safe to invoke all SerializerInstance methods at any time, including the creation of multiple open OutputStreams from the same KryoSerializerInstance.
1 parent 9816e8f commit ab457ca

File tree

1 file changed

+75
-20
lines changed

1 file changed

+75
-20
lines changed

core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala

Lines changed: 75 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -136,21 +136,37 @@ class KryoSerializer(conf: SparkConf)
136136
}
137137

138138
private[spark]
139-
class KryoSerializationStream(kryo: Kryo, outStream: OutputStream) extends SerializationStream {
140-
val output = new KryoOutput(outStream)
139+
class KryoSerializationStream(
140+
serInstance: KryoSerializerInstance,
141+
outStream: OutputStream) extends SerializationStream {
142+
143+
private[this] var output: KryoOutput = new KryoOutput(outStream)
144+
private[this] var kryo: Kryo = serInstance.borrowKryo()
141145

142146
override def writeObject[T: ClassTag](t: T): SerializationStream = {
143147
kryo.writeClassAndObject(output, t)
144148
this
145149
}
146150

147151
override def flush() { output.flush() }
148-
override def close() { output.close() }
152+
override def close() {
153+
try {
154+
output.close()
155+
} finally {
156+
serInstance.releaseKryo(kryo)
157+
kryo = null
158+
output = null
159+
}
160+
}
149161
}
150162

151163
private[spark]
152-
class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends DeserializationStream {
153-
private val input = new KryoInput(inStream)
164+
class KryoDeserializationStream(
165+
serInstance: KryoSerializerInstance,
166+
inStream: InputStream) extends DeserializationStream {
167+
168+
private[this] var input: KryoInput = new KryoInput(inStream)
169+
private[this] var kryo: Kryo = serInstance.borrowKryo()
154170

155171
override def readObject[T: ClassTag](): T = {
156172
try {
@@ -163,52 +179,86 @@ class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends Deser
163179
}
164180

165181
override def close() {
166-
// Kryo's Input automatically closes the input stream it is using.
167-
input.close()
182+
try {
183+
// Kryo's Input automatically closes the input stream it is using.
184+
input.close()
185+
} finally {
186+
serInstance.releaseKryo(kryo)
187+
kryo = null
188+
input = null
189+
}
168190
}
169191
}
170192

171193
private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance {
172-
private val kryo = ks.newKryo()
194+
195+
private[this] var cachedKryo: Kryo = ks.newKryo()
196+
197+
private[spark] def borrowKryo(): Kryo = {
198+
if (cachedKryo != null) {
199+
val kryo = cachedKryo
200+
kryo.reset() // We must reset in case this serializer instance was reused (see SPARK-7766)
201+
cachedKryo = null
202+
kryo
203+
} else {
204+
ks.newKryo()
205+
}
206+
}
207+
208+
private[spark] def releaseKryo(kryo: Kryo): Unit = {
209+
if (cachedKryo == null) {
210+
cachedKryo = kryo
211+
}
212+
}
173213

174214
// Make these lazy vals to avoid creating a buffer unless we use them
175215
private lazy val output = ks.newKryoOutput()
176216
private lazy val input = new KryoInput()
177217

178218
override def serialize[T: ClassTag](t: T): ByteBuffer = {
179219
output.clear()
180-
kryo.reset() // We must reset in case this serializer instance was reused (see SPARK-7766)
220+
val kryo = borrowKryo()
181221
try {
182222
kryo.writeClassAndObject(output, t)
183223
} catch {
184224
case e: KryoException if e.getMessage.startsWith("Buffer overflow") =>
185225
throw new SparkException(s"Kryo serialization failed: ${e.getMessage}. To avoid this, " +
186226
"increase spark.kryoserializer.buffer.max value.")
227+
} finally {
228+
releaseKryo(kryo)
187229
}
188230
ByteBuffer.wrap(output.toBytes)
189231
}
190232

191233
override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
192-
input.setBuffer(bytes.array)
193-
kryo.readClassAndObject(input).asInstanceOf[T]
234+
val kryo = borrowKryo()
235+
try {
236+
input.setBuffer(bytes.array)
237+
kryo.readClassAndObject(input).asInstanceOf[T]
238+
} finally {
239+
releaseKryo(kryo)
240+
}
194241
}
195242

196243
override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = {
244+
val kryo = borrowKryo()
197245
val oldClassLoader = kryo.getClassLoader
198-
kryo.setClassLoader(loader)
199-
input.setBuffer(bytes.array)
200-
val obj = kryo.readClassAndObject(input).asInstanceOf[T]
201-
kryo.setClassLoader(oldClassLoader)
202-
obj
246+
try {
247+
kryo.setClassLoader(loader)
248+
input.setBuffer(bytes.array)
249+
kryo.readClassAndObject(input).asInstanceOf[T]
250+
} finally {
251+
kryo.setClassLoader(oldClassLoader)
252+
releaseKryo(kryo)
253+
}
203254
}
204255

205256
override def serializeStream(s: OutputStream): SerializationStream = {
206-
kryo.reset() // We must reset in case this serializer instance was reused (see SPARK-7766)
207-
new KryoSerializationStream(kryo, s)
257+
new KryoSerializationStream(this, s)
208258
}
209259

210260
override def deserializeStream(s: InputStream): DeserializationStream = {
211-
new KryoDeserializationStream(kryo, s)
261+
new KryoDeserializationStream(this, s)
212262
}
213263

214264
/**
@@ -218,7 +268,12 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ
218268
def getAutoReset(): Boolean = {
219269
val field = classOf[Kryo].getDeclaredField("autoReset")
220270
field.setAccessible(true)
221-
field.get(kryo).asInstanceOf[Boolean]
271+
val kryo = borrowKryo()
272+
try {
273+
field.get(kryo).asInstanceOf[Boolean]
274+
} finally {
275+
releaseKryo(kryo)
276+
}
222277
}
223278
}
224279

0 commit comments

Comments
 (0)