diff --git a/wangle/concurrent/BoundThreadFactory.h b/wangle/concurrent/BoundThreadFactory.h new file mode 100644 index 000000000..3c62e2dc5 --- /dev/null +++ b/wangle/concurrent/BoundThreadFactory.h @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2016, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + * + */ + +#pragma once + +#include + +namespace wangle { + +/** + * A ThreadFactory that sets binds each thread to a specific CPU core. + * The main use case for this class is NUMA-aware computing. + */ +class BoundThreadFactory : public ThreadFactory { + public: + explicit BoundThreadFactory(std::shared_ptr factory, + int32_t coreId) + : factory_(std::move(factory)) + , coreId_(coreId) {} + + std::thread newThread(folly::Func&& func) override { + int32_t coreId = coreId_; + return factory_->newThread([ coreId, func = std::move(func) ]() mutable { + cpu_set_t cpuSet; + CPU_ZERO(&cpuSet); + CPU_SET(coreId, &cpuSet); + int32_t error = pthread_setaffinity_np(pthread_self(), sizeof(cpuSet), &cpuSet); + if (error != 0) { + LOG(ERROR) << "set cpu affinity failed for core=" << coreId + << " with error " << error, strerror(error); + } + func(); + }); + } + + private: + std::shared_ptr factory_; + int32_t coreId_; +}; + +} // wangle diff --git a/wangle/concurrent/test/ThreadPoolExecutorTest.cpp b/wangle/concurrent/test/ThreadPoolExecutorTest.cpp index 5ec368768..19f2d386a 100644 --- a/wangle/concurrent/test/ThreadPoolExecutorTest.cpp +++ b/wangle/concurrent/test/ThreadPoolExecutorTest.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -425,6 +426,28 @@ TEST(PriorityThreadFactoryTest, ThreadPriority) { EXPECT_EQ(1, actualPriority); } +TEST(BoundThreadFactoryTest, ThreadPriority) { + int32_t expectedCoreId = 1; // use sysconf(_SC_NPROCESSORS_ONLN) or hwloc lib + BoundThreadFactory factory( + std::make_shared("stuff"), expectedCoreId); + factory.newThread([&]() { + cpu_set_t cpuSet; + CPU_ZERO(&cpuSet); + int error = pthread_getaffinity_np(pthread_self(), sizeof(cpuSet), &cpuSet); + ASSERT_EQ(error, 0); + + for (int32_t c = 0; c < CPU_SETSIZE; c++) { + if (c == expectedCoreId) { + ASSERT_TRUE(CPU_ISSET(c, &cpuSet)); + } else { + ASSERT_FALSE(CPU_ISSET(c, &cpuSet)); + } + } + + }).join(); +} + + class TestData : public folly::RequestData { public: explicit TestData(int data) : data_(data) {}