Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Timeout idle node-to-node channels #5266

Merged
merged 9 commits into from
May 19, 2023
Merged
2 changes: 1 addition & 1 deletion .threading_canary
Original file line number Diff line number Diff line change
@@ -1 +1 @@
This looks like a "job" for Threading Canary!!!!!
THIS looks like a job for Threading Canary!!
3 changes: 3 additions & 0 deletions src/consensus/aft/test/logging_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,9 @@ namespace aft
}

// Empty overrides: this stub does nothing with the message limit, the idle
// timeout, or elapsed-time ticks.
void set_message_limit(size_t message_limit) override {}
void set_idle_timeout(std::chrono::milliseconds idle_timeout) override {}

void tick(std::chrono::milliseconds elapsed) override {}

bool recv_authenticated_with_load(
const ccf::NodeId& from, const uint8_t*& data, size_t& size) override
Expand Down
6 changes: 6 additions & 0 deletions src/enclave/enclave.h
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,12 @@ namespace ccf

node->set_n2n_message_limit(ccf_config_.node_to_node_message_limit);

// If we haven't heard from a node for multiple elections, then clean up
// their node-to-node channel
const auto idle_timeout =
std::chrono::milliseconds(ccf_config_.consensus.election_timeout) * 4;
node->set_n2n_idle_timeout(idle_timeout);

ccf::NodeCreateInfo r;
try
{
Expand Down
7 changes: 7 additions & 0 deletions src/node/node_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -1559,6 +1559,8 @@ namespace ccf
const auto tx_id = consensus->get_committed_txid();
indexer->update_strategies(elapsed, {tx_id.first, tx_id.second});
}

n2n_channels->tick(elapsed);
}

void tick_end()
Expand Down Expand Up @@ -2596,6 +2598,11 @@ namespace ccf
n2n_channels->set_message_limit(message_limit);
}

// Forwards the given idle timeout to the node-to-node channel manager
// (n2n_channels).
void set_n2n_idle_timeout(std::chrono::milliseconds idle_timeout)
{
n2n_channels->set_idle_timeout(idle_timeout);
}

virtual const StartupConfig& get_node_config() const override
{
return config;
Expand Down
3 changes: 3 additions & 0 deletions src/node/node_to_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,5 +141,8 @@ namespace ccf
size_t size) = 0;

virtual void set_message_limit(size_t message_limit) = 0;
// Sets the idle timeout for node-to-node channels. In
// NodeToNodeChannelManager, a channel whose accumulated idle time reaches
// this value is closed and removed on the next tick().
virtual void set_idle_timeout(std::chrono::milliseconds idle_timeout) = 0;

// Informs the implementation that `elapsed` time has passed since the
// previous call.
virtual void tick(std::chrono::milliseconds elapsed) = 0;
};
}
100 changes: 77 additions & 23 deletions src/node/node_to_node_channel_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@ namespace ccf
ringbuffer::AbstractWriterFactory& writer_factory;
ringbuffer::WriterPtr to_host;

std::unordered_map<NodeId, std::shared_ptr<Channel>> channels;
// Pairs a channel with the time accumulated since it was last handed out.
struct ChannelInfo
{
std::shared_ptr<Channel> channel;
// Reset to zero each time get_channel() returns this channel, and advanced
// by the elapsed time on every tick().
std::chrono::milliseconds idle_time;
};

std::unordered_map<NodeId, ChannelInfo> channels;
ccf::pal::Mutex lock; //< Protects access to channels map

struct ThisNode
Expand All @@ -36,6 +42,10 @@ namespace ccf
std::nullopt;
#endif

// This is set during node startup, using a value derived from the run-time
// configuration. Before that, no timeout applies
std::optional<std::chrono::milliseconds> idle_timeout = std::nullopt;

std::shared_ptr<Channel> get_channel(const NodeId& peer_id)
{
CCF_ASSERT_FMT(
Expand All @@ -55,21 +65,23 @@ namespace ccf
auto search = channels.find(peer_id);
if (search != channels.end())
{
return search->second;
auto& channel_info = search->second;
channel_info.idle_time = std::chrono::milliseconds(0);
return channel_info.channel;
}

// Create channel
channels.try_emplace(
auto channel = std::make_shared<Channel>(
writer_factory,
this_node->service_cert,
this_node->node_kp,
this_node->endorsed_node_cert.value(),
this_node->node_id,
peer_id,
std::make_shared<Channel>(
writer_factory,
this_node->service_cert,
this_node->node_kp,
this_node->endorsed_node_cert.value(),
this_node->node_id,
peer_id,
message_limit.value()));
return channels.at(peer_id);
message_limit.value());
auto info = ChannelInfo{channel, std::chrono::milliseconds(0)};
channels.try_emplace(peer_id, info);
return channel;
}

public:
Expand Down Expand Up @@ -116,6 +128,41 @@ namespace ccf
message_limit = message_limit_;
}

