Skip to content

Commit

Permalink
Planner: Support common physical plans (#5143)
Browse files Browse the repository at this point in the history
ref #4739
  • Loading branch information
SeaRise committed Jul 8, 2022
1 parent aabe213 commit 07b1b20
Show file tree
Hide file tree
Showing 63 changed files with 2,922 additions and 647 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/license-checker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
steps:
- uses: actions/checkout@v2
- name: Check License Header
uses: apache/skywalking-eyes@main
uses: apache/skywalking-eyes@v0.3.0
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
Expand Down
13 changes: 3 additions & 10 deletions dbms/src/DataStreams/TiRemoteBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <Flash/Coprocessor/CHBlockChunkCodec.h>
#include <Flash/Coprocessor/CoprocessorReader.h>
#include <Flash/Coprocessor/DAGResponseWriter.h>
#include <Flash/Coprocessor/GenSchemaAndColumn.h>
#include <Flash/Mpp/ExchangeReceiver.h>
#include <Flash/Statistics/ConnectionProfileInfo.h>
#include <Interpreters/Context.h>
Expand Down Expand Up @@ -176,21 +177,13 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
, log(Logger::get(name, req_id, executor_id))
, total_rows(0)
{
// generate sample block
ColumnsWithTypeAndName columns;
for (auto & dag_col : remote_reader->getOutputSchema())
{
auto tp = getDataTypeByColumnInfoForComputingLayer(dag_col.second);
ColumnWithTypeAndName col(tp, dag_col.first);
columns.emplace_back(col);
}
for (size_t i = 0; i < source_num; i++)
for (size_t i = 0; i < source_num; ++i)
{
execution_summaries_inited[i].store(false);
}
execution_summaries.resize(source_num);
connection_profile_infos.resize(source_num);
sample_block = Block(columns);
sample_block = Block(getColumnWithTypeAndName(toNamesAndTypes(remote_reader->getOutputSchema())));
}

Block getHeader() const override { return sample_block; }
Expand Down
9 changes: 9 additions & 0 deletions dbms/src/Debug/astToExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ std::unordered_map<String, tipb::ScalarFuncSig> func_name_to_sig({
{"cast_decimal_datetime", tipb::ScalarFuncSig::CastDecimalAsTime},
{"cast_time_datetime", tipb::ScalarFuncSig::CastTimeAsTime},
{"cast_string_datetime", tipb::ScalarFuncSig::CastStringAsTime},
{"concat", tipb::ScalarFuncSig::Concat},
{"round_int", tipb::ScalarFuncSig::RoundInt},
{"round_uint", tipb::ScalarFuncSig::RoundInt},
{"round_dec", tipb::ScalarFuncSig::RoundDec},
Expand Down Expand Up @@ -455,6 +456,14 @@ void functionToPB(const DAGSchema & input, ASTFunction * func, tipb::Expr * expr
ft->set_collate(collator_id);
break;
}
case tipb::ScalarFuncSig::Concat:
{
expr->set_sig(it_sig->second);
auto * ft = expr->mutable_field_type();
ft->set_tp(TiDB::TypeString);
ft->set_collate(collator_id);
break;
}
case tipb::ScalarFuncSig::RoundInt:
case tipb::ScalarFuncSig::RoundWithFracInt:
{
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,6 @@ target_link_libraries(flash_service dbms)

if (ENABLE_TESTS)
add_subdirectory(Coprocessor/tests)
add_subdirectory(Planner/tests)
add_subdirectory(tests)
endif ()
5 changes: 5 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ std::unordered_map<String, BlockInputStreams> & DAGContext::getProfileStreamsMap
return profile_streams_map;
}

void DAGContext::updateFinalConcurrency(size_t cur_streams_size, size_t streams_upper_limit)
{
final_concurrency = std::min(std::max(final_concurrency, cur_streams_size), streams_upper_limit);
}

void DAGContext::initExecutorIdToJoinIdMap()
{
// only mpp task has join executor
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,8 @@ class DAGContext
return sql_mode & f;
}

void updateFinalConcurrency(size_t cur_streams_size, size_t streams_upper_limit);

bool isTest() const { return is_test; }
void setColumnsForTest(std::unordered_map<String, ColumnsWithTypeAndName> & columns_for_test_map_) { columns_for_test_map = columns_for_test_map_; }
ColumnsWithTypeAndName columnsForTest(String executor_id);
Expand Down
24 changes: 17 additions & 7 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1124,30 +1124,40 @@ std::pair<bool, BoolVec> DAGExpressionAnalyzer::isCastRequiredForRootFinalProjec
return std::make_pair(need_append_type_cast, std::move(need_append_type_cast_vec));
}

NamesWithAliases DAGExpressionAnalyzer::appendFinalProjectForRootQueryBlock(
ExpressionActionsChain & chain,
NamesWithAliases DAGExpressionAnalyzer::buildFinalProjection(
const ExpressionActionsPtr & actions,
const std::vector<tipb::FieldType> & schema,
const std::vector<Int32> & output_offsets,
const String & column_prefix,
bool keep_session_timezone_info)
{
if (unlikely(output_offsets.empty()))
throw Exception("Root Query block without output_offsets", ErrorCodes::LOGICAL_ERROR);
throw Exception("DAGRequest without output_offsets", ErrorCodes::LOGICAL_ERROR);

bool need_append_timezone_cast = !keep_session_timezone_info && !context.getTimezoneInfo().is_utc_timezone;
auto [need_append_type_cast, need_append_type_cast_vec] = isCastRequiredForRootFinalProjection(schema, output_offsets);
assert(need_append_type_cast_vec.size() == output_offsets.size());

auto & step = initAndGetLastStep(chain);

if (need_append_timezone_cast || need_append_type_cast)
{
// after appendCastForRootFinalProjection, source_columns has been modified.
appendCastForRootFinalProjection(step.actions, schema, output_offsets, need_append_timezone_cast, need_append_type_cast_vec);
appendCastForRootFinalProjection(actions, schema, output_offsets, need_append_timezone_cast, need_append_type_cast_vec);
}

// generate project aliases from source_columns.
NamesWithAliases final_project = genRootFinalProjectAliases(column_prefix, output_offsets);
return genRootFinalProjectAliases(column_prefix, output_offsets);
}

NamesWithAliases DAGExpressionAnalyzer::appendFinalProjectForRootQueryBlock(
ExpressionActionsChain & chain,
const std::vector<tipb::FieldType> & schema,
const std::vector<Int32> & output_offsets,
const String & column_prefix,
bool keep_session_timezone_info)
{
auto & step = initAndGetLastStep(chain);

NamesWithAliases final_project = buildFinalProjection(step.actions, schema, output_offsets, column_prefix, keep_session_timezone_info);

for (const auto & name : final_project)
{
Expand Down
63 changes: 38 additions & 25 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ class DAGExpressionAnalyzer : private boost::noncopyable

const Context & getContext() const { return context; }

void reset(const std::vector<NameAndTypePair> & source_columns_)
{
source_columns = source_columns_;
prepared_sets.clear();
}

const std::vector<NameAndTypePair> & getCurrentInputColumns() const;

DAGPreparedSets & getPreparedSets() { return prepared_sets; }
Expand Down Expand Up @@ -102,6 +108,8 @@ class DAGExpressionAnalyzer : private boost::noncopyable
ExpressionActionsChain & chain,
const String & column_prefix) const;

NamesWithAliases genNonRootFinalProjectAliases(const String & column_prefix) const;

// Generate a project action for root DAGQueryBlock,
// to keep the schema of Block and tidb-schema the same.
NamesWithAliases appendFinalProjectForRootQueryBlock(
Expand All @@ -111,6 +119,13 @@ class DAGExpressionAnalyzer : private boost::noncopyable
const String & column_prefix,
bool keep_session_timezone_info);

NamesWithAliases buildFinalProjection(
const ExpressionActionsPtr & actions,
const std::vector<tipb::FieldType> & schema,
const std::vector<Int32> & output_offsets,
const String & column_prefix,
bool keep_session_timezone_info);

String getActions(
const tipb::Expr & expr,
const ExpressionActionsPtr & actions,
Expand Down Expand Up @@ -153,17 +168,37 @@ class DAGExpressionAnalyzer : private boost::noncopyable
const tipb::Window & window,
const size_t window_columns_start_index);

#ifndef DBMS_PUBLIC_GTEST
private:
#endif
NamesAndTypes buildOrderColumns(
const ExpressionActionsPtr & actions,
const ::google::protobuf::RepeatedPtrField<tipb::ByItem> & order_by);

String buildFilterColumn(
const ExpressionActionsPtr & actions,
const std::vector<const tipb::Expr *> & conditions);

void buildAggFuncs(
const tipb::Aggregation & aggregation,
const ExpressionActionsPtr & actions,
AggregateDescriptions & aggregate_descriptions,
NamesAndTypes & aggregated_columns);

void buildAggGroupBy(
const google::protobuf::RepeatedPtrField<tipb::Expr> & group_by,
const ExpressionActionsPtr & actions,
AggregateDescriptions & aggregate_descriptions,
NamesAndTypes & aggregated_columns,
Names & aggregation_keys,
std::unordered_set<String> & agg_key_set,
bool group_by_collation_sensitive,
TiDB::TiDBCollators & collators);

void appendCastAfterAgg(
const ExpressionActionsPtr & actions,
const tipb::Aggregation & agg);

#ifndef DBMS_PUBLIC_GTEST
private:
#endif
String buildTupleFunctionForGroupConcat(
const tipb::Expr & expr,
SortDescription & sort_desc,
Expand All @@ -187,22 +222,6 @@ class DAGExpressionAnalyzer : private boost::noncopyable
NamesAndTypes & aggregated_columns,
bool empty_input_as_null);

void buildAggFuncs(
const tipb::Aggregation & aggregation,
const ExpressionActionsPtr & actions,
AggregateDescriptions & aggregate_descriptions,
NamesAndTypes & aggregated_columns);

void buildAggGroupBy(
const google::protobuf::RepeatedPtrField<tipb::Expr> & group_by,
const ExpressionActionsPtr & actions,
AggregateDescriptions & aggregate_descriptions,
NamesAndTypes & aggregated_columns,
Names & aggregation_keys,
std::unordered_set<String> & agg_key_set,
bool group_by_collation_sensitive,
TiDB::TiDBCollators & collators);

void fillArgumentDetail(
const ExpressionActionsPtr & actions,
const tipb::Expr & arg,
Expand Down Expand Up @@ -275,12 +294,6 @@ class DAGExpressionAnalyzer : private boost::noncopyable
const ExpressionActionsPtr & actions,
const String & column_name);

String buildFilterColumn(
const ExpressionActionsPtr & actions,
const std::vector<const tipb::Expr *> & conditions);

NamesWithAliases genNonRootFinalProjectAliases(const String & column_prefix) const;

NamesWithAliases genRootFinalProjectAliases(
const String & column_prefix,
const std::vector<Int32> & output_offsets) const;
Expand Down
Loading

0 comments on commit 07b1b20

Please sign in to comment.