Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

plan, executor: implement Expand operator for grouping sets #6545

Merged
merged 35 commits into from
Feb 22, 2023
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
b70415d
repeat inital draft
AilinKid Dec 6, 2022
8f7150f
fix the test Exeception because of fmtlib can take '{' as escape symbol
Dec 6, 2022
7f9b656
add test for tiflash repeat logic
Dec 7, 2022
f66f010
fix the repeat source logical test and add the repeat physical planne…
Dec 13, 2022
4bb8fb9
change the name from repeat source to expand and rebase master
Dec 26, 2022
697e865
remove useless file
Dec 26, 2022
37e3662
remove debug log
Dec 26, 2022
35e1f5e
.
Dec 26, 2022
29f4b94
remove chinese comment
Dec 28, 2022
edaa6a2
make fmt
Dec 28, 2022
7adaf9b
rename repeat as expand
Jan 10, 2023
62dc142
rename file
Jan 10, 2023
9d40771
fix test under new rebased code
Jan 10, 2023
a34e952
address haisheng's comment
Jan 10, 2023
db7b1ff
clang fmt
Jan 10, 2023
445b3c4
fix rebase error
Feb 11, 2023
5bba07f
address partial of haisheng's comment
Feb 13, 2023
74ea652
make fmt
Feb 13, 2023
11e8a46
add
Feb 17, 2023
5609f3b
resolve header file recycle
Feb 17, 2023
995d14e
address jiangtao's comment
Feb 18, 2023
199961c
fmt
Feb 18, 2023
faf8cf3
add test for overlap grouping set
Feb 18, 2023
95c3ea1
remove debug info
Feb 20, 2023
126d2b0
remove useless header file and comment
Feb 20, 2023
c6344fe
enable pipeline model
Feb 20, 2023
034d06a
fix single block test
Feb 20, 2023
3f954d3
address haisheng comment
Feb 21, 2023
0eb3029
address haisheng's comment 2
Feb 21, 2023
a83d42c
Merge branch 'master' into local_repeat_source
AilinKid Feb 21, 2023
f01758d
Merge branch 'master' into local_repeat_source
AilinKid Feb 21, 2023
b3518e3
address comment: move plan test and refactor some code
Feb 22, 2023
6c12fd9
add test for gtest_pipeline_interpreter
Feb 22, 2023
dad3531
Merge branch 'master' into local_repeat_source
AilinKid Feb 22, 2023
bfe97e5
Merge branch 'master' into local_repeat_source
ti-chi-bot Feb 22, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ namespace DB
F(type_limit, {"type", "limit"}), F(type_join, {"type", "join"}), F(type_exchange_sender, {"type", "exchange_sender"}), \
F(type_exchange_receiver, {"type", "exchange_receiver"}), F(type_projection, {"type", "projection"}), \
F(type_partition_ts, {"type", "partition_table_scan"}), \
F(type_window, {"type", "window"}), F(type_window_sort, {"type", "window_sort"})) \
F(type_window, {"type", "window"}), F(type_window_sort, {"type", "window_sort"}), \
F(type_expand, {"type", "expand"})) \
M(tiflash_coprocessor_request_duration_seconds, "Bucketed histogram of request duration", Histogram, \
F(type_cop, {{"type", "cop"}}, ExpBuckets{0.001, 2, 20}), \
F(type_batch, {{"type", "batch"}}, ExpBuckets{0.001, 2, 20}), \
Expand Down Expand Up @@ -276,7 +277,8 @@ namespace DB
M(tiflash_compute_request_unit, "Request Unit used by tiflash compute", Counter, \
F(type_mpp, {{"type", "mpp"}, ComputeLabelHolder::instance().getClusterIdLabel(), ComputeLabelHolder::instance().getProcessIdLabel()}), \
F(type_cop, {{"type", "cop"}, ComputeLabelHolder::instance().getClusterIdLabel(), ComputeLabelHolder::instance().getProcessIdLabel()}), \
F(type_batch, {{"type", "batch"}, ComputeLabelHolder::instance().getClusterIdLabel(), ComputeLabelHolder::instance().getProcessIdLabel()})) \
F(type_batch, {{"type", "batch"}, ComputeLabelHolder::instance().getClusterIdLabel(), ComputeLabelHolder::instance().getProcessIdLabel()}))

// clang-format on

