[SPARK-26655] [SS] Support multiple aggregates in append mode #23576
Conversation
This patch proposes to add support for multiple aggregates in append mode. In append mode, the aggregates are emitted only after the watermark passes the threshold (e.g. the window boundary), and the emitted value is not affected by further late data. This allows chaining multiple aggregates in 'Append' output mode without worrying about retractions etc.
However, the current event time watermarks in Structured Streaming are tracked at a global level, and this does not work when aggregates are chained. The downstream watermarks usually lag the upstream ones, and a global (min or max) watermark will not let the stages make progress independently.
The patch tracks the watermarks at each (stateful) operator so that the aggregate outputs are generated when the watermark passes the threshold at the corresponding stateful operator. The values are also saved into the commit/offset logs (similar to the global watermark). Each aggregate should have a corresponding watermark defined while creating the query (e.g. via withWatermark), and this is used to track the progress of event time for the corresponding stateful operator.
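As a rough illustration of the kind of query this is meant to enable (a sketch only; the column names, window sizes and delay thresholds are made up, and the key point is simply a withWatermark before each groupBy):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, sum, window}

// `events` is assumed to be a streaming DataFrame with an `eventTime` timestamp column.
def chainedAggregates(events: DataFrame): DataFrame =
  events
    .withWatermark("eventTime", "10 seconds")                 // watermark for the first aggregate
    .groupBy(window(col("eventTime"), "5 seconds"))           // counts per 5-second window
    .count()
    .select(col("window.end").as("windowTime"), col("count"))
    .withWatermark("windowTime", "10 seconds")                // watermark for the second aggregate
    .groupBy(window(col("windowTime"), "1 minute"))           // roll up into 1-minute windows
    .agg(sum("count").as("total"))
```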
Test build #101380 has finished for PR 23576 at commit
HeartSaVioR left a comment
I would like to spend more time taking a deeper look at the code change, but the concept looks good to me. Left some comments on the test part.
It would also be better to describe some of the use cases here.
// Latest watermark value is more than that used in this previous executed plan
val watermarkHasChanged =
-  eventTimeWatermark.isDefined && newMetadata.batchWatermarkMs > eventTimeWatermark.get
+  eventTimeWatermark.isDefined &&  getWatermark(newMetadata) > eventTimeWatermark.get
nit: two spaces after &&
testStream(windowedAggregation)(
  AddData(inputData, 10, 11, 11, 12, 12),
  CheckNewAnswer(),
  AddData(inputData, 25), // watermark -> group1 = 15, group2 = 10
It might be better to explain which rows are emitted from the first aggregation, to help developers (reviewers for now) verify the value of the watermark for the second aggregation.
(I can imagine that would be (10, 15, 5), but better to show it explicitly.)
Now I see you're adding the current states in the other test. It would be ideal to have a similar level of explanation here.
Added more comments
  CheckNewAnswer(),
  AddData(inputData, 25), // watermark -> group1 = 15, group2 = 10
  CheckNewAnswer(),
  assertNumTotalStateRows(3),
Same here: better to explain them, especially since we have two different state operators.
  AddData(inputData, 26, 26, 27),
  CheckNewAnswer(),
  AddData(inputData, 40), // watermark -> group1 = 30, group2 = 25
  CheckNewAnswer((15, 1, 1), (15, 2, 2))
This query doesn't look intuitive and is a bit hard to track, since the second aggregation groups on an aggregated value from the first aggregation.
Could we change the query a bit? I guess one example would be having user as a column, grouping by window and user in the first aggregation (so aggregated per window and user), and grouping by window in the second aggregation (so aggregated per window, across all users).
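A sketch of the suggested query shape (illustrative names only, not actual test code; `userEvents` stands in for a streaming DataFrame with `user` and `eventTime` columns):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, sum, window}

def perWindowAcrossUsers(userEvents: DataFrame): DataFrame = {
  val perUserCounts = userEvents
    .withWatermark("eventTime", "10 seconds")
    .groupBy(window(col("eventTime"), "5 seconds"), col("user"))   // per window and user
    .count()

  perUserCounts
    .select(col("window.end").as("windowTime"), col("count"))
    .withWatermark("windowTime", "10 seconds")
    .groupBy(window(col("windowTime"), "5 seconds"))               // per window, across users
    .agg(sum("count").as("totalCount"))
}
```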
I wanted to check with multiple keys as part of the second group as well. I have added comments so that it's easy to follow. Also added an additional test case like the one you suggested, with the required comments.
| test("multiple aggregates in append mode recovery") { | ||
| val inputData = MemoryStream[Int] | ||
|
|
||
| val windowedAggregation = inputData.toDF() |
Same here: I would ask you to consider changing the query.
Test build #101415 has finished for PR 23576 at commit
HeartSaVioR left a comment
Looks good overall. Left some comments.
val watermarkAttributes = aggregate.groupingExpressions.collect {
  case a: Attribute if a.metadata.contains(EventTimeWatermark.delayKey) => a
}
aggregates.foreach(aggregate => {
nit: foreach { aggregate =>
https://github.com/databricks/scala-style-guide/blob/master/README.md#anonymous-methods
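For reference, a minimal sketch of the two forms the nit contrasts, shown on a plain Seq (illustrative only):

```scala
val items = Seq(1, 2, 3)

// Preferred per the style guide: braces, with the parameter on the opening line.
items.foreach { item =>
  println(item)
}

// Rather than parentheses wrapping an anonymous function with a block body:
items.foreach(item => {
  println(item)
})
```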
  // since watermark of group2 is at 10
  CheckNewAnswer(),
  assertNumTotalStateRows(3), // {[25-30],25} -> 1 in state1 and
  // {[30-35],1} -> 1, {[30-35],1} -> 1 {[30-35],2} -> 2 in state2
window2 [15 - 20] -> (1,1), (2,2) is retained in state2, and the explanation here is different. Looks like the former is correct.
yes correct
  // window1 [40 - 45] -> (40,1) is emitted down
  // window1 [55 - 60] -> (55, 1) is retained in state1
  // window2 [30 - 35] -> (1,2), (2,1) is emitted out
  // window2 [40 - 45] -> (40, 1) is retained in state2
(1, 1)
yes, thanks for noticing.
  case s: StatefulOperator => s
}

statefulOperators.foreach(statefulOperator => {
Same here for nit: foreach { statefulOperator =>
}

// compute watermark for the stateful operator node
statefulOperator.stateInfo.foreach(state => {
Same here for nit: foreach { state =>
updateWaterMarkMap(eventTimeExecs,
  statefulOperatorToEventTimeMap.getOrElseUpdate(state.operatorId,
    new mutable.HashMap[Int, Long]()))
val newWatermarkMs = statefulOperatorToEventTimeMap(state.operatorId).values.toSeq.min
We may want to apply the watermark policy here as well, like policy.chooseGlobalWatermark(statefulOperatorToEventTimeMap(state.operatorId).values.toSeq)
I think with a global watermark there needs to be a way to make progress across all stateful operators, and it looks like min did not work in all cases.
But I am not sure it would make sense to choose max for the individual operator-level watermark. If the event times in one of the inputs are lagging, the best option is to not advance the watermark beyond it. Watermarks should ideally advance only when all input data has been observed, and choosing max would cause more events to be discarded as late data.
IMO we can just choose min here, but I would like to hear opinions from the other reviewers as well.
OK, I would also defer to the other reviewers. Thanks for sharing your thoughts on this.
Just to record here, I agree this should be just min.
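For reference, a minimal sketch of the min-based combination being agreed on here (the helper name and signature are illustrative, not the PR's code; the "max" branch would mirror the behaviour of the spark.sql.streaming.multipleWatermarkPolicy=max setting mentioned in the policy discussion above):

```scala
// Combine the event-time values tracked for a single stateful operator into that
// operator's watermark. "min" (the default) never advances past the slowest input;
// "max" advances with the fastest. Assumes a non-empty input sequence.
def combineOperatorWatermark(eventTimesMs: Seq[Long], policy: String = "min"): Long =
  policy match {
    case "max" => eventTimesMs.max
    case _     => eventTimesMs.min
  }
```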
cc. to @zsxwing and @gaborgsomogyi as well
HeartSaVioR left a comment
LGTM except for the deferred decision: whether to always use min, or to use the user-configured policy (max or min), for the watermark. I guess other reviewers will provide opinions, so LGTM here.
Test build #101655 has finished for PR 23576 at commit
(Just leaving a note that I am looking at this; it's taking me a while to think through the details of the watermarking.)
@jose-torres, did you get a chance to take a look? Let me know how to take it forward.
jose-torres left a comment
I think this needs to be a subset of a more general proposal to make watermarks non-global. It’s not obvious to me that it’s valid for a stateful operator to reach through an arbitrary child and grab a watermark from the other side.
statefulOperators.foreach { statefulOperator =>
  // find the first event time child node(s)
  val eventTimeExecs = statefulOperator match
I’m not sure I understand this. Don’t we throw away this val immediately when we leave the foreach scope?
We use this to finally update the statefulOperatorToWatermark below (it's passed to updateWaterMarkMap).
So basically we collect all the event time inputs (via EventTimeWatermarkExec) to a stateful operator. The (input) watermark of that stateful operator is then the minimum of all the input watermarks (the event times minus the lag) coming into that node.
But what's the value of eventTimeExecs when we reach line 145? I don't understand how it's in scope at all, and if it is it seems that it would have only the value computed for the last stateful operator in statefulOperators. Maybe there's some Scala magic I'm missing here.
Actually, line 145 is inside the statefulOperators.foreach. Maybe I need to refactor it out into a separate method to make it more readable.
@jose-torres, we don't pick an arbitrary child. We consider all the event time inputs (children) to the stateful operator and compute the watermark as the minimum of all the input watermarks (which is the event time minus the lag of each child EventTimeWatermarkExec).
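To make the described computation concrete, a rough sketch (the types and names are illustrative, not the PR's code; maxEventTimeMs/delayMs stand in for the stats each watermark-defining child reports):

```scala
// Per-input stats: the max event time observed on that input and the
// delay ("lag") configured by its withWatermark.
case class InputEventTimeStats(maxEventTimeMs: Long, delayMs: Long)

// The stateful operator's input watermark is the minimum of (max event time - delay)
// over all such inputs; None if the operator has no watermark-defining child.
def operatorInputWatermarkMs(inputs: Seq[InputEventTimeStats]): Option[Long] =
  if (inputs.isEmpty) None
  else Some(inputs.map(s => s.maxEventTimeMs - s.delayMs).min)
```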
Do you have some idea or plan in mind for non-global watermarks? Just curious, because that might be one of the major conceptual changes.
I am not sure if you mean the way the watermarks are computed or how they propagate... anyway, that seems orthogonal. Here we compute the operator watermark as the minimum of the input watermarks, which should hold regardless. I don't see another reasonable way to make progress at each operator.
I'd agree that min is the only reasonable way to compute an operator watermark. What I think we need a design for is operator watermarks in general, and how they slot into the rest of Spark. Questions I worry can't be addressed by a PR include:
Typically C cannot, since A is the input watermark of B and, assuming it does some aggregation, it needs to emit a new watermark. There's a new check for a query like:

input.withWatermark("ts", ...)
  .groupBy(window($"ts", ...), $"key").count()
  .select($"window.end" as "windowts", $"count")
  .withWatermark("windowts", ...)
  .groupBy(window($"windowts", ...), $"count").count()

Need to check if it would interfere with multiple watermarks or whether we need any new rules.
I thought
I have slightly modified the watermark computation logic and the details are documented here and attached to the JIRA. Please review.
Test build #102523 has finished for PR 23576 at commit
Test build #102564 has finished for PR 23576 at commit
ping @jose-torres, @tdas for further reviews.
I'm not going to give a hard no, but my review is that I don't think we should move forward with this PR without a full design for the new concept of operator-local watermarks.
@jose-torres, thanks for the comment. The current approach allows users to chain aggregates in append mode as long as they define a watermark per aggregate. It's a reasonable approach that I could figure out within the existing framework. We could relax this restriction, but we'd need a way to choose the timestamp field per aggregate if the user does not explicitly specify it. The other alternative I explored would require a separate watermark channel and would need more disruptive changes. Could you review the doc and comment on the areas where you would like more info and/or alternatives that can be explored, so that we can agree on a more solid design to proceed further?
I would want a proposal that's a commit rather than a diff, if that makes sense. Something in the form of:
What I'm worried about is possibilities like this. Suppose we decide that we want to support multiple aggregates in complete mode in 3.1, and realize that we need a separate watermark channel in that case. Then we'll be stuck; we will be forced to either break the semantic we just added, or establish a weird piecemeal semantic where you specify watermarks differently depending on the shape of your query.
The modes and watermarks should be independent, but anyway let me explore it a bit further and also try to address your other points in the design.
This PR is definitely very interesting, thanks! Having support for multiple aggregations in streaming mode is very important. I saw the design document for wider watermark support got no comments. I added a comment.
The output watermark can be computed as some function of the input watermark and the timestamps of events at that operator (e.g. min(input watermarks, timestamp of the oldest event at that node)), so we could compute one from the other by storing only the input watermark. For now, we require the user to provide a timestamp column + lag using "withWatermark()" before each aggregate operation. Here the window.end of the first groupBy is the output watermark, which becomes the input watermark of the second groupBy. Also note that the input watermark of an operator is propagated to the next operator only in the next batch, so that it processes the events first and then the watermark. Let me know the specific cases where you found issues.
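A minimal sketch of that relationship (a hypothetical helper, not the PR's actual code):

```scala
// An operator's output watermark is bounded by its input watermark and by the oldest
// event still retained in its state; with nothing retained it forwards the input watermark.
def outputWatermarkMs(inputWatermarkMs: Long, oldestRetainedEventTimeMs: Option[Long]): Long =
  oldestRetainedEventTimeMs.fold(inputWatermarkMs)(math.min(inputWatermarkMs, _))
```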
I've left comments in the doc. Sorry, I shouldn't leave comments here and cause confusion. Let's talk in the doc. Btw, I guess this patch is not addressing the doc yet, so you may want to mark this patch as
@arunmahadevan any updates on this feature or on the watermark design document?
I revisited and thought about this briefly, and felt that the watermark and output modes Spark provides are different from other frameworks. Append mode is tricky if you are familiar with other frameworks. In Append mode, Spark tries to ensure there's only one output for each key, and the "delay threshold" is taken into consideration as well. AFAIK, Flink emits another output if a late-but-allowed tuple comes later than the watermark and updates the output, hence dealing with "upsert" is necessary. (Not sure about Beam, but I guess Flink follows the Beam model, so I would expect something similar.) In Spark, "upsert" is not yet defined for DSv2, and hence UPDATE mode will be disabled for Spark 3. (#23859)

Suppose there's a stateful operator OP1 with batch B2, and a watermark is defined before OP1 with the delay threshold set to 1hr. The range of outputs OP1 can emit in B2 is the following:

as it denotes the outputs which were not evicted (emitted) in the previous batch but match the condition for evicting (emitting) in this batch. If we have OP2 with OP1 as upstream, it will retrieve outputs as above, and to not drop any intermediate outputs, either 1) OP2 should inherit WM(OP1B1) as WM(OP2B2) and also have an equal or bigger delay threshold, or 2) OP2 should define WM(OP2B2) as

Maybe that's less important, as I can't think of a safe approach in the current state of Spark. I think Spark may need to make some changes before introducing advanced features. I think the main issue of Spark Structured Streaming is being "flexible" on watermarks, flexible enough to let end users mess up their query easily. I assume other frameworks have a special field for "event time" and prevent modifying that field, but in Spark it's just the same as any other column and open for modification. If the event time is modified, it's no longer in line with the watermark and the result would be indeterministic. Same for

Similarly, which is the event time column for a stream-stream joined output where an event time column is defined per input? I'm not seeing a clear definition of this.

I'd be in favor of letting the streaming engine manage event time and watermark once the value of event time is defined, and restricting end users from modifying event time (one-time update). To achieve this, each row should have a meta-column of "event time", and once it's defined, further updates should be done only from the Spark side - each stateful operator needs to decide the event time of its output according to its input(s) and its watermark. (E.g. for windowed aggregation, "window.start" should be used for the "event time" and it shouldn't be changed.) That's the only way Spark could ensure event time and watermark are in sync across multiple stateful operations.
Hi @HeartSaVioR, thanks for your feedback.

Regarding late data with Beam: indeed, when an element comes behind the watermark but before the allowed lateness, it delays window closing. So an element that comes in that range counts in the output data. If it comes after the allowed lateness, it is dropped.

Regarding output mode: most of the Beam runners (Spark for example) support discarding output, in which elements from different windows are independent and previous states are dropped. It seems very similar to Spark's append mode.

Regarding event time: indeed, Beam forbids modifying it, and there is an event time per element, the same way you suggest an event time per row in Spark. Also, our answer to the stream-stream join watermark is: we take the minimum of the output watermarks of the previous stages. But that is because Beam's watermark is based on the minimum event timestamp seen. Also, stateful operators do not change the event timestamp; nobody does. That is why we defined input and output watermarks to introduce this delay.

Still, there is something I do not understand. With the previous Spark DStream framework, multiple aggregations were supported. What has changed in Spark's watermark behavior that makes it unsupported now with Structured Streaming?
I'm not sure I understand it correctly. The point of Append mode is that the output for a specific key (the key doesn't have to be windowed, but should include the "event time" column) will be provided only once in any case (this is orthogonal to fault tolerance and doesn't mean "exactly-once" here), regardless of allowed lateness; there is no case of "upsert". If Beam doesn't close the window when the watermark passes by (but the watermark hasn't yet passed the allowed lateness) and instead triggers the window and emits its output so far (so output could be emitted multiple times), it's not compatible with Spark's Append mode.

Stream-stream join should decide which "event time" should be taken even if we change the way event time is stored, as there are two rows being joined. How does Beam decide the "event time" of the new record formed from two records? With column-based event time (current Spark), it would be hard to choose the "min" or "max" of the event times, as which column to pick as the event time has to be decided at query planning phase.
Sorry, I'm not aware of DStream's behavior; I kind of started from Structured Streaming and didn't pay much attention to DStream. But as there's no notion of event time and watermark in the DStream docs, I'd rather avoid dealing with DStream for event-time processing. When you're playing with DStream, you'll likely be doing processing time, limited by the batch duration. http://spark.apache.org/docs/latest/streaming-programming-guide.html
Beam does not trigger output unless the watermark passes the end of the window + allowed lateness. There is no triggering between the end of the window and the allowed lateness. Closing and output happen at the same time.
Ah, I thought we were talking about the watermark. For choosing the event timestamp, Beam uses a TimestampCombiner whose default policy is to set the resulting timestamp of the new record to the end of the window.
Ah, OK, I see. That looks similar to Append mode. That's a bit different from what I read in a book about Flink, so I assume there are some differences between Beam and Flink... (BTW, I also read "Streaming Systems", though it mostly explains theory and doesn't have many details on Beam.)
That seems to only explain the case where a window is applied. How does it work for other cases? Does it keep the original event timestamp as it is? For a windowed stream-stream join it also makes sense, but there are non-windowed stream-stream joins as well, and then the output should have only one event time whereas there are two inputs.
Windows are mandatory in streaming mode in Beam (otherwise there is no trigger time and no output). But if you are in batch mode (the only case where you can have no window), then the timestamps of all elements are set to +INF. PS: I'm simplifying a bit; in reality we can replace windows with configured triggers that can be based on the number of elements or processing time, but as they don't exist in Spark I did not mention them here.
@HeartSaVioR will you be at the ApacheCon in September so that we can meet in person and discuss these topics?
Maybe I need to go through both Beam and Flink to understand the details and discuss them at a detailed level. That might take some time, as I may need to spend my own time to cover it. And @echauchot, thanks for asking, but I won't be at the ApacheCon - I might consider attending the event eventually when ApacheCon plans to be held in east Asia (I'm in S. Korea). You may also want to consider that I'm just one of the contributors to the Spark project, and without long-term support (a shepherd) from the community (committers/PMC members), I couldn't put effort into this huge major feature. (So if you have a chance to meet some PMC members of Apache Spark in person, that would be a better chance for you.) Moreover, the necessary effort seems to be beyond what I could spend in my own time, so persuading my employer might also be needed.
cc. @tdas @zsxwing @jose-torres to see whether committers in this area are interested in this topic or not.
@HeartSaVioR thanks for pinging the right guys!
We're closing this PR because it hasn't been updated in a while. If you'd like to revive this PR, please reopen it!
I checked the Structured Streaming programming guide for 3.0, in case this was addressed by another PR and the guide missed an update with 3.0. But it seems it is still unsupported. Any luck reopening this PR so that we can have multiple aggregations in Spark in streaming mode? It is a valuable feature, IMHO!
I too agree that this PR is very valuable; there are a lot of real-time streaming use cases where Spark could be used efficiently.
This PR is reopened based on the community request (from @echauchot and @balajij81).
Ping @arunmahadevan since he is the author. If @HeartSaVioR wants this PR, I believe @HeartSaVioR can take over this when the main author, @arunmahadevan, is busy. Of course, @arunmahadevan is the main author and @HeartSaVioR will be the co-author in the commit. It's up to him and @arunmahadevan. Also, cc @tdas, @zsxwing, @jose-torres, @cloud-fan, @gatorsmile, @dbtsai.
Test build #128921 has finished for PR 23576 at commit
Unfortunately, I would say the conceptual change to watermarks should go through some sort of discussion (or even an SPIP), which means it's a no-op unless we have enough committers who are interested and willing to support it. That said, if @arunmahadevan is willing to revisit this, then I can add myself as a reviewer of the design doc and the following PR, but I alone probably wouldn't be sufficient. I might take it up if I see at least 3 committers in this area (or committers who at least want to follow up on the area and approve once they feel qualified) promise to be committed to this topic. But before that, I really want to drive my own discussion topics on the dev mailing list, for which I don't have any input from committers.
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. |
How was this patch tested?
New and existing unit tests
Please review http://spark.apache.org/contributing.html before opening a pull request.