From 5f62d0ed837b6c71c37a658af2099de187f41cde Mon Sep 17 00:00:00 2001 From: Qi Xu Date: Thu, 15 Feb 2024 18:54:07 -0800 Subject: [PATCH 01/18] add wal close log Signed-off-by: Qi Xu --- db/db_impl/db_impl_files.cc | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc index 1db50b476d2..3dd0533e5c6 100644 --- a/db/db_impl/db_impl_files.cc +++ b/db/db_impl/db_impl_files.cc @@ -317,6 +317,8 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, // logs_ could have changed while we were waiting. continue; } + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "deleting log %" PRIu64 " from logs_\n", log.number); logs_to_free_.push_back(log.ReleaseWriter()); logs_.pop_front(); } @@ -491,6 +493,8 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) { // Close WALs before trying to delete them. for (const auto w : state.logs_to_free) { // TODO: maybe check the return value of Close. + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Close log %" PRIu64 " from logs_\n", w->get_log_number()); auto s = w->Close(); s.PermitUncheckedError(); } From 6d6d381fcc619c8db210734d8d1dff6f8d6045c1 Mon Sep 17 00:00:00 2001 From: Qi Xu Date: Thu, 15 Feb 2024 19:34:45 -0800 Subject: [PATCH 02/18] add wal log Signed-off-by: Qi Xu --- db/db_impl/db_impl.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 4c0798531e7..44512a0c30f 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1493,6 +1493,8 @@ void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir, wal.GetPreSyncSize() > 0) { synced_wals->AddWal(wal.number, WalMetadata(wal.GetPreSyncSize())); } + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "deleting log %" PRIu64 " from logs_\n", wal.number); logs_to_free_.push_back(wal.ReleaseWriter()); it = logs_.erase(it); } else { From b957e9a3fb0170c4ab92c922fec05dfdb9a5278e Mon Sep 17 00:00:00 2001 From: Qi Xu Date: Fri, 16 Feb 2024 10:10:51 -0800 Subject: [PATCH 03/18] add more logs Signed-off-by: Qi Xu --- db/db_impl/db_impl_write.cc | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index dd7f65f03f7..b2ff9e1a1e6 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1550,6 +1550,24 @@ IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, stats->AddDBStats(InternalStats::kIntStatsWriteWithWal, write_with_wal); RecordTick(stats_, WRITE_WITH_WAL, write_with_wal); } + + if (log_writer->get_log_number() != logs_.back().number) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Not writing to latest WAL: [%" PRIu64 ", %" PRIu64 "]", + log_writer->get_log_number(), logs_.back().number); + } + if (!need_log_sync) { + for (auto& log : logs_) { + if (log_writer->get_log_number() == log.number) { + if (log.IsSyncing()) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "WAL is being Syncing and writting: [%" PRIu64 "", + log_writer->get_log_number()); + } + break; + } + } + } return io_s; } From d79880b005c4141a859b4776f07f8a27e121601d Mon Sep 17 00:00:00 2001 From: Qi Xu Date: Mon, 11 Mar 2024 11:37:37 -0700 Subject: [PATCH 04/18] add log when wal is being deleted Signed-off-by: Qi Xu --- db/db_impl/db_impl_files.cc | 3 +++ 1 file changed, 3 insertions(+) diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc index 3dd0533e5c6..1556e047826 100644 --- a/db/db_impl/db_impl_files.cc +++ b/db/db_impl/db_impl_files.cc @@ -293,6 +293,9 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, earliest.number); log_recycle_files_.push_back(earliest.number); } else { + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "deleting WAL log %" PRIu64 "\n", + earliest.number); job_context->log_delete_files.push_back(earliest.number); } if (job_context->size_log_to_delete == 0) { From 7f68db679593a1042c69c7405d2c186073d7857b Mon Sep 17 00:00:00 2001 From: Qi Xu Date: Mon, 11 Mar 2024 12:08:26 -0700 Subject: [PATCH 05/18] add more logs Signed-off-by: Qi Xu --- db/db_impl/db_impl.cc | 9 ++++++++- db/db_impl/db_impl_write.cc | 13 +------------ 2 files changed, 9 insertions(+), 13 deletions(-) diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 44512a0c30f..39f4280cc17 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1487,7 +1487,8 @@ void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir, for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;) { auto& wal = *it; assert(wal.IsSyncing()); - + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Synced log %" PRIu64 " from logs_\n", wal.number); if (logs_.size() > 1) { if (immutable_db_options_.track_and_verify_wals_in_manifest && wal.GetPreSyncSize() > 0) { @@ -1509,12 +1510,18 @@ void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir, void DBImpl::MarkLogsNotSynced(uint64_t up_to) { log_write_mutex_.AssertHeld(); + uint64_t min_wal = 0; for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to; ++it) { auto& wal = *it; + if (min_wal == 0) { + min_wal = it->number; + } wal.FinishSync(); } log_sync_cv_.SignalAll(); + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "MarkLogsNotSynced from %" PRIu64 " to %d\n", min_wal, up_to); } SequenceNumber DBImpl::GetLatestSequenceNumber() const { diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index b2ff9e1a1e6..1a51f6cbe03 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1556,18 +1556,7 @@ IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, "Not writing to latest WAL: [%" PRIu64 ", %" PRIu64 "]", log_writer->get_log_number(), logs_.back().number); } - if (!need_log_sync) { - for (auto& log : logs_) { - if (log_writer->get_log_number() == log.number) { - if (log.IsSyncing()) { - ROCKS_LOG_INFO(immutable_db_options_.info_log, - "WAL is being Syncing and writting: [%" PRIu64 "", - log_writer->get_log_number()); - } - break; - } - } - } + return io_s; } From 46713c97835e83046e948ab288869c07e69a646c Mon Sep 17 00:00:00 2001 From: Qi Xu Date: Mon, 11 Mar 2024 12:47:04 -0700 Subject: [PATCH 06/18] fix Signed-off-by: Qi Xu --- db/db_impl/db_impl.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 39f4280cc17..bcf8c8a0dc2 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1521,7 +1521,7 @@ void DBImpl::MarkLogsNotSynced(uint64_t up_to) { } log_sync_cv_.SignalAll(); ROCKS_LOG_INFO(immutable_db_options_.info_log, - "MarkLogsNotSynced from %" PRIu64 " to %d\n", min_wal, up_to); + "MarkLogsNotSynced from %" PRIu64 " to %" PRIu64 "\n", min_wal, up_to); } SequenceNumber DBImpl::GetLatestSequenceNumber() const { From e0ca53ae4c54e0b66a9e6aa61a39cc5123e030fb Mon Sep 17 00:00:00 2001 From: Qi Xu Date: Wed, 13 Mar 2024 14:16:36 -0700 Subject: [PATCH 07/18] fix format Signed-off-by: Qi Xu --- db/db_impl/db_impl.cc | 5 +++-- db/db_impl/db_impl_files.cc | 3 +-- db/db_impl/db_impl_write.cc | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index bcf8c8a0dc2..dac9e42d251 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1488,7 +1488,7 @@ void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir, auto& wal = *it; assert(wal.IsSyncing()); ROCKS_LOG_INFO(immutable_db_options_.info_log, - "Synced log %" PRIu64 " from logs_\n", wal.number); + "Synced log %" PRIu64 " from logs_\n", wal.number); if (logs_.size() > 1) { if (immutable_db_options_.track_and_verify_wals_in_manifest && wal.GetPreSyncSize() > 0) { @@ -1521,7 +1521,8 @@ void DBImpl::MarkLogsNotSynced(uint64_t up_to) { } log_sync_cv_.SignalAll(); ROCKS_LOG_INFO(immutable_db_options_.info_log, - "MarkLogsNotSynced from %" PRIu64 " to %" PRIu64 "\n", min_wal, up_to); + "MarkLogsNotSynced from %" PRIu64 " to %" PRIu64 "\n", min_wal, + up_to); } SequenceNumber DBImpl::GetLatestSequenceNumber() const { diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc index 1556e047826..d86a5d4bd42 100644 --- a/db/db_impl/db_impl_files.cc +++ b/db/db_impl/db_impl_files.cc @@ -294,8 +294,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, log_recycle_files_.push_back(earliest.number); } else { ROCKS_LOG_INFO(immutable_db_options_.info_log, - "deleting WAL log %" PRIu64 "\n", - earliest.number); + "deleting WAL log %" PRIu64 "\n", earliest.number); job_context->log_delete_files.push_back(earliest.number); } if (job_context->size_log_to_delete == 0) { diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 1a51f6cbe03..bfa21411850 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1556,7 +1556,7 @@ IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, "Not writing to latest WAL: [%" PRIu64 ", %" PRIu64 "]", log_writer->get_log_number(), logs_.back().number); } - + return io_s; } From 1e78d43a8c6e7b906bda61f09f3f7016326ff3b4 Mon Sep 17 00:00:00 2001 From: Qi Xu Date: Wed, 13 Mar 2024 20:17:29 -0700 Subject: [PATCH 08/18] add log for the first write Signed-off-by: Qi Xu --- db/db_impl/db_impl_write.cc | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index bfa21411850..8ccb9ddb40c 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1434,7 +1434,11 @@ IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch, uint64_t* log_size, LogFileNumberSize& log_file_number_size) { assert(log_size != nullptr); - + if (log_writer->file()->GetFileSize() == 0) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Start writing to WAL: [%" PRIu64 "]", + log_writer->get_log_number()); + } Slice log_entry = WriteBatchInternal::Contents(&merged_batch); *log_size = log_entry.size(); // When two_write_queues_ WriteToWAL has to be protected from concurretn calls From bfc7eca13cf62c14cf0149289efb19611b210801 Mon Sep 17 00:00:00 2001 From: Qi Xu Date: Thu, 14 Mar 2024 13:47:40 -0700 Subject: [PATCH 09/18] add more logs Signed-off-by: Qi Xu --- db/db_impl/db_impl.cc | 7 +++++-- db/db_impl/db_impl.h | 2 +- db/db_impl/db_impl_files.cc | 11 ++++++++--- db/db_impl/db_impl_open.cc | 2 +- db/db_impl/db_impl_write.cc | 23 +++++++++++++---------- db/log_writer.cc | 1 + db/log_writer.h | 5 +++++ 7 files changed, 34 insertions(+), 17 deletions(-) diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index dac9e42d251..d0706ac4377 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1494,9 +1494,12 @@ void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir, wal.GetPreSyncSize() > 0) { synced_wals->AddWal(wal.number, WalMetadata(wal.GetPreSyncSize())); } + auto writer = wal.ReleaseWriter(); ROCKS_LOG_INFO(immutable_db_options_.info_log, - "deleting log %" PRIu64 " from logs_\n", wal.number); - logs_to_free_.push_back(wal.ReleaseWriter()); + "deleting log %" PRIu64 + " from logs_. Last Seq number of the WAL is %" PRIu64 "\n", + wal.number, writer->GetLastSequence()); + logs_to_free_.push_back(writer); it = logs_.erase(it); } else { wal.FinishSync(); diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index ee5d1825215..61fccfcf6d7 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1845,7 +1845,7 @@ class DBImpl : public DB { IOStatus WriteToWAL(const WriteBatch& merged_batch, log::Writer* log_writer, uint64_t* log_used, uint64_t* log_size, - LogFileNumberSize& log_file_number_size); + LogFileNumberSize& log_file_number_size, int caller_id); IOStatus WriteToWAL(const WriteThread::WriteGroup& write_group, log::Writer* log_writer, uint64_t* log_used, diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc index d86a5d4bd42..d6289155dc7 100644 --- a/db/db_impl/db_impl_files.cc +++ b/db/db_impl/db_impl_files.cc @@ -319,9 +319,12 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, // logs_ could have changed while we were waiting. continue; } + auto writer = log.ReleaseWriter(); ROCKS_LOG_INFO(immutable_db_options_.info_log, - "deleting log %" PRIu64 " from logs_\n", log.number); - logs_to_free_.push_back(log.ReleaseWriter()); + "deleting log %" PRIu64 + " from logs_, last seq number of WAL %" PRIu64 "\n", + log.number, writer->GetLastSequence()); + logs_to_free_.push_back(writer); logs_.pop_front(); } // Current log cannot be obsolete. @@ -496,7 +499,9 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) { for (const auto w : state.logs_to_free) { // TODO: maybe check the return value of Close. ROCKS_LOG_INFO(immutable_db_options_.info_log, - "Close log %" PRIu64 " from logs_\n", w->get_log_number()); + "Close log %" PRIu64 + " from logs_, last Seq number in WAL %" PRIu64 "\n", + w->get_log_number(), w->GetLastSequence()); auto s = w->Close(); s.PermitUncheckedError(); } diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index a75a62f5b69..6dd5fc00e73 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -1751,7 +1751,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, assert(log_writer->get_log_number() == log_file_number_size.number); impl->mutex_.AssertHeld(); s = impl->WriteToWAL(empty_batch, log_writer, &log_used, &log_size, - log_file_number_size); + log_file_number_size, 0); if (s.ok()) { // Need to fsync, otherwise it might get lost after a power reset. s = impl->FlushWAL(false); diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 8ccb9ddb40c..0fc6f47e77d 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1432,14 +1432,23 @@ WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group, IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch, log::Writer* log_writer, uint64_t* log_used, uint64_t* log_size, - LogFileNumberSize& log_file_number_size) { + LogFileNumberSize& log_file_number_size, + int caller_id) { assert(log_size != nullptr); if (log_writer->file()->GetFileSize() == 0) { ROCKS_LOG_INFO(immutable_db_options_.info_log, - "Start writing to WAL: [%" PRIu64 "]", + "Start writing to WAL: [%" PRIu64 " ]", log_writer->get_log_number()); } + if (log_writer->get_log_number() != logs_.back().number) { + ROCKS_LOG_INFO( + immutable_db_options_.info_log, + "Not writing to latest WAL: [%" PRIu64 ", %" PRIu64 "] CallerId: %d", + log_writer->get_log_number(), logs_.back().number, caller_id); + } Slice log_entry = WriteBatchInternal::Contents(&merged_batch); + SequenceNumber seq = WriteBatchInternal::Sequence(&merged_batch); + log_writer->SetLastSequence(seq); *log_size = log_entry.size(); // When two_write_queues_ WriteToWAL has to be protected from concurretn calls // from the two queues anyway and log_write_mutex_ is already held. Otherwise @@ -1492,7 +1501,7 @@ IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, uint64_t log_size; io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size, - log_file_number_size); + log_file_number_size, 1); if (to_be_cached_state) { cached_recoverable_state_ = *to_be_cached_state; cached_recoverable_state_empty_ = false; @@ -1555,12 +1564,6 @@ IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, RecordTick(stats_, WRITE_WITH_WAL, write_with_wal); } - if (log_writer->get_log_number() != logs_.back().number) { - ROCKS_LOG_INFO(immutable_db_options_.info_log, - "Not writing to latest WAL: [%" PRIu64 ", %" PRIu64 "]", - log_writer->get_log_number(), logs_.back().number); - } - return io_s; } @@ -1600,7 +1603,7 @@ IOStatus DBImpl::ConcurrentWriteToWAL( uint64_t log_size; io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size, - log_file_number_size); + log_file_number_size, 2); if (to_be_cached_state) { cached_recoverable_state_ = *to_be_cached_state; cached_recoverable_state_empty_ = false; diff --git a/db/log_writer.cc b/db/log_writer.cc index e2e596596aa..2d6a3ba1ba4 100644 --- a/db/log_writer.cc +++ b/db/log_writer.cc @@ -29,6 +29,7 @@ Writer::Writer(std::unique_ptr&& dest, uint64_t log_number, char t = static_cast(i); type_crc_[i] = crc32c::Value(&t, 1); } + last_seq_ = 0; } Writer::~Writer() { diff --git a/db/log_writer.h b/db/log_writer.h index 1a91b21994d..71da305d739 100644 --- a/db/log_writer.h +++ b/db/log_writer.h @@ -92,11 +92,16 @@ class Writer { bool TEST_BufferIsEmpty(); + void SetLastSequence(SequenceNumber seq) { last_seq_ = seq; } + + SequenceNumber GetLastSequence() const { return last_seq_; } + private: std::unique_ptr dest_; size_t block_offset_; // Current offset in block uint64_t log_number_; bool recycle_log_files_; + SequenceNumber last_seq_; // crc32c values for all supported record types. These are // pre-computed to reduce the overhead of computing the crc of the From a4674f04e4274e12db3f530772919bb5168eeeed Mon Sep 17 00:00:00 2001 From: Qi Xu Date: Thu, 14 Mar 2024 16:10:32 -0700 Subject: [PATCH 10/18] fix compile Signed-off-by: Qi Xu --- db/log_writer.h | 1 + 1 file changed, 1 insertion(+) diff --git a/db/log_writer.h b/db/log_writer.h index 71da305d739..20db1abc4cc 100644 --- a/db/log_writer.h +++ b/db/log_writer.h @@ -15,6 +15,7 @@ #include "rocksdb/io_status.h" #include "rocksdb/slice.h" #include "rocksdb/status.h" +#include "rocksdb/types.h" namespace ROCKSDB_NAMESPACE { From 48eee8bd5181371ca767a6bd158d8ae1f62d21ce Mon Sep 17 00:00:00 2001 From: Qi Xu Date: Thu, 21 Mar 2024 16:46:42 -0700 Subject: [PATCH 11/18] ensure wal is synced into manifest in FindObsoleteFiles as well Signed-off-by: Qi Xu --- db/db_impl/db_impl.cc | 8 ++++++-- db/db_impl/db_impl_files.cc | 10 ++++++++++ db/wal_edit.cc | 6 ++++-- db/wal_edit.h | 10 ++++++++-- 4 files changed, 28 insertions(+), 6 deletions(-) diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index d0706ac4377..7c253d9fa0a 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1488,11 +1488,15 @@ void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir, auto& wal = *it; assert(wal.IsSyncing()); ROCKS_LOG_INFO(immutable_db_options_.info_log, - "Synced log %" PRIu64 " from logs_\n", wal.number); + "Synced log %" PRIu64 " from logs_, last seq number %" PRIu64 + "\n", + wal.number, wal.writer->GetLastSequence()); if (logs_.size() > 1) { if (immutable_db_options_.track_and_verify_wals_in_manifest && wal.GetPreSyncSize() > 0) { - synced_wals->AddWal(wal.number, WalMetadata(wal.GetPreSyncSize())); + synced_wals->AddWal( + wal.number, + WalMetadata(wal.GetPreSyncSize(), wal.writer->GetLastSequence())); } auto writer = wal.ReleaseWriter(); ROCKS_LOG_INFO(immutable_db_options_.info_log, diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc index d6289155dc7..45b3289a9a4 100644 --- a/db/db_impl/db_impl_files.cc +++ b/db/db_impl/db_impl_files.cc @@ -280,6 +280,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, return; } + VersionEdit synced_wals; if (!alive_log_files_.empty() && !logs_.empty()) { uint64_t min_log_number = job_context->log_number; size_t num_alive_log_files = alive_log_files_.size(); @@ -319,6 +320,12 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, // logs_ could have changed while we were waiting. continue; } + if (immutable_db_options_.track_and_verify_wals_in_manifest && + log.GetPreSyncSize() > 0) { + synced_wals.AddWal( + log.number, + WalMetadata(log.GetPreSyncSize(), log.writer->GetLastSequence())); + } auto writer = log.ReleaseWriter(); ROCKS_LOG_INFO(immutable_db_options_.info_log, "deleting log %" PRIu64 @@ -338,6 +345,9 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, logs_to_free_.clear(); log_write_mutex_.Unlock(); mutex_.Lock(); + if (synced_wals.IsWalAddition()) { + ApplyWALToManifest(&synced_wals); + } job_context->log_recycle_files.assign(log_recycle_files_.begin(), log_recycle_files_.end()); } diff --git a/db/wal_edit.cc b/db/wal_edit.cc index 2525be610b4..1e1e820b71c 100644 --- a/db/wal_edit.cc +++ b/db/wal_edit.cc @@ -58,13 +58,15 @@ Status WalAddition::DecodeFrom(Slice* src) { JSONWriter& operator<<(JSONWriter& jw, const WalAddition& wal) { jw << "LogNumber" << wal.GetLogNumber() << "SyncedSizeInBytes" - << wal.GetMetadata().GetSyncedSizeInBytes(); + << wal.GetMetadata().GetSyncedSizeInBytes() << "LastSeqNumber" + << wal.GetMetadata().GetLastSequence(); return jw; } std::ostream& operator<<(std::ostream& os, const WalAddition& wal) { os << "log_number: " << wal.GetLogNumber() - << " synced_size_in_bytes: " << wal.GetMetadata().GetSyncedSizeInBytes(); + << " synced_size_in_bytes: " << wal.GetMetadata().GetSyncedSizeInBytes() + << " last_seq_number: " << wal.GetMetadata().GetLastSequence(); return os; } diff --git a/db/wal_edit.h b/db/wal_edit.h index d27f74ef137..9a4403cc675 100644 --- a/db/wal_edit.h +++ b/db/wal_edit.h @@ -32,8 +32,10 @@ class WalMetadata { public: WalMetadata() = default; - explicit WalMetadata(uint64_t synced_size_bytes) - : synced_size_bytes_(synced_size_bytes) {} + explicit WalMetadata(uint64_t synced_size_bytes, + uint64_t last_sequence_number) + : synced_size_bytes_(synced_size_bytes), + last_sequence_number_(last_sequence_number) {} bool HasSyncedSize() const { return synced_size_bytes_ != kUnknownWalSize; } @@ -41,6 +43,8 @@ class WalMetadata { uint64_t GetSyncedSizeInBytes() const { return synced_size_bytes_; } + uint64_t GetLastSequence() const { return last_sequence_number_; } + private: friend bool operator==(const WalMetadata& lhs, const WalMetadata& rhs); friend bool operator!=(const WalMetadata& lhs, const WalMetadata& rhs); @@ -50,6 +54,8 @@ class WalMetadata { // Size of the most recently synced WAL in bytes. uint64_t synced_size_bytes_ = kUnknownWalSize; + + uint64_t last_sequence_number_ = 0; }; inline bool operator==(const WalMetadata& lhs, const WalMetadata& rhs) { From 844300e24c3ac405444726c50c7201c1623874ed Mon Sep 17 00:00:00 2001 From: Qi Xu Date: Thu, 21 Mar 2024 18:50:49 -0700 Subject: [PATCH 12/18] fix case Signed-off-by: Qi Xu --- db/version_edit_test.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/db/version_edit_test.cc b/db/version_edit_test.cc index c9e918c4e80..1c95472b0a0 100644 --- a/db/version_edit_test.cc +++ b/db/version_edit_test.cc @@ -475,7 +475,7 @@ TEST_F(VersionEditTest, AddWalDebug) { VersionEdit edit; for (int i = 0; i < n; i++) { - edit.AddWal(kLogNumbers[i], WalMetadata(kSizeInBytes[i])); + edit.AddWal(kLogNumbers[i], WalMetadata(kSizeInBytes[i], 0)); } const WalAdditions& wals = edit.GetWalAdditions(); @@ -573,7 +573,7 @@ TEST_F(VersionEditTest, IgnorableTags) { VersionEdit edit; // Add some ignorable entries. for (int i = 0; i < 2; i++) { - edit.AddWal(i + 1, WalMetadata(i + 2)); + edit.AddWal(i + 1, WalMetadata(i + 2, 0)); } edit.SetDBId("db_id"); // Add unignorable entries. From 81b4ebe8ea5e7c2d7e740195e413618145c3c027 Mon Sep 17 00:00:00 2001 From: Qi Xu Date: Thu, 21 Mar 2024 18:54:24 -0700 Subject: [PATCH 13/18] Revert "fix case" This reverts commit 844300e24c3ac405444726c50c7201c1623874ed. --- db/version_edit_test.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/db/version_edit_test.cc b/db/version_edit_test.cc index 1c95472b0a0..c9e918c4e80 100644 --- a/db/version_edit_test.cc +++ b/db/version_edit_test.cc @@ -475,7 +475,7 @@ TEST_F(VersionEditTest, AddWalDebug) { VersionEdit edit; for (int i = 0; i < n; i++) { - edit.AddWal(kLogNumbers[i], WalMetadata(kSizeInBytes[i], 0)); + edit.AddWal(kLogNumbers[i], WalMetadata(kSizeInBytes[i])); } const WalAdditions& wals = edit.GetWalAdditions(); @@ -573,7 +573,7 @@ TEST_F(VersionEditTest, IgnorableTags) { VersionEdit edit; // Add some ignorable entries. for (int i = 0; i < 2; i++) { - edit.AddWal(i + 1, WalMetadata(i + 2, 0)); + edit.AddWal(i + 1, WalMetadata(i + 2)); } edit.SetDBId("db_id"); // Add unignorable entries. From 49a974ac2525b5b542e2f53ce6d2d22307691bdd Mon Sep 17 00:00:00 2001 From: Qi Xu Date: Thu, 21 Mar 2024 18:54:49 -0700 Subject: [PATCH 14/18] add default value Signed-off-by: Qi Xu --- db/wal_edit.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db/wal_edit.h b/db/wal_edit.h index 9a4403cc675..e4807b9b633 100644 --- a/db/wal_edit.h +++ b/db/wal_edit.h @@ -33,7 +33,7 @@ class WalMetadata { WalMetadata() = default; explicit WalMetadata(uint64_t synced_size_bytes, - uint64_t last_sequence_number) + uint64_t last_sequence_number = 0) : synced_size_bytes_(synced_size_bytes), last_sequence_number_(last_sequence_number) {} From a9806592c1041c466c99017dcc79e1896253ed24 Mon Sep 17 00:00:00 2001 From: Qi Xu Date: Thu, 21 Mar 2024 21:32:28 -0700 Subject: [PATCH 15/18] fix manifest incorrect WAL size Signed-off-by: Qi Xu --- db/db_impl/db_impl_write.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 0fc6f47e77d..9cea7cc2b84 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1529,7 +1529,9 @@ IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, } for (auto& log : logs_) { + log.PrepareForSync(); io_s = log.writer->file()->Sync(immutable_db_options_.use_fsync); + log.FinishSync(); if (!io_s.ok()) { break; } From 027b6d25c62801830e865674a6511ee3fdf9b1c6 Mon Sep 17 00:00:00 2001 From: Qi Xu Date: Thu, 21 Mar 2024 23:37:47 -0700 Subject: [PATCH 16/18] fix Wal in manifest Signed-off-by: Qi Xu --- db/wal_edit.cc | 9 +++++++++ db/wal_edit.h | 4 ++++ 2 files changed, 13 insertions(+) diff --git a/db/wal_edit.cc b/db/wal_edit.cc index 1e1e820b71c..4435636dd13 100644 --- a/db/wal_edit.cc +++ b/db/wal_edit.cc @@ -17,6 +17,8 @@ void WalAddition::EncodeTo(std::string* dst) const { if (metadata_.HasSyncedSize()) { PutVarint32(dst, static_cast(WalAdditionTag::kSyncedSize)); PutVarint64(dst, metadata_.GetSyncedSizeInBytes()); + PutVarint32(dst, static_cast(WalAdditionTag::kLastSyncSeq)); + PutVarint64(dst, metadata_.GetLastSequence()); } PutVarint32(dst, static_cast(WalAdditionTag::kTerminate)); @@ -44,6 +46,13 @@ Status WalAddition::DecodeFrom(Slice* src) { metadata_.SetSyncedSizeInBytes(size); break; } + case WalAdditionTag::kLastSyncSeq: { + uint64_t lsn = 0; + if (!GetVarint64(src, &lsn)) { + return Status::Corruption(class_name, "Error decoding WAL file size"); + } + metadata_.SetLastSequence(lsn); + } // TODO: process future tags such as checksum. case WalAdditionTag::kTerminate: return Status::OK(); diff --git a/db/wal_edit.h b/db/wal_edit.h index e4807b9b633..c3c2566a942 100644 --- a/db/wal_edit.h +++ b/db/wal_edit.h @@ -45,6 +45,8 @@ class WalMetadata { uint64_t GetLastSequence() const { return last_sequence_number_; } + void SetLastSequence(uint64_t lsn) { last_sequence_number_ = lsn; } + private: friend bool operator==(const WalMetadata& lhs, const WalMetadata& rhs); friend bool operator!=(const WalMetadata& lhs, const WalMetadata& rhs); @@ -72,6 +74,8 @@ enum class WalAdditionTag : uint32_t { kTerminate = 1, // Synced Size in bytes. kSyncedSize = 2, + + kLastSyncSeq = 3, // Add tags in the future, such as checksum? }; From e5ba41556636b6f28d488f9c60a97863f6f74031 Mon Sep 17 00:00:00 2001 From: Qi Xu Date: Thu, 21 Mar 2024 23:39:07 -0700 Subject: [PATCH 17/18] fix compile Signed-off-by: Qi Xu --- db/wal_edit.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/db/wal_edit.cc b/db/wal_edit.cc index 4435636dd13..fb7018dd098 100644 --- a/db/wal_edit.cc +++ b/db/wal_edit.cc @@ -51,7 +51,8 @@ Status WalAddition::DecodeFrom(Slice* src) { if (!GetVarint64(src, &lsn)) { return Status::Corruption(class_name, "Error decoding WAL file size"); } - metadata_.SetLastSequence(lsn); + metadata_.SetLastSequence(lsn); + break; } // TODO: process future tags such as checksum. case WalAdditionTag::kTerminate: From e344ae507aba426c2f12ccbc254f6f1b304959c8 Mon Sep 17 00:00:00 2001 From: Qi Xu Date: Fri, 22 Mar 2024 14:37:49 -0700 Subject: [PATCH 18/18] fix manifest LSN issue Signed-off-by: Qi Xu --- db/wal_edit.cc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/db/wal_edit.cc b/db/wal_edit.cc index fb7018dd098..ee159b08b3d 100644 --- a/db/wal_edit.cc +++ b/db/wal_edit.cc @@ -17,9 +17,9 @@ void WalAddition::EncodeTo(std::string* dst) const { if (metadata_.HasSyncedSize()) { PutVarint32(dst, static_cast(WalAdditionTag::kSyncedSize)); PutVarint64(dst, metadata_.GetSyncedSizeInBytes()); - PutVarint32(dst, static_cast(WalAdditionTag::kLastSyncSeq)); - PutVarint64(dst, metadata_.GetLastSequence()); } + PutVarint32(dst, static_cast(WalAdditionTag::kLastSyncSeq)); + PutVarint64(dst, metadata_.GetLastSequence()); PutVarint32(dst, static_cast(WalAdditionTag::kTerminate)); } @@ -151,6 +151,7 @@ Status WalSet::AddWal(const WalAddition& wal) { // Update synced size for the given WAL. it->second.SetSyncedSizeInBytes(wal.GetMetadata().GetSyncedSizeInBytes()); + it->second.SetLastSequence(wal.GetMetadata().GetLastSequence()); return Status::OK(); }