Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Custom recovery/checkpointing impl #1232

Draft
wants to merge 9 commits into
base: logwriter_thread
Choose a base branch
from

Conversation

mihirj1993
Copy link

@mihirj1993 mihirj1993 commented Jan 20, 2022

NOTE: Some code comments may be outdated. I will review again tomorrow and fix.

This PR does the following

  • Introduce persistence types relevant for recovery.

  • Introduce iterator to read log records from a persistent log file.

  • Recovery protocol:

           Sort log files to be in increasing order
           MMAP one log file at a time.
           Index decision & txn records in the file. Validate Checksum per record.
           Read decision records in increasing order and add all associated txn writes to the memtable. An optimization could be to read decision records in reverse order but for simplicity we replay txn records in commit order.
           Only committed txns are written to the memtable.
           Memtable (buffer where writes are stored in SST format) is flushed to disk if it becomes full.
           Skip unmapping and closing the file in case it has some unprocessed txn records as decision record for it can be present in the next file.
           At the end of recovery, validate no orphaned txn records exist (unless these records exist at the tail of the log)
           Flush any remaining writes.
           Destroy log.
           Start DB.
    

Recovery is conducted in sorted log order since it is preferable to stop at the first encountered corrupted record.

  • Log checkpointer thread:

      Only kicks in for log files that are completely full.
      Reuses most of the recovery code.
      Truncate log file after it has been completely checkpointed.
      Writes to RocksDB will now be single threaded. Checkpointing is outside the commit path so this isn't a perf impediment. 
    

Todo:

  • Enable the new logging impl once testing is done. I'll do this in a follow up PR
  • Disable RocksDB log (just need to update a rocks config)
  • Unfork rocksdb.
  • Move all rocksdb interaction (checkpointing) to a separate process.


