Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
protected val DISTINCT = Keyword("DISTINCT")
protected val FALSE = Keyword("FALSE")
protected val FIRST = Keyword("FIRST")
protected val LAST = Keyword("LAST")
protected val FROM = Keyword("FROM")
protected val FULL = Keyword("FULL")
protected val GROUP = Keyword("GROUP")
Expand Down Expand Up @@ -309,6 +310,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
case s ~ _ ~ _ ~ _ ~ _ ~ e => ApproxCountDistinct(e, s.toDouble)
} |
FIRST ~> "(" ~> expression <~ ")" ^^ { case exp => First(exp) } |
LAST ~> "(" ~> expression <~ ")" ^^ { case exp => Last(exp) } |
AVG ~> "(" ~> expression <~ ")" ^^ { case exp => Average(exp) } |
MIN ~> "(" ~> expression <~ ")" ^^ { case exp => Min(exp) } |
MAX ~> "(" ~> expression <~ ")" ^^ { case exp => Max(exp) } |
Expand Down
1 change: 1 addition & 0 deletions sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ package object dsl {
def approxCountDistinct(e: Expression, rsd: Double = 0.05) = ApproxCountDistinct(e, rsd)
def avg(e: Expression) = Average(e)
def first(e: Expression) = First(e)
def last(e: Expression) = Last(e)
def min(e: Expression) = Min(e)
def max(e: Expression) = Max(e)
def upper(e: Expression) = Upper(e)
Expand Down
27 changes: 27 additions & 0 deletions sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,21 @@ case class First(child: Expression) extends PartialAggregate with trees.UnaryNod
override def newInstance() = new FirstFunction(child, this)
}

case class Last(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] {
override def references = child.references
override def nullable = true
override def dataType = child.dataType
override def toString = s"LAST($child)"

override def asPartial: SplitEvaluation = {
val partialLast = Alias(Last(child), "PartialLast")()
SplitEvaluation(
Last(partialLast.toAttribute),
partialLast :: Nil)
}
override def newInstance() = new LastFunction(child, this)
}

case class AverageFunction(expr: Expression, base: AggregateExpression)
extends AggregateFunction {

Expand Down Expand Up @@ -409,3 +424,15 @@ case class FirstFunction(expr: Expression, base: AggregateExpression) extends Ag

override def eval(input: Row): Any = result
}

case class LastFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction {
def this() = this(null, null) // Required for serialization.

var result: Any = null

override def update(input: Row): Unit = {
result = input
}

override def eval(input: Row): Any = if (result != null) expr.eval(result.asInstanceOf[Row]) else null
}