Skip to content

Commit

Permalink
Support thread affinity
Browse files Browse the repository at this point in the history
  • Loading branch information
lizhanhui committed Nov 27, 2022
1 parent d52b520 commit 3a67d76
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,5 @@ fuzz/proto/gen/
fuzz/crash-*

cmake-build-*

.cache
60 changes: 60 additions & 0 deletions env/env_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors

#include <sched.h>
#ifdef _GNU_SOURCE
#include <pthread.h>
#endif

#include <cassert>

#if !defined(OS_WIN)

#include <dirent.h>
Expand Down Expand Up @@ -371,6 +378,20 @@ class PosixEnv : public CompositeEnv {
return Status::OK();
}

// Replaces the set of processors that threads started through this Env are
// allowed to run on.
//
// `cpu_set` holds zero-based processor ids. On builds with _GNU_SOURCE the
// ids are translated into the cpu_set_t mask that thread start-up code reads
// through a pointer; on other builds only the id list is stored. An empty
// `cpu_set` yields an empty mask, which the thread start-up code treats as
// "do not pin".
//
// NOTE(review): the mask is written without synchronization while thread
// start-up code may read it through its pointer — confirm callers configure
// the CPU set before threads are started.
void SetCpuSet(std::vector<int> cpu_set) override {
#ifdef _GNU_SOURCE
  // Build the mask from scratch so that it always mirrors the id list kept by
  // the base class, which is replaced (not merged) on every call. Merging into
  // the previous mask would make it impossible to narrow or clear affinity.
  cpu_set_t fresh_mask;
  CPU_ZERO(&fresh_mask);
  for (const int processor_id : cpu_set) {
    // hardware_concurrency() may legitimately return 0 when the value is not
    // computable; only range-check when a real count is available.
    assert(std::thread::hardware_concurrency() == 0 ||
           static_cast<unsigned int>(processor_id) <
               std::thread::hardware_concurrency());
    // CPU_SET is idempotent, so duplicate ids need no special handling.
    CPU_SET(processor_id, &fresh_mask);
  }
  posix_cpu_set_ = fresh_mask;
#endif
  CompositeEnv::SetCpuSet(cpu_set);
}

private:
friend Env* Env::Default();
// Constructs the default Env, a singleton
Expand All @@ -390,6 +411,10 @@ class PosixEnv : public CompositeEnv {
// If true, allow non owner read access for db files. Otherwise, non-owner
// has no access to db files.
bool& allow_non_owner_access_;

#ifdef _GNU_SOURCE
cpu_set_t posix_cpu_set_;
#endif
};

