[Improvement]: Clarify the concept of watermark and optimize its implementation #1805

Closed
3 tasks done
zhoujinsong opened this issue Aug 7, 2023 · 4 comments

Comments

@zhoujinsong
Contributor

Search before asking

  • I have searched in the issues and found no similar issues.

What would you like to be improved?

We have implemented calculating and showing the watermark for Mixed format tables.

As #944 discusses, the current implementation has some issues that should be fixed.

In addition, as we support more table formats, we should consider better ways to support the watermark for tables in other formats.

How should we improve?

  • Clarify the concept of table watermark to support it on all table formats.
  • Improve the implementation of table watermark on Mixed formats.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Subtasks

No response


@zhoujinsong
Contributor Author

zhoujinsong commented Aug 7, 2023

Before starting the improvement, I would like to collect some information about table watermark.

What is table watermark?

A watermark is a time-based property of a table that indicates its freshness: data older than this time has already been written to the table.

What is the usage of table watermark?

  • As a metric on the table, it enables users to perceive the freshness of the data on the table.
  • In a batch workflow, it serves as the basis for determining whether downstream tasks should be triggered for computation.
  • It is a foundational concept for other features, such as automatically generating tags based on the watermark.
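
To illustrate the batch-workflow use case, here is a minimal sketch of a trigger check. The helper name `partition_is_complete` and the sample timestamps are hypothetical, and the sketch assumes the watermark means that all data with an earlier event time has already been written.

```python
from datetime import datetime, timezone


def partition_is_complete(table_watermark: datetime, partition_end: datetime) -> bool:
    """Return True once the table watermark has reached the end of the partition.

    Hypothetical helper: it assumes the watermark means "all data with an
    event time earlier than this has already been written".
    """
    return table_watermark >= partition_end


# Hourly partition covering 09:00-10:00 UTC (illustrative values only).
partition_end = datetime(2023, 8, 7, 10, 0, tzinfo=timezone.utc)
early = datetime(2023, 8, 7, 9, 58, tzinfo=timezone.utc)
late = datetime(2023, 8, 7, 10, 1, tzinfo=timezone.utc)

print(partition_is_complete(early, partition_end))  # watermark is still inside the hour
print(partition_is_complete(late, partition_end))   # watermark has passed the end of the hour
```

A scheduler could poll this check and start the downstream task for a partition only once it returns true.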

How to generate table watermark?

The watermark changes as data is written, so it is usually generated and recorded in the table ingestion job. It is generally only generated in streaming ingestion jobs, because it is difficult to guarantee the semantics of a watermark in batch write jobs.

Here are some implementation methods of different table formats:

What should we improve?

  1. As the watermark is a core property of data lake tables, we should provide a unified management method for it.
    • A watermark should be bound to a table's transaction (snapshot), and each transaction of the table should have a corresponding watermark.
    • Support showing the current watermark and the watermarks of different transactions in the Dashboard and in SQL command lines.
  2. Open up the computation method of the watermark for different table formats.
    • Be compatible with existing watermark computation rules for different formats like Paimon.
    • Provide a watermark generation solution for other formats like Iceberg.
    • Improve the watermark calculation method for mixed formats.
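
As a rough sketch of what binding a watermark to each transaction could look like, the snippet below keeps one watermark per snapshot id. The class `SnapshotWatermarks` and its methods are hypothetical and not part of any existing API. The rule that a watermark never moves backwards between snapshots is an assumption of this sketch, not something decided in this issue.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class SnapshotWatermarks:
    """Hypothetical in-memory registry: snapshot id -> watermark (epoch millis)."""

    by_snapshot: Dict[int, int] = field(default_factory=dict)
    current_snapshot_id: Optional[int] = None

    def record(self, snapshot_id: int, candidate_ms: int) -> int:
        """Bind a watermark to a new snapshot.

        Assumption: the watermark is monotonic, so a candidate lower than
        the current watermark is replaced by the current watermark.
        """
        previous = self.current()
        watermark = candidate_ms if previous is None else max(previous, candidate_ms)
        self.by_snapshot[snapshot_id] = watermark
        self.current_snapshot_id = snapshot_id
        return watermark

    def current(self) -> Optional[int]:
        """Watermark of the latest recorded snapshot, or None if there is none."""
        if self.current_snapshot_id is None:
            return None
        return self.by_snapshot[self.current_snapshot_id]

    def at(self, snapshot_id: int) -> Optional[int]:
        """Watermark bound to a specific snapshot, or None if it is unknown."""
        return self.by_snapshot.get(snapshot_id)


registry = SnapshotWatermarks()
registry.record(1, 1_000)
registry.record(2, 900)    # late candidate: the watermark stays at 1_000
registry.record(3, 1_500)
print(registry.current(), registry.at(2))
```

Keeping the lookup by snapshot id separate from the current value is what would let a dashboard or SQL command show both the latest watermark and the watermark of an earlier transaction.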

@majin1102
Contributor

majin1102 commented Aug 8, 2023

It seems that the Iceberg community has discussed watermarks in considerable depth. Here are my conclusions:

  1. In many cases the watermark of a table (Iceberg or otherwise) should be a property or metric based on a timestamp column of that table, which Mixed Iceberg format defines as the event-time column, rather than a value reported by an upstream streaming job. Two scenarios should be considered:

    1. For a Flink job with a watermark writing to a table, the job's watermark can reflect the real event time of the sink table. That is how the Paimon watermark works, but this watermark could conflict with the event time defined by the sink table.
    2. For a table defined with an event time (which means a watermark is necessary), there may be no streaming job writing to the table, or the streaming jobs may not be guaranteed to define a watermark.
    3. Given the two points above, we may need to support both defining an event time on a table and using watermark values from streaming jobs. If the user explicitly defines an event time on a table, the watermark is calculated from the data distribution of that column at commit time. If no event time is defined, the watermark values from streaming jobs can be used. If neither is defined, the commit time can be used instead.
  2. Multiple streaming jobs may write to the same table, and the watermark must be aligned across all streaming writers, which means that calculating the watermark in the commit phase is not a feasible solution. Amoro could calculate watermark values in the refresh phase and define an idle timeout, as Flink does when dealing with multiple data streams.
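
The two conclusions above can be sketched as follows. All function and parameter names are hypothetical. Deriving the event-time watermark as the maximum event time minus an allowed lateness is only one possible reading of "based on the data distribution", and taking the minimum over non-idle writers is an assumption modelled loosely on how Flink combines watermarks from multiple inputs.

```python
from typing import Dict, Iterable, Optional, Tuple


def resolve_commit_watermark(
    event_times_ms: Optional[Iterable[int]],
    job_watermark_ms: Optional[int],
    commit_time_ms: int,
    max_lateness_ms: int = 0,
) -> int:
    """Pick a watermark source using the fallback order described above.

    event_times_ms is None when the table defines no event-time column.
    """
    if event_times_ms is not None:
        values = list(event_times_ms)
        if values:
            # Assumption: max event time minus an allowed lateness.
            return max(values) - max_lateness_ms
    if job_watermark_ms is not None:
        # Fall back to the watermark reported by the streaming job.
        return job_watermark_ms
    # Last resort: use the commit time.
    return commit_time_ms


def align_writers(
    last_reports: Dict[str, Tuple[int, int]],
    now_ms: int,
    idle_timeout_ms: int,
) -> Optional[int]:
    """Align watermarks across writers, ignoring writers that are idle.

    last_reports maps a writer id to (watermark_ms, reported_at_ms).
    Returns None when every writer is idle.
    """
    active = [
        watermark
        for watermark, reported_at in last_reports.values()
        if now_ms - reported_at <= idle_timeout_ms
    ]
    return min(active) if active else None


reports = {"job-a": (1_000, 5_000), "job-b": (800, 5_000), "job-c": (100, 0)}
# job-c last reported 6 seconds ago, so a 2-second timeout excludes it.
print(align_writers(reports, now_ms=6_000, idle_timeout_ms=2_000))
print(resolve_commit_watermark(None, 999, commit_time_ms=5_000))
```

Without the idle timeout, a writer that has stopped reporting would hold the table watermark back indefinitely, because the aligned value is the minimum over all writers.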

@zhoujinsong zhoujinsong mentioned this issue Dec 29, 2023
66 tasks

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

@github-actions github-actions bot added the stale label Aug 21, 2024

github-actions bot commented Sep 4, 2024

This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'

@github-actions github-actions bot closed this as not planned Won't fix, can't repro, duplicate, stale Sep 4, 2024

2 participants