Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ThreadManager: Add simple priority queues #16812

Merged
merged 5 commits into from
Feb 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Common/GPU/Vulkan/VulkanRenderManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,10 @@ class CreateMultiPipelinesTask : public Task {
return TaskType::CPU_COMPUTE;
}

// Pipeline compilation blocks rendering, so these tasks are dequeued
// ahead of NORMAL/LOW work in the per-priority queues.
TaskPriority Priority() const override {
	return TaskPriority::HIGH;
}

void Run() override {
for (auto &task : tasks_) {
task.pipeline->Create(vulkan_, task.compatibleRenderPass, task.rpType, task.sampleCount, task.scheduleTime, task.countToCompile);
Expand Down
27 changes: 16 additions & 11 deletions Common/Thread/ParallelLoop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,17 @@

class LoopRangeTask : public Task {
public:
LoopRangeTask(WaitableCounter *counter, const std::function<void(int, int)> &loop, int lower, int upper)
: counter_(counter), loop_(loop), lower_(lower), upper_(upper) {}
// Captures the sub-range [lower, upper) to run, the loop body, and the
// queue priority this task was enqueued under; `counter` is counted down
// when the task finishes (see Run()).
LoopRangeTask(WaitableCounter *counter, const std::function<void(int, int)> &loop, int lower, int upper, TaskPriority p)
	: counter_(counter), loop_(loop), lower_(lower), upper_(upper), priority_(p) {}

// Parallel range loops are CPU-bound, so route them to the compute queue.
TaskType Type() const override {
	return TaskType::CPU_COMPUTE;
}

// Priority is whatever the caller passed to ParallelRangeLoopWaitable.
TaskPriority Priority() const override {
	return priority_;
}

void Run() override {
loop_(lower_, upper_);
counter_->Count();
Expand All @@ -23,9 +27,10 @@ class LoopRangeTask : public Task {

int lower_;
int upper_;
const TaskPriority priority_;
};

WaitableCounter *ParallelRangeLoopWaitable(ThreadManager *threadMan, const std::function<void(int, int)> &loop, int lower, int upper, int minSize) {
WaitableCounter *ParallelRangeLoopWaitable(ThreadManager *threadMan, const std::function<void(int, int)> &loop, int lower, int upper, int minSize, TaskPriority priority) {
if (minSize == -1) {
minSize = 1;
}
Expand All @@ -38,7 +43,7 @@ WaitableCounter *ParallelRangeLoopWaitable(ThreadManager *threadMan, const std::
} else if (range <= minSize) {
// Single background task.
WaitableCounter *waitableCounter = new WaitableCounter(1);
threadMan->EnqueueTaskOnThread(0, new LoopRangeTask(waitableCounter, loop, lower, upper));
threadMan->EnqueueTaskOnThread(0, new LoopRangeTask(waitableCounter, loop, lower, upper, priority));
return waitableCounter;
} else {
// Split the range between threads. Allow for some fractional bits.
Expand All @@ -65,7 +70,7 @@ WaitableCounter *ParallelRangeLoopWaitable(ThreadManager *threadMan, const std::
// Let's do the stragglers on the current thread.
break;
}
threadMan->EnqueueTaskOnThread(i, new LoopRangeTask(waitableCounter, loop, start, end));
threadMan->EnqueueTaskOnThread(i, new LoopRangeTask(waitableCounter, loop, start, end, priority));
counter += delta;
if ((counter >> fractionalBits) >= upper) {
break;
Expand All @@ -83,7 +88,7 @@ WaitableCounter *ParallelRangeLoopWaitable(ThreadManager *threadMan, const std::
}
}

void ParallelRangeLoop(ThreadManager *threadMan, const std::function<void(int, int)> &loop, int lower, int upper, int minSize) {
void ParallelRangeLoop(ThreadManager *threadMan, const std::function<void(int, int)> &loop, int lower, int upper, int minSize, TaskPriority priority) {
if (cpu_info.num_cores == 1 || (minSize >= (upper - lower) && upper > lower)) {
// "Optimization" for single-core devices, or minSize larger than the range.
// No point in adding threading overhead, let's just do it inline (since this is the blocking variant).
Expand All @@ -96,7 +101,7 @@ void ParallelRangeLoop(ThreadManager *threadMan, const std::function<void(int, i
minSize = 1;
}

WaitableCounter *counter = ParallelRangeLoopWaitable(threadMan, loop, lower, upper, minSize);
WaitableCounter *counter = ParallelRangeLoopWaitable(threadMan, loop, lower, upper, minSize, priority);
// TODO: Optimize using minSize. We'll just compute whether there's a remainder, remove it from the call to ParallelRangeLoopWaitable,
// and process the remainder right here. If there's no remainder, we'll steal a whole chunk.
if (counter) {
Expand All @@ -105,7 +110,7 @@ void ParallelRangeLoop(ThreadManager *threadMan, const std::function<void(int, i
}

// NOTE: Supports a max of 2GB.
void ParallelMemcpy(ThreadManager *threadMan, void *dst, const void *src, size_t bytes) {
void ParallelMemcpy(ThreadManager *threadMan, void *dst, const void *src, size_t bytes, TaskPriority priority) {
// This threshold can probably be a lot bigger.
if (bytes < 512) {
memcpy(dst, src, bytes);
Expand All @@ -118,11 +123,11 @@ void ParallelMemcpy(ThreadManager *threadMan, void *dst, const void *src, size_t
const char *s = (const char *)src;
ParallelRangeLoop(threadMan, [&](int l, int h) {
memmove(d + l, s + l, h - l);
}, 0, (int)bytes, 128 * 1024);
}, 0, (int)bytes, 128 * 1024, priority);
}

// NOTE: Supports a max of 2GB.
void ParallelMemset(ThreadManager *threadMan, void *dst, uint8_t value, size_t bytes) {
void ParallelMemset(ThreadManager *threadMan, void *dst, uint8_t value, size_t bytes, TaskPriority priority) {
// This threshold can probably be a lot bigger.
if (bytes < 512) {
memset(dst, 0, bytes);
Expand All @@ -134,5 +139,5 @@ void ParallelMemset(ThreadManager *threadMan, void *dst, uint8_t value, size_t b
char *d = (char *)dst;
ParallelRangeLoop(threadMan, [&](int l, int h) {
memset(d + l, value, h - l);
}, 0, (int)bytes, 128 * 1024);
}, 0, (int)bytes, 128 * 1024, priority);
}
8 changes: 4 additions & 4 deletions Common/Thread/ParallelLoop.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ struct WaitableCounter : public Waitable {
};

// Note that upper bounds are non-inclusive: range is [lower, upper)
WaitableCounter *ParallelRangeLoopWaitable(ThreadManager *threadMan, const std::function<void(int, int)> &loop, int lower, int upper, int minSize);
WaitableCounter *ParallelRangeLoopWaitable(ThreadManager *threadMan, const std::function<void(int, int)> &loop, int lower, int upper, int minSize, TaskPriority priority);

// Note that upper bounds are non-inclusive: range is [lower, upper)
void ParallelRangeLoop(ThreadManager *threadMan, const std::function<void(int, int)> &loop, int lower, int upper, int minSize);
void ParallelRangeLoop(ThreadManager *threadMan, const std::function<void(int, int)> &loop, int lower, int upper, int minSize, TaskPriority priority = TaskPriority::NORMAL);

// Common utilities for large (!) memory copies.
// Will only fall back to threads if it seems to make sense.
// NOTE: These support a max of 2GB.
void ParallelMemcpy(ThreadManager *threadMan, void *dst, const void *src, size_t bytes);
void ParallelMemset(ThreadManager *threadMan, void *dst, uint8_t value, size_t bytes);
void ParallelMemcpy(ThreadManager *threadMan, void *dst, const void *src, size_t bytes, TaskPriority priority = TaskPriority::NORMAL);
void ParallelMemset(ThreadManager *threadMan, void *dst, uint8_t value, size_t bytes, TaskPriority priority = TaskPriority::NORMAL);
18 changes: 12 additions & 6 deletions Common/Thread/Promise.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
template<class T>
class PromiseTask : public Task {
public:
PromiseTask(std::function<T ()> fun, Mailbox<T> *tx, TaskType t) : fun_(fun), tx_(tx), type_(t) {
// Wraps `fun` so its result is delivered through mailbox `tx`.
// AddRef() keeps the ref-counted mailbox alive while the task is pending;
// presumably released in ~PromiseTask (destructor body not visible here).
PromiseTask(std::function<T ()> fun, Mailbox<T> *tx, TaskType t, TaskPriority p)
	: fun_(fun), tx_(tx), type_(t), priority_(p) {
	tx_->AddRef();
}
~PromiseTask() {
Expand All @@ -21,14 +22,19 @@ class PromiseTask : public Task {
return type_;
}

// Priority chosen by the caller that spawned the promise.
TaskPriority Priority() const override {
	return priority_;
}

void Run() override {
T value = fun_();
tx_->Send(value);
}

std::function<T ()> fun_;
Mailbox<T> *tx_;
TaskType type_;
const TaskType type_;
const TaskPriority priority_;
};

// Represents pending or actual data.
Expand All @@ -39,13 +45,13 @@ class PromiseTask : public Task {
template<class T>
class Promise {
public:
static Promise<T> *Spawn(ThreadManager *threadman, std::function<T()> fun, TaskType taskType) {
static Promise<T> *Spawn(ThreadManager *threadman, std::function<T()> fun, TaskType taskType, TaskPriority taskPriority = TaskPriority::NORMAL) {
Mailbox<T> *mailbox = new Mailbox<T>();

Promise<T> *promise = new Promise<T>();
promise->rx_ = mailbox;

PromiseTask<T> *task = new PromiseTask<T>(fun, mailbox, taskType);
PromiseTask<T> *task = new PromiseTask<T>(fun, mailbox, taskType, taskPriority);
threadman->EnqueueTask(task);
return promise;
}
Expand All @@ -65,8 +71,8 @@ class Promise {
}

// Allow an empty promise to spawn, too, in case we want to delay it.
void SpawnEmpty(ThreadManager *threadman, std::function<T()> fun, TaskType taskType) {
PromiseTask<T> *task = new PromiseTask<T>(fun, rx_, taskType);
void SpawnEmpty(ThreadManager *threadman, std::function<T()> fun, TaskType taskType, TaskPriority taskPriority = TaskPriority::NORMAL) {
PromiseTask<T> *task = new PromiseTask<T>(fun, rx_, taskType, taskPriority);
threadman->EnqueueTask(task);
}

Expand Down
88 changes: 53 additions & 35 deletions Common/Thread/ThreadManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@

// NOTE(review): presumably caps the compute worker pool size — usage not
// visible in this chunk; confirm against ThreadManager::Init.
const int MAX_CORES_TO_USE = 16;
// NOTE(review): presumably a floor on threads available for IO_BLOCKING
// tasks — confirm against ThreadManager::Init.
const int MIN_IO_BLOCKING_THREADS = 4;
// Number of distinct priority levels; sizes the per-priority queue arrays
// (compute_queue, io_queue, private_queue).
static constexpr size_t TASK_PRIORITY_COUNT = (size_t)TaskPriority::COUNT;

struct GlobalThreadContext {
std::mutex mutex; // associated with each respective condition variable
std::deque<Task *> compute_queue;
std::deque<Task *> compute_queue[TASK_PRIORITY_COUNT];
std::atomic<int> compute_queue_size;
std::deque<Task *> io_queue;
std::deque<Task *> io_queue[TASK_PRIORITY_COUNT];
std::atomic<int> io_queue_size;
std::vector<ThreadContext *> threads_;

Expand All @@ -42,7 +43,7 @@ struct ThreadContext {
int index;
TaskType type;
std::atomic<bool> cancelled;
std::deque<Task *> private_queue;
std::deque<Task *> private_queue[TASK_PRIORITY_COUNT];
char name[16];
};

Expand All @@ -65,12 +66,14 @@ void ThreadManager::Teardown() {

// Purge any cancellable tasks while the threads shut down.
if (global_->compute_queue_size > 0 || global_->io_queue_size > 0) {
auto drainQueue = [&](std::deque<Task *> &queue, std::atomic<int> &size) {
for (auto it = queue.begin(); it != queue.end(); ++it) {
if (TeardownTask(*it, false)) {
queue.erase(it);
size--;
return false;
auto drainQueue = [&](std::deque<Task *> queue[TASK_PRIORITY_COUNT], std::atomic<int> &size) {
for (size_t i = 0; i < TASK_PRIORITY_COUNT; ++i) {
for (auto it = queue[i].begin(); it != queue[i].end(); ++it) {
if (TeardownTask(*it, false)) {
queue[i].erase(it);
size--;
return false;
}
}
}
return true;
Expand All @@ -86,8 +89,10 @@ void ThreadManager::Teardown() {
for (ThreadContext *&threadCtx : global_->threads_) {
threadCtx->thread.join();
// TODO: Is it better to just delete these?
for (Task *task : threadCtx->private_queue) {
TeardownTask(task, true);
for (size_t i = 0; i < TASK_PRIORITY_COUNT; ++i) {
for (Task *task : threadCtx->private_queue[i]) {
TeardownTask(task, true);
}
}
delete threadCtx;
}
Expand All @@ -109,11 +114,12 @@ bool ThreadManager::TeardownTask(Task *task, bool enqueue) {
}

if (enqueue) {
size_t queueIndex = (size_t)task->Priority();
if (task->Type() == TaskType::CPU_COMPUTE) {
global_->compute_queue.push_back(task);
global_->compute_queue[queueIndex].push_back(task);
global_->compute_queue_size++;
} else if (task->Type() == TaskType::IO_BLOCKING) {
global_->io_queue.push_back(task);
global_->io_queue[queueIndex].push_back(task);
global_->io_queue_size++;
} else {
_assert_(false);
Expand Down Expand Up @@ -147,33 +153,43 @@ static void WorkerThreadFunc(GlobalThreadContext *global, ThreadContext *thread)
if (global_queue_size() > 0) {
// Grab one from the global queue if there is any.
std::unique_lock<std::mutex> lock(global->mutex);
auto &queue = isCompute ? global->compute_queue : global->io_queue;
auto queue = isCompute ? global->compute_queue : global->io_queue;
auto &queue_size = isCompute ? global->compute_queue_size : global->io_queue_size;

if (!queue.empty()) {
task = queue.front();
queue.pop_front();
queue_size--;

// We are processing one now, so mark that.
thread->queue_size++;
for (size_t p = 0; p < TASK_PRIORITY_COUNT; ++p) {
if (!queue[p].empty()) {
task = queue[p].front();
queue[p].pop_front();
queue_size--;

// We are processing one now, so mark that.
thread->queue_size++;
} else if (thread->queue_size != 0) {
// Check the thread, as we prefer a HIGH thread task to a global NORMAL task.
std::unique_lock<std::mutex> lock(thread->mutex);
if (!thread->private_queue[p].empty()) {
task = thread->private_queue[p].front();
thread->private_queue[p].pop_front();
}
}
}
}

if (!task) {
// We didn't have any global, do we have anything on the thread?
std::unique_lock<std::mutex> lock(thread->mutex);
// We must check both queue and single again, while locked.
bool wait = true;
if (!thread->private_queue.empty()) {
task = thread->private_queue.front();
thread->private_queue.pop_front();
wait = false;
} else if (thread->cancelled) {
wait = false;
} else {
wait = global_queue_size() == 0;
for (size_t p = 0; p < TASK_PRIORITY_COUNT; ++p) {
if (thread->private_queue[p].empty())
continue;

task = thread->private_queue[p].front();
thread->private_queue[p].pop_front();
break;
}

// We must check both queue and single again, while locked.
bool wait = !thread->cancelled && !task && global_queue_size() == 0;

if (wait)
thread->cond.wait(lock);
}
Expand Down Expand Up @@ -229,6 +245,7 @@ void ThreadManager::EnqueueTask(Task *task) {

_assert_msg_(IsInitialized(), "ThreadManager not initialized");

size_t queueIndex = (size_t)task->Priority();
int minThread;
int maxThread;
if (task->Type() == TaskType::CPU_COMPUTE) {
Expand All @@ -247,7 +264,7 @@ void ThreadManager::EnqueueTask(Task *task) {
ThreadContext *thread = global_->threads_[threadNum];
if (thread->queue_size.load() == 0) {
std::unique_lock<std::mutex> lock(thread->mutex);
thread->private_queue.push_back(task);
thread->private_queue[queueIndex].push_back(task);
thread->queue_size++;
thread->cond.notify_one();
// Found it - done.
Expand All @@ -260,10 +277,10 @@ void ThreadManager::EnqueueTask(Task *task) {
{
std::unique_lock<std::mutex> lock(global_->mutex);
if (task->Type() == TaskType::CPU_COMPUTE) {
global_->compute_queue.push_back(task);
global_->compute_queue[queueIndex].push_back(task);
global_->compute_queue_size++;
} else if (task->Type() == TaskType::IO_BLOCKING) {
global_->io_queue.push_back(task);
global_->io_queue[queueIndex].push_back(task);
global_->io_queue_size++;
} else {
_assert_(false);
Expand All @@ -284,11 +301,12 @@ void ThreadManager::EnqueueTaskOnThread(int threadNum, Task *task) {

_assert_msg_(threadNum >= 0 && threadNum < (int)global_->threads_.size(), "Bad threadnum or not initialized");
ThreadContext *thread = global_->threads_[threadNum];
size_t queueIndex = (size_t)task->Priority();

thread->queue_size++;

std::unique_lock<std::mutex> lock(thread->mutex);
thread->private_queue.push_back(task);
thread->private_queue[queueIndex].push_back(task);
thread->cond.notify_one();
}

Expand Down
9 changes: 9 additions & 0 deletions Common/Thread/ThreadManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,20 @@ enum class TaskType {
DEDICATED_THREAD, // These can never get stuck in queue behind others, but are more expensive to launch. Cannot use I/O.
};

// Scheduling priority of a Task. Lower numeric value = higher priority:
// the worker loop and queue arrays are indexed by this value and drained
// from index 0 upward, so HIGH tasks are always dequeued first.
enum class TaskPriority {
	HIGH = 0,
	NORMAL = 1,
	LOW = 2,

	// Number of real priority levels above; used to size the per-priority
	// queue arrays. Not a valid priority itself.
	COUNT,
};

// Implement this to make something that you can run on the thread manager.
class Task {
public:
virtual ~Task() {}
virtual TaskType Type() const = 0;
virtual TaskPriority Priority() const = 0;
virtual void Run() = 0;
virtual bool Cancellable() { return false; }
virtual void Cancel() {}
Expand Down
Loading