Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flink: Custom partitioner for bucket partitions #7161

Merged
merged 24 commits into from
Aug 10, 2023

Conversation

kengtin
Copy link
Contributor

@kengtin kengtin commented Mar 21, 2023

References

Description

  • This partitioner will redirect elements to writers deterministically so that each writer only targets 1 bucket.
  • If the number of writers > number of buckets each partitioner will keep a state of multiple writers per bucket as evenly as possible, and will round-robin the requests across them.
  • This is enabled by default in FlinkSink Hash mode when a bucket is detected in a PartitionSpec
  • A new BucketPartitionKeySelector was written, identical to the PartitionKeySelector but it extracts and returns an Integer BucketId as they Key
  • Extra: added a HadoopCatalogExtention, a ported implementation of the HadoopCatalogResource, for Junit5

image

image

@kengtin kengtin changed the title Custom Bucket Partitioning Flink: Custom partitioner for a more even data distribution when bucket partitions are present Mar 23, 2023
@github-actions github-actions bot removed the API label Apr 5, 2023
- Factoring out the new partitioning utilities into BucketPartitionerUtils.java class
- Relying on the PartitionSpecVisitor pattern/approach to get Bucket information (no more regex based extraction)
- Parameterized the test and adding support to evaluate vs. schemas with different Bucket scenarios
schongloo added 8 commits April 23, 2023 11:34
- Logic simplification of the TestBucketPartitionerFlinkIcebergSink
- Migration to Junit5
- Clarifying the BucketPartitioner logic via Javadoc and better variable names.
- Clarifying the BucketPartitioner logic via Javadoc and better variable names.
- Cleaning up the test and general comments/javadoc
- Migrating the sink unit test to Junit5
…talogExtension.java, now handled internally.
… TestBucketPartitioner and TestBucketPartitionKeySelector.

- Refactored and simplified the TestBucketPartitionerUtils.
@kengtin kengtin requested a review from stevenzwu April 27, 2023 18:01
@stevenzwu stevenzwu changed the title Flink: Custom partitioner for a more even data distribution when bucket partitions are present Flink: Custom partitioner fro bucket partitions May 8, 2023
@stevenzwu stevenzwu changed the title Flink: Custom partitioner fro bucket partitions Flink: Custom partitioner for bucket partitions May 8, 2023
if (rowDataWrapper == null) {
rowDataWrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
}
return rowDataWrapper;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: iceberg style adds an empty line after the end of a control block }

@stevenzwu stevenzwu merged commit 01ab0d9 into apache:master Aug 10, 2023
@stevenzwu
Copy link
Contributor

thanks @kengtin for the contribution

@advancedxy
Copy link
Contributor

Hi @stevenzwu @kengtin this PR restricts that only one bucket transform in the partition spec. Do you think it's beneficial to support multiple bucket transforms or even the mutli-arg bucket transform(#8259) since it's common to have multiple keys to form a primary key?

@kengtin
Copy link
Contributor Author

kengtin commented Aug 23, 2023

Hi @advancedxy, thanks for bringing it up. If I recall correctly @stevenzwu and I discussed that point during the early stages and at least back then we determined it would be much simpler. However I see your point about not restricting the user's design to 1 bucket only. Perhaps we could allow multi-bucket/key definitions but I'd still restrict the routing/distribution to be based only on the first bucket (or something like that) to not overcomplicate it. Thoughts?

@advancedxy
Copy link
Contributor

Perhaps we could allow multi-bucket/key definitions but I'd still restrict the routing/distribution to be based only on the first bucket (or something like that) to not overcomplicate it. Thoughts?

Ah, yeah. it's definitely much simpler to routing/distribution by one bucket partition(first one or the one has the largest number of bucket, etc., a.k.a the most significant bucket transform). When multi-arg bucket transform is not supported, perhaps we may allow multiple bucket transforms in the table but choose only one bucket transform. Once multi-arg bucket transform is supported, the multi-arg bucket transform should probably to be selected first.

@kengtin
Copy link
Contributor Author

kengtin commented Aug 23, 2023

@advancedxy I promise to get up to speed with the multi-arg bucket transform (not yet familiar) but your suggestion makes sense. I'll also let @stevenzwu weigh in.

@stevenzwu
Copy link
Contributor

@advancedxy thanks for a lot for raising the multi-dimensional bucketing. if there is enough use cases, we should be able to adapt the implementation to support it under the assumption that data are evenly distributed. Let's use an example with 2 bucketing columns (m and n). The assumption would be data are evenly distributed in the m x n combinational bucket tuples.

@chenwyi2
Copy link

Hi @stevenzwu @kengtin this PR can be create too many small files when parition with dt,hout,minute and bucekt(id), suppose paralisim is 120 and bucke number is 8, then 15 writes can write into same one bucket, but there is problem, data from the previous few hours can be into one commit because of data latency, there can be 15000 and more data files if changed partition is up to 1000, can we use complete parition name instead of just bucket?

