Skip to content

Commit

Permalink
Don't use threads on VS2013 and earlier.
Browse files Browse the repository at this point in the history
Standard threads are rather broken in the msvc 12 runtime. So just avoid
them entirelty in that case. And just do single thread execution on
tasks.
  • Loading branch information
grafikrobot committed Aug 21, 2023
1 parent e67cb47 commit d6ad469
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 25 deletions.
3 changes: 3 additions & 0 deletions src/engine/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ typedef long int32_t;
#endif

// Indicate if we can use std::thread and friends.
#if defined(_MSC_VER) && _MSC_VER <= 1800
#define B2_USE_STD_THREADS 0
#endif
#ifndef B2_USE_STD_THREADS
#define B2_USE_STD_THREADS 1
#endif
Expand Down
61 changes: 36 additions & 25 deletions src/engine/tasks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Distributed under the Boost Software License, Version 1.0.
#include "jam.h"
#include "mod_sysinfo.h"
#include "output.h"
#include "types.h"

#include <queue>
#include <vector>
Expand All @@ -36,35 +37,37 @@ struct sync
#endif
};

#if B2_USE_STD_THREADS

inline sync::sync()
{
#if B2_USE_STD_THREADS
wait_arrived = false;
signal_arrived = false;
#endif
}

inline void sync::wait()
{
#if B2_USE_STD_THREADS
// Indicate that we waiting.
wait_arrived = true;
// Wait for the signal that we can proceed.
std::unique_lock<std::mutex> lock(arrived_mx);
arrived_cv.wait(lock, [this]() { return signal_arrived.load(); });
#endif
}

inline bool sync::signal()
{
#if B2_USE_STD_THREADS
// Wait for wait() to get called.
if (!wait_arrived.load()) return false;
// Tell the waiter that we arrived.
signal_arrived = true;
arrived_cv.notify_one();
#endif
return true;
}

#endif

/*
A group of tasks that run in parallel within a limit of parallelism. The
parallelism limit is enforced by only dequeuing calls when possible.
Expand All @@ -78,11 +81,8 @@ struct group::implementation
unsigned stat_max_running = 0;
unsigned stat_total = 0;
unsigned stat_max_pending = 0;

#if B2_USE_STD_THREADS
std::mutex mx;
mutex_t mx;
sync finished;
#endif // B2_USE_STD_THREADS

inline implementation(executor & e, unsigned p)
: exec(e)
Expand Down Expand Up @@ -111,9 +111,9 @@ struct executor::implementation
std::vector<std::shared_ptr<group>> groups;
unsigned call_count = 0;
unsigned running_count = 0;
mutex_t mx;

#if B2_USE_STD_THREADS
std::mutex mx;
std::vector<std::thread> runners;
std::condition_variable call_cv;
#endif // B2_USE_STD_THREADS
Expand Down Expand Up @@ -145,8 +145,6 @@ struct executor::implementation
void runner();
};

#if B2_USE_STD_THREADS

inline void group::implementation::call_queue(std::function<void()> f)
{
// If we don't have parallel allotment. We opt to execute the call inline.
Expand All @@ -161,22 +159,22 @@ inline void group::implementation::call_queue(std::function<void()> f)
// parallelism limit.
auto the_call = [this, f]() {
{
std::unique_lock<std::mutex> lock(mx);
scope_lock_t lock(mx);
running += 1;
stat_max_running = std::max(running, stat_max_running);
stat_total += 1;
}
f();
bool signal_finished = false;
{
std::unique_lock<std::mutex> lock(mx);
scope_lock_t lock(mx);
running -= 1;
signal_finished = pending.empty() && running == 0;
}
if (signal_finished) finished.signal();
};
{
std::unique_lock<std::mutex> lock(mx);
scope_lock_t lock(mx);
pending.push(std::move(the_call));
stat_max_pending = std::max(unsigned(pending.size()), stat_max_pending);
}
Expand All @@ -189,7 +187,7 @@ inline std::function<void()> group::implementation::call_dequeue()
bool signal_finished = false;
std::function<void()> result;
{
std::unique_lock<std::mutex> lock(mx);
scope_lock_t lock(mx);
signal_finished = pending.empty() && running == 0;
// We only return tasks when we have them, and when we have enough
// parallelism.
Expand Down Expand Up @@ -219,27 +217,35 @@ inline executor::implementation::implementation(unsigned parallelism)
// No need to launch anything if we aren't parallel.
if (parallelism == 0) return;
// Launch the threads to cover the expected parallelism.
std::unique_lock<std::mutex> lock(mx);
scope_lock_t lock(mx);
#if B2_USE_STD_THREADS
runners.reserve(parallelism);
for (; parallelism > 0; --parallelism)
{
running_count += 1;
runners.emplace_back([this]() { runner(); });
}
#endif
}

inline void executor::implementation::push_group(std::shared_ptr<group> g)
{
std::unique_lock<std::mutex> lock(mx);
scope_lock_t lock(mx);
groups.push_back(g);
}

inline void executor::implementation::call_signal() { call_cv.notify_one(); }
inline void executor::implementation::call_signal()
{
#if B2_USE_STD_THREADS
call_cv.notify_one();
#endif
}

inline std::function<void()> executor::implementation::call_get()
{
std::function<void()> result;
std::unique_lock<std::mutex> lock(mx);
scope_lock_t lock(mx);
#if B2_USE_STD_THREADS
// We only dequeue task calls when we have a thread to run them.
if (call_count < runners.size())
{
Expand All @@ -255,39 +261,40 @@ inline std::function<void()> executor::implementation::call_get()
}
// We don't have tasks to run, wait for some to become available.
call_cv.wait(lock);
#endif
return result;
}

inline void executor::implementation::call_done()
{
std::unique_lock<std::mutex> lock(mx);
scope_lock_t lock(mx);
call_count -= 1;
}

inline bool executor::implementation::is_running()
{
std::unique_lock<std::mutex> lock(mx);
scope_lock_t lock(mx);
return running_count > 0;
}

inline void executor::implementation::stop()
{
#if B2_USE_STD_THREADS
// Stop all the runner threads (i.e. signal and wait).
std::vector<std::thread> to_join;
{
std::unique_lock<std::mutex> lock(mx);
scope_lock_t lock(mx);
running_count = 0;
to_join.swap(runners);
}
call_cv.notify_all();
for (auto & t : to_join)
{
call_cv.notify_all();
t.join();
}
#endif
}

#endif // B2_USE_STD_THREADS

void executor::implementation::runner()
{
while (is_running())
Expand All @@ -309,9 +316,13 @@ void executor::implementation::runner()
namespace {
unsigned get_parallelism(int parallelism)
{
#if B2_USE_STD_THREADS
return parallelism >= 0
? parallelism
: std::min(unsigned(globs.jobs), system_info().cpu_thread_count());
#else
return 0;
#endif
}
} // namespace

Expand Down
10 changes: 10 additions & 0 deletions src/engine/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,18 @@ using int_t = int;
using uint_t = unsigned int;

#if B2_USE_STD_THREADS

using mutex_t = std::mutex;
using scope_lock_t = std::unique_lock<std::mutex>;

#else

struct mutex_t {};
struct scope_lock_t
{
inline scope_lock_t(mutex_t &) {}
};

#endif

} // namespace b2
Expand Down

0 comments on commit d6ad469

Please sign in to comment.