Skip to content

Commit

Permalink
Add support for ThreadPool and different types of work.
Browse files Browse the repository at this point in the history
Resolve #13
  • Loading branch information
brayner1 committed Jul 29, 2023
1 parent e1bcf06 commit cb903f0
Show file tree
Hide file tree
Showing 6 changed files with 465 additions and 2 deletions.
9 changes: 7 additions & 2 deletions BRenderer/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ set(BRenderer_SOURCE_FILES
"Core/Window.cpp"
"Core/WindowManager.cpp"
"Core/PerspectiveCamera.cpp"
"Core/Threading/ThreadPool.cpp"

"Geometry/Geometry.cpp"

Expand Down Expand Up @@ -77,7 +78,7 @@ set(BRenderer_SOURCE_FILES
"Renderer/Vulkan/VmaImpl.cpp"

"Files/FilesUtils.cpp"
)
)

set(BRenderer_HEADER_FILES
"BRenderer.h"
Expand All @@ -87,6 +88,10 @@ set(BRenderer_HEADER_FILES
"Core/Window.h"
"Core/WindowManager.h"
"Core/PerspectiveCamera.h"
"Core/Threading/Barrier.h"
"Core/Threading/Threading.h"
"Core/Threading/ThreadPool.h"
"Core/Threading/Work.h"

"Geometry/Geometry.h"

Expand Down Expand Up @@ -115,7 +120,7 @@ set(BRenderer_HEADER_FILES
"Renderer/VkInitializerHelper.h"

"Files/FilesUtils.h"
)
)

source_group(Renderer REGULAR_EXPRESSION "Renderer/.*")
source_group(Core REGULAR_EXPRESSION "Core/.*")
Expand Down
49 changes: 49 additions & 0 deletions BRenderer/Core/Threading/Barrier.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#ifndef BRR_Barrier_h
#define BRR_Barrier_h
#include <mutex>


namespace brr::threading
{
class Barrier
{
public:
explicit Barrier (size_t numThreads) :
originalExpected(numThreads), expectedNum(numThreads), generation(false),
mutex(std::mutex()), cv(std::condition_variable())
{
}

Barrier(const Barrier& ) = delete;
Barrier& operator=(const Barrier&) = delete;

void Wait()
{
std::unique_lock lock(mutex);
size_t lastGen = generation;
if (--expectedNum == 0)
{
// Reset the barrier after this generation completed.
generation = !generation;
expectedNum = originalExpected;
// Notify the rest of the threads
cv.notify_all();
}
else
{
// Wait inside the barrier until this generation is complete.
cv.wait(lock, [&]() { return lastGen != generation; });
}
}

private:
std::mutex mutex;
std::condition_variable cv;
size_t originalExpected;
size_t expectedNum;
bool generation;
};

}

#endif
110 changes: 110 additions & 0 deletions BRenderer/Core/Threading/ThreadPool.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
#include "pch.h"
#include "ThreadPool.h"
#include "Work.h"

namespace brr::threading
{
ThreadPool::ThreadPool() : ThreadPool(std::thread::hardware_concurrency() - 1)
{

}

ThreadPool::ThreadPool(size_t num_threads) : m_runningThreads(0), m_workerThreads(), m_workQueue()
{
std::cout << "Creating ThreadPool with " << num_threads << "threads\n";
// Pre-allocate the pool
m_workerThreads.reserve(num_threads);

for (int i = 0; i < num_threads; ++i)
{
// Create worker threads
m_workerThreads.emplace_back(&ThreadPool::WorkerThreadFunc, this, i + 1);
}

m_initialized = true;
}

ThreadPool::~ThreadPool()
{
// Indicate that worker threads should stop
m_stopWorkerThreads = true;
// Notify sleeping worker threads
m_workReadyCondition.notify_all();
// Join all the threads with the main thread.
for (std::thread& thread : m_workerThreads)
{
if (thread.joinable())
thread.join();
}
}

void ThreadPool::QueueCustomWork(std::shared_ptr<Work> work)
{
std::lock_guard queueLock (m_workQueueMutex);
m_workQueue.emplace_back(std::move(work));

m_workReadyCondition.notify_all();
}

void ThreadPool::QueueCustomWorkAndHelp(std::shared_ptr<Work> work)
{
if (!work->MultiThreadedWork())
{
work->Execute();
return;
}

{
std::lock_guard queueLock (m_workQueueMutex);

m_workQueue.emplace_back(std::move(work));

m_workReadyCondition.notify_all();
}

// TODO: If there are other works in the queue, the main thread may end up running all the work. Is this the best approach?
while (!work->Finished())
{
work->Execute();
}

work->m_finishedCondition.notify_all();
}

void ThreadPool::WorkerThreadFunc(size_t thread_idx)
{
while (!m_stopWorkerThreads)
{
std::unique_lock lock(m_workQueueMutex);
if (m_workQueue.empty())
{
m_workReadyCondition.wait(lock/*, [&]{ return !m_workQueue.empty(); }*/);
}
else
{
const std::shared_ptr<Work> work = m_workQueue.front();

bool isRemoved = false;
// Remove the Work from the queue if the Work will finish after the next 'Execute' call.
// It is not removed from the queue if the Work will require more processing after that.
if (work->WillFinishOnNextExecute())
{
isRemoved = true;
m_workQueue.pop_front();
}

lock.unlock();

work->Execute();

lock.lock();

if (work->Finished())
{
work->m_finishedCondition.notify_all();
m_workReadyCondition.notify_all();
}
}
}
}
}
62 changes: 62 additions & 0 deletions BRenderer/Core/Threading/ThreadPool.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#ifndef BRR_ThreadPool_h
#define BRR_ThreadPool_h
#include <vector>
#include <list>
#include <thread>
#include <mutex>
#include <atomic>

namespace brr::threading
{
class Work;

class ThreadPool
{
public:

ThreadPool();
ThreadPool(size_t num_threads);

ThreadPool(ThreadPool&) = delete;
ThreadPool(ThreadPool&&) = delete;

ThreadPool& operator=(ThreadPool&) = delete;
ThreadPool& operator=(ThreadPool&&) = delete;

~ThreadPool();

//
/**
* \brief Queue a work in the work queue, where a thread from the thread pool will execute the work.
* \param work A shared_ptr of the work to be executed.
*/
void QueueCustomWork(std::shared_ptr<Work> work);

/**
* \brief Queue a work in the work queue, and uses the main thread to help the pool with the work execution. The function only returns when the work is finished.
* If the Work is not MultiThreaded (i.e., can be divided into multiple chunks of work), then the main thread will execute it and return.
* \param work A shared_ptr of the work to be executed.
*/
void QueueCustomWorkAndHelp(std::shared_ptr<Work> work);

[[nodiscard]] constexpr size_t AvailableWorkers() const { return (m_workerThreads.size() - m_runningThreads); }

private:

void WorkerThreadFunc(size_t thread_idx);

std::list<std::shared_ptr<Work>> m_workQueue{};
std::vector<std::thread> m_workerThreads{};

size_t m_runningThreads;

std::mutex m_workQueueMutex;
std::condition_variable m_workReadyCondition;

std::atomic<bool> m_stopWorkerThreads;
bool m_initialized;
};

}

#endif
8 changes: 8 additions & 0 deletions BRenderer/Core/Threading/Threading.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#ifndef BRR_Threading_h
#define BRR_Threading_h

#include "Core/Threading/Barrier.h"
#include "Core/Threading/Work.h"
#include "Core/Threading/ThreadPool.h"

#endif
Loading

0 comments on commit cb903f0

Please sign in to comment.