Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Network filter for publish and confirm_ack messages #2539

Closed
145 changes: 144 additions & 1 deletion nano/core_test/active_transactions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ TEST (active_transactions, inactive_votes_cache_existing_vote)
ASSERT_EQ (send->hash (), last_vote1.hash);
ASSERT_EQ (1, last_vote1.sequence);
// Attempt to change vote with inactive_votes_cache
node.active.add_inactive_votes_cache (send->hash (), key.pub);
node.active.add_inactive_votes_cache (send->hash (), key.pub, node.network.confirm_ack_filter.hash (vote1));
ASSERT_EQ (1, node.active.find_inactive_votes_cache (send->hash ()).voters.size ());
election->insert_inactive_votes_cache (send->hash ());
// Check that election data is not changed
Expand Down Expand Up @@ -575,6 +575,45 @@ TEST (active_transactions, inactive_votes_cache_multiple_votes)
ASSERT_EQ (2, node.stats.count (nano::stat::type::election, nano::stat::detail::vote_cached));
}

TEST (active_transactions, inactive_votes_cache_overflow)
{
nano::system system;
nano::node_flags node_flags;
node_flags.inactive_votes_cache_size = 1;
auto & node (*system.add_node (node_flags));
nano::genesis genesis;
nano::block_hash hash1{ 1 };
nano::block_hash hash2{ 2 };
auto vote1 (std::make_shared<nano::vote> (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 0, std::vector<nano::block_hash>{ hash1 }));
nano::uint128_t digest = node.network.confirm_ack_filter.hash (vote1);
{
nano::lock_guard<std::mutex> active_guard (node.active.mutex);
node.active.add_inactive_votes_cache (hash1, vote1->account, digest);
auto existing (node.active.find_inactive_votes_cache (hash1));
ASSERT_EQ (1, existing.filter_digests.size ());
ASSERT_EQ (digest, existing.filter_digests.front ());
}
// Add the vote to the network filter, should be cleared once the inactive vote is dropped
std::vector<uint8_t> bytes;
{
nano::vectorstream stream (bytes);
vote1->serialize (stream);
}
ASSERT_FALSE (node.network.confirm_ack_filter.apply (bytes.data (), bytes.size ()));
ASSERT_TRUE (node.network.confirm_ack_filter.apply (bytes.data (), bytes.size ()));
// Add another inactive vote to drop vote1, and ensure proper cleanup
{
node.active.add_inactive_votes_cache (hash2, vote1->account, digest + 1);
auto existing (node.active.find_inactive_votes_cache (hash2));
ASSERT_EQ (1, existing.filter_digests.size ());
ASSERT_EQ (digest + 1, existing.filter_digests.front ());

auto non_existing (node.active.find_inactive_votes_cache (hash1));
ASSERT_TRUE (non_existing.filter_digests.empty ());
}
ASSERT_FALSE (node.network.confirm_ack_filter.apply (bytes.data (), bytes.size ()));
}

TEST (active_transactions, update_difficulty)
{
nano::system system (2);
Expand Down Expand Up @@ -772,3 +811,107 @@ TEST (active_transactions, activate_dependencies)
ASSERT_TRUE (node1->ledger.block_confirmed (node1->store.tx_begin_read (), block2->hash ()));
ASSERT_TRUE (node2->ledger.block_confirmed (node2->store.tx_begin_read (), block2->hash ()));
}

