Skip to content

Commit

Permalink
support fine grained shuffle for window function (#5048)
Browse files Browse the repository at this point in the history
close #5142
  • Loading branch information
guo-shaoge authored Jul 11, 2022
1 parent b62dc6a commit 649919d
Show file tree
Hide file tree
Showing 28 changed files with 1,196 additions and 365 deletions.
2 changes: 1 addition & 1 deletion contrib/tipb
10 changes: 8 additions & 2 deletions dbms/src/DataStreams/TiRemoteBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream

uint64_t total_rows;

// For fine grained shuffle, sender will partition data into muiltiple streams by hashing.
// ExchangeReceiverBlockInputStream only need to read its own stream, i.e., streams[stream_id].
// CoprocessorBlockInputStream doesn't take care of this.
size_t stream_id;

void initRemoteExecutionSummaries(tipb::SelectResponse & resp, size_t index)
{
for (const auto & execution_summary : resp.execution_summaries())
Expand Down Expand Up @@ -120,7 +125,7 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream

bool fetchRemoteResult()
{
auto result = remote_reader->nextResult(block_queue, sample_block);
auto result = remote_reader->nextResult(block_queue, sample_block, stream_id);
if (result.meet_error)
{
LOG_FMT_WARNING(log, "remote reader meets error: {}", result.error_msg);
Expand Down Expand Up @@ -168,13 +173,14 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream
}

public:
TiRemoteBlockInputStream(std::shared_ptr<RemoteReader> remote_reader_, const String & req_id, const String & executor_id)
TiRemoteBlockInputStream(std::shared_ptr<RemoteReader> remote_reader_, const String & req_id, const String & executor_id, size_t stream_id_)
: remote_reader(remote_reader_)
, source_num(remote_reader->getSourceNum())
, name(fmt::format("TiRemoteBlockInputStream({})", RemoteReader::name))
, execution_summaries_inited(source_num)
, log(Logger::get(name, req_id, executor_id))
, total_rows(0)
, stream_id(stream_id_)
{
// generate sample block
ColumnsWithTypeAndName columns;
Expand Down
18 changes: 11 additions & 7 deletions dbms/src/Debug/astToExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -851,6 +851,7 @@ bool ExchangeReceiver::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t c
{
tipb_executor->set_tp(tipb::ExecType::TypeExchangeReceiver);
tipb_executor->set_executor_id(name);
tipb_executor->set_fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count);
tipb::ExchangeReceiver * exchange_receiver = tipb_executor->mutable_exchange_receiver();
for (auto & field : output_schema)
{
Expand Down Expand Up @@ -1354,6 +1355,7 @@ bool Window::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id
{
tipb_executor->set_tp(tipb::ExecType::TypeWindow);
tipb_executor->set_executor_id(name);
tipb_executor->set_fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count);
tipb::Window * window = tipb_executor->mutable_window();
auto & input_schema = children[0]->output_schema;
for (const auto & expr : func_descs)
Expand Down Expand Up @@ -1430,6 +1432,7 @@ bool Sort::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id,
{
tipb_executor->set_tp(tipb::ExecType::TypeSort);
tipb_executor->set_executor_id(name);
tipb_executor->set_fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count);
tipb::Sort * sort = tipb_executor->mutable_sort();
sort->set_ispartialsort(is_partial_sort);

Expand Down Expand Up @@ -1665,13 +1668,13 @@ ExecutorPtr compileExchangeSender(ExecutorPtr input, size_t & executor_index, ti
return exchange_sender;
}

ExecutorPtr compileExchangeReceiver(size_t & executor_index, DAGSchema schema)
ExecutorPtr compileExchangeReceiver(size_t & executor_index, DAGSchema schema, uint64_t fine_grained_shuffle_stream_count)
{
ExecutorPtr exchange_receiver = std::make_shared<mock::ExchangeReceiver>(executor_index, schema);
ExecutorPtr exchange_receiver = std::make_shared<mock::ExchangeReceiver>(executor_index, schema, fine_grained_shuffle_stream_count);
return exchange_receiver;
}

ExecutorPtr compileWindow(ExecutorPtr input, size_t & executor_index, ASTPtr func_desc_list, ASTPtr partition_by_expr_list, ASTPtr order_by_expr_list, mock::MockWindowFrame frame)
ExecutorPtr compileWindow(ExecutorPtr input, size_t & executor_index, ASTPtr func_desc_list, ASTPtr partition_by_expr_list, ASTPtr order_by_expr_list, mock::MockWindowFrame frame, uint64_t fine_grained_shuffle_stream_count)
{
std::vector<ASTPtr> partition_columns;
if (partition_by_expr_list != nullptr)
Expand Down Expand Up @@ -1739,12 +1742,13 @@ ExecutorPtr compileWindow(ExecutorPtr input, size_t & executor_index, ASTPtr fun
window_exprs,
std::move(partition_columns),
std::move(order_columns),
frame);
frame,
fine_grained_shuffle_stream_count);
window->children.push_back(input);
return window;
}

ExecutorPtr compileSort(ExecutorPtr input, size_t & executor_index, ASTPtr order_by_expr_list, bool is_partial_sort)
ExecutorPtr compileSort(ExecutorPtr input, size_t & executor_index, ASTPtr order_by_expr_list, bool is_partial_sort, uint64_t fine_grained_shuffle_stream_count)
{
std::vector<ASTPtr> order_columns;
if (order_by_expr_list != nullptr)
Expand All @@ -1758,8 +1762,8 @@ ExecutorPtr compileSort(ExecutorPtr input, size_t & executor_index, ASTPtr order
compileExpr(input->output_schema, elem->children[0]);
}
}
ExecutorPtr sort = std::make_shared<mock::Sort>(executor_index, input->output_schema, std::move(order_columns), is_partial_sort);
ExecutorPtr sort = std::make_shared<mock::Sort>(executor_index, input->output_schema, std::move(order_columns), is_partial_sort, fine_grained_shuffle_stream_count);
sort->children.push_back(input);
return sort;
}
} // namespace DB
} // namespace DB
19 changes: 13 additions & 6 deletions dbms/src/Debug/astToExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,11 @@ struct ExchangeSender : Executor
struct ExchangeReceiver : Executor
{
TaskMetas task_metas;
ExchangeReceiver(size_t & index, const DAGSchema & output)
uint64_t fine_grained_shuffle_stream_count;

ExchangeReceiver(size_t & index, const DAGSchema & output, uint64_t fine_grained_shuffle_stream_count_ = 0)
: Executor(index, "exchange_receiver_" + std::to_string(index), output)
, fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count_)
{}
void columnPrune(std::unordered_set<String> &) override { throw Exception("Should not reach here"); }
bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context &) override;
Expand Down Expand Up @@ -292,13 +295,15 @@ struct Window : Executor
std::vector<ASTPtr> partition_by_exprs;
std::vector<ASTPtr> order_by_exprs;
MockWindowFrame frame;
uint64_t fine_grained_shuffle_stream_count;

