Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filter channels with spare capacity when broadcasting #4848

Merged
merged 5 commits into from
Feb 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions nano/core_test/active_elections.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -971,7 +971,7 @@ TEST (active_elections, fork_replacement_tally)
node_config.peering_port = system.get_available_port ();
auto & node2 (*system.add_node (node_config));
node1.network.filter.clear ();
node2.network.flood_block (send_last, nano::transport::traffic_type::test);
ASSERT_TRUE (node2.network.flood_block (send_last, nano::transport::traffic_type::test));
ASSERT_TIMELY (3s, node1.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in) > 0);

// Correct block without votes is ignored
Expand All @@ -985,7 +985,7 @@ TEST (active_elections, fork_replacement_tally)
// ensure vote arrives before the block
ASSERT_TIMELY_EQ (5s, 1, node1.vote_cache.find (send_last->hash ()).size ());
node1.network.filter.clear ();
node2.network.flood_block (send_last, nano::transport::traffic_type::test);
ASSERT_TRUE (node2.network.flood_block (send_last, nano::transport::traffic_type::test));
ASSERT_TIMELY (5s, node1.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in) > 1);

// the send_last block should replace one of the existing block of the election because it has higher vote weight
Expand Down
22 changes: 20 additions & 2 deletions nano/core_test/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,8 @@ TEST (network, send_discarded_publish)
.build ();
{
auto transaction = node1.ledger.tx_begin_read ();
node1.network.flood_block (block, nano::transport::traffic_type::test);
auto sent = node1.network.flood_block (block, nano::transport::traffic_type::test);
ASSERT_EQ (1, sent);
ASSERT_EQ (nano::dev::genesis->hash (), node1.ledger.any.account_head (transaction, nano::dev::genesis_key.pub));
ASSERT_EQ (nano::dev::genesis->hash (), node2.latest (nano::dev::genesis_key.pub));
}
Expand All @@ -221,7 +222,8 @@ TEST (network, send_invalid_publish)
.build ();
{
auto transaction = node1.ledger.tx_begin_read ();
node1.network.flood_block (block, nano::transport::traffic_type::test);
auto sent = node1.network.flood_block (block, nano::transport::traffic_type::test);
ASSERT_EQ (1, sent);
ASSERT_EQ (nano::dev::genesis->hash (), node1.ledger.any.account_head (transaction, nano::dev::genesis_key.pub));
ASSERT_EQ (nano::dev::genesis->hash (), node2.latest (nano::dev::genesis_key.pub));
}
Expand Down Expand Up @@ -1149,3 +1151,19 @@ TEST (network, purge_dead_channel_remote)
};
ASSERT_TIMELY (5s, !channel_exists (node2, channel));
}

TEST (network, flood_vote)
{
nano::test::system system{ 4 };

auto & node = *system.nodes[0];

// Make one of the nodes a representative
system.wallet (1)->insert_adhoc (nano::dev::genesis_key.prv);
ASSERT_TIMELY_EQ (5s, node.rep_crawler.representative_count (), 1);

auto vote = nano::test::make_vote (nano::dev::genesis_key, { nano::dev::genesis->hash () });
ASSERT_EQ (3, node.network.flood_vote_rebroadcasted (vote, 999.0f));
ASSERT_EQ (2, node.network.flood_vote_non_pr (vote, 999.0f));
ASSERT_EQ (1, node.network.flood_vote_pr (vote));
}
4 changes: 2 additions & 2 deletions nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2238,11 +2238,11 @@ TEST (node, DISABLED_fork_invalid_block_signature)
node1.process_active (send1);
ASSERT_TIMELY (5s, node1.block (send1->hash ()));
// Send the vote with the corrupt block signature
node2.network.flood_vote (vote_corrupt, 1.0f);
ASSERT_TRUE (node2.network.flood_vote_rebroadcasted (vote_corrupt, 1.0f));
// Wait for the rollback
ASSERT_TIMELY (5s, node1.stats.count (nano::stat::type::rollback));
// Send the vote with the correct block
node2.network.flood_vote (vote, 1.0f);
ASSERT_TRUE (node2.network.flood_vote_rebroadcasted (vote, 1.0f));
ASSERT_TIMELY (10s, !node1.block (send1->hash ()));
ASSERT_TIMELY (10s, node1.block (send2->hash ()));
ASSERT_EQ (node1.block (send2->hash ())->block_signature (), send2->block_signature ());
Expand Down
5 changes: 4 additions & 1 deletion nano/lib/stats_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ enum class detail
error,
failed,
refresh,
sent,

// processing queue
queue,
Expand Down Expand Up @@ -432,11 +433,13 @@ enum class detail
cleanup_outdated,
erase_stale,

