Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix cft election #1641

Merged
merged 40 commits into from
Sep 30, 2020
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
e6cf454
Do not truncate committable entries on election
achamayou Sep 21, 2020
c4458cf
Fix test
achamayou Sep 21, 2020
5b0cb0c
Fix test
achamayou Sep 22, 2020
6eaa15f
Fix test
achamayou Sep 22, 2020
a0eeecb
Merge branch 'master' into fix_cft_election
achamayou Sep 22, 2020
1bbd829
Fix test
achamayou Sep 22, 2020
e9bcbbb
Merge branch 'master' into fix_cft_election
achamayou Sep 23, 2020
5eb87d9
Use last committable index only in elections
achamayou Sep 23, 2020
c51ffa0
Update src/consensus/aft/raft.h
achamayou Sep 23, 2020
f72406f
Merge branch 'master' into fix_cft_election
achamayou Sep 24, 2020
87d85a0
Merge branch 'master' into fix_cft_election
achamayou Sep 24, 2020
50d3c81
rotation test
achamayou Sep 24, 2020
f5991d9
Merge branch 'fix_cft_election' of https://github.com/achamayou/CCF i…
achamayou Sep 24, 2020
e52bb00
rotation test
achamayou Sep 24, 2020
67545d1
No such thing as simplebank anymore
achamayou Sep 24, 2020
a946640
Remove now unnecessary barrier in recovery
achamayou Sep 24, 2020
d85c25e
rotation test
achamayou Sep 25, 2020
a567cb8
rotation test
achamayou Sep 25, 2020
835aac5
fix
achamayou Sep 25, 2020
2bf6d0f
Merge branch 'master' into fix_cft_election
achamayou Sep 25, 2020
6a4d3db
fix
achamayou Sep 25, 2020
d2aa627
Merge branch 'master' into fix_cft_election
achamayou Sep 28, 2020
a1a5e68
wip
achamayou Sep 28, 2020
6846905
fix
achamayou Sep 28, 2020
d401a2a
test committable suffix
achamayou Sep 29, 2020
ccc3b91
.
achamayou Sep 29, 2020
06c67f2
missing file
achamayou Sep 29, 2020
02faaf6
Merge branch 'master' into fix_cft_election
achamayou Sep 29, 2020
f3d5b0b
wait for commit
achamayou Sep 29, 2020
ba5c1f1
Merge branch 'fix_cft_election' of https://github.com/achamayou/CCF i…
achamayou Sep 29, 2020
97c5a72
comment
achamayou Sep 29, 2020
0ab0ec5
diff
achamayou Sep 29, 2020
b43cea6
Update tests/committable.py
achamayou Sep 29, 2020
e99cb7b
Merge branch 'master' into fix_cft_election
achamayou Sep 29, 2020
d005173
Merge branch 'master' into fix_cft_election
achamayou Sep 29, 2020
281795c
format
achamayou Sep 29, 2020
8d3d9bc
Merge branch 'master' into fix_cft_election
achamayou Sep 29, 2020
c9d5ba9
ignore stopped nodes in config check
achamayou Sep 30, 2020
5514fca
Merge branch 'fix_cft_election' of https://github.com/achamayou/CCF i…
achamayou Sep 30, 2020
07cbb0b
Merge branch 'master' into fix_cft_election
achamayou Sep 30, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ if(BUILD_TESTS)
NAME recovery_test_suite
PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/e2e_suite.py
CONSENSUS cft
LABEL suite
LABEL long_test
ADDITIONAL_ARGS
--test-duration
150
Expand All @@ -544,7 +544,7 @@ if(BUILD_TESTS)
NAME reconfiguration_test_suite
PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/e2e_suite.py
CONSENSUS cft
LABEL suite
LABEL long_test
ADDITIONAL_ARGS
--test-duration
150
Expand Down Expand Up @@ -576,7 +576,7 @@ if(BUILD_TESTS)
NAME full_test_suite
PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/e2e_suite.py
CONSENSUS cft
LABEL suite
LABEL long_test
ADDITIONAL_ARGS
--ledger-recovery-timeout
20
Expand All @@ -589,6 +589,13 @@ if(BUILD_TESTS)
4000
)

add_e2e_test(
NAME rotation_test
PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/rotation.py
CONSENSUS cft
ADDITIONAL_ARGS --raft-election-timeout 4000
)

add_e2e_test(
NAME lua_e2e_batched
PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/e2e_batched.py
Expand Down
35 changes: 23 additions & 12 deletions src/consensus/aft/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,12 @@ namespace aft
return replica_state == Follower;
}

Index last_committable_index() const
{
return committable_indices.empty() ? state->commit_idx :
committable_indices.back();
}

