Skip to content

Commit

Permalink
Adds duplicate detection of publish and confirm_ack messages before t…
Browse files Browse the repository at this point in the history
…hey are deserialized.

When they are unique, the digest is saved and passed around to network processing which may drop the messages if the block or vote processor is full.

After processing, there are instances where digests have to be cleared from the filter to ensure the original message can be received again:
- Cleaning up a long unchecked block erases that element from the publish filter.
- Reaching max capacity of the inactive votes cache clears the oldest item from the cache and the confirm_ack filter. For this reason, inactive_votes_cache now stores the digests, and votes pulled from this cache for new elections are now erased.
- Interactions between bootstrap frontier confirmation and inactive_votes_cache
- An active election is dropped. The vote digest is now saved here as well. Special case if a vote came from inactive_votes_cache, must ensure the digest is passed along.

Many tests were added to ensure proper clearing of the filter when blocks and votes are dropped.

The blocks_filter has been removed due to redundancy. 16 extra bytes are used for each vote in an (inactive) vote stored in memory.
  • Loading branch information
guilhermelawless committed Mar 3, 2020
1 parent 95de70d commit 07328be
Show file tree
Hide file tree
Showing 24 changed files with 579 additions and 133 deletions.
142 changes: 141 additions & 1 deletion nano/core_test/active_transactions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ TEST (active_transactions, inactive_votes_cache_existing_vote)
ASSERT_EQ (send->hash (), last_vote1.hash);
ASSERT_EQ (1, last_vote1.sequence);
// Attempt to change vote with inactive_votes_cache
node.active.add_inactive_votes_cache (send->hash (), key.pub);
node.active.add_inactive_votes_cache (send->hash (), key.pub, node.network.confirm_ack_filter.hash (vote1));
ASSERT_EQ (1, node.active.find_inactive_votes_cache (send->hash ()).voters.size ());
election->insert_inactive_votes_cache (send->hash ());
// Check that election data is not changed
Expand Down Expand Up @@ -552,6 +552,45 @@ TEST (active_transactions, inactive_votes_cache_multiple_votes)
ASSERT_EQ (2, node.stats.count (nano::stat::type::election, nano::stat::detail::vote_cached));
}

TEST (active_transactions, inactive_votes_cache_overflow)
{
nano::system system;
nano::node_flags node_flags;
node_flags.inactive_votes_cache_size = 1;
auto & node (*system.add_node (node_flags));
nano::genesis genesis;
nano::block_hash hash1{ 1 };
nano::block_hash hash2{ 2 };
auto vote1 (std::make_shared<nano::vote> (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 0, std::vector<nano::block_hash>{ hash1 }));
nano::uint128_t digest = node.network.confirm_ack_filter.hash (vote1);
{
nano::lock_guard<std::mutex> active_guard (node.active.mutex);
node.active.add_inactive_votes_cache (hash1, vote1->account, digest);
auto existing (node.active.find_inactive_votes_cache (hash1));
ASSERT_EQ (1, existing.filter_digests.size ());
ASSERT_EQ (digest, existing.filter_digests.front ());
}
// Add the vote to the network filter, should be cleared once the inactive vote is dropped
std::vector<uint8_t> bytes;
{
nano::vectorstream stream (bytes);
vote1->serialize (stream);
}
ASSERT_FALSE (node.network.confirm_ack_filter.apply (bytes.data (), bytes.size ()));
ASSERT_TRUE (node.network.confirm_ack_filter.apply (bytes.data (), bytes.size ()));
// Add another inactive vote to drop vote1, and ensure proper cleanup
{
node.active.add_inactive_votes_cache (hash2, vote1->account, digest + 1);
auto existing (node.active.find_inactive_votes_cache (hash2));
ASSERT_EQ (1, existing.filter_digests.size ());
ASSERT_EQ (digest + 1, existing.filter_digests.front ());

auto non_existing (node.active.find_inactive_votes_cache (hash1));
ASSERT_TRUE (non_existing.filter_digests.empty ());
}
ASSERT_FALSE (node.network.confirm_ack_filter.apply (bytes.data (), bytes.size ()));
}

TEST (active_transactions, update_difficulty)
{
nano::system system (2);
Expand Down Expand Up @@ -741,3 +780,104 @@ TEST (active_transactions, activate_dependencies)
}
ASSERT_NE (nullptr, node1->block (block2->hash ()));
}

