Flink: add more sink shuffling support #6303

Open
stevenzwu opened this issue Nov 28, 2022 · 5 comments

@stevenzwu
Contributor

Feature Request / Improvement

Today, the Flink Iceberg sink supports only a simple keyBy hash distribution on partition columns. In practice, a keyBy shuffle on partition values doesn't work very well, because traffic can end up unevenly spread across writer tasks.

We can make the following shuffling enhancements in the Flink streaming writer. More details can be found in the design doc. This is an uber issue for tracking purposes. Here are the rough phases.

  1. [hash distribution] Custom partitioner on bucket values. PR 4228 demonstrated that keyBy on low-cardinality partitioning buckets results in skewed traffic distribution. The Flink sink can add a custom partitioner that directly maps the bucket value (an integer) to the downstream writer task index (also an integer) in round-robin fashion (mod). This is a relatively simple case.

This applies when write.distribution-mode=hash and there is a bucketing partition column. Other partition columns (like an hourly partition) are ignored for shuffling. The assumption is that the bucketing column is where we want to distribute/cluster the rows.
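To illustrate the mod mapping described in this phase, here is a minimal Python sketch. It is not the Flink sink code; `writer_for_bucket` and `buckets_per_writer` are hypothetical names used only for this example.

```python
def writer_for_bucket(bucket, num_writers):
    """Map an Iceberg bucket value (integer) to a writer task index via mod."""
    return bucket % num_writers


def buckets_per_writer(num_buckets, num_writers):
    """Count how many bucket values land on each writer task."""
    counts = [0] * num_writers
    for bucket in range(num_buckets):
        counts[writer_for_bucket(bucket, num_writers)] += 1
    return counts
```

With 8 buckets and 4 writers, every writer gets exactly 2 buckets. When the bucket count is not a multiple of the writer count, the per-writer bucket counts differ by at most one.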

  2. [hash distribution] Bin packing based on traffic distribution statistics. This works well for skewed data on partition columns (like event time). It requires calculating traffic distribution statistics across partition columns and using those statistics to guide shuffling decisions.

This applies when write.distribution-mode=hash and there is NO bucketing partition column.
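As a rough illustration of statistics-guided bin packing, the sketch below uses a greedy heuristic: the heaviest key goes first, always to the least-loaded writer. This heuristic is an assumption made for the example, not necessarily the algorithm in the design doc, and the function names and sample counts are hypothetical.

```python
import heapq


def assign_keys_to_writers(key_counts, num_writers):
    """Greedy bin packing: heaviest key first, into the least-loaded writer."""
    # Min-heap of (current load, writer index).
    heap = [(0, writer) for writer in range(num_writers)]
    heapq.heapify(heap)
    assignment = {}
    # Sort by descending count; break ties by key for determinism.
    for key, count in sorted(key_counts.items(), key=lambda kv: (-kv[1], kv[0])):
        load, writer = heapq.heappop(heap)
        assignment[key] = writer
        heapq.heappush(heap, (load + count, writer))
    return assignment


def writer_loads(key_counts, assignment, num_writers):
    """Total traffic routed to each writer under the given assignment."""
    loads = [0] * num_writers
    for key, writer in assignment.items():
        loads[writer] += key_counts[key]
    return loads
```

For hourly counts of 50, 30, 20, 10, and 10 across 2 writers, this assigns 60 to each writer. A single key heavier than the average load would still overload one writer here, so a real implementation would probably need to split such keys across several tasks.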

  3. [range distribution] Range partitioning based on traffic distribution statistics. It is a variant of 2 above. This works well for "sorting" non-partition columns (e.g. country code, event type). It can improve data clustering by creating data files with narrow value ranges. Note that the Flink streaming writer probably won't sort rows within a file, as that would be very expensive (though not impossible). Even without rows sorted within a file, the improved data clustering can result in effective file pruning. We just can't get the additional benefit of row-group-level skipping (for Parquet) that comes with rows sorted within a file.

This applies when write.distribution-mode=range and a SortOrder is defined for non-partition columns. Partition columns are ignored for range shuffling, on the assumption that the non-partition sort columns are what matter here.
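A minimal sketch of the range idea, assuming per-value counts are already available: pick upper bounds so each range carries roughly equal traffic, then route each row by binary search. The function names and the country-code counts are hypothetical, and the boundary rule is a simplification for illustration.

```python
from bisect import bisect_left


def range_bounds(value_counts, num_writers):
    """Pick num_writers - 1 inclusive upper bounds with roughly equal traffic per range."""
    total = sum(value_counts.values())
    target = total / num_writers
    bounds, running = [], 0
    for value in sorted(value_counts):
        running += value_counts[value]
        # Close the current range once cumulative traffic reaches its share.
        if len(bounds) < num_writers - 1 and running >= target * (len(bounds) + 1):
            bounds.append(value)
    return bounds


def writer_for(value, bounds):
    """Route a sort-key value to the writer that owns its range."""
    return bisect_left(bounds, value)
```

Because each writer receives a contiguous slice of the sort-key space, the files it produces cover a narrow value range, which is what enables file pruning even when rows are not sorted within a file.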

  4. [high cardinality columns] Phases 2 and 3 above are mostly for low-cardinality columns (e.g. hundreds of unique values), where a simple dictionary of counts per value can be used to track traffic distribution statistics. For high-cardinality columns (like device or user ID), we would need a probabilistic data sketch algorithm to calculate the traffic distribution.
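To make the sketch idea concrete, here is a small count-min sketch in Python. It is one example of a probabilistic frequency sketch, chosen for illustration only; this issue does not say which sketch the implementation would use. The class name, width, and depth are assumptions.

```python
import hashlib


class CountMinSketch:
    """Fixed-size frequency sketch: estimates never undercount, but may overcount."""

    def __init__(self, width=1024, depth=4):
        self.width = width
        self.depth = depth
        self.table = [[0] * width for _ in range(depth)]

    def _index(self, key, row):
        # Use the row number as a salt to get one independent hash per row.
        digest = hashlib.blake2b(
            key.encode("utf-8"), digest_size=8, salt=row.to_bytes(2, "little")
        ).digest()
        return int.from_bytes(digest, "little") % self.width

    def add(self, key, count=1):
        for row in range(self.depth):
            self.table[row][self._index(key, row)] += count

    def estimate(self, key):
        # The smallest counter across rows is the one least inflated by collisions.
        return min(self.table[row][self._index(key, row)] for row in range(self.depth))
```

Memory stays at width × depth counters regardless of how many distinct keys arrive, which is the property that matters for columns like device or user ID. The trade-off is that hash collisions can inflate an estimate.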

Query engine

Flink

@stevenzwu
Contributor Author

Created a new project as this is a relatively large scope overall: https://github.com/apache/iceberg/projects/27

@hililiwei
Contributor

Great design! I think we can continue adding new issues so that guys can choose the tasks they want to work on.

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

@github-actions github-actions bot added the stale label Aug 24, 2024
@bendevera

@stevenzwu I was wondering about the status of this project. We have faced issues with the performance of the default HASH distribution mode. This project looks promising, and I saw that some progress has been made on various related tasks.

@stevenzwu
Contributor Author

@bendevera Range distribution has been added to the main branch and will be part of the next 1.7 release. You can also see the doc here: https://iceberg.apache.org/docs/nightly/flink-writes/#range-distribution-experimental

@github-actions github-actions bot removed the stale label Sep 11, 2024