Commit f6f2eeb
chenghao-intel authored and rxin committed
[SPARK-7322][SQL] Window functions in DataFrame
This closes #6104.

Author: Cheng Hao <hao.cheng@intel.com>
Author: Reynold Xin <rxin@databricks.com>

Closes #6343 from rxin/window-df and squashes the following commits:

026d587 [Reynold Xin] Address code review feedback.
dc448fe [Reynold Xin] Fixed Hive tests.
9794d9d [Reynold Xin] Moved Java test package.
9331605 [Reynold Xin] Refactored API.
3313e2a [Reynold Xin] Merge pull request #6104 from chenghao-intel/df_window
d625a64 [Cheng Hao] Update the dataframe window API as suggested
c141fb1 [Cheng Hao] hide all properties of the WindowFunctionDefinition
3b1865f [Cheng Hao] scaladoc typos
f3fd2d0 [Cheng Hao] polish the unit test
6847825 [Cheng Hao] Add additional analytics functions
57e3bc0 [Cheng Hao] typos
24a08ec [Cheng Hao] scaladoc
28222ed [Cheng Hao] fix bug of range/row Frame
1d91865 [Cheng Hao] style issue
53f89f2 [Cheng Hao] remove the over from the functions.scala
964c013 [Cheng Hao] add more unit tests and window functions
64e18a7 [Cheng Hao] Add Window Function support for DataFrame
1 parent 2728c3d commit f6f2eeb

13 files changed: +807 additions, -7 deletions

sql/core/src/main/scala/org/apache/spark/sql/Column.scala

Lines changed: 18 additions & 2 deletions
@@ -18,13 +18,13 @@
 package org.apache.spark.sql

 import scala.language.implicitConversions
-import scala.collection.JavaConversions._

 import org.apache.spark.annotation.Experimental
 import org.apache.spark.Logging
+import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.functions.lit
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.analysis.{MultiAlias, UnresolvedAttribute, UnresolvedStar, UnresolvedExtractValue}
+import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.types._

@@ -889,6 +889,22 @@ class Column(protected[sql] val expr: Expression) extends Logging {
    */
   def bitwiseXOR(other: Any): Column = BitwiseXor(expr, lit(other).expr)

+  /**
+   * Define a windowing column.
+   *
+   * {{{
+   *   val w = Window.partitionBy("name").orderBy("id")
+   *   df.select(
+   *     sum("price").over(w.rangeBetween(Long.MinValue, 2)),
+   *     avg("price").over(w.rowsBetween(0, 4))
+   *   )
+   * }}}
+   *
+   * @group expr_ops
+   * @since 1.4.0
+   */
+  def over(window: expressions.WindowSpec): Column = window.withAggregate(this)
+
 }

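To see the new method end to end, here is a minimal usage sketch (assuming a hypothetical DataFrame df with name, id, and price columns, mirroring the scaladoc example above):

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{avg, col, sum}

    // Partition rows by name and sort each partition by id.
    val w = Window.partitionBy("name").orderBy("id")

    df.select(
      col("name"),
      col("id"),
      // Range frame: everything from the start of the partition up to id + 2.
      sum("price").over(w.rangeBetween(Long.MinValue, 2)),
      // Row frame: the current row and the 4 rows after it.
      avg("price").over(w.rowsBetween(0, 4))
    )
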
sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala

Lines changed: 5 additions & 4 deletions
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.analysis.{MultiAlias, ResolvedStar, Unresol
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, _}
 import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
-import org.apache.spark.sql.catalyst.{expressions, CatalystTypeConverters, ScalaReflection, SqlParser}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, SqlParser}
 import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, LogicalRDD}
 import org.apache.spark.sql.json.JacksonGenerator
 import org.apache.spark.sql.sources.CreateTableUsingAsSelect

@@ -411,7 +411,7 @@ class DataFrame private[sql](
       joined.left,
       joined.right,
       joinType = Inner,
-      Some(expressions.EqualTo(
+      Some(catalyst.expressions.EqualTo(
         joined.left.resolve(usingColumn),
         joined.right.resolve(usingColumn))))
   )

@@ -480,8 +480,9 @@ class DataFrame private[sql](
     // By the time we get here, since we have already run analysis, all attributes should've been
     // resolved and become AttributeReference.
     val cond = plan.condition.map { _.transform {
-      case expressions.EqualTo(a: AttributeReference, b: AttributeReference) if a.sameRef(b) =>
-        expressions.EqualTo(plan.left.resolve(a.name), plan.right.resolve(b.name))
+      case catalyst.expressions.EqualTo(a: AttributeReference, b: AttributeReference)
+          if a.sameRef(b) =>
+        catalyst.expressions.EqualTo(plan.left.resolve(a.name), plan.right.resolve(b.name))
     }}
     plan.copy(condition = cond)
   }
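Why the fully qualified catalyst.expressions.EqualTo: this commit introduces the org.apache.spark.sql.expressions package (home of Window and WindowSpec), so the old import of catalyst's expressions package under the bare name `expressions` would now collide with that sibling package. A minimal sketch of the disambiguation (illustrative only, not from the commit):

    package org.apache.spark.sql

    // Inside this package the bare name `expressions` now resolves to the new
    // sibling package org.apache.spark.sql.expressions, so catalyst expression
    // types are spelled out in full:
    object DisambiguationSketch {
      def equalTo(left: catalyst.expressions.Expression,
                  right: catalyst.expressions.Expression): catalyst.expressions.EqualTo =
        catalyst.expressions.EqualTo(left, right)
    }
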
sql/core/src/main/scala/org/apache/spark/sql/expressions/Window.scala (new file)

Lines changed: 81 additions & 0 deletions

@@ -0,0 +1,81 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.expressions

import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions._

/**
 * :: Experimental ::
 * Utility functions for defining windows in DataFrames.
 *
 * {{{
 *   // PARTITION BY country ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
 *   Window.partitionBy("country").orderBy("date").rowsBetween(Long.MinValue, 0)
 *
 *   // PARTITION BY country ORDER BY date ROWS BETWEEN 3 PRECEDING AND 3 FOLLOWING
 *   Window.partitionBy("country").orderBy("date").rowsBetween(-3, 3)
 * }}}
 *
 * @since 1.4.0
 */
@Experimental
object Window {

  /**
   * Creates a [[WindowSpec]] with the partitioning defined.
   * @since 1.4.0
   */
  @scala.annotation.varargs
  def partitionBy(colName: String, colNames: String*): WindowSpec = {
    spec.partitionBy(colName, colNames : _*)
  }

  /**
   * Creates a [[WindowSpec]] with the partitioning defined.
   * @since 1.4.0
   */
  @scala.annotation.varargs
  def partitionBy(cols: Column*): WindowSpec = {
    spec.partitionBy(cols : _*)
  }

  /**
   * Creates a [[WindowSpec]] with the ordering defined.
   * @since 1.4.0
   */
  @scala.annotation.varargs
  def orderBy(colName: String, colNames: String*): WindowSpec = {
    spec.orderBy(colName, colNames : _*)
  }

  /**
   * Creates a [[WindowSpec]] with the ordering defined.
   * @since 1.4.0
   */
  @scala.annotation.varargs
  def orderBy(cols: Column*): WindowSpec = {
    spec.orderBy(cols : _*)
  }

  private def spec: WindowSpec = {
    new WindowSpec(Seq.empty, Seq.empty, UnspecifiedFrame)
  }

}
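As a quick illustration of this entry point, the following sketch (assuming a hypothetical DataFrame df with country, date, and sales columns; country and date come from the scaladoc above, sales is invented) builds the two specs and applies them through Column.over:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{avg, col, sum}

    // ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, per country, ordered by date.
    val running = Window.partitionBy("country").orderBy("date").rowsBetween(Long.MinValue, 0)

    // ROWS BETWEEN 3 PRECEDING AND 3 FOLLOWING.
    val centered = Window.partitionBy("country").orderBy("date").rowsBetween(-3, 3)

    df.select(
      col("country"),
      col("date"),
      sum("sales").over(running),   // running total within each country
      avg("sales").over(centered)   // moving average over a 7-row window
    )
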
sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala (new file)

Lines changed: 175 additions & 0 deletions

@@ -0,0 +1,175 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.expressions

import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.{Column, catalyst}
import org.apache.spark.sql.catalyst.expressions._


/**
 * :: Experimental ::
 * A window specification that defines the partitioning, ordering, and frame boundaries.
 *
 * Use the static methods in [[Window]] to create a [[WindowSpec]].
 *
 * @since 1.4.0
 */
@Experimental
class WindowSpec private[sql](
    partitionSpec: Seq[Expression],
    orderSpec: Seq[SortOrder],
    frame: catalyst.expressions.WindowFrame) {

  /**
   * Defines the partitioning columns in a [[WindowSpec]].
   * @since 1.4.0
   */
  @scala.annotation.varargs
  def partitionBy(colName: String, colNames: String*): WindowSpec = {
    partitionBy((colName +: colNames).map(Column(_)): _*)
  }

  /**
   * Defines the partitioning columns in a [[WindowSpec]].
   * @since 1.4.0
   */
  @scala.annotation.varargs
  def partitionBy(cols: Column*): WindowSpec = {
    new WindowSpec(cols.map(_.expr), orderSpec, frame)
  }

  /**
   * Defines the ordering columns in a [[WindowSpec]].
   * @since 1.4.0
   */
  @scala.annotation.varargs
  def orderBy(colName: String, colNames: String*): WindowSpec = {
    orderBy((colName +: colNames).map(Column(_)): _*)
  }

  /**
   * Defines the ordering columns in a [[WindowSpec]].
   * @since 1.4.0
   */
  @scala.annotation.varargs
  def orderBy(cols: Column*): WindowSpec = {
    val sortOrder: Seq[SortOrder] = cols.map { col =>
      col.expr match {
        case expr: SortOrder =>
          expr
        case expr: Expression =>
          SortOrder(expr, Ascending)
      }
    }
    new WindowSpec(partitionSpec, sortOrder, frame)
  }

  /**
   * Defines the frame boundaries, from `start` (inclusive) to `end` (inclusive).
   *
   * Both `start` and `end` are relative positions from the current row. For example, "0" means
   * "current row", while "-1" means the row before the current row, and "5" means the fifth row
   * after the current row.
   *
   * @param start boundary start, inclusive.
   *              The frame is unbounded if this is the minimum long value.
   * @param end boundary end, inclusive.
   *            The frame is unbounded if this is the maximum long value.
   * @since 1.4.0
   */
  def rowsBetween(start: Long, end: Long): WindowSpec = {
    between(RowFrame, start, end)
  }

  /**
   * Defines the frame boundaries, from `start` (inclusive) to `end` (inclusive).
   *
   * Both `start` and `end` are relative to the current row. For example, "0" means "current row",
   * while "-1" means one before the current row, and "5" means five after the current row.
   *
   * @param start boundary start, inclusive.
   *              The frame is unbounded if this is the minimum long value.
   * @param end boundary end, inclusive.
   *            The frame is unbounded if this is the maximum long value.
   * @since 1.4.0
   */
  def rangeBetween(start: Long, end: Long): WindowSpec = {
    between(RangeFrame, start, end)
  }

  private def between(typ: FrameType, start: Long, end: Long): WindowSpec = {
    val boundaryStart = start match {
      case 0 => CurrentRow
      case Long.MinValue => UnboundedPreceding
      case x if x < 0 => ValuePreceding(-start.toInt)
      case x if x > 0 => ValueFollowing(start.toInt)
    }

    val boundaryEnd = end match {
      case 0 => CurrentRow
      case Long.MaxValue => UnboundedFollowing
      case x if x < 0 => ValuePreceding(-end.toInt)
      case x if x > 0 => ValueFollowing(end.toInt)
    }

    new WindowSpec(
      partitionSpec,
      orderSpec,
      SpecifiedWindowFrame(typ, boundaryStart, boundaryEnd))
  }

  /**
   * Converts this [[WindowSpec]] into a [[Column]] with an aggregate expression.
   */
  private[sql] def withAggregate(aggregate: Column): Column = {
    val windowExpr = aggregate.expr match {
      case Average(child) => WindowExpression(
        UnresolvedWindowFunction("avg", child :: Nil),
        WindowSpecDefinition(partitionSpec, orderSpec, frame))
      case Sum(child) => WindowExpression(
        UnresolvedWindowFunction("sum", child :: Nil),
        WindowSpecDefinition(partitionSpec, orderSpec, frame))
      case Count(child) => WindowExpression(
        UnresolvedWindowFunction("count", child :: Nil),
        WindowSpecDefinition(partitionSpec, orderSpec, frame))
      case First(child) => WindowExpression(
        // TODO this is a hack for Hive UDAF first_value
        UnresolvedWindowFunction("first_value", child :: Nil),
        WindowSpecDefinition(partitionSpec, orderSpec, frame))
      case Last(child) => WindowExpression(
        // TODO this is a hack for Hive UDAF last_value
        UnresolvedWindowFunction("last_value", child :: Nil),
        WindowSpecDefinition(partitionSpec, orderSpec, frame))
      case Min(child) => WindowExpression(
        UnresolvedWindowFunction("min", child :: Nil),
        WindowSpecDefinition(partitionSpec, orderSpec, frame))
      case Max(child) => WindowExpression(
        UnresolvedWindowFunction("max", child :: Nil),
        WindowSpecDefinition(partitionSpec, orderSpec, frame))
      case wf: WindowFunction => WindowExpression(
        wf,
        WindowSpecDefinition(partitionSpec, orderSpec, frame))
      case x =>
        throw new UnsupportedOperationException(s"$x is not supported in window operation.")
    }
    new Column(windowExpr)
  }

}
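Two implementation details worth spelling out. First, `between` translates each Long offset into a catalyst frame boundary. The following standalone sketch (not part of the commit) collapses the start and end matches into one hypothetical helper; the real method applies the Long.MinValue case only to `start` and the Long.MaxValue case only to `end`:

    import org.apache.spark.sql.catalyst.expressions._

    // Hypothetical helper mirroring the offset-to-boundary mapping in `between`.
    def boundary(offset: Long): FrameBoundary = offset match {
      case 0             => CurrentRow               // the current row itself
      case Long.MinValue => UnboundedPreceding       // UNBOUNDED PRECEDING (start side)
      case Long.MaxValue => UnboundedFollowing       // UNBOUNDED FOLLOWING (end side)
      case x if x < 0    => ValuePreceding(-x.toInt) // e.g. -3 => 3 PRECEDING
      case x             => ValueFollowing(x.toInt)  // e.g.  5 => 5 FOLLOWING
    }

    // So rowsBetween(-3, 3) produces
    // SpecifiedWindowFrame(RowFrame, ValuePreceding(3), ValueFollowing(3)).

Second, `withAggregate` is what `Column.over` delegates to: it pattern-matches the aggregate expression and wraps it in a `WindowExpression` over an `UnresolvedWindowFunction`, so `sum("price").over(w)` becomes a window expression named "sum" with this spec's partitioning, ordering, and frame. Per the TODO comments above, `first` and `last` map onto the Hive UDAFs `first_value` and `last_value`.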
