diff --git a/dbms/src/Flash/Mpp/MPPTaskManager.h b/dbms/src/Flash/Mpp/MPPTaskManager.h index 024dd4f3a59..d7047804aca 100644 --- a/dbms/src/Flash/Mpp/MPPTaskManager.h +++ b/dbms/src/Flash/Mpp/MPPTaskManager.h @@ -35,7 +35,7 @@ struct MPPQueryTaskSet bool to_be_cancelled = false; MPPTaskMap task_map; /// only used in scheduler - std::queue waiting_tasks; + std::queue waiting_tasks; }; using MPPQueryTaskSetPtr = std::shared_ptr; diff --git a/dbms/src/Flash/Mpp/MinTSOScheduler.cpp b/dbms/src/Flash/Mpp/MinTSOScheduler.cpp index 9fbe4b7b7cb..af525bd1a55 100644 --- a/dbms/src/Flash/Mpp/MinTSOScheduler.cpp +++ b/dbms/src/Flash/Mpp/MinTSOScheduler.cpp @@ -72,7 +72,8 @@ bool MinTSOScheduler::tryToSchedule(const MPPTaskPtr & task, MPPTaskManager & ta LOG_FMT_WARNING(log, "{} is scheduled with miss or cancellation.", id.toString()); return true; } - return scheduleImp(id.start_ts, query_task_set, task, false); + bool has_error = false; + return scheduleImp(id.start_ts, query_task_set, task, false, has_error); } /// after finishing the query, there would be no threads released soon, so the updated min-tso query with waiting tasks should be scheduled. @@ -97,7 +98,9 @@ void MinTSOScheduler::deleteQuery(const UInt64 tso, MPPTaskManager & task_manage { while (!query_task_set->waiting_tasks.empty()) { - query_task_set->waiting_tasks.front()->scheduleThisTask(MPPTask::ScheduleState::FAILED); + auto task_it = query_task_set->task_map.find(query_task_set->waiting_tasks.front()); + if (task_it != query_task_set->task_map.end() && task_it->second != nullptr) + task_it->second->scheduleThisTask(MPPTask::ScheduleState::FAILED); query_task_set->waiting_tasks.pop(); GET_METRIC(tiflash_task_scheduler, type_waiting_tasks_count).Decrement(); } @@ -153,9 +156,14 @@ void MinTSOScheduler::scheduleWaitingQueries(MPPTaskManager & task_manager) /// schedule tasks one by one while (!query_task_set->waiting_tasks.empty()) { - auto task = query_task_set->waiting_tasks.front(); - if (!scheduleImp(current_query_id, query_task_set, task, true)) + auto task_it = query_task_set->task_map.find(query_task_set->waiting_tasks.front()); + bool has_error = false; + if (task_it != query_task_set->task_map.end() && task_it->second != nullptr && !scheduleImp(current_query_id, query_task_set, task_it->second, true, has_error)) + { + if (has_error) + query_task_set->waiting_tasks.pop(); /// it should be pop from the waiting queue, because the task is scheduled with errors. return; + } query_task_set->waiting_tasks.pop(); GET_METRIC(tiflash_task_scheduler, type_waiting_tasks_count).Decrement(); } @@ -166,7 +174,7 @@ void MinTSOScheduler::scheduleWaitingQueries(MPPTaskManager & task_manager) } /// [directly schedule, from waiting set] * [is min_tso query, not] * [can schedule, can't] totally 8 cases. -bool MinTSOScheduler::scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & query_task_set, const MPPTaskPtr & task, const bool isWaiting) +bool MinTSOScheduler::scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & query_task_set, const MPPTaskPtr & task, const bool isWaiting, bool & has_error) { auto needed_threads = task->getNeededThreads(); auto check_for_new_min_tso = tso <= min_tso && estimated_thread_usage + needed_threads <= thread_hard_limit; @@ -187,6 +195,7 @@ bool MinTSOScheduler::scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & q { if (tso <= min_tso) /// the min_tso query should fully run, otherwise throw errors here. { + has_error = true; auto msg = fmt::format("threads are unavailable for the query {} ({} min_tso {}) {}, need {}, but used {} of the thread hard limit {}, {} active and {} waiting queries.", tso, tso == min_tso ? "is" : "is newer than", min_tso, isWaiting ? "from the waiting set" : "when directly schedule it", needed_threads, estimated_thread_usage, thread_hard_limit, active_set.size(), waiting_set.size()); LOG_FMT_ERROR(log, "{}", msg); GET_METRIC(tiflash_task_scheduler, type_hard_limit_exceeded_count).Increment(); @@ -200,11 +209,12 @@ bool MinTSOScheduler::scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & q { throw Exception(msg); } + return false; } if (!isWaiting) { waiting_set.insert(tso); - query_task_set->waiting_tasks.push(task); + query_task_set->waiting_tasks.push(task->getId()); GET_METRIC(tiflash_task_scheduler, type_waiting_queries_count).Set(waiting_set.size()); GET_METRIC(tiflash_task_scheduler, type_waiting_tasks_count).Increment(); } diff --git a/dbms/src/Flash/Mpp/MinTSOScheduler.h b/dbms/src/Flash/Mpp/MinTSOScheduler.h index 501aa772a33..17ab1f4dfa3 100644 --- a/dbms/src/Flash/Mpp/MinTSOScheduler.h +++ b/dbms/src/Flash/Mpp/MinTSOScheduler.h @@ -42,7 +42,7 @@ class MinTSOScheduler : private boost::noncopyable void releaseThreadsThenSchedule(const int needed_threads, MPPTaskManager & task_manager); private: - bool scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & query_task_set, const MPPTaskPtr & task, const bool isWaiting); + bool scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & query_task_set, const MPPTaskPtr & task, const bool isWaiting, bool & has_error); bool updateMinTSO(const UInt64 tso, const bool retired, const String msg); void scheduleWaitingQueries(MPPTaskManager & task_manager); bool isDisabled()