Skip to content

Commit

Permalink
Use extended alignment safe allocator in CPUThreadPoolExecutor
Browse files Browse the repository at this point in the history
Summary:
Until C++17 types with extended alignment need special handling
in eg. std::vector and also std::unique_ptr. CPUThreadPoolExecutor has a
default queue that requires extended alignment, but also allows the user
to provide their own blocking queue. To handle this we move the private
member to a shared_ptr which supported type erased destructors, and then
use allocate_shared for the default queue type with a custom allocator
that will satisfy alignment constraints.

```
.../unique_ptr.h:825:34: runtime error: constructor call on misaligned address 0x613000000040 for type 'folly::UnboundedBlockingQueue<folly::CPUThreadPoolExecutor::CPUTask>', which requires 128 byte alignment
0x613000000040: note: pointer points here
 02 00 00 1c  be be be be be be be be  be be be be be be be be  be be be be be be be be  be be be be
              ^
    #0 0x75b35e in std::_MakeUniq<folly::UnboundedBlockingQueue<folly::CPUThreadPoolExecutor::CPUTask> >::__single_object std::make_unique<folly::UnboundedBlockingQueue<folly::CPUThreadPoolExecutor::CPUTask> >() .../unique_ptr.h:825:30
    facebook#1 0x75602f in folly::CPUThreadPoolExecutor::CPUThreadPoolExecutor(unsigned long, std::shared_ptr<folly::ThreadFactory>) xplat/folly/executors/CPUThreadPoolExecutor.cpp:66:18
    facebook#2 0x756c6c in folly::CPUThreadPoolExecutor::CPUThreadPoolExecutor(unsigned long) xplat/folly/executors/CPUThreadPoolExecutor.cpp:84:7
```

Differential Revision: D19237477

fbshipit-source-id: c88b26e2ca17e168f124ba27c0989f787b1ce4fd
  • Loading branch information
akrieger authored and dotconnor committed Mar 18, 2021
1 parent fc9ecd3 commit 6302db1
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 5 deletions.
19 changes: 15 additions & 4 deletions folly/executors/CPUThreadPoolExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <folly/executors/CPUThreadPoolExecutor.h>

#include <folly/Memory.h>
#include <folly/executors/task_queue/PriorityLifoSemMPMCQueue.h>
#include <folly/executors/task_queue/PriorityUnboundedBlockingQueue.h>
#include <folly/executors/task_queue/UnboundedBlockingQueue.h>
Expand All @@ -28,6 +29,16 @@ DEFINE_bool(

namespace folly {

namespace {
// queue_alloc custom allocator is necessary until C++17
// http://open-std.org/JTC1/SC22/WG21/docs/papers/2012/n3396.htm
// https://gcc.gnu.org/bugzilla/show_bug.cgi?id=65122
// https://bugs.llvm.org/show_bug.cgi?id=22634
using default_queue = UnboundedBlockingQueue<CPUThreadPoolExecutor::CPUTask>;
using default_queue_alloc =
AlignedSysAllocator<default_queue, FixedAlign<alignof(default_queue)>>;
} // namespace

const size_t CPUThreadPoolExecutor::kDefaultMaxQueueSize = 1 << 14;

CPUThreadPoolExecutor::CPUThreadPoolExecutor(
Expand All @@ -38,7 +49,7 @@ CPUThreadPoolExecutor::CPUThreadPoolExecutor(
numThreads,
FLAGS_dynamic_cputhreadpoolexecutor ? 0 : numThreads,
std::move(threadFactory)),
taskQueue_(std::move(taskQueue)) {
taskQueue_(taskQueue.release()) {
setNumThreads(numThreads);
registerThreadPoolExecutor(this);
}
Expand All @@ -51,7 +62,7 @@ CPUThreadPoolExecutor::CPUThreadPoolExecutor(
numThreads.first,
numThreads.second,
std::move(threadFactory)),
taskQueue_(std::move(taskQueue)) {
taskQueue_(taskQueue.release()) {
setNumThreads(numThreads.first);
registerThreadPoolExecutor(this);
}
Expand All @@ -63,7 +74,7 @@ CPUThreadPoolExecutor::CPUThreadPoolExecutor(
numThreads,
FLAGS_dynamic_cputhreadpoolexecutor ? 0 : numThreads,
std::move(threadFactory)),
taskQueue_(std::make_unique<UnboundedBlockingQueue<CPUTask>>()) {
taskQueue_(std::allocate_shared<default_queue>(default_queue_alloc{})) {
setNumThreads(numThreads);
registerThreadPoolExecutor(this);
}
Expand All @@ -75,7 +86,7 @@ CPUThreadPoolExecutor::CPUThreadPoolExecutor(
numThreads.first,
numThreads.second,
std::move(threadFactory)),
taskQueue_(std::make_unique<UnboundedBlockingQueue<CPUTask>>()) {
taskQueue_(std::allocate_shared<default_queue>(default_queue_alloc{})) {
setNumThreads(numThreads.first);
registerThreadPoolExecutor(this);
}
Expand Down
3 changes: 2 additions & 1 deletion folly/executors/CPUThreadPoolExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ class CPUThreadPoolExecutor : public ThreadPoolExecutor {
bool tryDecrToStop();
bool taskShouldStop(folly::Optional<CPUTask>&);

std::unique_ptr<BlockingQueue<CPUTask>> taskQueue_;
// shared_ptr for type erased dtor to handle extended alignment.
std::shared_ptr<BlockingQueue<CPUTask>> taskQueue_;
std::atomic<ssize_t> threadsToStop_{0};
};

Expand Down

0 comments on commit 6302db1

Please sign in to comment.