Skip to content

Commit

Permalink
Active Spinning and queue old bthread at the head for bthread mutex (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
chenBright authored Sep 6, 2024
1 parent 2c6644b commit fea2952
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 34 deletions.
29 changes: 22 additions & 7 deletions src/bthread/butex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -537,8 +537,14 @@ inline bool erase_from_butex(ButexWaiter* bw, bool wakeup, WaiterState state) {
return erased;
}

// Arguments for wait_for_butex(), bundled so both the waiter and the
// queueing policy can be passed through a single void* argument.
struct WaitForButexArgs {
// The bthread waiter to queue on the butex.
ButexBthreadWaiter* bw;
// If true, queue the waiter at the head of the butex waiter list (LIFO)
// instead of the tail (FIFO).
bool prepend;
};

static void wait_for_butex(void* arg) {
ButexBthreadWaiter* const bw = static_cast<ButexBthreadWaiter*>(arg);
auto args = static_cast<WaitForButexArgs*>(arg);
ButexBthreadWaiter* const bw = args->bw;
Butex* const b = bw->initial_butex;
// 1: waiter with timeout should have waiter_state == WAITER_STATE_READY
// before they're queued, otherwise the waiter is already timedout
Expand All @@ -560,7 +566,11 @@ static void wait_for_butex(void* arg) {
bw->waiter_state = WAITER_STATE_UNMATCHEDVALUE;
} else if (bw->waiter_state == WAITER_STATE_READY/*1*/ &&
!bw->task_meta->interrupted) {
b->waiters.Append(bw);
if (args->prepend) {
b->waiters.Prepend(bw);
} else {
b->waiters.Append(bw);
}
bw->container.store(b, butil::memory_order_relaxed);
if (bw->abstime != NULL) {
bw->sleep_id = get_global_timer_thread()->schedule(
Expand Down Expand Up @@ -593,7 +603,7 @@ static void wait_for_butex(void* arg) {
}

static int butex_wait_from_pthread(TaskGroup* g, Butex* b, int expected_value,
const timespec* abstime) {
const timespec* abstime, bool prepend) {
TaskMeta* task = NULL;
ButexPthreadWaiter pw;
pw.tid = 0;
Expand All @@ -616,7 +626,11 @@ static int butex_wait_from_pthread(TaskGroup* g, Butex* b, int expected_value,
errno = EINTR;
rc = -1;
} else {
b->waiters.Append(&pw);
if (prepend) {
b->waiters.Prepend(&pw);
} else {
b->waiters.Append(&pw);
}
pw.container.store(b, butil::memory_order_relaxed);
b->waiter_lock.unlock();

Expand Down Expand Up @@ -646,7 +660,7 @@ static int butex_wait_from_pthread(TaskGroup* g, Butex* b, int expected_value,
return rc;
}

int butex_wait(void* arg, int expected_value, const timespec* abstime) {
int butex_wait(void* arg, int expected_value, const timespec* abstime, bool prepend) {
Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
if (b->value.load(butil::memory_order_relaxed) != expected_value) {
errno = EWOULDBLOCK;
Expand All @@ -657,7 +671,7 @@ int butex_wait(void* arg, int expected_value, const timespec* abstime) {
}
TaskGroup* g = tls_task_group;
if (NULL == g || g->is_current_pthread_task()) {
return butex_wait_from_pthread(g, b, expected_value, abstime);
return butex_wait_from_pthread(g, b, expected_value, abstime, prepend);
}
ButexBthreadWaiter bbw;
// tid is 0 iff the thread is non-bthread
Expand Down Expand Up @@ -690,7 +704,8 @@ int butex_wait(void* arg, int expected_value, const timespec* abstime) {
// release fence matches with acquire fence in interrupt_and_consume_waiters
// in task_group.cpp to guarantee visibility of `interrupted'.
bbw.task_meta->current_waiter.store(&bbw, butil::memory_order_release);
g->set_remained(wait_for_butex, &bbw);
WaitForButexArgs args{ &bbw, prepend};
g->set_remained(wait_for_butex, &args);
TaskGroup::sched(&g);

// erase_from_butex_and_wakeup (called by TimerThread) is possibly still
Expand Down
7 changes: 6 additions & 1 deletion src/bthread/butex.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,13 @@ int butex_requeue(void* butex1, void* butex2);
// abstime is not NULL.
// About |abstime|:
// Different from FUTEX_WAIT, butex_wait uses absolute time.
// About |prepend|:
// If |prepend| is true, queue the bthread at the head of the queue,
// otherwise at the tail.
// Returns 0 on success, -1 otherwise and errno is set.
int butex_wait(void* butex, int expected_value, const timespec* abstime);
int butex_wait(void* butex, int expected_value,
const timespec* abstime,
bool prepend = false);

} // namespace bthread

Expand Down
63 changes: 40 additions & 23 deletions src/bthread/mutex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,22 @@
#include "butil/logging.h"
#include "butil/object_pool.h"
#include "butil/debug/stack_trace.h"
#include "butil/thread_local.h"
#include "bthread/butex.h" // butex_*
#include "bthread/mutex.h" // bthread_mutex_t
#include "bthread/sys_futex.h"
#include "bthread/log.h"
#include "butil/debug/stack_trace.h"
#include "bthread/processor.h"
#include "bthread/task_group.h"

extern "C" {
extern void* BAIDU_WEAK _dl_sym(void* handle, const char* symbol, void* caller);
}

namespace bthread {

EXTERN_BAIDU_VOLATILE_THREAD_LOCAL(TaskGroup*, tls_task_group);

// Warm up backtrace before main().
const butil::debug::StackTrace ALLOW_UNUSED dummy_bt;

Expand Down Expand Up @@ -772,29 +777,41 @@ const MutexInternal MUTEX_LOCKED_RAW = {{1},{0},0};
BAIDU_CASSERT(sizeof(unsigned) == sizeof(MutexInternal),
sizeof_mutex_internal_must_equal_unsigned);

inline int mutex_lock_contended(bthread_mutex_t* m) {
butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)m->butex;
while (whole->exchange(BTHREAD_MUTEX_CONTENDED) & BTHREAD_MUTEX_LOCKED) {
if (bthread::butex_wait(whole, BTHREAD_MUTEX_CONTENDED, NULL) < 0 &&
errno != EWOULDBLOCK && errno != EINTR/*note*/) {
// a mutex lock should ignore interruptions in general since
// user code is unlikely to check the return value.
return errno;
const int MAX_SPIN_ITER = 4;

inline int mutex_lock_contended_impl(
bthread_mutex_t* m, const struct timespec* __restrict abstime) {
// When a bthread first contends for a lock, active spinning makes sense.
// Spin only a few times, and only if the local `rq' is empty.
TaskGroup* g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
if (BAIDU_UNLIKELY(NULL == g || g->rq_size() == 0)) {
for (int i = 0; i < MAX_SPIN_ITER; ++i) {
cpu_relax();
}
}
return 0;
}

inline int mutex_timedlock_contended(
bthread_mutex_t* m, const struct timespec* __restrict abstime) {
butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)m->butex;
bool queue_lifo = false;
bool first_wait = true;
auto whole = (butil::atomic<unsigned>*)m->butex;
while (whole->exchange(BTHREAD_MUTEX_CONTENDED) & BTHREAD_MUTEX_LOCKED) {
if (bthread::butex_wait(whole, BTHREAD_MUTEX_CONTENDED, abstime) < 0 &&
if (bthread::butex_wait(whole, BTHREAD_MUTEX_CONTENDED, abstime, queue_lifo) < 0 &&
errno != EWOULDBLOCK && errno != EINTR/*note*/) {
// a mutex lock should ignore interrruptions in general since
// A mutex lock should ignore interruptions in general since
// user code is unlikely to check the return value.
return errno;
}
// Ignore EWOULDBLOCK and EINTR.
if (first_wait && 0 == errno) {
first_wait = false;
}
if (!first_wait) {
// Normally, bthreads are queued in FIFO order. But when competing with
// newly arriving bthreads for ownership of the mutex, a woken-up bthread
// has a good chance of losing, because newly arriving bthreads are already
// running on a CPU and there can be many of them. In that case, for fairness
// and to avoid starvation, the bthread is queued at the head of the waiter queue.
queue_lifo = true;
}
}
return 0;
}
Expand Down Expand Up @@ -880,7 +897,7 @@ int bthread_mutex_trylock(bthread_mutex_t* m) {
}

int bthread_mutex_lock_contended(bthread_mutex_t* m) {
return bthread::mutex_lock_contended(m);
return bthread::mutex_lock_contended_impl(m, NULL);
}

int bthread_mutex_lock(bthread_mutex_t* m) {
Expand All @@ -890,18 +907,18 @@ int bthread_mutex_lock(bthread_mutex_t* m) {
}
// Don't sample when contention profiler is off.
if (!bthread::g_cp) {
return bthread::mutex_lock_contended(m);
return bthread::mutex_lock_contended_impl(m, NULL);
}
// Ask Collector if this (contended) locking should be sampled.
const size_t sampling_range = bvar::is_collectable(&bthread::g_cp_sl);
if (!sampling_range) { // Don't sample
return bthread::mutex_lock_contended(m);
return bthread::mutex_lock_contended_impl(m, NULL);
}
// Start sampling.
const int64_t start_ns = butil::cpuwide_time_ns();
// NOTE: Don't modify m->csite outside lock since multiple threads are
// still contending with each other.
const int rc = bthread::mutex_lock_contended(m);
const int rc = bthread::mutex_lock_contended_impl(m, NULL);
if (!rc) { // Inside lock
m->csite.duration_ns = butil::cpuwide_time_ns() - start_ns;
m->csite.sampling_range = sampling_range;
Expand All @@ -917,18 +934,18 @@ int bthread_mutex_timedlock(bthread_mutex_t* __restrict m,
}
// Don't sample when contention profiler is off.
if (!bthread::g_cp) {
return bthread::mutex_timedlock_contended(m, abstime);
return bthread::mutex_lock_contended_impl(m, abstime);
}
// Ask Collector if this (contended) locking should be sampled.
const size_t sampling_range = bvar::is_collectable(&bthread::g_cp_sl);
if (!sampling_range) { // Don't sample
return bthread::mutex_timedlock_contended(m, abstime);
return bthread::mutex_lock_contended_impl(m, abstime);
}
// Start sampling.
const int64_t start_ns = butil::cpuwide_time_ns();
// NOTE: Don't modify m->csite outside lock since multiple threads are
// still contending with each other.
const int rc = bthread::mutex_timedlock_contended(m, abstime);
const int rc = bthread::mutex_lock_contended_impl(m, abstime);
if (!rc) { // Inside lock
m->csite.duration_ns = butil::cpuwide_time_ns() - start_ns;
m->csite.sampling_range = sampling_range;
Expand Down
5 changes: 5 additions & 0 deletions src/bthread/task_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,11 @@ class TaskGroup {
// process make go on indefinitely.
void push_rq(bthread_t tid);

// Returns the size of the local run queue.
// NOTE(review): reads _rq.volatile_size(), an unsynchronized snapshot —
// the value may be stale under concurrent push/pop, so treat it as a
// hint only (TODO confirm against the run-queue implementation).
size_t rq_size() const {
return _rq.volatile_size();
}

bthread_tag_t tag() const { return _tag; }

private:
Expand Down
5 changes: 5 additions & 0 deletions src/butil/containers/linked_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,11 @@ class LinkedList {
e->InsertBefore(&root_);
}

// Prepend |e| to the head of the linked list.
// |root_| acts as the sentinel node (head() returns root_.next()), so
// inserting |e| right after |root_| makes it the new first element.
void Prepend(LinkNode<T>* e) {
e->InsertAfter(&root_);
}

LinkNode<T>* head() const {
return root_.next();
}
Expand Down
18 changes: 15 additions & 3 deletions src/butil/thread_local.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,31 +32,43 @@

#define BAIDU_VOLATILE_THREAD_LOCAL(type, var_name, default_value) \
BAIDU_THREAD_LOCAL type var_name = default_value; \
static __attribute__((noinline, unused)) type get_##var_name(void) { \
__attribute__((noinline, unused)) type get_##var_name(void) { \
asm volatile(""); \
return var_name; \
} \
static __attribute__((noinline, unused)) type *get_ptr_##var_name(void) { \
__attribute__((noinline, unused)) type *get_ptr_##var_name(void) { \
type *ptr = &var_name; \
asm volatile("" : "+rm"(ptr)); \
return ptr; \
} \
static __attribute__((noinline, unused)) void set_##var_name(type v) { \
__attribute__((noinline, unused)) void set_##var_name(type v) { \
asm volatile(""); \
var_name = v; \
}

#define EXTERN_BAIDU_VOLATILE_THREAD_LOCAL(type, var_name) \
type get_##var_name(void); \
type *get_ptr_##var_name(void); \
void set_##var_name(type v)

#if (defined (__aarch64__) && defined (__GNUC__)) || defined(__clang__)
// The GNU compiler on aarch64 and the Clang compiler incorrectly cache the
// address of thread_local variables across a suspend point. The following
// macros are used to disable the volatile thread-local access optimization.
#define BAIDU_GET_VOLATILE_THREAD_LOCAL(var_name) get_##var_name()
#define BAIDU_GET_PTR_VOLATILE_THREAD_LOCAL(var_name) get_ptr_##var_name()
#define BAIDU_SET_VOLATILE_THREAD_LOCAL(var_name, value) set_##var_name(value)

#define EXTERN_BAIDU_VOLATILE_THREAD_LOCAL(type, var_name) \
type get_##var_name(void); \
type *get_ptr_##var_name(void); \
void set_##var_name(type v)
#else
#define BAIDU_GET_VOLATILE_THREAD_LOCAL(var_name) var_name
#define BAIDU_GET_PTR_VOLATILE_THREAD_LOCAL(var_name) &##var_name
#define BAIDU_SET_VOLATILE_THREAD_LOCAL(var_name, value) var_name = value
#define EXTERN_BAIDU_VOLATILE_THREAD_LOCAL(type, var_name) \
extern BAIDU_THREAD_LOCAL type var_name
#endif

namespace butil {
Expand Down

0 comments on commit fea2952

Please sign in to comment.