[SPARK-21153] Use project instead of expand in tumbling windows #18364
Conversation
Test build #78293 has finished for PR 18364 at commit
Test build #78303 has started for PR 18364 at commit
I will retrigger this once Jenkins restarts.
test this please
Test build #78319 has started for PR 18364 at commit
Test build #78311 has finished for PR 18364 at commit
test this please
Test build #78336 has started for PR 18364 at commit
retest this please
Test build #78397 has finished for PR 18364 at commit
val window = windowExpressions.head

val metadata = window.timeColumn match {
  case a: Attribute => a.metadata

existing: There is a comment above that says "not correct"?
marmbrus left a comment:
Pretty big speed up!
val windows = Seq.tabulate(maxNumOverlapping + 1) { i =>
val windowId = Ceil((PreciseTimestamp(window.timeColumn) - window.startTime) /
  window.slideDuration)
def getWindow(i: Int, maxNumOverlapping: Int): Expression = {

I'm not sure I understand maxNumOverlapping as a name that we tabulate over. Isn't it like the overlapNumber or something?
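For context, the window-id arithmetic quoted above can be sketched in plain Scala (not Spark's Expression DSL). The object and method names below are illustrative assumptions, but the formula mirrors the quoted rule: build `maxNumOverlapping + 1` candidate windows from the window id, then filter to the ones that actually contain the timestamp.

```scala
// Plain-Scala sketch of the quoted logic: a timestamp falls into at most
// ceil(windowDuration / slideDuration) sliding windows. One extra candidate
// (maxNumOverlapping + 1) is generated and filtered, which also handles
// timestamps that sit exactly on a window boundary.
object SlidingWindowSketch {
  // All arguments in microseconds; returns the (start, end) pairs containing ts.
  def windowsFor(ts: Long, windowDuration: Long,
                 slideDuration: Long, startTime: Long): Seq[(Long, Long)] = {
    val maxNumOverlapping = math.ceil(windowDuration.toDouble / slideDuration).toInt
    val windowId = math.ceil((ts - startTime).toDouble / slideDuration).toLong
    (0 to maxNumOverlapping).map { i =>
      val start = (windowId + i - maxNumOverlapping) * slideDuration + startTime
      (start, start + windowDuration)
    }.filter { case (start, end) => start <= ts && ts < end }
  }
}
```

For a tumbling window (`windowDuration == slideDuration`) this always yields exactly one window, which is the special case this PR exploits.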
window.timeColumn < windowAttr.getField(WINDOW_END)
if (window.windowDuration == window.slideDuration) {
  val windowStruct = Alias(getWindow(0, 1), WINDOW_COL_NAME)(
    exprId = windowAttr.exprId, explicitMetadata = Some(metadata))

nit: Wrapping is off. Prefer to break at = and if you wrap args, wrap all of them.
// For backwards compatibility we add a filter to filter out nulls
val filterExpr = IsNotNull(window.timeColumn)

replacedPlan.withNewChildren(Filter(filterExpr,

Nit: wrapping, indent query plans like trees.
Actually, should we even be doing a projection here? If it's just a substitution / filter, perhaps we should just replace it inline?

I couldn't get inline replacing to work. It breaks the EventTimeWatermark tests.
replacedPlan.withNewChildren(Filter(filterExpr,
  Project(windowStruct +: child.output, child)) :: Nil)
} else {

nit: no blank line
override def dataType: DataType = LongType
case class PreciseTimestampConversion(
    child: Expression,
    fromType: DataType,

The from type should just come from the child, right?

expectsInputTypes does implicit casting at times.

Maybe we shouldn't be using it then? This is a purely internal expression?

It is purely internal, used for microsecond-precision access of Timestamps.
Test build #78540 has finished for PR 18364 at commit
retest this please
Test build #78544 has finished for PR 18364 at commit
retest this please
Test build #78547 has finished for PR 18364 at commit
LGTM. Merging to master.
## What changes were proposed in this pull request?
Time windowing in Spark currently performs an Expand + Filter, because in the general case there is no way to bound the number of windows a timestamp will fall in. However, for tumbling windows, a record is guaranteed to fall into a single bucket. In this case, doubling the number of records with Expand is wasteful, and can be improved by using a simple Project instead.
Benchmarks show that we get an order of magnitude performance improvement after this patch.
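The single-bucket claim above can be sketched with plain arithmetic. Assuming microsecond `Long` timestamps (Spark's internal representation of `TimestampType`), a tumbling window is a pure function of the record's timestamp, which is why a projection suffices; the names below are illustrative, not Spark internals.

```scala
// Sketch: with a tumbling window each timestamp maps to exactly one bucket,
// so the window can be computed directly instead of expanding each row into
// multiple candidate rows and filtering.
object TumblingWindowSketch {
  // ts, windowDuration, startTime in microseconds; returns (windowStart, windowEnd).
  def bucketOf(ts: Long, windowDuration: Long, startTime: Long): (Long, Long) = {
    // floorDiv keeps the math correct for timestamps before startTime.
    val start = Math.floorDiv(ts - startTime, windowDuration) * windowDuration + startTime
    (start, start + windowDuration)
  }
}
```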
## How was this patch tested?
Existing unit tests. Benchmarked using the following code:
```scala
import org.apache.spark.sql.functions._
spark.time {
spark.range(numRecords)
.select(from_unixtime((current_timestamp().cast("long") * 1000 + 'id / 1000) / 1000) as 'time)
.select(window('time, "10 seconds"))
.count()
}
```
Setup:
- 1 c3.2xlarge worker (8 cores)
1 B rows ran in 287 seconds after this optimization. I didn't wait for it to finish without the optimization. Shows about a 5x improvement for large numbers of records.
Author: Burak Yavuz <brkyvz@gmail.com>
Closes apache#18364 from brkyvz/opt-tumble.