void enable_all_domains()
{
// When receiving append entries as a follower, all security domains will
Expand Down Expand Up @@ -987,10 +993,13 @@ namespace aft
{
LOG_INFO_FMT("Send request vote from {} to {}", state->my_node_id, to);

auto last_committable_idx = last_committable_index();
CCF_ASSERT(last_committable_idx >= state->commit_idx, "lci < ci");

RequestVote rv = {{raft_request_vote, state->my_node_id},
state->current_view,
state->commit_idx,
get_term_internal(state->commit_idx)};
last_committable_idx,
get_term_internal(last_committable_idx)};

channels->send_authenticated(ccf::NodeMsgType::consensus_msg, to, rv);
}
Expand Down Expand Up @@ -1058,11 +1067,15 @@ namespace aft
}

// If the candidate's log is at least as up-to-date as ours, vote yes
auto last_commit_term = get_term_internal(state->commit_idx);

auto answer = (r.last_commit_term > last_commit_term) ||
((r.last_commit_term == last_commit_term) &&
(r.last_commit_idx >= state->commit_idx));
auto last_committable_idx = last_committable_index();
auto term_of_last_committable_index =
get_term_internal(last_committable_idx);

auto answer =
(r.term_of_last_committable_idx > term_of_last_committable_index) ||
((r.term_of_last_committable_idx == term_of_last_committable_index) &&
(r.last_committable_idx >= last_committable_idx));

if (answer)
{
Expand Down Expand Up @@ -1199,12 +1212,12 @@ namespace aft

void become_leader()
{
// Discard any un-committed updates we may hold,
// Discard any un-committable updates we may hold,
// since we have no signature for them. Except at startup,
// where we do not want to roll back the genesis transaction.
if (state->commit_idx)
{
rollback(state->commit_idx);
rollback(last_committable_index());
}
else
{
Expand Down Expand Up @@ -1252,9 +1265,7 @@ namespace aft
voted_for = NoNode;
votes_for_me.clear();

// Rollback unreplicated commits.
rollback(state->commit_idx);
committable_indices.clear();
rollback(last_committable_index());

LOG_INFO_FMT(
"Becoming follower {}: {}", state->my_node_id, state->current_view);
Expand Down Expand Up @@ -1511,4 +1522,4 @@ namespace aft
}
}
};
}
}
8 changes: 2 additions & 6 deletions src/consensus/aft/raft_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,12 +169,8 @@ namespace aft
struct RequestVote : RaftHeader
{
Term term;
// last_log_idx in vanilla raft but last_commit_idx here to preserve
// verifiability
Index last_commit_idx;
// last_log_term in vanilla raft but last_commit_term here to preserve
// verifiability
Term last_commit_term;
Index last_committable_idx;
Term term_of_last_committable_idx;
};

struct RequestVoteResponse : RaftHeader
Expand Down
4 changes: 2 additions & 2 deletions src/consensus/aft/test/driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ class RaftDriver
aft::NodeId node_id, aft::NodeId tgt_node_id, aft::RequestVote rv)
{
std::ostringstream s;
s << "request_vote t: " << rv.term << ", lli: " << rv.last_commit_idx
<< ", llt: " << rv.last_commit_term;
s << "request_vote t: " << rv.term << ", lci: " << rv.last_committable_idx
<< ", tolci: " << rv.term_of_last_committable_idx;
log(node_id, tgt_node_id, s.str());
}

