
The serialization problem caused by Flink shuffling design #7393

Closed
huyuanfeng2018 opened this issue Apr 21, 2023 · 13 comments
@huyuanfeng2018
Contributor

Feature Request / Improvement

This issue comes from the discussion in #6303; I am opening a new issue to discuss it.

I am very interested in this project. We currently have a serious data skew problem when writing to Iceberg, so I have been following the progress of this module, and I would like to share some of my thoughts.

I took a close look at #6382 and #7269

I think there are some problems. I completed a simple implementation based on these two PRs on my own branch, but the job's throughput dropped significantly, almost to the point of being unusable. So I think we should stop and consider whether this solution is suitable.

From my observation, the problem lies in the DataStatisticsOperator. When output.collect is called there, Flink's serialization is triggered, and DataStatisticsOrRecord falls back to Kryo serialization, resulting in a performance drop of more than 4x.
[flame graph screenshot]
Too many computing resources are spent on serialization, so I think we need to seriously reconsider the feasibility of this proposal.
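
For readers who want to reproduce this observation, a small sketch of how to check for the Kryo fallback; the class and helper names are made up, and DataStatisticsOrRecord refers to the wrapper type from the PRs above:

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KryoFallbackCheck {

  // True when Flink cannot derive structured TypeInformation for the class and
  // falls back to GenericTypeInfo, i.e. Kryo serialization.
  static boolean usesKryoFallback(Class<?> clazz) {
    return TypeInformation.of(clazz) instanceof GenericTypeInfo;
  }

  public static void main(String[] args) {
    // For the wrapper type from the PRs above, usesKryoFallback(DataStatisticsOrRecord.class)
    // would be expected to return true until a dedicated serializer is provided.

    // Another option: make job submission fail whenever any operator would
    // have to rely on Kryo, so fallbacks are caught early.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().disableGenericTypes();
  }
}
```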

@stevenzwu @hililiwei

Query engine

Flink

@hililiwei
Contributor

@huyuanfeng2018 Thank you for bringing this up. I don't have any more data to share at the moment. I'll take a quick test.

@stevenzwu @yegangy0718 seem to have more detailed data; what do you think about this?

@stevenzwu
Contributor

@huyuanfeng2018 Thanks for the experiment. DataStatisticsOrRecord is the only way to pass statistics to the custom partitioner. I agree that Kryo serialization will be slower; we will need to provide a type serializer for the type, which we had in our internal PoC implementation and testing.

resulting in a performance drop of more than 4x

Can you elaborate on the observed 4x slowdown? What was the A/B test setup?

In the benchmark with the internal PoC implementation, we observed 60% more CPU overhead for a simple job reading from Kafka and writing to Iceberg with an event-time partitioned table. As expected, the bulk of the overhead comes from serdes and network I/O.

cc @yegangy0718

@yegangy0718
Contributor

yegangy0718 commented Apr 24, 2023

Hi @huyuanfeng2018 Thanks for showing interest in the project.

We do have a plan to add a custom serializer for DataStatisticsOrRecord, as @stevenzwu commented at #7269 (comment).

We have done a perf test with the internal PoC implementation. The results were published at https://www.slideshare.net/FlinkForward/tame-the-small-files-problem-and-optimize-data-layout-for-streaming-ingestion-to-iceberg from slide 44 to the end. We observed CPU usage increase from 35% to 57% for the streaming job (which consumes from Kafka and writes to Iceberg) after applying shuffling. This is expected, since we trade more CPU usage for better file sizes and data clustering.

We may need more information about the test case you ran, such as the Flink DAG structure, the data distribution, and so on, to analyze the performance impact you are seeing.

@huyuanfeng2018
Contributor Author

Hi @stevenzwu @hililiwei,
Thank you for your reply!
My scenario is writing server logs to Iceberg in real time; the peak data rate is about 1.0M/s.
[screenshot: data rate]
At present we partition by day, hour, and an enumeration field, which gives about 70 enumeration partitions, two of which account for more than 70% of the total data, so the current Iceberg write mode certainly cannot meet our requirements. We currently run 200 parallel writers online and shuffle by defining, ourselves, the ratio of each enumeration's data volume to the total, specifying the ratios like this:

'distribution-balance-column-ratio' = 'sysdk_android:0.0005,_wap:0.0003,android_tv:0.003.......'

However, the proportion of each enumeration changes during certain time periods, so there is still skew at those times, resulting in a backlog in my job.

So I tried to achieve automatic balancing, but under the same cluster configuration my processing rate was 4x slower, about 200~300k/s. I have attached the flame graph above; most of the time is spent serializing the statistics record. If you re-implement the serialization of the record, could you give me a sample? I can test it in my scenario to see how much improvement there is. In addition, I can help as much as I can if needed.
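
For illustration, here is a minimal sketch of the kind of ratio-based custom partitioner described above. The class name, the weights map, and the hash fallback are assumptions rather than the actual production code, and the weights are assumed to sum to roughly 1.0:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.flink.api.common.functions.Partitioner;

public class WeightedKeyPartitioner implements Partitioner<String> {

  // Key -> fraction of total traffic, e.g. "sysdk_android" -> 0.0005;
  // mirrors the 'distribution-balance-column-ratio' setting above.
  private final LinkedHashMap<String, Double> weights;

  public WeightedKeyPartitioner(LinkedHashMap<String, Double> weights) {
    this.weights = weights;
  }

  @Override
  public int partition(String key, int numPartitions) {
    // Give each key a contiguous range of writer subtasks whose size is
    // proportional to its weight, then spread that key's records uniformly
    // inside the range so a hot key is split across several writers.
    double start = 0.0;
    for (Map.Entry<String, Double> entry : weights.entrySet()) {
      double width = entry.getValue();
      if (entry.getKey().equals(key)) {
        int first = Math.min(numPartitions - 1, (int) Math.floor(start * numPartitions));
        int last = Math.min(numPartitions - 1, (int) Math.floor((start + width) * numPartitions));
        return first + ThreadLocalRandom.current().nextInt(last - first + 1);
      }
      start += width;
    }
    // Key not present in the configured ratios: fall back to a hash-based choice.
    return Math.floorMod(key.hashCode(), numPartitions);
  }
}
```

Such a partitioner could be wired in with DataStream#partitionCustom plus a KeySelector that extracts the enumeration column; the dynamic variant described later in this thread would additionally refresh the weights from each checkpoint's statistics.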

@stevenzwu
Contributor

If you re-implement the serialization of the record, could you give me a sample? I can test it in my scenario to see how much improvement there is.

So the 4x slowdown is measured in throughput (not CPU overhead). Unless you are seeing 4x CPU overhead, this alone will likely not solve your problem.

We haven't finished the MVP version with a custom range partitioner. How did you actually shuffle the records leveraging the data statistics? Was the traffic relatively evenly distributed across the writer tasks?

@huyuanfeng2018
Contributor Author

huyuanfeng2018 commented Apr 25, 2023

1. Regarding the 4x slowdown: it is indeed a four-fold decrease in throughput. As a user, I care most about throughput under the same resources, so this does not mean that CPU consumption increased 4x.

2. Regarding the custom range partitioner: I implemented one myself. As described in my previous reply, before the job starts we specify the proportion of each enumeration partition relative to the total data volume (distribution-balance-column-ratio), and the custom range partitioner then computes which StreamWriter subtasks each enumeration's data is distributed to. Based on "Flink: Implement data statistics operator to collect traffic distribution for guiding smart shuffling" #6382 and "Flink: Data statistics operator sends local data statistics to coordinator and receive aggregated data statistics from coordinator for smart shuffling" #7269, I then made this proportion adjust dynamically from each checkpoint's statistics, achieving dynamic allocation. Judging from the metrics in the Flink UI, my data shuffling is very uniform, and from the monitoring, the CPU usage of all my TaskManagers is 100%. The following two pictures show some of my monitoring data at that time.

[monitoring screenshots]

3. In fact, when I doubled the parallelism and CPU cores, my processing rate also doubled, so it is very likely that the bottleneck is the CPU rather than network I/O:

[screenshot: throughput after doubling parallelism]

4. There is one thing I have doubts about: why doesn't DataStatisticsOrRecord directly implement Flink's RowData? The generic type is already fixed to RowData when writing with Flink:

public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData>

@huyuanfeng2018
Contributor Author

For comparison, here is the throughput without the enumeration partition, with the same resources:

[screenshot: throughput without enumeration partitioning]

@stevenzwu
Contributor

Why doesn't DataStatisticsOrRecord directly implement Flink's RowData? The generic type is already fixed to RowData when writing with Flink

This is a good point. We probably should change it.

@xccui
Member

xccui commented Apr 28, 2023

Came across this issue. As a workaround, you can try defining your own com.esotericsoftware.kryo.Serializer for DataStatisticsOrRecord and register it via env.registerTypeWithKryoSerializer().

@stevenzwu I feel that at least we can make DataStatisticsOrRecord a POJO, right?
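
A minimal sketch of the registration part of that workaround; the serializer body is left as a stub because it depends on the internals of DataStatisticsOrRecord, and the class and method names here are made up:

```java
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KryoWorkaround {

  // Hypothetical hand-written Kryo serializer for the shuffle wrapper type.
  // What to write depends on DataStatisticsOrRecord's fields, so the body is a stub.
  public static class StatsOrRecordKryoSerializer extends Serializer<Object> {
    @Override
    public void write(Kryo kryo, Output output, Object object) {
      throw new UnsupportedOperationException("sketch only");
    }

    @Override
    public Object read(Kryo kryo, Input input, Class<Object> type) {
      throw new UnsupportedOperationException("sketch only");
    }
  }

  // Registers the custom Kryo serializer for the given class,
  // e.g. register(env, DataStatisticsOrRecord.class).
  static void register(StreamExecutionEnvironment env, Class<?> wrapperClass) {
    env.registerTypeWithKryoSerializer(wrapperClass, StatsOrRecordKryoSerializer.class);
  }
}
```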

@stevenzwu
Contributor

@xccui I don't think we can make DataStatisticsOrRecord a true POJO, because RowData is not a POJO. We will provide TypeSerializer implementations for DataStatistics and DataStatisticsOrRecord.
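
For illustration, such a serializer usually takes the shape of a flag byte plus delegation to one serializer per payload. The following is a standalone sketch built around a simplified stand-in type (StatsOrRow), not the PoC code or the class from the PRs:

```java
import java.io.IOException;
import java.util.Map;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.table.data.RowData;

public class StatsOrRowSerializer extends TypeSerializer<StatsOrRowSerializer.StatsOrRow> {

  /** Simplified stand-in for DataStatisticsOrRecord: either key counts or a data row. */
  public static class StatsOrRow {
    Map<String, Long> stats; // non-null for a statistics element
    RowData row;             // non-null for a data element
  }

  private final TypeSerializer<Map<String, Long>> statsSerializer;
  private final TypeSerializer<RowData> rowSerializer;

  public StatsOrRowSerializer(
      TypeSerializer<Map<String, Long>> statsSerializer, TypeSerializer<RowData> rowSerializer) {
    this.statsSerializer = statsSerializer;
    this.rowSerializer = rowSerializer;
  }

  @Override
  public void serialize(StatsOrRow value, DataOutputView target) throws IOException {
    if (value.row != null) {
      target.writeByte(0); // flag: data record
      rowSerializer.serialize(value.row, target);
    } else {
      target.writeByte(1); // flag: statistics
      statsSerializer.serialize(value.stats, target);
    }
  }

  @Override
  public StatsOrRow deserialize(DataInputView source) throws IOException {
    StatsOrRow result = new StatsOrRow();
    if (source.readByte() == 0) {
      result.row = rowSerializer.deserialize(source);
    } else {
      result.stats = statsSerializer.deserialize(source);
    }
    return result;
  }

  // The remaining TypeSerializer methods are mechanical.
  @Override public boolean isImmutableType() { return false; }
  @Override public TypeSerializer<StatsOrRow> duplicate() {
    return new StatsOrRowSerializer(statsSerializer.duplicate(), rowSerializer.duplicate());
  }
  @Override public StatsOrRow createInstance() { return new StatsOrRow(); }
  @Override public StatsOrRow copy(StatsOrRow from) {
    StatsOrRow copy = new StatsOrRow();
    copy.row = from.row == null ? null : rowSerializer.copy(from.row);
    copy.stats = from.stats == null ? null : statsSerializer.copy(from.stats);
    return copy;
  }
  @Override public StatsOrRow copy(StatsOrRow from, StatsOrRow reuse) { return copy(from); }
  @Override public int getLength() { return -1; } // variable-length records
  @Override public StatsOrRow deserialize(StatsOrRow reuse, DataInputView source) throws IOException {
    return deserialize(source);
  }
  @Override public void copy(DataInputView source, DataOutputView target) throws IOException {
    serialize(deserialize(source), target);
  }
  @Override public boolean equals(Object obj) { return obj instanceof StatsOrRowSerializer; }
  @Override public int hashCode() { return StatsOrRowSerializer.class.hashCode(); }
  @Override public TypeSerializerSnapshot<StatsOrRow> snapshotConfiguration() {
    // A production serializer needs a real snapshot here for state compatibility.
    throw new UnsupportedOperationException("snapshot not implemented in this sketch");
  }
}
```

The statistics delegate could be, for example, Flink's MapSerializer, and the row delegate the serializer derived from the table schema, as mentioned in the next comment.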

@xccui
Member

xccui commented May 1, 2023

@xccui I don't think we can make DataStatisticsOrRecord a true POJO, because RowData is not a POJO. We will provide TypeSerializer implementations for DataStatistics and DataStatisticsOrRecord.

Ah, yes, it's not that trivial. For RowData, org.apache.flink.table.runtime.typeutils.InternalTypeInfo can help, but overall, customized TypeSerializers would be nice to have 👍
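
A small sketch of the RowData half mentioned above; the two-column RowType is made up for the example:

```java
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

public class RowDataSerializerExample {
  public static void main(String[] args) {
    // Example schema: (name STRING, count BIGINT).
    RowType rowType = RowType.of(new VarCharType(VarCharType.MAX_LENGTH), new BigIntType());

    // InternalTypeInfo.of(...) yields TypeInformation<RowData> backed by the
    // binary RowData serializer instead of Kryo.
    TypeSerializer<RowData> serializer =
        InternalTypeInfo.of(rowType).createSerializer(new ExecutionConfig());
    System.out.println(serializer.getClass().getName());
  }
}
```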

@github-actions

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

@github-actions github-actions bot added the stale label Oct 29, 2023

This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'
