diff --git a/src/bthread/rwlock.cpp b/src/bthread/rwlock.cpp
new file mode 100644
index 0000000000..4458314d03
--- /dev/null
+++ b/src/bthread/rwlock.cpp
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// bthread - An M:N threading library to make applications more concurrent.
+
+// Date: Tue August 10 23:50:50 CST 2024
+
+#include <dlfcn.h>                        // dlsym
+#include
+#include <fcntl.h>                        // O_RDONLY
+#include
+
+#include "bthread/bthread.h"
+#include "bthread/butex.h"                // butex_*
+#include "bthread/log.h"
+#include "bthread/processor.h"            // cpu_relax, barrier
+#include "bthread/sys_futex.h"
+#include "butil/atomicops.h"
+#include "butil/containers/flat_map.h"
+#include "butil/fd_guard.h"
+#include "butil/file_util.h"
+#include "butil/files/file.h"
+#include "butil/files/file_path.h"
+#include "butil/iobuf.h"
+#include "butil/logging.h"
+#include "butil/macros.h"                 // BAIDU_CASSERT
+#include "butil/object_pool.h"
+#include "butil/third_party/murmurhash3/murmurhash3.h"
+#include "butil/unique_ptr.h"
+#include "bvar/bvar.h"
+#include "bvar/collector.h"
+
+namespace bthread {
+
+inline int rwlock_unrlock(bthread_rwlock_t* rwlock) {
+    butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)rwlock->lock_flag;
+
+    while (true) {
+        unsigned r = whole->load();
+        if (r == 0 || (r >> 31) != 0) {
+            LOG(ERROR) << "unrlock a rwlock that is not read-locked";
+            return 0;
+        }
+        if (!whole->compare_exchange_weak(r, r - 1)) {
+            continue;
+        }
+        // wake up a waiting writer
+        bthread::butex_wake(whole);
+        return 0;
+    }
+}
+
+inline int rwlock_unwlock(bthread_rwlock_t* rwlock) {
+    butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)rwlock->lock_flag;
+
+    while (true) {
+        unsigned r = whole->load();
+        if (r != (1u << 31)) {
+            LOG(ERROR) << "unwlock a rwlock that is not write-locked";
+            return 0;
+        }
+        if (!whole->compare_exchange_weak(r, 0)) {
+            continue;
+        }
+        // wake up waiting writers first
+        bthread::butex_wake(whole);
+        butil::atomic<unsigned>* w_wait_count = (butil::atomic<unsigned>*)rwlock->w_wait_count;
+        // drop the count added when this writer locked, then wake up read waiters
+        w_wait_count->fetch_sub(1);
+        bthread::butex_wake_all(w_wait_count);
+        return 0;
+    }
+}
+
+inline int rwlock_unlock(bthread_rwlock_t* rwlock) {
+    butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)rwlock->lock_flag;
+    if ((whole->load(butil::memory_order_relaxed) >> 31) != 0) {
+        return rwlock_unwlock(rwlock);
+    } else {
+        return rwlock_unrlock(rwlock);
+    }
+}
+
+inline int rwlock_rlock(bthread_rwlock_t* rwlock) {
+    butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)rwlock->lock_flag;
+
+    butil::atomic<unsigned>* w_wait_count = (butil::atomic<unsigned>*)rwlock->w_wait_count;
+    while (true) {
+        // writers are preferred: readers wait until no writer is waiting for
+        // or holding the lock
+        unsigned w = w_wait_count->load();
+        if (w > 0) {
+            if (bthread::butex_wait(w_wait_count, w, NULL) < 0 &&
+                errno != EWOULDBLOCK && errno != EINTR) {
+                return errno;
+            }
+            continue;
+        }
+        // FIXME: we don't consider reader-count overflow yet; 2^31 readers should be enough here.
+        unsigned r = whole->load();
+        if ((r >> 31) == 0) {
+            if (whole->compare_exchange_weak(r, r + 1)) {
+                return 0;
+            }
+        }
+    }
+}
+
+inline int rwlock_wlock(bthread_rwlock_t* rwlock) {
+    butil::atomic<unsigned>* w_wait_count = (butil::atomic<unsigned>*)rwlock->w_wait_count;
+    butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)rwlock->lock_flag;
+    // we don't consider w_wait_count overflow yet; 2^32 waiters should be enough here
+    w_wait_count->fetch_add(1);
+    while (true) {
+        unsigned r = whole->load();
+        if (r != 0) {
+            if (bthread::butex_wait(whole, r, NULL) < 0 &&
+                errno != EWOULDBLOCK && errno != EINTR) {
+                // roll back the waiter count added above and let blocked readers proceed
+                w_wait_count->fetch_sub(1);
+                bthread::butex_wake_all(w_wait_count);
+                return errno;
+            }
+            continue;
+        }
+        if (whole->compare_exchange_weak(r, 1u << 31)) {
+            return 0;
+        }
+    }
+}
+
+}  // namespace bthread
+
+extern "C" {
+
+int bthread_rwlock_init(bthread_rwlock_t* __restrict rwlock,
+                        const bthread_rwlockattr_t* __restrict attr) {
+    rwlock->w_wait_count = bthread::butex_create_checked<unsigned>();
+    rwlock->lock_flag = bthread::butex_create_checked<unsigned>();
+    if (!rwlock->w_wait_count || !rwlock->lock_flag) {
+        LOG(ERROR) << "no memory";
+        return ENOMEM;
+    }
+    *rwlock->w_wait_count = 0;
+    *rwlock->lock_flag = 0;
+    return 0;
+}
+
+int bthread_rwlock_destroy(bthread_rwlock_t* rwlock) {
+    bthread::butex_destroy(rwlock->w_wait_count);
+    bthread::butex_destroy(rwlock->lock_flag);
+    return 0;
+}
+
+int bthread_rwlock_rdlock(bthread_rwlock_t* rwlock) { return bthread::rwlock_rlock(rwlock); }
+
+int bthread_rwlock_wrlock(bthread_rwlock_t* rwlock) { return bthread::rwlock_wlock(rwlock); }
+
+int bthread_rwlock_unrlock(bthread_rwlock_t* rwlock) { return bthread::rwlock_unrlock(rwlock); }
+
+int bthread_rwlock_unwlock(bthread_rwlock_t* rwlock) { return bthread::rwlock_unwlock(rwlock); }
+
+int bthread_rwlock_unlock(bthread_rwlock_t* rwlock) { return bthread::rwlock_unlock(rwlock); }
+
+}
\ No newline at end of file
diff --git a/src/bthread/rwlock.h b/src/bthread/rwlock.h
new file mode 100644
index 0000000000..3d8dc612c7
--- /dev/null
+++ b/src/bthread/rwlock.h
@@ -0,0 +1,144 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// bthread - An M:N threading library to make applications more concurrent.
+
+// Date: Tue August 10 23:50:50 CST 2024
+
+#ifndef BTHREAD_RW_MUTEX_H
+#define BTHREAD_RW_MUTEX_H
+
+#include "bthread/bthread.h"
+#include "bthread/types.h"
+#include "butil/scoped_lock.h"
+#include "bvar/utils/lock_timer.h"
+
+__BEGIN_DECLS
+// -------------------------------------------
+// Functions for handling read-write locks.
+// -------------------------------------------
+
+// Initialize read-write lock `rwlock' using attributes `attr', or use
+// the default values if the latter is NULL.
+extern int bthread_rwlock_init(bthread_rwlock_t* __restrict rwlock,
+                               const bthread_rwlockattr_t* __restrict attr);
+
+// Destroy read-write lock `rwlock'.
+extern int bthread_rwlock_destroy(bthread_rwlock_t* rwlock);
+
+// Acquire read lock for `rwlock'.
+extern int bthread_rwlock_rdlock(bthread_rwlock_t* rwlock);
+
+// Try to acquire read lock for `rwlock'.
+extern int bthread_rwlock_tryrdlock(bthread_rwlock_t* rwlock);
+
+// Try to acquire read lock for `rwlock' or return after the specified time.
+extern int bthread_rwlock_timedrdlock(bthread_rwlock_t* __restrict rwlock,
+                                      const struct timespec* __restrict abstime);
+
+// Acquire write lock for `rwlock'.
+extern int bthread_rwlock_wrlock(bthread_rwlock_t* rwlock);
+
+// Try to acquire write lock for `rwlock'.
+extern int bthread_rwlock_trywrlock(bthread_rwlock_t* rwlock);
+
+// Try to acquire write lock for `rwlock' or return after the specified time.
+extern int bthread_rwlock_timedwrlock(bthread_rwlock_t* __restrict rwlock,
+                                      const struct timespec* __restrict abstime);
+
+// Unlock `rwlock', whichever lock type is currently held.
+extern int bthread_rwlock_unlock(bthread_rwlock_t* rwlock);
+
+// Release a read lock on `rwlock'.
+extern int bthread_rwlock_unrlock(bthread_rwlock_t* rwlock);
+
+// Release a write lock on `rwlock'.
+extern int bthread_rwlock_unwlock(bthread_rwlock_t* rwlock);
+
+// ---------------------------------------------------
+// Functions for handling read-write lock attributes.
+// ---------------------------------------------------
+
+// Initialize attribute object `attr' with default values.
+extern int bthread_rwlockattr_init(bthread_rwlockattr_t* attr);
+
+// Destroy attribute object `attr'.
+extern int bthread_rwlockattr_destroy(bthread_rwlockattr_t* attr);
+
+// Return current setting of reader/writer preference.
+extern int bthread_rwlockattr_getkind_np(const bthread_rwlockattr_t* attr, int* pref);
+
+// Set reader/writer preference.
+extern int bthread_rwlockattr_setkind_np(bthread_rwlockattr_t* attr, int pref);
+__END_DECLS
+
+// RAII guards for bthread_rwlock_t: wlock_guard holds the write lock,
+// rlock_guard holds the read lock, for the lifetime of the guard object.
+
+namespace bthread {
+
+class wlock_guard {
+public:
+    explicit wlock_guard(bthread_rwlock_t& mutex) : _pmutex(&mutex) {
+#if !defined(NDEBUG)
+        const int rc = bthread_rwlock_wrlock(_pmutex);
+        if (rc) {
+            LOG(FATAL) << "Fail to wrlock bthread_rwlock_t=" << _pmutex << ", " << berror(rc);
+            _pmutex = NULL;
+        }
+#else
+        bthread_rwlock_wrlock(_pmutex);
+#endif  // NDEBUG
+    }
+
+    ~wlock_guard() {
+#ifndef NDEBUG
+        if (_pmutex) {
+            bthread_rwlock_unlock(_pmutex);
+        }
+#else
+        bthread_rwlock_unlock(_pmutex);
+#endif
+    }
+
+private:
+    DISALLOW_COPY_AND_ASSIGN(wlock_guard);
+    bthread_rwlock_t* _pmutex;
+};
+
+class rlock_guard {
+public:
+    explicit rlock_guard(bthread_rwlock_t& mutex) : _pmutex(&mutex) {
+#if !defined(NDEBUG)
+        const int rc = bthread_rwlock_rdlock(_pmutex);
+        if (rc) {
+            LOG(FATAL) << "Fail to rdlock bthread_rwlock_t=" << _pmutex << ", " << berror(rc);
+            _pmutex = NULL;
+        }
+#else
+        bthread_rwlock_rdlock(_pmutex);
+#endif  // NDEBUG
+    }
+
+    ~rlock_guard() {
+#ifndef NDEBUG
+        if (_pmutex) {
+            bthread_rwlock_unlock(_pmutex);
+        }
+#else
+        bthread_rwlock_unlock(_pmutex);
+#endif
+    }
+
+private:
+    DISALLOW_COPY_AND_ASSIGN(rlock_guard);
+    bthread_rwlock_t* _pmutex;
+};
+
+}  // namespace bthread
+
+#endif  // BTHREAD_RW_MUTEX_H
\ No newline at end of file
diff --git a/src/bthread/types.h b/src/bthread/types.h
index 4b4f0565f5..b610270e78 100644
--- a/src/bthread/types.h
+++ b/src/bthread/types.h
@@ -181,6 +181,9 @@
 } bthread_condattr_t;
 
 typedef struct {
+    bthread_mutex_t* m;
+    unsigned* w_wait_count;  // number of waiting writers, including the one holding the wlock
+    unsigned* lock_flag;     // highest bit set when write-locked, low 31 bits hold the reader count
 } bthread_rwlock_t;
 
 typedef struct {
diff --git a/test/BUILD.bazel b/test/BUILD.bazel
index d9af2ae7b2..e63cf02ce6 100644
--- a/test/BUILD.bazel
+++ b/test/BUILD.bazel
@@ -226,6 +226,7 @@ cc_test(
             # glog CHECK die with a fatal error
             "bthread_key_unittest.cpp",
             "bthread_butex_multi_tag_unittest.cpp",
+            "bthread_rwlock_unittest.cpp",
         ],
     ),
     copts = COPTS,
