diff --git a/HISTORY.md b/HISTORY.md index 5f91d6d4c9..25d278b9c6 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -12,6 +12,11 @@ ### Enhancements * db_bench: add estimate-table-readers-mem benchmark which prints these stats. +* A new option on_thread_start_callback has been added. It allows to set thread affinity or perform other optimizations (e.g. NUMA pinning) to speedb background threads. +An example file on_thread_start_callback_example.cc has been provided to demonstrate how to use this feature. + + + ### Bug Fixes * unit tests: fix GlobalWriteControllerTest.GlobalAndWBMSetupDelay by waiting for the memtable memory release. * spdb memtable: use_seek_parallel_threshold option parameter mishandled (#570) diff --git a/build_tools/check-sources.sh b/build_tools/check-sources.sh index 0b8b3f181a..2e5b0262f9 100755 --- a/build_tools/check-sources.sh +++ b/build_tools/check-sources.sh @@ -33,7 +33,8 @@ git grep -n 'using namespace' -- ':!build_tools' ':!docs' \ ':!third-party/folly/folly/lang/Align.h' \ ':!third-party/gtest-1.8.1/fused-src/gtest/gtest.h' \ ':!examples/speedb_with_ttl_example.cc' \ - ':!examples/enable_speedb_features_example.cc' + ':!examples/enable_speedb_features_example.cc' \ + ':!examples/on_thread_start_callback_example.cc' if [ "$?" != "1" ]; then echo '^^^^ Do not use "using namespace"' BAD=1 diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc index 8f558b1967..395d8c300d 100644 --- a/db/db_basic_test.cc +++ b/db/db_basic_test.cc @@ -2161,6 +2161,42 @@ TEST_P(DBMultiGetTestWithParam, MultiGetBatchedValueSizeMultiLevelMerge) { } } +TEST_F(DBBasicTest, DBSetThreadAffinity) { + Options options = GetDefaultOptions(); + std::string dbname = test::PerThreadDBPath("db_close_test"); + ASSERT_OK(DestroyDB(dbname, options)); + + DB* db = nullptr; + TestEnv* env = new TestEnv(env_); + std::unique_ptr local_env_guard(env); + options.create_if_missing = true; + options.env = env; + auto f = [](std::thread::native_handle_type thr) { +#if defined(OS_WIN) +#include "winbase.h" + SetThreadAffinityMask(thr, 0); +#else +#include "pthread.h" + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(0, &cpuset); + pthread_setaffinity_np(thr, sizeof(cpu_set_t), &cpuset); +#endif + }; + options.on_thread_start_callback = + std::make_shared>(f); + Status s = DB::Open(options, dbname, &db); + ASSERT_OK(s); + ASSERT_TRUE(db != nullptr); + + s = db->Close(); + ASSERT_EQ(env->GetCloseCount(), 1); + ASSERT_TRUE(s.IsIOError()); + + delete db; + ASSERT_EQ(env->GetCloseCount(), 1); +} + INSTANTIATE_TEST_CASE_P(DBMultiGetTestWithParam, DBMultiGetTestWithParam, testing::Combine(testing::Bool(), testing::Bool())); diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index ad84fd1089..88212d733e 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -1774,6 +1774,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, const std::vector& column_families, std::vector* handles, DB** dbptr, const bool seq_per_batch, const bool batch_per_txn) { + port::Thread::on_thread_start_callback = db_options.on_thread_start_callback; Status s = ValidateOptionsByTable(db_options, column_families); if (!s.ok()) { return s; diff --git a/db/db_impl/db_spdb_impl_write.h b/db/db_impl/db_spdb_impl_write.h index 87c44e969e..75e89c0b07 100644 --- a/db/db_impl/db_spdb_impl_write.h +++ b/db/db_impl/db_spdb_impl_write.h @@ -93,7 +93,7 @@ class SpdbWriteImpl { std::condition_variable flush_thread_cv_; port::Mutex add_buffer_mutex_; port::RWMutexWr flush_rwlock_; - std::thread flush_thread_; + port::Thread flush_thread_; port::RWMutexWr wal_buffers_rwlock_; port::Mutex wal_write_mutex_; port::Mutex wb_list_mutex_; diff --git a/env/env_posix.cc b/env/env_posix.cc index 77f28e1f50..e651d40f1b 100644 --- a/env/env_posix.cc +++ b/env/env_posix.cc @@ -215,8 +215,8 @@ class PosixEnv : public CompositeEnv { ~PosixEnv() override { if (this == Env::Default()) { - for (const auto tid : threads_to_join_) { - pthread_join(tid, nullptr); + for (auto& tid : threads_to_join_) { + if (tid.joinable()) tid.join(); } for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) { thread_pools_[pool_id].JoinAllThreads(); @@ -397,12 +397,12 @@ class PosixEnv : public CompositeEnv { // members in te default instance std::vector thread_pools_storage_; pthread_mutex_t mu_storage_; - std::vector threads_to_join_storage_; + std::vector threads_to_join_storage_; bool allow_non_owner_access_storage_; std::vector& thread_pools_; pthread_mutex_t& mu_; - std::vector& threads_to_join_; + std::vector& threads_to_join_; // If true, allow non owner read access for db files. Otherwise, non-owner // has no access to db files. bool& allow_non_owner_access_; @@ -451,33 +451,18 @@ int PosixEnv::ReleaseThreads(int threads_to_released, Priority pri) { return thread_pools_[pri].ReleaseThreads(threads_to_released); } -struct StartThreadState { - void (*user_function)(void*); - void* arg; -}; - -static void* StartThreadWrapper(void* arg) { - StartThreadState* state = reinterpret_cast(arg); - state->user_function(state->arg); - delete state; - return nullptr; -} - void PosixEnv::StartThread(void (*function)(void* arg), void* arg) { - pthread_t t; - StartThreadState* state = new StartThreadState; - state->user_function = function; - state->arg = arg; - ThreadPoolImpl::PthreadCall( - "start thread", pthread_create(&t, nullptr, &StartThreadWrapper, state)); - ThreadPoolImpl::PthreadCall("lock", pthread_mutex_lock(&mu_)); - threads_to_join_.push_back(t); - ThreadPoolImpl::PthreadCall("unlock", pthread_mutex_unlock(&mu_)); + auto thr = port::Thread(function, arg); + pthread_mutex_lock(&mu_); + threads_to_join_.push_back(std::move(thr)); + pthread_mutex_unlock(&mu_); } void PosixEnv::WaitForJoin() { - for (const auto tid : threads_to_join_) { - pthread_join(tid, nullptr); + for (auto& thr : threads_to_join_) { + if (thr.joinable()) { + thr.join(); + } } threads_to_join_.clear(); } diff --git a/examples/.gitignore b/examples/.gitignore index cd412ac2b9..e0a247a709 100644 --- a/examples/.gitignore +++ b/examples/.gitignore @@ -11,3 +11,4 @@ transaction_example rocksdb_backup_restore_example speedb_is_awesome_example speedb_with_ttl_example +on_thread_start_callback_example diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index d761c88247..73b5ad40d3 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -53,3 +53,8 @@ add_executable(speedb_is_awesome_example speedb_is_awesome_example.cc) target_link_libraries(speedb_is_awesome_example ${ROCKSDB_LIB}) + +add_executable(on_thread_start_callback_example +on_thread_start_callback_example.cc) + target_link_libraries(on_thread_start_callback_example + ${ROCKSDB_LIB}) diff --git a/examples/Makefile b/examples/Makefile index 897add759a..308167cac8 100644 --- a/examples/Makefile +++ b/examples/Makefile @@ -21,7 +21,7 @@ CFLAGS += -Wstrict-prototypes all: simple_example column_families_example compaction_filter_example compact_files_example c_simple_example optimistic_transaction_example \ transaction_example options_file_example rocksdb_backup_restore_example speedb_is_awesome_example speedb_with_ttl_example \ - enable_speedb_features_example + enable_speedb_features_example on_thread_start_callback_example simple_example: static_lib simple_example.cc $(CXX) $(CXXFLAGS) $@.cc -o$@ ../$(LIBNAME).a -I../include -O2 -std=c++17 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS) @@ -61,13 +61,16 @@ enable_speedb_features_example: static_lib enable_speedb_features_example.cc speedb_with_ttl_example: static_lib speedb_with_ttl_example.cc $(CXX) $(CXXFLAGS) $@.cc -o$@ ../$(LIBNAME).a -I../include -O2 -std=c++17 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS) +on_thread_start_callback_example: static_lib on_thread_start_callback_example.cc + $(CXX) $(CXXFLAGS) $@.cc -o$@ ../$(LIBNAME).a -I../include -O2 -std=c++17 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS) + rocksdb_backup_restore_example: static_lib rocksdb_backup_restore_example.cc $(CXX) $(CXXFLAGS) $@.cc -o$@ ../$(LIBNAME).a -I../include -O2 -std=c++17 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS) clean: rm -rf ./simple_example ./column_families_example ./compact_files_example ./compaction_filter_example ./c_simple_example c_simple_example.o \ ./optimistic_transaction_example ./transaction_example ./options_file_example ./multi_processes_example ./rocksdb_backup_restore_example \ - ./speedb_is_awesome_example ./speedb_with_ttl_example ./enable_speedb_features_example + ./speedb_is_awesome_example ./speedb_with_ttl_example ./enable_speedb_features_example ./on_thread_start_callback_example static_lib: LIBNAME="$(LIBNAME)" $(MAKE) -C .. static_lib diff --git a/examples/on_thread_start_callback_example.cc b/examples/on_thread_start_callback_example.cc new file mode 100644 index 0000000000..1ebe59dbd3 --- /dev/null +++ b/examples/on_thread_start_callback_example.cc @@ -0,0 +1,72 @@ +// Copyright (C) 2023 Speedb Ltd. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include + +#include "rocksdb/db.h" +#include "rocksdb/options.h" + +using namespace ROCKSDB_NAMESPACE; + +#if defined(OS_WIN) +std::string kDBPath = "C:\\Windows\\TEMP\\speedb_thr_affinity"; +#else +std::string kDBPath = "/tmp/speedb_thr_affinity"; +#endif + +int main() { + // Open the storage + DB* db = nullptr; + Options options; + // create the DB if it's not already present + options.create_if_missing = true; + auto f = [](std::thread::native_handle_type thr) { +// callback to pin all Speedb threads to the first core. +#if defined(OS_WIN) +#include "winbase.h" + SetThreadAffinityMask(thr, 0); +#else +#include "pthread.h" + std::cout << "thread spawned, thread_id: " << thr << std::endl; + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(0, &cpuset); + pthread_setaffinity_np(thr, sizeof(cpu_set_t), &cpuset); +#endif + }; + options.on_thread_start_callback = + std::make_shared>(f); + Status s = DB::Open(options, kDBPath, &db); + assert(s.ok()); + + // append new entry + std::string key = "key_1"; + std::string put_value = "Speedb is awesome!"; + s = db->Put(WriteOptions(), key, put_value); + assert(s.ok()); + + // retrieve entry + std::string get_value; + s = db->Get(ReadOptions(), key, &get_value); + assert(s.ok()); + assert(get_value == put_value); + std::cout << get_value << std::endl; + + // close DB + s = db->Close(); + assert(s.ok()); + return 0; +} diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index a385adb6fe..1bbe8e6925 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -22,6 +22,7 @@ #include "rocksdb/listener.h" #include "rocksdb/metadata.h" #include "rocksdb/options.h" +#include "rocksdb/port_defs.h" #include "rocksdb/snapshot.h" #include "rocksdb/sst_file_writer.h" #include "rocksdb/thread_status.h" diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index 62af602c62..fec653823c 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -23,6 +23,7 @@ #include #include #include +#include #include #include "rocksdb/customizable.h" diff --git a/include/rocksdb/memtablerep.h b/include/rocksdb/memtablerep.h index d5d18863b4..0977c54e08 100644 --- a/include/rocksdb/memtablerep.h +++ b/include/rocksdb/memtablerep.h @@ -46,6 +46,7 @@ #include #include "rocksdb/customizable.h" +#include "rocksdb/port_defs.h" #include "rocksdb/slice.h" namespace ROCKSDB_NAMESPACE { @@ -319,7 +320,7 @@ class MemTableRepFactory : public Customizable { void Init() { switch_memtable_thread_ = - std::thread(&MemTableRepFactory::PrepareSwitchMemTable, this); + port::Thread(&MemTableRepFactory::PrepareSwitchMemTable, this); // need to verify the thread was executed { std::unique_lock lck(switch_memtable_thread_mutex_); @@ -420,7 +421,7 @@ class MemTableRepFactory : public Customizable { bool enable_switch_memtable_ = false; private: - std::thread switch_memtable_thread_; + port::Thread switch_memtable_thread_; std::mutex switch_memtable_thread_mutex_; std::condition_variable switch_memtable_thread_cv_; std::atomic terminate_switch_memtable_ = false; diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index d045a01591..98677b9cff 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1462,6 +1462,8 @@ struct DBOptions { // Defaults to check once per hour. Set to 0 to disable the task. unsigned int refresh_options_sec = 60 * 60; std::string refresh_options_file; + std::shared_ptr> + on_thread_start_callback = nullptr; }; // Options to control the behavior of a database (passed to DB::Open) diff --git a/include/rocksdb/port_defs.h b/include/rocksdb/port_defs.h index 9771aacb92..bfb616ce80 100644 --- a/include/rocksdb/port_defs.h +++ b/include/rocksdb/port_defs.h @@ -8,8 +8,11 @@ #pragma once -#include "rocksdb/rocksdb_namespace.h" +#include +#include +#include +#include "rocksdb/rocksdb_namespace.h" namespace ROCKSDB_NAMESPACE { enum class CpuPriority { @@ -18,5 +21,38 @@ enum class CpuPriority { kNormal = 2, kHigh = 3, }; +namespace port { +class ThreadWithCb { + public: + static std::shared_ptr> + on_thread_start_callback; + template + ThreadWithCb(Function&& func, Args&&... args) { + thread_ = + std::thread(std::forward(func), std::forward(args)...); + if (on_thread_start_callback) { + on_thread_start_callback->operator()(native_handle()); + } + } + + ThreadWithCb() {} + bool joinable() { return thread_.joinable(); } + void join() { thread_.join(); } + + void detach() { thread_.detach(); } + std::thread::id get_id() { return thread_.get_id(); } + std::thread& operator=(std::thread&& __t) { + thread_ = std::move(__t); + return thread_; + } + std::thread::native_handle_type native_handle() { + return thread_.native_handle(); + } + + private: + std::thread thread_; +}; +using Thread = ThreadWithCb; +} // namespace port } // namespace ROCKSDB_NAMESPACE diff --git a/include/rocksdb/write_buffer_manager.h b/include/rocksdb/write_buffer_manager.h index 718ac97004..6aa8a9f5f6 100644 --- a/include/rocksdb/write_buffer_manager.h +++ b/include/rocksdb/write_buffer_manager.h @@ -25,6 +25,7 @@ #include #include "rocksdb/cache.h" +#include "rocksdb/port_defs.h" namespace ROCKSDB_NAMESPACE { struct Options; @@ -447,7 +448,7 @@ class WriteBufferManager final { // reason to wakeup. See the thread's code for more details bool new_flushes_wakeup_ = false; - std::thread flushes_thread_; + port::Thread flushes_thread_; bool terminate_flushes_thread_ = false; }; diff --git a/memtable/spdb_sorted_vector.h b/memtable/spdb_sorted_vector.h index 8442f6f334..3f2d006b73 100644 --- a/memtable/spdb_sorted_vector.h +++ b/memtable/spdb_sorted_vector.h @@ -194,7 +194,7 @@ class SpdbVectorContainer { spdb_vectors_.push_front(spdb_vector); spdb_vector->SetVectorListIter(std::prev(spdb_vectors_.end())); curr_vector_.store(spdb_vector.get()); - sort_thread_ = std::thread(&SpdbVectorContainer::SortThread, this); + sort_thread_ = port::Thread(&SpdbVectorContainer::SortThread, this); } ~SpdbVectorContainer() { @@ -246,7 +246,7 @@ class SpdbVectorContainer { std::atomic immutable_; // sort thread info std::atomic num_elements_; - std::thread sort_thread_; + port::Thread sort_thread_; std::mutex sort_thread_mutex_; std::condition_variable sort_thread_cv_; }; diff --git a/memtable/write_buffer_manager.cc b/memtable/write_buffer_manager.cc index 7710d39f22..b0c9071fad 100644 --- a/memtable/write_buffer_manager.cc +++ b/memtable/write_buffer_manager.cc @@ -608,7 +608,7 @@ void WriteBufferManager::InitFlushInitiationVars(size_t quota) { if (flushes_thread_.joinable() == false) { flushes_thread_ = - std::thread(&WriteBufferManager::InitiateFlushesThread, this); + port::Thread(&WriteBufferManager::InitiateFlushesThread, this); } } diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index eefa564063..0a86caaf17 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -258,6 +258,9 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) { {offsetof(struct DBOptions, compaction_service), sizeof(std::shared_ptr)}, {offsetof(struct DBOptions, refresh_options_file), sizeof(std::string)}, + {offsetof(struct DBOptions, on_thread_start_callback), + sizeof(std::shared_ptr< + std::function>)}, }; char* options_ptr = new char[sizeof(DBOptions)]; diff --git a/port/port_posix.cc b/port/port_posix.cc index f227bf2223..3a6abf30ba 100644 --- a/port/port_posix.cc +++ b/port/port_posix.cc @@ -47,7 +47,8 @@ extern const bool kDefaultToAdaptiveMutex = false; #endif namespace port { - +std::shared_ptr> + ThreadWithCb::on_thread_start_callback; static int PthreadCall(const char* label, int result) { if (result != 0 && result != ETIMEDOUT && result != EBUSY) { fprintf(stderr, "pthread %s: %s\n", label, errnoStr(result).c_str()); diff --git a/port/port_posix.h b/port/port_posix.h index 11a73a8a88..fd209fbe75 100644 --- a/port/port_posix.h +++ b/port/port_posix.h @@ -176,7 +176,7 @@ class CondVar { Mutex* mu_; }; -using Thread = std::thread; +using Thread = ThreadWithCb; static inline void AsmVolatilePause() { #if defined(__i386__) || defined(__x86_64__) diff --git a/port/win/port_win.cc b/port/win/port_win.cc index 37e8f655ce..36356899ca 100644 --- a/port/win/port_win.cc +++ b/port/win/port_win.cc @@ -39,7 +39,8 @@ namespace ROCKSDB_NAMESPACE { extern const bool kDefaultToAdaptiveMutex = false; namespace port { - +std::shared_ptr> + ThreadWithCb::on_thread_start_callback; #ifdef ROCKSDB_WINDOWS_UTF8_FILENAMES std::string utf16_to_utf8(const std::wstring& utf16) { std::wstring_convert, wchar_t> convert; diff --git a/port/win/port_win.h b/port/win/port_win.h index c8d90f0b76..54e0d8c461 100644 --- a/port/win/port_win.h +++ b/port/win/port_win.h @@ -191,13 +191,7 @@ class CondVar { Mutex* mu_; }; -#ifdef _POSIX_THREADS -using Thread = std::thread; -#else -// Wrapper around the platform efficient -// or otherwise preferrable implementation -using Thread = WindowsThread; -#endif +using Thread = port::ThreadWithCb; // OnceInit type helps emulate // Posix semantics with initialization