Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev umegane 1105 #79

Merged
merged 9 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 50 additions & 6 deletions include/limestone/api/datastore.h
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,8 @@ class datastore {
protected: // for tests
auto& log_channels_for_tests() const noexcept { return log_channels_; }
auto epoch_id_informed_for_tests() const noexcept { return epoch_id_informed_.load(); }
auto epoch_id_recorded_for_tests() const noexcept { return epoch_id_to_be_recorded_.load(); }
auto epoch_id_to_be_recorded_for_tests() const noexcept { return epoch_id_to_be_recorded_.load(); }
auto epoch_id_record_finished_for_tests() const noexcept { return epoch_id_record_finished_.load(); }
auto epoch_id_switched_for_tests() const noexcept { return epoch_id_switched_.load(); }
auto& files_for_tests() const noexcept { return files_; }
void rotate_epoch_file_for_tests() { rotate_epoch_file(); }
Expand All @@ -257,12 +258,55 @@ class datastore {
// They allow derived classes to inject custom behavior or notifications
// at specific wait points during the execution of the datastore class.
// The default implementation does nothing, ensuring no impact on production code.
virtual void on_wait1() {}
virtual void on_wait2() {}
virtual void on_wait3() {}
virtual void on_wait4() {}
virtual void on_rotate_log_files() noexcept {}
virtual void on_begin_session_current_epoch_id_store() noexcept {}
virtual void on_end_session_finished_epoch_id_store() noexcept {}
virtual void on_end_session_current_epoch_id_store() noexcept {}
virtual void on_switch_epoch_epoch_id_switched_store() noexcept {}
virtual void on_update_min_epoch_id_epoch_id_switched_load() noexcept {}
virtual void on_update_min_epoch_id_current_epoch_id_load() noexcept {}
virtual void on_update_min_epoch_id_finished_epoch_id_load() noexcept {}
virtual void on_update_min_epoch_id_epoch_id_to_be_recorded_load() noexcept {}
virtual void on_update_min_epoch_id_epoch_id_to_be_recorded_cas() noexcept {}
virtual void on_update_min_epoch_id_epoch_id_record_finished_load() noexcept {}
virtual void on_update_min_epoch_id_epoch_id_informed_load_1() noexcept {}
virtual void on_update_min_epoch_id_epoch_id_informed_cas() noexcept {}
virtual void on_update_min_epoch_id_epoch_id_informed_load_2() noexcept {}

/**
* @brief Sets the callback function for writing epoch to a file.
* @details
* This method allows you to override the default behavior for writing epoch
* information by providing a custom callback. The callback can be a free
* function, a lambda, or a member function bound to an object.
*
* Example:
* @code
* class CustomHandler {
* public:
* void custom_epoch_writer(epoch_id_type epoch) {
* // Custom logic
* }
* };
*
* datastore ds;
* CustomHandler handler;
* ds.set_write_epoch_callback([&handler](epoch_id_type epoch) {
* handler.custom_epoch_writer(epoch);
* });
* @endcode
*
* @param callback The new callback function to use for writing epoch.
*/
void set_write_epoch_callback(std::function<void(epoch_id_type)> callback) {
write_epoch_callback_ = std::move(callback);
}

private:
std::function<void(epoch_id_type)> write_epoch_callback_{
[this](epoch_id_type epoch) { this->write_epoch_to_file(epoch); }
};

std::vector<std::unique_ptr<log_channel>> log_channels_;

boost::filesystem::path location_{};
Expand Down Expand Up @@ -367,7 +411,7 @@ class datastore {
// File descriptor for file lock (flock) on the manifest file
int fd_for_flock_{-1};

void write_epoch_to_file(epoch_id_type epoch_id);
virtual void write_epoch_to_file(epoch_id_type epoch_id);

int epoch_write_counter = 0;
};
Expand Down
15 changes: 13 additions & 2 deletions include/limestone/api/log_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,16 @@ class log_channel {
*/
[[nodiscard]] boost::filesystem::path file_path() const noexcept;

/**
* @brief this is for test purpose only, must not be used for any purpose other than testing
*/
[[nodiscard]] auto current_epoch_id() const noexcept { return current_epoch_id_.load(); }

/**
* @brief this is for test purpose only, must not be used for any purpose other than testing
*/
[[nodiscard]] auto finished_epoch_id() const noexcept { return finished_epoch_id_.load(); }

private:
datastore& envelope_;

Expand All @@ -183,10 +193,11 @@ class log_channel {

std::atomic_uint64_t finished_epoch_id_{0};

log_channel(boost::filesystem::path location, std::size_t id, datastore& envelope) noexcept;

std::string do_rotate_file(epoch_id_type epoch = 0);

protected: // Protected to allow testing with derived classes
log_channel(boost::filesystem::path location, std::size_t id, datastore& envelope) noexcept;

friend class datastore;
friend class rotation_task;
};
Expand Down
18 changes: 15 additions & 3 deletions src/limestone/datastore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ void datastore::ready() {
create_snapshot();
online_compaction_worker_future_ = std::async(std::launch::async, &datastore::online_compaction_worker, this);
if (epoch_id_switched_.load() != 0) {
write_epoch_to_file(epoch_id_informed_.load());
write_epoch_callback_(epoch_id_informed_.load());
}
cleanup_rotated_epoch_files(location_);
state_ = state::ready;
Expand Down Expand Up @@ -224,6 +224,7 @@ void datastore::switch_epoch(epoch_id_type new_epoch_id) {
LOG_LP(WARNING) << "switch to epoch_id_type of " << neid << " (<=" << switched << ") is curious";
}

on_switch_epoch_epoch_id_switched_store(); // for testing
epoch_id_switched_.store(neid);
if (state_ != state::not_ready) {
update_min_epoch_id(true);
Expand All @@ -237,6 +238,8 @@ void datastore::switch_epoch(epoch_id_type new_epoch_id) {

void datastore::update_min_epoch_id(bool from_switch_epoch) { // NOLINT(readability-function-cognitive-complexity)
TRACE_START << "from_switch_epoch=" << from_switch_epoch;

on_update_min_epoch_id_epoch_id_switched_load(); // for testing
auto upper_limit = epoch_id_switched_.load();
if (upper_limit == 0) {
return; // If epoch_id_switched_ is zero, it means no epoch has been switched, so updating epoch_id_to_be_recorded_ and epoch_id_informed_ is unnecessary.
Expand All @@ -246,7 +249,9 @@ void datastore::update_min_epoch_id(bool from_switch_epoch) { // NOLINT(readabi
epoch_id_type max_finished_epoch = 0;

for (const auto& e : log_channels_) {
on_update_min_epoch_id_current_epoch_id_load(); // for testing
auto working_epoch = e->current_epoch_id_.load();
on_update_min_epoch_id_finished_epoch_id_load(); // for testing
auto finished_epoch = e->finished_epoch_id_.load();
if (working_epoch > finished_epoch && working_epoch != UINT64_MAX) {
if ((working_epoch - 1) < upper_limit) {
Expand All @@ -267,23 +272,27 @@ void datastore::update_min_epoch_id(bool from_switch_epoch) { // NOLINT(readabi
}

TRACE << "update epoch file part start with to_be_epoch = " << to_be_epoch;
on_update_min_epoch_id_epoch_id_to_be_recorded_load(); // for testing
auto old_epoch_id = epoch_id_to_be_recorded_.load();
while (true) {
if (old_epoch_id >= to_be_epoch) {
break;
}
on_update_min_epoch_id_epoch_id_to_be_recorded_cas(); // for testing
if (epoch_id_to_be_recorded_.compare_exchange_strong(old_epoch_id, to_be_epoch)) {
TRACE << "epoch_id_to_be_recorded_ updated to " << to_be_epoch;
on_update_min_epoch_id_epoch_id_to_be_recorded_load(); // for testing
std::lock_guard<std::mutex> lock(mtx_epoch_file_);
if (to_be_epoch < epoch_id_to_be_recorded_.load()) {
break;
}
write_epoch_to_file(static_cast<epoch_id_type>(to_be_epoch));
write_epoch_callback_(static_cast<epoch_id_type>(to_be_epoch));
epoch_id_record_finished_.store(to_be_epoch);
TRACE << "epoch_id_record_finished_ updated to " << to_be_epoch;
break;
}
}
on_update_min_epoch_id_epoch_id_record_finished_load(); // for testing
if (to_be_epoch > epoch_id_record_finished_.load()) {
TRACE << "skipping persistent callback part, to_be_epoch = " << to_be_epoch << ", epoch_id_record_finished_ = " << epoch_id_record_finished_.load();
TRACE_END;
Expand All @@ -296,14 +305,17 @@ void datastore::update_min_epoch_id(bool from_switch_epoch) { // NOLINT(readabi
// In `informed_epoch_`, the update restriction based on the `from_switch_epoch` condition is intentionally omitted.
// Due to the interface specifications of Shirakami, it is necessary to advance the epoch even if the log channel
// is not updated. This behavior differs from `recorded_epoch_` and should be maintained as such.
on_update_min_epoch_id_epoch_id_informed_load_1(); // for testing
old_epoch_id = epoch_id_informed_.load();
while (true) {
if (old_epoch_id >= to_be_epoch) {
break;
}
on_update_min_epoch_id_epoch_id_informed_cas(); // for testing
if (epoch_id_informed_.compare_exchange_strong(old_epoch_id, to_be_epoch)) {
TRACE << "epoch_id_informed_ updated to " << to_be_epoch;
{
on_update_min_epoch_id_epoch_id_informed_load_2(); // for testing
std::lock_guard<std::mutex> lock(mtx_epoch_persistent_callback_);
if (to_be_epoch < epoch_id_informed_.load()) {
break;
Expand Down Expand Up @@ -486,7 +498,7 @@ rotation_result datastore::rotate_log_files() {
}
TRACE << "epoch_id = " << epoch_id;
{
on_wait1(); // for testing
on_rotate_log_files(); // for testing
// Wait until epoch_id_informed_ is less than rotated_epoch_id to ensure safe rotation.
std::unique_lock<std::mutex> ul(informed_mutex);
while (epoch_id_informed_.load() < epoch_id) {
Expand Down
3 changes: 3 additions & 0 deletions src/limestone/log_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ void log_channel::begin_session() {
// This loop detects such inconsistencies and repeats until `current_epoch_id_`
// matches the latest value of `epoch_id_switched_`, ensuring consistency.
do {
envelope_.on_begin_session_current_epoch_id_store(); // for testing
current_epoch_id_.store(envelope_.epoch_id_switched_.load());
std::atomic_thread_fence(std::memory_order_acq_rel);
} while (current_epoch_id_.load() != envelope_.epoch_id_switched_.load());
Expand Down Expand Up @@ -88,8 +89,10 @@ void log_channel::end_session() {
if (fsync(fileno(strm_)) != 0) {
LOG_AND_THROW_IO_EXCEPTION("fsync failed", errno);
}
envelope_.on_end_session_finished_epoch_id_store(); // for testing
finished_epoch_id_.store(current_epoch_id_.load());
envelope_.update_min_epoch_id();
envelope_.on_end_session_current_epoch_id_store(); // for testing
current_epoch_id_.store(UINT64_MAX);

if (fclose(strm_) != 0) { // NOLINT(*-owning-memory)
Expand Down
1 change: 1 addition & 0 deletions src/limestone/logging_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
#pragma once

#include <glog/logging.h>
#include <array>
#include <string_view>
#include <thread>
Expand Down
10 changes: 5 additions & 5 deletions test/limestone/compaction/compaction_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,24 +143,24 @@ class compaction_test : public ::testing::Test {
throw std::runtime_error("datastore_ must be of type datastore_test");
}

// Set up the on_wait1 callback to signal when rotate_log_files() reaches the wait point
test_datastore->on_wait1_callback = [&]() {
// Set up the on_rotate_log_files callback to signal when rotate_log_files() reaches the wait point
test_datastore->on_rotate_log_files_callback = [&]() {
std::unique_lock<std::mutex> lock(wait_mutex);
wait_triggered = true;
wait_cv.notify_one(); // Notify that on_wait1 has been triggered
wait_cv.notify_one(); // Notify that on_rotate_log_files has been triggered
};

try {
// Run compact_with_online in a separate thread
auto future = std::async(std::launch::async, [&]() { datastore_->compact_with_online(); });

// Wait for on_wait1 to be triggered (simulating the waiting in rotate_log_files)
// Wait for on_rotate_log_files to be triggered (simulating the waiting in rotate_log_files)
{
std::unique_lock<std::mutex> lock(wait_mutex);
wait_cv.wait(lock, [&]() { return wait_triggered; });
}

// Now switch the epoch after on_wait1 has been triggered
// Now switch the epoch after on_rotate_log_files has been triggered
datastore_->switch_epoch(epoch);

// Wait for the compact operation to finish
Expand Down
22 changes: 11 additions & 11 deletions test/limestone/epoch/finish_soon_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,25 +46,25 @@ TEST_F(finish_soon_test, same) {

datastore_->switch_epoch(2);
ASSERT_EQ(1, datastore_->epoch_id_informed());
ASSERT_EQ(0, datastore_->epoch_id_recorded());
ASSERT_EQ(0, datastore_->epoch_id_to_be_recorded());

datastore_->switch_epoch(3);
ASSERT_EQ(2, datastore_->epoch_id_informed());
ASSERT_EQ(0, datastore_->epoch_id_recorded());
ASSERT_EQ(0, datastore_->epoch_id_to_be_recorded());

channel.begin_session();
channel.end_session();

ASSERT_EQ(2, datastore_->epoch_id_informed());
ASSERT_EQ(0, datastore_->epoch_id_recorded());
ASSERT_EQ(0, datastore_->epoch_id_to_be_recorded());

datastore_->switch_epoch(4);
ASSERT_EQ(3, datastore_->epoch_id_informed());
ASSERT_EQ(3, datastore_->epoch_id_recorded());
ASSERT_EQ(3, datastore_->epoch_id_to_be_recorded());

datastore_->switch_epoch(5);
ASSERT_EQ(4, datastore_->epoch_id_informed());
ASSERT_EQ(3, datastore_->epoch_id_recorded());
ASSERT_EQ(3, datastore_->epoch_id_to_be_recorded());

// cleanup
datastore_->shutdown();
Expand All @@ -77,27 +77,27 @@ TEST_F(finish_soon_test, different) {

datastore_->switch_epoch(2);
ASSERT_EQ(1, datastore_->epoch_id_informed());
ASSERT_EQ(0, datastore_->epoch_id_recorded());
ASSERT_EQ(0, datastore_->epoch_id_to_be_recorded());

datastore_->switch_epoch(3);
ASSERT_EQ(2, datastore_->epoch_id_informed());
ASSERT_EQ(0, datastore_->epoch_id_recorded());
ASSERT_EQ(0, datastore_->epoch_id_to_be_recorded());

channel.begin_session();
ASSERT_EQ(2, datastore_->epoch_id_informed());
ASSERT_EQ(0, datastore_->epoch_id_recorded());
ASSERT_EQ(0, datastore_->epoch_id_to_be_recorded());

datastore_->switch_epoch(4);
ASSERT_EQ(2, datastore_->epoch_id_informed());
ASSERT_EQ(0, datastore_->epoch_id_recorded());
ASSERT_EQ(0, datastore_->epoch_id_to_be_recorded());

channel.end_session();
ASSERT_EQ(3, datastore_->epoch_id_informed());
ASSERT_EQ(3, datastore_->epoch_id_recorded());
ASSERT_EQ(3, datastore_->epoch_id_to_be_recorded());

datastore_->switch_epoch(5);
ASSERT_EQ(4, datastore_->epoch_id_informed());
ASSERT_EQ(3, datastore_->epoch_id_recorded());
ASSERT_EQ(3, datastore_->epoch_id_to_be_recorded());

// cleanup
datastore_->shutdown();
Expand Down
Loading