// vote generator
// vote_generator
generator_broadcasts,
generator_replies,
generator_replies_discarded,
generator_spacing,
sent_pr,
sent_non_pr,

// hinting
missing_block,
Expand Down
101 changes: 74 additions & 27 deletions nano/node/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -247,72 +247,109 @@ void nano::network::send_keepalive_self (std::shared_ptr<nano::transport::channe
channel->send (message, nano::transport::traffic_type::keepalive);
}

void nano::network::flood_message (nano::message const & message, nano::transport::traffic_type type, float scale) const
size_t nano::network::flood_message (nano::message const & message, nano::transport::traffic_type type, float scale) const
{
for (auto const & channel : list (fanout (scale)))
auto channels = list (fanout (scale), [type] (auto const & channel) {
return !channel->max (type); // Only use channels that are not full for this traffic type
});
size_t result = 0;
for (auto const & channel : channels)
{
channel->send (message, type);
bool sent = channel->send (message, type);
result += sent;
}
return result;
}

void nano::network::flood_keepalive (float scale) const
size_t nano::network::flood_keepalive (float scale) const
{
nano::keepalive message{ node.network_params.network };
random_fill (message.peers);
flood_message (message, nano::transport::traffic_type::keepalive, scale);
return flood_message (message, nano::transport::traffic_type::keepalive, scale);
}

void nano::network::flood_keepalive_self (float scale) const
size_t nano::network::flood_keepalive_self (float scale) const
{
nano::keepalive message{ node.network_params.network };
fill_keepalive_self (message.peers);
flood_message (message, nano::transport::traffic_type::keepalive, scale);
return flood_message (message, nano::transport::traffic_type::keepalive, scale);
}

void nano::network::flood_block (std::shared_ptr<nano::block> const & block, nano::transport::traffic_type type) const
size_t nano::network::flood_block (std::shared_ptr<nano::block> const & block, nano::transport::traffic_type type) const
{
nano::publish message{ node.network_params.network, block };
flood_message (message, type);
return flood_message (message, type);
}

void nano::network::flood_block_initial (std::shared_ptr<nano::block> const & block) const
size_t nano::network::flood_block_initial (std::shared_ptr<nano::block> const & block) const
{
nano::publish message{ node.network_params.network, block, /* is_originator */ true };

size_t result = 0;
for (auto const & rep : node.rep_crawler.principal_representatives ())
{
rep.channel->send (message, nano::transport::traffic_type::block_broadcast_initial);
bool sent = rep.channel->send (message, nano::transport::traffic_type::block_broadcast_initial);
result += sent;
}
for (auto & peer : list_non_pr (fanout (1.0)))
{
peer->send (message, nano::transport::traffic_type::block_broadcast_initial);
bool sent = peer->send (message, nano::transport::traffic_type::block_broadcast_initial);
result += sent;
}
return result;
}

void nano::network::flood_vote (std::shared_ptr<nano::vote> const & vote, float scale, bool rebroadcasted) const
size_t nano::network::flood_vote_rebroadcasted (std::shared_ptr<nano::vote> const & vote, float scale) const
{
nano::confirm_ack message{ node.network_params.network, vote, rebroadcasted };
for (auto & channel : list (fanout (scale)))
nano::confirm_ack message{ node.network_params.network, vote, /* rebroadcasted */ true };

auto const type = nano::transport::traffic_type::vote_rebroadcast;

auto channels = list (fanout (scale), [type] (auto const & channel) {
return !channel->max (type); // Only use channels that are not full for this traffic type
});

size_t result = 0;
for (auto & channel : channels)
{
channel->send (message, rebroadcasted ? nano::transport::traffic_type::vote_rebroadcast : nano::transport::traffic_type::vote);
bool sent = channel->send (message, type);
result += sent;
}
return result;
}

void nano::network::flood_vote_non_pr (std::shared_ptr<nano::vote> const & vote, float scale, bool rebroadcasted) const
size_t nano::network::flood_vote_non_pr (std::shared_ptr<nano::vote> const & vote, float scale) const
{
nano::confirm_ack message{ node.network_params.network, vote, rebroadcasted };
for (auto & channel : list_non_pr (fanout (scale)))
nano::confirm_ack message{ node.network_params.network, vote };

auto const type = transport::traffic_type::vote;

auto channels = list_non_pr (fanout (scale), [type] (auto const & channel) {
return !channel->max (type); // Only use channels that are not full for this traffic type
});

size_t result = 0;
for (auto & channel : channels)
{
channel->send (message, rebroadcasted ? nano::transport::traffic_type::vote_rebroadcast : nano::transport::traffic_type::vote);
bool sent = channel->send (message, type);
result += sent;
}
return result;
}

