Skip to content

Commit

Permalink
Updated to v3.3.0
Browse files Browse the repository at this point in the history
  • Loading branch information
bshoshany committed Aug 3, 2022
1 parent 670e3ab commit 67fad04
Show file tree
Hide file tree
Showing 6 changed files with 1,323 additions and 166 deletions.
116 changes: 87 additions & 29 deletions BS_thread_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
/**
* @file BS_thread_pool.hpp
* @author Barak Shoshany (baraksh@gmail.com) (http://baraksh.com)
* @version 3.2.0
* @date 2022-07-28
* @version 3.3.0
* @date 2022-08-03
* @copyright Copyright (c) 2022 Barak Shoshany. Licensed under the MIT license. If you found this project useful, please consider starring it on GitHub! If you use this library in software of any kind, please provide a link to the GitHub repository https://github.com/bshoshany/thread-pool in the source code and documentation. If you use this library in published research, please cite it as follows: Barak Shoshany, "A C++17 Thread Pool for High-Performance Scientific Computing", doi:10.5281/zenodo.4742687, arXiv:2105.00613 (May 2021)
*
* @brief BS::thread_pool: a fast, lightweight, and easy-to-use C++17 thread pool library. This header file contains the entire library, including the main BS::thread_pool class and the helper classes BS::multi_future, BS::blocks, BS:synced_stream, and BS::timer.
*/

#define BS_THREAD_POOL_VERSION "v3.2.0 (2022-07-28)"
#define BS_THREAD_POOL_VERSION "v3.3.0 (2022-08-03)"

#include <atomic> // std::atomic
#include <chrono> // std::chrono
Expand All @@ -34,12 +34,6 @@ namespace BS
*/
using concurrency_t = std::invoke_result_t<decltype(std::thread::hardware_concurrency)>;

/**
* @brief Explicit casts of the flushing stream manipulators, to enable using them with synced_stream, e.g. sync_out.print(BS::flush).
*/
std::ostream& (&endl)(std::ostream&) = static_cast<std::ostream& (&)(std::ostream&)>(std::endl);
std::ostream& (&flush)(std::ostream&) = static_cast<std::ostream& (&)(std::ostream&)>(std::flush);

// ============================================================================================= //
// Begin class multi_future //

Expand All @@ -57,7 +51,7 @@ class [[nodiscard]] multi_future
*
* @param num_futures_ The desired number of futures to store.
*/
multi_future(const size_t num_futures_ = 0) : f(num_futures_) {}
multi_future(const size_t num_futures_ = 0) : futures(num_futures_) {}

/**
* @brief Get the results from all the futures stored in this multi_future object, rethrowing any stored exceptions.
Expand All @@ -68,32 +62,64 @@ class [[nodiscard]] multi_future
{
if constexpr (std::is_void_v<T>)
{
for (size_t i = 0; i < f.size(); ++i)
f[i].get();
for (size_t i = 0; i < futures.size(); ++i)
futures[i].get();
return;
}
else
{
std::vector<T> results(f.size());
for (size_t i = 0; i < f.size(); ++i)
results[i] = f[i].get();
std::vector<T> results(futures.size());
for (size_t i = 0; i < futures.size(); ++i)
results[i] = futures[i].get();
return results;
}
}

/**
* @brief Get a reference to one of the futures stored in this multi_future object.
*
* @param i The index of the desired future.
* @return The future.
*/
[[nodiscard]] std::future<T>& operator[](const size_t i)
{
return futures[i];
}

/**
* @brief Append a future to this multi_future object.
*
* @param future The future to append.
*/
void push_back(std::future<T> future)
{
futures.push_back(std::move(future));
}

/**
* @brief Get the number of futures stored in this multi_future object.
*
* @return The number of futures.
*/
[[nodiscard]] size_t size() const
{
return futures.size();
}

/**
* @brief Wait for all the futures stored in this multi_future object.
*/
void wait() const
{
for (size_t i = 0; i < f.size(); ++i)
f[i].wait();
for (size_t i = 0; i < futures.size(); ++i)
futures[i].wait();
}

private:
/**
* @brief A vector to store the futures.
*/
std::vector<std::future<T>> f;
std::vector<std::future<T>> futures;
};

// End class multi_future //
Expand Down Expand Up @@ -229,7 +255,7 @@ class [[nodiscard]] thread_pool
}

