Skip to content

Commit

Permalink
Scheduler: release threads by tasks (#4307)
Browse files Browse the repository at this point in the history
close #4308
  • Loading branch information
fzhedu authored Mar 17, 2022
1 parent e8c372d commit abcd217
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 37 deletions.
27 changes: 20 additions & 7 deletions dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ MPPTask::MPPTask(const mpp::TaskMeta & meta_, const ContextPtr & context_)
, id(meta.start_ts(), meta.task_id())
, log(getMPPTaskLog("MPPTask", id))
, mpp_task_statistics(id, meta.address())
, scheduled(false)
, schedule_state(ScheduleState::WAITING)
{}

MPPTask::~MPPTask()
Expand All @@ -67,6 +67,13 @@ MPPTask::~MPPTask()
if (current_memory_tracker != memory_tracker)
current_memory_tracker = memory_tracker;
closeAllTunnels("");
if (schedule_state == ScheduleState::SCHEDULED)
{
/// the threads of this task are not fully freed now, since the BlockIO and DAGContext are not destructed
/// TODO: finish all threads before here, except the current one.
manager->releaseThreadsFromScheduler(needed_threads);
schedule_state = ScheduleState::COMPLETED;
}
LOG_FMT_DEBUG(log, "finish MPPTask: {}", id.toString());
}

Expand Down Expand Up @@ -398,7 +405,7 @@ void MPPTask::cancel(const String & reason)
}
else if (previous_status == RUNNING && switchStatus(RUNNING, CANCELLED))
{
scheduleThisTask();
scheduleThisTask(ScheduleState::FAILED);
context->getProcessList().sendCancelToQuery(context->getCurrentQueryId(), context->getClientInfo().current_user, true);
closeAllTunnels(reason);
/// runImpl is running, leave remaining work to runImpl
Expand All @@ -422,21 +429,21 @@ void MPPTask::scheduleOrWait()
double time_cost;
{
std::unique_lock lock(schedule_mu);
schedule_cv.wait(lock, [&] { return scheduled; });
schedule_cv.wait(lock, [&] { return schedule_state != ScheduleState::WAITING; });
time_cost = stopwatch.elapsedSeconds();
GET_METRIC(tiflash_task_scheduler_waiting_duration_seconds).Observe(time_cost);
}
LOG_FMT_INFO(log, "task waits for {} s to schedule and starts to run in parallel.", time_cost);
}
}

void MPPTask::scheduleThisTask()
void MPPTask::scheduleThisTask(ScheduleState state)
{
std::unique_lock lock(schedule_mu);
if (!scheduled)
if (schedule_state == ScheduleState::WAITING)
{
LOG_FMT_INFO(log, "task gets schedule");
scheduled = true;
LOG_FMT_INFO(log, "task is {}.", state == ScheduleState::SCHEDULED ? "scheduled" : " failed to schedule");
schedule_state = state;
schedule_cv.notify_one();
}
}
Expand All @@ -460,4 +467,10 @@ int MPPTask::getNeededThreads()
return needed_threads;
}

/// Returns true only while this task is in the SCHEDULED state.
/// The state is read under `schedule_mu`, the same mutex scheduleThisTask()
/// holds when it changes `schedule_state`.
bool MPPTask::isScheduled()
{
    std::lock_guard guard(schedule_mu);
    const bool is_scheduled = (schedule_state == ScheduleState::SCHEDULED);
    return is_scheduled;
}

} // namespace DB
14 changes: 12 additions & 2 deletions dbms/src/Flash/Mpp/MPPTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,17 @@ class MPPTask : public std::enable_shared_from_this<MPPTask>

int getNeededThreads();

void scheduleThisTask();
/// Scheduling lifecycle of a task, stored in `schedule_state`.
enum class ScheduleState
{
    /// Initial state; scheduleOrWait() blocks for as long as the task stays in it.
    WAITING,
    /// The scheduler accepted the task; its threads have to be released later.
    SCHEDULED,
    /// The task left WAITING because it (or its query) was cancelled instead of being scheduled.
    FAILED,
    /// A previously SCHEDULED task has handed its threads back to the scheduler.
    COMPLETED,
};

void scheduleThisTask(ScheduleState state);

bool isScheduled();

// tunnel and error_message
std::pair<MPPTunnelPtr, String> getTunnel(const ::mpp::EstablishMPPConnectionRequest * request);
Expand Down Expand Up @@ -129,7 +139,7 @@ class MPPTask : public std::enable_shared_from_this<MPPTask>

std::mutex schedule_mu;
std::condition_variable schedule_cv;
bool scheduled;
ScheduleState schedule_state;
};