Expand Down
95 changes: 46 additions & 49 deletions src/consensus/aft/test/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,9 @@ DOCTEST_TEST_CASE(
DOCTEST_REQUIRE(get<0>(rv) == node_id1);
auto rvc = get<1>(rv);
DOCTEST_REQUIRE(rvc.term == 1);
DOCTEST_REQUIRE(rvc.last_commit_idx == 0);
DOCTEST_REQUIRE(rvc.last_commit_term == aft::ViewHistory::InvalidView);
DOCTEST_REQUIRE(rvc.last_committable_idx == 0);
DOCTEST_REQUIRE(
rvc.term_of_last_committable_idx == aft::ViewHistory::InvalidView);

r1.recv_message(reinterpret_cast<uint8_t*>(&rvc), sizeof(rvc));

Expand All @@ -207,8 +208,9 @@ DOCTEST_TEST_CASE(
DOCTEST_REQUIRE(get<0>(rv) == node_id2);
rvc = get<1>(rv);
DOCTEST_REQUIRE(rvc.term == 1);
DOCTEST_REQUIRE(rvc.last_commit_idx == 0);
DOCTEST_REQUIRE(rvc.last_commit_term == aft::ViewHistory::InvalidView);
DOCTEST_REQUIRE(rvc.last_committable_idx == 0);
DOCTEST_REQUIRE(
rvc.term_of_last_committable_idx == aft::ViewHistory::InvalidView);

r2.recv_message(reinterpret_cast<uint8_t*>(&rvc), sizeof(rvc));

Expand Down Expand Up @@ -1042,12 +1044,9 @@ DOCTEST_TEST_CASE("Exceed append entries limit")
DOCTEST_REQUIRE(r2.ledger->ledger.size() == individual_entries);
}

// Reproduces issue described here: https://github.com/microsoft/CCF/issues/521
// Once this is fixed test will need to be modified since right now it
// DOCTEST_CHECKs that the issue stands
DOCTEST_TEST_CASE(
"Primary gets invalidated if it compacts right before a term change that it "
"doesn't participate in")
"Primary is isolated after advancing commit index but before secondaries are "
"notified")
{
auto kv_store0 = std::make_shared<StoreSig>(0);
auto kv_store1 = std::make_shared<StoreSig>(1);
Expand Down Expand Up @@ -1279,8 +1278,8 @@ DOCTEST_TEST_CASE(
((aft::ChannelStubProxy*)r1.channels.get())->sent_request_vote,
[](const auto& msg) {
DOCTEST_REQUIRE(msg.term == 2);
DOCTEST_REQUIRE(msg.last_commit_idx == 1);
DOCTEST_REQUIRE(msg.last_commit_term == 1);
DOCTEST_REQUIRE(msg.term_of_last_committable_idx == 1);
DOCTEST_REQUIRE(msg.last_committable_idx == 2);
jumaffre marked this conversation as resolved.
Show resolved Hide resolved
}));

DOCTEST_INFO("Node 2 votes for Node 1, Node 0 is suspended");
Expand All @@ -1305,9 +1304,9 @@ DOCTEST_TEST_CASE(
nodes,
((aft::ChannelStubProxy*)r1.channels.get())->sent_append_entries,
[](const auto& msg) {
DOCTEST_REQUIRE(msg.idx == 1);
DOCTEST_REQUIRE(msg.idx == 2);
DOCTEST_REQUIRE(msg.term == 2);
DOCTEST_REQUIRE(msg.prev_idx == 1);
DOCTEST_REQUIRE(msg.prev_idx == 2);
DOCTEST_REQUIRE(msg.prev_term == 1);
DOCTEST_REQUIRE(msg.leader_commit_idx == 1);
}));
Expand All @@ -1323,12 +1322,22 @@ DOCTEST_TEST_CASE(
->sent_append_entries_response));
}

DOCTEST_INFO(
"Node 1 and Node 2 proceed to compact at idx 2, where Node 0 has "
"compacted for a previous term");
DOCTEST_CHECK(r0.get_term() == 1);
DOCTEST_CHECK(r0.get_commit_idx() == 2);
DOCTEST_CHECK(r0.get_last_idx() == 2);

DOCTEST_CHECK(r1.get_term() == 2);
DOCTEST_CHECK(r1.get_commit_idx() == 1);
DOCTEST_CHECK(r1.get_last_idx() == 2);

DOCTEST_CHECK(r2.get_term() == 2);
DOCTEST_CHECK(r2.get_commit_idx() == 1);
DOCTEST_CHECK(r2.get_last_idx() == 2);

