Skip to content

Commit

Permalink
Schedule: track the waiting tasks with task ID, and deleted the sched…
Browse files Browse the repository at this point in the history
…uled task with exceeded state from the waiting tasks queue (#4958)

close #4954
  • Loading branch information
fzhedu authored May 23, 2022
1 parent 1dfe7fc commit 350323d
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 8 deletions.
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/MPPTaskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ struct MPPQueryTaskSet
bool to_be_cancelled = false;
MPPTaskMap task_map;
/// only used in scheduler
std::queue<MPPTaskPtr> waiting_tasks;
std::queue<MPPTaskId> waiting_tasks;
};

using MPPQueryTaskSetPtr = std::shared_ptr<MPPQueryTaskSet>;
Expand Down
22 changes: 16 additions & 6 deletions dbms/src/Flash/Mpp/MinTSOScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ bool MinTSOScheduler::tryToSchedule(const MPPTaskPtr & task, MPPTaskManager & ta
LOG_FMT_WARNING(log, "{} is scheduled with miss or cancellation.", id.toString());
return true;
}
return scheduleImp(id.start_ts, query_task_set, task, false);
bool has_error = false;
return scheduleImp(id.start_ts, query_task_set, task, false, has_error);
}

/// after finishing the query, there would be no threads released soon, so the updated min-tso query with waiting tasks should be scheduled.
Expand All @@ -97,7 +98,9 @@ void MinTSOScheduler::deleteQuery(const UInt64 tso, MPPTaskManager & task_manage
{
while (!query_task_set->waiting_tasks.empty())
{
query_task_set->waiting_tasks.front()->scheduleThisTask(MPPTask::ScheduleState::FAILED);
auto task_it = query_task_set->task_map.find(query_task_set->waiting_tasks.front());
if (task_it != query_task_set->task_map.end() && task_it->second != nullptr)
task_it->second->scheduleThisTask(MPPTask::ScheduleState::FAILED);
query_task_set->waiting_tasks.pop();
GET_METRIC(tiflash_task_scheduler, type_waiting_tasks_count).Decrement();
}
Expand Down Expand Up @@ -153,9 +156,14 @@ void MinTSOScheduler::scheduleWaitingQueries(MPPTaskManager & task_manager)
/// schedule tasks one by one
while (!query_task_set->waiting_tasks.empty())
{
auto task = query_task_set->waiting_tasks.front();
if (!scheduleImp(current_query_id, query_task_set, task, true))
auto task_it = query_task_set->task_map.find(query_task_set->waiting_tasks.front());
bool has_error = false;
if (task_it != query_task_set->task_map.end() && task_it->second != nullptr && !scheduleImp(current_query_id, query_task_set, task_it->second, true, has_error))
{
if (has_error)
query_task_set->waiting_tasks.pop(); /// it should be pop from the waiting queue, because the task is scheduled with errors.
return;
}
query_task_set->waiting_tasks.pop();
GET_METRIC(tiflash_task_scheduler, type_waiting_tasks_count).Decrement();
}
Expand All @@ -166,7 +174,7 @@ void MinTSOScheduler::scheduleWaitingQueries(MPPTaskManager & task_manager)
}

/// [directly schedule, from waiting set] * [is min_tso query, not] * [can schedule, can't] totally 8 cases.
bool MinTSOScheduler::scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & query_task_set, const MPPTaskPtr & task, const bool isWaiting)
bool MinTSOScheduler::scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & query_task_set, const MPPTaskPtr & task, const bool isWaiting, bool & has_error)
{
auto needed_threads = task->getNeededThreads();
auto check_for_new_min_tso = tso <= min_tso && estimated_thread_usage + needed_threads <= thread_hard_limit;
Expand All @@ -187,6 +195,7 @@ bool MinTSOScheduler::scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & q
{
if (tso <= min_tso) /// the min_tso query should fully run, otherwise throw errors here.
{
has_error = true;
auto msg = fmt::format("threads are unavailable for the query {} ({} min_tso {}) {}, need {}, but used {} of the thread hard limit {}, {} active and {} waiting queries.", tso, tso == min_tso ? "is" : "is newer than", min_tso, isWaiting ? "from the waiting set" : "when directly schedule it", needed_threads, estimated_thread_usage, thread_hard_limit, active_set.size(), waiting_set.size());
LOG_FMT_ERROR(log, "{}", msg);
GET_METRIC(tiflash_task_scheduler, type_hard_limit_exceeded_count).Increment();
Expand All @@ -200,11 +209,12 @@ bool MinTSOScheduler::scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & q
{
throw Exception(msg);
}
return false;
}
if (!isWaiting)
{
waiting_set.insert(tso);
query_task_set->waiting_tasks.push(task);
query_task_set->waiting_tasks.push(task->getId());
GET_METRIC(tiflash_task_scheduler, type_waiting_queries_count).Set(waiting_set.size());
GET_METRIC(tiflash_task_scheduler, type_waiting_tasks_count).Increment();
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/MinTSOScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class MinTSOScheduler : private boost::noncopyable
void releaseThreadsThenSchedule(const int needed_threads, MPPTaskManager & task_manager);

private:
bool scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & query_task_set, const MPPTaskPtr & task, const bool isWaiting);
bool scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & query_task_set, const MPPTaskPtr & task, const bool isWaiting, bool & has_error);
bool updateMinTSO(const UInt64 tso, const bool retired, const String msg);
void scheduleWaitingQueries(MPPTaskManager & task_manager);
bool isDisabled()
Expand Down

0 comments on commit 350323d

Please sign in to comment.