Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

auto pass through hashagg #9167

Merged
merged 130 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from 111 commits
Commits
Show all changes
130 commits
Select commit Hold shift + click to select a range
cd6cbea
selective: updateWeakHash32 done
guo-shaoge Jun 3, 2024
bf2af85
selective: scatterTo
guo-shaoge Jun 3, 2024
2e3662d
selective: scatter
guo-shaoge Jun 3, 2024
dab162f
mpp writer
guo-shaoge Jun 3, 2024
fd21bf1
1. Aggregator: support collect_hit_rate and only_lookup
guo-shaoge Jun 3, 2024
5e803e2
execution: support auto pass operator/stream
guo-shaoge Jun 3, 2024
957c8e1
fix wrong check
guo-shaoge Jun 4, 2024
82883fe
interpter support auto pass through
guo-shaoge Jun 4, 2024
7b5320e
unit test
guo-shaoge Jun 5, 2024
f9105ec
Merge branch 'master' of github.com:pingcap/tiflash into auto_passthr…
guo-shaoge Jun 20, 2024
b469fa2
unit test for auto pass context
guo-shaoge Jun 20, 2024
59422ca
unit test for executor
guo-shaoge Jun 24, 2024
0db738e
fmt
guo-shaoge Jun 24, 2024
a7e8036
getPassThroughBlock && integration unit test
guo-shaoge Jun 26, 2024
3b588c2
fix AutoPassThroughHashAggContext/ColumnString todo
guo-shaoge Jun 26, 2024
6d646d9
updateWeakHash32 declaration
guo-shaoge Jun 26, 2024
4df0268
fix lookupImpl && fix getNotFoundRows && getPassThroughBlock()
guo-shaoge Jun 27, 2024
605c4ba
use findKey for only_lookup;
guo-shaoge Jun 27, 2024
3aa40cc
row_limit_unit
guo-shaoge Jun 27, 2024
1db168a
fix some todo
guo-shaoge Jun 27, 2024
c30b4bf
fmt
guo-shaoge Jun 27, 2024
047f4ed
tipb
guo-shaoge Jun 28, 2024
c0018b7
del setHitBit
guo-shaoge Jun 28, 2024
a7d444b
statistics
guo-shaoge Jun 28, 2024
9fb0cb1
fmt
guo-shaoge Jun 28, 2024
0f4cc76
fmt
guo-shaoge Jun 30, 2024
d43a8de
Merge branch 'master' of ssh://github.com/pingcap/tiflash into auto_p…
guo-shaoge Jun 30, 2024
7125e5a
fmt
guo-shaoge Jun 30, 2024
f28933a
fix some todo
guo-shaoge Jun 30, 2024
285dd6f
fix some todo
guo-shaoge Jun 30, 2024
a5d6382
try fix unit-test
guo-shaoge Jul 1, 2024
313d822
try fix unit-test
guo-shaoge Jul 1, 2024
31aab83
add check_block_selective for expression stream
guo-shaoge Jul 1, 2024
e1a5718
fix some todo
guo-shaoge Jul 2, 2024
2d3f579
fix case
guo-shaoge Jul 2, 2024
75cf66f
handle empty data without group by key
guo-shaoge Jul 2, 2024
978207e
empty tbl test
guo-shaoge Jul 2, 2024
496cd20
fix fill def agg func bug
guo-shaoge Jul 2, 2024
c02cc8f
fix empty tbl case
guo-shaoge Jul 2, 2024
45e77bb
3-value tiflash_pre_agg_mode
guo-shaoge Jul 3, 2024
8db40e7
update tipb
guo-shaoge Jul 3, 2024
877e833
remove switcher
guo-shaoge Jul 4, 2024
d8737b4
fix
guo-shaoge Jul 4, 2024
f41bb30
tipb
guo-shaoge Jul 4, 2024
98e90e6
fix
guo-shaoge Jul 5, 2024
db539b3
refine
guo-shaoge Jul 8, 2024
4022883
fix
guo-shaoge Jul 8, 2024
8e9fdc3
Merge branch 'master' of github.com:pingcap/tiflash into auto_passthr…
guo-shaoge Jul 8, 2024
4ad1654
support selective block for MPPExchangeWriter
guo-shaoge Jul 8, 2024
162bd6e
unit-test
guo-shaoge Jul 8, 2024
0c4592c
scatter unit-test
guo-shaoge Jul 9, 2024
2d44281
fix
guo-shaoge Jul 9, 2024
bc6d280
fix
guo-shaoge Jul 9, 2024
29f6ea2
scatterTo unittest
guo-shaoge Jul 9, 2024
018e208
fix
guo-shaoge Jul 9, 2024
77826d8
fix
guo-shaoge Jul 9, 2024
4e63522
add updateWeakHash32Impl
guo-shaoge Jul 10, 2024
18cf199
refine
guo-shaoge Jul 10, 2024
aa0ea6b
refine
guo-shaoge Jul 10, 2024
954e903
check block.info runtime
guo-shaoge Jul 10, 2024
b50063e
refine LoopOneColumn
guo-shaoge Jul 10, 2024
db21b5a
refine
guo-shaoge Jul 10, 2024
35b38e5
refine
guo-shaoge Jul 10, 2024
643d1b6
Merge branch 'master' of github.com:pingcap/tiflash into auto_passthr…
guo-shaoge Jul 11, 2024
7470ef1
fix getPassThroughBlock
guo-shaoge Jul 11, 2024
60a8559
Merge branch 'master' of github.com:pingcap/tiflash into selective_block
guo-shaoge Jul 12, 2024
4eaa2b0
Merge branch 'selective_block' of github.com:guo-shaoge/tiflash into …
guo-shaoge Jul 12, 2024
c98d332
fix conflict
guo-shaoge Jul 12, 2024
569053a
del checkSelective
guo-shaoge Jul 12, 2024
0a4148a
log
guo-shaoge Jul 12, 2024
6cecb2c
disable selective check for expression block input stream
guo-shaoge Jul 14, 2024
ed6033b
disable state switch
guo-shaoge Jul 14, 2024
d4a1ac6
Revert "disable state switch"
guo-shaoge Jul 14, 2024
e471184
Revert "disable selective check for expression block input stream"
guo-shaoge Jul 14, 2024
717dced
fix select template for exchange sender
guo-shaoge Jul 14, 2024
b25dee7
fix auto pass through integration case(need fix todos later)
guo-shaoge Jul 14, 2024
6df41d0
optimize getPassThroughBlock
guo-shaoge Jul 15, 2024
839e0cb
case for AutoPassThroughHashAggHelper
guo-shaoge Jul 16, 2024
9a03f59
benchmark for setupAutoPassThroughColumnGenerator
guo-shaoge Jul 17, 2024
8a8d9b6
fix some todos
guo-shaoge Jul 17, 2024
41783c8
fix projection
guo-shaoge Jul 17, 2024
1f71029
tmp disable send block in advance
guo-shaoge Jul 17, 2024
45f033d
Revert "tmp disable send block in advance"
guo-shaoge Jul 17, 2024
926d79e
tmp refine count col generator
guo-shaoge Jul 17, 2024
f5d1ba9
tmp change decimal infer
guo-shaoge Jul 18, 2024
9d5e7e3
tmp some log
guo-shaoge Jul 18, 2024
b4323f7
tmp disable expression copy block info
guo-shaoge Jul 18, 2024
8ba8bc3
tmp log agg expr actions
guo-shaoge Jul 18, 2024
7d99144
log expression block input stream time
guo-shaoge Jul 18, 2024
00b8b4e
tmp remove cast after agg
guo-shaoge Jul 18, 2024
77ab379
tmp debug exchange sender(and header in executeQuery)
guo-shaoge Jul 18, 2024
837284d
tmp disable cast in final projection
guo-shaoge Jul 18, 2024
b4a1510
Revert "tmp disable cast in final projection"
guo-shaoge Jul 19, 2024
70f4329
tmp hack count: count return int64; count gen didn't check nullable
guo-shaoge Jul 21, 2024
826734f
refine
guo-shaoge Jul 21, 2024
6e2a5a5
Merge branch 'master' of github.com:pingcap/tiflash into auto_passthr…
guo-shaoge Jul 21, 2024
7a675b0
refine
guo-shaoge Jul 21, 2024
35a742c
fix case
guo-shaoge Jul 21, 2024
e08f0ce
refine sum(null)/count(1)/min/max
guo-shaoge Jul 22, 2024
d6c8059
refine
guo-shaoge Jul 23, 2024
a430c69
refine case
guo-shaoge Jul 23, 2024
fd6b49c
lint
guo-shaoge Jul 23, 2024
b07a871
del template fro ExpressionBlockInputStream
guo-shaoge Jul 23, 2024
0d0e1ec
refine comment; add check for ExpressionBlockInputStream
guo-shaoge Jul 24, 2024
c060643
fix build
guo-shaoge Jul 24, 2024
ed65697
refine auto pass through helper
guo-shaoge Jul 29, 2024
947a19e
check block.info.selective runtime
guo-shaoge Jul 29, 2024
97b46f3
refine
guo-shaoge Jul 31, 2024
2f15000
enable -> enabled
guo-shaoge Jul 31, 2024
38b323b
refine log
guo-shaoge Jul 31, 2024
60736a1
fmt
guo-shaoge Jul 31, 2024
cb8098c
aggresive pass through policy
guo-shaoge Jul 31, 2024
411633a
refine stream
guo-shaoge Jul 31, 2024
8a508a1
fix cycle reference
guo-shaoge Aug 1, 2024
c75ac8c
refine setParent; add spillable case
guo-shaoge Aug 5, 2024
dcfd99c
refine forceState()
guo-shaoge Aug 5, 2024
8c5960b
refine case(spill not-spill)
guo-shaoge Aug 6, 2024
1cd2063
refine
guo-shaoge Aug 6, 2024
a8bf0f0
del spill case
guo-shaoge Aug 6, 2024
4c04f55
clear spill callback for auto pass
guo-shaoge Aug 6, 2024
c02793f
refine spill case
guo-shaoge Aug 6, 2024
cd9ed17
refine comment
guo-shaoge Aug 6, 2024
19bf804
update continusous limit logic
guo-shaoge Aug 7, 2024
35f5fc7
finish spillable stage
guo-shaoge Aug 7, 2024
0100f6b
fmt
guo-shaoge Aug 7, 2024
76a1e34
Merge branch 'master' into auto_passthrough_hashagg
guo-shaoge Aug 7, 2024
a5af40c
fix recursiveSetBlockInputStreamParent
guo-shaoge Aug 7, 2024
c3be1a9
Compression: add microbenchmark (#9293)
Lloyd-Pottiger Aug 7, 2024
7c49f3f
Revert "Compression: add microbenchmark (#9293)"
guo-shaoge Aug 7, 2024
d1447f2
Merge branch 'master' into auto_passthrough_hashagg
guo-shaoge Aug 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion contrib/tipb
2 changes: 1 addition & 1 deletion dbms/src/Common/HashTable/HashTableKeyHolder.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
* make a persistent copy of the key in each of the following cases:
* 1) the aggregation method doesn't use temporary keys, so they're persistent
* from the start;
* 1) the key is already present in the hash table;
* 2) the key is already present in the hash table;
* 3) that particular key is stored by value, e.g. a short StringRef key in
* StringHashMap.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <DataStreams/AutoPassThroughAggregatingBlockInputStream.h>

