From ab0134da4fe30b7caa92ea5a7a0c1186143b8a6d Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Thu, 13 Oct 2022 10:48:06 +0000 Subject: [PATCH 01/12] Initial implementation --- include/ccf/odata_error.h | 1 + include/ccf/rpc_context.h | 2 ++ src/node/rpc/frontend.h | 50 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 53 insertions(+) diff --git a/include/ccf/odata_error.h b/include/ccf/odata_error.h index 15bca958432f..e65be5b45f07 100644 --- a/include/ccf/odata_error.h +++ b/include/ccf/odata_error.h @@ -91,6 +91,7 @@ namespace ccf ERROR(PrimaryNotFound) ERROR(RequestAlreadyForwarded) ERROR(NodeNotRetiredCommitted) + ERROR(SessionConsistencyLost) // node-to-node (/join and /create): ERROR(ConsensusTypeMismatch) diff --git a/include/ccf/rpc_context.h b/include/ccf/rpc_context.h index e265bb3b24de..8b8ef51f6048 100644 --- a/include/ccf/rpc_context.h +++ b/include/ccf/rpc_context.h @@ -32,6 +32,8 @@ namespace ccf // Only set in the case of a forwarded RPC bool is_forwarded = false; + std::optional last_tx_id = std::nullopt; + SessionContext( size_t client_session_id_, const std::vector& caller_cert_, diff --git a/src/node/rpc/frontend.h b/src/node/rpc/frontend.h index 972fe62ccd31..87a090c0ce05 100644 --- a/src/node/rpc/frontend.h +++ b/src/node/rpc/frontend.h @@ -197,6 +197,36 @@ namespace ccf return true; } + bool session_consistency_maintained( + std::shared_ptr session_ctx) + { + if (session_ctx->last_tx_id.has_value()) + { + if (consensus == nullptr) + { + LOG_FAIL_FMT("consensus deleted after TxID assigned"); + return false; + } + + auto tx_id = session_ctx->last_tx_id.value(); + + const auto tx_view = consensus->get_view(tx_id.seqno); + const auto committed_seqno = consensus->get_committed_seqno(); + const auto committed_view = consensus->get_view(committed_seqno); + + auto tx_status = ccf::evaluate_tx_status( + tx_id.view, tx_id.seqno, tx_view, committed_view, committed_seqno); + // Pending and Committed obviously retain session consistency. It's also + // possible for the status to be Unknown - the last_tx_id was populated + // from a forwarded response about state this node doesn't yet know. + // Only once this Tx is known to be invalidated can we report that + // session consistency has been lost. + return tx_status != ccf::TxStatus::Invalid; + } + + return true; + } + std::unique_ptr get_authenticated_identity( std::shared_ptr ctx, kv::CommittableTx& tx, @@ -409,6 +439,24 @@ namespace ccf endpoints.execute_endpoint(endpoint, args); + // NB: Do this check after execution, to ensure this transaction has + // claimed a read version if it is going to. If we do it any earlier, + // it is possible that we report that the previous TxID is still + // valid, but invalidate it between that point and when this + // transaction executes. + if (!session_consistency_maintained(ctx->get_session_context())) + { + ctx->set_error( + HTTP_STATUS_INTERNAL_SERVER_ERROR, + ccf::errors::SessionConsistencyLost, + fmt::format( + "Previous transaction reported on this session ({}) is no " + "longer valid. 
Closing session", + ctx->get_session_context()->last_tx_id->to_str())); + // TODO: Ensure the session is actually closed after this response + return; + } + if (!ctx->should_apply_writes()) { update_metrics(ctx); @@ -464,6 +512,8 @@ namespace ccf auto tx_id = tx.get_txid(); if (tx_id.has_value() && consensus != nullptr) { + // TODO: Also set this on forwarding path + ctx->get_session_context()->last_tx_id = tx_id; try { // Only transactions that acquired one or more map handles From be9073e5698cad2ead2e21861202f273708a9aae Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Thu, 13 Oct 2022 14:47:12 +0000 Subject: [PATCH 02/12] Sketching a test --- src/node/rpc/frontend.h | 3 +- tests/infra/node.py | 9 ++- tests/partitions_test.py | 125 ++++++++++++++++++++++++++++++++++++--- 3 files changed, 123 insertions(+), 14 deletions(-) diff --git a/src/node/rpc/frontend.h b/src/node/rpc/frontend.h index 87a090c0ce05..c7a7c2bce290 100644 --- a/src/node/rpc/frontend.h +++ b/src/node/rpc/frontend.h @@ -451,9 +451,8 @@ namespace ccf ccf::errors::SessionConsistencyLost, fmt::format( "Previous transaction reported on this session ({}) is no " - "longer valid. Closing session", + "longer valid. Please start a new TLS session.", ctx->get_session_context()->last_tx_id->to_str())); - // TODO: Ensure the session is actually closed after this response return; } diff --git a/tests/infra/node.py b/tests/infra/node.py index b3d000189702..67f03ed26a16 100644 --- a/tests/infra/node.py +++ b/tests/infra/node.py @@ -591,6 +591,7 @@ def client( signing_identity=None, interface_name=infra.interfaces.PRIMARY_RPC_INTERFACE, verify_ca=True, + description=None, **kwargs, ): if self.network_state == NodeNetworkState.stopped: @@ -619,9 +620,11 @@ def client( akwargs["http2"] = True akwargs.update(self.session_auth(identity)) akwargs.update(self.signing_auth(signing_identity)) - akwargs[ - "description" - ] = f"[{self.local_node_id}|{identity or ''}|{signing_identity or ''}]" + description = ( + description + or f"{self.local_node_id}|{identity or ''}|{signing_identity or ''}" + ) + akwargs["description"] = f"[{description}]" akwargs.update(kwargs) if self.curl: diff --git a/tests/partitions_test.py b/tests/partitions_test.py index 90e367282605..7cefe78352eb 100644 --- a/tests/partitions_test.py +++ b/tests/partitions_test.py @@ -502,6 +502,112 @@ def test_forwarding_timeout(network, args): network.wait_for_primary_unanimity() +@reqs.description( + "Session consistency is provided, and inconsistencies result in errors after elections" +) +# @reqs.supports_methods("/app/log/private") +@reqs.at_least_n_nodes(3) +@reqs.no_http2() +def test_session_consistency(network, args): + primary, backups = network.find_nodes() + backup = backups[0] + + with contextlib.ExitStack() as stack: + client_primary_0 = stack.enter_context( + primary.client("user0", description="primary 0") + ) + client_primary_1 = stack.enter_context( + primary.client("user0", description="primary 1") + ) + client_backup_0 = stack.enter_context( + backup.client("user0", description="backup 0") + ) + client_backup_1 = stack.enter_context( + backup.client("user0", description="backup 1") + ) + + # Create some new state + msg_id = 42 + msg_a = "First write, to primary" + r = client_primary_0.post( + "/app/log/private", + { + "id": msg_id, + "msg": msg_a, + }, + ) + assert r.status_code == http.HTTPStatus.OK, r + + # Read this state on a second session + r = client_primary_1.get(f"/app/log/private?id={msg_id}") + assert r.status_code == http.HTTPStatus.OK, r + assert 
r.body.json()["msg"] == msg_a, r + + # Wait for that to be committed on backup + client_backup_0.wait_for_commit(r) + + # Write on backup, resulting in a forwarded request + # Confirm that this session can read that write, since it remains forwarded + # Meanwhile another session to the same node may not see it + # NB: The latter property is not possible to test systematically, as it + # relies on a race - does the read on the second session happen before consensus + # update's the backup's state. So we try in a loop + n_attempts = 20 + for i in range(n_attempts): + last_message = f"Second write, via backup ({i})" + r = client_backup_0.post( + "/app/log/private", + { + "id": msg_id, + "msg": last_message, + }, + ) + assert r.status_code == http.HTTPStatus.OK, r + + r = client_backup_1.get(f"/app/log/private?id={msg_id}") + assert r.status_code == http.HTTPStatus.OK, r + if r.body.json()["msg"] != last_message: + LOG.info( + f"Successfully saw a different value on second session after {i} attempts" + ) + break + else: + raise RuntimeError( + f"Failed to observe evidence of session forwarding after {n_attempts} attempts" + ) + + # Write on a partitioned primary, confirm that this session gets an error once an election has occurred + with network.partitioner.partition([primary]): + msg0 = "AA" + r0 = client_primary_0.post( + "/app/log/private", + { + "id": msg_id, + "msg": msg0, + }, + ) + + msg1 = "BB" + r1 = client_backup_0.get(f"/app/log/private?id={msg_id}") + + new_primary, new_view = network.wait_for_new_primary(primary, nodes=backups) + + LOG.warning("What happens now?") + client_primary_0.get("/node/commit") + client_primary_1.get("/node/commit") + client_backup_0.get("/node/commit") + client_backup_1.get("/node/commit") + + network.wait_for_primary_unanimity(min_view=new_view - 1) + LOG.warning("And after de-partition?") + client_primary_0.get("/node/commit") + client_primary_1.get("/node/commit") + client_backup_0.get("/node/commit") + client_backup_1.get("/node/commit") + + return network + + def run_2tx_reconfig_tests(args): if not args.include_2tx_reconfig: return @@ -538,14 +644,15 @@ def run(args): ) as network: network.start_and_open(args) - test_invalid_partitions(network, args) - test_partition_majority(network, args) - test_isolate_primary_from_one_backup(network, args) - test_new_joiner_helps_liveness(network, args) - for n in range(5): - test_isolate_and_reconnect_primary(network, args, iteration=n) - test_election_reconfiguration(network, args) - test_forwarding_timeout(network, args) + # test_invalid_partitions(network, args) + # test_partition_majority(network, args) + # test_isolate_primary_from_one_backup(network, args) + # test_new_joiner_helps_liveness(network, args) + # for n in range(5): + # test_isolate_and_reconnect_primary(network, args, iteration=n) + # test_election_reconfiguration(network, args) + # test_forwarding_timeout(network, args) + test_session_consistency(network, args) if __name__ == "__main__": @@ -563,4 +670,4 @@ def add(parser): args.package = "samples/apps/logging/liblogging" run(args) - run_2tx_reconfig_tests(args) + # run_2tx_reconfig_tests(args) From d791e3332682f4a6012a2cc77d1f124ad43a5bf6 Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Thu, 13 Oct 2022 16:41:00 +0000 Subject: [PATCH 03/12] Insert a way to record txid in forwarded responses --- src/enclave/endpoint.h | 3 +++ src/enclave/rpc_sessions.h | 1 + src/http/http_endpoint.h | 54 ++++++++++++++++++++++++++++++++++---- tests/partitions_test.py | 5 ++++ 4 files changed, 58 
insertions(+), 5 deletions(-) diff --git a/src/enclave/endpoint.h b/src/enclave/endpoint.h index 29a6f7e1ef55..ab8c238e47f5 100644 --- a/src/enclave/endpoint.h +++ b/src/enclave/endpoint.h @@ -2,6 +2,7 @@ // Licensed under the Apache 2.0 License. #pragma once +#include #include #include @@ -14,5 +15,7 @@ namespace ccf virtual void recv(const uint8_t* data, size_t size, sockaddr) = 0; virtual void send(std::vector&& data, sockaddr) = 0; + + virtual void record_response_txid(std::span raw_response) {} }; } diff --git a/src/enclave/rpc_sessions.h b/src/enclave/rpc_sessions.h index 9549e38908a4..6b11bfd96377 100644 --- a/src/enclave/rpc_sessions.h +++ b/src/enclave/rpc_sessions.h @@ -476,6 +476,7 @@ namespace ccf LOG_DEBUG_FMT("Replying to session {}", id); + search->second.second->record_response_txid({data.data(), data.size()}); search->second.second->send(std::move(data), {}); return true; } diff --git a/src/http/http_endpoint.h b/src/http/http_endpoint.h index e9f8aab031bd..fbe0664f2503 100644 --- a/src/http/http_endpoint.h +++ b/src/http/http_endpoint.h @@ -148,6 +148,15 @@ namespace http tls::ConnID session_id; ccf::ListenInterfaceID interface_id; + void update_session_ctx() + { + if (session_ctx == nullptr) + { + session_ctx = std::make_shared( + session_id, peer_cert(), interface_id); + } + } + public: HTTPServerEndpoint( std::shared_ptr rpc_map, @@ -174,6 +183,45 @@ namespace http send_raw(std::move(data)); } + void record_response_txid(std::span raw_response) override + { + // To avoid a full HTTP parse, search for the desired header directly + std::string_view response( + (char const*)raw_response.data(), raw_response.size()); + std::string_view target(http::headers::CCF_TX_ID); + auto header_begin = std::search( + response.begin(), response.end(), target.begin(), target.end()); + auto header_name_end = std::find(header_begin, response.end(), ':'); + auto header_value_end = std::find(header_name_end, response.end(), '\r'); + + if (header_value_end == response.end()) + { + // Response has no TxID header value - this is valid + return; + } + + std::string_view header_value( + header_name_end, header_value_end - header_name_end); + const auto leading_spaces = header_value.find_first_not_of(' '); + if (leading_spaces != std::string::npos) + { + header_value.remove_prefix(leading_spaces); + } + + auto tx_id = ccf::TxID::from_str(header_value); + if (!tx_id.has_value()) + { + LOG_FAIL_FMT( + "Error parsing TxID from response header: {}", + std::string_view(header_begin, header_value_end - header_begin)); + return; + } + + update_session_ctx(); + + session_ctx->last_tx_id = tx_id; + } + void handle_request( llhttp_method verb, const std::string_view& url, @@ -189,11 +237,7 @@ namespace http try { - if (session_ctx == nullptr) - { - session_ctx = std::make_shared( - session_id, peer_cert(), interface_id); - } + update_session_ctx(); std::shared_ptr rpc_ctx = nullptr; try diff --git a/tests/partitions_test.py b/tests/partitions_test.py index 7cefe78352eb..64f0490ab62e 100644 --- a/tests/partitions_test.py +++ b/tests/partitions_test.py @@ -598,6 +598,11 @@ def test_session_consistency(network, args): client_backup_0.get("/node/commit") client_backup_1.get("/node/commit") + # Goal: + # - p0 should get errors - it wrote during partition + # - p1 should be fine, it only read some early, still valid state + # - b0 should get errors - it got discarded data over forwarded channel + # - b1 should get errors - similar? 
network.wait_for_primary_unanimity(min_view=new_view - 1) LOG.warning("And after de-partition?") client_primary_0.get("/node/commit") From 1db29112dd2c2f6c34bc5bc72ce1f86058ec69e0 Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Thu, 13 Oct 2022 17:29:14 +0000 Subject: [PATCH 04/12] An expensive off-by-one error --- src/http/http_endpoint.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/http/http_endpoint.h b/src/http/http_endpoint.h index fbe0664f2503..97930163d9b7 100644 --- a/src/http/http_endpoint.h +++ b/src/http/http_endpoint.h @@ -200,8 +200,9 @@ namespace http return; } + auto after_name_end = std::next(header_name_end); std::string_view header_value( - header_name_end, header_value_end - header_name_end); + after_name_end, header_value_end - after_name_end); const auto leading_spaces = header_value.find_first_not_of(' '); if (leading_spaces != std::string::npos) { From 05ba86ba23c9e91c5fa4a318e40404971a521e6c Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Fri, 14 Oct 2022 14:13:05 +0000 Subject: [PATCH 05/12] Add helper method to resize network --- tests/infra/network.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/tests/infra/network.py b/tests/infra/network.py index 5714d5cc972c..48eeabfad6c7 100644 --- a/tests/infra/network.py +++ b/tests/infra/network.py @@ -986,6 +986,23 @@ def find_primary_and_any_backup(self, timeout=3): backup = random.choice(backups) return primary, backup + def resize(self, target_count, args): + node_count = len(self.get_joined_nodes()) + initial_node_count = node_count + LOG.info(f"Resizing network from {initial_node_count} to {target_count} nodes") + while node_count < target_count: + new_node = self.create_node("local://localhost") + self.join_node(new_node, args.package, args) + self.trust_node(new_node, args) + node_count += 1 + while node_count > target_count: + primary, backup = self.find_primary_and_any_backup() + self.retire_node(primary, backup) + node_count -= 1 + primary, _ = self.find_primary() + self.wait_for_all_nodes_to_commit(primary) + LOG.success(f"Resized network from {initial_node_count} to {target_count} nodes") + def wait_for_all_nodes_to_commit(self, primary=None, tx_id=None, timeout=10): """ Wait for all nodes to have joined the network and committed all transactions From 555a8b406f566a713c1460dcd69b71e6c083c4d8 Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Fri, 14 Oct 2022 14:16:14 +0000 Subject: [PATCH 06/12] A complete test --- tests/partitions_test.py | 88 +++++++++++++++++++++++++++++----------- 1 file changed, 64 insertions(+), 24 deletions(-) diff --git a/tests/partitions_test.py b/tests/partitions_test.py index 64f0490ab62e..1e31e14d4f11 100644 --- a/tests/partitions_test.py +++ b/tests/partitions_test.py @@ -503,12 +503,14 @@ def test_forwarding_timeout(network, args): @reqs.description( - "Session consistency is provided, and inconsistencies result in errors after elections" + "Session consistency is provided, and inconsistencies after elections are replaced by errors" ) -# @reqs.supports_methods("/app/log/private") -@reqs.at_least_n_nodes(3) +@reqs.supports_methods("/log/private") @reqs.no_http2() def test_session_consistency(network, args): + # Ensure we have 5 nodes + network.resize(5, args) + primary, backups = network.find_nodes() backup = backups[0] @@ -543,8 +545,8 @@ def test_session_consistency(network, args): assert r.status_code == http.HTTPStatus.OK, r assert r.body.json()["msg"] == msg_a, r - # Wait for that to be committed on backup - 
client_backup_0.wait_for_commit(r) + # Wait for that to be committed on all backups + network.wait_for_all_nodes_to_commit(primary) # Write on backup, resulting in a forwarded request # Confirm that this session can read that write, since it remains forwarded @@ -576,8 +578,24 @@ def test_session_consistency(network, args): f"Failed to observe evidence of session forwarding after {n_attempts} attempts" ) - # Write on a partitioned primary, confirm that this session gets an error once an election has occurred - with network.partitioner.partition([primary]): + def check_reads(sessions=None, should_error=False): + sessions = sessions or ( + client_primary_0, + client_primary_1, + client_backup_0, + client_backup_1, + ) + for client in sessions: + r = client.get(f"/app/log/private?id={msg_id}") + if should_error: + assert r.status_code == http.HTTPStatus.INTERNAL_SERVER_ERROR, r + assert r.body.json()["error"]["code"] == "SessionConsistencyLost", r + else: + assert r.status_code == http.HTTPStatus.OK, r + + # Partition primary and forwarding backup from other backups + with network.partitioner.partition([primary, backup]): + # Write on partitioned primary msg0 = "AA" r0 = client_primary_0.post( "/app/log/private", @@ -586,29 +604,50 @@ def test_session_consistency(network, args): "msg": msg0, }, ) + assert r0.status_code == http.HTTPStatus.OK - msg1 = "BB" + # Read from partitioned backup, over forwarded session to primary r1 = client_backup_0.get(f"/app/log/private?id={msg_id}") + assert r1.status_code == http.HTTPStatus.OK + + # At this point despite partition these nodes believe this new transaction + # is still valid, and will still report it, so there is no consistency error + check_reads( + # NB: Only use these sessions, or we 'pollute' the other sessions + # with recent TxIDs + (client_primary_0, client_backup_0), + False, + ) - new_primary, new_view = network.wait_for_new_primary(primary, nodes=backups) + # Even once CheckQuorum takes effect and the primary stands down, they + # know nothing contradictory so session consistency is maintained + primary.wait_for_leadership_state( + r0.view, "Candidate", timeout=2 * args.election_timeout_ms / 1000 + ) + check_reads( + (client_primary_0, client_backup_0), + False, + ) - LOG.warning("What happens now?") - client_primary_0.get("/node/commit") - client_primary_1.get("/node/commit") - client_backup_0.get("/node/commit") - client_backup_1.get("/node/commit") + # Ensure the majority partition have elected their own new primary + _, new_view = network.wait_for_new_primary(primary, nodes=backups[1:]) - # Goal: - # - p0 should get errors - it wrote during partition - # - p1 should be fine, it only read some early, still valid state - # - b0 should get errors - it got discarded data over forwarded channel - # - b1 should get errors - similar? + # Now the partition heals, and the partitioned primary and backup are brought + # back up-to-date. 
This causes them to discard the state produced while partitioned network.wait_for_primary_unanimity(min_view=new_view - 1) - LOG.warning("And after de-partition?") - client_primary_0.get("/node/commit") - client_primary_1.get("/node/commit") - client_backup_0.get("/node/commit") - client_backup_1.get("/node/commit") + + check_reads( + # These sessions saw discarded state, either directly from their target node + # or via forwarding, so now report consistency errors + (client_primary_0, client_backup_0), + True, + ) + check_reads( + # These sessions saw earlier state from before the partition, which remains + # valid in the new view. Their session consistency is maintained + (client_primary_1, client_backup_1), + False, + ) return network @@ -673,6 +712,7 @@ def add(parser): args = infra.e2e_args.cli_args(add) args.nodes = infra.e2e_args.min_nodes(args, f=1) args.package = "samples/apps/logging/liblogging" + args.snapshot_tx_interval = 20 run(args) # run_2tx_reconfig_tests(args) From f4f797271f3a4b403cf7a0244c9bc5bdb8310d4c Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Fri, 14 Oct 2022 14:37:01 +0000 Subject: [PATCH 07/12] Cleanup and tweak comment --- src/node/rpc/frontend.h | 60 +++++++++++++++++++++++++++------------- tests/infra/network.py | 1 + tests/partitions_test.py | 21 ++++++-------- 3 files changed, 51 insertions(+), 31 deletions(-) diff --git a/src/node/rpc/frontend.h b/src/node/rpc/frontend.h index c7a7c2bce290..a35122b76149 100644 --- a/src/node/rpc/frontend.h +++ b/src/node/rpc/frontend.h @@ -197,7 +197,14 @@ namespace ccf return true; } - bool session_consistency_maintained( + struct SessionConsistencyConflict + { + ccf::TxID session_tx_id; + ccf::TxID commit_tx_id; + ccf::TxStatus status; + }; + + std::optional check_session_consistency( std::shared_ptr session_ctx) { if (session_ctx->last_tx_id.has_value()) @@ -205,7 +212,7 @@ namespace ccf if (consensus == nullptr) { LOG_FAIL_FMT("consensus deleted after TxID assigned"); - return false; + return std::nullopt; } auto tx_id = session_ctx->last_tx_id.value(); @@ -219,12 +226,20 @@ namespace ccf // Pending and Committed obviously retain session consistency. It's also // possible for the status to be Unknown - the last_tx_id was populated // from a forwarded response about state this node doesn't yet know. - // Only once this Tx is known to be invalidated can we report that + // Only once this Tx is known to be invalidated do we report that // session consistency has been lost. - return tx_status != ccf::TxStatus::Invalid; + if (tx_status == ccf::TxStatus::Invalid) + { + SessionConsistencyConflict scc; + scc.session_tx_id = tx_id; + scc.commit_tx_id.view = committed_view; + scc.commit_tx_id.seqno = committed_seqno; + scc.status = tx_status; + return scc; + } } - return true; + return std::nullopt; } std::unique_ptr get_authenticated_identity( @@ -439,21 +454,28 @@ namespace ccf endpoints.execute_endpoint(endpoint, args); - // NB: Do this check after execution, to ensure this transaction has - // claimed a read version if it is going to. If we do it any earlier, - // it is possible that we report that the previous TxID is still - // valid, but invalidate it between that point and when this - // transaction executes. - if (!session_consistency_maintained(ctx->get_session_context())) { - ctx->set_error( - HTTP_STATUS_INTERNAL_SERVER_ERROR, - ccf::errors::SessionConsistencyLost, - fmt::format( - "Previous transaction reported on this session ({}) is no " - "longer valid. 
Please start a new TLS session.", - ctx->get_session_context()->last_tx_id->to_str())); - return; + // NB: Do this check after execution, to ensure this transaction has + // claimed a read version if it is going to. If we do it any + // earlier, it is possible that we report that the previous TxID is + // still valid, but invalidate it between that point and when this + // transaction executes. + const auto scc = + check_session_consistency(ctx->get_session_context()); + if (scc.has_value()) + { + ctx->set_error( + HTTP_STATUS_INTERNAL_SERVER_ERROR, + ccf::errors::SessionConsistencyLost, + fmt::format( + "Previously reported transaction ID {} on this session, but " + "this node is now at {}, making the previous transaction ID " + "{}. Please start a new TLS session.", + scc->session_tx_id.to_str(), + scc->commit_tx_id.to_str(), + tx_status_to_str(scc->status))); + return; + } } if (!ctx->should_apply_writes()) diff --git a/tests/infra/network.py b/tests/infra/network.py index 48eeabfad6c7..eceed9045b82 100644 --- a/tests/infra/network.py +++ b/tests/infra/network.py @@ -1002,6 +1002,7 @@ def resize(self, target_count, args): primary, _ = self.find_primary() self.wait_for_all_nodes_to_commit(primary) LOG.success(f"Resized network from {initial_node_count} to {target_count} nodes") + return initial_node_count def wait_for_all_nodes_to_commit(self, primary=None, tx_id=None, timeout=10): """ diff --git a/tests/partitions_test.py b/tests/partitions_test.py index 1e31e14d4f11..8974fbdbdd78 100644 --- a/tests/partitions_test.py +++ b/tests/partitions_test.py @@ -509,7 +509,7 @@ def test_forwarding_timeout(network, args): @reqs.no_http2() def test_session_consistency(network, args): # Ensure we have 5 nodes - network.resize(5, args) + original_size = network.resize(5, args) primary, backups = network.find_nodes() backup = backups[0] @@ -578,13 +578,7 @@ def test_session_consistency(network, args): f"Failed to observe evidence of session forwarding after {n_attempts} attempts" ) - def check_reads(sessions=None, should_error=False): - sessions = sessions or ( - client_primary_0, - client_primary_1, - client_backup_0, - client_backup_1, - ) + def check_session_consistency(sessions, should_error=False): for client in sessions: r = client.get(f"/app/log/private?id={msg_id}") if should_error: @@ -612,7 +606,7 @@ def check_reads(sessions=None, should_error=False): # At this point despite partition these nodes believe this new transaction # is still valid, and will still report it, so there is no consistency error - check_reads( + check_session_consistency( # NB: Only use these sessions, or we 'pollute' the other sessions # with recent TxIDs (client_primary_0, client_backup_0), @@ -624,7 +618,7 @@ def check_reads(sessions=None, should_error=False): primary.wait_for_leadership_state( r0.view, "Candidate", timeout=2 * args.election_timeout_ms / 1000 ) - check_reads( + check_session_consistency( (client_primary_0, client_backup_0), False, ) @@ -636,19 +630,22 @@ def check_reads(sessions=None, should_error=False): # back up-to-date. 
This causes them to discard the state produced while partitioned network.wait_for_primary_unanimity(min_view=new_view - 1) - check_reads( + check_session_consistency( # These sessions saw discarded state, either directly from their target node # or via forwarding, so now report consistency errors (client_primary_0, client_backup_0), True, ) - check_reads( + check_session_consistency( # These sessions saw earlier state from before the partition, which remains # valid in the new view. Their session consistency is maintained (client_primary_1, client_backup_1), False, ) + # Restore original network size + network.resize(original_size, args) + return network From c7edf25bb0e02a323cd9eb088ac3f65e6940d444 Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Fri, 14 Oct 2022 14:37:35 +0000 Subject: [PATCH 08/12] doot --- src/node/rpc/frontend.h | 1 - 1 file changed, 1 deletion(-) diff --git a/src/node/rpc/frontend.h b/src/node/rpc/frontend.h index a35122b76149..2a9a85a87c86 100644 --- a/src/node/rpc/frontend.h +++ b/src/node/rpc/frontend.h @@ -533,7 +533,6 @@ namespace ccf auto tx_id = tx.get_txid(); if (tx_id.has_value() && consensus != nullptr) { - // TODO: Also set this on forwarding path ctx->get_session_context()->last_tx_id = tx_id; try { From 4f59d36d863999a16c3d73ea25e3e54bfa9863f0 Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Fri, 14 Oct 2022 14:39:54 +0000 Subject: [PATCH 09/12] doot --- tests/infra/network.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/infra/network.py b/tests/infra/network.py index eceed9045b82..67e0fee703f8 100644 --- a/tests/infra/network.py +++ b/tests/infra/network.py @@ -1001,7 +1001,9 @@ def resize(self, target_count, args): node_count -= 1 primary, _ = self.find_primary() self.wait_for_all_nodes_to_commit(primary) - LOG.success(f"Resized network from {initial_node_count} to {target_count} nodes") + LOG.success( + f"Resized network from {initial_node_count} to {target_count} nodes" + ) return initial_node_count def wait_for_all_nodes_to_commit(self, primary=None, tx_id=None, timeout=10): From 82c60270f45e2bbab867cf9ab08fab12cb014e33 Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Mon, 17 Oct 2022 08:36:01 +0000 Subject: [PATCH 10/12] Oops --- tests/partitions_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/partitions_test.py b/tests/partitions_test.py index 8974fbdbdd78..559359919c5d 100644 --- a/tests/partitions_test.py +++ b/tests/partitions_test.py @@ -616,7 +616,7 @@ def check_session_consistency(sessions, should_error=False): # Even once CheckQuorum takes effect and the primary stands down, they # know nothing contradictory so session consistency is maintained primary.wait_for_leadership_state( - r0.view, "Candidate", timeout=2 * args.election_timeout_ms / 1000 + r0.view, "Follower", timeout=2 * args.election_timeout_ms / 1000 ) check_session_consistency( (client_primary_0, client_backup_0), From b160dcd28a4585d9190d3ebca328958a1ca498b5 Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Thu, 20 Oct 2022 08:08:56 +0000 Subject: [PATCH 11/12] Stricter search for txID header --- src/http/http_endpoint.h | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/src/http/http_endpoint.h b/src/http/http_endpoint.h index 97930163d9b7..15a15004bdf4 100644 --- a/src/http/http_endpoint.h +++ b/src/http/http_endpoint.h @@ -186,13 +186,19 @@ namespace http void record_response_txid(std::span raw_response) override { // To avoid a full HTTP parse, search for the desired header 
directly - std::string_view response( + const std::string_view response( (char const*)raw_response.data(), raw_response.size()); - std::string_view target(http::headers::CCF_TX_ID); - auto header_begin = std::search( + const std::string_view header_line_break = "\r\n"; + const auto target = + fmt::format("{}{}:", header_line_break, http::headers::CCF_TX_ID); + const auto header_begin = std::search( response.begin(), response.end(), target.begin(), target.end()); - auto header_name_end = std::find(header_begin, response.end(), ':'); - auto header_value_end = std::find(header_name_end, response.end(), '\r'); + const auto header_name_end = header_begin + target.length(); + const auto header_value_end = std::search( + header_name_end, + response.end(), + header_line_break.begin(), + header_line_break.end()); if (header_value_end == response.end()) { @@ -200,7 +206,7 @@ namespace http return; } - auto after_name_end = std::next(header_name_end); + const auto after_name_end = std::next(header_name_end); std::string_view header_value( after_name_end, header_value_end - after_name_end); const auto leading_spaces = header_value.find_first_not_of(' '); @@ -209,7 +215,7 @@ namespace http header_value.remove_prefix(leading_spaces); } - auto tx_id = ccf::TxID::from_str(header_value); + const auto tx_id = ccf::TxID::from_str(header_value); if (!tx_id.has_value()) { LOG_FAIL_FMT( From cee91ef9ae91425f3979abf36814ee1c3580a969 Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Thu, 20 Oct 2022 08:09:03 +0000 Subject: [PATCH 12/12] Restore disabled tests --- tests/partitions_test.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/tests/partitions_test.py b/tests/partitions_test.py index 559359919c5d..b3108249adc0 100644 --- a/tests/partitions_test.py +++ b/tests/partitions_test.py @@ -685,14 +685,14 @@ def run(args): ) as network: network.start_and_open(args) - # test_invalid_partitions(network, args) - # test_partition_majority(network, args) - # test_isolate_primary_from_one_backup(network, args) - # test_new_joiner_helps_liveness(network, args) - # for n in range(5): - # test_isolate_and_reconnect_primary(network, args, iteration=n) - # test_election_reconfiguration(network, args) - # test_forwarding_timeout(network, args) + test_invalid_partitions(network, args) + test_partition_majority(network, args) + test_isolate_primary_from_one_backup(network, args) + test_new_joiner_helps_liveness(network, args) + for n in range(5): + test_isolate_and_reconnect_primary(network, args, iteration=n) + test_election_reconfiguration(network, args) + test_forwarding_timeout(network, args) test_session_consistency(network, args) @@ -712,4 +712,4 @@ def add(parser): args.snapshot_tx_interval = 20 run(args) - # run_2tx_reconfig_tests(args) + run_2tx_reconfig_tests(args)
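
The client-side contract introduced by this series: once a session has observed a transaction that is later rolled back by an election, any further request on that session receives HTTP 500 with error code SessionConsistencyLost, and the caller is expected to discard the session and start a new TLS session. A minimal sketch of how a caller might honour that contract is below; it assumes a make_session factory with the same interface as node.client() in the test infrastructure above (status_code, body.json()), and the helper itself is illustrative rather than part of this patch series.

    import http

    SESSION_CONSISTENCY_LOST = "SessionConsistencyLost"


    def read_with_session_recovery(make_session, msg_id, attempts=2):
        # make_session is assumed to be a callable returning a context-managed
        # client with the same interface as node.client() above, e.g.
        # lambda: backup.client("user0"). That factory shape is illustrative.
        for _ in range(attempts):
            with make_session() as client:
                r = client.get(f"/app/log/private?id={msg_id}")
                if r.status_code == http.HTTPStatus.OK:
                    return r.body.json()

                error = r.body.json().get("error", {})
                if (
                    r.status_code == http.HTTPStatus.INTERNAL_SERVER_ERROR
                    and error.get("code") == SESSION_CONSISTENCY_LOST
                ):
                    # The TxID previously reported on this session is now
                    # Invalid on the serving node, so the session is unusable.
                    # Retry on a fresh session instead.
                    continue

                raise RuntimeError(f"Unexpected response: {r.status_code} {error}")

        raise RuntimeError(
            f"Could not re-establish a consistent session after {attempts} attempts"
        )

On a fresh session last_tx_id starts unset, so the first successful response re-establishes the baseline TxID that the server-side consistency check is evaluated against.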