Flink: Document watermark generation feature (apache#9179)
pvary authored and lisirrx committed Jan 4, 2024
1 parent 4afc8b4 commit 64a04c1
Showing 1 changed file with 69 additions and 0 deletions: docs/flink-queries.md

### Emitting watermarks
Emitting watermarks from the source itself can be beneficial for several purposes, such as harnessing
[Flink Watermark Alignment](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment),
or preventing [windows](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/windows/)
from triggering too early when reading multiple data files concurrently.

Enable watermark generation for an `IcebergSource` by setting the `watermarkColumn`.
The supported column types are `timestamp`, `timestamptz` and `long`.
Iceberg `timestamp` and `timestamptz` columns inherently contain the time precision, so there is no need
to specify a time unit for them. A `long` column, however, carries no time unit information, so use
`watermarkTimeUnit` to configure the conversion for `long` columns.
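As a self-contained illustration of the kind of conversion this implies (not Iceberg code; it simply uses `java.util.concurrent.TimeUnit`, and the column value is a made-up microsecond epoch timestamp), a `long` value is only meaningful as a watermark once its unit is known:

```java
import java.util.concurrent.TimeUnit;

public class LongColumnToMillis {
    public static void main(String[] args) {
        // Hypothetical value of a `long` event time column, stored in microseconds
        long eventTimeMicros = 1_700_000_000_123_456L;
        // The unit must be declared so the value can be converted to epoch milliseconds
        long eventTimeMillis = TimeUnit.MICROSECONDS.toMillis(eventTimeMicros);
        System.out.println(eventTimeMillis); // 1700000000123
    }
}
```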

The watermarks are generated based on the column metrics stored for data files and emitted once per split.
When multiple smaller files with different time ranges are combined into a single split, the
out-of-orderliness grows, and with it the amount of data buffered in the Flink state. Since the main purpose
of watermark alignment is to reduce exactly this out-of-orderliness and excess buffering, it is recommended
to set `read.split.open-file-cost` to a very large value to prevent combining multiple smaller files into a
single split. The trade-off of not combining small files is lower read throughput, especially when there are
many small files. In typical stateful processing jobs the source read throughput is not the bottleneck, so
this is usually a reasonable trade-off.

This feature requires column-level min-max stats, so make sure stats are generated for the watermark column
during the write phase. By default, column metrics are collected for the first 100 columns of the table.
If the watermark column doesn't have stats enabled by default, use the
[write properties](configuration.md#write-properties) starting with `write.metadata.metrics` to enable them.
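For example, assuming a hypothetical watermark column named `event_ts`, full min-max stats for it could be requested with table properties along these lines (see the write-properties documentation for the exact options and defaults):

```
# Metrics are inferred for the first 100 columns by default:
write.metadata.metrics.max-inferred-column-defaults=100
# `event_ts` is a hypothetical watermark column; collect full stats for it:
write.metadata.metrics.column.event_ts=full
```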

The following example is useful when the watermarks are used for windowing. The source reads the Iceberg
data files in order, using the timestamp column, and emits watermarks:
```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");

DataStream<RowData> stream =
    env.fromSource(
        IcebergSource.forRowData()
            .tableLoader(tableLoader)
            // Watermark using timestamp column
            .watermarkColumn("timestamp_column")
            .build(),
        // Watermarks are generated by the source, no need to generate them manually
        WatermarkStrategy.<RowData>noWatermarks()
            // Extract the event timestamp from the records
            // (`pos` and `precision` stand for the position and precision of the timestamp column)
            .withTimestampAssigner((record, eventTime) -> record.getTimestamp(pos, precision).getMillisecond()),
        SOURCE_NAME,
        TypeInformation.of(RowData.class));
```

Example of reading an Iceberg table and using a `long` event time column for watermark alignment:
```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");

DataStream<RowData> stream =
    env.fromSource(
        IcebergSource.forRowData()
            .tableLoader(tableLoader)
            // Disable combining multiple files into a single split
            .set(FlinkReadOptions.SPLIT_FILE_OPEN_COST, String.valueOf(TableProperties.SPLIT_SIZE_DEFAULT))
            // Watermark using long column
            .watermarkColumn("long_column")
            .watermarkTimeUnit(TimeUnit.MILLI_SCALE)
            .build(),
        // Watermarks are generated by the source, no need to generate them manually
        WatermarkStrategy.<RowData>noWatermarks()
            .withWatermarkAlignment(watermarkGroup, maxAllowedWatermarkDrift),
        SOURCE_NAME,
        TypeInformation.of(RowData.class));
```

## Options

### Read options