namespace DB
{

template <bool force_streaming>
Block AutoPassThroughAggregatingBlockInputStream<force_streaming>::readImpl()
{
while (!build_done)
{
Block block = children[0]->read();
if (block)
{
auto_pass_through_context->onBlock<force_streaming>(block);
}
else
{
build_done = true;
break;
}

if (!auto_pass_through_context->passThroughBufferEmpty())
return auto_pass_through_context->popPassThroughBuffer();
}

assert(build_done);
if (!auto_pass_through_context->passThroughBufferEmpty())
return auto_pass_through_context->popPassThroughBuffer();

return auto_pass_through_context->getData();
}

} // namespace DB
60 changes: 60 additions & 0 deletions dbms/src/DataStreams/AutoPassThroughAggregatingBlockInputStream.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <DataStreams/IProfilingBlockInputStream.h>
#include <Interpreters/Aggregator.h>
#include <Operators/AutoPassThroughHashAggContext.h>

namespace DB
{
static constexpr std::string_view autoPassThroughAggregatingExtraInfo = "auto pass through";

template <bool force_streaming>
class AutoPassThroughAggregatingBlockInputStream : public IProfilingBlockInputStream
{
static constexpr auto NAME = "Aggregating";

public:
AutoPassThroughAggregatingBlockInputStream(
const BlockInputStreamPtr & input_,
const Aggregator::Params & params_,
const String & req_id,
UInt64 row_limit_unit)
{
children.push_back(input_);
auto_pass_through_context = std::make_unique<AutoPassThroughHashAggContext>(
children[0]->getHeader(),
params_,
[&]() { return this->isCancelled(); },
req_id,
row_limit_unit);
}

String getName() const override { return NAME; }

Block getHeader() const override { return auto_pass_through_context->getHeader(); }

protected:
Block readImpl() override;

private:
AutoPassThroughHashAggContextPtr auto_pass_through_context;
bool build_done = false;
};

template class AutoPassThroughAggregatingBlockInputStream<true>;
template class AutoPassThroughAggregatingBlockInputStream<false>;
} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/DataStreams/ExchangeSenderBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class ExchangeSenderBlockInputStream : public IProfilingBlockInputStream
String getName() const override { return name; }
Block getHeader() const override { return children.back()->getHeader(); }

