Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add on thread start callback #629

Merged
merged 1 commit into from
Aug 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@
### Enhancements
* db_bench: add estimate-table-readers-mem benchmark which prints these stats.

* A new option on_thread_start_callback has been added. It allows to set thread affinity or perform other optimizations (e.g. NUMA pinning) to speedb background threads.
An example file on_thread_start_callback_example.cc has been provided to demonstrate how to use this feature.



### Bug Fixes
* unit tests: fix GlobalWriteControllerTest.GlobalAndWBMSetupDelay by waiting for the memtable memory release.
* spdb memtable: use_seek_parallel_threshold option parameter mishandled (#570)
Expand Down
3 changes: 2 additions & 1 deletion build_tools/check-sources.sh
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ git grep -n 'using namespace' -- ':!build_tools' ':!docs' \
':!third-party/folly/folly/lang/Align.h' \
':!third-party/gtest-1.8.1/fused-src/gtest/gtest.h' \
':!examples/speedb_with_ttl_example.cc' \
':!examples/enable_speedb_features_example.cc'
':!examples/enable_speedb_features_example.cc' \
':!examples/on_thread_start_callback_example.cc'
if [ "$?" != "1" ]; then
echo '^^^^ Do not use "using namespace"'
BAD=1
Expand Down
36 changes: 36 additions & 0 deletions db/db_basic_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2161,6 +2161,42 @@ TEST_P(DBMultiGetTestWithParam, MultiGetBatchedValueSizeMultiLevelMerge) {
}
}

TEST_F(DBBasicTest, DBSetThreadAffinity) {
Options options = GetDefaultOptions();
std::string dbname = test::PerThreadDBPath("db_close_test");
ASSERT_OK(DestroyDB(dbname, options));

DB* db = nullptr;
TestEnv* env = new TestEnv(env_);
std::unique_ptr<TestEnv> local_env_guard(env);
options.create_if_missing = true;
options.env = env;
auto f = [](std::thread::native_handle_type thr) {
#if defined(OS_WIN)
#include "winbase.h"
SetThreadAffinityMask(thr, 0);
#else
#include "pthread.h"
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(0, &cpuset);
pthread_setaffinity_np(thr, sizeof(cpu_set_t), &cpuset);
#endif
};
options.on_thread_start_callback =
std::make_shared<std::function<void(std::thread::native_handle_type)>>(f);
Status s = DB::Open(options, dbname, &db);
ASSERT_OK(s);
ASSERT_TRUE(db != nullptr);

s = db->Close();
ASSERT_EQ(env->GetCloseCount(), 1);
ASSERT_TRUE(s.IsIOError());

delete db;
ASSERT_EQ(env->GetCloseCount(), 1);
}

INSTANTIATE_TEST_CASE_P(DBMultiGetTestWithParam, DBMultiGetTestWithParam,
testing::Combine(testing::Bool(), testing::Bool()));

