Peter Vary committed Nov 30, 2023
1 parent feed5fd commit 0c6a37f
Showing 1 changed file with 14 additions and 8 deletions.
22 changes: 14 additions & 8 deletions docs/flink-queries.md
@@ -279,24 +279,30 @@ DataStream<Row> stream = env.fromSource(source, WatermarkStrategy.noWatermarks()

### Emitting watermarks
Emitting watermarks from the source itself could be beneficial for several purposes, like harnessing the
-[Flink Watermark Alignment](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment)
-feature to prevent runaway readers, or providing triggers for [Flink windowing](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/operators/windows/).
+[Flink Watermark Alignment](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment),
+or preventing [windows](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/windows/)
+from triggering too early when reading multiple data files concurrently.

Enable watermark generation for an `IcebergSource` by setting the `watermarkColumn`.
The supported column types are `timestamp`, `timestamptz` and `long`.
-Timestamp columns are automatically converted to milliseconds since the Java epoch of
-1970-01-01T00:00:00Z. Use `watermarkTimeUnit` to configure the conversion for long columns.
+Iceberg `timestamp` and `timestamptz` columns inherently carry their time precision, so there is no need
+to specify the time unit. A `long` column carries no time unit information, so use
+`watermarkTimeUnit` to configure the conversion for long columns.
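
To illustrate why a `long` column needs an explicit unit, here is a minimal plain-Java sketch of such a conversion to epoch milliseconds. It is not Iceberg code: the class `WatermarkUnitSketch` and the method `toEpochMillis` are hypothetical names used only for this example, and the sketch assumes the unit is expressed as a `java.util.concurrent.TimeUnit`.

```java
import java.util.concurrent.TimeUnit;

/** Illustrative only (hypothetical class, not part of Iceberg or Flink). */
final class WatermarkUnitSketch {

    // Convert a raw long event-time value, expressed in the given unit, to epoch milliseconds.
    static long toEpochMillis(long rawValue, TimeUnit unit) {
        return unit.toMillis(rawValue);
    }

    public static void main(String[] args) {
        // The same instant, 2023-11-30T00:00:00Z, stored in two different units.
        long asMicros = 1_701_302_400_000_000L;
        long asSeconds = 1_701_302_400L;

        // Both print 1701302400000 once the correct unit is supplied.
        System.out.println(toEpochMillis(asMicros, TimeUnit.MICROSECONDS));
        System.out.println(toEpochMillis(asSeconds, TimeUnit.SECONDS));
    }
}
```

The raw number alone does not reveal its unit: reading the microsecond value as milliseconds would place the watermark far in the future, which is why the unit has to be configured for `long` columns.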

The watermarks are generated based on column metrics stored for data files and emitted once per split.
-When using watermarks for Flink watermark alignment set `read.split.open-file-cost` to prevent
-combining multiple files to a single split.
+If multiple smaller files with different time ranges are combined into a single split, it can increase
+the out-of-orderliness and cause extra data buffering in the Flink state. The main purpose of watermark alignment
+is to reduce out-of-orderliness and excess data buffering in the Flink state. Hence it is recommended to
+set `read.split.open-file-cost` to a very large value to prevent combining multiple smaller files into a
+single split.
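
The effect of a large open-file cost can be sketched with a simplified packing model in plain Java. This is an assumption-laden illustration, not Iceberg's actual split planner: the class `SplitPackingSketch`, the method `pack`, and the rule that each file weighs `max(fileSize, openFileCost)` are simplifications chosen for this example.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only (hypothetical class): a simplified model of combining files into splits. */
final class SplitPackingSketch {

    // Greedily pack files, in order, into splits whose total weight stays within targetSplitSize.
    // Assumption of this sketch: a file's weight is max(fileSize, openFileCost).
    static List<List<Long>> pack(List<Long> fileSizes, long targetSplitSize, long openFileCost) {
        List<List<Long>> splits = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long currentWeight = 0;
        for (long size : fileSizes) {
            long weight = Math.max(size, openFileCost);
            // Start a new split when adding this file would exceed the target size.
            if (!current.isEmpty() && currentWeight + weight > targetSplitSize) {
                splits.add(current);
                current = new ArrayList<>();
                currentWeight = 0;
            }
            current.add(size);
            currentWeight += weight;
        }
        if (!current.isEmpty()) {
            splits.add(current);
        }
        return splits;
    }

    public static void main(String[] args) {
        List<Long> files = List.of(10L, 20L, 30L, 40L); // file sizes, in MB for readability

        // Small open-file cost: all four files fit into one split.
        System.out.println(pack(files, 128, 4).size());
        // Open-file cost equal to the target split size: every file gets its own split.
        System.out.println(pack(files, 128, 128).size());
    }
}
```

In this model, raising the open-file cost to the target split size gives each file the weight of a full split, so no two files are combined and each split covers the time range of a single file.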

By default, column metrics are collected for the first 100 columns of the table. If the watermark column falls outside that range, use [write properties](configuration.md#write-properties) starting with `write.metadata.metrics` to collect metrics for it.

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");

-// For windowing
+// Ordered data file reads with windowing, using a timestamp column
DataStream<RowData> stream =
env.fromSource(
IcebergSource.forRowData()
@@ -311,7 +317,7 @@ DataStream<RowData> stream =
SOURCE_NAME,
TypeInformation.of(RowData.class));

-// For watermark alignment
+// Watermark alignment, using a long event time column
DataStream<RowData> stream =
env.fromSource(
IcebergSource.forRowData()