Skip to content

Commit

Permalink
Fix replay wal error in follower/learner mode (#1878)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

When WAL replay encounters an empty WAL file, it raises an unrecoverable error.

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)

---------

Signed-off-by: Jin Hai <haijin.chn@gmail.com>
  • Loading branch information
JinHai-CN authored Sep 15, 2024
1 parent 3005e61 commit 5be1b0c
Show file tree
Hide file tree
Showing 9 changed files with 46 additions and 18 deletions.
1 change: 1 addition & 0 deletions src/main/infinity_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import variables;
// import python_instance;
import status;
import infinity_exception;
import wal_manager;

namespace infinity {

Expand Down
2 changes: 1 addition & 1 deletion src/storage/storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ void Storage::SetStorageMode(StorageMode target_mode) {

// Must init catalog before txn manager.
// Replaying the WAL file wraps catalog initialization.
TxnTimeStamp system_start_ts = wal_mgr_->ReplayWalFile();
TxnTimeStamp system_start_ts = wal_mgr_->ReplayWalFile(target_mode);
if (system_start_ts == 0) {
// Init database, need to create default_db
LOG_INFO(fmt::format("Init a new catalog"));
Expand Down
7 changes: 0 additions & 7 deletions src/storage/storage.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,6 @@ export module storage;

namespace infinity {

// Mode of the storage layer; Storage::SetStorageMode takes one of these values as its target mode.
export enum class StorageMode {
kUnInitialized,
kAdmin,
kReadable, // NOTE(review): presumably the follower/learner mode — confirm
kWritable, // NOTE(review): presumably the leader or standalone mode — confirm
};

class CleanupInfoTracer;

export class Storage {
Expand Down
11 changes: 6 additions & 5 deletions src/storage/wal/catalog_delta_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -765,12 +765,12 @@ const String AddBlockEntryOp::ToString() const {
}

// Builds a human-readable description of this op: the base CatalogDeltaOperation
// string followed by the outline_buffer_count and last_chunk_offset unpacked from
// outline_info_, wrapped in "outline_infos: [ ... ]".
// NOTE(review): this hunk appears to list both the removed (oss) and the added
// (sstream) lines of the diff together — TODO confirm which lines are in the final file.
const String AddColumnEntryOp::ToString() const {
std::ostringstream oss;
oss << fmt::format("AddColumnEntryOp {} outline_infos: [", CatalogDeltaOperation::ToString());
std::stringstream sstream;
sstream << fmt::format("AddColumnEntryOp {} outline_infos: [", CatalogDeltaOperation::ToString());
// outline_info_ is unpacked into its two components for formatting.
const auto &[outline_buffer_count, last_chunk_offset] = outline_info_;
oss << fmt::format("outline_buffer_count: {}, last_chunk_offset: {}", outline_buffer_count, last_chunk_offset);
oss << "]";
return std::move(oss).str();
sstream << fmt::format("outline_buffer_count: {}, last_chunk_offset: {}", outline_buffer_count, last_chunk_offset);
sstream << "]";
return sstream.str();
}

const String AddTableIndexEntryOp::ToString() const {
Expand Down Expand Up @@ -1027,6 +1027,7 @@ UniquePtr<CatalogDeltaEntry> CatalogDeltaEntry::ReadAdv(const char *&ptr, i32 ma
UnrecoverableError(error_message);
}
{
LOG_INFO(fmt::format("Deserialize delta op count: {}", entry->operations_.size()));
for (const auto &operation : entry->operations_) {
LOG_INFO(fmt::format("Read delta op: {}", operation->ToString()));
}
Expand Down
20 changes: 16 additions & 4 deletions src/storage/wal/wal_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ String WalManager::GetWalFilename() const { return wal_path_; }
* @return int64_t The max commit timestamp of the transactions in the wal file.
*
*/
i64 WalManager::ReplayWalFile() {
i64 WalManager::ReplayWalFile(StorageMode targe_storage_mode) {
LocalFileSystem fs;

Vector<String> wal_list{};
Expand Down Expand Up @@ -556,9 +556,21 @@ i64 WalManager::ReplayWalFile() {
}

if (last_commit_ts == 0) {
// once wal is not empty, a checkpoint should always be found.
String error_message = "No checkpoint found in wal";
UnrecoverableError(error_message);
switch(targe_storage_mode) {
case StorageMode::kWritable: {
// once wal is not empty, a checkpoint should always be found in leader or standalone mode.
String error_message = "No checkpoint found in wal";
UnrecoverableError(error_message);
break;
}
case StorageMode::kReadable: {
return 0;
}
default: {
String error_message = "Unreachable branch";
UnrecoverableError(error_message);
}
}
}
LOG_INFO(fmt::format("Checkpoint found, replay the catalog"));
auto catalog_fileinfo = CatalogFile::ParseValidCheckpointFilenames(catalog_dir, max_commit_ts);
Expand Down
9 changes: 8 additions & 1 deletion src/storage/wal/wal_manager.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ class TableEntry;
class Txn;
struct SegmentEntry;

// Mode of the storage layer; WalManager::ReplayWalFile takes one of these values as
// the target mode when replaying the WAL.
export enum class StorageMode {
kUnInitialized,
kAdmin,
kReadable, // NOTE(review): presumably the follower/learner mode — confirm
kWritable, // NOTE(review): presumably the leader or standalone mode — confirm
};

export class WalManager {
public:
WalManager(Storage *storage, String wal_dir, u64 wal_size_threshold, u64 delta_checkpoint_interval_wal_bytes, FlushOptionType flush_option);
Expand Down Expand Up @@ -63,7 +70,7 @@ public:

String GetWalFilename() const;

i64 ReplayWalFile();
i64 ReplayWalFile(StorageMode targe_storage_mode);

Optional<Pair<FullCatalogFileInfo, Vector<DeltaCatalogFileInfo>>> GetCatalogFiles() const;

Expand Down
10 changes: 10 additions & 0 deletions src/unit_test/storage/wal/catalog_delta_replay.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -622,8 +622,18 @@ TEST_P(CatalogDeltaReplayTest, replay_with_full_checkpoint) {

txn_mgr->CommitTxn(txn_record3);
EXPECT_EQ(table_entry->row_count(), 3ul);
// TODO: Need to start a txn to do the delta checkpoint
usleep(100000);
WaitFlushDeltaOp(storage);
}
{
auto *txn = txn_mgr->BeginTxn(MakeUnique<String>("get table"));
auto [table_entry, table_status] = txn->GetTableByName(*db_name, *table_name);
EXPECT_TRUE(table_status.ok());

EXPECT_EQ(table_entry->row_count(), 3ul);
txn_mgr->CommitTxn(txn);
}
infinity::InfinityContext::instance().UnInit();
}

Expand Down
2 changes: 2 additions & 0 deletions test/data/config/test_close_bgtask.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ time_zone = "utc-8"

[network]
[log]
log_to_stdout = true
log_level = "info"

[storage]
# close auto optimize
Expand Down
2 changes: 2 additions & 0 deletions test/data/config/test_close_bgtask_vfs_off.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ time_zone = "utc-8"

[network]
[log]
log_to_stdout = true
log_level = "info"

[storage]
data_dir = "/var/infinity/data"
Expand Down

0 comments on commit 5be1b0c

Please sign in to comment.