diff --git a/.threading_canary b/.threading_canary index a286117986a1..bc32cd175b55 100644 --- a/.threading_canary +++ b/.threading_canary @@ -1 +1 @@ -This looks like a "job" for Threading Canary!!!!! +THIS looks like a job for Threading Canary!! diff --git a/src/consensus/aft/test/logging_stub.h b/src/consensus/aft/test/logging_stub.h index 047ef4853d8d..08fd1c42b238 100644 --- a/src/consensus/aft/test/logging_stub.h +++ b/src/consensus/aft/test/logging_stub.h @@ -253,6 +253,9 @@ namespace aft } void set_message_limit(size_t message_limit) override {} + void set_idle_timeout(std::chrono::milliseconds idle_timeout) override {} + + void tick(std::chrono::milliseconds elapsed) override {} bool recv_authenticated_with_load( const ccf::NodeId& from, const uint8_t*& data, size_t& size) override diff --git a/src/enclave/enclave.h b/src/enclave/enclave.h index 2de95d87b964..b1e7524e199e 100644 --- a/src/enclave/enclave.h +++ b/src/enclave/enclave.h @@ -214,6 +214,12 @@ namespace ccf node->set_n2n_message_limit(ccf_config_.node_to_node_message_limit); + // If we haven't heard from a node for multiple elections, then cleanup + // their node-to-node channel + const auto idle_timeout = + std::chrono::milliseconds(ccf_config_.consensus.election_timeout) * 4; + node->set_n2n_idle_timeout(idle_timeout); + ccf::NodeCreateInfo r; try { diff --git a/src/node/node_state.h b/src/node/node_state.h index 9b714e433d18..0a7467a1962a 100644 --- a/src/node/node_state.h +++ b/src/node/node_state.h @@ -1559,6 +1559,8 @@ namespace ccf const auto tx_id = consensus->get_committed_txid(); indexer->update_strategies(elapsed, {tx_id.first, tx_id.second}); } + + n2n_channels->tick(elapsed); } void tick_end() @@ -2596,6 +2598,11 @@ namespace ccf n2n_channels->set_message_limit(message_limit); } + void set_n2n_idle_timeout(std::chrono::milliseconds idle_timeout) + { + n2n_channels->set_idle_timeout(idle_timeout); + } + virtual const StartupConfig& get_node_config() const override { return config; diff --git a/src/node/node_to_node.h b/src/node/node_to_node.h index bb1b42499b26..6e2af37cbc4b 100644 --- a/src/node/node_to_node.h +++ b/src/node/node_to_node.h @@ -141,5 +141,8 @@ namespace ccf size_t size) = 0; virtual void set_message_limit(size_t message_limit) = 0; + virtual void set_idle_timeout(std::chrono::milliseconds idle_timeout) = 0; + + virtual void tick(std::chrono::milliseconds elapsed) = 0; }; } diff --git a/src/node/node_to_node_channel_manager.h b/src/node/node_to_node_channel_manager.h index 1e1b62de5bbf..414f0b09b5f2 100644 --- a/src/node/node_to_node_channel_manager.h +++ b/src/node/node_to_node_channel_manager.h @@ -14,7 +14,13 @@ namespace ccf ringbuffer::AbstractWriterFactory& writer_factory; ringbuffer::WriterPtr to_host; - std::unordered_map> channels; + struct ChannelInfo + { + std::shared_ptr channel; + std::chrono::milliseconds idle_time; + }; + + std::unordered_map channels; ccf::pal::Mutex lock; //< Protects access to channels map struct ThisNode @@ -36,6 +42,10 @@ namespace ccf std::nullopt; #endif + // This is set during node startup, using a value derived from the run-time + // configuration. Before that, no timeout applies + std::optional idle_timeout = std::nullopt; + std::shared_ptr get_channel(const NodeId& peer_id) { CCF_ASSERT_FMT( @@ -55,21 +65,23 @@ namespace ccf auto search = channels.find(peer_id); if (search != channels.end()) { - return search->second; + auto& channel_info = search->second; + channel_info.idle_time = std::chrono::milliseconds(0); + return channel_info.channel; } // Create channel - channels.try_emplace( + auto channel = std::make_shared( + writer_factory, + this_node->service_cert, + this_node->node_kp, + this_node->endorsed_node_cert.value(), + this_node->node_id, peer_id, - std::make_shared( - writer_factory, - this_node->service_cert, - this_node->node_kp, - this_node->endorsed_node_cert.value(), - this_node->node_id, - peer_id, - message_limit.value())); - return channels.at(peer_id); + message_limit.value()); + auto info = ChannelInfo{channel, std::chrono::milliseconds(0)}; + channels.try_emplace(peer_id, info); + return channel; } public: @@ -116,6 +128,41 @@ namespace ccf message_limit = message_limit_; } + void set_idle_timeout(std::chrono::milliseconds idle_timeout_) override + { + idle_timeout = idle_timeout_; + } + + void tick(std::chrono::milliseconds elapsed) override + { + std::lock_guard guard(lock); + + if (idle_timeout.has_value()) + { + // Close idle channels + auto it = channels.begin(); + while (it != channels.end()) + { + const auto idle_time = it->second.idle_time += elapsed; + if (idle_time < idle_timeout.value()) + { + ++it; + } + else + { + LOG_DEBUG_FMT( + "Closing idle channel to node {}. Was idle for {}, threshold for " + "closure is {}", + it->first, + idle_time, + idle_timeout.value()); + it->second.channel->close_channel(); + it = channels.erase(it); + } + } + } + } + virtual void associate_node_address( const NodeId& peer_id, const std::string& peer_hostname, @@ -129,22 +176,12 @@ namespace ccf peer_service); } - void close_channel(const NodeId& peer_id) override - { - get_channel(peer_id)->close_channel(); - } - bool have_channel(const ccf::NodeId& nid) override { std::lock_guard guard(lock); return channels.find(nid) != channels.end(); } - bool channel_open(const NodeId& peer_id) - { - return get_channel(peer_id)->channel_open(); - } - bool send_authenticated( const NodeId& to, NodeMsgType type, @@ -232,10 +269,27 @@ namespace ccf return get_channel(from)->recv_key_exchange_message(data, size); } - // NB: Only used by tests! + // NB: Following methods are only used by tests! bool recv_channel_message(const NodeId& from, std::vector&& body) { return recv_channel_message(from, body.data(), body.size()); } + + void close_channel(const NodeId& peer_id) override + { + std::lock_guard guard(lock); + + auto search = channels.find(peer_id); + if (search != channels.end()) + { + search->second.channel->close_channel(); + channels.erase(search); + } + } + + bool channel_open(const NodeId& peer_id) + { + return get_channel(peer_id)->channel_open(); + } }; } diff --git a/src/node/test/channels.cpp b/src/node/test/channels.cpp index e5a871501f2e..cba3ee51004c 100644 --- a/src/node/test/channels.cpp +++ b/src/node/test/channels.cpp @@ -1490,3 +1490,150 @@ TEST_CASE_FIXTURE(IORingbuffersFixture, "Key rotation") REQUIRE(received_by_1 == expected_received_by_1); REQUIRE(received_by_2 == expected_received_by_2); } + +TEST_CASE_FIXTURE(IORingbuffersFixture, "Timeout idle channels") +{ + auto network_kp = crypto::make_key_pair(default_curve); + auto service_cert = generate_self_signed_cert(network_kp, "CN=Network"); + + auto channel1_kp = crypto::make_key_pair(default_curve); + auto channel1_cert = + generate_endorsed_cert(channel1_kp, "CN=Node1", network_kp, service_cert); + + auto channel2_kp = crypto::make_key_pair(default_curve); + auto channel2_cert = + generate_endorsed_cert(channel2_kp, "CN=Node2", network_kp, service_cert); + + const auto idle_timeout = std::chrono::milliseconds(10); + const auto not_quite_idle = 2 * idle_timeout / 3; + + auto channels1 = NodeToNodeChannelManager(wf1); + channels1.initialize(nid1, service_cert, channel1_kp, channel1_cert); + channels1.set_idle_timeout(idle_timeout); + + auto channels2 = NodeToNodeChannelManager(wf2); + channels2.initialize(nid2, service_cert, channel2_kp, channel2_cert); + channels2.set_idle_timeout(idle_timeout); + + MsgType msg; + msg.fill(0x42); + + { + INFO("Idle channels are destroyed"); + REQUIRE_FALSE(channels1.have_channel(nid2)); + REQUIRE(channels1.send_authenticated( + nid2, NodeMsgType::consensus_msg, msg.begin(), msg.size())); + + REQUIRE_FALSE(channels2.have_channel(nid1)); + REQUIRE(channels2.send_authenticated( + nid1, NodeMsgType::consensus_msg, msg.begin(), msg.size())); + + REQUIRE(channels1.have_channel(nid2)); + REQUIRE(channels2.have_channel(nid1)); + + channels1.tick(not_quite_idle); + REQUIRE(channels1.have_channel(nid2)); + REQUIRE(channels2.have_channel(nid1)); + + channels1.tick(not_quite_idle); + REQUIRE_FALSE(channels1.have_channel(nid2)); + REQUIRE(channels2.have_channel(nid1)); + + channels2.tick(idle_timeout); + REQUIRE_FALSE(channels1.have_channel(nid2)); + REQUIRE_FALSE(channels2.have_channel(nid1)); + + // Flush previous messages + read_outbound_msgs(eio1); + read_outbound_msgs(eio2); + } + + // Send some messages from 1 to 2. Confirm that those keep the channel (on + // both ends) from being destroyed + bool handshake_complete = false; + + for (size_t i = 0; i < 20; ++i) + { + REQUIRE(channels1.send_authenticated( + nid2, NodeMsgType::consensus_msg, msg.begin(), msg.size())); + + auto msgs = read_outbound_msgs(eio1); + for (const auto& msg : msgs) + { + switch (msg.type) + { + case NodeMsgType::channel_msg: + { + channels2.recv_channel_message(msg.from, msg.data()); + break; + } + case NodeMsgType::consensus_msg: + { + auto hdr = msg.authenticated_hdr; + const auto* data = msg.payload.data(); + auto size = msg.payload.size(); + + REQUIRE(channels2.recv_authenticated( + msg.from, {hdr.data(), hdr.size()}, data, size)); + break; + } + default: + { + REQUIRE(false); + } + } + } + + if (!handshake_complete) + { + // Deliver any responses from 2 to 1, to complete handshake + msgs = read_outbound_msgs(eio2); + if (msgs.empty()) + { + handshake_complete = true; + } + else + { + for (const auto& msg : msgs) + { + switch (msg.type) + { + case NodeMsgType::channel_msg: + { + channels1.recv_channel_message(msg.from, msg.data()); + break; + } + default: + { + REQUIRE(false); + } + } + } + } + } + + { + INFO("Sends preserve channels"); + REQUIRE(channels1.have_channel(nid2)); + } + + { + INFO("Receives preserve channels"); + REQUIRE(channels2.have_channel(nid1)); + } + + channels1.tick(not_quite_idle); + channels2.tick(not_quite_idle); + } + + REQUIRE(handshake_complete); + + { + INFO("After comms, channels may still close due to idleness"); + channels1.tick(not_quite_idle); + REQUIRE_FALSE(channels1.have_channel(nid2)); + + channels2.tick(not_quite_idle); + REQUIRE_FALSE(channels2.have_channel(nid1)); + } +}