using MPPTaskPtr = std::shared_ptr<MPPTask>;
Expand Down
9 changes: 7 additions & 2 deletions dbms/src/Flash/Mpp/MPPTaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ void MPPTaskManager::cancelMPPQuery(UInt64 query_id, const String & reason)
/// hold the canceled task set, so that no mpp task is destructed while holding the
/// `mu` of MPPTaskManager; otherwise it might cause a deadlock
canceled_task_set = it->second;
scheduler->deleteThenSchedule(query_id, *this);
mpp_query_map.erase(it);
}
}
Expand Down Expand Up @@ -156,7 +155,7 @@ void MPPTaskManager::unregisterTask(MPPTask * task)
if (it->second->task_map.empty())
{
/// remove query task map if the task is the last one
scheduler->deleteThenSchedule(task->id.start_ts, *this);
scheduler->deleteFinishedQuery(task->id.start_ts);
mpp_query_map.erase(it);
}
return;
Expand Down Expand Up @@ -212,4 +211,10 @@ bool MPPTaskManager::tryToScheduleTask(const MPPTaskPtr & task)
return scheduler->tryToSchedule(task, *this);
}

/// Hands `needed_threads` back to the scheduler, which may then schedule waiting tasks.
/// `mu` is held for the whole call into the scheduler.
/// NOTE(review): because `mu` is acquired here, the caller must not already hold it
/// (unless `mu` is recursive) — confirm that no MPPTask can be destroyed while `mu` is locked.
void MPPTaskManager::releaseThreadsFromScheduler(const int needed_threads)
{
    std::lock_guard guard{mu};
    auto & manager_ref = *this;
    scheduler->releaseThreadsThenSchedule(needed_threads, manager_ref);
}

} // namespace DB
4 changes: 2 additions & 2 deletions dbms/src/Flash/Mpp/MPPTaskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ struct MPPQueryTaskSet
bool to_be_cancelled = false;
MPPTaskMap task_map;
/// only used in scheduler
UInt32 scheduled_task = 0;
UInt32 estimated_thread_usage = 0;
std::queue<MPPTaskPtr> waiting_tasks;
};

Expand Down Expand Up @@ -77,6 +75,8 @@ class MPPTaskManager : private boost::noncopyable

bool tryToScheduleTask(const MPPTaskPtr & task);

void releaseThreadsFromScheduler(const int needed_threads);

MPPTaskPtr findTaskWithTimeout(const mpp::TaskMeta & meta, std::chrono::seconds timeout, std::string & errMsg);

