Flink: Document watermark generation feature
Peter Vary committed Nov 28, 2023
1 parent 4e62b58 commit feed5fd
Showing 1 changed file with 52 additions and 0 deletions.
52 changes: 52 additions & 0 deletions docs/flink-queries.md
@@ -277,6 +277,58 @@ DataStream<Row> stream = env.fromSource(source, WatermarkStrategy.noWatermarks()
"Iceberg Source as Avro GenericRecord", new GenericRecordAvroTypeInfo(avroSchema));
```

### Emitting watermarks
Emitting watermarks from the source itself can be useful for several purposes, such as using the
[Flink Watermark Alignment](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment)
feature to prevent runaway readers, or providing triggers for [Flink windowing](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/operators/windows/).

Enable watermark generation for an `IcebergSource` by setting the `watermarkColumn`.
The supported column types are `timestamp`, `timestamptz` and `long`.
Timestamp columns are automatically converted to milliseconds since the Java epoch of
1970-01-01T00:00:00Z. Use `watermarkTimeUnit` to configure the conversion for long columns.
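
The conversion for long columns can be pictured with plain JDK code. The sketch below assumes the time unit is a `java.util.concurrent.TimeUnit`; the class and method names are hypothetical and are not part of the Iceberg API.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, for illustration only: shows how a raw long value
// stored in a given time unit maps to epoch milliseconds.
final class WatermarkMillis {
  private WatermarkMillis() {}

  // Convert a raw long column value, expressed in `unit`, to epoch milliseconds.
  static long toEpochMillis(long rawValue, TimeUnit unit) {
    return unit.toMillis(rawValue);
  }

  public static void main(String[] args) {
    // The same instant stored as seconds and as microseconds.
    System.out.println(toEpochMillis(1_700_000_000L, TimeUnit.SECONDS));
    System.out.println(toEpochMillis(1_700_000_000_000_000L, TimeUnit.MICROSECONDS));
  }
}
```

Both calls describe the same instant, so choosing the wrong unit for a long column would shift the generated watermarks by orders of magnitude.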

The watermarks are generated from the column metrics stored for the data files and are emitted once per split.
When using watermarks for Flink watermark alignment, set `read.split.open-file-cost` high enough (for example, to the split size) to prevent
combining multiple files into a single split.
By default, column metrics are collected for the first 100 columns of the table. Use [write properties](configuration.md#write-properties) starting with `write.metadata.metrics` to change this when needed, for example when the watermark column is not among those columns.
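
To see why combining files matters for alignment, consider the rough sketch below. It assumes that the watermark of a split is the smallest minimum value of the watermark column among the files in that split; this is an assumption made for illustration, and the class and method names are hypothetical rather than Iceberg code.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical model, for illustration only: a split is represented by the
// per-file minimum values (epoch millis) of the watermark column.
final class SplitWatermarkSketch {
  private SplitWatermarkSketch() {}

  // Assumed rule: one watermark per split, equal to the smallest per-file minimum.
  // Throws NoSuchElementException for an empty split.
  static long splitWatermark(List<Long> fileMinTimestampsMillis) {
    return Collections.min(fileMinTimestampsMillis);
  }

  public static void main(String[] args) {
    // Three files combined into one split yield a single, coarse watermark.
    System.out.println(splitWatermark(Arrays.asList(1_000L, 5_000L, 9_000L)));

    // One file per split yields one watermark per file, so progress is finer grained.
    for (long fileMin : new long[] {1_000L, 5_000L, 9_000L}) {
      System.out.println(splitWatermark(Collections.singletonList(fileMin)));
    }
  }
}
```

Under this assumption, a combined split only ever reports the oldest file's timestamp, which gives watermark alignment less information to work with than one watermark per file.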

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");

// For windowing
DataStream<RowData> stream =
    env.fromSource(
        IcebergSource.forRowData()
            .tableLoader(tableLoader)
            // Watermark using timestamp column
            .watermarkColumn("timestamp_column")
            .build(),
        // Watermarks are generated by the source, no need to generate them manually
        WatermarkStrategy.<RowData>noWatermarks()
            // Extract event timestamp from records
            .withTimestampAssigner((record, eventTime) -> record.getTimestamp(pos, precision).getMillisecond()),
        SOURCE_NAME,
        TypeInformation.of(RowData.class));

// For watermark alignment
DataStream<RowData> alignedStream =
    env.fromSource(
        IcebergSource.forRowData()
            .tableLoader(tableLoader)
            // Disable combining multiple files to a single split
            .set(FlinkReadOptions.SPLIT_FILE_OPEN_COST, String.valueOf(TableProperties.SPLIT_SIZE_DEFAULT))
            // Watermark using long column
            .watermarkColumn("long_column")
            // The long column stores milliseconds (java.util.concurrent.TimeUnit)
            .watermarkTimeUnit(TimeUnit.MILLISECONDS)
            .build(),
        // Watermarks are generated by the source, no need to generate them manually
        WatermarkStrategy.<RowData>noWatermarks()
            .withWatermarkAlignment(watermarkGroup, maxAllowedWatermarkDrift),
        SOURCE_NAME,
        TypeInformation.of(RowData.class));
```
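
The effect of `maxAllowedWatermarkDrift` can be sketched without any Flink dependency. The simplified model below assumes that a reader is paused once its watermark is more than the allowed drift ahead of the slowest reader in the same alignment group. The class, the method, and the reader names are hypothetical; this is not Flink's implementation.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of watermark alignment, for illustration only.
final class AlignmentSketch {
  private AlignmentSketch() {}

  // For each reader, decide whether it should pause: true when its watermark
  // exceeds (slowest watermark in the group + maxDriftMillis).
  // Throws NoSuchElementException when the group is empty.
  static Map<String, Boolean> readersToPause(Map<String, Long> watermarks, long maxDriftMillis) {
    long slowest = Collections.min(watermarks.values());
    long maxAllowed = slowest + maxDriftMillis;
    Map<String, Boolean> result = new LinkedHashMap<>();
    watermarks.forEach((reader, watermark) -> result.put(reader, watermark > maxAllowed));
    return result;
  }

  public static void main(String[] args) {
    Map<String, Long> watermarks = new LinkedHashMap<>();
    watermarks.put("reader-0", 10_000L);
    watermarks.put("reader-1", 12_000L);
    watermarks.put("reader-2", 60_000L); // runaway reader
    // With a 20 second drift, only reader-2 is ahead of 10_000 + 20_000.
    System.out.println(readersToPause(watermarks, 20_000L));
  }
}
```

In this model a runaway reader stops consuming new data until the slower readers catch up, which is the behavior the alignment example above relies on.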

## Options

### Read options