diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala
index 76ab1284633b..b0f8cf9cd184 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala
@@ -63,7 +63,7 @@ case object MinWatermark extends MultipleWatermarkPolicy {
 }
 
 /**
- * Policy to choose the *min* of the operator watermark values as the global watermark value. So the
+ * Policy to choose the *max* of the operator watermark values as the global watermark value. So the
  * global watermark will advance if any of the individual operator watermarks has advanced.
  * In other words, in a streaming query with multiple input streams and watermarks defined on all
  * of them, the global watermark will advance as fast as the fastest input. So if there is watermark
@@ -108,10 +108,9 @@ case class WatermarkTracker(policy: MultipleWatermarkPolicy) extends Logging {
         }
     }
 
-    // Update the global watermark to the minimum of all watermark nodes.
-    // This is the safest option, because only the global watermark is fault-tolerant. Making
-    // it the minimum of all individual watermarks guarantees it will never advance past where
-    // any individual watermark operator would be if it were in a plan by itself.
+    // Update the global watermark according to the chosen policy. To find all available policies
+    // and their semantics, please check the comments of
+    // `org.apache.spark.sql.execution.streaming.MultipleWatermarkPolicy` implementations.
     val chosenGlobalWatermark = policy.chooseGlobalWatermark(operatorToWatermarkMap.values.toSeq)
     if (chosenGlobalWatermark > globalWatermarkMs) {
      logInfo(s"Updating event-time watermark from $globalWatermarkMs to $chosenGlobalWatermark ms")
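
To make the min-vs-max semantics concrete, here is a minimal standalone Scala sketch. This is not the actual Spark implementation touched by the diff above; `WatermarkPolicySketch` and the hard-coded watermark values are illustrative, and only the `chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long` shape is taken from the policy trait the new comment points to.

```scala
// Illustrative sketch of the two MultipleWatermarkPolicy variants the diff
// documents: MinWatermark follows the slowest input, MaxWatermark the fastest.
object WatermarkPolicySketch {
  sealed trait MultipleWatermarkPolicy {
    def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long
  }

  // Safest choice per the removed comment: the global watermark never
  // advances past where any individual operator watermark would be.
  case object MinWatermark extends MultipleWatermarkPolicy {
    override def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long =
      operatorWatermarks.min
  }

  // Semantics of the corrected scaladoc: the global watermark advances
  // whenever any individual operator watermark advances.
  case object MaxWatermark extends MultipleWatermarkPolicy {
    override def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long =
      operatorWatermarks.max
  }

  def main(args: Array[String]): Unit = {
    // Two input streams: one has seen event time up to 5000 ms, the other 9000 ms.
    val operatorWatermarks = Seq(5000L, 9000L)
    println(MinWatermark.chooseGlobalWatermark(operatorWatermarks)) // 5000
    println(MaxWatermark.chooseGlobalWatermark(operatorWatermarks)) // 9000
  }
}
```

As the diff's comments note, the min policy is the safe default because the global watermark never outruns the slowest input, while the max policy tracks the fastest input, so events from slower streams may fall behind the global watermark and be treated as late.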