PosixEnv::PosixEnv()
Expand All @@ -400,12 +425,22 @@ PosixEnv::PosixEnv()
mu_(mu_storage_),
threads_to_join_(threads_to_join_storage_),
allow_non_owner_access_(allow_non_owner_access_storage_) {

#ifdef _GNU_SOURCE
CPU_ZERO(&posix_cpu_set_);
#endif

ThreadPoolImpl::PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));
for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {
thread_pools_[pool_id].SetThreadPriority(
static_cast<Env::Priority>(pool_id));
// This allows later initializing the thread-local-env of each thread.
thread_pools_[pool_id].SetHostEnv(this);

#ifdef _GNU_SOURCE
// Specify thread affinity
thread_pools_[pool_id].SetCpuSet(&posix_cpu_set_);
#endif
}
thread_status_updater_ = CreateThreadStatusUpdater();
}
Expand All @@ -417,6 +452,11 @@ PosixEnv::PosixEnv(const PosixEnv* default_env,
mu_(default_env->mu_),
threads_to_join_(default_env->threads_to_join_),
allow_non_owner_access_(default_env->allow_non_owner_access_) {

#ifdef _GNU_SOURCE
CPU_ZERO(&posix_cpu_set_);
#endif

thread_status_updater_ = default_env->thread_status_updater_;
}

Expand All @@ -438,10 +478,23 @@ unsigned int PosixEnv::GetThreadPoolQueueLen(Priority pri) const {
// Bundle of everything StartThreadWrapper needs to launch a user thread.
// An instance is allocated with `new` by the thread starter and deleted by
// StartThreadWrapper once the user function returns.
struct StartThreadState {
  // Function to run on the new thread, and the opaque argument passed to it.
  // Default to null so a freshly allocated state never holds indeterminate
  // pointers.
  void (*user_function)(void*) = nullptr;
  void* arg = nullptr;

#ifdef _GNU_SOURCE
  // Non-owning pointer to the affinity mask to apply to the new thread.
  cpu_set_t* cpu_set = nullptr;
#endif
};

// Thread entry point handed to pthread_create by PosixEnv::StartThread.
// `arg` is the StartThreadState allocated by the caller; this function
// optionally pins the calling thread, runs the user function, frees the
// state and always returns nullptr.
static void* StartThreadWrapper(void* arg) {
StartThreadState* state = reinterpret_cast<StartThreadState*>(arg);

// Set thread affinity
#ifdef _GNU_SOURCE
// An empty mask (CPU_COUNT == 0) means no affinity was requested, so the
// thread keeps whatever affinity it was created with.
if (CPU_COUNT(state->cpu_set)) {
// The return value is not checked: a failure to apply the mask is ignored
// and the user function still runs.
pthread_setaffinity_np(pthread_self(), sizeof(*state->cpu_set),
state->cpu_set);
}
#endif

// Run the user callback, then release the state this wrapper was given.
state->user_function(state->arg);
delete state;
return nullptr;
Expand All @@ -452,6 +505,13 @@ void PosixEnv::StartThread(void (*function)(void* arg), void* arg) {
StartThreadState* state = new StartThreadState;
state->user_function = function;
state->arg = arg;


#ifdef _GNU_SOURCE
// Specify thread affinity
state->cpu_set = &posix_cpu_set_;
#endif

ThreadPoolImpl::PthreadCall(
"start thread", pthread_create(&t, nullptr, &StartThreadWrapper, state));
ThreadPoolImpl::PthreadCall("lock", pthread_mutex_lock(&mu_));
Expand Down
15 changes: 15 additions & 0 deletions include/rocksdb/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,12 @@ class Env {
// could be a fully implemented one, or a wrapper class around the Env
const std::shared_ptr<SystemClock>& GetSystemClock() const;

// Get/Set the CPU set, i.e. the processors on which RocksDB threads are
// allowed to be scheduled.
virtual const std::vector<int>& GetCpuSet() const { return cpu_set_; }
virtual void SetCpuSet(std::vector<int> cpu_set) {
this->cpu_set_.swap(cpu_set);
}

// If you're adding methods here, remember to add them to EnvWrapper too.

protected:
Expand All @@ -605,6 +611,8 @@ class Env {
// Pointer to the underlying SystemClock implementation
std::shared_ptr<SystemClock> system_clock_;

std::vector<int> cpu_set_;

private:
static const size_t kMaxHostNameLen = 256;
};
Expand Down Expand Up @@ -1514,6 +1522,13 @@ class EnvWrapper : public Env {
target_->SanitizeEnvOptions(env_opts);
}

// Forwards to the wrapped Env and returns its processor-id list.
const std::vector<int>& GetCpuSet() const override {
  const std::vector<int>& delegated = target_->GetCpuSet();
  return delegated;
}
void SetCpuSet(std::vector<int> cpu_set) override {
target_->SetCpuSet(cpu_set);
}

private:
Env* target_;
};
Expand Down
24 changes: 24 additions & 0 deletions util/threadpool_imp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
#include "test_util/sync_point.h"
#include "util/string_util.h"

#ifdef _GNU_SOURCE
#include <pthread.h>
#endif

namespace ROCKSDB_NAMESPACE {

void ThreadPoolImpl::PthreadCall(const char* label, int result) {
Expand Down Expand Up @@ -100,6 +104,10 @@ struct ThreadPoolImpl::Impl {
// Set the thread priority.
void SetThreadPriority(Env::Priority priority) { priority_ = priority; }

#ifdef _GNU_SOURCE
void SetCpuSet(cpu_set_t* cpu_set) { cpu_set_ = cpu_set; }
#endif

private:
static void BGThreadWrapper(void* arg);

Expand All @@ -126,6 +134,10 @@ struct ThreadPoolImpl::Impl {
std::mutex mu_;
std::condition_variable bgsignal_;
std::vector<port::Thread> bgthreads_;

#ifdef _GNU_SOURCE
// Non-owning pointer to the affinity mask supplied via SetCpuSet(). Starts
// out null so a pool on which SetCpuSet() was never called holds a
// well-defined "no mask" value instead of an indeterminate pointer.
cpu_set_t* cpu_set_ = nullptr;
#endif
};

inline ThreadPoolImpl::Impl::Impl()
Expand Down Expand Up @@ -337,6 +349,12 @@ void ThreadPoolImpl::Impl::StartBGThreads() {
port::Thread p_t(&BGThreadWrapper,
new BGThreadMetadata(this, bgthreads_.size()));

#ifdef _GNU_SOURCE
    // Pin the new thread only when a non-empty CPU mask has been supplied.
    // A null pointer means no mask was configured for this pool, and
    // CPU_COUNT would dereference it, so check for null first.
    if (cpu_set_ != nullptr && CPU_COUNT(cpu_set_) > 0) {
      pthread_setaffinity_np(p_t.native_handle(), sizeof(*cpu_set_), cpu_set_);
    }
#endif

// Set the thread name to aid debugging
#if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
#if __GLIBC_PREREQ(2, 12)
Expand Down Expand Up @@ -499,6 +517,12 @@ void ThreadPoolImpl::SetThreadPriority(Env::Priority priority) {
impl_->SetThreadPriority(priority);
}

#ifdef _GNU_SOURCE
// Hands the affinity mask pointer to the pool implementation.
void ThreadPoolImpl::SetCpuSet(cpu_set_t* cpu_set) {
  (*impl_).SetCpuSet(cpu_set);
}
#endif

ThreadPool* NewThreadPool(int num_threads) {
ThreadPoolImpl* thread_pool = new ThreadPoolImpl();
thread_pool->SetBackgroundThreads(num_threads);
Expand Down
8 changes: 8 additions & 0 deletions util/threadpool_imp.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
#include <memory>
#include <functional>

#ifdef _GNU_SOURCE
#include <pthread.h>
#endif

namespace ROCKSDB_NAMESPACE {

class ThreadPoolImpl : public ThreadPool {
Expand Down Expand Up @@ -88,6 +92,10 @@ class ThreadPoolImpl : public ThreadPool {
// Set the thread priority.
void SetThreadPriority(Env::Priority priority);

#ifdef _GNU_SOURCE
// Set the CPU mask used for the affinity of this pool's background threads.
// Only the pointer is kept (no copy is made), so the mask must outlive the
// pool's use of it.
void SetCpuSet(cpu_set_t* cpu_set);
#endif

static void PthreadCall(const char* label, int result);

struct Impl;
Expand Down

0 comments on commit 3a67d76

Please sign in to comment.