From 25cfdb08300f367334da3b52a235a6fb3362c6cf Mon Sep 17 00:00:00 2001 From: Bright Chen Date: Fri, 13 Oct 2023 10:20:41 +0800 Subject: [PATCH] Thread local without limit by _SC_THREAD_KEYS_MAX (#2296) * Thread local without num limit * Thread local without num limit * Thread local without num limit * Opt rename --- BUILD.bazel | 1 + CMakeLists.txt | 1 + Makefile | 1 + src/butil/thread_key.cpp | 194 +++++++++++++++ src/butil/thread_key.h | 202 +++++++++++++++ test/BUILD.bazel | 1 + test/CMakeLists.txt | 1 + test/Makefile | 1 + test/thread_key_unittest.cpp | 460 +++++++++++++++++++++++++++++++++++ 9 files changed, 862 insertions(+) create mode 100644 src/butil/thread_key.cpp create mode 100644 src/butil/thread_key.h create mode 100644 test/thread_key_unittest.cpp diff --git a/BUILD.bazel b/BUILD.bazel index 8848593a95..5d317c90fe 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -200,6 +200,7 @@ BUTIL_SRCS = [ "src/butil/status.cpp", "src/butil/string_printf.cpp", "src/butil/thread_local.cpp", + "src/butil/thread_key.cpp", "src/butil/unix_socket.cpp", "src/butil/endpoint.cpp", "src/butil/fd_utility.cpp", diff --git a/CMakeLists.txt b/CMakeLists.txt index 66cc015783..a01a0bf6b4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -377,6 +377,7 @@ set(BUTIL_SOURCES ${PROJECT_SOURCE_DIR}/src/butil/status.cpp ${PROJECT_SOURCE_DIR}/src/butil/string_printf.cpp ${PROJECT_SOURCE_DIR}/src/butil/thread_local.cpp + ${PROJECT_SOURCE_DIR}/src/butil/thread_key.cpp ${PROJECT_SOURCE_DIR}/src/butil/unix_socket.cpp ${PROJECT_SOURCE_DIR}/src/butil/endpoint.cpp ${PROJECT_SOURCE_DIR}/src/butil/fd_utility.cpp diff --git a/Makefile b/Makefile index 574c63bbfb..87ddc5a4ee 100644 --- a/Makefile +++ b/Makefile @@ -151,6 +151,7 @@ BUTIL_SOURCES = \ src/butil/status.cpp \ src/butil/string_printf.cpp \ src/butil/thread_local.cpp \ + src/butil/thread_key.cpp \ src/butil/unix_socket.cpp \ src/butil/endpoint.cpp \ src/butil/fd_utility.cpp \ diff --git a/src/butil/thread_key.cpp b/src/butil/thread_key.cpp new file mode 100644 index 0000000000..02bcd5867a --- /dev/null +++ b/src/butil/thread_key.cpp @@ -0,0 +1,194 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "thread_key.h" +#include "pthread.h" +#include +#include "butil/thread_local.h" + +namespace butil { + +// Check whether an entry is unused. +#define KEY_UNUSED(p) (((p) & 1) == 0) + +// Check whether a key is usable. We cannot reuse an allocated key if +// the sequence counter would overflow after the next destroy call. +// This would mean that we potentially free memory for a key with the +// same sequence. This is *very* unlikely to happen, A program would +// have to create and destroy a key 2^31 times. If it should happen we +// simply don't use this specific key anymore. +#define KEY_USABLE(p) (((size_t) (p)) < ((size_t) ((p) + 2))) + +static const uint32_t THREAD_KEY_RESERVE = 8096; +pthread_mutex_t g_thread_key_mutex = PTHREAD_MUTEX_INITIALIZER; +static size_t g_id = 0; +static std::deque* g_free_ids = NULL; +static std::vector* g_thread_keys = NULL; +static __thread std::vector* g_tls_data = NULL; + +ThreadKey& ThreadKey::operator=(ThreadKey&& other) noexcept { + if (this == &other) { + return *this; + } + + _id = other._id; + _seq = other._seq; + other.Reset(); + return *this; +} + +bool ThreadKey::Valid() const { + return _id != InvalidID && !KEY_UNUSED(_seq); +} + +static void DestroyTlsData() { + if (!g_tls_data) { + return; + } + std::vector dummy_keys; + { + BAIDU_SCOPED_LOCK(g_thread_key_mutex); + if (BAIDU_LIKELY(g_thread_keys)) { + dummy_keys.insert(dummy_keys.end(), g_thread_keys->begin(), g_thread_keys->end()); + } + } + for (size_t i = 0; i < g_tls_data->size(); ++i) { + if (!KEY_UNUSED(dummy_keys[i].seq) && dummy_keys[i].dtor) { + dummy_keys[i].dtor((*g_tls_data)[i].data); + } + } + delete g_tls_data; + g_tls_data = NULL; +} + +static std::deque* GetGlobalFreeIds() { + if (BAIDU_UNLIKELY(!g_free_ids)) { + g_free_ids = new (std::nothrow) std::deque(); + if (BAIDU_UNLIKELY(!g_free_ids)) { + abort(); + } + } + + return g_free_ids; +} + +int thread_key_create(ThreadKey& thread_key, DtorFunction dtor) { + BAIDU_SCOPED_LOCK(g_thread_key_mutex); + size_t id; + auto free_ids = GetGlobalFreeIds(); + if (!free_ids) { + return ENOMEM; + } + + if (!free_ids->empty()) { + id = free_ids->back(); + free_ids->pop_back(); + } else { + if (g_id >= ThreadKey::InvalidID) { + // No more available ids. + return EAGAIN; + } + id = g_id++; + if(BAIDU_UNLIKELY(!g_thread_keys)) { + g_thread_keys = new (std::nothrow) std::vector; + if(BAIDU_UNLIKELY(!g_thread_keys)) { + return ENOMEM; + } + g_thread_keys->reserve(THREAD_KEY_RESERVE); + } + g_thread_keys->resize(id + 1); + } + + ++((*g_thread_keys)[id].seq); + (*g_thread_keys)[id].dtor = dtor; + thread_key._id = id; + thread_key._seq = (*g_thread_keys)[id].seq; + + return 0; +} + +int thread_key_delete(ThreadKey& thread_key) { + if (BAIDU_UNLIKELY(!thread_key.Valid())) { + return EINVAL; + } + + BAIDU_SCOPED_LOCK(g_thread_key_mutex); + size_t id = thread_key._id; + size_t seq = thread_key._seq; + if (id >= g_thread_keys->size() || + seq != (*g_thread_keys)[id].seq || + KEY_UNUSED((*g_thread_keys)[id].seq)) { + thread_key.Reset(); + return EINVAL; + } + + if (BAIDU_UNLIKELY(!GetGlobalFreeIds())) { + return ENOMEM; + } + + ++((*g_thread_keys)[id].seq); + // Collect the usable key id for reuse. + if (KEY_USABLE((*g_thread_keys)[id].seq)) { + GetGlobalFreeIds()->push_back(id); + } + thread_key.Reset(); + + return 0; +} + +int thread_setspecific(ThreadKey& thread_key, void* data) { + if (BAIDU_UNLIKELY(!thread_key.Valid())) { + return EINVAL; + } + size_t id = thread_key._id; + size_t seq = thread_key._seq; + if (BAIDU_UNLIKELY(!g_tls_data)) { + g_tls_data = new (std::nothrow) std::vector; + if (BAIDU_UNLIKELY(!g_tls_data)) { + return ENOMEM; + } + g_tls_data->reserve(THREAD_KEY_RESERVE); + // Register the destructor of tls_data in this thread. + butil::thread_atexit(DestroyTlsData); + } + + if (id >= g_tls_data->size()) { + g_tls_data->resize(id + 1); + } + + (*g_tls_data)[id].seq = seq; + (*g_tls_data)[id].data = data; + + return 0; +} + +void* thread_getspecific(ThreadKey& thread_key) { + if (BAIDU_UNLIKELY(!thread_key.Valid())) { + return NULL; + } + size_t id = thread_key._id; + size_t seq = thread_key._seq; + if (BAIDU_UNLIKELY(!g_tls_data || + id >= g_tls_data->size() || + (*g_tls_data)[id].seq != seq)){ + return NULL; + } + + return (*g_tls_data)[id].data; +} + +} // namespace butil \ No newline at end of file diff --git a/src/butil/thread_key.h b/src/butil/thread_key.h new file mode 100644 index 0000000000..48f02f7d02 --- /dev/null +++ b/src/butil/thread_key.h @@ -0,0 +1,202 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef BRPC_THREAD_KEY_H +#define BRPC_THREAD_KEY_H + +#include +#include +#include +#include +#include "butil/scoped_lock.h" + +namespace butil { + +typedef void (*DtorFunction)(void *); + +class ThreadKey { +public: + friend int thread_key_create(ThreadKey& thread_key, DtorFunction dtor); + friend int thread_key_delete(ThreadKey& thread_key); + friend int thread_setspecific(ThreadKey& thread_key, void* data); + friend void* thread_getspecific(ThreadKey& thread_key); + + static constexpr size_t InvalidID = std::numeric_limits::max(); + static constexpr size_t InitSeq = 0; + + constexpr ThreadKey() :_id(InvalidID), _seq(InitSeq) {} + + ~ThreadKey() { + Reset(); + } + + ThreadKey(ThreadKey&& other) noexcept + : _id(other._id) + , _seq(other._seq) { + other.Reset(); + } + + ThreadKey& operator=(ThreadKey&& other) noexcept; + + ThreadKey(const ThreadKey& other) = delete; + ThreadKey& operator=(const ThreadKey& other) = delete; + + bool Valid() const; + + void Reset() { + _id = InvalidID; + _seq = InitSeq; + } + + private: + size_t _id; // Key id. + // Sequence number form g_thread_keys set in thread_key_create. + size_t _seq; +}; + +struct ThreadKeyInfo { + ThreadKeyInfo() : seq(0), dtor(NULL) {} + + size_t seq; // Already allocated? + DtorFunction dtor; // Destruction routine. +}; + +struct ThreadKeyTLS { + ThreadKeyTLS() : seq(0), data(NULL) {} + + // Sequence number form ThreadKey, + // set in `thread_setspecific', + // used to check if the key is valid in `thread_getspecific'. + size_t seq; + void* data; // User data. +}; + +// pthread_key_xxx implication without num limit of key. +int thread_key_create(ThreadKey& thread_key, DtorFunction dtor); +int thread_key_delete(ThreadKey& thread_key); +int thread_setspecific(ThreadKey& thread_key, void* data); +void* thread_getspecific(ThreadKey& thread_key); + + +template +class ThreadLocal { +public: + ThreadLocal() : ThreadLocal(false) {} + + explicit ThreadLocal(bool delete_on_thread_exit); + + ~ThreadLocal(); + + // non-copyable + ThreadLocal(const ThreadLocal&) = delete; + ThreadLocal& operator=(const ThreadLocal&) = delete; + + T* get(); + + T* operator->() const { return get(); } + + T& operator*() const { return *get(); } + + void reset(T* ptr); + + void reset() { + reset(NULL); + } + +private: + static void DefaultDtor(void* ptr) { + if (ptr) { + delete static_cast(ptr); + } + } + + ThreadKey _key; + pthread_mutex_t _mutex; + // All pointers of data allocated by the ThreadLocal. + std::vector ptrs; + // Delete data on thread exit or destructor of ThreadLocal. + bool _delete_on_thread_exit; +}; + +template +ThreadLocal::ThreadLocal(bool delete_on_thread_exit) + : _mutex(PTHREAD_MUTEX_INITIALIZER) + , _delete_on_thread_exit(delete_on_thread_exit) { + DtorFunction dtor = _delete_on_thread_exit ? DefaultDtor : NULL; + thread_key_create(_key, dtor); +} + + +template +ThreadLocal::~ThreadLocal() { + thread_key_delete(_key); + if (!_delete_on_thread_exit) { + BAIDU_SCOPED_LOCK(_mutex); + for (auto ptr : ptrs) { + DefaultDtor(ptr); + } + } + pthread_mutex_destroy(&_mutex); +} + +template +T* ThreadLocal::get() { + T* ptr = static_cast(thread_getspecific(_key)); + if (!ptr) { + ptr = new (std::nothrow) T; + if (!ptr) { + return NULL; + } + int rc = thread_setspecific(_key, ptr); + if (rc != 0) { + DefaultDtor(ptr); + return NULL; + } + { + BAIDU_SCOPED_LOCK(_mutex); + ptrs.push_back(ptr); + } + } + return ptr; +} + +template +void ThreadLocal::reset(T* ptr) { + T* old_ptr = get(); + if (thread_setspecific(_key, ptr) != 0) { + return; + } + { + BAIDU_SCOPED_LOCK(_mutex); + if (ptr) { + ptrs.push_back(ptr); + } + // Remove and delete old_ptr. + if (old_ptr) { + auto iter = std::find(ptrs.begin(), ptrs.end(), old_ptr); + if (iter!=ptrs.end()) { + ptrs.erase(iter); + } + DefaultDtor(old_ptr); + } + } +} + +} + + +#endif //BRPC_THREAD_KEY_H diff --git a/test/BUILD.bazel b/test/BUILD.bazel index 82dcd8828b..2c47aa1001 100644 --- a/test/BUILD.bazel +++ b/test/BUILD.bazel @@ -127,6 +127,7 @@ TEST_BUTIL_SOURCES = [ "synchronous_event_unittest.cpp", "temp_file_unittest.cpp", "baidu_thread_local_unittest.cpp", + "thread_key_unittest.cpp", "baidu_time_unittest.cpp", "flat_map_unittest.cpp", "crc32c_unittest.cc", diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 9720a0fabe..aa441d27f6 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -162,6 +162,7 @@ SET(TEST_BUTIL_SOURCES ${PROJECT_SOURCE_DIR}/test/synchronous_event_unittest.cpp ${PROJECT_SOURCE_DIR}/test/temp_file_unittest.cpp ${PROJECT_SOURCE_DIR}/test/baidu_thread_local_unittest.cpp + ${PROJECT_SOURCE_DIR}/test/thread_key_unittest.cpp ${PROJECT_SOURCE_DIR}/test/baidu_time_unittest.cpp ${PROJECT_SOURCE_DIR}/test/flat_map_unittest.cpp ${PROJECT_SOURCE_DIR}/test/crc32c_unittest.cc diff --git a/test/Makefile b/test/Makefile index 871a99ed88..0723d2df8f 100644 --- a/test/Makefile +++ b/test/Makefile @@ -132,6 +132,7 @@ TEST_BUTIL_SOURCES = \ synchronous_event_unittest.cpp \ temp_file_unittest.cpp \ baidu_thread_local_unittest.cpp \ + thread_key_unittest.cpp \ baidu_time_unittest.cpp \ flat_map_unittest.cpp \ crc32c_unittest.cc \ diff --git a/test/thread_key_unittest.cpp b/test/thread_key_unittest.cpp new file mode 100644 index 0000000000..a4609aed20 --- /dev/null +++ b/test/thread_key_unittest.cpp @@ -0,0 +1,460 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include + +#include "butil/thread_key.h" +#include "butil/fast_rand.h" +#include "bthread/bthread.h" + +namespace butil { +namespace { + +//pthread_key_xxx implication without num limit... +//user promise no setspecific/getspecific called in calling thread_key_delete(). +// Check whether an entry is unused. +#define KEY_UNUSED(p) (((p) & 1) == 0) + +// Check whether a key is usable. We cannot reuse an allocated key if +// the sequence counter would overflow after the next destroy call. +// This would mean that we potentially free memory for a key with the +// same sequence. This is *very* unlikely to happen, A program would +// have to create and destroy a key 2^31 times. If it should happen we +// simply don't use this specific key anymore. +#define KEY_USABLE(p) (((size_t) (p)) < ((size_t) ((p) + 2))) + +bool g_started = false; +bool g_stopped = false; + +struct ThreadKeyInfo { + uint32_t id; + uint32_t seq; +}; + +TEST(ThreadLocalTest, sanity) { + { + ThreadKey key; + for (int i = 0; i < 5; ++i) { + std::unique_ptr data(new int(1)); + int *raw_data = data.get(); + ASSERT_EQ(0, butil::thread_key_create(key, NULL)); + + ASSERT_EQ(NULL, butil::thread_getspecific(key)); + ASSERT_EQ(0, butil::thread_setspecific(key, (void *)raw_data)); + ASSERT_EQ(raw_data, butil::thread_getspecific(key)); + + ASSERT_EQ(0, butil::thread_key_delete(key)); + ASSERT_EQ(NULL, butil::thread_getspecific(key)); + ASSERT_NE(0, butil::thread_setspecific(key, (void *)raw_data)); + } + } + + for (int i = 0; i < 5; ++i) { + ThreadLocal tl; + ASSERT_TRUE(tl.get()!=NULL); + int* data = new int; + tl.reset(data); // tl owns data + ASSERT_EQ(data, tl.get()); + tl.reset(); // data has been deleted + ASSERT_TRUE(tl.get()!=NULL); + } +} + +TEST(ThreadLocalTest, thread_key_seq) { + std::vector seqs; + std::vector keys; + for (int i = 0; i < 10000; ++i) { + bool create = fast_rand_less_than(2); + uint64_t num = fast_rand_less_than(5); + if (keys.empty() || create) { + for (uint64_t j = 0; j < num; ++j) { + keys.emplace_back(); + ASSERT_EQ(0, butil::thread_key_create(keys.back(), NULL)); + ASSERT_TRUE(!KEY_UNUSED(keys.back()._seq)); + if (keys.back()._id >= seqs.size()) { + seqs.resize(keys.back()._id + 1); + } else { + ASSERT_EQ(seqs[keys.back()._id] + 2, keys.back()._seq); + } + seqs[keys.back()._id] = keys.back()._seq; + } + } else { + for (uint64_t j = 0; j < num && !keys.empty(); ++j) { + uint64_t index = fast_rand_less_than(keys.size()); + ASSERT_TRUE(!KEY_UNUSED(seqs[keys[index]._id])); + ASSERT_EQ(0, butil::thread_key_delete(keys[index])); + keys.erase(keys.begin() + index); + } + } + } +} + +void* THreadKeyCreateAndDeleteFunc(void* arg) { + while (!g_stopped) { + ThreadKey key; + EXPECT_EQ(0, butil::thread_key_create(key, NULL)); + EXPECT_TRUE(!KEY_UNUSED(key._seq)); + EXPECT_EQ(0, butil::thread_key_delete(key)); + } + return NULL; +} + +TEST(ThreadLocalTest, thread_key_create_and_delete) { + LOG(INFO) << "numeric_limits::max()=" << std::numeric_limits::max(); + g_stopped = false; + const int thread_num = 8; + pthread_t threads[thread_num]; + for (int i = 0; i < thread_num; ++i) { + ASSERT_EQ(0, pthread_create(&threads[i], NULL, THreadKeyCreateAndDeleteFunc, NULL)); + } + sleep(2); + g_stopped = true; + for (const auto& thread : threads) { + pthread_join(thread, NULL); + } +} + +void* ThreadLocalFunc(void* arg) { + auto thread_locals = (std::vector*>*)arg; + std::vector expects(thread_locals->size(), 0); + for (auto tl : *thread_locals) { + EXPECT_TRUE(tl->get() != NULL); + *(tl->get()) = 0; + } + while (!g_stopped) { + uint64_t index = + fast_rand_less_than(thread_locals->size()); + EXPECT_TRUE((*thread_locals)[index]->get() != NULL); + EXPECT_EQ(*((*thread_locals)[index]->get()), expects[index]); + ++(*((*thread_locals)[index]->get())); + ++expects[index]; + bthread_usleep(10); + } + return NULL; +} + +TEST(ThreadLocalTest, thread_local_multi_thread) { + g_stopped = false; + int thread_local_num = 20480; + std::vector*> args(thread_local_num, NULL); + for (int i = 0; i < thread_local_num; ++i) { + args[i] = new ThreadLocal(); + ASSERT_TRUE(args[i]->get() != NULL); + } + const int thread_num = 8; + pthread_t threads[thread_num]; + for (int i = 0; i < thread_num; ++i) { + ASSERT_EQ(0, pthread_create(&threads[i], NULL, ThreadLocalFunc, &args)); + } + + sleep(5); + g_stopped = true; + for (const auto& thread : threads) { + pthread_join(thread, NULL); + } + for (auto tl : args) { + delete tl; + } +} + +struct BAIDU_CACHELINE_ALIGNMENT ThreadKeyArg { + std::vector thread_keys; + bool ready_delete = false; +}; + +bool g_deleted = false; +void* ThreadKeyFunc(void* arg) { + auto thread_key_arg = (ThreadKeyArg*)arg; + auto thread_keys = thread_key_arg->thread_keys; + std::vector expects(thread_keys.size(), 0); + for (auto key : thread_keys) { + EXPECT_TRUE(butil::thread_getspecific(*key) == NULL); + EXPECT_EQ(0, butil::thread_setspecific(*key, new int(0))); + EXPECT_EQ(*(static_cast(butil::thread_getspecific(*key))), 0); + } + while (!g_stopped) { + uint64_t index = + fast_rand_less_than(thread_keys.size()); + auto data = static_cast(butil::thread_getspecific(*thread_keys[index])); + EXPECT_TRUE(data != NULL); + EXPECT_EQ(*data, expects[index]); + ++(*data); + ++expects[index]; + bthread_usleep(10); + } + + thread_key_arg->ready_delete = true; + while (!g_deleted) { + bthread_usleep(10); + } + + for (auto key : thread_keys) { + EXPECT_TRUE(butil::thread_getspecific(*key) == NULL) + << butil::thread_getspecific(*key); + } + return NULL; +} + +TEST(ThreadLocalTest, thread_key_multi_thread) { + g_stopped = false; + g_deleted = false; + std::vector thread_keys; + int key_num = 20480; + for (int i = 0; i < key_num; ++i) { + thread_keys.push_back(new ThreadKey()); + ASSERT_EQ(0, butil::thread_key_create(*thread_keys.back(), [](void* data) { + delete static_cast(data); + })); + ASSERT_TRUE(butil::thread_getspecific(*thread_keys.back()) == NULL); + ASSERT_EQ(0, butil::thread_setspecific(*thread_keys.back(), new int(0))); + ASSERT_EQ(*(static_cast(butil::thread_getspecific(*thread_keys.back()))), 0); + } + const int thread_num = 8; + std::vector args(thread_num); + pthread_t threads[thread_num]; + for (int i = 0; i < thread_num; ++i) { + args[i].thread_keys = thread_keys; + ASSERT_EQ(0, pthread_create(&threads[i], NULL, ThreadKeyFunc, &args[i])); + } + + sleep(5); + g_stopped = true; + while (true) { + bool all_ready = true; + for (int i = 0; i < thread_num; ++i) { + if (!args[i].ready_delete) { + all_ready = false; + break; + } + } + if (all_ready) { + break; + } + usleep(1000); + } + for (auto key : thread_keys) { + ASSERT_EQ(0, butil::thread_key_delete(*key)); + ASSERT_TRUE(butil::thread_getspecific(*key) == NULL); + } + g_deleted = true; + + for (const auto& thread : threads) { + ASSERT_EQ(0, pthread_join(thread, NULL)); + } + for (auto key : thread_keys) { + delete key; + } +} + +DEFINE_bool(test_pthread_key, true, "test pthread_key"); + +struct BAIDU_CACHELINE_ALIGNMENT ThreadKeyPerfArgs { + pthread_key_t pthread_key; + ThreadKey* thread_key; + bool is_pthread_key; + int64_t counter; + int64_t elapse_ns; + bool ready; + + ThreadKeyPerfArgs() + : thread_key(NULL) + , is_pthread_key(true) + , counter(0) + , elapse_ns(0) + , ready(false) {} +}; + +void* ThreadKeyPerfFunc(void* void_arg) { + auto args = (ThreadKeyPerfArgs*)void_arg; + args->ready = true; + std::unique_ptr data(new int(1)); + if (args->is_pthread_key) { + pthread_setspecific(args->pthread_key, (void*)data.get()); + } else { + butil::thread_setspecific(*args->thread_key, (void*)data.get()); + } + butil::Timer t; + while (!g_stopped) { + if (g_started) { + break; + } + bthread_usleep(10); + } + t.start(); + while (!g_stopped) { + if (args->is_pthread_key) { + pthread_getspecific(args->pthread_key); + } else { + butil::thread_getspecific(*args->thread_key); + } + ++args->counter; + } + t.stop(); + args->elapse_ns = t.n_elapsed(); + return NULL; +} + + +void ThreadKeyPerfTest(int thread_num, bool test_pthread_key) { + g_started = false; + g_stopped = false; + pthread_key_t pthread_key; + butil::ThreadKey thread_key; + if (test_pthread_key) { + ASSERT_EQ(0, pthread_key_create(&pthread_key, NULL)); + } else { + ASSERT_EQ(0, butil::thread_key_create(thread_key, NULL)); + } + pthread_t threads[thread_num]; + std::vector args(thread_num); + for (int i = 0; i < thread_num; ++i) { + if (test_pthread_key) { + args[i].pthread_key = pthread_key; + args[i].is_pthread_key = true; + } else { + args[i].thread_key = &thread_key; + args[i].is_pthread_key = false; + } + ASSERT_EQ(0, pthread_create(&threads[i], NULL, ThreadKeyPerfFunc, &args[i])); + } + while (true) { + bool all_ready = true; + for (int i = 0; i < thread_num; ++i) { + if (!args[i].ready) { + all_ready = false; + break; + } + } + if (all_ready) { + break; + } + usleep(1000); + } + g_started = true; + int64_t run_ms = 5 * 1000; + usleep(run_ms * 1000); + g_stopped = true; + int64_t wait_time = 0; + int64_t count = 0; + for (int i = 0; i < thread_num; ++i) { + pthread_join(threads[i], NULL); + wait_time += args[i].elapse_ns; + count += args[i].counter; + } + if (test_pthread_key) { + ASSERT_EQ(0, pthread_key_delete(pthread_key)); + } else { + ASSERT_EQ(0, butil::thread_key_delete(thread_key)); + } + LOG(INFO) << (test_pthread_key ? "pthread_key" : "thread_key") + << " thread_num=" << thread_num + << " count=" << count + << " average_time=" << wait_time / (double)count; +} + +struct BAIDU_CACHELINE_ALIGNMENT ThreadLocalPerfArgs { + ThreadLocal* tl; + int64_t counter; + int64_t elapse_ns; + bool ready; + + ThreadLocalPerfArgs() + : tl(NULL) , counter(0) + , elapse_ns(0) , ready(false) {} +}; + +void* ThreadLocalPerfFunc(void* void_arg) { + auto args = (ThreadLocalPerfArgs*)void_arg; + args->ready = true; + EXPECT_TRUE(args->tl->get() != NULL); + butil::Timer t; + while (!g_stopped) { + if (g_started) { + break; + } + bthread_usleep(10); + } + t.start(); + while (!g_stopped) { + args->tl->get(); + ++args->counter; + } + t.stop(); + args->elapse_ns = t.n_elapsed(); + return NULL; +} + +void ThreadLocalPerfTest(int thread_num) { + g_started = false; + g_stopped = false; + ThreadLocal tl; + pthread_t threads[thread_num]; + std::vector args(thread_num); + for (int i = 0; i < thread_num; ++i) { + args[i].tl = &tl; + ASSERT_EQ(0, pthread_create(&threads[i], NULL, ThreadLocalPerfFunc, &args[i])); + } + while (true) { + bool all_ready = true; + for (int i = 0; i < thread_num; ++i) { + if (!args[i].ready) { + all_ready = false; + break; + } + } + if (all_ready) { + break; + } + usleep(1000); + } + g_started = true; + int64_t run_ms = 5 * 1000; + usleep(run_ms * 1000); + g_stopped = true; + int64_t wait_time = 0; + int64_t count = 0; + for (int i = 0; i < thread_num; ++i) { + pthread_join(threads[i], NULL); + wait_time += args[i].elapse_ns; + count += args[i].counter; + } + LOG(INFO) << "ThreadLocal thread_num=" << thread_num + << " count=" << count + << " average_time=" << wait_time / (double)count; +} + +TEST(ThreadLocalTest, thread_key_performance) { + int thread_num = 1; + ThreadKeyPerfTest(thread_num, true); + ThreadKeyPerfTest(thread_num, false); + ThreadLocalPerfTest(thread_num); + + thread_num = 4; + ThreadKeyPerfTest(thread_num, true); + ThreadKeyPerfTest(thread_num, false); + ThreadLocalPerfTest(thread_num); + + thread_num = 8; + ThreadKeyPerfTest(thread_num, true); + ThreadKeyPerfTest(thread_num, false); + ThreadLocalPerfTest(thread_num); + +} + +} +} // namespace butil \ No newline at end of file