From 0c6a37f2abddf7427a9d6f30afdb3bb07914375f Mon Sep 17 00:00:00 2001
From: Peter Vary
Date: Thu, 30 Nov 2023 13:24:29 +0100
Subject: [PATCH] Review comments

---
 docs/flink-queries.md | 22 ++++++++++++++--------
 1 file changed, 14 insertions(+), 8 deletions(-)

diff --git a/docs/flink-queries.md b/docs/flink-queries.md
index 7e849aba10db..48afbacda1ac 100644
--- a/docs/flink-queries.md
+++ b/docs/flink-queries.md
@@ -279,24 +279,30 @@ DataStream<RowData> stream = env.fromSource(source, WatermarkStrategy.noWatermarks()
 
 ### Emitting watermarks
 Emitting watermarks from the source itself could be beneficial for several purposes, like harnessing the
-[Flink Watermark Alignment](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment)
-feature to prevent runaway readers, or providing triggers for [Flink windowing](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/operators/windows/).
+[Flink Watermark Alignment](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment),
+or preventing [windows](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/windows/)
+from triggering too early when reading multiple data files concurrently.
 
 Enable watermark generation for an `IcebergSource` by setting the `watermarkColumn`.
 The supported column types are `timestamp`, `timestamptz` and `long`.
-Timestamp columns are automatically converted to milliseconds since the Java epoch of
-1970-01-01T00:00:00Z. Use `watermarkTimeUnit` to configure the conversion for long columns.
+Iceberg `timestamp` and `timestamptz` columns inherently carry their time precision, so there is no need
+to specify a time unit for them. A `long` column, however, carries no time unit information, so use
+`watermarkTimeUnit` to configure the conversion for `long` columns.
 
 The watermarks are generated based on column metrics stored for data files and emitted once per split.
-When using watermarks for Flink watermark alignment set `read.split.open-file-cost` to prevent
-combining multiple files to a single split.
+If multiple smaller files with different time ranges are combined into a single split, it increases
+the out-of-orderliness and the amount of data buffered in the Flink state, which is exactly what
+watermark alignment is meant to reduce. Hence it is recommended to set
+`read.split.open-file-cost` to a very large value to prevent combining multiple smaller files into a
+single split.
+
 By default, the column metrics are collected for the first 100 columns of the table.
 Use [write properties](configuration.md#write-properties) starting with `write.metadata.metrics` when needed.
 
 ```java
 StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
 TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
-// For windowing
+// Ordered data file reads with windowing, using a timestamp column
 DataStream<RowData> stream =
     env.fromSource(
         IcebergSource.forRowData()
@@ -311,7 +317,7 @@ DataStream<RowData> stream =
         SOURCE_NAME,
         TypeInformation.of(RowData.class));
 
-// For watermark alignment
+// Watermark alignment, using a long event time column
 DataStream<RowData> stream =
     env.fromSource(
         IcebergSource source = IcebergSource.forRowData()
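
For reference only, and not part of the patch above: a minimal, self-contained sketch of the setup the new text describes, namely watermark generation from a `long` event time column, a large open-file cost so small data files are not combined into one split, and Flink watermark alignment on the resulting stream. The column name `event_time_ms`, the watermark group and drift values, and the exact builder setters `watermarkTimeUnit(TimeUnit)` and `splitOpenFileCost(Long)` are assumptions for illustration, not taken from the diff.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.IcebergSource;

public class WatermarkAlignedIcebergRead {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
    TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");

    IcebergSource<RowData> source =
        IcebergSource.forRowData()
            .tableLoader(tableLoader)
            .streaming(true)
            // Event time lives in a long column (hypothetical name), so the time unit
            // must be given explicitly, as the patch text describes.
            .watermarkColumn("event_time_ms")
            .watermarkTimeUnit(TimeUnit.MILLISECONDS)
            // Assumed builder setter for read.split.open-file-cost: an open-file cost equal
            // to the 128 MB default split size keeps each data file in its own split.
            .splitOpenFileCost(128L * 1024 * 1024)
            .build();

    DataStream<RowData> stream =
        env.fromSource(
            source,
            // Watermarks are emitted by the source itself; alignment keeps any single
            // reader at most 20 seconds ahead of the slowest reader in the group.
            WatermarkStrategy.<RowData>noWatermarks()
                .withWatermarkAlignment("iceberg-watermark-group", Duration.ofSeconds(20)),
            "iceberg-source",
            TypeInformation.of(RowData.class));

    stream.print();
    env.execute("Watermark aligned Iceberg read");
  }
}
```

Using the default split size as the open-file cost is one way to get "a very large value" in practice: every file then weighs at least a full split, so the planner never packs two files into one split and the per-split watermarks stay close to the individual file time ranges.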