Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Schedule: track the waiting tasks with task ID, and deleted the scheduled task with exceeded state from the waiting tasks queue (#4958) #4974

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/MPPTaskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ struct MPPQueryTaskSet
bool to_be_cancelled = false;
MPPTaskMap task_map;
/// only used in scheduler
std::queue<MPPTaskPtr> waiting_tasks;
std::queue<MPPTaskId> waiting_tasks;
};

using MPPQueryTaskSetPtr = std::shared_ptr<MPPQueryTaskSet>;
Expand Down
22 changes: 16 additions & 6 deletions dbms/src/Flash/Mpp/MinTSOScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ bool MinTSOScheduler::tryToSchedule(const MPPTaskPtr & task, MPPTaskManager & ta
LOG_FMT_WARNING(log, "{} is scheduled with miss or cancellation.", id.toString());
return true;
}
return scheduleImp(id.start_ts, query_task_set, task, false);
bool has_error = false;
return scheduleImp(id.start_ts, query_task_set, task, false, has_error);
}

/// after finishing the query, there would be no threads released soon, so the updated min-tso query with waiting tasks should be scheduled.
Expand All @@ -97,7 +98,9 @@ void MinTSOScheduler::deleteQuery(const UInt64 tso, MPPTaskManager & task_manage
{
while (!query_task_set->waiting_tasks.empty())
{
query_task_set->waiting_tasks.front()->scheduleThisTask(MPPTask::ScheduleState::FAILED);
auto task_it = query_task_set->task_map.find(query_task_set->waiting_tasks.front());
if (task_it != query_task_set->task_map.end() && task_it->second != nullptr)
task_it->second->scheduleThisTask(MPPTask::ScheduleState::FAILED);
query_task_set->waiting_tasks.pop();
GET_METRIC(tiflash_task_scheduler, type_waiting_tasks_count).Decrement();
}
Expand Down Expand Up @@ -153,9 +156,14 @@ void MinTSOScheduler::scheduleWaitingQueries(MPPTaskManager & task_manager)
/// schedule tasks one by one
while (!query_task_set->waiting_tasks.empty())
{
auto task = query_task_set->waiting_tasks.front();
if (!scheduleImp(current_query_id, query_task_set, task, true))
auto task_it = query_task_set->task_map.find(query_task_set->waiting_tasks.front());
bool has_error = false;
if (task_it != query_task_set->task_map.end() && task_it->second != nullptr && !scheduleImp(current_query_id, query_task_set, task_it->second, true, has_error))
{
if (has_error)
query_task_set->waiting_tasks.pop(); /// it should be pop from the waiting queue, because the task is scheduled with errors.
return;
}
query_task_set->waiting_tasks.pop();
GET_METRIC(tiflash_task_scheduler, type_waiting_tasks_count).Decrement();
}
Expand All @@ -166,7 +174,7 @@ void MinTSOScheduler::scheduleWaitingQueries(MPPTaskManager & task_manager)
}

/// [directly schedule, from waiting set] * [is min_tso query, not] * [can schedule, can't] totally 8 cases.
bool MinTSOScheduler::scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & query_task_set, const MPPTaskPtr & task, const bool isWaiting)
bool MinTSOScheduler::scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & query_task_set, const MPPTaskPtr & task, const bool isWaiting, bool & has_error)
{
auto needed_threads = task->getNeededThreads();
auto check_for_new_min_tso = tso <= min_tso && estimated_thread_usage + needed_threads <= thread_hard_limit;
Expand All @@ -187,6 +195,7 @@ bool MinTSOScheduler::scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & q
{
if (tso <= min_tso) /// the min_tso query should fully run, otherwise throw errors here.
{
has_error = true;
auto msg = fmt::format("threads are unavailable for the query {} ({} min_tso {}) {}, need {}, but used {} of the thread hard limit {}, {} active and {} waiting queries.", tso, tso == min_tso ? "is" : "is newer than", min_tso, isWaiting ? "from the waiting set" : "when directly schedule it", needed_threads, estimated_thread_usage, thread_hard_limit, active_set.size(), waiting_set.size());
LOG_FMT_ERROR(log, "{}", msg);
GET_METRIC(tiflash_task_scheduler, type_hard_limit_exceeded_count).Increment();
Expand All @@ -200,11 +209,12 @@ bool MinTSOScheduler::scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & q
{
throw Exception(msg);
}
return false;
}
if (!isWaiting)
{
waiting_set.insert(tso);
query_task_set->waiting_tasks.push(task);
query_task_set->waiting_tasks.push(task->getId());
GET_METRIC(tiflash_task_scheduler, type_waiting_queries_count).Set(waiting_set.size());
GET_METRIC(tiflash_task_scheduler, type_waiting_tasks_count).Increment();
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/MinTSOScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class MinTSOScheduler : private boost::noncopyable
void releaseThreadsThenSchedule(const int needed_threads, MPPTaskManager & task_manager);

private:
bool scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & query_task_set, const MPPTaskPtr & task, const bool isWaiting);
bool scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & query_task_set, const MPPTaskPtr & task, const bool isWaiting, bool & has_error);
bool updateMinTSO(const UInt64 tso, const bool retired, const String msg);
void scheduleWaitingQueries(MPPTaskManager & task_manager);
bool isDisabled()
Expand Down