namespace nano
{
// Tests correct handling of network filter digests
TEST (active_transactions, dropped_cleanup)
{
nano::system system;
nano::node_config node_config (nano::get_available_port (), system.logging);
node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
auto & node (*system.add_node (node_config));
nano::genesis genesis;
nano::keypair key1;
nano::keypair key2;
// Setup two representatives
auto large_amount (nano::genesis_amount / 20);
auto send1 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, nano::genesis_hash, nano::test_genesis_key.pub, nano::genesis_amount - large_amount, key1.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (nano::genesis_hash)));
auto send2 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, send1->hash (), nano::test_genesis_key.pub, nano::genesis_amount - 2 * large_amount, key2.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (send1->hash ())));
auto open1 (std::make_shared<nano::state_block> (key1.pub, 0, key1.pub, large_amount, send1->hash (), key1.prv, key1.pub, *system.work.generate (key1.pub)));
auto open2 (std::make_shared<nano::state_block> (key2.pub, 0, key2.pub, large_amount, send2->hash (), key2.prv, key2.pub, *system.work.generate (key2.pub)));
ASSERT_EQ (nano::process_result::progress, node.process (*send1).code);
ASSERT_EQ (nano::process_result::progress, node.process (*send2).code);
ASSERT_EQ (nano::process_result::progress, node.process (*open1).code);
ASSERT_EQ (nano::process_result::progress, node.process (*open2).code);
ASSERT_EQ (large_amount, node.ledger.weight (key1.pub));
ASSERT_EQ (large_amount, node.ledger.weight (key2.pub));

auto block = send2;
auto vote1 (std::make_shared<nano::vote> (key1.pub, key1.prv, 0, std::vector<nano::block_hash>{ block->hash () }));
auto vote2 (std::make_shared<nano::vote> (key2.pub, key2.prv, 0, std::vector<nano::block_hash>{ block->hash () }));
auto digest1 (node.network.confirm_ack_filter.hash (vote1));
auto digest2 (node.network.confirm_ack_filter.hash (vote2));

// Add to network filter to ensure proper cleanup after the election is dropped
std::vector<uint8_t> block_bytes;
{
nano::vectorstream stream (block_bytes);
block->serialize (stream);
}
ASSERT_FALSE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ()));
ASSERT_TRUE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ()));

std::vector<uint8_t> bytes1;
std::vector<uint8_t> bytes2;
{
nano::vectorstream stream1 (bytes1);
vote1->serialize (stream1);

nano::vectorstream stream2 (bytes2);
vote2->serialize (stream2);
}
ASSERT_FALSE (node.network.confirm_ack_filter.apply (bytes1.data (), bytes1.size ()));
ASSERT_FALSE (node.network.confirm_ack_filter.apply (bytes2.data (), bytes2.size ()));
ASSERT_TRUE (node.network.confirm_ack_filter.apply (bytes1.data (), bytes1.size ()));
ASSERT_TRUE (node.network.confirm_ack_filter.apply (bytes2.data (), bytes2.size ()));

// Before the election, add an inactive vote
ASSERT_EQ (nano::vote_code::indeterminate, node.active.vote (vote1));
{
nano::lock_guard<std::mutex> guard (node.active.mutex);
ASSERT_EQ (digest1, node.active.find_inactive_votes_cache (block->hash ()).filter_digests.front ());
}

// Start the election, ensuring the filter digest propagated from the inactive vote and that the inactive vote was removed
auto election (node.active.insert (block).first);
ASSERT_NE (nullptr, election);
ASSERT_EQ (digest1, election->last_votes.find (key1.pub)->second.filter_digest);
{
nano::lock_guard<std::mutex> guard (node.active.mutex);
ASSERT_TRUE (node.active.find_inactive_votes_cache (block->hash ()).voters.empty ());
}

// Add the other vote
ASSERT_EQ (nano::vote_code::vote, node.active.vote (vote2));
ASSERT_EQ (digest2, election->last_votes.find (key2.pub)->second.filter_digest);

// Not yet removed
ASSERT_TRUE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ()));
ASSERT_TRUE (node.network.confirm_ack_filter.apply (bytes1.data (), bytes1.size ()));
ASSERT_TRUE (node.network.confirm_ack_filter.apply (bytes2.data (), bytes2.size ()));