Expand Down
1 change: 1 addition & 0 deletions db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1774,6 +1774,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
const bool seq_per_batch, const bool batch_per_txn) {
port::Thread::on_thread_start_callback = db_options.on_thread_start_callback;
Status s = ValidateOptionsByTable(db_options, column_families);
if (!s.ok()) {
return s;
Expand Down
2 changes: 1 addition & 1 deletion db/db_impl/db_spdb_impl_write.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class SpdbWriteImpl {
std::condition_variable flush_thread_cv_;
port::Mutex add_buffer_mutex_;
port::RWMutexWr flush_rwlock_;
std::thread flush_thread_;
port::Thread flush_thread_;
port::RWMutexWr wal_buffers_rwlock_;
port::Mutex wal_write_mutex_;
port::Mutex wb_list_mutex_;
Expand Down
39 changes: 12 additions & 27 deletions env/env_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,8 @@ class PosixEnv : public CompositeEnv {

~PosixEnv() override {
if (this == Env::Default()) {
for (const auto tid : threads_to_join_) {
pthread_join(tid, nullptr);
for (auto& tid : threads_to_join_) {
if (tid.joinable()) tid.join();
}
for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {
thread_pools_[pool_id].JoinAllThreads();
Expand Down Expand Up @@ -397,12 +397,12 @@ class PosixEnv : public CompositeEnv {
// members in te default instance
std::vector<ThreadPoolImpl> thread_pools_storage_;
pthread_mutex_t mu_storage_;
std::vector<pthread_t> threads_to_join_storage_;
std::vector<port::Thread> threads_to_join_storage_;
bool allow_non_owner_access_storage_;

std::vector<ThreadPoolImpl>& thread_pools_;
pthread_mutex_t& mu_;
std::vector<pthread_t>& threads_to_join_;
std::vector<port::Thread>& threads_to_join_;
// If true, allow non owner read access for db files. Otherwise, non-owner
// has no access to db files.
bool& allow_non_owner_access_;
Expand Down Expand Up @@ -451,33 +451,18 @@ int PosixEnv::ReleaseThreads(int threads_to_released, Priority pri) {
return thread_pools_[pri].ReleaseThreads(threads_to_released);
}

struct StartThreadState {
void (*user_function)(void*);
void* arg;
};

static void* StartThreadWrapper(void* arg) {
StartThreadState* state = reinterpret_cast<StartThreadState*>(arg);
state->user_function(state->arg);
delete state;
return nullptr;
}

void PosixEnv::StartThread(void (*function)(void* arg), void* arg) {
pthread_t t;
StartThreadState* state = new StartThreadState;
state->user_function = function;
state->arg = arg;
ThreadPoolImpl::PthreadCall(
"start thread", pthread_create(&t, nullptr, &StartThreadWrapper, state));
ThreadPoolImpl::PthreadCall("lock", pthread_mutex_lock(&mu_));
threads_to_join_.push_back(t);
ThreadPoolImpl::PthreadCall("unlock", pthread_mutex_unlock(&mu_));
auto thr = port::Thread(function, arg);
pthread_mutex_lock(&mu_);
threads_to_join_.push_back(std::move(thr));
pthread_mutex_unlock(&mu_);
}

void PosixEnv::WaitForJoin() {
for (const auto tid : threads_to_join_) {
pthread_join(tid, nullptr);
for (auto& thr : threads_to_join_) {
if (thr.joinable()) {
thr.join();
}
}
threads_to_join_.clear();
}
Expand Down
1 change: 1 addition & 0 deletions examples/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ transaction_example
rocksdb_backup_restore_example
speedb_is_awesome_example
speedb_with_ttl_example
on_thread_start_callback_example
5 changes: 5 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,8 @@ add_executable(speedb_is_awesome_example
speedb_is_awesome_example.cc)
target_link_libraries(speedb_is_awesome_example
${ROCKSDB_LIB})

add_executable(on_thread_start_callback_example
on_thread_start_callback_example.cc)
target_link_libraries(on_thread_start_callback_example
${ROCKSDB_LIB})
7 changes: 5 additions & 2 deletions examples/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ CFLAGS += -Wstrict-prototypes

all: simple_example column_families_example compaction_filter_example compact_files_example c_simple_example optimistic_transaction_example \
transaction_example options_file_example rocksdb_backup_restore_example speedb_is_awesome_example speedb_with_ttl_example \
enable_speedb_features_example
enable_speedb_features_example on_thread_start_callback_example

simple_example: static_lib simple_example.cc
$(CXX) $(CXXFLAGS) $@.cc -o$@ ../$(LIBNAME).a -I../include -O2 -std=c++17 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS)
Expand Down Expand Up @@ -61,13 +61,16 @@ enable_speedb_features_example: static_lib enable_speedb_features_example.cc
speedb_with_ttl_example: static_lib speedb_with_ttl_example.cc
$(CXX) $(CXXFLAGS) $@.cc -o$@ ../$(LIBNAME).a -I../include -O2 -std=c++17 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS)

on_thread_start_callback_example: static_lib on_thread_start_callback_example.cc
$(CXX) $(CXXFLAGS) $@.cc -o$@ ../$(LIBNAME).a -I../include -O2 -std=c++17 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS)

rocksdb_backup_restore_example: static_lib rocksdb_backup_restore_example.cc
$(CXX) $(CXXFLAGS) $@.cc -o$@ ../$(LIBNAME).a -I../include -O2 -std=c++17 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS)

clean:
rm -rf ./simple_example ./column_families_example ./compact_files_example ./compaction_filter_example ./c_simple_example c_simple_example.o \
./optimistic_transaction_example ./transaction_example ./options_file_example ./multi_processes_example ./rocksdb_backup_restore_example \
./speedb_is_awesome_example ./speedb_with_ttl_example ./enable_speedb_features_example
./speedb_is_awesome_example ./speedb_with_ttl_example ./enable_speedb_features_example ./on_thread_start_callback_example

static_lib:
LIBNAME="$(LIBNAME)" $(MAKE) -C .. static_lib
72 changes: 72 additions & 0 deletions examples/on_thread_start_callback_example.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright (C) 2023 Speedb Ltd. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <functional>
#include <iostream>
#include <memory>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

using namespace ROCKSDB_NAMESPACE;

#if defined(OS_WIN)
std::string kDBPath = "C:\\Windows\\TEMP\\speedb_thr_affinity";
#else
std::string kDBPath = "/tmp/speedb_thr_affinity";
#endif

int main() {
// Open the storage
DB* db = nullptr;
Options options;
// create the DB if it's not already present
options.create_if_missing = true;
auto f = [](std::thread::native_handle_type thr) {
// callback to pin all Speedb threads to the first core.
#if defined(OS_WIN)
#include "winbase.h"
SetThreadAffinityMask(thr, 0);
#else
#include "pthread.h"
std::cout << "thread spawned, thread_id: " << thr << std::endl;
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(0, &cpuset);
pthread_setaffinity_np(thr, sizeof(cpu_set_t), &cpuset);
#endif
};
options.on_thread_start_callback =
std::make_shared<std::function<void(std::thread::native_handle_type)>>(f);
Status s = DB::Open(options, kDBPath, &db);
assert(s.ok());

// append new entry
std::string key = "key_1";
std::string put_value = "Speedb is awesome!";
s = db->Put(WriteOptions(), key, put_value);
assert(s.ok());

// retrieve entry
std::string get_value;
s = db->Get(ReadOptions(), key, &get_value);
assert(s.ok());
assert(get_value == put_value);
std::cout << get_value << std::endl;

// close DB
s = db->Close();
assert(s.ok());
return 0;
}
1 change: 1 addition & 0 deletions include/rocksdb/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "rocksdb/listener.h"
#include "rocksdb/metadata.h"
#include "rocksdb/options.h"
#include "rocksdb/port_defs.h"
#include "rocksdb/snapshot.h"
#include "rocksdb/sst_file_writer.h"
#include "rocksdb/thread_status.h"
Expand Down
1 change: 1 addition & 0 deletions include/rocksdb/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <limits>
#include <memory>
#include <string>
#include <thread>
#include <vector>

#include "rocksdb/customizable.h"
Expand Down
5 changes: 3 additions & 2 deletions include/rocksdb/memtablerep.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include <unordered_set>

#include "rocksdb/customizable.h"
#include "rocksdb/port_defs.h"
#include "rocksdb/slice.h"

namespace ROCKSDB_NAMESPACE {
Expand Down Expand Up @@ -319,7 +320,7 @@ class MemTableRepFactory : public Customizable {

void Init() {
switch_memtable_thread_ =
std::thread(&MemTableRepFactory::PrepareSwitchMemTable, this);
port::Thread(&MemTableRepFactory::PrepareSwitchMemTable, this);
// need to verify the thread was executed
{
std::unique_lock<std::mutex> lck(switch_memtable_thread_mutex_);
Expand Down Expand Up @@ -420,7 +421,7 @@ class MemTableRepFactory : public Customizable {
bool enable_switch_memtable_ = false;

private:
std::thread switch_memtable_thread_;
port::Thread switch_memtable_thread_;
std::mutex switch_memtable_thread_mutex_;
std::condition_variable switch_memtable_thread_cv_;
std::atomic<bool> terminate_switch_memtable_ = false;
Expand Down
2 changes: 2 additions & 0 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -1462,6 +1462,8 @@ struct DBOptions {
// Defaults to check once per hour. Set to 0 to disable the task.
unsigned int refresh_options_sec = 60 * 60;
std::string refresh_options_file;
std::shared_ptr<std::function<void(std::thread::native_handle_type)>>
on_thread_start_callback = nullptr;
};

// Options to control the behavior of a database (passed to DB::Open)
Expand Down
38 changes: 37 additions & 1 deletion include/rocksdb/port_defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@

#pragma once

#include "rocksdb/rocksdb_namespace.h"
#include <functional>
#include <memory>
#include <thread>

#include "rocksdb/rocksdb_namespace.h"
namespace ROCKSDB_NAMESPACE {

enum class CpuPriority {
Expand All @@ -18,5 +21,38 @@ enum class CpuPriority {
kNormal = 2,
kHigh = 3,
};
namespace port {
class ThreadWithCb {
public:
static std::shared_ptr<std::function<void(std::thread::native_handle_type)>>
on_thread_start_callback;
template <typename Function, typename... Args>
ThreadWithCb(Function&& func, Args&&... args) {
thread_ =
std::thread(std::forward<Function>(func), std::forward<Args>(args)...);
if (on_thread_start_callback) {
on_thread_start_callback->operator()(native_handle());
}
}

ThreadWithCb() {}
bool joinable() { return thread_.joinable(); }

void join() { thread_.join(); }

void detach() { thread_.detach(); }
std::thread::id get_id() { return thread_.get_id(); }
std::thread& operator=(std::thread&& __t) {
thread_ = std::move(__t);
return thread_;
}
std::thread::native_handle_type native_handle() {
return thread_.native_handle();
}

private:
std::thread thread_;
};
using Thread = ThreadWithCb;
} // namespace port
} // namespace ROCKSDB_NAMESPACE
3 changes: 2 additions & 1 deletion include/rocksdb/write_buffer_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <vector>

#include "rocksdb/cache.h"
#include "rocksdb/port_defs.h"

namespace ROCKSDB_NAMESPACE {
struct Options;
Expand Down Expand Up @@ -447,7 +448,7 @@ class WriteBufferManager final {
// reason to wakeup. See the thread's code for more details
bool new_flushes_wakeup_ = false;

std::thread flushes_thread_;
port::Thread flushes_thread_;
bool terminate_flushes_thread_ = false;
};

Expand Down
Loading