// Stores the threshold that tick() compares each channel's idle time
// against. While no value has been set, tick() closes nothing.
// NOTE(review): this write does not take `lock`, whereas tick() reads
// idle_timeout while holding it - confirm this is only called before
// ticking begins.
void set_idle_timeout(std::chrono::milliseconds idle_timeout_) override
{
idle_timeout = idle_timeout_;
}

// Advances the idle clock of every known channel by `elapsed`, then closes
// and forgets each channel whose idle time has reached the configured
// timeout. Does nothing (beyond taking the lock) if no timeout has been set.
void tick(std::chrono::milliseconds elapsed) override
{
  std::lock_guard<ccf::pal::Mutex> guard(lock);

  if (!idle_timeout.has_value())
  {
    // No timeout configured yet, so channels are never considered idle
    return;
  }

  for (auto entry = channels.begin(); entry != channels.end();)
  {
    auto& info = entry->second;
    const auto idle_time = (info.idle_time += elapsed);

    if (idle_time >= idle_timeout.value())
    {
      LOG_DEBUG_FMT(
        "Closing idle channel to node {}. Was idle for {}, threshold for "
        "closure is {}",
        entry->first,
        idle_time,
        idle_timeout.value());
      info.channel->close_channel();
      // erase() hands back the next valid iterator, so iteration continues
      // safely after the removal
      entry = channels.erase(entry);
    }
    else
    {
      ++entry;
    }
  }
}

virtual void associate_node_address(
const NodeId& peer_id,
const std::string& peer_hostname,
Expand All @@ -129,22 +176,12 @@ namespace ccf
peer_service);
}

// Closes the channel to peer_id.
// NOTE(review): the lookup goes through get_channel(), which creates a
// channel when none exists, and the entry stays in `channels` afterwards.
void close_channel(const NodeId& peer_id) override
{
get_channel(peer_id)->close_channel();
}

// Returns true if a channel to `nid` is currently tracked. This is a pure
// lookup under the lock: it does not go through get_channel(), so it never
// creates a channel.
bool have_channel(const ccf::NodeId& nid) override
{
  std::lock_guard<ccf::pal::Mutex> guard(lock);
  const bool present = channels.count(nid) != 0;
  return present;
}

// Returns Channel::channel_open() for the channel to peer_id.
// The lookup goes through get_channel(), so a missing channel is created
// and an existing channel has its idle time reset.
bool channel_open(const NodeId& peer_id)
{
return get_channel(peer_id)->channel_open();
}

bool send_authenticated(
const NodeId& to,
NodeMsgType type,
Expand Down Expand Up @@ -232,10 +269,27 @@ namespace ccf
return get_channel(from)->recv_key_exchange_message(data, size);
}

// NB: Only used by tests!
// NB: Following methods are only used by tests!
// Convenience overload taking the message body as a vector; forwards its
// data pointer and size to the pointer/size overload.
bool recv_channel_message(const NodeId& from, std::vector<uint8_t>&& body)
{
return recv_channel_message(from, body.data(), body.size());
}

// Closes the channel to `peer_id` and drops it from the map. If no channel
// to that peer is tracked, this is a no-op (no channel is created).
void close_channel(const NodeId& peer_id) override
{
  std::lock_guard<ccf::pal::Mutex> guard(lock);

  const auto found = channels.find(peer_id);
  if (found == channels.end())
  {
    // Nothing to close for this peer
    return;
  }

  found->second.channel->close_channel();
  channels.erase(found);
}

// Returns Channel::channel_open() for the channel to peer_id.
// The lookup goes through get_channel(), so a missing channel is created
// and an existing channel has its idle time reset.
bool channel_open(const NodeId& peer_id)
{
return get_channel(peer_id)->channel_open();
}
};
}
147 changes: 147 additions & 0 deletions src/node/test/channels.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1490,3 +1490,150 @@ TEST_CASE_FIXTURE(IORingbuffersFixture, "Key rotation")
REQUIRE(received_by_1 == expected_received_by_1);
REQUIRE(received_by_2 == expected_received_by_2);
}