bool canHandleSelectiveBlock() const override { return true; }

protected:
Block readImpl() override;
void readPrefixImpl() override { writer->prepare(getHeader()); }
Expand Down
17 changes: 16 additions & 1 deletion dbms/src/DataStreams/ExpressionBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,22 @@ Block ExpressionBlockInputStream::readImpl()
Block res = children.back()->read();
if (!res)
return res;
expression->execute(res);

if (res.info.selective)
{
const auto ori_rows = res.rows();
auto ori_info = res.info;
expression->execute(res);
res.info = ori_info;
// When block.info.selective is not null, the expression action should be cast/project.
// So the rows should not change.
RUNTIME_CHECK(ori_rows == res.rows());
}
else
{
expression->execute(res);
}
return res;
}

} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/DataStreams/ExpressionBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class ExpressionBlockInputStream : public IProfilingBlockInputStream
String getName() const override { return NAME; }
Block getHeader() const override;

bool canHandleSelectiveBlock() const override { return true; }

protected:
Block readImpl() override;

Expand Down
2 changes: 2 additions & 0 deletions dbms/src/DataStreams/IBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ class IBlockInputStream : private boost::noncopyable

virtual void appendInfo(FmtBuffer & /*buffer*/) const {};

virtual bool canHandleSelectiveBlock() const { return false; }

