diff --git a/BS_thread_pool.hpp b/BS_thread_pool.hpp index 629e9d6..af407d4 100644 --- a/BS_thread_pool.hpp +++ b/BS_thread_pool.hpp @@ -3,39 +3,50 @@ /** * @file BS_thread_pool.hpp * @author Barak Shoshany (baraksh@gmail.com) (http://baraksh.com) - * @version 3.1.0 - * @date 2022-07-13 + * @version 3.2.0 + * @date 2022-07-28 * @copyright Copyright (c) 2022 Barak Shoshany. Licensed under the MIT license. If you found this project useful, please consider starring it on GitHub! If you use this library in software of any kind, please provide a link to the GitHub repository https://github.com/bshoshany/thread-pool in the source code and documentation. If you use this library in published research, please cite it as follows: Barak Shoshany, "A C++17 Thread Pool for High-Performance Scientific Computing", doi:10.5281/zenodo.4742687, arXiv:2105.00613 (May 2021) * - * @brief BS::thread_pool: a fast, lightweight, and easy-to-use C++17 thread pool library. This header file contains the entire library, including the main BS::thread_pool class and the helper classes BS::multi_future, BS:synced_stream, and BS::timer. + * @brief BS::thread_pool: a fast, lightweight, and easy-to-use C++17 thread pool library. This header file contains the entire library, including the main BS::thread_pool class and the helper classes BS::multi_future, BS::blocks, BS:synced_stream, and BS::timer. */ -#define BS_THREAD_POOL_VERSION "v3.1.0 (2022-07-13)" +#define BS_THREAD_POOL_VERSION "v3.2.0 (2022-07-28)" #include // std::atomic #include // std::chrono #include // std::condition_variable #include // std::current_exception -#include // std::function +#include // std::bind, std::function, std::invoke #include // std::future, std::promise -#include // std::cout, std::ostream +#include // std::cout, std::endl, std::flush, std::ostream #include // std::make_shared, std::make_unique, std::shared_ptr, std::unique_ptr #include // std::mutex, std::scoped_lock, std::unique_lock #include // std::queue #include // std::thread -#include // std::common_type_t, std::decay_t, std::is_void_v, std::invoke_result_t -#include // std::move, std::swap +#include // std::common_type_t, std::conditional_t, std::decay_t, std::invoke_result_t, std::is_void_v +#include // std::forward, std::move, std::swap #include // std::vector namespace BS { +/** + * @brief A convenient shorthand for the type of std::thread::hardware_concurrency(). Should evaluate to unsigned int. + */ using concurrency_t = std::invoke_result_t; +/** + * @brief Explicit casts of the flushing stream manipulators, to enable using them with synced_stream, e.g. sync_out.print(BS::flush). + */ +std::ostream& (&endl)(std::ostream&) = static_cast(std::endl); +std::ostream& (&flush)(std::ostream&) = static_cast(std::flush); + // ============================================================================================= // -// Begin class multi_future // +// Begin class multi_future // /** * @brief A helper class to facilitate waiting for and/or getting the results of multiple futures at once. + * + * @tparam T The return type of the futures. */ template class [[nodiscard]] multi_future @@ -49,16 +60,25 @@ class [[nodiscard]] multi_future multi_future(const size_t num_futures_ = 0) : f(num_futures_) {} /** - * @brief Get the results from all the futures stored in this multi_future object. + * @brief Get the results from all the futures stored in this multi_future object, rethrowing any stored exceptions. * - * @return A vector containing the results. + * @return If the futures return void, this function returns void as well. Otherwise, it returns a vector containing the results. */ - [[nodiscard]] std::vector get() + [[nodiscard]] std::conditional_t, void, std::vector> get() { - std::vector results(f.size()); - for (size_t i = 0; i < f.size(); ++i) - results[i] = f[i].get(); - return results; + if constexpr (std::is_void_v) + { + for (size_t i = 0; i < f.size(); ++i) + f[i].get(); + return; + } + else + { + std::vector results(f.size()); + for (size_t i = 0; i < f.size(); ++i) + results[i] = f[i].get(); + return results; + } } /** @@ -76,7 +96,113 @@ class [[nodiscard]] multi_future std::vector> f; }; -// End class multi_future // +// End class multi_future // +// ============================================================================================= // + +// ============================================================================================= // +// Begin class blocks // + +/** + * @brief A helper class to divide a range into blocks. Used by parallelize_loop() and push_loop(). + * + * @tparam T1 The type of the first index in the range. Should be a signed or unsigned integer. + * @tparam T2 The type of the index after the last index in the range. Should be a signed or unsigned integer. If T1 is not the same as T2, a common type will be automatically inferred. + * @tparam T The common type of T1 and T2. + */ +template > +class [[nodiscard]] blocks +{ +public: + /** + * @brief Construct a blocks object with the given specifications. + * + * @param first_index_ The first index in the range. + * @param index_after_last_ The index after the last index in the range. + * @param num_blocks_ The desired number of blocks to divide the range into. + */ + blocks(const T1 first_index_, const T2 index_after_last_, const size_t num_blocks_) : first_index(static_cast(first_index_)), index_after_last(static_cast(index_after_last_)), num_blocks(num_blocks_) + { + if (index_after_last < first_index) + std::swap(index_after_last, first_index); + total_size = static_cast(index_after_last - first_index); + block_size = static_cast(total_size / num_blocks); + if (block_size == 0) + { + block_size = 1; + num_blocks = (total_size > 1) ? total_size : 1; + } + } + + /** + * @brief Get the first index of a block. + * + * @param i The block number. + * @return The first index. + */ + [[nodiscard]] T start(const size_t i) const + { + return static_cast(i * block_size) + first_index; + } + + /** + * @brief Get the index after the last index of a block. + * + * @param i The block number. + * @return The index after the last index. + */ + [[nodiscard]] T end(const size_t i) const + { + return (i == num_blocks - 1) ? index_after_last : (static_cast((i + 1) * block_size) + first_index); + } + + /** + * @brief Get the number of blocks. Note that this may be different than the desired number of blocks that was passed to the constructor. + * + * @return The number of blocks. + */ + [[nodiscard]] size_t get_num_blocks() const + { + return num_blocks; + } + + /** + * @brief Get the total number of indices in the range. + * + * @return The total number of indices. + */ + [[nodiscard]] size_t get_total_size() const + { + return total_size; + } + +private: + /** + * @brief The size of each block (except possibly the last block). + */ + size_t block_size = 0; + + /** + * @brief The first index in the range. + */ + T first_index = 0; + + /** + * @brief The index after the last index in the range. + */ + T index_after_last = 0; + + /** + * @brief The number of blocks. + */ + size_t num_blocks = 0; + + /** + * @brief The total number of indices in the range. + */ + size_t total_size = 0; +}; + +// End class blocks // // ============================================================================================= // // ============================================================================================= // @@ -158,7 +284,7 @@ class [[nodiscard]] thread_pool } /** - * @brief Parallelize a loop by automatically splitting it into blocks and submitting each block separately to the queue. + * @brief Parallelize a loop by automatically splitting it into blocks and submitting each block separately to the queue. Returns a multi_future object that contains the futures for all of the blocks. * * @tparam F The type of the function to loop through. * @tparam T1 The type of the first index in the loop. Should be a signed or unsigned integer. @@ -166,56 +292,98 @@ class [[nodiscard]] thread_pool * @tparam T The common type of T1 and T2. * @tparam R The return value of the loop function F (can be void). * @param first_index The first index in the loop. - * @param index_after_last The index after the last index in the loop. The loop will iterate from first_index to (index_after_last - 1) inclusive. In other words, it will be equivalent to "for (T i = first_index; i < index_after_last; ++i)". Note that if first_index == index_after_last, no blocks will be submitted. + * @param index_after_last The index after the last index in the loop. The loop will iterate from first_index to (index_after_last - 1) inclusive. In other words, it will be equivalent to "for (T i = first_index; i < index_after_last; ++i)". Note that if index_after_last == first_index, no blocks will be submitted. * @param loop The function to loop through. Will be called once per block. Should take exactly two arguments: the first index in the block and the index after the last index in the block. loop(start, end) should typically involve a loop of the form "for (T i = start; i < end; ++i)". * @param num_blocks The maximum number of blocks to split the loop into. The default is to use the number of threads in the pool. - * @return A multi_future object that can be used to wait for all the blocks to finish. If the loop function returns a value, the multi_future object can be used to obtain the values returned by each block. + * @return A multi_future object that can be used to wait for all the blocks to finish. If the loop function returns a value, the multi_future object can also be used to obtain the values returned by each block. */ template , typename R = std::invoke_result_t, T, T>> - [[nodiscard]] multi_future parallelize_loop(const T1& first_index, const T2& index_after_last, const F& loop, size_t num_blocks = 0) + [[nodiscard]] multi_future parallelize_loop(const T1 first_index, const T2 index_after_last, F&& loop, const size_t num_blocks = 0) { - T first_index_T = static_cast(first_index); - T index_after_last_T = static_cast(index_after_last); - if (first_index_T == index_after_last_T) - return multi_future(); - if (index_after_last_T < first_index_T) - std::swap(index_after_last_T, first_index_T); - if (num_blocks == 0) - num_blocks = thread_count; - const size_t total_size = static_cast(index_after_last_T - first_index_T); - size_t block_size = static_cast(total_size / num_blocks); - if (block_size == 0) + blocks blks(first_index, index_after_last, num_blocks ? num_blocks : thread_count); + if (blks.get_total_size() > 0) { - block_size = 1; - num_blocks = total_size > 1 ? total_size : 1; + multi_future mf(blks.get_num_blocks()); + for (size_t i = 0; i < blks.get_num_blocks(); ++i) + mf.f[i] = submit(std::forward(loop), blks.start(i), blks.end(i)); + return mf; + } + else + { + return multi_future(); } - multi_future mf(num_blocks); - for (size_t i = 0; i < num_blocks; ++i) + } + + /** + * @brief Parallelize a loop by automatically splitting it into blocks and submitting each block separately to the queue. Returns a multi_future object that contains the futures for all of the blocks. This overload is used for the special case where the first index is 0. + * + * @tparam F The type of the function to loop through. + * @tparam T The type of the loop indices. Should be a signed or unsigned integer. + * @tparam R The return value of the loop function F (can be void). + * @param index_after_last The index after the last index in the loop. The loop will iterate from 0 to (index_after_last - 1) inclusive. In other words, it will be equivalent to "for (T i = 0; i < index_after_last; ++i)". Note that if index_after_last == 0, no blocks will be submitted. + * @param loop The function to loop through. Will be called once per block. Should take exactly two arguments: the first index in the block and the index after the last index in the block. loop(start, end) should typically involve a loop of the form "for (T i = start; i < end; ++i)". + * @param num_blocks The maximum number of blocks to split the loop into. The default is to use the number of threads in the pool. + * @return A multi_future object that can be used to wait for all the blocks to finish. If the loop function returns a value, the multi_future object can also be used to obtain the values returned by each block. + */ + template , T, T>> + [[nodiscard]] multi_future parallelize_loop(const T index_after_last, F&& loop, const size_t num_blocks = 0) + { + return parallelize_loop(0, index_after_last, std::forward(loop), num_blocks); + } + + /** + * @brief Parallelize a loop by automatically splitting it into blocks and submitting each block separately to the queue. Does not return a multi_future, so the user must use wait_for_tasks() or some other method to ensure that the loop finishes executing, otherwise bad things will happen. + * + * @tparam F The type of the function to loop through. + * @tparam T1 The type of the first index in the loop. Should be a signed or unsigned integer. + * @tparam T2 The type of the index after the last index in the loop. Should be a signed or unsigned integer. If T1 is not the same as T2, a common type will be automatically inferred. + * @tparam T The common type of T1 and T2. + * @param first_index The first index in the loop. + * @param index_after_last The index after the last index in the loop. The loop will iterate from first_index to (index_after_last - 1) inclusive. In other words, it will be equivalent to "for (T i = first_index; i < index_after_last; ++i)". Note that if index_after_last == first_index, no blocks will be submitted. + * @param loop The function to loop through. Will be called once per block. Should take exactly two arguments: the first index in the block and the index after the last index in the block. loop(start, end) should typically involve a loop of the form "for (T i = start; i < end; ++i)". + * @param num_blocks The maximum number of blocks to split the loop into. The default is to use the number of threads in the pool. + */ + template > + void push_loop(const T1 first_index, const T2 index_after_last, F&& loop, const size_t num_blocks = 0) + { + blocks blks(first_index, index_after_last, num_blocks ? num_blocks : thread_count); + if (blks.get_total_size() > 0) { - const T start = (static_cast(i * block_size) + first_index_T); - const T end = (i == num_blocks - 1) ? index_after_last_T : (static_cast((i + 1) * block_size) + first_index_T); - mf.f[i] = submit(loop, start, end); + for (size_t i = 0; i < blks.get_num_blocks(); ++i) + push_task(std::forward(loop), blks.start(i), blks.end(i)); } - return mf; } /** - * @brief Push a function with zero or more arguments, but no return value, into the task queue. + * @brief Parallelize a loop by automatically splitting it into blocks and submitting each block separately to the queue. Does not return a multi_future, so the user must use wait_for_tasks() or some other method to ensure that the loop finishes executing, otherwise bad things will happen. This overload is used for the special case where the first index is 0. + * + * @tparam F The type of the function to loop through. + * @tparam T The type of the loop indices. Should be a signed or unsigned integer. + * @param index_after_last The index after the last index in the loop. The loop will iterate from 0 to (index_after_last - 1) inclusive. In other words, it will be equivalent to "for (T i = 0; i < index_after_last; ++i)". Note that if index_after_last == 0, no blocks will be submitted. + * @param loop The function to loop through. Will be called once per block. Should take exactly two arguments: the first index in the block and the index after the last index in the block. loop(start, end) should typically involve a loop of the form "for (T i = start; i < end; ++i)". + * @param num_blocks The maximum number of blocks to split the loop into. The default is to use the number of threads in the pool. + */ + template + void push_loop(const T index_after_last, F&& loop, const size_t num_blocks = 0) + { + push_loop(0, index_after_last, std::forward(loop), num_blocks); + } + + /** + * @brief Push a function with zero or more arguments, but no return value, into the task queue. Does not return a future, so the user must use wait_for_tasks() or some other method to ensure that the task finishes executing, otherwise bad things will happen. * * @tparam F The type of the function. * @tparam A The types of the arguments. * @param task The function to push. - * @param args The arguments to pass to the function. + * @param args The zero or more arguments to pass to the function. Note that if the task is a class member function, the first argument must be a pointer to the object, i.e. &object (or this), followed by the actual arguments. */ template - void push_task(const F& task, const A&... args) + void push_task(F&& task, A&&... args) { + std::function task_function = std::bind(std::forward(task), std::forward(args)...); { const std::scoped_lock tasks_lock(tasks_mutex); - if constexpr (sizeof...(args) == 0) - tasks.push(std::function(task)); - else - tasks.push(std::function([task, args...] { task(args...); })); + tasks.push(task_function); } ++tasks_total; task_available_cv.notify_one(); @@ -245,26 +413,27 @@ class [[nodiscard]] thread_pool * @tparam A The types of the zero or more arguments to pass to the function. * @tparam R The return type of the function (can be void). * @param task The function to submit. - * @param args The zero or more arguments to pass to the function. + * @param args The zero or more arguments to pass to the function. Note that if the task is a class member function, the first argument must be a pointer to the object, i.e. &object (or this), followed by the actual arguments. * @return A future to be used later to wait for the function to finish executing and/or obtain its returned value if it has one. */ template , std::decay_t...>> - [[nodiscard]] std::future submit(const F& task, const A&... args) + [[nodiscard]] std::future submit(F&& task, A&&... args) { + std::function task_function = std::bind(std::forward(task), std::forward(args)...); std::shared_ptr> task_promise = std::make_shared>(); push_task( - [task, args..., task_promise] + [task_function, task_promise] { try { if constexpr (std::is_void_v) { - task(args...); + std::invoke(task_function); task_promise->set_value(); } else { - task_promise->set_value(task(args...)); + task_promise->set_value(std::invoke(task_function)); } } catch (...) @@ -359,7 +528,7 @@ class [[nodiscard]] thread_pool { std::function task; std::unique_lock tasks_lock(tasks_mutex); - task_available_cv.wait(tasks_lock, [&] { return !tasks.empty() || !running; }); + task_available_cv.wait(tasks_lock, [this] { return !tasks.empty() || !running; }); if (running && !paused) { task = std::move(tasks.front()); @@ -450,10 +619,10 @@ class [[nodiscard]] synced_stream * @param items The items to print. */ template - void print(const T&... items) + void print(T&&... items) { const std::scoped_lock lock(stream_mutex); - (out_stream << ... << items); + (out_stream << ... << std::forward(items)); } /** @@ -463,9 +632,9 @@ class [[nodiscard]] synced_stream * @param items The items to print. */ template - void println(const T&... items) + void println(T&&... items) { - print(items..., '\n'); + print(std::forward(items)..., '\n'); } private: diff --git a/BS_thread_pool_test.cpp b/BS_thread_pool_test.cpp index 618b4b8..ce831f1 100644 --- a/BS_thread_pool_test.cpp +++ b/BS_thread_pool_test.cpp @@ -1,8 +1,8 @@ /** * @file BS_thread_pool_test.cpp * @author Barak Shoshany (baraksh@gmail.com) (http://baraksh.com) - * @version 3.1.0 - * @date 2022-07-13 + * @version 3.2.0 + * @date 2022-07-28 * @copyright Copyright (c) 2022 Barak Shoshany. Licensed under the MIT license. If you found this project useful, please consider starring it on GitHub! If you use this library in software of any kind, please provide a link to the GitHub repository https://github.com/bshoshany/thread-pool in the source code and documentation. If you use this library in published research, please cite it as follows: Barak Shoshany, "A C++17 Thread Pool for High-Performance Scientific Computing", doi:10.5281/zenodo.4742687, arXiv:2105.00613 (May 2021) * * @brief BS::thread_pool: a fast, lightweight, and easy-to-use C++17 thread pool library. This program tests all aspects of the library, but is not needed in order to use the library. @@ -16,9 +16,9 @@ #include // std::min, std::min_element, std::sort, std::unique #include // std::atomic #include // std::chrono -#include // std::abs, std::llround, std::round, std::sqrt +#include // std::abs, std::cos, std::exp, std::llround, std::log, std::round, std::sin, std::sqrt #include // std::condition_variable -#include // std::localtime, std::strftime, std::time_t +#include // std::localtime, std::strftime, std::time, std::time_t #include // std::exception #include // std::ofstream #include // std::future @@ -26,8 +26,9 @@ #include // std::fixed #include // std::cout #include // std::numeric_limits +#include // std::make_unique, std::unique_ptr #include // std::mutex, std::scoped_lock, std::unique_lock -#include // std::mt19937_64, std::random_device, std::uniform_int_distribution, std::uniform_real_distribution +#include // std::mt19937_64, std::random_device, std::uniform_int_distribution #include // std::runtime_error #include // std::string, std::to_string #include // std::this_thread, std::thread @@ -76,23 +77,23 @@ size_t tests_failed = 0; * @param items The items to print. */ template -void dual_print(const T&... items) +void dual_print(T&&... items) { - sync_cout.print(items...); + sync_cout.print(std::forward(items)...); if (output_log) - sync_file.print(items...); + sync_file.print(std::forward(items)...); } /** - * @brief Print any number of items into both std::cout and the log file, followed by a newline character, syncing both independently. + * @brief Print any number of items into both std::cout and the log file, syncing both independently. Also prints a newline character, and flushes the stream. * * @tparam T The types of the items. * @param items The items to print. */ template -void dual_println(const T&... items) +void dual_println(T&&... items) { - dual_print(items..., '\n'); + dual_print(std::forward(items)..., BS::endl); } /** @@ -169,19 +170,19 @@ void check(const T1 expected, const T2 obtained) /** * @brief Count the number of unique threads in the pool. Submits a number of tasks equal to twice the thread count into the pool. Each task stores the ID of the thread running it, and then waits until released by the main thread. This ensures that each thread in the pool runs at least one task. The number of unique thread IDs is then counted from the stored IDs. */ +std::condition_variable ID_cv, total_cv; +std::mutex ID_mutex, total_mutex; BS::concurrency_t count_unique_threads() { const BS::concurrency_t num_tasks = pool.get_thread_count() * 2; std::vector thread_IDs(num_tasks); - std::mutex ID_mutex, total_mutex; - std::condition_variable ID_cv, total_cv; std::unique_lock total_lock(total_mutex); BS::concurrency_t total_count = 0; bool ID_release = false; pool.wait_for_tasks(); for (std::thread::id& id : thread_IDs) pool.push_task( - [&] + [&total_count, &id, &ID_release] { id = std::this_thread::get_id(); { @@ -190,15 +191,16 @@ BS::concurrency_t count_unique_threads() } total_cv.notify_one(); std::unique_lock ID_lock_local(ID_mutex); - ID_cv.wait(ID_lock_local, [&] { return ID_release; }); + ID_cv.wait(ID_lock_local, [&ID_release] { return ID_release; }); }); - total_cv.wait_for(total_lock, std::chrono::milliseconds(500), [&] { return total_count == pool.get_thread_count(); }); + total_cv.wait(total_lock, [&total_count] { return total_count == pool.get_thread_count(); }); { const std::scoped_lock ID_lock(ID_mutex); ID_release = true; } ID_cv.notify_all(); - total_cv.wait_for(total_lock, std::chrono::milliseconds(500), [&] { return total_count == num_tasks; }); + total_cv.wait(total_lock, [&total_count, &num_tasks] { return total_count == num_tasks; }); + pool.wait_for_tasks(); std::sort(thread_IDs.begin(), thread_IDs.end()); return static_cast(std::unique(thread_IDs.begin(), thread_IDs.end()) - thread_IDs.begin()); } @@ -291,38 +293,192 @@ void check_submit() dual_println("Checking that submit() works for a function with no arguments and a return value..."); { bool flag = false; - std::future my_future = pool.submit( + std::future flag_future = pool.submit( [&flag] { flag = true; return 42; }); - check(my_future.get() == 42 && flag); + check(flag_future.get() == 42 && flag); } dual_println("Checking that submit() works for a function with one argument and a return value..."); { bool flag = false; - std::future my_future = pool.submit( + std::future flag_future = pool.submit( [](bool* flag_) { *flag_ = true; return 42; }, &flag); - check(my_future.get() == 42 && flag); + check(flag_future.get() == 42 && flag); } dual_println("Checking that submit() works for a function with two arguments and a return value..."); { bool flag1 = false; bool flag2 = false; - std::future my_future = pool.submit( + std::future flag_future = pool.submit( [](bool* flag1_, bool* flag2_) { *flag1_ = *flag2_ = true; return 42; }, &flag1, &flag2); - check(my_future.get() == 42 && flag1 && flag2); + check(flag_future.get() == 42 && flag1 && flag2); + } +} + +class flag_class +{ +public: + void set_flag_no_args() + { + flag = true; + } + + void set_flag_one_arg(const bool arg) + { + flag = arg; + } + + int set_flag_no_args_return() + { + flag = true; + return 42; + } + + int set_flag_one_arg_return(const bool arg) + { + flag = arg; + return 42; + } + + bool get_flag() const + { + return flag; + } + + void push_test_flag_no_args() + { + pool.push_task(&flag_class::set_flag_no_args, this); + pool.wait_for_tasks(); + check(get_flag()); + } + + void push_test_flag_one_arg() + { + pool.push_task(&flag_class::set_flag_one_arg, this, true); + pool.wait_for_tasks(); + check(get_flag()); + } + + void submit_test_flag_no_args() + { + pool.submit(&flag_class::set_flag_no_args, this).wait(); + check(get_flag()); + } + + void submit_test_flag_one_arg() + { + pool.submit(&flag_class::set_flag_one_arg, this, true).wait(); + check(get_flag()); + } + + void submit_test_flag_no_args_return() + { + std::future flag_future = pool.submit(&flag_class::set_flag_no_args_return, this); + check(flag_future.get() == 42 && get_flag()); + } + + void submit_test_flag_one_arg_return() + { + std::future flag_future = pool.submit(&flag_class::set_flag_one_arg_return, this, true); + check(flag_future.get() == 42 && get_flag()); + } + +private: + bool flag = false; +}; + +/** + * @brief Check that submitting member functions works. + */ +void check_member_function() +{ + dual_println("Checking that push_task() works for a member function with no arguments or return value..."); + { + flag_class flag; + pool.push_task(&flag_class::set_flag_no_args, &flag); + pool.wait_for_tasks(); + check(flag.get_flag()); + } + dual_println("Checking that push_task() works for a member function with one argument and no return value..."); + { + flag_class flag; + pool.push_task(&flag_class::set_flag_one_arg, &flag, true); + pool.wait_for_tasks(); + check(flag.get_flag()); + } + dual_println("Checking that submit() works for a member function with no arguments or return value..."); + { + flag_class flag; + pool.submit(&flag_class::set_flag_no_args, &flag).wait(); + check(flag.get_flag()); + } + dual_println("Checking that submit() works for a member function with one argument and no return value..."); + { + flag_class flag; + pool.submit(&flag_class::set_flag_one_arg, &flag, true).wait(); + check(flag.get_flag()); + } + dual_println("Checking that submit() works for a member function with no arguments and a return value..."); + { + flag_class flag; + std::future flag_future = pool.submit(&flag_class::set_flag_no_args_return, &flag); + check(flag_future.get() == 42 && flag.get_flag()); + } + dual_println("Checking that submit() works for a member function with one argument and a return value..."); + { + flag_class flag; + std::future flag_future = pool.submit(&flag_class::set_flag_one_arg_return, &flag, true); + check(flag_future.get() == 42 && flag.get_flag()); + } +} + +/** + * @brief Check that submitting member functions within an object works. + */ +void check_member_function_within_object() +{ + dual_println("Checking that push_task() works within an object for a member function with no arguments or return value..."); + { + flag_class flag; + flag.push_test_flag_no_args(); + } + dual_println("Checking that push_task() works within an object for a member function with one argument and no return value..."); + { + flag_class flag; + flag.push_test_flag_one_arg(); + } + dual_println("Checking that submit() works within an object for a member function with no arguments or return value..."); + { + flag_class flag; + flag.submit_test_flag_no_args(); + } + dual_println("Checking that submit() works within an object for a member function with one argument and no return value..."); + { + flag_class flag; + flag.submit_test_flag_one_arg(); + } + dual_println("Checking that submit() works within an object for a member function with no arguments and a return value..."); + { + flag_class flag; + flag.submit_test_flag_no_args_return(); + } + dual_println("Checking that submit() works within an object for a member function with one argument and a return value..."); + { + flag_class flag; + flag.submit_test_flag_one_arg_return(); } } @@ -353,29 +509,42 @@ void check_wait_for_tasks() // ======================================== /** - * @brief Check that parallelize_loop() works for a specific number of indices split over a specific number of tasks, with no return value. + * @brief Check that push_loop() or parallelize_loop() work for a specific range of indices split over a specific number of tasks, with no return value. * - * @param start The first index in the loop. - * @param end The last index in the loop plus 1. + * @param random_start The first index in the loop. + * @param random_end The last index in the loop plus 1. * @param num_tasks The number of tasks. + * @param use_push Whether to check push_loop() instead of parallelize_loop(). */ -void check_parallelize_loop_no_return(const int64_t random_start, int64_t random_end, const BS::concurrency_t num_tasks) +template +void check_parallelize_loop_no_return(const int64_t random_start, T random_end, const BS::concurrency_t num_tasks, const bool use_push = false) { if (random_start == random_end) ++random_end; - dual_println("Verifying that a loop from ", random_start, " to ", random_end, " with ", num_tasks, num_tasks == 1 ? " task" : " tasks", " modifies all indices..."); + dual_println("Verifying that ", use_push ? "push_loop()" : "parallelize_loop()", " from ", random_start, " to ", random_end, " with ", num_tasks, num_tasks == 1 ? " task" : " tasks", " modifies all indices..."); const size_t num_indices = static_cast(std::abs(random_end - random_start)); - const int64_t offset = std::min(random_start, random_end); + const int64_t offset = std::min(random_start, static_cast(random_end)); std::unique_ptr[]> flags = std::make_unique[]>(num_indices); - pool.parallelize_loop( - random_start, random_end, - [&flags, offset](const int64_t start, const int64_t end) - { - for (int64_t i = start; i < end; ++i) - flags[static_cast(i - offset)] = true; - }, - num_tasks) - .wait(); + const auto loop = [&flags, offset](const int64_t start, const int64_t end) + { + for (int64_t i = start; i < end; ++i) + flags[static_cast(i - offset)] = true; + }; + if (use_push) + { + if (random_start == 0) + pool.push_loop(random_end, loop, num_tasks); + else + pool.push_loop(random_start, random_end, loop, num_tasks); + pool.wait_for_tasks(); + } + else + { + if (random_start == 0) + pool.parallelize_loop(random_end, loop, num_tasks).wait(); + else + pool.parallelize_loop(random_start, random_end, loop, num_tasks).wait(); + } bool all_flags = true; for (size_t i = 0; i < num_indices; ++i) all_flags = all_flags && flags[i]; @@ -383,28 +552,25 @@ void check_parallelize_loop_no_return(const int64_t random_start, int64_t random } /** - * @brief Check that parallelize_loop() works for a specific number of indices split over a specific number of tasks, with a return value. + * @brief Check that parallelize_loop() works for a specific range of indices split over a specific number of tasks, with a return value. * - * @param start The first index in the loop. - * @param end The last index in the loop plus 1. + * @param random_start The first index in the loop. + * @param random_end The last index in the loop plus 1. * @param num_tasks The number of tasks. */ void check_parallelize_loop_return(const int64_t random_start, int64_t random_end, const BS::concurrency_t num_tasks) { if (random_start == random_end) ++random_end; - dual_println("Verifying that a loop from ", random_start, " to ", random_end, " with ", num_tasks, num_tasks == 1 ? " task" : " tasks", " correctly sums all indices..."); - const std::vector sums_vector = pool.parallelize_loop( - random_start, random_end, - [](const int64_t start, const int64_t end) - { - int64_t total = 0; - for (int64_t i = start; i < end; ++i) - total += i; - return total; - }, - num_tasks) - .get(); + dual_println("Verifying that parallelize_loop() from ", random_start, " to ", random_end, " with ", num_tasks, num_tasks == 1 ? " task" : " tasks", " correctly sums all indices..."); + const auto loop = [](const int64_t start, const int64_t end) + { + int64_t total = 0; + for (int64_t i = start; i < end; ++i) + total += i; + return total; + }; + const std::vector sums_vector = (random_start == 0) ? pool.parallelize_loop(random_end, loop, num_tasks).get() : pool.parallelize_loop(random_start, random_end, loop, num_tasks).get(); int64_t sum = 0; for (const int64_t& s : sums_vector) sum += s; @@ -412,7 +578,7 @@ void check_parallelize_loop_return(const int64_t random_start, int64_t random_en } /** - * @brief Check that parallelize_loop() works using several different random values for the range of indices and number of tasks. + * @brief Check that push_loop() and parallelize_loop() work using several different random values for the range of indices and number of tasks. */ void check_parallelize_loop() { @@ -420,10 +586,25 @@ void check_parallelize_loop() std::uniform_int_distribution index_dist(-1000000, 1000000); std::uniform_int_distribution task_dist(1, pool.get_thread_count()); constexpr uint64_t n = 10; + for (uint64_t i = 0; i < n; ++i) + check_parallelize_loop_no_return(index_dist(mt), index_dist(mt), task_dist(mt), true); for (uint64_t i = 0; i < n; ++i) check_parallelize_loop_no_return(index_dist(mt), index_dist(mt), task_dist(mt)); for (uint64_t i = 0; i < n; ++i) check_parallelize_loop_return(index_dist(mt), index_dist(mt), task_dist(mt)); + dual_println("Verifying that parallelize_loop() with identical start and end indices does nothing..."); + bool flag = true; + const int64_t index = index_dist(mt); + pool.parallelize_loop(index, index, [&flag](const int64_t, const int64_t) { flag = false; }).wait(); + check(flag); + dual_println("Trying parallelize_loop() with start and end indices of different types:"); + const int64_t start = index_dist(mt); + const uint32_t end = static_cast(std::abs(index_dist(mt))); + check_parallelize_loop_no_return(start, end, task_dist(mt)); + dual_println("Trying the overloads for push_loop() and parallelize_loop() for the case where the first index is equal to 0:"); + check_parallelize_loop_no_return(0, index_dist(mt), task_dist(mt), true); + check_parallelize_loop_no_return(0, index_dist(mt), task_dist(mt)); + check_parallelize_loop_return(0, index_dist(mt), task_dist(mt)); } // =============================================== @@ -450,27 +631,32 @@ void check_task_monitoring() }); constexpr std::chrono::milliseconds sleep_time(300); std::this_thread::sleep_for(sleep_time); + dual_println("After submission, should have: ", n * 3, " tasks total, ", n, " tasks running, ", n * 2, " tasks queued..."); dual_print("Result: ", pool.get_tasks_total(), " tasks total, ", pool.get_tasks_running(), " tasks running, ", pool.get_tasks_queued(), " tasks queued "); check(pool.get_tasks_total() == n * 3 && pool.get_tasks_running() == n && pool.get_tasks_queued() == n * 2); for (BS::concurrency_t i = 0; i < n; ++i) release[i] = true; std::this_thread::sleep_for(sleep_time); + dual_println("After releasing ", n, " tasks, should have: ", n * 2, " tasks total, ", n, " tasks running, ", n, " tasks queued..."); dual_print("Result: ", pool.get_tasks_total(), " tasks total, ", pool.get_tasks_running(), " tasks running, ", pool.get_tasks_queued(), " tasks queued "); check(pool.get_tasks_total() == n * 2 && pool.get_tasks_running() == n && pool.get_tasks_queued() == n); for (BS::concurrency_t i = n; i < n * 2; ++i) release[i] = true; std::this_thread::sleep_for(sleep_time); + dual_println("After releasing ", n, " more tasks, should have: ", n, " tasks total, ", n, " tasks running, ", 0, " tasks queued..."); dual_print("Result: ", pool.get_tasks_total(), " tasks total, ", pool.get_tasks_running(), " tasks running, ", pool.get_tasks_queued(), " tasks queued "); check(pool.get_tasks_total() == n && pool.get_tasks_running() == n && pool.get_tasks_queued() == 0); for (BS::concurrency_t i = n * 2; i < n * 3; ++i) release[i] = true; std::this_thread::sleep_for(sleep_time); + dual_println("After releasing the final ", n, " tasks, should have: ", 0, " tasks total, ", 0, " tasks running, ", 0, " tasks queued..."); dual_print("Result: ", pool.get_tasks_total(), " tasks total, ", pool.get_tasks_running(), " tasks running, ", pool.get_tasks_queued(), " tasks queued "); check(pool.get_tasks_total() == 0 && pool.get_tasks_running() == 0 && pool.get_tasks_queued() == 0); + dual_println("Resetting pool to ", std::thread::hardware_concurrency(), " threads."); pool.reset(std::thread::hardware_concurrency()); } @@ -493,35 +679,42 @@ void check_pausing() std::this_thread::sleep_for(std::chrono::milliseconds(200)); dual_println("Task ", i, " done."); }); + dual_println("Immediately after submission, should have: ", n * 3, " tasks total, ", 0, " tasks running, ", n * 3, " tasks queued..."); dual_print("Result: ", pool.get_tasks_total(), " tasks total, ", pool.get_tasks_running(), " tasks running, ", pool.get_tasks_queued(), " tasks queued "); check(pool.get_tasks_total() == n * 3 && pool.get_tasks_running() == 0 && pool.get_tasks_queued() == n * 3); std::this_thread::sleep_for(std::chrono::milliseconds(300)); + dual_println("300ms later, should still have: ", n * 3, " tasks total, ", 0, " tasks running, ", n * 3, " tasks queued..."); dual_print("Result: ", pool.get_tasks_total(), " tasks total, ", pool.get_tasks_running(), " tasks running, ", pool.get_tasks_queued(), " tasks queued "); check(pool.get_tasks_total() == n * 3 && pool.get_tasks_running() == 0 && pool.get_tasks_queued() == n * 3); dual_println("Unpausing pool."); pool.paused = false; std::this_thread::sleep_for(std::chrono::milliseconds(300)); + dual_println("300ms later, should have: ", n * 2, " tasks total, ", n, " tasks running, ", n, " tasks queued..."); dual_print("Result: ", pool.get_tasks_total(), " tasks total, ", pool.get_tasks_running(), " tasks running, ", pool.get_tasks_queued(), " tasks queued "); check(pool.get_tasks_total() == n * 2 && pool.get_tasks_running() == n && pool.get_tasks_queued() == n); dual_println("Pausing pool and using wait_for_tasks() to wait for the running tasks."); pool.paused = true; pool.wait_for_tasks(); + dual_println("After waiting, should have: ", n, " tasks total, ", 0, " tasks running, ", n, " tasks queued..."); dual_print("Result: ", pool.get_tasks_total(), " tasks total, ", pool.get_tasks_running(), " tasks running, ", pool.get_tasks_queued(), " tasks queued "); check(pool.get_tasks_total() == n && pool.get_tasks_running() == 0 && pool.get_tasks_queued() == n); std::this_thread::sleep_for(std::chrono::milliseconds(200)); + dual_println("200ms later, should still have: ", n, " tasks total, ", 0, " tasks running, ", n, " tasks queued..."); dual_print("Result: ", pool.get_tasks_total(), " tasks total, ", pool.get_tasks_running(), " tasks running, ", pool.get_tasks_queued(), " tasks queued "); check(pool.get_tasks_total() == n && pool.get_tasks_running() == 0 && pool.get_tasks_queued() == n); dual_println("Unpausing pool and using wait_for_tasks() to wait for all tasks."); pool.paused = false; pool.wait_for_tasks(); + dual_println("After waiting, should have: ", 0, " tasks total, ", 0, " tasks running, ", 0, " tasks queued..."); dual_print("Result: ", pool.get_tasks_total(), " tasks total, ", pool.get_tasks_running(), " tasks running, ", pool.get_tasks_queued(), " tasks queued "); check(pool.get_tasks_total() == 0 && pool.get_tasks_running() == 0 && pool.get_tasks_queued() == 0); + dual_println("Resetting pool to ", std::thread::hardware_concurrency(), " threads."); pool.reset(std::thread::hardware_concurrency()); } @@ -531,17 +724,18 @@ void check_pausing() // ====================================== /** - * @brief Check that exception handling work. + * @brief Check that exception handling works. */ void check_exceptions() { + dual_println("Checking that exceptions are forwarded correctly by submit()..."); bool caught = false; - std::future my_future = pool.submit( - [] - { - dual_println("Throwing exception..."); - throw std::runtime_error("Exception thrown!"); - }); + auto throws = [] + { + dual_println("Throwing exception..."); + throw std::runtime_error("Exception thrown!"); + }; + std::future my_future = pool.submit(throws); try { my_future.get(); @@ -552,6 +746,22 @@ void check_exceptions() caught = true; } check(caught); + + dual_println("Checking that exceptions are forwarded correctly by BS::multi_future..."); + caught = false; + BS::multi_future my_future2; + my_future2.f.push_back(pool.submit(throws)); + my_future2.f.push_back(pool.submit(throws)); + try + { + void(my_future2.get()); + } + catch (const std::exception& e) + { + if (e.what() == std::string("Exception thrown!")) + caught = true; + } + check(caught); } // ===================================== @@ -626,10 +836,16 @@ void do_tests() print_header("Checking that submit() works:"); check_submit(); + print_header("Checking that submitting member functions works:"); + check_member_function(); + + print_header("Checking that submitting member functions from within an object works:"); + check_member_function_within_object(); + print_header("Checking that wait_for_tasks() works..."); check_wait_for_tasks(); - print_header("Checking that parallelize_loop() works:"); + print_header("Checking that push_loop() and parallelize_loop() work:"); check_parallelize_loop(); print_header("Checking that task monitoring works:"); @@ -698,18 +914,19 @@ std::pair analyze(const std::vector::max()); - std::uniform_real_distribution vector_dist(-range, range); - // Initialize a timer object to measure execution time. BS::timer tmr; @@ -733,42 +946,47 @@ void check_performance() // Define the number of tasks to try in each run of the test (0 = single-threaded). const BS::concurrency_t try_tasks[] = {0, thread_count / 4, thread_count / 2, thread_count, thread_count * 2, thread_count * 4}; - // The size of the vectors to use for the test. - constexpr size_t vector_size = 500; - // How many times to repeat each run of the test in order to collect reliable statistics. constexpr size_t repeat = 20; dual_println("Each test will be repeated ", repeat, " times to collect reliable statistics."); - // The target duration of the single-threaded test in milliseconds. The total time spent on the test in the single-threaded case will be approximately equal to repeat * target_ms. - constexpr std::chrono::milliseconds::rep target_ms = 300; - - // Vectors to store statistics. - std::vector different_n_timings; - std::vector same_n_timings; + // The target execution time, in milliseconds, of the multi-threaded test with the number of blocks equal to the number of threads. The total time spent on that test will be approximately equal to repeat * target_ms. + constexpr std::chrono::milliseconds::rep target_ms = 50; - // Test how many vectors we need to generate to roughly achieve the target duration. - size_t num_vectors = 1; - do + // Test how many vectors we need to generate, and of what size, to roughly achieve the target execution time. + dual_println("Determining the number and size of vectors to generate in order to achieve an approximate mean execution time of ", target_ms, " ms with ", thread_count, " tasks..."); + size_t num_vectors = 64; + size_t vector_size = 64; + std::vector> vectors; + auto loop = [&vectors, &vector_size](const size_t start, const size_t end) { - num_vectors *= 2; - std::vector> vectors(num_vectors, std::vector(vector_size)); - std::mt19937_64 test_mt(rd()); - tmr.start(); - for (size_t i = 0; i < num_vectors; ++i) + for (size_t i = start; i < end; ++i) { for (size_t j = 0; j < vector_size; ++j) - vectors[i][j] = vector_dist(test_mt); + vectors[i][j] = generate_element(i, j); } + }; + do + { + num_vectors *= 2; + vector_size *= 2; + vectors = std::vector>(num_vectors, std::vector(vector_size)); + tmr.start(); + pool.push_loop(num_vectors, loop); + pool.wait_for_tasks(); tmr.stop(); } while (tmr.ms() < target_ms); - num_vectors = static_cast(std::llround(static_cast(num_vectors) * static_cast(target_ms) / static_cast(tmr.ms()))); + num_vectors = thread_count * static_cast(std::llround(static_cast(num_vectors) * static_cast(target_ms) / static_cast(tmr.ms()) / thread_count)); // Initialize the desired number of vectors. - std::vector> vectors(num_vectors, std::vector(vector_size)); + vectors = std::vector>(num_vectors, std::vector(vector_size)); + + // Define vectors to store statistics. + std::vector different_n_timings; + std::vector same_n_timings; // Perform the test. - dual_println("\nGenerating ", num_vectors, " random vectors with ", vector_size, " elements each:"); + dual_println("Generating ", num_vectors, " vectors with ", vector_size, " elements each:"); for (BS::concurrency_t n : try_tasks) { for (size_t r = 0; r < repeat; ++r) @@ -776,27 +994,15 @@ void check_performance() tmr.start(); if (n > 1) { - pool.parallelize_loop( - 0, num_vectors, - [&vector_dist, &vectors](const size_t start, const size_t end) - { - std::mt19937_64 multi_mt(generate_seed()); - for (size_t i = start; i < end; ++i) - { - for (size_t j = 0; j < vector_size; ++j) - vectors[i][j] = vector_dist(multi_mt); - } - }, - n) - .wait(); + pool.push_loop(num_vectors, loop, n); + pool.wait_for_tasks(); } else { - std::mt19937_64 single_mt(generate_seed()); for (size_t i = 0; i < num_vectors; ++i) { for (size_t j = 0; j < vector_size; ++j) - vectors[i][j] = vector_dist(single_mt); + vectors[i][j] = generate_element(i, j); } } tmr.stop(); diff --git a/CHANGELOG.md b/CHANGELOG.md index 98e743d..574e409 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,9 +10,13 @@ # `BS::thread_pool`: a fast, lightweight, and easy-to-use C++17 thread pool library -By Barak Shoshany ([baraksh@gmail.com](mailto:baraksh@gmail.com)) ([https://baraksh.com/](https://baraksh.com/)) +By Barak Shoshany
+Email: [baraksh@gmail.com](mailto:baraksh@gmail.com)
+Website: [https://baraksh.com/](https://baraksh.com/)
+GitHub: [https://github.com/bshoshany](https://github.com/bshoshany)
* [Version history](#version-history) + * [v3.2.0 (2022-07-28)](#v320-2022-07-28) * [v3.1.0 (2022-07-13)](#v310-2022-07-13) * [v3.0.0 (2022-05-30)](#v300-2022-05-30) * [v2.0.0 (2021-08-14)](#v200-2021-08-14) @@ -29,6 +33,32 @@ By Barak Shoshany ([baraksh@gmail.com](mailto:baraksh@gmail.com)) ([https://bara ## Version history +### v3.2.0 (2022-07-28) + +* `BS_thread_pool.hpp`: + * Main `BS::thread_pool` class: + * Added a new member function, `push_loop()`, which does the same thing as `parallelize_loop()`, except that it does not return a `BS::multi_future` with the futures for each block. Just like `push_task()` vs. `submit()`, this avoids the overhead of creating the futures, but the user must use `wait_for_tasks()` or some other method to ensure that the loop finishes executing, otherwise bad things will happen. + * `push_task()` and `submit()` now utilize perfect forwarding in order to support more types of tasks - in particular member functions, which in previous versions could not be submitted unless wrapped in a lambda. To submit a member function, use the syntax `submit(&class::function, &object, args)`. More information can be found in `README.md`. See [#9](https://github.com/bshoshany/thread-pool/issues/9). + * `push_loop()` and `parallelize_loop()` now have overloads where the first argument (the first index in the loop) is omitted, in which case it is assumed to be 0. This is for convenience, as the case where the first index is 0 is very common. + * Helper classes: + * `BS::synced_stream` now utilizes perfect forwarding in the member functions `print()` and `println()`. + * Previously, it was impossible to pass the flushing manipulators `std::endl` and `std::flush` to `print()` and `println()`, since the compiler could not figure out which template specializations to use. The new objects `BS::endl` and `BS::flush` are explicit casts of these manipulators, whose sole purpose is to enable passing them to `print()` and `println()`. + * `BS::multi_future::get()` now rethrows exceptions generated by the futures, even if the futures return `void`. See [#62](https://github.com/bshoshany/thread-pool/pull/62). + * Added a new helper class, `BS::blocks`, which is used by `parallelize_loop()` and `push_loop()` to divide a range into blocks. This class is not documented in `README.md`, as it most likely will not be of interest to most users, but it is still publicly available, in case you want to parallelize something manually but still benefit from the built-in algorithm for splitting a range into blocks. +* `BS_thread_pool_test.cpp`: + * Added plenty of new tests for the new features described above. + * Fixed a bug in `count_unique_threads()` that caused it to get stuck on certain systems. + * `dual_println()` now also flushes the stream using `BS::endl`, so that if the test gets stuck, the log file will still contain everything up to that point. (Note: It is a common misconception that `std::endl` and `'\n'` are interchangeable. `std::endl` not only prints a newline character, it also flushes the stream, which is not always desirable, as it may reduce performance.) + * The performance test has been modified as follows: + * Instead of generating random vectors using `std::mersenne_twister_engine`, which proved to be inconsistent across different compilers and systems, the test now generates each element via an arbitrarily-chosen numerical operation. In my testing, this provided much more consistent results. + * Instead of using a hard-coded vector size, a suitable vector size is now determined dynamically at runtime. + * Instead of using `parallelize_loop()`, the test now uses the new `push_loop()` function to squeeze out a bit more performance. + * Instead of setting the test parameters to achieve a fixed single-threaded mean execution time of 300 ms, the test now aims to achieve a fixed multi-threaded mean execution time of 50 ms when the number of blocks is equal to the number of threads. This allows for more reliable results on very fast CPUs with a very large number of threads, where the mean execution time when using all the threads could previously be below a statistically significant value. + * The number of vectors is now restricted to be a multiple of the number of threads, so that the blocks are always all of the same size. +* `README.md`: + * Added instructions and examples for the new features described above. + * Rewrote the documentation for `parallelize_loop()` to make it clearer. + ### v3.1.0 (2022-07-13) * `BS_thread_pool.hpp`: diff --git a/README.md b/README.md index 2f6acfa..17fff67 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ Email: [baraksh@gmail.com](mailto:baraksh@gmail.com)
Website: [https://baraksh.com/](https://baraksh.com/)
GitHub: [https://github.com/bshoshany](https://github.com/bshoshany)
-This is the complete documentation for v3.1.0 of the library, released on 2022-07-13. +This is the complete documentation for v3.2.0 of the library, released on 2022-07-28. * [Introduction](#introduction) * [Motivation](#motivation) @@ -32,8 +32,10 @@ This is the complete documentation for v3.1.0 of the library, released on 2022-0 * [Submitting tasks to the queue with futures](#submitting-tasks-to-the-queue-with-futures) * [Submitting tasks to the queue without futures](#submitting-tasks-to-the-queue-without-futures) * [Manually waiting for all tasks to complete](#manually-waiting-for-all-tasks-to-complete) + * [Submitting class member functions to the queue](#submitting-class-member-functions-to-the-queue) * [Parallelizing loops](#parallelizing-loops) * [Loops with return values](#loops-with-return-values) + * [Parallelizing loops without futures](#parallelizing-loops-without-futures) * [Helper classes](#helper-classes) * [Handling multiple futures at once](#handling-multiple-futures-at-once) * [Synchronizing printing to an output stream](#synchronizing-printing-to-an-output-stream) @@ -76,7 +78,7 @@ Other, more advanced multithreading libraries may offer more features and/or hig * Reusing threads avoids the overhead of creating and destroying them for individual tasks. * A task queue ensures that there are never more threads running in parallel than allowed by the hardware. * **Lightweight:** - * Only ~190 lines of code, excluding comments, blank lines, and the two optional helper classes. + * Only ~190 lines of code, excluding comments, blank lines, and the optional helper classes. * Single header file: simply `#include "BS_thread_pool.hpp"` and you're all set! * Header-only: no need to install or build the library. * Self-contained: no external requirements or dependencies. @@ -97,6 +99,7 @@ Other, more advanced multithreading libraries may offer more features and/or hig * Monitor the number of queued and/or running tasks using the `get_tasks_queued()`, `get_tasks_running()`, and `get_tasks_total()` member functions. * Freely pause and resume the pool by modifying the `paused` member variable. When paused, threads do not retrieve new tasks out of the queue. * Catch exceptions thrown by the submitted tasks. + * Submit class member functions to the pool, either applied to a specific object or from within the object itself. * Under continuous and active development. Bug reports and feature requests are welcome, and should be made via [GitHub issues](https://github.com/bshoshany/thread-pool/issues). ### Compiling and compatibility @@ -104,12 +107,12 @@ Other, more advanced multithreading libraries may offer more features and/or hig This library should successfully compile on any C++17 standard-compliant compiler, on all operating systems and architectures for which such a compiler is available. Compatibility was verified with a 12-core / 24-thread AMD Ryzen 9 3900X CPU using the following compilers and platforms: * Windows 11 build 22000.795: - * [GCC](https://gcc.gnu.org/) v12.1.0 ([WinLibs build](https://winlibs.com/)) * [Clang](https://clang.llvm.org/) v14.0.6 + * [GCC](https://gcc.gnu.org/) v12.1.0 ([WinLibs build](https://winlibs.com/)) * [MSVC](https://docs.microsoft.com/en-us/cpp/) v19.32.31332 * Ubuntu 22.04 LTS: - * [GCC](https://gcc.gnu.org/) v12.0.1 * [Clang](https://clang.llvm.org/) v14.0.0 + * [GCC](https://gcc.gnu.org/) v12.0.1 In addition, this library was tested on a [Compute Canada](https://www.computecanada.ca/) node equipped with two 20-core / 40-thread Intel Xeon Gold 6148 CPUs (for a total of 40 cores and 80 threads), running CentOS Linux 7.9.2009, using [GCC](https://gcc.gnu.org/) v12.1.1. @@ -117,20 +120,20 @@ The test program `BS_thread_pool_test.cpp` was compiled without warnings (with t As this library requires C++17 features, the code must be compiled with C++17 support: -* For GCC or Clang, use the `-std=c++17` flag. On Linux, you will also need to use the `-pthread` flag to enable the POSIX threads library. -* For Intel, use `-std=c++17` on Linux or `/Qstd:c++17` on Windows. +* For Clang or GCC, use the `-std=c++17` flag. On Linux, you will also need to use the `-pthread` flag to enable the POSIX threads library. * For MSVC, use `/std:c++17`, and preferably also `/permissive-` to ensure standards conformance. For maximum performance, it is recommended to compile with all available compiler optimizations: -* For GCC or Clang, use the `-O3` flag. -* For Intel, use `-O3` on Linux or `/O3` on Windows. +* For Clang or GCC, use the `-O3` flag. * For MSVC, use `/O2`. As an example, to compile the test program `BS_thread_pool_test.cpp` with warnings and optimizations, it is recommended to use the following commands: -* On Windows with MSVC: `cl BS_thread_pool_test.cpp /std:c++17 /permissive- /O2 /W4 /EHsc /Fe:BS_thread_pool_test.exe` * On Linux with GCC: `g++ BS_thread_pool_test.cpp -std=c++17 -O3 -Wall -Wextra -Wconversion -Wsign-conversion -Wpedantic -Weffc++ -Wshadow -pthread -o BS_thread_pool_test` +* On Linux with Clang: replace `g++` with `clang++`. +* On Windows with GCC or Clang: replace `-o BS_thread_pool_test` with `-o BS_thread_pool_test.exe` and remove `-pthread`. +* On Windows with MSVC: `cl BS_thread_pool_test.cpp /std:c++17 /permissive- /O2 /W4 /EHsc /Fe:BS_thread_pool_test.exe` ### Installing using vcpkg @@ -330,6 +333,8 @@ pool.push_task(task, arg); pool.push_task(task, arg1, arg2); ``` +**Warning!** Since `push_task()` does not return a future, there is no built-in way for the user to know when the task finishes executing. You must use either `wait_for_tasks()` (see below), or some other method such as condition variables, to ensure that the task finishes executing before trying to use anything that depends on its output. Otherwise, bad things will happen! + ### Manually waiting for all tasks to complete To wait for a **single** submitted task to complete, use `submit()` and then use the `wait()` or `get()` member functions of the obtained future. However, in cases where you need to wait until **all** submitted tasks finish their execution, or if the tasks have been submitted without futures using `push_task()`, you can use the member function `wait_for_tasks()`. @@ -364,9 +369,109 @@ after the `for` loop will ensure - as efficiently as possible - that all tasks h Note, however, that `wait_for_tasks()` will wait for **all** the tasks in the queue, including those that are unrelated to the `for` loop. Using [`parallelize_loop()`](#parallelizing-loops) would make much more sense in this particular case, as it will allow waiting only for the tasks related to the loop. +### Submitting class member functions to the queue + +Consider the following program: + +```cpp +#include "BS_thread_pool.hpp" + +BS::thread_pool pool; + +class flag_class +{ +public: + bool get_flag() const + { + return flag; + } + + void set_flag(const bool arg) + { + flag = arg; + } + +private: + bool flag = false; +}; + +int main() +{ + flag_class flag_object; + flag_object.set_flag(true); + std::cout << std::boolalpha << flag_object.get_flag() << '\n'; +} +``` + +This program creates a new object `flag_object` of the class `flag_class`, sets the flag to `true` using the member function `set_flag()`, and then prints out the flag's value. But what if you want to submit the member function `set_flag()` as a task to the thread pool? + +To submit member functions to the pool, use the following general syntax: + +```cpp +pool.push_task(&class::function, &object, args); +``` + +The same syntax also works with `submit()`. Note that, in the second argument, you must specify the object on which the member function will be executed (unless it's a static member function, in which case you just submit it like any other function). Also note that both the first and second arguments must be **pointers**, so they must be preceded by the `&` operator. + +If you remove the `&` from the first argument, the code won't work with most compilers, and if you remove the `&` from the second argument, the function will act on a **copy** of the object, rather than on the object itself, so any changes made to the object will not be saved. Therefore, it's important to ensure that both arguments are pointers. + +To make the above program submit the member function `set_flag()` to the thread pool, simply replace the line: + +```cpp +flag_object.set_flag(true); +``` + +with: + +```cpp +pool.push_task(&flag_class::set_flag, &flag_object, true); +pool.wait_for_tasks(); +``` + +Here the class is `flag_class`, the name of the function is `set_flag`, the object we want the function to act on is `flag_object`, and the argument to pass to the function is `true`. + +Another thing you might want to do is call a member function from within the object itself, that is, from another member function. This follows a similar syntax, except that you don't need to specify the class, and you use `this` to get a pointer to the current object (no `&` necessary, since `this` is already a pointer). Here is an example, this time using `submit()`: + +```cpp +#include "BS_thread_pool.hpp" + +BS::thread_pool pool; + +class flag_class +{ +public: + bool get_flag() const + { + return flag; + } + + void set_flag(const bool arg) + { + flag = arg; + } + + void set_flag_to_true() + { + pool.submit(&flag_class::set_flag, this, true).wait(); + } + +private: + bool flag = false; +}; + +int main() +{ + flag_class flag_object; + flag_object.set_flag_to_true(); + std::cout << std::boolalpha << flag_object.get_flag() << '\n'; +} +``` + ### Parallelizing loops -Consider the following loop: +One of the most common and effective methods of parallelization is splitting a loop into smaller loops and running them in parallel. It is most effective in "embarrassingly parallel" computations, such as vector or matrix operations, where each iteration of the loop is completely independent of every other iteration. For example, if we are summing up two vectors of 1000 elements each, and we have 10 threads, we could split the summation into 10 blocks of 100 elements each, and run all the blocks in parallel, potentially increasing performance by up to a factor of 10. + +`BS::thread_pool` can automatically parallelize loops. To see how this works, consider the following generic loop: ```cpp for (T i = start; i < end; ++i) @@ -379,7 +484,25 @@ where: * The loop is over the range `[start, end)`, i.e. inclusive of `start` but exclusive of `end`. * `do_something()` is an operation performed for each loop index `i`, such as modifying an array with `end - start` elements. -This loop may be automatically parallelized and submitted to the thread pool's queue using the member function `parallelize_loop()` as follows: +This loop may be automatically parallelized and submitted to the thread pool's queue using the member function `parallelize_loop()`, which has the follows syntax: + +```cpp +pool.parallelize_loop(start, end, loop, num_blocks); +``` + +where: + +* `start` is the first index in the range. + * This argument can be omitted, in which case it is assumed that the loop starts at 0. That is, `parallelize_loop(end, loop, num_blocks)` is equivalent to `parallelize_loop(0, end, loop, num_blocks)`. +* `end` is the index after the last index in the range, such that the full range is `[start, end)`. In other words, the loop will be equivalent to the one above if `start` and `end` are the same. + * `start` and `end` should both be integers, but they need not be of the same integer type. `parallelize_loop()` will automatically determine the best type to use for the loop indices. +* `loop()` is any function that takes two indices, `a`, and `b`, and executes only the portion of the loop in the range `[a, b)`. Typically, `loop()` will include a `for` loop of the form `for (T i = a; i < b; ++i)`. +* `num_blocks` is the number of blocks of the form `[a, b)` to split the loop into. For example, if the range is `[0, 9)` and there are 3 blocks, then the blocks will be the ranges `[0, 3)`, `[3, 6)`, and `[6, 9)`. If possible, the blocks will be equal in size; otherwise, the last block may be a bit longer. + * This argument can be omitted, in which case the number of blocks will be the number of threads in the pool. + +Each block will be submitted to the thread pool's queue as a separate task. Therefore, a loop that is split into 3 blocks will be split into 3 individual tasks, which may run in parallel. If there is only one block, then the entire loop will run as one task, and no parallelization will take place. + +To parallelize the generic loop above, we use the following code: ```cpp auto loop = [](const T a, const T b) @@ -387,23 +510,33 @@ auto loop = [](const T a, const T b) for (T i = a; i < b; ++i) do_something(i); }; -BS::multi_future loop_future = pool.parallelize_loop(start, end, loop, n); +BS::multi_future loop_future = pool.parallelize_loop(start, end, loop, num_blocks); loop_future.wait(); ``` -Here's how this works: +Here we defined `loop()` as a lambda function. Of course, `loop()` could also be defined as a lambda within the call to `parallelize_loop()` itself, as in the examples below; or it could be any ordinary function, but a lambda is preferred since one typically would like to capture some of the surrounding variables. + +`parallelize_loop()` returns an object of the helper class template `BS::multi_future`. Each of the `num_blocks` blocks will have an `std::future` assigned to it, and all these futures will be stored inside the returned `BS::multi_future` object. When `loop_future.wait()` is called, the main thread will wait until all tasks generated by `parallelize_loop()` finish executing, and only those tasks - not any other tasks that also happen to be in the queue. This is essentially the role of the `BS::multi_future` class: to wait for a specific **group of tasks**, in this case the tasks running the loop blocks. + +What value should you use for `num_blocks`? Omitting this argument, so that the number of blocks will be equal to the number of threads in the pool, is typically a good choice. For best performance, it is recommended to do your own benchmarks to find the optimal number of blocks for each loop (you can use the `BS::timer` helper class - see [below](#measuring-execution-time)). Using less tasks than there are threads may be preferred if you are also running other tasks in parallel. Using more tasks than there are threads may improve performance in some cases. + +As a simple example, the following code calculates and prints the squares of all integers from 0 to 99: -* The lambda function `loop()` takes two indices, `a`, and `b`, and executes only the portion of the loop in the range `[a, b)`. - * Note that this lambda was defined here separately for clarity. In practice, the lambda function will usually be defined within the call to `parallelize_loop()` itself, as in the examples below. - * `loop()` can also be an ordinary function (with or without a return value) instead of a lambda function, but that may be less useful, since typically one would like to capture some of the surrounding variables. -* When `parallelize_loop(start, end, loop, n)` is called, it will divide the range of indices `[start, end)` into `n` blocks of the form `[a, b)`. For example, if the range is `[0, 9)` and there are 3 blocks, then the blocks will be the ranges `[0, 3)`, `[3, 6)`, and `[6, 9)`. If possible, the blocks will be equal in size, otherwise the last block may be a bit longer. -* Then, a task will be submitted for each block, consisting of the function `loop()` with its two arguments being the start and end of the range `[a, b)` of each block. -* Each task will have an `std::future` assigned to it, and all these futures will be stored inside an object `loop_future` of the helper class template `BS::multi_future`. -* When `loop_future.wait()` is called, the main thread will wait until all tasks generated by `parallelize_loop()` finish executing, and only those tasks - not any other tasks that also happen to be in the queue. This is essentially the role of the `BS::multi_future` class: to wait for a specific group of tasks, in this case the tasks running the loop blocks. +```cpp +#include "BS_thread_pool.hpp" -If the fourth argument `n` is not specified, the number of blocks will be equal to the number of threads in the pool. For best performance, it is recommended to do your own benchmarks to find the optimal number of blocks for each loop (you can use the `BS::timer` helper class - see [below](#measuring-execution-time)). Using less tasks than there are threads may be preferred if you are also running other tasks in parallel. Using more tasks than there are threads may improve performance in some cases. +int main() +{ + int squares[100]; + for (int i = 0; i < 100; ++i) + { + squares[i] = i * i; + std::cout << i << "^2 = " << squares[i] << " "; + } +} +``` -As a simple example, the following code will calculate the squares of all integers from 0 to 99. Since there are 10 threads, and we did not specify a fourth argument, the loop will be divided into 10 blocks, each calculating 10 squares: +We can parallelize it as follows: ```cpp #include "BS_thread_pool.hpp" @@ -412,22 +545,30 @@ int main() { BS::thread_pool pool(10); int squares[100]; - pool.parallelize_loop(0, 100, + pool.parallelize_loop(100, [&squares](const int a, const int b) { for (int i = a; i < b; ++i) squares[i] = i * i; }) .wait(); - std::cout << squares[50]; + for (int i = 0; i < 100; ++i) + std::cout << i << "^2 = " << squares[i] << " "; } ``` -Note that here, for simplicity, instead of creating a `BS::multi_future` and then using it to wait, we simply called the `wait()` member function directly on the temporary object returned by `parallelize_loop()`. This is a convenient shortcut when we have nothing else to do while waiting. +Since there are 10 threads, and we omitted the `num_blocks` argument, the loop will be divided into 10 blocks, each calculating 10 squares. Also, since the loop starts from 0, we did not need to specify the first index. + +In this example, instead of storing the `BS::multi_future` object and then using it to wait, we simply called the `wait()` member function directly on the temporary object returned by `parallelize_loop()` without storing it anywhere. This is a convenient shortcut when we have nothing else to do while waiting. + +Notice that here we parallelized the calculation of the squares, but we did not parallelize printing the results. This is for two reasons: + +1. We want to print out the squares in ascending order, and we have no guarantee that the blocks will be executed in the correct order. This is very important; you must never expect that the parallelized loop will execute at the same order as the non-parallelized loop. +2. If we did print out the squares from within the parallel tasks, we would get a huge mess, since all 10 blocks would print to the standard output at once. [Later](#synchronizing-printing-to-an-output-stream) we will see how to synchronize printing to a stream from multiple tasks at the same time. ### Loops with return values -Usually, `parallelize_loop()` should take functions with no return values. This is because the function will be executed once for each block, but the blocks are managed by the thread pool, so there's limited usability in returning one value per block. However, for the case where this is desired, such as for summation or some sorting algorithms, `parallelize_loop()` does accept functions with return values, in which case it returns a `BS::multi_future` object where `T` is the return value. +Usually, `parallelize_loop()` should take functions with no return values. This is because the function will be executed once for each block, but the blocks are managed by the thread pool, so there's limited usability in returning one value per block. However, for the case where this is desired, such as for summation or some sorting algorithms, `parallelize_loop()` does accept functions with return values, in which case it returns a `BS::multi_future` object where `T` is the type of the return values. Here's an example of summing all the numbers from 1 to 100: @@ -453,7 +594,36 @@ int main() } ``` -Note that calling `get()` on a `BS::multi_future` object returns an `std::vector` with the values obtained from each future. In this case, the values will be the partial sums from each block, so when we add them up, we will get the total sum. +Calling `get()` on a `BS::multi_future` object returns an `std::vector` with the values obtained from each future. In this case, the values will be the partial sums from each block, so when we add them up, we will get the total sum. + +### Parallelizing loops without futures + +Just as in the case of [`push_task()`](#submitting-tasks-to-the-queue-without-futures) vs. [`submit()`](#submitting-tasks-to-the-queue-with-futures), sometimes you may want to parallelize a loop, but you don't need it to return a `BS::multi_future`. In this case, you can save the overhead of generating the futures (which can be significant, depending on the number of blocks) by using `push_loop()` instead of `parallelize_loop()`, with the same arguments. + +For example, you could also run the loop of squares example above as follows: + +```cpp +#include "BS_thread_pool.hpp" + +int main() +{ + BS::thread_pool pool(10); + int squares[100]; + pool.push_loop(100, + [&squares](const int a, const int b) + { + for (int i = a; i < b; ++i) + squares[i] = i * i; + }); + pool.wait_for_tasks(); + for (int i = 0; i < 100; ++i) + std::cout << i << "^2 = " << squares[i] << " "; +} +``` + +As with `parallelize_loop()`, the first argument can be omitted if the start index is 0, and the last argument can be omitted if the number of blocks should be equal to the number of threads. + +**Warning!** Since `push_loop()` does not return a `BS::multi_future`, there is no built-in way for the user to know when the loop finishes executing. You must use either [`wait_for_tasks()`](#manually-waiting-for-all-tasks-to-complete), or some other method such as condition variables, to ensure that the loop finishes executing before trying to use anything that depends on its output. Otherwise, bad things will happen! ## Helper classes @@ -801,6 +971,9 @@ The first `wait_for_tasks()`, which was called with `paused == false`, waited fo ```cpp #include "BS_thread_pool.hpp" +BS::synced_stream sync_out; +BS::thread_pool pool; + double inverse(const double x) { if (x == 0) @@ -811,16 +984,16 @@ double inverse(const double x) int main() { - BS::thread_pool pool; - auto my_future = pool.submit(inverse, 0); + constexpr double num = 0; + std::future my_future = pool.submit(inverse, num); try { const double result = my_future.get(); - std::cout << "The result is: " << result << '\n'; + sync_out.println("The inverse of ", num, " is ", result, "."); } catch (const std::exception& e) { - std::cout << "Caught exception: " << e.what() << '\n'; + sync_out.println("Caught exception: ", e.what()); } } ``` @@ -831,6 +1004,41 @@ The output will be: Caught exception: Division by zero! ``` +However, if you change `num` to any non-zero number, no exceptions will be thrown and the inverse will be printed. + +It is important to note that `wait()` does not throw any exceptions; only `get()` does. Therefore, even if your task does not return anything, i.e. your future is an `std::future`, you must still use `get()` on the future obtained from it if you want to catch exceptions thrown by it. Here is an example: + +```cpp +#include "BS_thread_pool.hpp" + +BS::synced_stream sync_out; +BS::thread_pool pool; + +void print_inverse(const double x) +{ + if (x == 0) + throw std::runtime_error("Division by zero!"); + else + sync_out.println("The inverse of ", x, " is ", 1 / x, "."); +} + +int main() +{ + constexpr double num = 0; + std::future my_future = pool.submit(print_inverse, num); + try + { + my_future.get(); + } + catch (const std::exception& e) + { + sync_out.println("Caught exception: ", e.what()); + } +} +``` + +When using `BS::multi_future` to handle multiple futures at once, exception handling works the same way: if any of the futures may throw exceptions, you may catch these exceptions when calling `get()`, even in the case of `BS::multi_future`. + ## Testing the package The included file `BS_thread_pool_test.cpp` will perform automated tests of all aspects of the package, and perform simple benchmarks. The output will be printed both to `std::cout` and to a file named `BS_thread_pool_test-yyyy-mm-dd_hh.mm.ss.log` based on the current date and time. In addition, the code is thoroughly documented, and is meant to serve as an extensive example of how to properly use the package. @@ -851,9 +1059,9 @@ BS::thread_pool: a fast, lightweight, and easy-to-use C++17 thread pool library (c) 2022 Barak Shoshany (baraksh@gmail.com) (http://baraksh.com) GitHub: https://github.com/bshoshany/thread-pool -Thread pool library version is v3.1.0 (2022-07-13). +Thread pool library version is v3.2.0 (2022-07-28). Hardware concurrency is 24. -Generating log file: BS_thread_pool_test-2022-07-13_19.39.37.log. +Generating log file: BS_thread_pool_test-2022-07-28_15.29.31.log. Important: Please do not run any other applications, especially multithreaded applications, in parallel with this test! @@ -903,55 +1111,119 @@ Checking that submit() works for a function with one argument and a return value Checking that submit() works for a function with two arguments and a return value... -> PASSED! +================================================ +Checking that submitting member functions works: +================================================ +Checking that push_task() works for a member function with no arguments or return value... +-> PASSED! +Checking that push_task() works for a member function with one argument and no return value... +-> PASSED! +Checking that submit() works for a member function with no arguments or return value... +-> PASSED! +Checking that submit() works for a member function with one argument and no return value... +-> PASSED! +Checking that submit() works for a member function with no arguments and a return value... +-> PASSED! +Checking that submit() works for a member function with one argument and a return value... +-> PASSED! + +====================================================================== +Checking that submitting member functions from within an object works: +====================================================================== +Checking that push_task() works within an object for a member function with no arguments or return value... +-> PASSED! +Checking that push_task() works within an object for a member function with one argument and no return value... +-> PASSED! +Checking that submit() works within an object for a member function with no arguments or return value... +-> PASSED! +Checking that submit() works within an object for a member function with one argument and no return value... +-> PASSED! +Checking that submit() works within an object for a member function with no arguments and a return value... +-> PASSED! +Checking that submit() works within an object for a member function with one argument and a return value... +-> PASSED! + ======================================= Checking that wait_for_tasks() works... ======================================= Waiting for tasks... -> PASSED! -======================================= -Checking that parallelize_loop() works: -======================================= -Verifying that a loop from 839486 to 578526 with 7 tasks modifies all indices... +====================================================== +Checking that push_loop() and parallelize_loop() work: +====================================================== +Verifying that push_loop() from 390892 to 541943 with 20 tasks modifies all indices... +-> PASSED! +Verifying that push_loop() from -233617 to 52646 with 22 tasks modifies all indices... +-> PASSED! +Verifying that push_loop() from 409845 to 410887 with 23 tasks modifies all indices... +-> PASSED! +Verifying that push_loop() from 977764 to 726111 with 12 tasks modifies all indices... +-> PASSED! +Verifying that push_loop() from -940107 to -882673 with 18 tasks modifies all indices... +-> PASSED! +Verifying that push_loop() from -613072 to 675872 with 10 tasks modifies all indices... +-> PASSED! +Verifying that push_loop() from 998082 to -267173 with 3 tasks modifies all indices... +-> PASSED! +Verifying that push_loop() from -779960 to 518984 with 4 tasks modifies all indices... +-> PASSED! +Verifying that push_loop() from -52424 to 2576 with 3 tasks modifies all indices... +-> PASSED! +Verifying that push_loop() from 131005 to -557044 with 10 tasks modifies all indices... +-> PASSED! +Verifying that parallelize_loop() from -119788 to -727738 with 14 tasks modifies all indices... +-> PASSED! +Verifying that parallelize_loop() from -911425 to -923429 with 8 tasks modifies all indices... +-> PASSED! +Verifying that parallelize_loop() from 49601 to -772605 with 16 tasks modifies all indices... -> PASSED! -Verifying that a loop from 913636 to 945504 with 22 tasks modifies all indices... +Verifying that parallelize_loop() from -322809 to -66366 with 15 tasks modifies all indices... -> PASSED! -Verifying that a loop from 963330 to 578092 with 14 tasks modifies all indices... +Verifying that parallelize_loop() from 880099 to -150434 with 22 tasks modifies all indices... -> PASSED! -Verifying that a loop from -520582 to -565322 with 9 tasks modifies all indices... +Verifying that parallelize_loop() from 984341 to 69159 with 11 tasks modifies all indices... -> PASSED! -Verifying that a loop from 374827 to 785635 with 12 tasks modifies all indices... +Verifying that parallelize_loop() from -207913 to 829987 with 9 tasks modifies all indices... -> PASSED! -Verifying that a loop from 453412 to 368063 with 7 tasks modifies all indices... +Verifying that parallelize_loop() from 297749 to -332031 with 2 tasks modifies all indices... -> PASSED! -Verifying that a loop from 557509 to -437850 with 18 tasks modifies all indices... +Verifying that parallelize_loop() from 517539 to 811728 with 8 tasks modifies all indices... -> PASSED! -Verifying that a loop from -11443 to 743510 with 2 tasks modifies all indices... +Verifying that parallelize_loop() from 311585 to -81170 with 21 tasks modifies all indices... -> PASSED! -Verifying that a loop from -724659 to -27497 with 20 tasks modifies all indices... +Verifying that parallelize_loop() from 670932 to 740527 with 3 tasks correctly sums all indices... +Expected: 98230419510, obtained: 98230419510 -> PASSED! +Verifying that parallelize_loop() from 645122 to -546036 with 11 tasks correctly sums all indices... +Expected: 118025890430, obtained: 118025890430 -> PASSED! +Verifying that parallelize_loop() from 251002 to -903037 with 6 tasks correctly sums all indices... +Expected: -752474973404, obtained: -752474973404 -> PASSED! +Verifying that parallelize_loop() from -222340 to -791805 with 15 tasks correctly sums all indices... +Expected: -577520651890, obtained: -577520651890 -> PASSED! +Verifying that parallelize_loop() from 937604 to 819765 with 3 tasks correctly sums all indices... +Expected: 207086487752, obtained: 207086487752 -> PASSED! +Verifying that parallelize_loop() from -53900 to -339294 with 24 tasks correctly sums all indices... +Expected: -112215493830, obtained: -112215493830 -> PASSED! +Verifying that parallelize_loop() from 65429 to 903556 with 22 tasks correctly sums all indices... +Expected: 812131652968, obtained: 812131652968 -> PASSED! +Verifying that parallelize_loop() from 519293 to -332413 with 19 tasks correctly sums all indices... +Expected: 159165965574, obtained: 159165965574 -> PASSED! +Verifying that parallelize_loop() from 165831 to 47482 with 18 tasks correctly sums all indices... +Expected: 25245261888, obtained: 25245261888 -> PASSED! +Verifying that parallelize_loop() from -534648 to 475350 with 13 tasks correctly sums all indices... +Expected: -59891871402, obtained: -59891871402 -> PASSED! +Verifying that parallelize_loop() with identical start and end indices does nothing... -> PASSED! -Verifying that a loop from -790047 to -124851 with 18 tasks modifies all indices... +Trying parallelize_loop() with start and end indices of different types: +Verifying that parallelize_loop() from -962605 to 21974 with 17 tasks modifies all indices... -> PASSED! -Verifying that a loop from 800644 to 589779 with 24 tasks correctly sums all indices... -Expected: 293191335030, obtained: 293191335030 -> PASSED! -Verifying that a loop from 74965 to 444791 with 23 tasks correctly sums all indices... -Expected: 192218912630, obtained: 192218912630 -> PASSED! -Verifying that a loop from 480099 to -851639 with 22 tasks correctly sums all indices... -Expected: -494795268258, obtained: -494795268258 -> PASSED! -Verifying that a loop from 213814 to 751751 with 15 tasks correctly sums all indices... -Expected: 519412601468, obtained: 519412601468 -> PASSED! -Verifying that a loop from 839817 to 51524 with 8 tasks correctly sums all indices... -Expected: 702637082620, obtained: 702637082620 -> PASSED! -Verifying that a loop from -736549 to -495131 with 21 tasks correctly sums all indices... -Expected: -297349963658, obtained: -297349963658 -> PASSED! -Verifying that a loop from -581483 to 982787 with 20 tasks correctly sums all indices... -Expected: 627746243810, obtained: 627746243810 -> PASSED! -Verifying that a loop from -991418 to -777388 with 21 tasks correctly sums all indices... -Expected: -378577762210, obtained: -378577762210 -> PASSED! -Verifying that a loop from -917643 to -429950 with 1 task correctly sums all indices... -Expected: -657212160642, obtained: -657212160642 -> PASSED! -Verifying that a loop from -473648 to -218320 with 18 tasks correctly sums all indices... -Expected: -176679060832, obtained: -176679060832 -> PASSED! +Trying the overloads for push_loop() and parallelize_loop() for the case where the first index is equal to 0: +Verifying that push_loop() from 0 to 482251 with 7 tasks modifies all indices... +-> PASSED! +Verifying that parallelize_loop() from 0 to 151431 with 1 task modifies all indices... +-> PASSED! +Verifying that parallelize_loop() from 0 to 806998 with 4 tasks correctly sums all indices... +Expected: 651244965006, obtained: 651244965006 -> PASSED! ==================================== Checking that task monitoring works: @@ -960,21 +1232,21 @@ Resetting pool to 4 threads. Submitting 12 tasks. After submission, should have: 12 tasks total, 4 tasks running, 8 tasks queued... Result: 12 tasks total, 4 tasks running, 8 tasks queued -> PASSED! -Task 0 released. -Task 2 released. Task 1 released. +Task 2 released. Task 3 released. +Task 0 released. After releasing 4 tasks, should have: 8 tasks total, 4 tasks running, 4 tasks queued... Result: 8 tasks total, 4 tasks running, 4 tasks queued -> PASSED! Task 5 released. +Task 4 released. Task 7 released. Task 6 released. -Task 4 released. After releasing 4 more tasks, should have: 4 tasks total, 4 tasks running, 0 tasks queued... Result: 4 tasks total, 4 tasks running, 0 tasks queued -> PASSED! -Task 10 released. Task 9 released. Task 11 released. +Task 10 released. Task 8 released. After releasing the final 4 tasks, should have: 0 tasks total, 0 tasks running, 0 tasks queued... Result: 0 tasks total, 0 tasks running, 0 tasks queued -> PASSED! @@ -991,26 +1263,26 @@ Result: 12 tasks total, 0 tasks running, 12 tasks queued -> PASSED! 300ms later, should still have: 12 tasks total, 0 tasks running, 12 tasks queued... Result: 12 tasks total, 0 tasks running, 12 tasks queued -> PASSED! Unpausing pool. -Task 1 done. Task 3 done. -Task 0 done. +Task 1 done. Task 2 done. +Task 0 done. 300ms later, should have: 8 tasks total, 4 tasks running, 4 tasks queued... Result: 8 tasks total, 4 tasks running, 4 tasks queued -> PASSED! Pausing pool and using wait_for_tasks() to wait for the running tasks. -Task 5 done. +Task 4 done. Task 6 done. Task 7 done. -Task 4 done. +Task 5 done. After waiting, should have: 4 tasks total, 0 tasks running, 4 tasks queued... Result: 4 tasks total, 0 tasks running, 4 tasks queued -> PASSED! 200ms later, should still have: 4 tasks total, 0 tasks running, 4 tasks queued... Result: 4 tasks total, 0 tasks running, 4 tasks queued -> PASSED! Unpausing pool and using wait_for_tasks() to wait for all tasks. Task 9 done. -Task 11 done. Task 8 done. Task 10 done. +Task 11 done. After waiting, should have: 0 tasks total, 0 tasks running, 0 tasks queued... Result: 0 tasks total, 0 tasks running, 0 tasks queued -> PASSED! Resetting pool to 24 threads. @@ -1018,35 +1290,40 @@ Resetting pool to 24 threads. ======================================= Checking that exception handling works: ======================================= +Checking that exceptions are forwarded correctly by submit()... +Throwing exception... +-> PASSED! +Checking that exceptions are forwarded correctly by BS::multi_future... +Throwing exception... Throwing exception... -> PASSED! ============================================================ Testing that vector operations produce the expected results: ============================================================ -Adding two vectors with 817976 elements using 4 tasks... +Adding two vectors with 767202 elements using 4 tasks... -> PASSED! -Adding two vectors with 525623 elements using 23 tasks... +Adding two vectors with 3575 elements using 3 tasks... -> PASSED! -Adding two vectors with 75250 elements using 7 tasks... +Adding two vectors with 392555 elements using 11 tasks... -> PASSED! -Adding two vectors with 255236 elements using 24 tasks... +Adding two vectors with 754640 elements using 16 tasks... -> PASSED! -Adding two vectors with 791117 elements using 3 tasks... +Adding two vectors with 516335 elements using 9 tasks... -> PASSED! -Adding two vectors with 568990 elements using 7 tasks... +Adding two vectors with 564723 elements using 17 tasks... -> PASSED! -Adding two vectors with 799419 elements using 23 tasks... +Adding two vectors with 558475 elements using 15 tasks... -> PASSED! -Adding two vectors with 460301 elements using 13 tasks... +Adding two vectors with 447497 elements using 21 tasks... -> PASSED! -Adding two vectors with 508493 elements using 19 tasks... +Adding two vectors with 121486 elements using 19 tasks... -> PASSED! -Adding two vectors with 196258 elements using 2 tasks... +Adding two vectors with 324254 elements using 24 tasks... -> PASSED! ++++++++++++++++++++++++++++++ -SUCCESS: Passed all 57 checks! +SUCCESS: Passed all 85 checks! ++++++++++++++++++++++++++++++ ``` @@ -1058,7 +1335,7 @@ Once the required number of vectors has been determined, the program will test t Please note that these benchmarks are only intended to demonstrate that the package can provide a significant speedup, and it is highly recommended to perform your own benchmarks with your specific system, compiler, and code. -Here we will present the results of the performance test running on a high-end desktop computer equipped with a 12-core / 24-thread AMD Ryzen 9 3900X CPU at 3.8 GHz and 32 GB of DDR4 RAM at 3600 MHz, compiled using [GCC](https://gcc.gnu.org/) v12.1.0 ([WinLibs build](https://winlibs.com/)) on Windows 11 build 22000.675 with the `-O3` compiler flag. The output was as follows: +Here we will present the results of the performance test running on a high-end desktop computer equipped with a 12-core / 24-thread AMD Ryzen 9 3900X CPU at 3.8 GHz and 32 GB of DDR4 RAM at 3600 MHz, compiled using [MSVC](https://docs.microsoft.com/en-us/cpp/) v19.32.31332 on Windows 11 build 22000.795 with the `/O2` compiler flag. The output was as follows: ```none ====================== @@ -1066,22 +1343,22 @@ Performing benchmarks: ====================== Using 24 threads. Each test will be repeated 20 times to collect reliable statistics. - -Generating 57320 random vectors with 500 elements each: -Single-threaded, mean execution time was 298.2 ms with standard deviation 1.7 ms. -With 6 tasks, mean execution time was 52.3 ms with standard deviation 1.3 ms. -With 12 tasks, mean execution time was 30.3 ms with standard deviation 0.8 ms. -With 24 tasks, mean execution time was 16.4 ms with standard deviation 1.2 ms. -With 48 tasks, mean execution time was 19.2 ms with standard deviation 2.6 ms. -With 96 tasks, mean execution time was 17.8 ms with standard deviation 1.2 ms. -Maximum speedup obtained by multithreading vs. single-threading: 18.2x, using 24 tasks. +Determining the number and size of vectors to generate in order to achieve an approximate mean execution time of 50 ms with 24 tasks... +Generating 3312 vectors with 4096 elements each: +Single-threaded, mean execution time was 542.2 ms with standard deviation 5.8 ms. +With 6 tasks, mean execution time was 95.2 ms with standard deviation 1.7 ms. +With 12 tasks, mean execution time was 49.6 ms with standard deviation 0.7 ms. +With 24 tasks, mean execution time was 29.0 ms with standard deviation 2.9 ms. +With 48 tasks, mean execution time was 33.2 ms with standard deviation 4.3 ms. +With 96 tasks, mean execution time was 35.5 ms with standard deviation 1.9 ms. +Maximum speedup obtained by multithreading vs. single-threading: 18.7x, using 24 tasks. +++++++++++++++++++++++++++++++++++++++ Thread pool performance test completed! +++++++++++++++++++++++++++++++++++++++ ``` -This CPU has 12 physical cores, with each core providing two separate logical cores via hyperthreading, for a total of 24 threads. Without hyperthreading, we would expect a maximum theoretical speedup of 12x. With hyperthreading, one might naively expect to achieve up to a 24x speedup, but this is in fact impossible, as both logical cores share the same physical core's resources. However, generally we would expect [an estimated 30% additional speedup](https://software.intel.com/content/www/us/en/develop/articles/how-to-determine-the-effectiveness-of-hyper-threading-technology-with-an-application.html) from hyperthreading, which amounts to around 15.6x in this case. In our performance test, we see a speedup of 18.2x, saturating and even surpassing this estimated theoretical upper bound. +This CPU has 12 physical cores, with each core providing two separate logical cores via hyperthreading, for a total of 24 threads. Without hyperthreading, we would expect a maximum theoretical speedup of 12x. With hyperthreading, one might naively expect to achieve up to a 24x speedup, but this is in fact impossible, as both logical cores share the same physical core's resources. However, generally we would expect [an estimated 30% additional speedup](https://software.intel.com/content/www/us/en/develop/articles/how-to-determine-the-effectiveness-of-hyper-threading-technology-with-an-application.html) from hyperthreading, which amounts to around 15.6x in this case. In our performance test, we see a speedup of 18.7x, saturating and even surpassing this estimated theoretical upper bound. ## About the project