// Now simulate dropping the election, which performs a cleanup in the background using the node worker
ASSERT_FALSE (election->confirmed ());
election->cleanup ();

// Push a worker task to ensure the cleanup is already performed
std::atomic<bool> flag{ false };
node.worker.push_task ([&flag]() {
flag = true;
});
system.deadline_set (5s);
while (!flag)
{
ASSERT_NO_ERROR (system.poll ());
}

// The filter must have been cleared for both inserted votes and the block
ASSERT_FALSE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ()));
ASSERT_FALSE (node.network.confirm_ack_filter.apply (bytes1.data (), bytes1.size ()));
ASSERT_FALSE (node.network.confirm_ack_filter.apply (bytes2.data (), bytes2.size ()));
}
}
30 changes: 30 additions & 0 deletions nano/core_test/bootstrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,16 @@ TEST (bootstrap_processor, frontiers_unconfirmed)
}
ASSERT_FALSE (node3->ledger.block_exists (send1->hash ()));
ASSERT_FALSE (node3->ledger.block_exists (open1->hash ()));
// Inactive votes cache should be cleared for chosen frontiers
std::vector<nano::block_hash> frontiers{ send2->hash (), open1->hash (), open2->hash () };
{
nano::lock_guard<std::mutex> guard (node2->active.mutex);
for (auto hash : frontiers)
{
auto non_existing (node2->active.find_inactive_votes_cache (hash));
EXPECT_TRUE (non_existing.voters.empty ());
}
}
ASSERT_EQ (1, node3->stats.count (nano::stat::type::bootstrap, nano::stat::detail::frontier_confirmation_failed, nano::stat::dir::in)); // failed request from node1
ASSERT_TRUE (node3->bootstrap_initiator.excluded_peers.check (nano::transport::map_endpoint_to_tcp (node1->network.endpoint ())));
}
Expand Down Expand Up @@ -480,6 +490,16 @@ TEST (bootstrap_processor, frontiers_confirmed)
{
ASSERT_NO_ERROR (system.poll ());
}
// Inactive votes cache should be cleared for chosen frontiers
std::vector<nano::block_hash> frontiers{ send2->hash (), open1->hash (), open2->hash () };
{
nano::lock_guard<std::mutex> guard (node2->active.mutex);
for (auto hash : frontiers)
{
auto non_existing (node2->active.find_inactive_votes_cache (hash));
EXPECT_TRUE (non_existing.voters.empty ());
}
}
ASSERT_EQ (1, node2->stats.count (nano::stat::type::bootstrap, nano::stat::detail::frontier_confirmation_successful, nano::stat::dir::in)); // Successful request from node1
ASSERT_EQ (0, node2->stats.count (nano::stat::type::bootstrap, nano::stat::detail::frontier_confirmation_failed, nano::stat::dir::in));
}
Expand Down Expand Up @@ -539,6 +559,16 @@ TEST (bootstrap_processor, frontiers_unconfirmed_threshold)
}
ASSERT_FALSE (node3->ledger.block_exists (send2->hash ()));
ASSERT_FALSE (node3->ledger.block_exists (open2->hash ()));
// Inactive votes cache should be cleared for chosen frontiers
std::vector<nano::block_hash> frontiers{ send2->hash (), open1->hash (), open2->hash () };
{
nano::lock_guard<std::mutex> guard (node2->active.mutex);
for (auto hash : frontiers)
{
auto non_existing (node2->active.find_inactive_votes_cache (hash));
EXPECT_TRUE (non_existing.voters.empty ());
}
}
ASSERT_EQ (1, node3->stats.count (nano::stat::type::bootstrap, nano::stat::detail::frontier_confirmation_failed, nano::stat::dir::in)); // failed confirmation
ASSERT_EQ (0, node3->stats.count (nano::stat::type::bootstrap, nano::stat::detail::frontier_confirmation_successful, nano::stat::dir::in));
}
Expand Down
15 changes: 10 additions & 5 deletions nano/core_test/message_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,10 @@ TEST (message_parser, exact_confirm_ack_size)
{
nano::system system (1);
test_visitor visitor;
nano::network_filter filter (1);
nano::block_uniquer block_uniquer;
nano::vote_uniquer vote_uniquer (block_uniquer);
nano::message_parser parser (block_uniquer, vote_uniquer, visitor, system.work);
nano::message_parser parser (filter, filter, block_uniquer, vote_uniquer, visitor, system.work);
auto block (std::make_shared<nano::send_block> (1, 1, 2, nano::keypair ().prv, 4, *system.work.generate (nano::root (1))));
auto vote (std::make_shared<nano::vote> (0, nano::keypair ().prv, 0, std::move (block)));
nano::confirm_ack message (vote);
Expand Down Expand Up @@ -96,9 +97,10 @@ TEST (message_parser, exact_confirm_req_size)
{
nano::system system (1);
test_visitor visitor;
nano::network_filter filter (1);
nano::block_uniquer block_uniquer;
nano::vote_uniquer vote_uniquer (block_uniquer);
nano::message_parser parser (block_uniquer, vote_uniquer, visitor, system.work);
nano::message_parser parser (filter, filter, block_uniquer, vote_uniquer, visitor, system.work);
auto block (std::make_shared<nano::send_block> (1, 1, 2, nano::keypair ().prv, 4, *system.work.generate (nano::root (1))));
nano::confirm_req message (std::move (block));
std::vector<uint8_t> bytes;
Expand Down Expand Up @@ -128,9 +130,10 @@ TEST (message_parser, exact_confirm_req_hash_size)
{
nano::system system (1);
test_visitor visitor;
nano::network_filter filter (1);
nano::block_uniquer block_uniquer;
nano::vote_uniquer vote_uniquer (block_uniquer);
nano::message_parser parser (block_uniquer, vote_uniquer, visitor, system.work);
nano::message_parser parser (filter, filter, block_uniquer, vote_uniquer, visitor, system.work);
nano::send_block block (1, 1, 2, nano::keypair ().prv, 4, *system.work.generate (nano::root (1)));
nano::confirm_req message (block.hash (), block.root ());
std::vector<uint8_t> bytes;
Expand Down Expand Up @@ -160,9 +163,10 @@ TEST (message_parser, exact_publish_size)
{
nano::system system (1);
test_visitor visitor;
nano::network_filter filter (1);
nano::block_uniquer block_uniquer;
nano::vote_uniquer vote_uniquer (block_uniquer);
nano::message_parser parser (block_uniquer, vote_uniquer, visitor, system.work);
nano::message_parser parser (filter, filter, block_uniquer, vote_uniquer, visitor, system.work);
auto block (std::make_shared<nano::send_block> (1, 1, 2, nano::keypair ().prv, 4, *system.work.generate (nano::root (1))));
nano::publish message (std::move (block));
std::vector<uint8_t> bytes;
Expand Down Expand Up @@ -192,9 +196,10 @@ TEST (message_parser, exact_keepalive_size)
{
nano::system system (1);
test_visitor visitor;
nano::network_filter filter (1);
nano::block_uniquer block_uniquer;
nano::vote_uniquer vote_uniquer (block_uniquer);
nano::message_parser parser (block_uniquer, vote_uniquer, visitor, system.work);
nano::message_parser parser (filter, filter, block_uniquer, vote_uniquer, visitor, system.work);
nano::keepalive message;
std::vector<uint8_t> bytes;
{
Expand Down
111 changes: 111 additions & 0 deletions nano/core_test/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -923,6 +923,117 @@ TEST (network, replace_port)
node1->stop ();
}

TEST (network, duplicate_detection)
{
nano::system system;
nano::node_flags node_flags;
node_flags.disable_udp = false;
auto & node0 (*system.add_node (node_flags));
auto & node1 (*system.add_node (node_flags));
auto udp_channel (std::make_shared<nano::transport::channel_udp> (node0.network.udp_channels, node1.network.endpoint (), node1.network_params.protocol.protocol_version));
nano::genesis genesis;
nano::publish publish (genesis.open);
auto vote (std::make_shared<nano::vote> (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 1, genesis.open));
nano::confirm_ack confirm_ack (vote);
// Publish duplicate detection through UDP
ASSERT_EQ (0, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish));
udp_channel->send (publish);
udp_channel->send (publish);
system.deadline_set (2s);
while (node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish) < 1)
{
ASSERT_NO_ERROR (system.poll ());
}
// Confirm ack duplicate detection through UDP
ASSERT_EQ (0, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack));
udp_channel->send (confirm_ack);
udp_channel->send (confirm_ack);
system.deadline_set (2s);
while (node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack) < 1)
{
ASSERT_NO_ERROR (system.poll ());
}

auto tcp_channel (node0.network.tcp_channels.find_channel (nano::transport::map_endpoint_to_tcp (node1.network.endpoint ())));
// Publish duplicate detection through TCP
ASSERT_EQ (1, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish));
tcp_channel->send (publish);
system.deadline_set (2s);
while (node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish) < 2)
{
ASSERT_NO_ERROR (system.poll ());
}
// Confirm ack duplicate detection through TCP
ASSERT_EQ (1, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack));
tcp_channel->send (confirm_ack);
system.deadline_set (2s);
while (node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack) < 2)
{
ASSERT_NO_ERROR (system.poll ());
}
}

