RFC: Add shuffle for iceberg sink #77
Shuffle according to iceberg's partition spec
Motivation
Apache iceberg allows users to define a partition spec[1] for a table. The partition spec defines how data is partitioned and stored in the table. For example, a table can be partitioned by date and hour. Iceberg also requires that each data file contain only one partition value. This means that if the table is partitioned by bucket and we don't do any shuffle at all, we will end up with a lot of small files in the table. Let's use the following table as an example:
If we don't do any shuffle, the writing process will look like the following:
By shuffling rows with the same partition value to the same sink, we can reduce the number of data files in the table:
Design
There are several cases to think about when doing the shuffle. We will use several examples below to explain them.
Case 1: Append-only table with a range-only partition
The following iceberg table definition has only a range partition:
And we want to sink the following query to iceberg:
In this case we don't need to do any shuffle, since the partition is range-only: if we shuffled by the partition value, almost all traffic would go to the same actor.
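The concrete DDL and sink definition are not reproduced here; a hypothetical example of this shape might look like the following (all table/sink names are illustrative, using Spark-style iceberg DDL and a RisingWave-style sink):

```sql
-- Illustrative only: a table whose partition spec uses just a range transform.
CREATE TABLE t1 (id BIGINT, data STRING, ts TIMESTAMP)
USING iceberg
PARTITIONED BY (years(ts));

-- Append-only sink into that table (connector options elided):
CREATE SINK s1 FROM mv1
WITH (connector = 'iceberg', type = 'append-only', ...);
```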
Case 2: Append-only table with a non-range-only partition
The following iceberg table definition has a partition that is not range-only:
And we want to sink the following query to iceberg:
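As before, the original DDL is not shown; a hypothetical sketch of such a table and sink (names and syntax are illustrative):

```sql
-- Illustrative only: the partition spec mixes a bucket transform with a range transform.
CREATE TABLE t2 (id BIGINT, data STRING, ts TIMESTAMP)
USING iceberg
PARTITIONED BY (bucket(16, id), years(ts));

-- Append-only sink into that table (connector options elided):
CREATE SINK s2 FROM mv2
WITH (connector = 'iceberg', type = 'append-only', ...);
```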
In this case we need to shuffle by `(bucket(id), years(ts))` so that rows with the same partition value go to the same actor.

Case 3: Upsert table with a range-only partition
The following iceberg table definition has only a range partition:
And we want to sink the following query to iceberg:
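A hypothetical sketch of this case (names and option syntax are illustrative, not the RFC's original DDL):

```sql
-- Illustrative only: range-only partition spec, written to with upserts.
CREATE TABLE t3 (id BIGINT, data STRING, ts TIMESTAMP)
USING iceberg
PARTITIONED BY (years(ts));

-- Upsert sink keyed on id (option names are illustrative):
CREATE SINK s3 FROM mv3
WITH (connector = 'iceberg', type = 'upsert', primary_key = 'id', ...);
```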
In this case we need to shuffle by `id` to avoid all traffic going to the same actor. Since in most cases the upstream is already shuffled by `id`, we can avoid an extra shuffle.

Case 4: Upsert table with a non-range-only partition
The following iceberg table definition has a partition that is not range-only:
And we want to sink the following query to iceberg:
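A hypothetical sketch of this case (again, all names and syntax are illustrative):

```sql
-- Illustrative only: bucket + range partition spec, written to with upserts.
CREATE TABLE t4 (id BIGINT, data STRING, ts TIMESTAMP)
USING iceberg
PARTITIONED BY (bucket(16, data), years(ts));

-- Upsert sink keyed on id (option names are illustrative):
CREATE SINK s4 FROM mv4
WITH (connector = 'iceberg', type = 'upsert', primary_key = 'id', ...);
```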
In this case we need to shuffle by `(bucket(data), years(ts))` so that rows with the same partition value go to the same actor. Notice that we don't need to shuffle by the stream key (e.g. `id`), since iceberg only requires that the insert/delete of the same row go to the same partition.

Implementation
There are two possible implementations for shuffling by iceberg partition columns:
1. Add an `IcebergPartitionExecutor`, which calculates the partition value of each record and adds it to the record, then asks the dispatcher executor to do a hash shuffle according to the partition value. The plan is like the following:
The `IcebergPartitionExecutor` will be a `StreamExecutor`, which calculates the partition value of each record and adds it to the record. The `DispatcherExecutor` doesn't need to change much; it simply does a hash shuffle according to the partition value.
2. Add an extra `IcebergDispatcher` to the dispatcher executor. The `IcebergDispatcher` will calculate the partition value of each record and do a hash shuffle according to the partition value.
I prefer approach 1, since it's more extensible and does not change the current shuffle implementation much; other lakehouse sinks (e.g. delta lake) could take a similar approach.
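To make approach 1 concrete, here is a minimal, hypothetical Python sketch (none of these names match RisingWave's actual Rust code, and the bucket transform uses Python's `hash()` as a stand-in for iceberg's murmur3-based bucketing): a partition executor appends the partition value to each record, and the dispatcher hash-shuffles on that value.

```python
from datetime import datetime

NUM_BUCKETS = 16      # illustrative bucket count for bucket(16, id)
NUM_SINK_ACTORS = 4   # illustrative sink parallelism

def partition_value(record):
    """Compute the partition tuple (bucket(id), years(ts)) for one record.

    Real iceberg bucketing uses a 32-bit murmur3 hash; hash() is a stand-in.
    """
    return (hash(record["id"]) % NUM_BUCKETS, record["ts"].year)

def iceberg_partition_executor(records):
    """Approach 1's new executor: append the partition value to each record."""
    for r in records:
        yield {**r, "_partition": partition_value(r)}

def dispatch(records, num_actors=NUM_SINK_ACTORS):
    """Dispatcher side: hash-shuffle records by the precomputed partition value."""
    outputs = [[] for _ in range(num_actors)]
    for r in records:
        outputs[hash(r["_partition"]) % num_actors].append(r)
    return outputs

rows = [
    {"id": 1, "ts": datetime(2022, 1, 1)},
    {"id": 1, "ts": datetime(2022, 6, 1)},
    {"id": 2, "ts": datetime(2022, 1, 2)},
]
outs = dispatch(list(iceberg_partition_executor(rows)))
# Rows sharing a partition value always land on the same sink actor,
# so each actor writes at most one data file per partition.
```

The key property, as in the RFC, is that the dispatcher needs no iceberg-specific logic: it hashes an ordinary column that the upstream executor produced.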
References