namespace nano
{
// Tests correct handling of network filter digests
TEST (active_transactions, dropped_cleanup)
{
nano::system system;
nano::node_config node_config (nano::get_available_port (), system.logging);
node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
auto & node (*system.add_node (node_config));
nano::genesis genesis;
nano::keypair key1;
nano::keypair key2;
// Setup two representatives
auto large_amount (nano::genesis_amount / 20);
auto send1 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, nano::genesis_hash, nano::test_genesis_key.pub, nano::genesis_amount - large_amount, key1.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (nano::genesis_hash)));
auto send2 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, send1->hash (), nano::test_genesis_key.pub, nano::genesis_amount - 2 * large_amount, key2.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (send1->hash ())));
auto open1 (std::make_shared<nano::state_block> (key1.pub, 0, key1.pub, large_amount, send1->hash (), key1.prv, key1.pub, *system.work.generate (key1.pub)));
auto open2 (std::make_shared<nano::state_block> (key2.pub, 0, key2.pub, large_amount, send2->hash (), key2.prv, key2.pub, *system.work.generate (key2.pub)));
ASSERT_EQ (nano::process_result::progress, node.process (*send1).code);
ASSERT_EQ (nano::process_result::progress, node.process (*send2).code);
ASSERT_EQ (nano::process_result::progress, node.process (*open1).code);
ASSERT_EQ (nano::process_result::progress, node.process (*open2).code);
ASSERT_EQ (large_amount, node.ledger.weight (key1.pub));
ASSERT_EQ (large_amount, node.ledger.weight (key2.pub));

auto block = send2;
auto vote1 (std::make_shared<nano::vote> (key1.pub, key1.prv, 0, std::vector<nano::block_hash>{ block->hash () }));
auto vote2 (std::make_shared<nano::vote> (key2.pub, key2.prv, 0, std::vector<nano::block_hash>{ block->hash () }));
auto digest1 (node.network.confirm_ack_filter.hash (vote1));
auto digest2 (node.network.confirm_ack_filter.hash (vote2));

// Add to network filter to ensure proper cleanup after the election is dropped
std::vector<uint8_t> block_bytes;
{
nano::vectorstream stream (block_bytes);
block->serialize (stream);
}
ASSERT_FALSE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ()));
ASSERT_TRUE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ()));

std::vector<uint8_t> bytes1;
std::vector<uint8_t> bytes2;
{
nano::vectorstream stream1 (bytes1);
vote1->serialize (stream1);

nano::vectorstream stream2 (bytes2);
vote2->serialize (stream2);
}
ASSERT_FALSE (node.network.confirm_ack_filter.apply (bytes1.data (), bytes1.size ()));
ASSERT_FALSE (node.network.confirm_ack_filter.apply (bytes2.data (), bytes2.size ()));
ASSERT_TRUE (node.network.confirm_ack_filter.apply (bytes1.data (), bytes1.size ()));
ASSERT_TRUE (node.network.confirm_ack_filter.apply (bytes2.data (), bytes2.size ()));

// Before the election, add an inactive vote
ASSERT_EQ (nano::vote_code::indeterminate, node.active.vote (vote1));
{
nano::lock_guard<std::mutex> guard (node.active.mutex);
ASSERT_EQ (digest1, node.active.find_inactive_votes_cache (block->hash ()).filter_digests.front ());
}

// Start the election, ensuring the filter digest propagated from the inactive vote and that the inactive vote was removed
auto election (node.active.insert (block).first);
ASSERT_NE (nullptr, election);
ASSERT_EQ (digest1, election->last_votes.find (key1.pub)->second.filter_digest);
{
nano::lock_guard<std::mutex> guard (node.active.mutex);
ASSERT_TRUE (node.active.find_inactive_votes_cache (block->hash ()).voters.empty ());
}

// Add the other vote
ASSERT_EQ (nano::vote_code::vote, node.active.vote (vote2));
ASSERT_EQ (digest2, election->last_votes.find (key2.pub)->second.filter_digest);

// Not yet removed
ASSERT_TRUE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ()));
ASSERT_TRUE (node.network.confirm_ack_filter.apply (bytes1.data (), bytes1.size ()));
ASSERT_TRUE (node.network.confirm_ack_filter.apply (bytes2.data (), bytes2.size ()));

// Now simulate dropping the election, which performs a cleanup in the background using the node worker
ASSERT_FALSE (election->confirmed ());
{
nano::lock_guard<std::mutex> guard (node.active.mutex);
election->cleanup ();
}

// Push a worker task to ensure the cleanup is already performed
std::atomic<bool> flag{ false };
node.worker.push_task ([&flag]() {
flag = true;
});
system.deadline_set (5s);
while (!flag)
{
ASSERT_NO_ERROR (system.poll ());
}

