Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-5095] Flink: Stores a special watermark(flag) to identify the current progress of writing data #8753

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

SteNicholas
Copy link
Member

Change Logs

AbstractStreamWriteFunction could process watermark from StreamWriteOperator and send WriteMetadataEvent with the watermark of write function to StreamWriteOperatorCoordinator. StreamWriteOperatorCoordinator stores the min watermark of WriteMetadataEvents from subtasks into the extra metadata of HoodieCommitMetadata. Meanwhile, StreamWriteOperatorCoordinator advances the min watermark for subtasks in which no data has written, and does not advance watermark for commit on empty batch.

Impact

StreamWriteOperatorCoordinator stores the min watermark of write function into the extra metadata of HoodieCommitMetadata to identify the current progress of writing.

Risk level (write none, low medium or high below)

None.

Documentation Update

None.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@danny0405
Copy link
Contributor

Can you elaborate a little more what is the end-2-end user story here?

@danny0405 danny0405 self-assigned this May 18, 2023
@danny0405 danny0405 added flink Issues related to flink flink-sql labels May 18, 2023
@SteNicholas
Copy link
Member Author

SteNicholas commented May 18, 2023

@danny0405, in the data warehouse incremental scenario, whether partition completes to write is depended on the progress of writing, which is used to commit partition for trigger downstream task to read the committed partition data. The watermark could identify the current progress of writing data and infer the committed partition.
In BiliBili, incremental ETL uses the watermark to identify the partition committed and triggers the downstream ETL tasks including streaming and batch tasks to execute that could only query the committed partition data. Meanwhile, the watermark is also used to identify the current writing progress to provide the streaming view with the progress.

@danny0405
Copy link
Contributor

The watermark could identify the current progress of writing data and infer the committed partition

Can you sketch the inference details a little more?

@SteNicholas
Copy link
Member Author

SteNicholas commented May 18, 2023

@danny0405, StreamWriteOperatorCoordinator commits partitions when doing commits in BiliBili, which partitions that need to commit or create is inferred from the watermark. The inference of watermark is as follows:

TreeMap<Long, String> writePartitionCollector = getWritePartitions(writeResults, watermark);
Map.Entry<Long, String> minPartition = writePartitionCollector.firstEntry();
Map.Entry<Long, String> maxPartition = writePartitionCollector.lastEntry();
Long commitStartTime = minPartition.getKey();
Long commitEndTime = maxPartition.getKey()
Long commitEndTime = watermarkToCurrentPartitionTime(convertReadyCommitTime(commitEndTime, watermark, partitionInterval));
List<String> needCommitPartitions = getNeedCommitList(commitStartTime, commitEndTime, partitionInterval);

public List<List<String>> getNeedCommitList(long commitStartTime, long commitEndTime, long partitionInterval) {
       List<List<String>> needCommitPartitions = new LinkedList<>();
       long commitCurrentTime = commitStartTime;
       while (commitCurrentTime <= commitEndTime) {
             List<String> partitionValues = convertToPartitionValues(commitCurrentTime, type);
             needCommitPartitions.add(partitionValues);
             commitCurrentTime = commitCurrentTime + partitionInterval;
        }
        return needCommitPartitions;
}

BTW, the main purpose has also mentioned in #7099. The difference is the implementation that current implementation follows the watermark mechanism of Flink.

@hudi-bot
Copy link

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

@SteNicholas
Copy link
Member Author

SteNicholas commented May 24, 2023

@danny0405, could you help to review this implementation?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
flink Issues related to flink flink-sql size:L PR with lines of changes in (300, 1000]
Projects
Status: 🆕 New
Development

Successfully merging this pull request may close these issues.

3 participants