-
Notifications
You must be signed in to change notification settings - Fork 409
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refine merge agg stream #6793
Refine merge agg stream #6793
Conversation
[REVIEW NOTIFICATION] This pull request has been approved by:
To complete the pull request process, please ask the reviewers in the list to review by filling The full list of commands accepted by this bot can be found here. Reviewer can indicate their review by submitting an approval review. |
/run-all-tests |
/run-unit-test |
1 similar comment
/run-unit-test |
tsan unit tests pass.
|
/run-all-tests |
1 similar comment
/run-all-tests |
/cc @windtalker @ywqzzy |
dbms/src/Interpreters/Aggregator.cpp
Outdated
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT); | ||
} | ||
#undef M | ||
single_level_blocks = aggregator.prepareBlocksAndFillSingleLevel(*first, final); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not move the merge code to L2201, together with the without_key
case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, you are right, updated.
const LoggerPtr log; | ||
const Aggregator & aggregator; | ||
ManyAggregatedDataVariants data; | ||
bool final; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rename it since it conflicts with the “final” keyword
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right, but I don't want to rename it in this pr..
Since final
is used in many places such Aggregator
, AggregatingBlockInputStream
, and MergingAggregatedMemoryEfficientBlockInputStream
and etc, it might be better to do it with another pr..
|
||
void thread(Int32 bucket_num) | ||
{ | ||
try |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How to handle the exception?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
UnionBlockInputStream
or other outside blockInputStreams will handle it.
BlockInputStreams merging_streams; | ||
for (size_t i = 0; i < merging_buckets->getConcurrency(); ++i) | ||
merging_streams.push_back(std::make_shared<MergingAndConvertingBlockInputStream>(merging_buckets, i, log->identifier())); | ||
impl = std::make_unique<UnionBlockInputStream<>>(merging_streams, BlockInputStreams{}, max_threads, log->identifier()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about moving the unionBlockInputStream construction outside the readImpl function?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think it's possible because Impl
needs to be constructed inside readImpl
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Others LGTM
dbms/src/Interpreters/Aggregator.cpp
Outdated
{ | ||
APPLY_FOR_VARIANTS_TWO_LEVEL(M) | ||
default: | ||
break; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not throw error for the default branch?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, updated.
/merge |
@SeaRise: It seems you want to merge this PR, I will help you trigger all the tests: /run-all-tests You only need to trigger If you have any questions about the PR merge process, please refer to pr process. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository. |
This pull request has been accepted and is ready to merge. Commit hash: 4d694bb
|
What problem does this PR solve?
Issue Number: ref #5900
Problem Summary:
What is changed and how it works?
If the aggregation states are two-level, then it produces blocks strictly in order of 'bucket_num'.
, because tiflash does not need this.MergingAndConvertingBlockInputStream
toMergingBuckets
and don't create separate threads to execute two level merge.MergingAndConvertingBlockInputStream
only run in one thread, and useUnionBlockInputStream
to do parallel two level merge.MergingBuckets
toAggregator.h
because of the c++ compile error...Aggregator.h
andAggregator.cpp
and remove useless classMergingAggregatedBlockInputStream
Check List
Tests
tsan unit tests pass.
Side effects
Documentation
Release note