move output_field_types and output_offsets to DAGContext #4626

Merged
merged 6 commits into from
Apr 13, 2022
20 changes: 20 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.cpp
@@ -14,6 +14,8 @@

#include <DataStreams/IProfilingBlockInputStream.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <Flash/Coprocessor/collectOutputFieldTypes.h>
#include <Flash/Mpp/ExchangeReceiver.h>
#include <Flash/Statistics/traverseExecutors.h>
#include <Storages/Transaction/TMTContext.h>
@@ -33,6 +35,24 @@ bool strictSqlMode(UInt64 sql_mode)
return sql_mode & TiDBSQLMode::STRICT_ALL_TABLES || sql_mode & TiDBSQLMode::STRICT_TRANS_TABLES;
}

void DAGContext::initOutputInfo()
{
output_field_types = collectOutputFieldTypes(*dag_request);
output_offsets.clear();
result_field_types.clear();
for (UInt32 i : dag_request->output_offsets())
{
output_offsets.push_back(i);
if (unlikely(i >= output_field_types.size()))
throw TiFlashException(
fmt::format("{}: Invalid output offset(schema has {} columns, access index {}", __PRETTY_FUNCTION__, output_field_types.size(), i),
Errors::Coprocessor::BadRequest);
result_field_types.push_back(output_field_types[i]);
}
encode_type = analyzeDAGEncodeType(*this);
keep_session_timezone_info = encode_type == tipb::EncodeType::TypeChunk || encode_type == tipb::EncodeType::TypeCHBlock;
}

bool DAGContext::allowZeroInDate() const
{
return flags & TiDBSQLFlags::IGNORE_ZERO_IN_DATE;
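The new DAGContext::initOutputInfo() above maps the request's output offsets onto the collected output schema, bounds-checks each offset, and derives result_field_types, encode_type, and keep_session_timezone_info in one place. The standalone sketch below reproduces only the offset-to-field-type mapping with stand-in types; FieldType here is a plain string rather than the real tipb::FieldType, buildResultFieldTypes is an illustrative helper rather than TiFlash code, and a standard exception stands in for TiFlashException.

#include <cstdint>
#include <stdexcept>
#include <string>
#include <vector>

// Simplified stand-in for tipb::FieldType; the real code carries protobuf types.
using FieldType = std::string;

// Mirrors the shape of DAGContext::initOutputInfo(): every requested output
// offset must index into the collected output schema, and the selected field
// types become the result schema.
std::vector<FieldType> buildResultFieldTypes(
    const std::vector<FieldType> & output_field_types,
    const std::vector<std::uint32_t> & requested_offsets)
{
    std::vector<FieldType> result_field_types;
    result_field_types.reserve(requested_offsets.size());
    for (std::uint32_t i : requested_offsets)
    {
        // The real code throws TiFlashException(Errors::Coprocessor::BadRequest).
        if (i >= output_field_types.size())
            throw std::out_of_range(
                "Invalid output offset (schema has " + std::to_string(output_field_types.size())
                + " columns, access index " + std::to_string(i) + ")");
        result_field_types.push_back(output_field_types[i]);
    }
    return result_field_types;
}

int main()
{
    const std::vector<FieldType> schema{"Int64", "String", "Decimal"};
    const auto result = buildResultFieldTypes(schema, {2, 0});
    // result == {"Decimal", "Int64"}
    return (result.size() == 2 && result[0] == "Decimal") ? 0 : 1;
}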
22 changes: 19 additions & 3 deletions dbms/src/Flash/Coprocessor/DAGContext.h
@@ -25,8 +25,8 @@

#include <Common/ConcurrentBoundedQueue.h>
#include <Common/Logger.h>
#include <DataStreams/BlockIO.h>
#include <DataStreams/IBlockInputStream.h>
#include <Flash/Coprocessor/DAGDriver.h>
#include <Flash/Coprocessor/TablesRegionsInfo.h>
#include <Flash/Mpp/MPPTaskId.h>
#include <Storages/Transaction/TiDB.h>
@@ -112,6 +112,7 @@ constexpr UInt64 ALLOW_INVALID_DATES = 1ul << 32ul;
class DAGContext
{
public:
// for non-mpp(cop/batchCop)
explicit DAGContext(const tipb::DAGRequest & dag_request_)
: dag_request(&dag_request_)
, collect_execution_summaries(dag_request->has_collect_execution_summaries() && dag_request->collect_execution_summaries())
@@ -126,8 +127,11 @@ class DAGContext
{
assert(dag_request->has_root_executor() || dag_request->executors_size() > 0);
return_executor_id = dag_request->root_executor().has_executor_id() || dag_request->executors(0).has_executor_id();

initOutputInfo();
}

// for mpp
DAGContext(const tipb::DAGRequest & dag_request_, const mpp::TaskMeta & meta_, bool is_root_mpp_task_)
: dag_request(&dag_request_)
, collect_execution_summaries(dag_request->has_collect_execution_summaries() && dag_request->collect_execution_summaries())
@@ -144,8 +148,13 @@
, warning_count(0)
{
assert(dag_request->has_root_executor() && dag_request->root_executor().has_executor_id());

// only mpp task has join executor.
initExecutorIdToJoinIdMap();
initOutputInfo();
}

// for test
explicit DAGContext(UInt64 max_error_count_)
: dag_request(nullptr)
, collect_execution_summaries(false)
@@ -162,7 +171,6 @@
void attachBlockIO(const BlockIO & io_);
std::unordered_map<String, BlockInputStreams> & getProfileStreamsMap();

void initExecutorIdToJoinIdMap();
std::unordered_map<String, std::vector<String>> & getExecutorIdToJoinIdMap();

std::unordered_map<String, JoinExecuteInfo> & getJoinExecuteInfoMap();
@@ -291,9 +299,17 @@ class DAGContext

LoggerPtr log;

bool keep_session_timezone_info = false;
// initialized in `initOutputInfo`.
std::vector<tipb::FieldType> result_field_types;
tipb::EncodeType encode_type = tipb::EncodeType::TypeDefault;
// only meaningful in final projection.
bool keep_session_timezone_info = false;
std::vector<tipb::FieldType> output_field_types;
std::vector<Int32> output_offsets;

private:
void initExecutorIdToJoinIdMap();
void initOutputInfo();

private:
/// Hold io for correcting the destruction order.
4 changes: 0 additions & 4 deletions dbms/src/Flash/Coprocessor/DAGQueryBlock.h
@@ -68,10 +68,6 @@ class DAGQueryBlock
String qb_column_prefix;
std::vector<std::shared_ptr<DAGQueryBlock>> children;

// only meaningful for root query block.
std::vector<tipb::FieldType> output_field_types;
std::vector<Int32> output_offsets;

bool isRootQueryBlock() const { return id == 1; };
bool isTableScanSource() const { return source->tp() == tipb::ExecType::TypeTableScan || source->tp() == tipb::ExecType::TypePartitionTableScan; }
};
11 changes: 4 additions & 7 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
@@ -57,12 +57,10 @@ DAGQueryBlockInterpreter::DAGQueryBlockInterpreter(
const std::vector<BlockInputStreams> & input_streams_vec_,
const DAGQueryBlock & query_block_,
size_t max_streams_,
bool keep_session_timezone_info_,
std::vector<SubqueriesForSets> & subqueries_for_sets_)
: context(context_)
, input_streams_vec(input_streams_vec_)
, query_block(query_block_)
, keep_session_timezone_info(keep_session_timezone_info_)
, max_streams(max_streams_)
, subqueries_for_sets(subqueries_for_sets_)
, log(Logger::get("DAGQueryBlockInterpreter", dagContext().log ? dagContext().log->identifier() : ""))
@@ -118,7 +116,6 @@ AnalysisResult analyzeExpressions(
Context & context,
DAGExpressionAnalyzer & analyzer,
const DAGQueryBlock & query_block,
bool keep_session_timezone_info,
NamesWithAliases & final_project)
{
AnalysisResult res;
@@ -174,14 +171,15 @@
res.order_columns = analyzer.appendOrderBy(chain, query_block.limit_or_topn->topn());
}

const auto & dag_context = *context.getDAGContext();
// Append final project results if needed.
final_project = query_block.isRootQueryBlock()
? analyzer.appendFinalProjectForRootQueryBlock(
chain,
query_block.output_field_types,
query_block.output_offsets,
dag_context.output_field_types,
dag_context.output_offsets,
query_block.qb_column_prefix,
keep_session_timezone_info)
dag_context.keep_session_timezone_info)
: analyzer.appendFinalProjectForNonRootQueryBlock(
chain,
query_block.qb_column_prefix);
@@ -1057,7 +1055,6 @@ void DAGQueryBlockInterpreter::executeImpl(DAGPipeline & pipeline)
context,
*analyzer,
query_block,
keep_session_timezone_info,
final_project);

if (res.before_where)
2 changes: 0 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h
@@ -48,7 +48,6 @@ class DAGQueryBlockInterpreter
const std::vector<BlockInputStreams> & input_streams_vec_,
const DAGQueryBlock & query_block_,
size_t max_streams_,
bool keep_session_timezone_info_,
std::vector<SubqueriesForSets> & subqueries_for_sets_);

~DAGQueryBlockInterpreter() = default;
@@ -110,7 +109,6 @@
Context & context;
std::vector<BlockInputStreams> input_streams_vec;
const DAGQueryBlock & query_block;
const bool keep_session_timezone_info;

NamesWithAliases final_project;

21 changes: 0 additions & 21 deletions dbms/src/Flash/Coprocessor/DAGQuerySource.cpp
@@ -20,11 +20,6 @@

namespace DB
{
namespace ErrorCodes
{
extern const int COP_BAD_DAG_REQUEST;
} // namespace ErrorCodes

DAGQuerySource::DAGQuerySource(Context & context_)
: context(context_)
{
@@ -38,22 +33,6 @@ DAGQuerySource::DAGQuerySource(Context & context_)
{
root_query_block = std::make_shared<DAGQueryBlock>(1, dag_request.executors());
}

root_query_block->output_field_types = collectOutputFieldTypes(dag_request);
getDAGContext().initExecutorIdToJoinIdMap();

for (UInt32 i : dag_request.output_offsets())
{
root_query_block->output_offsets.push_back(i);
if (unlikely(i >= root_query_block->output_field_types.size()))
throw TiFlashException(
fmt::format("{}: Invalid output offset(schema has {} columns, access index {}", __PRETTY_FUNCTION__, root_query_block->output_field_types.size(), i),
Errors::Coprocessor::BadRequest);
getDAGContext().result_field_types.push_back(root_query_block->output_field_types[i]);
}
auto encode_type = analyzeDAGEncodeType(getDAGContext());
getDAGContext().encode_type = encode_type;
getDAGContext().keep_session_timezone_info = encode_type == tipb::EncodeType::TypeChunk || encode_type == tipb::EncodeType::TypeCHBlock;
}

std::tuple<std::string, ASTPtr> DAGQuerySource::parse(size_t)
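With the hunk above, DAGQuerySource no longer assembles output_field_types, output_offsets, result_field_types, or the encode settings; DAGContext now derives them in its own constructors via initOutputInfo(), so the cop, batchCop, and MPP paths all see the same output info. The sketch below only illustrates that ownership move with hypothetical stripped-down names (Request, Context, and QuerySource here are not the TiFlash classes).

#include <string>
#include <vector>

struct Request
{
    std::vector<std::string> schema;  // stands in for collectOutputFieldTypes(...)
    std::vector<unsigned> offsets;    // stands in for dag_request.output_offsets()
};

struct Context
{
    std::vector<std::string> result_schema;

    // The context derives its own output info at construction time,
    // which is the work the query-source constructor used to do.
    explicit Context(const Request & req)
    {
        for (unsigned i : req.offsets)
            result_schema.push_back(req.schema.at(i)); // .at() keeps the bounds check
    }
};

struct QuerySource
{
    // After the change the source only holds a reference; no schema work here.
    explicit QuerySource(const Context & context_)
        : context(context_)
    {}

    const Context & context;
};

int main()
{
    const Request req{{"Int64", "String"}, {1, 0}};
    const Context ctx(req);
    const QuerySource source(ctx);
    return source.context.result_schema.front() == "String" ? 0 : 1;
}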
1 change: 0 additions & 1 deletion dbms/src/Flash/Coprocessor/InterpreterDAG.cpp
@@ -51,7 +51,6 @@ BlockInputStreams InterpreterDAG::executeQueryBlock(DAGQueryBlock & query_block,
input_streams_vec,
query_block,
max_streams,
dagContext().keep_session_timezone_info || !query_block.isRootQueryBlock(),
subqueries_for_sets);
return query_block_interpreter.execute();
}