Skip to content

Commit

Permalink
Refactor test_concur_concurrent.cpp to add dirty write test
Browse files Browse the repository at this point in the history
  • Loading branch information
mutouyun committed Aug 10, 2024
1 parent 7e1731c commit c1dc10e
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 24 deletions.
41 changes: 21 additions & 20 deletions include/libconcur/concurrent.h
Original file line number Diff line number Diff line change
Expand Up @@ -325,8 +325,8 @@ struct producer<trans::broadcast, relation::single> {
hdr.w_idx.fetch_add(1, std::memory_order_release);
// Set data & flag.
elem.set_flag(w_idx | state::enqueue_mask);
elem.set_data(std::forward<U>(src)); // Here should not be interrupted.
elem.set_flag(w_idx | state::commit_mask);
elem.set_data(std::forward<U>(src));
elem.set_flag(w_idx);
return true;
}
};
Expand All @@ -336,43 +336,44 @@ template <>
struct producer<trans::broadcast, relation::multi> {

struct header_impl {
std::atomic<state::flag_t> w_flags {0}; ///< write flags, combined current and starting index.
private: padding<decltype(w_flags)> ___;
std::atomic<std::uint64_t> w_contexts {0}; ///< write contexts, combined current and starting index.
private: padding<decltype(w_contexts)> ___;

public:
void get(index_t &idx, index_t &beg) const noexcept {
auto w_flags = this->w_flags.load(std::memory_order_relaxed);
idx = get_index(w_flags);
beg = get_begin(w_flags);
auto w_contexts = this->w_contexts.load(std::memory_order_relaxed);
idx = get_index(w_contexts);
beg = get_begin(w_contexts);
}
};

template <typename T, typename H, typename C, typename U,
verify_elems_header<H> = true,
convertible<H, header_impl> = true>
static bool enqueue(::LIBIMP::span<element<T>> elems, H &hdr, C &/*ctx*/, U &&src) noexcept {
auto w_flags = hdr.w_flags.load(std::memory_order_acquire);
auto w_contexts = hdr.w_contexts.load(std::memory_order_acquire);
index_t w_idx;
for (;;) {
w_idx = get_index(w_flags);
auto w_beg = get_begin(w_flags);
w_idx = get_index(w_contexts);
auto w_beg = get_begin(w_contexts);
// Move the queue head index.
if (w_beg + hdr.circ_size <= w_idx) {
w_beg += 1;
}
// Update flags.
auto n_flags = make_flags(w_idx + 1/*iterate backwards*/, w_beg);
if (hdr.w_flags.compare_exchange_weak(w_flags, n_flags, std::memory_order_acq_rel)) {
// Update write contexts.
auto n_contexts = make_w_contexts(w_idx + 1/*iterate backwards*/, w_beg);
if (hdr.w_contexts.compare_exchange_weak(w_contexts, n_contexts, std::memory_order_acq_rel)) {
break;
}
}
// Get element.
auto w_cur = trunc_index(hdr, w_idx);
auto &elem = elems[w_cur];
// Set data & flag.
// Set data & flag. Dirty write is not considered here.
// By default, when dirty write occurs, the previous writer must no longer exist.
elem.set_flag(w_idx | state::enqueue_mask);
elem.set_data(std::forward<U>(src)); // Here should not be interrupted.
elem.set_flag(w_idx | state::commit_mask);
elem.set_data(std::forward<U>(src));
elem.set_flag(w_idx);
return true;
}

Expand All @@ -387,8 +388,8 @@ struct producer<trans::broadcast, relation::multi> {
return index_t(flags >> (sizeof(index_t) * CHAR_BIT));
}

static constexpr state::flag_t make_flags(index_t idx, index_t beg) noexcept {
return state::flag_t(idx) | (state::flag_t(beg) << (sizeof(index_t) * CHAR_BIT));
static constexpr std::uint64_t make_w_contexts(index_t idx, index_t beg) noexcept {
return std::uint64_t(idx) | (std::uint64_t(beg) << (sizeof(index_t) * CHAR_BIT));
}
};

Expand Down Expand Up @@ -426,16 +427,16 @@ struct consumer<trans::broadcast, relation::multi> {
}
// Try getting data.
for (;;) {
if ((f_ct & state::enqueue_mask) == state::enqueue_mask) {
if (f_ct & state::enqueue_mask) {
return false; // unreadable
}
des = LIBCONCUR::get(elem);
// Correct data can be obtained only if
// the elem data is not modified during the getting process.
if (elem.cas_flag(f_ct, f_ct)) break;
}
ctx.w_lst = (f_ct & ~state::enqueue_mask) + 1;
// Get a valid index and iterate backwards.
ctx.w_lst = index_t(f_ct) + 1;
ctx.r_idx += 1;
return true;
}
Expand Down
1 change: 0 additions & 1 deletion include/libconcur/element.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ using flag_t = std::uint64_t;
enum : flag_t {
invalid_value = ~flag_t(0),
enqueue_mask = invalid_value << 32,
commit_mask = ~flag_t(1) << 32,
};

} // namespace state
Expand Down
61 changes: 60 additions & 1 deletion test/concur/test_concur_concurrent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
#include "libimp/log.h"
#include "libimp/nameof.h"

#include "test_util.h"

TEST(concurrent, cache_line_size) {
std::cout << concur::cache_line_size << "\n";
EXPECT_TRUE(concur::cache_line_size >= alignof(std::max_align_t));
Expand Down Expand Up @@ -311,4 +313,61 @@ TEST(concurrent, broadcast) {

/// \brief 8-8
test_broadcast<prod_cons<trans::broadcast, relation::multi , relation::multi>>(8, 8);
}
}

TEST(concurrent, broadcast_multi_dirtywrite) {
using namespace concur;

struct data {
std::uint64_t n{};

data &operator=(test::latch &l) noexcept {
l.arrive_and_wait();
std::this_thread::sleep_for(std::chrono::milliseconds(10));
n = 1;
return *this;
}

data &operator=(data const &rhs) noexcept {
n = rhs.n;
return *this;
}
};

element<data> circ[2] {};
prod_cons<trans::broadcast, relation::multi, relation::multi> pc;
typename traits<decltype(pc)>::header hdr {imp::make_span(circ)};

auto push_one = [&, ctx = typename concur::traits<decltype(pc)>::context{}](auto &i) mutable {
return pc.enqueue(imp::make_span(circ), hdr, ctx, i);
};
auto pop_one = [&, ctx = typename concur::traits<decltype(pc)>::context{}]() mutable {
data i;
if (pc.dequeue(imp::make_span(circ), hdr, ctx, i)) {
return i;
}
return data{};
};

test::latch l(2);
std::thread t[2];
t[0] = std::thread([&] {
push_one(l); // 1
});
t[1] = std::thread([&] {
l.arrive_and_wait();
push_one(data{3});
push_one(data{2}); // dirty write
});

for (int i = 0; i < 2; ++i) {
t[i].join();
}
std::set<std::uint64_t> s{1, 2, 3};
for (int i = 0; i < 2; ++i) {
auto d = pop_one();
EXPECT_TRUE(s.find(d.n) != s.end());
s.erase(d.n);
}
EXPECT_TRUE(s.find(3) == s.end());
}
49 changes: 47 additions & 2 deletions test/test_util.h
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@

#pragma once

#include <sys/wait.h>
#include <unistd.h>
#include "libimp/detect_plat.h"
#ifndef LIBIMP_OS_WIN
# include <sys/wait.h>
# include <unistd.h>
#else
# define pid_t int
#endif

#include <condition_variable>
#include <mutex>

namespace test {

template <typename Fn>
pid_t subproc(Fn&& fn) {
#ifndef LIBIMP_OS_WIN
pid_t pid = fork();
if (pid == -1) {
return pid;
Expand All @@ -21,13 +30,49 @@ pid_t subproc(Fn&& fn) {
exit(0);
}
return pid;
#else
return -1;
#endif
}

inline void join_subproc(pid_t pid) {
#ifndef LIBIMP_OS_WIN
int ret_code;
waitpid(pid, &ret_code, 0);
#endif
}

/// \brief A simple latch implementation.
class latch {
public:
explicit latch(int count) : count_(count) {}

void count_down() {
std::unique_lock<std::mutex> lock(mutex_);
if (count_ > 0) {
--count_;
if (count_ == 0) {
cv_.notify_all();
}
}
}

void wait() {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [&] { return count_ == 0; });
}

void arrive_and_wait() {
count_down();
wait();
}

private:
std::mutex mutex_;
std::condition_variable cv_;
int count_;
};

} // namespace test

#define REQUIRE_EXIT(...) \
Expand Down

0 comments on commit c1dc10e

Please sign in to comment.