
Commit b4b2e95

bartosz25 authored and dongjoon-hyun committed
[MINOR][SS][DOCS] Adapt multiple watermark policy comment to the reality
### What changes were proposed in this pull request?

The previous comment was accurate for Apache Spark 2.3.0. The 2.4.0 release introduced a configurable multiple watermark policy, so stating that the 'min' is always chosen is misleading. This PR updates the comments about the multiple watermark policy: they no longer hold, because with multiple watermarks we can configure which one is applied to the query. This change was introduced in the Apache Spark 2.4.0 release.

### Why are the changes needed?

The outdated comments cause confusion about how the commented code actually executes.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

No tests were added because the change is documentation-only.

I affirm that the contribution is my original work and that I license the work to the project under the project's open source license.

Closes #25832 from bartosz25/fix_comments_multiple_watermark_policy.

Authored-by: bartosz25 <bartkonieczny@yahoo.fr>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
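For context, the policy this PR's comments refer to is selected through the `spark.sql.streaming.multipleWatermarkPolicy` SQL configuration introduced in Spark 2.4.0, with `"min"` as the default and `"max"` as the alternative. A minimal sketch of switching the policy, assuming an already-running `SparkSession` named `spark`:

```scala
// Choose how the global watermark is derived from multiple per-stream watermarks.
// "min" (default): the global watermark tracks the slowest input (safest option).
// "max": the global watermark tracks the fastest input.
spark.conf.set("spark.sql.streaming.multipleWatermarkPolicy", "max")
```

This only matters for queries that combine several input streams, each with its own `withWatermark` definition.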
1 parent cd48177 commit b4b2e95

File tree: 1 file changed (+4, −5 lines)

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala

Lines changed: 4 additions & 5 deletions

```diff
@@ -63,7 +63,7 @@ case object MinWatermark extends MultipleWatermarkPolicy {
 }
 
 /**
- * Policy to choose the *min* of the operator watermark values as the global watermark value. So the
+ * Policy to choose the *max* of the operator watermark values as the global watermark value. So the
  * global watermark will advance if any of the individual operator watermarks has advanced.
  * In other words, in a streaming query with multiple input streams and watermarks defined on all
  * of them, the global watermark will advance as fast as the fastest input. So if there is watermark
@@ -108,10 +108,9 @@ case class WatermarkTracker(policy: MultipleWatermarkPolicy) extends Logging {
       }
     }
 
-    // Update the global watermark to the minimum of all watermark nodes.
-    // This is the safest option, because only the global watermark is fault-tolerant. Making
-    // it the minimum of all individual watermarks guarantees it will never advance past where
-    // any individual watermark operator would be if it were in a plan by itself.
+    // Update the global watermark accordingly to the chosen policy. To find all available policies
+    // and their semantics, please check the comments of
+    // `org.apache.spark.sql.execution.streaming.MultipleWatermarkPolicy` implementations.
     val chosenGlobalWatermark = policy.chooseGlobalWatermark(operatorToWatermarkMap.values.toSeq)
     if (chosenGlobalWatermark > globalWatermarkMs) {
       logInfo(s"Updating event-time watermark from $globalWatermarkMs to $chosenGlobalWatermark ms")
```
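The semantics of the two policies referenced by the updated comment can be illustrated with a small standalone sketch. This is not the actual Spark source, and the watermark values are invented for the example:

```scala
// Hypothetical per-operator event-time watermarks, in milliseconds.
val operatorWatermarks = Seq(5000L, 12000L, 8000L)

// MinWatermark (default): the global watermark is the slowest operator's
// watermark, so it never advances past any individual operator.
val minGlobal = operatorWatermarks.min

// MaxWatermark: the global watermark follows the fastest operator,
// advancing whenever any individual watermark advances.
val maxGlobal = operatorWatermarks.max

println(s"min policy: $minGlobal ms, max policy: $maxGlobal ms")
```

With the `min` policy a stream that lags (here at 5000 ms) holds the whole query back; with `max`, late data on the slower streams may be dropped, which is why `min` remains the default.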