diff --git a/test/bthread_rwlock_unittest.cpp b/test/bthread_rwlock_unittest.cpp
index 60cbfbe2ae..8c61dee640 100644
--- a/test/bthread_rwlock_unittest.cpp
+++ b/test/bthread_rwlock_unittest.cpp
@@ -20,8 +20,10 @@
 #include
 #include
 #include
+#include "bthread/rwlock.h"
 #include "butil/time.h"
 #include "butil/macros.h"
+#include "butil/gperftools_profiler.h"
 
 namespace {
 void* read_thread(void* arg) {
@@ -76,4 +78,182 @@ TEST(RWLockTest, rdlock_performance) {
     pthread_mutex_destroy(&lock1);
 #endif
 }
+
+TEST(RWLockTest, sanity) {
+    bthread_rwlock_t m;
+    ASSERT_EQ(0, bthread_rwlock_init(&m, nullptr));
+    ASSERT_EQ(0, bthread_rwlock_rdlock(&m));
+    ASSERT_EQ(0, bthread_rwlock_unlock(&m));
+    ASSERT_EQ(0, bthread_rwlock_wrlock(&m));
+    ASSERT_EQ(0, bthread_rwlock_unlock(&m));
+    ASSERT_EQ(0, bthread_rwlock_destroy(&m));
+}
+
+bool g_started = false;
+bool g_stopped = false;
+
+template <typename Rwlock>
+struct BAIDU_CACHELINE_ALIGNMENT PerfArgs {
+    Rwlock* rwlock;
+    int64_t counter;
+    int64_t elapse_ns;
+    bool ready;
+    int32_t op_type;  // 0 for read, 1 for write
+
+    PerfArgs() : rwlock(nullptr), counter(0), elapse_ns(0), ready(false), op_type(0) {}
+};
+
+template <typename Rwlock>
+void* add_with_rwlock(void* void_arg) {
+    PerfArgs<Rwlock>* args = (PerfArgs<Rwlock>*)void_arg;
+    args->ready = true;
+    butil::Timer t;
+    while (!g_stopped) {
+        if (g_started) {
+            break;
+        }
+        bthread_usleep(1000);
+    }
+    t.start();
+    while (!g_stopped) {
+        if (args->op_type == 0) {
+            bthread_rwlock_rdlock(args->rwlock);
+        } else {
+            bthread_rwlock_wrlock(args->rwlock);
+        }
+        bthread_rwlock_unlock(args->rwlock);
+        ++args->counter;
+    }
+    t.stop();
+    args->elapse_ns = t.n_elapsed();
+    return nullptr;
+}
+
+int g_prof_name_counter = 0;
+
+template <typename Rwlock, typename ThreadId,
+          typename ThreadCreateFn, typename ThreadJoinFn>
+void PerfTest(Rwlock* rwlock,
+              ThreadId* /*dummy*/,
+              int thread_num,
+              const ThreadCreateFn& create_fn,
+              const ThreadJoinFn& join_fn,
+              int op_type = 0 /*0 for read, 1 for write*/) {
+    g_started = false;
+    g_stopped = false;
+    ThreadId threads[thread_num];
+    std::vector<PerfArgs<Rwlock> > args(thread_num);
+    for (int i = 0; i < thread_num; ++i) {
+        args[i].rwlock = rwlock;
+        args[i].op_type = op_type;
+        create_fn(&threads[i], nullptr, add_with_rwlock<Rwlock>, &args[i]);
+    }
+    while (true) {
+        bool all_ready = true;
+        for (int i = 0; i < thread_num; ++i) {
+            if (!args[i].ready) {
+                all_ready = false;
+                break;
+            }
+        }
+        if (all_ready) {
+            break;
+        }
+        usleep(1000);
+    }
+    g_started = true;
+    char prof_name[32];
+    snprintf(prof_name, sizeof(prof_name), "rwlock_perf_%d.prof", ++g_prof_name_counter);
+    ProfilerStart(prof_name);
+    usleep(500 * 1000);
+    ProfilerStop();
+    g_stopped = true;
+    int64_t wait_time = 0;
+    int64_t count = 0;
+    for (int i = 0; i < thread_num; ++i) {
+        join_fn(threads[i], nullptr);
+        wait_time += args[i].elapse_ns;
+        count += args[i].counter;
+    }
+    LOG(INFO) << butil::class_name<Rwlock>() << (op_type == 0 ? " readlock" : " writelock") << " in "
+              << ((void*)create_fn == (void*)pthread_create ? "pthread" : "bthread")
"pthread" : "bthread") + << " thread_num=" << thread_num + << " count=" << count + << " average_time=" << wait_time / (double)count; +} + + +TEST(RWLockTest, performance) { + const int thread_num = 12; + bthread_rwlock_t brw; + bthread_rwlock_init(&brw, nullptr); + //rlock + PerfTest(&brw, (pthread_t*)nullptr, thread_num, pthread_create, pthread_join); + PerfTest(&brw, (bthread_t*)nullptr, thread_num, bthread_start_background, bthread_join); + + //add test 1 rlock for compare + PerfTest(&brw, (pthread_t*)nullptr, 1, pthread_create, pthread_join); + PerfTest(&brw, (bthread_t*)nullptr, 1, bthread_start_background, bthread_join); + + //for wlock + PerfTest(&brw, (pthread_t*)nullptr, thread_num, pthread_create, pthread_join, 1); + PerfTest(&brw, (bthread_t*)nullptr, thread_num, bthread_start_background, bthread_join, 1); + + //add test 1 wlock for compare + PerfTest(&brw, (pthread_t*)nullptr, 1, pthread_create, pthread_join, 1); + PerfTest(&brw, (bthread_t*)nullptr, 1, bthread_start_background, bthread_join, 1); +} + +void* loop_until_stopped(void* arg) { + bthread_rwlock_t *m = (bthread_rwlock_t*)arg; + while (!g_stopped) { + int r = rand() % 100; + if((r&1)==0) + { + bthread::rlock_guard rg(*m); + } + else{ + bthread::wlock_guard wg(*m); + } + bthread_usleep(20); + } + return nullptr; +} + +TEST(RwlockTest, mix_thread_types) { + g_stopped = false; + const int N = 16; + const int M = N * 2; + // bthread::Mutex m; + bthread_rwlock_t brw; + bthread_rwlock_init(&brw, nullptr); + + pthread_t pthreads[N]; + bthread_t bthreads[M]; + // reserve enough workers for test. This is a must since we have + // BTHREAD_ATTR_PTHREAD bthreads which may cause deadlocks (the + // bhtread_usleep below can't be scheduled and g_stopped is never + // true, thus loop_until_stopped spins forever) + bthread_setconcurrency(M); + for (int i = 0; i < N; ++i) { + ASSERT_EQ(0, pthread_create(&pthreads[i], nullptr, loop_until_stopped, &brw)); + } + for (int i = 0; i < M; ++i) { + const bthread_attr_t *attr = i % 2 ? nullptr : &BTHREAD_ATTR_PTHREAD; + ASSERT_EQ(0, bthread_start_urgent(&bthreads[i], attr, loop_until_stopped, &brw)); + } + bthread_usleep(1000L * 1000); + g_stopped = true; + for (int i = 0; i < M; ++i) { + bthread_join(bthreads[i], nullptr); + } + for (int i = 0; i < N; ++i) { + pthread_join(pthreads[i], nullptr); + } +} + } // namespace