/**
* @brief Destruct the thread pool. Waits for all tasks to complete, then destroys all threads. Note that if the variable paused is set to true, then any tasks still in the queue will never be executed.
* @brief Destruct the thread pool. Waits for all tasks to complete, then destroys all threads. Note that if the pool is paused, then any tasks still in the queue will never be executed.
*/
~thread_pool()
{
Expand Down Expand Up @@ -283,6 +309,16 @@ class [[nodiscard]] thread_pool
return thread_count;
}

/**
* @brief Check whether the pool is currently paused.
*
* @return true if the pool is paused, false if it is not paused.
*/
[[nodiscard]] bool is_paused() const
{
return paused;
}

/**
* @brief Parallelize a loop by automatically splitting it into blocks and submitting each block separately to the queue. Returns a multi_future object that contains the futures for all of the blocks.
*
Expand All @@ -305,7 +341,7 @@ class [[nodiscard]] thread_pool
{
multi_future<R> mf(blks.get_num_blocks());
for (size_t i = 0; i < blks.get_num_blocks(); ++i)
mf.f[i] = submit(std::forward<F>(loop), blks.start(i), blks.end(i));
mf[i] = submit(std::forward<F>(loop), blks.start(i), blks.end(i));
return mf;
}
else
Expand All @@ -331,6 +367,14 @@ class [[nodiscard]] thread_pool
return parallelize_loop(0, index_after_last, std::forward<F>(loop), num_blocks);
}

/**
* @brief Pause the pool. The workers will temporarily stop retrieving new tasks out of the queue, although any tasks already executed will keep running until they are finished.
*/
void pause()
{
paused = true;
}

/**
* @brief Parallelize a loop by automatically splitting it into blocks and submitting each block separately to the queue. Does not return a multi_future, so the user must use wait_for_tasks() or some other method to ensure that the loop finishes executing, otherwise bad things will happen.
*
Expand Down Expand Up @@ -450,6 +494,14 @@ class [[nodiscard]] thread_pool
return task_promise->get_future();
}

/**
* @brief Unpause the pool. The workers will resume retrieving new tasks out of the queue.
*/
void unpause()
{
paused = false;
}

/**
* @brief Wait for tasks to be completed. Normally, this function waits for all tasks, both those that are currently running in the threads and those that are still waiting in the queue. However, if the pool is paused, this function only waits for the currently running tasks (otherwise it would wait forever). Note: To wait for just one specific task, use submit() instead, and call the wait() member function of the generated future.
*/
Expand All @@ -461,15 +513,6 @@ class [[nodiscard]] thread_pool
waiting = false;
}

// ===========
// Public data
// ===========

/**
* @brief An atomic variable indicating whether the workers should pause. When set to true, the workers temporarily stop retrieving new tasks out of the queue, although any tasks already executed will keep running until they are finished. Set to false again to resume retrieving tasks.
*/
std::atomic<bool> paused = false;

private:
// ========================
// Private member functions
Expand Down Expand Up @@ -547,6 +590,11 @@ class [[nodiscard]] thread_pool
// Private data
// ============

/**
* @brief An atomic variable indicating whether the workers should pause. When set to true, the workers temporarily stop retrieving new tasks out of the queue, although any tasks already executed will keep running until they are finished. When set to false again, the workers resume retrieving tasks.
*/
std::atomic<bool> paused = false;

/**
* @brief An atomic variable indicating to the workers to keep running. When set to false, the workers permanently stop working.
*/
Expand Down Expand Up @@ -637,6 +685,16 @@ class [[nodiscard]] synced_stream
print(std::forward<T>(items)..., '\n');
}

/**
* @brief A stream manipulator to pass to a synced_stream (an explicit cast of std::endl). Prints a newline character to the stream, and then flushes it. Should only be used if flushing is desired, otherwise '\n' should be used instead.
*/
inline static std::ostream& (&endl)(std::ostream&) = static_cast<std::ostream& (&)(std::ostream&)>(std::endl);

/**
* @brief A stream manipulator to pass to a synced_stream (an explicit cast of std::flush). Used to flush the stream.
*/
inline static std::ostream& (&flush)(std::ostream&) = static_cast<std::ostream& (&)(std::ostream&)>(std::flush);

private:
/**
* @brief The output stream to print to.
Expand Down
Loading

0 comments on commit 67fad04

Please sign in to comment.