Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not use extra threads when cancel mpp query (#5966) #6006

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(exception_before_mpp_register_tunnel_for_root_mpp_task) \
M(exception_before_mpp_root_task_run) \
M(exception_during_mpp_root_task_run) \
M(exception_during_mpp_write_err_to_tunnel) \
M(exception_during_mpp_close_tunnel) \
M(exception_during_write_to_storage) \
M(force_set_sst_to_dtfile_block_size) \
M(force_set_sst_decode_rand) \
Expand Down Expand Up @@ -127,7 +125,6 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(random_aggregate_merge_failpoint) \
M(random_sharedquery_failpoint) \
M(random_interpreter_failpoint) \
M(random_task_lifecycle_failpoint) \
M(random_task_manager_find_task_failure_failpoint) \
M(random_min_tso_scheduler_failpoint)

Expand Down
9 changes: 2 additions & 7 deletions dbms/src/Common/MPMCQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include <Common/Exception.h>
#include <Common/SimpleIntrusiveNode.h>
#include <Common/nocopyable.h>
#include <common/defines.h>
Expand Down Expand Up @@ -84,12 +85,6 @@ class MPMCQueue
drain();
}

/// Convenience wrapper: calls finish() and then drain() on this queue, in that order.
/// NOTE(review): the precise semantics of finish() and drain() are defined elsewhere
/// in MPMCQueue and are not visible here -- confirm before relying on them.
void finishAndDrain()
{
finish();
drain();
}

// Cannot use copy/move constructor,
// because MPMCQueue may be used by different threads.
// Copying or moving it is dangerous.
Expand Down Expand Up @@ -219,7 +214,7 @@ class MPMCQueue
return static_cast<size_t>(write_pos - read_pos);
}

