Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refine SubqueryForSet #4623

Merged
merged 12 commits into from
Apr 18, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 14 additions & 47 deletions dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,43 +37,16 @@ namespace ErrorCodes
extern const int SET_SIZE_LIMIT_EXCEEDED;
}

CreatingSetsBlockInputStream::CreatingSetsBlockInputStream(
const BlockInputStreamPtr & input,
std::vector<SubqueriesForSets> && subqueries_for_sets_list_,
const SizeLimits & network_transfer_limits,
const String & req_id)
: subqueries_for_sets_list(std::move(subqueries_for_sets_list_))
, network_transfer_limits(network_transfer_limits)
, log(Logger::get(name, req_id))
{
init(input);
}

CreatingSetsBlockInputStream::CreatingSetsBlockInputStream(
const BlockInputStreamPtr & input,
const SubqueriesForSets & subqueries_for_sets,
const SizeLimits & network_transfer_limits,
const String & req_id)
: network_transfer_limits(network_transfer_limits)
, log(Logger::get(name, req_id))
{
subqueries_for_sets_list.push_back(subqueries_for_sets);
init(input);
}

void CreatingSetsBlockInputStream::init(const BlockInputStreamPtr & input)
{
for (auto & subqueries_for_sets : subqueries_for_sets_list)
for (auto & elem : subqueries_for_sets)
{
for (auto & elem : subqueries_for_sets)
if (elem.second.source)
{
if (elem.second.source)
{
children.push_back(elem.second.source);
children.push_back(elem.second.source);

if (elem.second.set)
elem.second.set->setHeader(elem.second.source->getHeader());
}
if (elem.second.set)
elem.second.set->setHeader(elem.second.source->getHeader());
}
}

Expand Down Expand Up @@ -115,27 +88,21 @@ void CreatingSetsBlockInputStream::createAll()
{
if (!created)
{
for (auto & subqueries_for_sets : subqueries_for_sets_list)
for (auto & elem : subqueries_for_sets)
{
for (auto & elem : subqueries_for_sets)
{
if (elem.second.join)
elem.second.join->setBuildTableState(Join::BuildTableState::WAITING);
}
if (elem.second.join)
elem.second.join->setBuildTableState(Join::BuildTableState::WAITING);
}
Stopwatch watch;
auto thread_manager = newThreadManager();
for (auto & subqueries_for_sets : subqueries_for_sets_list)
for (auto & elem : subqueries_for_sets)
{
for (auto & elem : subqueries_for_sets)
if (elem.second.source) /// There could be prepared in advance Set/Join - no source is specified for them.
{
if (elem.second.source) /// There could be prepared in advance Set/Join - no source is specified for them.
{
if (isCancelledOrThrowIfKilled())
return;
thread_manager->schedule(true, "CreatingSets", [this, &item = elem.second] { createOne(item); });
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_in_creating_set_input_stream);
}
if (isCancelledOrThrowIfKilled())
return;
thread_manager->schedule(true, "CreatingSets", [this, &item = elem.second] { createOne(item); });
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_in_creating_set_input_stream);
}
}

Expand Down
21 changes: 11 additions & 10 deletions dbms/src/DataStreams/CreatingSetsBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#include <Common/MemoryTracker.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Flash/Mpp/MPPTaskId.h>
#include <Interpreters/ExpressionAnalyzer.h> /// SubqueriesForSets
#include <Interpreters/SubqueryForSet.h>


namespace DB
Expand All @@ -29,17 +29,18 @@ namespace DB
class CreatingSetsBlockInputStream : public IProfilingBlockInputStream
{
public:
template <typename Subqueries>
CreatingSetsBlockInputStream(
const BlockInputStreamPtr & input,
const SubqueriesForSets & subqueries_for_sets_,
Subqueries && subqueries_for_sets_,
const SizeLimits & network_transfer_limits,
const String & req_id);

CreatingSetsBlockInputStream(
const BlockInputStreamPtr & input,
std::vector<SubqueriesForSets> && subqueries_for_sets_list_,
const SizeLimits & network_transfer_limits,
const String & req_id);
const String & req_id)
: subqueries_for_sets(std::forward<Subqueries>(subqueries_for_sets_))
, network_transfer_limits(network_transfer_limits)
, log(Logger::get(name, req_id))
{
init(input);
}

~CreatingSetsBlockInputStream() = default;

Expand Down Expand Up @@ -86,7 +87,7 @@ class CreatingSetsBlockInputStream : public IProfilingBlockInputStream
private:
void init(const BlockInputStreamPtr & input);

std::vector<SubqueriesForSets> subqueries_for_sets_list;
SubqueriesForSets subqueries_for_sets;
bool created = false;

SizeLimits network_transfer_limits;
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ bool DAGContext::allowInvalidDate() const
return sql_mode & TiDBSQLMode::ALLOW_INVALID_DATES;
}

void DAGContext::addSubqueryForSet(const String & subquery_id, SubqueryForSet && subquery)
{
if (subqueries_for_sets.find(subquery_id) != subqueries_for_sets.end())
throw TiFlashException(fmt::format("subquery_id [{}] duplicate", subquery_id), Errors::Coprocessor::Internal);
subqueries_for_sets[subquery_id] = std::move(subquery);
}

