Support FastPthreadMutex contention profiler && expose FastPthreadMutex to user (#2589)
chenBright authored Jun 3, 2024
1 parent 4f85335 commit 3775b41
Showing 7 changed files with 122 additions and 25 deletions.
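
What this changes for callers: FastPthreadMutex is promoted out of the bthread::internal namespace into a public bthread::FastPthreadMutex wrapper (src/bthread/mutex.h), and its lock/unlock paths are routed through the same hooks as pthread_mutex_t so contended acquisitions can be reported by the contention profiler (src/bthread/mutex.cpp). The snippet below is a minimal usage sketch, not part of the commit; the "bthread/mutex.h" include path and the std::thread driver are assumptions for illustration.

#include <thread>
#include <vector>
#include <mutex>              // std::unique_lock
#include "bthread/mutex.h"    // bthread::FastPthreadMutex after this commit (assumed install path)

static bthread::FastPthreadMutex g_mutex;
static long g_counter = 0;

static void worker() {
    for (int i = 0; i < 100000; ++i) {
        // FastPthreadMutex provides lock()/try_lock()/unlock(), so it works with
        // std::unique_lock (and BAIDU_SCOPED_LOCK). Contended acquisitions can now
        // be sampled by the contention profiler, same as pthread_mutex_t.
        std::unique_lock<bthread::FastPthreadMutex> lck(g_mutex);
        ++g_counter;
    }
}

int main() {
    std::vector<std::thread> threads;
    for (int i = 0; i < 4; ++i) {
        threads.emplace_back(worker);
    }
    for (auto& t : threads) {
        t.join();
    }
    return g_counter == 4 * 100000 ? 0 : 1;
}
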
6 changes: 3 additions & 3 deletions src/bthread/butex.cpp
@@ -121,7 +121,7 @@ struct BAIDU_CACHELINE_ALIGNMENT Butex {

butil::atomic<int> value;
ButexWaiterList waiters;
internal::FastPthreadMutex waiter_lock;
FastPthreadMutex waiter_lock;
};

BAIDU_CASSERT(offsetof(Butex, value) == 0, offsetof_value_must_0);
@@ -460,8 +460,8 @@ int butex_requeue(void* arg, void* arg2) {

ButexWaiter* front = NULL;
{
std::unique_lock<internal::FastPthreadMutex> lck1(b->waiter_lock, std::defer_lock);
std::unique_lock<internal::FastPthreadMutex> lck2(m->waiter_lock, std::defer_lock);
std::unique_lock<FastPthreadMutex> lck1(b->waiter_lock, std::defer_lock);
std::unique_lock<FastPthreadMutex> lck2(m->waiter_lock, std::defer_lock);
butil::double_lock(lck1, lck2);
if (b->waiters.empty()) {
return 0;
2 changes: 1 addition & 1 deletion src/bthread/id.cpp
@@ -114,7 +114,7 @@ struct BAIDU_CACHELINE_ALIGNMENT Id {
// contended_ver: locked and contended
uint32_t first_ver;
uint32_t locked_ver;
internal::FastPthreadMutex mutex;
FastPthreadMutex mutex;
void* data;
int (*on_error)(bthread_id_t, void*, int);
int (*on_error2)(bthread_id_t, void*, int, const std::string&);
80 changes: 65 additions & 15 deletions src/bthread/mutex.cpp
@@ -448,7 +448,8 @@ int first_sys_pthread_mutex_unlock(pthread_mutex_t* mutex) {
return sys_pthread_mutex_unlock(mutex);
}

inline uint64_t hash_mutex_ptr(const pthread_mutex_t* m) {
template <typename Mutex>
inline uint64_t hash_mutex_ptr(const Mutex* m) {
return butil::fmix64((uint64_t)m);
}

@@ -468,7 +469,7 @@ static __thread bool tls_inside_lock = false;
#ifndef DONT_SPEEDUP_PTHREAD_CONTENTION_PROFILER_WITH_TLS
const int TLS_MAX_COUNT = 3;
struct MutexAndContentionSite {
pthread_mutex_t* mutex;
void* mutex;
bthread_contention_site_t csite;
};
struct TLSPthreadContentionSites {
@@ -482,8 +483,9 @@ static __thread TLSPthreadContentionSites tls_csites = {0,0,{}};
// Guaranteed in linux/win.
const int PTR_BITS = 48;

template <typename Mutex>
inline bthread_contention_site_t*
add_pthread_contention_site(pthread_mutex_t* mutex) {
add_pthread_contention_site(const Mutex* mutex) {
MutexMapEntry& entry = g_mutex_map[hash_mutex_ptr(mutex) & (MUTEX_MAP_SIZE - 1)];
butil::static_atomic<uint64_t>& m = entry.versioned_mutex;
uint64_t expected = m.load(butil::memory_order_relaxed);
@@ -500,8 +502,9 @@ add_pthread_contention_site(pthread_mutex_t* mutex) {
return NULL;
}

inline bool remove_pthread_contention_site(
pthread_mutex_t* mutex, bthread_contention_site_t* saved_csite) {
template <typename Mutex>
inline bool remove_pthread_contention_site(const Mutex* mutex,
bthread_contention_site_t* saved_csite) {
MutexMapEntry& entry = g_mutex_map[hash_mutex_ptr(mutex) & (MUTEX_MAP_SIZE - 1)];
butil::static_atomic<uint64_t>& m = entry.versioned_mutex;
if ((m.load(butil::memory_order_relaxed) & ((((uint64_t)1) << PTR_BITS) - 1))
@@ -538,16 +541,44 @@ void submit_contention(const bthread_contention_site_t& csite, int64_t now_ns) {
tls_inside_lock = false;
}

BUTIL_FORCE_INLINE int pthread_mutex_lock_impl(pthread_mutex_t* mutex) {
namespace internal {
BUTIL_FORCE_INLINE int pthread_mutex_lock_internal(pthread_mutex_t* mutex) {
return sys_pthread_mutex_lock(mutex);
}

BUTIL_FORCE_INLINE int pthread_mutex_trylock_internal(pthread_mutex_t* mutex) {
return ::pthread_mutex_trylock(mutex);
}

BUTIL_FORCE_INLINE int pthread_mutex_unlock_internal(pthread_mutex_t* mutex) {
return sys_pthread_mutex_unlock(mutex);
}

BUTIL_FORCE_INLINE int pthread_mutex_lock_internal(FastPthreadMutex* mutex) {
mutex->lock();
return 0;
}

BUTIL_FORCE_INLINE int pthread_mutex_trylock_internal(FastPthreadMutex* mutex) {
return mutex->try_lock() ? 0 : EBUSY;
}

BUTIL_FORCE_INLINE int pthread_mutex_unlock_internal(FastPthreadMutex* mutex) {
mutex->unlock();
return 0;
}

template <typename Mutex>
BUTIL_FORCE_INLINE int pthread_mutex_lock_impl(Mutex* mutex) {
// Don't change behavior of lock when profiler is off.
if (!g_cp ||
// collecting code including backtrace() and submit() may call
// pthread_mutex_lock and cause deadlock. Don't sample.
tls_inside_lock) {
return sys_pthread_mutex_lock(mutex);
return pthread_mutex_lock_internal(mutex);
}
// Don't slow down non-contended locks.
int rc = pthread_mutex_trylock(mutex);
int rc = pthread_mutex_trylock_internal(mutex);
if (rc != EBUSY) {
return rc;
}
@@ -567,16 +598,16 @@ BUTIL_FORCE_INLINE int pthread_mutex_lock_impl(pthread_mutex_t* mutex) {
csite = &entry.csite;
if (!sampling_range) {
make_contention_site_invalid(&entry.csite);
return sys_pthread_mutex_lock(mutex);
return pthread_mutex_lock_internal(mutex);
}
}
#endif
if (!sampling_range) { // don't sample
return sys_pthread_mutex_lock(mutex);
return pthread_mutex_lock_internal(mutex);
}
// Lock and monitor the waiting time.
const int64_t start_ns = butil::cpuwide_time_ns();
rc = sys_pthread_mutex_lock(mutex);
rc = pthread_mutex_lock_internal(mutex);
if (!rc) { // Inside lock
if (!csite) {
csite = add_pthread_contention_site(mutex);
@@ -590,13 +621,14 @@ BUTIL_FORCE_INLINE int pthread_mutex_lock_impl(pthread_mutex_t* mutex) {
return rc;
}

BUTIL_FORCE_INLINE int pthread_mutex_unlock_impl(pthread_mutex_t* mutex) {
template <typename Mutex>
BUTIL_FORCE_INLINE int pthread_mutex_unlock_impl(Mutex* mutex) {
// Don't change behavior of unlock when profiler is off.
if (!g_cp || tls_inside_lock) {
// This branch brings an issue that an entry created by
// add_pthread_contention_site may not be cleared. Thus we add a
// add_pthread_contention_site may not be cleared. Thus we add a
// 16-bit rolling version in the entry to find out such entry.
return sys_pthread_mutex_unlock(mutex);
return pthread_mutex_unlock_internal(mutex);
}
int64_t unlock_start_ns = 0;
bool miss_in_tls = true;
@@ -622,7 +654,7 @@ BUTIL_FORCE_INLINE int pthread_mutex_unlock_impl(pthread_mutex_t* mutex) {
unlock_start_ns = butil::cpuwide_time_ns();
}
}
const int rc = sys_pthread_mutex_unlock(mutex);
const int rc = pthread_mutex_unlock_internal(mutex);
// [Outside lock]
if (unlock_start_ns) {
const int64_t unlock_end_ns = butil::cpuwide_time_ns();
@@ -632,6 +664,16 @@ BUTIL_FORCE_INLINE int pthread_mutex_unlock_impl(pthread_mutex_t* mutex) {
return rc;
}

}

BUTIL_FORCE_INLINE int pthread_mutex_lock_impl(pthread_mutex_t* mutex) {
return internal::pthread_mutex_lock_impl(mutex);
}

BUTIL_FORCE_INLINE int pthread_mutex_unlock_impl(pthread_mutex_t* mutex) {
return internal::pthread_mutex_unlock_impl(mutex);
}

// Implement bthread_mutex_t related functions
struct MutexInternal {
butil::static_atomic<unsigned char> locked;
@@ -714,6 +756,14 @@ void FastPthreadMutex::unlock() {
} // namespace internal
#endif // BTHREAD_USE_FAST_PTHREAD_MUTEX

void FastPthreadMutex::lock() {
internal::pthread_mutex_lock_impl(&_mutex);
}

void FastPthreadMutex::unlock() {
internal::pthread_mutex_unlock_impl(&_mutex);
}

} // namespace bthread

extern "C" {
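
The mechanism that makes the profiler mutex-type-agnostic in the mutex.cpp change above is overload dispatch: pthread_mutex_lock_impl and pthread_mutex_unlock_impl become templates, and the small *_internal overloads select the right primitive for pthread_mutex_t versus FastPthreadMutex, so one sampling path times both. Below is a standalone sketch of that pattern under simplified assumptions: the names lock_internal/timed_lock are illustrative rather than brpc symbols, std::mutex stands in for FastPthreadMutex, and the measured wait is printed instead of being submitted to the profiler.

#include <pthread.h>
#include <cerrno>
#include <chrono>
#include <cstdio>
#include <mutex>

// One overload per lockable type keeps the templated wrapper below generic.
inline int lock_internal(pthread_mutex_t* m)    { return pthread_mutex_lock(m); }
inline int trylock_internal(pthread_mutex_t* m) { return pthread_mutex_trylock(m); }
inline int unlock_internal(pthread_mutex_t* m)  { return pthread_mutex_unlock(m); }

inline int lock_internal(std::mutex* m)    { m->lock(); return 0; }
inline int trylock_internal(std::mutex* m) { return m->try_lock() ? 0 : EBUSY; }
inline int unlock_internal(std::mutex* m)  { m->unlock(); return 0; }

template <typename Mutex>
int timed_lock(Mutex* m) {
    // Fast path: only a lock that is actually contended (trylock returns EBUSY)
    // pays for the timing, mirroring "Don't slow down non-contended locks" above.
    int rc = trylock_internal(m);
    if (rc != EBUSY) {
        return rc;
    }
    const auto start = std::chrono::steady_clock::now();
    rc = lock_internal(m);
    const auto wait_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
        std::chrono::steady_clock::now() - start).count();
    std::printf("waited %lld ns for a contended lock\n",
                static_cast<long long>(wait_ns));
    return rc;
}

int main() {
    std::mutex cpp_mutex;
    pthread_mutex_t c_mutex = PTHREAD_MUTEX_INITIALIZER;
    // Both mutex types go through the same templated wrapper.
    timed_lock(&cpp_mutex);  unlock_internal(&cpp_mutex);
    timed_lock(&c_mutex);    unlock_internal(&c_mutex);
    return 0;
}
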
15 changes: 14 additions & 1 deletion src/bthread/mutex.h
@@ -72,7 +72,7 @@ namespace internal {
class FastPthreadMutex {
public:
FastPthreadMutex() : _futex(0) {}
~FastPthreadMutex() {}
~FastPthreadMutex() = default;
void lock();
void unlock();
bool try_lock();
@@ -86,6 +86,19 @@ typedef butil::Mutex FastPthreadMutex;
#endif
}

class FastPthreadMutex {
public:
FastPthreadMutex() = default;
~FastPthreadMutex() = default;
DISALLOW_COPY_AND_ASSIGN(FastPthreadMutex);

void lock();
void unlock();
bool try_lock() { return _mutex.try_lock(); }
private:
internal::FastPthreadMutex _mutex;
};

} // namespace bthread

// Specialize std::lock_guard and std::unique_lock for bthread_mutex_t
2 changes: 1 addition & 1 deletion src/bthread/timer_thread.cpp
@@ -92,7 +92,7 @@ class BAIDU_CACHELINE_ALIGNMENT TimerThread::Bucket {
Task* consume_tasks();

private:
internal::FastPthreadMutex _mutex;
FastPthreadMutex _mutex;
int64_t _nearest_run_time;
Task* _task_head;
};
2 changes: 1 addition & 1 deletion src/bthread/timer_thread.h
@@ -95,7 +95,7 @@ class TimerThread {

TimerThreadOptions _options;
Bucket* _buckets; // list of tasks to be run
internal::FastPthreadMutex _mutex; // protect _nearest_run_time
FastPthreadMutex _mutex; // protect _nearest_run_time
int64_t _nearest_run_time;
// the futex for wake up timer thread. can't use _nearest_run_time because
// it's 64-bit.
40 changes: 37 additions & 3 deletions test/bthread_mutex_unittest.cpp
@@ -229,8 +229,9 @@ TEST(MutexTest, performance) {
PerfTest(&bth_mutex, (bthread_t*)NULL, thread_num, bthread_start_background, bthread_join);
}

template <typename Mutex>
void* loop_until_stopped(void* arg) {
bthread::Mutex *m = (bthread::Mutex*)arg;
auto m = (Mutex*)arg;
while (!g_stopped) {
BAIDU_SCOPED_LOCK(*m);
bthread_usleep(20);
@@ -251,11 +252,11 @@ TEST(MutexTest, mix_thread_types) {
// true, thus loop_until_stopped spins forever)
bthread_setconcurrency(M);
for (int i = 0; i < N; ++i) {
ASSERT_EQ(0, pthread_create(&pthreads[i], NULL, loop_until_stopped, &m));
ASSERT_EQ(0, pthread_create(&pthreads[i], NULL, loop_until_stopped<bthread::Mutex>, &m));
}
for (int i = 0; i < M; ++i) {
const bthread_attr_t *attr = i % 2 ? NULL : &BTHREAD_ATTR_PTHREAD;
ASSERT_EQ(0, bthread_start_urgent(&bthreads[i], attr, loop_until_stopped, &m));
ASSERT_EQ(0, bthread_start_urgent(&bthreads[i], attr, loop_until_stopped<bthread::Mutex>, &m));
}
bthread_usleep(1000L * 1000);
g_stopped = true;
@@ -266,4 +267,37 @@ TEST(MutexTest, mix_thread_types) {
pthread_join(pthreads[i], NULL);
}
}

TEST(MutexTest, fast_pthread_mutex) {
bthread::FastPthreadMutex mutex;
ASSERT_TRUE(mutex.try_lock());
mutex.unlock();
mutex.lock();
mutex.unlock();
{
BAIDU_SCOPED_LOCK(mutex);
}
{
std::unique_lock<bthread::FastPthreadMutex> lck1;
std::unique_lock<bthread::FastPthreadMutex> lck2(mutex);
lck1.swap(lck2);
lck1.unlock();
lck1.lock();
}
ASSERT_TRUE(mutex.try_lock());
mutex.unlock();

const int N = 16;
pthread_t pthreads[N];
for (int i = 0; i < N; ++i) {
ASSERT_EQ(0, pthread_create(&pthreads[i], NULL,
loop_until_stopped<bthread::FastPthreadMutex>, &mutex));
}
bthread_usleep(1000L * 1000);
g_stopped = true;
for (int i = 0; i < N; ++i) {
pthread_join(pthreads[i], NULL);
}
}

} // namespace
