From 5dfd2b8873fe9435a8ca6d50269a64bde6b307e4 Mon Sep 17 00:00:00 2001 From: Gaurav Gupta Date: Tue, 4 Aug 2015 15:12:46 +0530 Subject: [PATCH] Modified First and Last aggregates to calculate on GroupedData partition and not on entire DataFrame partition. --- .../sql/catalyst/expressions/aggregates.scala | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala index 64e07bd2a17db..5be1a7eabd5bb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala @@ -717,15 +717,15 @@ case class First(child: Expression) extends PartialAggregate with trees.UnaryNod case class FirstFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction { def this() = this(null, null) // Required for serialization. - var result: Any = null + var result: MutableLiteral = MutableLiteral(null, expr.dataType) override def update(input: InternalRow): Unit = { - if (result == null) { - result = expr.eval(input) + if (result.value == null) { + result.value = expr.eval(input) } } - override def eval(input: InternalRow): Any = result + override def eval(input: InternalRow): Any = result.value } case class Last(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] { @@ -746,13 +746,11 @@ case class Last(child: Expression) extends PartialAggregate with trees.UnaryNode case class LastFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction { def this() = this(null, null) // Required for serialization. - var result: Any = null + var result: MutableLiteral = MutableLiteral(null, expr.dataType) override def update(input: InternalRow): Unit = { - result = input + result.value = expr.eval(input) } - override def eval(input: InternalRow): Any = { - if (result != null) expr.eval(result.asInstanceOf[InternalRow]) else null - } + override def eval(input: InternalRow): Any = result.value }