-
Notifications
You must be signed in to change notification settings - Fork 409
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Improve the performance of partition table in extreme case #4988
Conversation
Signed-off-by: bestwoody <bestwoody@163.com>
[REVIEW NOTIFICATION] This pull request has been approved by:
To complete the pull request process, please ask the reviewers in the list to review by filling The full list of commands accepted by this bot can be found here. Reviewer can indicate their review by submitting an approval review. |
…into multiplex_dmsegstream
streams_queue_id = 0; | ||
|
||
auto & q_ptr = streams_queue_by_partition[streams_queue_id]; | ||
auto & q = *q_ptr; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why need q
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just make code easier to read&write... and to show it will not be a nullptr.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh, I figure out, since it's a std::shared_ptr
, use dereference tech to avoid triger atomic counter of std::shared_ptr
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then why not merge L56 and L57 as
auto & q = *streams_queue_by_partition[stream_queue_id]
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good idea
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -777,7 +780,10 @@ void DAGStorageInterpreter::buildLocalStreams(DAGPipeline & pipeline, size_t max | |||
throw; | |||
} | |||
} | |||
pipeline.streams.insert(pipeline.streams.end(), current_pipeline.streams.begin(), current_pipeline.streams.end()); | |||
} | |||
for (int i = 0; i < static_cast<int>(max_streams); i++) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we don't need to use MultiplexInputStream
for non-partition table scan or partition table scan with only one partition.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with you, the only trouble is the code will be more dirty when two cases are not in a universal code path.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added
…into multiplex_dmsegstream
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
@@ -642,8 +646,7 @@ void DAGStorageInterpreter::buildLocalStreams(DAGPipeline & pipeline, size_t max | |||
if (region_num == 0) | |||
continue; | |||
/// calculate weighted max_streams for each partition, note at least 1 stream is needed for each partition | |||
size_t current_max_streams = table_query_infos.size() == 1 ? max_streams : (max_streams * region_num + total_local_region_num - 1) / total_local_region_num; | |||
|
|||
size_t current_max_streams = max_streams; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems we don't need current_max_streams
anymore, just use max_streams
is enough.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
if (has_multiple_partitions) | ||
{ | ||
String req_info = dag_context.isMPPTask() ? dag_context.getMPPTaskId().toString() : ""; | ||
for (int i = 0; i < static_cast<int>(max_streams); ++i) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The stream number should be std::min(max_streams, the stream number in stream_pool)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, nice find
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
streams_queue_id = 0; | ||
|
||
auto & q_ptr = streams_queue_by_partition[streams_queue_id]; | ||
auto & q = *q_ptr; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then why not merge L56 and L57 as
auto & q = *streams_queue_by_partition[stream_queue_id]
?
…into multiplex_dmsegstream
std::make_shared<std::queue<std::shared_ptr<IBlockInputStream>>>()); | ||
for (const auto & stream : cur_streams) | ||
streams_queue_by_partition.back()->push(stream); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: you could construct the queue before entering the critical area.
auto queue = std::make_shared<std::queue<BlockInputStreamPtr>>(cur_streams.begin(), cur_streams.end());
std::unique_lock lk(mu);
if (queue_id + 1 < static_cast<int>(streams_queue_by_partition.size())) | ||
return queue_id + 1; | ||
else | ||
return 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (queue_id + 1 < static_cast<int>(streams_queue_by_partition.size())) | |
return queue_id + 1; | |
else | |
return 0; | |
return (queue_id + 1) % streams_queue_by_partition.size(); |
std::shared_ptr<std::queue< | ||
std::shared_ptr<IBlockInputStream>>>> | ||
streams_queue_by_partition; | ||
std::vector<std::shared_ptr<IBlockInputStream>> added_streams; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: use BlockInputStreamPtr
other than std::shared_ptr<IBlockInputStream>
.
std::shared_ptr<std::queue< | ||
std::shared_ptr<IBlockInputStream>>>> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: according to ClickHouse's coding style, better to add an alias for std::queue<std::shared_ptr<IBlockInputStream>>
like:
using BlockInputStreamPtrQueue = std::queue<std::shared_ptr<IBlockInputStream>>;
Co-authored-by: Fu Zhe <fuzhe1989@gmail.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
/merge |
@bestwoody: It seems you want to merge this PR, I will help you trigger all the tests: /run-all-tests You only need to trigger If you have any questions about the PR merge process, please refer to pr process. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository. |
This pull request has been accepted and is ready to merge. Commit hash: b90332f
|
Coverage for changed files
Coverage summary
full coverage report (for internal network access only) |
What problem does this PR solve?
Issue Number: close #4474
Problem Summary:
Improve the performance of partition table in extreme case
What is changed and how it works?
use Multiplex technique to balancely read underlying DMSegmentInputstream to avoid consume starvation.
design doc: https://pingcap.feishu.cn/wiki/wikcntp2B8GEXyFLSbIbAtMJzBG?useNewLarklet=1
benchmark result: https://pingcap.feishu.cn/wiki/wikcntp2B8GEXyFLSbIbAtMJzBG?useNewLarklet=1#0fWIDd
Check List
Tests
Side effects
Documentation
Release note