DOCTEST_INFO("Node 1 resumes replication");
{
DOCTEST_REQUIRE(r1.replicate(kv::BatchVector{{2, second_entry, true}}, 2));
DOCTEST_REQUIRE(r1.ledger->ledger.size() == 2);
DOCTEST_REQUIRE(r1.replicate(kv::BatchVector{{3, third_entry, true}}, 2));
DOCTEST_REQUIRE(r1.ledger->ledger.size() == 3);
r1.periodic(ms(10));
DOCTEST_REQUIRE(
((aft::ChannelStubProxy*)r1.channels.get())->sent_append_entries.size() ==
Expand All @@ -1346,20 +1355,23 @@ DOCTEST_TEST_CASE(

DOCTEST_REQUIRE(
1 ==
dispatch_all(
dispatch_all_and_DOCTEST_CHECK(
nodes,
((aft::ChannelStubProxy*)r2.channels.get())
->sent_append_entries_response));
->sent_append_entries_response,
[](const auto& msg) { DOCTEST_REQUIRE(msg.success); }));

// Node 0 will not respond here since it received an append entries it can
// not process [prev_idex (1) < commit_idx (2)]
DOCTEST_REQUIRE(
((aft::ChannelStubProxy*)r0.channels.get())
->sent_append_entries_response.size() == 0);
1 ==
dispatch_all_and_DOCTEST_CHECK(
nodes,
((aft::ChannelStubProxy*)r0.channels.get())
->sent_append_entries_response,
[](const auto& msg) { DOCTEST_REQUIRE(msg.success); }));

DOCTEST_INFO("Another entry from Node 1 so that Node 2 can also compact");
DOCTEST_REQUIRE(r1.replicate(kv::BatchVector{{3, third_entry, true}}, 2));
DOCTEST_REQUIRE(r1.ledger->ledger.size() == 3);
DOCTEST_REQUIRE(r1.replicate(kv::BatchVector{{4, third_entry, true}}, 2));
DOCTEST_REQUIRE(r1.ledger->ledger.size() == 4);
r1.periodic(ms(10));
DOCTEST_REQUIRE(
((aft::ChannelStubProxy*)r1.channels.get())->sent_append_entries.size() ==
Expand All @@ -1375,31 +1387,16 @@ DOCTEST_TEST_CASE(
((aft::ChannelStubProxy*)r1.channels.get())->sent_append_entries.size() ==
0);

// Node 0 will now have an ae response which will return false because
// its log for index 2 has the wrong term (ours: 1, theirs: 2)
DOCTEST_REQUIRE(
((aft::ChannelStubProxy*)r0.channels.get())
->sent_append_entries_response.size() == 1);
DOCTEST_REQUIRE(
1 ==
dispatch_all_and_DOCTEST_CHECK(
nodes,
((aft::ChannelStubProxy*)r0.channels.get())
->sent_append_entries_response,
[](const auto& msg) {
DOCTEST_REQUIRE(msg.last_log_idx == 2);
DOCTEST_REQUIRE(!msg.success);
}));

DOCTEST_REQUIRE(r1.ledger->ledger.size() == 3);
DOCTEST_REQUIRE(r2.ledger->ledger.size() == 3);
DOCTEST_CHECK(r0.get_term() == 2);
DOCTEST_CHECK(r0.get_commit_idx() == 3);
DOCTEST_CHECK(r0.get_last_idx() == 4);

DOCTEST_CHECK(r1.get_term() == 2);
DOCTEST_CHECK(r1.get_commit_idx() == 2);
DOCTEST_CHECK(r1.get_last_idx() == 3);
DOCTEST_CHECK(r1.get_commit_idx() == 3);
DOCTEST_CHECK(r1.get_last_idx() == 4);

DOCTEST_CHECK(r2.get_term() == 2);
DOCTEST_CHECK(r2.get_commit_idx() == 2);
DOCTEST_CHECK(r2.get_last_idx() == 3);
DOCTEST_CHECK(r2.get_commit_idx() == 3);
DOCTEST_CHECK(r2.get_last_idx() == 4);
}
}
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Axed in favour of the end to end committable.py for now. The existing raft test scaffolding doesn't have a way to accurately replicate signatures at the moment.

2 changes: 1 addition & 1 deletion src/node/rpc/member_frontend.h
Original file line number Diff line number Diff line change
Expand Up @@ -793,7 +793,7 @@ namespace ccf
};
make_endpoint("read", HTTP_POST, json_adapter(read))
// This can be executed locally, but can't currently take ReadOnlyTx due
// to restristions in our lua wrappers
// to restrictions in our lua wrappers
.set_forwarding_required(ForwardingRequired::Sometimes)
.set_auto_schema<KVRead>()
.install();
Expand Down
25 changes: 25 additions & 0 deletions src/node/rpc/node_frontend.h
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,31 @@ namespace ccf
make_read_only_endpoint("primary", HTTP_HEAD, is_primary)
.set_forwarding_required(ForwardingRequired::Never)
.install();

auto consensus_config = [this](CommandEndpointContext& args) {
// Query node for configurations, separate current from pending
if (consensus != nullptr)
{
auto cfg = consensus->get_latest_configuration();
achamayou marked this conversation as resolved.
Show resolved Hide resolved
nlohmann::json c;
for (auto& [nid, ninfo] : cfg)
{
nlohmann::json n;
n["address"] = fmt::format("{}:{}", ninfo.hostname, ninfo.port);
c[fmt::format("{}", nid)] = n;
}
args.rpc_ctx->set_response_body(c.dump());
}
else
{
args.rpc_ctx->set_response_status(HTTP_STATUS_NOT_FOUND);
achamayou marked this conversation as resolved.
Show resolved Hide resolved
args.rpc_ctx->set_response_body("No configured consensus");
}
};

make_command_endpoint("config", HTTP_GET, consensus_config)
.set_forwarding_required(ForwardingRequired::Never)
.install();
}
};

Expand Down
2 changes: 1 addition & 1 deletion tests/infra/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ def find_primary_and_any_backup(self, timeout=3):
backup = random.choice(backups)
return primary, backup

def wait_for_all_nodes_to_catch_up(self, primary, timeout=3):
def wait_for_all_nodes_to_catch_up(self, primary, timeout=10):
"""
Wait for all nodes to have joined the network and globally replicated
all transactions globally executed on the primary (including transactions
Expand Down
Loading