TEST (network, duplicate_revert_publish)
{
nano::system system;
nano::node_flags node_flags;
node_flags.block_processor_full_size = 0;
auto & node (*system.add_node (node_flags));
ASSERT_TRUE (node.block_processor.full ());
nano::genesis genesis;
nano::publish publish (genesis.open);
std::vector<uint8_t> bytes;
{
nano::vectorstream stream (bytes);
publish.block->serialize (stream);
}
// Add to the blocks filter
// Should be cleared when dropping due to a full block processor, as long as the message has the optional digest attached
// Test network.duplicate_detection ensures that the digest is attached when deserializing messages
nano::uint128_t digest;
ASSERT_FALSE (node.network.publish_filter.apply (bytes.data (), bytes.size (), &digest));
ASSERT_TRUE (node.network.publish_filter.apply (bytes.data (), bytes.size ()));
auto channel (std::make_shared<nano::transport::channel_udp> (node.network.udp_channels, node.network.endpoint (), node.network_params.protocol.protocol_version));
ASSERT_EQ (0, publish.digest);
node.network.process_message (publish, channel);
ASSERT_TRUE (node.network.publish_filter.apply (bytes.data (), bytes.size ()));
publish.digest = digest;
node.network.process_message (publish, channel);
ASSERT_FALSE (node.network.publish_filter.apply (bytes.data (), bytes.size ()));
}

