Skip to content

Commit

Permalink
Avoid snapshot creation index inversion (eBay#479)
Browse files Browse the repository at this point in the history
* Since snapshot is created by the background commit thread, index
inversion can happen if user explicitly calls manual creation API.
In that case, the backgroudn thread will create a snapshot on
an index that is older than the latest snapshot, which should not
happen.

* To avoid such a case, added a logic to check the request commit
index and the latest snapshot's index.
  • Loading branch information
greensky00 authored Dec 5, 2023
1 parent ff3d2c7 commit 9de4522
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 2 deletions.
8 changes: 7 additions & 1 deletion include/libnuraft/callback.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,13 @@ public:
* Adding a server failed due to RPC errors and timeout expiry.
* ctx: null
*/
ServerJoinFailed = 25
ServerJoinFailed = 25,

/**
* Snapshot creation begins.
* ctx: pointer to `uint64_t` (committed_idx).
*/
SnapshotCreationBegin = 26
};

struct Param {
Expand Down
28 changes: 27 additions & 1 deletion src/handle_commit.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -512,14 +512,40 @@ bool raft_server::snapshot_and_compact(ulong committed_idx, bool forced_creation
try {
bool f = false;
ptr<snapshot> local_snp = get_last_snapshot();

cb_func::Param param(id_, leader_, -1, &committed_idx);
CbReturnCode rc = invoke_callback(cb_func::SnapshotCreationBegin, &param);
if (rc != CbReturnCode::Ok) {
p_wn("creating a snapshot %" PRIu64 " is rejected by user callback",
committed_idx);
return false;
}

if ( ( forced_creation ||
!local_snp ||
( committed_idx - local_snp->get_last_log_idx() ) >= snapshot_distance ) &&
committed_idx >= snapshot_distance + local_snp->get_last_log_idx() ) &&
snp_in_progress_.compare_exchange_strong(f, true) )
{
snapshot_in_action = true;
p_in("creating a snapshot for index %" PRIu64 "", committed_idx);

// NOTE:
// Due to the public API `raft_server::create_snapshot()`,
// there can be a race between user thread and commit thread,
// which results in snapshot index inversion.
//
// To avoid such a case, while `snp_in_progress_` is true,
// we re-check the latest snapshot index here.
local_snp = get_last_snapshot();
if (local_snp && local_snp->get_last_log_idx() >= committed_idx) {
p_wn("snapshot index inversion detected, "
"skip snapshot creation for index %" PRIu64 ", "
"latest snapshot index %" PRIu64 "",
committed_idx, local_snp->get_last_log_idx());
snp_in_progress_ = false;
return false;
}

while ( conf->get_log_idx() > committed_idx &&
conf->get_prev_log_idx() >= log_store_->start_index() ) {
ptr<log_entry> conf_log
Expand Down
8 changes: 8 additions & 0 deletions tests/unit/raft_functional_common.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public:
, lastCommittedConfigIdx(0)
, targetSnpReadFailures(0)
, snpDelayMs(0)
, numSnapshotCreations(0)
, myLog(logger)
{
(void)myLog;
Expand Down Expand Up @@ -241,6 +242,7 @@ public:
// NOTE: We only handle logical snapshot.
ptr<buffer> snp_buf = s.serialize();
lastSnapshot = snapshot::deserialize(*snp_buf);
numSnapshotCreations++;
}
ptr<std::exception> except(nullptr);
bool ret = true;
Expand Down Expand Up @@ -372,6 +374,10 @@ public:
serversForCommit = src;
}

uint64_t getNumSnapshotCreations() const {
return numSnapshotCreations;
}

private:
std::map<uint64_t, ptr<buffer>> preCommits;
std::map<uint64_t, ptr<buffer>> commits;
Expand All @@ -398,6 +404,8 @@ private:
std::list<int> serversForCommit;
mutable std::mutex serversForCommitLock;

std::atomic<uint64_t> numSnapshotCreations;

SimpleLogger* myLog;
};

Expand Down
96 changes: 96 additions & 0 deletions tests/unit/raft_server_test.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -2145,6 +2145,99 @@ int snapshot_manual_creation_test() {
return 0;
}

int snapshot_creation_index_inversion_test() {
reset_log_files();
ptr<FakeNetworkBase> f_base = cs_new<FakeNetworkBase>();

std::string s1_addr = "S1";
std::string s2_addr = "S2";
std::string s3_addr = "S3";

RaftPkg s1(f_base, 1, s1_addr);
RaftPkg s2(f_base, 2, s2_addr);
RaftPkg s3(f_base, 3, s3_addr);
std::vector<RaftPkg*> pkgs = {&s1, &s2, &s3};

CHK_Z( launch_servers( pkgs ) );
CHK_Z( make_group( pkgs ) );

// Append a message using separate thread.
ExecArgs exec_args(&s1);
TestSuite::ThreadHolder hh(&exec_args, fake_executer, fake_executer_killer);

for (auto& entry: pkgs) {
RaftPkg* pp = entry;
raft_params param = pp->raftServer->get_current_params();
param.return_method_ = raft_params::async_handler;
pp->raftServer->update_params(param);
}

const size_t NUM = 5;

// Set a callback function to manually create snapshot,
// right before the automatic snapshot creation.
bool manual_snp_creation_succ = false;
s1.ctx->set_cb_func([&](cb_func::Type t, cb_func::Param* p) -> cb_func::ReturnCode {
// At the beginning of an automatic snapshot creation,
// create a manual snapshot, to mimic index inversion.
if (t != cb_func::Type::SnapshotCreationBegin) {
return cb_default(t, p);
}

// This function should be invoked only once, to avoid
// infinite recursive call.
static bool invoked = false;
if (!invoked) {
invoked = true;
ulong log_idx = s1.raftServer->create_snapshot();
manual_snp_creation_succ = (log_idx > 0);
}
return cb_func::ReturnCode::Ok;
});

// Append messages asynchronously.
std::list< ptr< cmd_result< ptr<buffer> > > > handlers;
for (size_t ii=0; ii<NUM; ++ii) {
std::string test_msg = "test" + std::to_string(ii);
ptr<buffer> msg = buffer::alloc(test_msg.size() + 1);
msg->put(test_msg);
ptr< cmd_result< ptr<buffer> > > ret =
s1.raftServer->append_entries( {msg} );

CHK_TRUE( ret->get_accepted() );

handlers.push_back(ret);
}

// NOTE: Send it to S2 only, S3 will be lagging behind.
s1.fNet->execReqResp("S2"); // replication.
s1.fNet->execReqResp("S2"); // commit.
CHK_Z( wait_for_sm_exec(pkgs, COMMIT_TIMEOUT_SEC) ); // commit execution.

// One more time to make sure.
s1.fNet->execReqResp("S2");
s1.fNet->execReqResp("S2");
CHK_Z( wait_for_sm_exec(pkgs, COMMIT_TIMEOUT_SEC) );

// Snapshot creation should have happened only once, by manual creation.
CHK_TRUE(manual_snp_creation_succ);
CHK_EQ(1, s1.getTestSm()->getNumSnapshotCreations());

print_stats(pkgs);

s1.raftServer->shutdown();
s2.raftServer->shutdown();
s3.raftServer->shutdown();

fake_executer_killer(&exec_args);
hh.join();
CHK_Z( hh.getResult() );

f_base->destroy();

return 0;
}

int snapshot_randomized_creation_test() {
reset_log_files();
ptr<FakeNetworkBase> f_base = cs_new<FakeNetworkBase>();
Expand Down Expand Up @@ -3291,6 +3384,9 @@ int main(int argc, char** argv) {
ts.doTest( "snapshot manual creation test",
snapshot_manual_creation_test );

ts.doTest( "snapshot creation index inversion test",
snapshot_creation_index_inversion_test );

ts.doTest( "snapshot randomized creation test",
snapshot_randomized_creation_test );

Expand Down

0 comments on commit 9de4522

Please sign in to comment.