From 649919d86632f8dbf95b4f294e19e52de84bc1bf Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Mon, 11 Jul 2022 11:21:05 +0800 Subject: [PATCH] support fine grained shuffle for window function (#5048) close pingcap/tiflash#5142 --- contrib/kvproto | 2 +- contrib/tipb | 2 +- .../DataStreams/TiRemoteBlockInputStream.h | 10 +- dbms/src/Debug/astToExecutor.cpp | 18 +- dbms/src/Debug/astToExecutor.h | 19 +- dbms/src/Debug/dbgFuncCoprocessor.cpp | 5 +- .../src/Flash/Coprocessor/CoprocessorReader.h | 3 +- dbms/src/Flash/Coprocessor/DAGContext.h | 5 + dbms/src/Flash/Coprocessor/DAGDriver.cpp | 7 +- .../Coprocessor/DAGQueryBlockInterpreter.cpp | 166 ++++++--- .../Coprocessor/DAGQueryBlockInterpreter.h | 11 +- .../Coprocessor/DAGStorageInterpreter.cpp | 2 +- dbms/src/Flash/Coprocessor/DecodeDetail.h | 8 +- .../StreamingDAGResponseWriter.cpp | 327 +++++++++++++----- .../Coprocessor/StreamingDAGResponseWriter.h | 18 +- .../tests/gtest_streaming_dag_writer.cpp | 184 ++++++++++ dbms/src/Flash/Mpp/ExchangeReceiver.cpp | 236 ++++++++++--- dbms/src/Flash/Mpp/ExchangeReceiver.h | 41 ++- dbms/src/Flash/Mpp/MPPTask.cpp | 3 +- dbms/src/Flash/tests/WindowTestUtil.h | 22 +- dbms/src/Flash/tests/bench_exchange.cpp | 207 +++++++---- dbms/src/Flash/tests/bench_exchange.h | 22 +- dbms/src/Flash/tests/bench_window.cpp | 86 ++++- dbms/src/Flash/tests/gtest_interpreter.cpp | 74 +++- dbms/src/TestUtils/bench_dbms_main.cpp | 2 + dbms/src/TestUtils/mockExecutor.cpp | 33 +- dbms/src/TestUtils/mockExecutor.h | 16 +- tests/fullstack-test/mpp/window.test | 32 ++ 28 files changed, 1196 insertions(+), 365 deletions(-) create mode 100644 dbms/src/Flash/Coprocessor/tests/gtest_streaming_dag_writer.cpp create mode 100644 tests/fullstack-test/mpp/window.test diff --git a/contrib/kvproto b/contrib/kvproto index 12e2f5a9d16..a5d4ffd2ba3 160000 --- a/contrib/kvproto +++ b/contrib/kvproto @@ -1 +1 @@ -Subproject commit 12e2f5a9d167f46602804840857ddc8ff06dc695 +Subproject commit a5d4ffd2ba337dad0bc99e9fb53bf665864a3f3b diff --git a/contrib/tipb b/contrib/tipb index bfb5c2c5518..0f4f873beca 160000 --- a/contrib/tipb +++ b/contrib/tipb @@ -1 +1 @@ -Subproject commit bfb5c2c55188c254018d3cf77bfad73b4d4b77ec +Subproject commit 0f4f873beca8d5078dde0a23d15ad5ce3188ed0d diff --git a/dbms/src/DataStreams/TiRemoteBlockInputStream.h b/dbms/src/DataStreams/TiRemoteBlockInputStream.h index f249bf1a0dc..cfa3e95c440 100644 --- a/dbms/src/DataStreams/TiRemoteBlockInputStream.h +++ b/dbms/src/DataStreams/TiRemoteBlockInputStream.h @@ -58,6 +58,11 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream uint64_t total_rows; + // For fine grained shuffle, sender will partition data into muiltiple streams by hashing. + // ExchangeReceiverBlockInputStream only need to read its own stream, i.e., streams[stream_id]. + // CoprocessorBlockInputStream doesn't take care of this. + size_t stream_id; + void initRemoteExecutionSummaries(tipb::SelectResponse & resp, size_t index) { for (const auto & execution_summary : resp.execution_summaries()) @@ -120,7 +125,7 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream bool fetchRemoteResult() { - auto result = remote_reader->nextResult(block_queue, sample_block); + auto result = remote_reader->nextResult(block_queue, sample_block, stream_id); if (result.meet_error) { LOG_FMT_WARNING(log, "remote reader meets error: {}", result.error_msg); @@ -168,13 +173,14 @@ class TiRemoteBlockInputStream : public IProfilingBlockInputStream } public: - TiRemoteBlockInputStream(std::shared_ptr remote_reader_, const String & req_id, const String & executor_id) + TiRemoteBlockInputStream(std::shared_ptr remote_reader_, const String & req_id, const String & executor_id, size_t stream_id_) : remote_reader(remote_reader_) , source_num(remote_reader->getSourceNum()) , name(fmt::format("TiRemoteBlockInputStream({})", RemoteReader::name)) , execution_summaries_inited(source_num) , log(Logger::get(name, req_id, executor_id)) , total_rows(0) + , stream_id(stream_id_) { // generate sample block ColumnsWithTypeAndName columns; diff --git a/dbms/src/Debug/astToExecutor.cpp b/dbms/src/Debug/astToExecutor.cpp index 481eac65fe2..e02dd3aa740 100644 --- a/dbms/src/Debug/astToExecutor.cpp +++ b/dbms/src/Debug/astToExecutor.cpp @@ -851,6 +851,7 @@ bool ExchangeReceiver::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t c { tipb_executor->set_tp(tipb::ExecType::TypeExchangeReceiver); tipb_executor->set_executor_id(name); + tipb_executor->set_fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count); tipb::ExchangeReceiver * exchange_receiver = tipb_executor->mutable_exchange_receiver(); for (auto & field : output_schema) { @@ -1354,6 +1355,7 @@ bool Window::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id { tipb_executor->set_tp(tipb::ExecType::TypeWindow); tipb_executor->set_executor_id(name); + tipb_executor->set_fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count); tipb::Window * window = tipb_executor->mutable_window(); auto & input_schema = children[0]->output_schema; for (const auto & expr : func_descs) @@ -1430,6 +1432,7 @@ bool Sort::toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, { tipb_executor->set_tp(tipb::ExecType::TypeSort); tipb_executor->set_executor_id(name); + tipb_executor->set_fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count); tipb::Sort * sort = tipb_executor->mutable_sort(); sort->set_ispartialsort(is_partial_sort); @@ -1665,13 +1668,13 @@ ExecutorPtr compileExchangeSender(ExecutorPtr input, size_t & executor_index, ti return exchange_sender; } -ExecutorPtr compileExchangeReceiver(size_t & executor_index, DAGSchema schema) +ExecutorPtr compileExchangeReceiver(size_t & executor_index, DAGSchema schema, uint64_t fine_grained_shuffle_stream_count) { - ExecutorPtr exchange_receiver = std::make_shared(executor_index, schema); + ExecutorPtr exchange_receiver = std::make_shared(executor_index, schema, fine_grained_shuffle_stream_count); return exchange_receiver; } -ExecutorPtr compileWindow(ExecutorPtr input, size_t & executor_index, ASTPtr func_desc_list, ASTPtr partition_by_expr_list, ASTPtr order_by_expr_list, mock::MockWindowFrame frame) +ExecutorPtr compileWindow(ExecutorPtr input, size_t & executor_index, ASTPtr func_desc_list, ASTPtr partition_by_expr_list, ASTPtr order_by_expr_list, mock::MockWindowFrame frame, uint64_t fine_grained_shuffle_stream_count) { std::vector partition_columns; if (partition_by_expr_list != nullptr) @@ -1739,12 +1742,13 @@ ExecutorPtr compileWindow(ExecutorPtr input, size_t & executor_index, ASTPtr fun window_exprs, std::move(partition_columns), std::move(order_columns), - frame); + frame, + fine_grained_shuffle_stream_count); window->children.push_back(input); return window; } -ExecutorPtr compileSort(ExecutorPtr input, size_t & executor_index, ASTPtr order_by_expr_list, bool is_partial_sort) +ExecutorPtr compileSort(ExecutorPtr input, size_t & executor_index, ASTPtr order_by_expr_list, bool is_partial_sort, uint64_t fine_grained_shuffle_stream_count) { std::vector order_columns; if (order_by_expr_list != nullptr) @@ -1758,8 +1762,8 @@ ExecutorPtr compileSort(ExecutorPtr input, size_t & executor_index, ASTPtr order compileExpr(input->output_schema, elem->children[0]); } } - ExecutorPtr sort = std::make_shared(executor_index, input->output_schema, std::move(order_columns), is_partial_sort); + ExecutorPtr sort = std::make_shared(executor_index, input->output_schema, std::move(order_columns), is_partial_sort, fine_grained_shuffle_stream_count); sort->children.push_back(input); return sort; } -} // namespace DB \ No newline at end of file +} // namespace DB diff --git a/dbms/src/Debug/astToExecutor.h b/dbms/src/Debug/astToExecutor.h index 4d87c0db77e..f39f4059d26 100644 --- a/dbms/src/Debug/astToExecutor.h +++ b/dbms/src/Debug/astToExecutor.h @@ -139,8 +139,11 @@ struct ExchangeSender : Executor struct ExchangeReceiver : Executor { TaskMetas task_metas; - ExchangeReceiver(size_t & index, const DAGSchema & output) + uint64_t fine_grained_shuffle_stream_count; + + ExchangeReceiver(size_t & index, const DAGSchema & output, uint64_t fine_grained_shuffle_stream_count_ = 0) : Executor(index, "exchange_receiver_" + std::to_string(index), output) + , fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count_) {} void columnPrune(std::unordered_set &) override { throw Exception("Should not reach here"); } bool toTiPBExecutor(tipb::Executor * tipb_executor, uint32_t collator_id, const MPPInfo & mpp_info, const Context &) override; @@ -292,13 +295,15 @@ struct Window : Executor std::vector partition_by_exprs; std::vector order_by_exprs; MockWindowFrame frame; + uint64_t fine_grained_shuffle_stream_count; - Window(size_t & index_, const DAGSchema & output_schema_, std::vector func_descs_, std::vector partition_by_exprs_, std::vector order_by_exprs_, MockWindowFrame frame_) + Window(size_t & index_, const DAGSchema & output_schema_, std::vector func_descs_, std::vector partition_by_exprs_, std::vector order_by_exprs_, MockWindowFrame frame_, uint64_t fine_grained_shuffle_stream_count_ = 0) : Executor(index_, "window_" + std::to_string(index_), output_schema_) , func_descs(std::move(func_descs_)) , partition_by_exprs(std::move(partition_by_exprs_)) , order_by_exprs(order_by_exprs_) , frame(frame_) + , fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count_) { } // Currently only use Window Executor in Unit Test which don't call columnPrume. @@ -311,11 +316,13 @@ struct Sort : Executor { std::vector by_exprs; bool is_partial_sort; + uint64_t fine_grained_shuffle_stream_count; - Sort(size_t & index_, const DAGSchema & output_schema_, std::vector by_exprs_, bool is_partial_sort_) + Sort(size_t & index_, const DAGSchema & output_schema_, std::vector by_exprs_, bool is_partial_sort_, uint64_t fine_grained_shuffle_stream_count_ = 0) : Executor(index_, "sort_" + std::to_string(index_), output_schema_) , by_exprs(by_exprs_) , is_partial_sort(is_partial_sort_) + , fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count_) { } // Currently only use Sort Executor in Unit Test which don't call columnPrume. @@ -343,11 +350,11 @@ ExecutorPtr compileJoin(size_t & executor_index, ExecutorPtr left, ExecutorPtr r ExecutorPtr compileExchangeSender(ExecutorPtr input, size_t & executor_index, tipb::ExchangeType exchange_type); -ExecutorPtr compileExchangeReceiver(size_t & executor_index, DAGSchema schema); +ExecutorPtr compileExchangeReceiver(size_t & executor_index, DAGSchema schema, uint64_t fine_grained_shuffle_stream_count = 0); -ExecutorPtr compileWindow(ExecutorPtr input, size_t & executor_index, ASTPtr func_desc_list, ASTPtr partition_by_expr_list, ASTPtr order_by_expr_list, mock::MockWindowFrame frame); +ExecutorPtr compileWindow(ExecutorPtr input, size_t & executor_index, ASTPtr func_desc_list, ASTPtr partition_by_expr_list, ASTPtr order_by_expr_list, mock::MockWindowFrame frame, uint64_t fine_grained_shuffle_stream_count = 0); -ExecutorPtr compileSort(ExecutorPtr input, size_t & executor_index, ASTPtr order_by_expr_list, bool is_partial_sort); +ExecutorPtr compileSort(ExecutorPtr input, size_t & executor_index, ASTPtr order_by_expr_list, bool is_partial_sort, uint64_t fine_grained_shuffle_stream_count = 0); void literalFieldToTiPBExpr(const ColumnInfo & ci, const Field & field, tipb::Expr * expr, Int32 collator_id); } // namespace DB diff --git a/dbms/src/Debug/dbgFuncCoprocessor.cpp b/dbms/src/Debug/dbgFuncCoprocessor.cpp index e9335d1e2bd..62a8b7537f1 100644 --- a/dbms/src/Debug/dbgFuncCoprocessor.cpp +++ b/dbms/src/Debug/dbgFuncCoprocessor.cpp @@ -290,8 +290,9 @@ BlockInputStreamPtr executeQuery(Context & context, RegionID region_id, const DA tipb_exchange_receiver.encoded_task_meta_size(), 10, /*req_id=*/"", - /*executor_id=*/""); - BlockInputStreamPtr ret = std::make_shared(exchange_receiver, /*req_id=*/"", /*executor_id=*/""); + /*executor_id=*/"", + /*fine_grained_shuffle_stream_count=*/0); + BlockInputStreamPtr ret = std::make_shared(exchange_receiver, /*req_id=*/"", /*executor_id=*/"", /*stream_id*/ 0); return ret; } else diff --git a/dbms/src/Flash/Coprocessor/CoprocessorReader.h b/dbms/src/Flash/Coprocessor/CoprocessorReader.h index 25c07cff49c..b48fdbcd6dc 100644 --- a/dbms/src/Flash/Coprocessor/CoprocessorReader.h +++ b/dbms/src/Flash/Coprocessor/CoprocessorReader.h @@ -139,7 +139,8 @@ class CoprocessorReader return detail; } - CoprocessorReaderResult nextResult(std::queue & block_queue, const Block & header) + // stream_id is only meaningful for ExchagneReceiver. + CoprocessorReaderResult nextResult(std::queue & block_queue, const Block & header, size_t /*stream_id*/) { auto && [result, has_next] = resp_iter.next(); if (!result.error.empty()) diff --git a/dbms/src/Flash/Coprocessor/DAGContext.h b/dbms/src/Flash/Coprocessor/DAGContext.h index a50a4d4007b..10190074a0f 100644 --- a/dbms/src/Flash/Coprocessor/DAGContext.h +++ b/dbms/src/Flash/Coprocessor/DAGContext.h @@ -116,6 +116,11 @@ constexpr UInt64 NO_ENGINE_SUBSTITUTION = 1ul << 30ul; constexpr UInt64 ALLOW_INVALID_DATES = 1ul << 32ul; } // namespace TiDBSQLMode +inline bool enableFineGrainedShuffle(uint64_t stream_count) +{ + return stream_count > 0; +} + /// A context used to track the information that needs to be passed around during DAG planning. class DAGContext { diff --git a/dbms/src/Flash/Coprocessor/DAGDriver.cpp b/dbms/src/Flash/Coprocessor/DAGDriver.cpp index 55a2024a8bc..9fe388f8fe4 100644 --- a/dbms/src/Flash/Coprocessor/DAGDriver.cpp +++ b/dbms/src/Flash/Coprocessor/DAGDriver.cpp @@ -72,6 +72,7 @@ DAGDriver::DAGDriver( ::grpc::ServerWriter<::coprocessor::BatchResponse> * writer_, bool internal_) : context(context_) + , dag_response(nullptr) , writer(writer_) , internal(internal_) , log(&Poco::Logger::get("DAGDriver")) @@ -129,7 +130,7 @@ try auto streaming_writer = std::make_shared(writer); TiDB::TiDBCollators collators; - std::unique_ptr response_writer = std::make_unique>( + std::unique_ptr response_writer = std::make_unique>( streaming_writer, std::vector(), collators, @@ -137,7 +138,9 @@ try context.getSettingsRef().dag_records_per_chunk, context.getSettingsRef().batch_send_min_limit, true, - dag_context); + dag_context, + /*fine_grained_shuffle_stream_count=*/0, + /*fine_grained_shuffle_batch_size=*/0); dag_output_stream = std::make_shared(streams.in->getHeader(), std::move(response_writer)); copyData(*streams.in, *dag_output_stream); } diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp index bf695da34c1..4714580fda0 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp @@ -57,6 +57,10 @@ namespace FailPoints { extern const char minimum_block_size_for_cross_join[]; } // namespace FailPoints +namespace +{ +const String enableFineGrainedShuffleExtraInfo = "enable fine grained shuffle"; +} DAGQueryBlockInterpreter::DAGQueryBlockInterpreter( Context & context_, @@ -347,14 +351,26 @@ void DAGQueryBlockInterpreter::executeWhere(DAGPipeline & pipeline, const Expres void DAGQueryBlockInterpreter::executeWindow( DAGPipeline & pipeline, - WindowDescription & window_description) + WindowDescription & window_description, + bool enable_fine_grained_shuffle) { executeExpression(pipeline, window_description.before_window, "before window"); - /// If there are several streams, we merge them into one - executeUnion(pipeline, max_streams, log, false, "merge into one for window input"); - assert(pipeline.streams.size() == 1); - pipeline.firstStream() = std::make_shared(pipeline.firstStream(), window_description, log->identifier()); + if (enable_fine_grained_shuffle) + { + /// Window function can be multiple threaded when fine grained shuffle is enabled. + pipeline.transform([&](auto & stream) { + stream = std::make_shared(stream, window_description, log->identifier()); + stream->setExtraInfo(enableFineGrainedShuffleExtraInfo); + }); + } + else + { + /// If there are several streams, we merge them into one. + executeUnion(pipeline, max_streams, log, false, "merge into one for window input"); + assert(pipeline.streams.size() == 1); + pipeline.firstStream() = std::make_shared(pipeline.firstStream(), window_description, log->identifier()); + } } void DAGQueryBlockInterpreter::executeAggregation( @@ -437,20 +453,23 @@ void DAGQueryBlockInterpreter::executeExpression(DAGPipeline & pipeline, const E } } -void DAGQueryBlockInterpreter::executeWindowOrder(DAGPipeline & pipeline, SortDescription sort_desc) +void DAGQueryBlockInterpreter::executeWindowOrder(DAGPipeline & pipeline, SortDescription sort_desc, bool enable_fine_grained_shuffle) { - orderStreams(pipeline, sort_desc, 0); + orderStreams(pipeline, sort_desc, 0, enable_fine_grained_shuffle); } void DAGQueryBlockInterpreter::executeOrder(DAGPipeline & pipeline, const NamesAndTypes & order_columns) { Int64 limit = query_block.limit_or_topn->topn().limit(); - orderStreams(pipeline, getSortDescription(order_columns, query_block.limit_or_topn->topn().order_by()), limit); + orderStreams(pipeline, getSortDescription(order_columns, query_block.limit_or_topn->topn().order_by()), limit, false); } -void DAGQueryBlockInterpreter::orderStreams(DAGPipeline & pipeline, SortDescription order_descr, Int64 limit) +void DAGQueryBlockInterpreter::orderStreams(DAGPipeline & pipeline, SortDescription order_descr, Int64 limit, bool enable_fine_grained_shuffle) { const Settings & settings = context.getSettingsRef(); + String extra_info; + if (enable_fine_grained_shuffle) + extra_info = enableFineGrainedShuffleExtraInfo; pipeline.transform([&](auto & stream) { auto sorting_stream = std::make_shared(stream, order_descr, log->identifier(), limit); @@ -462,20 +481,38 @@ void DAGQueryBlockInterpreter::orderStreams(DAGPipeline & pipeline, SortDescript sorting_stream->setLimits(limits); stream = sorting_stream; + stream->setExtraInfo(extra_info); }); - /// If there are several streams, we merge them into one - executeUnion(pipeline, max_streams, log, false, "for partial order"); - - /// Merge the sorted blocks. - pipeline.firstStream() = std::make_shared( - pipeline.firstStream(), - order_descr, - settings.max_block_size, - limit, - settings.max_bytes_before_external_sort, - context.getTemporaryPath(), - log->identifier()); + if (enable_fine_grained_shuffle) + { + pipeline.transform([&](auto & stream) { + stream = std::make_shared( + stream, + order_descr, + settings.max_block_size, + limit, + settings.max_bytes_before_external_sort, + context.getTemporaryPath(), + log->identifier()); + stream->setExtraInfo(enableFineGrainedShuffleExtraInfo); + }); + } + else + { + /// If there are several streams, we merge them into one + executeUnion(pipeline, max_streams, log, false, "for partial order"); + + /// Merge the sorted blocks. + pipeline.firstStream() = std::make_shared( + pipeline.firstStream(), + order_descr, + settings.max_block_size, + limit, + settings.max_bytes_before_external_sort, + context.getTemporaryPath(), + log->identifier()); + } } void DAGQueryBlockInterpreter::recordProfileStreams(DAGPipeline & pipeline, const String & key) @@ -491,12 +528,25 @@ void DAGQueryBlockInterpreter::handleExchangeReceiver(DAGPipeline & pipeline) throw Exception("Can not find exchange receiver for " + query_block.source_name, ErrorCodes::LOGICAL_ERROR); // todo choose a more reasonable stream number auto & exchange_receiver_io_input_streams = dagContext().getInBoundIOInputStreamsMap()[query_block.source_name]; - for (size_t i = 0; i < max_streams; ++i) + + const bool enable_fine_grained_shuffle = enableFineGrainedShuffle(exchange_receiver->getFineGrainedShuffleStreamCount()); + String extra_info = "squashing after exchange receiver"; + size_t stream_count = max_streams; + if (enable_fine_grained_shuffle) + { + extra_info += ", " + enableFineGrainedShuffleExtraInfo; + stream_count = std::min(max_streams, exchange_receiver->getFineGrainedShuffleStreamCount()); + } + + for (size_t i = 0; i < stream_count; ++i) { - BlockInputStreamPtr stream = std::make_shared(exchange_receiver, log->identifier(), query_block.source_name); + BlockInputStreamPtr stream = std::make_shared(exchange_receiver, + log->identifier(), + query_block.source_name, + /*stream_id=*/enable_fine_grained_shuffle ? i : 0); exchange_receiver_io_input_streams.push_back(stream); stream = std::make_shared(stream, 8192, 0, log->identifier()); - stream->setExtraInfo("squashing after exchange receiver"); + stream->setExtraInfo(extra_info); pipeline.streams.push_back(stream); } NamesAndTypes source_columns; @@ -561,7 +611,7 @@ void DAGQueryBlockInterpreter::handleProjection(DAGPipeline & pipeline, const ti analyzer = std::make_unique(std::move(output_columns), context); } -void DAGQueryBlockInterpreter::handleWindow(DAGPipeline & pipeline, const tipb::Window & window) +void DAGQueryBlockInterpreter::handleWindow(DAGPipeline & pipeline, const tipb::Window & window, bool enable_fine_grained_shuffle) { NamesAndTypes input_columns; assert(input_streams_vec.size() == 1); @@ -570,13 +620,13 @@ void DAGQueryBlockInterpreter::handleWindow(DAGPipeline & pipeline, const tipb:: input_columns.emplace_back(p.name, p.type); DAGExpressionAnalyzer dag_analyzer(input_columns, context); WindowDescription window_description = dag_analyzer.buildWindowDescription(window); - executeWindow(pipeline, window_description); + executeWindow(pipeline, window_description, enable_fine_grained_shuffle); executeExpression(pipeline, window_description.after_window, "cast after window"); analyzer = std::make_unique(window_description.after_window_columns, context); } -void DAGQueryBlockInterpreter::handleWindowOrder(DAGPipeline & pipeline, const tipb::Sort & window_sort) +void DAGQueryBlockInterpreter::handleWindowOrder(DAGPipeline & pipeline, const tipb::Sort & window_sort, bool enable_fine_grained_shuffle) { NamesAndTypes input_columns; assert(input_streams_vec.size() == 1); @@ -585,7 +635,7 @@ void DAGQueryBlockInterpreter::handleWindowOrder(DAGPipeline & pipeline, const t input_columns.emplace_back(p.name, p.type); DAGExpressionAnalyzer dag_analyzer(input_columns, context); auto order_columns = dag_analyzer.buildWindowOrderColumns(window_sort); - executeWindowOrder(pipeline, getSortDescription(order_columns, window_sort.byitems())); + executeWindowOrder(pipeline, getSortDescription(order_columns, window_sort.byitems()), enable_fine_grained_shuffle); analyzer = std::make_unique(std::move(input_columns), context); } @@ -633,13 +683,13 @@ void DAGQueryBlockInterpreter::executeImpl(DAGPipeline & pipeline) } else if (query_block.source->tp() == tipb::ExecType::TypeWindow) { - handleWindow(pipeline, query_block.source->window()); + handleWindow(pipeline, query_block.source->window(), enableFineGrainedShuffle(query_block.source->fine_grained_shuffle_stream_count())); recordProfileStreams(pipeline, query_block.source_name); restorePipelineConcurrency(pipeline); } else if (query_block.source->tp() == tipb::ExecType::TypeSort) { - handleWindowOrder(pipeline, query_block.source->sort()); + handleWindowOrder(pipeline, query_block.source->sort(), enableFineGrainedShuffle(query_block.source->fine_grained_shuffle_stream_count())); recordProfileStreams(pipeline, query_block.source_name); } else @@ -748,19 +798,47 @@ void DAGQueryBlockInterpreter::handleExchangeSender(DAGPipeline & pipeline) std::vector partition_col_ids = ExchangeSenderInterpreterHelper::genPartitionColIds(exchange_sender); TiDB::TiDBCollators partition_col_collators = ExchangeSenderInterpreterHelper::genPartitionColCollators(exchange_sender); int stream_id = 0; - pipeline.transform([&](auto & stream) { - // construct writer - std::unique_ptr response_writer = std::make_unique>( - context.getDAGContext()->tunnel_set, - partition_col_ids, - partition_col_collators, - exchange_sender.tp(), - context.getSettingsRef().dag_records_per_chunk, - context.getSettingsRef().batch_send_min_limit, - stream_id++ == 0, /// only one stream needs to sending execution summaries for the last response - dagContext()); - stream = std::make_shared(stream, std::move(response_writer), log->identifier()); - }); + const uint64_t stream_count = query_block.exchange_sender->fine_grained_shuffle_stream_count(); + const uint64_t batch_size = query_block.exchange_sender->fine_grained_shuffle_batch_size(); + + if (enableFineGrainedShuffle(stream_count)) + { + pipeline.transform([&](auto & stream) { + // construct writer + std::unique_ptr response_writer = std::make_unique>( + context.getDAGContext()->tunnel_set, + partition_col_ids, + partition_col_collators, + exchange_sender.tp(), + context.getSettingsRef().dag_records_per_chunk, + context.getSettingsRef().batch_send_min_limit, + stream_id++ == 0, /// only one stream needs to sending execution summaries for the last response + dagContext(), + stream_count, + batch_size); + stream = std::make_shared(stream, std::move(response_writer), log->identifier()); + stream->setExtraInfo(enableFineGrainedShuffleExtraInfo); + }); + RUNTIME_CHECK(exchange_sender.tp() == tipb::ExchangeType::Hash, Exception, "exchange_sender has to be hash partition when fine grained shuffle is enabled"); + RUNTIME_CHECK(stream_count <= 1024, Exception, "fine_grained_shuffle_stream_count should not be greater than 1024"); + } + else + { + pipeline.transform([&](auto & stream) { + std::unique_ptr response_writer = std::make_unique>( + context.getDAGContext()->tunnel_set, + partition_col_ids, + partition_col_collators, + exchange_sender.tp(), + context.getSettingsRef().dag_records_per_chunk, + context.getSettingsRef().batch_send_min_limit, + stream_id++ == 0, /// only one stream needs to sending execution summaries for the last response + dagContext(), + stream_count, + batch_size); + stream = std::make_shared(stream, std::move(response_writer), log->identifier()); + }); + } } void DAGQueryBlockInterpreter::handleMockExchangeSender(DAGPipeline & pipeline) @@ -788,4 +866,4 @@ BlockInputStreams DAGQueryBlockInterpreter::execute() return pipeline.streams; } -} // namespace DB \ No newline at end of file +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h index e68c4f91cee..532dceabce9 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h @@ -64,17 +64,18 @@ class DAGQueryBlockInterpreter void handleExchangeReceiver(DAGPipeline & pipeline); void handleMockExchangeReceiver(DAGPipeline & pipeline); void handleProjection(DAGPipeline & pipeline, const tipb::Projection & projection); - void handleWindow(DAGPipeline & pipeline, const tipb::Window & window); - void handleWindowOrder(DAGPipeline & pipeline, const tipb::Sort & window_sort); + void handleWindow(DAGPipeline & pipeline, const tipb::Window & window, bool enable_fine_grained_shuffle); + void handleWindowOrder(DAGPipeline & pipeline, const tipb::Sort & window_sort, bool enable_fine_grained_shuffle); void executeWhere(DAGPipeline & pipeline, const ExpressionActionsPtr & expressionActionsPtr, String & filter_column, const String & extra_info = ""); void executeExpression(DAGPipeline & pipeline, const ExpressionActionsPtr & expressionActionsPtr, const String & extra_info = ""); - void executeWindowOrder(DAGPipeline & pipeline, SortDescription sort_desc); - void orderStreams(DAGPipeline & pipeline, SortDescription order_descr, Int64 limit); + void executeWindowOrder(DAGPipeline & pipeline, SortDescription sort_desc, bool enable_fine_grained_shuffle); + void orderStreams(DAGPipeline & pipeline, SortDescription order_descr, Int64 limit, bool enable_fine_grained_shuffle); void executeOrder(DAGPipeline & pipeline, const NamesAndTypes & order_columns); void executeLimit(DAGPipeline & pipeline); void executeWindow( DAGPipeline & pipeline, - WindowDescription & window_description); + WindowDescription & window_description, + bool enable_fine_grained_shuffle); void executeAggregation( DAGPipeline & pipeline, const ExpressionActionsPtr & expression_actions_ptr, diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index ad2de7217e0..390ce7b9948 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -487,7 +487,7 @@ void DAGStorageInterpreter::buildRemoteStreams(std::vector && rem auto coprocessor_reader = std::make_shared(schema, cluster, tasks, has_enforce_encode_type, 1); context.getDAGContext()->addCoprocessorReader(coprocessor_reader); - BlockInputStreamPtr input = std::make_shared(coprocessor_reader, log->identifier(), table_scan.getTableScanExecutorID()); + BlockInputStreamPtr input = std::make_shared(coprocessor_reader, log->identifier(), table_scan.getTableScanExecutorID(), /*stream_id=*/0); pipeline.streams.push_back(input); task_start = task_end; } diff --git a/dbms/src/Flash/Coprocessor/DecodeDetail.h b/dbms/src/Flash/Coprocessor/DecodeDetail.h index 9bad0ca2b72..91851650d9e 100644 --- a/dbms/src/Flash/Coprocessor/DecodeDetail.h +++ b/dbms/src/Flash/Coprocessor/DecodeDetail.h @@ -21,8 +21,12 @@ namespace DB /// Detail of the packet that decoding in TiRemoteInputStream.RemoteReader.decodeChunks() struct DecodeDetail { + // For fine grained shuffle, each ExchangeReceiver/thread will decode its own blocks. + // So this is the row number of partial blocks of the original packet. + // This will be the row number of all blocks of the original packet if it's not fine grained shuffle. Int64 rows = 0; - // byte size of origin packet. + + // Total byte size of the origin packet, even for fine grained shuffle. Int64 packet_bytes = 0; }; -} // namespace DB \ No newline at end of file +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp b/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp index f915653fe96..a72dfcc16ef 100644 --- a/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp +++ b/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.cpp @@ -23,6 +23,8 @@ #include #include +#include + namespace DB { namespace ErrorCodes @@ -37,8 +39,8 @@ inline void serializeToPacket(mpp::MPPDataPacket & packet, const tipb::SelectRes throw Exception(fmt::format("Fail to serialize response, response size: {}", response.ByteSizeLong())); } -template -StreamingDAGResponseWriter::StreamingDAGResponseWriter( +template +StreamingDAGResponseWriter::StreamingDAGResponseWriter( StreamWriterPtr writer_, std::vector partition_col_ids_, TiDB::TiDBCollators collators_, @@ -46,7 +48,9 @@ StreamingDAGResponseWriter::StreamingDAGResponseWriter( Int64 records_per_chunk_, Int64 batch_send_min_limit_, bool should_send_exec_summary_at_last_, - DAGContext & dag_context_) + DAGContext & dag_context_, + uint64_t fine_grained_shuffle_stream_count_, + UInt64 fine_grained_shuffle_batch_size_) : DAGResponseWriter(records_per_chunk_, dag_context_) , batch_send_min_limit(batch_send_min_limit_) , should_send_exec_summary_at_last(should_send_exec_summary_at_last_) @@ -54,6 +58,8 @@ StreamingDAGResponseWriter::StreamingDAGResponseWriter( , writer(writer_) , partition_col_ids(std::move(partition_col_ids_)) , collators(std::move(collators_)) + , fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count_) + , fine_grained_shuffle_batch_size(fine_grained_shuffle_batch_size_) { rows_in_blocks = 0; partition_num = writer_->getPartitionNum(); @@ -71,17 +77,37 @@ StreamingDAGResponseWriter::StreamingDAGResponseWriter( } } -template -void StreamingDAGResponseWriter::finishWrite() +template +void StreamingDAGResponseWriter::finishWrite() { if (should_send_exec_summary_at_last) - batchWrite(); + { + if constexpr (enable_fine_grained_shuffle) + { + assert(exchange_type == tipb::ExchangeType::Hash); + batchWriteFineGrainedShuffle(); + } + else + { + batchWrite(); + } + } else - batchWrite(); + { + if constexpr (enable_fine_grained_shuffle) + { + assert(exchange_type == tipb::ExchangeType::Hash); + batchWriteFineGrainedShuffle(); + } + else + { + batchWrite(); + } + } } -template -void StreamingDAGResponseWriter::write(const Block & block) +template +void StreamingDAGResponseWriter::write(const Block & block) { if (block.columns() != dag_context.result_field_types.size()) throw TiFlashException("Output column size mismatch with field type size", Errors::Coprocessor::Internal); @@ -91,15 +117,23 @@ void StreamingDAGResponseWriter::write(const Block & block) { blocks.push_back(block); } - if (static_cast(rows_in_blocks) > (dag_context.encode_type == tipb::EncodeType::TypeCHBlock ? batch_send_min_limit : records_per_chunk - 1)) + + if constexpr (enable_fine_grained_shuffle) { - batchWrite(); + assert(exchange_type == tipb::ExchangeType::Hash); + if (static_cast(rows_in_blocks) >= fine_grained_shuffle_batch_size) + batchWriteFineGrainedShuffle(); + } + else + { + if (static_cast(rows_in_blocks) > (dag_context.encode_type == tipb::EncodeType::TypeCHBlock ? batch_send_min_limit : records_per_chunk - 1)) + batchWrite(); } } -template +template template -void StreamingDAGResponseWriter::encodeThenWriteBlocks( +void StreamingDAGResponseWriter::encodeThenWriteBlocks( const std::vector & input_blocks, tipb::SelectResponse & response) const { @@ -191,133 +225,238 @@ void StreamingDAGResponseWriter::encodeThenWriteBlocks( } } -/// hash exchanging data among only TiFlash nodes. -template + +template template -void StreamingDAGResponseWriter::partitionAndEncodeThenWriteBlocks( - std::vector & input_blocks, - tipb::SelectResponse & response) const +void StreamingDAGResponseWriter::batchWrite() { - std::vector packet(partition_num); - - std::vector responses_row_count(partition_num); + tipb::SelectResponse response; + if constexpr (send_exec_summary_at_last) + addExecuteSummaries(response, !dag_context.isMPPTask() || dag_context.isRootMPPTask()); + if (exchange_type == tipb::ExchangeType::Hash) + { + partitionAndEncodeThenWriteBlocks(blocks, response); + } + else + { + encodeThenWriteBlocks(blocks, response); + } + blocks.clear(); + rows_in_blocks = 0; +} +template +template +void StreamingDAGResponseWriter::handleExecSummary( + const std::vector & input_blocks, + std::vector & packet, + tipb::SelectResponse & response) const +{ if constexpr (send_exec_summary_at_last) { /// Sending the response to only one node, default the first one. serializeToPacket(packet[0], response); - } - if (input_blocks.empty()) - { - if constexpr (send_exec_summary_at_last) + // No need to send data when blocks are not empty, + // because exec_summary will be sent together with blocks. + if (input_blocks.empty()) { for (auto part_id = 0; part_id < partition_num; ++part_id) { writer->write(packet[part_id], part_id); } } - return; } +} - // partition tuples in blocks - // 1) compute partition id - // 2) partition each row - // 3) encode each chunk and send it - std::vector partition_key_containers(collators.size()); - for (auto & block : input_blocks) +template +template +void StreamingDAGResponseWriter::writePackets(const std::vector & responses_row_count, + std::vector & packets) const +{ + for (size_t part_id = 0; part_id < packets.size(); ++part_id) { - std::vector dest_blocks(partition_num); - std::vector dest_tbl_cols(partition_num); - - for (size_t i = 0; i < block.columns(); ++i) + if constexpr (send_exec_summary_at_last) { - if (ColumnPtr converted = block.getByPosition(i).column->convertToFullColumnIfConst()) - { - block.getByPosition(i).column = converted; - } + writer->write(packets[part_id], part_id); } - - for (auto i = 0; i < partition_num; ++i) + else { - dest_tbl_cols[i] = block.cloneEmptyColumns(); - dest_blocks[i] = block.cloneEmpty(); + if (responses_row_count[part_id] > 0) + writer->write(packets[part_id], part_id); } + } +} - size_t rows = block.rows(); - WeakHash32 hash(rows); - - // get hash values by all partition key columns - for (size_t i = 0; i < partition_col_ids.size(); i++) +inline void initInputBlocks(std::vector & input_blocks) +{ + for (auto & input_block : input_blocks) + { + for (size_t i = 0; i < input_block.columns(); ++i) { - block.getByPosition(partition_col_ids[i]).column->updateWeakHash32(hash, collators[i], partition_key_containers[i]); + if (ColumnPtr converted = input_block.getByPosition(i).column->convertToFullColumnIfConst()) + input_block.getByPosition(i).column = converted; } - const auto & hash_data = hash.getData(); + } +} - // partition each row - IColumn::Selector selector(rows); - for (size_t row = 0; row < rows; ++row) - { - /// Row from interval [(2^32 / partition_num) * i, (2^32 / partition_num) * (i + 1)) goes to bucket with number i. - selector[row] = hash_data[row]; /// [0, 2^32) - selector[row] *= partition_num; /// [0, partition_num * 2^32), selector stores 64 bit values. - selector[row] >>= 32u; /// [0, partition_num) - } +inline void initDestColumns(const Block & input_block, std::vector & dest_tbl_cols) +{ + for (auto & cols : dest_tbl_cols) + { + cols = input_block.cloneEmptyColumns(); + } +} - for (size_t col_id = 0; col_id < block.columns(); ++col_id) - { - // Scatter columns to different partitions - auto scattered_columns = block.getByPosition(col_id).column->scatter(partition_num, selector); - for (size_t part_id = 0; part_id < partition_num; ++part_id) - { - dest_tbl_cols[part_id][col_id] = std::move(scattered_columns[part_id]); - } - } - // serialize each partitioned block and write it to its destination - for (auto part_id = 0; part_id < partition_num; ++part_id) - { - dest_blocks[part_id].setColumns(std::move(dest_tbl_cols[part_id])); - responses_row_count[part_id] += dest_blocks[part_id].rows(); - chunk_codec_stream->encode(dest_blocks[part_id], 0, dest_blocks[part_id].rows()); - packet[part_id].add_chunks(chunk_codec_stream->getString()); - chunk_codec_stream->clear(); - } +void computeHash(const Block & input_block, + uint32_t bucket_num, + const TiDB::TiDBCollators & collators, + std::vector & partition_key_containers, + const std::vector & partition_col_ids, + std::vector> & result_columns) +{ + size_t rows = input_block.rows(); + WeakHash32 hash(rows); + + // get hash values by all partition key columns + for (size_t i = 0; i < partition_col_ids.size(); ++i) + { + input_block.getByPosition(partition_col_ids[i]).column->updateWeakHash32(hash, collators[i], partition_key_containers[i]); } - for (auto part_id = 0; part_id < partition_num; ++part_id) + const auto & hash_data = hash.getData(); + + // partition each row + IColumn::Selector selector(rows); + for (size_t row = 0; row < rows; ++row) { - if constexpr (send_exec_summary_at_last) + /// Row from interval [(2^32 / bucket_num) * i, (2^32 / bucket_num) * (i + 1)) goes to bucket with number i. + selector[row] = hash_data[row]; /// [0, 2^32) + selector[row] *= bucket_num; /// [0, bucket_num * 2^32), selector stores 64 bit values. + selector[row] >>= 32u; /// [0, bucket_num) + } + + for (size_t col_id = 0; col_id < input_block.columns(); ++col_id) + { + // Scatter columns to different partitions + std::vector part_columns = input_block.getByPosition(col_id).column->scatter(bucket_num, selector); + assert(part_columns.size() == bucket_num); + for (size_t bucket_idx = 0; bucket_idx < bucket_num; ++bucket_idx) { - writer->write(packet[part_id], part_id); + result_columns[bucket_idx][col_id] = std::move(part_columns[bucket_idx]); } - else + } +} + +/// Hash exchanging data among only TiFlash nodes. Only be called when enable_fine_grained_shuffle is false. +template +template +void StreamingDAGResponseWriter::partitionAndEncodeThenWriteBlocks( + std::vector & input_blocks, + tipb::SelectResponse & response) const +{ + static_assert(!enable_fine_grained_shuffle); + std::vector packet(partition_num); + std::vector responses_row_count(partition_num); + handleExecSummary(input_blocks, packet, response); + if (input_blocks.empty()) + return; + + initInputBlocks(input_blocks); + Block dest_block = input_blocks[0].cloneEmpty(); + std::vector partition_key_containers(collators.size()); + for (const auto & block : input_blocks) + { + std::vector dest_tbl_cols(partition_num); + initDestColumns(block, dest_tbl_cols); + + computeHash(block, partition_num, collators, partition_key_containers, partition_col_ids, dest_tbl_cols); + + for (size_t part_id = 0; part_id < partition_num; ++part_id) { - if (responses_row_count[part_id] > 0) - writer->write(packet[part_id], part_id); + dest_block.setColumns(std::move(dest_tbl_cols[part_id])); + responses_row_count[part_id] += dest_block.rows(); + chunk_codec_stream->encode(dest_block, 0, dest_block.rows()); + packet[part_id].add_chunks(chunk_codec_stream->getString()); + chunk_codec_stream->clear(); } } + + writePackets(responses_row_count, packet); } -template +/// Hash exchanging data among only TiFlash nodes. Only be called when enable_fine_grained_shuffle is true. +template template -void StreamingDAGResponseWriter::batchWrite() +void StreamingDAGResponseWriter::batchWriteFineGrainedShuffle() { + static_assert(enable_fine_grained_shuffle); + assert(exchange_type == tipb::ExchangeType::Hash); + assert(fine_grained_shuffle_stream_count <= 1024); + tipb::SelectResponse response; if constexpr (send_exec_summary_at_last) addExecuteSummaries(response, !dag_context.isMPPTask() || dag_context.isRootMPPTask()); - if (exchange_type == tipb::ExchangeType::Hash) - { - partitionAndEncodeThenWriteBlocks(blocks, response); - } - else + + std::vector packet(partition_num); + std::vector responses_row_count(partition_num, 0); + + // fine_grained_shuffle_stream_count is in [0, 1024], and partition_num is uint16_t, so will not overflow. + uint32_t bucket_num = partition_num * fine_grained_shuffle_stream_count; + handleExecSummary(blocks, packet, response); + if (!blocks.empty()) { - encodeThenWriteBlocks(blocks, response); + std::vector final_dest_tbl_columns(bucket_num); + initInputBlocks(blocks); + initDestColumns(blocks[0], final_dest_tbl_columns); + + // Hash partition input_blocks into bucket_num. + for (const auto & block : blocks) + { + std::vector partition_key_containers(collators.size()); + std::vector dest_tbl_columns(bucket_num); + initDestColumns(block, dest_tbl_columns); + computeHash(block, bucket_num, collators, partition_key_containers, partition_col_ids, dest_tbl_columns); + for (size_t bucket_idx = 0; bucket_idx < bucket_num; ++bucket_idx) + { + for (size_t col_id = 0; col_id < block.columns(); ++col_id) + { + const MutableColumnPtr & src_col = dest_tbl_columns[bucket_idx][col_id]; + final_dest_tbl_columns[bucket_idx][col_id]->insertRangeFrom(*src_col, 0, src_col->size()); + } + } + } + + // For i-th stream_count buckets, send to i-th tiflash node. + for (size_t bucket_idx = 0; bucket_idx < bucket_num; bucket_idx += fine_grained_shuffle_stream_count) + { + size_t part_id = bucket_idx / fine_grained_shuffle_stream_count; // NOLINT(clang-analyzer-core.DivideZero) + size_t row_count_per_part = 0; + for (uint64_t stream_idx = 0; stream_idx < fine_grained_shuffle_stream_count; ++stream_idx) + { + Block dest_block = blocks[0].cloneEmpty(); + // For now we put all rows into one Block, may cause this Block too large. + dest_block.setColumns(std::move(final_dest_tbl_columns[bucket_idx + stream_idx])); + row_count_per_part += dest_block.rows(); + + chunk_codec_stream->encode(dest_block, 0, dest_block.rows()); + packet[part_id].add_chunks(chunk_codec_stream->getString()); + packet[part_id].add_stream_ids(stream_idx); + chunk_codec_stream->clear(); + } + responses_row_count[part_id] = row_count_per_part; + } } + + writePackets(responses_row_count, packet); + blocks.clear(); rows_in_blocks = 0; } -template class StreamingDAGResponseWriter; -template class StreamingDAGResponseWriter; +template class StreamingDAGResponseWriter; +template class StreamingDAGResponseWriter; +template class StreamingDAGResponseWriter; +template class StreamingDAGResponseWriter; } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.h b/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.h index 9b5e3864c64..cd7559d1e79 100644 --- a/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.h +++ b/dbms/src/Flash/Coprocessor/StreamingDAGResponseWriter.h @@ -33,7 +33,7 @@ namespace DB /// Serializes the stream of blocks and sends them to TiDB or TiFlash with different serialization paths. /// When sending data to TiDB, blocks with extra info are written into tipb::SelectResponse, then the whole tipb::SelectResponse is further serialized into mpp::MPPDataPacket.data. /// Differently when sending data to TiFlash, blocks with only tuples are directly serialized into mpp::MPPDataPacket.chunks, but for the last block, its extra info (like execution summaries) is written into tipb::SelectResponse, then further serialized into mpp::MPPDataPacket.data. -template +template class StreamingDAGResponseWriter : public DAGResponseWriter { public: @@ -45,18 +45,30 @@ class StreamingDAGResponseWriter : public DAGResponseWriter Int64 records_per_chunk_, Int64 batch_send_min_limit_, bool should_send_exec_summary_at_last, - DAGContext & dag_context_); + DAGContext & dag_context_, + UInt64 fine_grained_shuffle_stream_count_, + UInt64 fine_grained_shuffle_batch_size); void write(const Block & block) override; void finishWrite() override; private: template void batchWrite(); + template + void batchWriteFineGrainedShuffle(); + template void encodeThenWriteBlocks(const std::vector & input_blocks, tipb::SelectResponse & response) const; template void partitionAndEncodeThenWriteBlocks(std::vector & input_blocks, tipb::SelectResponse & response) const; + template + void handleExecSummary(const std::vector & input_blocks, + std::vector & packet, + tipb::SelectResponse & response) const; + template + void writePackets(const std::vector & responses_row_count, std::vector & packets) const; + Int64 batch_send_min_limit; bool should_send_exec_summary_at_last; /// only one stream needs to sending execution summaries at last. tipb::ExchangeType exchange_type; @@ -67,6 +79,8 @@ class StreamingDAGResponseWriter : public DAGResponseWriter size_t rows_in_blocks; uint16_t partition_num; std::unique_ptr chunk_codec_stream; + UInt64 fine_grained_shuffle_stream_count; + UInt64 fine_grained_shuffle_batch_size; }; } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/tests/gtest_streaming_dag_writer.cpp b/dbms/src/Flash/Coprocessor/tests/gtest_streaming_dag_writer.cpp new file mode 100644 index 00000000000..5d4186123b7 --- /dev/null +++ b/dbms/src/Flash/Coprocessor/tests/gtest_streaming_dag_writer.cpp @@ -0,0 +1,184 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace DB +{ +namespace tests +{ + +using BlockPtr = std::shared_ptr; +class TestStreamingDAGResponseWriter : public testing::Test +{ +protected: + void SetUp() override + { + dag_context_ptr = std::make_unique(1024); + dag_context_ptr->encode_type = tipb::EncodeType::TypeCHBlock; + dag_context_ptr->is_mpp_task = true; + dag_context_ptr->is_root_mpp_task = false; + dag_context_ptr->result_field_types = makeFields(); + context.setDAGContext(dag_context_ptr.get()); + } + +public: + TestStreamingDAGResponseWriter() + : context(TiFlashTestEnv::getContext()) + , part_col_ids{0} + , part_col_collators{ + TiDB::ITiDBCollator::getCollator(TiDB::ITiDBCollator::BINARY)} + {} + + // Return 10 Int64 column. + static std::vector makeFields() + { + std::vector fields(10); + for (int i = 0; i < 10; ++i) + { + fields[i].set_tp(TiDB::TypeLongLong); + } + return fields; + } + + // Return a block with **rows** and 10 Int64 column. + static BlockPtr prepareBlock(const std::vector & rows) + { + BlockPtr block = std::make_shared(); + for (int i = 0; i < 10; ++i) + { + DataTypePtr int64_data_type = std::make_shared(); + DataTypePtr nullable_int64_data_type = std::make_shared(int64_data_type); + MutableColumnPtr int64_col = nullable_int64_data_type->createColumn(); + for (Int64 r : rows) + { + int64_col->insert(Field(r)); + } + block->insert(ColumnWithTypeAndName{std::move(int64_col), + nullable_int64_data_type, + String("col") + std::to_string(i)}); + } + return block; + } + + Context context; + std::vector part_col_ids; + TiDB::TiDBCollators part_col_collators; + + std::unique_ptr dag_context_ptr; +}; + +using MockStreamWriterChecker = std::function; + +struct MockStreamWriter +{ + MockStreamWriter(MockStreamWriterChecker checker_, + uint16_t part_num_) + : checker(checker_) + , part_num(part_num_) + {} + + void write(mpp::MPPDataPacket &) { FAIL() << "cannot reach here, because we only expect hash partition"; } + void write(mpp::MPPDataPacket & packet, uint16_t part_id) { checker(packet, part_id); } + void write(tipb::SelectResponse &, uint16_t) { FAIL() << "cannot reach here, only consider CH Block format"; } + void write(tipb::SelectResponse &) { FAIL() << "cannot reach here, only consider CH Block format"; } + uint16_t getPartitionNum() const { return part_num; } + +private: + MockStreamWriterChecker checker; + uint16_t part_num; +}; + +// Input block data is distributed uniform. +// partition_num: 4 +// fine_grained_shuffle_stream_count: 8 +TEST_F(TestStreamingDAGResponseWriter, testBatchWriteFineGrainedShuffle) +try +{ + const size_t block_rows = 1024; + const uint16_t part_num = 4; + const uint32_t fine_grained_shuffle_stream_count = 8; + const Int64 fine_grained_shuffle_batch_size = 4096; + + // Set these to 1, because when fine grained shuffle is enabled, + // batchWriteFineGrainedShuffle() only check fine_grained_shuffle_batch_size. + // records_per_chunk and batch_send_min_limit are useless. + const Int64 records_per_chunk = 1; + const Int64 batch_send_min_limit = 1; + const bool should_send_exec_summary_at_last = true; + + // 1. Build Block. + std::vector uniform_data_set; + for (size_t i = 0; i < block_rows; ++i) + { + uniform_data_set.push_back(i); + } + BlockPtr block = prepareBlock(uniform_data_set); + + // 2. Build MockStreamWriter. + std::unordered_map write_report; + auto checker = [&write_report](mpp::MPPDataPacket & packet, uint16_t part_id) { + auto res = write_report.insert({part_id, packet}); + // Should always insert succeed. + // Because block.rows(1024) < fine_grained_shuffle_batch_size(4096), + // batchWriteFineGrainedShuffle() only called once, so will only be one packet for each partition. + ASSERT_TRUE(res.second); + }; + auto mock_writer = std::make_shared(checker, part_num); + + // 3. Start to write. + auto dag_writer = std::make_shared, /*enable_fine_grained_shuffle=*/true>>( + mock_writer, + part_col_ids, + part_col_collators, + tipb::ExchangeType::Hash, + records_per_chunk, + batch_send_min_limit, + should_send_exec_summary_at_last, + *dag_context_ptr, + fine_grained_shuffle_stream_count, + fine_grained_shuffle_batch_size); + dag_writer->write(*block); + dag_writer->finishWrite(); + + // 4. Start to check write_report. + std::vector decoded_blocks; + ASSERT_EQ(write_report.size(), part_num); + for (const auto & ele : write_report) + { + const mpp::MPPDataPacket & packet = ele.second; + ASSERT_EQ(packet.chunks_size(), packet.stream_ids_size()); + for (int i = 0; i < packet.chunks_size(); ++i) + { + decoded_blocks.push_back(CHBlockChunkCodec::decode(packet.chunks(i), *block)); + } + } + ASSERT_EQ(decoded_blocks.size(), fine_grained_shuffle_stream_count * part_num); + for (const auto & block : decoded_blocks) + { + ASSERT_EQ(block.rows(), block_rows / (fine_grained_shuffle_stream_count * part_num)); + } +} +CATCH + +} // namespace tests +} // namespace DB diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp index 966babb832f..ab8d83a1481 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.cpp +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.cpp @@ -13,6 +13,7 @@ // limitations under the License. #include +#include #include #include #include @@ -48,6 +49,106 @@ String getReceiverStateStr(const ExchangeReceiverState & s) } } +// If enable_fine_grained_shuffle: +// Seperate chunks according to packet.stream_ids[i], then push to msg_channels[stream_id]. +// If fine grained_shuffle is disabled: +// Push all chunks to msg_channels[0]. +// Return true if all push succeed, otherwise return false. +// NOTE: shared_ptr will be hold by all ExchangeReceiverBlockInputStream to make chunk pointer valid. +template +bool pushPacket(size_t source_index, + const String & req_info, + MPPDataPacketPtr & packet, + const std::vector & msg_channels, + LoggerPtr & log) +{ + bool push_succeed = true; + + const mpp::Error * error_ptr = nullptr; + if (packet->has_error()) + error_ptr = &packet->error(); + const String * resp_ptr = nullptr; + if (!packet->data().empty()) + resp_ptr = &packet->data(); + + if constexpr (enable_fine_grained_shuffle) + { + std::vector> chunks(msg_channels.size()); + if (!packet->chunks().empty()) + { + // Packet not empty. + if (unlikely(packet->stream_ids().empty())) + { + // Fine grained shuffle is enabled in receiver, but sender didn't. We cannot handle this, so return error. + // This can happen when there are old version nodes when upgrading. + LOG_FMT_ERROR(log, "MPPDataPacket.stream_ids empty, it means ExchangeSender is old version of binary " + "(source_index: {}) while fine grained shuffle of ExchangeReceiver is enabled. " + "Cannot handle this.", + source_index); + return false; + } + // packet.stream_ids[i] is corresponding to packet.chunks[i], + // indicating which stream_id this chunk belongs to. + assert(packet->chunks_size() == packet->stream_ids_size()); + + for (int i = 0; i < packet->stream_ids_size(); ++i) + { + UInt64 stream_id = packet->stream_ids(i) % msg_channels.size(); + chunks[stream_id].push_back(&packet->chunks(i)); + } + } + // Still need to send error_ptr or resp_ptr even if packet.chunks_size() is zero. + for (size_t i = 0; i < msg_channels.size() && push_succeed; ++i) + { + if (resp_ptr == nullptr && error_ptr == nullptr && chunks[i].empty()) + continue; + + std::shared_ptr recv_msg = std::make_shared( + source_index, + req_info, + packet, + error_ptr, + resp_ptr, + std::move(chunks[i])); + push_succeed = msg_channels[i]->push(std::move(recv_msg)); + if constexpr (is_sync) + fiu_do_on(FailPoints::random_receiver_sync_msg_push_failure_failpoint, push_succeed = false;); + else + fiu_do_on(FailPoints::random_receiver_async_msg_push_failure_failpoint, push_succeed = false;); + + // Only the first ExchangeReceiverInputStream need to handle resp. + resp_ptr = nullptr; + } + } + else + { + std::vector chunks(packet->chunks_size()); + for (int i = 0; i < packet->chunks_size(); ++i) + { + chunks[i] = &packet->chunks(i); + } + + if (!(resp_ptr == nullptr && error_ptr == nullptr && chunks.empty())) + { + std::shared_ptr recv_msg = std::make_shared( + source_index, + req_info, + packet, + error_ptr, + resp_ptr, + std::move(chunks)); + + push_succeed = msg_channels[0]->push(std::move(recv_msg)); + if constexpr (is_sync) + fiu_do_on(FailPoints::random_receiver_sync_msg_push_failure_failpoint, push_succeed = false;); + else + fiu_do_on(FailPoints::random_receiver_async_msg_push_failure_failpoint, push_succeed = false;); + } + } + LOG_FMT_DEBUG(log, "push recv_msg to msg_channels(size: {}) succeed:{}, enable_fine_grained_shuffle: {}", msg_channels.size(), push_succeed, enable_fine_grained_shuffle); + return push_succeed; +} + enum class AsyncRequestStage { NEED_INIT, @@ -64,25 +165,25 @@ using TimePoint = Clock::time_point; constexpr Int32 max_retry_times = 10; constexpr Int32 batch_packet_count = 16; -template +template class AsyncRequestHandler : public UnaryCallback { public: using Status = typename RPCContext::Status; using Request = typename RPCContext::Request; using AsyncReader = typename RPCContext::AsyncReader; - using Self = AsyncRequestHandler; + using Self = AsyncRequestHandler; AsyncRequestHandler( MPMCQueue * queue, - MPMCQueue> * msg_channel_, + std::vector * msg_channels_, const std::shared_ptr & context, const Request & req, const String & req_id) : rpc_context(context) , request(&req) , notify_queue(queue) - , msg_channel(msg_channel_) + , msg_channels(msg_channels_) , req_info(fmt::format("tunnel{}+{}", req.send_task_id, req.recv_task_id)) , log(Logger::get("ExchangeReceiver", req_id, req_info)) { @@ -260,13 +361,7 @@ class AsyncRequestHandler : public UnaryCallback for (size_t i = 0; i < read_packet_index; ++i) { auto & packet = packets[i]; - auto recv_msg = std::make_shared(); - recv_msg->packet = std::move(packet); - recv_msg->source_index = request->source_index; - recv_msg->req_info = req_info; - bool push_success = msg_channel->push(std::move(recv_msg)); - fiu_do_on(FailPoints::random_receiver_async_msg_push_failure_failpoint, push_success = false;); - if (!push_success) + if (!pushPacket(request->source_index, req_info, packet, *msg_channels, log)) return false; // can't reuse packet since it is sent to readers. packet = std::make_shared(); @@ -283,7 +378,7 @@ class AsyncRequestHandler : public UnaryCallback std::shared_ptr rpc_context; const Request * request; // won't be null MPMCQueue * notify_queue; // won't be null - MPMCQueue> * msg_channel; // won't be null + std::vector * msg_channels; // won't be null String req_info; bool meet_error = false; @@ -308,20 +403,32 @@ ExchangeReceiverBase::ExchangeReceiverBase( size_t source_num_, size_t max_streams_, const String & req_id, - const String & executor_id) + const String & executor_id, + uint64_t fine_grained_shuffle_stream_count_) : rpc_context(std::move(rpc_context_)) , source_num(source_num_) , max_streams(max_streams_) , max_buffer_size(std::max(batch_packet_count, std::max(source_num, max_streams_) * 2)) , thread_manager(newThreadManager()) - , msg_channel(max_buffer_size) , live_connections(source_num) , state(ExchangeReceiverState::NORMAL) , exc_log(Logger::get("ExchangeReceiver", req_id, executor_id)) , collected(false) + , fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count_) { try { + if (enableFineGrainedShuffle(fine_grained_shuffle_stream_count_)) + { + for (size_t i = 0; i < max_streams_; ++i) + { + msg_channels.push_back(std::make_unique>>(max_buffer_size)); + } + } + else + { + msg_channels.push_back(std::make_unique>>(max_buffer_size)); + } rpc_context->fillSchema(schema); setUpConnection(); } @@ -358,14 +465,14 @@ template void ExchangeReceiverBase::cancel() { setEndState(ExchangeReceiverState::CANCELED); - msg_channel.cancel(); + cancelAllMsgChannels(); } template void ExchangeReceiverBase::close() { setEndState(ExchangeReceiverState::CLOSED); - msg_channel.finish(); + finishAllMsgChannels(); } template @@ -380,7 +487,12 @@ void ExchangeReceiverBase::setUpConnection() async_requests.push_back(std::move(req)); else { - thread_manager->schedule(true, "Receiver", [this, req = std::move(req)] { readLoop(req); }); + thread_manager->schedule(true, "Receiver", [this, req = std::move(req)] { + if (enableFineGrainedShuffle(fine_grained_shuffle_stream_count)) + readLoop(req); + else + readLoop(req); + }); ++thread_count; } } @@ -388,15 +500,21 @@ void ExchangeReceiverBase::setUpConnection() // TODO: reduce this thread in the future. if (!async_requests.empty()) { - thread_manager->schedule(true, "RecvReactor", [this, async_requests = std::move(async_requests)] { reactor(async_requests); }); + thread_manager->schedule(true, "RecvReactor", [this, async_requests = std::move(async_requests)] { + if (enableFineGrainedShuffle(fine_grained_shuffle_stream_count)) + reactor(async_requests); + else + reactor(async_requests); + }); ++thread_count; } } template +template void ExchangeReceiverBase::reactor(const std::vector & async_requests) { - using AsyncHandler = AsyncRequestHandler; + using AsyncHandler = AsyncRequestHandler; GET_METRIC(tiflash_thread_count, type_threads_of_receiver_reactor).Increment(); SCOPE_EXIT({ @@ -412,7 +530,7 @@ void ExchangeReceiverBase::reactor(const std::vector & asyn std::vector> handlers; handlers.reserve(alive_async_connections); for (const auto & req : async_requests) - handlers.emplace_back(std::make_unique(&ready_requests, &msg_channel, rpc_context, req, exc_log->identifier())); + handlers.emplace_back(std::make_unique(&ready_requests, &msg_channels, rpc_context, req, exc_log->identifier())); while (alive_async_connections > 0) { @@ -457,6 +575,7 @@ void ExchangeReceiverBase::reactor(const std::vector & asyn } template +template void ExchangeReceiverBase::readLoop(const Request & req) { GET_METRIC(tiflash_thread_count, type_threads_of_receiver_read_loop).Increment(); @@ -481,20 +600,15 @@ void ExchangeReceiverBase::readLoop(const Request & req) for (;;) { LOG_FMT_TRACE(log, "begin next "); - auto recv_msg = std::make_shared(); - recv_msg->packet = std::make_shared(); - recv_msg->req_info = req_info; - recv_msg->source_index = req.source_index; - bool success = reader->read(recv_msg->packet); + MPPDataPacketPtr packet = std::make_shared(); + bool success = reader->read(packet); if (!success) break; has_data = true; - if (recv_msg->packet->has_error()) - throw Exception("Exchange receiver meet error : " + recv_msg->packet->error().msg()); + if (packet->has_error()) + throw Exception("Exchange receiver meet error : " + packet->error().msg()); - bool push_success = msg_channel.push(std::move(recv_msg)); - fiu_do_on(FailPoints::random_receiver_sync_msg_push_failure_failpoint, push_success = false;); - if (!push_success) + if (!pushPacket(req.source_index, req_info, packet, msg_channels, log)) { meet_error = true; auto local_state = getState(); @@ -564,15 +678,15 @@ DecodeDetail ExchangeReceiverBase::decodeChunks( assert(recv_msg != nullptr); DecodeDetail detail; - int chunk_size = recv_msg->packet->chunks_size(); - if (chunk_size == 0) + if (recv_msg->chunks.empty()) return detail; + // Record total packet size even if fine grained shuffle is enabled. detail.packet_bytes = recv_msg->packet->ByteSizeLong(); - /// ExchangeReceiverBase should receive chunks of TypeCHBlock - for (int i = 0; i < chunk_size; ++i) + + for (const String * chunk : recv_msg->chunks) { - Block block = CHBlockChunkCodec::decode(recv_msg->packet->chunks(i), header); + Block block = CHBlockChunkCodec::decode(*chunk, header); detail.rows += block.rows(); if (unlikely(block.rows() == 0)) continue; @@ -582,10 +696,15 @@ DecodeDetail ExchangeReceiverBase::decodeChunks( } template -ExchangeReceiverResult ExchangeReceiverBase::nextResult(std::queue & block_queue, const Block & header) +ExchangeReceiverResult ExchangeReceiverBase::nextResult(std::queue & block_queue, const Block & header, size_t stream_id) { + if (unlikely(stream_id >= msg_channels.size())) + { + LOG_FMT_ERROR(exc_log, "stream_id out of range, stream_id: {}, total_stream_count: {}", stream_id, msg_channels.size()); + return {nullptr, 0, "", true, "stream_id out of range", false}; + } std::shared_ptr recv_msg; - if (!msg_channel.pop(recv_msg)) + if (!msg_channels[stream_id]->pop(recv_msg)) { std::unique_lock lock(mu); @@ -607,29 +726,32 @@ ExchangeReceiverResult ExchangeReceiverBase::nextResult(std::queuepacket != nullptr); + assert(recv_msg != nullptr); ExchangeReceiverResult result; - if (recv_msg->packet->has_error()) + if (recv_msg->error_ptr != nullptr) { - result = {nullptr, recv_msg->source_index, recv_msg->req_info, true, recv_msg->packet->error().msg(), false}; + result = {nullptr, recv_msg->source_index, recv_msg->req_info, true, recv_msg->error_ptr->msg(), false}; } else { - if (!recv_msg->packet->data().empty()) /// the data of the last packet is serialized from tipb::SelectResponse including execution summaries. + if (recv_msg->resp_ptr != nullptr) /// the data of the last packet is serialized from tipb::SelectResponse including execution summaries. { - auto resp_ptr = std::make_shared(); - if (!resp_ptr->ParseFromString(recv_msg->packet->data())) + auto select_resp = std::make_shared(); + if (!select_resp->ParseFromString(*(recv_msg->resp_ptr))) { result = {nullptr, recv_msg->source_index, recv_msg->req_info, true, "decode error", false}; } else { - result = {resp_ptr, recv_msg->source_index, recv_msg->req_info, false, "", false}; - /// If mocking TiFlash as TiDB, here should decode chunks from resp_ptr. - if (!resp_ptr->chunks().empty()) + result = {select_resp, recv_msg->source_index, recv_msg->req_info, false, "", false}; + /// If mocking TiFlash as TiDB, here should decode chunks from select_resp. + if (!select_resp->chunks().empty()) { - assert(recv_msg->packet->chunks().empty()); - result.decode_detail = CoprocessorReader::decodeChunks(resp_ptr, block_queue, header, schema); + assert(recv_msg->chunks.empty()); + // Fine grained shuffle should only be enabled when sending data to TiFlash node. + // So all data should be encoded into MPPDataPacket.chunks. + RUNTIME_CHECK(!enableFineGrainedShuffle(fine_grained_shuffle_stream_count), Exception, "Data should not be encoded into tipb::SelectResponse.chunks when fine grained shuffle is enabled"); + result.decode_detail = CoprocessorReader::decodeChunks(select_resp, block_queue, header, schema); } } } @@ -637,7 +759,7 @@ ExchangeReceiverResult ExchangeReceiverBase::nextResult(std::queuesource_index, recv_msg->req_info, false, "", false}; } - if (!result.meet_error && !recv_msg->packet->chunks().empty()) + if (!result.meet_error && !recv_msg->chunks.empty()) { assert(result.decode_detail.rows == 0); result.decode_detail = decodeChunks(recv_msg, block_queue, header); @@ -699,7 +821,21 @@ void ExchangeReceiverBase::connectionDone( throw Exception("live_connections should not be less than 0!"); if (meet_error || copy_live_conn == 0) - msg_channel.finish(); + finishAllMsgChannels(); +} + +template +void ExchangeReceiverBase::finishAllMsgChannels() +{ + for (auto & msg_channel : msg_channels) + msg_channel->finish(); +} + +template +void ExchangeReceiverBase::cancelAllMsgChannels() +{ + for (auto & msg_channel : msg_channels) + msg_channel->cancel(); } /// Explicit template instantiations - to avoid code bloat in headers. diff --git a/dbms/src/Flash/Mpp/ExchangeReceiver.h b/dbms/src/Flash/Mpp/ExchangeReceiver.h index 830dc6241a9..708f133f226 100644 --- a/dbms/src/Flash/Mpp/ExchangeReceiver.h +++ b/dbms/src/Flash/Mpp/ExchangeReceiver.h @@ -35,9 +35,28 @@ namespace DB { struct ReceivedMessage { - std::shared_ptr packet; - size_t source_index = 0; + size_t source_index; String req_info; + // shared_ptr is copied to make sure error_ptr, resp_ptr and chunks are valid. + const std::shared_ptr packet; + const mpp::Error * error_ptr; + const String * resp_ptr; + std::vector chunks; + + // Constructor that move chunks. + ReceivedMessage(size_t source_index_, + const String & req_info_, + const std::shared_ptr & packet_, + const mpp::Error * error_ptr_, + const String * resp_ptr_, + std::vector && chunks_) + : source_index(source_index_) + , req_info(req_info_) + , packet(packet_) + , error_ptr(error_ptr_) + , resp_ptr(resp_ptr_) + , chunks(chunks_) + {} }; struct ExchangeReceiverResult @@ -78,6 +97,7 @@ enum class ExchangeReceiverState CLOSED, }; +using MsgChannelPtr = std::unique_ptr>>; template class ExchangeReceiverBase @@ -92,7 +112,8 @@ class ExchangeReceiverBase size_t source_num_, size_t max_streams_, const String & req_id, - const String & executor_id); + const String & executor_id, + uint64_t fine_grained_shuffle_stream_count); ~ExchangeReceiverBase(); @@ -104,9 +125,11 @@ class ExchangeReceiverBase ExchangeReceiverResult nextResult( std::queue & block_queue, - const Block & header); + const Block & header, + size_t stream_id); size_t getSourceNum() const { return source_num; } + uint64_t getFineGrainedShuffleStreamCount() const { return fine_grained_shuffle_stream_count; } int computeNewThreadCount() const { return thread_count; } @@ -128,7 +151,10 @@ class ExchangeReceiverBase using Request = typename RPCContext::Request; void setUpConnection(); + // Template argument enable_fine_grained_shuffle will be setup properly in setUpConnection(). + template void readLoop(const Request & req); + template void reactor(const std::vector & async_requests); bool setEndState(ExchangeReceiverState new_state); @@ -139,12 +165,14 @@ class ExchangeReceiverBase std::queue & block_queue, const Block & header); - void connectionDone( bool meet_error, const String & local_err_msg, const LoggerPtr & log); + void finishAllMsgChannels(); + void cancelAllMsgChannels(); + std::shared_ptr rpc_context; const tipb::ExchangeReceiver pb_exchange_receiver; @@ -156,7 +184,7 @@ class ExchangeReceiverBase std::shared_ptr thread_manager; DAGSchema schema; - MPMCQueue> msg_channel; + std::vector msg_channels; std::mutex mu; /// should lock `mu` when visit these members @@ -168,6 +196,7 @@ class ExchangeReceiverBase bool collected = false; int thread_count = 0; + uint64_t fine_grained_shuffle_stream_count; }; class ExchangeReceiver : public ExchangeReceiverBase diff --git a/dbms/src/Flash/Mpp/MPPTask.cpp b/dbms/src/Flash/Mpp/MPPTask.cpp index da8f3034abc..7ddc6af361f 100644 --- a/dbms/src/Flash/Mpp/MPPTask.cpp +++ b/dbms/src/Flash/Mpp/MPPTask.cpp @@ -172,7 +172,8 @@ void MPPTask::initExchangeReceivers() executor.exchange_receiver().encoded_task_meta_size(), context->getMaxStreams(), log->identifier(), - executor_id); + executor_id, + executor.fine_grained_shuffle_stream_count()); if (status != RUNNING) throw Exception("exchange receiver map can not be initialized, because the task is not in running state"); diff --git a/dbms/src/Flash/tests/WindowTestUtil.h b/dbms/src/Flash/tests/WindowTestUtil.h index 3f4cb7d595f..b7385380419 100644 --- a/dbms/src/Flash/tests/WindowTestUtil.h +++ b/dbms/src/Flash/tests/WindowTestUtil.h @@ -39,9 +39,9 @@ inline void mockExecuteProject(std::shared_ptr & mock_ mock_interpreter->executeProject(pipeline, final_project); } -inline void mockExecuteWindowOrder(std::shared_ptr & mock_interpreter, DAGPipeline & pipeline, const tipb::Sort & sort) +inline void mockExecuteWindowOrder(std::shared_ptr & mock_interpreter, DAGPipeline & pipeline, const tipb::Sort & sort, uint64_t fine_grained_shuffle_stream_count) { - mock_interpreter->handleWindowOrder(pipeline, sort); + mock_interpreter->handleWindowOrder(pipeline, sort, ::DB::enableFineGrainedShuffle(fine_grained_shuffle_stream_count)); mock_interpreter->input_streams_vec[0] = pipeline.streams; NamesWithAliases final_project; for (const auto & column : (*mock_interpreter->analyzer).source_columns) @@ -51,16 +51,9 @@ inline void mockExecuteWindowOrder(std::shared_ptr & m mockExecuteProject(mock_interpreter, pipeline, final_project); } -inline void mockExecuteWindowOrder(std::shared_ptr & mock_interpreter, DAGPipeline & pipeline, const String & sort_json) +inline void mockExecuteWindow(std::shared_ptr & mock_interpreter, DAGPipeline & pipeline, const tipb::Window & window, uint64_t fine_grained_shuffle_stream_count) { - tipb::Sort sort; - ::google::protobuf::util::JsonStringToMessage(sort_json, &sort); - mockExecuteWindowOrder(mock_interpreter, pipeline, sort); -} - -inline void mockExecuteWindow(std::shared_ptr & mock_interpreter, DAGPipeline & pipeline, const tipb::Window & window) -{ - mock_interpreter->handleWindow(pipeline, window); + mock_interpreter->handleWindow(pipeline, window, ::DB::enableFineGrainedShuffle(fine_grained_shuffle_stream_count)); mock_interpreter->input_streams_vec[0] = pipeline.streams; NamesWithAliases final_project; for (const auto & column : (*mock_interpreter->analyzer).source_columns) @@ -70,12 +63,5 @@ inline void mockExecuteWindow(std::shared_ptr & mock_i mockExecuteProject(mock_interpreter, pipeline, final_project); } -inline void mockExecuteWindow(std::shared_ptr & mock_interpreter, DAGPipeline & pipeline, std::string window_json_str) -{ - tipb::Window window; - google::protobuf::util::JsonStringToMessage(window_json_str, &window); - mockExecuteWindow(mock_interpreter, pipeline, window); -} - } // namespace tests } // namespace DB diff --git a/dbms/src/Flash/tests/bench_exchange.cpp b/dbms/src/Flash/tests/bench_exchange.cpp index cbbdf060580..d6e3f3e825e 100644 --- a/dbms/src/Flash/tests/bench_exchange.cpp +++ b/dbms/src/Flash/tests/bench_exchange.cpp @@ -47,29 +47,46 @@ MockFixedRowsBlockInputStream::MockFixedRowsBlockInputStream(size_t total_rows_, , blocks(blocks_) {} -Block makeBlock(int row_num) +Block makeBlock(int row_num, bool skew) { - std::mt19937 mt(rd()); - std::uniform_int_distribution int64_dist; - std::uniform_int_distribution len_dist(10, 20); - std::uniform_int_distribution char_dist; - InferredDataVector> int64_vec; InferredDataVector> int64_vec2; - for (int i = 0; i < row_num; ++i) + InferredDataVector> string_vec; + + if (skew) { - int64_vec.emplace_back(int64_dist(mt)); - int64_vec2.emplace_back(int64_dist(mt)); + for (int i = 0; i < row_num; ++i) + { + int64_vec.emplace_back(100); + int64_vec2.emplace_back(100); + } + + for (int i = 0; i < row_num; ++i) + { + string_vec.push_back("abcdefg"); + } } - - InferredDataVector> string_vec; - for (int i = 0; i < row_num; ++i) + else { - int len = len_dist(mt); - String s; - for (int j = 0; j < len; ++j) - s.push_back(char_dist(mt)); - string_vec.push_back(std::move(s)); + std::mt19937 mt(rd()); + std::uniform_int_distribution int64_dist; + std::uniform_int_distribution len_dist(10, 20); + std::uniform_int_distribution char_dist; + + for (int i = 0; i < row_num; ++i) + { + int64_vec.emplace_back(int64_dist(mt)); + int64_vec2.emplace_back(int64_dist(mt)); + } + + for (int i = 0; i < row_num; ++i) + { + int len = len_dist(mt); + String s; + for (int j = 0; j < len; ++j) + s.push_back(char_dist(mt)); + string_vec.push_back(std::move(s)); + } } auto int64_data_type = makeDataType>(); @@ -82,11 +99,11 @@ Block makeBlock(int row_num) return Block({int64_column, string_column, int64_column2}); } -std::vector makeBlocks(int block_num, int row_num) +std::vector makeBlocks(int block_num, int row_num, bool skew) { std::vector blocks; for (int i = 0; i < block_num; ++i) - blocks.push_back(makeBlock(row_num)); + blocks.push_back(makeBlock(row_num, skew)); return blocks; } @@ -139,32 +156,10 @@ void printException(const Exception & e) << e.getStackTrace().toString() << std::endl; } -void sendPacket(const std::vector & packets, const PacketQueuePtr & queue, StopFlag & stop_flag) -{ - std::mt19937 mt(rd()); - std::uniform_int_distribution dist(0, packets.size() - 1); - - while (!stop_flag.load()) - { - int i = dist(mt); - queue->tryPush(packets[i], std::chrono::milliseconds(10)); - } - queue->finish(); -} - -void receivePacket(const PacketQueuePtr & queue) -{ - while (true) - { - PacketPtr packet; - if (!queue->pop(packet)) - break; - } -} - -ReceiverHelper::ReceiverHelper(int concurrency_, int source_num_) +ReceiverHelper::ReceiverHelper(int concurrency_, int source_num_, uint32_t fine_grained_shuffle_stream_count_) : concurrency(concurrency_) , source_num(source_num_) + , fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count_) { pb_exchange_receiver.set_tp(tipb::Hash); for (int i = 0; i < source_num; ++i) @@ -198,16 +193,21 @@ MockExchangeReceiverPtr ReceiverHelper::buildReceiver() source_num, concurrency, "mock_req_id", - "mock_exchange_receiver_id"); + "mock_exchange_receiver_id", + fine_grained_shuffle_stream_count); } std::vector ReceiverHelper::buildExchangeReceiverStream() { auto receiver = buildReceiver(); std::vector streams(concurrency); + // NOTE: check if need fine_grained_shuffle_stream_count for (int i = 0; i < concurrency; ++i) { - streams[i] = std::make_shared(receiver, "mock_req_id", "mock_executor_id" + std::to_string(i)); + streams[i] = std::make_shared(receiver, + "mock_req_id", + "mock_executor_id" + std::to_string(i), + /*stream_id=*/enableFineGrainedShuffle(fine_grained_shuffle_stream_count) ? i : 0); } return streams; } @@ -230,10 +230,14 @@ void ReceiverHelper::finish() SenderHelper::SenderHelper( int source_num_, int concurrency_, + uint32_t fine_grained_shuffle_stream_count_, + int64_t fine_grained_shuffle_batch_size_, const std::vector & queues_, const std::vector & fields) : source_num(source_num_) , concurrency(concurrency_) + , fine_grained_shuffle_stream_count(fine_grained_shuffle_stream_count_) + , fine_grained_shuffle_batch_size(fine_grained_shuffle_batch_size_) , queues(queues_) { mpp::TaskMeta task_meta; @@ -277,17 +281,38 @@ BlockInputStreamPtr SenderHelper::buildUnionStream( for (int i = 0; i < concurrency; ++i) { BlockInputStreamPtr stream = std::make_shared(blocks, stop_flag); - std::unique_ptr response_writer( - new StreamingDAGResponseWriter( - tunnel_set, - {0, 1, 2}, - TiDB::TiDBCollators(3), - tipb::Hash, - -1, - -1, - true, - *dag_context)); - send_streams.push_back(std::make_shared(stream, std::move(response_writer), /*req_id=*/"")); + if (enableFineGrainedShuffle(fine_grained_shuffle_stream_count)) + { + std::unique_ptr response_writer( + new StreamingDAGResponseWriter( + tunnel_set, + {0, 1, 2}, + TiDB::TiDBCollators(3), + tipb::Hash, + -1, + -1, + true, + *dag_context, + fine_grained_shuffle_stream_count, + fine_grained_shuffle_batch_size)); + send_streams.push_back(std::make_shared(stream, std::move(response_writer), /*req_id=*/"")); + } + else + { + std::unique_ptr response_writer( + new StreamingDAGResponseWriter( + tunnel_set, + {0, 1, 2}, + TiDB::TiDBCollators(3), + tipb::Hash, + -1, + -1, + true, + *dag_context, + fine_grained_shuffle_stream_count, + fine_grained_shuffle_batch_size)); + send_streams.push_back(std::make_shared(stream, std::move(response_writer), /*req_id=*/"")); + } } return std::make_shared>(send_streams, BlockInputStreams{}, concurrency, /*req_id=*/""); @@ -299,17 +324,38 @@ BlockInputStreamPtr SenderHelper::buildUnionStream(size_t total_rows, const std: for (int i = 0; i < concurrency; ++i) { BlockInputStreamPtr stream = std::make_shared(total_rows / concurrency, blocks); - std::unique_ptr response_writer( - new StreamingDAGResponseWriter( - tunnel_set, - {0, 1, 2}, - TiDB::TiDBCollators(3), - tipb::Hash, - -1, - -1, - true, - *dag_context)); - send_streams.push_back(std::make_shared(stream, std::move(response_writer), /*req_id=*/"")); + if (enableFineGrainedShuffle(fine_grained_shuffle_stream_count)) + { + std::unique_ptr response_writer( + new StreamingDAGResponseWriter( + tunnel_set, + {0, 1, 2}, + TiDB::TiDBCollators(3), + tipb::Hash, + -1, + -1, + true, + *dag_context, + fine_grained_shuffle_stream_count, + fine_grained_shuffle_batch_size)); + send_streams.push_back(std::make_shared(stream, std::move(response_writer), /*req_id=*/"")); + } + else + { + std::unique_ptr response_writer( + new StreamingDAGResponseWriter( + tunnel_set, + {0, 1, 2}, + TiDB::TiDBCollators(3), + tipb::Hash, + -1, + -1, + true, + *dag_context, + fine_grained_shuffle_stream_count, + fine_grained_shuffle_batch_size)); + send_streams.push_back(std::make_shared(stream, std::move(response_writer), /*req_id=*/"")); + } } return std::make_shared>(send_streams, BlockInputStreams{}, concurrency, /*req_id=*/""); @@ -327,13 +373,12 @@ void SenderHelper::finish() void ExchangeBench::SetUp(const benchmark::State &) { - Poco::Logger::root().setLevel("error"); - DynamicThreadPool::global_instance = std::make_unique( /*fixed_thread_num=*/300, std::chrono::milliseconds(100000)); - input_blocks = makeBlocks(/*block_num=*/100, /*row_num=*/1024); + uniform_blocks = makeBlocks(/*block_num=*/100, /*row_num=*/1024); + skew_blocks = makeBlocks(/*block_num=*/100, /*row_num=*/1024, /*skew=*/true); try { @@ -348,7 +393,8 @@ void ExchangeBench::SetUp(const benchmark::State &) void ExchangeBench::TearDown(const benchmark::State &) { - input_blocks.clear(); + uniform_blocks.clear(); + skew_blocks.clear(); // NOTE: Must reset here, otherwise DynamicThreadPool::fixedWork() may core because metrics already destroyed. DynamicThreadPool::global_instance.reset(); } @@ -383,25 +429,38 @@ try const int concurrency = state.range(0); const int source_num = state.range(1); const int total_rows = state.range(2); + const int fine_grained_shuffle_stream_count = state.range(3); + const int fine_grained_shuffle_batch_size = state.range(4); Context context = TiFlashTestEnv::getContext(); for (auto _ : state) { - std::shared_ptr receiver_helper = std::make_shared(concurrency, source_num); + std::shared_ptr receiver_helper = std::make_shared(concurrency, source_num, fine_grained_shuffle_stream_count); BlockInputStreamPtr receiver_stream = receiver_helper->buildUnionStream(); std::shared_ptr sender_helper = std::make_shared(source_num, concurrency, + fine_grained_shuffle_stream_count, + fine_grained_shuffle_batch_size, receiver_helper->queues, receiver_helper->fields); - BlockInputStreamPtr sender_stream = sender_helper->buildUnionStream(total_rows, input_blocks); + BlockInputStreamPtr sender_stream = sender_helper->buildUnionStream(total_rows, uniform_blocks); runAndWait(receiver_helper, receiver_stream, sender_helper, sender_stream); } } CATCH BENCHMARK_REGISTER_F(ExchangeBench, basic_send_receive) - ->Args({8, 1, 1024 * 1000}); + ->Args({8, 1, 1024 * 1000, 0, 4096}) + ->Args({8, 1, 1024 * 1000, 4, 4096}) + ->Args({8, 1, 1024 * 1000, 8, 4096}) + ->Args({8, 1, 1024 * 1000, 16, 4096}) + ->Args({8, 1, 1024 * 1000, 32, 4096}) + ->Args({8, 1, 1024 * 1000, 8, 1}) + ->Args({8, 1, 1024 * 1000, 8, 1000}) + ->Args({8, 1, 1024 * 1000, 8, 10000}) + ->Args({8, 1, 1024 * 1000, 8, 100000}); + } // namespace tests } // namespace DB diff --git a/dbms/src/Flash/tests/bench_exchange.h b/dbms/src/Flash/tests/bench_exchange.h index 6b09e319613..d8300d45740 100644 --- a/dbms/src/Flash/tests/bench_exchange.h +++ b/dbms/src/Flash/tests/bench_exchange.h @@ -69,7 +69,9 @@ struct MockReceiverContext : queue(queue_) {} - void initialize() const {} + void initialize() const + { + } bool read(PacketPtr & packet [[maybe_unused]]) const { @@ -105,7 +107,8 @@ struct MockReceiverContext const std::vector & field_types_) : queues(queues_) , field_types(field_types_) - {} + { + } void fillSchema(DAGSchema & schema) const { @@ -220,8 +223,8 @@ struct MockFixedRowsBlockInputStream : public IProfilingBlockInputStream } }; -Block makeBlock(int row_num); -std::vector makeBlocks(int block_num, int row_num); +Block makeBlock(int row_num, bool skew = false); +std::vector makeBlocks(int block_num, int row_num, bool skew = false); mpp::MPPDataPacket makePacket(ChunkCodecStream & codec, int row_num); std::vector makePackets(ChunkCodecStream & codec, int packet_num, int row_num); std::vector makePacketQueues(int source_num, int queue_size); @@ -234,17 +237,17 @@ struct ReceiverHelper { const int concurrency; const int source_num; + const uint32_t fine_grained_shuffle_stream_count; tipb::ExchangeReceiver pb_exchange_receiver; std::vector fields; mpp::TaskMeta task_meta; std::vector queues; std::shared_ptr join_ptr; - explicit ReceiverHelper(int concurrency_, int source_num_); + explicit ReceiverHelper(int concurrency_, int source_num_, uint32_t fine_grained_shuffle_stream_count_); MockExchangeReceiverPtr buildReceiver(); std::vector buildExchangeReceiverStream(); BlockInputStreamPtr buildUnionStream(); - BlockInputStreamPtr buildUnionStreamWithHashJoinBuildStream(); void finish(); }; @@ -252,6 +255,8 @@ struct SenderHelper { const int source_num; const int concurrency; + const uint32_t fine_grained_shuffle_stream_count; + const int64_t fine_grained_shuffle_batch_size; std::vector queues; std::vector mock_writers; @@ -262,6 +267,8 @@ struct SenderHelper SenderHelper( int source_num_, int concurrency_, + uint32_t fine_grained_shuffle_stream_count_, + int64_t fine_grained_shuffle_batch_size_, const std::vector & queues_, const std::vector & fields); @@ -283,7 +290,8 @@ class ExchangeBench : public benchmark::Fixture std::shared_ptr & sender_helper, BlockInputStreamPtr sender_stream); - std::vector input_blocks; + std::vector uniform_blocks; + std::vector skew_blocks; }; diff --git a/dbms/src/Flash/tests/bench_window.cpp b/dbms/src/Flash/tests/bench_window.cpp index 356f544a836..75dc53b065b 100644 --- a/dbms/src/Flash/tests/bench_window.cpp +++ b/dbms/src/Flash/tests/bench_window.cpp @@ -24,9 +24,13 @@ class WindowFunctionBench : public ExchangeBench public: void SetUp(const benchmark::State & state) override { - // build tipb::Window and tipb::Sort. + // Using DAGRequestBuilder to build tipb::Window and tipb::Sort. // select row_number() over w1 from t1 window w1 as (partition by c1, c2, c3 order by c1, c2, c3); ExchangeBench::SetUp(state); + } + + static void setupPB(uint64_t fine_grained_shuffle_stream_count, tipb::Window & window, tipb::Sort & sort) + { MockColumnInfoVec columns{ {"c1", TiDB::TP::TypeLongLong}, {"c2", TiDB::TP::TypeString}, @@ -36,11 +40,12 @@ class WindowFunctionBench : public ExchangeBench DAGRequestBuilder builder(executor_index); builder .mockTable("test", "t1", columns) - .sort({{"c1", false}, {"c2", false}, {"c3", false}}, true) + .sort({{"c1", false}, {"c2", false}, {"c3", false}}, true, fine_grained_shuffle_stream_count) .window(RowNumber(), {{"c1", false}, {"c2", false}, {"c3", false}}, {{"c1", false}, {"c2", false}, {"c3", false}}, - buildDefaultRowsFrame()); + buildDefaultRowsFrame(), + fine_grained_shuffle_stream_count); tipb::DAGRequest req; MPPInfo mpp_info(0, -1, -1, {}, std::unordered_map>{}); builder.getRoot()->toTiPBExecutor(req.mutable_root_executor(), /*collator_id=*/0, mpp_info, TiFlashTestEnv::getContext()); @@ -50,13 +55,17 @@ class WindowFunctionBench : public ExchangeBench sort = window.child().sort(); } - void prepareWindowStream(Context & context, int concurrency, int source_num, int total_rows, const std::vector & blocks, BlockInputStreamPtr & sender_stream, BlockInputStreamPtr & receiver_stream, std::shared_ptr & sender_helper, std::shared_ptr & receiver_helper) const + static void prepareWindowStream(Context & context, int concurrency, int source_num, int total_rows, uint32_t fine_grained_shuffle_stream_count, uint64_t fine_grained_shuffle_batch_size, const std::vector & blocks, BlockInputStreamPtr & sender_stream, BlockInputStreamPtr & receiver_stream, std::shared_ptr & sender_helper, std::shared_ptr & receiver_helper, bool build_window = true) { + tipb::Window window; + tipb::Sort sort; + setupPB(fine_grained_shuffle_stream_count, window, sort); + DAGPipeline pipeline; - receiver_helper = std::make_shared(concurrency, source_num); + receiver_helper = std::make_shared(concurrency, source_num, fine_grained_shuffle_stream_count); pipeline.streams = receiver_helper->buildExchangeReceiverStream(); - sender_helper = std::make_shared(source_num, concurrency, receiver_helper->queues, receiver_helper->fields); + sender_helper = std::make_shared(source_num, concurrency, fine_grained_shuffle_stream_count, fine_grained_shuffle_batch_size, receiver_helper->queues, receiver_helper->fields); sender_stream = sender_helper->buildUnionStream(total_rows, blocks); context.setDAGContext(sender_helper->dag_context.get()); @@ -66,16 +75,16 @@ class WindowFunctionBench : public ExchangeBench NameAndTypePair("c3", makeNullable(std::make_shared()))}; auto mock_interpreter = mockInterpreter(context, source_columns, concurrency); mock_interpreter->input_streams_vec.push_back(pipeline.streams); - mockExecuteWindowOrder(mock_interpreter, pipeline, sort); - mockExecuteWindow(mock_interpreter, pipeline, window); + mockExecuteWindowOrder(mock_interpreter, pipeline, sort, fine_grained_shuffle_stream_count); + if (build_window) + { + mockExecuteWindow(mock_interpreter, pipeline, window, fine_grained_shuffle_stream_count); + } pipeline.transform([&](auto & stream) { stream = std::make_shared(stream, 8192, 0, "mock_executor_id_squashing"); }); receiver_stream = std::make_shared>(pipeline.streams, BlockInputStreams{}, concurrency, /*req_id=*/""); } - - tipb::Window window; - tipb::Sort sort; }; BENCHMARK_DEFINE_F(WindowFunctionBench, basic_row_number) @@ -85,8 +94,15 @@ try const int concurrency = state.range(0); const int source_num = state.range(1); const int total_rows = state.range(2); + const int fine_grained_shuffle_stream_count = state.range(3); + const int fine_grained_shuffle_batch_size = state.range(4); + const bool skew = state.range(5); Context context = TiFlashTestEnv::getContext(); + std::vector * blocks = &uniform_blocks; + if (skew) + blocks = &skew_blocks; + for (auto _ : state) { std::shared_ptr sender_helper; @@ -94,14 +110,58 @@ try BlockInputStreamPtr sender_stream; BlockInputStreamPtr receiver_stream; - prepareWindowStream(context, concurrency, source_num, total_rows, input_blocks, sender_stream, receiver_stream, sender_helper, receiver_helper); + prepareWindowStream(context, concurrency, source_num, total_rows, fine_grained_shuffle_stream_count, fine_grained_shuffle_batch_size, *blocks, sender_stream, receiver_stream, sender_helper, receiver_helper); runAndWait(receiver_helper, receiver_stream, sender_helper, sender_stream); } } CATCH BENCHMARK_REGISTER_F(WindowFunctionBench, basic_row_number) - ->Args({8, 1, 1024 * 1000}); + ->Args({8, 1, 1024 * 1000, 0, 4096, false}) // Test fine_grained_shuffle_stream_count. + ->Args({8, 1, 1024 * 1000, 4, 4096, false}) + ->Args({8, 1, 1024 * 1000, 8, 4096, false}) + ->Args({8, 1, 1024 * 1000, 16, 4096, false}) + ->Args({8, 1, 1024 * 1000, 32, 4096, false}) + ->Args({8, 1, 1024 * 1000, 8, 1, false}) // Test fine_grained_shuffle_batch_size. + ->Args({8, 1, 1024 * 1000, 8, 1000, false}) + ->Args({8, 1, 1024 * 1000, 8, 10000, false}) + ->Args({8, 1, 1024 * 1000, 8, 100000, false}) + ->Args({8, 1, 1024 * 1000, 0, 4096, true}) // Test skew dataset. + ->Args({8, 1, 1024 * 1000, 4, 4096, true}) + ->Args({8, 1, 1024 * 1000, 8, 4096, true}) + ->Args({8, 1, 1024 * 1000, 16, 4096, true}); + +BENCHMARK_DEFINE_F(WindowFunctionBench, partial_sort_skew_dataset) +(benchmark::State & state) +try +{ + const int concurrency = state.range(0); + const int source_num = state.range(1); + const int total_rows = state.range(2); + const int fine_grained_shuffle_stream_count = state.range(3); + const int fine_grained_shuffle_batch_size = state.range(4); + Context context = TiFlashTestEnv::getContext(); + std::vector * blocks = &skew_blocks; + + for (auto _ : state) + { + std::shared_ptr sender_helper; + std::shared_ptr receiver_helper; + BlockInputStreamPtr sender_stream; + BlockInputStreamPtr receiver_stream; + + // Only build partial sort. + prepareWindowStream(context, concurrency, source_num, total_rows, fine_grained_shuffle_stream_count, fine_grained_shuffle_batch_size, *blocks, sender_stream, receiver_stream, sender_helper, receiver_helper, /*build_window=*/false); + + runAndWait(receiver_helper, receiver_stream, sender_helper, sender_stream); + } +} +CATCH +BENCHMARK_REGISTER_F(WindowFunctionBench, partial_sort_skew_dataset) + ->Args({1, 1, 1024 * 10000, 0, 4096}) // Test how much multiple-thread improves performance for partial sort. + ->Args({2, 1, 1024 * 10000, 0, 4096}) + ->Args({4, 1, 1024 * 10000, 0, 4096}) + ->Args({8, 1, 1024 * 10000, 0, 4096}); } // namespace tests } // namespace DB diff --git a/dbms/src/Flash/tests/gtest_interpreter.cpp b/dbms/src/Flash/tests/gtest_interpreter.cpp index c583fbf35c6..53b260f9638 100644 --- a/dbms/src/Flash/tests/gtest_interpreter.cpp +++ b/dbms/src/Flash/tests/gtest_interpreter.cpp @@ -340,6 +340,78 @@ Union: } CATCH +TEST_F(InterpreterExecuteTest, FineGrainedShuffle) +try +{ + // fine-grained shuffle is enabled. + const uint64_t enable = 8; + const uint64_t disable = 0; + auto request = context + .receive("sender_1", enable) + .sort({{"s1", true}, {"s2", false}}, true, enable) + .window(RowNumber(), {"s1", true}, {"s2", false}, buildDefaultRowsFrame(), enable) + .build(context); + { + String expected = R"( +Union: + Expression x 10: + Expression: + Window: , function: {row_number}, frame: {type: Rows, boundary_begin: Current, boundary_end: Current} + Expression: + MergeSorting: , limit = 0 + PartialSorting: : limit = 0 + Expression: + MockExchangeReceiver + )"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + } + + auto topn_request = context + .receive("sender_1") + .topN("s2", false, 10) + .build(context); + String topn_expected = R"( +Union: + SharedQuery x 10: + Expression: + MergeSorting, limit = 10 + Union: + PartialSorting x 10: limit = 10 + MockExchangeReceiver + )"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(topn_expected, topn_request, 10); + + // fine-grained shuffle is disabled. + request = context + .receive("sender_1", disable) + .sort({{"s1", true}, {"s2", false}}, true, disable) + .window(RowNumber(), {"s1", true}, {"s2", false}, buildDefaultRowsFrame(), disable) + .build(context); + { + String expected = R"( +Union: + Expression x 10: + SharedQuery: + Expression: + Window, function: {row_number}, frame: {type: Rows, boundary_begin: Current, boundary_end: Current} + Expression: + MergeSorting, limit = 0 + Union: + PartialSorting x 10: limit = 0 + Expression: + MockExchangeReceiver + )"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + } + + topn_request = context + .receive("sender_1") + .topN("s2", false, 10) + .build(context); + ASSERT_BLOCKINPUTSTREAM_EQAUL(topn_expected, topn_request, 10); +} +CATCH + TEST_F(InterpreterExecuteTest, Join) try { @@ -586,4 +658,4 @@ CreatingSets CATCH } // namespace tests -} // namespace DB \ No newline at end of file +} // namespace DB diff --git a/dbms/src/TestUtils/bench_dbms_main.cpp b/dbms/src/TestUtils/bench_dbms_main.cpp index 48bd02a71f7..092c45c35e2 100644 --- a/dbms/src/TestUtils/bench_dbms_main.cpp +++ b/dbms/src/TestUtils/bench_dbms_main.cpp @@ -20,6 +20,8 @@ int main(int argc, char * argv[]) { benchmark::Initialize(&argc, argv); DB::tests::TiFlashTestEnv::setupLogger(); + // Each time TiFlashTestEnv::getContext() is called, some log will print, it's annoying. + Poco::Logger::root().setLevel("error"); DB::tests::TiFlashTestEnv::initializeGlobalContext(); if (::benchmark::ReportUnrecognizedArguments(argc, argv)) return 1; diff --git a/dbms/src/TestUtils/mockExecutor.cpp b/dbms/src/TestUtils/mockExecutor.cpp index 49ae9787ea4..30d05786c9a 100644 --- a/dbms/src/TestUtils/mockExecutor.cpp +++ b/dbms/src/TestUtils/mockExecutor.cpp @@ -119,12 +119,12 @@ DAGRequestBuilder & DAGRequestBuilder::mockTable(const MockTableName & name, con return mockTable(name.first, name.second, columns); } -DAGRequestBuilder & DAGRequestBuilder::exchangeReceiver(const MockColumnInfoVec & columns) +DAGRequestBuilder & DAGRequestBuilder::exchangeReceiver(const MockColumnInfoVec & columns, uint64_t fine_grained_shuffle_stream_count) { - return buildExchangeReceiver(columns); + return buildExchangeReceiver(columns, fine_grained_shuffle_stream_count); } -DAGRequestBuilder & DAGRequestBuilder::buildExchangeReceiver(const MockColumnInfoVec & columns) +DAGRequestBuilder & DAGRequestBuilder::buildExchangeReceiver(const MockColumnInfoVec & columns, uint64_t fine_grained_shuffle_stream_count) { DAGSchema schema; for (const auto & column : columns) @@ -135,7 +135,7 @@ DAGRequestBuilder & DAGRequestBuilder::buildExchangeReceiver(const MockColumnInf schema.push_back({column.first, info}); } - root = compileExchangeReceiver(getExecutorIndex(), schema); + root = compileExchangeReceiver(getExecutorIndex(), schema, fine_grained_shuffle_stream_count); return *this; } @@ -266,45 +266,45 @@ DAGRequestBuilder & DAGRequestBuilder::buildAggregation(ASTPtr agg_funcs, ASTPtr return *this; } -DAGRequestBuilder & DAGRequestBuilder::window(ASTPtr window_func, MockOrderByItem order_by, MockPartitionByItem partition_by, MockWindowFrame frame) +DAGRequestBuilder & DAGRequestBuilder::window(ASTPtr window_func, MockOrderByItem order_by, MockPartitionByItem partition_by, MockWindowFrame frame, uint64_t fine_grained_shuffle_stream_count) { assert(root); auto window_func_list = std::make_shared(); window_func_list->children.push_back(window_func); - root = compileWindow(root, getExecutorIndex(), window_func_list, buildOrderByItemVec({partition_by}), buildOrderByItemVec({order_by}), frame); + root = compileWindow(root, getExecutorIndex(), window_func_list, buildOrderByItemVec({partition_by}), buildOrderByItemVec({order_by}), frame, fine_grained_shuffle_stream_count); return *this; } -DAGRequestBuilder & DAGRequestBuilder::window(ASTPtr window_func, MockOrderByItemVec order_by_vec, MockPartitionByItemVec partition_by_vec, MockWindowFrame frame) +DAGRequestBuilder & DAGRequestBuilder::window(ASTPtr window_func, MockOrderByItemVec order_by_vec, MockPartitionByItemVec partition_by_vec, MockWindowFrame frame, uint64_t fine_grained_shuffle_stream_count) { assert(root); auto window_func_list = std::make_shared(); window_func_list->children.push_back(window_func); - root = compileWindow(root, getExecutorIndex(), window_func_list, buildOrderByItemVec(partition_by_vec), buildOrderByItemVec(order_by_vec), frame); + root = compileWindow(root, getExecutorIndex(), window_func_list, buildOrderByItemVec(partition_by_vec), buildOrderByItemVec(order_by_vec), frame, fine_grained_shuffle_stream_count); return *this; } -DAGRequestBuilder & DAGRequestBuilder::window(MockAstVec window_funcs, MockOrderByItemVec order_by_vec, MockPartitionByItemVec partition_by_vec, MockWindowFrame frame) +DAGRequestBuilder & DAGRequestBuilder::window(MockAstVec window_funcs, MockOrderByItemVec order_by_vec, MockPartitionByItemVec partition_by_vec, MockWindowFrame frame, uint64_t fine_grained_shuffle_stream_count) { assert(root); auto window_func_list = std::make_shared(); for (const auto & func : window_funcs) window_func_list->children.push_back(func); - root = compileWindow(root, getExecutorIndex(), window_func_list, buildOrderByItemVec(partition_by_vec), buildOrderByItemVec(order_by_vec), frame); + root = compileWindow(root, getExecutorIndex(), window_func_list, buildOrderByItemVec(partition_by_vec), buildOrderByItemVec(order_by_vec), frame, fine_grained_shuffle_stream_count); return *this; } -DAGRequestBuilder & DAGRequestBuilder::sort(MockOrderByItem order_by, bool is_partial_sort) +DAGRequestBuilder & DAGRequestBuilder::sort(MockOrderByItem order_by, bool is_partial_sort, uint64_t fine_grained_shuffle_stream_count) { assert(root); - root = compileSort(root, getExecutorIndex(), buildOrderByItemVec({order_by}), is_partial_sort); + root = compileSort(root, getExecutorIndex(), buildOrderByItemVec({order_by}), is_partial_sort, fine_grained_shuffle_stream_count); return *this; } -DAGRequestBuilder & DAGRequestBuilder::sort(MockOrderByItemVec order_by_vec, bool is_partial_sort) +DAGRequestBuilder & DAGRequestBuilder::sort(MockOrderByItemVec order_by_vec, bool is_partial_sort, uint64_t fine_grained_shuffle_stream_count) { assert(root); - root = compileSort(root, getExecutorIndex(), buildOrderByItemVec(order_by_vec), is_partial_sort); + root = compileSort(root, getExecutorIndex(), buildOrderByItemVec(order_by_vec), is_partial_sort, fine_grained_shuffle_stream_count); return *this; } @@ -368,9 +368,9 @@ DAGRequestBuilder MockDAGRequestContext::scan(String db_name, String table_name) return builder; } -DAGRequestBuilder MockDAGRequestContext::receive(String exchange_name) +DAGRequestBuilder MockDAGRequestContext::receive(String exchange_name, uint64_t fine_grained_shuffle_stream_count) { - auto builder = DAGRequestBuilder(index).exchangeReceiver(exchange_schemas[exchange_name]); + auto builder = DAGRequestBuilder(index).exchangeReceiver(exchange_schemas[exchange_name], fine_grained_shuffle_stream_count); receiver_source_task_ids_map[builder.getRoot()->name] = {}; // If don't have related columns, user must pass input columns as argument of executeStreams in order to run Executors Tests. // If user don't want to test executors, it will be safe to run Interpreter Tests. @@ -380,5 +380,4 @@ DAGRequestBuilder MockDAGRequestContext::receive(String exchange_name) } return builder; } - } // namespace DB::tests diff --git a/dbms/src/TestUtils/mockExecutor.h b/dbms/src/TestUtils/mockExecutor.h index 5f752e58da6..c034a8f86ca 100644 --- a/dbms/src/TestUtils/mockExecutor.h +++ b/dbms/src/TestUtils/mockExecutor.h @@ -66,7 +66,7 @@ class DAGRequestBuilder DAGRequestBuilder & mockTable(const String & db, const String & table, const MockColumnInfoVec & columns); DAGRequestBuilder & mockTable(const MockTableName & name, const MockColumnInfoVec & columns); - DAGRequestBuilder & exchangeReceiver(const MockColumnInfoVec & columns); + DAGRequestBuilder & exchangeReceiver(const MockColumnInfoVec & columns, uint64_t fine_grained_shuffle_stream_count = 0); DAGRequestBuilder & filter(ASTPtr filter_expr); @@ -93,16 +93,16 @@ class DAGRequestBuilder DAGRequestBuilder & aggregation(MockAstVec agg_funcs, MockAstVec group_by_exprs); // window - DAGRequestBuilder & window(ASTPtr window_func, MockOrderByItem order_by, MockPartitionByItem partition_by, MockWindowFrame frame); - DAGRequestBuilder & window(MockAstVec window_funcs, MockOrderByItemVec order_by_vec, MockPartitionByItemVec partition_by_vec, MockWindowFrame frame); - DAGRequestBuilder & window(ASTPtr window_func, MockOrderByItemVec order_by_vec, MockPartitionByItemVec partition_by_vec, MockWindowFrame frame); - DAGRequestBuilder & sort(MockOrderByItem order_by, bool is_partial_sort); - DAGRequestBuilder & sort(MockOrderByItemVec order_by_vec, bool is_partial_sort); + DAGRequestBuilder & window(ASTPtr window_func, MockOrderByItem order_by, MockPartitionByItem partition_by, MockWindowFrame frame, uint64_t fine_grained_shuffle_stream_count = 0); + DAGRequestBuilder & window(MockAstVec window_funcs, MockOrderByItemVec order_by_vec, MockPartitionByItemVec partition_by_vec, MockWindowFrame frame, uint64_t fine_grained_shuffle_stream_count = 0); + DAGRequestBuilder & window(ASTPtr window_func, MockOrderByItemVec order_by_vec, MockPartitionByItemVec partition_by_vec, MockWindowFrame frame, uint64_t fine_grained_shuffle_stream_count = 0); + DAGRequestBuilder & sort(MockOrderByItem order_by, bool is_partial_sort, uint64_t fine_grained_shuffle_stream_count = 0); + DAGRequestBuilder & sort(MockOrderByItemVec order_by_vec, bool is_partial_sort, uint64_t fine_grained_shuffle_stream_count = 0); private: void initDAGRequest(tipb::DAGRequest & dag_request); DAGRequestBuilder & buildAggregation(ASTPtr agg_funcs, ASTPtr group_by_exprs); - DAGRequestBuilder & buildExchangeReceiver(const MockColumnInfoVec & columns); + DAGRequestBuilder & buildExchangeReceiver(const MockColumnInfoVec & columns, uint64_t fine_grained_shuffle_stream_count = 0); ExecutorPtr root; DAGProperties properties; @@ -139,7 +139,7 @@ class MockDAGRequestContext std::unordered_map & executorIdColumnsMap() { return executor_id_columns_map; } DAGRequestBuilder scan(String db_name, String table_name); - DAGRequestBuilder receive(String exchange_name); + DAGRequestBuilder receive(String exchange_name, uint64_t fine_grained_shuffle_stream_count = 0); private: size_t index; diff --git a/tests/fullstack-test/mpp/window.test b/tests/fullstack-test/mpp/window.test new file mode 100644 index 00000000000..698d39ef2ea --- /dev/null +++ b/tests/fullstack-test/mpp/window.test @@ -0,0 +1,32 @@ +# Copyright 2022 PingCAP, Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +mysql> drop table if exists test.t1; +mysql> create table test.t1(c1 int, c2 int); +mysql> insert into test.t1 values(1, 1),(2, 2),(3, 3),(1, 1),(2, 2),(3, 3),(4, 4); +mysql> alter table test.t1 set tiflash replica 1; +func> wait_table test t1 +mysql> use test; set @@tidb_isolation_read_engines='tiflash'; select c1, c2, row_number() over w2, row_number() over w1 from test.t1 window w1 as(partition by c1), w2 as (partition by c1, c2) order by 1, 2, 3, 4; ++------+------+----------------------+----------------------+ +| c1 | c2 | row_number() over w2 | row_number() over w1 | ++------+------+----------------------+----------------------+ +| 1 | 1 | 1 | 1 | +| 1 | 1 | 2 | 2 | +| 2 | 2 | 1 | 1 | +| 2 | 2 | 2 | 2 | +| 3 | 3 | 1 | 1 | +| 3 | 3 | 2 | 2 | +| 4 | 4 | 1 | 1 | ++------+------+----------------------+----------------------+ +mysql> drop table if exists test.t1;