// The filter must have been cleared for both inserted votes and the block
ASSERT_FALSE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ()));
ASSERT_FALSE (node.network.confirm_ack_filter.apply (bytes1.data (), bytes1.size ()));
ASSERT_FALSE (node.network.confirm_ack_filter.apply (bytes2.data (), bytes2.size ()));
}
}
30 changes: 30 additions & 0 deletions nano/core_test/bootstrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,16 @@ TEST (bootstrap_processor, frontiers_unconfirmed)
}
ASSERT_FALSE (node3->ledger.block_exists (send1->hash ()));
ASSERT_FALSE (node3->ledger.block_exists (open1->hash ()));
// Inactive votes cache should be cleared for chosen frontiers
std::vector<nano::block_hash> frontiers{ send2->hash (), open1->hash (), open2->hash () };
{
nano::lock_guard<std::mutex> guard (node2->active.mutex);
for (auto hash : frontiers)
{
auto non_existing (node2->active.find_inactive_votes_cache (hash));
EXPECT_TRUE (non_existing.voters.empty ());
}
}
ASSERT_EQ (1, node3->stats.count (nano::stat::type::bootstrap, nano::stat::detail::frontier_confirmation_failed, nano::stat::dir::in)); // failed request from node1
ASSERT_TRUE (node3->bootstrap_initiator.excluded_peers.check (nano::transport::map_endpoint_to_tcp (node1->network.endpoint ())));
}
Expand Down Expand Up @@ -480,6 +490,16 @@ TEST (bootstrap_processor, frontiers_confirmed)
{
ASSERT_NO_ERROR (system.poll ());
}
// Inactive votes cache should be cleared for chosen frontiers
std::vector<nano::block_hash> frontiers{ send2->hash (), open1->hash (), open2->hash () };
{
nano::lock_guard<std::mutex> guard (node2->active.mutex);
for (auto hash : frontiers)
{
auto non_existing (node2->active.find_inactive_votes_cache (hash));
EXPECT_TRUE (non_existing.voters.empty ());
}
}
ASSERT_EQ (1, node2->stats.count (nano::stat::type::bootstrap, nano::stat::detail::frontier_confirmation_successful, nano::stat::dir::in)); // Successful request from node1
ASSERT_EQ (0, node2->stats.count (nano::stat::type::bootstrap, nano::stat::detail::frontier_confirmation_failed, nano::stat::dir::in));
}
Expand Down Expand Up @@ -539,6 +559,16 @@ TEST (bootstrap_processor, frontiers_unconfirmed_threshold)
}
ASSERT_FALSE (node3->ledger.block_exists (send2->hash ()));
ASSERT_FALSE (node3->ledger.block_exists (open2->hash ()));
// Inactive votes cache should be cleared for chosen frontiers
std::vector<nano::block_hash> frontiers{ send2->hash (), open1->hash (), open2->hash () };
{
nano::lock_guard<std::mutex> guard (node2->active.mutex);
for (auto hash : frontiers)
{
auto non_existing (node2->active.find_inactive_votes_cache (hash));
EXPECT_TRUE (non_existing.voters.empty ());
}
}
ASSERT_EQ (1, node3->stats.count (nano::stat::type::bootstrap, nano::stat::detail::frontier_confirmation_failed, nano::stat::dir::in)); // failed confirmation
ASSERT_EQ (0, node3->stats.count (nano::stat::type::bootstrap, nano::stat::detail::frontier_confirmation_successful, nano::stat::dir::in));
}
Expand Down
15 changes: 10 additions & 5 deletions nano/core_test/message_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,10 @@ TEST (message_parser, exact_confirm_ack_size)
{
nano::system system (1);
test_visitor visitor;
nano::network_filter filter (1);
nano::block_uniquer block_uniquer;
nano::vote_uniquer vote_uniquer (block_uniquer);
nano::message_parser parser (block_uniquer, vote_uniquer, visitor, system.work);
nano::message_parser parser (filter, filter, block_uniquer, vote_uniquer, visitor, system.work);
auto block (std::make_shared<nano::send_block> (1, 1, 2, nano::keypair ().prv, 4, *system.work.generate (nano::root (1))));
auto vote (std::make_shared<nano::vote> (0, nano::keypair ().prv, 0, std::move (block)));
nano::confirm_ack message (vote);
Expand Down Expand Up @@ -96,9 +97,10 @@ TEST (message_parser, exact_confirm_req_size)
{
nano::system system (1);
test_visitor visitor;
nano::network_filter filter (1);
nano::block_uniquer block_uniquer;
nano::vote_uniquer vote_uniquer (block_uniquer);
nano::message_parser parser (block_uniquer, vote_uniquer, visitor, system.work);
nano::message_parser parser (filter, filter, block_uniquer, vote_uniquer, visitor, system.work);
auto block (std::make_shared<nano::send_block> (1, 1, 2, nano::keypair ().prv, 4, *system.work.generate (nano::root (1))));
nano::confirm_req message (std::move (block));
std::vector<uint8_t> bytes;
Expand Down Expand Up @@ -128,9 +130,10 @@ TEST (message_parser, exact_confirm_req_hash_size)
{
nano::system system (1);
test_visitor visitor;
nano::network_filter filter (1);
nano::block_uniquer block_uniquer;
nano::vote_uniquer vote_uniquer (block_uniquer);
nano::message_parser parser (block_uniquer, vote_uniquer, visitor, system.work);
nano::message_parser parser (filter, filter, block_uniquer, vote_uniquer, visitor, system.work);
nano::send_block block (1, 1, 2, nano::keypair ().prv, 4, *system.work.generate (nano::root (1)));
nano::confirm_req message (block.hash (), block.root ());
std::vector<uint8_t> bytes;
Expand Down Expand Up @@ -160,9 +163,10 @@ TEST (message_parser, exact_publish_size)
{
nano::system system (1);
test_visitor visitor;
nano::network_filter filter (1);
nano::block_uniquer block_uniquer;
nano::vote_uniquer vote_uniquer (block_uniquer);
nano::message_parser parser (block_uniquer, vote_uniquer, visitor, system.work);
nano::message_parser parser (filter, filter, block_uniquer, vote_uniquer, visitor, system.work);
auto block (std::make_shared<nano::send_block> (1, 1, 2, nano::keypair ().prv, 4, *system.work.generate (nano::root (1))));
nano::publish message (std::move (block));
std::vector<uint8_t> bytes;
Expand Down Expand Up @@ -192,9 +196,10 @@ TEST (message_parser, exact_keepalive_size)
{
nano::system system (1);
test_visitor visitor;
nano::network_filter filter (1);
nano::block_uniquer block_uniquer;
nano::vote_uniquer vote_uniquer (block_uniquer);
nano::message_parser parser (block_uniquer, vote_uniquer, visitor, system.work);
nano::message_parser parser (filter, filter, block_uniquer, vote_uniquer, visitor, system.work);
nano::keepalive message;
std::vector<uint8_t> bytes;
{
Expand Down
111 changes: 111 additions & 0 deletions nano/core_test/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -923,6 +923,117 @@ TEST (network, replace_port)
node1->stop ();
}

TEST (network, duplicate_detection)
{
nano::system system;
nano::node_flags node_flags;
node_flags.disable_udp = false;
auto & node0 (*system.add_node (node_flags));
auto & node1 (*system.add_node (node_flags));
auto udp_channel (std::make_shared<nano::transport::channel_udp> (node0.network.udp_channels, node1.network.endpoint (), node1.network_params.protocol.protocol_version));
nano::genesis genesis;
nano::publish publish (genesis.open);
auto vote (std::make_shared<nano::vote> (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 1, genesis.open));
nano::confirm_ack confirm_ack (vote);
// Publish duplicate detection through UDP
ASSERT_EQ (0, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish));
udp_channel->send (publish);
udp_channel->send (publish);
system.deadline_set (2s);
while (node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish) < 1)
{
ASSERT_NO_ERROR (system.poll ());
}
// Confirm ack duplicate detection through UDP
ASSERT_EQ (0, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack));
udp_channel->send (confirm_ack);
udp_channel->send (confirm_ack);
system.deadline_set (2s);
while (node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack) < 1)
{
ASSERT_NO_ERROR (system.poll ());
}

auto tcp_channel (node0.network.tcp_channels.find_channel (nano::transport::map_endpoint_to_tcp (node1.network.endpoint ())));
// Publish duplicate detection through TCP
ASSERT_EQ (1, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish));
tcp_channel->send (publish);
system.deadline_set (2s);
while (node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish) < 2)
{
ASSERT_NO_ERROR (system.poll ());
}
// Confirm ack duplicate detection through TCP
ASSERT_EQ (1, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack));
tcp_channel->send (confirm_ack);
system.deadline_set (2s);
while (node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack) < 2)
{
ASSERT_NO_ERROR (system.poll ());
}
}

