Skip to content

Commit

Permalink
move output_field_types and output_offsets to DAGContext (#4626)
Browse files Browse the repository at this point in the history
ref #4118
  • Loading branch information
SeaRise authored Apr 13, 2022
1 parent 65247ec commit e192ce5
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 38 deletions.
20 changes: 20 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

#include <DataStreams/IProfilingBlockInputStream.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <Flash/Coprocessor/collectOutputFieldTypes.h>
#include <Flash/Mpp/ExchangeReceiver.h>
#include <Flash/Statistics/traverseExecutors.h>
#include <Storages/Transaction/TMTContext.h>
Expand All @@ -33,6 +35,24 @@ bool strictSqlMode(UInt64 sql_mode)
return sql_mode & TiDBSQLMode::STRICT_ALL_TABLES || sql_mode & TiDBSQLMode::STRICT_TRANS_TABLES;
}

void DAGContext::initOutputInfo()
{
output_field_types = collectOutputFieldTypes(*dag_request);
output_offsets.clear();
result_field_types.clear();
for (UInt32 i : dag_request->output_offsets())
{
output_offsets.push_back(i);
if (unlikely(i >= output_field_types.size()))
throw TiFlashException(
fmt::format("{}: Invalid output offset(schema has {} columns, access index {}", __PRETTY_FUNCTION__, output_field_types.size(), i),
Errors::Coprocessor::BadRequest);
result_field_types.push_back(output_field_types[i]);
}
encode_type = analyzeDAGEncodeType(*this);
keep_session_timezone_info = encode_type == tipb::EncodeType::TypeChunk || encode_type == tipb::EncodeType::TypeCHBlock;
}

bool DAGContext::allowZeroInDate() const
{
return flags & TiDBSQLFlags::IGNORE_ZERO_IN_DATE;
Expand Down
22 changes: 19 additions & 3 deletions dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@

#include <Common/ConcurrentBoundedQueue.h>
#include <Common/Logger.h>
#include <DataStreams/BlockIO.h>
#include <DataStreams/IBlockInputStream.h>
#include <Flash/Coprocessor/DAGDriver.h>
#include <Flash/Coprocessor/TablesRegionsInfo.h>
#include <Flash/Mpp/MPPTaskId.h>
#include <Storages/Transaction/TiDB.h>
Expand Down Expand Up @@ -112,6 +112,7 @@ constexpr UInt64 ALLOW_INVALID_DATES = 1ul << 32ul;
class DAGContext
{
public:
// for non-mpp(cop/batchCop)
explicit DAGContext(const tipb::DAGRequest & dag_request_)
: dag_request(&dag_request_)
, collect_execution_summaries(dag_request->has_collect_execution_summaries() && dag_request->collect_execution_summaries())
Expand All @@ -126,8 +127,11 @@ class DAGContext
{
assert(dag_request->has_root_executor() || dag_request->executors_size() > 0);
return_executor_id = dag_request->root_executor().has_executor_id() || dag_request->executors(0).has_executor_id();

initOutputInfo();
}

// for mpp
DAGContext(const tipb::DAGRequest & dag_request_, const mpp::TaskMeta & meta_, bool is_root_mpp_task_)
: dag_request(&dag_request_)
, collect_execution_summaries(dag_request->has_collect_execution_summaries() && dag_request->collect_execution_summaries())
Expand All @@ -144,8 +148,13 @@ class DAGContext
, warning_count(0)
{
assert(dag_request->has_root_executor() && dag_request->root_executor().has_executor_id());

// only mpp task has join executor.
initExecutorIdToJoinIdMap();
initOutputInfo();
}

// for test
explicit DAGContext(UInt64 max_error_count_)
: dag_request(nullptr)
, collect_execution_summaries(false)
Expand All @@ -162,7 +171,6 @@ class DAGContext
void attachBlockIO(const BlockIO & io_);
std::unordered_map<String, BlockInputStreams> & getProfileStreamsMap();

void initExecutorIdToJoinIdMap();
std::unordered_map<String, std::vector<String>> & getExecutorIdToJoinIdMap();

std::unordered_map<String, JoinExecuteInfo> & getJoinExecuteInfoMap();
Expand Down Expand Up @@ -291,9 +299,17 @@ class DAGContext

LoggerPtr log;

bool keep_session_timezone_info = false;
// initialized in `initOutputInfo`.
std::vector<tipb::FieldType> result_field_types;
tipb::EncodeType encode_type = tipb::EncodeType::TypeDefault;
// only meaningful in final projection.
bool keep_session_timezone_info = false;
std::vector<tipb::FieldType> output_field_types;
std::vector<Int32> output_offsets;

private:
void initExecutorIdToJoinIdMap();
void initOutputInfo();

private:
/// Hold io for correcting the destruction order.
Expand Down
4 changes: 0 additions & 4 deletions dbms/src/Flash/Coprocessor/DAGQueryBlock.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,6 @@ class DAGQueryBlock
String qb_column_prefix;
std::vector<std::shared_ptr<DAGQueryBlock>> children;

// only meaningful for root query block.
std::vector<tipb::FieldType> output_field_types;
std::vector<Int32> output_offsets;

bool isRootQueryBlock() const { return id == 1; };
bool isTableScanSource() const { return source->tp() == tipb::ExecType::TypeTableScan || source->tp() == tipb::ExecType::TypePartitionTableScan; }
};
Expand Down
11 changes: 4 additions & 7 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,10 @@ DAGQueryBlockInterpreter::DAGQueryBlockInterpreter(
const std::vector<BlockInputStreams> & input_streams_vec_,
const DAGQueryBlock & query_block_,
size_t max_streams_,
bool keep_session_timezone_info_,
std::vector<SubqueriesForSets> & subqueries_for_sets_)
: context(context_)
, input_streams_vec(input_streams_vec_)
, query_block(query_block_)
, keep_session_timezone_info(keep_session_timezone_info_)
, max_streams(max_streams_)
, subqueries_for_sets(subqueries_for_sets_)
, log(Logger::get("DAGQueryBlockInterpreter", dagContext().log ? dagContext().log->identifier() : ""))
Expand Down Expand Up @@ -118,7 +116,6 @@ AnalysisResult analyzeExpressions(
Context & context,
DAGExpressionAnalyzer & analyzer,
const DAGQueryBlock & query_block,
bool keep_session_timezone_info,
NamesWithAliases & final_project)
{
AnalysisResult res;
Expand Down Expand Up @@ -174,14 +171,15 @@ AnalysisResult analyzeExpressions(
res.order_columns = analyzer.appendOrderBy(chain, query_block.limit_or_topn->topn());
}

const auto & dag_context = *context.getDAGContext();
// Append final project results if needed.
final_project = query_block.isRootQueryBlock()
? analyzer.appendFinalProjectForRootQueryBlock(
chain,
query_block.output_field_types,
query_block.output_offsets,
dag_context.output_field_types,
dag_context.output_offsets,
query_block.qb_column_prefix,
keep_session_timezone_info)
dag_context.keep_session_timezone_info)
: analyzer.appendFinalProjectForNonRootQueryBlock(
chain,
query_block.qb_column_prefix);
Expand Down Expand Up @@ -1057,7 +1055,6 @@ void DAGQueryBlockInterpreter::executeImpl(DAGPipeline & pipeline)
context,
*analyzer,
query_block,
keep_session_timezone_info,
final_project);

