Skip to content

Commit

Permalink
Migrate table scan related code to DAGStorageInterpreter (#4783)
Browse files Browse the repository at this point in the history
ref #4118
  • Loading branch information
SeaRise authored May 5, 2022
1 parent 5aa1d14 commit e10c6ed
Show file tree
Hide file tree
Showing 12 changed files with 529 additions and 372 deletions.
298 changes: 4 additions & 294 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp

Large diffs are not rendered by default.

10 changes: 0 additions & 10 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,6 @@ class DAGQueryBlockInterpreter
#endif
void executeImpl(DAGPipeline & pipeline);
void handleTableScan(const TiDBTableScan & table_scan, DAGPipeline & pipeline);
void executeCastAfterTableScan(
const TiDBTableScan & table_scan,
const std::vector<ExtraCastAfterTSMode> & is_need_add_cast_column,
size_t remote_read_streams_start_index,
DAGPipeline & pipeline);
void executePushedDownFilter(const std::vector<const tipb::Expr *> & conditions, size_t remote_read_streams_start_index, DAGPipeline & pipeline);
void handleJoin(const tipb::Join & join, DAGPipeline & pipeline, SubqueryForSet & right_query);
void prepareJoin(
const google::protobuf::RepeatedPtrField<tipb::Expr> & keys,
Expand Down Expand Up @@ -108,10 +102,6 @@ class DAGQueryBlockInterpreter

void restorePipelineConcurrency(DAGPipeline & pipeline);

void executeRemoteQueryImpl(
DAGPipeline & pipeline,
std::vector<RemoteRequest> & remote_requests);

DAGContext & dagContext() const { return *context.getDAGContext(); }

Context & context;
Expand Down
379 changes: 339 additions & 40 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp

Large diffs are not rendered by default.

41 changes: 29 additions & 12 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
#include <Flash/Coprocessor/ChunkCodec.h>
#include <Flash/Coprocessor/DAGExpressionAnalyzer.h>
#include <Flash/Coprocessor/DAGPipeline.h>
#include <Flash/Coprocessor/DAGQueryBlock.h>
#include <Flash/Coprocessor/DAGQuerySource.h>
#include <Flash/Coprocessor/PushDownFilter.h>
#include <Flash/Coprocessor/RemoteRequest.h>
#include <Flash/Coprocessor/TiDBTableScan.h>
#include <Interpreters/Context.h>
Expand Down Expand Up @@ -50,9 +49,8 @@ class DAGStorageInterpreter
public:
DAGStorageInterpreter(
Context & context_,
const DAGQueryBlock & query_block_,
const TiDBTableScan & table_scan,
const std::vector<const tipb::Expr *> & conditions_,
const PushDownFilter & push_down_filter_,
size_t max_streams_);

DAGStorageInterpreter(DAGStorageInterpreter &&) = delete;
Expand All @@ -63,12 +61,6 @@ class DAGStorageInterpreter
/// Members will be transfered to DAGQueryBlockInterpreter after execute

std::unique_ptr<DAGExpressionAnalyzer> analyzer;
std::vector<ExtraCastAfterTSMode> is_need_add_cast_column;
/// it shouldn't be hash map because duplicated region id may occur if merge regions to retry of dag.
RegionRetryList region_retry_from_local_region;
TableLockHolders drop_locks;
std::vector<RemoteRequest> remote_requests;
BlockInputStreamPtr null_stream_if_empty;

private:
struct StorageWithStructureLock
Expand All @@ -92,12 +84,37 @@ class DAGStorageInterpreter

std::unordered_map<TableID, SelectQueryInfo> generateSelectQueryInfos();

DAGContext & dagContext() const;

void recordProfileStreams(DAGPipeline & pipeline, const String & key);

void executeRemoteQuery(DAGPipeline & pipeline);

void executeCastAfterTableScan(
size_t remote_read_streams_start_index,
DAGPipeline & pipeline);

void executePushedDownFilter(
size_t remote_read_streams_start_index,
DAGPipeline & pipeline);

void prepare();

void executeImpl(DAGPipeline & pipeline);

private:
std::vector<ExtraCastAfterTSMode> is_need_add_cast_column;
/// it shouldn't be hash map because duplicated region id may occur if merge regions to retry of dag.
RegionRetryList region_retry_from_local_region;
TableLockHolders drop_locks;
std::vector<RemoteRequest> remote_requests;
BlockInputStreamPtr null_stream_if_empty;

/// passed from caller, doesn't change during DAGStorageInterpreter's lifetime

Context & context;
const DAGQueryBlock & query_block;
const TiDBTableScan & table_scan;
const std::vector<const tipb::Expr *> & conditions;
const PushDownFilter & push_down_filter;
size_t max_streams;
LoggerPtr log;

Expand Down
14 changes: 14 additions & 0 deletions dbms/src/Flash/Coprocessor/InterpreterUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <DataStreams/SharedQueryBlockInputStream.h>
#include <DataStreams/UnionBlockInputStream.h>
#include <Flash/Coprocessor/InterpreterUtils.h>
#include <Interpreters/Context.h>

namespace DB
{
Expand Down Expand Up @@ -79,4 +80,17 @@ void executeUnion(
pipeline.streams.push_back(non_joined_data_stream);
}
}

ExpressionActionsPtr generateProjectExpressionActions(
const BlockInputStreamPtr & stream,
const Context & context,
const NamesWithAliases & project_cols)
{
NamesAndTypesList input_column;
for (const auto & column : stream->getHeader())
input_column.emplace_back(column.name, column.type);
ExpressionActionsPtr project = std::make_shared<ExpressionActions>(input_column, context.getSettingsRef());
project->add(ExpressionAction::project(project_cols));
return project;
}
} // namespace DB
8 changes: 8 additions & 0 deletions dbms/src/Flash/Coprocessor/InterpreterUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@

#include <Common/Logger.h>
#include <Flash/Coprocessor/DAGPipeline.h>
#include <Interpreters/ExpressionActions.h>

namespace DB
{
class Context;

void restoreConcurrency(
DAGPipeline & pipeline,
size_t concurrency,
Expand All @@ -35,4 +38,9 @@ void executeUnion(
size_t max_streams,
const LoggerPtr & log,
bool ignore_block = false);

ExpressionActionsPtr generateProjectExpressionActions(
const BlockInputStreamPtr & stream,
const Context & context,
const NamesWithAliases & project_cols);
} // namespace DB
65 changes: 65 additions & 0 deletions dbms/src/Flash/Coprocessor/PushDownFilter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/TiFlashException.h>
#include <Flash/Coprocessor/PushDownFilter.h>
#include <common/likely.h>

namespace DB
{
PushDownFilter::PushDownFilter(
const String & executor_id_,
const std::vector<const tipb::Expr *> & conditions_)
: executor_id(executor_id_)
, conditions(conditions_)
{
if (unlikely(conditions.empty() != executor_id.empty()))
{
throw TiFlashException(
"for PushDownFilter, conditions and executor_id should both be empty or neither should be empty",
Errors::Coprocessor::BadRequest);
}
}

tipb::Executor * PushDownFilter::constructSelectionForRemoteRead(tipb::Executor * mutable_executor) const
{
if (hasValue())
{
mutable_executor->set_tp(tipb::ExecType::TypeSelection);
mutable_executor->set_executor_id(executor_id);
auto * new_selection = mutable_executor->mutable_selection();
for (const auto & condition : conditions)
*new_selection->add_conditions() = *condition;
return new_selection->mutable_child();
}
else
{
return mutable_executor;
}
}

PushDownFilter PushDownFilter::toPushDownFilter(const tipb::Executor * filter_executor)
{
if (!filter_executor || !filter_executor->has_selection())
{
return {"", {}};
}

std::vector<const tipb::Expr *> conditions;
for (const auto & condition : filter_executor->selection().conditions())
conditions.push_back(&condition);

return {filter_executor->executor_id(), conditions};
}
} // namespace DB
39 changes: 39 additions & 0 deletions dbms/src/Flash/Coprocessor/PushDownFilter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <common/types.h>
#include <tipb/executor.pb.h>

#include <vector>

namespace DB
{
struct PushDownFilter
{
static PushDownFilter toPushDownFilter(const tipb::Executor * filter_executor);

PushDownFilter(
const String & executor_id_,
const std::vector<const tipb::Expr *> & conditions_);

bool hasValue() const { return !conditions.empty(); }

tipb::Executor * constructSelectionForRemoteRead(tipb::Executor * mutable_executor) const;

String executor_id;
std::vector<const tipb::Expr *> conditions;
};
} // namespace DB
19 changes: 8 additions & 11 deletions dbms/src/Flash/Coprocessor/RemoteRequest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,13 @@

namespace DB
{
RemoteRequest RemoteRequest::build(const RegionRetryList & retry_regions, DAGContext & dag_context, const TiDBTableScan & table_scan, const TiDB::TableInfo & table_info, const tipb::Executor * selection, LoggerPtr & log)
RemoteRequest RemoteRequest::build(
const RegionRetryList & retry_regions,
DAGContext & dag_context,
const TiDBTableScan & table_scan,
const TiDB::TableInfo & table_info,
const PushDownFilter & push_down_filter,
const LoggerPtr & log)
{
auto print_retry_regions = [&retry_regions, &table_info] {
FmtBuffer buffer;
Expand All @@ -35,16 +41,7 @@ RemoteRequest RemoteRequest::build(const RegionRetryList & retry_regions, DAGCon

DAGSchema schema;
tipb::DAGRequest dag_req;
auto * executor = dag_req.mutable_root_executor();
if (selection != nullptr)
{
executor->set_tp(tipb::ExecType::TypeSelection);
executor->set_executor_id(selection->executor_id());
auto * new_selection = executor->mutable_selection();
for (const auto & condition : selection->selection().conditions())
*new_selection->add_conditions() = condition;
executor = new_selection->mutable_child();
}
auto * executor = push_down_filter.constructSelectionForRemoteRead(dag_req.mutable_root_executor());

{
tipb::Executor * ts_exec = executor;
Expand Down
14 changes: 12 additions & 2 deletions dbms/src/Flash/Coprocessor/RemoteRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include <Flash/Coprocessor/PushDownFilter.h>
#include <Flash/Coprocessor/TiDBTableScan.h>
#include <Storages/Transaction/TiDB.h>
#include <pingcap/coprocessor/Client.h>
Expand All @@ -34,7 +35,10 @@ using DAGSchema = std::vector<DAGColumnInfo>;

struct RemoteRequest
{
RemoteRequest(tipb::DAGRequest && dag_request_, DAGSchema && schema_, std::vector<pingcap::coprocessor::KeyRange> && key_ranges_)
RemoteRequest(
tipb::DAGRequest && dag_request_,
DAGSchema && schema_,
std::vector<pingcap::coprocessor::KeyRange> && key_ranges_)
: dag_request(std::move(dag_request_))
, schema(std::move(schema_))
, key_ranges(std::move(key_ranges_))
Expand All @@ -43,6 +47,12 @@ struct RemoteRequest
DAGSchema schema;
/// the sorted key ranges
std::vector<pingcap::coprocessor::KeyRange> key_ranges;
static RemoteRequest build(const RegionRetryList & retry_regions, DAGContext & dag_context, const TiDBTableScan & table_scan, const TiDB::TableInfo & table_info, const tipb::Executor * selection, LoggerPtr & log);
static RemoteRequest build(
const RegionRetryList & retry_regions,
DAGContext & dag_context,
const TiDBTableScan & table_scan,
const TiDB::TableInfo & table_info,
const PushDownFilter & push_down_filter,
const LoggerPtr & log);
};
} // namespace DB
6 changes: 5 additions & 1 deletion dbms/src/Flash/Coprocessor/TiDBTableScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@

namespace DB
{
TiDBTableScan::TiDBTableScan(const tipb::Executor * table_scan_, const DAGContext & dag_context)
TiDBTableScan::TiDBTableScan(
const tipb::Executor * table_scan_,
const String & executor_id_,
const DAGContext & dag_context)
: table_scan(table_scan_)
, executor_id(executor_id_)
, is_partition_table_scan(table_scan->tp() == tipb::TypePartitionTableScan)
, columns(is_partition_table_scan ? table_scan->partition_table_scan().columns() : table_scan->tbl_scan().columns())
{
Expand Down
8 changes: 6 additions & 2 deletions dbms/src/Flash/Coprocessor/TiDBTableScan.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ namespace DB
class TiDBTableScan
{
public:
TiDBTableScan(const tipb::Executor * table_scan_, const DAGContext & dag_context);
TiDBTableScan(
const tipb::Executor * table_scan_,
const String & executor_id_,
const DAGContext & dag_context);
bool isPartitionTableScan() const
{
return is_partition_table_scan;
Expand All @@ -48,11 +51,12 @@ class TiDBTableScan
}
String getTableScanExecutorID() const
{
return table_scan->executor_id();
return executor_id;
}

private:
const tipb::Executor * table_scan;
String executor_id;
bool is_partition_table_scan;
const google::protobuf::RepeatedPtrField<tipb::ColumnInfo> & columns;
/// logical_table_id is the table id for a TiDB' table, while if the
Expand Down

0 comments on commit e10c6ed

Please sign in to comment.