diff --git a/src/bthread/key.cpp b/src/bthread/key.cpp index 433e1e1409..5720a675b2 100644 --- a/src/bthread/key.cpp +++ b/src/bthread/key.cpp @@ -20,21 +20,35 @@ // Date: Sun Aug 3 12:46:15 CST 2014 #include -#include "butil/macros.h" +#include + +#include "bthread/errno.h" // EAGAIN +#include "bthread/task_group.h" // TaskGroup #include "butil/atomicops.h" +#include "butil/macros.h" #include "butil/thread_key.h" +#include "butil/thread_local.h" #include "bvar/passive_status.h" -#include "bthread/errno.h" // EAGAIN -#include "bthread/task_group.h" // TaskGroup // Implement bthread_key_t related functions namespace bthread { +DEFINE_uint32(key_table_list_size, 5000, + "The maximum length of the KeyTableList. Once this value is " + "exceeded, a portion of the KeyTables will be moved to the " + "global free_keytables list."); + +DEFINE_uint32(borrow_from_globle_size, 100, + "The maximum number of KeyTables retrieved in a single operation " + "from the global free_keytables when no KeyTable exists in the " + "current thread's keytable_list."); + +EXTERN_BAIDU_VOLATILE_THREAD_LOCAL(TaskGroup*, tls_task_group); + class KeyTable; // defined in task_group.cpp -extern __thread TaskGroup* tls_task_group; extern __thread LocalStorage tls_bls; static __thread bool tls_ever_created_keytable = false; @@ -52,7 +66,7 @@ static const uint32_t KEY_2NDLEVEL_SIZE = 32; static const uint32_t KEY_1STLEVEL_SIZE = 31; // Max tls in one thread, currently the value is 992 which should be enough -// for most projects throughout baidu. +// for most projects throughout baidu. static const uint32_t KEYS_MAX = KEY_2NDLEVEL_SIZE * KEY_1STLEVEL_SIZE; // destructors/version of TLS. @@ -94,7 +108,7 @@ class BAIDU_CACHELINE_ALIGNMENT SubKeyTable { // Set the position to NULL before calling dtor which may set // the position again. _data[i].ptr = NULL; - + KeyInfo info = bthread::s_key_info[offset + i]; if (info.dtor && _data[i].version == info.version) { info.dtor(p, info.dtor_args); @@ -205,17 +219,20 @@ class BAIDU_CACHELINE_ALIGNMENT KeyTable { SubKeyTable* _subs[KEY_1STLEVEL_SIZE]; }; -struct KeyTableList { - KeyTableList() { - keytable = NULL; +class BAIDU_CACHELINE_ALIGNMENT KeyTableList { +public: + KeyTableList() : + _head(NULL), _tail(NULL), _length(0) { } + ~KeyTableList() { - bthread::TaskGroup* g = bthread::tls_task_group; - bthread::KeyTable* old_kt = bthread::tls_bls.keytable; + TaskGroup* g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group); + KeyTable* old_kt = tls_bls.keytable; + KeyTable* keytable = _head; while (keytable) { - bthread::KeyTable* kt = keytable; + KeyTable* kt = keytable; keytable = kt->next; - bthread::tls_bls.keytable = kt; + tls_bls.keytable = kt; if (g) { g->current_task()->local_storage.keytable = kt; } @@ -223,35 +240,127 @@ struct KeyTableList { if (old_kt == kt) { old_kt = NULL; } - g = bthread::tls_task_group; + g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group); } - bthread::tls_bls.keytable = old_kt; - if(g) { + tls_bls.keytable = old_kt; + if (g) { g->current_task()->local_storage.keytable = old_kt; } } - KeyTable* keytable; + + void append(KeyTable* keytable) { + if (keytable == NULL) { + return; + } + if (_head == NULL) { + _head = _tail = keytable; + } else { + _tail->next = keytable; + _tail = keytable; + } + keytable->next = NULL; + _length++; + } + + KeyTable* remove_front() { + if (_head == NULL) { + return NULL; + } + KeyTable* temp = _head; + _head = _head->next; + _length--; + if (_head == NULL) { + _tail = NULL; + } + return temp; + } + + int move_first_n_to_target(KeyTable** target, uint32_t size) { + if (size > _length || _head == NULL) { + return 0; + } + + KeyTable* current = _head; + KeyTable* prev = NULL; + uint32_t count = 0; + while (current != NULL && count < size) { + prev = current; + current = current->next; + count++; + } + if (prev != NULL) { + prev->next = NULL; + if (*target == NULL) { + *target = _head; + } else { + (*target)->next = _head; + } + _head = current; + _length -= count; + if (_head == NULL) { + _tail = NULL; + } + } + return count; + } + + inline uint32_t get_length() { + return _length; + } + + // Only for test + inline bool check_length() { + KeyTable* current = _head; + uint32_t count = 0; + while (current != NULL) { + current = current->next; + count++; + } + return count == _length; + } + +private: + KeyTable* _head; + KeyTable* _tail; + uint32_t _length; }; -static KeyTable* borrow_keytable(bthread_keytable_pool_t* pool) { +KeyTable* borrow_keytable(bthread_keytable_pool_t* pool) { if (pool != NULL && (pool->list || pool->free_keytables)) { KeyTable* p; pthread_rwlock_rdlock(&pool->rwlock); auto list = (butil::ThreadLocal*)pool->list; - if (list && list->get()->keytable) { - p = list->get()->keytable; - list->get()->keytable = p->next; - pthread_rwlock_unlock(&pool->rwlock); - return p; + if (list) { + p = list->get()->remove_front(); + if (p) { + pthread_rwlock_unlock(&pool->rwlock); + return p; + } } pthread_rwlock_unlock(&pool->rwlock); if (pool->free_keytables) { pthread_rwlock_wrlock(&pool->rwlock); p = (KeyTable*)pool->free_keytables; - if (p) { - pool->free_keytables = p->next; + if (list) { + for (uint32_t i = 0; i < FLAGS_borrow_from_globle_size; ++i) { + if (p) { + pool->free_keytables = p->next; + list->get()->append(p); + p = (KeyTable*)pool->free_keytables; + --pool->size; + } else { + break; + } + } + KeyTable* result = list->get()->remove_front(); pthread_rwlock_unlock(&pool->rwlock); - return p; + return result; + } else { + if (p) { + pool->free_keytables = p->next; + pthread_rwlock_unlock(&pool->rwlock); + return p; + } } pthread_rwlock_unlock(&pool->rwlock); } @@ -276,8 +385,17 @@ void return_keytable(bthread_keytable_pool_t* pool, KeyTable* kt) { return; } auto list = (butil::ThreadLocal*)pool->list; - kt->next = list->get()->keytable; - list->get()->keytable = kt; + list->get()->append(kt); + if (list->get()->get_length() > FLAGS_key_table_list_size) { + pthread_rwlock_unlock(&pool->rwlock); + pthread_rwlock_wrlock(&pool->rwlock); + if (!pool->destroyed) { + int out = list->get()->move_first_n_to_target( + (KeyTable**)(&pool->free_keytables), + FLAGS_key_table_list_size / 2); + pool->size += out; + } + } pthread_rwlock_unlock(&pool->rwlock); } @@ -327,6 +445,7 @@ int bthread_keytable_pool_init(bthread_keytable_pool_t* pool) { pthread_rwlock_init(&pool->rwlock, NULL); pool->list = new butil::ThreadLocal(); pool->free_keytables = NULL; + pool->size = 0; pool->destroyed = 0; return 0; } @@ -339,6 +458,7 @@ int bthread_keytable_pool_destroy(bthread_keytable_pool_t* pool) { bthread::KeyTable* saved_free_keytables = NULL; pthread_rwlock_wrlock(&pool->rwlock); pool->destroyed = 1; + pool->size = 0; delete (butil::ThreadLocal*)pool->list; saved_free_keytables = (bthread::KeyTable*)pool->free_keytables; pool->list = NULL; @@ -346,7 +466,8 @@ int bthread_keytable_pool_destroy(bthread_keytable_pool_t* pool) { pthread_rwlock_unlock(&pool->rwlock); // Cheat get/setspecific and destroy the keytables. - bthread::TaskGroup* g = bthread::tls_task_group; + bthread::TaskGroup* g = + bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group); bthread::KeyTable* old_kt = bthread::tls_bls.keytable; while (saved_free_keytables) { bthread::KeyTable* kt = saved_free_keytables; @@ -356,7 +477,7 @@ int bthread_keytable_pool_destroy(bthread_keytable_pool_t* pool) { g->current_task()->local_storage.keytable = kt; } delete kt; - g = bthread::tls_task_group; + g = bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group); } bthread::tls_bls.keytable = old_kt; if (g) { @@ -374,15 +495,34 @@ int bthread_keytable_pool_getstat(bthread_keytable_pool_t* pool, LOG(ERROR) << "Param[pool] or Param[stat] is NULL"; return EINVAL; } - pthread_rwlock_rdlock(&pool->rwlock); - size_t count = 0; - bthread::KeyTable* p = (bthread::KeyTable*)pool->free_keytables; - for (; p; p = p->next, ++count) {} - stat->nfree = count; + pthread_rwlock_wrlock(&pool->rwlock); + stat->nfree = pool->size; pthread_rwlock_unlock(&pool->rwlock); return 0; } +int get_thread_local_keytable_list_length(bthread_keytable_pool_t* pool) { + if (pool == NULL) { + LOG(ERROR) << "Param[pool] is NULL"; + return EINVAL; + } + int length = 0; + pthread_rwlock_rdlock(&pool->rwlock); + if (pool->destroyed) { + pthread_rwlock_unlock(&pool->rwlock); + return length; + } + auto list = (butil::ThreadLocal*)pool->list; + if (list) { + length = (int)(list->get()->get_length()); + if (!list->get()->check_length()) { + LOG(ERROR) << "Length is not equal"; + } + } + pthread_rwlock_unlock(&pool->rwlock); + return length; +} + // TODO: this is not strict `reserve' because we only check #free. // Currently there's no way to track KeyTables that may be returned // to the pool in future. @@ -418,6 +558,7 @@ void bthread_keytable_pool_reserve(bthread_keytable_pool_t* pool, } kt->next = (bthread::KeyTable*)pool->free_keytables; pool->free_keytables = kt; + ++pool->size; pthread_rwlock_unlock(&pool->rwlock); if (data == NULL) { break; @@ -467,10 +608,10 @@ int bthread_key_delete(bthread_key_t key) { ++bthread::s_key_info[key.index].version; } bthread::s_key_info[key.index].dtor = NULL; - bthread::s_key_info[key.index].dtor_args = NULL; + bthread::s_key_info[key.index].dtor_args = NULL; bthread::s_free_keys[bthread::nfreekey++] = key.index; return 0; - } + } } CHECK(false) << "bthread_key_delete is called on invalid " << key; return EINVAL; @@ -489,7 +630,7 @@ int bthread_setspecific(bthread_key_t key, void* data) { return ENOMEM; } bthread::tls_bls.keytable = kt; - bthread::TaskGroup* const g = bthread::tls_task_group; + bthread::TaskGroup* const g = bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group); if (g) { g->current_task()->local_storage.keytable = kt; } else { @@ -510,7 +651,7 @@ void* bthread_getspecific(bthread_key_t key) { if (kt) { return kt->get_data(key); } - bthread::TaskGroup* const g = bthread::tls_task_group; + bthread::TaskGroup* const g = bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group); if (g) { bthread::TaskMeta* const task = g->current_task(); kt = bthread::borrow_keytable(task->attr.keytable_pool); diff --git a/src/bthread/types.h b/src/bthread/types.h index ff99bb1c1f..0124fcbd7f 100644 --- a/src/bthread/types.h +++ b/src/bthread/types.h @@ -88,6 +88,7 @@ typedef struct { pthread_rwlock_t rwlock; void* list; void* free_keytables; + size_t size; int destroyed; } bthread_keytable_pool_t; diff --git a/src/bthread/unstable.h b/src/bthread/unstable.h index f5bdeecb49..dde5e6f6c5 100644 --- a/src/bthread/unstable.h +++ b/src/bthread/unstable.h @@ -128,6 +128,10 @@ extern int bthread_keytable_pool_destroy(bthread_keytable_pool_t*); extern int bthread_keytable_pool_getstat(bthread_keytable_pool_t* pool, bthread_keytable_pool_stat_t* stat); +// [RPC INTERNAL] +// Return thread local keytable list length if exist. +extern int get_thread_local_keytable_list_length(bthread_keytable_pool_t* pool); + // [RPC INTERNAL] // Reserve at most `nfree' keytables with `key' pointing to data created by // ctor(args). diff --git a/test/bthread_key_unittest.cpp b/test/bthread_key_unittest.cpp index c01ae7fe29..a7f11e7653 100644 --- a/test/bthread_key_unittest.cpp +++ b/test/bthread_key_unittest.cpp @@ -16,14 +16,33 @@ // under the License. #include // std::sort +#include #include "butil/atomicops.h" #include +#include "butil/thread_key.h" #include "butil/time.h" #include "butil/macros.h" #include "butil/scoped_lock.h" #include "butil/logging.h" #include "bthread/bthread.h" #include "bthread/unstable.h" +using namespace bthread; +namespace bthread { +DECLARE_uint32(key_table_list_size); +DECLARE_uint32(borrow_from_globle_size); +class KeyTable; +// defined in bthread/key.cpp +extern void return_keytable(bthread_keytable_pool_t*, KeyTable*); +extern KeyTable* borrow_keytable(bthread_keytable_pool_t*); +} // namespace bthread + +int main(int argc, char* argv[]) { + bthread::FLAGS_key_table_list_size = 20; + bthread::FLAGS_borrow_from_globle_size = 20; + testing::InitGoogleTest(&argc, argv); + GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true); + return RUN_ALL_TESTS(); +} extern "C" { int bthread_keytable_pool_size(bthread_keytable_pool_t* pool) { @@ -407,6 +426,144 @@ TEST(KeyTest, using_pool) { ASSERT_EQ(0, bthread_key_delete(key)); } +bthread_keytable_pool_t test_pool; +struct PoolData2 { + bthread_key_t key; + bthread_attr_t attr; +}; + +static void pool_dtor2(void* tls) { + delete static_cast(tls); +} + +static void usleep_thread_impl(PoolData2* data) { + if (NULL == bthread_getspecific(data->key)) { + PoolData2* data_new = new PoolData2(); + ASSERT_EQ(0, bthread_setspecific(data->key, data_new)); + } + bthread_usleep(1000L); + int length = get_thread_local_keytable_list_length(&test_pool); + ASSERT_LE((size_t)length, bthread::FLAGS_key_table_list_size); +} + +static void* usleep_thread(void* args) { + usleep_thread_impl((PoolData2*)args); + return NULL; +} + +static void launch_many_bthreads(PoolData2* data) { + std::vector tids; + tids.reserve(25000); + for (size_t i = 0; i < 25000; ++i) { + bthread_t t0; + PoolData2* data_tmp = new PoolData2(); + data_tmp->key = data->key; + ASSERT_EQ(0, bthread_start_background(&t0, &(data->attr), usleep_thread, data_tmp)); + tids.push_back(t0); + } + + usleep(3 * 1000 * 1000L); + for (size_t i = 0; i < tids.size(); ++i) { + bthread_join(tids[i], NULL); + } +} + +static void* run_launch_many_bthreads(void* args) { + PoolData2* data = (PoolData2*)args; + launch_many_bthreads(data); + return NULL; +} + +TEST(KeyTest, frequently_borrow_keytable_when_using_pool) { + PoolData2 data; + ASSERT_EQ(0, bthread_key_create(&data.key, pool_dtor2)); + + ASSERT_EQ(0, bthread_keytable_pool_init(&test_pool)); + ASSERT_EQ(0, bthread_keytable_pool_size(&test_pool)); + + ASSERT_EQ(0, bthread_attr_init(&data.attr)); + data.attr.keytable_pool = &test_pool; + + bthread_t bth; + ASSERT_EQ(0, bthread_start_urgent(&bth, &data.attr, run_launch_many_bthreads, &data)); + ASSERT_EQ(0, bthread_join(bth, NULL)); + std::cout << "Free keytable size is " + << bthread_keytable_pool_size(&test_pool) + << " use keytable size is 25000" << std::endl; + ASSERT_EQ(0, bthread_keytable_pool_destroy(&test_pool)); + ASSERT_EQ(0, bthread_key_delete(data.key)); +} + +std::vector table_list; +pthread_mutex_t table_mutex = PTHREAD_MUTEX_INITIALIZER; + +static void return_thread_impl() { + for (int i = 0; i < 32768; i++) { + { + BAIDU_SCOPED_LOCK(table_mutex); + if (table_list.size() > 0) { + bthread::KeyTable* keytable = table_list[0]; + table_list.erase(table_list.begin()); + bthread::return_keytable(&test_pool, keytable); + } + } + int length = get_thread_local_keytable_list_length(&test_pool); + ASSERT_LE((size_t)length, bthread::FLAGS_key_table_list_size); + } +} + +static void* return_thread(void*) { + return_thread_impl(); + return NULL; +} + +static void borrow_thread_impl() { + for (int i = 0; i < 32768; i++) { + bthread::KeyTable* keytable = bthread::borrow_keytable(&test_pool); + BAIDU_SCOPED_LOCK(table_mutex); + table_list.push_back(keytable); + } +} + +static void* borrow_thread(void*) { + borrow_thread_impl(); + return NULL; +} + +TEST(KeyTest, borrow_and_return_keytable_when_using_pool) { + ASSERT_EQ(0, bthread_keytable_pool_init(&test_pool)); + ASSERT_EQ(0, bthread_keytable_pool_size(&test_pool)); + + bthread_attr_t attr; + ASSERT_EQ(0, bthread_attr_init(&attr)); + attr.keytable_pool = &test_pool; + + bthread_t borrow_bth[8]; + bthread_t return_bth[8]; + for (size_t i = 0; i < arraysize(borrow_bth); ++i) { + ASSERT_EQ(0, bthread_start_background(&borrow_bth[i], &attr, + borrow_thread, NULL)); + } + for (size_t i = 0; i < arraysize(return_bth); ++i) { + ASSERT_EQ(0, bthread_start_background(&return_bth[i], &attr, + return_thread, NULL)); + } + + for (size_t i = 0; i < arraysize(borrow_bth); ++i) { + ASSERT_EQ(0, bthread_join(borrow_bth[i], NULL)); + } + for (size_t i = 0; i < arraysize(return_bth); ++i) { + ASSERT_EQ(0, bthread_join(return_bth[i], NULL)); + } + + for (size_t i = 0; i < table_list.size(); i++) { + bthread::return_keytable(&test_pool, table_list[i]); + } + table_list.clear(); + + ASSERT_EQ(0, bthread_keytable_pool_destroy(&test_pool)); +} + // NOTE: lid is short for 'lock in dtor'. butil::atomic lid_seq(1); std::vector lid_seqs; @@ -454,9 +611,10 @@ TEST(KeyTest, use_bthread_mutex_in_dtor) { std::sort(lid_seqs.begin(), lid_seqs.end()); ASSERT_EQ(lid_seqs.end(), std::unique(lid_seqs.begin(), lid_seqs.end())); ASSERT_EQ(arraysize(th) + arraysize(bth) - 1, - *(lid_seqs.end() - 1) - *lid_seqs.begin()); + *(lid_seqs.end() - 1) - *lid_seqs.begin()); ASSERT_EQ(0, bthread_key_delete(key)); + ASSERT_EQ(0, bthread_mutex_destroy(&mu)); } } // namespace