// Checks idle-channel expiry in NodeToNodeChannelManager: unused channels are
// dropped once ticks accumulate to the idle timeout, sending or receiving
// keeps a channel alive, and a channel that goes quiet again is dropped.
TEST_CASE_FIXTURE(IORingbuffersFixture, "Timeout idle channels")
{
auto network_kp = crypto::make_key_pair(default_curve);
auto service_cert = generate_self_signed_cert(network_kp, "CN=Network");

auto channel1_kp = crypto::make_key_pair(default_curve);
auto channel1_cert =
generate_endorsed_cert(channel1_kp, "CN=Node1", network_kp, service_cert);

auto channel2_kp = crypto::make_key_pair(default_curve);
auto channel2_cert =
generate_endorsed_cert(channel2_kp, "CN=Node2", network_kp, service_cert);

const auto idle_timeout = std::chrono::milliseconds(10);
// Roughly two thirds of the timeout: one tick of this size stays below the
// timeout, two consecutive ticks reach it.
const auto not_quite_idle = 2 * idle_timeout / 3;

auto channels1 = NodeToNodeChannelManager(wf1);
channels1.initialize(nid1, service_cert, channel1_kp, channel1_cert);
channels1.set_idle_timeout(idle_timeout);

auto channels2 = NodeToNodeChannelManager(wf2);
channels2.initialize(nid2, service_cert, channel2_kp, channel2_cert);
channels2.set_idle_timeout(idle_timeout);

MsgType msg;
msg.fill(0x42);

{
INFO("Idle channels are destroyed");
// Neither manager has a channel until it first sends to the peer
REQUIRE_FALSE(channels1.have_channel(nid2));
REQUIRE(channels1.send_authenticated(
nid2, NodeMsgType::consensus_msg, msg.begin(), msg.size()));

REQUIRE_FALSE(channels2.have_channel(nid1));
REQUIRE(channels2.send_authenticated(
nid1, NodeMsgType::consensus_msg, msg.begin(), msg.size()));

REQUIRE(channels1.have_channel(nid2));
REQUIRE(channels2.have_channel(nid1));

// First tick on manager 1 is below the timeout: both channels survive
channels1.tick(not_quite_idle);
REQUIRE(channels1.have_channel(nid2));
REQUIRE(channels2.have_channel(nid1));

// Second tick pushes manager 1 past the timeout; manager 2 has not been
// ticked, so its channel is unaffected
channels1.tick(not_quite_idle);
REQUIRE_FALSE(channels1.have_channel(nid2));
REQUIRE(channels2.have_channel(nid1));

// A single tick of the full timeout is enough to expire manager 2's channel
channels2.tick(idle_timeout);
REQUIRE_FALSE(channels1.have_channel(nid2));
REQUIRE_FALSE(channels2.have_channel(nid1));

// Flush previous messages
read_outbound_msgs<MsgType>(eio1);
read_outbound_msgs<MsgType>(eio2);
}

// Send some messages from 1 to 2. Confirm that those keep the channel (on
// both ends) from being destroyed
bool handshake_complete = false;

for (size_t i = 0; i < 20; ++i)
{
REQUIRE(channels1.send_authenticated(
nid2, NodeMsgType::consensus_msg, msg.begin(), msg.size()));

// Deliver everything node 1 emitted to node 2
auto msgs = read_outbound_msgs<MsgType>(eio1);
// NOTE: the loop variable below shadows the outer `msg` payload
for (const auto& msg : msgs)
{
switch (msg.type)
{
case NodeMsgType::channel_msg:
{
channels2.recv_channel_message(msg.from, msg.data());
break;
}
case NodeMsgType::consensus_msg:
{
auto hdr = msg.authenticated_hdr;
const auto* data = msg.payload.data();
auto size = msg.payload.size();

REQUIRE(channels2.recv_authenticated(
msg.from, {hdr.data(), hdr.size()}, data, size));
break;
}
default:
{
// No other message type is expected from node 1 in this test
REQUIRE(false);
}
}
}

if (!handshake_complete)
{
// Deliver any responses from 2 to 1, to complete handshake
msgs = read_outbound_msgs<MsgType>(eio2);
if (msgs.empty())
{
// Node 2 had nothing further to send: treat the handshake as finished
handshake_complete = true;
}
else
{
for (const auto& msg : msgs)
{
switch (msg.type)
{
case NodeMsgType::channel_msg:
{
channels1.recv_channel_message(msg.from, msg.data());
break;
}
default:
{
// Only channel messages are expected from node 2 here
REQUIRE(false);
}
}
}
}
}

{
INFO("Sends preserve channels");
REQUIRE(channels1.have_channel(nid2));
}

{
INFO("Receives preserve channels");
REQUIRE(channels2.have_channel(nid1));
}

// One sub-timeout tick per iteration on each side; the traffic at the top of
// the next iteration is what the assertions above expect to keep the
// channels alive
channels1.tick(not_quite_idle);
channels2.tick(not_quite_idle);
}

REQUIRE(handshake_complete);

{
INFO("After comms, channels may still close due to idleness");
// The loop ended with one tick on each side and no traffic since, so one
// more tick on each side reaches the timeout
channels1.tick(not_quite_idle);
REQUIRE_FALSE(channels1.have_channel(nid2));

channels2.tick(not_quite_idle);
REQUIRE_FALSE(channels2.have_channel(nid1));
}
}