/// Buckets with boundaries [start * base^0, start * base^1, ..., start * base^(size-1)]
Expand Down
1 change: 0 additions & 1 deletion dbms/src/DataStreams/ExpressionBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,4 @@ Block ExpressionBlockInputStream::readImpl()
expression->execute(res);
return res;
}

} // namespace DB
68 changes: 68 additions & 0 deletions dbms/src/Debug/MockExecutor/ExpandBinder.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Debug/MockExecutor/AstToPBUtils.h>
#include <Debug/MockExecutor/ExpandBinder.h>

namespace DB::mock
{

/// Fills `tipb_executor` with an Expand executor built from this binder.
///
/// The three-level nesting of `grouping_sets_columns` is mirrored one-to-one into the
/// protobuf message (grouping set -> grouping exprs -> grouping expr). Every leaf AST node
/// is converted against the schema of the first child, and finally the first child itself
/// is serialized into the Expand's child slot.
///
/// @return the result of serializing the first child executor.
bool ExpandBinder::toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context)
{
    tipb_executor->set_tp(tipb::ExecType::TypeExpand);
    tipb_executor->set_executor_id(name);

    // Expand has exactly one input here; both the schema lookup and the recursion use it.
    const auto & child = children[0];
    tipb::Expand * expand_pb = tipb_executor->mutable_expand();

    for (const auto & set_columns : grouping_sets_columns)
    {
        auto * set_pb = expand_pb->add_grouping_sets();
        for (const auto & exprs_columns : set_columns)
        {
            auto * exprs_pb = set_pb->add_grouping_exprs();
            for (const auto & column_ast : exprs_columns)
            {
                // Turn the AST column reference into a tipb::Expr column reference.
                tipb::Expr * expr_pb = exprs_pb->add_grouping_expr();
                astToPB(child->output_schema, column_ast, expr_pb, collator_id, context);
            }
        }
    }

    return child->toTiPBExecutor(expand_pb->mutable_child(), collator_id, mpp_info, context);
}

/// Builds an ExpandBinder on top of `input`.
///
/// The output schema is the input schema with two adjustments:
///   - a column whose name is in `in_set` and that carries the NOT NULL flag is replaced by
///     its nullable counterpart;
///   - one extra column named "groupingID" (unsigned, NOT NULL, TypeLongLong) is appended.
///
/// @param input                 child executor; becomes the only child of the result.
/// @param executor_index        forwarded to the ExpandBinder constructor.
/// @param grouping_set_columns  grouping sets, moved into the new binder.
/// @param in_set                names of the columns that take part in any grouping set.
ExecutorBinderPtr compileExpand(ExecutorBinderPtr input, size_t & executor_index, MockVVecGroupingNameVec grouping_set_columns, std::set<String> in_set)
{
    DAGSchema output_schema;
    for (const auto & field : input->output_schema)
    {
        // Only grouping-set columns that are currently NOT NULL need to be relaxed.
        const bool relax_to_nullable = in_set.count(field.first) != 0 && field.second.hasNotNullFlag();
        if (relax_to_nullable)
        {
            output_schema.push_back(toNullableDAGColumnInfo(field));
            continue;
        }
        output_schema.push_back(field);
    }

    // Describe the appended grouping identifier column.
    tipb::FieldType grouping_id_type{};
    grouping_id_type.set_tp(TiDB::TypeLongLong);
    grouping_id_type.set_charset("binary");
    grouping_id_type.set_collate(TiDB::ITiDBCollator::BINARY);
    // The grouping id is never NULL, so it keeps the NOT NULL flag.
    grouping_id_type.set_flag(TiDB::ColumnFlagUnsigned | TiDB::ColumnFlagNotNull);
    grouping_id_type.set_flen(-1);
    grouping_id_type.set_decimal(-1);
    output_schema.emplace_back("groupingID", TiDB::fieldTypeToColumnInfo(grouping_id_type));

    ExecutorBinderPtr expand_binder = std::make_shared<ExpandBinder>(executor_index, output_schema, std::move(grouping_set_columns));
    expand_binder->children.push_back(input);
    return expand_binder;
}
} // namespace DB::mock
43 changes: 43 additions & 0 deletions dbms/src/Debug/MockExecutor/ExpandBinder.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Debug/MockExecutor/ExecutorBinder.h>

#include <utility>

namespace DB::mock
{
using MockGroupingNameVec = std::vector<ASTPtr>;
using MockVecGroupingNameVec = std::vector<MockGroupingNameVec>;
using MockVVecGroupingNameVec = std::vector<MockVecGroupingNameVec>;

/// Mock executor binder for the Expand operator.
///
/// It stores the grouping sets as AST column references and serializes them into a
/// tipb::Expand message in toTiPBExecutor().
class ExpandBinder : public ExecutorBinder
{
public:
    /// @param index_          executor index, forwarded to ExecutorBinder and used to build the
    ///                        executor name "expand_<index>".
    /// @param output_schema_  output schema of this executor.
    /// @param gss             grouping sets; taken by value and moved into the member so that
    ///                        the nested vectors are not deep-copied a second time.
    ExpandBinder(size_t & index_, const DAGSchema & output_schema_, MockVVecGroupingNameVec gss)
        : ExecutorBinder(index_, "expand_" + std::to_string(index_), output_schema_)
        , grouping_sets_columns(std::move(gss))
    {}

