From e6cf45411bdf9c32cb2ea2b1cf5cb56b5acf0aa2 Mon Sep 17 00:00:00 2001 From: Amaury Chamayou Date: Mon, 21 Sep 2020 10:08:47 +0100 Subject: [PATCH 01/25] Do not truncate committable entries on election --- src/consensus/aft/raft.h | 22 ++++++++++++++++------ src/consensus/aft/raft_types.h | 5 +---- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/src/consensus/aft/raft.h b/src/consensus/aft/raft.h index 816ea36c36c1..18f86e5818aa 100644 --- a/src/consensus/aft/raft.h +++ b/src/consensus/aft/raft.h @@ -182,6 +182,12 @@ namespace aft return replica_state == Follower; } + Index last_committable_index() const + { + return committable_indices.empty() ? state->commit_idx : + committable_indices.back(); + } + void enable_all_domains() { // When receiving append entries as a follower, all security domains will @@ -914,10 +920,14 @@ namespace aft { LOG_INFO_FMT("Send request vote from {} to {}", state->my_node_id, to); + auto last_committable_idx = last_committable_index(); + CCF_ASSERT(last_committable_idx >= state->commit_idx, "lci < ci"); + RequestVote rv = {{raft_request_vote, state->my_node_id}, state->current_view, state->commit_idx, - get_term_internal(state->commit_idx)}; + get_term_internal(state->commit_idx), + last_committable_idx}; channels->send_authenticated(ccf::NodeMsgType::consensus_msg, to, rv); } @@ -989,7 +999,8 @@ namespace aft auto answer = (r.last_commit_term > last_commit_term) || ((r.last_commit_term == last_commit_term) && - (r.last_commit_idx >= state->commit_idx)); + (r.last_commit_idx >= state->commit_idx) && + (r.last_committable_idx >= last_committable_index())); if (answer) { @@ -1126,12 +1137,12 @@ namespace aft void become_leader() { - // Discard any un-committed updates we may hold, + // Discard any un-committable updates we may hold, // since we have no signature for them. Except at startup, // where we do not want to roll back the genesis transaction. if (state->commit_idx) { - rollback(state->commit_idx); + rollback(last_committable_index()); } else { @@ -1180,8 +1191,7 @@ namespace aft votes_for_me.clear(); // Rollback unreplicated commits. - rollback(state->commit_idx); - committable_indices.clear(); + rollback(last_committable_index()); LOG_INFO_FMT( "Becoming follower {}: {}", state->my_node_id, state->current_view); diff --git a/src/consensus/aft/raft_types.h b/src/consensus/aft/raft_types.h index 133de25a4487..94694d26db1c 100644 --- a/src/consensus/aft/raft_types.h +++ b/src/consensus/aft/raft_types.h @@ -144,12 +144,9 @@ namespace aft struct RequestVote : RaftHeader { Term term; - // last_log_idx in vanilla raft but last_commit_idx here to preserve - // verifiability Index last_commit_idx; - // last_log_term in vanilla raft but last_commit_term here to preserve - // verifiability Term last_commit_term; + Index last_committable_idx; }; struct RequestVoteResponse : RaftHeader From c4458cfbaf9d2b442d2cb106cda1607db2e716f8 Mon Sep 17 00:00:00 2001 From: Amaury Chamayou Date: Mon, 21 Sep 2020 17:17:35 +0100 Subject: [PATCH 02/25] Fix test --- src/consensus/aft/test/main.cpp | 68 ++++++++++----------------------- 1 file changed, 20 insertions(+), 48 deletions(-) diff --git a/src/consensus/aft/test/main.cpp b/src/consensus/aft/test/main.cpp index 236a38c3c393..3305e24f22c4 100644 --- a/src/consensus/aft/test/main.cpp +++ b/src/consensus/aft/test/main.cpp @@ -1042,12 +1042,9 @@ DOCTEST_TEST_CASE("Exceed append entries limit") DOCTEST_REQUIRE(r2.ledger->ledger.size() == individual_entries); } -// Reproduces issue described here: https://github.com/microsoft/CCF/issues/521 -// Once this is fixed test will need to be modified since right now it -// DOCTEST_CHECKs that the issue stands DOCTEST_TEST_CASE( - "Primary gets invalidated if it compacts right before a term change that it " - "doesn't participate in") + "Primary is isolated after advancing commit index but before secondaries are " + "notified") { auto kv_store0 = std::make_shared(0); auto kv_store1 = std::make_shared(1); @@ -1281,6 +1278,7 @@ DOCTEST_TEST_CASE( DOCTEST_REQUIRE(msg.term == 2); DOCTEST_REQUIRE(msg.last_commit_idx == 1); DOCTEST_REQUIRE(msg.last_commit_term == 1); + DOCTEST_REQUIRE(msg.last_committable_idx == 2); })); DOCTEST_INFO("Node 2 votes for Node 1, Node 0 is suspended"); @@ -1305,9 +1303,9 @@ DOCTEST_TEST_CASE( nodes, ((aft::ChannelStubProxy*)r1.channels.get())->sent_append_entries, [](const auto& msg) { - DOCTEST_REQUIRE(msg.idx == 1); + DOCTEST_REQUIRE(msg.idx == 2); DOCTEST_REQUIRE(msg.term == 2); - DOCTEST_REQUIRE(msg.prev_idx == 1); + DOCTEST_REQUIRE(msg.prev_idx == 2); DOCTEST_REQUIRE(msg.prev_term == 1); DOCTEST_REQUIRE(msg.leader_commit_idx == 1); })); @@ -1323,12 +1321,10 @@ DOCTEST_TEST_CASE( ->sent_append_entries_response)); } - DOCTEST_INFO( - "Node 1 and Node 2 proceed to compact at idx 2, where Node 0 has " - "compacted for a previous term"); + DOCTEST_INFO("Node 1 resumes replication"); { - DOCTEST_REQUIRE(r1.replicate(kv::BatchVector{{2, second_entry, true}}, 2)); - DOCTEST_REQUIRE(r1.ledger->ledger.size() == 2); + DOCTEST_REQUIRE(r1.replicate(kv::BatchVector{{3, third_entry, true}}, 2)); + DOCTEST_REQUIRE(r1.ledger->ledger.size() == 3); r1.periodic(ms(10)); DOCTEST_REQUIRE( ((aft::ChannelStubProxy*)r1.channels.get())->sent_append_entries.size() == @@ -1346,20 +1342,23 @@ DOCTEST_TEST_CASE( DOCTEST_REQUIRE( 1 == - dispatch_all( + dispatch_all_and_DOCTEST_CHECK( nodes, ((aft::ChannelStubProxy*)r2.channels.get()) - ->sent_append_entries_response)); + ->sent_append_entries_response, + [](const auto& msg) { DOCTEST_REQUIRE(msg.success); })); - // Node 0 will not respond here since it received an append entries it can - // not process [prev_idex (1) < commit_idx (2)] DOCTEST_REQUIRE( - ((aft::ChannelStubProxy*)r0.channels.get()) - ->sent_append_entries_response.size() == 0); + 1 == + dispatch_all_and_DOCTEST_CHECK( + nodes, + ((aft::ChannelStubProxy*)r0.channels.get()) + ->sent_append_entries_response, + [](const auto& msg) { DOCTEST_REQUIRE(msg.success); })); DOCTEST_INFO("Another entry from Node 1 so that Node 2 can also compact"); - DOCTEST_REQUIRE(r1.replicate(kv::BatchVector{{3, third_entry, true}}, 2)); - DOCTEST_REQUIRE(r1.ledger->ledger.size() == 3); + DOCTEST_REQUIRE(r1.replicate(kv::BatchVector{{4, third_entry, true}}, 2)); + DOCTEST_REQUIRE(r1.ledger->ledger.size() == 4); r1.periodic(ms(10)); DOCTEST_REQUIRE( ((aft::ChannelStubProxy*)r1.channels.get())->sent_append_entries.size() == @@ -1374,32 +1373,5 @@ DOCTEST_TEST_CASE( DOCTEST_REQUIRE( ((aft::ChannelStubProxy*)r1.channels.get())->sent_append_entries.size() == 0); - - // Node 0 will now have an ae response which will return false because - // its log for index 2 has the wrong term (ours: 1, theirs: 2) - DOCTEST_REQUIRE( - ((aft::ChannelStubProxy*)r0.channels.get()) - ->sent_append_entries_response.size() == 1); - DOCTEST_REQUIRE( - 1 == - dispatch_all_and_DOCTEST_CHECK( - nodes, - ((aft::ChannelStubProxy*)r0.channels.get()) - ->sent_append_entries_response, - [](const auto& msg) { - DOCTEST_REQUIRE(msg.last_log_idx == 2); - DOCTEST_REQUIRE(!msg.success); - })); - - DOCTEST_REQUIRE(r1.ledger->ledger.size() == 3); - DOCTEST_REQUIRE(r2.ledger->ledger.size() == 3); - - DOCTEST_CHECK(r1.get_term() == 2); - DOCTEST_CHECK(r1.get_commit_idx() == 2); - DOCTEST_CHECK(r1.get_last_idx() == 3); - - DOCTEST_CHECK(r2.get_term() == 2); - DOCTEST_CHECK(r2.get_commit_idx() == 2); - DOCTEST_CHECK(r2.get_last_idx() == 3); } -} +} \ No newline at end of file From 5b0cb0c7332ed8f6d3c2d81ca1c4bfd23a5e476b Mon Sep 17 00:00:00 2001 From: Amaury Chamayou Date: Tue, 22 Sep 2020 14:42:07 +0100 Subject: [PATCH 03/25] Fix test --- src/consensus/aft/test/main.cpp | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/src/consensus/aft/test/main.cpp b/src/consensus/aft/test/main.cpp index 3305e24f22c4..0bf67ec1cd22 100644 --- a/src/consensus/aft/test/main.cpp +++ b/src/consensus/aft/test/main.cpp @@ -1321,6 +1321,18 @@ DOCTEST_TEST_CASE( ->sent_append_entries_response)); } + DOCTEST_CHECK(r0.get_term() == 1); + DOCTEST_CHECK(r0.get_commit_idx() == 2); + DOCTEST_CHECK(r0.get_last_idx() == 2); + + DOCTEST_CHECK(r1.get_term() == 2); + DOCTEST_CHECK(r1.get_commit_idx() == 1); + DOCTEST_CHECK(r1.get_last_idx() == 2); + + DOCTEST_CHECK(r2.get_term() == 2); + DOCTEST_CHECK(r2.get_commit_idx() == 1); + DOCTEST_CHECK(r2.get_last_idx() == 2); + DOCTEST_INFO("Node 1 resumes replication"); { DOCTEST_REQUIRE(r1.replicate(kv::BatchVector{{3, third_entry, true}}, 2)); @@ -1373,5 +1385,17 @@ DOCTEST_TEST_CASE( DOCTEST_REQUIRE( ((aft::ChannelStubProxy*)r1.channels.get())->sent_append_entries.size() == 0); + + DOCTEST_CHECK(r0.get_term() == 2); + DOCTEST_CHECK(r0.get_commit_idx() == 3); + DOCTEST_CHECK(r0.get_last_idx() == 4); + + DOCTEST_CHECK(r1.get_term() == 2); + DOCTEST_CHECK(r1.get_commit_idx() == 3); + DOCTEST_CHECK(r1.get_last_idx() == 4); + + DOCTEST_CHECK(r2.get_term() == 2); + DOCTEST_CHECK(r2.get_commit_idx() == 3); + DOCTEST_CHECK(r2.get_last_idx() == 4); } } \ No newline at end of file From 6eaa15f57c7d4e89ad87ef0aca32dcf67c296c09 Mon Sep 17 00:00:00 2001 From: Amaury Chamayou Date: Tue, 22 Sep 2020 16:29:27 +0100 Subject: [PATCH 04/25] Fix test --- src/node/rpc/node_frontend.h | 24 ++++++++++++++++++++++++ tests/reconfiguration.py | 25 +++++++++++++++++++++++++ 2 files changed, 49 insertions(+) diff --git a/src/node/rpc/node_frontend.h b/src/node/rpc/node_frontend.h index b9ce5f0cebd1..f58a5467f2cd 100644 --- a/src/node/rpc/node_frontend.h +++ b/src/node/rpc/node_frontend.h @@ -354,6 +354,30 @@ namespace ccf make_read_only_endpoint("primary", HTTP_HEAD, is_primary) .set_forwarding_required(ForwardingRequired::Never) .install(); + + auto consensus_config = [this](CommandEndpointContext& args) { + // Query node for configurations, separate current from pending + if (consensus != nullptr) + { + auto cfg = consensus->get_latest_configuration(); + nlohmann::json c; + for (auto& [nid, ninfo] : cfg) + { + nlohmann::json n; + n["address"] = fmt::format("{}:{}", ninfo.hostname, ninfo.port); + c[fmt::format("{}", nid)] = n; + } + args.rpc_ctx->set_response_body(c.dump()); + } + else + { + args.rpc_ctx->set_response_status(HTTP_STATUS_NOT_FOUND); + } + }; + + make_command_endpoint("config", HTTP_GET, consensus_config) + .set_forwarding_required(ForwardingRequired::Never) + .install(); } }; diff --git a/tests/reconfiguration.py b/tests/reconfiguration.py index 07d0b62ccf10..587e36b31c3a 100644 --- a/tests/reconfiguration.py +++ b/tests/reconfiguration.py @@ -9,6 +9,26 @@ from loguru import logger as LOG +def node_configs(network): + configs = {} + for node in network.nodes: + try: + with node.client() as nc: + configs[node.node_id] = nc.get("/node/config").body.json() + except Exception: + pass + return configs + + +def count_nodes(configs): + nodes = set(str(k) for k in configs.keys()) + for node_id, node_config in configs.items(): + assert nodes == set( + node_config.keys() + ), f"{nodes} {set(node_config.keys())} {node_id}" + return len(nodes) + + def check_can_progress(node, timeout=3): with node.client() as c: r = c.get("/node/commit") @@ -102,6 +122,8 @@ def test_retire_backup(network, args): @reqs.description("Retiring the primary") @reqs.can_kill_n_nodes(1) def test_retire_primary(network, args): + pre_count = count_nodes(node_configs(network)) + primary, backup = network.find_primary_and_any_backup() network.consortium.retire_node(primary, primary) LOG.debug( @@ -112,6 +134,9 @@ def test_retire_primary(network, args): assert new_primary.node_id != primary.node_id LOG.debug(f"New primary is {new_primary.node_id} in term {new_term}") check_can_progress(backup) + network.nodes.remove(primary) + post_count = count_nodes(node_configs(network)) + assert pre_count == post_count + 1 primary.stop() return network From 1bbd8298c66ef4013989e640268fb1e2a7d0fc25 Mon Sep 17 00:00:00 2001 From: Amaury Chamayou Date: Tue, 22 Sep 2020 17:59:59 +0100 Subject: [PATCH 05/25] Fix test --- src/consensus/aft/raft.h | 1 - src/node/rpc/node_frontend.h | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/src/consensus/aft/raft.h b/src/consensus/aft/raft.h index bb4f6e3bb9e6..d25bfd1bbeeb 100644 --- a/src/consensus/aft/raft.h +++ b/src/consensus/aft/raft.h @@ -1263,7 +1263,6 @@ namespace aft voted_for = NoNode; votes_for_me.clear(); - // Rollback unreplicated commits. rollback(last_committable_index()); LOG_INFO_FMT( diff --git a/src/node/rpc/node_frontend.h b/src/node/rpc/node_frontend.h index f58a5467f2cd..41fae18f9d18 100644 --- a/src/node/rpc/node_frontend.h +++ b/src/node/rpc/node_frontend.h @@ -372,6 +372,7 @@ namespace ccf else { args.rpc_ctx->set_response_status(HTTP_STATUS_NOT_FOUND); + args.rpc_ctx->set_response_body("No configured consensus"); } }; From 5eb87d93dbd6562d76af6f46daa52fa248f8b867 Mon Sep 17 00:00:00 2001 From: Amaury Chamayou Date: Wed, 23 Sep 2020 11:23:34 +0100 Subject: [PATCH 06/25] Use last committable index only in elections --- src/consensus/aft/raft.h | 18 ++++++++++-------- src/consensus/aft/raft_types.h | 3 +-- src/consensus/aft/test/driver.h | 4 ++-- src/consensus/aft/test/main.cpp | 13 +++++++------ 4 files changed, 20 insertions(+), 18 deletions(-) diff --git a/src/consensus/aft/raft.h b/src/consensus/aft/raft.h index d25bfd1bbeeb..62d9519e493d 100644 --- a/src/consensus/aft/raft.h +++ b/src/consensus/aft/raft.h @@ -998,9 +998,8 @@ namespace aft RequestVote rv = {{raft_request_vote, state->my_node_id}, state->current_view, - state->commit_idx, - get_term_internal(state->commit_idx), - last_committable_idx}; + last_committable_idx, + get_term_internal(state->commit_idx)}; channels->send_authenticated(ccf::NodeMsgType::consensus_msg, to, rv); } @@ -1068,12 +1067,15 @@ namespace aft } // If the candidate's log is at least as up-to-date as ours, vote yes - auto last_commit_term = get_term_internal(state->commit_idx); - auto answer = (r.last_commit_term > last_commit_term) || - ((r.last_commit_term == last_commit_term) && - (r.last_commit_idx >= state->commit_idx) && - (r.last_committable_idx >= last_committable_index())); + auto last_committable_idx = last_committable_index(); + auto term_of_last_committable_index = + get_term_internal(last_committable_idx); + + auto answer = + (r.term_of_last_committable_idx > term_of_last_committable_index) || + ((r.term_of_last_committable_idx == term_of_last_committable_index) && + (r.last_committable_idx >= last_committable_idx)); if (answer) { diff --git a/src/consensus/aft/raft_types.h b/src/consensus/aft/raft_types.h index 13dd29a0340c..8b4db6fdc832 100644 --- a/src/consensus/aft/raft_types.h +++ b/src/consensus/aft/raft_types.h @@ -169,9 +169,8 @@ namespace aft struct RequestVote : RaftHeader { Term term; - Index last_commit_idx; - Term last_commit_term; Index last_committable_idx; + Term term_of_last_committable_idx; }; struct RequestVoteResponse : RaftHeader diff --git a/src/consensus/aft/test/driver.h b/src/consensus/aft/test/driver.h index a020214be0ef..1a20747db1d9 100644 --- a/src/consensus/aft/test/driver.h +++ b/src/consensus/aft/test/driver.h @@ -88,8 +88,8 @@ class RaftDriver aft::NodeId node_id, aft::NodeId tgt_node_id, aft::RequestVote rv) { std::ostringstream s; - s << "request_vote t: " << rv.term << ", lli: " << rv.last_commit_idx - << ", llt: " << rv.last_commit_term; + s << "request_vote t: " << rv.term << ", lci: " << rv.last_committable_idx + << ", tolci: " << rv.term_of_last_committable_idx; log(node_id, tgt_node_id, s.str()); } diff --git a/src/consensus/aft/test/main.cpp b/src/consensus/aft/test/main.cpp index 0bf67ec1cd22..1b6cbc8a1162 100644 --- a/src/consensus/aft/test/main.cpp +++ b/src/consensus/aft/test/main.cpp @@ -195,8 +195,9 @@ DOCTEST_TEST_CASE( DOCTEST_REQUIRE(get<0>(rv) == node_id1); auto rvc = get<1>(rv); DOCTEST_REQUIRE(rvc.term == 1); - DOCTEST_REQUIRE(rvc.last_commit_idx == 0); - DOCTEST_REQUIRE(rvc.last_commit_term == aft::ViewHistory::InvalidView); + DOCTEST_REQUIRE(rvc.last_committable_idx == 0); + DOCTEST_REQUIRE( + rvc.term_of_last_committable_idx == aft::ViewHistory::InvalidView); r1.recv_message(reinterpret_cast(&rvc), sizeof(rvc)); @@ -207,8 +208,9 @@ DOCTEST_TEST_CASE( DOCTEST_REQUIRE(get<0>(rv) == node_id2); rvc = get<1>(rv); DOCTEST_REQUIRE(rvc.term == 1); - DOCTEST_REQUIRE(rvc.last_commit_idx == 0); - DOCTEST_REQUIRE(rvc.last_commit_term == aft::ViewHistory::InvalidView); + DOCTEST_REQUIRE(rvc.last_committable_idx == 0); + DOCTEST_REQUIRE( + rvc.term_of_last_committable_idx == aft::ViewHistory::InvalidView); r2.recv_message(reinterpret_cast(&rvc), sizeof(rvc)); @@ -1276,8 +1278,7 @@ DOCTEST_TEST_CASE( ((aft::ChannelStubProxy*)r1.channels.get())->sent_request_vote, [](const auto& msg) { DOCTEST_REQUIRE(msg.term == 2); - DOCTEST_REQUIRE(msg.last_commit_idx == 1); - DOCTEST_REQUIRE(msg.last_commit_term == 1); + DOCTEST_REQUIRE(msg.term_of_last_committable_idx == 1); DOCTEST_REQUIRE(msg.last_committable_idx == 2); })); From c51ffa0edbd10c9477861dc49dca62391d115305 Mon Sep 17 00:00:00 2001 From: Amaury Chamayou Date: Wed, 23 Sep 2020 14:58:45 +0100 Subject: [PATCH 07/25] Update src/consensus/aft/raft.h Co-authored-by: Eddy Ashton --- src/consensus/aft/raft.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/consensus/aft/raft.h b/src/consensus/aft/raft.h index 62d9519e493d..204867289682 100644 --- a/src/consensus/aft/raft.h +++ b/src/consensus/aft/raft.h @@ -999,7 +999,7 @@ namespace aft RequestVote rv = {{raft_request_vote, state->my_node_id}, state->current_view, last_committable_idx, - get_term_internal(state->commit_idx)}; + get_term_internal(last_committable_idx)}; channels->send_authenticated(ccf::NodeMsgType::consensus_msg, to, rv); } @@ -1522,4 +1522,4 @@ namespace aft } } }; -} \ No newline at end of file +} From 50d3c8178ca39d7fc9a45051970b3673429f5430 Mon Sep 17 00:00:00 2001 From: Amaury Chamayou Date: Thu, 24 Sep 2020 17:09:25 +0100 Subject: [PATCH 08/25] rotation test --- CMakeLists.txt | 7 +++++ src/node/rpc/member_frontend.h | 2 +- tests/reconfiguration.py | 7 +---- tests/rotation.py | 56 ++++++++++++++++++++++++++++++++++ 4 files changed, 65 insertions(+), 7 deletions(-) create mode 100644 tests/rotation.py diff --git a/CMakeLists.txt b/CMakeLists.txt index c01d00289f36..812c1897ef75 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -589,6 +589,13 @@ if(BUILD_TESTS) 4000 ) + add_e2e_test( + NAME rotation_test + PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/rotation.py + CONSENSUS cft + ADDITIONAL_ARGS --raft-election-timeout 4000 + ) + add_e2e_test( NAME lua_e2e_batched PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/e2e_batched.py diff --git a/src/node/rpc/member_frontend.h b/src/node/rpc/member_frontend.h index e9f414ba4366..5bc9ebe137af 100644 --- a/src/node/rpc/member_frontend.h +++ b/src/node/rpc/member_frontend.h @@ -793,7 +793,7 @@ namespace ccf }; make_endpoint("read", HTTP_POST, json_adapter(read)) // This can be executed locally, but can't currently take ReadOnlyTx due - // to restristions in our lua wrappers + // to restrictions in our lua wrappers .set_forwarding_required(ForwardingRequired::Sometimes) .set_auto_schema() .install(); diff --git a/tests/reconfiguration.py b/tests/reconfiguration.py index 587e36b31c3a..ea9aec2d2f72 100644 --- a/tests/reconfiguration.py +++ b/tests/reconfiguration.py @@ -126,12 +126,7 @@ def test_retire_primary(network, args): primary, backup = network.find_primary_and_any_backup() network.consortium.retire_node(primary, primary) - LOG.debug( - f"Waiting {network.election_duration}s for a new primary to be elected..." - ) - time.sleep(network.election_duration) - new_primary, new_term = network.find_primary() - assert new_primary.node_id != primary.node_id + new_primary, new_term = network.wait_for_new_primary(primary.node_id) LOG.debug(f"New primary is {new_primary.node_id} in term {new_term}") check_can_progress(backup) network.nodes.remove(primary) diff --git a/tests/rotation.py b/tests/rotation.py new file mode 100644 index 000000000000..54eb56008632 --- /dev/null +++ b/tests/rotation.py @@ -0,0 +1,56 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the Apache 2.0 License. +import infra.e2e_args +import infra.network +import infra.proc +import suite.test_requirements as reqs +import reconfiguration + +from loguru import logger as LOG + + +@reqs.description("Suspend and resume primary") +@reqs.at_least_n_nodes(3) +def test_suspend_primary(network, args): + primary, _ = network.find_primary() + primary.suspend() + new_primary, new_term = network.wait_for_new_primary(primary.node_id) + LOG.debug(f"New primary is {new_primary.node_id} in term {new_term}") + reconfiguration.check_can_progress(new_primary) + primary.resume() + reconfiguration.check_can_progress(new_primary) + return network + + +def run(args): + hosts = ["localhost", "localhost"] + + with infra.network.network( + hosts, args.binary_dir, args.debug_nodes, args.perf_nodes, pdb=args.pdb + ) as network: + network.start_and_join(args) + + # Replace primary repeatedly and check the network still operates + for _ in range(5): + reconfiguration.test_add_node(network, args) + reconfiguration.test_retire_primary(network, args) + + reconfiguration.test_add_node(network, args) + # Suspend primary repeatedly and check the network still operates + for _ in range(5): + test_suspend_primary(network, args) + + +if __name__ == "__main__": + + def add(parser): + parser.add_argument( + "-p", + "--package", + help="The enclave package to load (e.g., libsimplebank)", + default="liblogging", + ) + + args = infra.e2e_args.cli_args(add) + args.package = args.app_script and "liblua_generic" or "liblogging" + run(args) From e52bb00900b48d2b8eab40523db5007c3cf00d58 Mon Sep 17 00:00:00 2001 From: Amaury Chamayou Date: Thu, 24 Sep 2020 17:24:07 +0100 Subject: [PATCH 09/25] rotation test --- CMakeLists.txt | 6 +++--- tests/infra/network.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 812c1897ef75..f46dbe8b1311 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -529,7 +529,7 @@ if(BUILD_TESTS) NAME recovery_test_suite PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/e2e_suite.py CONSENSUS cft - LABEL suite + LABEL long_test ADDITIONAL_ARGS --test-duration 150 @@ -544,7 +544,7 @@ if(BUILD_TESTS) NAME reconfiguration_test_suite PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/e2e_suite.py CONSENSUS cft - LABEL suite + LABEL long_test ADDITIONAL_ARGS --test-duration 150 @@ -576,7 +576,7 @@ if(BUILD_TESTS) NAME full_test_suite PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/e2e_suite.py CONSENSUS cft - LABEL suite + LABEL long_test ADDITIONAL_ARGS --ledger-recovery-timeout 20 diff --git a/tests/infra/network.py b/tests/infra/network.py index 750dd8383a0a..5c0aa1029286 100644 --- a/tests/infra/network.py +++ b/tests/infra/network.py @@ -608,7 +608,7 @@ def find_primary_and_any_backup(self, timeout=3): backup = random.choice(backups) return primary, backup - def wait_for_all_nodes_to_catch_up(self, primary, timeout=3): + def wait_for_all_nodes_to_catch_up(self, primary, timeout=10): """ Wait for all nodes to have joined the network and globally replicated all transactions globally executed on the primary (including transactions From 67545d11ea6b10a07ec5e9db2c46c215df7ec6f7 Mon Sep 17 00:00:00 2001 From: Amaury Chamayou Date: Thu, 24 Sep 2020 18:42:39 +0100 Subject: [PATCH 10/25] No such thing as simplebank anymore --- tests/code_update.py | 2 +- tests/reconfiguration.py | 2 +- tests/rotation.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/code_update.py b/tests/code_update.py index 07897925aaf2..a0757399eaa7 100644 --- a/tests/code_update.py +++ b/tests/code_update.py @@ -152,7 +152,7 @@ def add(parser): parser.add_argument( "-p", "--package", - help="The enclave package to load (e.g., libsimplebank)", + help="The enclave package to load (e.g., liblogging)", default="liblogging", ) parser.add_argument( diff --git a/tests/reconfiguration.py b/tests/reconfiguration.py index ea9aec2d2f72..e50debe64c07 100644 --- a/tests/reconfiguration.py +++ b/tests/reconfiguration.py @@ -161,7 +161,7 @@ def add(parser): parser.add_argument( "-p", "--package", - help="The enclave package to load (e.g., libsimplebank)", + help="The enclave package to load (e.g., liblogging)", default="liblogging", ) diff --git a/tests/rotation.py b/tests/rotation.py index 54eb56008632..186ba5b9923b 100644 --- a/tests/rotation.py +++ b/tests/rotation.py @@ -47,7 +47,7 @@ def add(parser): parser.add_argument( "-p", "--package", - help="The enclave package to load (e.g., libsimplebank)", + help="The enclave package to load (e.g., liblogging)", default="liblogging", ) From a946640690e0fc180a8982242e9dcfa92a332c08 Mon Sep 17 00:00:00 2001 From: Amaury Chamayou Date: Thu, 24 Sep 2020 18:46:30 +0100 Subject: [PATCH 11/25] Remove now unnecessary barrier in recovery --- tests/recovery.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/tests/recovery.py b/tests/recovery.py index ee37d138572a..4718992fb678 100644 --- a/tests/recovery.py +++ b/tests/recovery.py @@ -56,12 +56,6 @@ def test_share_resilience(network, args): ) submitted_shares_count += 1 - # In theory, check_commit should be sufficient to guarantee that the new primary - # will know about all the recovery shares submitted so far. However, because of - # https://github.com/microsoft/CCF/issues/589, we have to wait for all nodes - # to have committed all transactions. - recovered_network.wait_for_all_nodes_to_catch_up(primary) - # Here, we kill the current primary instead of just suspending it. # However, because of https://github.com/microsoft/CCF/issues/99#issuecomment-630875387, # the new primary will most likely be the previous primary, which defies the point of this test. From d85c25efae6e523357a06cf61fddcde31670c0e9 Mon Sep 17 00:00:00 2001 From: Amaury Chamayou Date: Fri, 25 Sep 2020 10:52:01 +0100 Subject: [PATCH 12/25] rotation test --- python/requirements.txt | 2 +- src/consensus/aft/raft.h | 7 ++++--- tests/infra/network.py | 1 + 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/python/requirements.txt b/python/requirements.txt index 9847311cd145..b8e4c52a28fc 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -1,5 +1,5 @@ msgpack==1.0.0 -loguru==0.5.2 +loguru==0.5.3 requests==2.24.0 requests-http-signature==0.1.0 websocket-client==0.57.0 diff --git a/src/consensus/aft/raft.h b/src/consensus/aft/raft.h index 204867289682..2cc5e1e9b980 100644 --- a/src/consensus/aft/raft.h +++ b/src/consensus/aft/raft.h @@ -604,11 +604,12 @@ namespace aft } LOG_DEBUG_FMT( - "Received pt: {} pi: {} t: {} i: {}", + "Received pt: {} pi: {} t: {} i: {} toi: {}", r.prev_term, r.prev_idx, r.term, - r.idx); + r.idx, + r.term_of_idx); // Don't check that the sender node ID is valid. Accept anything that // passes the integrity check. This way, entries containing dynamic @@ -818,7 +819,7 @@ namespace aft // After entries have been deserialised, we try to commit the leader's // commit index and update our term history accordingly commit_if_possible(r.leader_commit_idx); - state->view_history.update(state->commit_idx + 1, r.term_of_idx); + state->view_history.update(last_committable_index() + 1, r.term_of_idx); send_append_entries_response(r.from_node, true); } diff --git a/tests/infra/network.py b/tests/infra/network.py index 5c0aa1029286..f16d991d9e03 100644 --- a/tests/infra/network.py +++ b/tests/infra/network.py @@ -633,6 +633,7 @@ def wait_for_all_nodes_to_catch_up(self, primary, timeout=10): caught_up_nodes = [] for node in self.get_joined_nodes(): with node.client() as c: + c.get(f"/node/commit") resp = c.get(f"/node/local_tx?view={view}&seqno={seqno}") if resp.status_code != 200: # Node may not have joined the network yet, try again From a567cb86929512f464a5a6e3ef961bd1e31ff161 Mon Sep 17 00:00:00 2001 From: Amaury Chamayou Date: Fri, 25 Sep 2020 15:00:59 +0100 Subject: [PATCH 13/25] rotation test --- src/consensus/aft/raft.h | 11 +++++++++-- tests/rotation.py | 12 ++++++++++-- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/src/consensus/aft/raft.h b/src/consensus/aft/raft.h index 2cc5e1e9b980..3386f6de4956 100644 --- a/src/consensus/aft/raft.h +++ b/src/consensus/aft/raft.h @@ -786,11 +786,13 @@ namespace aft case kv::DeserialiseSuccess::PASS_SIGNATURE: { LOG_DEBUG_FMT("Deserialising signature at {}", i); + auto prev_lci = last_committable_index(); committable_indices.push_back(i); if (sig_term) { - state->view_history.update(state->commit_idx + 1, sig_term); + LOG_DEBUG_FMT("Found sig_term", sig_term); + state->view_history.update(prev_lci + 1, sig_term); commit_if_possible(r.leader_commit_idx); } if (consensus_type == ConsensusType::BFT) @@ -819,7 +821,12 @@ namespace aft // After entries have been deserialised, we try to commit the leader's // commit index and update our term history accordingly commit_if_possible(r.leader_commit_idx); - state->view_history.update(last_committable_index() + 1, r.term_of_idx); + if (r.term_of_idx > r.prev_term) + { + auto lci = last_committable_index(); + CCF_ASSERT(lci >= r.prev_idx, fmt::format("lci: {} !> r.prev_idx", lci, r.prev_idx)); + state->view_history.update(lci + 1, r.term_of_idx); + } send_append_entries_response(r.from_node, true); } diff --git a/tests/rotation.py b/tests/rotation.py index 186ba5b9923b..ca42f4603b35 100644 --- a/tests/rotation.py +++ b/tests/rotation.py @@ -5,6 +5,7 @@ import infra.proc import suite.test_requirements as reqs import reconfiguration +import e2e_logging from loguru import logger as LOG @@ -30,14 +31,21 @@ def run(args): ) as network: network.start_and_join(args) + """ + reconfiguration.test_add_node(network, args) + e2e_logging.test_view_history(network, args) + reconfiguration.test_retire_primary(network, args) + e2e_logging.test_view_history(network, args) + """ # Replace primary repeatedly and check the network still operates - for _ in range(5): + for _ in range(10): reconfiguration.test_add_node(network, args) reconfiguration.test_retire_primary(network, args) + reconfiguration.test_add_node(network, args) # Suspend primary repeatedly and check the network still operates - for _ in range(5): + for _ in range(10): test_suspend_primary(network, args) From 835aac5cba63db033ef50ee1baa6a8b74d68dcaa Mon Sep 17 00:00:00 2001 From: Amaury Chamayou Date: Fri, 25 Sep 2020 16:21:19 +0100 Subject: [PATCH 14/25] fix --- .azure-pipelines-templates/daily-matrix.yml | 2 +- .azure-pipelines-templates/matrix.yml | 4 ++-- CMakeLists.txt | 6 +++--- src/consensus/aft/raft.h | 16 ++++++++-------- 4 files changed, 14 insertions(+), 14 deletions(-) diff --git a/.azure-pipelines-templates/daily-matrix.yml b/.azure-pipelines-templates/daily-matrix.yml index 2185e939966c..5cf5d6189610 100644 --- a/.azure-pipelines-templates/daily-matrix.yml +++ b/.azure-pipelines-templates/daily-matrix.yml @@ -37,7 +37,7 @@ jobs: cmake_args: '${{ parameters.build.debug.cmake_args }} ${{ parameters.build.NoSGX.cmake_args }}' suffix: 'Instrumented' artifact_name: 'NoSGX_Instrumented' - ctest_filter: '-LE "benchmark|perf|long_test"' + ctest_filter: '-LE "benchmark|perf"' ctest_timeout: '300' - template: common.yml diff --git a/.azure-pipelines-templates/matrix.yml b/.azure-pipelines-templates/matrix.yml index 7b8d2d258dd8..1740bd065427 100644 --- a/.azure-pipelines-templates/matrix.yml +++ b/.azure-pipelines-templates/matrix.yml @@ -29,9 +29,9 @@ parameters: test: NoSGX: - ctest_args: '-LE "benchmark|perf|tlstest|long_test"' + ctest_args: '-LE "benchmark|perf|tlstest"' SGX: - ctest_args: '-LE "benchmark|perf|tlstest|long_test"' + ctest_args: '-LE "benchmark|perf|tlstest"' perf: ctest_args: '-L "benchmark|perf"' diff --git a/CMakeLists.txt b/CMakeLists.txt index f46dbe8b1311..812c1897ef75 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -529,7 +529,7 @@ if(BUILD_TESTS) NAME recovery_test_suite PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/e2e_suite.py CONSENSUS cft - LABEL long_test + LABEL suite ADDITIONAL_ARGS --test-duration 150 @@ -544,7 +544,7 @@ if(BUILD_TESTS) NAME reconfiguration_test_suite PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/e2e_suite.py CONSENSUS cft - LABEL long_test + LABEL suite ADDITIONAL_ARGS --test-duration 150 @@ -576,7 +576,7 @@ if(BUILD_TESTS) NAME full_test_suite PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/e2e_suite.py CONSENSUS cft - LABEL long_test + LABEL suite ADDITIONAL_ARGS --ledger-recovery-timeout 20 diff --git a/src/consensus/aft/raft.h b/src/consensus/aft/raft.h index 3386f6de4956..4c136a938f42 100644 --- a/src/consensus/aft/raft.h +++ b/src/consensus/aft/raft.h @@ -791,7 +791,9 @@ namespace aft if (sig_term) { - LOG_DEBUG_FMT("Found sig_term", sig_term); + // A signature for sig_term tells us that all transactions from the + // previous signature onwards (at least, if not further back) happened + // in sig_term. We reflect this in the history. state->view_history.update(prev_lci + 1, sig_term); commit_if_possible(r.leader_commit_idx); } @@ -821,12 +823,10 @@ namespace aft // After entries have been deserialised, we try to commit the leader's // commit index and update our term history accordingly commit_if_possible(r.leader_commit_idx); - if (r.term_of_idx > r.prev_term) - { - auto lci = last_committable_index(); - CCF_ASSERT(lci >= r.prev_idx, fmt::format("lci: {} !> r.prev_idx", lci, r.prev_idx)); - state->view_history.update(lci + 1, r.term_of_idx); - } + + // The term may have changed, but we may not have seen a signature yet. + auto lci = last_committable_index(); + state->view_history.update(lci + 1, r.term_of_idx); send_append_entries_response(r.from_node, true); } @@ -1074,7 +1074,7 @@ namespace aft return; } - // If the candidate's log is at least as up-to-date as ours, vote yes + // If the candidate's committable log is at least as up-to-date as ours, vote yes auto last_committable_idx = last_committable_index(); auto term_of_last_committable_index = From 6a4d3dbf71263dd01a59a6aa0fc8e6f0f018a45a Mon Sep 17 00:00:00 2001 From: Amaury Chamayou Date: Fri, 25 Sep 2020 17:06:11 +0100 Subject: [PATCH 15/25] fix --- src/consensus/aft/raft.h | 17 ++++++- src/consensus/aft/raft_consensus.h | 5 ++ src/kv/kv_types.h | 1 + src/kv/test/stub_consensus.h | 5 ++ src/node/history.h | 77 ++++++++++++++++-------------- tests/rotation.py | 7 --- 6 files changed, 68 insertions(+), 44 deletions(-) diff --git a/src/consensus/aft/raft.h b/src/consensus/aft/raft.h index 4c136a938f42..2c497d981663 100644 --- a/src/consensus/aft/raft.h +++ b/src/consensus/aft/raft.h @@ -74,6 +74,7 @@ namespace aft ReplicaState replica_state; std::chrono::milliseconds timeout_elapsed; + kv::Version election_index = 0; // BFT RequestsMap& pbft_requests_map; @@ -301,6 +302,19 @@ namespace aft return {get_term_internal(state->commit_idx), state->commit_idx}; } + std::optional> get_signable_commit_term_and_idx() + { + std::lock_guard guard(state->lock); + if (state->commit_idx >= election_index) + { + return std::pair{get_term_internal(state->commit_idx), state->commit_idx}; + } + else + { + return std::nullopt; + } + } + Term get_term(Index idx) { if (consensus_type == ConsensusType::BFT && is_follower()) @@ -1220,12 +1234,13 @@ namespace aft void become_leader() { + election_index = last_committable_index(); // Discard any un-committable updates we may hold, // since we have no signature for them. Except at startup, // where we do not want to roll back the genesis transaction. if (state->commit_idx) { - rollback(last_committable_index()); + rollback(election_index); } else { diff --git a/src/consensus/aft/raft_consensus.h b/src/consensus/aft/raft_consensus.h index 13d4cb68e670..c0a3471b4ad5 100644 --- a/src/consensus/aft/raft_consensus.h +++ b/src/consensus/aft/raft_consensus.h @@ -69,6 +69,11 @@ namespace aft return aft->get_commit_term_and_idx(); } + std::optional> get_signable_txid() override + { + return aft->get_signable_commit_term_and_idx(); + } + View get_view(SeqNo seqno) override { return aft->get_term(seqno); diff --git a/src/kv/kv_types.h b/src/kv/kv_types.h index 34093ba0ba65..d98dd194cdd4 100644 --- a/src/kv/kv_types.h +++ b/src/kv/kv_types.h @@ -280,6 +280,7 @@ namespace kv virtual bool replicate(const BatchVector& entries, View view) = 0; virtual std::pair get_committed_txid() = 0; + virtual std::optional> get_signable_txid() = 0; virtual View get_view(SeqNo seqno) = 0; virtual View get_view() = 0; diff --git a/src/kv/test/stub_consensus.h b/src/kv/test/stub_consensus.h index 5e3ef419d7c0..df86e03b763a 100644 --- a/src/kv/test/stub_consensus.h +++ b/src/kv/test/stub_consensus.h @@ -93,6 +93,11 @@ namespace kv return {2, 0}; } + std::optional> get_signable_txid() override + { + return get_committed_txid(); + } + SeqNo get_committed_seqno() override { return 0; diff --git a/src/node/history.h b/src/node/history.h index b08e48782744..90ecb800acf8 100644 --- a/src/node/history.h +++ b/src/node/history.h @@ -665,43 +665,48 @@ namespace ccf return; } - auto txid = store.next_txid(); - auto commit_txid = consensus->get_committed_txid(); - - LOG_DEBUG_FMT( - "Signed at {} in view: {} commit was: {}.{}", - txid.version, - txid.term, - commit_txid.first, - commit_txid.second); - - store.commit( - txid, - [txid, commit_txid, this]() { - kv::Tx sig(txid.version); - auto sig_view = sig.get_view(signatures); - crypto::Sha256Hash root = replicated_state_tree.get_root(); - - auto progress_tracker = store.get_progress_tracker(); - if (progress_tracker) - { - progress_tracker->record_primary(txid.term, txid.version, root); - } - - PrimarySignature sig_value( - id, - txid.version, - txid.term, - commit_txid.second, - commit_txid.first, - root, - kp.sign_hash(root.h.data(), root.h.size()), - replicated_state_tree.serialise()); + auto signable_txid = consensus->get_signable_txid(); - sig_view->put(0, sig_value); - return sig.commit_reserved(); - }, - true); + if (signable_txid.has_value()) + { + auto commit_txid = signable_txid.value(); + auto txid = store.next_txid(); + + LOG_DEBUG_FMT( + "Signed at {} in view: {} commit was: {}.{}", + txid.version, + txid.term, + commit_txid.first, + commit_txid.second); + + store.commit( + txid, + [txid, commit_txid, this]() { + kv::Tx sig(txid.version); + auto sig_view = sig.get_view(signatures); + crypto::Sha256Hash root = replicated_state_tree.get_root(); + + auto progress_tracker = store.get_progress_tracker(); + if (progress_tracker) + { + progress_tracker->record_primary(txid.term, txid.version, root); + } + + PrimarySignature sig_value( + id, + txid.version, + txid.term, + commit_txid.second, + commit_txid.first, + root, + kp.sign_hash(root.h.data(), root.h.size()), + replicated_state_tree.serialise()); + + sig_view->put(0, sig_value); + return sig.commit_reserved(); + }, + true); + } } std::vector get_receipt(kv::Version index) override diff --git a/tests/rotation.py b/tests/rotation.py index ca42f4603b35..707d18ac94ee 100644 --- a/tests/rotation.py +++ b/tests/rotation.py @@ -31,18 +31,11 @@ def run(args): ) as network: network.start_and_join(args) - """ - reconfiguration.test_add_node(network, args) - e2e_logging.test_view_history(network, args) - reconfiguration.test_retire_primary(network, args) - e2e_logging.test_view_history(network, args) - """ # Replace primary repeatedly and check the network still operates for _ in range(10): reconfiguration.test_add_node(network, args) reconfiguration.test_retire_primary(network, args) - reconfiguration.test_add_node(network, args) # Suspend primary repeatedly and check the network still operates for _ in range(10): From a1a5e6895c9c69f6a5fab2190c9cf24826d98fa2 Mon Sep 17 00:00:00 2001 From: Amaury Chamayou Date: Mon, 28 Sep 2020 21:05:54 +0100 Subject: [PATCH 16/25] wip --- src/consensus/aft/raft.h | 21 +++++++++++++------- src/consensus/aft/test/main.cpp | 34 +++++++++++++++++++++++++-------- src/node/history.h | 2 ++ 3 files changed, 42 insertions(+), 15 deletions(-) diff --git a/src/consensus/aft/raft.h b/src/consensus/aft/raft.h index 2c497d981663..a7e4f3a3b45b 100644 --- a/src/consensus/aft/raft.h +++ b/src/consensus/aft/raft.h @@ -617,7 +617,7 @@ namespace aft return; } - LOG_DEBUG_FMT( + LOG_INFO_FMT( "Received pt: {} pi: {} t: {} i: {} toi: {}", r.prev_term, r.prev_idx, @@ -665,7 +665,7 @@ namespace aft // whose term is r.prev_term. if (prev_term == 0) { - LOG_DEBUG_FMT( + LOG_INFO_FMT( "Recv append entries to {} from {} but our log does not yet " "contain index {}", state->my_node_id, @@ -674,7 +674,7 @@ namespace aft } else { - LOG_DEBUG_FMT( + LOG_INFO_FMT( "Recv append entries to {} from {} but our log at {} has the wrong " "previous term (ours: {}, theirs: {})", state->my_node_id, @@ -808,7 +808,10 @@ namespace aft // A signature for sig_term tells us that all transactions from the // previous signature onwards (at least, if not further back) happened // in sig_term. We reflect this in the history. - state->view_history.update(prev_lci + 1, sig_term); + if (r.term_of_idx == aft::ViewHistory::InvalidView) + state->view_history.update(1, r.term); + else + state->view_history.update(prev_lci + 1, sig_term); commit_if_possible(r.leader_commit_idx); } if (consensus_type == ConsensusType::BFT) @@ -838,9 +841,12 @@ namespace aft // commit index and update our term history accordingly commit_if_possible(r.leader_commit_idx); - // The term may have changed, but we may not have seen a signature yet. + // The term may have changed, and we have not have seen a signature yet. auto lci = last_committable_index(); - state->view_history.update(lci + 1, r.term_of_idx); + if (r.term_of_idx == aft::ViewHistory::InvalidView) + state->view_history.update(1, r.term); + else + state->view_history.update(lci + 1, r.term_of_idx); send_append_entries_response(r.from_node, true); } @@ -1235,6 +1241,7 @@ namespace aft void become_leader() { election_index = last_committable_index(); + LOG_DEBUG_FMT("Election index is {}", election_index); // Discard any un-committable updates we may hold, // since we have no signature for them. Except at startup, // where we do not want to roll back the genesis transaction. @@ -1248,7 +1255,6 @@ namespace aft store->set_term(state->current_view); } - committable_indices.clear(); replica_state = Leader; leader_id = state->my_node_id; @@ -1365,6 +1371,7 @@ namespace aft void commit_if_possible(Index idx) { + LOG_DEBUG_FMT("Commit if possible {} (ci: {}) (ti {})", idx, state->commit_idx, get_term_internal(idx)); if ( (idx > state->commit_idx) && (get_term_internal(idx) <= state->current_view)) diff --git a/src/consensus/aft/test/main.cpp b/src/consensus/aft/test/main.cpp index 1b6cbc8a1162..b738468f3e8c 100644 --- a/src/consensus/aft/test/main.cpp +++ b/src/consensus/aft/test/main.cpp @@ -1180,22 +1180,31 @@ DOCTEST_TEST_CASE( dispatch_all( nodes, ((aft::ChannelStubProxy*)r0.channels.get())->sent_append_entries)); + DOCTEST_REQUIRE( ((aft::ChannelStubProxy*)r0.channels.get())->sent_append_entries.size() == 0); DOCTEST_REQUIRE( 1 == - dispatch_all( + dispatch_all_and_DOCTEST_CHECK( nodes, ((aft::ChannelStubProxy*)r1.channels.get()) - ->sent_append_entries_response)); + ->sent_append_entries_response, + [](const auto& msg) { + DOCTEST_REQUIRE(msg.success); + DOCTEST_REQUIRE(msg.last_log_idx == 1); + })); DOCTEST_REQUIRE( 1 == - dispatch_all( + dispatch_all_and_DOCTEST_CHECK( nodes, ((aft::ChannelStubProxy*)r2.channels.get()) - ->sent_append_entries_response)); + ->sent_append_entries_response, + [](const auto& msg) { + DOCTEST_REQUIRE(msg.success); + DOCTEST_REQUIRE(msg.last_log_idx == 1); + })); DOCTEST_REQUIRE(r0.replicate(kv::BatchVector{{2, second_entry, true}}, 1)); DOCTEST_REQUIRE(r0.ledger->ledger.size() == 2); @@ -1217,16 +1226,25 @@ DOCTEST_TEST_CASE( DOCTEST_REQUIRE( 1 == - dispatch_all( + dispatch_all_and_DOCTEST_CHECK( nodes, ((aft::ChannelStubProxy*)r1.channels.get()) - ->sent_append_entries_response)); + ->sent_append_entries_response, + [](const auto& msg) { + + DOCTEST_REQUIRE(msg.success); + DOCTEST_REQUIRE(msg.last_log_idx == 2); + })); DOCTEST_REQUIRE( 1 == - dispatch_all( + dispatch_all_and_DOCTEST_CHECK( nodes, ((aft::ChannelStubProxy*)r2.channels.get()) - ->sent_append_entries_response)); + ->sent_append_entries_response, + [](const auto& msg) { + DOCTEST_REQUIRE(msg.success); + DOCTEST_REQUIRE(msg.last_log_idx == 2); + })); DOCTEST_CHECK(r0.get_term() == 1); DOCTEST_CHECK(r0.get_commit_idx() == 2); diff --git a/src/node/history.h b/src/node/history.h index 90ecb800acf8..7ffaa578389a 100644 --- a/src/node/history.h +++ b/src/node/history.h @@ -671,6 +671,8 @@ namespace ccf { auto commit_txid = signable_txid.value(); auto txid = store.next_txid(); + + LOG_DEBUG_FMT("Signable TXID: {}.{}", commit_txid.first, commit_txid.second); LOG_DEBUG_FMT( "Signed at {} in view: {} commit was: {}.{}", From 68469050114d6a5d7dccec84f76629dc4c76b6fe Mon Sep 17 00:00:00 2001 From: Amaury Chamayou Date: Mon, 28 Sep 2020 22:41:52 +0100 Subject: [PATCH 17/25] fix --- src/consensus/aft/raft.h | 26 ++++++++++++++++---------- src/consensus/aft/test/main.cpp | 9 ++++----- src/node/history.h | 5 +++-- 3 files changed, 23 insertions(+), 17 deletions(-) diff --git a/src/consensus/aft/raft.h b/src/consensus/aft/raft.h index a7e4f3a3b45b..fa2e8b184af1 100644 --- a/src/consensus/aft/raft.h +++ b/src/consensus/aft/raft.h @@ -307,7 +307,8 @@ namespace aft std::lock_guard guard(state->lock); if (state->commit_idx >= election_index) { - return std::pair{get_term_internal(state->commit_idx), state->commit_idx}; + return std::pair{get_term_internal(state->commit_idx), + state->commit_idx}; } else { @@ -617,7 +618,7 @@ namespace aft return; } - LOG_INFO_FMT( + LOG_DEBUG_FMT( "Received pt: {} pi: {} t: {} i: {} toi: {}", r.prev_term, r.prev_idx, @@ -644,7 +645,7 @@ namespace aft else if (state->current_view > r.term) { // Reply false, since our term is later than the received term. - LOG_DEBUG_FMT( + LOG_INFO_FMT( "Recv append entries to {} from {} but our term is later ({} > {})", state->my_node_id, r.from_node, @@ -665,7 +666,7 @@ namespace aft // whose term is r.prev_term. if (prev_term == 0) { - LOG_INFO_FMT( + LOG_DEBUG_FMT( "Recv append entries to {} from {} but our log does not yet " "contain index {}", state->my_node_id, @@ -674,7 +675,7 @@ namespace aft } else { - LOG_INFO_FMT( + LOG_DEBUG_FMT( "Recv append entries to {} from {} but our log at {} has the wrong " "previous term (ours: {}, theirs: {})", state->my_node_id, @@ -805,9 +806,9 @@ namespace aft if (sig_term) { - // A signature for sig_term tells us that all transactions from the - // previous signature onwards (at least, if not further back) happened - // in sig_term. We reflect this in the history. + // A signature for sig_term tells us that all transactions from + // the previous signature onwards (at least, if not further back) + // happened in sig_term. We reflect this in the history. if (r.term_of_idx == aft::ViewHistory::InvalidView) state->view_history.update(1, r.term); else @@ -1094,7 +1095,8 @@ namespace aft return; } - // If the candidate's committable log is at least as up-to-date as ours, vote yes + // If the candidate's committable log is at least as up-to-date as ours, + // vote yes auto last_committable_idx = last_committable_index(); auto term_of_last_committable_index = @@ -1371,7 +1373,11 @@ namespace aft void commit_if_possible(Index idx) { - LOG_DEBUG_FMT("Commit if possible {} (ci: {}) (ti {})", idx, state->commit_idx, get_term_internal(idx)); + LOG_DEBUG_FMT( + "Commit if possible {} (ci: {}) (ti {})", + idx, + state->commit_idx, + get_term_internal(idx)); if ( (idx > state->commit_idx) && (get_term_internal(idx) <= state->current_view)) diff --git a/src/consensus/aft/test/main.cpp b/src/consensus/aft/test/main.cpp index b738468f3e8c..5d7895a5963f 100644 --- a/src/consensus/aft/test/main.cpp +++ b/src/consensus/aft/test/main.cpp @@ -1191,7 +1191,7 @@ DOCTEST_TEST_CASE( nodes, ((aft::ChannelStubProxy*)r1.channels.get()) ->sent_append_entries_response, - [](const auto& msg) { + [](const auto& msg) { DOCTEST_REQUIRE(msg.success); DOCTEST_REQUIRE(msg.last_log_idx == 1); })); @@ -1201,7 +1201,7 @@ DOCTEST_TEST_CASE( nodes, ((aft::ChannelStubProxy*)r2.channels.get()) ->sent_append_entries_response, - [](const auto& msg) { + [](const auto& msg) { DOCTEST_REQUIRE(msg.success); DOCTEST_REQUIRE(msg.last_log_idx == 1); })); @@ -1230,8 +1230,7 @@ DOCTEST_TEST_CASE( nodes, ((aft::ChannelStubProxy*)r1.channels.get()) ->sent_append_entries_response, - [](const auto& msg) { - + [](const auto& msg) { DOCTEST_REQUIRE(msg.success); DOCTEST_REQUIRE(msg.last_log_idx == 2); })); @@ -1241,7 +1240,7 @@ DOCTEST_TEST_CASE( nodes, ((aft::ChannelStubProxy*)r2.channels.get()) ->sent_append_entries_response, - [](const auto& msg) { + [](const auto& msg) { DOCTEST_REQUIRE(msg.success); DOCTEST_REQUIRE(msg.last_log_idx == 2); })); diff --git a/src/node/history.h b/src/node/history.h index 7ffaa578389a..da7cb9bd0c33 100644 --- a/src/node/history.h +++ b/src/node/history.h @@ -672,8 +672,9 @@ namespace ccf auto commit_txid = signable_txid.value(); auto txid = store.next_txid(); - LOG_DEBUG_FMT("Signable TXID: {}.{}", commit_txid.first, commit_txid.second); - + LOG_DEBUG_FMT( + "Signable TXID: {}.{}", commit_txid.first, commit_txid.second); + LOG_DEBUG_FMT( "Signed at {} in view: {} commit was: {}.{}", txid.version, From d401a2a5db12807ee68a416623ca1c61e20dd314 Mon Sep 17 00:00:00 2001 From: Amaury Chamayou Date: Tue, 29 Sep 2020 13:51:03 +0100 Subject: [PATCH 18/25] test committable suffix --- CMakeLists.txt | 7 + src/consensus/aft/test/main.cpp | 374 -------------------------------- tests/infra/network.py | 2 +- tests/rotation.py | 1 - 4 files changed, 8 insertions(+), 376 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 42608bc7b068..23e5630080ff 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -604,6 +604,13 @@ if(BUILD_TESTS) ADDITIONAL_ARGS --raft-election-timeout 4000 ) + add_e2e_test( + NAME committable_suffix_test + PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/committable.py + CONSENSUS cft + ADDITIONAL_ARGS --raft-election-timeout 4000 + ) + add_e2e_test( NAME lua_e2e_batched PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/e2e_batched.py diff --git a/src/consensus/aft/test/main.cpp b/src/consensus/aft/test/main.cpp index 5d7895a5963f..aeeed0c6bed1 100644 --- a/src/consensus/aft/test/main.cpp +++ b/src/consensus/aft/test/main.cpp @@ -1042,378 +1042,4 @@ DOCTEST_TEST_CASE("Exceed append entries limit") (sent_entries > num_small_entries_sent && sent_entries <= num_small_entries_sent + num_big_entries)); DOCTEST_REQUIRE(r2.ledger->ledger.size() == individual_entries); -} - -DOCTEST_TEST_CASE( - "Primary is isolated after advancing commit index but before secondaries are " - "notified") -{ - auto kv_store0 = std::make_shared(0); - auto kv_store1 = std::make_shared(1); - auto kv_store2 = std::make_shared(2); - - aft::NodeId node_id0(0); - aft::NodeId node_id1(1); - aft::NodeId node_id2(2); - - ms request_timeout(10); - - TRaft r0( - ConsensusType::CFT, - std::make_unique(kv_store0), - std::make_unique(node_id0), - std::make_shared(), - std::make_shared(), - nullptr, - nullptr, - cert, - request_map, - std::make_shared(node_id0), - nullptr, - request_timeout, - ms(20)); - TRaft r1( - ConsensusType::CFT, - std::make_unique(kv_store1), - std::make_unique(node_id1), - std::make_shared(), - std::make_shared(), - nullptr, - nullptr, - cert, - request_map, - std::make_shared(node_id1), - nullptr, - request_timeout, - ms(100)); - TRaft r2( - ConsensusType::CFT, - std::make_unique(kv_store2), - std::make_unique(node_id2), - std::make_shared(), - std::make_shared(), - nullptr, - nullptr, - cert, - request_map, - std::make_shared(node_id2), - nullptr, - request_timeout, - ms(50)); - - aft::Configuration::Nodes config0; - config0[node_id0] = {}; - config0[node_id1] = {}; - config0[node_id2] = {}; - r0.add_configuration(0, config0); - r1.add_configuration(0, config0); - r2.add_configuration(0, config0); - - map nodes; - nodes[node_id0] = &r0; - nodes[node_id1] = &r1; - nodes[node_id2] = &r2; - - r0.periodic(std::chrono::milliseconds(200)); - - DOCTEST_INFO("Initial election for Node 0"); - { - DOCTEST_REQUIRE( - 2 == - dispatch_all( - nodes, ((aft::ChannelStubProxy*)r0.channels.get())->sent_request_vote)); - DOCTEST_REQUIRE( - 1 == - dispatch_all( - nodes, - ((aft::ChannelStubProxy*)r1.channels.get()) - ->sent_request_vote_response)); - - DOCTEST_REQUIRE(r0.is_leader()); - DOCTEST_REQUIRE( - ((aft::ChannelStubProxy*)r0.channels.get())->sent_append_entries.size() == - 2); - DOCTEST_REQUIRE( - 2 == - dispatch_all( - nodes, - ((aft::ChannelStubProxy*)r0.channels.get())->sent_append_entries)); - DOCTEST_REQUIRE( - ((aft::ChannelStubProxy*)r0.channels.get())->sent_append_entries.size() == - 0); - } - - ((aft::ChannelStubProxy*)r1.channels.get()) - ->sent_append_entries_response.clear(); - ((aft::ChannelStubProxy*)r2.channels.get()) - ->sent_append_entries_response.clear(); - - auto first_entry = std::make_shared>(); - for (auto i = 0; i < 3; ++i) - { - first_entry->push_back(1); - } - auto second_entry = std::make_shared>(); - for (auto i = 0; i < 3; ++i) - { - second_entry->push_back(2); - } - - auto third_entry = std::make_shared>(); - for (auto i = 0; i < 3; ++i) - { - third_entry->push_back(3); - } - - DOCTEST_INFO("Node 0 compacts twice but Nodes 1 and 2 only once"); - { - DOCTEST_REQUIRE(r0.replicate(kv::BatchVector{{1, first_entry, true}}, 1)); - DOCTEST_REQUIRE(r0.ledger->ledger.size() == 1); - r0.periodic(ms(10)); - DOCTEST_REQUIRE( - ((aft::ChannelStubProxy*)r0.channels.get())->sent_append_entries.size() == - 2); - - // Nodes 1 and 2 receive append entries and respond - DOCTEST_REQUIRE( - 2 == - dispatch_all( - nodes, - ((aft::ChannelStubProxy*)r0.channels.get())->sent_append_entries)); - - DOCTEST_REQUIRE( - ((aft::ChannelStubProxy*)r0.channels.get())->sent_append_entries.size() == - 0); - - DOCTEST_REQUIRE( - 1 == - dispatch_all_and_DOCTEST_CHECK( - nodes, - ((aft::ChannelStubProxy*)r1.channels.get()) - ->sent_append_entries_response, - [](const auto& msg) { - DOCTEST_REQUIRE(msg.success); - DOCTEST_REQUIRE(msg.last_log_idx == 1); - })); - DOCTEST_REQUIRE( - 1 == - dispatch_all_and_DOCTEST_CHECK( - nodes, - ((aft::ChannelStubProxy*)r2.channels.get()) - ->sent_append_entries_response, - [](const auto& msg) { - DOCTEST_REQUIRE(msg.success); - DOCTEST_REQUIRE(msg.last_log_idx == 1); - })); - - DOCTEST_REQUIRE(r0.replicate(kv::BatchVector{{2, second_entry, true}}, 1)); - DOCTEST_REQUIRE(r0.ledger->ledger.size() == 2); - r0.periodic(ms(10)); - DOCTEST_REQUIRE( - ((aft::ChannelStubProxy*)r0.channels.get())->sent_append_entries.size() == - 2); - - // Nodes 1 and 2 receive append entries and respond - // Node 0 will compact again and be ahead of Node 1 and 2 - DOCTEST_REQUIRE( - 2 == - dispatch_all( - nodes, - ((aft::ChannelStubProxy*)r0.channels.get())->sent_append_entries)); - DOCTEST_REQUIRE( - ((aft::ChannelStubProxy*)r0.channels.get())->sent_append_entries.size() == - 0); - - DOCTEST_REQUIRE( - 1 == - dispatch_all_and_DOCTEST_CHECK( - nodes, - ((aft::ChannelStubProxy*)r1.channels.get()) - ->sent_append_entries_response, - [](const auto& msg) { - DOCTEST_REQUIRE(msg.success); - DOCTEST_REQUIRE(msg.last_log_idx == 2); - })); - DOCTEST_REQUIRE( - 1 == - dispatch_all_and_DOCTEST_CHECK( - nodes, - ((aft::ChannelStubProxy*)r2.channels.get()) - ->sent_append_entries_response, - [](const auto& msg) { - DOCTEST_REQUIRE(msg.success); - DOCTEST_REQUIRE(msg.last_log_idx == 2); - })); - - DOCTEST_CHECK(r0.get_term() == 1); - DOCTEST_CHECK(r0.get_commit_idx() == 2); - DOCTEST_CHECK(r0.get_last_idx() == 2); - - DOCTEST_CHECK(r1.get_term() == 1); - DOCTEST_CHECK(r1.get_commit_idx() == 1); - DOCTEST_CHECK(r1.get_last_idx() == 2); - - DOCTEST_CHECK(r2.get_term() == 1); - DOCTEST_CHECK(r2.get_commit_idx() == 1); - DOCTEST_CHECK(r2.get_last_idx() == 2); - - // clean up - ((aft::ChannelStubProxy*)r2.channels.get()) - ->sent_request_vote_response.clear(); - - DOCTEST_REQUIRE( - ((aft::ChannelStubProxy*)r0.channels.get())->sent_msg_count() == 0); - DOCTEST_REQUIRE( - ((aft::ChannelStubProxy*)r1.channels.get())->sent_msg_count() == 0); - DOCTEST_REQUIRE( - ((aft::ChannelStubProxy*)r2.channels.get())->sent_msg_count() == 0); - } - - DOCTEST_INFO("Node 1 exceeds its election timeout and starts an election"); - { - auto by_0 = [](auto const& lhs, auto const& rhs) -> bool { - return get<0>(lhs) < get<0>(rhs); - }; - - r1.periodic(std::chrono::milliseconds(200)); - DOCTEST_REQUIRE( - ((aft::ChannelStubProxy*)r1.channels.get())->sent_request_vote.size() == - 2); - ((aft::ChannelStubProxy*)r1.channels.get())->sent_request_vote.sort(by_0); - - DOCTEST_INFO("Node 2 receives the vote request"); - // pop for first node (node 0) so that it doesn't participate in the - // election - auto vote_req_from_1_to_0 = - ((aft::ChannelStubProxy*)r1.channels.get())->sent_request_vote.front(); - ((aft::ChannelStubProxy*)r1.channels.get())->sent_request_vote.pop_front(); - - DOCTEST_REQUIRE( - 1 == - dispatch_all_and_DOCTEST_CHECK( - nodes, - ((aft::ChannelStubProxy*)r1.channels.get())->sent_request_vote, - [](const auto& msg) { - DOCTEST_REQUIRE(msg.term == 2); - DOCTEST_REQUIRE(msg.term_of_last_committable_idx == 1); - DOCTEST_REQUIRE(msg.last_committable_idx == 2); - })); - - DOCTEST_INFO("Node 2 votes for Node 1, Node 0 is suspended"); - DOCTEST_REQUIRE( - 1 == - dispatch_all_and_DOCTEST_CHECK( - nodes, - ((aft::ChannelStubProxy*)r2.channels.get())->sent_request_vote_response, - [](const auto& msg) { - DOCTEST_REQUIRE(msg.term == 2); - DOCTEST_REQUIRE(msg.vote_granted); - })); - - DOCTEST_INFO("Node 1 is now leader"); - DOCTEST_REQUIRE(r1.is_leader()); - // pop Node 0's append entries - ((aft::ChannelStubProxy*)r1.channels.get()) - ->sent_append_entries.pop_front(); - DOCTEST_REQUIRE( - 1 == - dispatch_all_and_DOCTEST_CHECK( - nodes, - ((aft::ChannelStubProxy*)r1.channels.get())->sent_append_entries, - [](const auto& msg) { - DOCTEST_REQUIRE(msg.idx == 2); - DOCTEST_REQUIRE(msg.term == 2); - DOCTEST_REQUIRE(msg.prev_idx == 2); - DOCTEST_REQUIRE(msg.prev_term == 1); - DOCTEST_REQUIRE(msg.leader_commit_idx == 1); - })); - DOCTEST_REQUIRE( - ((aft::ChannelStubProxy*)r1.channels.get())->sent_append_entries.size() == - 0); - - DOCTEST_REQUIRE( - 1 == - dispatch_all( - nodes, - ((aft::ChannelStubProxy*)r2.channels.get()) - ->sent_append_entries_response)); - } - - DOCTEST_CHECK(r0.get_term() == 1); - DOCTEST_CHECK(r0.get_commit_idx() == 2); - DOCTEST_CHECK(r0.get_last_idx() == 2); - - DOCTEST_CHECK(r1.get_term() == 2); - DOCTEST_CHECK(r1.get_commit_idx() == 1); - DOCTEST_CHECK(r1.get_last_idx() == 2); - - DOCTEST_CHECK(r2.get_term() == 2); - DOCTEST_CHECK(r2.get_commit_idx() == 1); - DOCTEST_CHECK(r2.get_last_idx() == 2); - - DOCTEST_INFO("Node 1 resumes replication"); - { - DOCTEST_REQUIRE(r1.replicate(kv::BatchVector{{3, third_entry, true}}, 2)); - DOCTEST_REQUIRE(r1.ledger->ledger.size() == 3); - r1.periodic(ms(10)); - DOCTEST_REQUIRE( - ((aft::ChannelStubProxy*)r1.channels.get())->sent_append_entries.size() == - 2); - - // Nodes 0 and 2 receive append entries and respond - DOCTEST_REQUIRE( - 2 == - dispatch_all( - nodes, - ((aft::ChannelStubProxy*)r1.channels.get())->sent_append_entries)); - DOCTEST_REQUIRE( - ((aft::ChannelStubProxy*)r1.channels.get())->sent_append_entries.size() == - 0); - - DOCTEST_REQUIRE( - 1 == - dispatch_all_and_DOCTEST_CHECK( - nodes, - ((aft::ChannelStubProxy*)r2.channels.get()) - ->sent_append_entries_response, - [](const auto& msg) { DOCTEST_REQUIRE(msg.success); })); - - DOCTEST_REQUIRE( - 1 == - dispatch_all_and_DOCTEST_CHECK( - nodes, - ((aft::ChannelStubProxy*)r0.channels.get()) - ->sent_append_entries_response, - [](const auto& msg) { DOCTEST_REQUIRE(msg.success); })); - - DOCTEST_INFO("Another entry from Node 1 so that Node 2 can also compact"); - DOCTEST_REQUIRE(r1.replicate(kv::BatchVector{{4, third_entry, true}}, 2)); - DOCTEST_REQUIRE(r1.ledger->ledger.size() == 4); - r1.periodic(ms(10)); - DOCTEST_REQUIRE( - ((aft::ChannelStubProxy*)r1.channels.get())->sent_append_entries.size() == - 2); - - // Nodes 0 and 2 receive append entries - DOCTEST_REQUIRE( - 2 == - dispatch_all( - nodes, - ((aft::ChannelStubProxy*)r1.channels.get())->sent_append_entries)); - DOCTEST_REQUIRE( - ((aft::ChannelStubProxy*)r1.channels.get())->sent_append_entries.size() == - 0); - - DOCTEST_CHECK(r0.get_term() == 2); - DOCTEST_CHECK(r0.get_commit_idx() == 3); - DOCTEST_CHECK(r0.get_last_idx() == 4); - - DOCTEST_CHECK(r1.get_term() == 2); - DOCTEST_CHECK(r1.get_commit_idx() == 3); - DOCTEST_CHECK(r1.get_last_idx() == 4); - - DOCTEST_CHECK(r2.get_term() == 2); - DOCTEST_CHECK(r2.get_commit_idx() == 3); - DOCTEST_CHECK(r2.get_last_idx() == 4); - } } \ No newline at end of file diff --git a/tests/infra/network.py b/tests/infra/network.py index f16d991d9e03..ecf2eec08aa9 100644 --- a/tests/infra/network.py +++ b/tests/infra/network.py @@ -633,7 +633,7 @@ def wait_for_all_nodes_to_catch_up(self, primary, timeout=10): caught_up_nodes = [] for node in self.get_joined_nodes(): with node.client() as c: - c.get(f"/node/commit") + c.get("/node/commit") resp = c.get(f"/node/local_tx?view={view}&seqno={seqno}") if resp.status_code != 200: # Node may not have joined the network yet, try again diff --git a/tests/rotation.py b/tests/rotation.py index 707d18ac94ee..f531a7ba21f5 100644 --- a/tests/rotation.py +++ b/tests/rotation.py @@ -5,7 +5,6 @@ import infra.proc import suite.test_requirements as reqs import reconfiguration -import e2e_logging from loguru import logger as LOG From 06c67f2c929790772ebf9c767ad5b8abb01c7f95 Mon Sep 17 00:00:00 2001 From: Amaury Chamayou Date: Tue, 29 Sep 2020 14:08:07 +0100 Subject: [PATCH 19/25] missing file --- tests/committable.py | 63 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100644 tests/committable.py diff --git a/tests/committable.py b/tests/committable.py new file mode 100644 index 000000000000..63b5dec0ea95 --- /dev/null +++ b/tests/committable.py @@ -0,0 +1,63 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the Apache 2.0 License. +import infra.e2e_args +import infra.network +import infra.proc +import time + +from loguru import logger as LOG + + +def run(args): + hosts = ["localhost"] * 5 + + with infra.network.network( + hosts, args.binary_dir, args.debug_nodes, args.perf_nodes, pdb=args.pdb + ) as network: + network.start_and_join(args) + primary, backups = network.find_nodes() + + # Suspend three of the backups to prevent commit + backups[1].suspend() + backups[2].suspend() + backups[3].stop() + + txs = [] + # Run some transactions that can't be committed + with primary.client("user0") as uc: + for i in range(10): + txs.append( + uc.post("/app/log/private", {"id": 100 + i, "msg": "Hello world"}) + ) + + # Wait for a signature to ensure those transactions are committable + time.sleep(args.sig_tx_interval * 2 / 1000) + + # Kill the primary, restore other backups + primary.stop() + backups[1].resume() + backups[2].resume() + new_primary, new_term = network.wait_for_new_primary(primary.node_id) + LOG.debug(f"New primary is {new_primary.node_id} in term {new_term}") + assert new_primary.node_id == backups[0].node_id + + # Check that uncommitted but committsble suffix is preserved + with new_primary.client("user0") as uc: + check_commit = infra.checker.Checker(uc) + for tx in txs: + check_commit(tx) + + +if __name__ == "__main__": + + def add(parser): + parser.add_argument( + "-p", + "--package", + help="The enclave package to load (e.g., liblogging)", + default="liblogging", + ) + + args = infra.e2e_args.cli_args(add) + args.package = args.app_script and "liblua_generic" or "liblogging" + run(args) From f3d5b0b19b90d8354551e7c47b8034526f38bb81 Mon Sep 17 00:00:00 2001 From: Amaury Chamayou Date: Tue, 29 Sep 2020 16:54:05 +0100 Subject: [PATCH 20/25] wait for commit --- tests/memberclient.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/memberclient.py b/tests/memberclient.py index 4533440691ba..580a791baf29 100644 --- a/tests/memberclient.py +++ b/tests/memberclient.py @@ -57,7 +57,10 @@ def test_add_member(network, args): except infra.member.NoRecoveryShareFound as e: assert e.response.body.text() == "Only active members are given recovery shares" - new_member.ack(primary) + r = new_member.ack(primary) + with primary.client() as nc: + check_commit = infra.checker.Checker(nc) + check_commit(r) return network From 97c5a72ac0df6aefc4a61e192c85e3b83d7ddd43 Mon Sep 17 00:00:00 2001 From: Amaury Chamayou Date: Tue, 29 Sep 2020 17:06:29 +0100 Subject: [PATCH 21/25] comment --- src/consensus/aft/raft.h | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/consensus/aft/raft.h b/src/consensus/aft/raft.h index f77cebf32e9a..31f8d0191a0e 100644 --- a/src/consensus/aft/raft.h +++ b/src/consensus/aft/raft.h @@ -74,6 +74,12 @@ namespace aft ReplicaState replica_state; std::chrono::milliseconds timeout_elapsed; + // Last (committable) index preceding the node's election, this is + // used to decide when to start issuing signatures. While commit_idx + // hasn't caught up with election_index, a newly elected leader is + // effectively finishing establishing commit over the previous term + // or even previous terms, and can therefore not meaningfully sign + // over the commit level. kv::Version election_index = 0; // BFT From 0ab0ec5634d23b976b50c1ec302425eb26327240 Mon Sep 17 00:00:00 2001 From: Amaury Chamayou Date: Tue, 29 Sep 2020 17:09:51 +0100 Subject: [PATCH 22/25] diff --- src/node/history.h | 119 +++++++++++++++++++++++---------------------- 1 file changed, 61 insertions(+), 58 deletions(-) diff --git a/src/node/history.h b/src/node/history.h index 891215b4de1a..df348a094d67 100644 --- a/src/node/history.h +++ b/src/node/history.h @@ -676,72 +676,75 @@ namespace ccf return; } + // Signatures are only emitted when the consensus is establishing commit over + // the node's own transactions auto signable_txid = consensus->get_signable_txid(); - - if (signable_txid.has_value()) + if (!signable_txid.has_value()) { - auto commit_txid = signable_txid.value(); - auto txid = store.next_txid(); + return; + } - LOG_DEBUG_FMT( - "Signed at {} in view: {} commit was: {}.{}", - txid.version, - txid.term, - commit_txid.first, - commit_txid.second); + auto commit_txid = signable_txid.value(); + auto txid = store.next_txid(); - store.commit( - txid, - [txid, commit_txid, this]() { - kv::Tx sig(txid.version); - auto sig_view = sig.get_view(signatures); - crypto::Sha256Hash root = replicated_state_tree.get_root(); + LOG_DEBUG_FMT( + "Signed at {} in view: {} commit was: {}.{}", + txid.version, + txid.term, + commit_txid.first, + commit_txid.second); - Nonce hashed_nonce; - auto consensus = store.get_consensus(); - if (consensus != nullptr && consensus->type() == ConsensusType::BFT) - { - auto progress_tracker = store.get_progress_tracker(); - CCF_ASSERT( - progress_tracker != nullptr, "progress_tracker is not set"); - auto r = progress_tracker->record_primary( - txid.term, txid.version, id, root, hashed_nonce); - if (r != kv::TxHistory::Result::OK) - { - throw ccf::ccf_logic_error(fmt::format( - "Expected success when primary added signature to the " - "progress " - "tracker. r:{}, view:{}, seqno:{}", - r, - txid.term, - txid.version)); - } - - auto h = - progress_tracker->get_my_hashed_nonce(txid.term, txid.version); - std::copy(h.begin(), h.end(), hashed_nonce.begin()); - } - else + store.commit( + txid, + [txid, commit_txid, this]() { + kv::Tx sig(txid.version); + auto sig_view = sig.get_view(signatures); + crypto::Sha256Hash root = replicated_state_tree.get_root(); + + Nonce hashed_nonce; + auto consensus = store.get_consensus(); + if (consensus != nullptr && consensus->type() == ConsensusType::BFT) + { + auto progress_tracker = store.get_progress_tracker(); + CCF_ASSERT( + progress_tracker != nullptr, "progress_tracker is not set"); + auto r = progress_tracker->record_primary( + txid.term, txid.version, id, root, hashed_nonce); + if (r != kv::TxHistory::Result::OK) { - hashed_nonce.fill(0); + throw ccf::ccf_logic_error(fmt::format( + "Expected success when primary added signature to the " + "progress " + "tracker. r:{}, view:{}, seqno:{}", + r, + txid.term, + txid.version)); } - PrimarySignature sig_value( - id, - txid.version, - txid.term, - commit_txid.second, - commit_txid.first, - root, - hashed_nonce, - kp.sign_hash(root.h.data(), root.h.size()), - replicated_state_tree.serialise()); - - sig_view->put(0, sig_value); - return sig.commit_reserved(); - }, - true); - } + auto h = + progress_tracker->get_my_hashed_nonce(txid.term, txid.version); + std::copy(h.begin(), h.end(), hashed_nonce.begin()); + } + else + { + hashed_nonce.fill(0); + } + + PrimarySignature sig_value( + id, + txid.version, + txid.term, + commit_txid.second, + commit_txid.first, + root, + hashed_nonce, + kp.sign_hash(root.h.data(), root.h.size()), + replicated_state_tree.serialise()); + + sig_view->put(0, sig_value); + return sig.commit_reserved(); + }, + true); } std::vector get_receipt(kv::Version index) override From b43cea60ee10b5c24a47a565ee51fdcd7d1f02b2 Mon Sep 17 00:00:00 2001 From: Amaury Chamayou Date: Tue, 29 Sep 2020 17:16:09 +0100 Subject: [PATCH 23/25] Update tests/committable.py Co-authored-by: Eddy Ashton --- tests/committable.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/committable.py b/tests/committable.py index 63b5dec0ea95..909bf6b31cb9 100644 --- a/tests/committable.py +++ b/tests/committable.py @@ -41,7 +41,7 @@ def run(args): LOG.debug(f"New primary is {new_primary.node_id} in term {new_term}") assert new_primary.node_id == backups[0].node_id - # Check that uncommitted but committsble suffix is preserved + # Check that uncommitted but committable suffix is preserved with new_primary.client("user0") as uc: check_commit = infra.checker.Checker(uc) for tx in txs: From 281795c1d29281353ee069957a2e10108817be8b Mon Sep 17 00:00:00 2001 From: Amaury Chamayou Date: Tue, 29 Sep 2020 19:32:22 +0100 Subject: [PATCH 24/25] format --- src/node/history.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/node/history.h b/src/node/history.h index df348a094d67..4e944b8da567 100644 --- a/src/node/history.h +++ b/src/node/history.h @@ -676,8 +676,8 @@ namespace ccf return; } - // Signatures are only emitted when the consensus is establishing commit over - // the node's own transactions + // Signatures are only emitted when the consensus is establishing commit + // over the node's own transactions auto signable_txid = consensus->get_signable_txid(); if (!signable_txid.has_value()) { From c9d5ba98a7507c06091d03c467c951973f4e3d99 Mon Sep 17 00:00:00 2001 From: Amaury Chamayou Date: Wed, 30 Sep 2020 11:52:28 +0100 Subject: [PATCH 25/25] ignore stopped nodes in config check --- tests/reconfiguration.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/reconfiguration.py b/tests/reconfiguration.py index e50debe64c07..dc40f6d56b6a 100644 --- a/tests/reconfiguration.py +++ b/tests/reconfiguration.py @@ -20,12 +20,12 @@ def node_configs(network): return configs -def count_nodes(configs): +def count_nodes(configs, network): nodes = set(str(k) for k in configs.keys()) + stopped = {str(n.node_id) for n in network.nodes if n.is_stopped()} for node_id, node_config in configs.items(): - assert nodes == set( - node_config.keys() - ), f"{nodes} {set(node_config.keys())} {node_id}" + nodes_in_config = set(node_config.keys()) - stopped + assert nodes == nodes_in_config, f"{nodes} {nodes_in_config} {node_id}" return len(nodes) @@ -122,7 +122,7 @@ def test_retire_backup(network, args): @reqs.description("Retiring the primary") @reqs.can_kill_n_nodes(1) def test_retire_primary(network, args): - pre_count = count_nodes(node_configs(network)) + pre_count = count_nodes(node_configs(network), network) primary, backup = network.find_primary_and_any_backup() network.consortium.retire_node(primary, primary) @@ -130,7 +130,7 @@ def test_retire_primary(network, args): LOG.debug(f"New primary is {new_primary.node_id} in term {new_term}") check_can_progress(backup) network.nodes.remove(primary) - post_count = count_nodes(node_configs(network)) + post_count = count_nodes(node_configs(network), network) assert pre_count == post_count + 1 primary.stop() return network