Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro

// go through the input array to calculate how many bytes we need.
val calculateNumBytes = elementType match {
case _ if (ctx.isPrimitiveType(elementType)) =>
case _ if ctx.isPrimitiveType(elementType) =>
// Should we do word align?
val elementSize = elementType.defaultSize
s"""
Expand All @@ -237,6 +237,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
case _ =>
val writer = getWriter(elementType)
val elementSize = s"$writer.getSize($elements[$index])"
// TODO(davies): avoid the copy
val unsafeType = elementType match {
case _: StructType => "UnsafeRow"
case _: ArrayType => "UnsafeArrayData"
Expand All @@ -249,8 +250,13 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
case _ => ""
}

val newElements = if (elementType == BinaryType) {
s"new byte[$numElements][]"
} else {
s"new $unsafeType[$numElements]"
}
s"""
final $unsafeType[] $elements = new $unsafeType[$numElements];
final $unsafeType[] $elements = $newElements;
for (int $index = 0; $index < $numElements; $index++) {
${convertedElement.code}
if (!${convertedElement.isNull}) {
Expand All @@ -262,7 +268,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
}

val writeElement = elementType match {
case _ if (ctx.isPrimitiveType(elementType)) =>
case _ if ctx.isPrimitiveType(elementType) =>
// Should we do word align?
val elementSize = elementType.defaultSize
s"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.expressions.codegen
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.{StringType, IntegerType, StructField, StructType}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

/**
Expand Down Expand Up @@ -79,4 +79,23 @@ class GeneratedProjectionSuite extends SparkFunSuite {
val row2 = mutableProj(result)
assert(result === row2)
}

test("generated unsafe projection with array of binary") {
val row = InternalRow(
Array[Byte](1, 2),
new GenericArrayData(Array(Array[Byte](1, 2), null, Array[Byte](3, 4))))
val fields = (BinaryType :: ArrayType(BinaryType) :: Nil).toArray[DataType]

val unsafeProj = UnsafeProjection.create(fields)
val unsafeRow: UnsafeRow = unsafeProj(row)
assert(java.util.Arrays.equals(unsafeRow.getBinary(0), Array[Byte](1, 2)))
assert(java.util.Arrays.equals(unsafeRow.getArray(1).getBinary(0), Array[Byte](1, 2)))
assert(unsafeRow.getArray(1).isNullAt(1))
assert(unsafeRow.getArray(1).getBinary(1) === null)
assert(java.util.Arrays.equals(unsafeRow.getArray(1).getBinary(2), Array[Byte](3, 4)))

val safeProj = FromUnsafeProjection(fields)
val row2 = safeProj(unsafeRow)
assert(row2 === row)
}
}