[SPARK-18124] Observed delay based Event Time Watermarks #15702
Conversation
```scala
    child: SparkPlan) extends SparkPlan {

  // TODO: Use Spark SQL Metrics?
  val maxEventTime = new MaxLong
```
@zsxwing am I doing this right?
```java
  public final long microseconds;

  public final long milliseconds() {
    return this.microseconds / MICROS_PER_MILLI;
```
2 space indent
```scala
    )(sparkSession)).as[T]
  }

  /**
```
need a tag here for experimental
```scala
   * @since 2.1.0
   */
  @Experimental
  @InterfaceStability.Evolving
```
you'd need one that takes in a column wouldn't you?
Test build #67839 has finished for PR 15702 at commit

@ericl - flaky test... Should we turn it off for now? retest this please

I'm still trying to find a failure that includes https://github.com/apache/spark/pull/15701/files. Until then it's hard to debug. Another option might be turning off or adding a retry around this particular test; I'll make another PR for that.

Test build #67866 has finished for PR 15702 at commit
```scala
  override def toString: String = s"$name#${exprId.id}$typeSuffix"

  /** Used to signal the column used to calculate an eventTime watermark (e.g. a#1-T{delayMs}) */
  private def delaySuffix = if (metadata.contains(EventTimeWatermark.delayKey)) {
    s"-T${metadata.getLong(EventTimeWatermark.delayKey)}"
```
is this in milliseconds or microseconds like timestamp type?
```scala
      if (a semanticEquals eventTime) {
        val updatedMetadata = new MetadataBuilder()
          .withMetadata(a.metadata)
          .putLong(EventTimeWatermark.delayKey, delay.milliseconds)
```
I'm a bit confused. Normally Spark SQL uses microsecond precision for TimestampType. When it converts it to LongType, it uses second precision. Here we're using milliseconds. Wouldn't that be super confusing to reason about?
I switched it to using CalendarInterval to make it clearer what units were being used where. I chose milliseconds because it seemed like the right granularity: microseconds are too short for the global coordination required, and seconds lack granularity. It should be easy to change, though, and I'm open to that if there's consensus that this is too confusing.
Updating the key to include Ms
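For concreteness, a small sketch of how a delay expressed as an interval maps to milliseconds. The `milliseconds()` helper is the one added in the diff above; the `fromString` parsing call is my assumption about the `CalendarInterval` API, not code from this PR.

```scala
import org.apache.spark.unsafe.types.CalendarInterval

// "interval 5 minutes" -> 300,000 ms; milliseconds are the granularity stored
// under the watermark delay metadata key.
val delay: CalendarInterval = CalendarInterval.fromString("interval 5 minutes")
val delayMs: Long = delay.milliseconds()
```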
A very dumb question (I apologize): there is nothing stopping a user from actually using processing time as the watermark with this API either; one could easily do that. My biggest confusion here, which I couldn't find documented, was the type of the watermark column. Does it need to be timestamp type or can it be LongType?

Not a dumb question! You can certainly use processing time if those are the semantics you require. I do think there is a little bit of work we need to do to ensure determinism for these functions. Good point on the documentation. The thing you are missing is that it must be used in a window function, which does require a TimestampType column.
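To make that concrete, a minimal sketch (the `events` DataFrame and its `ts` column are hypothetical) of casting a long epoch column to a timestamp so it can be used with `withWatermark` and `window`:

```scala
import org.apache.spark.sql.functions.{col, window}

// Hypothetical streaming DataFrame `events` with a LongType column "ts" (epoch seconds).
// Cast it to TimestampType so it can serve as the event-time / watermark column.
val withEventTime = events.withColumn("eventTime", col("ts").cast("timestamp"))

val counts = withEventTime
  .withWatermark("eventTime", "10 seconds")       // tolerate data up to ~10s late
  .groupBy(window(col("eventTime"), "1 minute"))  // window() requires a TimestampType column
  .count()
```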
Test build #67939 has finished for PR 15702 at commit
```scala
 * - To know when a given time window aggregation can be finalized and thus can be emitted when
 *   using output modes that do not allow updates.
 * - To minimize the amount of state that we need to keep for on-going aggregations.
 *
```
- Should this be "The current watermark is computed..." ?
- what is an epoch, it isn't mentioned in the docs or elsewhere in the PR
Changed to watermark. For epoch, I really just mean "during some period of time where we decide to coordinate across the partitions". This happens at batch boundaries now, but that is not part of the contract we are promising. I just removed that word to avoid confusion.
```scala
 *
 * Spark will use this watermark for several purposes:
 * - To know when a given time window aggregation can be finalized and thus can be emitted when
 *   using output modes that do not allow updates.
```
For append, this sounds like the intention is emit only once watermark has passed, and drop state.
But for other output modes, it's not clear from reading this what the effect of the watermark on emission and dropping state is.
```scala
 *
 * @param eventTime the name of the column that contains the event time of the row.
 * @param delayThreshold the minimum delay to wait to data to arrive late, relative to the latest
 *                       record that has been processed in the form of an interval
```
Should this make it clear what the minimum useful granularity is (ms)?
That seems like more of an implementation detail, rather than a contract of the API. The real contract is stated above as the actual watermark used is only guaranteed to be at least 'delayThreshold' behind the actual event time. There aren't really any bounds we can promise without knowing more about the query (even ms).
```scala
        }

      // Update and output modified rows from the StateStore.
      case Some(Update) =>
```
I'm not clear on why the semantics of Update mean that watermarks shouldn't be used to remove state.
@koeninger, Update should allow the late data to correct the previous results even if they are later than the threshold; a similar implementation is in http://cdn.oreillystatic.com/en/assets/1/event/160/Triggers%20in%20Apache%20Beam%20_incubating_%20Presentation.pdf (search for withLateFirings)... correct me if I'm wrong.
To put it the other way, do the docs in this PR tell you as a user that for any output method other than Append, you are potentially keeping unlimited aggregate state in memory, regardless of whether you set a watermark?
The only output modes that are supported publicly are Complete and Append (update is only available internally for tests). When we add support for Update (I'd like to do this soon), it should also evict tuples which can no longer be updated due to their group falling beneath the watermark. I thought that it was fairly clear that Complete would need to retain the complete set of aggregate state, but I'm happy to make this more explicit if others are confused by this.
Yes, I think it's a good idea to explicitly say for each output mode whether watermarks affect emit and evict. Just so I'm clear, the intention is
Append: affects emit, affects evict
Update: doesn't affect emit, affects evict
Complete: doesn't affect emit, no eviction
Is that right?
That is correct.
Generally, updates should be able to take late arrivals into account (with respect to EndOfWindow) and allow acting upon a user-defined strategy, such as: update for each following element.
Given the concerns Ofir raised about a single far future event screwing up monotonic event time, do you want to document that problem even if there isn't an enforced filter for it?

Test build #67998 has finished for PR 15702 at commit
zsxwing left a comment:

Looks good overall. My comments can be addressed later.
```scala
import org.apache.spark.unsafe.types.CalendarInterval
import org.apache.spark.util.AccumulatorV2

class MaxLong(protected var currentValue: Long = 0)
```
nit: protected -> private
nit: Could you document that this one only supports positive longs?
```scala
class MaxLong(protected var currentValue: Long = 0)
  extends AccumulatorV2[Long, Long]
  with Serializable {
```
nit: not needed. AccumulatorV2 is already Serializable.
```scala
case class ValueUpdated(key: UnsafeRow, value: UnsafeRow) extends StoreUpdate

case class KeyRemoved(key: UnsafeRow) extends StoreUpdate
case class ValueRemoved(key: UnsafeRow, value: UnsafeRow) extends StoreUpdate
```
Any special reason to change this? It seems weird to add an unused field `value`.
It is used. We need the value to emit the result upon eviction.
```scala
        streamMetrics.reportTriggerDetail(EVENT_TIME_WATERMARK, newWatermark)
        currentEventTimeWatermark = newWatermark
      } else {
        logTrace(s"Event time didn't move: $newWatermark < $currentEventTimeWatermark")
```
We need to call streamMetrics.reportTriggerDetail(EVENT_TIME_WATERMARK, newWatermark) here. Otherwise, the trigger details won't have EVENT_TIME_WATERMARK for this batch.
```scala
    }.headOption.foreach { newWatermark =>
      if (newWatermark > currentEventTimeWatermark) {
        logInfo(s"Updating eventTime watermark to: $newWatermark ms")
        streamMetrics.reportTriggerDetail(EVENT_TIME_WATERMARK, newWatermark)
```
Is it fine to just set EVENT_TIME_WATERMARK to 0 if the first batch doesn't have any data (E.g., the filter specified by the user drops all data)?
I think that's okay?
I suggest just fixing it since it's pretty easy. Just `if (newWatermark == 0) "-" else newWatermark.toString`.
I see, that makes sense. I actually just moved it out so we only report it if it's non-zero.
```scala
    child.execute().mapPartitions { iter =>
      val getEventTime = UnsafeProjection.create(eventTime :: Nil, child.output)
      iter.map { row =>
        maxEventTime.add(getEventTime(row).getLong(0))
```
Just a small question: which place will check the eventTime type? I guess getLong just throws an exception if the format is wrong. Can we fail it before starting the spark job?
Added to checkAnalysis.
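For illustration, a simplified sketch of the kind of analysis-time check being described (the method name and error-message wording are my assumptions, not the exact code added to checkAnalysis):

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan}
import org.apache.spark.sql.types.{StructType, TimestampType}

// Fail the query before execution if the watermark column is not a timestamp
// (or a window struct whose "end" field is a timestamp), rather than letting
// getLong throw at runtime inside the Spark job.
def validateEventTime(plan: LogicalPlan): Unit = plan.foreach {
  case etw: EventTimeWatermark =>
    etw.eventTime.dataType match {
      case TimestampType => // ok
      case s: StructType if s.find(_.name == "end").exists(_.dataType == TimestampType) =>
        // result of a window() operation, ok
      case other =>
        throw new AnalysisException(
          s"Event time must be defined on a window or a timestamp, but " +
            s"${etw.eventTime.name} is of type $other")
    }
  case _ => // other operators: nothing to check
}
```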
```scala
    CheckAnswer((10, 3)),
    AddData(inputData, 10),   // 10 is later than 15 second watermark
    CheckAnswer((10, 3)),
    AddData(inputData, 25),   // 10 is later than 15 second watermark
```
nit: the comment is wrong
Test build #3397 has finished for PR 15702 at commit

Test build #3398 has finished for PR 15702 at commit
```scala
      case etw: EventTimeWatermark =>
        etw.eventTime.dataType match {
          case s: StructType
              if s.find(_.name == "start").map(_.dataType).contains(TimestampType) =>
```
nit: Option.contains is not in Scala 2.10.
really? lame...
Oh... it should also check the end of the window, not the start...
Test build #68023 has finished for PR 15702 at commit

Test build #68025 has finished for PR 15702 at commit
tdas left a comment:

Major feedback - Python API for withWatermark()?
Other than that it's looking good.
```scala
    operator match {
      case etw: EventTimeWatermark =>
        etw.eventTime.dataType match {
          case s: StructType
```
Which high level case is caught by this condition?
The result of a window operation.
```scala
  }

  override def add(v: Long): Unit = {
    if (value < v) { currentValue = v }
```
nit: less confusing to read as `if (currentValue < v) { currentValue = v }`.
In fact, why not use math.max?
```scala
  }

  override def merge(other: AccumulatorV2[Long, Long]): Unit = {
    if (currentValue < other.value) {
```
nit: same as above, why not use math.max
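For reference, a minimal sketch of the accumulator written with the suggested math.max (a simplified stand-in for the class in this PR; as noted above, it is only meaningful for non-negative values because the initial value is 0):

```scala
import org.apache.spark.util.AccumulatorV2

// Tracks the maximum Long value seen across tasks; used here to find MAX(eventTime).
class MaxLong(private var currentValue: Long = 0L) extends AccumulatorV2[Long, Long] {
  override def isZero: Boolean = currentValue == 0L
  override def copy(): MaxLong = new MaxLong(currentValue)
  override def reset(): Unit = { currentValue = 0L }
  override def add(v: Long): Unit = { currentValue = math.max(currentValue, v) }
  override def merge(other: AccumulatorV2[Long, Long]): Unit = {
    currentValue = math.max(currentValue, other.value)
  }
  override def value: Long = currentValue
}
```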
Test build #68029 has finished for PR 15702 at commit
LGTM, pending tests.

Test build #68496 has finished for PR 15702 at commit

jenkins, test this please

Test build #68504 has finished for PR 15702 at commit

Test build #68631 has finished for PR 15702 at commit

jenkins test this please

Test build #68637 has finished for PR 15702 at commit

I am merging this to master and 2.1
This PR adds a new method `withWatermark` to the `Dataset` API, which can be used to specify an _event time watermark_. An event time watermark allows the streaming engine to reason about the point in time after which we no longer expect to see late data. This PR also augments `StreamExecution` to use this watermark for several purposes:
- To know when a given time window aggregation is finalized and thus results can be emitted when using output modes that do not allow updates (e.g. `Append` mode).
- To minimize the amount of state that we need to keep for on-going aggregations, by evicting state for groups that are no longer expected to change. We do still maintain all state if the query requires it (i.e., if the event time is not present in the `groupBy` or when running in `Complete` mode).
An example that emits windowed counts of records, waiting up to 5 minutes for late data to arrive.
```scala
df.withWatermark("eventTime", "5 minutes")
.groupBy(window($"eventTime", "1 minute") as 'window)
.count()
.writeStream
.format("console")
.mode("append") // In append mode, we only output finalized aggregations.
.start()
```
### Calculating the watermark.
The current watermark is computed by looking at the `MAX(eventTime)` seen this epoch across all of the partitions in the query, minus some user-defined _delayThreshold_. An additional constraint is that the watermark must increase monotonically.
Note that since we must coordinate this value across partitions occasionally, the actual watermark used is only guaranteed to be at least `delay` behind the actual event time. In some cases we may still process records that arrive more than delay late.
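A minimal sketch of the update rule this describes (names are illustrative, not the exact code in `StreamExecution`):

```scala
// Computed once per coordination point (currently a batch boundary).
// maxEventTimeMs: MAX(eventTime) observed across all partitions this batch, in ms.
// delayMs: the user-supplied delayThreshold, in ms.
def nextWatermark(currentWatermarkMs: Long, maxEventTimeMs: Long, delayMs: Long): Long =
  // Trail the max observed event time by the delay, and never move backwards.
  math.max(currentWatermarkMs, maxEventTimeMs - delayMs)
```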
This mechanism was chosen for the initial implementation over processing time for two reasons:
- it is robust to downtime that could affect processing delay
- it does not require syncing of time or timezones between the producer and the processing engine.
### Other notable implementation details
- A new trigger metric `eventTimeWatermark` outputs the current value of the watermark.
- We mark the event time column in the `Attribute` metadata using the key `spark.watermarkDelay`. This allows downstream operations to know which column holds the event time. Operations like `window` propagate this metadata (see the sketch after this list).
- `explain()` marks the watermark with a suffix of `-T${delayMs}` to ease debugging of how this information is propagated.
- Currently, we don't filter out late records, but instead rely on the state store to avoid emitting records that are both added and filtered in the same epoch.
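To illustrate the metadata tagging, a minimal sketch based on the diff snippet earlier in this conversation (attribute resolution is elided; `a` stands for the resolved event-time attribute, and the exact key string is whatever `EventTimeWatermark.delayKey` defines):

```scala
import org.apache.spark.sql.types.MetadataBuilder

// Tag the resolved event-time attribute `a` with the watermark delay in milliseconds,
// so downstream operators (e.g. window) know which column carries the event time.
val updatedMetadata = new MetadataBuilder()
  .withMetadata(a.metadata)
  .putLong(EventTimeWatermark.delayKey, delay.milliseconds)
  .build()
val taggedEventTime = a.withMetadata(updatedMetadata)
```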
### Remaining in this PR
- [ ] The test for recovery is currently failing as we don't record the watermark used in the offset log. We will need to do so to ensure determinism, but this is deferred until #15626 is merged.
### Other follow-ups
There are some natural additional features that we should consider for future work:
- Ability to write records that arrive too late to some external store in case any out-of-band remediation is required.
- `Update` mode so you can get partial results before a group is evicted.
- Other mechanisms for calculating the watermark. In particular a watermark based on quantiles would be more robust to outliers.
Author: Michael Armbrust <michael@databricks.com>
Closes #15702 from marmbrus/watermarks.
(cherry picked from commit c071878)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>