20 changes: 18 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -18,13 +18,13 @@
package org.apache.spark.sql

import scala.language.implicitConversions
import scala.collection.JavaConversions._

import org.apache.spark.annotation.Experimental
import org.apache.spark.Logging
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.analysis.{MultiAlias, UnresolvedAttribute, UnresolvedStar, UnresolvedExtractValue}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.types._


@@ -889,6 +889,22 @@ class Column(protected[sql] val expr: Expression) extends Logging {
*/
def bitwiseXOR(other: Any): Column = BitwiseXor(expr, lit(other).expr)

/**
* Define a windowing column.
*
* {{{
* val w = Window.partitionBy("name").orderBy("id")
* df.select(
* sum("price").over(w.rangeBetween(Long.MinValue, 2)),
* avg("price").over(w.rowsBetween(0, 4))
* )
* }}}
*
* @group expr_ops
* @since 1.4.0
*/
def over(window: expressions.WindowSpec): Column = window.withAggregate(this)
Contributor Author:
note: throw an exception if this is not an aggregate function


}


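As a usage sketch of the new `over` method (assuming a DataFrame `df` with columns `name`, `id`, and `price`, echoing the scaladoc example above):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{avg, sum}

val w = Window.partitionBy("name").orderBy("id")
df.select(
  // running aggregate: unbounded start through two past the current ordering value
  sum("price").over(w.rangeBetween(Long.MinValue, 2)),
  // sliding aggregate: the current row and the four rows after it
  avg("price").over(w.rowsBetween(0, 4))
)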
9 changes: 5 additions & 4 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.analysis.{MultiAlias, ResolvedStar, Unresol
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{Filter, _}
import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
import org.apache.spark.sql.catalyst.{expressions, CatalystTypeConverters, ScalaReflection, SqlParser}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, SqlParser}
import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, LogicalRDD}
import org.apache.spark.sql.json.JacksonGenerator
import org.apache.spark.sql.sources.CreateTableUsingAsSelect
@@ -411,7 +411,7 @@ class DataFrame private[sql](
joined.left,
joined.right,
joinType = Inner,
Some(expressions.EqualTo(
Some(catalyst.expressions.EqualTo(
joined.left.resolve(usingColumn),
joined.right.resolve(usingColumn))))
)
@@ -480,8 +480,9 @@ class DataFrame private[sql](
// By the time we get here, since we have already run analysis, all attributes should've been
// resolved and become AttributeReference.
val cond = plan.condition.map { _.transform {
case expressions.EqualTo(a: AttributeReference, b: AttributeReference) if a.sameRef(b) =>
expressions.EqualTo(plan.left.resolve(a.name), plan.right.resolve(b.name))
case catalyst.expressions.EqualTo(a: AttributeReference, b: AttributeReference)
if a.sameRef(b) =>
catalyst.expressions.EqualTo(plan.left.resolve(a.name), plan.right.resolve(b.name))
}}
plan.copy(condition = cond)
}
81 changes: 81 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.expressions

import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions._

/**
* :: Experimental ::
* Utility functions for defining windows in DataFrames.
*
* {{{
* // PARTITION BY country ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
* Window.partitionBy("country").orderBy("date").rowsBetween(Long.MinValue, 0)
*
* // PARTITION BY country ORDER BY date ROWS BETWEEN 3 PRECEDING AND 3 FOLLOWING
* Window.partitionBy("country").orderBy("date").rowsBetween(-3, 3)
* }}}
*
* @since 1.4.0
*/
@Experimental
object Window {

Contributor:

Are we not going to support windows without any partitionBy or orderBy?

Contributor Author:

I thought about it, but I don't think there is a strong enough use case to have that. If users complain, we can add them later.

/**
* Creates a [[WindowSpec]] with the partitioning defined.
* @since 1.4.0
*/
@scala.annotation.varargs
def partitionBy(colName: String, colNames: String*): WindowSpec = {
spec.partitionBy(colName, colNames : _*)
}

/**
* Creates a [[WindowSpec]] with the partitioning defined.
* @since 1.4.0
*/
@scala.annotation.varargs
def partitionBy(cols: Column*): WindowSpec = {
spec.partitionBy(cols : _*)
}

/**
* Creates a [[WindowSpec]] with the ordering defined.
* @since 1.4.0
*/
@scala.annotation.varargs
def orderBy(colName: String, colNames: String*): WindowSpec = {
spec.orderBy(colName, colNames : _*)
}

/**
* Creates a [[WindowSpec]] with the ordering defined.
* @since 1.4.0
*/
@scala.annotation.varargs
def orderBy(cols: Column*): WindowSpec = {
spec.orderBy(cols : _*)
}

private def spec: WindowSpec = {
new WindowSpec(Seq.empty, Seq.empty, UnspecifiedFrame)
}

}
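For illustration, the String and Column overloads above build equivalent specs; a brief sketch (the column names are assumptions):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.col

// Equivalent partitioning; the Column overload additionally allows
// an explicit sort direction on the ordering column.
val byName   = Window.partitionBy("country").orderBy("date")
val byColumn = Window.partitionBy(col("country")).orderBy(col("date").desc)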
175 changes: 175 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
@@ -0,0 +1,175 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.expressions

import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.{Column, catalyst}
import org.apache.spark.sql.catalyst.expressions._
Contributor:

There is a trait WindowSpec under sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpression.scala; would it be better to move this WindowSpec into the package org.apache.spark.sql?

Contributor Author:

I'm trying to be more future-proof here, because we might have other public-facing expressions in the future.



/**
* :: Experimental ::
* A window specification that defines the partitioning, ordering, and frame boundaries.
*
* Use the static methods in [[Window]] to create a [[WindowSpec]].
*
* @since 1.4.0
*/
@Experimental
class WindowSpec private[sql](
partitionSpec: Seq[Expression],
orderSpec: Seq[SortOrder],
frame: catalyst.expressions.WindowFrame) {

/**
* Defines the partitioning columns in a [[WindowSpec]].
* @since 1.4.0
*/
@scala.annotation.varargs
def partitionBy(colName: String, colNames: String*): WindowSpec = {
partitionBy((colName +: colNames).map(Column(_)): _*)
}

/**
* Defines the partitioning columns in a [[WindowSpec]].
* @since 1.4.0
*/
@scala.annotation.varargs
def partitionBy(cols: Column*): WindowSpec = {
new WindowSpec(cols.map(_.expr), orderSpec, frame)
}

/**
* Defines the ordering columns in a [[WindowSpec]].
* @since 1.4.0
*/
@scala.annotation.varargs
def orderBy(colName: String, colNames: String*): WindowSpec = {
orderBy((colName +: colNames).map(Column(_)): _*)
}

/**
* Defines the ordering columns in a [[WindowSpec]].
* @since 1.4.0
*/
@scala.annotation.varargs
def orderBy(cols: Column*): WindowSpec = {
val sortOrder: Seq[SortOrder] = cols.map { col =>
col.expr match {
case expr: SortOrder =>
expr
case expr: Expression =>
SortOrder(expr, Ascending)
}
}
new WindowSpec(partitionSpec, sortOrder, frame)
}
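A small sketch of what the match above does (`col` is from `org.apache.spark.sql.functions`; the column names are illustrative):

import org.apache.spark.sql.functions.col

// col("date").desc already wraps a SortOrder, so it passes through unchanged;
// col("price") is a plain expression and defaults to Ascending.
val spec = Window.partitionBy("country").orderBy(col("date").desc, col("price"))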

/**
* Defines the frame boundaries, from `start` (inclusive) to `end` (inclusive).
*
* Both `start` and `end` are relative positions from the current row. For example, "0" means
* "current row", while "-1" means the row before the current row, and "5" means the fifth row
* after the current row.
*
* @param start boundary start, inclusive.
* The frame is unbounded if this is the minimum long value.
* @param end boundary end, inclusive.
* The frame is unbounded if this is the maximum long value.
* @since 1.4.0
*/
def rowsBetween(start: Long, end: Long): WindowSpec = {
between(RowFrame, start, end)
}

/**
* Defines the frame boundaries, from `start` (inclusive) to `end` (inclusive).
*
* Both `start` and `end` are relative to the current row, expressed as offsets on the ordering
* value rather than as physical row counts. For example, "0" means "the current row's value",
* "-1" means one less than the current row's value, and "5" means five greater than it.
*
* @param start boundary start, inclusive.
* The frame is unbounded if this is the minimum long value.
* @param end boundary end, inclusive.
* The frame is unbounded if this is the maximum long value.
* @since 1.4.0
*/
def rangeBetween(start: Long, end: Long): WindowSpec = {
between(RangeFrame, start, end)
}
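To make the rows-versus-range distinction concrete, a hedged sketch (the partitioning and ordering columns are assumptions):

val w = Window.partitionBy("country").orderBy("revenue")

// ROWS frame: physical offsets. Covers the two rows before the current
// row through the current row, regardless of their revenue values.
val byRows = w.rowsBetween(-2, 0)

// RANGE frame: logical offsets on the ordering value. Covers every row
// whose revenue falls within [current revenue - 2, current revenue].
val byRange = w.rangeBetween(-2, 0)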

private def between(typ: FrameType, start: Long, end: Long): WindowSpec = {
val boundaryStart = start match {
Contributor:
I like the idea of using a negative number as preceding. :)

case 0 => CurrentRow
case Long.MinValue => UnboundedPreceding
case x if x < 0 => ValuePreceding(-start.toInt)
case x if x > 0 => ValueFollowing(start.toInt)
}

val boundaryEnd = end match {
case 0 => CurrentRow
case Long.MaxValue => UnboundedFollowing
case x if x < 0 => ValuePreceding(-end.toInt)
case x if x > 0 => ValueFollowing(end.toInt)
}

new WindowSpec(
partitionSpec,
orderSpec,
SpecifiedWindowFrame(typ, boundaryStart, boundaryEnd))
}
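For reference, the sentinel handling above translates as follows (a sketch of the mapping, not new API):

// rowsBetween(Long.MinValue, 0)  => ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
// rowsBetween(-1, 1)             => ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING
// rangeBetween(0, Long.MaxValue) => RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING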

/**
* Converts this [[WindowSpec]] into a [[Column]] with an aggregate expression.
*/
private[sql] def withAggregate(aggregate: Column): Column = {
val windowExpr = aggregate.expr match {
case Average(child) => WindowExpression(
UnresolvedWindowFunction("avg", child :: Nil),
WindowSpecDefinition(partitionSpec, orderSpec, frame))
case Sum(child) => WindowExpression(
UnresolvedWindowFunction("sum", child :: Nil),
WindowSpecDefinition(partitionSpec, orderSpec, frame))
case Count(child) => WindowExpression(
UnresolvedWindowFunction("count", child :: Nil),
WindowSpecDefinition(partitionSpec, orderSpec, frame))
case First(child) => WindowExpression(
// TODO this is a hack for Hive UDAF first_value
UnresolvedWindowFunction("first_value", child :: Nil),
WindowSpecDefinition(partitionSpec, orderSpec, frame))
case Last(child) => WindowExpression(
// TODO this is a hack for Hive UDAF last_value
UnresolvedWindowFunction("last_value", child :: Nil),
WindowSpecDefinition(partitionSpec, orderSpec, frame))
case Min(child) => WindowExpression(
UnresolvedWindowFunction("min", child :: Nil),
WindowSpecDefinition(partitionSpec, orderSpec, frame))
case Max(child) => WindowExpression(
UnresolvedWindowFunction("max", child :: Nil),
WindowSpecDefinition(partitionSpec, orderSpec, frame))
case wf: WindowFunction => WindowExpression(
wf,
WindowSpecDefinition(partitionSpec, orderSpec, frame))
case x =>
throw new UnsupportedOperationException(s"$x is not supported in window operation.")
}
new Column(windowExpr)
}

}
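Putting the pieces together, a minimal end-to-end sketch (the DataFrame `df` and its `dept`, `name`, and `salary` columns are assumptions):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, sum}

// withAggregate rewrites sum(...) into a WindowExpression carrying the
// spec's partitioning, ordering, and frame; unsupported expressions throw
// UnsupportedOperationException.
val w = Window.partitionBy("dept").orderBy("salary").rowsBetween(Long.MinValue, 0)
df.select(col("name"), sum("salary").over(w).as("running_total"))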