From d511b1fed6da7cd6225e77f5c562c0885a10ccac Mon Sep 17 00:00:00 2001 From: Guilherme Lawless Date: Thu, 9 Jan 2020 11:25:57 +0100 Subject: [PATCH] Indeterminate vote status and enhanced websocket vote sub (#2444) * Indeterminate vote status and enhanced websocket vote sub * unhandled stats switch case * disambiguate conditional expression * Return vote_code directly from active.vote and replace tribool with simpler approach --- nano/core_test/active_transactions.cpp | 46 ++++++++ nano/core_test/ledger.cpp | 14 +-- nano/core_test/node.cpp | 2 +- nano/core_test/websocket.cpp | 157 ++++++++++++++----------- nano/lib/stats.cpp | 3 + nano/lib/stats.hpp | 1 + nano/node/active_transactions.cpp | 25 ++-- nano/node/active_transactions.hpp | 5 +- nano/node/node.cpp | 57 ++++----- nano/node/node_observers.hpp | 2 +- nano/node/vote_processor.cpp | 33 ++---- nano/node/websocket.cpp | 45 +++++-- nano/node/websocket.hpp | 4 +- nano/secure/common.hpp | 3 +- 14 files changed, 251 insertions(+), 146 deletions(-) diff --git a/nano/core_test/active_transactions.cpp b/nano/core_test/active_transactions.cpp index 6a94a25654..c471f430ba 100644 --- a/nano/core_test/active_transactions.cpp +++ b/nano/core_test/active_transactions.cpp @@ -681,3 +681,49 @@ TEST (active_transactions, restart_dropped) ASSERT_EQ (work2, block->block_work ()); } } + +TEST (active_transactions, vote_replays) +{ + nano::system system; + nano::node_config node_config (nano::get_available_port (), system.logging); + node_config.enable_voting = false; + auto & node = *system.add_node (node_config); + nano::genesis genesis; + nano::keypair key; + std::error_code ec; + auto send1 (std::make_shared (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio, key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (genesis.hash ()))); + ASSERT_NE (nullptr, send1); + auto open1 (std::make_shared (key.pub, 0, key.pub, nano::Gxrb_ratio, send1->hash (), key.prv, key.pub, *system.work.generate (key.pub))); + ASSERT_NE (nullptr, open1); + node.process_active (send1); + node.process_active (open1); + node.block_processor.flush (); + ASSERT_EQ (2, node.active.size ()); + // First vote is not a replay and confirms the election, second vote should be indeterminate since the election no longer exists + auto vote_send1 (std::make_shared (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 0, send1)); + ASSERT_EQ (nano::vote_code::vote, node.active.vote (vote_send1)); + ASSERT_EQ (1, node.active.size ()); + ASSERT_EQ (nano::vote_code::indeterminate, node.active.vote (vote_send1)); + // Open new account + auto vote_open1 (std::make_shared (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 0, open1)); + ASSERT_EQ (nano::vote_code::vote, node.active.vote (vote_open1)); + ASSERT_TRUE (node.active.empty ()); + ASSERT_EQ (nano::vote_code::indeterminate, node.active.vote (vote_open1)); + ASSERT_EQ (nano::Gxrb_ratio, node.ledger.weight (key.pub)); + + auto send2 (std::make_shared (key.pub, open1->hash (), key.pub, nano::Gxrb_ratio - 1, key.pub, key.prv, key.pub, *system.work.generate (open1->hash ()))); + ASSERT_NE (nullptr, send2); + node.process_active (send2); + node.block_processor.flush (); + ASSERT_EQ (1, node.active.size ()); + auto vote1_send2 (std::make_shared (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 0, send2)); + auto vote2_send2 (std::make_shared (key.pub, key.prv, 0, send2)); + ASSERT_EQ (nano::vote_code::vote, node.active.vote (vote2_send2)); + ASSERT_EQ (1, node.active.size ()); + ASSERT_EQ (nano::vote_code::replay, node.active.vote (vote2_send2)); + ASSERT_EQ (1, node.active.size ()); + ASSERT_EQ (nano::vote_code::vote, node.active.vote (vote1_send2)); + ASSERT_EQ (0, node.active.size ()); + ASSERT_EQ (nano::vote_code::indeterminate, node.active.vote (vote1_send2)); + ASSERT_EQ (nano::vote_code::indeterminate, node.active.vote (vote2_send2)); +} diff --git a/nano/core_test/ledger.cpp b/nano/core_test/ledger.cpp index 04878ed49e..a7a954d144 100644 --- a/nano/core_test/ledger.cpp +++ b/nano/core_test/ledger.cpp @@ -760,9 +760,9 @@ TEST (votes, add_one) ASSERT_EQ (1, votes1->last_votes.size ()); lock.unlock (); auto vote1 (std::make_shared (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 1, send1)); - ASSERT_FALSE (node1.active.vote (vote1)); + ASSERT_EQ (nano::vote_code::vote, node1.active.vote (vote1)); auto vote2 (std::make_shared (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 2, send1)); - ASSERT_FALSE (node1.active.vote (vote2)); + ASSERT_EQ (nano::vote_code::indeterminate, node1.active.vote (vote2)); lock.lock (); ASSERT_EQ (2, votes1->last_votes.size ()); auto existing1 (votes1->last_votes.find (nano::test_genesis_key.pub)); @@ -790,9 +790,9 @@ TEST (votes, add_two) nano::keypair key2; auto send2 (std::make_shared (genesis.hash (), key2.pub, 0, nano::test_genesis_key.prv, nano::test_genesis_key.pub, 0)); auto vote2 (std::make_shared (key2.pub, key2.prv, 1, send2)); - ASSERT_FALSE (node1.active.vote (vote2)); + ASSERT_EQ (nano::vote_code::vote, node1.active.vote (vote2)); auto vote1 (std::make_shared (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 1, send1)); - ASSERT_FALSE (node1.active.vote (vote1)); + ASSERT_EQ (nano::vote_code::vote, node1.active.vote (vote1)); lock.lock (); ASSERT_EQ (3, votes1->last_votes.size ()); ASSERT_NE (votes1->last_votes.end (), votes1->last_votes.find (nano::test_genesis_key.pub)); @@ -821,7 +821,7 @@ TEST (votes, add_existing) } node1.active.start (send1); auto vote1 (std::make_shared (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 1, send1)); - ASSERT_FALSE (node1.active.vote (vote1)); + ASSERT_EQ (nano::vote_code::vote, node1.active.vote (vote1)); // Block is already processed from vote ASSERT_TRUE (node1.active.publish (send1)); nano::unique_lock lock (node1.active.mutex); @@ -834,14 +834,14 @@ TEST (votes, add_existing) // Pretend we've waited the timeout votes1->last_votes[nano::test_genesis_key.pub].time = std::chrono::steady_clock::now () - std::chrono::seconds (20); lock.unlock (); - ASSERT_FALSE (node1.active.vote (vote2)); + ASSERT_EQ (nano::vote_code::vote, node1.active.vote (vote2)); ASSERT_FALSE (node1.active.publish (send2)); lock.lock (); ASSERT_EQ (2, votes1->last_votes[nano::test_genesis_key.pub].sequence); // Also resend the old vote, and see if we respect the sequence number votes1->last_votes[nano::test_genesis_key.pub].time = std::chrono::steady_clock::now () - std::chrono::seconds (20); lock.unlock (); - ASSERT_TRUE (node1.active.vote (vote1)); + ASSERT_EQ (nano::vote_code::replay, node1.active.vote (vote1)); lock.lock (); ASSERT_EQ (2, votes1->last_votes[nano::test_genesis_key.pub].sequence); ASSERT_EQ (2, votes1->last_votes.size ()); diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index 356b9cd858..35ea3960eb 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -2535,7 +2535,7 @@ TEST (node, vote_by_hash_bundle) nano::keypair key1; system.wallet (0)->insert_adhoc (key1.prv); - system.nodes[0]->observers.vote.add ([&max_hashes](std::shared_ptr vote_a, std::shared_ptr channel_a) { + system.nodes[0]->observers.vote.add ([&max_hashes](std::shared_ptr vote_a, std::shared_ptr, nano::vote_code) { if (vote_a->blocks.size () > max_hashes) { max_hashes = vote_a->blocks.size (); diff --git a/nano/core_test/websocket.cpp b/nano/core_test/websocket.cpp index 9789ac1fed..b6f42a377d 100644 --- a/nano/core_test/websocket.cpp +++ b/nano/core_test/websocket.cpp @@ -80,15 +80,11 @@ boost::optional websocket_test_call (std::string host, std::string /** Tests clients subscribing multiple times or unsubscribing without a subscription */ TEST (websocket, subscription_edge) { - nano::system system (1); + nano::system system; nano::node_config config (nano::get_available_port (), system.logging); - nano::node_flags node_flags; config.websocket_config.enabled = true; config.websocket_config.port = nano::get_available_port (); - - auto node1 (std::make_shared (system.io_ctx, nano::unique_path (), system.alarm, config, system.work, node_flags)); - node1->start (); - system.nodes.push_back (node1); + auto node1 (system.add_node (config)); ASSERT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::confirmation)); @@ -158,15 +154,11 @@ TEST (websocket, subscription_edge) // Test client subscribing to changes in active_difficulty TEST (websocket, active_difficulty) { - nano::system system (1); + nano::system system; nano::node_config config (nano::get_available_port (), system.logging); - nano::node_flags node_flags; config.websocket_config.enabled = true; config.websocket_config.port = nano::get_available_port (); - - auto node1 (std::make_shared (system.io_ctx, nano::unique_path (), system.alarm, config, system.work, node_flags)); - node1->start (); - system.nodes.push_back (node1); + auto node1 (system.add_node (config)); ASSERT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::active_difficulty)); @@ -226,16 +218,11 @@ TEST (websocket, active_difficulty) /** Subscribes to block confirmations, confirms a block and then awaits websocket notification */ TEST (websocket, confirmation) { - nano::system system (1); + nano::system system; nano::node_config config (nano::get_available_port (), system.logging); - nano::node_flags node_flags; config.websocket_config.enabled = true; config.websocket_config.port = nano::get_available_port (); - - auto node1 (std::make_shared (system.io_ctx, nano::unique_path (), system.alarm, config, system.work, node_flags)); - node1->wallets.create (nano::random_wallet_id ()); - node1->start (); - system.nodes.push_back (node1); + auto node1 (system.add_node (config)); // Start websocket test-client in a separate thread ack_ready = false; @@ -266,7 +253,7 @@ TEST (websocket, confirmation) ASSERT_TRUE (node1->websocket_server->any_subscriber (nano::websocket::topic::confirmation)); nano::keypair key; - system.wallet (1)->insert_adhoc (nano::test_genesis_key.prv); + system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv); auto balance = nano::genesis_amount; auto send_amount = node1->config.online_weight_minimum.number () + 1; // Quick-confirm a block, legacy blocks should work without filtering @@ -334,16 +321,11 @@ TEST (websocket, confirmation) /** Tests getting notification of an erased election */ TEST (websocket, stopped_election) { - nano::system system (1); + nano::system system; nano::node_config config (nano::get_available_port (), system.logging); - nano::node_flags node_flags; config.websocket_config.enabled = true; config.websocket_config.port = nano::get_available_port (); - - auto node1 (std::make_shared (system.io_ctx, nano::unique_path (), system.alarm, config, system.work, node_flags)); - node1->wallets.create (nano::random_wallet_id ()); - node1->start (); - system.nodes.push_back (node1); + auto node1 (system.add_node (config)); // Start websocket test-client in a separate thread ack_ready = false; @@ -394,16 +376,11 @@ TEST (websocket, stopped_election) /** Tests the filtering options of block confirmations */ TEST (websocket, confirmation_options) { - nano::system system (1); + nano::system system; nano::node_config config (nano::get_available_port (), system.logging); - nano::node_flags node_flags; config.websocket_config.enabled = true; config.websocket_config.port = nano::get_available_port (); - - auto node1 (std::make_shared (system.io_ctx, nano::unique_path (), system.alarm, config, system.work, node_flags)); - node1->wallets.create (nano::random_wallet_id ()); - node1->start (); - system.nodes.push_back (node1); + auto node1 (system.add_node (config)); // Start websocket test-client in a separate thread ack_ready = false; @@ -427,7 +404,7 @@ TEST (websocket, confirmation_options) ack_ready = false; // Confirm a state block for an in-wallet account - system.wallet (1)->insert_adhoc (nano::test_genesis_key.prv); + system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv); nano::keypair key; auto balance = nano::genesis_amount; auto send_amount = node1->config.online_weight_minimum.number () + 1; @@ -545,16 +522,11 @@ TEST (websocket, confirmation_options) /** Subscribes to votes, sends a block and awaits websocket notification of a vote arrival */ TEST (websocket, vote) { - nano::system system (1); + nano::system system; nano::node_config config (nano::get_available_port (), system.logging); - nano::node_flags node_flags; config.websocket_config.enabled = true; config.websocket_config.port = nano::get_available_port (); - - auto node1 (std::make_shared (system.io_ctx, nano::unique_path (), system.alarm, config, system.work, node_flags)); - node1->wallets.create (nano::random_wallet_id ()); - node1->start (); - system.nodes.push_back (node1); + auto node1 (system.add_node (config)); // Start websocket test-client in a separate thread ack_ready = false; @@ -587,7 +559,7 @@ TEST (websocket, vote) // Quick-confirm a block nano::keypair key; - system.wallet (1)->insert_adhoc (nano::test_genesis_key.prv); + system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv); nano::block_hash previous (node1->latest (nano::test_genesis_key.pub)); auto send (std::make_shared (nano::test_genesis_key.pub, previous, nano::test_genesis_key.pub, nano::genesis_amount - (node1->config.online_weight_minimum.number () + 1), key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (previous))); node1->process_active (send); @@ -603,19 +575,78 @@ TEST (websocket, vote) node1->stop (); } -/** Tests vote subscription options */ -TEST (websocket, vote_options) +/** Tests vote subscription options - vote type */ +TEST (websocket, vote_options_type) { - nano::system system (1); + nano::system system; nano::node_config config (nano::get_available_port (), system.logging); - nano::node_flags node_flags; config.websocket_config.enabled = true; config.websocket_config.port = nano::get_available_port (); + auto node1 (system.add_node (config)); + + ASSERT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::vote)); + + // Subscribe to votes and wait for response asynchronously + ack_ready = false; + std::atomic replay_received{ false }; + std::thread client_thread ([&replay_received, config]() { + auto response = websocket_test_call ("::1", std::to_string (config.websocket_config.port), + R"json({"action": "subscribe", "topic": "vote", "ack": true, "options": {"include_replays": "true", "include_indeterminate": "false"}})json", true, true); + ASSERT_TRUE (response); + boost::property_tree::ptree event; + std::stringstream stream; + stream << response; + boost::property_tree::read_json (stream, event); + auto message_contents = event.get_child ("message"); + ASSERT_EQ (1, message_contents.count ("type")); + ASSERT_EQ ("replay", message_contents.get ("type")); + replay_received = true; + }); + + // Wait for acknowledge + system.deadline_set (5s); + while (!ack_ready) + { + ASSERT_NO_ERROR (system.poll ()); + } + ASSERT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::vote)); - auto node1 (std::make_shared (system.io_ctx, nano::unique_path (), system.alarm, config, system.work, node_flags)); - node1->wallets.create (nano::random_wallet_id ()); - node1->start (); - system.nodes.push_back (node1); + // Custom made votes for simplicity + nano::genesis genesis; + auto vote (std::make_shared (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 0, genesis.open)); + + // Indeterminates are not included + { + nano::websocket::message_builder builder; + auto msg (builder.vote_received (vote, nano::vote_code::indeterminate)); + node1->websocket_server->broadcast (msg); + } + + // Replays are included + { + nano::websocket::message_builder builder; + auto msg (builder.vote_received (vote, nano::vote_code::replay)); + node1->websocket_server->broadcast (msg); + } + + // Wait for the websocket client + system.deadline_set (5s); + while (!replay_received) + { + ASSERT_NO_ERROR (system.poll ()); + } + client_thread.join (); + node1->stop (); +} + +/** Tests vote subscription options - list of representatives */ +TEST (websocket, vote_options_representatives) +{ + nano::system system; + nano::node_config config (nano::get_available_port (), system.logging); + config.websocket_config.enabled = true; + config.websocket_config.port = nano::get_available_port (); + auto node1 (system.add_node (config)); // Start websocket test-client in a separate thread ack_ready = false; @@ -650,7 +681,7 @@ TEST (websocket, vote_options) // Quick-confirm a block nano::keypair key; auto balance = nano::genesis_amount; - system.wallet (1)->insert_adhoc (nano::test_genesis_key.prv); + system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv); auto send_amount = node1->config.online_weight_minimum.number () + 1; auto confirm_block = [&]() { nano::block_hash previous (node1->latest (nano::test_genesis_key.pub)); @@ -670,10 +701,10 @@ TEST (websocket, vote_options) std::atomic client_thread_2_finished{ false }; std::thread client_thread_2 ([&client_thread_2_finished, config]() { auto response = websocket_test_call ("::1", std::to_string (config.websocket_config.port), - R"json({"action": "subscribe", "topic": "vote", "ack": true, "options": {"representatives": ["xrb_invalid"]}})json", true, true, 1s); + R"json({"action": "subscribe", "topic": "vote", "ack": true, "options": {"representatives": ["xrb_invalid"]}})json", true, true); - // No response expected given the filter - ASSERT_FALSE (response); + // A list of invalid representatives is the same as no filter + ASSERT_TRUE (response); client_thread_2_finished = true; }); @@ -690,7 +721,6 @@ TEST (websocket, vote_options) // Confirm another block confirm_block (); - // No response expected system.deadline_set (5s); while (!client_thread_2_finished) { @@ -705,15 +735,11 @@ TEST (websocket, vote_options) // Test client subscribing to notifications for work generation TEST (websocket, work) { - nano::system system (1); + nano::system system; nano::node_config config (nano::get_available_port (), system.logging); - nano::node_flags node_flags; config.websocket_config.enabled = true; config.websocket_config.port = nano::get_available_port (); - - auto node1 (std::make_shared (system.io_ctx, nano::unique_path (), system.alarm, config, system.work, node_flags)); - node1->start (); - system.nodes.push_back (node1); + auto node1 (system.add_node (config)); ASSERT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::work)); @@ -782,15 +808,12 @@ TEST (websocket, work) /** Tests clients subscribing multiple times or unsubscribing without a subscription */ TEST (websocket, ws_keepalive) { - nano::system system (1); + nano::system system; nano::node_config config (nano::get_available_port (), system.logging); - nano::node_flags node_flags; config.websocket_config.enabled = true; config.websocket_config.port = nano::get_available_port (); + auto node1 (system.add_node (config)); - auto node1 (std::make_shared (system.io_ctx, nano::unique_path (), system.alarm, config, system.work, node_flags)); - node1->start (); - system.nodes.push_back (node1); ack_ready = false; std::thread subscription_thread ([config]() { websocket_test_call ("::1", std::to_string (config.websocket_config.port), R"json({"action": "ping"})json", true, false); diff --git a/nano/lib/stats.cpp b/nano/lib/stats.cpp index 257fa6b40a..f2adc10577 100644 --- a/nano/lib/stats.cpp +++ b/nano/lib/stats.cpp @@ -564,6 +564,9 @@ std::string nano::stat::detail_to_string (uint32_t key) case nano::stat::detail::vote_replay: res = "vote_replay"; break; + case nano::stat::detail::vote_indeterminate: + res = "vote_indeterminate"; + break; case nano::stat::detail::vote_invalid: res = "vote_invalid"; break; diff --git a/nano/lib/stats.hpp b/nano/lib/stats.hpp index 92c9650b48..99568b09e0 100644 --- a/nano/lib/stats.hpp +++ b/nano/lib/stats.hpp @@ -255,6 +255,7 @@ class stat final // vote specific vote_valid, vote_replay, + vote_indeterminate, vote_invalid, vote_overflow, diff --git a/nano/node/active_transactions.cpp b/nano/node/active_transactions.cpp index ee9f5b3647..db04bae1f3 100644 --- a/nano/node/active_transactions.cpp +++ b/nano/node/active_transactions.cpp @@ -530,9 +530,11 @@ bool nano::active_transactions::add (std::shared_ptr block_a, bool } // Validate a vote and apply it to the current election if one exists -bool nano::active_transactions::vote (std::shared_ptr vote_a, bool single_lock) +nano::vote_code nano::active_transactions::vote (std::shared_ptr vote_a, bool single_lock) { - std::shared_ptr election; + // If none of the hashes are active, it is unknown whether it's a replay + // In this case, votes are also not republished + bool at_least_one (false); bool replay (false); bool processed (false); { @@ -550,9 +552,10 @@ bool nano::active_transactions::vote (std::shared_ptr vote_a, bool s auto existing (blocks.find (block_hash)); if (existing != blocks.end ()) { + at_least_one = true; result = existing->second->vote (vote_a->account, vote_a->sequence, block_hash); } - else + else // possibly a vote for a recently confirmed election { add_inactive_votes_cache (block_hash, vote_a->account); } @@ -563,6 +566,7 @@ bool nano::active_transactions::vote (std::shared_ptr vote_a, bool s auto existing (roots.get ().find (block->qualified_root ())); if (existing != roots.get ().end ()) { + at_least_one = true; result = existing->election->vote (vote_a->account, vote_a->sequence, block->hash ()); } else @@ -570,15 +574,22 @@ bool nano::active_transactions::vote (std::shared_ptr vote_a, bool s add_inactive_votes_cache (block->hash (), vote_a->account); } } - replay = replay || result.replay; processed = processed || result.processed; + replay = replay || result.replay; + } + } + if (at_least_one) + { + if (processed) + { + node.network.flood_vote (vote_a); } + return replay ? nano::vote_code::replay : nano::vote_code::vote; } - if (processed) + else { - node.network.flood_vote (vote_a); + return nano::vote_code::indeterminate; } - return replay; } bool nano::active_transactions::active (nano::qualified_root const & root_a) diff --git a/nano/node/active_transactions.hpp b/nano/node/active_transactions.hpp index b0f7d9b9dc..5493458c56 100644 --- a/nano/node/active_transactions.hpp +++ b/nano/node/active_transactions.hpp @@ -79,9 +79,8 @@ class active_transactions final // clang-format off bool start (std::shared_ptr, bool const = false, std::function)> const & = [](std::shared_ptr) {}); // clang-format on - // If this returns true, the vote is a replay - // If this returns false, the vote may or may not be a replay - bool vote (std::shared_ptr, bool = false); + // Distinguishes replay votes, cannot be determined if the block is not in any election + nano::vote_code vote (std::shared_ptr, bool = false); // Is the root of this block in the roots container bool active (nano::block const &); bool active (nano::qualified_root const &); diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 12d0f0277c..d6982bccff 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -303,38 +303,41 @@ startup_time (std::chrono::steady_clock::now ()) this->network.send_keepalive_self (channel_a); } }); - observers.vote.add ([this](std::shared_ptr vote_a, std::shared_ptr channel_a) { - this->gap_cache.vote (vote_a); - this->online_reps.observe (vote_a->account); - nano::uint128_t rep_weight; + observers.vote.add ([this](std::shared_ptr vote_a, std::shared_ptr channel_a, nano::vote_code code_a) { + if (code_a == nano::vote_code::vote || code_a == nano::vote_code::indeterminate) { - rep_weight = ledger.weight (vote_a->account); - } - if (rep_weight > minimum_principal_weight ()) - { - bool rep_crawler_exists (false); - for (auto hash : *vote_a) + this->gap_cache.vote (vote_a); + this->online_reps.observe (vote_a->account); + nano::uint128_t rep_weight; { - if (this->rep_crawler.exists (hash)) - { - rep_crawler_exists = true; - break; - } + rep_weight = ledger.weight (vote_a->account); } - if (rep_crawler_exists) + if (rep_weight > minimum_principal_weight ()) { - // We see a valid non-replay vote for a block we requested, this node is probably a representative - if (this->rep_crawler.response (channel_a, vote_a->account, rep_weight)) + bool rep_crawler_exists (false); + for (auto hash : *vote_a) { - logger.try_log (boost::str (boost::format ("Found a representative at %1%") % channel_a->to_string ())); - // Rebroadcasting all active votes to new representative - auto blocks (this->active.list_blocks (true)); - for (auto i (blocks.begin ()), n (blocks.end ()); i != n; ++i) + if (this->rep_crawler.exists (hash)) { - if (*i != nullptr) + rep_crawler_exists = true; + break; + } + } + if (rep_crawler_exists) + { + // We see a valid non-replay vote for a block we requested, this node is probably a representative + if (this->rep_crawler.response (channel_a, vote_a->account, rep_weight)) + { + logger.try_log (boost::str (boost::format ("Found a representative at %1%") % channel_a->to_string ())); + // Rebroadcasting all active votes to new representative + auto blocks (this->active.list_blocks (true)); + for (auto i (blocks.begin ()), n (blocks.end ()); i != n; ++i) { - nano::confirm_req req (*i); - channel_a->send (req); + if (*i != nullptr) + { + nano::confirm_req req (*i); + channel_a->send (req); + } } } } @@ -343,11 +346,11 @@ startup_time (std::chrono::steady_clock::now ()) }); if (websocket_server) { - observers.vote.add ([this](std::shared_ptr vote_a, std::shared_ptr channel_a) { + observers.vote.add ([this](std::shared_ptr vote_a, std::shared_ptr channel_a, nano::vote_code code_a) { if (this->websocket_server->any_subscriber (nano::websocket::topic::vote)) { nano::websocket::message_builder builder; - auto msg (builder.vote_received (vote_a)); + auto msg (builder.vote_received (vote_a, code_a)); this->websocket_server->broadcast (msg); } }); diff --git a/nano/node/node_observers.hpp b/nano/node/node_observers.hpp index 2a1d4aacb9..1fbf576f1c 100644 --- a/nano/node/node_observers.hpp +++ b/nano/node/node_observers.hpp @@ -13,7 +13,7 @@ class node_observers final using blocks_t = nano::observer_set; blocks_t blocks; nano::observer_set wallet; - nano::observer_set, std::shared_ptr> vote; + nano::observer_set, std::shared_ptr, nano::vote_code> vote; nano::observer_set active_stopped; nano::observer_set account_balance; nano::observer_set> endpoint; diff --git a/nano/node/vote_processor.cpp b/nano/node/vote_processor.cpp index 6ee37b92c2..280124589c 100644 --- a/nano/node/vote_processor.cpp +++ b/nano/node/vote_processor.cpp @@ -202,29 +202,16 @@ nano::vote_code nano::vote_processor::vote_blocking (nano::transaction const & t auto result (nano::vote_code::invalid); if (validated || !vote_a->validate ()) { + result = active.vote (vote_a, true); + observers.vote.notify (vote_a, channel_a, result); + // This tries to assist rep nodes that have lost track of their highest sequence number by replaying our highest known vote back to them + // Only do this if the sequence number is significantly different to account for network reordering + // Amplify attack considerations: We're sending out a confirm_ack in response to a confirm_ack for no net traffic increase auto max_vote (store.vote_max (transaction_a, vote_a)); - result = nano::vote_code::replay; - if (!active.vote (vote_a, true)) + if (max_vote->sequence > vote_a->sequence + 10000) { - result = nano::vote_code::vote; - } - switch (result) - { - case nano::vote_code::vote: - observers.vote.notify (vote_a, channel_a); - case nano::vote_code::replay: - // This tries to assist rep nodes that have lost track of their highest sequence number by replaying our highest known vote back to them - // Only do this if the sequence number is significantly different to account for network reordering - // Amplify attack considerations: We're sending out a confirm_ack in response to a confirm_ack for no net traffic increase - if (max_vote->sequence > vote_a->sequence + 10000) - { - nano::confirm_ack confirm (max_vote); - channel_a->send (confirm); // this is non essential traffic as it will be resolicited if not received - } - break; - case nano::vote_code::invalid: - assert (false); - break; + nano::confirm_ack confirm (max_vote); + channel_a->send (confirm); // this is non essential traffic as it will be resolicited if not received } } std::string status; @@ -242,6 +229,10 @@ nano::vote_code nano::vote_processor::vote_blocking (nano::transaction const & t status = "Vote"; stats.inc (nano::stat::type::vote, nano::stat::detail::vote_valid); break; + case nano::vote_code::indeterminate: + status = "Indeterminate"; + stats.inc (nano::stat::type::vote, nano::stat::detail::vote_indeterminate); + break; } if (config.logging.vote_logging ()) { diff --git a/nano/node/websocket.cpp b/nano/node/websocket.cpp index c45c7d121c..e1dd1e89d5 100644 --- a/nano/node/websocket.cpp +++ b/nano/node/websocket.cpp @@ -138,6 +138,8 @@ bool nano::websocket::confirmation_options::should_filter (nano::websocket::mess nano::websocket::vote_options::vote_options (boost::property_tree::ptree const & options_a, nano::logger_mt & logger_a) { + include_replays = options_a.get ("include_replays", false); + include_indeterminate = options_a.get ("include_indeterminate", false); auto representatives_l (options_a.get_child_optional ("representatives")); if (representatives_l) { @@ -154,21 +156,25 @@ nano::websocket::vote_options::vote_options (boost::property_tree::ptree const & logger_a.always_log ("Websocket: invalid account given to filter votes: ", representative_l.second.data ()); } } - } - // Warn the user if the options resulted in an empty filter - if (representatives.empty ()) - { - logger_a.always_log ("Websocket: provided options resulted in an empty vote filter"); + // Warn the user if the option will be ignored + if (representatives.empty ()) + { + logger_a.always_log ("Websocket: account filter for votes is empty, no messages will be filtered"); + } } } bool nano::websocket::vote_options::should_filter (nano::websocket::message const & message_a) const { - bool should_filter_l (true); - auto representative_text_l (message_a.contents.get ("message.account")); - if (representatives.find (representative_text_l) != representatives.end ()) + auto type (message_a.contents.get ("message.type")); + bool should_filter_l = (!include_replays && type == "replay") || (!include_indeterminate && type == "indeterminate"); + if (!should_filter_l && !representatives.empty ()) { - should_filter_l = false; + auto representative_text_l (message_a.contents.get ("message.account")); + if (representatives.find (representative_text_l) == representatives.end ()) + { + should_filter_l = true; + } } return should_filter_l; } @@ -656,7 +662,7 @@ nano::websocket::message nano::websocket::message_builder::block_confirmed (std: return message_l; } -nano::websocket::message nano::websocket::message_builder::vote_received (std::shared_ptr vote_a) +nano::websocket::message nano::websocket::message_builder::vote_received (std::shared_ptr vote_a, nano::vote_code code_a) { nano::websocket::message message_l (nano::websocket::topic::vote); set_common_fields (message_l); @@ -664,6 +670,25 @@ nano::websocket::message nano::websocket::message_builder::vote_received (std::s // Vote information boost::property_tree::ptree vote_node_l; vote_a->serialize_json (vote_node_l); + + // Vote processing information + std::string vote_type = "invalid"; + switch (code_a) + { + case nano::vote_code::vote: + vote_type = "vote"; + break; + case nano::vote_code::replay: + vote_type = "replay"; + break; + case nano::vote_code::indeterminate: + vote_type = "indeterminate"; + break; + case nano::vote_code::invalid: + assert (false); + break; + } + vote_node_l.put ("type", vote_type); message_l.contents.add_child ("message", vote_node_l); return message_l; } diff --git a/nano/node/websocket.hpp b/nano/node/websocket.hpp index 603bb5bbfd..cc2dafa92e 100644 --- a/nano/node/websocket.hpp +++ b/nano/node/websocket.hpp @@ -81,7 +81,7 @@ namespace websocket public: message block_confirmed (std::shared_ptr block_a, nano::account const & account_a, nano::amount const & amount_a, std::string subtype, bool include_block, nano::election_status const & election_status_a, nano::websocket::confirmation_options const & options_a); message stopped_election (nano::block_hash const & hash_a); - message vote_received (std::shared_ptr vote_a); + message vote_received (std::shared_ptr vote_a, nano::vote_code code_a); message difficulty_changed (uint64_t publish_threshold_a, uint64_t difficulty_active_a); message work_generation (nano::block_hash const & root_a, uint64_t const work_a, uint64_t const difficulty_a, uint64_t const publish_threshold_a, std::chrono::milliseconds const & duration_a, std::string const & peer_a, std::vector const & bad_peers_a, bool const completed_a = true, bool const cancelled_a = false); message work_cancelled (nano::block_hash const & root_a, uint64_t const difficulty_a, uint64_t const publish_threshold_a, std::chrono::milliseconds const & duration_a, std::vector const & bad_peers_a); @@ -179,6 +179,8 @@ namespace websocket private: std::unordered_set representatives; + bool include_replays{ false }; + bool include_indeterminate{ false }; }; /** A websocket session managing its own lifetime */ diff --git a/nano/secure/common.hpp b/nano/secure/common.hpp index 9481c61f0b..a4f9573b9b 100644 --- a/nano/secure/common.hpp +++ b/nano/secure/common.hpp @@ -282,7 +282,8 @@ enum class vote_code { invalid, // Vote is not signed correctly replay, // Vote does not have the highest sequence number, it's a replay - vote // Vote has the highest sequence number + vote, // Vote has the highest sequence number + indeterminate // Unknown if replay or vote }; enum class process_result