This repository has been archived by the owner on Feb 1, 2021. It is now read-only.

add recursive shared mutex library and add it to makefiles #149

Merged
5 changes: 4 additions & 1 deletion src/Makefile.am
@@ -1,4 +1,4 @@
DIST_SUBDIRS = secp256k1 univalue
DIST_SUBDIRS = secp256k1 univalue rsm

AM_LDFLAGS = $(PTHREAD_CFLAGS) $(LIBTOOL_LDFLAGS) $(HARDENED_LDFLAGS)
AM_CXXFLAGS = $(HARDENED_CXXFLAGS) $(ERROR_CXXFLAGS)
@@ -10,6 +10,7 @@ BITCOIN_INCLUDES=-I$(builddir) -I$(builddir)/build $(BDB_CPPFLAGS) $(BOOST_CPPFL

BITCOIN_INCLUDES += -I$(srcdir)/secp256k1/include
BITCOIN_INCLUDES += -I$(srcdir)/univalue/include
BITCOIN_INCLUDES += -I$(srcdir)/rsm

LIBBITCOIN_SERVER=libbitcoin_server.a
LIBSECP256K1=secp256k1/libsecp256k1.la
@@ -114,6 +115,7 @@ BITCOIN_CORE_H = \
rpc/rpcclient.h \
rpc/rpcprotocol.h \
rpc/rpcserver.h \
rsm/recursive_shared_mutex.h \
script/interpreter.h \
script/script.h \
script/script_error.h \
@@ -197,6 +199,7 @@ libbitcoin_server_a_SOURCES = \
rpc/rpcnet.cpp \
rpc/rpcrawtransaction.cpp \
rpc/rpcserver.cpp \
rsm/recursive_shared_mutex.cpp \
script/sigcache.cpp \
timedata.cpp \
torcontrol.cpp \
5 changes: 5 additions & 0 deletions src/Makefile.test.include
@@ -94,6 +94,11 @@ BITCOIN_TESTS = \
test/uint256_tests.cpp \
test/univalue_tests.cpp

BITCOIN_TESTS += \
rsm/test/rsm_promotion_tests.cpp \
rsm/test/rsm_simple_tests.cpp \
rsm/test/rsm_starvation_tests.cpp

BITCOIN_TESTS += \
wallet/test/wallet_tests.cpp
# wallet/test/walletdb_tests.cpp \
42 changes: 42 additions & 0 deletions src/rsm/README.md
@@ -0,0 +1,42 @@
# cxx_recursive_shared_mutex

A recursive shared mutex for C++

__Behavioral Overview__

The recursive shared mutex implements the following behavior:
- The mutex has two levels of access, shared and exclusive. Multiple threads can own the mutex in shared mode, but only one thread can own it in exclusive mode.
- A thread is considered to have ownership when it successfully calls either lock or try_lock (for either level of access).
- A thread may call lock recursively, and must make a matching number of unlock calls to end ownership.
- A thread that already has exclusive ownership may also request shared ownership without giving up its exclusive ownership.
- The mutex internally tracks how many times each thread locked for shared ownership. A thread cannot unlock more times than it locked; attempting to do so triggers an assertion, since it indicates a critical error somewhere in the caller's locking logic.
- A thread may obtain exclusive ownership by calling try_promotion(), provided no thread other than itself holds shared ownership. Calling it while other threads hold shared ownership blocks until all of them have released their shared ownership. Promoting ownership this way "jumps the line" of other threads waiting for exclusive ownership, making the promoting thread the next to obtain exclusive ownership. To avoid deadlocks, only one thread may attempt promotion at a time: if one thread is already waiting for promotion and a different thread requests promotion, that try_promotion() call returns false.
- If a thread has exclusive ownership, a check for shared ownership will also return true.


__NOTES__

- We use try_promotion() for promotions instead of lock() for two reasons:
  - We want to be able to signal that a promotion was not granted. try_promotion() returns a boolean, while lock() does not return anything.
  - Having many threads calling and waiting for promotions is generally discouraged, because it creates a sort of race condition: the first thread to be promoted sees the data set it observed under shared ownership, but threads promoted after it are not guaranteed to be editing the data they observed during their own shared ownership.
- If the thread that holds exclusive ownership obtained it via promotion, another thread cannot request a promotion to follow it. This prevents threads from using promotion to continuously "cut the line" for exclusive ownership.




__Development and Testing__

The `master` branch `rsm` folder should be stable at all times. To use rsm in your project, just add the `rsm` folder to your project.
The `experimental` folder contains changes that are still being tested. To build the test suite, run `make`; this should produce a binary named `test_cxx_rsm`.


__Requirements__

- C++14 or higher is required.
- Boost 1.55.0 or higher is recommended.

__Upstream__

The upstream repository can be found at: https://github.com/Greg-Griffith/cxx_recursive_shared_mutex

Initial development and feedback can be found at: https://github.com/BitcoinUnlimited/BitcoinUnlimited/pull/1591
313 changes: 313 additions & 0 deletions src/rsm/recursive_shared_mutex.cpp
@@ -0,0 +1,313 @@
// Copyright (c) 2019 Greg Griffith
// Copyright (c) 2019 The Bitcoin Unlimited developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

#include "recursive_shared_mutex.h"

////////////////////////
///
/// Private Functions
///

bool recursive_shared_mutex::end_of_exclusive_ownership()
{
return (_shared_while_exclusive_counter == 0 && _write_counter == 0);
}

bool recursive_shared_mutex::check_for_write_lock(const std::thread::id &locking_thread_id)
{
return (_write_owner_id == locking_thread_id);
}

bool recursive_shared_mutex::check_for_write_unlock(const std::thread::id &locking_thread_id)
{
if (_write_owner_id == locking_thread_id)
{
if (_shared_while_exclusive_counter == 0)
{
#ifdef DEBUG_ASSERTION
throw std::logic_error("cannot unlock_shared more times than we locked for shared ownership while holding "
"exclusive ownership");
#else
return true;
#endif
}
return true;
}
return false;
}

bool recursive_shared_mutex::already_has_lock_shared(const std::thread::id &locking_thread_id)
{
return (_read_owner_ids.find(locking_thread_id) != _read_owner_ids.end());
}

void recursive_shared_mutex::lock_shared_internal(const std::thread::id &locking_thread_id, const uint64_t &count)
{
auto it = _read_owner_ids.find(locking_thread_id);
if (it == _read_owner_ids.end())
{
_read_owner_ids.emplace(locking_thread_id, count);
}
else
{
it->second = it->second + count;
}
}

void recursive_shared_mutex::unlock_shared_internal(const std::thread::id &locking_thread_id, const uint64_t &count)
{
auto it = _read_owner_ids.find(locking_thread_id);
if (it == _read_owner_ids.end())
{
#ifdef DEBUG_ASSERTION
throw std::logic_error("cannot unlock_shared more times than we locked for shared ownership");
#else
return;
#endif
}
it->second = it->second - count;
if (it->second == 0)
{
_read_owner_ids.erase(it);
}
}

////////////////////////
///
/// Public Functions
///

void recursive_shared_mutex::lock()
{
const std::thread::id &locking_thread_id = std::this_thread::get_id();
std::unique_lock<std::mutex> _lock(_mutex);
if (_write_owner_id == locking_thread_id)
{
_write_counter++;
}
else
{
// Wait until we can set the write-entered.
_read_gate.wait(_lock, [this] { return end_of_exclusive_ownership(); });

_write_counter++;
// Then wait until there are no more readers.
_write_gate.wait(
_lock, [this] { return _read_owner_ids.size() == 0 && _promotion_candidate_id == NON_THREAD_ID; });
_write_owner_id = locking_thread_id;
}
}

bool recursive_shared_mutex::try_promotion()
{
const std::thread::id &locking_thread_id = std::this_thread::get_id();
std::unique_lock<std::mutex> _lock(_mutex);

if (_write_owner_id == locking_thread_id)
{
_write_counter++;
return true;
}
// checking _write_owner_id might be redundant here with the mutex already being locked
// check if write_counter == 0 to ensure data consistency after promotion
else if (_promotion_candidate_id == NON_THREAD_ID)
{
_promotion_candidate_id = locking_thread_id;
// Then wait until there are no more readers.
_promotion_write_gate.wait(_lock,
[this] { return _read_owner_ids.size() == 1 && already_has_lock_shared(std::this_thread::get_id()); });
_write_owner_id = locking_thread_id;
// it is possible that if we cut the line, another thread could have incremented the _write_counter
// already, so we should check this and decrement + save what they did
if (_write_counter != 0)
{
_write_counter_reserve = _write_counter;
_write_counter = 0;
}
// now increment the _write_counter for our own use
_write_counter++;
return true;
}
return false;
}

bool recursive_shared_mutex::try_lock()
{
const std::thread::id &locking_thread_id = std::this_thread::get_id();
std::unique_lock<std::mutex> _lock(_mutex, std::try_to_lock);

if (_write_owner_id == locking_thread_id)
{
_write_counter++;
return true;
}
else if (_lock.owns_lock() && end_of_exclusive_ownership() && _read_owner_ids.size() == 0 &&
_promotion_candidate_id == NON_THREAD_ID)
{
_write_counter++;
_write_owner_id = locking_thread_id;
return true;
}
return false;
}

void recursive_shared_mutex::unlock()
{
const std::thread::id &locking_thread_id = std::this_thread::get_id();
std::lock_guard<std::mutex> _lock(_mutex);
// you cannot unlock if you are not the write owner so check that here
// this might be redundant with the mutex being locked
if (_write_counter == 0 || _write_owner_id != locking_thread_id)
{
#ifdef DEBUG_ASSERTION
throw std::logic_error("unlock(standard logic) incorrectly called on a thread with no exclusive lock");
#else
return;
#endif
}
if (_promotion_candidate_id != NON_THREAD_ID && _write_owner_id != _promotion_candidate_id)
{
#ifdef DEBUG_ASSERTION
throw std::logic_error("unlock(promotion logic) incorrectly called on a thread with no exclusive lock");
#else
return;
#endif
}
if (_promotion_candidate_id != NON_THREAD_ID)
{
_write_counter--;
if (_write_counter == 0)
{
#ifdef DEBUG_ASSERTION
assert(_shared_while_exclusive_counter == 0);
#endif
if (_shared_while_exclusive_counter > 0)
{
lock_shared_internal(locking_thread_id, _shared_while_exclusive_counter);
_shared_while_exclusive_counter = 0;
}
// reset the write owner id back to a non thread id once we unlock all write locks
_write_owner_id = NON_THREAD_ID;
_promotion_candidate_id = NON_THREAD_ID;
// call notify_all() while mutex is held so that another thread can't
// lock and unlock the mutex then destroy *this before we make the call.

// it is possible that if we cut the line, another thread could have incremented the _write_counter
// already, restore what they did
if (_write_counter_reserve != 0)
{
_write_counter = _write_counter_reserve;
_write_counter_reserve = 0;
}

_read_gate.notify_all();
}
}
else
{
_write_counter--;
#ifdef DEBUG_ASSERTION
assert(_write_counter_reserve == 0);
#endif
if (end_of_exclusive_ownership())
{
// reset the write owner id back to a non thread id once we unlock all write locks
_write_owner_id = NON_THREAD_ID;
// call notify_all() while mutex is held so that another thread can't
// lock and unlock the mutex then destroy *this before we make the call.

_read_gate.notify_all();
}
}
}

void recursive_shared_mutex::lock_shared()
{
const std::thread::id &locking_thread_id = std::this_thread::get_id();
std::unique_lock<std::mutex> _lock(_mutex);
if (check_for_write_lock(locking_thread_id))
{
_shared_while_exclusive_counter++;
return;
}
if (already_has_lock_shared(locking_thread_id))
{
lock_shared_internal(locking_thread_id);
}
else
{
_read_gate.wait(
_lock, [this] { return end_of_exclusive_ownership() && _promotion_candidate_id == NON_THREAD_ID; });
lock_shared_internal(locking_thread_id);
}
}

bool recursive_shared_mutex::try_lock_shared()
{
const std::thread::id &locking_thread_id = std::this_thread::get_id();
std::unique_lock<std::mutex> _lock(_mutex, std::try_to_lock);
if (check_for_write_lock(locking_thread_id))
{
_shared_while_exclusive_counter++;
return true;
}
if (already_has_lock_shared(locking_thread_id))
{
lock_shared_internal(locking_thread_id);
return true;
}
if (!_lock.owns_lock())
{
return false;
}
if (end_of_exclusive_ownership() && _promotion_candidate_id == NON_THREAD_ID)
{
lock_shared_internal(locking_thread_id);
return true;
}
return false;
}

void recursive_shared_mutex::unlock_shared()
{
const std::thread::id &locking_thread_id = std::this_thread::get_id();
std::lock_guard<std::mutex> _lock(_mutex);
if (check_for_write_unlock(locking_thread_id))
{
_shared_while_exclusive_counter--;
return;
}
if (_read_owner_ids.size() == 0)
{
#ifdef DEBUG_ASSERTION
throw std::logic_error("unlock_shared incorrectly called on a thread with no shared lock");
#else
return;
#endif
}
unlock_shared_internal(locking_thread_id);
if (_promotion_candidate_id != NON_THREAD_ID)
{
if (_read_owner_ids.size() == 1 && already_has_lock_shared(_promotion_candidate_id))
{
_promotion_write_gate.notify_one();
}
else
{
_read_gate.notify_one();
}
}
else if (_write_counter != 0 && _promotion_candidate_id == NON_THREAD_ID)
{
if (_read_owner_ids.size() == 0)
{
_write_gate.notify_one();
}
else
{
_read_gate.notify_one();
}
}
}