TEST (network, duplicate_revert_publish)
{
nano::system system;
nano::node_flags node_flags;
node_flags.block_processor_full_size = 0;
auto & node (*system.add_node (node_flags));
ASSERT_TRUE (node.block_processor.full ());
nano::genesis genesis;
nano::publish publish (genesis.open);
std::vector<uint8_t> bytes;
{
nano::vectorstream stream (bytes);
publish.block->serialize (stream);
}
// Add to the blocks filter
// Should be cleared when dropping due to a full block processor, as long as the message has the optional digest attached
// Test network.duplicate_detection ensures that the digest is attached when deserializing messages
nano::uint128_t digest;
ASSERT_FALSE (node.network.publish_filter.apply (bytes.data (), bytes.size (), &digest));
ASSERT_TRUE (node.network.publish_filter.apply (bytes.data (), bytes.size ()));
auto channel (std::make_shared<nano::transport::channel_udp> (node.network.udp_channels, node.network.endpoint (), node.network_params.protocol.protocol_version));
ASSERT_EQ (0, publish.digest);
node.network.process_message (publish, channel);
ASSERT_TRUE (node.network.publish_filter.apply (bytes.data (), bytes.size ()));
publish.digest = digest;
node.network.process_message (publish, channel);
ASSERT_FALSE (node.network.publish_filter.apply (bytes.data (), bytes.size ()));
}