protected:
virtual uint64_t collectCPUTimeNsImpl(bool /*is_thread_runner*/) { return 0; }

Expand Down
11 changes: 10 additions & 1 deletion dbms/src/DataStreams/IProfilingBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,15 @@ Block IProfilingBlockInputStream::read(FilterPtr & res_filter, bool return_filte

if (quota != nullptr)
checkQuota(res);

// Check if child can handle selective block.
for (const auto & child : children)
{
RUNTIME_CHECK_MSG(
child->canHandleSelectiveBlock() || !res.info.selective,
"{} cannot handle selective block",
getName());
}
}
else
{
Expand Down Expand Up @@ -228,7 +237,7 @@ static bool handleOverflowMode(OverflowMode mode, const String & message, int co
default:
throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR);
}
};
}


bool IProfilingBlockInputStream::checkTimeLimit() const
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ void ParallelAggregatingBlockInputStream::cancel(bool kill)
processor.cancel(kill);
}


Block ParallelAggregatingBlockInputStream::readImpl()
{
if (!executed)
Expand Down
14 changes: 11 additions & 3 deletions dbms/src/Debug/MockExecutor/AggregationBinder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ bool AggregationBinder::toTiPBExecutor(
tipb_executor->set_executor_id(name);
tipb_executor->set_fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count);
auto * agg = tipb_executor->mutable_aggregation();
if (switcher)
agg->set_pre_agg_mode(switcher->mode);
buildAggExpr(agg, collator_id, context);
buildGroupBy(agg, collator_id, context);
auto * child_executor = agg->mutable_child();
Expand Down Expand Up @@ -87,6 +89,7 @@ void AggregationBinder::toMPPSubPlan(
// todo support avg
if (has_uniq_raw_res)
throw Exception("uniq raw res not supported in mpp query");
// Partial agg cannot be fine grained shuffle. So set fine_grained_shuffle_stream_count as 0.
std::shared_ptr<AggregationBinder> partial_agg = std::make_shared<AggregationBinder>(
executor_index,
output_schema_for_partial_agg,
Expand All @@ -95,7 +98,8 @@ void AggregationBinder::toMPPSubPlan(
std::move(agg_exprs),
std::move(gby_exprs),
false,
fine_grained_shuffle_stream_count);
/*fine_grained_shuffle_stream_count*/ 0,
switcher);
partial_agg->children.push_back(children[0]);
std::vector<size_t> partition_keys;
size_t agg_func_num = partial_agg->agg_exprs.size();
Expand Down Expand Up @@ -136,6 +140,8 @@ void AggregationBinder::toMPPSubPlan(
gby_exprs.push_back(std::make_shared<ASTIdentifier>(output_schema_for_partial_agg[agg_func_num + i].first));
}
children[0] = exchange_receiver;
// Because this aggregation is 2nd agg, so reset auto_pass_through flag.
switcher = nullptr;
}

bool AggregationBinder::needAppendProject() const
Expand Down Expand Up @@ -235,7 +241,8 @@ ExecutorBinderPtr compileAggregation(
size_t & executor_index,
ASTPtr agg_funcs,
ASTPtr group_by_exprs,
uint64_t fine_grained_shuffle_stream_count)
uint64_t fine_grained_shuffle_stream_count,
std::shared_ptr<AutoPassThroughSwitcher> switcher)
{
std::vector<ASTPtr> agg_exprs;
std::vector<ASTPtr> gby_exprs;
Expand Down Expand Up @@ -308,7 +315,8 @@ ExecutorBinderPtr compileAggregation(
std::move(agg_exprs),
std::move(gby_exprs),
true,
fine_grained_shuffle_stream_count);
fine_grained_shuffle_stream_count,
switcher);
aggregation->children.push_back(input);
return aggregation;
}
Expand Down
9 changes: 7 additions & 2 deletions dbms/src/Debug/MockExecutor/AggregationBinder.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#pragma once