struct record_iterator_t
{
unsigned char* cursor;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add some comments to describe the use of these fields.

unsigned char* end;
unsigned char* stop_at;
unsigned char* begin;
void* mapped;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mapped_data perhaps? With just mapped, the question is: "what is mapped?".

uint64_t get_value(const std::string& key);

/**
* Update custom key's value. Used to retain gaia counter across restarts.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update custom key's value.

Something seems to be missing. Should this be "Update a custom key's value."?

@@ -16,7 +16,8 @@ namespace gaia
{
namespace db
{

namespace persistence
{
class rdb_wrapper_t
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leave an empty line after the set of namespace declarations.


if (s_server_conf.persistence_mode() == persistence_mode_t::e_reinitialized_on_startup)
{
s_log_handler->destroy_persistent_log(INT64_MAX);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the argument be INT64_MAX when the type is uint64_t? Or should this be UINT64_MAX?

Copy link
Contributor

@LaurentiuCristofor LaurentiuCristofor Jan 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same issue for the call to recover_from_persistent_log and the second call to destroy_persistent_log below.

}

// Get last processed log.
auto last_processed_log_seq = s_persistent_store->get_value(gaia::db::persistence::persistent_store_manager::c_last_processed_log_num_key);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line and line 300 are too long.

@@ -1043,6 +1083,36 @@ void server_t::log_writer_handler()
}
}

void server_t::checkpoint_handler()
{
// Wait for a persistent log file to be closed before checkpointing it.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this comment be better placed below, before the call to eventfd_read?

@@ -65,6 +65,7 @@ log_handler_t::log_handler_t(const std::string& wal_dir_path)
void log_handler_t::open_for_writes(int validate_flushed_batch_eventfd, int signal_checkpoint_eventfd)
{
ASSERT_PRECONDITION(validate_flushed_batch_eventfd != -1, "Eventfd to signal post flush maintenance operations invalid!");
ASSERT_PRECONDITION(signal_checkpoint_eventfd != -1, "Eventfd to signal checkpointing of log file is invalid!");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should these messages say "Eventfd to..." or "Eventfd for...". I'm not sure what an "eventfd to X" is supposed to mean.

// Done with recovery, delete all files.
for (const auto& file : std::filesystem::directory_iterator(s_wal_dir_path))
{
uint64_t file_seq = std::stoi(file.path().filename());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should be handling possible exceptions and perhaps have a stronger check on filenames, so that we only delete the files that we created.

I understand that this is a special folder and nobody else is meant to write to it, but still, there is nothing preventing that from happening either.

// Done with recovery, delete all files.
for (const auto& file : std::filesystem::directory_iterator(s_wal_dir_path))
{
uint64_t file_seq = std::stoi(file.path().filename());
Copy link
Contributor

@LaurentiuCristofor LaurentiuCristofor Jan 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, given that we want sequence numbers to be uint64_t, shouldn't this be a call to stoull (see: https://en.cppreference.com/w/cpp/string/basic_string/stoul) instead of stoi?


void log_handler_t::set_persistent_log_sequence(uint64_t log_seq)
{
s_file_num = log_seq + 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should assert that log_seq is not UINT64_MAX.

// 'last_checkpointed_commit_ts' and will reset this field to zero.
// We don't persist txn ids across restarts.
m_max_decided_commit_ts = last_checkpointed_commit_ts;
// Scan all files and read log records starting from the highest numbered file.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Insert an empty line before each comment block (except for a comment block at the start of a scope, like the previous one).

close_fd(fd);
}

for (auto entry : files_to_unmap)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can const auto work here?


for (auto file_seq : log_files)
{
ASSERT_PRECONDITION(file_seq > last_processed_log_seq, "Log file sequence number should be ");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Message is not terminated: should be ... ordered? Or pehaps: "Log file sequence numbers should appear in increasing order"?

// Open file.
std::stringstream file_name;
file_name << s_wal_dir_path << "/" << file_seq;
auto file_fd = open(file_name.str().c_str(), O_RDONLY);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can fail if we've loaded a file that just happened to contain a number value (stoi/stoul will ignore non-numeric prefixes) and no file with just that number name exists. This is why we should have some better validation of the files that we process in the sense that we require a specific format of their names.

Another form of this issue is if we'd had file like:

1007
a1007
aa1007

With the current use of stoi, all of these names would give use the same sequence number 1007, which would lead to a failure of the above assert.

close_fd(file_fd);
});

if (file_fd < 0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should be doing this check before generating the scope_guard.

});

// halt_recovery is set to true in case an IO issue is encountered while reading the log.
auto halt_recovery = write_log_file_to_persistent_store(last_checkpointed_commit_ts, it);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do not use auto where the type is not clear, as would be the case here. For make_scope_guard above, the name says it all, but here I'd need to look up the method declaration.

auto halt_recovery = write_log_file_to_persistent_store(last_checkpointed_commit_ts, it);

// Skip unmapping and closing the file in case it has some unprocessed transactions.
if (txn_index.size() > 0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is txn_index related to the file? I'm failing to find where it's created. Is this not a local variable (then it lacks a special prefix to identify it so)?

{
files_to_close.push_back(file_fd);
files_to_unmap.push_back(std::pair(it.mapped, it.map_size));
mmap_cleanup.dismiss();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd add these immediately after their respective resources are added to their collections. I.e.:

            files_to_close.push_back(file_fd);
            file_cleanup.dismiss();
            
            files_to_unmap.push_back(std::pair(it.mapped, it.map_size));
            mmap_cleanup.dismiss();

}

// Sort files in ascending order by file name.
sort(log_files.begin(), log_files.end());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't the files be already sorted or are their names not fixed length? If the names have the right format, then the folder operation should read them in correct order anyway. In that case, a check for sorting could be kept just for additional validation and asserted on.

}
}

if (halt_recovery)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this check. It's still part of the loop, so why do we expect file_seq to be the last element of the file list?

if (txn_index.size() == 0)
{
// Safe to delete this file as it doesn't have any more txns to write to the persistent store.
max_log_file_seq_to_delete = file_seq;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can max_log_file_seq_to_delete skip one (or more) seq values but be set to another one following them?

I mean: we do this check in a loop. We don't set max for a number of file_seq, but then we check it for a larger one - is that okay? Is it supposed to happen or is it not supposed to happen?

// Iterate over records in file and write them to persistent store.
write_records(&it, last_checkpointed_commit_ts);

// Check that any remaining transactions have commit timestamp greater than the commit ts of the txn that was last written to the persistent store.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment should be broken on multiple lines.

payload_ptr += sizeof(decision_entry_t);
}

// Iterare decisions.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Iterare? Is that meant to be Iterate?


if (value.empty())
{
return static_cast<uint64_t>(0);
Copy link
Contributor

@LaurentiuCristofor LaurentiuCristofor Jan 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the explicit static_cast? You can return 0 directly.


uint64_t result;
value_reader.read_uint64(result);
return reinterpret_cast<uint64_t>(result);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the reinterpret_cast when result is already a uint64_t?

if (status.IsNotFound())
{
// Not found.
value = "";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the common::c_empty_string constant instead of the literal "".

void rdb_wrapper_t::get(const rocksdb::Slice& key, std::string& value)
{
rocksdb::DestroyDB(m_data_dir, rocksdb::Options{});
std::string val;
Copy link
Contributor

@LaurentiuCristofor LaurentiuCristofor Jan 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't you use value directly? If you need a copy, call it read_value or something more descriptive.

@@ -97,6 +97,25 @@ struct alignas(gaia::db::memory_manager::c_allocation_alignment) db_object_t
}
};

struct db_recovered_object_t
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this needed when we already have db_object_t? If it's necessary, we should at least add a comment explaining the reason for this duplicate struct.

auto decision_entry = reinterpret_cast<decision_entry_t*>(payload_ptr);
if (decision_entry->commit_ts > last_checkpointed_commit_ts)
{
ASSERT_INVARIANT(txn_index.count(decision_entry->commit_ts) > 0, "Transaction record should be written before the decision record.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Break the assert arguments on separate lines.


void log_handler_t::map_log_file(struct record_iterator_t* it, int file_fd, recovery_mode_t recovery_mode)
{
struct stat st;
Copy link
Contributor

@LaurentiuCristofor LaurentiuCristofor Jan 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

st -> statistics. Or fstat_data.


if (fstat(file_fd, &st) == -1)
{
throw_system_error("failed to fstat wal file");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix capitalization and punctuation of error message.

}

auto first_data = lseek(file_fd, 0, SEEK_DATA);
ASSERT_INVARIANT(first_data == 0, "We don't expect any holes in the beginning to the wal files.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in the beginning to -> in the beginning of

or perhaps:

at the beginning of


auto first_data = lseek(file_fd, 0, SEEK_DATA);
ASSERT_INVARIANT(first_data == 0, "We don't expect any holes in the beginning to the wal files.");
ASSERT_INVARIANT(st.st_size > 0, "We don't expect to write empty persistent log files.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we perform some additional validation on the file size? For example, if it contains a set of records and it's not empty, its size should be at least the size of a record and perhaps a multiple of that record's size?

bool log_handler_t::is_remaining_file_empty(unsigned char* start, unsigned char* end)
{
auto remaining_size = end - start;
unsigned char zeroblock[remaining_size];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

zeroblock -> zero_block

auto remaining_size = end - start;
unsigned char zeroblock[remaining_size];
memset(zeroblock, 0, sizeof zeroblock);
return memcmp(zeroblock, start, remaining_size) == 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How large can remaining_size be? This check that creates an array of that size for such a check seems rather inefficient. A simple iteration between start and end, checking each char for 0 is basically what memcmp will do itself, so why not do that directly and eliminate the need for zeroblock?

Copy link
Contributor

@LaurentiuCristofor LaurentiuCristofor left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just finished a pass through this PR. I need some additional information to completely understand some of this code. I'll do another pass after you have a chance to go through my comments.

Copy link
Contributor

@senderista senderista left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lots to discuss...

@@ -267,6 +267,8 @@ class server_t
// Keep track of the last txn that has been submitted to the async_disk_writer.
static inline std::atomic<gaia_txn_id_t> s_last_queued_commit_ts_upper_bound = c_invalid_gaia_txn_id;

static inline gaia_txn_id_t s_last_checkpointed_commit_ts_lower_bound = c_invalid_gaia_txn_id;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why isn't this atomic like s_last_queued_commit_ts_upper_bound?

@@ -87,6 +87,33 @@ class log_handler_t
*/
void register_commit_ts_for_session_notification(gaia_txn_id_t commit_ts, int session_decision_eventfd);

/**
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we should move these comments to the header file, but you can wait until we move the others as well.

/**
* Destroy all log files with sequence number lesser than or equal to max_log_seq_to_delete.
*/
void destroy_persistent_log(uint64_t max_log_seq_to_delete);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why isn't max_log_seq_to_delete a log_sequence_t?

/**
* Register persistent store create/delete APIs. Rework to call persistent store APIs directly?
*/
void register_write_to_persistent_store_fn(std::function<void(db_recovered_object_t&)> write_obj_fn);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the motivation for making these functions pluggable? Testing?

/**
* Set the log sequence counter.
*/
void set_persistent_log_sequence(uint64_t log_seq);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can log_seq be a log_sequence_t?

{
ASSERT_PRECONDITION(record->header.record_type == record_type_t::txn, "Expected transaction record.");

auto payload_ptr = reinterpret_cast<unsigned char*>(record->payload);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be encapsulated in a helper method in read_record_t.


auto payload_ptr = reinterpret_cast<unsigned char*>(record->payload);
auto start_ptr = payload_ptr;
auto end_ptr = reinterpret_cast<unsigned char*>(record) + record->header.payload_size;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't you be adding payload_size to payload_ptr, not to record? This is another example of why I want to keep tricky pointer arithmetic out of client code whenever possible...

auto payload_ptr = reinterpret_cast<unsigned char*>(record->payload);
auto start_ptr = payload_ptr;
auto end_ptr = reinterpret_cast<unsigned char*>(record) + record->header.payload_size;
auto deleted_ids_ptr = end_ptr - (sizeof(common::gaia_id_t) * record->header.deleted_object_count);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to db_object_t, I think it's more robust to have an array of fixed-size objects precede an array of variable-size objects. If the fixed-size objects are packed after the variable-size objects, then it's easy to get alignment errors.

ASSERT_INVARIANT(obj_ptr->payload_size > 0, "Recovered object size should be greater than 0");
write_to_persistent_store_fn(*obj_ptr);

size_t requested_size = obj_ptr->payload_size + c_db_object_header_size;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think maybe we should add a total_size() helper to db_object_t (along with data_size()).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(I just merged this in #1269, so please use it here.)

payload_ptr += allocation_size;
}

for (size_t i = 0; i < record->header.deleted_object_count; i++)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As elsewhere, you should do pointer arithmetic in the for statement itself:

auto first_id_ptr = reinterpret_cast<common::gaia_id_t*>(deleted_ids_ptr);
for (gaia_id_t* id_ptr = first_id_ptr; id_ptr < first_id_ptr + record->header.deleted_object_count; ++id_ptr)
{
...
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants