From 67fad04348b91cf93bdfad7495d298f54825602c Mon Sep 17 00:00:00 2001 From: Barak Shoshany Date: Wed, 3 Aug 2022 14:23:38 -0400 Subject: [PATCH] Updated to v3.3.0 --- BS_thread_pool.hpp | 116 ++++-- BS_thread_pool_light.hpp | 325 +++++++++++++++++ BS_thread_pool_light_test.cpp | 657 ++++++++++++++++++++++++++++++++++ BS_thread_pool_test.cpp | 28 +- CHANGELOG.md | 20 ++ README.md | 343 +++++++++++------- 6 files changed, 1323 insertions(+), 166 deletions(-) create mode 100644 BS_thread_pool_light.hpp create mode 100644 BS_thread_pool_light_test.cpp diff --git a/BS_thread_pool.hpp b/BS_thread_pool.hpp index af407d4..840bcdc 100644 --- a/BS_thread_pool.hpp +++ b/BS_thread_pool.hpp @@ -3,14 +3,14 @@ /** * @file BS_thread_pool.hpp * @author Barak Shoshany (baraksh@gmail.com) (http://baraksh.com) - * @version 3.2.0 - * @date 2022-07-28 + * @version 3.3.0 + * @date 2022-08-03 * @copyright Copyright (c) 2022 Barak Shoshany. Licensed under the MIT license. If you found this project useful, please consider starring it on GitHub! If you use this library in software of any kind, please provide a link to the GitHub repository https://github.com/bshoshany/thread-pool in the source code and documentation. If you use this library in published research, please cite it as follows: Barak Shoshany, "A C++17 Thread Pool for High-Performance Scientific Computing", doi:10.5281/zenodo.4742687, arXiv:2105.00613 (May 2021) * * @brief BS::thread_pool: a fast, lightweight, and easy-to-use C++17 thread pool library. This header file contains the entire library, including the main BS::thread_pool class and the helper classes BS::multi_future, BS::blocks, BS:synced_stream, and BS::timer. 
*/ -#define BS_THREAD_POOL_VERSION "v3.2.0 (2022-07-28)" +#define BS_THREAD_POOL_VERSION "v3.3.0 (2022-08-03)" #include // std::atomic #include // std::chrono @@ -34,12 +34,6 @@ namespace BS */ using concurrency_t = std::invoke_result_t; -/** - * @brief Explicit casts of the flushing stream manipulators, to enable using them with synced_stream, e.g. sync_out.print(BS::flush). - */ -std::ostream& (&endl)(std::ostream&) = static_cast(std::endl); -std::ostream& (&flush)(std::ostream&) = static_cast(std::flush); - // ============================================================================================= // // Begin class multi_future // @@ -57,7 +51,7 @@ class [[nodiscard]] multi_future * * @param num_futures_ The desired number of futures to store. */ - multi_future(const size_t num_futures_ = 0) : f(num_futures_) {} + multi_future(const size_t num_futures_ = 0) : futures(num_futures_) {} /** * @brief Get the results from all the futures stored in this multi_future object, rethrowing any stored exceptions. @@ -68,32 +62,64 @@ class [[nodiscard]] multi_future { if constexpr (std::is_void_v) { - for (size_t i = 0; i < f.size(); ++i) - f[i].get(); + for (size_t i = 0; i < futures.size(); ++i) + futures[i].get(); return; } else { - std::vector results(f.size()); - for (size_t i = 0; i < f.size(); ++i) - results[i] = f[i].get(); + std::vector results(futures.size()); + for (size_t i = 0; i < futures.size(); ++i) + results[i] = futures[i].get(); return results; } } + /** + * @brief Get a reference to one of the futures stored in this multi_future object. + * + * @param i The index of the desired future. + * @return The future. + */ + [[nodiscard]] std::future& operator[](const size_t i) + { + return futures[i]; + } + + /** + * @brief Append a future to this multi_future object. + * + * @param future The future to append. 
+ */ + void push_back(std::future future) + { + futures.push_back(std::move(future)); + } + + /** + * @brief Get the number of futures stored in this multi_future object. + * + * @return The number of futures. + */ + [[nodiscard]] size_t size() const + { + return futures.size(); + } + /** * @brief Wait for all the futures stored in this multi_future object. */ void wait() const { - for (size_t i = 0; i < f.size(); ++i) - f[i].wait(); + for (size_t i = 0; i < futures.size(); ++i) + futures[i].wait(); } +private: /** * @brief A vector to store the futures. */ - std::vector> f; + std::vector> futures; }; // End class multi_future // @@ -229,7 +255,7 @@ class [[nodiscard]] thread_pool } /** - * @brief Destruct the thread pool. Waits for all tasks to complete, then destroys all threads. Note that if the variable paused is set to true, then any tasks still in the queue will never be executed. + * @brief Destruct the thread pool. Waits for all tasks to complete, then destroys all threads. Note that if the pool is paused, then any tasks still in the queue will never be executed. */ ~thread_pool() { @@ -283,6 +309,16 @@ class [[nodiscard]] thread_pool return thread_count; } + /** + * @brief Check whether the pool is currently paused. + * + * @return true if the pool is paused, false if it is not paused. + */ + [[nodiscard]] bool is_paused() const + { + return paused; + } + /** * @brief Parallelize a loop by automatically splitting it into blocks and submitting each block separately to the queue. Returns a multi_future object that contains the futures for all of the blocks. 
* @@ -305,7 +341,7 @@ class [[nodiscard]] thread_pool { multi_future mf(blks.get_num_blocks()); for (size_t i = 0; i < blks.get_num_blocks(); ++i) - mf.f[i] = submit(std::forward(loop), blks.start(i), blks.end(i)); + mf[i] = submit(std::forward(loop), blks.start(i), blks.end(i)); return mf; } else @@ -331,6 +367,14 @@ class [[nodiscard]] thread_pool return parallelize_loop(0, index_after_last, std::forward(loop), num_blocks); } + /** + * @brief Pause the pool. The workers will temporarily stop retrieving new tasks out of the queue, although any tasks already executed will keep running until they are finished. + */ + void pause() + { + paused = true; + } + /** * @brief Parallelize a loop by automatically splitting it into blocks and submitting each block separately to the queue. Does not return a multi_future, so the user must use wait_for_tasks() or some other method to ensure that the loop finishes executing, otherwise bad things will happen. * @@ -450,6 +494,14 @@ class [[nodiscard]] thread_pool return task_promise->get_future(); } + /** + * @brief Unpause the pool. The workers will resume retrieving new tasks out of the queue. + */ + void unpause() + { + paused = false; + } + /** * @brief Wait for tasks to be completed. Normally, this function waits for all tasks, both those that are currently running in the threads and those that are still waiting in the queue. However, if the pool is paused, this function only waits for the currently running tasks (otherwise it would wait forever). Note: To wait for just one specific task, use submit() instead, and call the wait() member function of the generated future. */ @@ -461,15 +513,6 @@ class [[nodiscard]] thread_pool waiting = false; } - // =========== - // Public data - // =========== - - /** - * @brief An atomic variable indicating whether the workers should pause. 
When set to true, the workers temporarily stop retrieving new tasks out of the queue, although any tasks already executed will keep running until they are finished. Set to false again to resume retrieving tasks. - */ - std::atomic paused = false; - private: // ======================== // Private member functions @@ -547,6 +590,11 @@ class [[nodiscard]] thread_pool // Private data // ============ + /** + * @brief An atomic variable indicating whether the workers should pause. When set to true, the workers temporarily stop retrieving new tasks out of the queue, although any tasks already executed will keep running until they are finished. When set to false again, the workers resume retrieving tasks. + */ + std::atomic paused = false; + /** * @brief An atomic variable indicating to the workers to keep running. When set to false, the workers permanently stop working. */ @@ -637,6 +685,16 @@ class [[nodiscard]] synced_stream print(std::forward(items)..., '\n'); } + /** + * @brief A stream manipulator to pass to a synced_stream (an explicit cast of std::endl). Prints a newline character to the stream, and then flushes it. Should only be used if flushing is desired, otherwise '\n' should be used instead. + */ + inline static std::ostream& (&endl)(std::ostream&) = static_cast(std::endl); + + /** + * @brief A stream manipulator to pass to a synced_stream (an explicit cast of std::flush). Used to flush the stream. + */ + inline static std::ostream& (&flush)(std::ostream&) = static_cast(std::flush); + private: /** * @brief The output stream to print to. diff --git a/BS_thread_pool_light.hpp b/BS_thread_pool_light.hpp new file mode 100644 index 0000000..b432b9f --- /dev/null +++ b/BS_thread_pool_light.hpp @@ -0,0 +1,325 @@ +#pragma once + +/** + * @file BS_thread_pool_light.hpp + * @author Barak Shoshany (baraksh@gmail.com) (http://baraksh.com) + * @version 3.3.0 + * @date 2022-08-03 + * @copyright Copyright (c) 2022 Barak Shoshany. Licensed under the MIT license. 
If you found this project useful, please consider starring it on GitHub! If you use this library in software of any kind, please provide a link to the GitHub repository https://github.com/bshoshany/thread-pool in the source code and documentation. If you use this library in published research, please cite it as follows: Barak Shoshany, "A C++17 Thread Pool for High-Performance Scientific Computing", doi:10.5281/zenodo.4742687, arXiv:2105.00613 (May 2021) + * + * @brief BS::thread_pool_light: a fast, lightweight, and easy-to-use C++17 thread pool library. This header file contains a light version of the main library, for use when advanced features are not needed. + */ + +#define BS_THREAD_POOL_VERSION "v3.3.0 (2022-08-03) [light]" + +#include <atomic> // std::atomic +#include <condition_variable> // std::condition_variable +#include <exception> // std::current_exception +#include <functional> // std::bind, std::function, std::invoke +#include <future> // std::future, std::promise +#include <memory> // std::make_shared, std::make_unique, std::shared_ptr, std::unique_ptr +#include <mutex> // std::mutex, std::scoped_lock, std::unique_lock +#include <queue> // std::queue +#include <thread> // std::thread +#include <type_traits> // std::common_type_t, std::decay_t, std::invoke_result_t, std::is_void_v +#include <utility> // std::forward, std::move, std::swap + +namespace BS +{ +/** + * @brief A convenient shorthand for the type of std::thread::hardware_concurrency(). Should evaluate to unsigned int. + */ +using concurrency_t = std::invoke_result_t<decltype(std::thread::hardware_concurrency)>; + +/** + * @brief A fast, lightweight, and easy-to-use C++17 thread pool class. This is a lighter version of the main thread pool class. + */ +class [[nodiscard]] thread_pool_light +{ +public: + // ============================ + // Constructors and destructors + // ============================ + + /** + * @brief Construct a new thread pool. + * + * @param thread_count_ The number of threads to use. The default value is the total number of hardware threads available, as reported by the implementation.
This is usually determined by the number of cores in the CPU. If a core is hyperthreaded, it will count as two threads. + */ + thread_pool_light(const concurrency_t thread_count_ = 0) : thread_count(determine_thread_count(thread_count_)), threads(std::make_unique(determine_thread_count(thread_count_))) + { + create_threads(); + } + + /** + * @brief Destruct the thread pool. Waits for all tasks to complete, then destroys all threads. + */ + ~thread_pool_light() + { + wait_for_tasks(); + destroy_threads(); + } + + // ======================= + // Public member functions + // ======================= + + /** + * @brief Get the number of threads in the pool. + * + * @return The number of threads. + */ + [[nodiscard]] concurrency_t get_thread_count() const + { + return thread_count; + } + + /** + * @brief Parallelize a loop by automatically splitting it into blocks and submitting each block separately to the queue. The user must use wait_for_tasks() or some other method to ensure that the loop finishes executing, otherwise bad things will happen. + * + * @tparam F The type of the function to loop through. + * @tparam T1 The type of the first index in the loop. Should be a signed or unsigned integer. + * @tparam T2 The type of the index after the last index in the loop. Should be a signed or unsigned integer. If T1 is not the same as T2, a common type will be automatically inferred. + * @tparam T The common type of T1 and T2. + * @param first_index The first index in the loop. + * @param index_after_last The index after the last index in the loop. The loop will iterate from first_index to (index_after_last - 1) inclusive. In other words, it will be equivalent to "for (T i = first_index; i < index_after_last; ++i)". Note that if index_after_last == first_index, no blocks will be submitted. + * @param loop The function to loop through. Will be called once per block. 
Should take exactly two arguments: the first index in the block and the index after the last index in the block. loop(start, end) should typically involve a loop of the form "for (T i = start; i < end; ++i)". + * @param num_blocks The maximum number of blocks to split the loop into. The default is to use the number of threads in the pool. + */ + template > + void push_loop(T1 first_index_, T2 index_after_last_, F&& loop, size_t num_blocks = 0) + { + T first_index = static_cast(first_index_); + T index_after_last = static_cast(index_after_last_); + if (num_blocks == 0) + num_blocks = thread_count; + if (index_after_last < first_index) + std::swap(index_after_last, first_index); + size_t total_size = static_cast(index_after_last - first_index); + size_t block_size = static_cast(total_size / num_blocks); + if (block_size == 0) + { + block_size = 1; + num_blocks = (total_size > 1) ? total_size : 1; + } + if (total_size > 0) + { + for (size_t i = 0; i < num_blocks; ++i) + push_task(std::forward(loop), static_cast(i * block_size) + first_index, (i == num_blocks - 1) ? index_after_last : (static_cast((i + 1) * block_size) + first_index)); + } + } + + /** + * @brief Parallelize a loop by automatically splitting it into blocks and submitting each block separately to the queue. The user must use wait_for_tasks() or some other method to ensure that the loop finishes executing, otherwise bad things will happen. This overload is used for the special case where the first index is 0. + * + * @tparam F The type of the function to loop through. + * @tparam T The type of the loop indices. Should be a signed or unsigned integer. + * @param index_after_last The index after the last index in the loop. The loop will iterate from 0 to (index_after_last - 1) inclusive. In other words, it will be equivalent to "for (T i = 0; i < index_after_last; ++i)". Note that if index_after_last == 0, no blocks will be submitted. + * @param loop The function to loop through. 
Will be called once per block. Should take exactly two arguments: the first index in the block and the index after the last index in the block. loop(start, end) should typically involve a loop of the form "for (T i = start; i < end; ++i)". + * @param num_blocks The maximum number of blocks to split the loop into. The default is to use the number of threads in the pool. + */ + template <typename F, typename T> + void push_loop(const T index_after_last, F&& loop, const size_t num_blocks = 0) + { + push_loop(0, index_after_last, std::forward<F>(loop), num_blocks); + } + + /** + * @brief Push a function with zero or more arguments, but no return value, into the task queue. Does not return a future, so the user must use wait_for_tasks() or some other method to ensure that the task finishes executing, otherwise bad things will happen. + * + * @tparam F The type of the function. + * @tparam A The types of the arguments. + * @param task The function to push. + * @param args The zero or more arguments to pass to the function. Note that if the task is a class member function, the first argument must be a pointer to the object, i.e. &object (or this), followed by the actual arguments. + */ + template <typename F, typename... A> + void push_task(F&& task, A&&... args) + { + std::function<void()> task_function = std::bind(std::forward<F>(task), std::forward<A>(args)...); + { + const std::scoped_lock tasks_lock(tasks_mutex); + tasks.push(task_function); + } + ++tasks_total; + task_available_cv.notify_one(); + } + + /** + * @brief Submit a function with zero or more arguments into the task queue. If the function has a return value, get a future for the eventual returned value. If the function has no return value, get an std::future<void> which can be used to wait until the task finishes. + * + * @tparam F The type of the function. + * @tparam A The types of the zero or more arguments to pass to the function. + * @tparam R The return type of the function (can be void). + * @param task The function to submit.
+ * @param args The zero or more arguments to pass to the function. Note that if the task is a class member function, the first argument must be a pointer to the object, i.e. &object (or this), followed by the actual arguments. + * @return A future to be used later to wait for the function to finish executing and/or obtain its returned value if it has one. + */ + template <typename F, typename... A, typename R = std::invoke_result_t<std::decay_t<F>, std::decay_t<A>...>> + [[nodiscard]] std::future<R> submit(F&& task, A&&... args) + { + std::function<R()> task_function = std::bind(std::forward<F>(task), std::forward<A>(args)...); + std::shared_ptr<std::promise<R>> task_promise = std::make_shared<std::promise<R>>(); + push_task( + [task_function, task_promise] + { + try + { + if constexpr (std::is_void_v<R>) + { + std::invoke(task_function); + task_promise->set_value(); + } + else + { + task_promise->set_value(std::invoke(task_function)); + } + } + catch (...) + { + try + { + task_promise->set_exception(std::current_exception()); + } + catch (...) + { + } + } + }); + return task_promise->get_future(); + } + + /** + * @brief Wait for tasks to be completed. Normally, this function waits for all tasks, both those that are currently running in the threads and those that are still waiting in the queue. Note: To wait for just one specific task, use submit() instead, and call the wait() member function of the generated future. + */ + void wait_for_tasks() + { + waiting = true; + std::unique_lock<std::mutex> tasks_lock(tasks_mutex); + task_done_cv.wait(tasks_lock, [this] { return (tasks_total == 0); }); + waiting = false; + } + +private: + // ======================== + // Private member functions + // ======================== + + /** + * @brief Create the threads in the pool and assign a worker to each thread. + */ + void create_threads() + { + running = true; + for (concurrency_t i = 0; i < thread_count; ++i) + { + threads[i] = std::thread(&thread_pool_light::worker, this); + } + } + + /** + * @brief Destroy the threads in the pool.
+ */ + void destroy_threads() + { + running = false; + task_available_cv.notify_all(); + for (concurrency_t i = 0; i < thread_count; ++i) + { + threads[i].join(); + } + } + + /** + * @brief Determine how many threads the pool should have, based on the parameter passed to the constructor. + * + * @param thread_count_ The parameter passed to the constructor. If the parameter is a positive number, then the pool will be created with this number of threads. If the parameter is non-positive, or a parameter was not supplied (in which case it will have the default value of 0), then the pool will be created with the total number of hardware threads available, as obtained from std::thread::hardware_concurrency(). If the latter returns a non-positive number for some reason, then the pool will be created with just one thread. + * @return The number of threads to use for constructing the pool. + */ + [[nodiscard]] concurrency_t determine_thread_count(const concurrency_t thread_count_) + { + if (thread_count_ > 0) + return thread_count_; + else + { + if (std::thread::hardware_concurrency() > 0) + return std::thread::hardware_concurrency(); + else + return 1; + } + } + + /** + * @brief A worker function to be assigned to each thread in the pool. Waits until it is notified by push_task() that a task is available, and then retrieves the task from the queue and executes it. Once the task finishes, the worker notifies wait_for_tasks() in case it is waiting. + */ + void worker() + { + while (running) + { + std::function<void()> task; + std::unique_lock<std::mutex> tasks_lock(tasks_mutex); + task_available_cv.wait(tasks_lock, [this] { return !tasks.empty() || !running; }); + if (running) + { + task = std::move(tasks.front()); + tasks.pop(); + tasks_lock.unlock(); + task(); + tasks_lock.lock(); + --tasks_total; + if (waiting) + task_done_cv.notify_one(); + } + } + } + + // ============ + // Private data + // ============ + + /** + * @brief An atomic variable indicating to the workers to keep running.
When set to false, the workers permanently stop working. + */ + std::atomic<bool> running = false; + + /** + * @brief A condition variable used to notify worker() that a new task has become available. + */ + std::condition_variable task_available_cv = {}; + + /** + * @brief A condition variable used to notify wait_for_tasks() that a task is done. + */ + std::condition_variable task_done_cv = {}; + + /** + * @brief A queue of tasks to be executed by the threads. + */ + std::queue<std::function<void()>> tasks = {}; + + /** + * @brief An atomic variable to keep track of the total number of unfinished tasks - either still in the queue, or running in a thread. + */ + std::atomic<size_t> tasks_total = 0; + + /** + * @brief A mutex to synchronize access to the task queue by different threads. + */ + mutable std::mutex tasks_mutex = {}; + + /** + * @brief The number of threads in the pool. + */ + concurrency_t thread_count = 0; + + /** + * @brief A smart pointer to manage the memory allocated for the threads. + */ + std::unique_ptr<std::thread[]> threads = nullptr; + + /** + * @brief An atomic variable indicating that wait_for_tasks() is active and expects to be notified whenever a task is done. + */ + std::atomic<bool> waiting = false; +}; + +} // namespace BS diff --git a/BS_thread_pool_light_test.cpp b/BS_thread_pool_light_test.cpp new file mode 100644 index 0000000..a4e125a --- /dev/null +++ b/BS_thread_pool_light_test.cpp @@ -0,0 +1,657 @@ +/** + * @file BS_thread_pool_light_test.cpp + * @author Barak Shoshany (baraksh@gmail.com) (http://baraksh.com) + * @version 3.3.0 + * @date 2022-08-03 + * @copyright Copyright (c) 2022 Barak Shoshany. Licensed under the MIT license. If you found this project useful, please consider starring it on GitHub! If you use this library in software of any kind, please provide a link to the GitHub repository https://github.com/bshoshany/thread-pool in the source code and documentation.
If you use this library in published research, please cite it as follows: Barak Shoshany, "A C++17 Thread Pool for High-Performance Scientific Computing", doi:10.5281/zenodo.4742687, arXiv:2105.00613 (May 2021) + * + * @brief BS::thread_pool_light: a fast, lightweight, and easy-to-use C++17 thread pool library. This program tests all aspects of the light version of the main library, but is not needed in order to use the library. + */ + +// Get rid of annoying MSVC warning. +#ifdef _MSC_VER +#define _CRT_SECURE_NO_WARNINGS +#endif + +#include // std::min, std::sort, std::unique +#include // std::atomic +#include // std::chrono +#include // std::abs +#include // std::condition_variable +#include // std::exception +#include // std::future +#include // std::cout, std::endl +#include // std::make_unique, std::unique_ptr +#include // std::mutex, std::scoped_lock, std::unique_lock +#include // std::mt19937_64, std::random_device, std::uniform_int_distribution +#include // std::runtime_error +#include // std::string, std::to_string +#include // std::this_thread, std::thread +#include // std::forward +#include // std::vector + +// Include the header file for the thread pool library. +#include "BS_thread_pool_light.hpp" + +// ================ +// Global variables +// ================ + +// A global thread pool object to be used throughout the test. +BS::thread_pool_light pool; + +// A global random_device object to be used to seed some random number generators. +std::random_device rd; + +// Global variables to measure how many checks succeeded and how many failed. +size_t tests_succeeded = 0; +size_t tests_failed = 0; + +// ================ +// Helper functions +// ================ + +/** + * @brief Print any number of items into both std::cout and the log file, syncing both independently. + * + * @tparam T The types of the items. + * @param items The items to print. + */ +template +void print(T&&... items) +{ + (std::cout << ... 
<< std::forward(items)); +} + +/** + * @brief Print any number of items into both std::cout and the log file, syncing both independently. Also prints a newline character, and flushes the stream. + * + * @tparam T The types of the items. + * @param items The items to print. + */ +template +void println(T&&... items) +{ + print(std::forward(items)..., static_cast(std::endl)); +} + +/** + * @brief Print a stylized header. + * + * @param text The text of the header. Will appear between two lines. + * @param symbol The symbol to use for the lines. Default is '='. + */ +void print_header(const std::string& text, const char symbol = '=') +{ + println(); + println(std::string(text.length(), symbol)); + println(text); + println(std::string(text.length(), symbol)); +} + +/** + * @brief Check if a condition is met, report the result, and keep count of the total number of successes and failures. + * + * @param condition The condition to check. + */ +void check(const bool condition) +{ + if (condition) + { + println("-> PASSED!"); + ++tests_succeeded; + } + else + { + println("-> FAILED!"); + ++tests_failed; + } +} + +/** + * @brief Check if the expected result has been obtained, report the result, and keep count of the total number of successes and failures. + * + * @param condition The condition to check. + */ +template +void check(const T1 expected, const T2 obtained) +{ + print("Expected: ", expected, ", obtained: ", obtained); + if (expected == obtained) + { + println(" -> PASSED!"); + ++tests_succeeded; + } + else + { + println(" -> FAILED!"); + ++tests_failed; + } +} + +// ========================================= +// Functions to verify the number of threads +// ========================================= + +/** + * @brief Count the number of unique threads in the pool. Submits a number of tasks equal to twice the thread count into the pool. Each task stores the ID of the thread running it, and then waits until released by the main thread. 
This ensures that each thread in the pool runs at least one task. The number of unique thread IDs is then counted from the stored IDs. + */ +std::condition_variable ID_cv, total_cv; +std::mutex ID_mutex, total_mutex; +BS::concurrency_t count_unique_threads() +{ + const BS::concurrency_t num_tasks = pool.get_thread_count() * 2; + std::vector thread_IDs(num_tasks); + std::unique_lock total_lock(total_mutex); + BS::concurrency_t total_count = 0; + bool ID_release = false; + pool.wait_for_tasks(); + for (std::thread::id& id : thread_IDs) + pool.push_task( + [&total_count, &id, &ID_release] + { + id = std::this_thread::get_id(); + { + const std::scoped_lock total_lock_local(total_mutex); + ++total_count; + } + total_cv.notify_one(); + std::unique_lock ID_lock_local(ID_mutex); + ID_cv.wait(ID_lock_local, [&ID_release] { return ID_release; }); + }); + total_cv.wait(total_lock, [&total_count] { return total_count == pool.get_thread_count(); }); + { + const std::scoped_lock ID_lock(ID_mutex); + ID_release = true; + } + ID_cv.notify_all(); + total_cv.wait(total_lock, [&total_count, &num_tasks] { return total_count == num_tasks; }); + pool.wait_for_tasks(); + std::sort(thread_IDs.begin(), thread_IDs.end()); + return static_cast(std::unique(thread_IDs.begin(), thread_IDs.end()) - thread_IDs.begin()); +} + +/** + * @brief Check that the constructor works. + */ +void check_constructor() +{ + println("Checking that the thread pool reports a number of threads equal to the hardware concurrency..."); + check(std::thread::hardware_concurrency(), pool.get_thread_count()); + println("Checking that the manually counted number of unique thread IDs is equal to the reported number of threads..."); + check(pool.get_thread_count(), count_unique_threads()); +} + +// ======================================= +// Functions to verify submission of tasks +// ======================================= + +/** + * @brief Check that push_task() works. 
+ */ +void check_push_task() +{ + println("Checking that push_task() works for a function with no arguments or return value..."); + { + bool flag = false; + pool.push_task([&flag] { flag = true; }); + pool.wait_for_tasks(); + check(flag); + } + println("Checking that push_task() works for a function with one argument and no return value..."); + { + bool flag = false; + pool.push_task([](bool* flag_) { *flag_ = true; }, &flag); + pool.wait_for_tasks(); + check(flag); + } + println("Checking that push_task() works for a function with two arguments and no return value..."); + { + bool flag1 = false; + bool flag2 = false; + pool.push_task([](bool* flag1_, bool* flag2_) { *flag1_ = *flag2_ = true; }, &flag1, &flag2); + pool.wait_for_tasks(); + check(flag1 && flag2); + } +} + +/** + * @brief Check that submit() works. + */ +void check_submit() +{ + println("Checking that submit() works for a function with no arguments or return value..."); + { + bool flag = false; + pool.submit([&flag] { flag = true; }).wait(); + check(flag); + } + println("Checking that submit() works for a function with one argument and no return value..."); + { + bool flag = false; + pool.submit([](bool* flag_) { *flag_ = true; }, &flag).wait(); + check(flag); + } + println("Checking that submit() works for a function with two arguments and no return value..."); + { + bool flag1 = false; + bool flag2 = false; + pool.submit([](bool* flag1_, bool* flag2_) { *flag1_ = *flag2_ = true; }, &flag1, &flag2).wait(); + check(flag1 && flag2); + } + println("Checking that submit() works for a function with no arguments and a return value..."); + { + bool flag = false; + std::future flag_future = pool.submit( + [&flag] + { + flag = true; + return 42; + }); + check(flag_future.get() == 42 && flag); + } + println("Checking that submit() works for a function with one argument and a return value..."); + { + bool flag = false; + std::future flag_future = pool.submit( + [](bool* flag_) + { + *flag_ = true; + return 42; 
+ }, + &flag); + check(flag_future.get() == 42 && flag); + } + println("Checking that submit() works for a function with two arguments and a return value..."); + { + bool flag1 = false; + bool flag2 = false; + std::future flag_future = pool.submit( + [](bool* flag1_, bool* flag2_) + { + *flag1_ = *flag2_ = true; + return 42; + }, + &flag1, &flag2); + check(flag_future.get() == 42 && flag1 && flag2); + } +} + +class flag_class +{ +public: + void set_flag_no_args() + { + flag = true; + } + + void set_flag_one_arg(const bool arg) + { + flag = arg; + } + + int set_flag_no_args_return() + { + flag = true; + return 42; + } + + int set_flag_one_arg_return(const bool arg) + { + flag = arg; + return 42; + } + + bool get_flag() const + { + return flag; + } + + void push_test_flag_no_args() + { + pool.push_task(&flag_class::set_flag_no_args, this); + pool.wait_for_tasks(); + check(get_flag()); + } + + void push_test_flag_one_arg() + { + pool.push_task(&flag_class::set_flag_one_arg, this, true); + pool.wait_for_tasks(); + check(get_flag()); + } + + void submit_test_flag_no_args() + { + pool.submit(&flag_class::set_flag_no_args, this).wait(); + check(get_flag()); + } + + void submit_test_flag_one_arg() + { + pool.submit(&flag_class::set_flag_one_arg, this, true).wait(); + check(get_flag()); + } + + void submit_test_flag_no_args_return() + { + std::future flag_future = pool.submit(&flag_class::set_flag_no_args_return, this); + check(flag_future.get() == 42 && get_flag()); + } + + void submit_test_flag_one_arg_return() + { + std::future flag_future = pool.submit(&flag_class::set_flag_one_arg_return, this, true); + check(flag_future.get() == 42 && get_flag()); + } + +private: + bool flag = false; +}; + +/** + * @brief Check that submitting member functions works. 
+ */ +void check_member_function() +{ + println("Checking that push_task() works for a member function with no arguments or return value..."); + { + flag_class flag; + pool.push_task(&flag_class::set_flag_no_args, &flag); + pool.wait_for_tasks(); + check(flag.get_flag()); + } + println("Checking that push_task() works for a member function with one argument and no return value..."); + { + flag_class flag; + pool.push_task(&flag_class::set_flag_one_arg, &flag, true); + pool.wait_for_tasks(); + check(flag.get_flag()); + } + println("Checking that submit() works for a member function with no arguments or return value..."); + { + flag_class flag; + pool.submit(&flag_class::set_flag_no_args, &flag).wait(); + check(flag.get_flag()); + } + println("Checking that submit() works for a member function with one argument and no return value..."); + { + flag_class flag; + pool.submit(&flag_class::set_flag_one_arg, &flag, true).wait(); + check(flag.get_flag()); + } + println("Checking that submit() works for a member function with no arguments and a return value..."); + { + flag_class flag; + std::future flag_future = pool.submit(&flag_class::set_flag_no_args_return, &flag); + check(flag_future.get() == 42 && flag.get_flag()); + } + println("Checking that submit() works for a member function with one argument and a return value..."); + { + flag_class flag; + std::future flag_future = pool.submit(&flag_class::set_flag_one_arg_return, &flag, true); + check(flag_future.get() == 42 && flag.get_flag()); + } +} + +/** + * @brief Check that submitting member functions within an object works. 
+ */ +void check_member_function_within_object() +{ + println("Checking that push_task() works within an object for a member function with no arguments or return value..."); + { + flag_class flag; + flag.push_test_flag_no_args(); + } + println("Checking that push_task() works within an object for a member function with one argument and no return value..."); + { + flag_class flag; + flag.push_test_flag_one_arg(); + } + println("Checking that submit() works within an object for a member function with no arguments or return value..."); + { + flag_class flag; + flag.submit_test_flag_no_args(); + } + println("Checking that submit() works within an object for a member function with one argument and no return value..."); + { + flag_class flag; + flag.submit_test_flag_one_arg(); + } + println("Checking that submit() works within an object for a member function with no arguments and a return value..."); + { + flag_class flag; + flag.submit_test_flag_no_args_return(); + } + println("Checking that submit() works within an object for a member function with one argument and a return value..."); + { + flag_class flag; + flag.submit_test_flag_one_arg_return(); + } +} + +/** + * @brief Check that wait_for_tasks() works. 
+ */ +void check_wait_for_tasks() +{ + const BS::concurrency_t n = pool.get_thread_count() * 10; + std::unique_ptr[]> flags = std::make_unique[]>(n); + for (BS::concurrency_t i = 0; i < n; ++i) + pool.push_task( + [&flags, i] + { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + flags[i] = true; + }); + println("Waiting for tasks..."); + pool.wait_for_tasks(); + bool all_flags = true; + for (BS::concurrency_t i = 0; i < n; ++i) + all_flags = all_flags && flags[i]; + check(all_flags); +} + +// ======================================== +// Functions to verify loop parallelization +// ======================================== + +/** + * @brief Check that push_loop() works for a specific range of indices split over a specific number of tasks, with no return value. + * + * @param random_start The first index in the loop. + * @param random_end The last index in the loop plus 1. + * @param num_tasks The number of tasks. + */ +template +void check_push_loop_no_return(const int64_t random_start, T random_end, const BS::concurrency_t num_tasks) +{ + if (random_start == random_end) + ++random_end; + println("Verifying that push_loop() from ", random_start, " to ", random_end, " with ", num_tasks, num_tasks == 1 ? 
" task" : " tasks", " modifies all indices..."); + const size_t num_indices = static_cast(std::abs(random_end - random_start)); + const int64_t offset = std::min(random_start, static_cast(random_end)); + std::unique_ptr[]> flags = std::make_unique[]>(num_indices); + const auto loop = [&flags, offset](const int64_t start, const int64_t end) + { + for (int64_t i = start; i < end; ++i) + flags[static_cast(i - offset)] = true; + }; + if (random_start == 0) + pool.push_loop(random_end, loop, num_tasks); + else + pool.push_loop(random_start, random_end, loop, num_tasks); + pool.wait_for_tasks(); + bool all_flags = true; + for (size_t i = 0; i < num_indices; ++i) + all_flags = all_flags && flags[i]; + check(all_flags); +} + +/** + * @brief Check that push_loop() works using several different random values for the range of indices and number of tasks. + */ +void check_parallelize_loop() +{ + std::mt19937_64 mt(rd()); + std::uniform_int_distribution index_dist(-1000000, 1000000); + std::uniform_int_distribution task_dist(1, pool.get_thread_count()); + constexpr uint64_t n = 10; + for (uint64_t i = 0; i < n; ++i) + check_push_loop_no_return(index_dist(mt), index_dist(mt), task_dist(mt)); + println("Verifying that push_loop() with identical start and end indices does nothing..."); + bool flag = true; + const int64_t index = index_dist(mt); + pool.push_loop(index, index, [&flag](const int64_t, const int64_t) { flag = false; }); + pool.wait_for_tasks(); + check(flag); + println("Trying push_loop() with start and end indices of different types:"); + const int64_t start = index_dist(mt); + const uint32_t end = static_cast(std::abs(index_dist(mt))); + check_push_loop_no_return(start, end, task_dist(mt)); + println("Trying the overload for push_loop() for the case where the first index is equal to 0:"); + check_push_loop_no_return(0, index_dist(mt), task_dist(mt)); +} + +// ====================================== +// Functions to verify exception handling +// 
====================================== + +/** + * @brief Check that exception handling works. + */ +void check_exceptions() +{ + println("Checking that exceptions are forwarded correctly by submit()..."); + bool caught = false; + auto throws = [] + { + println("Throwing exception..."); + throw std::runtime_error("Exception thrown!"); + }; + std::future my_future = pool.submit(throws); + try + { + my_future.get(); + } + catch (const std::exception& e) + { + if (e.what() == std::string("Exception thrown!")) + caught = true; + } + check(caught); +} + +// ===================================== +// Functions to verify vector operations +// ===================================== + +/** + * @brief Check that parallelized vector operations work as expected by calculating the sum of two randomized vectors of a specific size in two ways, single-threaded and multithreaded, and comparing the results. + */ +void check_vector_of_size(const size_t vector_size, const BS::concurrency_t num_tasks) +{ + std::vector vector_1(vector_size); + std::vector vector_2(vector_size); + std::mt19937_64 mt(rd()); + std::uniform_int_distribution vector_dist(-1000000, 1000000); + for (size_t i = 0; i < vector_size; ++i) + { + vector_1[i] = vector_dist(mt); + vector_2[i] = vector_dist(mt); + } + println("Adding two vectors with ", vector_size, " elements using ", num_tasks, " tasks..."); + std::vector sum_single(vector_size); + for (size_t i = 0; i < vector_size; ++i) + sum_single[i] = vector_1[i] + vector_2[i]; + std::vector sum_multi(vector_size); + pool.push_loop( + 0, vector_size, + [&sum_multi, &vector_1, &vector_2](const size_t start, const size_t end) + { + for (size_t i = start; i < end; ++i) + sum_multi[i] = vector_1[i] + vector_2[i]; + }, + num_tasks); + pool.wait_for_tasks(); + bool vectors_equal = true; + for (size_t i = 0; i < vector_size; ++i) + vectors_equal = vectors_equal && (sum_single[i] == sum_multi[i]); + check(vectors_equal); +} + +/** + * @brief Check that parallelized vector 
operations work as expected by calculating the sum of two randomized vectors in two ways, single-threaded and multithreaded, and comparing the results. + */ +void check_vectors() +{ + std::mt19937_64 mt(rd()); + std::uniform_int_distribution size_dist(0, 1000000); + std::uniform_int_distribution task_dist(1, pool.get_thread_count()); + for (size_t i = 0; i < 10; ++i) + check_vector_of_size(size_dist(mt), task_dist(mt)); +} + +// ================== +// Main test function +// ================== + +/** + * @brief Test that various aspects of the library are working as expected. + */ +void do_tests() +{ + print_header("Checking that the constructor works:"); + check_constructor(); + + print_header("Checking that push_task() works:"); + check_push_task(); + + print_header("Checking that submit() works:"); + check_submit(); + + print_header("Checking that submitting member functions works:"); + check_member_function(); + + print_header("Checking that submitting member functions from within an object works:"); + check_member_function_within_object(); + + print_header("Checking that wait_for_tasks() works..."); + check_wait_for_tasks(); + + print_header("Checking that push_loop() works:"); + check_parallelize_loop(); + + print_header("Checking that exception handling works:"); + check_exceptions(); + + print_header("Testing that vector operations produce the expected results:"); + check_vectors(); +} + +int main() +{ + println("BS::thread_pool_light: a fast, lightweight, and easy-to-use C++17 thread pool library"); + println("(c) 2022 Barak Shoshany (baraksh@gmail.com) (http://baraksh.com)"); + println("GitHub: https://github.com/bshoshany/thread-pool\n"); + + println("Thread pool library version is ", BS_THREAD_POOL_VERSION, "."); + println("Hardware concurrency is ", std::thread::hardware_concurrency(), "."); + + println("Important: Please do not run any other applications, especially multithreaded applications, in parallel with this test!"); + + do_tests(); + + if 
(tests_failed == 0) + { + print_header("SUCCESS: Passed all " + std::to_string(tests_succeeded) + " checks!", '+'); + return EXIT_SUCCESS; + } + else + { + print_header("FAILURE: Passed " + std::to_string(tests_succeeded) + " checks, but failed " + std::to_string(tests_failed) + "!", '+'); + println("\nPlease submit a bug report at https://github.com/bshoshany/thread-pool/issues including the exact specifications of your system (OS, CPU, compiler, etc.) and the generated log file."); + return EXIT_FAILURE; + } +} diff --git a/BS_thread_pool_test.cpp b/BS_thread_pool_test.cpp index ce831f1..32201c8 100644 --- a/BS_thread_pool_test.cpp +++ b/BS_thread_pool_test.cpp @@ -1,8 +1,8 @@ /** * @file BS_thread_pool_test.cpp * @author Barak Shoshany (baraksh@gmail.com) (http://baraksh.com) - * @version 3.2.0 - * @date 2022-07-28 + * @version 3.3.0 + * @date 2022-08-03 * @copyright Copyright (c) 2022 Barak Shoshany. Licensed under the MIT license. If you found this project useful, please consider starring it on GitHub! If you use this library in software of any kind, please provide a link to the GitHub repository https://github.com/bshoshany/thread-pool in the source code and documentation. If you use this library in published research, please cite it as follows: Barak Shoshany, "A C++17 Thread Pool for High-Performance Scientific Computing", doi:10.5281/zenodo.4742687, arXiv:2105.00613 (May 2021) * * @brief BS::thread_pool: a fast, lightweight, and easy-to-use C++17 thread pool library. This program tests all aspects of the library, but is not needed in order to use the library. @@ -93,7 +93,7 @@ void dual_print(T&&... items) template void dual_println(T&&... 
items) { - dual_print(std::forward(items)..., BS::endl); + dual_print(std::forward(items)..., BS::synced_stream::endl); } /** @@ -120,7 +120,7 @@ std::string get_time() const std::time_t t = std::time(nullptr); char time_string[32]; std::strftime(time_string, sizeof(time_string), "%Y-%m-%d_%H.%M.%S", std::localtime(&t)); - return std::string(time_string); + return time_string; } /** @@ -669,8 +669,12 @@ void check_pausing() BS::concurrency_t n = std::min(std::thread::hardware_concurrency(), 4); dual_println("Resetting pool to ", n, " threads."); pool.reset(n); + dual_println("Checking that the pool correctly reports that it is not paused."); + check(pool.is_paused() == false); dual_println("Pausing pool."); - pool.paused = true; + pool.pause(); + dual_println("Checking that the pool correctly reports that it is paused."); + check(pool.is_paused() == true); dual_println("Submitting ", n * 3, " tasks, each one waiting for 200ms."); for (BS::concurrency_t i = 0; i < n * 3; ++i) pool.push_task( @@ -689,14 +693,16 @@ void check_pausing() dual_print("Result: ", pool.get_tasks_total(), " tasks total, ", pool.get_tasks_running(), " tasks running, ", pool.get_tasks_queued(), " tasks queued "); check(pool.get_tasks_total() == n * 3 && pool.get_tasks_running() == 0 && pool.get_tasks_queued() == n * 3); dual_println("Unpausing pool."); - pool.paused = false; + pool.unpause(); + dual_println("Checking that the pool correctly reports that it is not paused."); + check(pool.is_paused() == false); std::this_thread::sleep_for(std::chrono::milliseconds(300)); dual_println("300ms later, should have: ", n * 2, " tasks total, ", n, " tasks running, ", n, " tasks queued..."); dual_print("Result: ", pool.get_tasks_total(), " tasks total, ", pool.get_tasks_running(), " tasks running, ", pool.get_tasks_queued(), " tasks queued "); check(pool.get_tasks_total() == n * 2 && pool.get_tasks_running() == n && pool.get_tasks_queued() == n); dual_println("Pausing pool and using wait_for_tasks() to 
wait for the running tasks."); - pool.paused = true; + pool.pause(); pool.wait_for_tasks(); dual_println("After waiting, should have: ", n, " tasks total, ", 0, " tasks running, ", n, " tasks queued..."); @@ -708,7 +714,7 @@ void check_pausing() dual_print("Result: ", pool.get_tasks_total(), " tasks total, ", pool.get_tasks_running(), " tasks running, ", pool.get_tasks_queued(), " tasks queued "); check(pool.get_tasks_total() == n && pool.get_tasks_running() == 0 && pool.get_tasks_queued() == n); dual_println("Unpausing pool and using wait_for_tasks() to wait for all tasks."); - pool.paused = false; + pool.unpause(); pool.wait_for_tasks(); dual_println("After waiting, should have: ", 0, " tasks total, ", 0, " tasks running, ", 0, " tasks queued..."); @@ -750,8 +756,8 @@ void check_exceptions() dual_println("Checking that exceptions are forwarded correctly by BS::multi_future..."); caught = false; BS::multi_future my_future2; - my_future2.f.push_back(pool.submit(throws)); - my_future2.f.push_back(pool.submit(throws)); + my_future2.push_back(pool.submit(throws)); + my_future2.push_back(pool.submit(throws)); try { void(my_future2.get()); @@ -910,7 +916,7 @@ std::pair analyze(const std::vector(timings[i]) - mean) * (static_cast(timings[i]) - mean) / static_cast(timings.size()); const double sd = std::sqrt(variance); - return std::pair(mean, sd); + return {mean, sd}; } /** diff --git a/CHANGELOG.md b/CHANGELOG.md index 574e409..0fc03d2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ Website: [https://baraksh.com/](https://baraksh.com/)
GitHub: [https://github.com/bshoshany](https://github.com/bshoshany)
* [Version history](#version-history) + * [v3.3.0 (2022-08-03)](#v330-2022-08-03) * [v3.2.0 (2022-07-28)](#v320-2022-07-28) * [v3.1.0 (2022-07-13)](#v310-2022-07-13) * [v3.0.0 (2022-05-30)](#v300-2022-05-30) @@ -33,6 +34,25 @@ GitHub: [https://github.com/bshoshany](https://github.com/bshoshany)
## Version history +### v3.3.0 (2022-08-03) + +* `BS_thread_pool.hpp`: + * The public member variable `paused` of `BS::thread_pool` has been made private for future-proofing (in case future versions implement a more involved pausing mechanism) and better encapsulation. It is now accessible only via the `pause()`, `unpause()`, and `is_paused()` member functions. In other words: + * Replace `pool.paused = true` with `pool.pause()`. + * Replace `pool.paused = false` with `pool.unpause()`. + * Replace `if (pool.paused)` (or similar) with `if (pool.is_paused())`. + * The public member variable `f` of `BS::multi_future` has been renamed to `futures` for clarity, and has been made private for encapsulation and simplification purposes. Instead of operating on the vector `futures` itself, you can now use the `[]` operator of the `BS::multi_future` to access the future at a specific index directly, or the `push_back()` member function to append a new future to the list. The `size()` member function tells you how many futures are currently stored in the object. + * The explicit casts of `std::endl` and `std::flush`, added in v3.2.0 to enable flushing a `BS::synced_stream`, caused ODR (One Definition Rule) violations if `BS_thread_pool.hpp` was included in two different translation units, since they were mistakenly not defined as `inline`. To fix this, I decided to make them static members of `BS::synced_stream` instead of global variables, which also makes the code better organized in my opinion. These objects can now be accessed as `BS::synced_stream::endl` and `BS::synced_stream::flush`. I also added an example for how to use them in `README.md`. See [#64](https://github.com/bshoshany/thread-pool/issues/64). +* `BS_thread_pool_light.hpp`: + * This package started out as a very lightweight thread pool, but over time has expanded to include many additional features, and at the time of writing it has a total of 340 lines of code, including all the helper classes. 
Therefore, I have decided to bundle a light version of the thread pool in a separate and stand-alone header file, `BS_thread_pool_light.hpp`, with only 170 lines of code (half the size of the full package). This file does not contain any of the helper classes, only a new `BS::thread_pool_light` class, which is a minimal thread pool with only the 5 most basic member functions: + * `get_thread_count()` + * `push_loop()` + * `push_task()` + * `submit()` + * `wait_for_tasks()` + * A separate test program `BS_thread_pool_light_test.cpp` tests only the features of the lightweight `BS::thread_pool_light` class. In the spirit of minimalism, it does not generate a log file and does not do any benchmarks. + * To be perfectly clear, each header file is 100% stand-alone. If you wish to use the full package, you only need `BS_thread_pool.hpp`, and if you wish to use the light version, you only need `BS_thread_pool_light.hpp`. Only a single header file needs to be included in your project. + ### v3.2.0 (2022-07-28) * `BS_thread_pool.hpp`: diff --git a/README.md b/README.md index 17fff67..ff92364 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ Email: [baraksh@gmail.com](mailto:baraksh@gmail.com)
Website: [https://baraksh.com/](https://baraksh.com/)
GitHub: [https://github.com/bshoshany](https://github.com/bshoshany)
-This is the complete documentation for v3.2.0 of the library, released on 2022-07-28. +This is the complete documentation for v3.3.0 of the library, released on 2022-08-03. * [Introduction](#introduction) * [Motivation](#motivation) @@ -37,8 +37,8 @@ This is the complete documentation for v3.2.0 of the library, released on 2022-0 * [Loops with return values](#loops-with-return-values) * [Parallelizing loops without futures](#parallelizing-loops-without-futures) * [Helper classes](#helper-classes) - * [Handling multiple futures at once](#handling-multiple-futures-at-once) * [Synchronizing printing to an output stream](#synchronizing-printing-to-an-output-stream) + * [Handling multiple futures at once](#handling-multiple-futures-at-once) * [Measuring execution time](#measuring-execution-time) * [Other features](#other-features) * [Monitoring the tasks](#monitoring-the-tasks) @@ -47,6 +47,7 @@ This is the complete documentation for v3.2.0 of the library, released on 2022-0 * [Testing the package](#testing-the-package) * [Automated tests](#automated-tests) * [Performance tests](#performance-tests) +* [The light version of the package](#the-light-version-of-the-package) * [About the project](#about-the-project) * [Issue and pull request policy](#issue-and-pull-request-policy) * [Acknowledgements](#acknowledgements) @@ -78,11 +79,12 @@ Other, more advanced multithreading libraries may offer more features and/or hig * Reusing threads avoids the overhead of creating and destroying them for individual tasks. * A task queue ensures that there are never more threads running in parallel than allowed by the hardware. * **Lightweight:** - * Only ~190 lines of code, excluding comments, blank lines, and the optional helper classes. * Single header file: simply `#include "BS_thread_pool.hpp"` and you're all set! * Header-only: no need to install or build the library. * Self-contained: no external requirements or dependencies. 
* Portable: uses only the C++ standard library, and works with any C++17-compliant compiler. + * Only ~340 lines of code, excluding comments and blank lines. + * A stand-alone "light version" of the thread pool is also available in the `BS_thread_pool_light.hpp` header file, with only ~170 lines of code. * **Easy to use:** * Very simple operation, using a handful of member functions. * Every task submitted to the queue using the `submit()` member function automatically generates an `std::future`, which can be used to wait for the task to finish executing and/or obtain its eventual return value. @@ -97,7 +99,7 @@ Other, more advanced multithreading libraries may offer more features and/or hig * Easily wait for all tasks in the queue to complete using the `wait_for_tasks()` member function. * Change the number of threads in the pool safely and on-the-fly as needed using the `reset()` member function. * Monitor the number of queued and/or running tasks using the `get_tasks_queued()`, `get_tasks_running()`, and `get_tasks_total()` member functions. - * Freely pause and resume the pool by modifying the `paused` member variable. When paused, threads do not retrieve new tasks out of the queue. + * Freely pause and resume the pool using the `pause()`, `unpause()`, and `is_paused()` member functions. When paused, threads do not retrieve new tasks out of the queue. * Catch exceptions thrown by the submitted tasks. * Submit class member functions to the pool, either applied to a specific object or from within the object itself. * Under continuous and active development. Bug reports and feature requests are welcome, and should be made via [GitHub issues](https://github.com/bshoshany/thread-pool/issues). 
@@ -627,47 +629,6 @@ As with `parallelize_loop()`, the first argument can be omitted if the start ind ## Helper classes -### Handling multiple futures at once - -The helper class template `BS::multi_future`, already introduced in the context of `parallelize_loop()`, provides a convenient way to collect and access groups of futures. The futures are stored in a public member variable `f` of type `std::vector>`, so all standard `std::vector` operations are available for organizing the futures. Once the futures are stored, you can use `wait()` to wait for all of them at once or `get()` to get an `std::vector` with the results from all of them. Here's a simple example: - -```cpp -#include "BS_thread_pool.hpp" - -int square(const int i) -{ - std::this_thread::sleep_for(std::chrono::milliseconds(500)); - return i * i; -}; - -int main() -{ - BS::thread_pool pool; - BS::multi_future mf1; - BS::multi_future mf2; - for (int i = 0; i < 100; ++i) - mf1.f.push_back(pool.submit(square, i)); - for (int i = 100; i < 200; ++i) - mf2.f.push_back(pool.submit(square, i)); - /// ... - /// Do some stuff while the first group of tasks executes... - /// ... - const std::vector squares1 = mf1.get(); - std::cout << "Results from the first group:" << '\n'; - for (const int s : squares1) - std::cout << s << ' '; - /// ... - /// Do other stuff while the second group of tasks executes... - /// ... - const std::vector squares2 = mf2.get(); - std::cout << '\n' << "Results from the second group:" << '\n'; - for (const int s : squares2) - std::cout << s << ' '; -} -``` - -In this example, we simulate complicated tasks by having each task wait for 500ms before returning its result. We collect the futures of the tasks submitted within each loop into two separate `BS::multi_future` objects. `mf1` holds the results from the first loop, and `mf2` holds the results from the second loop. 
Now we can wait for and/or get the results from `mf1` whenever is convenient, and separately wait for and/or get the results from `mf2` at another time. - ### Synchronizing printing to an output stream When printing to an output stream from multiple threads in parallel, the output may become garbled. For example, consider this code: @@ -732,6 +693,94 @@ Task no. 5 executing. **Warning:** Always create the `BS::synced_stream` object **before** the `BS::thread_pool` object, as we did in this example. When the `BS::thread_pool` object goes out of scope, it waits for the remaining tasks to be executed. If the `BS::synced_stream` object goes out of scope before the `BS::thread_pool` object, then any tasks using the `BS::synced_stream` will crash. Since objects are destructed in the opposite order of construction, creating the `BS::synced_stream` object before the `BS::thread_pool` object ensures that the `BS::synced_stream` is always available to the tasks, even while the pool is destructing. +Most stream manipulators defined in the headers `` and ``, such as `std::setw` (set the character width of the next output), `std::setprecision` (set the precision of floating point numbers), and `std::fixed` (display floating point numbers with a fixed number of digits), can be passed to `print()` and `println()` just as you would pass them to a stream. + +The only exceptions are the flushing manipulators `std::endl` and `std::flush`, which will not work because the compiler will not be able to figure out which template specializations to use. Instead, use `BS::synced_stream::endl` and `BS::synced_stream::flush`. 
Here is an example: + +```cpp +#include "BS_thread_pool.hpp" +#include +#include + +int main() +{ + BS::synced_stream sync_out; + BS::thread_pool pool; + sync_out.print(std::setprecision(10), std::fixed); + for (size_t i = 1; i <= 10; ++i) + pool.push_task([i, &sync_out] { sync_out.print("The square root of ", std::setw(2), i, " is ", std::sqrt(i), ".", BS::synced_stream::endl); }); +} +``` + +### Handling multiple futures at once + +The helper class template `BS::multi_future`, already introduced in the context of `parallelize_loop()`, provides a convenient way to collect and access groups of futures. This class works similarly to STL containers such as `std::vector`: + +* When you create a new object, either use the default constructor to create an empty object and add futures to it later, or pass the desired number of futures to the constructor in advance. +* Use the `[]` operator to access the future at a specific index, or the `push_back()` member function to append a new future to the list. +* The `size()` member function tells you how many futures are currently stored in the object. +* Once all the futures are stored, you can use `wait()` to wait for all of them at once or `get()` to get an `std::vector` with the results from all of them. + +Aside from using `BS::multi_future` to track the execution of parallelized loops, it can also be used whenever you have several different groups of tasks and you want to track the execution of each group individually. 
Here's a simple example: + +```cpp +#include "BS_thread_pool.hpp" +#include + +BS::synced_stream sync_out; +BS::thread_pool pool; + +double power(const double i, const double j) +{ + std::this_thread::sleep_for(std::chrono::milliseconds(10 * pool.get_thread_count())); + return std::pow(i, j); +} + +void print_vector(const std::vector& vec) +{ + for (const double i : vec) + sync_out.print(i, ' '); + sync_out.println(); +} + +int main() +{ + constexpr size_t n = 100; + + // First group of tasks: calculate n squares. + // Here we create an empty BS::multi_future object, and append futures to it via push_back(). + BS::multi_future mf_squares; + for (int i = 0; i < n; ++i) + mf_squares.push_back(pool.submit(power, i, 2)); + + // Second group of tasks: calculate n cubes. + // In this case, we create a BS::multi_future object of the desired size in advance, and store the futures via the [] operator. This is faster since there will be no memory reallocations, but also more prone to errors. + BS::multi_future mf_cubes(n); + for (int i = 0; i < n; ++i) + mf_cubes[i] = pool.submit(power, i, 3); + + // Both groups are now queued, but it will take some time until they all execute. + + /// ... + /// Do some stuff while the first group of tasks executes... + /// ... + + // Get and print the results from the first group. + sync_out.println("Squares:"); + print_vector(mf_squares.get()); + + /// ... + /// Do other stuff while the second group of tasks executes... + /// ... + + // Get and print the results from the second group. + sync_out.println("Cubes:"); + print_vector(mf_cubes.get()); +} +``` + +In this example, we simulate complicated tasks by having each task wait for a bit before returning its result. We collect the futures of the tasks submitted within each group into two separate `BS::multi_future` objects. `mf_squares` holds the results from the first group, and `mf_cubes` holds the results from the second group. 
Now we can wait for and/or get the results from `mf_squares` whenever is convenient, and separately wait for and/or get the results from `mf_cubes` at another time. + ### Measuring execution time If you are using a thread pool, then your code is most likely performance-critical. Achieving maximum performance requires performing a considerable amount of benchmarking to determine the optimal settings and algorithms. Therefore, it is important to be able to measure the execution time of various computations and operations under different conditions. @@ -822,9 +871,9 @@ Task 11 done. ### Pausing the workers -Sometimes you may wish to temporarily pause the execution of tasks, or perhaps you want to submit tasks to the queue in advance and only start executing them at a later time. You can do this using the public member variable `paused`. +Sometimes you may wish to temporarily pause the execution of tasks, or perhaps you want to submit tasks to the queue in advance and only start executing them at a later time. You can do this using the member functions `pause()`, `unpause()`, and `is_paused()`. -When `paused` is set to `true`, the workers will temporarily stop retrieving new tasks out of the queue. However, any tasks already executed will keep running until they are done, since the thread pool has no control over the internal code of your tasks. If you need to pause a task in the middle of its execution, you must do that manually by programming your own pause mechanism into the task itself. To resume retrieving tasks, set `paused` back to its default value of `false`. +When you call `pause()`, the workers will temporarily stop retrieving new tasks out of the queue. However, any tasks already executed will keep running until they are done, since the thread pool has no control over the internal code of your tasks. If you need to pause a task in the middle of its execution, you must do that manually by programming your own pause mechanism into the task itself. 
To resume retrieving tasks, call `unpause()`. To check whether the pool is currently paused, call `is_paused()`. Here is an example: @@ -840,14 +889,22 @@ void sleep_half_second(const size_t i) sync_out.println("Task ", i, " done."); } +void check_if_paused() +{ + if (pool.is_paused()) + sync_out.println("Pool paused."); + else + sync_out.println("Pool unpaused."); +} + int main() { for (size_t i = 0; i < 8; ++i) pool.push_task(sleep_half_second, i); sync_out.println("Submitted 8 tasks."); std::this_thread::sleep_for(std::chrono::milliseconds(250)); - pool.paused = true; - sync_out.println("Pool paused."); + pool.pause(); + check_if_paused(); std::this_thread::sleep_for(std::chrono::milliseconds(1000)); sync_out.println("Still paused..."); std::this_thread::sleep_for(std::chrono::milliseconds(1000)); @@ -856,8 +913,8 @@ int main() sync_out.println("Submitted 4 more tasks."); sync_out.println("Still paused..."); std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - pool.paused = false; - sync_out.println("Pool resumed."); + pool.unpause(); + check_if_paused(); } ``` @@ -873,7 +930,7 @@ Task 3 done. Still paused... Submitted 4 more tasks. Still paused... -Pool resumed. +Pool unpaused. Task 4 done. Task 5 done. Task 6 done. @@ -884,7 +941,7 @@ Task 10 done. Task 11 done. ``` -Here is what happened. We initially submitted a total of 8 tasks to the queue. Since we waited for 250ms before pausing, the first 4 tasks have already started running, so they kept running until they finished. While the pool was paused, we submitted 4 more tasks to the queue, but they just waited at the end of the queue. When we resumed, the remaining 4 initial tasks were executed, followed by the 4 new tasks. +Here is what happened. We initially submitted a total of 8 tasks to the queue. Since we waited for 250ms before pausing, the first 4 tasks have already started running, so they kept running until they finished. 
While the pool was paused, we submitted 4 more tasks to the queue, but they just waited at the end of the queue. When we unpaused, the remaining 4 initial tasks were executed, followed by the 4 new tasks. While the workers are paused, `wait_for_tasks()` will wait for the running tasks instead of all tasks (otherwise it would wait forever). This is demonstrated by the following program: @@ -900,6 +957,14 @@ void sleep_half_second(const size_t i) sync_out.println("Task ", i, " done."); } +void check_if_paused() +{ + if (pool.is_paused()) + sync_out.println("Pool paused."); + else + sync_out.println("Pool unpaused."); +} + int main() { for (size_t i = 0; i < 8; ++i) @@ -910,8 +975,9 @@ int main() pool.push_task(sleep_half_second, i); sync_out.println("Submitted 12 more tasks."); std::this_thread::sleep_for(std::chrono::milliseconds(250)); - pool.paused = true; - sync_out.println("Pool paused. Waiting for the ", pool.get_tasks_running(), " running tasks to complete."); + pool.pause(); + check_if_paused(); + sync_out.println("Waiting for the ", pool.get_tasks_running(), " running tasks to complete."); pool.wait_for_tasks(); sync_out.println("All running tasks completed. ", pool.get_tasks_queued(), " tasks still queued."); std::this_thread::sleep_for(std::chrono::milliseconds(1000)); @@ -919,9 +985,10 @@ int main() std::this_thread::sleep_for(std::chrono::milliseconds(1000)); sync_out.println("Still paused..."); std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - pool.paused = false; + pool.unpause(); + check_if_paused(); std::this_thread::sleep_for(std::chrono::milliseconds(250)); - sync_out.println("Pool resumed. 
Waiting for the remaining ", pool.get_tasks_total(), " tasks (", pool.get_tasks_running(), " running and ", pool.get_tasks_queued(), " queued) to complete."); + sync_out.println("Waiting for the remaining ", pool.get_tasks_total(), " tasks (", pool.get_tasks_running(), " running and ", pool.get_tasks_queued(), " queued) to complete."); pool.wait_for_tasks(); sync_out.println("All tasks completed."); } @@ -940,7 +1007,8 @@ Task 5 done. Task 6 done. Task 7 done. Submitted 12 more tasks. -Pool paused. Waiting for the 4 running tasks to complete. +Pool paused. +Waiting for the 4 running tasks to complete. Task 8 done. Task 9 done. Task 10 done. @@ -948,7 +1016,8 @@ Task 11 done. All running tasks completed. 8 tasks still queued. Still paused... Still paused... -Pool resumed. Waiting for the remaining 8 tasks (4 running and 4 queued) to complete. +Pool unpaused. +Waiting for the remaining 8 tasks (4 running and 4 queued) to complete. Task 12 done. Task 13 done. Task 14 done. @@ -960,7 +1029,7 @@ Task 19 done. All tasks completed. ``` -The first `wait_for_tasks()`, which was called with `paused == false`, waited for all 8 tasks, both running and queued. The second `wait_for_tasks()`, which was called with `paused == true`, only waited for the 4 running tasks, while the other 8 tasks remained queued, and were not executed since the pool was paused. Finally, the third `wait_for_tasks()`, which was called with `paused == false`, waited for the remaining 8 tasks, both running and queued. +The first `wait_for_tasks()`, which was called while the pool was not paused, waited for all 8 tasks, both running and queued. The second `wait_for_tasks()`, which was called after pausing the pool, only waited for the 4 running tasks, while the other 8 tasks remained queued, and were not executed since the pool was paused. Finally, the third `wait_for_tasks()`, which was called after unpausing the pool, waited for the remaining 8 tasks, both running and queued. 
**Warning**: If the thread pool is destroyed while paused, any tasks still in the queue will never be executed! @@ -1059,9 +1128,9 @@ BS::thread_pool: a fast, lightweight, and easy-to-use C++17 thread pool library (c) 2022 Barak Shoshany (baraksh@gmail.com) (http://baraksh.com) GitHub: https://github.com/bshoshany/thread-pool -Thread pool library version is v3.2.0 (2022-07-28). +Thread pool library version is v3.3.0 (2022-08-03). Hardware concurrency is 24. -Generating log file: BS_thread_pool_test-2022-07-28_15.29.31.log. +Generating log file: BS_thread_pool_test-2022-08-03_12.32.04.log. Important: Please do not run any other applications, especially multithreaded applications, in parallel with this test! @@ -1152,78 +1221,78 @@ Waiting for tasks... ====================================================== Checking that push_loop() and parallelize_loop() work: ====================================================== -Verifying that push_loop() from 390892 to 541943 with 20 tasks modifies all indices... +Verifying that push_loop() from 917499 to 884861 with 19 tasks modifies all indices... -> PASSED! -Verifying that push_loop() from -233617 to 52646 with 22 tasks modifies all indices... +Verifying that push_loop() from 235488 to 296304 with 11 tasks modifies all indices... -> PASSED! -Verifying that push_loop() from 409845 to 410887 with 23 tasks modifies all indices... +Verifying that push_loop() from -790296 to -152228 with 21 tasks modifies all indices... -> PASSED! -Verifying that push_loop() from 977764 to 726111 with 12 tasks modifies all indices... +Verifying that push_loop() from -937055 to -135942 with 10 tasks modifies all indices... -> PASSED! -Verifying that push_loop() from -940107 to -882673 with 18 tasks modifies all indices... +Verifying that push_loop() from 372276 to 486867 with 3 tasks modifies all indices... -> PASSED! -Verifying that push_loop() from -613072 to 675872 with 10 tasks modifies all indices... 
+Verifying that push_loop() from 890415 to -163491 with 5 tasks modifies all indices... -> PASSED! -Verifying that push_loop() from 998082 to -267173 with 3 tasks modifies all indices... +Verifying that push_loop() from 637645 to -894687 with 7 tasks modifies all indices... -> PASSED! -Verifying that push_loop() from -779960 to 518984 with 4 tasks modifies all indices... +Verifying that push_loop() from 308032 to -254915 with 20 tasks modifies all indices... -> PASSED! -Verifying that push_loop() from -52424 to 2576 with 3 tasks modifies all indices... +Verifying that push_loop() from 499518 to 104936 with 17 tasks modifies all indices... -> PASSED! -Verifying that push_loop() from 131005 to -557044 with 10 tasks modifies all indices... +Verifying that push_loop() from 19080 to -378567 with 5 tasks modifies all indices... -> PASSED! -Verifying that parallelize_loop() from -119788 to -727738 with 14 tasks modifies all indices... +Verifying that parallelize_loop() from -298981 to -724834 with 11 tasks modifies all indices... -> PASSED! -Verifying that parallelize_loop() from -911425 to -923429 with 8 tasks modifies all indices... +Verifying that parallelize_loop() from 232695 to 767243 with 3 tasks modifies all indices... -> PASSED! -Verifying that parallelize_loop() from 49601 to -772605 with 16 tasks modifies all indices... +Verifying that parallelize_loop() from 177768 to 966097 with 10 tasks modifies all indices... -> PASSED! -Verifying that parallelize_loop() from -322809 to -66366 with 15 tasks modifies all indices... +Verifying that parallelize_loop() from 474617 to -155690 with 15 tasks modifies all indices... -> PASSED! -Verifying that parallelize_loop() from 880099 to -150434 with 22 tasks modifies all indices... +Verifying that parallelize_loop() from -733576 to 547977 with 9 tasks modifies all indices... -> PASSED! -Verifying that parallelize_loop() from 984341 to 69159 with 11 tasks modifies all indices... 
+Verifying that parallelize_loop() from -723922 to 992233 with 1 task modifies all indices... -> PASSED! -Verifying that parallelize_loop() from -207913 to 829987 with 9 tasks modifies all indices... +Verifying that parallelize_loop() from 957397 to 364478 with 5 tasks modifies all indices... -> PASSED! -Verifying that parallelize_loop() from 297749 to -332031 with 2 tasks modifies all indices... +Verifying that parallelize_loop() from 776948 to 895847 with 3 tasks modifies all indices... -> PASSED! -Verifying that parallelize_loop() from 517539 to 811728 with 8 tasks modifies all indices... +Verifying that parallelize_loop() from 696779 to 400637 with 17 tasks modifies all indices... -> PASSED! -Verifying that parallelize_loop() from 311585 to -81170 with 21 tasks modifies all indices... +Verifying that parallelize_loop() from -5265 to 746418 with 23 tasks modifies all indices... -> PASSED! -Verifying that parallelize_loop() from 670932 to 740527 with 3 tasks correctly sums all indices... -Expected: 98230419510, obtained: 98230419510 -> PASSED! -Verifying that parallelize_loop() from 645122 to -546036 with 11 tasks correctly sums all indices... -Expected: 118025890430, obtained: 118025890430 -> PASSED! -Verifying that parallelize_loop() from 251002 to -903037 with 6 tasks correctly sums all indices... -Expected: -752474973404, obtained: -752474973404 -> PASSED! -Verifying that parallelize_loop() from -222340 to -791805 with 15 tasks correctly sums all indices... -Expected: -577520651890, obtained: -577520651890 -> PASSED! -Verifying that parallelize_loop() from 937604 to 819765 with 3 tasks correctly sums all indices... -Expected: 207086487752, obtained: 207086487752 -> PASSED! -Verifying that parallelize_loop() from -53900 to -339294 with 24 tasks correctly sums all indices... -Expected: -112215493830, obtained: -112215493830 -> PASSED! -Verifying that parallelize_loop() from 65429 to 903556 with 22 tasks correctly sums all indices... 
-Expected: 812131652968, obtained: 812131652968 -> PASSED! -Verifying that parallelize_loop() from 519293 to -332413 with 19 tasks correctly sums all indices... -Expected: 159165965574, obtained: 159165965574 -> PASSED! -Verifying that parallelize_loop() from 165831 to 47482 with 18 tasks correctly sums all indices... -Expected: 25245261888, obtained: 25245261888 -> PASSED! -Verifying that parallelize_loop() from -534648 to 475350 with 13 tasks correctly sums all indices... -Expected: -59891871402, obtained: -59891871402 -> PASSED! +Verifying that parallelize_loop() from -229724 to -883103 with 9 tasks correctly sums all indices... +Expected: -727098445812, obtained: -727098445812 -> PASSED! +Verifying that parallelize_loop() from 730130 to -370499 with 3 tasks correctly sums all indices... +Expected: 395819207270, obtained: 395819207270 -> PASSED! +Verifying that parallelize_loop() from -493633 to 957239 with 14 tasks correctly sums all indices... +Expected: 672631513560, obtained: 672631513560 -> PASSED! +Verifying that parallelize_loop() from -455240 to -60850 with 14 tasks correctly sums all indices... +Expected: -203541129490, obtained: -203541129490 -> PASSED! +Verifying that parallelize_loop() from -287333 to 298991 with 9 tasks correctly sums all indices... +Expected: 6834778868, obtained: 6834778868 -> PASSED! +Verifying that parallelize_loop() from 62326 to -392718 with 18 tasks correctly sums all indices... +Expected: -150343352292, obtained: -150343352292 -> PASSED! +Verifying that parallelize_loop() from 663186 to 380865 with 23 tasks correctly sums all indices... +Expected: 294757240050, obtained: 294757240050 -> PASSED! +Verifying that parallelize_loop() from 609125 to -43020 with 1 task correctly sums all indices... +Expected: 369181893080, obtained: 369181893080 -> PASSED! +Verifying that parallelize_loop() from 465469 to 112037 with 4 tasks correctly sums all indices... +Expected: 204108747160, obtained: 204108747160 -> PASSED! 
+Verifying that parallelize_loop() from 690574 to 113023 with 15 tasks correctly sums all indices... +Expected: 464117673396, obtained: 464117673396 -> PASSED! Verifying that parallelize_loop() with identical start and end indices does nothing... -> PASSED! Trying parallelize_loop() with start and end indices of different types: -Verifying that parallelize_loop() from -962605 to 21974 with 17 tasks modifies all indices... +Verifying that parallelize_loop() from 894645 to 908567 with 9 tasks modifies all indices... -> PASSED! Trying the overloads for push_loop() and parallelize_loop() for the case where the first index is equal to 0: -Verifying that push_loop() from 0 to 482251 with 7 tasks modifies all indices... +Verifying that push_loop() from 0 to 949967 with 10 tasks modifies all indices... -> PASSED! -Verifying that parallelize_loop() from 0 to 151431 with 1 task modifies all indices... +Verifying that parallelize_loop() from 0 to 241018 with 3 tasks modifies all indices... -> PASSED! -Verifying that parallelize_loop() from 0 to 806998 with 4 tasks correctly sums all indices... -Expected: 651244965006, obtained: 651244965006 -> PASSED! +Verifying that parallelize_loop() from 0 to 574984 with 19 tasks correctly sums all indices... +Expected: 330606025272, obtained: 330606025272 -> PASSED! ==================================== Checking that task monitoring works: @@ -1232,21 +1301,21 @@ Resetting pool to 4 threads. Submitting 12 tasks. After submission, should have: 12 tasks total, 4 tasks running, 8 tasks queued... Result: 12 tasks total, 4 tasks running, 8 tasks queued -> PASSED! -Task 1 released. Task 2 released. -Task 3 released. Task 0 released. +Task 1 released. +Task 3 released. After releasing 4 tasks, should have: 8 tasks total, 4 tasks running, 4 tasks queued... Result: 8 tasks total, 4 tasks running, 4 tasks queued -> PASSED! -Task 5 released. Task 4 released. Task 7 released. Task 6 released. +Task 5 released. 
After releasing 4 more tasks, should have: 4 tasks total, 4 tasks running, 0 tasks queued... Result: 4 tasks total, 4 tasks running, 0 tasks queued -> PASSED! -Task 9 released. Task 11 released. Task 10 released. +Task 9 released. Task 8 released. After releasing the final 4 tasks, should have: 0 tasks total, 0 tasks running, 0 tasks queued... Result: 0 tasks total, 0 tasks running, 0 tasks queued -> PASSED! @@ -1256,31 +1325,37 @@ Resetting pool to 24 threads. Checking that pausing works: ============================ Resetting pool to 4 threads. +Checking that the pool correctly reports that it is not paused. +-> PASSED! Pausing pool. +Checking that the pool correctly reports that it is paused. +-> PASSED! Submitting 12 tasks, each one waiting for 200ms. Immediately after submission, should have: 12 tasks total, 0 tasks running, 12 tasks queued... Result: 12 tasks total, 0 tasks running, 12 tasks queued -> PASSED! 300ms later, should still have: 12 tasks total, 0 tasks running, 12 tasks queued... Result: 12 tasks total, 0 tasks running, 12 tasks queued -> PASSED! Unpausing pool. -Task 3 done. +Checking that the pool correctly reports that it is not paused. +-> PASSED! Task 1 done. Task 2 done. Task 0 done. +Task 3 done. 300ms later, should have: 8 tasks total, 4 tasks running, 4 tasks queued... Result: 8 tasks total, 4 tasks running, 4 tasks queued -> PASSED! Pausing pool and using wait_for_tasks() to wait for the running tasks. -Task 4 done. -Task 6 done. Task 7 done. Task 5 done. +Task 6 done. +Task 4 done. After waiting, should have: 4 tasks total, 0 tasks running, 4 tasks queued... Result: 4 tasks total, 0 tasks running, 4 tasks queued -> PASSED! 200ms later, should still have: 4 tasks total, 0 tasks running, 4 tasks queued... Result: 4 tasks total, 0 tasks running, 4 tasks queued -> PASSED! Unpausing pool and using wait_for_tasks() to wait for all tasks. -Task 9 done. Task 8 done. +Task 9 done. Task 10 done. Task 11 done. 
After waiting, should have: 0 tasks total, 0 tasks running, 0 tasks queued... @@ -1301,29 +1376,29 @@ Throwing exception... ============================================================ Testing that vector operations produce the expected results: ============================================================ -Adding two vectors with 767202 elements using 4 tasks... +Adding two vectors with 77579 elements using 9 tasks... -> PASSED! -Adding two vectors with 3575 elements using 3 tasks... +Adding two vectors with 925926 elements using 2 tasks... -> PASSED! -Adding two vectors with 392555 elements using 11 tasks... +Adding two vectors with 367682 elements using 22 tasks... -> PASSED! -Adding two vectors with 754640 elements using 16 tasks... +Adding two vectors with 28482 elements using 2 tasks... -> PASSED! -Adding two vectors with 516335 elements using 9 tasks... +Adding two vectors with 486607 elements using 19 tasks... -> PASSED! -Adding two vectors with 564723 elements using 17 tasks... +Adding two vectors with 688249 elements using 10 tasks... -> PASSED! -Adding two vectors with 558475 elements using 15 tasks... +Adding two vectors with 473738 elements using 18 tasks... -> PASSED! -Adding two vectors with 447497 elements using 21 tasks... +Adding two vectors with 743021 elements using 12 tasks... -> PASSED! -Adding two vectors with 121486 elements using 19 tasks... +Adding two vectors with 209804 elements using 11 tasks... -> PASSED! -Adding two vectors with 324254 elements using 24 tasks... +Adding two vectors with 635671 elements using 23 tasks... -> PASSED! ++++++++++++++++++++++++++++++ -SUCCESS: Passed all 85 checks! +SUCCESS: Passed all 88 checks! ++++++++++++++++++++++++++++++ ``` @@ -1360,6 +1435,22 @@ Thread pool performance test completed! This CPU has 12 physical cores, with each core providing two separate logical cores via hyperthreading, for a total of 24 threads. Without hyperthreading, we would expect a maximum theoretical speedup of 12x. 
With hyperthreading, one might naively expect to achieve up to a 24x speedup, but this is in fact impossible, as both logical cores share the same physical core's resources. However, generally we would expect [an estimated 30% additional speedup](https://software.intel.com/content/www/us/en/develop/articles/how-to-determine-the-effectiveness-of-hyper-threading-technology-with-an-application.html) from hyperthreading, which amounts to around 15.6x in this case. In our performance test, we see a speedup of 18.7x, saturating and even surpassing this estimated theoretical upper bound. +## The light version of the package + +This package started out as a very lightweight thread pool, but over time has expanded to include many additional features and helper classes. Therefore, I have decided to bundle a light version of the thread pool in a separate and stand-alone header file, `BS_thread_pool_light.hpp`, which is about half the size of the full package. + +This file does not contain any of the helper classes, only a new `BS::thread_pool_light` class, which is a minimal thread pool with only the 5 most basic member functions: + +* `get_thread_count()` +* `push_loop()` +* `push_task()` +* `submit()` +* `wait_for_tasks()` + +A separate test program `BS_thread_pool_light_test.cpp` tests only the features of the lightweight `BS::thread_pool_light` class. In the spirit of minimalism, it does not generate a log file and does not do any benchmarks. + +To be perfectly clear, each header file is 100% stand-alone. If you wish to use the full package, you only need `BS_thread_pool.hpp`, and if you wish to use the light version, you only need `BS_thread_pool_light.hpp`. Only a single header file needs to be included in your project. + ## About the project ### Issue and pull request policy