#include <Debug/MockExecutor/ExecutorBinder.h>
#include <Operators/AutoPassThroughHashAggContext.h>
#include <Parsers/ASTFunction.h>

namespace DB::mock
Expand All @@ -33,14 +34,16 @@ class AggregationBinder : public ExecutorBinder
ASTs && agg_exprs_,
ASTs && gby_exprs_,
bool is_final_mode_,
uint64_t fine_grained_shuffle_stream_count_)
uint64_t fine_grained_shuffle_stream_count_,
std::shared_ptr<AutoPassThroughSwitcher> switcher_)
: ExecutorBinder(index_, "aggregation_" + std::to_string(index_), output_schema_)
, has_uniq_raw_res(has_uniq_raw_res_)
, need_append_project(need_append_project_)
, agg_exprs(std::move(agg_exprs_))
, gby_exprs(std::move(gby_exprs_))
, is_final_mode(is_final_mode_)
, fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count_)
, switcher(switcher_)
{}

bool toTiPBExecutor(
Expand Down Expand Up @@ -73,6 +76,7 @@ class AggregationBinder : public ExecutorBinder
bool is_final_mode;
DAGSchema output_schema_for_partial_agg;
uint64_t fine_grained_shuffle_stream_count;
std::shared_ptr<AutoPassThroughSwitcher> switcher;

private:
void buildGroupBy(tipb::Aggregation * agg, int32_t collator_id, const Context & context) const;
Expand All @@ -85,6 +89,7 @@ ExecutorBinderPtr compileAggregation(
size_t & executor_index,
ASTPtr agg_funcs,
ASTPtr group_by_exprs,
uint64_t fine_grained_shuffle_stream_count = 0);
uint64_t fine_grained_shuffle_stream_count = 0,
std::shared_ptr<AutoPassThroughSwitcher> switcher = nullptr);

} // namespace DB::mock
4 changes: 2 additions & 2 deletions dbms/src/Flash/Coprocessor/FineGrainedShuffle.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ static constexpr std::string_view enableFineGrainedShuffleExtraInfo = "enable fi