std::string_view getCancelReason() const
const String & getCancelReason() const
{
std::unique_lock lock(mu);
RUNTIME_ASSERT(isCancelled());
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Flash/EstablishCall.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,10 @@ void EstablishCallData::trySendOneMsg()
case GRPCSendQueueRes::FINISHED:
writeDone("", grpc::Status::OK);
return;
case GRPCSendQueueRes::CANCELLED:
RUNTIME_ASSERT(!async_tunnel_sender->getCancelReason().empty(), "Tunnel sender cancelled without reason");
writeErr(getPacketWithError(async_tunnel_sender->getCancelReason()));
return;
case GRPCSendQueueRes::EMPTY:
// No new message.
return;
Expand Down
29 changes: 21 additions & 8 deletions dbms/src/Flash/Mpp/GRPCSendQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

namespace DB
{

namespace tests
{
class TestGRPCSendQueue;
Expand Down Expand Up @@ -58,6 +57,7 @@ enum class GRPCSendQueueRes
OK,
FINISHED,
EMPTY,
CANCELLED,
};

/// A multi-producer-single-consumer queue dedicated to async grpc streaming send work.
Expand Down Expand Up @@ -120,6 +120,22 @@ class GRPCSendQueue
return ret;
}

/// Cancel the underlying send queue, recording `reason` as the cancel reason.
///
/// The completion queue is kicked only when the cancellation actually took
/// effect, i.e. when `send_queue.cancelWith` reports success.
///
/// Returns the result reported by `send_queue.cancelWith`.
bool cancelWith(const String & reason)
{
    const bool cancelled = send_queue.cancelWith(reason);
    if (!cancelled)
        return false;

    kickCompletionQueue();
    return true;
}

/// Return the cancel reason held by the underlying send queue.
/// This simply forwards to `send_queue.getCancelReason()` and returns its result
/// by const reference.
/// NOTE(review): presumably only meaningful after the queue has been cancelled
/// (e.g. via `cancelWith`); the underlying queue appears to assert this -- confirm.
const String & getCancelReason() const
{
return send_queue.getCancelReason();
}

/// Pop the data from queue.
///
/// Return OK if pop is done.
Expand All @@ -142,6 +158,8 @@ class GRPCSendQueue
return GRPCSendQueueRes::OK;
case MPMCQueueResult::FINISHED:
return GRPCSendQueueRes::FINISHED;
case MPMCQueueResult::CANCELLED:
return GRPCSendQueueRes::CANCELLED;
case MPMCQueueResult::EMPTY:
// Handle this case later.
break;
Expand All @@ -161,6 +179,8 @@ class GRPCSendQueue
return GRPCSendQueueRes::OK;
case MPMCQueueResult::FINISHED:
return GRPCSendQueueRes::FINISHED;
case MPMCQueueResult::CANCELLED:
return GRPCSendQueueRes::CANCELLED;
case MPMCQueueResult::EMPTY:
{
// If empty, change status to WAITING.
Expand All @@ -186,13 +206,6 @@ class GRPCSendQueue
return ret;
}

/// Finish and drain the queue.
///
/// Delegates to `send_queue.finishAndDrain()` and then calls
/// `kickCompletionQueue()` unconditionally.
/// NOTE(review): presumably the kick lets the consumer observe the finished
/// state -- confirm against kickCompletionQueue().
void finishAndDrain()
{
send_queue.finishAndDrain();
kickCompletionQueue();
}

private:
friend class tests::TestGRPCSendQueue;

Expand Down
16 changes: 9 additions & 7 deletions dbms/src/Flash/Mpp/MPPHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

#include <Common/FailPoint.h>
#include <Common/Stopwatch.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Flash/Mpp/MPPHandler.h>
#include <Flash/Mpp/Utils.h>

Expand All @@ -28,14 +27,17 @@ extern const char exception_before_mpp_root_task_run[];

void MPPHandler::handleError(const MPPTaskPtr & task, String error)
{
try
if (task)
{
if (task)
try
{
task->handleError(error);
}
catch (...)
{
tryLogCurrentException(log, "Fail to handle error and clean task");
}
catch (...)
{
tryLogCurrentException(log, "Fail to handle error and clean task");
}
task->unregisterTask();
}
}
// execute is responsible for making the plan, registering tasks and tunnels, and starting the running thread.
Expand Down
95 changes: 27 additions & 68 deletions dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,10 @@
#include <DataStreams/SquashingBlockOutputStream.h>
#include <Flash/Coprocessor/DAGCodec.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <Flash/CoprocessorHandler.h>
#include <Flash/Mpp/ExchangeReceiver.h>
#include <Flash/Mpp/GRPCReceiverContext.h>
#include <Flash/Mpp/MPPTask.h>
#include <Flash/Mpp/MPPTaskManager.h>
#include <Flash/Mpp/MPPTunnelSet.h>
#include <Flash/Mpp/MinTSOScheduler.h>
#include <Flash/Mpp/Utils.h>
#include <Flash/Statistics/traverseExecutors.h>
#include <Flash/executeQuery.h>
Expand All @@ -50,15 +47,14 @@ extern const char exception_before_mpp_register_root_mpp_task[];
extern const char exception_before_mpp_register_tunnel_for_non_root_mpp_task[];
extern const char exception_before_mpp_register_tunnel_for_root_mpp_task[];
extern const char exception_during_mpp_register_tunnel_for_non_root_mpp_task[];
extern const char exception_during_mpp_write_err_to_tunnel[];
extern const char force_no_local_region_for_mpp_task[];
extern const char random_task_lifecycle_failpoint[];
} // namespace FailPoints

MPPTask::MPPTask(const mpp::TaskMeta & meta_, const ContextPtr & context_)
: context(context_)
, meta(meta_)
, id(meta.start_ts(), meta.task_id())
, manager(context_->getTMTContext().getMPPTaskManager().get())
, log(Logger::get("MPPTask", id.toString()))
, mpp_task_statistics(id, meta.address())
, needed_threads(0)
Expand All @@ -71,28 +67,25 @@ MPPTask::~MPPTask()
/// to current_memory_tracker in the destructor
if (current_memory_tracker != memory_tracker)
current_memory_tracker = memory_tracker;
closeAllTunnels("");
abortTunnels("", true);
if (schedule_state == ScheduleState::SCHEDULED)
{
/// the threads of this task are not fully freed now, since the BlockIO and DAGContext are not destructed
/// TODO: finish all threads before here, except the current one.
manager.load()->releaseThreadsFromScheduler(needed_threads);
manager->releaseThreadsFromScheduler(needed_threads);
schedule_state = ScheduleState::COMPLETED;
}
LOG_FMT_DEBUG(log, "finish MPPTask: {}", id.toString());
}

