diff --git a/docs/content/docs/dev/datastream/event-time/generating_watermarks.md b/docs/content/docs/dev/datastream/event-time/generating_watermarks.md index 88c8488ba8c4e..35bbd14712732 100644 --- a/docs/content/docs/dev/datastream/event-time/generating_watermarks.md +++ b/docs/content/docs/dev/datastream/event-time/generating_watermarks.md @@ -304,19 +304,21 @@ other sources/tasks which can move the combined watermark forward and that way u one. {{< hint warning >}} -**Note:** As of 1.15, Flink supports aligning across tasks of the same source and/or different -sources. It does not support aligning splits/partitions/shards in the same task. +**Note:** As of Flink 1.17, split level watermark alignment is supported by the FLIP-27 source framework. +Source connectors have to implement an interface to resume and pause splits so that splits/partitions/shards +can be aligned in the same task. More detail on the pause and resume interfaces can found in the [Source API]({{< ref "docs/dev/datastream/sources" >}}#split-level-watermark-alignment). -In a case where there are e.g. two Kafka partitions that produce watermarks at different pace, that -get assigned to the same task watermark might not behave as expected. Fortunately, worst case it -should not perform worse than without alignment. +If you are upgrading from a Flink version between 1.15.x and 1.16.x inclusive, you can disable split level alignment by setting +`pipeline.watermark-alignment.allow-unaligned-source-splits` to true. Moreover, you can tell if your source supports split level alignment +by checking if it throws an `UnsupportedOperationException` at runtime or by reading the javadocs. In this case, it would be desirable to +to disable split level watermark alignment to avoid fatal exceptions. -Given the limitation above, we suggest applying watermark alignment in two situations: - -1. You have two different sources (e.g. Kafka and File) that produce watermarks at different speeds -2. You run your source with parallelism equal to the number of splits/shards/partitions, which - results in every subtask being assigned a single unit of work. +When setting the flag to true, watermark alignment will be only working properly when the number of splits/shards/partitions is equal to the +parallelism of the source operator. This results in every subtask being assigned a single unit of work. On the other hand, if there are two Kafka partitions, which produce watermarks at different paces and +get assigned to the same task, then watermarks might not behave as expected. Fortunately, even in the worst case, the basic alignment should not perform worse than having no alignment at all. +Furthermore, Flink also supports aligning across tasks of the same sources and/or different +sources, which is useful when you have two different sources (e.g. Kafka and File) that produce watermarks at different speeds. {{< /hint >}} ## Writing WatermarkGenerators diff --git a/docs/content/docs/dev/datastream/sources.md b/docs/content/docs/dev/datastream/sources.md index 020738cdbb139..0b755dc10aa38 100644 --- a/docs/content/docs/dev/datastream/sources.md +++ b/docs/content/docs/dev/datastream/sources.md @@ -428,3 +428,9 @@ The data source API supports running watermark generators individually *per spli When implementing a source connector using the *Split Reader API*, this is automatically handled. All implementations based on the Split Reader API have split-aware watermarks out-of-the-box. For an implementation of the lower level `SourceReader` API to use split-aware watermark generation, the implementation must output events from different splits to different outputs: the *Split-local SourceOutputs*. Split-local outputs can be created and released on the main {{< gh_link file="flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderOutput.java" name="ReaderOutput" >}} via the `createOutputForSplit(splitId)` and `releaseOutputForSplit(splitId)` methods. Please refer to the JavaDocs of the class and methods for details. + +#### Split Level Watermark Alignment + +Although source operator watermark alignment is handled by Flink runtime, the source needs to additionally implement `SourceReader#pauseOrResumeSplits` and `SplitReader#pauseOrResumeSplits` to achieve split level watermark alignment. Split level watermark alignment is useful for when +there are multiple splits assigned to a source reader. By default, these implementations will throw an `UnsupportedOperationException`, `pipeline.watermark-alignment.allow-unaligned-source-splits` is set to false, when there is more than one split assigned, and the split exceeds the watermark alignment threshold configured by the `WatermarkStrategy`. `SourceReaderBase` +contains an implementation for `SourceReader#pauseOrResumeSplits` so that inheriting sources only need to implement `SplitReader#pauseOrResumeSplits`. See the javadocs for more implementation hints. diff --git a/docs/layouts/shortcodes/generated/pipeline_configuration.html b/docs/layouts/shortcodes/generated/pipeline_configuration.html index d42329f6f9d8b..f2b8b660f5a53 100644 --- a/docs/layouts/shortcodes/generated/pipeline_configuration.html +++ b/docs/layouts/shortcodes/generated/pipeline_configuration.html @@ -138,7 +138,7 @@
pipeline.watermark-alignment.allow-unaligned-source-splits
false Boolean - If watermark alignment is used, sources with multiple splits will attempt to pause/resume split readers to avoid watermark drift of source splits. However, if split readers don't support pause/resume an UnsupportedOperationException will be thrown when there is an attempt to pause/resume. To allow use of split readers that don't support pause/resume and, hence, t allow unaligned splits while still using watermark alignment, set this parameter to true. The default value is false. Note: This parameter may be removed in future releases. + If watermark alignment is used, sources with multiple splits will attempt to pause/resume split readers to avoid watermark drift of source splits. However, if split readers don't support pause/resume, an UnsupportedOperationException will be thrown when there is an attempt to pause/resume. To allow use of split readers that don't support pause/resume and, hence, to allow unaligned splits while still using watermark alignment, set this parameter to true. The default value is false. Note: This parameter may be removed in future releases. diff --git a/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java index 4c739e984268f..f8bacdf494bbf 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java @@ -303,10 +303,10 @@ public enum VertexDescriptionMode { "If watermark alignment is used, sources with multiple splits will " + "attempt to pause/resume split readers to avoid watermark " + "drift of source splits. " - + "However, if split readers don't support pause/resume an " + + "However, if split readers don't support pause/resume, an " + "UnsupportedOperationException will be thrown when there is " + "an attempt to pause/resume. To allow use of split readers that " - + "don't support pause/resume and, hence, t allow unaligned splits " + + "don't support pause/resume and, hence, to allow unaligned splits " + "while still using watermark alignment, set this parameter to true. " + "The default value is false. Note: This parameter may be " + "removed in future releases.");