void cancelMPPQuery(UInt64 query_id, const String & reason);
Expand Down
51 changes: 30 additions & 21 deletions dbms/src/Flash/Mpp/MinTSOScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ bool MinTSOScheduler::tryToSchedule(const MPPTaskPtr & task, MPPTaskManager & ta
/// the cancelled query may hang, so trigger scheduling as needed.
void MinTSOScheduler::deleteCancelledQuery(const UInt64 tso, MPPTaskManager & task_manager)
{
if (isDisabled())
{
return;
}

active_set.erase(tso);
waiting_set.erase(tso);
GET_METRIC(tiflash_task_scheduler, type_waiting_queries_count).Set(waiting_set.size());
Expand All @@ -88,7 +93,7 @@ void MinTSOScheduler::deleteCancelledQuery(const UInt64 tso, MPPTaskManager & ta
{
while (!query_task_set->waiting_tasks.empty())
{
query_task_set->waiting_tasks.front()->scheduleThisTask();
query_task_set->waiting_tasks.front()->scheduleThisTask(MPPTask::ScheduleState::FAILED);
query_task_set->waiting_tasks.pop();
GET_METRIC(tiflash_task_scheduler, type_waiting_tasks_count).Decrement();
}
Expand All @@ -101,32 +106,40 @@ void MinTSOScheduler::deleteCancelledQuery(const UInt64 tso, MPPTaskManager & ta
}
}

void MinTSOScheduler::deleteThenSchedule(const UInt64 tso, MPPTaskManager & task_manager)
void MinTSOScheduler::deleteFinishedQuery(const UInt64 tso)
{
if (isDisabled())
{
return;
}
auto query_task_set = task_manager.getQueryTaskSetWithoutLock(tso);
/// return back threads
if (query_task_set)
{
estimated_thread_usage -= query_task_set->estimated_thread_usage;
GET_METRIC(tiflash_task_scheduler, type_estimated_thread_usage).Decrement(query_task_set->estimated_thread_usage);
GET_METRIC(tiflash_task_scheduler, type_active_tasks_count).Decrement(query_task_set->scheduled_task);
query_task_set->estimated_thread_usage = 0;
query_task_set->scheduled_task = 0;
}
LOG_FMT_INFO(log, "query {} (is min = {}) is deleted from active set {} left {} or waiting set {} left {}.", tso, tso == min_tso, active_set.find(tso) != active_set.end(), active_set.size(), waiting_set.find(tso) != waiting_set.end(), waiting_set.size());
/// delete from working set and return threads for finished or cancelled queries

LOG_FMT_DEBUG(log, "query {} (is min = {}) is deleted from active set {} left {} or waiting set {} left {}.", tso, tso == min_tso, active_set.find(tso) != active_set.end(), active_set.size(), waiting_set.find(tso) != waiting_set.end(), waiting_set.size());
/// delete from sets
active_set.erase(tso);
waiting_set.erase(tso);
GET_METRIC(tiflash_task_scheduler, type_waiting_queries_count).Set(waiting_set.size());
GET_METRIC(tiflash_task_scheduler, type_active_queries_count).Set(active_set.size());

updateMinTSO(tso, true, "as deleting it.");
}

/// as deleted query release some threads, so some tasks would get scheduled.
/// Gives `needed_threads` back to the scheduler's thread budget, updates the metrics,
/// and then tries to schedule waiting queries.
/// Does nothing when the scheduler is disabled.
/// Throws Exception (after logging it) when more threads would be released than are accounted for.
/// NOTE(review): MPPTask::~MPPTask reaches this function through
/// MPPTaskManager::releaseThreadsFromScheduler, so the exception below would leave a destructor —
/// confirm that aborting in that case is intended.
void MinTSOScheduler::releaseThreadsThenSchedule(const int needed_threads, MPPTaskManager & task_manager)
{
    if (isDisabled())
        return;

    const auto current_usage = static_cast<Int64>(estimated_thread_usage);
    if (needed_threads > current_usage)
    {
        /// the release would push the counter below zero: report the would-be value and bail out.
        auto error_message = fmt::format("estimated_thread_usage should not be smaller than 0, actually is {}.", current_usage - needed_threads);
        LOG_FMT_ERROR(log, "{}", error_message);
        throw Exception(error_message);
    }

    estimated_thread_usage -= needed_threads;
    GET_METRIC(tiflash_task_scheduler, type_estimated_thread_usage).Set(estimated_thread_usage);
    /// every call accounts for exactly one task that is no longer active.
    GET_METRIC(tiflash_task_scheduler, type_active_tasks_count).Decrement();

    /// as tasks release some threads, so some tasks would get scheduled.
    scheduleWaitingQueries(task_manager);
}

Expand Down Expand Up @@ -174,13 +187,8 @@ bool MinTSOScheduler::scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & q
{
updateMinTSO(tso, false, isWaiting ? "from the waiting set" : "when directly schedule it");
active_set.insert(tso);
++query_task_set->scheduled_task;
query_task_set->estimated_thread_usage += needed_threads;
estimated_thread_usage += needed_threads;
if (isWaiting)
{
task->scheduleThisTask();
}
task->scheduleThisTask(MPPTask::ScheduleState::SCHEDULED);
GET_METRIC(tiflash_task_scheduler, type_active_queries_count).Set(active_set.size());
GET_METRIC(tiflash_task_scheduler, type_estimated_thread_usage).Set(estimated_thread_usage);
GET_METRIC(tiflash_task_scheduler, type_active_tasks_count).Increment();
Expand Down Expand Up @@ -233,4 +241,5 @@ bool MinTSOScheduler::updateMinTSO(const UInt64 tso, const bool retired, const S
}
return force_scheduling;
}

} // namespace DB
9 changes: 6 additions & 3 deletions dbms/src/Flash/Mpp/MinTSOScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,13 @@ class MinTSOScheduler : private boost::noncopyable
/// NOTE: call deleteCancelledQuery under the lock protection of MPPTaskManager
void deleteCancelledQuery(const UInt64 tso, MPPTaskManager & task_manager);

/// delete the query in the active set and waiting set and release threads, then schedule waiting tasks.
/// NOTE: call deleteThenSchedule under the lock protection of MPPTaskManager,
/// delete the query in the active set and waiting set
/// NOTE: call deleteFinishedQuery under the lock protection of MPPTaskManager,
/// so this func is called exactly once for a query.
void deleteThenSchedule(const UInt64 tso, MPPTaskManager & task_manager);
void deleteFinishedQuery(const UInt64 tso);

/// all scheduled tasks should finally call this function to release threads and schedule new tasks
void releaseThreadsThenSchedule(const int needed_threads, MPPTaskManager & task_manager);

private:
bool scheduleImp(const UInt64 tso, const MPPQueryTaskSetPtr & query_task_set, const MPPTaskPtr & task, const bool isWaiting);
Expand Down

0 comments on commit abcd217

Please sign in to comment.