Skip to content

Commit

Permalink
Support semaphore and rwlock for bthread (#2752)
Browse files Browse the repository at this point in the history
* Support bthread semaphore

* Support bthread rwlock

* Support contention profiler for semaphore and rwlock
  • Loading branch information
chenBright authored Sep 26, 2024
1 parent f7b4a0f commit f17048e
Show file tree
Hide file tree
Showing 13 changed files with 1,493 additions and 67 deletions.
66 changes: 59 additions & 7 deletions src/bthread/bthread.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ extern int bthread_usleep(uint64_t microseconds);
// NOTE: mutexattr is not used in current mutex implementation. User shall
// always pass a NULL attribute.
extern int bthread_mutex_init(bthread_mutex_t* __restrict mutex,
const bthread_mutexattr_t* __restrict mutex_attr);
const bthread_mutexattr_t* __restrict attr);

// Destroy `mutex'.
extern int bthread_mutex_destroy(bthread_mutex_t* mutex);
Expand All @@ -188,6 +188,13 @@ extern int bthread_mutex_timedlock(bthread_mutex_t* __restrict mutex,
// Unlock `mutex'.
extern int bthread_mutex_unlock(bthread_mutex_t* mutex);

extern int bthread_mutexattr_init(bthread_mutexattr_t* attr);

// Disable the contention profile of the mutex.
extern int bthread_mutexattr_disable_csite(bthread_mutexattr_t* attr);

extern int bthread_mutexattr_destroy(bthread_mutexattr_t* attr);

// -----------------------------------------------
// Functions for handling conditional variables.
// -----------------------------------------------
Expand Down Expand Up @@ -241,9 +248,8 @@ extern int bthread_rwlock_rdlock(bthread_rwlock_t* rwlock);
extern int bthread_rwlock_tryrdlock(bthread_rwlock_t* rwlock);

// Try to acquire read lock for `rwlock' or return after specfied time.
extern int bthread_rwlock_timedrdlock(
bthread_rwlock_t* __restrict rwlock,
const struct timespec* __restrict abstime);
extern int bthread_rwlock_timedrdlock(bthread_rwlock_t* __restrict rwlock,
const struct timespec* __restrict abstime);

// Acquire write lock for `rwlock'.
extern int bthread_rwlock_wrlock(bthread_rwlock_t* rwlock);
Expand All @@ -252,9 +258,8 @@ extern int bthread_rwlock_wrlock(bthread_rwlock_t* rwlock);
extern int bthread_rwlock_trywrlock(bthread_rwlock_t* rwlock);

// Try to acquire write lock for `rwlock' or return after specfied time.
extern int bthread_rwlock_timedwrlock(
bthread_rwlock_t* __restrict rwlock,
const struct timespec* __restrict abstime);
extern int bthread_rwlock_timedwrlock(bthread_rwlock_t* __restrict rwlock,
const struct timespec* __restrict abstime);

// Unlock `rwlock'.
extern int bthread_rwlock_unlock(bthread_rwlock_t* rwlock);
Expand All @@ -277,6 +282,53 @@ extern int bthread_rwlockattr_getkind_np(const bthread_rwlockattr_t* attr,
extern int bthread_rwlockattr_setkind_np(bthread_rwlockattr_t* attr,
int pref);

// -------------------------------------------
// Functions for handling semaphore.
// -------------------------------------------

// Initialize the semaphore referred to by `sem'. The value of the
// initialized semaphore shall be `value'.
// Return 0 on success, errno otherwise.
extern int bthread_sem_init(bthread_sem_t* sem, unsigned value);

// Disable the contention profile of the semaphore referred to by `sem'.
extern int bthread_sem_disable_csite(bthread_sem_t* sem);

// Destroy the semaphore indicated by `sem'.
// Return 0 on success, errno otherwise.
extern int bthread_sem_destroy(bthread_sem_t* semaphore);

// Lock the semaphore referenced by `sem' by performing a semaphore
// lock operation on that semaphore. If the semaphore value is currently
// zero, then the calling (b)thread shall not return from the call to
// bthread_sema_wait() function until it locks the semaphore.
// Return 0 on success, errno otherwise.
extern int bthread_sem_wait(bthread_sem_t* sem);

// Lock the semaphore referenced by `sem' as in the bthread_sem_wait()
// function. However, if the semaphore cannot be locked without waiting
// for another (b)thread to unlock the semaphore by performing a
// bthread_sem_post() function, this wait shall be terminated when the
// specified timeout expires.
// Return 0 on success, errno otherwise.
extern int bthread_sem_timedwait(bthread_sem_t* sem, const struct timespec* abstime);

// Lock the semaphore referenced by `sem' only if the semaphore is
// currently not locked; that is, if the semaphore value is currently
// positive. Otherwise, it shall not lock the semaphore.
// Return 0 on success, errno otherwise.
extern int bthread_sem_trywait(bthread_sem_t* sem);

// Unlock the semaphore referenced by `sem' by performing
// a semaphore unlock operation on that semaphore.
// Return 0 on success, errno otherwise.
extern int bthread_sem_post(bthread_sem_t* sem);

// Unlock the semaphore referenced by `sem' by performing
// `n' semaphore unlock operation on that semaphore.
// Return 0 on success, errno otherwise.
extern int bthread_sem_post_n(bthread_sem_t* sem, size_t n);


// ----------------------------------------------------------------------
// Functions for handling barrier which is a new feature in 1003.1j-2000.
Expand Down
8 changes: 6 additions & 2 deletions src/bthread/butex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -329,14 +329,14 @@ int butex_wake(void* arg, bool nosignal) {
return 1;
}

int butex_wake_all(void* arg, bool nosignal) {
int butex_wake_n(void* arg, size_t n, bool nosignal) {
Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);

ButexWaiterList bthread_waiters;
ButexWaiterList pthread_waiters;
{
BAIDU_SCOPED_LOCK(b->waiter_lock);
while (!b->waiters.empty()) {
for (size_t i = 0; (n == 0 || i < n) && !b->waiters.empty(); ++i) {
ButexWaiter* bw = b->waiters.head()->value();
bw->RemoveFromList();
bw->container.store(NULL, butil::memory_order_relaxed);
Expand Down Expand Up @@ -393,6 +393,10 @@ int butex_wake_all(void* arg, bool nosignal) {
return nwakeup;
}

int butex_wake_all(void* arg, bool nosignal) {
return butex_wake_n(arg, 0, nosignal);
}

int butex_wake_except(void* arg, bthread_t excluded_bthread) {
Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);

Expand Down
5 changes: 5 additions & 0 deletions src/bthread/butex.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ void butex_destroy(void* butex);
// Returns # of threads woken up.
int butex_wake(void* butex, bool nosignal = false);

// Wake up all threads waiting on |butex| if n is zero,
// Otherwise, wake up at most n thread waiting on |butex|.
// Returns # of threads woken up.
int butex_wake_n(void* butex, size_t n, bool nosignal = false);

// Wake up all threads waiting on |butex|.
// Returns # of threads woken up.
int butex_wake_all(void* butex, bool nosignal = false);
Expand Down
87 changes: 43 additions & 44 deletions src/bthread/mutex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ EXTERN_BAIDU_VOLATILE_THREAD_LOCAL(TaskGroup*, tls_task_group);
const butil::debug::StackTrace ALLOW_UNUSED dummy_bt;

// For controlling contentions collected per second.
static bvar::CollectorSpeedLimit g_cp_sl = BVAR_COLLECTOR_SPEED_LIMIT_INITIALIZER;
bvar::CollectorSpeedLimit g_cp_sl = BVAR_COLLECTOR_SPEED_LIMIT_INITIALIZER;

const size_t MAX_CACHED_CONTENTIONS = 512;
// Skip frames which are always same: the unlock function and submit_contention()
Expand Down Expand Up @@ -267,7 +267,7 @@ void ContentionProfiler::flush_to_disk(bool ending) {

// If contention profiler is on, this variable will be set with a valid
// instance. NULL otherwise.
BAIDU_CACHELINE_ALIGNMENT static ContentionProfiler* g_cp = NULL;
BAIDU_CACHELINE_ALIGNMENT ContentionProfiler* g_cp = NULL;
// Need this version to solve an issue that non-empty entries left by
// previous contention profilers should be detected and overwritten.
static uint64_t g_cp_version = 0;
Expand Down Expand Up @@ -369,13 +369,11 @@ void ContentionProfilerStop() {
LOG(ERROR) << "Contention profiler is not started!";
}

BUTIL_FORCE_INLINE bool
is_contention_site_valid(const bthread_contention_site_t& cs) {
return cs.sampling_range;
bool is_contention_site_valid(const bthread_contention_site_t& cs) {
return bvar::is_sampling_range_valid(cs.sampling_range);
}

BUTIL_FORCE_INLINE void
make_contention_site_invalid(bthread_contention_site_t* cs) {
void make_contention_site_invalid(bthread_contention_site_t* cs) {
cs->sampling_range = 0;
}

Expand Down Expand Up @@ -671,13 +669,13 @@ BUTIL_FORCE_INLINE int pthread_mutex_lock_impl(Mutex* mutex) {
MutexAndContentionSite& entry = fast_alt.list[fast_alt.count++];
entry.mutex = mutex;
csite = &entry.csite;
if (!sampling_range) {
if (!bvar::is_sampling_range_valid(sampling_range)) {
make_contention_site_invalid(&entry.csite);
return pthread_mutex_lock_internal(mutex);
}
}
#endif
if (!sampling_range) { // don't sample
if (!bvar::is_sampling_range_valid(sampling_range)) { // don't sample
return pthread_mutex_lock_internal(mutex);
}
// Lock and monitor the waiting time.
Expand Down Expand Up @@ -873,13 +871,14 @@ void FastPthreadMutex::unlock() {
extern "C" {

int bthread_mutex_init(bthread_mutex_t* __restrict m,
const bthread_mutexattr_t* __restrict) {
const bthread_mutexattr_t* __restrict attr) {
bthread::make_contention_site_invalid(&m->csite);
m->butex = bthread::butex_create_checked<unsigned>();
if (!m->butex) {
return ENOMEM;
}
*m->butex = 0;
m->enable_csite = NULL == attr ? true : attr->enable_csite;
return 0;
}

Expand All @@ -900,35 +899,9 @@ int bthread_mutex_lock_contended(bthread_mutex_t* m) {
return bthread::mutex_lock_contended_impl(m, NULL);
}

int bthread_mutex_lock(bthread_mutex_t* m) {
bthread::MutexInternal* split = (bthread::MutexInternal*)m->butex;
if (!split->locked.exchange(1, butil::memory_order_acquire)) {
return 0;
}
// Don't sample when contention profiler is off.
if (!bthread::g_cp) {
return bthread::mutex_lock_contended_impl(m, NULL);
}
// Ask Collector if this (contended) locking should be sampled.
const size_t sampling_range = bvar::is_collectable(&bthread::g_cp_sl);
if (!sampling_range) { // Don't sample
return bthread::mutex_lock_contended_impl(m, NULL);
}
// Start sampling.
const int64_t start_ns = butil::cpuwide_time_ns();
// NOTE: Don't modify m->csite outside lock since multiple threads are
// still contending with each other.
const int rc = bthread::mutex_lock_contended_impl(m, NULL);
if (!rc) { // Inside lock
m->csite.duration_ns = butil::cpuwide_time_ns() - start_ns;
m->csite.sampling_range = sampling_range;
} // else rare
return rc;
}

int bthread_mutex_timedlock(bthread_mutex_t* __restrict m,
const struct timespec* __restrict abstime) {
bthread::MutexInternal* split = (bthread::MutexInternal*)m->butex;
static int bthread_mutex_lock_impl(bthread_mutex_t* __restrict m,
const struct timespec* __restrict abstime) {
auto split = (bthread::MutexInternal*)m->butex;
if (!split->locked.exchange(1, butil::memory_order_acquire)) {
return 0;
}
Expand All @@ -937,8 +910,9 @@ int bthread_mutex_timedlock(bthread_mutex_t* __restrict m,
return bthread::mutex_lock_contended_impl(m, abstime);
}
// Ask Collector if this (contended) locking should be sampled.
const size_t sampling_range = bvar::is_collectable(&bthread::g_cp_sl);
if (!sampling_range) { // Don't sample
const size_t sampling_range =
m->enable_csite ? bvar::is_collectable(&bthread::g_cp_sl) : bvar::INVALID_SAMPLING_RANGE;
if (!bvar::is_sampling_range_valid(sampling_range)) { // Don't sample
return bthread::mutex_lock_contended_impl(m, abstime);
}
// Start sampling.
Expand All @@ -958,10 +932,20 @@ int bthread_mutex_timedlock(bthread_mutex_t* __restrict m,
return rc;
}

int bthread_mutex_lock(bthread_mutex_t* m) {
return bthread_mutex_lock_impl(m, NULL);
}

int bthread_mutex_timedlock(bthread_mutex_t* __restrict m,
const struct timespec* __restrict abstime) {
return bthread_mutex_lock_impl(m, abstime);
}

int bthread_mutex_unlock(bthread_mutex_t* m) {
butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)m->butex;
auto whole = (butil::atomic<unsigned>*)m->butex;
bthread_contention_site_t saved_csite = {0, 0};
if (bthread::is_contention_site_valid(m->csite)) {
bool is_valid = bthread::is_contention_site_valid(m->csite);
if (is_valid) {
saved_csite = m->csite;
bthread::make_contention_site_invalid(&m->csite);
}
Expand All @@ -971,7 +955,7 @@ int bthread_mutex_unlock(bthread_mutex_t* m) {
return 0;
}
// Wakeup one waiter
if (!bthread::is_contention_site_valid(saved_csite)) {
if (!is_valid) {
bthread::butex_wake(whole);
return 0;
}
Expand All @@ -983,6 +967,21 @@ int bthread_mutex_unlock(bthread_mutex_t* m) {
return 0;
}

int bthread_mutexattr_init(bthread_mutexattr_t* attr) {
attr->enable_csite = true;
return 0;
}

int bthread_mutexattr_disable_csite(bthread_mutexattr_t* attr) {
attr->enable_csite = false;
return 0;
}

int bthread_mutexattr_destroy(bthread_mutexattr_t* attr) {
attr->enable_csite = true;
return 0;
}

#ifndef NO_PTHREAD_MUTEX_HOOK
int pthread_mutex_lock(pthread_mutex_t* __mutex) {
return bthread::pthread_mutex_lock_impl(__mutex);
Expand Down
14 changes: 8 additions & 6 deletions src/bthread/mutex.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

__BEGIN_DECLS
extern int bthread_mutex_init(bthread_mutex_t* __restrict mutex,
const bthread_mutexattr_t* __restrict mutex_attr);
const bthread_mutexattr_t* __restrict attr);
extern int bthread_mutex_destroy(bthread_mutex_t* mutex);
extern int bthread_mutex_trylock(bthread_mutex_t* mutex);
extern int bthread_mutex_lock(bthread_mutex_t* mutex);
Expand All @@ -48,19 +48,21 @@ class Mutex {
Mutex() {
int ec = bthread_mutex_init(&_mutex, NULL);
if (ec != 0) {
throw std::system_error(std::error_code(ec, std::system_category()), "Mutex constructor failed");
throw std::system_error(std::error_code(ec, std::system_category()),
"Mutex constructor failed");
}
}
~Mutex() { CHECK_EQ(0, bthread_mutex_destroy(&_mutex)); }
native_handler_type native_handler() { return &_mutex; }
void lock() {
int ec = bthread_mutex_lock(&_mutex);
if (ec != 0) {
throw std::system_error(std::error_code(ec, std::system_category()), "Mutex lock failed");
throw std::system_error(std::error_code(ec, std::system_category()),
"Mutex lock failed");
}
}
void unlock() { bthread_mutex_unlock(&_mutex); }
bool try_lock() { return !bthread_mutex_trylock(&_mutex); }
void unlock() { (bthread_mutex_unlock(&_mutex)); }
bool try_lock() { return 0 == bthread_mutex_trylock(&_mutex); }
// TODO(chenzhangyi01): Complement interfaces for C++11
private:
DISALLOW_COPY_AND_ASSIGN(Mutex);
Expand Down Expand Up @@ -107,7 +109,7 @@ namespace std {

template <> class lock_guard<bthread_mutex_t> {
public:
explicit lock_guard(bthread_mutex_t & mutex) : _pmutex(&mutex) {
explicit lock_guard(bthread_mutex_t& mutex) : _pmutex(&mutex) {
#if !defined(NDEBUG)
const int rc = bthread_mutex_lock(_pmutex);
if (rc) {
Expand Down
Loading

0 comments on commit f17048e

Please sign in to comment.