Skip to content

Commit 1d98085

Browse files
committed
Explicitly implement KryoSerialization for LazilyGenerateOrdering
1 parent 7bb64aa commit 1d98085

File tree

2 files changed

+23
-5
lines changed

2 files changed

+23
-5
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ package org.apache.spark.sql.catalyst.expressions.codegen
1919

2020
import java.io.ObjectInputStream
2121

22+
import com.esotericsoftware.kryo.{Kryo, KryoSerializable}
23+
import com.esotericsoftware.kryo.io.{Input, Output}
24+
2225
import org.apache.spark.internal.Logging
2326
import org.apache.spark.sql.catalyst.InternalRow
2427
import org.apache.spark.sql.catalyst.expressions._
@@ -147,7 +150,8 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR
147150
/**
148151
* A lazily generated row ordering comparator.
149152
*/
150-
class LazilyGeneratedOrdering(val ordering: Seq[SortOrder]) extends Ordering[InternalRow] {
153+
class LazilyGeneratedOrdering(val ordering: Seq[SortOrder])
154+
extends Ordering[InternalRow] with KryoSerializable {
151155

152156
def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) =
153157
this(ordering.map(BindReferences.bindReference(_, inputSchema)))
@@ -163,6 +167,14 @@ class LazilyGeneratedOrdering(val ordering: Seq[SortOrder]) extends Ordering[Int
163167
in.defaultReadObject()
164168
generatedOrdering = GenerateOrdering.generate(ordering)
165169
}
170+
171+
override def write(kryo: Kryo, out: Output): Unit = Utils.tryOrIOException {
172+
kryo.writeObject(out, ordering.toArray)
173+
}
174+
175+
override def read(kryo: Kryo, in: Input): Unit = Utils.tryOrIOException {
176+
generatedOrdering = GenerateOrdering.generate(kryo.readObject(in, classOf[Array[SortOrder]]))
177+
}
166178
}
167179

168180
object LazilyGeneratedOrdering {

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,12 @@ package org.apache.spark.sql.catalyst.expressions
1919

2020
import scala.math._
2121

22-
import org.apache.spark.SparkFunSuite
22+
import org.apache.spark.{SparkConf, SparkFunSuite}
23+
import org.apache.spark.serializer.KryoSerializer
2324
import org.apache.spark.sql.{RandomDataGenerator, Row}
2425
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
2526
import org.apache.spark.sql.catalyst.dsl.expressions._
26-
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering
27+
import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateOrdering, LazilyGeneratedOrdering}
2728
import org.apache.spark.sql.types._
2829

2930
class OrderingSuite extends SparkFunSuite with ExpressionEvalHelper {
@@ -44,9 +45,14 @@ class OrderingSuite extends SparkFunSuite with ExpressionEvalHelper {
4445
case Ascending => signum(expected)
4546
case Descending => -1 * signum(expected)
4647
}
48+
49+
val kryo = new KryoSerializer(new SparkConf).newInstance()
4750
val intOrdering = new InterpretedOrdering(sortOrder :: Nil)
48-
val genOrdering = GenerateOrdering.generate(sortOrder :: Nil)
49-
Seq(intOrdering, genOrdering).foreach { ordering =>
51+
val genOrdering = new LazilyGeneratedOrdering(sortOrder :: Nil)
52+
val kryoIntOrdering = kryo.deserialize[InterpretedOrdering](kryo.serialize(intOrdering))
53+
val kryoGenOrdering = kryo.deserialize[LazilyGeneratedOrdering](kryo.serialize(genOrdering))
54+
55+
Seq(intOrdering, genOrdering, kryoIntOrdering, kryoGenOrdering).foreach { ordering =>
5056
assert(ordering.compare(rowA, rowA) === 0)
5157
assert(ordering.compare(rowB, rowB) === 0)
5258
assert(signum(ordering.compare(rowA, rowB)) === expectedCompareResult)

0 commit comments

Comments
 (0)