@brkyvz brkyvz commented Mar 28, 2016

What changes were proposed in this pull request?

This PR adds the function window as a column expression.

window can be used to bucket rows into time windows given a time column. With this expression, time series analysis on both batch and streaming data should become much simpler.

Usage

Assume the following schema:

sensor_id, measurement, timestamp

To average 5 minute data every 1 minute (window length of 5 minutes, slide duration of 1 minute), we will use:

// assumes: import org.apache.spark.sql.functions._
df.groupBy(window(col("timestamp"), "5 minutes", "1 minute"), col("sensor_id"))
  .agg(mean("measurement").as("avg_meas"))

This will generate windows such as:

09:00:00-09:05:00
09:01:00-09:06:00
09:02:00-09:07:00 ...

Windows start at every slideDuration, counted from the Unix epoch (1970-01-01 00:00:00 UTC).
To start intervals at a different point of time, e.g. 30 seconds after a minute, the startTime parameter can be used.

// assumes: import org.apache.spark.sql.functions._
df.groupBy(window(col("timestamp"), "5 minutes", "1 minute", "30 seconds"), col("sensor_id"))
  .agg(mean("measurement").as("avg_meas"))

This will generate windows such as:

09:00:30-09:05:30
09:01:30-09:06:30
09:02:30-09:07:30 ...
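To make the window grid concrete, here is a minimal plain-Scala sketch (no Spark required) of which windows a single timestamp falls into. It is an illustration of the description above, not the PR's implementation: the names `WindowSketch`, `Window`, and `windowsFor` are hypothetical, all values are whole seconds, windows are assumed to be half-open `[start, end)`, and the slide is assumed to be no longer than the window.

```scala
object WindowSketch {
  /** Half-open window [start, end), in seconds since the Unix epoch. */
  final case class Window(start: Long, end: Long)

  /**
   * Returns every window that contains `ts`, in ascending order of start time.
   * Window starts lie on the grid startSec + k * slideSec.
   */
  def windowsFor(ts: Long, windowSec: Long, slideSec: Long, startSec: Long = 0L): List[Window] = {
    // Assumption for this sketch: positive durations, slide no longer than the window.
    require(windowSec > 0 && slideSec > 0 && slideSec <= windowSec,
      "durations must be positive and the slide must not exceed the window")
    // Latest grid point at or before ts.
    val lastStart = startSec + Math.floorDiv(ts - startSec, slideSec) * slideSec
    // Step backwards one slide at a time while the window still covers ts.
    Iterator.iterate(lastStart)(_ - slideSec)
      .takeWhile(start => start + windowSec > ts)
      .map(start => Window(start, start + windowSec))
      .toList
      .reverse
  }
}
```

With a 5 minute window and a 1 minute slide, a reading taken 32600 seconds after the start of the grid (09:03:20) lands in five overlapping windows, the first starting at 08:59:00 and the last at 09:03:00. Passing `startSec = 30L` shifts every boundary by 30 seconds, matching the second example above.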

Python support will be added in a follow-up PR.

How was this patch tested?

This patch adds basic unit tests for the TimeWindow expression that check parameter validation, plus unit/integration tests that check the correctness of the windowing and its usability in complex operations (multi-column grouping, multi-column projections, joins).

SparkQA commented Mar 28, 2016

Test build #54343 has finished for PR 12008 at commit 9e7febb.

  • This patch fails RAT tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Mar 28, 2016

Test build #54344 has finished for PR 12008 at commit 5787e35.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

override def dataType: DataType = outputType

private def outputType: StructType = StructType(Seq(
StructField("start", TimestampType), StructField("end", TimestampType)))

Contributor

Nit: indents are off in this file and the next.

Add(division, Literal(i - windowExpr.maxNumOverlapping)),
Literal(windowExpr.slideDuration)),
Literal(windowExpr.startTime)),
Literal(1000000))

Contributor

This might be easier to read if it was written with the dsl.

windowDuration: String,
slideDuration: String,
startTime: String): Column = withExpr {
TimeWindow(timeColumn.expr, windowDuration, slideDuration, startTime)

Contributor

Maybe we should just parse here?

Contributor

Or maybe in the companion object? Or with another constructor? The _param / lazy val is a little odd.

Contributor

+1 on parsing it here

Contributor Author

Added the companion object apply method

(cal.months * 4 * CalendarInterval.MICROS_PER_WEEK + cal.microseconds) / 1000000
}

// The window duration in seconds

Contributor

Does this mean that the smallest window is 1 second?

Contributor Author

Unfortunately, yes. The conversion from LongType to TimestampType has second precision.
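The one-second floor follows from the `/ 1000000` in the snippet under review: integer division discards any sub-second remainder. The sketch below is plain Scala written for illustration; `SecondsPrecisionSketch` and both of its methods are hypothetical names, and the guard is an assumption about how such a limit could be enforced, not code from this PR.

```scala
object SecondsPrecisionSketch {
  val MicrosPerSecond: Long = 1000000L

  /** Integer division drops the sub-second part of the duration. */
  def toWholeSeconds(micros: Long): Long = micros / MicrosPerSecond

  /** Hypothetical guard: reject durations that would truncate to zero seconds. */
  def requireAtLeastOneSecond(micros: Long): Long = {
    val seconds = toWholeSeconds(micros)
    require(seconds > 0, s"Duration of $micros microseconds is shorter than one second")
    seconds
  }
}
```

A 500 millisecond duration (500000 microseconds) truncates to zero whole seconds here, which is why it cannot describe a usable window under this conversion.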

rxin commented Mar 28, 2016

lower case Dataset

brkyvz changed the title from "[SPARK-14160] Time Windowing functions for DataSets" to "[SPARK-14160] Time Windowing functions for Datasets" on Mar 28, 2016

SparkQA commented Mar 28, 2016

Test build #54345 has finished for PR 12008 at commit ad98902.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* @group datetime_funcs
* @since 2.0.0
*/
def window(timeColumn: Column, windowDuration: String): Column = {

Contributor

experimental tag

SparkQA commented Mar 28, 2016

Test build #54370 has finished for PR 12008 at commit 6a784b7.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Mar 28, 2016

Test build #54371 has finished for PR 12008 at commit b7154b2.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Mar 29, 2016

Test build #54372 has finished for PR 12008 at commit 7eea448.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

brkyvz commented Mar 29, 2016

retest this please

SparkQA commented Mar 29, 2016

Test build #54387 has finished for PR 12008 at commit 7eea448.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Mar 30, 2016

Test build #54484 has finished for PR 12008 at commit b4e2fc2.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

throw new IllegalArgumentException(
s"The provided interval ($interval) did not correspond to a valid interval string.")
}
(cal.months * 4 * CalendarInterval.MICROS_PER_WEEK + cal.microseconds) / 1000000

Member

4 weeks == 1 month looks weird. Maybe define window("timestamp", "1 month") as groupBy(getMonthInYear("timestamp")) is more intuitive?

Member

> 4 weeks == 1 month looks weird. Maybe define window("timestamp", "1 month") as groupBy(getMonthInYear("timestamp")) is more intuitive?

This looks hard to implement. Maybe we just don't need to support month or year.

Member

By the way, does this line mean that the user cannot use window("timestamp", "500 milliseconds")?

Contributor Author

We can't operate on TimestampType values as if they were Longs. AFAIK they get cast to LongType, which has second precision, so I'm not sure supporting milliseconds is possible.
On Mar 29, 2016 11:14 PM, "Shixiong Zhu" <notifications@github.com> wrote:

> In sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala
> #12008 (comment):
>
>     private def getIntervalInSeconds(interval: String): Long = {
>       if (StringUtils.isBlank(interval)) {
>         throw new IllegalArgumentException(
>           "The window duration, slide duration and start time cannot be null or blank.")
>       }
>       val intervalString = if (interval.startsWith("interval")) {
>         interval
>       } else {
>         "interval " + interval
>       }
>       val cal = CalendarInterval.fromString(intervalString)
>       if (cal == null) {
>         throw new IllegalArgumentException(
>           s"The provided interval ($interval) did not correspond to a valid interval string.")
>       }
>       (cal.months * 4 * CalendarInterval.MICROS_PER_WEEK + cal.microseconds) / 1000000
>
> By the way, does it mean that the user cannot use window("timestamp", "500 milliseconds")?

Contributor

timestamp precision i think is 100ns

Contributor

I agree we probably don't want to support month intervals since they are variable length. If people want to group on calendar boundaries they can use existing date functions.
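The two options discussed in this thread can be contrasted in a small plain-Scala sketch. Everything here is illustrative: `Interval` is a hypothetical stand-in for the two fields the snippet reads from `CalendarInterval` (`months` and `microseconds`), and `secondsRejectingMonths` is one assumed way to implement the "don't support months" suggestion, not the code that was merged.

```scala
object FixedLengthIntervalSketch {
  /** Hypothetical stand-in for the `months` and `microseconds` fields used in the snippet. */
  final case class Interval(months: Int, microseconds: Long)

  val MicrosPerSecond: Long = 1000000L
  val MicrosPerWeek: Long = 7L * 24 * 60 * 60 * MicrosPerSecond

  /** Behaviour questioned in review: one month is approximated as four weeks. */
  def secondsTreatingMonthAsFourWeeks(i: Interval): Long =
    (i.months * 4 * MicrosPerWeek + i.microseconds) / MicrosPerSecond

  /** Alternative raised in review: refuse variable-length (month) intervals. */
  def secondsRejectingMonths(i: Interval): Long = {
    require(i.months == 0, "Intervals with months are variable length and are not supported")
    i.microseconds / MicrosPerSecond
  }
}
```

Under the four-week approximation a "1 month" window is always 28 days long, so it drifts away from calendar month boundaries, which is the mismatch the reviewers point out.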

SparkQA commented Mar 31, 2016

Test build #54574 has finished for PR 12008 at commit f756556.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class PreciseTimestamp(child: Expression) extends UnaryExpression with ExpectsInputTypes

SparkQA commented Mar 31, 2016

Test build #54575 has finished for PR 12008 at commit 8bc9799.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Mar 31, 2016

Test build #54640 has finished for PR 12008 at commit ff27b7a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

marmbrus commented Apr 1, 2016

Thanks! Merging to master.

asfgit closed this in 1b829ce on Apr 1, 2016

asfgit pushed a commit that referenced this pull request on Apr 5, 2016:

## What changes were proposed in this pull request?

The `window` function was added to Dataset with [this PR](#12008).
This PR adds the Python and SQL APIs for this function.

With this PR, SQL, Java, and Scala share the same API. Users can call:
 - `window(timeColumn, windowDuration)`
 - `window(timeColumn, windowDuration, slideDuration)`
 - `window(timeColumn, windowDuration, slideDuration, startTime)`

In Python, users can use all of the forms above and, in addition:
 - `window(timeColumn, windowDuration, startTime=...)`

That is, they can provide `startTime` without providing `slideDuration`. In this case, tumbling windows are generated.
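A tumbling window is the special case where the slide equals the window length, so windows never overlap. The sketch below shows that defaulting rule in plain Scala. It is an assumption about how an omitted `slideDuration` could be resolved, with hypothetical names (`TumblingDefaultSketch`, `WindowSpec`, `resolve`) and durations in whole seconds. It is not the Python binding itself.

```scala
object TumblingDefaultSketch {
  /** Resolved window parameters, in seconds. */
  final case class WindowSpec(windowSec: Long, slideSec: Long, startSec: Long)

  /** When no slide is given, fall back to the window length (tumbling windows). */
  def resolve(windowSec: Long, slideSec: Option[Long] = None, startSec: Long = 0L): WindowSpec =
    WindowSpec(windowSec, slideSec.getOrElse(windowSec), startSec)

  /** Tumbling windows are exactly those whose slide equals their length. */
  def isTumbling(spec: WindowSpec): Boolean = spec.slideSec == spec.windowSec
}
```

For example, `resolve(300L, startSec = 30L)` describes 5 minute tumbling windows offset by 30 seconds, while `resolve(300L, Some(60L))` describes sliding windows.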

## How was this patch tested?

Unit tests + manual tests

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #12136 from brkyvz/python-windows.

asfgit pushed a commit that referenced this pull request on Apr 6, 2016:

## What changes were proposed in this pull request?

The `window` function was added to Dataset with [this PR](#12008).
This PR adds the R API for this function.

With this PR, SQL, Java, and Scala share the same API. Users can call:
 - `window(timeColumn, windowDuration)`
 - `window(timeColumn, windowDuration, slideDuration)`
 - `window(timeColumn, windowDuration, slideDuration, startTime)`

In Python and R, users can use all of the forms above and, in addition, in R:
 - `window(timeColumn, windowDuration, startTime=...)`

That is, they can provide `startTime` without providing `slideDuration`. In this case, tumbling windows are generated.

## How was this patch tested?

Unit tests + manual tests

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #12141 from brkyvz/R-windows.

brkyvz deleted the df-time-window branch on February 3, 2019 at 20:59