    bool toTiPBExecutor(tipb::Executor * tipb_executor, int32_t collator_id, const MPPInfo & mpp_info, const Context & context) override;

    /// Column pruning is not supported for Expand; calling this always throws.
    void columnPrune(std::unordered_set<String> &) override { throw Exception("Should not reach here"); }

private:
    // For now every grouping set is a list of base columns; add one more level of nesting
    // to this structure if grouping set merging is enabled.
    MockVVecGroupingNameVec grouping_sets_columns;
};

ExecutorBinderPtr compileExpand(ExecutorBinderPtr input, size_t & executor_index, MockVVecGroupingNameVec grouping_set_columns, std::set<String> set);
} // namespace DB::mock
60 changes: 60 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <Functions/FunctionHelpers.h>
#include <Functions/FunctionsTiDBConversion.h>
#include <Interpreters/Context.h>
#include <Interpreters/Expand.h>
#include <Interpreters/Set.h>
#include <Interpreters/Settings.h>
#include <Interpreters/convertFieldToType.h>
Expand Down Expand Up @@ -804,6 +805,65 @@ NamesAndTypes DAGExpressionAnalyzer::buildOrderColumns(
return order_columns;
}

/// Translates the grouping sets of a tipb::Expand into block column names and updates
/// `source_columns` to reflect the schema after the expand.
///
/// Each grouping expression is resolved through getActions() against `actions`, and the
/// returned column name is stored in the result at the same nesting position as in `expand`.
/// Afterwards every source column that is referenced by any grouping set is made nullable,
/// and the grouping identifier column is appended to `source_columns`.
///
/// @param expand   the Expand executor description.
/// @param actions  expression actions passed to getActions() for each grouping expression.
/// @return the grouping sets expressed as column names.
GroupingSets DAGExpressionAnalyzer::buildExpandGroupingColumns(
    const tipb::Expand & expand,
    const ExpressionActionsPtr & actions)
{
    GroupingSets group_sets_columns;
    // Names of all columns used by any grouping set. Only key presence matters; the mapped
    // value is always true.
    std::map<String, bool> map_grouping_col;
    group_sets_columns.reserve(expand.grouping_sets().size());
    for (const auto & group_set : expand.grouping_sets())
    {
        GroupingSet group_set_columns;
        group_set_columns.reserve(group_set.grouping_exprs().size());
        for (const auto & group_exprs : group_set.grouping_exprs())
        {
            GroupingColumnNames group_exprs_columns;
            group_exprs_columns.reserve(group_exprs.grouping_expr().size());
            for (const auto & group_expr : group_exprs.grouping_expr())
            {
                // TiDB expressions address columns by offset in the child's chunk schema;
                // convert that to the block column name here.
                String cp_name = getActions(group_expr, actions);
                // Record the name first (no-op if already present), then hand the string
                // over to the vector without an extra copy.
                map_grouping_col.try_emplace(cp_name, true);
                group_exprs_columns.emplace_back(std::move(cp_name));
            }
            group_set_columns.emplace_back(std::move(group_exprs_columns));
        }
        group_sets_columns.emplace_back(std::move(group_set_columns));
    }
    // Change the original source columns used in grouping sets to be nullable, and add a
    // new column for the grouping id.
    for (auto & mutable_one : source_columns)
    {
        // Use count() rather than operator[]: operator[] would insert a default entry for
        // every source column that is not a grouping column.
        if (map_grouping_col.count(mutable_one.name) != 0)
            mutable_one.type = makeNullable(mutable_one.type);
    }
    source_columns.emplace_back(Expand::grouping_identifier_column_name, Expand::grouping_identifier_column_type);
    return group_sets_columns;
}