if (res.before_where)
Expand Down
2 changes: 0 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ class DAGQueryBlockInterpreter
const std::vector<BlockInputStreams> & input_streams_vec_,
const DAGQueryBlock & query_block_,
size_t max_streams_,
bool keep_session_timezone_info_,
std::vector<SubqueriesForSets> & subqueries_for_sets_);

~DAGQueryBlockInterpreter() = default;
Expand Down Expand Up @@ -110,7 +109,6 @@ class DAGQueryBlockInterpreter
Context & context;
std::vector<BlockInputStreams> input_streams_vec;
const DAGQueryBlock & query_block;
const bool keep_session_timezone_info;

NamesWithAliases final_project;

Expand Down
21 changes: 0 additions & 21 deletions dbms/src/Flash/Coprocessor/DAGQuerySource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,6 @@

namespace DB
{
namespace ErrorCodes
{
extern const int COP_BAD_DAG_REQUEST;
} // namespace ErrorCodes

DAGQuerySource::DAGQuerySource(Context & context_)
: context(context_)
{
Expand All @@ -38,22 +33,6 @@ DAGQuerySource::DAGQuerySource(Context & context_)
{
root_query_block = std::make_shared<DAGQueryBlock>(1, dag_request.executors());
}

root_query_block->output_field_types = collectOutputFieldTypes(dag_request);
getDAGContext().initExecutorIdToJoinIdMap();

for (UInt32 i : dag_request.output_offsets())
{
root_query_block->output_offsets.push_back(i);
if (unlikely(i >= root_query_block->output_field_types.size()))
throw TiFlashException(
fmt::format("{}: Invalid output offset(schema has {} columns, access index {}", __PRETTY_FUNCTION__, root_query_block->output_field_types.size(), i),
Errors::Coprocessor::BadRequest);
getDAGContext().result_field_types.push_back(root_query_block->output_field_types[i]);
}
auto encode_type = analyzeDAGEncodeType(getDAGContext());
getDAGContext().encode_type = encode_type;
getDAGContext().keep_session_timezone_info = encode_type == tipb::EncodeType::TypeChunk || encode_type == tipb::EncodeType::TypeCHBlock;
}

std::tuple<std::string, ASTPtr> DAGQuerySource::parse(size_t)
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Flash/Coprocessor/InterpreterDAG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ BlockInputStreams InterpreterDAG::executeQueryBlock(DAGQueryBlock & query_block,
input_streams_vec,
query_block,
max_streams,
dagContext().keep_session_timezone_info || !query_block.isRootQueryBlock(),
subqueries_for_sets);
return query_block_interpreter.execute();
}
Expand Down

0 comments on commit e192ce5

Please sign in to comment.