void nano::network::flood_vote_pr (std::shared_ptr<nano::vote> const & vote, bool rebroadcasted) const
size_t nano::network::flood_vote_pr (std::shared_ptr<nano::vote> const & vote) const
{
nano::confirm_ack message{ node.network_params.network, vote, rebroadcasted };
nano::confirm_ack message{ node.network_params.network, vote };

auto const type = nano::transport::traffic_type::vote;

size_t result = 0;
for (auto const & channel : node.rep_crawler.principal_representatives ())
{
channel.channel->send (message, rebroadcasted ? nano::transport::traffic_type::vote_rebroadcast : nano::transport::traffic_type::vote);
bool sent = channel.channel->send (message, type);
result += sent;
}
return result;
}

void nano::network::flood_block_many (std::deque<std::shared_ptr<nano::block>> blocks, nano::transport::traffic_type type, std::chrono::milliseconds delay, std::function<void ()> callback) const
Expand Down Expand Up @@ -397,9 +434,9 @@ bool nano::network::track_reachout (nano::endpoint const & endpoint_a)
return tcp_channels.track_reachout (endpoint_a);
}

std::deque<std::shared_ptr<nano::transport::channel>> nano::network::list (std::size_t max_count, uint8_t minimum_version) const
std::deque<std::shared_ptr<nano::transport::channel>> nano::network::list (std::size_t max_count, channel_filter filter) const
{
auto result = tcp_channels.list (minimum_version);
auto result = tcp_channels.list (filter);
nano::random_pool_shuffle (result.begin (), result.end ()); // Randomize returned peer order
if (max_count > 0 && result.size () > max_count)
{
Expand All @@ -408,9 +445,9 @@ std::deque<std::shared_ptr<nano::transport::channel>> nano::network::list (std::
return result;
}

std::deque<std::shared_ptr<nano::transport::channel>> nano::network::list_non_pr (std::size_t max_count, uint8_t minimum_version) const
std::deque<std::shared_ptr<nano::transport::channel>> nano::network::list_non_pr (std::size_t max_count, channel_filter filter) const
{
auto result = tcp_channels.list (minimum_version);
auto result = tcp_channels.list (filter);

auto partition_point = std::partition (result.begin (), result.end (),
[this] (std::shared_ptr<nano::transport::channel> const & channel) {
Expand All @@ -427,6 +464,16 @@ std::deque<std::shared_ptr<nano::transport::channel>> nano::network::list_non_pr
return result;
}

std::deque<std::shared_ptr<nano::transport::channel>> nano::network::list (std::size_t max_count, uint8_t minimum_version) const
{
return list (max_count, [minimum_version] (auto const & channel) { return channel->get_network_version () >= minimum_version; });
}

std::deque<std::shared_ptr<nano::transport::channel>> nano::network::list_non_pr (std::size_t max_count, uint8_t minimum_version) const
{
return list_non_pr (max_count, [minimum_version] (auto const & channel) { return channel->get_network_version () >= minimum_version; });
}

// Simulating with sqrt_broadcast_simulate shows we only need to broadcast to sqrt(total_peers) random peers in order to successfully publish to everyone with high probability
std::size_t nano::network::fanout (float scale) const
{
Expand Down
25 changes: 15 additions & 10 deletions nano/node/network.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,16 +95,16 @@ class network final

nano::endpoint endpoint () const;

void flood_message (nano::message const &, nano::transport::traffic_type, float scale = 1.0f) const;
void flood_keepalive (float scale = 1.0f) const;
void flood_keepalive_self (float scale = 0.5f) const;
void flood_vote (std::shared_ptr<nano::vote> const &, float scale, bool rebroadcasted = false) const;
void flood_vote_pr (std::shared_ptr<nano::vote> const &, bool rebroadcasted = false) const;
void flood_vote_non_pr (std::shared_ptr<nano::vote> const &, float scale, bool rebroadcasted = false) const;
size_t flood_message (nano::message const &, nano::transport::traffic_type, float scale = 1.0f) const;
size_t flood_keepalive (float scale = 1.0f) const;
size_t flood_keepalive_self (float scale = 0.5f) const;
size_t flood_vote_pr (std::shared_ptr<nano::vote> const &) const;
size_t flood_vote_non_pr (std::shared_ptr<nano::vote> const &, float scale) const;
size_t flood_vote_rebroadcasted (std::shared_ptr<nano::vote> const &, float scale) const;
// Flood block to all PRs and a random selection of non-PRs
void flood_block_initial (std::shared_ptr<nano::block> const &) const;
size_t flood_block_initial (std::shared_ptr<nano::block> const &) const;
// Flood block to a random selection of peers
void flood_block (std::shared_ptr<nano::block> const &, nano::transport::traffic_type) const;
size_t flood_block (std::shared_ptr<nano::block> const &, nano::transport::traffic_type) const;
void flood_block_many (std::deque<std::shared_ptr<nano::block>>, nano::transport::traffic_type, std::chrono::milliseconds delay = 10ms, std::function<void ()> callback = nullptr) const;

void send_keepalive (std::shared_ptr<nano::transport::channel> const &) const;
Expand All @@ -121,8 +121,13 @@ class network final
// Should we reach out to this endpoint with a keepalive message? If yes, register a new reachout attempt
bool track_reachout (nano::endpoint const &);

std::deque<std::shared_ptr<nano::transport::channel>> list (std::size_t max_count = 0, uint8_t minimum_version = 0) const;
std::deque<std::shared_ptr<nano::transport::channel>> list_non_pr (std::size_t max_count, uint8_t minimum_version = 0) const;
using channel_filter = std::function<bool (std::shared_ptr<nano::transport::channel> const &)>;

std::deque<std::shared_ptr<nano::transport::channel>> list (std::size_t max_count = 0, channel_filter = nullptr) const;
std::deque<std::shared_ptr<nano::transport::channel>> list_non_pr (std::size_t max_count = 0, channel_filter = nullptr) const;

std::deque<std::shared_ptr<nano::transport::channel>> list (std::size_t max_count, uint8_t minimum_version) const;
std::deque<std::shared_ptr<nano::transport::channel>> list_non_pr (std::size_t max_count, uint8_t minimum_version) const;

// Desired fanout for a given scale
std::size_t fanout (float scale = 1.0f) const;
Expand Down
15 changes: 15 additions & 0 deletions nano/node/transport/tcp_channels.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,21 @@ std::deque<std::shared_ptr<nano::transport::channel>> nano::transport::tcp_chann
return result;
}

std::deque<std::shared_ptr<nano::transport::channel>> nano::transport::tcp_channels::list (channel_filter filter) const
{
nano::lock_guard<nano::mutex> lock{ mutex };

std::deque<std::shared_ptr<nano::transport::channel>> result;
for (auto const & entry : channels)
{
if (filter == nullptr || filter (entry.channel))
{
result.push_back (entry.channel);
}
}
return result;
}

bool nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint)
{
return node.tcp_listener.connect (endpoint.address (), endpoint.port ());
Expand Down
4 changes: 4 additions & 0 deletions nano/node/transport/tcp_channels.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,12 @@ class tcp_channels final
// Should we reach out to this endpoint with a keepalive message? If yes, register a new reachout attempt
bool track_reachout (nano::endpoint const &);
void purge (std::chrono::steady_clock::time_point cutoff_deadline);

using channel_filter = std::function<bool (std::shared_ptr<nano::transport::channel> const &)>;
std::deque<std::shared_ptr<nano::transport::channel>> list (channel_filter) const;
std::deque<std::shared_ptr<nano::transport::channel>> list (uint8_t minimum_version = 0) const;
std::unordered_set<std::shared_ptr<nano::transport::channel>> random_set (std::size_t max_count, uint8_t minimum_version = 0) const;

void keepalive ();
std::optional<nano::keepalive> sample_keepalive ();

Expand Down
8 changes: 6 additions & 2 deletions nano/node/vote_generator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,9 +278,13 @@ void nano::vote_generator::vote (std::vector<nano::block_hash> const & hashes_a,

void nano::vote_generator::broadcast_action (std::shared_ptr<nano::vote> const & vote_a) const
{
network.flood_vote_pr (vote_a);
network.flood_vote_non_pr (vote_a, 2.0f);
vote_processor.vote (vote_a, inproc_channel);

auto sent_pr = network.flood_vote_pr (vote_a);
auto sent_non_pr = network.flood_vote_non_pr (vote_a, 2.0f);

stats.add (nano::stat::type::vote_generator, nano::stat::detail::sent_pr, sent_pr);
stats.add (nano::stat::type::vote_generator, nano::stat::detail::sent_non_pr, sent_non_pr);
}

void nano::vote_generator::run ()
Expand Down
4 changes: 3 additions & 1 deletion nano/node/vote_rebroadcaster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ void nano::vote_rebroadcaster::run ()

stats.inc (nano::stat::type::vote_rebroadcaster, nano::stat::detail::rebroadcast);
stats.add (nano::stat::type::vote_rebroadcaster, nano::stat::detail::rebroadcast_hashes, vote->hashes.size ());
network.flood_vote (vote, 0.5f, /* rebroadcasted */ true); // TODO: Track number of peers that we sent the vote to

auto sent = network.flood_vote_rebroadcasted (vote, 0.5f);
stats.add (nano::stat::type::vote_rebroadcaster, nano::stat::detail::sent, sent);

lock.lock ();
}
Expand Down
Loading