support tiflash resource control #7656

Closed
wants to merge 41 commits
Commits
e708b47
tmp save
guo-shaoge Jun 8, 2023
22ba269
ResourceControlQueue done basically
guo-shaoge Jun 10, 2023
150602f
compile
guo-shaoge Jun 10, 2023
56a72f7
add todo
guo-shaoge Jun 12, 2023
3d652b2
fix
guo-shaoge Jun 14, 2023
e2c8399
add keyspace
guo-shaoge Jun 14, 2023
8c3384d
Merge branch 'master' of github.com:pingcap/tiflash into resource_con…
guo-shaoge Jun 14, 2023
da49220
fmt
guo-shaoge Jun 14, 2023
03bb3b8
refine MinTSO related code
guo-shaoge Jun 14, 2023
e5d645a
remove resource group mintso gracefully
guo-shaoge Jun 14, 2023
ac0b144
refine comment and mintso config
guo-shaoge Jun 14, 2023
3aaae89
refine ResourceControlQueue
guo-shaoge Jun 15, 2023
5c72ae9
refine priority_queue
guo-shaoge Jun 15, 2023
9cfb00c
tmp save
guo-shaoge Jun 26, 2023
709bc06
refine MPPTaskManager
guo-shaoge Jul 4, 2023
8a5d5fc
refine comments
guo-shaoge Jul 4, 2023
f76b5ad
refine ResourceControlQueue
guo-shaoge Jul 4, 2023
c2e6e42
add resource_group_name and keyspace_id for Task
guo-shaoge Jul 4, 2023
574edd3
refine LocalAdmissionController
guo-shaoge Jul 4, 2023
1fe9f54
Merge branch 'master' of github.com:pingcap/tiflash into resource_con…
guo-shaoge Jul 4, 2023
b65e9e0
fix IOPriorityQueue and calc priority by ru_per_sec
guo-shaoge Jul 5, 2023
dc16009
fix data member
guo-shaoge Jul 6, 2023
8ff6b20
fix avg_speed always <= 0
guo-shaoge Jul 7, 2023
8113329
fix DAGContext.resource_group_name
guo-shaoge Aug 1, 2023
5c719fc
Merge branch 'master' of github.com:pingcap/tiflash into resource_con…
guo-shaoge Aug 1, 2023
e0f5d0f
fix exec_context
guo-shaoge Aug 1, 2023
d256169
refine log
guo-shaoge Aug 2, 2023
fc5ac2e
refine
guo-shaoge Aug 2, 2023
4c844ac
add gtest
guo-shaoge Aug 2, 2023
58d09a3
refine ResourceControlQueue::take()
guo-shaoge Aug 3, 2023
5626c87
fix gtest
guo-shaoge Aug 3, 2023
9d328cf
TestResourceGroup.BasicTest ok
guo-shaoge Aug 3, 2023
001993a
refine MockLocalAdmissionController
guo-shaoge Aug 4, 2023
fe76070
add gtest CPUUsageProportion
guo-shaoge Aug 6, 2023
5c9c911
refine TaskQueue::take semantics to drain all tasks when finished
guo-shaoge Aug 6, 2023
69f0883
refine ut
guo-shaoge Aug 7, 2023
187a744
fix compile(ENABLE_TESTS)
guo-shaoge Aug 7, 2023
631ecd0
tmp save
guo-shaoge Aug 8, 2023
4875db5
Merge branch 'master' of github.com:pingcap/tiflash into resource_con…
guo-shaoge Aug 8, 2023
364c21d
refine case
guo-shaoge Aug 8, 2023
d24776f
Merge branch 'master' of github.com:pingcap/tiflash into resource_con…
guo-shaoge Aug 10, 2023
1 change: 1 addition & 0 deletions dbms/src/Flash/CMakeLists.txt
@@ -31,6 +31,7 @@ add_headers_and_sources(flash_service ./Planner/Plans)
add_headers_and_sources(flash_service ./Statistics)
add_headers_and_sources(flash_service ./Management)
add_headers_and_sources(flash_service ./Disaggregated)
add_headers_and_sources(flash_service ./ResourceControl)

add_library(flash_service ${flash_service_headers} ${flash_service_sources})
target_link_libraries(flash_service dbms)
1 change: 1 addition & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.cpp
@@ -79,6 +79,7 @@ DAGContext::DAGContext(tipb::DAGRequest & dag_request_, const mpp::TaskMeta & me
, warnings(max_recorded_error_count)
, warning_count(0)
, keyspace_id(RequestUtils::deriveKeyspaceID(meta_))
, resource_group_name(meta_.resource_group_name())
{
// only mpp task has join executor.
initExecutorIdToJoinIdMap();
3 changes: 3 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.h
@@ -293,6 +293,7 @@ class DAGContext
void addTableLock(const TableLockHolder & lock) { table_locks.push_back(lock); }

KeyspaceID getKeyspaceID() const { return keyspace_id; }
String getResourceGroupName() { return resource_group_name; }

RU getReadRU() const;

@@ -407,6 +408,8 @@ class DAGContext
// The keyspace that the DAG request from
const KeyspaceID keyspace_id = NullspaceID;

const String resource_group_name;

// Used to determine the execution mode
// - None: request has not been executed yet
// - Stream: execute with block input stream
4 changes: 3 additions & 1 deletion dbms/src/Flash/Executor/PipelineExecutor.cpp
@@ -31,7 +31,9 @@ PipelineExecutor::PipelineExecutor(
// But for cop/batchCop, there is no such unique identifier, so an empty value is given here, indicating that the query id of PipelineExecutor is invalid.
/*query_id=*/context.getDAGContext()->is_mpp_task ? context.getDAGContext()->getMPPTaskId().toString() : "",
req_id,
- memory_tracker_)
memory_tracker_,
context.getDAGContext()->getResourceGroupName(),
context.getDAGContext()->getKeyspaceID())
{
PhysicalPlan physical_plan{context, log->identifier()};
physical_plan.build(context.getDAGContext()->dag_request());
19 changes: 18 additions & 1 deletion dbms/src/Flash/Executor/PipelineExecutorContext.h
@@ -20,6 +20,7 @@
#include <Flash/Executor/ResultHandler.h>
#include <Flash/Executor/ResultQueue.h>
#include <Flash/Pipeline/Schedule/Tasks/TaskProfileInfo.h>
#include <Storages/Transaction/Types.h>

#include <atomic>
#include <exception>
@@ -38,8 +39,10 @@ class PipelineExecutorContext : private boost::noncopyable
, mem_tracker(nullptr)
{}

- PipelineExecutorContext(const String & query_id_, const String & req_id, const MemoryTrackerPtr & mem_tracker_)
PipelineExecutorContext(const String & query_id_, const String & req_id, const MemoryTrackerPtr & mem_tracker_, const String & resource_group_name_ = "", const KeyspaceID & keyspace_id_ = NullspaceID)
: query_id(query_id_)
, resource_group_name(resource_group_name_)
, keyspace_id(keyspace_id_)
, log(Logger::get(req_id))
, mem_tracker(mem_tracker_)
{}
@@ -150,6 +153,16 @@ class PipelineExecutorContext
return query_id;
}

const String & getResourceGroupName() const
{
return resource_group_name;
}

const KeyspaceID & getKeyspaceID() const
{
return keyspace_id;
}

const MemoryTrackerPtr & getMemoryTracker() const
{
return mem_tracker;
@@ -166,6 +179,10 @@ class PipelineExecutorContext : private boost::noncopyable
private:
const String query_id;

// For resource control.
const String resource_group_name;
const KeyspaceID keyspace_id = NullspaceID;

LoggerPtr log;

MemoryTrackerPtr mem_tracker;
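To illustrate how the two new defaulted constructor parameters behave, here is a trimmed-down, self-contained sketch of the context class; the KeyspaceID alias and NullspaceID value are placeholders, and the real class additionally carries the logger, memory tracker, result queue, and so on. An empty resource group name keeps the old behavior, which is what non-MPP (cop/batchCop) callers get by default.

#include <cstdint>
#include <string>

using KeyspaceID = uint32_t;
constexpr KeyspaceID NullspaceID = UINT32_MAX; // placeholder for the real constant

class ExecutorContextSketch
{
public:
    // Resource-control fields default to "disabled": empty group name, null keyspace.
    explicit ExecutorContextSketch(
        const std::string & query_id,
        const std::string & resource_group_name = "",
        KeyspaceID keyspace_id = NullspaceID)
        : query_id_(query_id)
        , resource_group_name_(resource_group_name)
        , keyspace_id_(keyspace_id)
    {}

    const std::string & getQueryId() const { return query_id_; }
    const std::string & getResourceGroupName() const { return resource_group_name_; }
    KeyspaceID getKeyspaceID() const { return keyspace_id_; }

private:
    const std::string query_id_;
    const std::string resource_group_name_;
    const KeyspaceID keyspace_id_;
};

int main()
{
    ExecutorContextSketch without_rc("cop-query");         // resource control not involved
    ExecutorContextSketch with_rc("mpp-query", "rg1", 42); // MPP task bound to resource group "rg1"
    return with_rc.getResourceGroupName() == "rg1" && without_rc.getResourceGroupName().empty() ? 0 : 1;
}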
8 changes: 1 addition & 7 deletions dbms/src/Flash/Executor/toRU.cpp
@@ -17,9 +17,6 @@

namespace DB
{
- namespace
- {
- // Convert cpu time nanoseconds to cpu time millisecond, and round up.
UInt64 toCPUTimeMillisecond(UInt64 cpu_time_ns)
{
if (unlikely(cpu_time_ns == 0))
@@ -29,7 +26,6 @@ UInt64 toCPUTimeMillisecond(UInt64 cpu_time_ns)
auto ceil_cpu_time_millisecond = ceil(cpu_time_millisecond);
return ceil_cpu_time_millisecond;
}
- } // namespace

// 1 ru = 3 millisecond cpu time
RU toRU(UInt64 cpu_time_ns)
@@ -38,8 +34,6 @@ RU toRU(UInt64 cpu_time_ns)
return 0;

auto cpu_time_millisecond = toCPUTimeMillisecond(cpu_time_ns);
- auto ru = static_cast<double>(cpu_time_millisecond) / 3;
- auto ceil_ru = ceil(ru);
- return ceil_ru;
return static_cast<double>(cpu_time_millisecond) / 3;
}
} // namespace DB
5 changes: 4 additions & 1 deletion dbms/src/Flash/Executor/toRU.h
@@ -18,7 +18,10 @@

namespace DB
{
- using RU = UInt64;
using RU = double;
// Convert cpu time nanoseconds to Request Unit.
RU toRU(UInt64 cpu_time_ns);

// Convert cpu time nanoseconds to cpu time millisecond, and round up.
UInt64 toCPUTimeMillisecond(UInt64 cpu_time_ns);
} // namespace DB
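For reference, the conversion these two files implement can be reproduced with a small standalone sketch. The constant (1 RU = 3 ms of CPU time) and the round-up to whole milliseconds come from the diff above; the nanosecond-to-millisecond scale factor and the main driver are assumptions added for illustration.

#include <cmath>
#include <cstdint>
#include <cstdio>

using RU = double; // matches the UInt64 -> double change above

// Round CPU time up to whole milliseconds (mirrors toCPUTimeMillisecond).
uint64_t toCPUTimeMillisecond(uint64_t cpu_time_ns)
{
    if (cpu_time_ns == 0)
        return 0;
    return static_cast<uint64_t>(std::ceil(cpu_time_ns / 1'000'000.0));
}

// 1 RU = 3 milliseconds of CPU time; with RU now a double, the final
// value is no longer rounded up.
RU toRU(uint64_t cpu_time_ns)
{
    if (cpu_time_ns == 0)
        return 0;
    return static_cast<double>(toCPUTimeMillisecond(cpu_time_ns)) / 3;
}

int main()
{
    std::printf("%.3f\n", toRU(5'000'000)); // 5 ms  -> 1.667 RU
    std::printf("%.3f\n", toRU(100'000));   // 0.1 ms rounds up to 1 ms -> 0.333 RU
}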
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/MPPTask.cpp
@@ -129,7 +129,7 @@ MPPTask::MPPTask(const mpp::TaskMeta & meta_, const ContextPtr & context_)
, id(meta)
, context(context_)
, manager(context_->getTMTContext().getMPPTaskManager().get())
- , schedule_entry(manager, id)
, schedule_entry(manager, id, meta_.resource_group_name())
, log(Logger::get(id.toString()))
, mpp_task_statistics(id, meta.address())
{
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/MPPTaskId.cpp
@@ -98,7 +98,7 @@ size_t MPPGatherIdHash::operator()(MPPGatherId const & mpp_gather_id) const noex

String MPPTaskId::toString() const
{
- return isUnknown() ? "MPP<gather_id:N/A,task_id:N/A>" : fmt::format("MPP<gather_id:{},task_id:{}>", gather_id.toString(), task_id);
return isUnknown() ? "MPP<gather_id:N/A,task_id:N/A>" : fmt::format("MPP<gather_id:{},task_id:{},resource_group:{}>", gather_id.toString(), task_id, resource_group_name);
}

const MPPTaskId MPPTaskId::unknown_mpp_task_id = MPPTaskId{};
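The practical effect of the extended format string is that every MPP log identifier now carries the resource group. A tiny fmt-based snippet (TiFlash already uses fmt::format) shows the shape of the new identifier; the concrete gather id, task id, and group name below are made up.

#include <fmt/format.h>

#include <cstdio>
#include <string>

int main()
{
    // Mirrors the format string used by MPPTaskId::toString after this change.
    const std::string id = fmt::format(
        "MPP<gather_id:{},task_id:{},resource_group:{}>",
        "<gather-id-string>",
        11,
        "rg1");
    std::puts(id.c_str()); // MPP<gather_id:<gather-id-string>,task_id:11,resource_group:rg1>
}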
8 changes: 6 additions & 2 deletions dbms/src/Flash/Mpp/MPPTaskId.h
@@ -99,18 +99,22 @@ struct MPPTaskId
: task_id(unknown_task_id)
, gather_id(0, 0, 0, 0, 0){};

- MPPTaskId(UInt64 start_ts, Int64 task_id_, UInt64 server_id, Int64 gather_id, UInt64 query_ts, UInt64 local_query_id)
MPPTaskId(UInt64 start_ts, Int64 task_id_, UInt64 server_id, Int64 gather_id, UInt64 query_ts, UInt64 local_query_id, const String & resource_group_name_ = "")
: task_id(task_id_)
, gather_id(gather_id, query_ts, local_query_id, server_id, start_ts)
, resource_group_name(resource_group_name_)
{}

explicit MPPTaskId(const mpp::TaskMeta & task_meta)
: task_id(task_meta.task_id())
, gather_id(task_meta)
, resource_group_name(task_meta.resource_group_name())
{}

Int64 task_id;
MPPGatherId gather_id;
// Not strictly part of the MPPTaskId; kept here so the resource group of each MPPTask can be identified in logs.
String resource_group_name;

bool isUnknown() const { return task_id == unknown_task_id; }

@@ -136,4 +140,4 @@
return DB::MPPGatherIdHash()(id.gather_id) ^ hash<Int64>()(id.task_id);
}
};
} // namespace std
106 changes: 99 additions & 7 deletions dbms/src/Flash/Mpp/MPPTaskManager.cpp
@@ -18,6 +18,7 @@
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Mpp/MPPTask.h>
#include <Flash/Mpp/MPPTaskManager.h>
#include <Flash/ResourceControl/LocalAdmissionController.h>
#include <Interpreters/Context.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/executeQuery.h>
@@ -62,8 +63,9 @@ MPPGatherTaskSetPtr MPPQuery::addMPPGatherTaskSet(const MPPGatherId & gather_id)
return ptr;
}

- MPPTaskManager::MPPTaskManager(MPPTaskSchedulerPtr scheduler_)
MPPTaskManager::MPPTaskManager(MPPTaskSchedulerPtr scheduler_, UInt64 resource_control_mpp_task_hard_limit_)
: scheduler(std::move(scheduler_))
, resource_control_mpp_task_hard_limit(resource_control_mpp_task_hard_limit_)
, aborted_query_gather_cache(ABORTED_MPPGATHER_CACHE_SIZE)
, log(Logger::get())
, monitor(std::make_shared<MPPTaskMonitor>(log))
@@ -101,8 +103,9 @@ void MPPTaskManager::removeMPPGatherTaskSet(MPPQueryPtr & query, const MPPGather
query->mpp_gathers.erase(gather_id);
if (query->mpp_gathers.empty())
{
- scheduler->deleteQuery(gather_id.query_id, *this, on_abort, -1);
- mpp_query_map.erase(gather_id.query_id);
const auto & query_id = gather_id.query_id;
getScheduler(query_id)->deleteQuery(query_id, *this, on_abort, -1);
mpp_query_map.erase(query_id);
GET_METRIC(tiflash_mpp_task_manager, type_mpp_query_count).Set(mpp_query_map.size());
}
}
@@ -246,7 +249,7 @@ void MPPTaskManager::abortMPPGather(const MPPGatherId & gather_id, const String
cv.notify_all();
return;
}
- scheduler->deleteQuery(gather_id.query_id, *this, true, gather_id.gather_id);
getScheduler(gather_id.query_id)->deleteQuery(gather_id.query_id, *this, true, gather_id.gather_id);
cv.notify_all();
}

@@ -425,14 +428,103 @@ std::pair<MPPGatherTaskSetPtr, String> MPPTaskManager::getGatherTaskSet(const MP
}

bool MPPTaskManager::tryToScheduleTask(MPPTaskScheduleEntry & schedule_entry)
{
bool scheduled = false;
const String & resource_group_name = schedule_entry.getResourceGroupName();
if (!resource_group_name.empty())
{
std::lock_guard lock(mu);

// Check global MPPTask hard limit.
size_t mintso_active_set_size = 0;
for (const auto & ele : resource_group_schedulers)
mintso_active_set_size += ele.second->getActiveSetSize();

// This check helps reduce lock contention inside each resource group.
if (mintso_active_set_size >= resource_control_mpp_task_hard_limit)
{
size_t non_throttled_rg_active_set_size = 0;
for (const auto & ele : resource_group_schedulers)
{
// Ignore groups whose tokens have been exhausted; this prevents tasks
// throttled by resource control from occupying too large a share of the hard limit.
if (LocalAdmissionController::global_instance->isResourceGroupThrottled(ele.first))
continue;

non_throttled_rg_active_set_size += ele.second->getActiveSetSize();
}
if (non_throttled_rg_active_set_size >= resource_control_mpp_task_hard_limit)
throw Exception(fmt::format("too many running mpp tasks(mpptask hard limit: {}, "
"current total mpptask: {}, non throttled resource group mpptasks: {})",
resource_control_mpp_task_hard_limit,
mintso_active_set_size,
non_throttled_rg_active_set_size));
}

// Start MinTSO scheduling.
auto iter = resource_group_schedulers.find(resource_group_name);
MPPTaskSchedulerPtr resource_group_scheduler;
if (iter == resource_group_schedulers.end())
{
// For now, each resource group MinTSO scheduler uses the same config as the global MinTSO scheduler.
auto [thread_soft_limit, thread_hard_limit, active_set_soft_limit] = scheduler->getLimitConfig();
resource_group_scheduler = std::make_shared<MinTSOScheduler>(thread_soft_limit, thread_hard_limit, active_set_soft_limit);
resource_group_schedulers.insert({resource_group_name, resource_group_scheduler});
}
else
{
resource_group_scheduler = iter->second;
}
scheduled = resource_group_scheduler->tryToSchedule(schedule_entry, *this);
// The insert should always succeed: query_id is never duplicated.
auto insert_res = resource_group_query_ids.insert({schedule_entry.getMPPTaskId().gather_id.query_id, resource_group_name});
assert(insert_res.second);
}
else
{
std::lock_guard lock(mu);
scheduled = scheduler->tryToSchedule(schedule_entry, *this);
}
return scheduled;
}

void MPPTaskManager::tagResourceGroupSchedulerReadyToDelete(const String & name)
{
std::lock_guard lock(mu);
- return scheduler->tryToSchedule(schedule_entry, *this);
auto scheduler_iter = resource_group_schedulers.find(name);
if (scheduler_iter == resource_group_schedulers.end())
return;

bool query_id_deleted = false;
for (auto query_id_iter = resource_group_query_ids.begin(); query_id_iter != resource_group_query_ids.end();)
{
if (query_id_iter->second == name)
{
// Erase every query_id that belongs to this resource group.
query_id_iter = resource_group_query_ids.erase(query_id_iter);
query_id_deleted = true;
}
else
{
++query_id_iter;
}
}
resource_group_schedulers_ready_to_delete.insert(scheduler_iter->second);
assert(query_id_deleted);
resource_group_schedulers.erase(scheduler_iter);
}

void MPPTaskManager::cleanResourceGroupScheduler()
{
std::lock_guard lock(mu);
for (auto iter = resource_group_schedulers_ready_to_delete.begin(); iter != resource_group_schedulers_ready_to_delete.end();)
{
if ((*iter)->getActiveSetSize() + (*iter)->getWaitingSetSize() == 0)
iter = resource_group_schedulers_ready_to_delete.erase(iter);
else
++iter;
}
}

- void MPPTaskManager::releaseThreadsFromScheduler(const int needed_threads)
void MPPTaskManager::releaseThreadsFromScheduler(const String & resource_group_name, int needed_threads)
{
std::lock_guard lock(mu);
- scheduler->releaseThreadsThenSchedule(needed_threads, *this);
getScheduler(resource_group_name)->releaseThreadsThenSchedule(needed_threads, *this);
}
} // namespace DB
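The resource-control branch of tryToScheduleTask boils down to: keep one MinTSO-style scheduler per resource group, create it lazily on first use, and enforce a global hard limit on running MPP tasks that ignores groups already throttled by the admission controller. The standalone sketch below captures that control flow; SchedulerStub, the throttling callback, and the limit value are stand-ins rather than the real TiFlash classes.

#include <functional>
#include <memory>
#include <stdexcept>
#include <string>
#include <unordered_map>

// Stand-in for MinTSOScheduler: only tracks how many tasks it is running.
struct SchedulerStub
{
    size_t active_set_size = 0;
    bool tryToSchedule()
    {
        ++active_set_size;
        return true;
    }
};
using SchedulerPtr = std::shared_ptr<SchedulerStub>;

class ResourceGroupSchedulers
{
public:
    explicit ResourceGroupSchedulers(size_t hard_limit)
        : hard_limit_(hard_limit)
    {}

    bool tryToSchedule(const std::string & group, const std::function<bool(const std::string &)> & is_throttled)
    {
        // Global hard limit: first count the tasks of every group.
        size_t total = 0;
        for (const auto & [name, sched] : schedulers_)
            total += sched->active_set_size;
        if (total >= hard_limit_)
        {
            // Recount, skipping groups whose tokens are exhausted, so throttled
            // groups cannot occupy the whole hard-limit budget by themselves.
            size_t non_throttled = 0;
            for (const auto & [name, sched] : schedulers_)
                if (!is_throttled(name))
                    non_throttled += sched->active_set_size;
            if (non_throttled >= hard_limit_)
                throw std::runtime_error("too many running mpp tasks");
        }

        // Lazily create one scheduler per resource group.
        auto & sched = schedulers_[group];
        if (!sched)
            sched = std::make_shared<SchedulerStub>();
        return sched->tryToSchedule();
    }

private:
    const size_t hard_limit_;
    std::unordered_map<std::string, SchedulerPtr> schedulers_;
};

int main()
{
    ResourceGroupSchedulers schedulers(/*hard_limit=*/4);
    auto never_throttled = [](const std::string &) { return false; };
    schedulers.tryToSchedule("rg1", never_throttled);
    schedulers.tryToSchedule("rg2", never_throttled);
}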
40 changes: 38 additions & 2 deletions dbms/src/Flash/Mpp/MPPTaskManager.h
@@ -194,6 +194,19 @@ class MPPTaskManager : private boost::noncopyable
{
MPPTaskSchedulerPtr scheduler;

// key: resource_group_name, value: MPPTaskSchedulerPtr
// If resource control is enabled, the scheduler in resource_group_schedulers is used;
// otherwise the global scheduler is used.
std::unordered_map<String, MPPTaskSchedulerPtr> resource_group_schedulers;

// key: MPPQueryId, value: resource_group_name.
// Used to find the corresponding scheduler by query_id so the MinTSO query info can be released.
std::unordered_map<MPPQueryId, String, MPPQueryIdHash> resource_group_query_ids;

std::unordered_set<MPPTaskSchedulerPtr> resource_group_schedulers_ready_to_delete;

UInt64 resource_control_mpp_task_hard_limit;

std::mutex mu;

MPPQueryMap mpp_query_map;
@@ -207,7 +220,7 @@ class MPPTaskManager : private boost::noncopyable
std::shared_ptr<MPPTaskMonitor> monitor;

public:
- explicit MPPTaskManager(MPPTaskSchedulerPtr scheduler);
explicit MPPTaskManager(MPPTaskSchedulerPtr scheduler, UInt64 resource_control_mpp_task_hard_limit);

~MPPTaskManager();

@@ -231,7 +244,7 @@ class MPPTaskManager : private boost::noncopyable

bool tryToScheduleTask(MPPTaskScheduleEntry & schedule_entry);

- void releaseThreadsFromScheduler(int needed_threads);
void releaseThreadsFromScheduler(const String & resource_group_name, int needed_threads);

std::pair<MPPTunnelPtr, String> findTunnelWithTimeout(const ::mpp::EstablishMPPConnectionRequest * request, std::chrono::seconds timeout);

@@ -241,10 +254,33 @@ class MPPTaskManager : private boost::noncopyable

String toString();

// Tag a resource group scheduler as ready to be deleted. Called periodically by LocalAdmissionController.
// The scheduler is actually deleted only after all of its mpp tasks are done.
void tagResourceGroupSchedulerReadyToDelete(const String & name);

// Actually delete resource group schedulers that no longer have any running mpp tasks.
void cleanResourceGroupScheduler();

MPPQueryPtr getMPPQueryWithoutLock(const MPPQueryId & query_id);

MPPQueryPtr getMPPQuery(const MPPQueryId & query_id);

MPPTaskSchedulerPtr getScheduler(const MPPQueryId & query_id)
{
auto query_id_iter = resource_group_query_ids.find(query_id);
if (query_id_iter == resource_group_query_ids.end())
return scheduler;
auto scheduler_iter = resource_group_schedulers.find(query_id_iter->second);
return scheduler_iter->second;
}
MPPTaskSchedulerPtr getScheduler(const String & resource_group_name)
{
auto iter = resource_group_schedulers.find(resource_group_name);
if (iter == resource_group_schedulers.end())
return scheduler;
return iter->second;
}

private:
MPPQueryPtr addMPPQuery(const MPPQueryId & query_id, bool has_meaningful_gather_id);
void removeMPPGatherTaskSet(MPPQueryPtr & mpp_query, const MPPGatherId & gather_id, bool on_abort);
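The two-step teardown described by the comments above (first tag a scheduler as ready to delete, later reclaim it once it has no active or waiting tasks) can be sketched roughly as follows; the counters and the periodic caller are simplified assumptions rather than the actual LocalAdmissionController wiring.

#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>

// Stand-in for a per-resource-group MinTSO scheduler.
struct GroupSchedulerStub
{
    size_t active_set_size = 0;
    size_t waiting_set_size = 0;
    bool drained() const { return active_set_size + waiting_set_size == 0; }
};
using GroupSchedulerPtr = std::shared_ptr<GroupSchedulerStub>;

class SchedulerRegistry
{
public:
    // Phase 1: the resource group was dropped; detach its scheduler from the
    // lookup map but keep it alive while tasks are still running.
    void tagReadyToDelete(const std::string & group)
    {
        auto it = schedulers.find(group);
        if (it == schedulers.end())
            return;
        ready_to_delete.insert(it->second);
        schedulers.erase(it);
    }

    // Phase 2: called periodically; free schedulers that have fully drained.
    void clean()
    {
        for (auto it = ready_to_delete.begin(); it != ready_to_delete.end();)
        {
            if ((*it)->drained())
                it = ready_to_delete.erase(it);
            else
                ++it;
        }
    }

    std::unordered_map<std::string, GroupSchedulerPtr> schedulers;

private:
    std::unordered_set<GroupSchedulerPtr> ready_to_delete;
};

int main()
{
    SchedulerRegistry registry;
    auto sched = std::make_shared<GroupSchedulerStub>();
    sched->active_set_size = 1;
    registry.schedulers["rg1"] = sched;

    registry.tagReadyToDelete("rg1"); // group dropped while one task is still running
    registry.clean();                 // not reclaimed yet: the task has not finished
    sched->active_set_size = 0;
    registry.clean();                 // now reclaimed
}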