From 0f8ee68b4da42f5e7c7a93655c998854cb0ae183 Mon Sep 17 00:00:00 2001 From: Bright Chen Date: Fri, 3 May 2024 20:20:45 +0800 Subject: [PATCH] Support thread local object iteration --- src/butil/thread_key.cpp | 81 +++++++++++++----------------------- src/butil/thread_key.h | 28 ++++++++++--- src/butil/type_traits.h | 34 +++++++++++++++ test/endpoint_unittest.cpp | 4 +- test/thread_key_unittest.cpp | 44 +++++++++++++++++++- 5 files changed, 130 insertions(+), 61 deletions(-) diff --git a/src/butil/thread_key.cpp b/src/butil/thread_key.cpp index 02bcd5867a..3bf4bb0f37 100644 --- a/src/butil/thread_key.cpp +++ b/src/butil/thread_key.cpp @@ -38,7 +38,7 @@ pthread_mutex_t g_thread_key_mutex = PTHREAD_MUTEX_INITIALIZER; static size_t g_id = 0; static std::deque* g_free_ids = NULL; static std::vector* g_thread_keys = NULL; -static __thread std::vector* g_tls_data = NULL; +static __thread std::vector* thread_key_tls_data = NULL; ThreadKey& ThreadKey::operator=(ThreadKey&& other) noexcept { if (this == &other) { @@ -56,58 +56,42 @@ bool ThreadKey::Valid() const { } static void DestroyTlsData() { - if (!g_tls_data) { + if (!thread_key_tls_data) { return; } std::vector dummy_keys; { BAIDU_SCOPED_LOCK(g_thread_key_mutex); - if (BAIDU_LIKELY(g_thread_keys)) { - dummy_keys.insert(dummy_keys.end(), g_thread_keys->begin(), g_thread_keys->end()); - } + dummy_keys.insert(dummy_keys.end(), + g_thread_keys->begin(), + g_thread_keys->end()); } - for (size_t i = 0; i < g_tls_data->size(); ++i) { + for (size_t i = 0; i < thread_key_tls_data->size(); ++i) { if (!KEY_UNUSED(dummy_keys[i].seq) && dummy_keys[i].dtor) { - dummy_keys[i].dtor((*g_tls_data)[i].data); + dummy_keys[i].dtor((*thread_key_tls_data)[i].data); } } - delete g_tls_data; - g_tls_data = NULL; -} - -static std::deque* GetGlobalFreeIds() { - if (BAIDU_UNLIKELY(!g_free_ids)) { - g_free_ids = new (std::nothrow) std::deque(); - if (BAIDU_UNLIKELY(!g_free_ids)) { - abort(); - } - } - - return g_free_ids; + delete thread_key_tls_data; + thread_key_tls_data = NULL; } int thread_key_create(ThreadKey& thread_key, DtorFunction dtor) { BAIDU_SCOPED_LOCK(g_thread_key_mutex); - size_t id; - auto free_ids = GetGlobalFreeIds(); - if (!free_ids) { - return ENOMEM; + if (BAIDU_UNLIKELY(!g_free_ids)) { + g_free_ids = new std::deque; } - - if (!free_ids->empty()) { - id = free_ids->back(); - free_ids->pop_back(); + size_t id; + if (!g_free_ids->empty()) { + id = g_free_ids->back(); + g_free_ids->pop_back(); } else { if (g_id >= ThreadKey::InvalidID) { // No more available ids. return EAGAIN; } id = g_id++; - if(BAIDU_UNLIKELY(!g_thread_keys)) { - g_thread_keys = new (std::nothrow) std::vector; - if(BAIDU_UNLIKELY(!g_thread_keys)) { - return ENOMEM; - } + if (BAIDU_UNLIKELY(!g_thread_keys)) { + g_thread_keys = new std::vector; g_thread_keys->reserve(THREAD_KEY_RESERVE); } g_thread_keys->resize(id + 1); @@ -136,14 +120,10 @@ int thread_key_delete(ThreadKey& thread_key) { return EINVAL; } - if (BAIDU_UNLIKELY(!GetGlobalFreeIds())) { - return ENOMEM; - } - ++((*g_thread_keys)[id].seq); // Collect the usable key id for reuse. if (KEY_USABLE((*g_thread_keys)[id].seq)) { - GetGlobalFreeIds()->push_back(id); + g_free_ids->push_back(id); } thread_key.Reset(); @@ -156,22 +136,19 @@ int thread_setspecific(ThreadKey& thread_key, void* data) { } size_t id = thread_key._id; size_t seq = thread_key._seq; - if (BAIDU_UNLIKELY(!g_tls_data)) { - g_tls_data = new (std::nothrow) std::vector; - if (BAIDU_UNLIKELY(!g_tls_data)) { - return ENOMEM; - } - g_tls_data->reserve(THREAD_KEY_RESERVE); + if (BAIDU_UNLIKELY(!thread_key_tls_data)) { + thread_key_tls_data = new std::vector; + thread_key_tls_data->reserve(THREAD_KEY_RESERVE); // Register the destructor of tls_data in this thread. butil::thread_atexit(DestroyTlsData); } - if (id >= g_tls_data->size()) { - g_tls_data->resize(id + 1); + if (id >= thread_key_tls_data->size()) { + thread_key_tls_data->resize(id + 1); } - (*g_tls_data)[id].seq = seq; - (*g_tls_data)[id].data = data; + (*thread_key_tls_data)[id].seq = seq; + (*thread_key_tls_data)[id].data = data; return 0; } @@ -182,13 +159,13 @@ void* thread_getspecific(ThreadKey& thread_key) { } size_t id = thread_key._id; size_t seq = thread_key._seq; - if (BAIDU_UNLIKELY(!g_tls_data || - id >= g_tls_data->size() || - (*g_tls_data)[id].seq != seq)){ + if (BAIDU_UNLIKELY(!thread_key_tls_data || + id >= thread_key_tls_data->size() || + (*thread_key_tls_data)[id].seq != seq)){ return NULL; } - return (*g_tls_data)[id].data; + return (*thread_key_tls_data)[id].data; } } // namespace butil \ No newline at end of file diff --git a/src/butil/thread_key.h b/src/butil/thread_key.h index f8d8f0e47c..e95fa2fa91 100644 --- a/src/butil/thread_key.h +++ b/src/butil/thread_key.h @@ -23,6 +23,7 @@ #include #include #include "butil/scoped_lock.h" +#include "butil/type_traits.h" namespace butil { @@ -38,7 +39,7 @@ class ThreadKey { static constexpr size_t InvalidID = std::numeric_limits::max(); static constexpr size_t InitSeq = 0; - constexpr ThreadKey() :_id(InvalidID), _seq(InitSeq) {} + constexpr ThreadKey() : _id(InvalidID), _seq(InitSeq) {} ~ThreadKey() { Reset(); @@ -62,7 +63,7 @@ class ThreadKey { _seq = InitSeq; } - private: +private: size_t _id; // Key id. // Sequence number form g_thread_keys set in thread_key_create. size_t _seq; @@ -111,6 +112,20 @@ class ThreadLocal { T& operator*() const { return *get(); } + // Iterate through all thread local objects. + // Callback, which must accept Args params and return void, + // will be called under a thread lock. + template + void for_each(Callback&& callback) { + BAIDU_CASSERT( + (is_result_void::value), + "Callback must accept Args params and return void"); + BAIDU_SCOPED_LOCK(_mutex); + for (auto ptr : ptrs) { + callback(ptr); + } + } + void reset(T* ptr); void reset() { @@ -177,6 +192,9 @@ T* ThreadLocal::get() { template void ThreadLocal::reset(T* ptr) { T* old_ptr = get(); + if (ptr == old_ptr) { + return; + } if (thread_setspecific(_key, ptr) != 0) { return; } @@ -187,9 +205,9 @@ void ThreadLocal::reset(T* ptr) { } // Remove and delete old_ptr. if (old_ptr) { - auto iter = std::find(ptrs.begin(), ptrs.end(), old_ptr); - if (iter!=ptrs.end()) { - ptrs.erase(iter); + auto iter = std::remove(ptrs.begin(), ptrs.end(), old_ptr); + if (iter != ptrs.end()) { + ptrs.erase(iter, ptrs.end()); } DefaultDtor(old_ptr); } diff --git a/src/butil/type_traits.h b/src/butil/type_traits.h index 5f342db3d9..dd6aaca4b1 100644 --- a/src/butil/type_traits.h +++ b/src/butil/type_traits.h @@ -351,6 +351,40 @@ template struct is_enum : is_enum { }; template struct is_enum : is_enum { }; template struct is_enum : is_enum { }; +// Deduces the return type of an INVOKE expression +// at compile time. +// If the callable is non-static member function, +// the first argument should be the class type. +#if (__cplusplus >= 201703L) +// std::result_of is deprecated in C++17 and removed in C++20, +// use std::invoke_result instead. +template +struct result_of; +template +struct result_of : std::invoke_result {}; +#elif (__cplusplus >= 201103L) +template +using result_of = std::result_of; +#else +#error Only C++11 or later is supported. +#endif + +template +using result_of_t = typename result_of::type; + +// Whether a callable returns type which is same as ReturnType. +template +struct is_result_same + : public butil::is_same> {}; + +// Whether a callable returns void. +template +struct is_result_void : public is_result_same {}; + +// Whether a callable returns int. +template +struct is_result_int : public is_result_same {}; + } // namespace butil #endif // BUTIL_TYPE_TRAITS_H diff --git a/test/endpoint_unittest.cpp b/test/endpoint_unittest.cpp index fcb23a7b27..e0da1af14f 100644 --- a/test/endpoint_unittest.cpp +++ b/test/endpoint_unittest.cpp @@ -486,11 +486,11 @@ TEST(EndPointTest, tcp_connect) { ASSERT_EQ(0, butil::hostname2endpoint(g_hostname, 80, &ep)); { butil::fd_guard sockfd(butil::tcp_connect(ep, NULL)); - ASSERT_LE(0, sockfd); + ASSERT_LE(0, sockfd) << "errno=" << errno; } { butil::fd_guard sockfd(butil::tcp_connect(ep, NULL, 1000)); - ASSERT_LE(0, sockfd); + ASSERT_LE(0, sockfd) << "errno=" << errno; } { butil::fd_guard sockfd(butil::tcp_connect(ep, NULL, 1)); diff --git a/test/thread_key_unittest.cpp b/test/thread_key_unittest.cpp index a4609aed20..adbeb02460 100644 --- a/test/thread_key_unittest.cpp +++ b/test/thread_key_unittest.cpp @@ -104,7 +104,7 @@ TEST(ThreadLocalTest, thread_key_seq) { } } -void* THreadKeyCreateAndDeleteFunc(void* arg) { +void* THreadKeyCreateAndDeleteFunc(void*) { while (!g_stopped) { ThreadKey key; EXPECT_EQ(0, butil::thread_key_create(key, NULL)); @@ -162,7 +162,7 @@ TEST(ThreadLocalTest, thread_local_multi_thread) { ASSERT_EQ(0, pthread_create(&threads[i], NULL, ThreadLocalFunc, &args)); } - sleep(5); + sleep(2); g_stopped = true; for (const auto& thread : threads) { pthread_join(thread, NULL); @@ -172,6 +172,46 @@ TEST(ThreadLocalTest, thread_local_multi_thread) { } } +butil::atomic g_counter(0); + +void* ThreadLocalForEachFunc(void* arg) { + auto counter = static_cast>*>(arg); + auto local_counter = counter->get(); + EXPECT_NE(nullptr, local_counter); + while (!g_stopped) { + local_counter->fetch_add(1, butil::memory_order_relaxed); + g_counter.fetch_add(1, butil::memory_order_relaxed); + if (butil::fast_rand_less_than(100) + 1 > 80) { + local_counter = new butil::atomic( + local_counter->load(butil::memory_order_relaxed)); + counter->reset(local_counter); + } + } + return NULL; +} + +TEST(ThreadLocalTest, thread_local_for_each) { + g_stopped = false; + ThreadLocal> counter(false); + const int thread_num = 8; + pthread_t threads[thread_num]; + for (int i = 0; i < thread_num; ++i) { + ASSERT_EQ(0, pthread_create( + &threads[i], NULL, ThreadLocalForEachFunc, &counter)); + } + + sleep(2); + g_stopped = true; + for (const auto& thread : threads) { + pthread_join(thread, NULL); + } + int count = 0; + counter.for_each([&count](butil::atomic* c) { + count += c->load(butil::memory_order_relaxed); + }); + ASSERT_EQ(count, g_counter.load(butil::memory_order_relaxed)); +} + struct BAIDU_CACHELINE_ALIGNMENT ThreadKeyArg { std::vector thread_keys; bool ready_delete = false;