Scheduler no throw in destruction and avoid updated min-tso query hang #4367

Merged Mar 23, 2022 (7 commits)

Changes from 4 commits
9 changes: 6 additions & 3 deletions dbms/src/Flash/Mpp/MPPTask.cpp
@@ -426,13 +426,16 @@ void MPPTask::scheduleOrWait()
{
LOG_FMT_INFO(log, "task waits for schedule");
Stopwatch stopwatch;
double time_cost;
{
std::unique_lock lock(schedule_mu);
schedule_cv.wait(lock, [&] { return schedule_state != ScheduleState::WAITING; });
time_cost = stopwatch.elapsedSeconds();
GET_METRIC(tiflash_task_scheduler_waiting_duration_seconds).Observe(time_cost);
if (schedule_state == ScheduleState::EXCEEDED)
{
throw Exception("{} is failed to schedule because of exceeding the thread hard limit in min-tso scheduler.", id.toString());
}
Contributor:
Why not throw an error if the schedule state is FAILED?

Contributor Author:
The later processing in MPPTask::runImpl() will throw because this query is to be cancelled, so its state is not RUNNING.

Contributor:
But after the task is scheduled with the FAILED schedule state, two threads are running concurrently:

  1. The runImpl thread begins to run.
  2. The cancel thread marks the MPPTask as cancelled.

There is no guarantee that the runImpl thread will see the Cancelled status right after it is woken up.

Contributor Author:
Updated to address this.

}
auto time_cost = stopwatch.elapsedSeconds();
GET_METRIC(tiflash_task_scheduler_waiting_duration_seconds).Observe(time_cost);
LOG_FMT_INFO(log, "task waits for {} s to schedule and starts to run in parallel.", time_cost);
}
}
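The thread above describes a wake-up race: a task released with FAILED may enter runImpl before the cancel thread marks the query cancelled. Below is a minimal standalone sketch, with hypothetical stand-in types (only the members visible in this diff), of why publishing EXCEEDED under schedule_mu lets the task fail deterministically on its own thread instead of racing a cancel thread:

#include <condition_variable>
#include <mutex>
#include <stdexcept>

// Hypothetical stand-ins; not the real MPPTask API beyond what the diff shows.
enum class ScheduleState { WAITING, SCHEDULED, FAILED, EXCEEDED, COMPLETED };

struct TaskSketch
{
    std::mutex schedule_mu;
    std::condition_variable schedule_cv;
    ScheduleState schedule_state = ScheduleState::WAITING;

    // Scheduler side: publish the final schedule state, then wake the task.
    void scheduleThisTask(ScheduleState state)
    {
        {
            std::lock_guard lock(schedule_mu);
            schedule_state = state;
        }
        schedule_cv.notify_all();
    }

    // Task side: the EXCEEDED failure is observed under the same lock that
    // published it, so the task throws on its own thread and never depends
    // on seeing a Cancelled status set by a separate cancel thread.
    void scheduleOrWait()
    {
        std::unique_lock lock(schedule_mu);
        schedule_cv.wait(lock, [&] { return schedule_state != ScheduleState::WAITING; });
        if (schedule_state == ScheduleState::EXCEEDED)
            throw std::runtime_error("failed to schedule: thread hard limit exceeded in min-tso scheduler");
    }
};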
1 change: 1 addition & 0 deletions dbms/src/Flash/Mpp/MPPTask.h
@@ -71,6 +71,7 @@ class MPPTask : public std::enable_shared_from_this<MPPTask>
WAITING,
SCHEDULED,
FAILED,
EXCEEDED,
COMPLETED
};

4 changes: 2 additions & 2 deletions dbms/src/Flash/Mpp/MPPTaskManager.cpp
@@ -76,7 +76,7 @@ void MPPTaskManager::cancelMPPQuery(UInt64 query_id, const String & reason)
return;
it->second->to_be_cancelled = true;
task_set = it->second;
scheduler->deleteCancelledQuery(query_id, *this);
scheduler->deleteQuery(query_id, *this, true);
cv.notify_all();
}
LOG_WARNING(log, fmt::format("Begin cancel query: {}", query_id));
@@ -155,7 +155,7 @@ void MPPTaskManager::unregisterTask(MPPTask * task)
if (it->second->task_map.empty())
{
/// remove query task map if the task is the last one
scheduler->deleteFinishedQuery(task->id.start_ts);
scheduler->deleteQuery(task->id.start_ts, *this, false);
mpp_query_map.erase(it);
}
return;
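Both former call sites (cancelMPPQuery and unregisterTask) now funnel into one scheduler entry point. A toy model of the merged flow, with hypothetical simplified types rather than the real MPPTaskManager API:

#include <cstdint>
#include <iostream>
#include <queue>
#include <set>

// Toy model: is_cancelled = true comes from the cancel path, where the query
// may still hold waiting tasks to release; is_cancelled = false comes from
// unregistering the last finished task, by which point none remain.
struct ToyScheduler
{
    std::set<uint64_t> active_set;
    std::set<uint64_t> waiting_set;
    std::queue<int> waiting_tasks; // waiting tasks of the query being deleted

    void deleteQuery(uint64_t tso, bool is_cancelled)
    {
        active_set.erase(tso);
        waiting_set.erase(tso);
        if (is_cancelled) // only a cancelled query can still hold waiting tasks
        {
            while (!waiting_tasks.empty())
            {
                std::cout << "release waiting task " << waiting_tasks.front() << " of query " << tso << '\n';
                waiting_tasks.pop();
            }
        }
        // ...then update min_tso; if it advanced, schedule waiting queries.
    }
};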
59 changes: 28 additions & 31 deletions dbms/src/Flash/Mpp/MinTSOScheduler.cpp
@@ -75,54 +75,43 @@ bool MinTSOScheduler::tryToSchedule(const MPPTaskPtr & task, MPPTaskManager & task_manager)
return scheduleImp(id.start_ts, query_task_set, task, false);
}

/// the cancelled query maybe hang, so trigger scheduling as needed.
void MinTSOScheduler::deleteCancelledQuery(const UInt64 tso, MPPTaskManager & task_manager)
/// after a query finishes, no threads are released immediately, so the updated min-tso query's waiting tasks should be scheduled.
/// a cancelled query may hang, so trigger scheduling as needed when deleting a cancelled query.
void MinTSOScheduler::deleteQuery(const UInt64 tso, MPPTaskManager & task_manager, const bool is_cancelled)
{
if (isDisabled())
{
return;
}

LOG_FMT_DEBUG(log, "{} query {} (is min = {}) is deleted from active set {} left {} or waiting set {} left {}.", is_cancelled ? "Cancelled" : "Finished", tso, tso == min_tso, active_set.find(tso) != active_set.end(), active_set.size(), waiting_set.find(tso) != waiting_set.end(), waiting_set.size());
active_set.erase(tso);
waiting_set.erase(tso);
GET_METRIC(tiflash_task_scheduler, type_waiting_queries_count).Set(waiting_set.size());
GET_METRIC(tiflash_task_scheduler, type_active_queries_count).Set(active_set.size());

auto query_task_set = task_manager.getQueryTaskSetWithoutLock(tso);
if (query_task_set) /// release all waiting tasks
if (is_cancelled) /// cancelled queries may have waiting tasks; finished queries have none.
{
while (!query_task_set->waiting_tasks.empty())
auto query_task_set = task_manager.getQueryTaskSetWithoutLock(tso);
if (query_task_set) /// release all waiting tasks
{
query_task_set->waiting_tasks.front()->scheduleThisTask(MPPTask::ScheduleState::FAILED);
query_task_set->waiting_tasks.pop();
GET_METRIC(tiflash_task_scheduler, type_waiting_tasks_count).Decrement();
while (!query_task_set->waiting_tasks.empty())
{
query_task_set->waiting_tasks.front()->scheduleThisTask(MPPTask::ScheduleState::FAILED);
query_task_set->waiting_tasks.pop();
GET_METRIC(tiflash_task_scheduler, type_waiting_tasks_count).Decrement();
}
}
}

/// NOTE: if the cancelled query hang, it will block the min_tso, possibly resulting in deadlock. so here force scheduling waiting tasks of the updated min_tso query.
if (updateMinTSO(tso, true, "when cancelling it."))
/// NOTE: if the updated min_tso query has waiting tasks, they should be scheduled, especially when the soft limit of threads is almost reached and active tasks are in a resource deadlock and cannot release threads soon.
if (updateMinTSO(tso, true, is_cancelled ? "when cancelling it" : "as finishing it"))
{
scheduleWaitingQueries(task_manager);
}
}

void MinTSOScheduler::deleteFinishedQuery(const UInt64 tso)
{
if (isDisabled())
{
return;
}

LOG_FMT_DEBUG(log, "query {} (is min = {}) is deleted from active set {} left {} or waiting set {} left {}.", tso, tso == min_tso, active_set.find(tso) != active_set.end(), active_set.size(), waiting_set.find(tso) != waiting_set.end(), waiting_set.size());
/// delete from sets
active_set.erase(tso);
waiting_set.erase(tso);
GET_METRIC(tiflash_task_scheduler, type_waiting_queries_count).Set(waiting_set.size());
GET_METRIC(tiflash_task_scheduler, type_active_queries_count).Set(active_set.size());

updateMinTSO(tso, true, "as deleting it.");
}

/// NOTE: should not throw exceptions because it is called during destruction.
void MinTSOScheduler::releaseThreadsThenSchedule(const int needed_threads, MPPTaskManager & task_manager)
{
if (isDisabled())
@@ -132,9 +121,8 @@ void MinTSOScheduler::releaseThreadsThenSchedule(const int needed_threads, MPPTaskManager & task_manager)

if (static_cast<Int64>(estimated_thread_usage) < needed_threads)
{
auto msg = fmt::format("estimated_thread_usage should not be smaller than 0, actually is {}.", static_cast<Int64>(estimated_thread_usage) - needed_threads);
LOG_FMT_ERROR(log, "{}", msg);
throw Exception(msg);
LOG_FMT_FATAL(log, "estimated_thread_usage should not be smaller than 0, actually is {}.", static_cast<Int64>(estimated_thread_usage) - needed_threads);
std::terminate();
}
estimated_thread_usage -= needed_threads;
GET_METRIC(tiflash_task_scheduler, type_estimated_thread_usage).Set(estimated_thread_usage);
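Because releaseThreadsThenSchedule runs on task-destruction paths, an exception escaping it would reach an implicitly noexcept destructor and terminate the process anyway, only without a useful record. A minimal illustration of the log-then-terminate choice, with plain std::cerr standing in for the FATAL logger:

#include <exception>
#include <iostream>

struct ThreadAccountant
{
    long estimated_thread_usage = 0;

    void release(long needed_threads)
    {
        if (estimated_thread_usage < needed_threads)
        {
            // Log first so the invariant violation is recorded, then fail
            // fast: throwing here would escape a destructor path anyway.
            std::cerr << "FATAL: estimated_thread_usage should not be smaller than 0, actually is "
                      << (estimated_thread_usage - needed_threads) << '\n';
            std::terminate();
        }
        estimated_thread_usage -= needed_threads;
    }
};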
@@ -202,7 +190,16 @@ bool MinTSOScheduler::scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & query_task_set, const MPPTaskPtr & task, const bool isWaiting)
auto msg = fmt::format("threads are unavailable for the query {} ({} min_tso {}) {}, need {}, but used {} of the thread hard limit {}, {} active and {} waiting queries.", tso, tso == min_tso ? "is" : "is newer than", min_tso, isWaiting ? "from the waiting set" : "when directly schedule it", needed_threads, estimated_thread_usage, thread_hard_limit, active_set.size(), waiting_set.size());
LOG_FMT_ERROR(log, "{}", msg);
GET_METRIC(tiflash_task_scheduler, type_hard_limit_exceeded_count).Increment();
throw Exception(msg);
if (isWaiting)
{
/// mark this task as failed to schedule; it will throw an exception, and TiDB will finally notify this TiFlash node to cancel all tasks of this tso and update metrics.
task->scheduleThisTask(MPPTask::ScheduleState::EXCEEDED);
waiting_set.erase(tso); /// avoid the remaining waiting tasks of this query reaching here many times.
}
else
{
throw Exception(msg);
}
}
if (!isWaiting)
{
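A compact sketch of the two failure paths under the hard limit, with simplified hypothetical names (ScheduleState as added in MPPTask.h). Direct scheduling runs on the task's own thread, so throwing there is safe; scheduling from the waiting set runs on a releasing task's thread, so the failure is handed back to the waiting task instead:

#include <cstdint>
#include <set>
#include <stdexcept>
#include <string>

enum class ScheduleState { WAITING, SCHEDULED, FAILED, EXCEEDED, COMPLETED };

struct ToyTask
{
    ScheduleState state = ScheduleState::WAITING;
    void scheduleThisTask(ScheduleState s) { state = s; } // wakes the task's own thread
};

void onHardLimitExceeded(ToyTask & task, bool isWaiting, std::set<uint64_t> & waiting_set, uint64_t tso, const std::string & msg)
{
    if (isWaiting)
    {
        task.scheduleThisTask(ScheduleState::EXCEEDED); // the task's thread throws later, in scheduleOrWait()
        waiting_set.erase(tso); // the query's remaining waiting tasks won't hit this path repeatedly
    }
    else
    {
        throw std::runtime_error(msg); // the caller is the task's own thread
    }
}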
13 changes: 4 additions & 9 deletions dbms/src/Flash/Mpp/MinTSOScheduler.h
@@ -24,7 +24,7 @@ namespace DB
/// scheduling tasks in the set according to the tso order under the soft limit of threads, but allow the min_tso query to preempt threads under the hard limit of threads.
/// The min_tso query avoids the deadlock resulting from thread competition among nodes.
/// schedule tasks under the lock protection of the task manager.
/// NOTE: if this scheduler hangs resulting from some bugs, kill the min_tso query, and the cancelled query surely transfers the min_tso.
/// NOTE: if the updated min-tso query has waiting tasks, they must be scheduled, otherwise the query would hang.
class MinTSOScheduler : private boost::noncopyable
{
public:
@@ -34,14 +34,9 @@ class MinTSOScheduler : private boost::noncopyable
/// NOTE: call tryToSchedule under the lock protection of MPPTaskManager
bool tryToSchedule(const MPPTaskPtr & task, MPPTaskManager & task_manager);

/// delete this to-be cancelled query from scheduler and update min_tso if needed, so that there aren't cancelled queries in the scheduler.
/// NOTE: call deleteCancelledQuery under the lock protection of MPPTaskManager
void deleteCancelledQuery(const UInt64 tso, MPPTaskManager & task_manager);

/// delete the query in the active set and waiting set
/// NOTE: call deleteFinishedQuery under the lock protection of MPPTaskManager,
/// so this func is called exactly once for a query.
void deleteFinishedQuery(const UInt64 tso);
/// delete this to-be-cancelled/finished query from the scheduler and update min_tso if needed, so that no cancelled/finished queries remain in the scheduler.
/// NOTE: call deleteQuery under the lock protection of MPPTaskManager
void deleteQuery(const UInt64 tso, MPPTaskManager & task_manager, const bool is_cancelled);

/// all scheduled tasks should finally call this function to release threads and schedule new tasks
void releaseThreadsThenSchedule(const int needed_threads, MPPTaskManager & task_manager);
1 change: 1 addition & 0 deletions libs/libcommon/include/common/logger_useful.h
@@ -151,3 +151,4 @@ std::string toCheckedFmtStr(const S & format, const Ignored &, Args &&... args)
#define LOG_FMT_INFO(logger, ...) LOG_FMT_IMPL(logger, Poco::Message::PRIO_INFORMATION, __VA_ARGS__)
#define LOG_FMT_WARNING(logger, ...) LOG_FMT_IMPL(logger, Poco::Message::PRIO_WARNING, __VA_ARGS__)
#define LOG_FMT_ERROR(logger, ...) LOG_FMT_IMPL(logger, Poco::Message::PRIO_ERROR, __VA_ARGS__)
#define LOG_FMT_FATAL(logger, ...) LOG_FMT_IMPL(logger, Poco::Message::PRIO_FATAL, __VA_ARGS__)
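A usage sketch for the new macro, mirroring the call site in MinTSOScheduler.cpp above; `log` and `remaining` are assumed to be whatever logger handle and computed value the surrounding code already has:

// Hypothetical caller; `log` is the same logger handle used with the other
// LOG_FMT_* macros in this codebase.
LOG_FMT_FATAL(log, "estimated_thread_usage should not be smaller than 0, actually is {}.", remaining);
std::terminate(); // FATAL marks unrecoverable state; fail fast afterwards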