
Flink: Resolve writers object of PartitionedDeltaWriter will cause OOM when partition number is big #7217

Closed
gong wants to merge 2 commits

Conversation

gong (Author) commented Mar 27, 2023

I use an LRU map in place of a HashMap to resolve the OOM problem in the PartitionedDeltaWriter class.
Linked issue: #7216
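
For illustration, a minimal sketch of the LRU idea, including the close-on-eviction step the reviewers ask for below; CAPACITY, the field shape, and the use of an anonymous LinkedHashMap are assumptions for this sketch, not the literal patch:

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

// Access-ordered map: the least recently used writer is evicted first.
// CAPACITY is an illustrative threshold, not a value from the patch.
private static final int CAPACITY = 100;

private final Map<PartitionKey, RowDataDeltaWriter> writers =
    new LinkedHashMap<PartitionKey, RowDataDeltaWriter>(CAPACITY, 0.75f, true) {
      @Override
      protected boolean removeEldestEntry(Map.Entry<PartitionKey, RowDataDeltaWriter> eldest) {
        if (size() <= CAPACITY) {
          return false;
        }
        try {
          // Closing flushes the evicted writer's buffered data;
          // removing the writer without closing it would lose data.
          eldest.getValue().close();
        } catch (IOException e) {
          throw new UncheckedIOException("Failed to close evicted writer", e);
        }
        return true;
      }
    };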

github-actions bot added the flink label Mar 27, 2023
gong changed the title from "Flink: resolve writers object of PartitionedDeltaWriter will cause OOM when partition number is big" to "Flink: Resolve writers object of PartitionedDeltaWriter will cause OOM when partition number is big" Mar 27, 2023
gong (Author) commented Mar 30, 2023

@openinx Hello, could you help me review this?

hililiwei (Contributor) left a comment


How many partitions do you have, approximately? Can you try adding write.distribution-mode=hash to distribute the data of the same partition to one writer? We can’t just remove the writer; if it’s no longer needed, we need to close it properly.
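
For reference, a minimal sketch of setting that property through the Iceberg Java API; write.distribution-mode with value hash is the standard Iceberg table property being suggested here, and the helper method and Table handle are assumptions:

import org.apache.iceberg.Table;

// Hash-distribute rows by partition value so each partition lands on one writer subtask.
// Valid values for write.distribution-mode are none, hash, and range.
void enableHashDistribution(Table table) {
  table.updateProperties()
      .set("write.distribution-mode", "hash")
      .commit();
}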

gong (Author) commented Mar 30, 2023

How many partitions do you have, approximately? Can you try adding write.distribution-mode=hash to distribute the data of the same partition to one writer? We can’t just remove the writer; if it’s no longer needed, we need to close it properly.

There are too many partitions. Snapshot data is partitioned by the yyyy-mm-dd format of the create_time field. I set parallelism to 5 and the checkpoint interval to 1 minute. I had already used write.distribution-mode=hash, but the TaskManager still hit OOM, and the heap dump shows the writers object consuming 2 GB of memory. What problem will it cause if a writer is removed? @hililiwei

hililiwei (Contributor) commented

When you do not close the writer but only remove it, your data will be lost. This is a fatal flaw.
This can be easily verified by setting your capacity to 1, for example. Of course, this PR also needs a unit test; I hope you can add one.

hililiwei (Contributor) commented Mar 31, 2023

There are too many partitions. Snapshot data is partitioned by the yyyy-mm-dd format of the create_time field. I set parallelism to 5 and the checkpoint interval to 1 minute.

Sounds like it's a streaming job? Do you receive very many different partitions (days) in one checkpoint cycle (1 min)? In my experience, it's usually in single digits. I'm a little confused about that.

Of course, I'm not against using LRU, just curious about your case.

stevenzwu (Contributor) commented Mar 31, 2023

@gong It may work for your use cases, but I don't think we would adopt this change as a generally applicable solution; we can set the hard-coded threshold aside for now. Capping the number of concurrent writers can help limit the memory footprint, but it creates another serious problem: the sink can then constantly open and close many small files.

A 2 GB memory footprint is really not that big for a Parquet writer. You may just want to increase the memory allocation. You can also reduce the Parquet row group size, or switch to the Avro file format, to reduce the memory footprint.

If you have bucket partitioning, you can try out this PR that my colleague created recently: #7161

We are also working on a more general range partitioner that can work with skewed distributions (like event time): #6303
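
A hedged sketch of the table-property tuning suggested above; the property names are the standard Iceberg ones, the chosen values are only illustrative, and the helper method is an assumption:

import org.apache.iceberg.Table;

// Two independent knobs, shown together for brevity: a smaller Parquet row group
// lowers per-writer memory, and switching the default write format to Avro
// avoids Parquet's row-group buffering entirely.
void reduceWriterMemory(Table table) {
  table.updateProperties()
      .set("write.parquet.row-group-size-bytes", String.valueOf(32L * 1024 * 1024)) // 32 MB
      .set("write.format.default", "avro")
      .commit();
}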

gong (Author) commented Mar 31, 2023

When you do not close the writer but only remove it, your data will be lost. This is a fatal flaw. This can be easily verified by setting your capacity to 1, for example. Of course, this PR also needs a unit test; I hope you can add one.

@hililiwei Thanks for your suggestion.

gong (Author) commented Mar 31, 2023

@stevenzwu Thanks for your suggestion.

stevenzwu closed this Mar 31, 2023
gong (Author) commented Apr 1, 2023

There are too many partitions. Snapshot data is partitioned by the yyyy-mm-dd format of the create_time field. I set parallelism to 5 and the checkpoint interval to 1 minute.

Sounds like it's a streaming job? Do you receive very many different partitions (days) in one checkpoint cycle (1 min)? In my experience, it's usually in single digits. I'm a little confused about that.

Of course, I'm not against using LRU, just curious about your case.

@hililiwei The partition field data is discrete during the snapshot-read phase. I wrote defective code when using the LRU map, because I did not close the writers. Thanks for your suggestion. I increased the memory to avoid OOM, but the checkpoint time is too long; I think closing the writers takes a lot of time. I increased the checkpoint interval to avoid closing writers frequently. Do you have any suggestions?

gong (Author) commented Apr 1, 2023

@hililiwei I modified the PartitionedDeltaWriter#close method to close the writers in parallel. It reduces the checkpoint time.

Tasks.foreach(writers.values())
    .throwFailureWhenFinished()
    .noRetry()
    .executeWith(EXECUTOR_SERVICE) // set an ExecutorService object
    .run(RowDataDeltaWriter::close, IOException.class);
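
For completeness, one way the EXECUTOR_SERVICE referenced above could be created; the field name matches the snippet, but the pool sizing is only an assumption:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// A shared fixed-size pool used only to close partition writers concurrently.
// It should be shut down when the writer/task itself is closed.
private static final ExecutorService EXECUTOR_SERVICE =
    Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());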

hililiwei (Contributor) commented

I'm glad you worked it out. Writers of different partitions can be closed in parallel, which I think is OK.
Good luck.
