From c2acde27ce45d5cf87f6ed42564ad73556ec3bfe Mon Sep 17 00:00:00 2001 From: xufei Date: Thu, 22 Sep 2022 19:15:04 +0800 Subject: [PATCH] This is an automated cherry-pick of #5966 Signed-off-by: ti-chi-bot --- dbms/src/Common/FailPoint.cpp | 3 - dbms/src/Common/MPMCQueue.h | 9 +- dbms/src/Flash/EstablishCall.cpp | 4 + dbms/src/Flash/Mpp/GRPCSendQueue.h | 29 +++-- dbms/src/Flash/Mpp/MPPHandler.cpp | 16 +-- dbms/src/Flash/Mpp/MPPTask.cpp | 95 +++++----------- dbms/src/Flash/Mpp/MPPTask.h | 12 +-- dbms/src/Flash/Mpp/MPPTaskManager.cpp | 102 +++++------------- dbms/src/Flash/Mpp/MPPTaskManager.h | 26 +++-- dbms/src/Flash/Mpp/MPPTunnel.cpp | 100 ++++++++--------- dbms/src/Flash/Mpp/MPPTunnel.h | 37 ++++--- dbms/src/Flash/Mpp/MPPTunnelSet.cpp | 27 +---- dbms/src/Flash/Mpp/MPPTunnelSet.h | 3 +- dbms/src/Flash/Mpp/MinTSOScheduler.cpp | 2 +- .../Flash/Mpp/tests/gtest_grpc_send_queue.cpp | 34 ++++++ dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp | 75 +++++++------ tests/fullstack-test/mpp/mpp_fail.test | 16 --- 17 files changed, 254 insertions(+), 336 deletions(-) diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index 4b2ffcbe167..edfd2dcfdf6 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -56,8 +56,6 @@ std::unordered_map> FailPointHelper::f M(exception_before_mpp_register_tunnel_for_root_mpp_task) \ M(exception_before_mpp_root_task_run) \ M(exception_during_mpp_root_task_run) \ - M(exception_during_mpp_write_err_to_tunnel) \ - M(exception_during_mpp_close_tunnel) \ M(exception_during_write_to_storage) \ M(force_set_sst_to_dtfile_block_size) \ M(force_set_sst_decode_rand) \ @@ -127,7 +125,6 @@ std::unordered_map> FailPointHelper::f M(random_aggregate_merge_failpoint) \ M(random_sharedquery_failpoint) \ M(random_interpreter_failpoint) \ - M(random_task_lifecycle_failpoint) \ M(random_task_manager_find_task_failure_failpoint) \ M(random_min_tso_scheduler_failpoint) diff --git a/dbms/src/Common/MPMCQueue.h 
b/dbms/src/Common/MPMCQueue.h index 4bd440a8315..e966cd44547 100644 --- a/dbms/src/Common/MPMCQueue.h +++ b/dbms/src/Common/MPMCQueue.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include #include @@ -84,12 +85,6 @@ class MPMCQueue drain(); } - void finishAndDrain() - { - finish(); - drain(); - } - // Cannot to use copy/move constructor, // because MPMCQueue maybe used by different threads. // Copy and move it is dangerous. @@ -219,7 +214,7 @@ class MPMCQueue return static_cast(write_pos - read_pos); } - std::string_view getCancelReason() const + const String & getCancelReason() const { std::unique_lock lock(mu); RUNTIME_ASSERT(isCancelled()); diff --git a/dbms/src/Flash/EstablishCall.cpp b/dbms/src/Flash/EstablishCall.cpp index 38b666dfe0b..9e4fe758a86 100644 --- a/dbms/src/Flash/EstablishCall.cpp +++ b/dbms/src/Flash/EstablishCall.cpp @@ -207,6 +207,10 @@ void EstablishCallData::trySendOneMsg() case GRPCSendQueueRes::FINISHED: writeDone("", grpc::Status::OK); return; + case GRPCSendQueueRes::CANCELLED: + RUNTIME_ASSERT(!async_tunnel_sender->getCancelReason().empty(), "Tunnel sender cancelled without reason"); + writeErr(getPacketWithError(async_tunnel_sender->getCancelReason())); + return; case GRPCSendQueueRes::EMPTY: // No new message. return; diff --git a/dbms/src/Flash/Mpp/GRPCSendQueue.h b/dbms/src/Flash/Mpp/GRPCSendQueue.h index ffe77aad837..4ed63f9531d 100644 --- a/dbms/src/Flash/Mpp/GRPCSendQueue.h +++ b/dbms/src/Flash/Mpp/GRPCSendQueue.h @@ -24,7 +24,6 @@ namespace DB { - namespace tests { class TestGRPCSendQueue; @@ -58,6 +57,7 @@ enum class GRPCSendQueueRes OK, FINISHED, EMPTY, + CANCELLED, }; /// A multi-producer-single-consumer queue dedicated to async grpc streaming send work. 
@@ -120,6 +120,22 @@ class GRPCSendQueue return ret; } + /// Cancel the send queue, and set the cancel reason + bool cancelWith(const String & reason) + { + auto ret = send_queue.cancelWith(reason); + if (ret) + { + kickCompletionQueue(); + } + return ret; + } + + const String & getCancelReason() const + { + return send_queue.getCancelReason(); + } + /// Pop the data from queue. /// /// Return OK if pop is done. @@ -142,6 +158,8 @@ class GRPCSendQueue return GRPCSendQueueRes::OK; case MPMCQueueResult::FINISHED: return GRPCSendQueueRes::FINISHED; + case MPMCQueueResult::CANCELLED: + return GRPCSendQueueRes::CANCELLED; case MPMCQueueResult::EMPTY: // Handle this case later. break; @@ -161,6 +179,8 @@ class GRPCSendQueue return GRPCSendQueueRes::OK; case MPMCQueueResult::FINISHED: return GRPCSendQueueRes::FINISHED; + case MPMCQueueResult::CANCELLED: + return GRPCSendQueueRes::CANCELLED; case MPMCQueueResult::EMPTY: { // If empty, change status to WAITING. @@ -186,13 +206,6 @@ class GRPCSendQueue return ret; } - /// Finish and drain the queue. - void finishAndDrain() - { - send_queue.finishAndDrain(); - kickCompletionQueue(); - } - private: friend class tests::TestGRPCSendQueue; diff --git a/dbms/src/Flash/Mpp/MPPHandler.cpp b/dbms/src/Flash/Mpp/MPPHandler.cpp index 7f97a1dd698..c724a2c0c9c 100644 --- a/dbms/src/Flash/Mpp/MPPHandler.cpp +++ b/dbms/src/Flash/Mpp/MPPHandler.cpp @@ -14,7 +14,6 @@ #include #include -#include #include #include @@ -28,14 +27,17 @@ extern const char exception_before_mpp_root_task_run[]; void MPPHandler::handleError(const MPPTaskPtr & task, String error) { - try + if (task) { - if (task) + try + { task->handleError(error); - } - catch (...) - { - tryLogCurrentException(log, "Fail to handle error and clean task"); + } + catch (...) + { + tryLogCurrentException(log, "Fail to handle error and clean task"); + } + task->unregisterTask(); } } // execute is responsible for making plan , register tasks and tunnels and start the running thread. 
diff --git a/dbms/src/Flash/Mpp/MPPTask.cpp b/dbms/src/Flash/Mpp/MPPTask.cpp index f859c7eba1f..319a3344aa3 100644 --- a/dbms/src/Flash/Mpp/MPPTask.cpp +++ b/dbms/src/Flash/Mpp/MPPTask.cpp @@ -21,13 +21,10 @@ #include #include #include -#include #include #include #include -#include #include -#include #include #include #include @@ -50,15 +47,14 @@ extern const char exception_before_mpp_register_root_mpp_task[]; extern const char exception_before_mpp_register_tunnel_for_non_root_mpp_task[]; extern const char exception_before_mpp_register_tunnel_for_root_mpp_task[]; extern const char exception_during_mpp_register_tunnel_for_non_root_mpp_task[]; -extern const char exception_during_mpp_write_err_to_tunnel[]; extern const char force_no_local_region_for_mpp_task[]; -extern const char random_task_lifecycle_failpoint[]; } // namespace FailPoints MPPTask::MPPTask(const mpp::TaskMeta & meta_, const ContextPtr & context_) : context(context_) , meta(meta_) , id(meta.start_ts(), meta.task_id()) + , manager(context_->getTMTContext().getMPPTaskManager().get()) , log(Logger::get("MPPTask", id.toString())) , mpp_task_statistics(id, meta.address()) , needed_threads(0) @@ -71,28 +67,25 @@ MPPTask::~MPPTask() /// to current_memory_tracker in the destructor if (current_memory_tracker != memory_tracker) current_memory_tracker = memory_tracker; - closeAllTunnels(""); + abortTunnels("", true); if (schedule_state == ScheduleState::SCHEDULED) { /// the threads of this task are not fully freed now, since the BlockIO and DAGContext are not destructed /// TODO: finish all threads before here, except the current one. 
- manager.load()->releaseThreadsFromScheduler(needed_threads); + manager->releaseThreadsFromScheduler(needed_threads); schedule_state = ScheduleState::COMPLETED; } LOG_FMT_DEBUG(log, "finish MPPTask: {}", id.toString()); } -void MPPTask::abortTunnels(const String & message, AbortType abort_type) +void MPPTask::abortTunnels(const String & message, bool wait_sender_finish) { - if (abort_type == AbortType::ONCANCELLATION) { - closeAllTunnels(message); - } - else - { - RUNTIME_ASSERT(tunnel_set != nullptr, log, "mpp task without tunnel set"); - tunnel_set->writeError(message); + std::unique_lock lock(tunnel_and_receiver_mu); + if (unlikely(tunnel_set == nullptr)) + return; } + tunnel_set->close(message, wait_sender_finish); } void MPPTask::abortReceivers() @@ -112,16 +105,6 @@ void MPPTask::abortDataStreams(AbortType abort_type) context->getProcessList().sendCancelToQuery(context->getCurrentQueryId(), context->getClientInfo().current_user, is_kill); } -void MPPTask::closeAllTunnels(const String & reason) -{ - { - std::unique_lock lock(tunnel_and_receiver_mu); - if (unlikely(tunnel_set == nullptr)) - return; - } - tunnel_set->close(reason); -} - void MPPTask::finishWrite() { RUNTIME_ASSERT(tunnel_set != nullptr, log, "mpp task without tunnel set"); @@ -208,12 +191,13 @@ void MPPTask::initExchangeReceivers() std::pair MPPTask::getTunnel(const ::mpp::EstablishMPPConnectionRequest * request) { - if (status == CANCELLED) + if (status == CANCELLED || status == FAILED) { auto err_msg = fmt::format( - "can't find tunnel ({} + {}) because the task is cancelled", + "can't find tunnel ({} + {}) because the task is aborted, error message = {}", request->sender_meta().task_id(), - request->receiver_meta().task_id()); + request->receiver_meta().task_id(), + err_string); return {nullptr, err_msg}; } @@ -233,19 +217,11 @@ std::pair MPPTask::getTunnel(const ::mpp::EstablishMPPConn void MPPTask::unregisterTask() { - auto * manager_ptr = manager.load(); - if (manager_ptr != nullptr) - { 
- auto [result, reason] = manager_ptr->unregisterTask(this); - if (result) - LOG_FMT_DEBUG(log, "task unregistered"); - else - LOG_FMT_WARNING(log, "task failed to unregister, reason: {}", reason); - } + auto [result, reason] = manager->unregisterTask(id); + if (result) + LOG_FMT_DEBUG(log, "task unregistered"); else - { - LOG_ERROR(log, "task manager is unset"); - } + LOG_FMT_WARNING(log, "task failed to unregister, reason: {}", reason); } void MPPTask::prepare(const mpp::DispatchTaskRequest & task_request) @@ -363,6 +339,7 @@ void MPPTask::runImpl() if (!switchStatus(INITIALIZING, RUNNING)) { LOG_WARNING(log, "task not in initializing state, skip running"); + unregisterTask(); return; } Stopwatch stopwatch; @@ -453,36 +430,19 @@ void MPPTask::runImpl() } } } - LOG_FMT_INFO(log, "task ends, time cost is {} ms.", stopwatch.elapsedMilliseconds()); - // unregister flag is only for FailPoint usage, to produce the situation that MPPTask is destructed - // by grpc CancelMPPTask thread; - bool unregister = true; - fiu_do_on(FailPoints::random_task_lifecycle_failpoint, { - if (!err_msg.empty()) - unregister = false; - }); - if (unregister) - unregisterTask(); - mpp_task_statistics.end(status.load(), err_string); mpp_task_statistics.logTracingJson(); + + LOG_FMT_INFO(log, "task ends, time cost is {} ms.", stopwatch.elapsedMilliseconds()); + unregisterTask(); } void MPPTask::handleError(const String & error_msg) { - auto * manager_ptr = manager.load(); - if (manager_ptr != nullptr) - { - auto updated_msg = fmt::format("From {}: {}", id.toString(), error_msg); - /// task already been registered in TaskManager, use task manager to abort the whole query in this node - newThreadManager()->scheduleThenDetach(true, "MPPTaskManagerAbortQuery", [manager_ptr, updated_msg, query_id = id.start_ts] { - CPUAffinityManager::getInstance().bindSelfQueryThread(); - manager_ptr->abortMPPQuery(query_id, updated_msg, AbortType::ONERROR); - }); - /// need to wait until query abort starts, 
otherwise, the error may be lost - manager_ptr->waitUntilQueryStartsAbort(id.start_ts); - } - else + auto updated_msg = fmt::format("From {}: {}", id.toString(), error_msg); + manager->abortMPPQuery(id.start_ts, updated_msg, AbortType::ONERROR); + if (!registered) + // if the task is not registered, need to cancel it explicitly abort(error_msg, AbortType::ONERROR); } @@ -513,8 +473,7 @@ void MPPTask::abort(const String & message, AbortType abort_type) err_string = message; /// if the task is in initializing state, mpp task can return error to TiDB directly, /// so just close all tunnels here - closeAllTunnels(""); - unregisterTask(); + abortTunnels("", false); LOG_WARNING(log, "Finish abort task from uninitialized"); return; } @@ -524,7 +483,7 @@ void MPPTask::abort(const String & message, AbortType abort_type) /// first, the top components may see an error caused by the abort, which is not /// the original error err_string = message; - abortTunnels(message, abort_type); + abortTunnels(message, false); abortDataStreams(abort_type); abortReceivers(); scheduleThisTask(ScheduleState::FAILED); @@ -542,7 +501,7 @@ bool MPPTask::switchStatus(TaskStatus from, TaskStatus to) void MPPTask::scheduleOrWait() { - if (!manager.load()->tryToScheduleTask(shared_from_this())) + if (!manager->tryToScheduleTask(shared_from_this())) { LOG_FMT_INFO(log, "task waits for schedule"); Stopwatch stopwatch; diff --git a/dbms/src/Flash/Mpp/MPPTask.h b/dbms/src/Flash/Mpp/MPPTask.h index 863e815e02b..db248cfde5d 100644 --- a/dbms/src/Flash/Mpp/MPPTask.h +++ b/dbms/src/Flash/Mpp/MPPTask.h @@ -96,13 +96,10 @@ class MPPTask : public std::enable_shared_from_this void unregisterTask(); - /// Similar to `writeErrToAllTunnels`, but it just try to write the error message to tunnel - /// without waiting the tunnel to be connected - void closeAllTunnels(const String & reason); - + // abort the mpp task, note this function should be non-blocking, it just set some flags void abort(const String & message, 
AbortType abort_type); - void abortTunnels(const String & message, AbortType abort_type); + void abortTunnels(const String & message, bool wait_sender_finish); void abortReceivers(); void abortDataStreams(AbortType abort_type); @@ -143,14 +140,15 @@ class MPPTask : public std::enable_shared_from_this int new_thread_count_of_exchange_receiver = 0; - std::atomic manager = nullptr; + MPPTaskManager * manager; + std::atomic registered{false}; const LoggerPtr log; MPPTaskStatistics mpp_task_statistics; friend class MPPTaskManager; - friend class MPPTaskCancelHelper; + friend class MPPHandler; int needed_threads; diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.cpp b/dbms/src/Flash/Mpp/MPPTaskManager.cpp index 6d5c51a198b..a3ff83adffc 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.cpp +++ b/dbms/src/Flash/Mpp/MPPTaskManager.cpp @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include #include #include #include @@ -49,7 +48,7 @@ std::pair MPPTaskManager::findTunnelWithTimeout(const ::mp { return false; } - else if (query_it->second->to_be_aborted) + else if (!query_it->second->isInNormalState()) { /// if the query is aborted, return true to stop waiting timeout. 
LOG_WARNING(log, fmt::format("Query {} is aborted, all its tasks are invalid.", id.start_ts)); @@ -72,25 +71,6 @@ std::pair MPPTaskManager::findTunnelWithTimeout(const ::mp return it->second->getTunnel(request); } -class MPPTaskCancelHelper -{ -public: - MPPTaskPtr task; - String reason; - AbortType abort_type; - MPPTaskCancelHelper(MPPTaskPtr && task_, const String & reason_, AbortType abort_type_) - : task(std::move(task_)) - , reason(reason_) - , abort_type(abort_type_) - {} - DISALLOW_COPY_AND_MOVE(MPPTaskCancelHelper); - void run() const - { - CPUAffinityManager::getInstance().bindSelfQueryThread(); - task->abort(reason, abort_type); - } -}; - void MPPTaskManager::abortMPPQuery(UInt64 query_id, const String & reason, AbortType abort_type) { LOG_WARNING(log, fmt::format("Begin to abort query: {}, abort type: {}, reason: {}", query_id, magic_enum::enum_name(abort_type), reason)); @@ -106,53 +86,38 @@ void MPPTaskManager::abortMPPQuery(UInt64 query_id, const String & reason, Abort LOG_WARNING(log, fmt::format("{} does not found in task manager, skip abort", query_id)); return; } - else if (it->second->to_be_aborted) + else if (!it->second->isInNormalState()) { LOG_WARNING(log, fmt::format("{} already in abort process, skip abort", query_id)); return; } - it->second->to_be_aborted = true; + it->second->state = MPPQueryTaskSet::Aborting; it->second->error_message = reason; task_set = it->second; scheduler->deleteQuery(query_id, *this, true); cv.notify_all(); } + FmtBuffer fmt_buf; fmt_buf.fmtAppend("Remaining task in query {} are: ", query_id); - // TODO: abort tasks in order rather than issuing so many threads to cancel tasks - auto thread_manager = newThreadManager(); - try - { - for (auto it = task_set->task_map.begin(); it != task_set->task_map.end();) - { - fmt_buf.fmtAppend("{} ", it->first.toString()); - auto current_task = it->second; - it = task_set->task_map.erase(it); - // Note it is not acceptable to destruct `current_task` inside the loop, because 
destruct a mpp task before all - // other mpp tasks are cancelled may cause some deadlock issues, so `current_task` has to be moved to cancel thread. - // At first, we use std::move to move `current_task` to lambda like this: - // thread_manager->schedule(false, "CancelMPPTask", [task = std::move(current_task), &reason] { task->cancel(reason); }); - // However, due to SOO in llvm(https://github.com/llvm/llvm-project/issues/32472), there is still a copy of `current_task` - // remaining in the current scope, as a workaround we add a wrap(MPPTaskCancelHelper) here to make sure `current_task` - // can be moved to cancel thread. - thread_manager->schedule(false, "AbortMPPTask", [helper = new MPPTaskCancelHelper(std::move(current_task), reason, abort_type)] { - std::unique_ptr(helper)->run(); - }); - } - } - catch (...) - { - thread_manager->wait(); - throw; - } + for (auto & it : task_set->task_map) + fmt_buf.fmtAppend("{} ", it.first.toString()); LOG_WARNING(log, fmt_buf.toString()); - thread_manager->wait(); + + for (auto & it : task_set->task_map) + it.second->abort(reason, abort_type); + { std::lock_guard lock(mu); auto it = mpp_query_map.find(query_id); - /// just to double check the query still exists - if (it != mpp_query_map.end()) - mpp_query_map.erase(it); + RUNTIME_ASSERT(it != mpp_query_map.end(), log, "MPPTaskQuerySet {} should remaining in MPPTaskManager", query_id); + it->second->state = MPPQueryTaskSet::Aborted; cv.notify_all(); } LOG_WARNING(log, "Finish abort query: " + std::to_string(query_id)); @@ -162,9 +127,9 @@ std::pair MPPTaskManager::registerTask(MPPTaskPtr task) { std::unique_lock lock(mu); const auto & it = mpp_query_map.find(task->id.start_ts); - if (it != mpp_query_map.end() && it->second->to_be_aborted) + if (it != mpp_query_map.end() && !it->second->isInNormalState()) { - return {false, "query is being aborted"}; + return {false,
fmt::format("query is being aborted, error message = {}", it->second->error_message)}; } if (it != mpp_query_map.end() && it->second->task_map.find(task->id) != it->second->task_map.end()) { @@ -180,52 +145,37 @@ std::pair MPPTaskManager::registerTask(MPPTaskPtr task) { mpp_query_map[task->id.start_ts]->task_map.emplace(task->id, task); } - task->manager = this; + task->registered = true; cv.notify_all(); return {true, ""}; } -void MPPTaskManager::waitUntilQueryStartsAbort(UInt64 query_id) +std::pair MPPTaskManager::unregisterTask(const MPPTaskId & id) { std::unique_lock lock(mu); + auto it = mpp_query_map.end(); cv.wait(lock, [&] { - auto query_it = mpp_query_map.find(query_id); - if (query_it == mpp_query_map.end()) - { - // query already aborted - return true; - } - else if (query_it->second->to_be_aborted) - { - return true; - } - return false; + it = mpp_query_map.find(id.start_ts); + return it == mpp_query_map.end() || it->second->allowUnregisterTask(); }); -} - -std::pair MPPTaskManager::unregisterTask(MPPTask * task) -{ - std::unique_lock lock(mu); - auto it = mpp_query_map.find(task->id.start_ts); if (it != mpp_query_map.end()) { - if (it->second->to_be_aborted) - return {false, "query is being aborted"}; - auto task_it = it->second->task_map.find(task->id); + auto task_it = it->second->task_map.find(id); if (task_it != it->second->task_map.end()) { it->second->task_map.erase(task_it); if (it->second->task_map.empty()) { /// remove query task map if the task is the last one - scheduler->deleteQuery(task->id.start_ts, *this, false); + scheduler->deleteQuery(id.start_ts, *this, false); mpp_query_map.erase(it); } cv.notify_all(); return {true, ""}; } } - return {false, "task can not be found"}; + cv.notify_all(); + return {false, "task can not be found, maybe not registered yet"}; } String MPPTaskManager::toString() diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.h b/dbms/src/Flash/Mpp/MPPTaskManager.h index 96ae86a0ec0..29409c398b9 100644 --- 
a/dbms/src/Flash/Mpp/MPPTaskManager.h +++ b/dbms/src/Flash/Mpp/MPPTaskManager.h @@ -27,16 +27,26 @@ namespace DB { struct MPPQueryTaskSet { - /// to_be_aborted is kind of lock, if to_be_aborted is set - /// to true, then task_map can only be accessed by query cancel - /// thread, which means no task can register/un-register for the - /// query, here we do not need mutex because all the write/read - /// to MPPQueryTaskSet is protected by the mutex in MPPTaskManager - bool to_be_aborted = false; + enum State + { + Normal, + Aborting, + Aborted, + }; + /// task can only be registered state is Normal + State state = Normal; String error_message; MPPTaskMap task_map; /// only used in scheduler std::queue waiting_tasks; + bool isInNormalState() const + { + return state == Normal; + } + bool allowUnregisterTask() const + { + return state == Normal || state == Aborted; + } }; using MPPQueryTaskSetPtr = std::shared_ptr; @@ -70,9 +80,7 @@ class MPPTaskManager : private boost::noncopyable std::pair registerTask(MPPTaskPtr task); - std::pair unregisterTask(MPPTask * task); - - void waitUntilQueryStartsAbort(UInt64 query_id); + std::pair unregisterTask(const MPPTaskId & id); bool tryToScheduleTask(const MPPTaskPtr & task); diff --git a/dbms/src/Flash/Mpp/MPPTunnel.cpp b/dbms/src/Flash/Mpp/MPPTunnel.cpp index 2d872d2e58b..2c75a29e8b4 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.cpp +++ b/dbms/src/Flash/Mpp/MPPTunnel.cpp @@ -24,7 +24,6 @@ namespace DB { namespace FailPoints { -extern const char exception_during_mpp_close_tunnel[]; extern const char random_tunnel_wait_timeout_failpoint[]; } // namespace FailPoints @@ -89,7 +88,7 @@ MPPTunnel::~MPPTunnel() }); try { - close(""); + close("", true); } catch (...) 
{ @@ -98,23 +97,9 @@ MPPTunnel::~MPPTunnel() LOG_FMT_TRACE(log, "destructed tunnel obj!"); } -void MPPTunnel::finishSendQueue(bool drain) -{ - if (tunnel_sender) - { - if (drain) - tunnel_sender->finishAndDrain(); - else - tunnel_sender->finish(); - } -} - /// exit abnormally, such as being cancelled. -void MPPTunnel::close(const String & reason) +void MPPTunnel::close(const String & reason, bool wait_sender_finish) { - SCOPE_EXIT({ - finishSendQueue(true); // drain the send_queue when close, to release useless memory - }); { std::unique_lock lk(mu); switch (status) @@ -124,59 +109,46 @@ void MPPTunnel::close(const String & reason) cv_for_status_changed.notify_all(); return; case TunnelStatus::Connected: + case TunnelStatus::WaitingForSenderFinish: { if (!reason.empty()) { - try - { - FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_during_mpp_close_tunnel); - tunnel_sender->push(std::make_shared(getPacketWithError(reason), getMemTracker())); - } - catch (...) - { - tryLogCurrentException(log, "Failed to close tunnel: " + tunnel_id); - } + tunnel_sender->cancelWith(reason); + } + else + { + tunnel_sender->finish(); } - finishSendQueue(); break; } - case TunnelStatus::WaitingForSenderFinish: - break; case TunnelStatus::Finished: return; default: RUNTIME_ASSERT(false, log, "Unsupported tunnel status: {}", status); } } - waitForSenderFinish(/*allow_throw=*/false); + if (wait_sender_finish) + waitForSenderFinish(false); } // TODO: consider to hold a buffer -void MPPTunnel::write(const mpp::MPPDataPacket & data, bool close_after_write) +void MPPTunnel::write(const mpp::MPPDataPacket & data) { LOG_FMT_TRACE(log, "ready to write"); { - { - std::unique_lock lk(mu); - waitUntilConnectedOrFinished(lk); - if (status == TunnelStatus::Finished) - throw Exception(fmt::format("write to tunnel which is already closed,{}", tunnel_sender ? 
tunnel_sender->getConsumerFinishMsg() : "")); - } + std::unique_lock lk(mu); + waitUntilConnectedOrFinished(lk); + if (tunnel_sender == nullptr) + throw Exception(fmt::format("write to tunnel which is already closed.")); + } - if (tunnel_sender->push(std::make_shared(data, getMemTracker()))) - { - connection_profile_info.bytes += data.ByteSizeLong(); - connection_profile_info.packets += 1; - if (close_after_write) - { - finishSendQueue(); - LOG_FMT_TRACE(log, "finish write."); - } - return; - } + if (tunnel_sender->push(std::make_shared(data, getMemTracker()))) + { + connection_profile_info.bytes += data.ByteSizeLong(); + connection_profile_info.packets += 1; + return; } - // push failed, wait consumer for the final state - waitForSenderFinish(/*allow_throw=*/true); + throw Exception(fmt::format("write to tunnel which is already closed,{}", tunnel_sender->isConsumerFinished() ? tunnel_sender->getConsumerFinishMsg() : "")); } /// done normally and being called exactly once after writing all packets @@ -185,12 +157,12 @@ void MPPTunnel::writeDone() LOG_FMT_TRACE(log, "ready to finish, is_local: {}", mode == TunnelSenderMode::LOCAL); { std::unique_lock lk(mu); - if (status == TunnelStatus::Finished) - throw Exception(fmt::format("write to tunnel which is already closed,{}", tunnel_sender ? 
tunnel_sender->getConsumerFinishMsg() : "")); /// make sure to finish the tunnel after it is connected waitUntilConnectedOrFinished(lk); - finishSendQueue(); + if (tunnel_sender == nullptr) + throw Exception(fmt::format("write to tunnel which is already closed.")); } + tunnel_sender->finish(); waitForSenderFinish(/*allow_throw=*/true); } @@ -367,6 +339,15 @@ void SyncTunnelSender::sendJob(PacketWriter * writer) break; } } + /// write the last error packet if needed + if (send_queue.getStatus() == MPMCQueueStatus::CANCELLED) + { + RUNTIME_ASSERT(!send_queue.getCancelReason().empty(), "Tunnel sender cancelled without reason"); + if (!writer->write(getPacketWithError(send_queue.getCancelReason()))) + { + err_msg = "grpc writes failed."; + } + } } catch (...) { @@ -393,12 +374,23 @@ void SyncTunnelSender::startSendThread(PacketWriter * writer) std::shared_ptr LocalTunnelSender::readForLocal() { TrackedMppDataPacketPtr res; - if (send_queue.pop(res) == MPMCQueueResult::OK) + auto result = send_queue.pop(res); + if (result == MPMCQueueResult::OK) { // switch tunnel's memory tracker into receiver's res->switchMemTracker(current_memory_tracker); return res; } + else if (result == MPMCQueueResult::CANCELLED) + { + RUNTIME_ASSERT(!send_queue.getCancelReason().empty(), "Tunnel sender cancelled without reason"); + if (!cancel_reason_sent) + { + cancel_reason_sent = true; + res = std::make_shared(getPacketWithError(send_queue.getCancelReason()), current_memory_tracker); + return res; + } + } consumerFinish(""); return nullptr; } diff --git a/dbms/src/Flash/Mpp/MPPTunnel.h b/dbms/src/Flash/Mpp/MPPTunnel.h index 0df38adca52..5ed062e00c9 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.h +++ b/dbms/src/Flash/Mpp/MPPTunnel.h @@ -78,14 +78,14 @@ class TunnelSender : private boost::noncopyable return send_queue.push(data) == MPMCQueueResult::OK; } - virtual bool finish() + virtual void cancelWith(const String & reason) { - return send_queue.finish(); + send_queue.cancelWith(reason); } - 
virtual void finishAndDrain() + virtual bool finish() { - send_queue.finishAndDrain(); + return send_queue.finish(); } void consumerFinish(const String & err_msg); @@ -180,9 +180,14 @@ class AsyncTunnelSender : public TunnelSender return queue.finish(); } - void finishAndDrain() override + void cancelWith(const String & reason) override { - queue.finishAndDrain(); + queue.cancelWith(reason); + } + + const String & getCancelReason() const + { + return queue.getCancelReason(); } GRPCSendQueueRes pop(TrackedMppDataPacketPtr & data, void * new_tag) @@ -202,6 +207,9 @@ class LocalTunnelSender : public TunnelSender using Base = TunnelSender; using Base::Base; TrackedMppDataPacketPtr readForLocal(); + +private: + bool cancel_reason_sent = false; }; using TunnelSenderPtr = std::shared_ptr; @@ -260,15 +268,17 @@ class MPPTunnel : private boost::noncopyable const String & id() const { return tunnel_id; } - // write a single packet to the tunnel, it will block if tunnel is not ready. - void write(const mpp::MPPDataPacket & data, bool close_after_write = false); + // write a single packet to the tunnel's send queue, it will block if tunnel is not ready. + void write(const mpp::MPPDataPacket & data); - // finish the writing. + // finish the writing, and wait until the sender finishes. void writeDone(); - /// close() finishes the tunnel, if the tunnel is connected already, it will - /// write the error message to the tunnel, otherwise it just close the tunnel - void close(const String & reason); + /// close() cancel the tunnel's send queue with `reason`, if reason is not empty, the tunnel sender will + /// write this reason as an error message to its receiver. If `wait_sender_finish` is true, close() will + /// not return until tunnel sender finishes, otherwise, close() will return just after the send queue is + /// cancelled(which is a non-blocking operation) + void close(const String & reason, bool wait_sender_finish); // a MPPConn request has arrived. 
it will build connection by this tunnel; void connect(PacketWriter * writer); @@ -300,12 +310,11 @@ class MPPTunnel : private boost::noncopyable { Unconnected, // Not connect to any writer, not able to accept new data Connected, // Connected to some writer, accepting data - WaitingForSenderFinish, // Accepting all data already, wait for sender to finish + WaitingForSenderFinish, // Wait for sender to finish Finished // Final state, no more work to do }; StringRef statusToString(); - void finishSendQueue(bool drain = false); void waitUntilConnectedOrFinished(std::unique_lock & lk); diff --git a/dbms/src/Flash/Mpp/MPPTunnelSet.cpp b/dbms/src/Flash/Mpp/MPPTunnelSet.cpp index e6610d4d7b8..e4ce59f4496 100644 --- a/dbms/src/Flash/Mpp/MPPTunnelSet.cpp +++ b/dbms/src/Flash/Mpp/MPPTunnelSet.cpp @@ -21,13 +21,8 @@ namespace DB { -namespace FailPoints -{ -extern const char exception_during_mpp_write_err_to_tunnel[]; -} // namespace FailPoints namespace { - void checkPacketSize(size_t size) { static constexpr size_t max_packet_size = 1u << 31; @@ -118,24 +113,6 @@ void MPPTunnelSetBase::write(mpp::MPPDataPacket & packet, int16_t partit tunnels[partition_id]->write(packet); } -template -void MPPTunnelSetBase::writeError(const String & msg) -{ - for (auto & tunnel : tunnels) - { - try - { - FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_during_mpp_write_err_to_tunnel); - tunnel->write(getPacketWithError(msg), true); - } - catch (...) 
- { - tunnel->close(fmt::format("Failed to write error msg to tunnel, error message: {}", msg)); - tryLogCurrentException(log, "Failed to write error " + msg + " to tunnel: " + tunnel->id()); - } - } -} - template void MPPTunnelSetBase::registerTunnel(const MPPTaskId & receiver_task_id, const TunnelPtr & tunnel) { @@ -151,10 +128,10 @@ void MPPTunnelSetBase::registerTunnel(const MPPTaskId & receiver_task_id } template -void MPPTunnelSetBase::close(const String & reason) +void MPPTunnelSetBase::close(const String & reason, bool wait_sender_finish) { for (auto & tunnel : tunnels) - tunnel->close(reason); + tunnel->close(reason, wait_sender_finish); } template diff --git a/dbms/src/Flash/Mpp/MPPTunnelSet.h b/dbms/src/Flash/Mpp/MPPTunnelSet.h index da37423876e..bbda0f18249 100644 --- a/dbms/src/Flash/Mpp/MPPTunnelSet.h +++ b/dbms/src/Flash/Mpp/MPPTunnelSet.h @@ -54,8 +54,7 @@ class MPPTunnelSetBase : private boost::noncopyable // this is a partition writing. void write(tipb::SelectResponse & response, int16_t partition_id); void write(mpp::MPPDataPacket & packet, int16_t partition_id); - void writeError(const String & msg); - void close(const String & reason); + void close(const String & reason, bool wait_sender_finish); void finishWrite(); void registerTunnel(const MPPTaskId & id, const TunnelPtr & tunnel); void updateMemTracker(); diff --git a/dbms/src/Flash/Mpp/MinTSOScheduler.cpp b/dbms/src/Flash/Mpp/MinTSOScheduler.cpp index 6f6be2a4f35..da4585d89db 100644 --- a/dbms/src/Flash/Mpp/MinTSOScheduler.cpp +++ b/dbms/src/Flash/Mpp/MinTSOScheduler.cpp @@ -73,7 +73,7 @@ bool MinTSOScheduler::tryToSchedule(const MPPTaskPtr & task, MPPTaskManager & ta } const auto & id = task->getId(); auto query_task_set = task_manager.getQueryTaskSetWithoutLock(id.start_ts); - if (nullptr == query_task_set || query_task_set->to_be_aborted) + if (nullptr == query_task_set || !query_task_set->isInNormalState()) { LOG_FMT_WARNING(log, "{} is scheduled with miss or abort.", id.toString()); 
return true; diff --git a/dbms/src/Flash/Mpp/tests/gtest_grpc_send_queue.cpp b/dbms/src/Flash/Mpp/tests/gtest_grpc_send_queue.cpp index e5dabda5daf..73a63136ffc 100644 --- a/dbms/src/Flash/Mpp/tests/gtest_grpc_send_queue.cpp +++ b/dbms/src/Flash/Mpp/tests/gtest_grpc_send_queue.cpp @@ -149,5 +149,39 @@ try } CATCH +TEST_F(TestGRPCSendQueue, SequentialPopAfterCancel) +try +{ + int p1; + int data; + + GTEST_ASSERT_EQ(queue.push(1), true); + checkTagInQueue(nullptr); + checkTag(nullptr); + + GTEST_ASSERT_EQ(queue.push(2), true); + checkTagInQueue(nullptr); + checkTag(nullptr); + + GTEST_ASSERT_EQ(queue.push(3), true); + checkTagInQueue(nullptr); + checkTag(nullptr); + + GTEST_ASSERT_EQ(queue.pop(data, &p1), GRPCSendQueueRes::OK); + GTEST_ASSERT_EQ(data, 1); + checkTagInQueue(nullptr); + checkTag(nullptr); + + // Cancel the queue + GTEST_ASSERT_EQ(queue.cancelWith("cancel test"), true); + + GTEST_ASSERT_EQ(queue.pop(data, &p1), GRPCSendQueueRes::CANCELLED); + checkTagInQueue(nullptr); + checkTag(nullptr); + + GTEST_ASSERT_EQ(queue.getCancelReason(), "cancel test"); +} +CATCH + } // namespace tests } // namespace DB \ No newline at end of file diff --git a/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp b/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp index 529a126083a..c97b9b39e6f 100644 --- a/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp +++ b/dbms/src/Flash/Mpp/tests/gtest_mpptunnel.cpp @@ -31,7 +31,7 @@ class MockWriter : public PacketWriter { bool write(const mpp::MPPDataPacket & packet) override { - write_packet_vec.push_back(packet.data()); + write_packet_vec.push_back(packet.data().empty() ? packet.error().msg() : packet.data()); return true; } @@ -82,7 +82,7 @@ struct MockLocalReader bool success = tmp_packet != nullptr; if (success) { - write_packet_vec.push_back(tmp_packet->packet.data()); + write_packet_vec.push_back(tmp_packet->packet.data().empty() ? 
tmp_packet->packet.error().msg() : tmp_packet->packet.data()); } else { @@ -174,6 +174,16 @@ class MockAsyncCallData : public IAsyncCallData case GRPCSendQueueRes::FINISHED: async_tunnel_sender->consumerFinish(""); return; + case GRPCSendQueueRes::CANCELLED: + assert(!async_tunnel_sender->getCancelReason().empty()); + if (write_failed) + { + async_tunnel_sender->consumerFinish(fmt::format("{} meet error: {}.", async_tunnel_sender->getTunnelId(), async_tunnel_sender->getCancelReason())); + return; + } + write_packet_vec.push_back(async_tunnel_sender->getCancelReason()); + async_tunnel_sender->consumerFinish(""); + return; case GRPCSendQueueRes::EMPTY: std::unique_lock lock(mu); cv.wait(lock, [&] { @@ -287,7 +297,7 @@ TEST_F(TestMPPTunnel, CloseBeforeConnect) try { auto mpp_tunnel_ptr = constructRemoteSyncTunnel(); - mpp_tunnel_ptr->close("Canceled"); + mpp_tunnel_ptr->close("Canceled", false); GTEST_ASSERT_EQ(getTunnelFinishedFlag(mpp_tunnel_ptr), true); GTEST_ASSERT_EQ(getTunnelConnectedFlag(mpp_tunnel_ptr), false); } @@ -297,9 +307,9 @@ TEST_F(TestMPPTunnel, CloseAfterClose) try { auto mpp_tunnel_ptr = constructRemoteSyncTunnel(); - mpp_tunnel_ptr->close("Canceled"); + mpp_tunnel_ptr->close("Canceled", false); GTEST_ASSERT_EQ(getTunnelFinishedFlag(mpp_tunnel_ptr), true); - mpp_tunnel_ptr->close("Canceled"); + mpp_tunnel_ptr->close("Canceled", false); GTEST_ASSERT_EQ(getTunnelFinishedFlag(mpp_tunnel_ptr), true); } CATCH @@ -317,7 +327,7 @@ TEST_F(TestMPPTunnel, WriteAfterUnconnectFinished) } catch (Exception & e) { - GTEST_ASSERT_EQ(e.message(), "write to tunnel which is already closed,"); + GTEST_ASSERT_EQ(e.message(), "write to tunnel which is already closed."); } } @@ -332,7 +342,7 @@ TEST_F(TestMPPTunnel, WriteDoneAfterUnconnectFinished) } catch (Exception & e) { - GTEST_ASSERT_EQ(e.message(), "write to tunnel which is already closed,"); + GTEST_ASSERT_EQ(e.message(), "write to tunnel which is already closed."); } } @@ -346,27 +356,13 @@ try std::unique_ptr 
data_packet_ptr = std::make_unique(); data_packet_ptr->set_data("First"); mpp_tunnel_ptr->write(*data_packet_ptr); - mpp_tunnel_ptr->close("Cancel"); + mpp_tunnel_ptr->close("Cancel", true); GTEST_ASSERT_EQ(getTunnelFinishedFlag(mpp_tunnel_ptr), true); - GTEST_ASSERT_EQ(dynamic_cast(writer_ptr.get())->write_packet_vec.size(), 2); //Second for err msg - GTEST_ASSERT_EQ(dynamic_cast(writer_ptr.get())->write_packet_vec[0], "First"); -} -CATCH - -TEST_F(TestMPPTunnel, ConnectWriteWithCloseFlag) -try -{ - auto mpp_tunnel_ptr = constructRemoteSyncTunnel(); - std::unique_ptr writer_ptr = std::make_unique(); - mpp_tunnel_ptr->connect(writer_ptr.get()); - GTEST_ASSERT_EQ(getTunnelConnectedFlag(mpp_tunnel_ptr), true); - std::unique_ptr data_packet_ptr = std::make_unique(); - data_packet_ptr->set_data("First"); - mpp_tunnel_ptr->write(*data_packet_ptr, true); - mpp_tunnel_ptr->waitForFinish(); - GTEST_ASSERT_EQ(getTunnelFinishedFlag(mpp_tunnel_ptr), true); - GTEST_ASSERT_EQ(dynamic_cast(writer_ptr.get())->write_packet_vec.size(), 1); - GTEST_ASSERT_EQ(dynamic_cast(writer_ptr.get())->write_packet_vec[0], "First"); + auto result_size = dynamic_cast(writer_ptr.get())->write_packet_vec.size(); + // close will cancel the MPMCQueue, so there is no guarantee that all the message will be consumed, only the last error packet + // must to be consumed + GTEST_ASSERT_EQ(result_size >= 1 && result_size <= 2, true); + GTEST_ASSERT_EQ(dynamic_cast(writer_ptr.get())->write_packet_vec[result_size - 1], "Cancel"); } CATCH @@ -434,7 +430,7 @@ TEST_F(TestMPPTunnel, WriteAfterFinished) std::unique_ptr writer_ptr = std::make_unique(); mpp_tunnel_ptr->connect(writer_ptr.get()); GTEST_ASSERT_EQ(getTunnelConnectedFlag(mpp_tunnel_ptr), true); - mpp_tunnel_ptr->close("Canceled"); + mpp_tunnel_ptr->close("Canceled", false); auto data_packet_ptr = std::make_unique(); data_packet_ptr->set_data("First"); mpp_tunnel_ptr->write(*data_packet_ptr); @@ -481,7 +477,7 @@ TEST_F(TestMPPTunnel, 
LocalCloseBeforeConnect) try { auto mpp_tunnel_ptr = constructLocalSyncTunnel(); - mpp_tunnel_ptr->close("Canceled"); + mpp_tunnel_ptr->close("Canceled", false); GTEST_ASSERT_EQ(getTunnelFinishedFlag(mpp_tunnel_ptr), true); GTEST_ASSERT_EQ(getTunnelConnectedFlag(mpp_tunnel_ptr), false); } @@ -491,9 +487,9 @@ TEST_F(TestMPPTunnel, LocalCloseAfterClose) try { auto mpp_tunnel_ptr = constructLocalSyncTunnel(); - mpp_tunnel_ptr->close("Canceled"); + mpp_tunnel_ptr->close("Canceled", false); GTEST_ASSERT_EQ(getTunnelFinishedFlag(mpp_tunnel_ptr), true); - mpp_tunnel_ptr->close("Canceled"); + mpp_tunnel_ptr->close("Canceled", false); GTEST_ASSERT_EQ(getTunnelFinishedFlag(mpp_tunnel_ptr), true); } CATCH @@ -508,11 +504,12 @@ try std::unique_ptr data_packet_ptr = std::make_unique(); data_packet_ptr->set_data("First"); mpp_tunnel_ptr->write(*data_packet_ptr); - mpp_tunnel_ptr->close("Cancel"); + mpp_tunnel_ptr->close("Cancel", false); local_reader_ptr->thread_manager->wait(); // Join local read thread GTEST_ASSERT_EQ(getTunnelSenderConsumerFinishedFlag(mpp_tunnel_ptr->getTunnelSender()), true); - GTEST_ASSERT_EQ(local_reader_ptr->write_packet_vec.size(), 2); //Second for err msg - GTEST_ASSERT_EQ(local_reader_ptr->write_packet_vec[0], "First"); + auto result_size = local_reader_ptr->write_packet_vec.size(); + GTEST_ASSERT_EQ(result_size == 1 || result_size == 2, true); //Second for err msg + GTEST_ASSERT_EQ(local_reader_ptr->write_packet_vec[result_size - 1], "Cancel"); } CATCH @@ -580,7 +577,7 @@ TEST_F(TestMPPTunnel, LocalWriteAfterFinished) auto mpp_tunnel_ptr = constructLocalSyncTunnel(); auto local_reader_ptr = connectLocalSyncTunnel(mpp_tunnel_ptr); GTEST_ASSERT_EQ(getTunnelConnectedFlag(mpp_tunnel_ptr), true); - mpp_tunnel_ptr->close(""); + mpp_tunnel_ptr->close("", false); std::unique_ptr data_packet_ptr = std::make_unique(); data_packet_ptr->set_data("First"); mpp_tunnel_ptr->write(*data_packet_ptr); @@ -610,13 +607,13 @@ try mpp_tunnel_ptr->write(*data_packet_ptr); 
data_packet_ptr->set_data("Second"); mpp_tunnel_ptr->write(*data_packet_ptr); - mpp_tunnel_ptr->close("Cancel"); + mpp_tunnel_ptr->close("Cancel", true); GTEST_ASSERT_EQ(getTunnelFinishedFlag(mpp_tunnel_ptr), true); t.join(); - GTEST_ASSERT_EQ(call_data->write_packet_vec.size(), 3); //Third for err msg - GTEST_ASSERT_EQ(call_data->write_packet_vec[0], "First"); - GTEST_ASSERT_EQ(call_data->write_packet_vec[1], "Second"); + auto result_size = call_data->write_packet_vec.size(); + GTEST_ASSERT_EQ(result_size >= 1 && result_size <= 3, true); //Third for err msg + GTEST_ASSERT_EQ(call_data->write_packet_vec[result_size - 1], "Cancel"); } CATCH diff --git a/tests/fullstack-test/mpp/mpp_fail.test b/tests/fullstack-test/mpp/mpp_fail.test index e5bd29c9fa5..ef2eb2286c7 100644 --- a/tests/fullstack-test/mpp/mpp_fail.test +++ b/tests/fullstack-test/mpp/mpp_fail.test @@ -80,22 +80,6 @@ mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_m {#REGEXP}.*Fail point FailPoints::exception_during_mpp_root_task_run is triggered.* => DBGInvoke __disable_fail_point(exception_during_mpp_root_task_run) -## exception during mpp write err to tunnel -=> DBGInvoke __enable_fail_point(exception_during_mpp_non_root_task_run) -=> DBGInvoke __enable_fail_point(exception_during_mpp_write_err_to_tunnel) -mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; select count(value), id from t group by id; -{#REGEXP}.*Fail point FailPoints::exception_during_mpp_non_root_task_run is triggered.* -=> DBGInvoke __disable_fail_point(exception_during_mpp_non_root_task_run) -=> DBGInvoke __disable_fail_point(exception_during_mpp_write_err_to_tunnel) - -## exception during mpp write close tunnel -=> DBGInvoke __enable_fail_point(exception_during_mpp_non_root_task_run) -=> DBGInvoke __enable_fail_point(exception_during_mpp_close_tunnel) -mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; select count(value), id from t 
group by id; -{#REGEXP}.*Fail point FailPoints::exception_during_mpp_non_root_task_run is triggered.* -=> DBGInvoke __disable_fail_point(exception_during_mpp_non_root_task_run) -=> DBGInvoke __disable_fail_point(exception_during_mpp_close_tunnel) - ## exception during mpp hash build ## desc format='brief' select t1.id from test.t t1 join test.t t2 on t1.id = t2.id and t1.id <2 join (select id from test.t group by id) t3 on t2.id=t3.id; ## +-----------------------------------------+---------+-------------------+---------------+-------------------------------------------------------------------------+