TEST (network, duplicate_revert_confirm_ack)
{
nano::system system;
nano::node_flags node_flags;
node_flags.vote_processor_capacity = 0;
auto & node (*system.add_node (node_flags));
ASSERT_TRUE (node.vote_processor.vote ({}, {}));
ASSERT_EQ (1, node.stats.count (nano::stat::type::vote, nano::stat::detail::vote_overflow));
nano::genesis genesis;
nano::keypair key;
auto vote (std::make_shared<nano::vote> (key.pub, key.prv, 0, std::vector<nano::block_hash>{ genesis.hash () }));
nano::confirm_ack confirm_ack (vote);
std::vector<uint8_t> bytes;
{
nano::vectorstream stream (bytes);
confirm_ack.vote->serialize (stream);
}
// Add to the votes filter
// Should be cleared when dropping due to a full vote processor, as long as the message has the optional digest attached
// Test network.duplicate_detection ensures that the digest is attached when deserializing messages
nano::uint128_t digest;
ASSERT_FALSE (node.network.confirm_ack_filter.apply (bytes.data (), bytes.size (), &digest));
ASSERT_TRUE (node.network.confirm_ack_filter.apply (bytes.data (), bytes.size ()));
auto channel (std::make_shared<nano::transport::channel_udp> (node.network.udp_channels, node.network.endpoint (), node.network_params.protocol.protocol_version));
ASSERT_EQ (0, confirm_ack.digest);
node.network.process_message (confirm_ack, channel);
ASSERT_TRUE (node.network.confirm_ack_filter.apply (bytes.data (), bytes.size ()));
confirm_ack.digest = digest;
node.network.process_message (confirm_ack, channel);
ASSERT_FALSE (node.network.confirm_ack_filter.apply (bytes.data (), bytes.size ()));
}

// The test must be completed in less than 1 second
TEST (bandwidth_limiter, validate)
{
Expand Down
Loading

0 comments on commit 07328be

Please sign in to comment.