@stevenzwu
Copy link
Contributor

@chenwyi2 Is your point that we shouldn't only consider bucketing column (like did in this PR). you just want a plain keyBy in this case? that would be a fair point. Do you get balanced traffic distribution among write tasks with simple keyBy?

I am also wondering if the partition spec of dt,hour,minute and bucekt(id) is the best option. especially the minute column as partition. do you really need minute level partition granularity. you are creating very fine grained partitions. even with the most optimal data distribution/shuffle. there are still a lot of partitions and data files.

you used 8 for bucket number. it seems quite small for bucketing. what's the reason of using 8 buckets?

@chenwyi2
Copy link

yes, I am creating very fine grained partitions, because i want to query and comput some business metrics between minutes ss fast as possible. As for bucket number, i use a fomula QPS * 500B/条 / bucket number / 1024 /1024 = 10M/秒(The write-in traffic of one bucket), because i found that the write-in traffic of one bucket is large, the writer can be OOM or backpressure, i set 10M/秒 in each bucket.
In this pr hdfs can be influenced by many files, writting to files can be slow then data latency will be relatively large.

@stevenzwu
Copy link
Contributor

stevenzwu commented Oct 16, 2023

is the partition time an event time or ingestion/processing time? or asking in a different way, how many active minutes do the Flink writer job process for every commit cycle?

I feel this ongoing work might work better for you
https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo
https://www.youtube.com/watch?v=GJplmOO7ULA

Using bucketing column seems a temp solution to handle (1) skewed data distribution across hours and minutes (2) skewed partition distribution from simple keyBy hash distribution.

On the other hand, I also recognize this change imposed a behavior change that doesn't work for your use case. We can revert the distribution mode change in the FlinkSink class. To enable the custom bucketing partitioner, users will have to manually apply the custom keyBy.

input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType))

@kengtin can you create a PR for reverting the FlinkSink change?

@chenwyi2
Copy link

In normal conditition, only the data of current minute will be written. However, if the data is delayed, for example, at 11:50, the data has not been written until 11:55, then at 11:56 will commit data composed by 11:51,11:52,...11:56, in this situation some small fies can be created

@bendevera
Copy link

@stevenzwu I see defaulting to BucketPartitioner was reverted here: #8848

We've found performance issues with DistributionMode.HASH, and wanted to test BucketPartitioner but it isn't public.

You mentioned above that to use the bucketing partitioner, users will need to manually apply keyBy. Believe the code snippet though is the default HASH logic. What is the recommended method for using the bucket partitioner and any issues reported by users that resulted in reverting?

@stevenzwu
Copy link
Contributor

stevenzwu commented Dec 18, 2023

It is reverted because there are users depending on the previous behavior of keyBy all partition columns. #7161 (comment)

We were assuming that if there is a bucket column, users only want to shuffle by the bucketing column. that is not the case from the user report linked in the above comment. so we decided to roll back for backward compatibility.

@bendevera you are right that BucketPartitioner isn't public and can't be used at the moment. Now we need to discuss what's the best way moving forward? we are working on a more comprehensive smart shuffling (range partition) feature: https://github.com/apache/iceberg/projects/27. I am thinking maybe we can expose this in range distribution mode.

before that, you may have to copy the code and manually apply the bucketing shuffling.

input.partitionCustom(
                    new BucketPartitioner(partitionSpec),
                    new BucketPartitionKeySelector(partitionSpec, iSchema, flinkRowType));

Note that bucketing partitioning is the simplest form of range partition / smart shuffling we want to achieve, as we can assume each bucket has the same weight. we don't need to dynamically calculate traffic distribution stats for skewed column (like event time hour or country code or event type etc.)

@bendevera
Copy link

@stevenzwu thank you for the quick response!

Okay, will run some BucketPartitioner tests for our use case by copying code manually. Smart shuffling sounds interesting and would certainly test out. A lot of the use cases we deal with fit well with BucketPartitioner conceptually and so can test to verify. Current DistributionMode.HASH implementation is too slow computing the data file path for each record and we've noticed processing rates take a huge hit when enabling. Glad to see features being extended! Will read up more regarding the smart shuffling design and see if we can get involved

@stevenzwu
Copy link
Contributor

@binshuohu
Copy link

@stevenzwu Is there any plan to reapply this change to the main branch? Has there been any follow up since #8848 ?

@stevenzwu
Copy link
Contributor

@binshuohu Currently, there is no plan to reapply this change to the main branch. We have a more general range distribution available now (guided by statistics collection): https://iceberg.apache.org/docs/nightly/flink-writes/#range-distribution-experimental. It is more general than this (bucketing only). Range distribution also handle different parallelisms and partitions well.

Range distribution has one disadvantage. It performs statistics collection and aggregation to guide the range split. That adds a little overhead. Bucketing partitioner here assumes traffic are evenly distributed across buckets, which should be true (hash % nBuckets).

cc @pvary

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Development

Successfully merging this pull request may close these issues.

6 participants