Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,16 @@ case class MapObjects private(
s"$seq == null ? $array[$loopIndex] : $seq.apply($loopIndex)"
}

// Make a copy of the data if it's unsafe-backed
def makeCopyIfInstanceOf(clazz: Class[_ <: Any], value: String) =
s"$value instanceof ${clazz.getSimpleName}? ${value}.copy() : $value"
val genFunctionValue = lambdaFunction.dataType match {
Copy link
Contributor

@hvanhovell hvanhovell Aug 22, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also copy UTF8String? This can be backed by a reused buffer. You'll need to use clone() instead of copy though.

Copy link
Contributor Author

@lw-lin lw-lin Aug 25, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we're calling toString on UTF8Strings, so maybe there's no need to clone UTF8Strings?

...
/* 072 */    value8 = MapObjects_loopValue2.getUTF8String(0);
...
/* 082 */    funcResult = value8.toString();
...
/* 086 */    value7 = (java.lang.String) funcResult;
...
/* 128 */    convertedArray[loopIndex] = ...;

case StructType(_) => makeCopyIfInstanceOf(classOf[UnsafeRow], genFunction.value)
case ArrayType(_, _) => makeCopyIfInstanceOf(classOf[UnsafeArrayData], genFunction.value)
case MapType(_, _, _) => makeCopyIfInstanceOf(classOf[UnsafeMapData], genFunction.value)
case _ => genFunction.value
}

val loopNullCheck = inputDataType match {
case _: ArrayType => s"$loopIsNull = ${genInputData.value}.isNullAt($loopIndex);"
// The element of primitive array will never be null.
Expand Down Expand Up @@ -501,7 +511,7 @@ case class MapObjects private(
if (${genFunction.isNull}) {
$convertedArray[$loopIndex] = null;
} else {
$convertedArray[$loopIndex] = ${genFunction.value};
$convertedArray[$loopIndex] = $genFunctionValue;
}

$loopIndex += 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks {
// some expression is reusing variable names across different instances.
// This behavior is tested in ExpressionEvalHelperSuite.
val plan = generateProject(
GenerateUnsafeProjection.generate(
UnsafeProjection.create(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to prevent issues with Create*Struct right?

Copy link
Contributor Author

@lw-lin lw-lin Aug 25, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea.

GenerateUnsafeProjection.generate here was unable to use unsafe-backed data structure because it's Create*Struct.

UnsafeProjection.create, however, does use unsafe-backed data structure (UnsafeRow, UnsafeArrayData, ...) so that ExpressionEvalHelper.checkEvalutionWithUnsafeProjection is valid in testing what it's supposed to test against.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But looks like this change doesn't reflect in the test? Without this change, the added test is passed too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viirya maybe test against the following?

  • + this patch's changes to ObjectExpressionsSuite.scala
  • + this patch's changes to ExpressionEvalHelper.scala (this is also critical)
  • - this patch's changes to objects.scala

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lw-lin without this patch's changes to ExpressionEvalHelper.scala, this test still passes.

Copy link
Member

@viirya viirya Sep 23, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • + this patch's changes to ObjectExpressionsSuite.scala
  • + this patch's changes to ExpressionEvalHelper.scala (this is also critical)
  • - this patch's changes to objects.scala

Under this case, the test failed.

Alias(expression, s"Optimized($expression)1")() ::
Alias(expression, s"Optimized($expression)2")() :: Nil),
expression)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.objects.Invoke
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
import org.apache.spark.sql.types.{IntegerType, ObjectType}


Expand All @@ -32,4 +34,36 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
val invoke = Invoke(inputObject, "_2", IntegerType)
checkEvaluationWithGeneratedMutableProjection(invoke, null, inputRow)
}

test("MapObjects should make copies of unsafe-backed data") {
// test UnsafeRow-backed data
val structEncoder = ExpressionEncoder[Array[Tuple2[java.lang.Integer, java.lang.Integer]]]
val structInputRow = InternalRow.fromSeq(Seq(Array((1, 2), (3, 4))))
val structExpected = new GenericArrayData(
Array(InternalRow.fromSeq(Seq(1, 2)), InternalRow.fromSeq(Seq(3, 4))))
checkEvalutionWithUnsafeProjection(
structEncoder.serializer.head, structExpected, structInputRow)

// test UnsafeArray-backed data
val arrayEncoder = ExpressionEncoder[Array[Array[Int]]]
val arrayInputRow = InternalRow.fromSeq(Seq(Array(Array(1, 2), Array(3, 4))))
val arrayExpected = new GenericArrayData(
Array(new GenericArrayData(Array(1, 2)), new GenericArrayData(Array(3, 4))))
checkEvalutionWithUnsafeProjection(
arrayEncoder.serializer.head, arrayExpected, arrayInputRow)

// test UnsafeMap-backed data
val mapEncoder = ExpressionEncoder[Array[Map[Int, Int]]]
val mapInputRow = InternalRow.fromSeq(Seq(Array(
Map(1 -> 100, 2 -> 200), Map(3 -> 300, 4 -> 400))))
val mapExpected = new GenericArrayData(Seq(
new ArrayBasedMapData(
new GenericArrayData(Array(1, 2)),
new GenericArrayData(Array(100, 200))),
new ArrayBasedMapData(
new GenericArrayData(Array(3, 4)),
new GenericArrayData(Array(300, 400)))))
checkEvalutionWithUnsafeProjection(
mapEncoder.serializer.head, mapExpected, mapInputRow)
}
}