Skip to content

Commit 92e7ecb

Browse files
mgaido91hvanhovell
authored andcommitted
[SPARK-23592][SQL] Add interpreted execution to DecodeUsingSerializer
## What changes were proposed in this pull request? The PR adds interpreted execution to DecodeUsingSerializer. ## How was this patch tested? added UT Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Marco Gaido <marcogaido91@gmail.com> Closes #20760 from mgaido91/SPARK-23592.
1 parent 7013eea commit 92e7ecb

File tree

2 files changed

+20
-0
lines changed

2 files changed

+20
-0
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1242,6 +1242,11 @@ case class EncodeUsingSerializer(child: Expression, kryo: Boolean)
12421242
case class DecodeUsingSerializer[T](child: Expression, tag: ClassTag[T], kryo: Boolean)
12431243
extends UnaryExpression with NonSQLExpression with SerializerSupport {
12441244

1245+
override def nullSafeEval(input: Any): Any = {
1246+
val inputBytes = java.nio.ByteBuffer.wrap(input.asInstanceOf[Array[Byte]])
1247+
serializerInstance.deserialize(inputBytes)
1248+
}
1249+
12451250
override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
12461251
val serializer = addImmutableSerializerIfNeeded(ctx)
12471252
// Code to deserialize.

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.sql.catalyst.expressions
1919

20+
import scala.reflect.ClassTag
21+
2022
import org.apache.spark.{SparkConf, SparkFunSuite}
2123
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
2224
import org.apache.spark.sql.Row
@@ -123,4 +125,17 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
123125
checkEvaluation(encodeUsingSerializer, null, InternalRow.fromSeq(Seq(null)))
124126
}
125127
}
128+
129+
test("SPARK-23592: DecodeUsingSerializer should support interpreted execution") {
130+
val cls = classOf[java.lang.Integer]
131+
val inputObject = BoundReference(0, ObjectType(classOf[Array[Byte]]), nullable = true)
132+
val conf = new SparkConf()
133+
Seq(true, false).foreach { useKryo =>
134+
val serializer = if (useKryo) new KryoSerializer(conf) else new JavaSerializer(conf)
135+
val input = serializer.newInstance().serialize(new Integer(1)).array()
136+
val decodeUsingSerializer = DecodeUsingSerializer(inputObject, ClassTag(cls), useKryo)
137+
checkEvaluation(decodeUsingSerializer, new Integer(1), InternalRow.fromSeq(Seq(input)))
138+
checkEvaluation(decodeUsingSerializer, null, InternalRow.fromSeq(Seq(null)))
139+
}
140+
}
126141
}

0 commit comments

Comments
 (0)