Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refine cancelMPPQuery #5361

Merged
merged 7 commits into from
Jul 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ MPPTask::~MPPTask()
{
/// the threads of this task are not fully freed now, since the BlockIO and DAGContext are not destructed
/// TODO: finish all threads before here, except the current one.
manager->releaseThreadsFromScheduler(needed_threads);
manager.load()->releaseThreadsFromScheduler(needed_threads);
schedule_state = ScheduleState::COMPLETED;
}
LOG_FMT_DEBUG(log, "finish MPPTask: {}", id.toString());
Expand Down Expand Up @@ -212,10 +212,11 @@ std::pair<MPPTunnelPtr, String> MPPTask::getTunnel(const ::mpp::EstablishMPPConn

void MPPTask::unregisterTask()
{
if (manager != nullptr)
auto * manager_ptr = manager.load();
if (manager_ptr != nullptr)
{
LOG_DEBUG(log, "task unregistered");
manager->unregisterTask(this);
manager_ptr->unregisterTask(this);
}
else
{
Expand Down Expand Up @@ -435,7 +436,10 @@ void MPPTask::runImpl()

void MPPTask::handleError(const String & error_msg)
{
if (manager == nullptr || !manager->isTaskToBeCancelled(id))
auto * manager_ptr = manager.load();
/// If manager_ptr is not nullptr, the task has already been registered, so
/// MPPTaskManager::cancelMPPQuery will handle it properly if the query is to be cancelled.
if (manager_ptr == nullptr || !manager_ptr->isQueryToBeCancelled(id.start_ts))
abort(error_msg, AbortType::ONERROR);
}

Expand Down Expand Up @@ -503,7 +507,7 @@ bool MPPTask::switchStatus(TaskStatus from, TaskStatus to)

void MPPTask::scheduleOrWait()
{
if (!manager->tryToScheduleTask(shared_from_this()))
if (!manager.load()->tryToScheduleTask(shared_from_this()))
{
LOG_FMT_INFO(log, "task waits for schedule");
Stopwatch stopwatch;
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/MPPTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ class MPPTask : public std::enable_shared_from_this<MPPTask>

int new_thread_count_of_exchange_receiver = 0;

MPPTaskManager * manager = nullptr;
std::atomic<MPPTaskManager *> manager = nullptr;

const LoggerPtr log;

Expand Down
44 changes: 20 additions & 24 deletions dbms/src/Flash/Mpp/MPPTaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,16 @@ void MPPTaskManager::cancelMPPQuery(UInt64 query_id, const String & reason)
/// one without holding the lock
std::lock_guard lock(mu);
auto it = mpp_query_map.find(query_id);
if (it == mpp_query_map.end() || it->second->to_be_cancelled)
if (it == mpp_query_map.end())
{
LOG_WARNING(log, fmt::format("{} does not found in task manager, skip cancel", query_id));
return;
}
else if (it->second->to_be_cancelled)
{
LOG_WARNING(log, fmt::format("{} already in cancel process, skip cancel", query_id));
return;
}
it->second->to_be_cancelled = true;
task_set = it->second;
scheduler->deleteQuery(query_id, *this, true);
Expand All @@ -90,30 +98,22 @@ void MPPTaskManager::cancelMPPQuery(UInt64 query_id, const String & reason)
FmtBuffer fmt_buf;
fmt_buf.fmtAppend("Remaining task in query {} are: ", query_id);
// TODO: cancel tasks in order rather than issuing so many threads to cancel tasks
SeaRise marked this conversation as resolved.
Show resolved Hide resolved
std::vector<std::thread> cancel_workers;
for (const auto & task : task_set->task_map)
auto thread_manager = newThreadManager();
for (auto it = task_set->task_map.begin(); it != task_set->task_map.end();)
{
fmt_buf.fmtAppend("{} ", task.first.toString());
std::thread t(&MPPTask::cancel, task.second, std::ref(reason));
cancel_workers.push_back(std::move(t));
fmt_buf.fmtAppend("{} ", it->first.toString());
auto current_task = it->second;
it = task_set->task_map.erase(it);
SeaRise marked this conversation as resolved.
Show resolved Hide resolved
thread_manager->schedule(false, "CancelMPPTask", [task = std::move(current_task), &reason] { task->cancel(reason); });
}
LOG_WARNING(log, fmt_buf.toString());
for (auto & worker : cancel_workers)
{
worker.join();
}
MPPQueryTaskSetPtr canceled_task_set;
thread_manager->wait();
{
std::lock_guard lock(mu);
/// just to double check the query still exists
auto it = mpp_query_map.find(query_id);
/// just to double check the query still exists
if (it != mpp_query_map.end())
{
/// hold the canceled task set, so that the mpp task will not be destructed while holding the
/// `mu` of MPPTaskManager; otherwise it might cause a deadlock
canceled_task_set = it->second;
mpp_query_map.erase(it);
}
}
LOG_WARNING(log, "Finish cancel query: " + std::to_string(query_id));
}
Expand Down Expand Up @@ -147,15 +147,11 @@ bool MPPTaskManager::registerTask(MPPTaskPtr task)
return true;
}

bool MPPTaskManager::isTaskToBeCancelled(const MPPTaskId & task_id)
bool MPPTaskManager::isQueryToBeCancelled(UInt64 query_id)
{
std::unique_lock lock(mu);
auto it = mpp_query_map.find(task_id.start_ts);
if (it != mpp_query_map.end() && it->second->to_be_cancelled)
{
return it->second->task_map.find(task_id) != it->second->task_map.end();
}
return false;
auto it = mpp_query_map.find(query_id);
return it != mpp_query_map.end() && it->second->to_be_cancelled;
}

void MPPTaskManager::unregisterTask(MPPTask * task)
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Mpp/MPPTaskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ namespace DB
struct MPPQueryTaskSet
{
/// to_be_cancelled is a kind of lock: if to_be_cancelled is set
/// to true, then task_map can only be modified by query cancel
/// to true, then task_map can only be accessed by query cancel
/// thread, which means no task can register/un-register for the
/// query. Here we do not need a mutex because all reads/writes
/// to MPPQueryTaskSet are protected by the mutex in MPPTaskManager
Expand Down Expand Up @@ -73,7 +73,7 @@ class MPPTaskManager : private boost::noncopyable

void unregisterTask(MPPTask * task);

bool isTaskToBeCancelled(const MPPTaskId & task_id);
bool isQueryToBeCancelled(UInt64 query_id);

bool tryToScheduleTask(const MPPTaskPtr & task);

Expand Down