Optimize keytablelist implementation (apache#2768)
* Optimize keytablelist implementation

* fix

* add some ut and small fix

* add size in bthread_keytable_pool_t
MJY-HUST authored Oct 7, 2024
1 parent 249bc51 commit d94c88f
Showing 4 changed files with 344 additions and 40 deletions.
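The diff below replaces the single per-thread keytable pointer with a length-tracked, cache-line-aligned KeyTableList (head, tail, length), adds a size counter to bthread_keytable_pool_t, and introduces the flags key_table_list_size and borrow_from_globle_size so that returned KeyTables stay thread-local and are spilled to, or refilled from, the global free_keytables list in batches. What follows is a minimal, self-contained sketch of that batching pattern, not the commit's code: Node, GlobalPool, LocalList, give_back and borrow are hypothetical names, and std::mutex stands in for the pool's pthread rwlock.

    #include <cstdint>
    #include <mutex>

    struct Node { Node* next = nullptr; };            // stand-in for bthread::KeyTable

    constexpr uint32_t kListLimit = 5000;             // mirrors --key_table_list_size
    constexpr uint32_t kBorrowBatch = 100;            // mirrors --borrow_from_globle_size

    struct GlobalPool {                               // stand-in for bthread_keytable_pool_t
        std::mutex mu;                                // the real pool uses a pthread rwlock
        Node* free_list = nullptr;                    // counterpart of free_keytables
        size_t size = 0;                              // counterpart of the new size field
    };

    struct LocalList {                                // stand-in for bthread::KeyTableList
        Node* head = nullptr;
        Node* tail = nullptr;
        uint32_t length = 0;

        void append(Node* n) {
            n->next = nullptr;
            if (head == nullptr) { head = tail = n; } else { tail->next = n; tail = n; }
            ++length;
        }
        Node* remove_front() {
            if (head == nullptr) { return nullptr; }
            Node* n = head;
            head = head->next;
            if (head == nullptr) { tail = nullptr; }
            --length;
            return n;
        }
    };

    // Return path: keep the table in the thread-local list and spill half of the
    // limit to the global pool only once the list grows past kListLimit.
    void give_back(GlobalPool& pool, LocalList& local, Node* n) {
        local.append(n);
        if (local.length <= kListLimit) { return; }
        std::lock_guard<std::mutex> lock(pool.mu);
        for (uint32_t i = 0; i < kListLimit / 2; ++i) {
            Node* spilled = local.remove_front();
            if (spilled == nullptr) { break; }
            spilled->next = pool.free_list;
            pool.free_list = spilled;
            ++pool.size;
        }
    }

    // Borrow path: prefer the uncontended thread-local list; on a miss, refill
    // at most kBorrowBatch tables from the global pool under the lock.
    Node* borrow(GlobalPool& pool, LocalList& local) {
        if (Node* n = local.remove_front()) { return n; }
        std::lock_guard<std::mutex> lock(pool.mu);
        while (local.length < kBorrowBatch && pool.free_list != nullptr) {
            Node* n = pool.free_list;
            pool.free_list = n->next;
            --pool.size;
            local.append(n);
        }
        return local.remove_front();
    }

The point of the batching is that the common case, returning and borrowing within one worker thread, touches only the thread-local list, while the global lock is taken once per batch rather than once per KeyTable.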
219 changes: 180 additions & 39 deletions src/bthread/key.cpp
@@ -20,21 +20,35 @@
// Date: Sun Aug 3 12:46:15 CST 2014

#include <pthread.h>
#include "butil/macros.h"
#include <gflags/gflags.h>

#include "bthread/errno.h" // EAGAIN
#include "bthread/task_group.h" // TaskGroup
#include "butil/atomicops.h"
#include "butil/macros.h"
#include "butil/thread_key.h"
#include "butil/thread_local.h"
#include "bvar/passive_status.h"
#include "bthread/errno.h" // EAGAIN
#include "bthread/task_group.h" // TaskGroup

// Implement bthread_key_t related functions

namespace bthread {

DEFINE_uint32(key_table_list_size, 5000,
"The maximum length of the KeyTableList. Once this value is "
"exceeded, a portion of the KeyTables will be moved to the "
"global free_keytables list.");

DEFINE_uint32(borrow_from_globle_size, 100,
"The maximum number of KeyTables retrieved in a single operation "
"from the global free_keytables when no KeyTable exists in the "
"current thread's keytable_list.");

EXTERN_BAIDU_VOLATILE_THREAD_LOCAL(TaskGroup*, tls_task_group);

class KeyTable;

// defined in task_group.cpp
extern __thread TaskGroup* tls_task_group;
extern __thread LocalStorage tls_bls;
static __thread bool tls_ever_created_keytable = false;

@@ -52,7 +66,7 @@ static const uint32_t KEY_2NDLEVEL_SIZE = 32;
static const uint32_t KEY_1STLEVEL_SIZE = 31;

// Max tls in one thread, currently the value is 992 which should be enough
// for most projects throughout baidu.
// for most projects throughout baidu.
static const uint32_t KEYS_MAX = KEY_2NDLEVEL_SIZE * KEY_1STLEVEL_SIZE;

// destructors/version of TLS.
@@ -94,7 +108,7 @@ class BAIDU_CACHELINE_ALIGNMENT SubKeyTable {
// Set the position to NULL before calling dtor which may set
// the position again.
_data[i].ptr = NULL;

KeyInfo info = bthread::s_key_info[offset + i];
if (info.dtor && _data[i].version == info.version) {
info.dtor(p, info.dtor_args);
@@ -205,53 +219,148 @@ class BAIDU_CACHELINE_ALIGNMENT KeyTable {
SubKeyTable* _subs[KEY_1STLEVEL_SIZE];
};

struct KeyTableList {
KeyTableList() {
keytable = NULL;
class BAIDU_CACHELINE_ALIGNMENT KeyTableList {
public:
KeyTableList() :
_head(NULL), _tail(NULL), _length(0) {
}

~KeyTableList() {
bthread::TaskGroup* g = bthread::tls_task_group;
bthread::KeyTable* old_kt = bthread::tls_bls.keytable;
TaskGroup* g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
KeyTable* old_kt = tls_bls.keytable;
KeyTable* keytable = _head;
while (keytable) {
bthread::KeyTable* kt = keytable;
KeyTable* kt = keytable;
keytable = kt->next;
bthread::tls_bls.keytable = kt;
tls_bls.keytable = kt;
if (g) {
g->current_task()->local_storage.keytable = kt;
}
delete kt;
if (old_kt == kt) {
old_kt = NULL;
}
g = bthread::tls_task_group;
g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
}
bthread::tls_bls.keytable = old_kt;
if(g) {
tls_bls.keytable = old_kt;
if (g) {
g->current_task()->local_storage.keytable = old_kt;
}
}
KeyTable* keytable;

void append(KeyTable* keytable) {
if (keytable == NULL) {
return;
}
if (_head == NULL) {
_head = _tail = keytable;
} else {
_tail->next = keytable;
_tail = keytable;
}
keytable->next = NULL;
_length++;
}

KeyTable* remove_front() {
if (_head == NULL) {
return NULL;
}
KeyTable* temp = _head;
_head = _head->next;
_length--;
if (_head == NULL) {
_tail = NULL;
}
return temp;
}

int move_first_n_to_target(KeyTable** target, uint32_t size) {
if (size > _length || _head == NULL) {
return 0;
}

KeyTable* current = _head;
KeyTable* prev = NULL;
uint32_t count = 0;
while (current != NULL && count < size) {
prev = current;
current = current->next;
count++;
}
if (prev != NULL) {
prev->next = NULL;
if (*target == NULL) {
*target = _head;
} else {
(*target)->next = _head;
}
_head = current;
_length -= count;
if (_head == NULL) {
_tail = NULL;
}
}
return count;
}

inline uint32_t get_length() {
return _length;
}

// Only for test
inline bool check_length() {
KeyTable* current = _head;
uint32_t count = 0;
while (current != NULL) {
current = current->next;
count++;
}
return count == _length;
}

private:
KeyTable* _head;
KeyTable* _tail;
uint32_t _length;
};

static KeyTable* borrow_keytable(bthread_keytable_pool_t* pool) {
KeyTable* borrow_keytable(bthread_keytable_pool_t* pool) {
if (pool != NULL && (pool->list || pool->free_keytables)) {
KeyTable* p;
pthread_rwlock_rdlock(&pool->rwlock);
auto list = (butil::ThreadLocal<bthread::KeyTableList>*)pool->list;
if (list && list->get()->keytable) {
p = list->get()->keytable;
list->get()->keytable = p->next;
pthread_rwlock_unlock(&pool->rwlock);
return p;
if (list) {
p = list->get()->remove_front();
if (p) {
pthread_rwlock_unlock(&pool->rwlock);
return p;
}
}
pthread_rwlock_unlock(&pool->rwlock);
if (pool->free_keytables) {
pthread_rwlock_wrlock(&pool->rwlock);
p = (KeyTable*)pool->free_keytables;
if (p) {
pool->free_keytables = p->next;
if (list) {
for (uint32_t i = 0; i < FLAGS_borrow_from_globle_size; ++i) {
if (p) {
pool->free_keytables = p->next;
list->get()->append(p);
p = (KeyTable*)pool->free_keytables;
--pool->size;
} else {
break;
}
}
KeyTable* result = list->get()->remove_front();
pthread_rwlock_unlock(&pool->rwlock);
return p;
return result;
} else {
if (p) {
pool->free_keytables = p->next;
pthread_rwlock_unlock(&pool->rwlock);
return p;
}
}
pthread_rwlock_unlock(&pool->rwlock);
}
@@ -276,8 +385,17 @@ void return_keytable(bthread_keytable_pool_t* pool, KeyTable* kt) {
return;
}
auto list = (butil::ThreadLocal<bthread::KeyTableList>*)pool->list;
kt->next = list->get()->keytable;
list->get()->keytable = kt;
list->get()->append(kt);
if (list->get()->get_length() > FLAGS_key_table_list_size) {
pthread_rwlock_unlock(&pool->rwlock);
pthread_rwlock_wrlock(&pool->rwlock);
if (!pool->destroyed) {
int out = list->get()->move_first_n_to_target(
(KeyTable**)(&pool->free_keytables),
FLAGS_key_table_list_size / 2);
pool->size += out;
}
}
pthread_rwlock_unlock(&pool->rwlock);
}

@@ -327,6 +445,7 @@ int bthread_keytable_pool_init(bthread_keytable_pool_t* pool) {
pthread_rwlock_init(&pool->rwlock, NULL);
pool->list = new butil::ThreadLocal<bthread::KeyTableList>();
pool->free_keytables = NULL;
pool->size = 0;
pool->destroyed = 0;
return 0;
}
@@ -339,14 +458,16 @@ int bthread_keytable_pool_destroy(bthread_keytable_pool_t* pool) {
bthread::KeyTable* saved_free_keytables = NULL;
pthread_rwlock_wrlock(&pool->rwlock);
pool->destroyed = 1;
pool->size = 0;
delete (butil::ThreadLocal<bthread::KeyTableList>*)pool->list;
saved_free_keytables = (bthread::KeyTable*)pool->free_keytables;
pool->list = NULL;
pool->free_keytables = NULL;
pthread_rwlock_unlock(&pool->rwlock);

// Cheat get/setspecific and destroy the keytables.
bthread::TaskGroup* g = bthread::tls_task_group;
bthread::TaskGroup* g =
bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
bthread::KeyTable* old_kt = bthread::tls_bls.keytable;
while (saved_free_keytables) {
bthread::KeyTable* kt = saved_free_keytables;
@@ -356,7 +477,7 @@ int bthread_keytable_pool_destroy(bthread_keytable_pool_t* pool) {
g->current_task()->local_storage.keytable = kt;
}
delete kt;
g = bthread::tls_task_group;
g = bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
}
bthread::tls_bls.keytable = old_kt;
if (g) {
@@ -374,15 +495,34 @@ int bthread_keytable_pool_getstat(bthread_keytable_pool_t* pool,
LOG(ERROR) << "Param[pool] or Param[stat] is NULL";
return EINVAL;
}
pthread_rwlock_rdlock(&pool->rwlock);
size_t count = 0;
bthread::KeyTable* p = (bthread::KeyTable*)pool->free_keytables;
for (; p; p = p->next, ++count) {}
stat->nfree = count;
pthread_rwlock_wrlock(&pool->rwlock);
stat->nfree = pool->size;
pthread_rwlock_unlock(&pool->rwlock);
return 0;
}

int get_thread_local_keytable_list_length(bthread_keytable_pool_t* pool) {
if (pool == NULL) {
LOG(ERROR) << "Param[pool] is NULL";
return EINVAL;
}
int length = 0;
pthread_rwlock_rdlock(&pool->rwlock);
if (pool->destroyed) {
pthread_rwlock_unlock(&pool->rwlock);
return length;
}
auto list = (butil::ThreadLocal<bthread::KeyTableList>*)pool->list;
if (list) {
length = (int)(list->get()->get_length());
if (!list->get()->check_length()) {
LOG(ERROR) << "Length is not equal";
}
}
pthread_rwlock_unlock(&pool->rwlock);
return length;
}

// TODO: this is not strict `reserve' because we only check #free.
// Currently there's no way to track KeyTables that may be returned
// to the pool in future.
@@ -418,6 +558,7 @@ void bthread_keytable_pool_reserve(bthread_keytable_pool_t* pool,
}
kt->next = (bthread::KeyTable*)pool->free_keytables;
pool->free_keytables = kt;
++pool->size;
pthread_rwlock_unlock(&pool->rwlock);
if (data == NULL) {
break;
@@ -467,10 +608,10 @@ int bthread_key_delete(bthread_key_t key) {
++bthread::s_key_info[key.index].version;
}
bthread::s_key_info[key.index].dtor = NULL;
bthread::s_key_info[key.index].dtor_args = NULL;
bthread::s_key_info[key.index].dtor_args = NULL;
bthread::s_free_keys[bthread::nfreekey++] = key.index;
return 0;
}
}
}
CHECK(false) << "bthread_key_delete is called on invalid " << key;
return EINVAL;
@@ -489,7 +630,7 @@ int bthread_setspecific(bthread_key_t key, void* data) {
return ENOMEM;
}
bthread::tls_bls.keytable = kt;
bthread::TaskGroup* const g = bthread::tls_task_group;
bthread::TaskGroup* const g = bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
if (g) {
g->current_task()->local_storage.keytable = kt;
} else {
@@ -510,7 +651,7 @@ void* bthread_getspecific(bthread_key_t key) {
if (kt) {
return kt->get_data(key);
}
bthread::TaskGroup* const g = bthread::tls_task_group;
bthread::TaskGroup* const g = bthread::BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
if (g) {
bthread::TaskMeta* const task = g->current_task();
kt = bthread::borrow_keytable(task->attr.keytable_pool);
1 change: 1 addition & 0 deletions src/bthread/types.h
@@ -88,6 +88,7 @@ typedef struct {
pthread_rwlock_t rwlock;
void* list;
void* free_keytables;
size_t size;
int destroyed;
} bthread_keytable_pool_t;

4 changes: 4 additions & 0 deletions src/bthread/unstable.h
@@ -128,6 +128,10 @@ extern int bthread_keytable_pool_destroy(bthread_keytable_pool_t*);
extern int bthread_keytable_pool_getstat(bthread_keytable_pool_t* pool,
bthread_keytable_pool_stat_t* stat);

// [RPC INTERNAL]
// Return thread local keytable list length if exist.
extern int get_thread_local_keytable_list_length(bthread_keytable_pool_t* pool);

// [RPC INTERNAL]
// Reserve at most `nfree' keytables with `key' pointing to data created by
// ctor(args).
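The new size field also changes how pool statistics are produced: bthread_keytable_pool_getstat now reads the incrementally maintained counter instead of walking free_keytables under the lock. A minimal usage sketch, assuming the usual brpc include paths and omitting error handling:

    #include <bthread/types.h>
    #include <bthread/unstable.h>
    #include <stdio.h>

    int main() {
        bthread_keytable_pool_t pool;
        bthread_keytable_pool_init(&pool);

        bthread_keytable_pool_stat_t stat;
        if (bthread_keytable_pool_getstat(&pool, &stat) == 0) {
            // nfree is taken from pool->size in O(1) rather than by
            // traversing the global free list.
            printf("free keytables: %zu\n", stat.nfree);
        }

        bthread_keytable_pool_destroy(&pool);
        return 0;
    }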