Skip to content

Commit

Permalink
Extract all event bases from an IO thread pool
Browse files Browse the repository at this point in the history
Summary:
This functionality can be used to simplify use cases which require finer
control over the distribution of tasks across threads.

Reviewed By: iahs

Differential Revision: D34158426

fbshipit-source-id: fdbf18fec61f20b78961d11c23d9ed1a6d322c74
  • Loading branch information
Emanuele Altieri authored and facebook-github-bot committed Feb 15, 2022
1 parent 484915b commit a25c6a2
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 4 deletions.
13 changes: 13 additions & 0 deletions folly/executors/IOThreadPoolExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,19 @@ EventBase* IOThreadPoolExecutor::getEventBase() {
return pickThread()->eventBase;
}

std::vector<Executor::KeepAlive<EventBase>>
IOThreadPoolExecutor::getAllEventBases() {
ensureMaxActiveThreads();
std::vector<Executor::KeepAlive<EventBase>> evbs;
SharedMutex::ReadHolder r{&threadListLock_};
const auto& threads = threadList_.get();
evbs.reserve(threads.size());
for (const auto& thr : threads) {
evbs.emplace_back(static_cast<IOThread&>(*thr).eventBase);
}
return evbs;
}

EventBase* IOThreadPoolExecutor::getEventBase(
ThreadPoolExecutor::ThreadHandle* h) {
auto thread = dynamic_cast<IOThread*>(h);
Expand Down
4 changes: 4 additions & 0 deletions folly/executors/IOThreadPoolExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ class IOThreadPoolExecutor : public ThreadPoolExecutor, public IOExecutor {

folly::EventBase* getEventBase() override;

// Ensures that the maximum number of active threads is running and returns
// the EventBase associated with each thread.
std::vector<folly::Executor::KeepAlive<folly::EventBase>> getAllEventBases();

static folly::EventBase* getEventBase(ThreadPoolExecutor::ThreadHandle*);

static std::mutex* getEventBaseShutdownMutex(
Expand Down
12 changes: 8 additions & 4 deletions folly/executors/ThreadPoolExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -420,10 +420,7 @@ void ThreadPoolExecutor::addObserver(std::shared_ptr<Observer> o) {
o->threadPreviouslyStarted(thread.get());
}
}
while (activeThreads_.load(std::memory_order_relaxed) <
maxThreads_.load(std::memory_order_relaxed)) {
ensureActiveThreads();
}
ensureMaxActiveThreads();
}

void ThreadPoolExecutor::removeObserver(std::shared_ptr<Observer> o) {
Expand Down Expand Up @@ -523,6 +520,13 @@ void ThreadPoolExecutor::ensureActiveThreads() {
activeThreads_.store(active + 1, std::memory_order_relaxed);
}

void ThreadPoolExecutor::ensureMaxActiveThreads() {
while (activeThreads_.load(std::memory_order_relaxed) <
maxThreads_.load(std::memory_order_relaxed)) {
ensureActiveThreads();
}
}

// If an idle thread times out, only join it if there are at least
// minThreads threads.
bool ThreadPoolExecutor::minActive() {
Expand Down
1 change: 1 addition & 0 deletions folly/executors/ThreadPoolExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ class ThreadPoolExecutor : public DefaultKeepAliveExecutor {
folly::ThreadPoolListHook threadPoolHook_;

// Dynamic thread sizing functions and variables
void ensureMaxActiveThreads();
void ensureActiveThreads();
void ensureJoined();
bool minActive();
Expand Down

0 comments on commit a25c6a2

Please sign in to comment.