void MPPTask::abortTunnels(const String & message, AbortType abort_type)
void MPPTask::abortTunnels(const String & message, bool wait_sender_finish)
{
if (abort_type == AbortType::ONCANCELLATION)
{
closeAllTunnels(message);
}
else
{
RUNTIME_ASSERT(tunnel_set != nullptr, log, "mpp task without tunnel set");
tunnel_set->writeError(message);
std::unique_lock lock(tunnel_and_receiver_mu);
if (unlikely(tunnel_set == nullptr))
return;
}
tunnel_set->close(message, wait_sender_finish);
}

void MPPTask::abortReceivers()
Expand All @@ -112,16 +105,6 @@ void MPPTask::abortDataStreams(AbortType abort_type)
context->getProcessList().sendCancelToQuery(context->getCurrentQueryId(), context->getClientInfo().current_user, is_kill);
}

/// Close this task's tunnel set, passing `reason` to `tunnel_set->close()`.
/// Returns without doing anything when `tunnel_set` is null.
void MPPTask::closeAllTunnels(const String & reason)
{
{
// Only the null check runs while holding tunnel_and_receiver_mu; the lock is
// released at the end of this inner scope, before close() is called.
std::unique_lock lock(tunnel_and_receiver_mu);
if (unlikely(tunnel_set == nullptr))
return;
}
tunnel_set->close(reason);
}

void MPPTask::finishWrite()
{
RUNTIME_ASSERT(tunnel_set != nullptr, log, "mpp task without tunnel set");
Expand Down Expand Up @@ -208,12 +191,13 @@ void MPPTask::initExchangeReceivers()

std::pair<MPPTunnelPtr, String> MPPTask::getTunnel(const ::mpp::EstablishMPPConnectionRequest * request)
{
if (status == CANCELLED)
if (status == CANCELLED || status == FAILED)
{
auto err_msg = fmt::format(
"can't find tunnel ({} + {}) because the task is cancelled",
"can't find tunnel ({} + {}) because the task is aborted, error message = {}",
request->sender_meta().task_id(),
request->receiver_meta().task_id());
request->receiver_meta().task_id(),
err_string);
return {nullptr, err_msg};
}

Expand All @@ -233,19 +217,11 @@ std::pair<MPPTunnelPtr, String> MPPTask::getTunnel(const ::mpp::EstablishMPPConn

void MPPTask::unregisterTask()
{
auto * manager_ptr = manager.load();
if (manager_ptr != nullptr)
{
auto [result, reason] = manager_ptr->unregisterTask(this);
if (result)
LOG_FMT_DEBUG(log, "task unregistered");
else
LOG_FMT_WARNING(log, "task failed to unregister, reason: {}", reason);
}
auto [result, reason] = manager->unregisterTask(id);
if (result)
LOG_FMT_DEBUG(log, "task unregistered");
else
{
LOG_ERROR(log, "task manager is unset");
}
LOG_FMT_WARNING(log, "task failed to unregister, reason: {}", reason);
}

void MPPTask::prepare(const mpp::DispatchTaskRequest & task_request)
Expand Down Expand Up @@ -363,6 +339,7 @@ void MPPTask::runImpl()
if (!switchStatus(INITIALIZING, RUNNING))
{
LOG_WARNING(log, "task not in initializing state, skip running");
unregisterTask();
return;
}
Stopwatch stopwatch;
Expand Down Expand Up @@ -453,36 +430,19 @@ void MPPTask::runImpl()
}
}
}
LOG_FMT_INFO(log, "task ends, time cost is {} ms.", stopwatch.elapsedMilliseconds());
// unregister flag is only for FailPoint usage, to produce the situation that MPPTask is destructed
// by grpc CancelMPPTask thread;
bool unregister = true;
fiu_do_on(FailPoints::random_task_lifecycle_failpoint, {
if (!err_msg.empty())
unregister = false;
});
if (unregister)
unregisterTask();

