-
Notifications
You must be signed in to change notification settings - Fork 409
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add support for auxiliary memory bound in mpmc_queue #6820
add support for auxiliary memory bound in mpmc_queue #6820
Conversation
[REVIEW NOTIFICATION] This pull request has been approved by:
To complete the pull request process, please ask the reviewers in the list to review by filling The full list of commands accepted by this bot can be found here. Reviewer can indicate their review by submitting an approval review. |
7383faa
to
c021cb9
Compare
dbms/src/Common/MPMCQueue.h
Outdated
@@ -436,6 +472,7 @@ class MPMCQueue | |||
Int64 write_pos = 0; | |||
Status status = Status::NORMAL; | |||
String cancel_reason; | |||
size_t current_auxiliary_memory_usage = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use Int64 here to avoid incorrect get_auxiliary_memory_usage case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
dbms/src/Common/MPMCQueue.h
Outdated
if (is_timeout) | ||
return Result::TIMEOUT; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (is_timeout) | |
return Result::TIMEOUT; | |
if constexpr (need_wait) | |
{ | |
if (is_timeout) | |
return Result::TIMEOUT; | |
} |
dbms/src/Common/MPMCQueue.h
Outdated
if (is_timeout) | ||
return Result::TIMEOUT; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto.
dbms/src/Common/MPMCQueue.h
Outdated
} | ||
if (!isCancelled() && read_pos < write_pos) | ||
{ | ||
auto & obj = getObj(read_pos); | ||
res = std::move(obj); | ||
destruct(obj); | ||
current_auxiliary_memory_usage -= get_auxiliary_memory_usage(res); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a little wasteful to call get_auxiliary_memory_usage
twice for one object.
And also it requires that these two return values of get_auxiliary_memory_usage
must be the same.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, save auxiliary_memory
in another vector.
c6db9b5
to
3f397ba
Compare
Signed-off-by: xufei <xufeixw@mail.ustc.edu.cn>
Signed-off-by: xufei <xufeixw@mail.ustc.edu.cn>
Signed-off-by: xufei <xufeixw@mail.ustc.edu.cn>
Signed-off-by: xufei <xufeixw@mail.ustc.edu.cn>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Others LGTM
dbms/src/Common/MPMCQueue.h
Outdated
current_auxiliary_memory_usage -= element_auxiliary_memory[pos % capacity]; | ||
element_auxiliary_memory[pos % capacity] = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
current_auxiliary_memory_usage -= element_auxiliary_memory[pos % capacity]; | |
element_auxiliary_memory[pos % capacity] = 0; | |
auto & elem_value = element_auxiliary_memory[pos % capacity]; | |
current_auxiliary_memory_usage -= elem_value; | |
elem_value = 0; |
Signed-off-by: xufei <xufeixw@mail.ustc.edu.cn>
8b77e85
to
aed9719
Compare
/merge |
@windtalker: It seems you want to merge this PR, I will help you trigger all the tests: /run-all-tests You only need to trigger If you have any questions about the PR merge process, please refer to pr process. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository. |
This pull request has been accepted and is ready to merge. Commit hash: aed9719
|
What problem does this PR solve?
Issue Number: ref #6528
Problem Summary:
In TiFlash runtime, some executors(like ExchangeReceiver, MPPTunnelSender, UnionBlockInputStream) use a MPMC queue to buffer the data, but the queues are only limited by the number of elements. If the element occupies a large amount of memory, the buffer queue may occupy too much memory, this pr add
auxiliary_memory_bound
to mpmc queue, so the memory usage by the queue can be limited.What is changed and how it works?
Check List
Tests
Side effects
Documentation
Release note