Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,15 @@ class SortBasedAggregationIterator(
// The aggregation buffer used by the sort-based aggregation.
private[this] val sortBasedAggregationBuffer: MutableRow = newBuffer

// A SafeProjection to turn UnsafeRow into GenericInternalRow, because UnsafeRow can't be
// compared to MutableRow (aggregation buffer) directly.
// This safe projection is used to turn the input row into safe row. This is necessary
// because the input row may be produced by unsafe projection in child operator and all the
// produced rows share one byte array. However, when we update the aggregate buffer according to
// the input row, we may cache some values from input row, e.g. `Max` will keep the max value from
// input row via MutableProjection, `CollectList` will keep all values in an array via
// ImperativeAggregate framework. These values may get changed unexpectedly if the underlying
// unsafe projection update the shared byte array. By applying a safe projection to the input row,
// we can cut down the connection from input row to the shared byte array, and thus it's safe to
// cache values from input row while updating the aggregation buffer.
private[this] val safeProj: Projection = FromUnsafeProjection(valueAttributes.map(_.dataType))

protected def initialize(): Unit = {
Expand Down