/// Appends the expand action to the last step of `chain` and returns the actions that must
/// run to perform the expand.
///
/// All columns of the step's current sample block are kept as required output, the expand
/// action is added, and then the chain is finalized and cleared. A fresh step is started
/// whose required output is the current input columns, as reported by
/// getCurrentInputColumns() after buildExpandGroupingColumns() has run.
///
/// @return the actions of the step that contains the expand action.
ExpressionActionsPtr DAGExpressionAnalyzer::appendExpand(
    const tipb::Expand & expand,
    ExpressionActionsChain & chain)
{
    auto & expand_step = initAndGetLastStep(chain);

    // Keep every column that exists before the expand.
    const auto & columns_before_expand = expand_step.actions->getSampleBlock().getNamesAndTypesList();
    for (const auto & name_and_type : columns_before_expand)
        expand_step.required_output.push_back(name_and_type.name);

    auto expand_grouping_sets = buildExpandGroupingColumns(expand, expand_step.actions);
    expand_step.actions->add(ExpressionAction::expandSource(expand_grouping_sets));

    // Grab the actions before the chain is reset.
    auto expand_actions = chain.getLastActions();
    chain.finalize();
    chain.clear();

    // Start a new step that exposes the post-expand columns.
    auto & next_step = initAndGetLastStep(chain);
    for (const auto & input_column : getCurrentInputColumns())
    {
        next_step.required_output.push_back(input_column.name);
    }
    return expand_actions;
}

std::vector<NameAndTypePair> DAGExpressionAnalyzer::appendOrderBy(
ExpressionActionsChain & chain,
const tipb::TopN & topN)
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ class DAGExpressionAnalyzer : private boost::noncopyable
ExpressionActionsChain & chain,
const std::vector<const tipb::Expr *> & conditions);

GroupingSets buildExpandGroupingColumns(const tipb::Expand & expand, const ExpressionActionsPtr & actions);

ExpressionActionsPtr appendExpand(const tipb::Expand & expand, ExpressionActionsChain & chain);

NamesAndTypes buildWindowOrderColumns(const tipb::Sort & window_sort) const;

std::vector<NameAndTypePair> appendOrderBy(
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGQueryBlock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ bool isSourceNode(const tipb::Executor * root)
const static String SOURCE_NAME("source");
const static String SEL_NAME("selection");
const static String AGG_NAME("aggregation");
const static String EXPAND_NAME("expand");
const static String WINDOW_NAME("window");
const static String WINDOW_SORT_NAME("window_sort");
const static String HAVING_NAME("having");
Expand Down Expand Up @@ -96,6 +97,12 @@ DAGQueryBlock::DAGQueryBlock(const tipb::Executor & root_, QueryBlockIDGenerator
}
current = &current->selection().child();
break;
case tipb::ExecType::TypeExpand:
GET_METRIC(tiflash_coprocessor_executor_count, type_expand).Increment();
assignOrThrowException(&expand, current, EXPAND_NAME);
expand_name = current->executor_id();
current = &current->expand().child();
break;
case tipb::ExecType::TypeStreamAgg:
RUNTIME_CHECK_MSG(current->aggregation().group_by_size() == 0, STREAM_AGG_ERROR);
case tipb::ExecType::TypeAggregation:
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGQueryBlock.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ class DAGQueryBlock
String having_name;
const tipb::Executor * limit_or_topn = nullptr;
String limit_or_topn_name;
const tipb::Executor * expand = nullptr;
String expand_name;
const tipb::Executor * exchange_sender = nullptr;
String exchange_sender_name;
UInt32 id;
Expand Down
54 changes: 46 additions & 8 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <Core/NamesAndTypes.h>
#include <DataStreams/AggregatingBlockInputStream.h>
#include <DataStreams/ExchangeSenderBlockInputStream.h>
#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/FilterBlockInputStream.h>
#include <DataStreams/HashJoinBuildBlockInputStream.h>
#include <DataStreams/HashJoinProbeBlockInputStream.h>
Expand All @@ -43,6 +44,7 @@
#include <Flash/Coprocessor/StorageDisaggregatedInterpreter.h>
#include <Flash/Mpp/newMPPExchangeWriter.h>
#include <Interpreters/Aggregator.h>
#include <Interpreters/Expand.h>
#include <Interpreters/Join.h>
#include <Parsers/ASTSelectQuery.h>
#include <Storages/Transaction/TMTContext.h>
Expand Down Expand Up @@ -73,7 +75,10 @@ struct AnalysisResult
ExpressionActionsPtr before_where;
ExpressionActionsPtr before_aggregation;
ExpressionActionsPtr before_having;
ExpressionActionsPtr before_order_and_select;
// ExpressionActionsPtr before_order_and_select;
AilinKid marked this conversation as resolved.
Show resolved Hide resolved
ExpressionActionsPtr before_order;
ExpressionActionsPtr before_expand;
ExpressionActionsPtr before_select;
ExpressionActionsPtr final_projection;

String filter_column_name;
Expand Down Expand Up @@ -131,6 +136,14 @@ AnalysisResult analyzeExpressions(
if (query_block.limit_or_topn && query_block.limit_or_topn->tp() == tipb::ExecType::TypeTopN)
{
res.order_columns = analyzer.appendOrderBy(chain, query_block.limit_or_topn->topn());
res.before_order = chain.getLastActions();
chain.addStep();
}

if (query_block.expand)
{
res.before_expand = analyzer.appendExpand(query_block.expand->expand(), chain);
chain.addStep();
}

const auto & dag_context = *context.getDAGContext();
Expand All @@ -146,7 +159,7 @@ AnalysisResult analyzeExpressions(
chain,
query_block.qb_column_prefix);

res.before_order_and_select = chain.getLastActions();
res.before_select = chain.getLastActions();

chain.finalize();
chain.clear();
Expand Down Expand Up @@ -570,12 +583,12 @@ void DAGQueryBlockInterpreter::handleWindowOrder(DAGPipeline & pipeline, const t
}

// To execute a query block, you have to:
// 1. generate the date stream and push it to pipeline.
// 1. generate the data stream and push it to pipeline.
// 2. assign the analyzer
// 3. construct a final projection, even if it's not necessary. just construct it.
// Talking about projection, it has the following rules.
// 1. if the query block does not contain agg, then the final project is the same as the source Executor
// 2. if the query block contains agg, then the final project is the same as agg Executor
// 2. if the query block contains agg/expand, then the final project is the same as agg/expand Executor
// 3. if the cop task may contain more than 1 query block, and the current query block is not the root
// query block, then the project should add an alias for each column that needs to be projected, something
// like final_project.emplace_back(col.name, query_block.qb_column_prefix + col.name);
Expand Down Expand Up @@ -664,9 +677,9 @@ void DAGQueryBlockInterpreter::executeImpl(DAGPipeline & pipeline)
executeWhere(pipeline, res.before_having, res.having_column_name, "execute having");
recordProfileStreams(pipeline, query_block.having_name);
}
if (res.before_order_and_select)
if (res.before_order)
{
executeExpression(pipeline, res.before_order_and_select, log, "before order and select");
executeExpression(pipeline, res.before_order, log, "before order");
}

if (!res.order_columns.empty())
Expand All @@ -676,14 +689,30 @@ void DAGQueryBlockInterpreter::executeImpl(DAGPipeline & pipeline)
recordProfileStreams(pipeline, query_block.limit_or_topn_name);
}