std::unordered_map<String, BlockInputStreams> & DAGContext::getProfileStreamsMap()
{
return profile_streams_map;
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <Flash/Coprocessor/DAGDriver.h>
#include <Flash/Coprocessor/TablesRegionsInfo.h>
#include <Flash/Mpp/MPPTaskId.h>
#include <Interpreters/SubqueryForSet.h>
#include <Storages/Transaction/TiDB.h>

namespace DB
Expand Down Expand Up @@ -271,6 +272,8 @@ class DAGContext
void initExchangeReceiverIfMPP(Context & context, size_t max_streams);
const std::unordered_map<String, std::shared_ptr<ExchangeReceiver>> & getMPPExchangeReceiverMap() const;

void addSubqueryForSet(const String & subquery_id, SubqueryForSet && subquery);

const tipb::DAGRequest * dag_request;
Int64 compile_time_ns = 0;
size_t final_concurrency = 1;
Expand All @@ -295,6 +298,9 @@ class DAGContext
std::vector<tipb::FieldType> result_field_types;
tipb::EncodeType encode_type = tipb::EncodeType::TypeDefault;

/// set of SubqueryForSet(such as join build subquery).
SubqueriesForSets subqueries_for_sets;

private:
/// Hold io for correcting the destruction order.
BlockIO io;
Expand Down
9 changes: 2 additions & 7 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,12 @@ DAGQueryBlockInterpreter::DAGQueryBlockInterpreter(
const std::vector<BlockInputStreams> & input_streams_vec_,
const DAGQueryBlock & query_block_,
size_t max_streams_,
bool keep_session_timezone_info_,
std::vector<SubqueriesForSets> & subqueries_for_sets_)
bool keep_session_timezone_info_)
: context(context_)
, input_streams_vec(input_streams_vec_)
, query_block(query_block_)
, keep_session_timezone_info(keep_session_timezone_info_)
, max_streams(max_streams_)
, subqueries_for_sets(subqueries_for_sets_)
, log(Logger::get("DAGQueryBlockInterpreter", dagContext().log ? dagContext().log->identifier() : ""))
{}

Expand Down Expand Up @@ -1025,10 +1023,7 @@ void DAGQueryBlockInterpreter::executeImpl(DAGPipeline & pipeline)
SubqueryForSet right_query;
handleJoin(query_block.source->join(), pipeline, right_query);
recordProfileStreams(pipeline, query_block.source_name);

SubqueriesForSets subquries;
subquries[query_block.source_name] = right_query;
subqueries_for_sets.emplace_back(subquries);
dagContext().addSubqueryForSet(query_block.source_name, std::move(right_query));
}
else if (query_block.source->tp() == tipb::ExecType::TypeExchangeReceiver)
{
Expand Down
5 changes: 1 addition & 4 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ class DAGQueryBlockInterpreter
const std::vector<BlockInputStreams> & input_streams_vec_,
const DAGQueryBlock & query_block_,
size_t max_streams_,
bool keep_session_timezone_info_,
std::vector<SubqueriesForSets> & subqueries_for_sets_);
bool keep_session_timezone_info_);

~DAGQueryBlockInterpreter() = default;

Expand Down Expand Up @@ -119,8 +118,6 @@ class DAGQueryBlockInterpreter

std::unique_ptr<DAGExpressionAnalyzer> analyzer;

std::vector<SubqueriesForSets> & subqueries_for_sets;

LoggerPtr log;
};
} // namespace DB
30 changes: 16 additions & 14 deletions dbms/src/Flash/Coprocessor/InterpreterDAG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@
// limitations under the License.

#include <DataStreams/CreatingSetsBlockInputStream.h>
#include <Flash/Coprocessor/DAGBlockOutputStream.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DAGQueryBlockInterpreter.h>
#include <Flash/Coprocessor/InterpreterDAG.h>
#include <Flash/Coprocessor/InterpreterUtils.h>
#include <Interpreters/Aggregator.h>
#include <Interpreters/Context.h>

namespace DB
{
Expand All @@ -35,24 +36,28 @@ InterpreterDAG::InterpreterDAG(Context & context_, const DAGQuerySource & dag_)
}
}

DAGContext & InterpreterDAG::dagContext() const
{
return *context.getDAGContext();
}

/** executeQueryBlock recursively converts all the children of the DAGQueryBlock and itself (Coprocessor DAG request)
* into an array of IBlockInputStream (element of physical executing plan of TiFlash)
*/
BlockInputStreams InterpreterDAG::executeQueryBlock(DAGQueryBlock & query_block, std::vector<SubqueriesForSets> & subqueries_for_sets)
BlockInputStreams InterpreterDAG::executeQueryBlock(DAGQueryBlock & query_block)
{
std::vector<BlockInputStreams> input_streams_vec;
for (auto & child : query_block.children)
{
BlockInputStreams child_streams = executeQueryBlock(*child, subqueries_for_sets);
BlockInputStreams child_streams = executeQueryBlock(*child);
input_streams_vec.push_back(child_streams);
}
DAGQueryBlockInterpreter query_block_interpreter(
context,
input_streams_vec,
query_block,
max_streams,
dagContext().keep_session_timezone_info || !query_block.isRootQueryBlock(),
subqueries_for_sets);
dagContext().keep_session_timezone_info || !query_block.isRootQueryBlock());
return query_block_interpreter.execute();
}

