Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
umegane committed Jan 9, 2025
1 parent 9e4e969 commit ddc42a8
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 15 deletions.
1 change: 1 addition & 0 deletions src/limestone/logging_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
#pragma once

#include <glog/logging.h>
#include <array>
#include <string_view>
#include <thread>
Expand Down
21 changes: 14 additions & 7 deletions test/limestone/epoch/race_condition_test_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
*/

#include "race_condition_test_manager.h"
#include "logging_helper.h"

namespace limestone::testing {

thread_local size_t race_condition_test_manager::thread_local_id = 0;

race_condition_test_manager::race_condition_test_manager(std::vector<std::pair<test_method, size_t>> test_methods)
: test_methods_(std::move(test_methods)) {}

Expand All @@ -27,12 +30,13 @@ void race_condition_test_manager::set_random_seed(unsigned int seed) {

void race_condition_test_manager::run() {
std::lock_guard<std::mutex> lock(mutex_);
size_t thread_id_counter = 0;
size_t thread_id_counter = 100;
for (const auto& [method, count] : test_methods_) {
for (size_t i = 0; i < count; ++i) {
size_t current_id = thread_id_counter++;
threads_.emplace_back([this, method, current_id]() {
static thread_local size_t thread_local_id = current_id;
thread_local_id = current_id;
VLOG_LP(50) << "Thread " << thread_local_id << " started.";
try {
method();
thread_completed(current_id);
Expand All @@ -51,17 +55,19 @@ void race_condition_test_manager::run() {
}

void race_condition_test_manager::wait_at_hook(const std::string& hook_name) {
static thread_local size_t thread_local_id; // Thread-local ID
size_t local_thread_id = thread_local_id; // Copy the thread-local variable
size_t tid = thread_local_id; // Use the already-initialized thread_local_id
{
VLOG_LP(50) << "Thread " << tid << " waiting at hook: " << hook_name;
std::unique_lock<std::mutex> lock(mutex_);
pending_threads_.emplace(local_thread_id, hook_name);
pending_threads_.emplace(tid, hook_name);
cv_.notify_all();
}

{
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [this, local_thread_id]() { return resumed_threads_.count(local_thread_id) > 0; });
resumed_threads_.erase(local_thread_id);
cv_.wait(lock, [this, tid]() { return resumed_threads_.count(tid) > 0; });
resumed_threads_.erase(tid);
VLOG_LP(50) << "Thread " << tid << " resumed from hook: " << hook_name;
}
}

Expand All @@ -87,6 +93,7 @@ void race_condition_test_manager::generate_and_set_random_seed() {
}

void race_condition_test_manager::wait_for_all_threads_to_pause_or_complete() {
VLOG_LP(50) << "Waiting for all threads to pause or complete.";
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [this]() {
return (pending_threads_.size() + threads_completed_ == threads_.size());
Expand Down
1 change: 1 addition & 0 deletions test/limestone/epoch/race_condition_test_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class race_condition_test_manager {
void join_all_threads();

private:
static thread_local size_t thread_local_id;
std::vector<std::pair<test_method, size_t>> test_methods_;
std::mutex mutex_;
std::condition_variable cv_;
Expand Down
67 changes: 59 additions & 8 deletions test/limestone/epoch/race_detection_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ namespace limestone::testing {
static constexpr const char* OPERATION_WRITE_EPOCH = "write_epoch";
static constexpr const char* OPERATION_PERSIST_CALLBACK = "persist_callback";

// General utility for setting callbacks
template <typename Hook>
void set_callback(std::shared_ptr<limestone::testing::race_condition_test_manager> manager,
Hook& callback, const char* hook_name) {
callback = [manager, hook_name]() {
manager->wait_at_hook(hook_name);
};
}
class my_datastore : public datastore_test {
public:
explicit my_datastore(const configuration& conf) : datastore_test(conf) {
Expand Down Expand Up @@ -163,6 +171,33 @@ class my_datastore : public datastore_test {
}
}

/**
* @brief Registers a `race_condition_test_manager` instance for hook-based synchronization.
*
* This method should be called during the initialization of the `my_datastore` instance.
* Since it is expected to be called in a single-threaded context during initialization,
* thread safety is not required.
*
* @param manager The `race_condition_test_manager` instance to register.
*/
void register_race_condition_manager(
std::shared_ptr<limestone::testing::race_condition_test_manager> manager) {
race_condition_manager_ = manager;

set_callback(manager, on_begin_session_current_epoch_id_store_callback, "on_begin_session_current_epoch_id_store");
set_callback(manager, on_end_session_finished_epoch_id_store_callback, "on_end_session_finished_epoch_id_store");
set_callback(manager, on_end_session_current_epoch_id_store_callback, "on_end_session_current_epoch_id_store");
set_callback(manager, on_switch_epoch_epoch_id_switched_store_callback, "on_switch_epoch_epoch_id_switched_store");
set_callback(manager, on_update_min_epoch_id_epoch_id_switched_load_callback, "on_update_min_epoch_id_epoch_id_switched_load");
set_callback(manager, on_update_min_epoch_id_current_epoch_id_load_callback, "on_update_min_epoch_id_current_epoch_id_load");
set_callback(manager, on_update_min_epoch_id_finished_epoch_id_load_callback, "on_update_min_epoch_id_finished_epoch_id_load");
set_callback(manager, on_update_min_epoch_id_epoch_id_to_be_recorded_load_callback, "on_update_min_epoch_id_epoch_id_to_be_recorded_load");
set_callback(manager, on_update_min_epoch_id_epoch_id_to_be_recorded_cas_callback, "on_update_min_epoch_id_epoch_id_to_be_recorded_cas");
set_callback(manager, on_update_min_epoch_id_epoch_id_record_finished_load_callback, "on_update_min_epoch_id_epoch_id_record_finished_load");
set_callback(manager, on_update_min_epoch_id_epoch_id_informed_load_1_callback, "on_update_min_epoch_id_epoch_id_informed_load_1");
set_callback(manager, on_update_min_epoch_id_epoch_id_informed_cas_callback, "on_update_min_epoch_id_epoch_id_informed_cas");
set_callback(manager, on_update_min_epoch_id_epoch_id_informed_load_2_callback, "on_update_min_epoch_id_epoch_id_informed_load_2");
}

private:
void record_written_epoch(epoch_id_type epoch) {
Expand All @@ -187,6 +222,18 @@ class my_datastore : public datastore_test {
std::vector<epoch_id_type> written_epochs_; // Stores successfully written epochs
std::vector<epoch_id_type> persisted_epochs_; // Stores successfully persisted epochs
std::vector<std::pair<std::string, epoch_id_type>> operation_log_; // Logs operations and epochs

/**
* @brief Invokes a hook in the registered race condition test manager, if available.
* @param hook_name The name of the hook to invoke.
*/
void invoke_hook(const std::string& hook_name) {
if (race_condition_manager_) {
race_condition_manager_->wait_at_hook(hook_name);
}
}

std::shared_ptr<limestone::testing::race_condition_test_manager> race_condition_manager_;
};


Expand Down Expand Up @@ -253,27 +300,31 @@ class race_detection_test : public ::testing::Test {

TEST_F(race_detection_test, example) {
// TestManager の初期化
race_condition_test_manager manager({
{ [this]() { switch_epoch(); }, 5 },
auto manager = std::make_shared<race_condition_test_manager>(std::vector<std::pair<std::function<void()>, size_t>>{
{ [this]() { switch_epoch(); }, 1 },
{ [this]() { write_to_log_channel0(); }, 1 },
{ [this]() { write_to_log_channel1(); }, 1 }
});

manager.run();
datastore_->register_race_condition_manager(manager);
FLAGS_v = 50;

manager->run();

// 全てのスレッドが待機または終了するまで待つ
manager.wait_for_all_threads_to_pause_or_complete();
manager->wait_for_all_threads_to_pause_or_complete();

// スレッドを順番に再開
while (!manager.all_threads_completed()) {
manager.resume_one_thread();
manager.wait_for_all_threads_to_pause_or_complete();
while (!manager->all_threads_completed()) {
manager->resume_one_thread();
manager->wait_for_all_threads_to_pause_or_complete();
}

manager.join_all_threads();
manager->join_all_threads();
}



TEST_F(race_detection_test, race_detection_behavior_test) {
EXPECT_EQ(lc0_->current_epoch_id(), UINT64_MAX);
EXPECT_EQ(lc0_->finished_epoch_id(), 0);
Expand Down

0 comments on commit ddc42a8

Please sign in to comment.