Skip to content

Commit

Permalink
[FLINK-28853] Document the split level watermark alignment feature an…
Browse files Browse the repository at this point in the history
…d fixup grammar from the configuration table
  • Loading branch information
mas-chen authored and mxm committed May 26, 2023
1 parent 3b53f30 commit 678370b
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -304,19 +304,21 @@ other sources/tasks which can move the combined watermark forward and that way u
one.

{{< hint warning >}}
**Note:** As of 1.15, Flink supports aligning across tasks of the same source and/or different
sources. It does not support aligning splits/partitions/shards in the same task.
**Note:** As of Flink 1.17, split level watermark alignment is supported by the FLIP-27 source framework.
Source connectors have to implement an interface to resume and pause splits so that splits/partitions/shards
can be aligned in the same task. More detail on the pause and resume interfaces can found in the [Source API]({{< ref "docs/dev/datastream/sources" >}}#split-level-watermark-alignment).

In a case where there are e.g. two Kafka partitions that produce watermarks at different pace, that
get assigned to the same task watermark might not behave as expected. Fortunately, worst case it
should not perform worse than without alignment.
If you are upgrading from a Flink version between 1.15.x and 1.16.x inclusive, you can disable split level alignment by setting
`pipeline.watermark-alignment.allow-unaligned-source-splits` to true. Moreover, you can tell if your source supports split level alignment
by checking if it throws an `UnsupportedOperationException` at runtime or by reading the javadocs. In this case, it would be desirable to
to disable split level watermark alignment to avoid fatal exceptions.

Given the limitation above, we suggest applying watermark alignment in two situations:

1. You have two different sources (e.g. Kafka and File) that produce watermarks at different speeds
2. You run your source with parallelism equal to the number of splits/shards/partitions, which
results in every subtask being assigned a single unit of work.
When setting the flag to true, watermark alignment will be only working properly when the number of splits/shards/partitions is equal to the
parallelism of the source operator. This results in every subtask being assigned a single unit of work. On the other hand, if there are two Kafka partitions, which produce watermarks at different paces and
get assigned to the same task, then watermarks might not behave as expected. Fortunately, even in the worst case, the basic alignment should not perform worse than having no alignment at all.

Furthermore, Flink also supports aligning across tasks of the same sources and/or different
sources, which is useful when you have two different sources (e.g. Kafka and File) that produce watermarks at different speeds.
{{< /hint >}}

## Writing WatermarkGenerators
Expand Down
6 changes: 6 additions & 0 deletions docs/content/docs/dev/datastream/sources.md
Original file line number Diff line number Diff line change
Expand Up @@ -428,3 +428,9 @@ The data source API supports running watermark generators individually *per spli
When implementing a source connector using the *Split Reader API*, this is automatically handled. All implementations based on the Split Reader API have split-aware watermarks out-of-the-box.

For an implementation of the lower level `SourceReader` API to use split-aware watermark generation, the implementation must output events from different splits to different outputs: the *Split-local SourceOutputs*. Split-local outputs can be created and released on the main {{< gh_link file="flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderOutput.java" name="ReaderOutput" >}} via the `createOutputForSplit(splitId)` and `releaseOutputForSplit(splitId)` methods. Please refer to the JavaDocs of the class and methods for details.

#### Split Level Watermark Alignment

Although source operator watermark alignment is handled by Flink runtime, the source needs to additionally implement `SourceReader#pauseOrResumeSplits` and `SplitReader#pauseOrResumeSplits` to achieve split level watermark alignment. Split level watermark alignment is useful for when
there are multiple splits assigned to a source reader. By default, these implementations will throw an `UnsupportedOperationException`, `pipeline.watermark-alignment.allow-unaligned-source-splits` is set to false, when there is more than one split assigned, and the split exceeds the watermark alignment threshold configured by the `WatermarkStrategy`. `SourceReaderBase`
contains an implementation for `SourceReader#pauseOrResumeSplits` so that inheriting sources only need to implement `SplitReader#pauseOrResumeSplits`. See the javadocs for more implementation hints.
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@
<td><h5>pipeline.watermark-alignment.allow-unaligned-source-splits</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>If watermark alignment is used, sources with multiple splits will attempt to pause/resume split readers to avoid watermark drift of source splits. However, if split readers don't support pause/resume an UnsupportedOperationException will be thrown when there is an attempt to pause/resume. To allow use of split readers that don't support pause/resume and, hence, t allow unaligned splits while still using watermark alignment, set this parameter to true. The default value is false. Note: This parameter may be removed in future releases.</td>
<td>If watermark alignment is used, sources with multiple splits will attempt to pause/resume split readers to avoid watermark drift of source splits. However, if split readers don't support pause/resume, an UnsupportedOperationException will be thrown when there is an attempt to pause/resume. To allow use of split readers that don't support pause/resume and, hence, to allow unaligned splits while still using watermark alignment, set this parameter to true. The default value is false. Note: This parameter may be removed in future releases.</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -303,10 +303,10 @@ public enum VertexDescriptionMode {
"If watermark alignment is used, sources with multiple splits will "
+ "attempt to pause/resume split readers to avoid watermark "
+ "drift of source splits. "
+ "However, if split readers don't support pause/resume an "
+ "However, if split readers don't support pause/resume, an "
+ "UnsupportedOperationException will be thrown when there is "
+ "an attempt to pause/resume. To allow use of split readers that "
+ "don't support pause/resume and, hence, t allow unaligned splits "
+ "don't support pause/resume and, hence, to allow unaligned splits "
+ "while still using watermark alignment, set this parameter to true. "
+ "The default value is false. Note: This parameter may be "
+ "removed in future releases.");
Expand Down

0 comments on commit 678370b

Please sign in to comment.