Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flink: Write watermark to the snapshot summary #6253

Closed
wants to merge 1 commit into from

Conversation

hililiwei
Copy link
Contributor

In some scenarios, the task needs to determine that all data of a certain period has been written based on the watermark.

The PR writes the watermark of the task to the snapshot summary, like the flink job id.

@github-actions github-actions bot added the flink label Nov 23, 2022
@Fokko Fokko requested a review from stevenzwu November 23, 2022 09:52
@rdblue
Copy link
Contributor

rdblue commented Nov 26, 2022

@hililiwei, I really like the idea of having watermarks written. Can you explain in more detail what you're doing here and where the watermark comes from?

@hililiwei
Copy link
Contributor Author

hililiwei commented Nov 28, 2022

hi @rdblue, thank you so much for your feedback.
Before I answer your question, I'd like to say something else around this.

Actually, I want to solve a problem that is common in streaming scenarios: how does the downstream application know that all of the specified period data has been written if the application is not reading incremental data in real time, but is microbatching? Watermarking is an easy solution to think of. When downstream applications get the watermark, they can know whether the event time or process time of the data has reached a critical value. But it's not enough.

If our community accepts this PR solution, I would like to do one more thing, which is to support time-based partition commit. In some scenarios, when a new partition is written, it is usually necessary to notify the downstream application. For example, When all the data for this partition is written, commit this partition to iceberg, just as flink does for hive\filesystem. I once participated in the development of this part of Flink, and I hope to introduce it to iceberg sink. Because of iceberg's snapshot management feature, we may be able to do better than hive\filesystem.

The current iceberg flink sink can only commit based on checkpoint. When the time-based commit is complete, it will provides a partition commit feature that allows configuring custom policies. Commit actions are based on a combination of triggers and policies.

Back to your question.

  1. In PR, the commiter caches the watemark of the current data stream and writes it to the summary when the snapshot is committed. Strictly speaking, it doesn't represent the watermark of the iceberg table, if other applications are writing in at the same time, it just represents the watermark of the application it's in. If there's only one application, and we can think of it as, as shown in the second figure.

image

  1. Where does the water level come from?
    Flink provides an interface method for us to catch it. Its value depends on the low-watermark of upstream data.

image

@stevenzwu
Copy link
Contributor

stevenzwu commented Nov 28, 2022

there is a previous attempt on writing the watermark (data completeness) to the Iceberg table: https://github.com/apache/iceberg/pull/2109/files.

Let me summarize the questions/points. Would be great to gather some inputs. @rdblue @pvary @dixingxing0 @zhangjun0x01 @liubo1022126

  1. should we write the Flink watermark or use the min value of some timestamp column (typically associated with the time partitioned table)? it is easier to just write Flink watermark. but it might be more meaningful to use the timestamp column value as it is used for table partition. Is Flink watermark always based on the same timestamp column used for Iceberg table partition?
  2. should we write the metadata as snapshot summary or table properties? It is minimal change to write as snapshot summary as shown in this PR or PR Flink: store watermark as iceberg table's property #2109. It is a little bigger change (like using transaction) to write as a table property, but it will be easier for consumer to extract the info.
  3. we need to be able to support multiple Flink jobs (e.g. from multiple regions) writing to the same Iceberg table.
  4. it might be good to add an extra config to pull back the watermark by some threshold (e.g. -30 minutes) to give some extra buffer for indicating data completeness.

@stevenzwu
Copy link
Contributor

If our community accepts this PR solution, I would like to do one more thing, which is to support time-based partition commit. In some scenarios, when a new partition is written, it is usually necessary to notify the downstream application. For example, When all the data for this partition is written, commit this partition to iceberg, just as flink does for hive\filesystem. I once participated in the development of this part of Flink, and I hope to introduce it to iceberg sink. Because of iceberg's snapshot management feature, we may be able to do better than hive\filesystem.

The current iceberg flink sink can only commit based on checkpoint. When the time-based commit is complete, it will provides a partition commit feature that allows configuring custom policies. Commit actions are based on a combination of triggers and policies.

@hililiwei time-based partition commit seems quite complicated. trying to understand its value. With watermark info to mark the data completeness, downstream can decide which partition (hourly or daily) has the complete data and it is ok to trigger the processing of the completed hour or day.

@hililiwei
Copy link
Contributor Author

@hililiwei time-based partition commit seems quite complicated. trying to understand its value. With watermark info to mark the data completeness, downstream can decide which partition (hourly or daily) has the complete data and it is ok to trigger the processing of the completed hour or day.

Hi @stevenzwu

  1. If time-based commit is supported, as long as the partition is visible, its data is ready. downstream applications can directly use incremental reads instead of round robin.
  2. In some scenarios, when the partition data is not completely written, we hope that downstream applications cannot see it. Watermarks alone are not friendly enough.

@stevenzwu
Copy link
Contributor

stevenzwu commented Nov 29, 2022

downstream applications can directly use incremental reads instead of round robin.

What do you mean round robin?

as long as the partition is visible, its data is ready.

How could downstream consumers know when a new partition show up?

Watermarks alone are not friendly enough.

can you elaborate the differences btw watermark and new complete partition for downstream consumers?

@hililiwei
Copy link
Contributor Author

Hi @stevenzwu

What do you mean round robin?

I mean, downstream applications need to get the watermark of the table at intervals to determine whether to start processing. For example, get the latest watermark every five minutes.

How could downstream consumers know when a new partition show up?

The writer determines whether to commit new partitions to the table based on a combination of triggers and policies. Once a new partition is committed, it means that the writer considers the new partition's data ready. For details:
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/filesystem/#partition-commit.

Downstream applications read tables using incremental read scan. When a new snapshot (including a new partition and data) is commited by the upstream, the downstream can get it.

can you elaborate the differences btw watermark and new complete partition for downstream consumers?

Take a scenario in our current production as an example. When my table is partitioned by hour, if the data in [12:00,13:00) is not completely written, I do not want consumers to see it when querying the table. The query action may be an ad hoc query or a statistics task. I want to commit the snapshot after all the data in [12:00, 13:00) is written instead of commting the snapshot at each checkpoint.

Thx
Liwei

@hililiwei
Copy link
Contributor Author

hililiwei commented Dec 24, 2022

should we write the metadata as snapshot summary or table properties? It is minimal change to write as snapshot summary as shown in this PR or PR #2109. It is a little bigger change (like using transaction) to write as a table property, but it will be easier for consumer to extract the info.

I think it represents different meanings. I include it in the summary because it does not represent the water mark for the table, but simply represents the current flink task that generated the snapshot.

@huyuanfeng2018
Copy link
Contributor

huyuanfeng2018 commented Sep 6, 2023

in our practice,If the flink task turns on unaligned checkpoints, it may cause the watermark to advance too much.
@hililiwei

Copy link

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.

@github-actions github-actions bot added the stale label Aug 24, 2024
Copy link

This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

@github-actions github-actions bot closed this Sep 11, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants