Skip to content

Commit

Permalink
Fix issue #255 - Incorrect concurrent increment of value with `std::s…
Browse files Browse the repository at this point in the history
…hared_mutex`.
  • Loading branch information
greg7mdp committed Oct 28, 2024
1 parent 4817a6d commit c9520f1
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 26 deletions.
36 changes: 10 additions & 26 deletions parallel_hashmap/phmap.h
Original file line number Diff line number Diff line change
Expand Up @@ -2616,7 +2616,6 @@ class parallel_hash_set
using UniqueLock = typename Lockable::UniqueLock;
using SharedLock = typename Lockable::SharedLock;
using ReadWriteLock = typename Lockable::ReadWriteLock;


// --------------------------------------------------------------------
struct Inner : public Lockable
Expand Down Expand Up @@ -3178,14 +3177,9 @@ class parallel_hash_set
{
Inner& inner = sets_[subidx(hashval)];
auto& set = inner.set_;
ReadWriteLock m(inner);
UniqueLock m(inner);

size_t offset = set._find_key(key, hashval);
if (offset == (size_t)-1 && m.switch_to_unique()) {
// we did an unlock/lock, and another thread could have inserted the same key, so we need to
// do a find() again.
offset = set._find_key(key, hashval);
}
if (offset == (size_t)-1) {
offset = set.prepare_insert(hashval);
set.emplace_at(offset, std::forward<Args>(args)...);
Expand Down Expand Up @@ -3268,13 +3262,8 @@ class parallel_hash_set
iterator lazy_emplace_with_hash(const key_arg<K>& key, size_t hashval, F&& f) {
Inner& inner = sets_[subidx(hashval)];
auto& set = inner.set_;
ReadWriteLock m(inner);
UniqueLock m(inner);
size_t offset = set._find_key(key, hashval);
if (offset == (size_t)-1 && m.switch_to_unique()) {
// we did an unlock/lock, and another thread could have inserted the same key, so we need to
// do a find() again.
offset = set._find_key(key, hashval);
}
if (offset == (size_t)-1) {
offset = set.prepare_insert(hashval);
set.lazy_emplace_at(offset, std::forward<F>(f));
Expand Down Expand Up @@ -3389,7 +3378,7 @@ class parallel_hash_set
template <class K = key_type, class FExists, class FEmplace>
bool lazy_emplace_l(const key_arg<K>& key, FExists&& fExists, FEmplace&& fEmplace) {
size_t hashval = this->hash(key);
ReadWriteLock m;
UniqueLock m;
auto res = this->find_or_prepare_insert_with_hash(hashval, key, m);
Inner* inner = std::get<0>(res);
if (std::get<2>(res)) {
Expand Down Expand Up @@ -3843,16 +3832,11 @@ class parallel_hash_set

template <class K>
std::tuple<Inner*, size_t, bool>
find_or_prepare_insert_with_hash(size_t hashval, const K& key, ReadWriteLock &mutexlock) {
find_or_prepare_insert_with_hash(size_t hashval, const K& key, UniqueLock &mutexlock) {
Inner& inner = sets_[subidx(hashval)];
auto& set = inner.set_;
mutexlock = std::move(ReadWriteLock(inner));
mutexlock = std::move(UniqueLock(inner));
size_t offset = set._find_key(key, hashval);
if (offset == (size_t)-1 && mutexlock.switch_to_unique()) {
// we did an unlock/lock, and another thread could have inserted the same key, so we need to
// do a find() again.
offset = set._find_key(key, hashval);
}
if (offset == (size_t)-1) {
offset = set.prepare_insert(hashval);
return std::make_tuple(&inner, offset, true);
Expand All @@ -3862,7 +3846,7 @@ class parallel_hash_set

template <class K>
std::tuple<Inner*, size_t, bool>
find_or_prepare_insert(const K& key, ReadWriteLock &mutexlock) {
find_or_prepare_insert(const K& key, UniqueLock &mutexlock) {
return find_or_prepare_insert_with_hash<K>(this->hash(key), key, mutexlock);
}

Expand Down Expand Up @@ -4084,7 +4068,7 @@ class parallel_hash_map : public parallel_hash_set<N, RefSet, Mtx_, Policy, Hash
template <class K = key_type, class F, class... Args>
bool try_emplace_l(K&& k, F&& f, Args&&... args) {
size_t hashval = this->hash(k);
ReadWriteLock m;
UniqueLock m;
auto res = this->find_or_prepare_insert_with_hash(hashval, k, m);
typename Base::Inner *inner = std::get<0>(res);
if (std::get<2>(res)) {
Expand All @@ -4105,7 +4089,7 @@ class parallel_hash_map : public parallel_hash_set<N, RefSet, Mtx_, Policy, Hash
template <class K = key_type, class... Args>
std::pair<typename parallel_hash_map::parallel_hash_set::pointer, bool> try_emplace_p(K&& k, Args&&... args) {
size_t hashval = this->hash(k);
ReadWriteLock m;
UniqueLock m;
auto res = this->find_or_prepare_insert_with_hash(hashval, k, m);
typename Base::Inner *inner = std::get<0>(res);
if (std::get<2>(res)) {
Expand Down Expand Up @@ -4135,7 +4119,7 @@ class parallel_hash_map : public parallel_hash_set<N, RefSet, Mtx_, Policy, Hash
template <class K, class V>
std::pair<iterator, bool> insert_or_assign_impl(K&& k, V&& v) {
size_t hashval = this->hash(k);
ReadWriteLock m;
UniqueLock m;
auto res = this->find_or_prepare_insert_with_hash(hashval, k, m);
typename Base::Inner *inner = std::get<0>(res);
if (std::get<2>(res)) {
Expand All @@ -4155,7 +4139,7 @@ class parallel_hash_map : public parallel_hash_set<N, RefSet, Mtx_, Policy, Hash

template <class K = key_type, class... Args>
std::pair<iterator, bool> try_emplace_impl_with_hash(size_t hashval, K&& k, Args&&... args) {
ReadWriteLock m;
UniqueLock m;
auto res = this->find_or_prepare_insert_with_hash(hashval, k, m);
typename Base::Inner *inner = std::get<0>(res);
if (std::get<2>(res)) {
Expand Down
48 changes: 48 additions & 0 deletions tests/parallel_flat_hash_map_test.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,52 @@
#define THIS_HASH_MAP parallel_flat_hash_map
#define THIS_TEST_NAME ParallelFlatHashMap
#include <thread>

#include "parallel_hash_map_test.cc"


#if PHMAP_HAVE_SHARED_MUTEX
#include <shared_mutex>

template <typename K> using HashEqual = phmap::priv::hash_default_eq<K>;
template <typename V> using HashFn = phmap::priv::hash_default_hash<V>;
template <typename K> using Allocator = phmap::priv::Allocator<K>;

template <typename K, typename V, size_t N>
using parallel_flat_hash_map =
phmap::parallel_flat_hash_map<K, V, HashFn<K>, HashEqual<K>,
Allocator<phmap::priv::Pair<K, V>>, N,
std::shared_mutex>;

using Table = parallel_flat_hash_map<int, int, 10>;

TEST(THIS_TEST_NAME, ConcurrencyCheck) {
static constexpr int THREADS = 10;
static constexpr int EPOCH = 1000;
static constexpr int KEY = 12345;

auto Incr = [](Table *table) {
auto exist_fn = [](typename Table::value_type &value) { value.second += 1; };
auto emplace_fn = [](const typename Table::constructor &ctor) {
ctor(KEY, 1);
};
for (int i = 0; i < EPOCH; ++i) {
(void)table->lazy_emplace_l(KEY, exist_fn, emplace_fn);
}
};

Table table;
std::vector<std::thread> threads;
threads.reserve(THREADS);
for (int i = 0; i < THREADS; ++i) {
threads.emplace_back([&]() { Incr(&table); });
}

for (auto &thread : threads) {
thread.join();
}

EXPECT_EQ(table[KEY], 10000);
}

#endif

0 comments on commit c9520f1

Please sign in to comment.