Window(size_t & index_, const DAGSchema & output_schema_, std::vector<ASTPtr> func_descs_, std::vector<ASTPtr> partition_by_exprs_, std::vector<ASTPtr> order_by_exprs_, MockWindowFrame frame_)
Window(size_t & index_, const DAGSchema & output_schema_, std::vector<ASTPtr> func_descs_, std::vector<ASTPtr> partition_by_exprs_, std::vector<ASTPtr> order_by_exprs_, MockWindowFrame frame_, uint64_t fine_grained_shuffle_stream_count_ = 0)
: Executor(index_, "window_" + std::to_string(index_), output_schema_)
, func_descs(std::move(func_descs_))
, partition_by_exprs(std::move(partition_by_exprs_))
, order_by_exprs(order_by_exprs_)
, frame(frame_)
, fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count_)
{
}
// Currently only use Window Executor in Unit Test which don't call columnPrume.
Expand All @@ -311,11 +316,13 @@ struct Sort : Executor
{
std::vector<ASTPtr> by_exprs;
bool is_partial_sort;
uint64_t fine_grained_shuffle_stream_count;

Sort(size_t & index_, const DAGSchema & output_schema_, std::vector<ASTPtr> by_exprs_, bool is_partial_sort_)
Sort(size_t & index_, const DAGSchema & output_schema_, std::vector<ASTPtr> by_exprs_, bool is_partial_sort_, uint64_t fine_grained_shuffle_stream_count_ = 0)
: Executor(index_, "sort_" + std::to_string(index_), output_schema_)
, by_exprs(by_exprs_)
, is_partial_sort(is_partial_sort_)
, fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count_)
{
}
// Currently only use Sort Executor in Unit Test which don't call columnPrume.
Expand Down Expand Up @@ -343,11 +350,11 @@ ExecutorPtr compileJoin(size_t & executor_index, ExecutorPtr left, ExecutorPtr r

ExecutorPtr compileExchangeSender(ExecutorPtr input, size_t & executor_index, tipb::ExchangeType exchange_type);

ExecutorPtr compileExchangeReceiver(size_t & executor_index, DAGSchema schema);
ExecutorPtr compileExchangeReceiver(size_t & executor_index, DAGSchema schema, uint64_t fine_grained_shuffle_stream_count = 0);

ExecutorPtr compileWindow(ExecutorPtr input, size_t & executor_index, ASTPtr func_desc_list, ASTPtr partition_by_expr_list, ASTPtr order_by_expr_list, mock::MockWindowFrame frame);
ExecutorPtr compileWindow(ExecutorPtr input, size_t & executor_index, ASTPtr func_desc_list, ASTPtr partition_by_expr_list, ASTPtr order_by_expr_list, mock::MockWindowFrame frame, uint64_t fine_grained_shuffle_stream_count = 0);

ExecutorPtr compileSort(ExecutorPtr input, size_t & executor_index, ASTPtr order_by_expr_list, bool is_partial_sort);
ExecutorPtr compileSort(ExecutorPtr input, size_t & executor_index, ASTPtr order_by_expr_list, bool is_partial_sort, uint64_t fine_grained_shuffle_stream_count = 0);

void literalFieldToTiPBExpr(const ColumnInfo & ci, const Field & field, tipb::Expr * expr, Int32 collator_id);
} // namespace DB
5 changes: 3 additions & 2 deletions dbms/src/Debug/dbgFuncCoprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,9 @@ BlockInputStreamPtr executeQuery(Context & context, RegionID region_id, const DA
tipb_exchange_receiver.encoded_task_meta_size(),
10,
/*req_id=*/"",
/*executor_id=*/"");
BlockInputStreamPtr ret = std::make_shared<ExchangeReceiverInputStream>(exchange_receiver, /*req_id=*/"", /*executor_id=*/"");
/*executor_id=*/"",
/*fine_grained_shuffle_stream_count=*/0);
BlockInputStreamPtr ret = std::make_shared<ExchangeReceiverInputStream>(exchange_receiver, /*req_id=*/"", /*executor_id=*/"", /*stream_id*/ 0);
return ret;
}
else
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Flash/Coprocessor/CoprocessorReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ class CoprocessorReader
return detail;
}

