
Commit 740ad3d

leonardBang authored and wuchong committed

[FLINK-22356][hive][filesystem] Fix partition-time commit failure when watermark is defined on TIMESTAMP_LTZ column

This closes #15709

1 parent 00865d0 · commit 740ad3d

File tree

9 files changed: +339 −34 lines changed

docs/content.zh/docs/connectors/table/filesystem.md

Lines changed: 48 additions & 2 deletions
````diff
@@ -225,6 +225,12 @@ To define when to commit a partition, providing partition commit trigger:
         <td>Duration</td>
         <td>The partition will not commit until the delay time. If it is a daily partition, should be '1 d', if it is an hourly partition, should be '1 h'.</td>
     </tr>
+    <tr>
+        <td><h5>sink.partition-commit.watermark-time-zone</h5></td>
+        <td style="word-wrap: break-word;">UTC</td>
+        <td>String</td>
+        <td>The time zone used to parse the long watermark value to a TIMESTAMP value; the parsed watermark timestamp is compared with the partition time to decide whether the partition should be committed. This option only takes effect when `sink.partition-commit.trigger` is set to 'partition-time'. If this option is not configured correctly, e.g. the source rowtime is defined on a TIMESTAMP_LTZ column but this config is not set, users may see partitions committed only after a few hours. The default value is 'UTC', which means the watermark is defined on a TIMESTAMP column or not defined at all. If the watermark is defined on a TIMESTAMP_LTZ column, the time zone of the watermark is the session time zone. The option value is either a full name such as 'America/Los_Angeles', or a custom time zone id such as 'GMT-8:00'.</td>
+    </tr>
     </tbody>
 </table>

````
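The description above ties the option to the session time zone. A minimal sketch of keeping the two aligned — assuming the Flink SQL client and the standard `table.local-time-zone` session option; the zone name is only an example:

```sql
-- Sketch only, not part of this commit: when rowtime is TIMESTAMP_LTZ,
-- point the session zone and the sink option at the same time zone.
SET table.local-time-zone = Asia/Shanghai;  -- session zone used by TIMESTAMP_LTZ

-- ...and in the sink table's WITH clause, name the same zone:
-- 'sink.partition-commit.watermark-time-zone' = 'Asia/Shanghai'
```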
````diff
@@ -401,15 +407,15 @@ The parallelism of writing files into external file system (including Hive) can
 
 ## Full Example
 
-The below shows how the file system connector can be used to write a streaming query to write data from Kafka into a file system and runs a batch query to read that data back out.
+The below examples show how the file system connector can be used to write a streaming query that writes data from Kafka into a file system, and a batch query that reads that data back out.
 
 ```sql
 
 CREATE TABLE kafka_table (
   user_id STRING,
   order_amount DOUBLE,
   log_ts TIMESTAMP(3),
-  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
+  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- Define watermark on TIMESTAMP column
 ) WITH (...);
 
 CREATE TABLE fs_table (
````
````diff
@@ -438,4 +444,44 @@ FROM kafka_table;
 SELECT * FROM fs_table WHERE dt='2020-05-20' and `hour`='12';
 ```
 
+If the watermark is defined on a TIMESTAMP_LTZ column and `partition-time` is used to commit, the `sink.partition-commit.watermark-time-zone` option must be set to the session time zone; otherwise partitions may be committed a few hours late.
+```sql
+
+CREATE TABLE kafka_table (
+  user_id STRING,
+  order_amount DOUBLE,
+  ts BIGINT, -- time in epoch milliseconds
+  ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
+  WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- Define watermark on TIMESTAMP_LTZ column
+) WITH (...);
+
+CREATE TABLE fs_table (
+  user_id STRING,
+  order_amount DOUBLE,
+  dt STRING,
+  `hour` STRING
+) PARTITIONED BY (dt, `hour`) WITH (
+  'connector'='filesystem',
+  'path'='...',
+  'format'='parquet',
+  'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00',
+  'sink.partition-commit.delay'='1 h',
+  'sink.partition-commit.trigger'='partition-time',
+  'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', -- Assume the user-configured time zone is 'Asia/Shanghai'
+  'sink.partition-commit.policy.kind'='success-file'
+);
+
+-- streaming sql, insert into file system table
+INSERT INTO fs_table
+SELECT
+  user_id,
+  order_amount,
+  DATE_FORMAT(ts_ltz, 'yyyy-MM-dd'),
+  DATE_FORMAT(ts_ltz, 'HH')
+FROM kafka_table;
+
+-- batch sql, select with partition pruning
+SELECT * FROM fs_table WHERE dt='2020-05-20' and `hour`='12';
+```
+
 {{< top >}}
````
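To see why the zone matters in the example above: `TO_TIMESTAMP_LTZ` fixes the instant once, but that instant is rendered — and compared against partition times — in a zone. A short sketch, assuming a session zone of 'Asia/Shanghai':

```sql
-- Sketch only, not part of this commit:
SET table.local-time-zone = Asia/Shanghai;
SELECT TO_TIMESTAMP_LTZ(0, 3);
-- -> 1970-01-01 08:00:00.000 (rendered in the session zone).
-- Under a 'UTC' session the same long value renders as
-- 1970-01-01 00:00:00.000; that 8-hour gap is what delays partition
-- commits when sink.partition-commit.watermark-time-zone stays at 'UTC'.
```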

docs/content.zh/docs/connectors/table/hive/hive_read_write.md

Lines changed: 35 additions & 2 deletions
````diff
@@ -346,7 +346,7 @@ Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1') SELECT 'Tom
 visible - incrementally. Users control when/how to trigger commits with several properties. Insert
 overwrite is not supported for streaming write.
 
-The below shows how the streaming sink can be used to write a streaming query to write data from Kafka into a Hive table with partition-commit,
+The below examples show how the streaming sink can be used to write a streaming query that writes data from Kafka into a Hive table with partition commit,
 and runs a batch query to read that data back out.
 
 Please see the [streaming sink]({{< ref "docs/connectors/table/filesystem" >}}#streaming-sink) for a full list of available configurations.
````
````diff
@@ -369,7 +369,7 @@ CREATE TABLE kafka_table (
   user_id STRING,
   order_amount DOUBLE,
   log_ts TIMESTAMP(3),
-  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
+  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- Define watermark on TIMESTAMP column
 ) WITH (...);
 
 -- streaming sql, insert into hive table
````
````diff
@@ -382,6 +382,39 @@ SELECT * FROM hive_table WHERE dt='2020-05-20' and hr='12';
 
 ```
 
+If the watermark is defined on a TIMESTAMP_LTZ column and `partition-time` is used to commit, the `sink.partition-commit.watermark-time-zone` option must be set to the session time zone; otherwise partitions may be committed a few hours late.
+```sql
+
+SET table.sql-dialect=hive;
+CREATE TABLE hive_table (
+  user_id STRING,
+  order_amount DOUBLE
+) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
+  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
+  'sink.partition-commit.trigger'='partition-time',
+  'sink.partition-commit.delay'='1 h',
+  'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', -- Assume the user-configured time zone is 'Asia/Shanghai'
+  'sink.partition-commit.policy.kind'='metastore,success-file'
+);
+
+SET table.sql-dialect=default;
+CREATE TABLE kafka_table (
+  user_id STRING,
+  order_amount DOUBLE,
+  ts BIGINT, -- time in epoch milliseconds
+  ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
+  WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- Define watermark on TIMESTAMP_LTZ column
+) WITH (...);
+
+-- streaming sql, insert into hive table
+INSERT INTO TABLE hive_table
+SELECT user_id, order_amount, DATE_FORMAT(ts_ltz, 'yyyy-MM-dd'), DATE_FORMAT(ts_ltz, 'HH')
+FROM kafka_table;
+
+-- batch sql, select with partition pruning
+SELECT * FROM hive_table WHERE dt='2020-05-20' and hr='12';
+
+```
 
 By default, for streaming writes, Flink only supports renaming committers, meaning the S3 filesystem
 cannot support exactly-once streaming writes.
````
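For completeness, a sketch of how the two commit policies in the Hive example above become observable — assuming the `hive_table` from that example; `_SUCCESS` is the default success-file name:

```sql
-- Sketch only, not part of this commit, assuming hive_table from above.
-- 'metastore' policy: a committed partition is added to the Hive
-- metastore and shows up in partition listings:
SHOW PARTITIONS hive_table;
-- 'success-file' policy: a marker file (default name _SUCCESS) is written
-- into the partition directory at commit time, i.e. once the watermark
-- (parsed in the configured watermark time zone) passes the extracted
-- partition time plus sink.partition-commit.delay.
```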

docs/content/docs/connectors/table/filesystem.md

Lines changed: 47 additions & 1 deletion
````diff
@@ -225,6 +225,12 @@ To define when to commit a partition, providing partition commit trigger:
         <td>Duration</td>
         <td>The partition will not commit until the delay time. If it is a daily partition, should be '1 d', if it is an hourly partition, should be '1 h'.</td>
     </tr>
+    <tr>
+        <td><h5>sink.partition-commit.watermark-time-zone</h5></td>
+        <td style="word-wrap: break-word;">UTC</td>
+        <td>String</td>
+        <td>The time zone used to parse the long watermark value to a TIMESTAMP value; the parsed watermark timestamp is compared with the partition time to decide whether the partition should be committed. This option only takes effect when `sink.partition-commit.trigger` is set to 'partition-time'. If this option is not configured correctly, e.g. the source rowtime is defined on a TIMESTAMP_LTZ column but this config is not set, users may see partitions committed only after a few hours. The default value is 'UTC', which means the watermark is defined on a TIMESTAMP column or not defined at all. If the watermark is defined on a TIMESTAMP_LTZ column, the time zone of the watermark is the session time zone. The option value is either a full name such as 'America/Los_Angeles', or a custom time zone id such as 'GMT-8:00'.</td>
+    </tr>
     </tbody>
 </table>

````
````diff
@@ -401,7 +407,7 @@ The parallelism of writing files into external file system (including Hive) can
 
 ## Full Example
 
-The below shows how the file system connector can be used to write a streaming query to write data from Kafka into a file system and runs a batch query to read that data back out.
+The below examples show how the file system connector can be used to write a streaming query that writes data from Kafka into a file system, and a batch query that reads that data back out.
 
 ```sql
 
````
````diff
@@ -438,4 +444,44 @@ FROM kafka_table;
 SELECT * FROM fs_table WHERE dt='2020-05-20' and `hour`='12';
 ```
 
+If the watermark is defined on a TIMESTAMP_LTZ column and `partition-time` is used to commit, the `sink.partition-commit.watermark-time-zone` option must be set to the session time zone; otherwise partitions may be committed a few hours late.
+```sql
+
+CREATE TABLE kafka_table (
+  user_id STRING,
+  order_amount DOUBLE,
+  ts BIGINT, -- time in epoch milliseconds
+  ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
+  WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- Define watermark on TIMESTAMP_LTZ column
+) WITH (...);
+
+CREATE TABLE fs_table (
+  user_id STRING,
+  order_amount DOUBLE,
+  dt STRING,
+  `hour` STRING
+) PARTITIONED BY (dt, `hour`) WITH (
+  'connector'='filesystem',
+  'path'='...',
+  'format'='parquet',
+  'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00',
+  'sink.partition-commit.delay'='1 h',
+  'sink.partition-commit.trigger'='partition-time',
+  'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', -- Assume the user-configured time zone is 'Asia/Shanghai'
+  'sink.partition-commit.policy.kind'='success-file'
+);
+
+-- streaming sql, insert into file system table
+INSERT INTO fs_table
+SELECT
+  user_id,
+  order_amount,
+  DATE_FORMAT(ts_ltz, 'yyyy-MM-dd'),
+  DATE_FORMAT(ts_ltz, 'HH')
+FROM kafka_table;
+
+-- batch sql, select with partition pruning
+SELECT * FROM fs_table WHERE dt='2020-05-20' and `hour`='12';
+```
+
 {{< top >}}
````
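As a worked example of the trigger arithmetic in the examples above — a sketch based on the option descriptions, where the rough commit condition is watermark ≥ partition time + delay:

```sql
-- Sketch only, not part of this commit: how the options combine for the
-- partition (dt='2020-05-20', `hour`='12') with pattern '$dt $hour:00:00'.
-- Extracted partition time: 2020-05-20 12:00:00 (in the watermark zone).
-- With 'sink.partition-commit.delay'='1 h' the partition commits roughly
-- once the watermark reaches:
SELECT TO_TIMESTAMP('2020-05-20 12:00:00') + INTERVAL '1' HOUR AS commit_threshold;
-- If the long watermark were parsed as UTC while the rowtime is a
-- TIMESTAMP_LTZ in Asia/Shanghai, this comparison would lag by 8 hours.
```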

docs/content/docs/connectors/table/hive/hive_read_write.md

Lines changed: 35 additions & 2 deletions
````diff
@@ -346,7 +346,7 @@ Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1') SELECT 'Tom
 visible - incrementally. Users control when/how to trigger commits with several properties. Insert
 overwrite is not supported for streaming write.
 
-The below shows how the streaming sink can be used to write a streaming query to write data from Kafka into a Hive table with partition-commit,
+The below examples show how the streaming sink can be used to write a streaming query that writes data from Kafka into a Hive table with partition commit,
 and runs a batch query to read that data back out.
 
 Please see the [streaming sink]({{< ref "docs/connectors/table/filesystem" >}}#streaming-sink) for a full list of available configurations.
````
````diff
@@ -369,7 +369,7 @@ CREATE TABLE kafka_table (
   user_id STRING,
   order_amount DOUBLE,
   log_ts TIMESTAMP(3),
-  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
+  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- Define watermark on TIMESTAMP column
 ) WITH (...);
 
 -- streaming sql, insert into hive table
````
````diff
@@ -382,6 +382,39 @@ SELECT * FROM hive_table WHERE dt='2020-05-20' and hr='12';
 
 ```
 
+If the watermark is defined on a TIMESTAMP_LTZ column and `partition-time` is used to commit, the `sink.partition-commit.watermark-time-zone` option must be set to the session time zone; otherwise partitions may be committed a few hours late.
+```sql
+
+SET table.sql-dialect=hive;
+CREATE TABLE hive_table (
+  user_id STRING,
+  order_amount DOUBLE
+) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
+  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
+  'sink.partition-commit.trigger'='partition-time',
+  'sink.partition-commit.delay'='1 h',
+  'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', -- Assume the user-configured time zone is 'Asia/Shanghai'
+  'sink.partition-commit.policy.kind'='metastore,success-file'
+);
+
+SET table.sql-dialect=default;
+CREATE TABLE kafka_table (
+  user_id STRING,
+  order_amount DOUBLE,
+  ts BIGINT, -- time in epoch milliseconds
+  ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
+  WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- Define watermark on TIMESTAMP_LTZ column
+) WITH (...);
+
+-- streaming sql, insert into hive table
+INSERT INTO TABLE hive_table
+SELECT user_id, order_amount, DATE_FORMAT(ts_ltz, 'yyyy-MM-dd'), DATE_FORMAT(ts_ltz, 'HH')
+FROM kafka_table;
+
+-- batch sql, select with partition pruning
+SELECT * FROM hive_table WHERE dt='2020-05-20' and hr='12';
+
+```
 
 By default, for streaming writes, Flink only supports renaming committers, meaning the S3 filesystem
 cannot support exactly-once streaming writes.
````