// execute final project action
executeProject(pipeline, final_project, "final projection");
// execute limit
if (query_block.limit_or_topn && query_block.limit_or_topn->tp() == tipb::TypeLimit)
{
executeLimit(pipeline);
recordProfileStreams(pipeline, query_block.limit_or_topn_name);
}

// execute the expand OP after all filters/limits and so on.
// since the expand OP has row replication work to do, placing it after limit reduces unnecessary work.
// and put it before the final projection, because some base columns must be recognized as grouping set columns before their aliases are changed.
if (res.before_expand)
{
executeExpand(pipeline, res.before_expand);
recordProfileStreams(pipeline, query_block.expand_name);
}

if (res.before_select)
{
executeExpression(pipeline, res.before_select, log, "before select");
}

// execute final project action
executeProject(pipeline, final_project, "final projection");

restorePipelineConcurrency(pipeline);

// execute exchange_sender
Expand Down Expand Up @@ -724,6 +753,15 @@ void DAGQueryBlockInterpreter::executeLimit(DAGPipeline & pipeline)
}
}

/// Wraps every stream of `pipeline` in an ExpressionBlockInputStream that runs `expr`.
///
/// The last action of `expr` is read as the expand action; its grouping-set description is
/// attached to each new stream as extra info.
void DAGQueryBlockInterpreter::executeExpand(DAGPipeline & pipeline, const ExpressionActionsPtr & expr)
{
    const String expand_extra_info = fmt::format("expand: grouping set {}", expr->getActions().back().expand->getGroupingSetsDes());
    pipeline.transform([&](auto & stream) {
        auto expand_stream = std::make_shared<ExpressionBlockInputStream>(stream, expr, log->identifier());
        expand_stream->setExtraInfo(expand_extra_info);
        stream = expand_stream;
    });
}

void DAGQueryBlockInterpreter::handleExchangeSender(DAGPipeline & pipeline)
{
RUNTIME_ASSERT(dagContext().isMPPTask() && dagContext().tunnel_set != nullptr, log, "exchange_sender only run in MPP");
Expand Down
Loading