CoprocessorReaderResult nextResult(std::queue<Block> & block_queue, const Block & header)
// stream_id is only meaningful for ExchagneReceiver.
CoprocessorReaderResult nextResult(std::queue<Block> & block_queue, const Block & header, size_t /*stream_id*/)
{
auto && [result, has_next] = resp_iter.next();
if (!result.error.empty())
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ constexpr UInt64 NO_ENGINE_SUBSTITUTION = 1ul << 30ul;
constexpr UInt64 ALLOW_INVALID_DATES = 1ul << 32ul;
} // namespace TiDBSQLMode

inline bool enableFineGrainedShuffle(uint64_t stream_count)
{
return stream_count > 0;
}

/// A context used to track the information that needs to be passed around during DAG planning.
class DAGContext
{
Expand Down
7 changes: 5 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGDriver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ DAGDriver<true>::DAGDriver(
::grpc::ServerWriter<::coprocessor::BatchResponse> * writer_,
bool internal_)
: context(context_)
, dag_response(nullptr)
, writer(writer_)
, internal(internal_)
, log(&Poco::Logger::get("DAGDriver"))
Expand Down Expand Up @@ -129,15 +130,17 @@ try
auto streaming_writer = std::make_shared<StreamWriter>(writer);
TiDB::TiDBCollators collators;

std::unique_ptr<DAGResponseWriter> response_writer = std::make_unique<StreamingDAGResponseWriter<StreamWriterPtr>>(
std::unique_ptr<DAGResponseWriter> response_writer = std::make_unique<StreamingDAGResponseWriter<StreamWriterPtr, false>>(
streaming_writer,
std::vector<Int64>(),
collators,
tipb::ExchangeType::PassThrough,
context.getSettingsRef().dag_records_per_chunk,
context.getSettingsRef().batch_send_min_limit,
true,
dag_context);
dag_context,
/*fine_grained_shuffle_stream_count=*/0,
/*fine_grained_shuffle_batch_size=*/0);
dag_output_stream = std::make_shared<DAGBlockOutputStream>(streams.in->getHeader(), std::move(response_writer));
copyData(*streams.in, *dag_output_stream);
}
Expand Down
Loading

0 comments on commit 649919d

Please sign in to comment.