TEST (network, duplicate_revert_confirm_ack)
{
nano::system system;
nano::node_flags node_flags;
node_flags.vote_processor_capacity = 0;
auto & node (*system.add_node (node_flags));
ASSERT_TRUE (node.vote_processor.vote ({}, {}));
ASSERT_EQ (1, node.stats.count (nano::stat::type::vote, nano::stat::detail::vote_overflow));
nano::genesis genesis;
nano::keypair key;
auto vote (std::make_shared<nano::vote> (key.pub, key.prv, 0, std::vector<nano::block_hash>{ genesis.hash () }));
nano::confirm_ack confirm_ack (vote);
std::vector<uint8_t> bytes;
{
nano::vectorstream stream (bytes);
confirm_ack.vote->serialize (stream);
}
// Add to the votes filter
// Should be cleared when dropping due to a full vote processor, as long as the message has the optional digest attached
// Test network.duplicate_detection ensures that the digest is attached when deserializing messages
nano::uint128_t digest;
ASSERT_FALSE (node.network.confirm_ack_filter.apply (bytes.data (), bytes.size (), &digest));
ASSERT_TRUE (node.network.confirm_ack_filter.apply (bytes.data (), bytes.size ()));
auto channel (std::make_shared<nano::transport::channel_udp> (node.network.udp_channels, node.network.endpoint (), node.network_params.protocol.protocol_version));
ASSERT_EQ (0, confirm_ack.digest);
node.network.process_message (confirm_ack, channel);
ASSERT_TRUE (node.network.confirm_ack_filter.apply (bytes.data (), bytes.size ()));
confirm_ack.digest = digest;
node.network.process_message (confirm_ack, channel);
ASSERT_FALSE (node.network.confirm_ack_filter.apply (bytes.data (), bytes.size ()));
}

// The test must be completed in less than 1 second
TEST (bandwidth_limiter, validate)
{
Expand Down
Loading