Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2515,6 +2515,7 @@ object EliminateEventTimeWatermark extends Rule[LogicalPlan] {
object TimeWindowing extends Rule[LogicalPlan] {
import org.apache.spark.sql.catalyst.dsl.expressions._

private final val WINDOW_COL_NAME = "window"
private final val WINDOW_START = "start"
private final val WINDOW_END = "end"

Expand Down Expand Up @@ -2550,49 +2551,76 @@ object TimeWindowing extends Rule[LogicalPlan] {
case p: LogicalPlan if p.children.size == 1 =>
val child = p.children.head
val windowExpressions =
p.expressions.flatMap(_.collect { case t: TimeWindow => t }).distinct.toList // Not correct.
p.expressions.flatMap(_.collect { case t: TimeWindow => t }).toSet

// Only support a single window expression for now
if (windowExpressions.size == 1 &&
windowExpressions.head.timeColumn.resolved &&
windowExpressions.head.checkInputDataTypes().isSuccess) {

val window = windowExpressions.head

val metadata = window.timeColumn match {
case a: Attribute => a.metadata
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Existing: there is a comment above that says "Not correct." Does that still apply?

case _ => Metadata.empty
}
val windowAttr =
AttributeReference("window", window.dataType, metadata = metadata)()

val maxNumOverlapping = math.ceil(window.windowDuration * 1.0 / window.slideDuration).toInt
val windows = Seq.tabulate(maxNumOverlapping + 1) { i =>
val windowId = Ceil((PreciseTimestamp(window.timeColumn) - window.startTime) /
window.slideDuration)
val windowStart = (windowId + i - maxNumOverlapping) *
window.slideDuration + window.startTime

def getWindow(i: Int, overlappingWindows: Int): Expression = {
val division = (PreciseTimestampConversion(
window.timeColumn, TimestampType, LongType) - window.startTime) / window.slideDuration
val ceil = Ceil(division)
// if the division is equal to the ceiling, our record is the start of a window
val windowId = CaseWhen(Seq((ceil === division, ceil + 1)), Some(ceil))
val windowStart = (windowId + i - overlappingWindows) *
window.slideDuration + window.startTime
val windowEnd = windowStart + window.windowDuration

CreateNamedStruct(
Literal(WINDOW_START) :: windowStart ::
Literal(WINDOW_END) :: windowEnd :: Nil)
Literal(WINDOW_START) ::
PreciseTimestampConversion(windowStart, LongType, TimestampType) ::
Literal(WINDOW_END) ::
PreciseTimestampConversion(windowEnd, LongType, TimestampType) ::
Nil)
}

val projections = windows.map(_ +: p.children.head.output)
val windowAttr = AttributeReference(
WINDOW_COL_NAME, window.dataType, metadata = metadata)()

if (window.windowDuration == window.slideDuration) {
val windowStruct = Alias(getWindow(0, 1), WINDOW_COL_NAME)(
exprId = windowAttr.exprId)

val replacedPlan = p transformExpressions {
case t: TimeWindow => windowAttr
}

// For backwards compatibility we add a filter to filter out nulls
val filterExpr = IsNotNull(window.timeColumn)

val filterExpr =
window.timeColumn >= windowAttr.getField(WINDOW_START) &&
window.timeColumn < windowAttr.getField(WINDOW_END)
replacedPlan.withNewChildren(
Filter(filterExpr,
Project(windowStruct +: child.output, child)) :: Nil)
} else {
val overlappingWindows =
math.ceil(window.windowDuration * 1.0 / window.slideDuration).toInt
val windows =
Seq.tabulate(overlappingWindows)(i => getWindow(i, overlappingWindows))

val projections = windows.map(_ +: child.output)

val filterExpr =
window.timeColumn >= windowAttr.getField(WINDOW_START) &&
window.timeColumn < windowAttr.getField(WINDOW_END)

val expandedPlan =
Filter(filterExpr,
val substitutedPlan = Filter(filterExpr,
Expand(projections, windowAttr +: child.output, child))

val substitutedPlan = p transformExpressions {
case t: TimeWindow => windowAttr
}
val renamedPlan = p transformExpressions {
case t: TimeWindow => windowAttr
}

substitutedPlan.withNewChildren(expandedPlan :: Nil)
renamedPlan.withNewChildren(substitutedPlan :: Nil)
}
} else if (windowExpressions.size > 1) {
p.failAnalysis("Multiple time window expressions would result in a cartesian product " +
"of rows, therefore they are currently not supported.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,17 +152,21 @@ object TimeWindow {
}

/**
* Expression used internally to convert the TimestampType to Long without losing
* Expression used internally to convert the TimestampType to Long and back without losing
* precision, i.e. in microseconds. Used in time windowing.
*/
case class PreciseTimestamp(child: Expression) extends UnaryExpression with ExpectsInputTypes {
override def inputTypes: Seq[AbstractDataType] = Seq(TimestampType)
override def dataType: DataType = LongType
case class PreciseTimestampConversion(
child: Expression,
fromType: DataType,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The `fromType` should just come from the child, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`ExpectsInputTypes` performs implicit casting at times.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we shouldn't be using it then? This is a purely internal expression?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is purely internal; it is used for microsecond-precision access to Timestamps.

toType: DataType) extends UnaryExpression with ExpectsInputTypes {
override def inputTypes: Seq[AbstractDataType] = Seq(fromType)
override def dataType: DataType = toType
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val eval = child.genCode(ctx)
ev.copy(code = eval.code +
s"""boolean ${ev.isNull} = ${eval.isNull};
|${ctx.javaType(dataType)} ${ev.value} = ${eval.value};
""".stripMargin)
}
override def nullSafeEval(input: Any): Any = input
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@

package org.apache.spark.sql

import java.util.TimeZone

import org.scalatest.BeforeAndAfterEach

import org.apache.spark.sql.catalyst.plans.logical.Expand
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.StringType
Expand All @@ -29,11 +28,27 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSQLContext with B

import testImplicits._

// Checks the boundary case where a record's timestamp falls exactly on a window start:
// with a 10-second tumbling window, the single record at 19:39:30 is expected to be
// counted once in the window whose start is 19:39:30 and whose end is 19:39:40.
test("simple tumbling window with record at window start") {
val df = Seq(
("2016-03-27 19:39:30", 1, "a")).toDF("time", "value", "id")

checkAnswer(
df.groupBy(window($"time", "10 seconds"))
.agg(count("*").as("counts"))
.orderBy($"window.start".asc)
// Window bounds are cast to strings so they can be compared with the literal expected values.
.select($"window.start".cast("string"), $"window.end".cast("string"), $"counts"),
Seq(
Row("2016-03-27 19:39:30", "2016-03-27 19:39:40", 1)
)
)
}

test("tumbling window groupBy statement") {
val df = Seq(
("2016-03-27 19:39:34", 1, "a"),
("2016-03-27 19:39:56", 2, "a"),
("2016-03-27 19:39:27", 4, "b")).toDF("time", "value", "id")

checkAnswer(
df.groupBy(window($"time", "10 seconds"))
.agg(count("*").as("counts"))
Expand All @@ -59,14 +74,18 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSQLContext with B

test("tumbling window with multi-column projection") {
val df = Seq(
("2016-03-27 19:39:34", 1, "a"),
("2016-03-27 19:39:56", 2, "a"),
("2016-03-27 19:39:27", 4, "b")).toDF("time", "value", "id")
("2016-03-27 19:39:34", 1, "a"),
("2016-03-27 19:39:56", 2, "a"),
("2016-03-27 19:39:27", 4, "b")).toDF("time", "value", "id")
.select(window($"time", "10 seconds"), $"value")
.orderBy($"window.start".asc)
.select($"window.start".cast("string"), $"window.end".cast("string"), $"value")

val expands = df.queryExecution.optimizedPlan.find(_.isInstanceOf[Expand])
assert(expands.isEmpty, "Tumbling windows shouldn't require expand")

checkAnswer(
df.select(window($"time", "10 seconds"), $"value")
.orderBy($"window.start".asc)
.select($"window.start".cast("string"), $"window.end".cast("string"), $"value"),
df,
Seq(
Row("2016-03-27 19:39:20", "2016-03-27 19:39:30", 4),
Row("2016-03-27 19:39:30", "2016-03-27 19:39:40", 1),
Expand Down Expand Up @@ -104,13 +123,17 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSQLContext with B

test("sliding window projection") {
val df = Seq(
("2016-03-27 19:39:34", 1, "a"),
("2016-03-27 19:39:56", 2, "a"),
("2016-03-27 19:39:27", 4, "b")).toDF("time", "value", "id")
("2016-03-27 19:39:34", 1, "a"),
("2016-03-27 19:39:56", 2, "a"),
("2016-03-27 19:39:27", 4, "b")).toDF("time", "value", "id")
.select(window($"time", "10 seconds", "3 seconds", "0 second"), $"value")
.orderBy($"window.start".asc, $"value".desc).select("value")

val expands = df.queryExecution.optimizedPlan.find(_.isInstanceOf[Expand])
assert(expands.nonEmpty, "Sliding windows require expand")

checkAnswer(
df.select(window($"time", "10 seconds", "3 seconds", "0 second"), $"value")
.orderBy($"window.start".asc, $"value".desc).select("value"),
df,
// 2016-03-27 19:39:27 UTC -> 4 bins
// 2016-03-27 19:39:34 UTC -> 3 bins
// 2016-03-27 19:39:56 UTC -> 3 bins
Expand Down