Expand All @@ -61,26 +66,23 @@ BlockIO InterpreterDAG::execute()
/// Due to learner read, DAGQueryBlockInterpreter may take a long time to build
/// the query plan, so we init mpp exchange receiver before executeQueryBlock
dagContext().initExchangeReceiverIfMPP(context, max_streams);
/// region_info should base on the source executor, however
/// tidb does not support multi-table dag request yet, so
/// it is ok to use the same region_info for the whole dag request
std::vector<SubqueriesForSets> subqueries_for_sets;
BlockInputStreams streams = executeQueryBlock(*dag.getRootQueryBlock(), subqueries_for_sets);

BlockInputStreams streams = executeQueryBlock(*dag.getRootQueryBlock());
DAGPipeline pipeline;
pipeline.streams = streams;

/// add union to run in parallel if needed
if (context.getDAGContext()->isMPPTask())
if (dagContext().isMPPTask())
ywqzzy marked this conversation as resolved.
Show resolved Hide resolved
/// MPPTask do not need the returned blocks.
executeUnion(pipeline, max_streams, dagContext().log, /*ignore_block=*/true);
else
executeUnion(pipeline, max_streams, dagContext().log);
if (!subqueries_for_sets.empty())
if (dagContext().subqueries_for_sets.empty())
SeaRise marked this conversation as resolved.
Show resolved Hide resolved
{
const Settings & settings = context.getSettingsRef();
pipeline.firstStream() = std::make_shared<CreatingSetsBlockInputStream>(
pipeline.firstStream(),
std::move(subqueries_for_sets),
std::move(dagContext().subqueries_for_sets),
SizeLimits(settings.max_rows_to_transfer, settings.max_bytes_to_transfer, settings.transfer_overflow_mode),
dagContext().log->identifier());
}
Expand Down
13 changes: 3 additions & 10 deletions dbms/src/Flash/Coprocessor/InterpreterDAG.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,9 @@
#pragma GCC diagnostic pop

#include <DataStreams/BlockIO.h>
#include <Flash/Coprocessor/DAGExpressionAnalyzer.h>
#include <Flash/Coprocessor/DAGQueryBlockInterpreter.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DAGQuerySource.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <Interpreters/AggregateDescription.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/IInterpreter.h>
#include <Storages/RegionQueryInfo.h>
#include <Storages/Transaction/Collator.h>
#include <Storages/Transaction/TMTStorages.h>

namespace DB
{
Expand All @@ -50,9 +43,9 @@ class InterpreterDAG : public IInterpreter
BlockIO execute() override;

private:
BlockInputStreams executeQueryBlock(DAGQueryBlock & query_block, std::vector<SubqueriesForSets> & subqueries_for_sets);
BlockInputStreams executeQueryBlock(DAGQueryBlock & query_block);

DAGContext & dagContext() const { return *context.getDAGContext(); }
DAGContext & dagContext() const;

Context & context;
const DAGQuerySource & dag;
Expand Down
27 changes: 1 addition & 26 deletions dbms/src/Interpreters/ExpressionAnalyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <Core/Block.h>
#include <Interpreters/AggregateDescription.h>
#include <Interpreters/Settings.h>
#include <Interpreters/SubqueryForSet.h>


namespace DB
Expand All @@ -26,19 +27,13 @@ class Context;
class ExpressionActions;
struct ExpressionActionsChain;

class Join;
using JoinPtr = std::shared_ptr<Join>;

class IAST;
using ASTPtr = std::shared_ptr<IAST>;

class Set;
using SetPtr = std::shared_ptr<Set>;
using PreparedSets = std::unordered_map<IAST *, SetPtr>;

class IBlockInputStream;
using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;

class IStorage;
using StoragePtr = std::shared_ptr<IStorage>;
using Tables = std::map<String, StoragePtr>;
Expand All @@ -48,26 +43,6 @@ class ASTExpressionList;
class ASTSelectQuery;


/** Information on what to do when executing a subquery in the [GLOBAL] IN/JOIN section.
*/
struct SubqueryForSet
{
/// The source is obtained using the InterpreterSelectQuery subquery.
BlockInputStreamPtr source;

/// If set, build it from result.
SetPtr set;
JoinPtr join;

/// If set, put the result into the table.
/// This is a temporary table for transferring to remote servers for distributed query processing.
StoragePtr table;
};

/// ID of subquery -> what to do with it.
using SubqueriesForSets = std::unordered_map<String, SubqueryForSet>;


/** Transforms an expression from a syntax tree into a sequence of actions to execute it.
*
* NOTE: if `ast` is a SELECT query from a table, the structure of this table should not change during the lifetime of ExpressionAnalyzer.
Expand Down
Loading