static constexpr size_t maxFineGrainedStreamCount = 1024;

inline bool enableFineGrainedShuffle(uint64_t stream_count)
inline bool fineGrainedShuffleEnabled(uint64_t stream_count)
{
return stream_count > 0;
}
Expand All @@ -40,7 +40,7 @@ struct FineGrainedShuffle
, batch_size(executor ? executor->fine_grained_shuffle_batch_size() : 0)
{}

bool enable() const { return enableFineGrainedShuffle(stream_count); }
bool enabled() const { return fineGrainedShuffleEnabled(stream_count); }

const UInt64 stream_count;
const UInt64 batch_size;
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/ExchangeReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ ExchangeReceiverBase<RPCContext>::ExchangeReceiverBase(
: exc_log(Logger::get(req_id, executor_id))
, rpc_context(std::move(rpc_context_))
, source_num(source_num_)
, enable_fine_grained_shuffle_flag(enableFineGrainedShuffle(fine_grained_shuffle_stream_count_))
, enable_fine_grained_shuffle_flag(fineGrainedShuffleEnabled(fine_grained_shuffle_stream_count_))
, output_stream_count(
enable_fine_grained_shuffle_flag ? std::min(max_streams_, fine_grained_shuffle_stream_count_) : max_streams_)
, max_buffer_size(getMaxBufferSize(source_num, settings.recv_queue_size))
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Pipeline/Pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ void Pipeline::addPlanNode(const PhysicalPlanNodePtr & plan_node)
{
assert(plan_node);
/// For fine grained mode, all plan node should enable fine grained shuffle.
if (!plan_node->getFineGrainedShuffle().enable())
if (!plan_node->getFineGrainedShuffle().enabled())
is_fine_grained_mode = false;
plan_nodes.push_back(plan_node);
}
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Planner/PhysicalPlan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ void PhysicalPlan::build(const tipb::Executor * executor)
{
RUNTIME_CHECK(executor);
RUNTIME_CHECK(executor->has_executor_id());

const auto & executor_id = executor->executor_id();
switch (executor->tp())
{
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Planner/PhysicalPlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ void PhysicalPlanNode::buildPipeline(

EventPtr PhysicalPlanNode::sinkComplete(PipelineExecutorContext & exec_context)
{
if (getFineGrainedShuffle().enable())
if (getFineGrainedShuffle().enabled())
return nullptr;
return doSinkComplete(exec_context);
}
Expand Down
Loading