mpp_task_statistics.end(status.load(), err_string);
mpp_task_statistics.logTracingJson();

LOG_FMT_INFO(log, "task ends, time cost is {} ms.", stopwatch.elapsedMilliseconds());
unregisterTask();
}

void MPPTask::handleError(const String & error_msg)
{
auto * manager_ptr = manager.load();
if (manager_ptr != nullptr)
{
auto updated_msg = fmt::format("From {}: {}", id.toString(), error_msg);
/// task already been registered in TaskManager, use task manager to abort the whole query in this node
newThreadManager()->scheduleThenDetach(true, "MPPTaskManagerAbortQuery", [manager_ptr, updated_msg, query_id = id.start_ts] {
CPUAffinityManager::getInstance().bindSelfQueryThread();
manager_ptr->abortMPPQuery(query_id, updated_msg, AbortType::ONERROR);
});
/// need to wait until query abort starts, otherwise, the error may be lost
manager_ptr->waitUntilQueryStartsAbort(id.start_ts);
}
else
auto updated_msg = fmt::format("From {}: {}", id.toString(), error_msg);
manager->abortMPPQuery(id.start_ts, updated_msg, AbortType::ONERROR);
if (!registered)
// if the task is not registered, need to cancel it explicitly
abort(error_msg, AbortType::ONERROR);
}

Expand Down Expand Up @@ -513,8 +473,7 @@ void MPPTask::abort(const String & message, AbortType abort_type)
err_string = message;
/// if the task is in initializing state, mpp task can return error to TiDB directly,
/// so just close all tunnels here
closeAllTunnels("");
unregisterTask();
abortTunnels("", false);
LOG_WARNING(log, "Finish abort task from uninitialized");
return;
}
Expand All @@ -524,7 +483,7 @@ void MPPTask::abort(const String & message, AbortType abort_type)
/// first, the top components may see an error caused by the abort, which is not
/// the original error
err_string = message;
abortTunnels(message, abort_type);
abortTunnels(message, false);
abortDataStreams(abort_type);
abortReceivers();
scheduleThisTask(ScheduleState::FAILED);
Expand All @@ -542,7 +501,7 @@ bool MPPTask::switchStatus(TaskStatus from, TaskStatus to)

void MPPTask::scheduleOrWait()
{
if (!manager.load()->tryToScheduleTask(shared_from_this()))
if (!manager->tryToScheduleTask(shared_from_this()))
{
LOG_FMT_INFO(log, "task waits for schedule");
Stopwatch stopwatch;
Expand Down
12 changes: 5 additions & 7 deletions dbms/src/Flash/Mpp/MPPTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,10 @@ class MPPTask : public std::enable_shared_from_this<MPPTask>

void unregisterTask();

/// Similar to `writeErrToAllTunnels`, but it just tries to write the error message to the tunnel
/// without waiting for the tunnel to be connected
void closeAllTunnels(const String & reason);

// abort the mpp task; note that this function should be non-blocking, it just sets some flags
void abort(const String & message, AbortType abort_type);

void abortTunnels(const String & message, AbortType abort_type);
void abortTunnels(const String & message, bool wait_sender_finish);
void abortReceivers();
void abortDataStreams(AbortType abort_type);

Expand Down Expand Up @@ -143,14 +140,15 @@ class MPPTask : public std::enable_shared_from_this<MPPTask>

int new_thread_count_of_exchange_receiver = 0;

std::atomic<MPPTaskManager *> manager = nullptr;
MPPTaskManager * manager;
std::atomic<bool> registered{false};

const LoggerPtr log;

MPPTaskStatistics mpp_task_statistics;

friend class MPPTaskManager;
friend class MPPTaskCancelHelper;
friend class MPPHandler;

int needed_threads;

Expand Down
Loading