From d13820d217c78cb5db5e53f6fefe92888afe658c Mon Sep 17 00:00:00 2001 From: Wesley Shillingford Date: Wed, 29 Jan 2020 09:10:49 +0000 Subject: [PATCH 01/12] Poll all nodes and remove some metrics from bounds when consolidating --- nano/core_test/node_telemetry.cpp | 559 +++++++++++++---------- nano/lib/utility.hpp | 15 +- nano/node/bootstrap/bootstrap_server.cpp | 18 +- nano/node/common.cpp | 78 +++- nano/node/common.hpp | 11 + nano/node/json_handler.cpp | 22 +- nano/node/network.cpp | 8 +- nano/node/network.hpp | 3 +- nano/node/telemetry.cpp | 254 +++++++--- nano/node/telemetry.hpp | 69 ++- nano/node/transport/tcp.cpp | 13 +- nano/node/transport/tcp.hpp | 3 +- nano/node/transport/transport.hpp | 13 + nano/node/transport/udp.cpp | 31 +- nano/node/transport/udp.hpp | 6 +- nano/rpc_test/rpc.cpp | 26 +- nano/slow_test/node.cpp | 239 ++++++++++ 17 files changed, 989 insertions(+), 379 deletions(-) diff --git a/nano/core_test/node_telemetry.cpp b/nano/core_test/node_telemetry.cpp index 1c06daf543..4dab005c53 100644 --- a/nano/core_test/node_telemetry.cpp +++ b/nano/core_test/node_telemetry.cpp @@ -83,15 +83,64 @@ TEST (node_telemetry, consolidate_data) ASSERT_EQ (consolidated_telemetry_data, consolidated_telemetry_data); } +TEST (node_telemetry, consolidate_data_remove_outliers) +{ + nano::telemetry_data data; + data.account_count = 2; + data.block_count = 1; + data.cemented_count = 1; + data.vendor_version = 20; + data.protocol_version_number = 12; + data.peer_count = 2; + data.bandwidth_cap = 100; + data.unchecked_count = 3; + data.uptime = 6; + data.genesis_block = nano::block_hash (3); + + // Insert 20 of these, and 2 outliers at the lower and upper bounds which should get removed + std::vector all_data (20, data); + + // Insert some outliers + nano::telemetry_data outlier_data; + outlier_data.account_count = 1; + outlier_data.block_count = 0; + outlier_data.cemented_count = 0; + outlier_data.vendor_version = 11; + outlier_data.protocol_version_number = 11; + outlier_data.peer_count = 0; + outlier_data.bandwidth_cap = 8; + outlier_data.unchecked_count = 1; + outlier_data.uptime = 2; + outlier_data.genesis_block = nano::block_hash (2); + all_data.push_back (outlier_data); + all_data.push_back (outlier_data); + + nano::telemetry_data outlier_data1; + outlier_data1.account_count = 99; + outlier_data1.block_count = 99; + outlier_data1.cemented_count = 99; + outlier_data1.vendor_version = 99; + outlier_data1.protocol_version_number = 99; + outlier_data1.peer_count = 99; + outlier_data1.bandwidth_cap = 999; + outlier_data1.unchecked_count = 99; + outlier_data1.uptime = 999; + outlier_data1.genesis_block = nano::block_hash (99); + all_data.push_back (outlier_data1); + all_data.push_back (outlier_data1); + + auto consolidated_telemetry_data = nano::telemetry_data::consolidate (all_data); + ASSERT_EQ (data, consolidated_telemetry_data); +} + TEST (node_telemetry, no_peers) { nano::system system (1); std::atomic done{ false }; - system.nodes[0]->telemetry.get_metrics_random_peers_async ([&done](nano::telemetry_data_responses const & responses_a) { - ASSERT_TRUE (responses_a.data.empty ()); + system.nodes[0]->telemetry.get_metrics_peers_async ([&done](nano::telemetry_data_responses const & responses_a) { + ASSERT_TRUE (responses_a.telemetry_data_time_pairs.empty ()); ASSERT_FALSE (responses_a.all_received); - ASSERT_FALSE (responses_a.is_cached); done = true; }); @@ -114,13 +163,12 @@ TEST (node_telemetry, basic) wait_peer_connections (system); // Request telemetry metrics - std::vector 
all_telemetry_data; + std::unordered_map all_telemetry_data_time_pairs; { std::atomic done{ false }; - node_client->telemetry.get_metrics_random_peers_async ([&done, &all_telemetry_data](nano::telemetry_data_responses const & responses_a) { - ASSERT_FALSE (responses_a.is_cached); + node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_data_time_pairs](nano::telemetry_data_responses const & responses_a) { ASSERT_TRUE (responses_a.all_received); - all_telemetry_data = responses_a.data; + all_telemetry_data_time_pairs = responses_a.telemetry_data_time_pairs; done = true; }); @@ -132,16 +180,14 @@ TEST (node_telemetry, basic) } // Check the metrics are correct - ASSERT_EQ (all_telemetry_data.size (), 1); - auto & telemetry_data = all_telemetry_data.front (); - compare_default_test_result_data (telemetry_data, *node_server); + ASSERT_EQ (all_telemetry_data_time_pairs.size (), 1); + compare_default_test_result_data (all_telemetry_data_time_pairs.begin ()->second.data, *node_server); // Call again straight away. It should use the cache { std::atomic done{ false }; - node_client->telemetry.get_metrics_random_peers_async ([&done, &telemetry_data](nano::telemetry_data_responses const & responses_a) { - ASSERT_EQ (telemetry_data, responses_a.data.front ()); - ASSERT_TRUE (responses_a.is_cached); + node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_data_time_pairs](nano::telemetry_data_responses const & responses_a) { + ASSERT_EQ (all_telemetry_data_time_pairs, responses_a.telemetry_data_time_pairs); ASSERT_TRUE (responses_a.all_received); done = true; }); @@ -154,11 +200,13 @@ TEST (node_telemetry, basic) } // Wait the cache period and check cache is not used - std::this_thread::sleep_for (nano::telemetry_impl::cache_cutoff); + std::this_thread::sleep_for (nano::telemetry_cache_cutoffs::test); + // Arbitrarily change something so that we can confirm different metrics were used + node_server->ledger.cache.block_count = 100; std::atomic done{ false }; - node_client->telemetry.get_metrics_random_peers_async ([&done, &telemetry_data](nano::telemetry_data_responses const & responses_a) { - ASSERT_FALSE (responses_a.is_cached); + node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_data_time_pairs](nano::telemetry_data_responses const & responses_a) { + ASSERT_NE (all_telemetry_data_time_pairs, responses_a.telemetry_data_time_pairs); ASSERT_TRUE (responses_a.all_received); done = true; }); @@ -200,11 +248,10 @@ TEST (node_telemetry, many_nodes) auto node_client = system.nodes.front (); std::atomic done{ false }; - std::vector all_telemetry_data; - node_client->telemetry.get_metrics_random_peers_async ([&done, &all_telemetry_data](nano::telemetry_data_responses const & responses_a) { - ASSERT_FALSE (responses_a.is_cached); + std::unordered_map all_telemetry_data_time_pairs; + node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_data_time_pairs](nano::telemetry_data_responses const & responses_a) { ASSERT_TRUE (responses_a.all_received); - all_telemetry_data = responses_a.data; + all_telemetry_data_time_pairs = responses_a.telemetry_data_time_pairs; done = true; }); @@ -216,8 +263,9 @@ TEST (node_telemetry, many_nodes) // Check the metrics nano::network_params params; - for (auto & data : all_telemetry_data) + for (auto & telemetry_data_time_pair : all_telemetry_data_time_pairs) { + auto & data = telemetry_data_time_pair.second.data; ASSERT_EQ (data.unchecked_count, 0); ASSERT_EQ (data.cemented_count, 1); ASSERT_LE (data.peer_count, 9); 
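The consolidate_data_remove_outliers test above depends on the trimmed average introduced later in this patch: with 24 responses (20 identical entries plus two low and two high outliers), size () / 10 = 2 samples are discarded from each end of every sorted metric, which removes exactly the outliers and leaves the consolidated result equal to the unmodified data. A standalone sketch of that trimming idea follows; it is illustrative only and not part of the patch (the real logic is the strip_outliers_and_sum lambda in nano::telemetry_data::consolidate).

#include <cstdint>
#include <iterator>
#include <numeric>
#include <set>

// Drop size () / 10 samples from each end of the sorted values, then average
// the remainder. Mirrors the trimming performed by consolidate in common.cpp.
inline uint64_t trimmed_average (std::multiset<uint64_t> values)
{
	auto const to_trim = values.size () / 10;
	auto first = std::next (values.begin (), to_trim);
	auto last = std::prev (values.end (), to_trim);
	auto const count = std::distance (first, last);
	return count > 0 ? std::accumulate (first, last, uint64_t{ 0 }) / count : 0;
}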
@@ -232,8 +280,10 @@ TEST (node_telemetry, many_nodes) } // We gave some nodes different bandwidth caps, confirm they are not all the time - auto all_bandwidth_limits_same = std::all_of (all_telemetry_data.begin () + 1, all_telemetry_data.end (), [bandwidth_cap = all_telemetry_data[0].bandwidth_cap](auto & telemetry) { - return telemetry.bandwidth_cap == bandwidth_cap; + auto bandwidth_cap = all_telemetry_data_time_pairs.begin ()->second.data.bandwidth_cap; + all_telemetry_data_time_pairs.erase (all_telemetry_data_time_pairs.begin ()); + auto all_bandwidth_limits_same = std::all_of (all_telemetry_data_time_pairs.begin (), all_telemetry_data_time_pairs.end (), [bandwidth_cap](auto & telemetry_data_time_pair) { + return telemetry_data_time_pair.second.data.bandwidth_cap == bandwidth_cap; }); ASSERT_FALSE (all_bandwidth_limits_same); } @@ -259,11 +309,10 @@ TEST (node_telemetry, over_udp) wait_peer_connections (system); std::atomic done{ false }; - std::vector all_telemetry_data; - node_client->telemetry.get_metrics_random_peers_async ([&done, &all_telemetry_data](nano::telemetry_data_responses const & responses_a) { - ASSERT_FALSE (responses_a.is_cached); + std::unordered_map all_telemetry_data_time_pairs; + node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_data_time_pairs](nano::telemetry_data_responses const & responses_a) { ASSERT_TRUE (responses_a.all_received); - all_telemetry_data = responses_a.data; + all_telemetry_data_time_pairs = responses_a.telemetry_data_time_pairs; done = true; }); @@ -273,8 +322,8 @@ TEST (node_telemetry, over_udp) ASSERT_NO_ERROR (system.poll ()); } - ASSERT_EQ (all_telemetry_data.size (), 1); - compare_default_test_result_data (all_telemetry_data.front (), *node_server); + ASSERT_EQ (all_telemetry_data_time_pairs.size (), 1); + compare_default_test_result_data (all_telemetry_data_time_pairs.begin ()->second.data, *node_server); // Check channels are indeed udp ASSERT_EQ (1, node_client->network.size ()); @@ -287,87 +336,6 @@ TEST (node_telemetry, over_udp) ASSERT_EQ (nano::transport::transport_type::udp, list2[0]->get_type ()); } -TEST (node_telemetry, simultaneous_random_requests) -{ - const auto num_nodes = 4; - nano::system system (num_nodes); - - // Wait until peers are stored as they are done in the background - wait_peer_connections (system); - - std::vector threads; - const auto num_threads = 4; - - std::atomic done{ false }; - class Data - { - public: - std::atomic awaiting_cache{ false }; - std::atomic keep_requesting_metrics{ true }; - std::shared_ptr node; - }; - - std::array all_data{}; - for (auto i = 0; i < num_nodes; ++i) - { - all_data[i].node = system.nodes[i]; - } - - std::atomic count{ 0 }; - std::promise promise; - std::shared_future shared_future (promise.get_future ()); - - // Create a few threads where each node sends out telemetry request messages to all other nodes continuously, until the cache it reached and subsequently expired. - // The test waits until all telemetry_ack messages have been received. 
- for (int i = 0; i < num_threads; ++i) - { - threads.emplace_back ([&all_data, &done, &count, &promise, &shared_future]() { - while (std::any_of (all_data.cbegin (), all_data.cend (), [](auto const & data) { return data.keep_requesting_metrics.load (); })) - { - for (auto & data : all_data) - { - // Keep calling get_metrics_async until the cache has been saved and then become outdated (after a certain period of time) for each node - if (data.keep_requesting_metrics) - { - ++count; - - data.node->telemetry.get_metrics_random_peers_async ([&promise, &done, &data, &all_data, &count](nano::telemetry_data_responses const & responses_a) { - if (data.awaiting_cache && !responses_a.is_cached) - { - data.keep_requesting_metrics = false; - } - if (responses_a.is_cached) - { - data.awaiting_cache = true; - } - if (--count == 0 && std::all_of (all_data.begin (), all_data.end (), [](auto const & data) { return !data.keep_requesting_metrics; })) - { - done = true; - promise.set_value (); - } - }); - } - std::this_thread::sleep_for (1ms); - } - } - - ASSERT_EQ (count, 0); - shared_future.wait (); - }); - } - - system.deadline_set (20s); - while (!done) - { - ASSERT_NO_ERROR (system.poll ()); - } - - for (auto & thread : threads) - { - thread.join (); - } -} - namespace nano { TEST (node_telemetry, single_request) @@ -381,14 +349,14 @@ TEST (node_telemetry, single_request) // Request telemetry metrics auto channel = node_client->network.find_channel (node_server->network.endpoint ()); - nano::telemetry_data telemetry_data; + nano::telemetry_data_time_pair telemetry_data_time_pair; { std::atomic done{ false }; - node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data](nano::telemetry_data_response const & response_a) { - ASSERT_FALSE (response_a.is_cached); + node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data_time_pair, &channel](nano::telemetry_data_response const & response_a) { ASSERT_FALSE (response_a.error); - telemetry_data = response_a.data; + ASSERT_EQ (channel->get_endpoint (), response_a.endpoint); + telemetry_data_time_pair = response_a.telemetry_data_time_pair; done = true; }); @@ -400,14 +368,13 @@ TEST (node_telemetry, single_request) } // Check the metrics are correct - compare_default_test_result_data (telemetry_data, *node_server); + compare_default_test_result_data (telemetry_data_time_pair.data, *node_server); // Call again straight away. 
It should use the cache { std::atomic done{ false }; - node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data](nano::telemetry_data_response const & response_a) { - ASSERT_EQ (telemetry_data, response_a.data); - ASSERT_TRUE (response_a.is_cached); + node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data_time_pair](nano::telemetry_data_response const & response_a) { + ASSERT_EQ (telemetry_data_time_pair, response_a.telemetry_data_time_pair); ASSERT_FALSE (response_a.error); done = true; }); @@ -420,11 +387,11 @@ TEST (node_telemetry, single_request) } // Wait the cache period and check cache is not used - std::this_thread::sleep_for (nano::telemetry_impl::cache_cutoff); + std::this_thread::sleep_for (nano::telemetry_cache_cutoffs::test); std::atomic done{ false }; - node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data](nano::telemetry_data_response const & response_a) { - ASSERT_FALSE (response_a.is_cached); + node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data_time_pair](nano::telemetry_data_response const & response_a) { + ASSERT_NE (telemetry_data_time_pair, response_a.telemetry_data_time_pair); ASSERT_FALSE (response_a.error); done = true; }); @@ -457,100 +424,6 @@ TEST (node_telemetry, single_request_invalid_channel) } } -TEST (node_telemetry, simultaneous_single_and_random_requests) -{ - const auto num_nodes = 4; - nano::system system (num_nodes); - - wait_peer_connections (system); - - std::vector threads; - const auto num_threads = 4; - - class data - { - public: - std::atomic awaiting_cache{ false }; - std::atomic keep_requesting_metrics{ true }; - std::shared_ptr node; - }; - - std::array node_data_single{}; - std::array node_data_random{}; - for (auto i = 0; i < num_nodes; ++i) - { - node_data_single[i].node = system.nodes[i]; - node_data_random[i].node = system.nodes[i]; - } - - class shared_data - { - public: - std::atomic done{ false }; - std::atomic count{ 0 }; - std::promise promise; - std::shared_future shared_future{ promise.get_future () }; - }; - - shared_data shared_data_single; - shared_data shared_data_random; - - // Create a few threads where each node sends out telemetry request messages to all other nodes continuously, until the cache it reached and subsequently expired. - // The test waits until all telemetry_ack messages have been received. 
- for (int i = 0; i < num_threads; ++i) - { - threads.emplace_back ([&node_data_single, &node_data_random, &shared_data_single, &shared_data_random]() { - auto func = [](auto & all_node_data_a, shared_data & shared_data_a) { - while (std::any_of (all_node_data_a.cbegin (), all_node_data_a.cend (), [](auto const & data) { return data.keep_requesting_metrics.load (); })) - { - for (auto & data : all_node_data_a) - { - // Keep calling get_metrics_async until the cache has been saved and then become outdated (after a certain period of time) for each node - if (data.keep_requesting_metrics) - { - ++shared_data_a.count; - - data.node->telemetry.get_metrics_random_peers_async ([& shared_data = shared_data_a, &data, &all_node_data = all_node_data_a](nano::telemetry_data_responses const & responses_a) { - if (data.awaiting_cache && !responses_a.is_cached) - { - data.keep_requesting_metrics = false; - } - if (responses_a.is_cached) - { - data.awaiting_cache = true; - } - if (--shared_data.count == 0 && std::all_of (all_node_data.begin (), all_node_data.end (), [](auto const & data) { return !data.keep_requesting_metrics; })) - { - shared_data.done = true; - shared_data.promise.set_value (); - } - }); - } - std::this_thread::sleep_for (1ms); - } - } - - ASSERT_EQ (shared_data_a.count, 0); - shared_data_a.shared_future.wait (); - }; - - func (node_data_single, shared_data_single); - func (node_data_random, shared_data_random); - }); - } - - system.deadline_set (20s); - while (!shared_data_single.done || !shared_data_random.done) - { - ASSERT_NO_ERROR (system.poll ()); - } - - for (auto & thread : threads) - { - thread.join (); - } -} - TEST (node_telemetry, blocking_single_and_random) { nano::system system (2); @@ -581,16 +454,15 @@ TEST (node_telemetry, blocking_single_and_random) node_client->worker.push_task (call_system_poll); // Blocking version of get_random_metrics_async - auto telemetry_data_responses = node_client->telemetry.get_metrics_random_peers (); - ASSERT_FALSE (telemetry_data_responses.is_cached); + auto telemetry_data_responses = node_client->telemetry.get_metrics_peers (); ASSERT_TRUE (telemetry_data_responses.all_received); - compare_default_test_result_data (telemetry_data_responses.data.front (), *node_server); + compare_default_test_result_data (telemetry_data_responses.telemetry_data_time_pairs.begin ()->second.data, *node_server); // Now try single request metric auto telemetry_data_response = node_client->telemetry.get_metrics_single_peer (node_client->network.find_channel (node_server->network.endpoint ())); - ASSERT_FALSE (telemetry_data_response.is_cached); ASSERT_FALSE (telemetry_data_response.error); - compare_default_test_result_data (telemetry_data_response.data, *node_server); + compare_default_test_result_data (telemetry_data_response.telemetry_data_time_pair.data, *node_server); + ASSERT_EQ (telemetry_data_response.telemetry_data_time_pair.last_updated, telemetry_data_responses.telemetry_data_time_pairs.begin ()->second.last_updated); done = true; promise.get_future ().wait (); @@ -615,15 +487,14 @@ TEST (node_telemetry, multiple_single_request_clearing) auto channel = node_client->network.find_channel (node_server->network.endpoint ()); std::atomic done{ false }; - node_client->telemetry.get_metrics_single_peer_async (channel, [&done](nano::telemetry_data_response const & response_a) { + std::chrono::steady_clock::time_point last_updated; + node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &last_updated](nano::telemetry_data_response const & 
response_a) { ASSERT_FALSE (response_a.error); - ASSERT_FALSE (response_a.is_cached); + last_updated = response_a.telemetry_data_time_pair.last_updated; done = true; }); ASSERT_EQ (1, node_client->telemetry.single_requests.size ()); - auto last_updated = node_client->telemetry.single_requests.begin ()->second.last_updated; - system.deadline_set (10s); while (!done) { @@ -631,11 +502,11 @@ TEST (node_telemetry, multiple_single_request_clearing) } done = false; - // Make another request to keep + // Make another request to keep the time updated system.deadline_set (10s); - node_client->telemetry.get_metrics_single_peer_async (channel, [&done](nano::telemetry_data_response const & response_a) { + node_client->telemetry.get_metrics_single_peer_async (channel, [&done, last_updated](nano::telemetry_data_response const & response_a) { ASSERT_FALSE (response_a.error); - ASSERT_TRUE (response_a.is_cached); + ASSERT_EQ (last_updated, response_a.telemetry_data_time_pair.last_updated); done = true; }); @@ -649,9 +520,10 @@ TEST (node_telemetry, multiple_single_request_clearing) done = false; auto channel1 = node_client->network.find_channel (node_server1->network.endpoint ()); - node_client->telemetry.get_metrics_single_peer_async (channel1, [&done](nano::telemetry_data_response const & response_a) { + node_client->telemetry.get_metrics_single_peer_async (channel1, [&done, &last_updated](nano::telemetry_data_response const & response_a) { ASSERT_FALSE (response_a.error); - ASSERT_FALSE (response_a.is_cached); + ASSERT_NE (last_updated, response_a.telemetry_data_time_pair.last_updated); + last_updated = response_a.telemetry_data_time_pair.last_updated; done = true; }); @@ -663,9 +535,9 @@ TEST (node_telemetry, multiple_single_request_clearing) } done = false; - node_client->telemetry.get_metrics_single_peer_async (channel1, [&done](nano::telemetry_data_response const & response_a) { + node_client->telemetry.get_metrics_single_peer_async (channel1, [&done, last_updated](nano::telemetry_data_response const & response_a) { ASSERT_FALSE (response_a.error); - ASSERT_TRUE (response_a.is_cached); + ASSERT_EQ (last_updated, response_a.telemetry_data_time_pair.last_updated); done = true; }); @@ -696,7 +568,7 @@ TEST (node_telemetry, disconnects) ASSERT_TRUE (channel); std::atomic done{ false }; - node_client->telemetry.get_metrics_random_peers_async ([&done](nano::telemetry_data_responses const & responses_a) { + node_client->telemetry.get_metrics_peers_async ([&done](nano::telemetry_data_responses const & responses_a) { ASSERT_FALSE (responses_a.all_received); done = true; }); @@ -720,6 +592,229 @@ TEST (node_telemetry, disconnects) } } +TEST (node_telemetry, batch_use_single_request_cache) +{ + nano::system system (2); + + auto node_client = system.nodes.front (); + auto node_server = system.nodes.back (); + + wait_peer_connections (system); + + // Request telemetry metrics + nano::telemetry_data_time_pair telemetry_data_time_pair; + { + std::atomic done{ false }; + auto channel = node_client->network.find_channel (node_server->network.endpoint ()); + node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data_time_pair](nano::telemetry_data_response const & response_a) { + telemetry_data_time_pair = response_a.telemetry_data_time_pair; + done = true; + }); + + system.deadline_set (10s); + while (!done) + { + ASSERT_NO_ERROR (system.poll ()); + } + } + + { + std::atomic done{ false }; + node_client->telemetry.get_metrics_peers_async ([&done, 
&telemetry_data_time_pair](nano::telemetry_data_responses const & responses_a) { + ASSERT_TRUE (responses_a.all_received); + ASSERT_EQ (telemetry_data_time_pair, responses_a.telemetry_data_time_pairs.begin ()->second); + done = true; + }); + + system.deadline_set (10s); + while (!done) + { + ASSERT_NO_ERROR (system.poll ()); + } + } + + // Confirm only 1 request was made + ASSERT_EQ (1, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in)); + ASSERT_EQ (0, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in)); + ASSERT_EQ (1, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out)); + ASSERT_EQ (0, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in)); + ASSERT_EQ (1, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in)); + ASSERT_EQ (0, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out)); + + // Wait until there is something pending + system.deadline_set (10s); + while (node_client->telemetry.finished_single_requests_size () == 0) + { + ASSERT_NO_ERROR (system.poll ()); + } + + system.deadline_set (10s); + std::atomic done{ false }; + node_client->telemetry.get_metrics_peers_async ([&done, &telemetry_data_time_pair](nano::telemetry_data_responses const & responses_a) { + ASSERT_EQ (1, responses_a.telemetry_data_time_pairs.size ()); + done = true; + }); + + system.deadline_set (10s); + while (!done) + { + ASSERT_NO_ERROR (system.poll ()); + } + + ASSERT_EQ (0, node_client->telemetry.finished_single_requests_size ()); + ASSERT_EQ (2, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in)); + ASSERT_EQ (0, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in)); + ASSERT_EQ (2, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out)); + ASSERT_EQ (0, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in)); + ASSERT_EQ (2, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in)); + ASSERT_EQ (0, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out)); +} + +TEST (node_telemetry, single_request_use_batch_cache) +{ + nano::system system (2); + + auto node_client = system.nodes.front (); + auto node_server = system.nodes.back (); + + wait_peer_connections (system); + + // Request batched metric first + std::unordered_map all_telemetry_data_time_pairs; + { + std::atomic done{ false }; + node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_data_time_pairs](nano::telemetry_data_responses const & responses_a) { + ASSERT_TRUE (responses_a.all_received); + ASSERT_EQ (1, responses_a.telemetry_data_time_pairs.size ()); + all_telemetry_data_time_pairs = responses_a.telemetry_data_time_pairs; + done = true; + }); + + system.deadline_set (10s); + while (!done) + { + ASSERT_NO_ERROR (system.poll ()); + } + } + + std::atomic done{ false }; + auto channel = node_client->network.find_channel (node_server->network.endpoint ()); + node_client->telemetry.get_metrics_single_peer_async (channel, [&done, 
&all_telemetry_data_time_pairs](nano::telemetry_data_response const & response_a) { + ASSERT_EQ (all_telemetry_data_time_pairs.begin ()->second, response_a.telemetry_data_time_pair); + ASSERT_FALSE (response_a.error); + done = true; + }); + + system.deadline_set (10s); + while (!done) + { + ASSERT_NO_ERROR (system.poll ()); + } + + // Confirm only 1 request was made + ASSERT_EQ (1, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in)); + ASSERT_EQ (0, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in)); + ASSERT_EQ (1, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out)); + ASSERT_EQ (0, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in)); + ASSERT_EQ (1, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in)); + ASSERT_EQ (0, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out)); +} + +TEST (node_telemetry, dos_tcp) +{ + // Confirm that telemetry_reqs are not processed + nano::system system (2); + + auto node_client = system.nodes.front (); + auto node_server = system.nodes.back (); + + wait_peer_connections (system); + + nano::telemetry_req message; + auto channel = node_client->network.tcp_channels.find_channel (nano::transport::map_endpoint_to_tcp (node_server->network.endpoint ())); + channel->send (message, [](boost::system::error_code const & ec, size_t size_a) { + ASSERT_FALSE (ec); + }); + + system.deadline_set (10s); + while (1 != node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in)) + { + ASSERT_NO_ERROR (system.poll ()); + } + + auto orig = std::chrono::steady_clock::now (); + for (int i = 0; i < 10; ++i) + { + channel->send (message, [](boost::system::error_code const & ec, size_t size_a) { + ASSERT_FALSE (ec); + }); + } + + system.deadline_set (10s); + while ((nano::telemetry_cache_cutoffs::test + orig) > std::chrono::steady_clock::now ()) + { + ASSERT_NO_ERROR (system.poll ()); + } + + // Should process no more telemetry_req messages + ASSERT_EQ (1, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in)); + + // Now spam messages waiting for it to be processed + while (node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in) == 1) + { + channel->send (message); + ASSERT_NO_ERROR (system.poll ()); + } +} + +TEST (node_telemetry, dos_udp) +{ + // Confirm that telemetry_reqs are not processed + nano::system system (2); + + auto node_client = system.nodes.front (); + auto node_server = system.nodes.back (); + + wait_peer_connections (system); + + nano::telemetry_req message; + auto channel (node_server->network.udp_channels.create (node_server->network.endpoint ())); + channel->send (message, [](boost::system::error_code const & ec, size_t size_a) { + ASSERT_FALSE (ec); + }); + + system.deadline_set (20s); + while (1 != node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in)) + { + ASSERT_NO_ERROR (system.poll ()); + } + + auto orig = std::chrono::steady_clock::now (); + for (int i = 0; i < 10; ++i) + { + channel->send (message, [](boost::system::error_code const & ec, size_t size_a) { + ASSERT_FALSE (ec); + }); + } + + 
system.deadline_set (20s); + while ((nano::telemetry_cache_cutoffs::test + orig) > std::chrono::steady_clock::now ()) + { + ASSERT_NO_ERROR (system.poll ()); + } + + // Should process no more telemetry_req messages + ASSERT_EQ (1, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in)); + + // Now spam messages waiting for it to be processed + system.deadline_set (20s); + while (node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in) == 1) + { + channel->send (message); + ASSERT_NO_ERROR (system.poll ()); + } +} + namespace { void wait_peer_connections (nano::system & system_a) diff --git a/nano/lib/utility.hpp b/nano/lib/utility.hpp index 37dbca344f..4b66d3feb7 100644 --- a/nano/lib/utility.hpp +++ b/nano/lib/utility.hpp @@ -147,8 +147,21 @@ std::unique_ptr collect_container_info (observer_set +void transform_if (InputIt first, InputIt last, OutputIt dest, Pred pred, Func transform) +{ + while (first != last) + { + if (pred (*first)) + { + *dest++ = transform (*first); + } + + ++first; + } +} } -// Have our own async_write which we must use? void release_assert_internal (bool check, const char * check_expr, const char * file, unsigned int line); #define release_assert(check) release_assert_internal (check, #check, __FILE__, __LINE__) diff --git a/nano/node/bootstrap/bootstrap_server.cpp b/nano/node/bootstrap/bootstrap_server.cpp index dd005ee7f1..36e86f4ed2 100644 --- a/nano/node/bootstrap/bootstrap_server.cpp +++ b/nano/node/bootstrap/bootstrap_server.cpp @@ -239,10 +239,24 @@ void nano::bootstrap_server::receive_header_action (boost::system::error_code co } case nano::message_type::telemetry_req: { - node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::telemetry_req, nano::stat::dir::in); if (is_realtime_connection ()) { - add_request (std::make_unique (header)); + auto find_channel (node->network.tcp_channels.find_channel (remote_endpoint)); + if (!find_channel) + { + find_channel = node->network.tcp_channels.find_node_id (remote_node_id); + } + + // Only handle telemetry requests if they are outside of the cutoff time + auto is_very_first_message = find_channel->get_last_telemetry_req () == std::chrono::steady_clock::time_point{}; + auto cache_exceeded = std::chrono::steady_clock::now () >= find_channel->get_last_telemetry_req () + nano::telemetry_cache_cutoffs::network_to_time (node->network_params.network); + if (find_channel && (is_very_first_message || cache_exceeded)) + { + node->network.tcp_channels.modify (find_channel, [](std::shared_ptr channel_a) { + channel_a->set_last_telemetry_req (std::chrono::steady_clock::now ()); + }); + add_request (std::make_unique (header)); + } } receive (); break; diff --git a/nano/node/common.cpp b/nano/node/common.cpp index 6b9d225861..cdaf9f014c 100644 --- a/nano/node/common.cpp +++ b/nano/node/common.cpp @@ -10,9 +10,16 @@ #include #include +#include +#include + std::bitset<16> constexpr nano::message_header::block_type_mask; std::bitset<16> constexpr nano::message_header::count_mask; +std::chrono::seconds constexpr nano::telemetry_cache_cutoffs::test; +std::chrono::seconds constexpr nano::telemetry_cache_cutoffs::beta; +std::chrono::seconds constexpr nano::telemetry_cache_cutoffs::live; + namespace { nano::protocol_constants const & get_protocol_constants () @@ -1148,43 +1155,61 @@ nano::telemetry_data nano::telemetry_data::consolidate (std::vector protocol_versions; std::unordered_map vendor_versions; std::unordered_map 
bandwidth_caps; std::unordered_map genesis_blocks; - nano::uint128_t account_average{ 0 }; + // Use a trimmed average which excludes the upper and lower 5% of the results + std::multiset account_counts; + std::multiset block_counts; + std::multiset cemented_counts; + std::multiset peer_counts; + std::multiset unchecked_counts; + std::multiset uptime_counts; + std::multiset bandwidth_counts; for (auto const & telemetry_data : telemetry_data_responses_a) { - account_sum += telemetry_data.account_count; - block_sum += telemetry_data.block_count; - cemented_sum += telemetry_data.cemented_count; - ++vendor_versions[telemetry_data.vendor_version]; - ++protocol_versions[telemetry_data.protocol_version_number]; - peer_sum += telemetry_data.peer_count; - + account_counts.insert (telemetry_data.account_count); + block_counts.insert (telemetry_data.block_count); + cemented_counts.insert (telemetry_data.cemented_count); + peer_counts.insert (telemetry_data.peer_count); + unchecked_counts.insert (telemetry_data.unchecked_count); + uptime_counts.insert (telemetry_data.uptime); // 0 has a special meaning (unlimited), don't include it in the average as it will be heavily skewed if (telemetry_data.bandwidth_cap != 0) { - bandwidth_sum += telemetry_data.bandwidth_cap; + bandwidth_counts.insert (telemetry_data.bandwidth_cap); } + + ++vendor_versions[telemetry_data.vendor_version]; + ++protocol_versions[telemetry_data.protocol_version_number]; ++bandwidth_caps[telemetry_data.bandwidth_cap]; - unchecked_sum += telemetry_data.unchecked_count; - uptime_sum += telemetry_data.uptime; ++genesis_blocks[telemetry_data.genesis_block]; } + // Remove 10% of the results from the lower and upper bounds to catch any outliers. Need at least 10 responses before any are removed. + auto num_either_side_to_remove = telemetry_data_responses_a.size () / 10; + + auto strip_outliers_and_sum = [num_either_side_to_remove](auto & counts) { + counts.erase (counts.begin (), std::next (counts.begin (), num_either_side_to_remove)); + counts.erase (std::next (counts.rbegin (), num_either_side_to_remove).base (), counts.end ()); + return std::accumulate (counts.begin (), counts.end (), nano::uint128_t (0), [](nano::uint128_t total, auto count) { + return total += count; + }); + }; + + auto account_sum = strip_outliers_and_sum (account_counts); + auto block_sum = strip_outliers_and_sum (block_counts); + auto cemented_sum = strip_outliers_and_sum (cemented_counts); + auto peer_sum = strip_outliers_and_sum (peer_counts); + auto unchecked_sum = strip_outliers_and_sum (unchecked_counts); + auto uptime_sum = strip_outliers_and_sum (uptime_counts); + auto bandwidth_sum = strip_outliers_and_sum (bandwidth_counts); + nano::telemetry_data consolidated_data; - auto size = telemetry_data_responses_a.size (); + auto size = telemetry_data_responses_a.size () - num_either_side_to_remove * 2; consolidated_data.account_count = boost::numeric_cast (account_sum / size); consolidated_data.block_count = boost::numeric_cast (block_sum / size); consolidated_data.cemented_count = boost::numeric_cast (cemented_sum / size); @@ -1221,7 +1246,7 @@ nano::telemetry_data nano::telemetry_data::consolidate (std::vector, nano::purge_singleton_pool_memory }) { diff --git a/nano/node/common.hpp b/nano/node/common.hpp index 6021803fb6..0dbb13d358 100644 --- a/nano/node/common.hpp +++ b/nano/node/common.hpp @@ -348,6 +348,7 @@ class telemetry_data nano::error serialize_json (nano::jsonconfig & json) const; nano::error deserialize_json (nano::jsonconfig & json); bool operator== 
(nano::telemetry_data const &) const; + bool operator!= (nano::telemetry_data const &) const; static auto constexpr size = sizeof (block_count) + sizeof (cemented_count) + sizeof (unchecked_count) + sizeof (account_count) + sizeof (bandwidth_cap) + sizeof (peer_count) + sizeof (protocol_version_number) + sizeof (vendor_version) + sizeof (uptime) + sizeof (genesis_block); }; @@ -443,6 +444,16 @@ class message_visitor virtual ~message_visitor (); }; +class telemetry_cache_cutoffs +{ +public: + static std::chrono::seconds constexpr test{ 2 }; + static std::chrono::seconds constexpr beta{ 15 }; + static std::chrono::seconds constexpr live{ 60 }; + + static std::chrono::seconds network_to_time (network_constants const & network_constants); +}; + /** Helper guard which contains all the necessary purge (remove all memory even if used) functions */ class node_singleton_memory_pool_purge_guard { diff --git a/nano/node/json_handler.cpp b/nano/node/json_handler.cpp index 4a0150d087..16ced7f597 100644 --- a/nano/node/json_handler.cpp +++ b/nano/node/json_handler.cpp @@ -3944,13 +3944,13 @@ void nano::json_handler::telemetry () if (!single_telemetry_metric_a.error) { nano::jsonconfig config_l; - auto err = single_telemetry_metric_a.data.serialize_json (config_l); + auto err = single_telemetry_metric_a.telemetry_data_time_pair.data.serialize_json (config_l); + config_l.put ("timestamp", std::chrono::duration_cast (single_telemetry_metric_a.telemetry_data_time_pair.system_last_updated.time_since_epoch ()).count ()); auto const & ptree = config_l.get_tree (); if (!err) { rpc_l->response_l.insert (rpc_l->response_l.begin (), ptree.begin (), ptree.end ()); - rpc_l->response_l.put ("cached", single_telemetry_metric_a.is_cached); } else { @@ -3976,14 +3976,18 @@ void nano::json_handler::telemetry () // setting "raw" to true returns metrics from all nodes requested. 
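The telemetry_cache_cutoffs declaration above only carries the per-network constants (2s test, 15s beta, 60s live) and the network_to_time signature; its definition is not visible in the hunks shown here. A minimal sketch of what such a lookup could look like, assuming nano::network_constants exposes is_test_network () and is_beta_network () as elsewhere in the codebase (the actual implementation may differ):

std::chrono::seconds nano::telemetry_cache_cutoffs::network_to_time (nano::network_constants const & network_constants_a)
{
	// Pick the cutoff matching the active network; fall back to the live value.
	return network_constants_a.is_test_network () ? test : (network_constants_a.is_beta_network () ? beta : live);
}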
auto raw = request.get_optional ("raw"); auto output_raw = raw.value_or (false); - node.telemetry.get_metrics_random_peers_async ([rpc_l, output_raw](auto const & batched_telemetry_metrics_a) { + node.telemetry.get_metrics_peers_async ([rpc_l, output_raw](auto const & batched_telemetry_metrics_a) { if (output_raw) { + std::unordered_map telemetry_data_time_pairs; boost::property_tree::ptree metrics; - for (auto & telemetry_metrics : batched_telemetry_metrics_a.data) + for (auto & telemetry_metrics : batched_telemetry_metrics_a.telemetry_data_time_pairs) { nano::jsonconfig config_l; - auto err = telemetry_metrics.serialize_json (config_l); + auto err = telemetry_metrics.second.data.serialize_json (config_l); + config_l.put ("timestamp", std::chrono::duration_cast (telemetry_metrics.second.system_last_updated.time_since_epoch ()).count ()); + config_l.put ("address", telemetry_metrics.first.address ()); + config_l.put ("port", telemetry_metrics.first.port ()); if (!err) { metrics.push_back (std::make_pair ("", config_l.get_tree ())); @@ -3999,7 +4003,12 @@ void nano::json_handler::telemetry () else { nano::jsonconfig config_l; - auto average_telemetry_metrics = nano::telemetry_data::consolidate (batched_telemetry_metrics_a.data); + std::vector telemetry_data; + telemetry_data.reserve (batched_telemetry_metrics_a.telemetry_data_time_pairs.size ()); + std::transform (batched_telemetry_metrics_a.telemetry_data_time_pairs.begin (), batched_telemetry_metrics_a.telemetry_data_time_pairs.end (), std::back_inserter (telemetry_data), [](auto const & telemetry_data_time_pair_a) { + return telemetry_data_time_pair_a.second.data; + }); + auto average_telemetry_metrics = nano::telemetry_data::consolidate (telemetry_data); auto err = average_telemetry_metrics.serialize_json (config_l); auto const & ptree = config_l.get_tree (); @@ -4013,7 +4022,6 @@ void nano::json_handler::telemetry () } } - rpc_l->response_l.put ("cached", batched_telemetry_metrics_a.is_cached); rpc_l->response_errors (); }); } diff --git a/nano/node/network.cpp b/nano/node/network.cpp index 149fa847f4..e3f7463c12 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -530,7 +530,7 @@ class network_message_visitor : public nano::message_visitor } nano::telemetry_ack telemetry_ack (telemetry_data); - channel->send (telemetry_ack); + channel->send (telemetry_ack, nullptr, false); } void telemetry_ack (nano::telemetry_ack const & message_a) override { @@ -605,11 +605,11 @@ bool nano::network::reachout (nano::endpoint const & endpoint_a, bool allow_loca return error; } -std::deque> nano::network::list (size_t count_a) +std::deque> nano::network::list (size_t count_a, bool include_tcp_server_channels_a, uint8_t minimum_version_a) { std::deque> result; - tcp_channels.list (result); - udp_channels.list (result); + tcp_channels.list (result, include_tcp_server_channels_a, minimum_version_a); + udp_channels.list (result, minimum_version_a); nano::random_pool_shuffle (result.begin (), result.end ()); if (result.size () > count_a) { diff --git a/nano/node/network.hpp b/nano/node/network.hpp index b6729a3733..fd1bef0c4f 100644 --- a/nano/node/network.hpp +++ b/nano/node/network.hpp @@ -135,10 +135,11 @@ class network final bool not_a_peer (nano::endpoint const &, bool); // Should we reach out to this endpoint with a keepalive message bool reachout (nano::endpoint const &, bool = false); - std::deque> list (size_t); + std::deque> list (size_t, bool = true, uint8_t = 0); // A list of random peers sized for the configured rebroadcast fanout 
std::deque> list_fanout (); void random_fill (std::array &) const; + // Note: The minimum protocol version is used after the random selection, so number of peers can be less than expected. std::unordered_set> random_set (size_t, uint8_t = 0) const; // Get the next peer for attempting a tcp bootstrap connection nano::tcp_endpoint bootstrap_peer (bool = false); diff --git a/nano/node/telemetry.cpp b/nano/node/telemetry.cpp index f0875fd21c..09e0e479f6 100644 --- a/nano/node/telemetry.cpp +++ b/nano/node/telemetry.cpp @@ -11,7 +11,17 @@ #include #include -std::chrono::milliseconds constexpr nano::telemetry_impl::cache_cutoff; +std::chrono::seconds constexpr nano::telemetry_impl::alarm_cutoff; + +namespace +{ +// This class is just a wrapper to allow a recursive lambda while properly handling memory resources +class ongoing_func_wrapper +{ +public: + std::function ongoing_func; +}; +} nano::telemetry::telemetry (nano::network & network_a, nano::alarm & alarm_a, nano::worker & worker_a) : network (network_a), @@ -19,6 +29,44 @@ alarm (alarm_a), worker (worker_a), batch_request (std::make_shared (network, alarm, worker)) { + // Before callbacks are called with the batch request, check if any of the single request data can be appended to give + batch_request->pre_callback_callback = [this](std::unordered_map & data_a, std::mutex & mutex_a) { + nano::lock_guard guard (this->mutex); + for (auto & single_request : single_requests) + { + nano::lock_guard guard (single_request.second.impl->mutex); + if (!single_request.second.impl->cached_telemetry_data.empty ()) + { + nano::lock_guard batch_request_guard (mutex_a); + auto it = this->batch_request->cached_telemetry_data.find (single_request.first); + if (it != this->batch_request->cached_telemetry_data.cend () && single_request.second.last_updated > it->second.last_updated) + { + it->second = single_request.second.impl->cached_telemetry_data.begin ()->second; + } + else + { + data_a.emplace (single_request.first, single_request.second.impl->cached_telemetry_data.begin ()->second); + } + } + } + + for (auto & pending : finished_single_requests) + { + nano::lock_guard batch_request_guard (mutex_a); + auto it = this->batch_request->cached_telemetry_data.find (pending.first); + if (it != this->batch_request->cached_telemetry_data.cend () && pending.second.last_updated > it->second.last_updated) + { + it->second = pending.second; + } + else + { + data_a.emplace (pending.first, pending.second); + } + } + finished_single_requests.clear (); + }; + + ongoing_req_all_peers (); } void nano::telemetry::stop () @@ -43,28 +91,64 @@ void nano::telemetry::add (nano::telemetry_data const & telemetry_data_a, nano:: } } -void nano::telemetry::get_metrics_random_peers_async (std::function const & callback_a) +void nano::telemetry::ongoing_req_all_peers () { - // These peers will only be used if there isn't an already ongoing batch telemetry request round - auto random_peers = network.random_set (network.size_sqrt (), network_params.protocol.telemetry_protocol_version_min); + auto wrapper = std::make_shared (); + // Keep calling ongoing_func while the peer is still being called + wrapper->ongoing_func = [this, telemetry_impl_w = std::weak_ptr (batch_request), wrapper]() { + if (auto batch_telemetry_impl = telemetry_impl_w.lock ()) + { + nano::lock_guard guard (this->mutex); + if (!this->stopped) + { + auto peers = this->network.list (std::numeric_limits::max (), false, network_params.protocol.telemetry_protocol_version_min); + // If exists in single_requests don't request 
because they will just be rejected by other peers until the next round + auto const & single_requests = this->single_requests; + peers.erase (std::remove_if (peers.begin (), peers.end (), [&single_requests](auto const & channel_a) { + return single_requests.count (channel_a->get_endpoint ()) > 0; + }), + peers.cend ()); + if (!peers.empty ()) + { + batch_telemetry_impl->get_metrics_async (peers, [](nano::telemetry_data_responses const &) { + // Intentionally empty, just using to refresh the cache + }); + } + this->alarm.add (std::chrono::steady_clock::now () + batch_telemetry_impl->cache_cutoff + batch_telemetry_impl->alarm_cutoff, wrapper->ongoing_func); + } + } + }; + + alarm.add (std::chrono::steady_clock::now () + batch_request->cache_cutoff + batch_request->alarm_cutoff, wrapper->ongoing_func); +} + +void nano::telemetry::get_metrics_peers_async (std::function const & callback_a) +{ + auto peers = network.list (std::numeric_limits::max (), false, network_params.protocol.telemetry_protocol_version_min); nano::lock_guard guard (mutex); - if (!stopped && !random_peers.empty ()) + if (!stopped && !peers.empty ()) { - batch_request->get_metrics_async (random_peers, [callback_a](nano::telemetry_data_responses const & telemetry_data_responses) { + // If exists in single_requests, don't request because they will just be rejected by other nodes, instead all it as additional values + peers.erase (std::remove_if (peers.begin (), peers.end (), [& single_requests = this->single_requests](auto const & channel_a) { + return single_requests.count (channel_a->get_endpoint ()) > 0; + }), + peers.cend ()); + + batch_request->get_metrics_async (peers, [callback_a](nano::telemetry_data_responses const & telemetry_data_responses) { callback_a (telemetry_data_responses); }); } else { const auto all_received = false; - callback_a (nano::telemetry_data_responses{ {}, false, all_received }); + callback_a (nano::telemetry_data_responses{ {}, all_received }); } } -nano::telemetry_data_responses nano::telemetry::get_metrics_random_peers () +nano::telemetry_data_responses nano::telemetry::get_metrics_peers () { std::promise promise; - get_metrics_random_peers_async ([&promise](telemetry_data_responses const & telemetry_data_responses_a) { + get_metrics_peers_async ([&promise](telemetry_data_responses const & telemetry_data_responses_a) { promise.set_value (telemetry_data_responses_a); }); @@ -74,13 +158,6 @@ nano::telemetry_data_responses nano::telemetry::get_metrics_random_peers () // After a request is made to a single peer we want to remove it from the container after the peer has not been requested for a while (cache_cutoff). 
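ongoing_req_all_peers above re-arms itself through the alarm by way of the ongoing_func_wrapper added at the top of telemetry.cpp. A generic, self-contained sketch of that self-rescheduling pattern is shown below; schedule_after stands in for nano::alarm::add and is not part of the patch.

#include <functional>
#include <memory>

struct ongoing_func_wrapper
{
	std::function<void ()> ongoing_func;
};

// The wrapper captures itself inside the lambda, so it stays alive for as long
// as the task keeps handing itself back to the scheduler.
inline void start_ongoing_poll (std::function<void (std::function<void ()> const &)> const & schedule_after)
{
	auto wrapper = std::make_shared<ongoing_func_wrapper> ();
	wrapper->ongoing_func = [wrapper, schedule_after] () {
		// ... request telemetry from all eligible peers here ...
		schedule_after (wrapper->ongoing_func); // run again once the cache cutoff has elapsed
	};
	schedule_after (wrapper->ongoing_func);
}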
void nano::telemetry::ongoing_single_request_cleanup (nano::endpoint const & endpoint_a, nano::telemetry::single_request_data const & single_request_data_a) { - // This class is just - class ongoing_func_wrapper - { - public: - std::function ongoing_func; - }; - auto wrapper = std::make_shared (); // Keep calling ongoing_func while the peer is still being called const auto & last_updated = single_request_data_a.last_updated; @@ -90,6 +167,8 @@ void nano::telemetry::ongoing_single_request_cleanup (nano::endpoint const & end nano::lock_guard guard (this->mutex); if (std::chrono::steady_clock::now () - telemetry_impl->cache_cutoff > last_updated && telemetry_impl->callbacks.empty ()) { + // This will be picked up by the batch request next round + this->finished_single_requests[endpoint_a] = telemetry_impl->cached_telemetry_data.begin ()->second; this->single_requests.erase (endpoint_a); } else @@ -105,7 +184,6 @@ void nano::telemetry::ongoing_single_request_cleanup (nano::endpoint const & end void nano::telemetry::update_cleanup_data (nano::endpoint const & endpoint_a, nano::telemetry::single_request_data & single_request_data_a, bool is_new_a) { - auto telemetry_impl = single_request_data_a.impl; if (is_new_a) { // Clean this request up when it isn't being used anymore @@ -120,10 +198,16 @@ void nano::telemetry::update_cleanup_data (nano::endpoint const & endpoint_a, na void nano::telemetry::get_metrics_single_peer_async (std::shared_ptr const & channel_a, std::function const & callback_a) { - auto invoke_callback_with_error = [&callback_a]() { - auto const is_cached = false; - auto const error = true; - callback_a ({ nano::telemetry_data (), is_cached, error }); + auto invoke_callback_with_error = [&callback_a, &worker = this->worker, channel_a]() { + nano::endpoint endpoint; + if (channel_a) + { + endpoint = channel_a->get_endpoint (); + } + worker.push_task ([callback_a, endpoint]() { + auto const error = true; + callback_a ({ nano::telemetry_data_time_pair{}, endpoint, error }); + }); }; nano::lock_guard guard (mutex); @@ -131,20 +215,46 @@ void nano::telemetry::get_metrics_single_peer_async (std::shared_ptrget_network_version () >= network_params.protocol.telemetry_protocol_version_min)) { + auto add_callback_async = [& worker = this->worker, &callback_a](telemetry_data_time_pair const & telemetry_data_time_pair_a, nano::endpoint const & endpoint_a) { + telemetry_data_response telemetry_data_response_l{ telemetry_data_time_pair_a, endpoint_a, false }; + worker.push_task ([telemetry_data_response_l, callback_a]() { + callback_a (telemetry_data_response_l); + }); + }; + + // First check if the batched metrics have processed this endpoint. 
+ { + nano::lock_guard guard (batch_request->mutex); + auto it = batch_request->cached_telemetry_data.find (channel_a->get_endpoint ()); + if (it != batch_request->cached_telemetry_data.cend ()) + { + add_callback_async (it->second, it->first); + return; + } + } + // Next check single requests which finished and are awaiting batched requests + auto it = finished_single_requests.find (channel_a->get_endpoint ()); + if (it != finished_single_requests.cend ()) + { + add_callback_async (it->second, it->first); + return; + } + auto pair = single_requests.emplace (channel_a->get_endpoint (), single_request_data{ std::make_shared (network, alarm, worker), std::chrono::steady_clock::now () }); update_cleanup_data (pair.first->first, pair.first->second, pair.second); - pair.first->second.impl->get_metrics_async ({ channel_a }, [callback_a](telemetry_data_responses const & telemetry_data_responses_a) { + pair.first->second.impl->get_metrics_async ({ channel_a }, [callback_a, channel_a](telemetry_data_responses const & telemetry_data_responses_a) { // There should only be 1 response, so if this hasn't been received then conclude it is an error. auto const error = !telemetry_data_responses_a.all_received; if (!error) { - assert (telemetry_data_responses_a.data.size () == 1); - callback_a ({ telemetry_data_responses_a.data.front (), telemetry_data_responses_a.is_cached, error }); + assert (telemetry_data_responses_a.telemetry_data_time_pairs.size () == 1); + auto it = telemetry_data_responses_a.telemetry_data_time_pairs.begin (); + callback_a ({ it->second, it->first, error }); } else { - callback_a ({ nano::telemetry_data (), telemetry_data_responses_a.is_cached, error }); + callback_a ({ nano::telemetry_data_time_pair{}, channel_a->get_endpoint (), error }); } }); } @@ -183,6 +293,12 @@ size_t nano::telemetry::telemetry_data_size () return total; } +size_t nano::telemetry::finished_single_requests_size () +{ + nano::lock_guard guard (mutex); + return finished_single_requests.size (); +} + nano::telemetry_impl::telemetry_impl (nano::network & network_a, nano::alarm & alarm_a, nano::worker & worker_a) : network (network_a), alarm (alarm_a), @@ -190,25 +306,30 @@ worker (worker_a) { } -void nano::telemetry_impl::flush_callbacks (nano::unique_lock & lk_a, bool cached_a) +void nano::telemetry_impl::flush_callbacks_async () { - // Invoke all callbacks, it's possible that during the mutex unlock other callbacks were added, - // so check again and invoke those too - assert (lk_a.owns_lock ()); - invoking = true; - while (!callbacks.empty ()) - { - lk_a.unlock (); - invoke_callbacks (cached_a); - lk_a.lock (); - } - invoking = false; + // Post to worker so that it's truly async and not on the calling thread (same problem as std::async otherwise) + worker.push_task ([this_w = std::weak_ptr (shared_from_this ())]() { + if (auto this_l = this_w.lock ()) + { + nano::unique_lock lk (this_l->mutex); + // Invoke all callbacks, it's possible that during the mutex unlock other callbacks were added, + // so check again and invoke those too + this_l->invoking = true; + while (!this_l->callbacks.empty ()) + { + lk.unlock (); + this_l->invoke_callbacks (); + lk.lock (); + } + this_l->invoking = false; + } + }); } -void nano::telemetry_impl::get_metrics_async (std::unordered_set> const & channels_a, std::function const & callback_a) +void nano::telemetry_impl::get_metrics_async (std::deque> const & channels_a, std::function const & callback_a) { { - assert (!channels_a.empty ()); nano::unique_lock lk (mutex); 
callbacks.push_back (callback_a); if (callbacks.size () > 1 || invoking) @@ -218,21 +339,13 @@ void nano::telemetry_impl::get_metrics_async (std::unordered_set (shared_from_this ())]() { - if (auto this_l = this_w.lock ()) - { - nano::unique_lock lk (this_l->mutex); - const auto is_cached = true; - this_l->flush_callbacks (lk, is_cached); - } - }); + flush_callbacks_async (); return; } - all_received = true; + failed.clear (); assert (required_responses.empty ()); std::transform (channels_a.begin (), channels_a.end (), std::inserter (required_responses, required_responses.end ()), [](auto const & channel) { return channel->get_endpoint (); @@ -251,13 +364,14 @@ void nano::telemetry_impl::add (nano::telemetry_data const & telemetry_data_a, n return; } - current_telemetry_data_responses.push_back (telemetry_data_a); + current_telemetry_data_responses[endpoint_a] = { telemetry_data_a, std::chrono::steady_clock::now (), std::chrono::system_clock::now () }; channel_processed (lk, endpoint_a); } -void nano::telemetry_impl::invoke_callbacks (bool cached_a) +void nano::telemetry_impl::invoke_callbacks () { decltype (callbacks) callbacks_l; + bool all_received; { // Copy callbacks so that they can be called outside of holding the lock @@ -265,10 +379,17 @@ void nano::telemetry_impl::invoke_callbacks (bool cached_a) callbacks_l = callbacks; current_telemetry_data_responses.clear (); callbacks.clear (); + all_received = failed.empty (); } + + if (pre_callback_callback) + { + pre_callback_callback (cached_telemetry_data, mutex); + } + for (auto & callback : callbacks_l) { - callback ({ cached_telemetry_data, cached_a, all_received }); + callback ({ cached_telemetry_data, all_received }); } } @@ -282,12 +403,11 @@ void nano::telemetry_impl::channel_processed (nano::unique_lock & lk cached_telemetry_data = current_telemetry_data_responses; last_time = std::chrono::steady_clock::now (); - auto const is_cached = false; - flush_callbacks (lk_a, is_cached); + flush_callbacks_async (); } } -void nano::telemetry_impl::fire_request_messages (std::unordered_set> const & channels) +void nano::telemetry_impl::fire_request_messages (std::deque> const & channels) { uint64_t round_l; { @@ -303,6 +423,7 @@ void nano::telemetry_impl::fire_request_messages (std::unordered_setget_network_version () >= network_params.protocol.telemetry_protocol_version_min); std::weak_ptr this_w (shared_from_this ()); + // clang-format off channel->send (message, [this_w, endpoint = channel->get_endpoint ()](boost::system::error_code const & ec, size_t size_a) { if (auto this_l = this_w.lock ()) { @@ -310,20 +431,22 @@ void nano::telemetry_impl::fire_request_messages (std::unordered_set lk (this_l->mutex); - this_l->all_received = false; + this_l->failed.push_back (endpoint); this_l->channel_processed (lk, endpoint); } } - }); + }, + false); + // clang-format on // If no response is seen after a certain period of time, remove it from the list of expected responses. However, only if it is part of the same round. 
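fire_request_messages above is what ultimately drives both request paths. From the caller's side, the single-peer API exercised by the tests earlier in this patch is consumed as in the hypothetical example below (node_l and channel_l are assumed, not part of the patch); the response now carries the originating endpoint and a telemetry_data_time_pair instead of an is_cached flag.

// Hypothetical caller; node_l is a nano::node and channel_l a channel to the peer.
node_l.telemetry.get_metrics_single_peer_async (channel_l, [] (nano::telemetry_data_response const & response_a) {
	if (!response_a.error)
	{
		// response_a.endpoint identifies the peer that answered;
		// response_a.telemetry_data_time_pair.data holds its metrics and
		// response_a.telemetry_data_time_pair.last_updated records when they were refreshed.
	}
});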
- alarm.add (std::chrono::steady_clock::now () + cache_cutoff, [this_w, endpoint = channel->get_endpoint (), round_l]() { + alarm.add (std::chrono::steady_clock::now () + alarm_cutoff, [this_w, endpoint = channel->get_endpoint (), round_l]() { if (auto this_l = this_w.lock ()) { nano::unique_lock lk (this_l->mutex); if (this_l->round == round_l && this_l->required_responses.find (endpoint) != this_l->required_responses.cend ()) { - this_l->all_received = false; + this_l->failed.push_back (endpoint); this_l->channel_processed (lk, endpoint); } } @@ -337,6 +460,16 @@ size_t nano::telemetry_impl::telemetry_data_size () return current_telemetry_data_responses.size (); } +bool nano::telemetry_data_time_pair::operator== (telemetry_data_time_pair const & telemetry_data_time_pair_a) const +{ + return data == telemetry_data_time_pair_a.data && last_updated == telemetry_data_time_pair_a.last_updated; +} + +bool nano::telemetry_data_time_pair::operator!= (telemetry_data_time_pair const & telemetry_data_time_pair_a) const +{ + return !(*this == telemetry_data_time_pair_a); +} + std::unique_ptr nano::collect_container_info (telemetry & telemetry, const std::string & name) { size_t single_requests_count; @@ -351,6 +484,7 @@ std::unique_ptr nano::collect_container_info (te composite->add_component (collect_container_info (*telemetry.batch_request, "batch_request")); } composite->add_component (std::make_unique (container_info{ "single_requests", single_requests_count, sizeof (decltype (telemetry.single_requests)::value_type) })); + composite->add_component (std::make_unique (container_info{ "finished_single_requests", telemetry.finished_single_requests_size (), sizeof (decltype (telemetry.finished_single_requests)::value_type) })); return composite; } diff --git a/nano/node/telemetry.hpp b/nano/node/telemetry.hpp index 72d4b1bdf9..97606d37fb 100644 --- a/nano/node/telemetry.hpp +++ b/nano/node/telemetry.hpp @@ -18,15 +18,25 @@ namespace transport class channel; } +class telemetry_data_time_pair +{ +public: + nano::telemetry_data data; + std::chrono::steady_clock::time_point last_updated; + std::chrono::system_clock::time_point system_last_updated; + bool operator== (telemetry_data_time_pair const &) const; + bool operator!= (telemetry_data_time_pair const &) const; +}; + /* * Holds a response from a telemetry request */ class telemetry_data_response { public: - nano::telemetry_data data; - bool is_cached; - bool error; + nano::telemetry_data_time_pair telemetry_data_time_pair; + nano::endpoint endpoint; + bool error{ true }; }; /* @@ -35,9 +45,8 @@ class telemetry_data_response class telemetry_data_responses { public: - std::vector data; - bool is_cached; - bool all_received; + std::unordered_map telemetry_data_time_pairs; + bool all_received{ false }; }; /* @@ -52,46 +61,58 @@ class telemetry_impl : public std::enable_shared_from_this private: // Class only available to the telemetry class - void get_metrics_async (std::unordered_set> const & channels_a, std::function const & callback_a); + void get_metrics_async (std::deque> const & channels_a, std::function const & callback_a); void add (nano::telemetry_data const & telemetry_data_a, nano::endpoint const & endpoint_a); size_t telemetry_data_size (); nano::network_params network_params; - // Anything older than this requires requesting metrics from other nodes - static std::chrono::milliseconds constexpr cache_cutoff{ 3000 }; + // Anything older than this requires requesting metrics from other nodes. 
+ std::chrono::seconds const cache_cutoff{ nano::telemetry_cache_cutoffs::network_to_time (network_params.network) }; + static std::chrono::seconds constexpr alarm_cutoff{ 3 }; // All data in this chunk is protected by this mutex std::mutex mutex; std::vector> callbacks; std::chrono::steady_clock::time_point last_time = std::chrono::steady_clock::now () - cache_cutoff; /* The responses received during this request round */ - std::vector current_telemetry_data_responses; + std::unordered_map current_telemetry_data_responses; /* The metrics for the last request round */ - std::vector cached_telemetry_data; + std::unordered_map cached_telemetry_data; std::unordered_set required_responses; uint64_t round{ 0 }; /* Currently executing callbacks */ bool invoking{ false }; - - std::atomic all_received{ true }; + std::vector failed; nano::network & network; nano::alarm & alarm; nano::worker & worker; - void invoke_callbacks (bool cached_a); + std::function & data_a, std::mutex &)> pre_callback_callback; + + void invoke_callbacks (); void channel_processed (nano::unique_lock & lk_a, nano::endpoint const & endpoint_a); - void flush_callbacks (nano::unique_lock & lk_a, bool cached_a); - void fire_request_messages (std::unordered_set> const & channels); + void flush_callbacks_async (); + void fire_request_messages (std::deque> const & channels); friend std::unique_ptr collect_container_info (telemetry_impl &, const std::string &); friend nano::telemetry; friend class node_telemetry_single_request_Test; friend class node_telemetry_basic_Test; + friend class node_telemetry_ongoing_requests_Test; }; std::unique_ptr collect_container_info (telemetry_impl & telemetry_impl, const std::string & name); +/* + * This class has 2 main operations: + * Request metrics from specific single peers (single_requests) + * - If this peer is in the batched request, it will use the value from that, otherwise send a telemetry_req message (non-droppable) + * Request metrics from all peers (batched_request) + * - This is polled every minute. + * - If a single request is currently underway, do not request because other peers will just reject if within a hotzone time + * - This will be proactively added when callbacks are called inside pre_callback_callback + */ class telemetry { public: @@ -104,14 +125,14 @@ class telemetry void add (nano::telemetry_data const & telemetry_data_a, nano::endpoint const & endpoint_a); /* - * Collects metrics from square root number of peers and invokes the callback when complete. + * Collects metrics from all known peers and invokes the callback when complete. */ - void get_metrics_random_peers_async (std::function const & callback_a); + void get_metrics_peers_async (std::function const & callback_a); /* - * A blocking version of get_metrics_random_peers_async (). + * A blocking version of get_metrics_peers_async (). 
*/ - telemetry_data_responses get_metrics_random_peers (); + telemetry_data_responses get_metrics_peers (); /* * This makes a telemetry request to the specific channel @@ -128,6 +149,11 @@ class telemetry */ size_t telemetry_data_size (); + /* + * Return the number of finished_single_requests elements + */ + size_t finished_single_requests_size (); + /* * Stop the telemetry processor */ @@ -152,10 +178,13 @@ class telemetry std::shared_ptr batch_request; /* Any requests to specific individual peers is maintained here */ std::unordered_map single_requests; + /* This holds data from single_requests after the cache is removed */ + std::unordered_map finished_single_requests; bool stopped{ false }; void update_cleanup_data (nano::endpoint const & endpoint_a, nano::telemetry::single_request_data & single_request_data_a, bool is_new_a); void ongoing_single_request_cleanup (nano::endpoint const & endpoint_a, nano::telemetry::single_request_data const & single_request_data_a); + void ongoing_req_all_peers (); friend class node_telemetry_multiple_single_request_clearing_Test; friend std::unique_ptr collect_container_info (telemetry &, const std::string &); diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp index 27e51a000c..75efc2db41 100644 --- a/nano/node/transport/tcp.cpp +++ b/nano/node/transport/tcp.cpp @@ -169,7 +169,7 @@ std::unordered_set> nano::transport::t auto index (nano::random_pool::generate_word32 (0, static_cast (peers_size - 1))); auto channel = channels.get ()[index].channel; - if (channel->get_network_version () >= min_version && !channel->server) + if (channel->get_network_version () >= min_version) { result.insert (channel); } @@ -449,13 +449,14 @@ void nano::transport::tcp_channels::ongoing_keepalive () }); } -void nano::transport::tcp_channels::list (std::deque> & deque_a) +void nano::transport::tcp_channels::list (std::deque> & deque_a, bool include_server_channels_a, uint8_t minimum_version_a) { nano::lock_guard lock (mutex); - for (auto const & channel : channels.get ()) - { - deque_a.push_back (channel.channel); - } + // clang-format off + nano::transform_if (channels.begin (), channels.end (), std::back_inserter (deque_a), + [include_server_channels_a, minimum_version_a](auto & channel_a) { return channel_a.channel->get_network_version () >= minimum_version_a && (include_server_channels_a || !channel_a.channel->server); }, + [](const auto & channel) { return channel.channel; }); + // clang-format on } void nano::transport::tcp_channels::modify (std::shared_ptr channel_a, std::function)> modify_callback_a) diff --git a/nano/node/transport/tcp.hpp b/nano/node/transport/tcp.hpp index 50cbf798ad..30f3c3db2f 100644 --- a/nano/node/transport/tcp.hpp +++ b/nano/node/transport/tcp.hpp @@ -76,6 +76,7 @@ namespace transport class tcp_channels final { friend class nano::transport::channel_tcp; + friend class node_telemetry_simultaneous_single_and_random_requests_Test; public: tcp_channels (nano::node &); @@ -100,7 +101,7 @@ namespace transport std::unique_ptr collect_container_info (std::string const &); void purge (std::chrono::steady_clock::time_point const &); void ongoing_keepalive (); - void list (std::deque> &); + void list (std::deque> &, bool = true, uint8_t = 0); void modify (std::shared_ptr, std::function)>); void update (nano::tcp_endpoint const &); // Connection start diff --git a/nano/node/transport/transport.hpp b/nano/node/transport/transport.hpp index 7c3dd751a6..02824a5554 100644 --- a/nano/node/transport/transport.hpp +++ 
b/nano/node/transport/transport.hpp @@ -103,6 +103,18 @@ namespace transport last_packet_sent = time_a; } + std::chrono::steady_clock::time_point get_last_telemetry_req () + { + nano::lock_guard lk (channel_mutex); + return last_telemetry_req; + } + + void set_last_telemetry_req (std::chrono::steady_clock::time_point const time_a) + { + nano::lock_guard lk (channel_mutex); + last_telemetry_req = time_a; + } + boost::optional get_node_id_optional () const { nano::lock_guard lk (channel_mutex); @@ -144,6 +156,7 @@ namespace transport std::chrono::steady_clock::time_point last_bootstrap_attempt{ std::chrono::steady_clock::time_point () }; std::chrono::steady_clock::time_point last_packet_received{ std::chrono::steady_clock::time_point () }; std::chrono::steady_clock::time_point last_packet_sent{ std::chrono::steady_clock::time_point () }; + std::chrono::steady_clock::time_point last_telemetry_req{ std::chrono::steady_clock::time_point () }; boost::optional node_id{ boost::none }; std::atomic network_version{ 0 }; diff --git a/nano/node/transport/udp.cpp b/nano/node/transport/udp.cpp index 20b382b747..5ef34c6544 100644 --- a/nano/node/transport/udp.cpp +++ b/nano/node/transport/udp.cpp @@ -434,7 +434,25 @@ class udp_message_visitor : public nano::message_visitor } void telemetry_req (nano::telemetry_req const & message_a) override { - message (message_a); + auto find_channel (node.network.udp_channels.channel (endpoint)); + if (find_channel) + { + auto is_very_first_message = find_channel->get_last_telemetry_req () == std::chrono::steady_clock::time_point{}; + auto cache_exceeded = std::chrono::steady_clock::now () >= find_channel->get_last_telemetry_req () + nano::telemetry_cache_cutoffs::network_to_time (node.network_params.network); + if (is_very_first_message || cache_exceeded) + { + node.network.udp_channels.modify (find_channel, [](std::shared_ptr channel_a) { + channel_a->set_last_telemetry_req (std::chrono::steady_clock::now ()); + }); + message (message_a); + } + else + { + node.network.udp_channels.modify (find_channel, [](std::shared_ptr channel_a) { + channel_a->set_last_packet_received (std::chrono::steady_clock::now ()); + }); + } + } } void telemetry_ack (nano::telemetry_ack const & message_a) override { @@ -688,13 +706,14 @@ void nano::transport::udp_channels::ongoing_keepalive () }); } -void nano::transport::udp_channels::list (std::deque> & deque_a) +void nano::transport::udp_channels::list (std::deque> & deque_a, uint8_t minimum_version_a) { nano::lock_guard lock (mutex); - for (auto const & channel : channels.get ()) - { - deque_a.push_back (channel.channel); - } + // clang-format off + nano::transform_if (channels.begin (), channels.end (), std::back_inserter (deque_a), + [minimum_version_a](auto & channel_a) { return channel_a.channel->get_network_version () >= minimum_version_a; }, + [](const auto & channel) { return channel.channel; }); + // clang-format on } void nano::transport::udp_channels::modify (std::shared_ptr channel_a, std::function)> modify_callback_a) diff --git a/nano/node/transport/udp.hpp b/nano/node/transport/udp.hpp index 9945546d0c..8f6a6b0898 100644 --- a/nano/node/transport/udp.hpp +++ b/nano/node/transport/udp.hpp @@ -90,7 +90,7 @@ namespace transport std::unique_ptr collect_container_info (std::string const &); void purge (std::chrono::steady_clock::time_point const &); void ongoing_keepalive (); - void list (std::deque> &); + void list (std::deque> &, uint8_t = 0); void modify (std::shared_ptr, std::function)>); nano::node & node; @@ -134,6 
+134,10 @@ namespace transport { return channel->get_last_bootstrap_attempt (); } + std::chrono::steady_clock::time_point last_telemetry_req () const + { + return channel->get_last_telemetry_req (); + } boost::asio::ip::address ip_address () const { return endpoint ().address (); diff --git a/nano/rpc_test/rpc.cpp b/nano/rpc_test/rpc.cpp index c5e2f3361b..7f052a5bfa 100644 --- a/nano/rpc_test/rpc.cpp +++ b/nano/rpc_test/rpc.cpp @@ -7837,7 +7837,6 @@ namespace void compare_default_test_result_data (test_response & response, nano::node const & node_server_a) { ASSERT_EQ (200, response.status); - ASSERT_FALSE (response.json.get ("cached")); ASSERT_EQ (1, response.json.get ("block_count")); ASSERT_EQ (1, response.json.get ("cemented_count")); ASSERT_EQ (0, response.json.get ("unchecked_count")); @@ -7982,18 +7981,7 @@ TEST (rpc, node_telemetry_random) { ASSERT_NO_ERROR (system.poll ()); } - ASSERT_EQ (200, response.status); - ASSERT_FALSE (response.json.get ("cached")); - ASSERT_EQ (1, response.json.get ("block_count")); - ASSERT_EQ (1, response.json.get ("cemented_count")); - ASSERT_EQ (0, response.json.get ("unchecked_count")); - ASSERT_EQ (1, response.json.get ("account_count")); - ASSERT_EQ (node->config.bandwidth_limit, response.json.get ("bandwidth_cap")); - ASSERT_EQ (1, response.json.get ("peer_count")); - ASSERT_EQ (node->network_params.protocol.protocol_version, response.json.get ("protocol_version_number")); - ASSERT_EQ (nano::get_major_node_version (), response.json.get ("vendor_version")); - ASSERT_GE (100, response.json.get ("uptime")); - ASSERT_EQ (nano::genesis ().hash ().to_string (), response.json.get ("genesis_block")); + compare_default_test_result_data (response, *node); } request.put ("raw", "true"); @@ -8006,14 +7994,12 @@ TEST (rpc, node_telemetry_random) ASSERT_EQ (200, response.status); // This may fail if the response has taken longer than the cache cutoff time. 
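For readers following the raw-metrics assertions in this test, the response can be walked with boost::property_tree as shown in the self-contained sketch below. The JSON payload here is illustrative only, not a captured node response; the field names simply mirror the ones the test asserts on.

#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>

#include <cstdint>
#include <iostream>
#include <sstream>
#include <string>

int main ()
{
	// Example shape of a raw telemetry response with a single peer entry.
	std::stringstream response (R"({ "metrics": [ { "block_count": "1", "cemented_count": "1", "timestamp": "1580768585000", "address": "::1", "port": "24000" } ] })");
	boost::property_tree::ptree json;
	boost::property_tree::read_json (response, json);
	// Each child of "metrics" is one peer's metrics object.
	for (auto const & entry : json.get_child ("metrics"))
	{
		auto const & metrics = entry.second;
		std::cout << "blocks=" << metrics.get<uint64_t> ("block_count")
		          << " cemented=" << metrics.get<uint64_t> ("cemented_count")
		          << " from " << metrics.get<std::string> ("address")
		          << ":" << metrics.get<uint16_t> ("port") << std::endl;
	}
}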
- ASSERT_TRUE (response.json.get ("cached")); - auto & all_metrics = response.json.get_child ("metrics"); - std::vector> raw_metrics_json_l; + std::vector> raw_metrics_json_l; for (auto & metrics_pair : all_metrics) { auto & metrics = metrics_pair.second; - raw_metrics_json_l.emplace_back (metrics.get ("block_count"), metrics.get ("cemented_count"), metrics.get ("unchecked_count"), metrics.get ("account_count"), metrics.get ("bandwidth_cap"), metrics.get ("peer_count"), metrics.get ("protocol_version_number"), metrics.get ("vendor_version"), metrics.get ("uptime"), metrics.get ("genesis_block")); + raw_metrics_json_l.emplace_back (metrics.get ("block_count"), metrics.get ("cemented_count"), metrics.get ("unchecked_count"), metrics.get ("account_count"), metrics.get ("bandwidth_cap"), metrics.get ("peer_count"), metrics.get ("protocol_version_number"), metrics.get ("vendor_version"), metrics.get ("uptime"), metrics.get ("genesis_block"), metrics.get ("timestamp"), metrics.get ("address"), metrics.get ("port")); } ASSERT_EQ (1, raw_metrics_json_l.size ()); @@ -8028,4 +8014,10 @@ TEST (rpc, node_telemetry_random) ASSERT_EQ (nano::get_major_node_version (), std::get<7> (metrics)); ASSERT_GE (100, std::get<8> (metrics)); ASSERT_EQ (nano::genesis ().hash ().to_string (), std::get<9> (metrics)); + auto timestamp = std::chrono::duration_cast (std::chrono::system_clock::now ().time_since_epoch ()).count (); + ASSERT_GE (timestamp, std::get<10> (metrics)); + ASSERT_LT (timestamp - 100, std::get<10> (metrics)); + ASSERT_GE (std::chrono::duration_cast (std::chrono::system_clock::now ().time_since_epoch ()).count (), std::get<10> (metrics)); + ASSERT_EQ (node->network.endpoint ().address ().to_string (), std::get<11> (metrics)); + ASSERT_EQ (node->network.endpoint ().port (), std::get<12> (metrics)); } diff --git a/nano/slow_test/node.cpp b/nano/slow_test/node.cpp index cf1c2a678a..f45c36d47c 100644 --- a/nano/slow_test/node.cpp +++ b/nano/slow_test/node.cpp @@ -7,6 +7,8 @@ #include +#include + using namespace std::chrono_literals; TEST (system, generate_mass_activity) @@ -756,3 +758,240 @@ TEST (confirmation_height, prioritize_frontiers_overwrite) } } } + +namespace +{ +void wait_peer_connections (nano::system & system_a) +{ + system_a.deadline_set (10s); + auto peer_count = 0; + auto num_nodes = system_a.nodes.size (); + while (peer_count != num_nodes * (num_nodes - 1)) + { + ASSERT_NO_ERROR (system_a.poll ()); + peer_count = std::accumulate (system_a.nodes.cbegin (), system_a.nodes.cend (), 0, [](auto total, auto const & node) { + auto transaction = node->store.tx_begin_read (); + return total += node->store.peer_count (transaction); + }); + } +} + +class data +{ +public: + std::atomic awaiting_cache{ false }; + std::atomic keep_requesting_metrics{ true }; + std::shared_ptr node; + std::chrono::steady_clock::time_point orig_time; + std::atomic_flag orig_time_set = ATOMIC_FLAG_INIT; +}; +class shared_data +{ +public: + std::atomic done{ false }; + std::atomic count{ 0 }; + std::promise promise; + std::shared_future shared_future{ promise.get_future () }; +}; + +template +void callback_process (shared_data & shared_data_a, data & data, T & all_node_data_a, std::chrono::steady_clock::time_point last_updated) +{ + if (!data.orig_time_set.test_and_set ()) + { + data.orig_time = last_updated; + } + + if (data.awaiting_cache && data.orig_time != last_updated) + { + data.keep_requesting_metrics = false; + } + if (data.orig_time != last_updated) + { + data.awaiting_cache = true; + data.orig_time = 
last_updated; + } + if (--shared_data_a.count == 0 && std::all_of (all_node_data_a.begin (), all_node_data_a.end (), [](auto const & data) { return !data.keep_requesting_metrics; })) + { + shared_data_a.done = true; + shared_data_a.promise.set_value (); + } +}; +} + +namespace nano +{ +TEST (node_telemetry, ongoing_requests) +{ + nano::system system (2); + + auto node_client = system.nodes.front (); + auto node_server = system.nodes.back (); + + wait_peer_connections (system); + + ASSERT_EQ (0, node_client->telemetry.telemetry_data_size ()); + ASSERT_EQ (0, node_server->telemetry.telemetry_data_size ()); + ASSERT_EQ (0, node_client->stats.count (nano::stat::type::bootstrap, nano::stat::detail::telemetry_ack, nano::stat::dir::in)); + ASSERT_EQ (0, node_client->stats.count (nano::stat::type::bootstrap, nano::stat::detail::telemetry_req, nano::stat::dir::out)); + + system.deadline_set (20s); + while (node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in) != 1 || node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out) != 1) + { + ASSERT_NO_ERROR (system.poll ()); + } + + // Wait till the next ongoing will be called, and add a 1s buffer for the actual processing + auto time = std::chrono::steady_clock::now (); + while (std::chrono::steady_clock::now () < (time + nano::telemetry_cache_cutoffs::test + nano::telemetry_impl::alarm_cutoff + 1s)) + { + ASSERT_NO_ERROR (system.poll ()); + } + + ASSERT_EQ (2, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in)); + ASSERT_EQ (2, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in)); + ASSERT_EQ (2, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out)); + ASSERT_EQ (2, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in)); + ASSERT_EQ (2, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in)); + ASSERT_EQ (2, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out)); +} +} + +TEST (node_telemetry, simultaneous_random_requests) +{ + const auto num_nodes = 4; + nano::system system (num_nodes); + + // Wait until peers are stored as they are done in the background + wait_peer_connections (system); + + std::vector threads; + const auto num_threads = 4; + + std::array all_data{}; + for (auto i = 0; i < num_nodes; ++i) + { + all_data[i].node = system.nodes[i]; + } + + shared_data shared_data; + + // Create a few threads where each node sends out telemetry request messages to all other nodes continuously, until the cache it reached and subsequently expired. + // The test waits until all telemetry_ack messages have been received. 
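Before the request loop below, it may help to see the coordination pattern in isolation: an atomic count of outstanding callbacks, a promise fulfilled exactly once when the count reaches zero, and a shared_future that every thread waits on before exiting. The sketch uses invented names and fixed numbers; it is a simplification, not the test itself.

#include <atomic>
#include <future>
#include <thread>
#include <vector>

int main ()
{
	int const total_requests = 16;
	std::atomic<int> outstanding{ total_requests };
	std::promise<void> all_done;
	std::shared_future<void> done (all_done.get_future ());

	std::vector<std::thread> threads;
	for (int i = 0; i < total_requests; ++i)
	{
		threads.emplace_back ([&]() {
			// Stand-in for a telemetry callback completing on another thread.
			// Exactly one thread observes the counter reach zero and fulfils the promise.
			if (--outstanding == 0)
			{
				all_done.set_value ();
			}
			// Every thread blocks on the same shared_future until that happens.
			done.wait ();
		});
	}
	for (auto & thread : threads)
	{
		thread.join ();
	}
}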
+ for (int i = 0; i < num_threads; ++i) + { + threads.emplace_back ([&all_data, &shared_data]() { + while (std::any_of (all_data.cbegin (), all_data.cend (), [](auto const & data) { return data.keep_requesting_metrics.load (); })) + { + for (auto & data : all_data) + { + // Keep calling requesting telemetry metrics until the cache has been saved and then become outdated (after a certain period of time) for each node + if (data.keep_requesting_metrics) + { + ++shared_data.count; + data.node->telemetry.get_metrics_peers_async ([&shared_data, &data, &all_data](nano::telemetry_data_responses const & responses_a) { + callback_process (shared_data, data, all_data, responses_a.telemetry_data_time_pairs.begin ()->second.last_updated); + }); + } + std::this_thread::sleep_for (1ms); + } + } + + shared_data.shared_future.wait (); + ASSERT_EQ (shared_data.count, 0); + }); + } + + system.deadline_set (20s); + while (!shared_data.done) + { + ASSERT_NO_ERROR (system.poll ()); + } + + for (auto & thread : threads) + { + thread.join (); + } +} + +namespace nano +{ +namespace transport +{ + TEST (node_telemetry, simultaneous_single_and_random_requests) + { + const auto num_nodes = 4; + nano::system system (num_nodes); + + wait_peer_connections (system); + + std::vector threads; + const auto num_threads = 4; + + std::array node_data_single{}; + std::array node_data_random{}; + for (auto i = 0; i < num_nodes; ++i) + { + node_data_single[i].node = system.nodes[i]; + node_data_random[i].node = system.nodes[i]; + } + + shared_data shared_data_single; + shared_data shared_data_random; + + // Create a few threads where each node sends out telemetry request messages to all other nodes continuously, until the cache it reached and subsequently expired. + // The test waits until all telemetry_ack messages have been received. 
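The stop condition that callback_process encodes can be hard to read inline, so here is roughly the same idea in isolation, under assumed names (refresh_tracker, observe): the first change in last_updated shows a cached entry was produced, a second change shows that entry also expired and was refreshed, and only then does the test stop issuing requests.

#include <chrono>

class refresh_tracker
{
public:
	// Returns true once two distinct refreshes of the timestamp have been observed.
	bool observe (std::chrono::steady_clock::time_point last_updated)
	{
		if (!baseline_set)
		{
			baseline = last_updated;
			baseline_set = true;
			return false;
		}
		if (last_updated != baseline)
		{
			baseline = last_updated;
			++refreshes_seen;
		}
		return refreshes_seen >= 2;
	}

private:
	std::chrono::steady_clock::time_point baseline;
	bool baseline_set{ false };
	int refreshes_seen{ 0 };
};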
+ for (int i = 0; i < num_threads; ++i) + { + threads.emplace_back ([&node_data_single, &node_data_random, &shared_data_single, &shared_data_random]() { + auto func = [](auto & all_node_data_a, shared_data & shared_data_a, bool single_a) { + while (std::any_of (all_node_data_a.cbegin (), all_node_data_a.cend (), [](auto const & data) { return data.keep_requesting_metrics.load (); })) + { + for (auto & data : all_node_data_a) + { + // Keep calling get_metrics_async until the cache has been saved and then become outdated (after a certain period of time) for each node + if (data.keep_requesting_metrics) + { + ++shared_data_a.count; + + if (single_a) + { + // Pick first peer to be consistent + auto peer = data.node->network.tcp_channels.channels[0].channel; + data.node->telemetry.get_metrics_single_peer_async (peer, [&shared_data_a, &data, &all_node_data_a](nano::telemetry_data_response const & telemetry_data_response_a) { + callback_process (shared_data_a, data, all_node_data_a, telemetry_data_response_a.telemetry_data_time_pair.last_updated); + }); + } + else + { + data.node->telemetry.get_metrics_peers_async ([&shared_data_a, &data, &all_node_data_a](nano::telemetry_data_responses const & telemetry_data_responses_a) { + callback_process (shared_data_a, data, all_node_data_a, telemetry_data_responses_a.telemetry_data_time_pairs.begin ()->second.last_updated); + }); + } + } + std::this_thread::sleep_for (1ms); + } + } + + shared_data_a.shared_future.wait (); + ASSERT_EQ (shared_data_a.count, 0); + }; + + func (node_data_single, shared_data_single, true); + func (node_data_random, shared_data_random, false); + }); + } + + system.deadline_set (30s); + while (!shared_data_random.done || !shared_data_single.done) + { + ASSERT_NO_ERROR (system.poll ()); + } + + for (auto & thread : threads) + { + thread.join (); + } + } +} +} From 24d3615bef6f91001a3f920598878f5df839c7dd Mon Sep 17 00:00:00 2001 From: Wesley Shillingford Date: Wed, 29 Jan 2020 09:33:35 +0000 Subject: [PATCH 02/12] Update out of date comment --- nano/node/common.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nano/node/common.cpp b/nano/node/common.cpp index cdaf9f014c..cfdcea4142 100644 --- a/nano/node/common.cpp +++ b/nano/node/common.cpp @@ -1160,7 +1160,7 @@ nano::telemetry_data nano::telemetry_data::consolidate (std::vector bandwidth_caps; std::unordered_map genesis_blocks; - // Use a trimmed average which excludes the upper and lower 5% of the results + // Use a trimmed average which excludes the upper and lower 10% of the results std::multiset account_counts; std::multiset block_counts; std::multiset cemented_counts; From 859e745f69ba7e9abf2fcdab1440c353c8b0426f Mon Sep 17 00:00:00 2001 From: Wesley Shillingford Date: Mon, 3 Feb 2020 14:55:09 +0000 Subject: [PATCH 03/12] Formatting --- nano/node/json_handler.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nano/node/json_handler.cpp b/nano/node/json_handler.cpp index 6a78cea585..f62dc482b3 100644 --- a/nano/node/json_handler.cpp +++ b/nano/node/json_handler.cpp @@ -4010,7 +4010,7 @@ void nano::json_handler::telemetry () auto average_telemetry_metrics = nano::consolidate_telemetry_data_time_pairs (telemetry_data_time_pairs); auto err = average_telemetry_metrics.data.serialize_json (config_l); - config_l.put ("timestamp", std::chrono::duration_cast ( average_telemetry_metrics.system_last_updated.time_since_epoch ()).count ()); + config_l.put ("timestamp", std::chrono::duration_cast 
(average_telemetry_metrics.system_last_updated.time_since_epoch ()).count ()); auto const & ptree = config_l.get_tree (); if (!err) From 9bc3de168b123a7c01482dabd0982a5ec47d2c6d Mon Sep 17 00:00:00 2001 From: Wesley Shillingford Date: Mon, 3 Feb 2020 15:59:46 +0000 Subject: [PATCH 04/12] Fix clang build on actions with long std::tuple --- nano/rpc_test/rpc.cpp | 61 +++++++++++++++++++++++++++++-------------- 1 file changed, 42 insertions(+), 19 deletions(-) diff --git a/nano/rpc_test/rpc.cpp b/nano/rpc_test/rpc.cpp index 90716e8ce4..2b6700126e 100644 --- a/nano/rpc_test/rpc.cpp +++ b/nano/rpc_test/rpc.cpp @@ -8000,30 +8000,53 @@ TEST (rpc, node_telemetry_all) // This may fail if the response has taken longer than the cache cutoff time. auto & all_metrics = response.json.get_child ("metrics"); - std::vector> raw_metrics_json_l; + + class telemetry_response_data + { + public: + uint64_t block_count; + uint64_t cemented_count; + uint64_t unchecked_count; + uint64_t account_count; + uint64_t bandwidth_cap; + uint32_t peer_count; + uint8_t protocol_version; + uint64_t uptime; + std::string genesis_block; + uint8_t major_version; + uint8_t minor_version; + uint8_t patch_version; + uint8_t pre_release_version; + uint8_t maker; + uint64_t timestamp; + std::string address; + uint16_t port; + }; + + std::vector raw_metrics_json_l; for (auto & metrics_pair : all_metrics) { auto & metrics = metrics_pair.second; - raw_metrics_json_l.emplace_back (metrics.get ("block_count"), metrics.get ("cemented_count"), metrics.get ("unchecked_count"), metrics.get ("account_count"), metrics.get ("bandwidth_cap"), metrics.get ("peer_count"), metrics.get ("protocol_version"), metrics.get ("uptime"), metrics.get ("genesis_block"), metrics.get ("major_version"), metrics.get ("minor_version"), metrics.get ("patch_version"), metrics.get ("pre_release_version"), metrics.get ("maker"), metrics.get ("timestamp"), metrics.get ("address"), metrics.get ("port")); + raw_metrics_json_l.push_back ({ metrics.get ("block_count"), metrics.get ("cemented_count"), metrics.get ("unchecked_count"), metrics.get ("account_count"), metrics.get ("bandwidth_cap"), metrics.get ("peer_count"), metrics.get ("protocol_version"), metrics.get ("uptime"), metrics.get ("genesis_block"), metrics.get ("major_version"), metrics.get ("minor_version"), metrics.get ("patch_version"), metrics.get ("pre_release_version"), metrics.get ("maker"), metrics.get ("timestamp"), metrics.get ("address"), metrics.get ("port") }); } ASSERT_EQ (1, raw_metrics_json_l.size ()); auto const & metrics = raw_metrics_json_l.front (); - ASSERT_EQ (1, std::get<0> (metrics)); - ASSERT_EQ (1, std::get<1> (metrics)); - ASSERT_EQ (0, std::get<2> (metrics)); - ASSERT_EQ (1, std::get<3> (metrics)); - ASSERT_EQ (node->config.bandwidth_limit, std::get<4> (metrics)); - ASSERT_EQ (1, std::get<5> (metrics)); - ASSERT_EQ (node->network_params.protocol.protocol_version, std::get<6> (metrics)); - ASSERT_GE (100, std::get<7> (metrics)); - ASSERT_EQ (nano::genesis ().hash ().to_string (), std::get<8> (metrics)); - ASSERT_EQ (nano::get_major_node_version (), std::get<9> (metrics)); - ASSERT_EQ (nano::get_minor_node_version (), std::get<10> (metrics)); - ASSERT_EQ (nano::get_patch_node_version (), std::get<11> (metrics)); - ASSERT_EQ (nano::get_pre_release_node_version (), std::get<12> (metrics)); - ASSERT_EQ (0, std::get<13> (metrics)); - ASSERT_GE (std::chrono::duration_cast (std::chrono::system_clock::now ().time_since_epoch ()).count (), std::get<14> (metrics)); - ASSERT_EQ 
(node->network.endpoint ().address ().to_string (), std::get<15> (metrics)); - ASSERT_EQ (node->network.endpoint ().port (), std::get<16> (metrics)); + ASSERT_EQ (1, metrics.block_count); + ASSERT_EQ (1, metrics.cemented_count); + ASSERT_EQ (0, metrics.unchecked_count); + ASSERT_EQ (1, metrics.account_count); + ASSERT_EQ (node->config.bandwidth_limit, metrics.bandwidth_cap); + ASSERT_EQ (1, metrics.peer_count); + ASSERT_EQ (node->network_params.protocol.protocol_version, metrics.protocol_version); + ASSERT_GE (100, metrics.uptime); + ASSERT_EQ (nano::genesis ().hash ().to_string (), metrics.genesis_block); + ASSERT_EQ (nano::get_major_node_version (), metrics.major_version); + ASSERT_EQ (nano::get_minor_node_version (), metrics.minor_version); + ASSERT_EQ (nano::get_patch_node_version (), metrics.patch_version); + ASSERT_EQ (nano::get_pre_release_node_version (), metrics.pre_release_version); + ASSERT_EQ (0, metrics.maker); + ASSERT_GE (std::chrono::duration_cast (std::chrono::system_clock::now ().time_since_epoch ()).count (), metrics.timestamp); + ASSERT_EQ (node->network.endpoint ().address ().to_string (), metrics.address); + ASSERT_EQ (node->network.endpoint ().port (), metrics.port); } From 179d029a5a10831c086de6c616f9b5f2a0f3f708 Mon Sep 17 00:00:00 2001 From: Wesley Shillingford Date: Mon, 3 Feb 2020 16:23:08 +0000 Subject: [PATCH 05/12] Allow square brackets in ipv6 address in RPC --- nano/node/json_handler.cpp | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/nano/node/json_handler.cpp b/nano/node/json_handler.cpp index f62dc482b3..67c0f60b60 100644 --- a/nano/node/json_handler.cpp +++ b/nano/node/json_handler.cpp @@ -3910,6 +3910,12 @@ void nano::json_handler::telemetry () uint16_t port; if (!nano::parse_port (*port_text, port)) { + if (!address_text->empty () && address_text->front () == '[' && address_text->back () == ']') + { + // Chop the square brackets off as make_address_v6 doesn't always like them + address_text = address_text->substr (1, address_text->size () - 2); + } + boost::system::error_code address_ec; auto address (boost::asio::ip::make_address_v6 (*address_text, address_ec)); if (!address_ec) From 6e95e1d73c481b5fc96251ebfc255d4a6a0461e8 Mon Sep 17 00:00:00 2001 From: Wesley Shillingford Date: Tue, 4 Feb 2020 10:44:12 +0000 Subject: [PATCH 06/12] Merge with develop --- nano/core_test/active_transactions.cpp | 3 +- nano/core_test/block_store.cpp | 12 +++--- nano/core_test/confirmation_height.cpp | 6 +-- nano/core_test/confirmation_solicitor.cpp | 3 +- nano/core_test/memory_pool.cpp | 2 +- nano/core_test/node.cpp | 50 ++++++++++++++++------- nano/core_test/testutil.hpp | 2 + nano/node/bootstrap/bootstrap_server.cpp | 2 +- nano/node/json_handler.cpp | 2 +- nano/node/network.cpp | 15 ++++--- nano/node/network.hpp | 4 +- nano/node/repcrawler.cpp | 2 +- nano/node/telemetry.cpp | 2 +- nano/node/transport/tcp.cpp | 18 ++++---- nano/node/transport/tcp.hpp | 8 ++-- nano/rpc_test/rpc.cpp | 20 ++++----- nano/secure/common.cpp | 15 +++++-- nano/secure/common.hpp | 1 + nano/slow_test/node.cpp | 3 +- 19 files changed, 99 insertions(+), 71 deletions(-) diff --git a/nano/core_test/active_transactions.cpp b/nano/core_test/active_transactions.cpp index 34936344f4..f5bb667ba3 100644 --- a/nano/core_test/active_transactions.cpp +++ b/nano/core_test/active_transactions.cpp @@ -40,10 +40,9 @@ TEST (active_transactions, adjusted_difficulty_priority) node_config.enable_voting = false; node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled; auto & node1 = 
*system.add_node (node_config); - nano::genesis genesis; nano::keypair key1, key2, key3; - auto send1 (std::make_shared (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - 10 * nano::xrb_ratio, key1.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (genesis.hash ()))); + auto send1 (std::make_shared (nano::test_genesis_key.pub, nano::genesis_hash, nano::test_genesis_key.pub, nano::genesis_amount - 10 * nano::xrb_ratio, key1.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (nano::genesis_hash))); auto send2 (std::make_shared (nano::test_genesis_key.pub, send1->hash (), nano::test_genesis_key.pub, nano::genesis_amount - 20 * nano::xrb_ratio, key2.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (send1->hash ()))); auto open1 (std::make_shared (key1.pub, 0, key1.pub, 10 * nano::xrb_ratio, send1->hash (), key1.prv, key1.pub, *system.work.generate (key1.pub))); auto open2 (std::make_shared (key2.pub, 0, key2.pub, 10 * nano::xrb_ratio, send2->hash (), key2.prv, key2.pub, *system.work.generate (key2.pub))); diff --git a/nano/core_test/block_store.cpp b/nano/core_test/block_store.cpp index 145c9ed436..e51cc5a7bf 100644 --- a/nano/core_test/block_store.cpp +++ b/nano/core_test/block_store.cpp @@ -969,7 +969,7 @@ TEST (mdb_block_store, upgrade_v6_v7) nano::ledger_cache ledger_cache; store.initialize (transaction, genesis, ledger_cache); store.version_put (transaction, 6); - modify_account_info_to_v13 (store, transaction, nano::genesis_account, genesis.open->hash ()); + modify_account_info_to_v13 (store, transaction, nano::genesis_account, nano::genesis_hash); auto send1 (std::make_shared (0, 0, 0, nano::test_genesis_key.prv, nano::test_genesis_key.pub, 0)); store.unchecked_put (transaction, send1->hash (), send1); store.flush (transaction); @@ -1083,7 +1083,7 @@ TEST (block_store, sequence_flush_by_hash) auto transaction (store->tx_begin_write ()); nano::keypair key1; std::vector blocks1; - blocks1.push_back (nano::genesis ().hash ()); + blocks1.push_back (nano::genesis_hash); blocks1.push_back (1234); blocks1.push_back (5678); auto vote1 (store->vote_generate (transaction, key1.pub, key1.prv, blocks1)); @@ -1163,7 +1163,7 @@ TEST (mdb_block_store, upgrade_sideband_genesis) store.version_put (transaction, 11); nano::ledger_cache ledger_cache; store.initialize (transaction, genesis, ledger_cache); - modify_account_info_to_v13 (store, transaction, nano::genesis_account, genesis.open->hash ()); + modify_account_info_to_v13 (store, transaction, nano::genesis_account, nano::genesis_hash); nano::block_sideband sideband; auto genesis_block (store.block_get (transaction, genesis.hash (), &sideband)); ASSERT_NE (nullptr, genesis_block); @@ -1346,7 +1346,7 @@ TEST (mdb_block_store, upgrade_sideband_epoch) nano::mdb_val value; ASSERT_FALSE (mdb_get (store.env.tx (transaction), store.state_blocks_v1, nano::mdb_val (hash2), value)); - ASSERT_FALSE (mdb_get (store.env.tx (transaction), store.open_blocks, nano::mdb_val (genesis.open->hash ()), value)); + ASSERT_FALSE (mdb_get (store.env.tx (transaction), store.open_blocks, nano::mdb_val (nano::genesis_hash), value)); ASSERT_FALSE (mdb_dbi_open (store.env.tx (transaction), "accounts_v1", MDB_CREATE, &store.accounts_v1)); modify_account_info_to_v13 (store, transaction, nano::genesis_account, hash2); @@ -1582,7 +1582,7 @@ TEST (mdb_block_store, upgrade_v13_v14) nano::account_info account_info; ASSERT_FALSE 
(store.account_get (transaction, nano::genesis_account, account_info)); store.version_put (transaction, 13); - modify_account_info_to_v13 (store, transaction, nano::genesis_account, genesis.open->hash ()); + modify_account_info_to_v13 (store, transaction, nano::genesis_account, nano::genesis_hash); // This should fail as sizes are no longer correct for account_info_v14 nano::mdb_val value; @@ -1917,7 +1917,7 @@ TEST (mdb_block_store, upgrade_confirmation_height_many) store.version_put (transaction, 13); nano::ledger_cache ledger_cache; store.initialize (transaction, genesis, ledger_cache); - modify_account_info_to_v13 (store, transaction, nano::genesis_account, genesis.open->hash ()); + modify_account_info_to_v13 (store, transaction, nano::genesis_account, nano::genesis_hash); // Add many accounts for (auto i = 0; i < total_num_accounts - 1; ++i) diff --git a/nano/core_test/confirmation_height.cpp b/nano/core_test/confirmation_height.cpp index f73662d06b..fab2e8fd97 100644 --- a/nano/core_test/confirmation_height.cpp +++ b/nano/core_test/confirmation_height.cpp @@ -34,7 +34,7 @@ TEST (confirmation_height, single) auto transaction = node->store.tx_begin_read (); ASSERT_FALSE (node->store.confirmation_height_get (transaction, nano::test_genesis_key.pub, confirmation_height_info)); ASSERT_EQ (1, confirmation_height_info.height); - ASSERT_EQ (nano::genesis ().hash (), confirmation_height_info.frontier); + ASSERT_EQ (nano::genesis_hash, confirmation_height_info.frontier); node->process_active (send1); node->block_processor.flush (); @@ -123,7 +123,7 @@ TEST (confirmation_height, multiple_accounts) nano::confirmation_height_info confirmation_height_info; ASSERT_FALSE (node->store.confirmation_height_get (transaction, nano::test_genesis_key.pub, confirmation_height_info)); ASSERT_EQ (1, confirmation_height_info.height); - ASSERT_EQ (nano::genesis ().hash (), confirmation_height_info.frontier); + ASSERT_EQ (nano::genesis_hash, confirmation_height_info.frontier); ASSERT_FALSE (node->store.confirmation_height_get (transaction, key1.pub, confirmation_height_info)); ASSERT_EQ (0, confirmation_height_info.height); ASSERT_EQ (nano::block_hash (0), confirmation_height_info.frontier); @@ -317,7 +317,7 @@ TEST (confirmation_height, gap_live) nano::confirmation_height_info confirmation_height_info; ASSERT_FALSE (node->store.confirmation_height_get (transaction, nano::test_genesis_key.pub, confirmation_height_info)); ASSERT_EQ (1, confirmation_height_info.height); - ASSERT_EQ (nano::genesis ().hash (), confirmation_height_info.frontier); + ASSERT_EQ (nano::genesis_hash, confirmation_height_info.frontier); } // Now complete the chain where the block comes in on the live network diff --git a/nano/core_test/confirmation_solicitor.cpp b/nano/core_test/confirmation_solicitor.cpp index 4d83b7b2ef..cb4b512249 100644 --- a/nano/core_test/confirmation_solicitor.cpp +++ b/nano/core_test/confirmation_solicitor.cpp @@ -30,8 +30,7 @@ TEST (confirmation_solicitor, batches) ASSERT_EQ (1, representatives.size ()); ASSERT_EQ (channel1, representatives.front ().channel); ASSERT_EQ (nano::test_genesis_key.pub, representatives.front ().account); - nano::genesis genesis; - auto send (std::make_shared (genesis.open->hash (), nano::keypair ().pub, nano::genesis_amount - 100, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (genesis.open->hash ()))); + auto send (std::make_shared (nano::genesis_hash, nano::keypair ().pub, nano::genesis_amount - 100, nano::test_genesis_key.prv, 
nano::test_genesis_key.pub, *system.work.generate (nano::genesis_hash))); for (size_t i (0); i < nano::network::confirm_req_hashes_max; ++i) { auto election (std::make_shared (node2, send, false, nullptr)); diff --git a/nano/core_test/memory_pool.cpp b/nano/core_test/memory_pool.cpp index e9e5ae8e2b..0a97764a71 100644 --- a/nano/core_test/memory_pool.cpp +++ b/nano/core_test/memory_pool.cpp @@ -49,7 +49,7 @@ size_t get_allocated_size () { std::vector allocated; record_allocations_new_delete_allocator alloc (&allocated); - std::allocate_shared> (alloc); + (void)std::allocate_shared> (alloc); assert (allocated.size () == 1); return allocated.front (); } diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index ae909967dd..224a5062d0 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -2742,14 +2742,13 @@ TEST (node, local_votes_cache_size) node_config.vote_minimum = 0; // wallet will pick up the second account as voting even if unopened auto & node (*system.add_node (node_config)); ASSERT_EQ (node.network_params.voting.max_cache, 2); // effective cache size is 1 with 2 voting accounts - nano::genesis genesis; nano::keypair key; auto & wallet (*system.wallet (0)); wallet.insert_adhoc (nano::test_genesis_key.prv); wallet.insert_adhoc (nano::keypair ().prv); ASSERT_EQ (2, node.wallets.rep_counts ().voting); auto transaction (node.store.tx_begin_read ()); - auto vote1 (node.store.vote_generate (transaction, nano::test_genesis_key.pub, nano::test_genesis_key.prv, { genesis.open->hash () })); + auto vote1 (node.store.vote_generate (transaction, nano::test_genesis_key.pub, nano::test_genesis_key.prv, { nano::genesis_hash })); nano::block_hash hash (1); auto vote2 (node.store.vote_generate (transaction, nano::test_genesis_key.pub, nano::test_genesis_key.prv, { hash })); node.votes_cache.add (vote1); @@ -2757,7 +2756,7 @@ TEST (node, local_votes_cache_size) auto existing2 (node.votes_cache.find (hash)); ASSERT_EQ (1, existing2.size ()); ASSERT_EQ (vote2, existing2.front ()); - ASSERT_EQ (0, node.votes_cache.find (genesis.open->hash ()).size ()); + ASSERT_EQ (0, node.votes_cache.find (nano::genesis_hash).size ()); } TEST (node, vote_republish) @@ -3428,16 +3427,15 @@ TEST (node, dont_write_lock_node) finished_promise.set_value (); } -// Test is unstable on github actions for windows, disable if CI detected -#if (defined(_WIN32) && CI) -TEST (node, DISABLED_bidirectional_tcp) -#else TEST (node, bidirectional_tcp) -#endif { nano::system system; nano::node_flags node_flags; node_flags.disable_udp = true; // Disable UDP connections + // Disable bootstrap to start elections for new blocks + node_flags.disable_legacy_bootstrap = true; + node_flags.disable_lazy_bootstrap = true; + node_flags.disable_wallet_bootstrap = true; nano::node_config node_config (nano::get_available_port (), system.logging); node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled; auto node1 = system.add_node (node_config, node_flags); @@ -3466,17 +3464,32 @@ TEST (node, bidirectional_tcp) { ASSERT_NO_ERROR (system.poll ()); } - // Test block confirmation from node 1 + // Test block confirmation from node 1 (add representative to node 1) system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv); + // Wait to find new reresentative + system.deadline_set (10s); + while (node2->rep_crawler.representative_count () == 0) + { + ASSERT_NO_ERROR (system.poll ()); + } + /* Wait for confirmation + To check connection we need only node 2 confirmation status + Node 1 election can be 
unconfirmed because representative private key was inserted after election start (and node 2 isn't flooding new votes to principal representatives) */ bool confirmed (false); system.deadline_set (10s); while (!confirmed) { - auto transaction1 (node1->store.tx_begin_read ()); auto transaction2 (node2->store.tx_begin_read ()); - confirmed = node1->ledger.block_confirmed (transaction1, send1->hash ()) && node2->ledger.block_confirmed (transaction2, send1->hash ()); + confirmed = node2->ledger.block_confirmed (transaction2, send1->hash ()); ASSERT_NO_ERROR (system.poll ()); } + // Test block propagation & confirmation from node 2 (remove representative from node 1) + { + auto transaction (system.wallet (0)->wallets.tx_begin_write ()); + system.wallet (0)->store.erase (transaction, nano::test_genesis_key.pub); + } + /* Test block propagation from node 2 + Node 2 has only ephemeral TCP port open. Node 1 cannot establish connection to node 2 listening port */ auto send2 (std::make_shared (nano::test_genesis_key.pub, send1->hash (), nano::test_genesis_key.pub, nano::genesis_amount - 2 * nano::Gxrb_ratio, key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *node1->work_generate_blocking (send1->hash ()))); node2->process_active (send2); node2->block_processor.flush (); @@ -3485,14 +3498,23 @@ TEST (node, bidirectional_tcp) { ASSERT_NO_ERROR (system.poll ()); } - // Test block confirmation from node 2 + // Test block confirmation from node 2 (add representative to node 2) + system.wallet (1)->insert_adhoc (nano::test_genesis_key.prv); + // Wait to find changed reresentative + system.deadline_set (10s); + while (node1->rep_crawler.representative_count () == 0) + { + ASSERT_NO_ERROR (system.poll ()); + } + /* Wait for confirmation + To check connection we need only node 1 confirmation status + Node 2 election can be unconfirmed because representative private key was inserted after election start (and node 1 isn't flooding new votes to principal representatives) */ confirmed = false; system.deadline_set (20s); while (!confirmed) { auto transaction1 (node1->store.tx_begin_read ()); - auto transaction2 (node2->store.tx_begin_read ()); - confirmed = node1->ledger.block_confirmed (transaction1, send2->hash ()) && node2->ledger.block_confirmed (transaction2, send2->hash ()); + confirmed = node1->ledger.block_confirmed (transaction1, send2->hash ()); ASSERT_NO_ERROR (system.poll ()); } } diff --git a/nano/core_test/testutil.hpp b/nano/core_test/testutil.hpp index afd7849a58..24202a4ccc 100644 --- a/nano/core_test/testutil.hpp +++ b/nano/core_test/testutil.hpp @@ -38,10 +38,12 @@ namespace nano using uint128_t = boost::multiprecision::uint128_t; class keypair; class public_key; +class block_hash; extern nano::keypair const & zero_key; extern nano::keypair const & test_genesis_key; extern std::string const & nano_test_genesis; extern std::string const & genesis_block; +extern nano::block_hash const & genesis_hash; extern nano::public_key const & nano_test_account; extern nano::public_key const & genesis_account; extern nano::public_key const & burn_account; diff --git a/nano/node/bootstrap/bootstrap_server.cpp b/nano/node/bootstrap/bootstrap_server.cpp index 36e86f4ed2..a2a7441ab3 100644 --- a/nano/node/bootstrap/bootstrap_server.cpp +++ b/nano/node/bootstrap/bootstrap_server.cpp @@ -118,7 +118,7 @@ nano::bootstrap_server::~bootstrap_server () auto exisiting_response_channel (node->network.tcp_channels.find_channel (remote_endpoint)); if (exisiting_response_channel != nullptr) { - 
exisiting_response_channel->server = false; + exisiting_response_channel->temporary = false; node->network.tcp_channels.erase (remote_endpoint); } } diff --git a/nano/node/json_handler.cpp b/nano/node/json_handler.cpp index 67c0f60b60..a5ad885a64 100644 --- a/nano/node/json_handler.cpp +++ b/nano/node/json_handler.cpp @@ -4230,7 +4230,7 @@ void nano::json_handler::version () response_l.put ("node_vendor", boost::str (boost::format ("Nano %1%") % NANO_VERSION_STRING)); response_l.put ("store_vendor", node.store.vendor_get ()); response_l.put ("network", node.network_params.network.get_current_network_as_string ()); - response_l.put ("network_identifier", nano::genesis ().hash ().to_string ()); + response_l.put ("network_identifier", node.network_params.ledger.genesis_hash.to_string ()); response_l.put ("build_info", BUILD_INFO); response_errors (); } diff --git a/nano/node/network.cpp b/nano/node/network.cpp index df10da2a1c..672d81efcd 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -105,6 +105,9 @@ void nano::network::send_keepalive (std::shared_ptr ch void nano::network::send_keepalive_self (std::shared_ptr channel_a) { nano::keepalive message; + random_fill (message.peers); + // Replace part of message with node external address or listening port + message.peers[1] = nano::endpoint (boost::asio::ip::address_v6{}, 0); // For node v19 (response channels) if (node.config.external_address != boost::asio::ip::address_v6{}.to_string () && node.config.external_port != 0) { message.peers[0] = nano::endpoint (boost::asio::ip::make_address_v6 (node.config.external_address), node.config.external_port); @@ -542,7 +545,7 @@ class network_message_visitor : public nano::message_visitor telemetry_data.protocol_version = node.network_params.protocol.protocol_version; telemetry_data.uptime = std::chrono::duration_cast (std::chrono::steady_clock::now () - node.startup_time).count (); telemetry_data.unchecked_count = node.ledger.cache.unchecked_count; - telemetry_data.genesis_block = nano::genesis ().hash (); + telemetry_data.genesis_block = node.network_params.ledger.genesis_hash; telemetry_data.peer_count = node.network.size (); telemetry_data.account_count = node.ledger.cache.account_count; telemetry_data.major_version = nano::get_major_node_version (); @@ -628,10 +631,10 @@ bool nano::network::reachout (nano::endpoint const & endpoint_a, bool allow_loca return error; } -std::deque> nano::network::list (size_t count_a, bool include_tcp_server_channels_a, uint8_t minimum_version_a) +std::deque> nano::network::list (size_t count_a, uint8_t minimum_version_a, bool include_tcp_temporary_channels_a) { std::deque> result; - tcp_channels.list (result, include_tcp_server_channels_a, minimum_version_a); + tcp_channels.list (result, minimum_version_a, include_tcp_temporary_channels_a); udp_channels.list (result, minimum_version_a); nano::random_pool_shuffle (result.begin (), result.end ()); if (result.size () > count_a) @@ -664,9 +667,9 @@ size_t nano::network::fanout (float scale) const return static_cast (std::ceil (scale * size_sqrt ())); } -std::unordered_set> nano::network::random_set (size_t count_a, uint8_t min_version_a) const +std::unordered_set> nano::network::random_set (size_t count_a, uint8_t min_version_a, bool include_temporary_channels_a) const { - std::unordered_set> result (tcp_channels.random_set (count_a, min_version_a)); + std::unordered_set> result (tcp_channels.random_set (count_a, min_version_a, include_temporary_channels_a)); std::unordered_set> udp_random 
(udp_channels.random_set (count_a, min_version_a)); for (auto i (udp_random.begin ()), n (udp_random.end ()); i != n && result.size () < count_a * 1.5; ++i) { @@ -681,7 +684,7 @@ std::unordered_set> nano::network::ran void nano::network::random_fill (std::array & target_a) const { - auto peers (random_set (target_a.size ())); + auto peers (random_set (target_a.size (), 0, false)); // Don't include channels with ephemeral remote ports assert (peers.size () <= target_a.size ()); auto endpoint (nano::endpoint (boost::asio::ip::address_v6{}, 0)); assert (endpoint.address ().is_v6 ()); diff --git a/nano/node/network.hpp b/nano/node/network.hpp index 2695ab3407..d76e8eedeb 100644 --- a/nano/node/network.hpp +++ b/nano/node/network.hpp @@ -132,13 +132,13 @@ class network final bool not_a_peer (nano::endpoint const &, bool); // Should we reach out to this endpoint with a keepalive message bool reachout (nano::endpoint const &, bool = false); - std::deque> list (size_t, bool = true, uint8_t = 0); + std::deque> list (size_t, uint8_t = 0, bool = true); std::deque> list_non_pr (size_t); // Desired fanout for a given scale size_t fanout (float scale = 1.0f) const; void random_fill (std::array &) const; // Note: The minimum protocol version is used after the random selection, so number of peers can be less than expected. - std::unordered_set> random_set (size_t, uint8_t = 0) const; + std::unordered_set> random_set (size_t, uint8_t = 0, bool = false) const; // Get the next peer for attempting a tcp bootstrap connection nano::tcp_endpoint bootstrap_peer (bool = false); nano::endpoint endpoint (); diff --git a/nano/node/repcrawler.cpp b/nano/node/repcrawler.cpp index 8e30473bdc..a805ecdce4 100644 --- a/nano/node/repcrawler.cpp +++ b/nano/node/repcrawler.cpp @@ -139,7 +139,7 @@ std::vector> nano::rep_crawler::get_cr required_peer_count += required_peer_count / 2; // The rest of the endpoints are picked randomly - auto random_peers (node.network.random_set (required_peer_count)); + auto random_peers (node.network.random_set (required_peer_count, 0, true)); // Include channels with ephemeral remote ports std::vector> result; result.insert (result.end (), random_peers.begin (), random_peers.end ()); return result; diff --git a/nano/node/telemetry.cpp b/nano/node/telemetry.cpp index 952ae9b603..2324d2abad 100644 --- a/nano/node/telemetry.cpp +++ b/nano/node/telemetry.cpp @@ -127,7 +127,7 @@ void nano::telemetry::ongoing_req_all_peers () void nano::telemetry::get_metrics_peers_async (std::function const & callback_a) { - auto peers = network.list (std::numeric_limits::max (), false, network_params.protocol.telemetry_protocol_version_min); + auto peers = network.list (std::numeric_limits::max (), network_params.protocol.telemetry_protocol_version_min, false); nano::lock_guard guard (mutex); if (!stopped && !peers.empty ()) { diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp index 75efc2db41..171433b2e3 100644 --- a/nano/node/transport/tcp.cpp +++ b/nano/node/transport/tcp.cpp @@ -16,7 +16,7 @@ nano::transport::channel_tcp::~channel_tcp () // Close socket. 
Exception: socket is used by bootstrap_server if (auto socket_l = socket.lock ()) { - if (!server) + if (!temporary) { socket_l->close (); } @@ -111,7 +111,7 @@ bool nano::transport::tcp_channels::insert (std::shared_ptr ().end ()) { auto node_id (channel_a->get_node_id ()); - if (!channel_a->server) + if (!channel_a->temporary) { channels.get ().erase (node_id); } @@ -152,7 +152,7 @@ std::shared_ptr nano::transport::tcp_channels::fin return result; } -std::unordered_set> nano::transport::tcp_channels::random_set (size_t count_a, uint8_t min_version) const +std::unordered_set> nano::transport::tcp_channels::random_set (size_t count_a, uint8_t min_version, bool include_temporary_channels_a) const { std::unordered_set> result; result.reserve (count_a); @@ -169,7 +169,7 @@ std::unordered_set> nano::transport::t auto index (nano::random_pool::generate_word32 (0, static_cast (peers_size - 1))); auto channel = channels.get ()[index].channel; - if (channel->get_network_version () >= min_version) + if (channel->get_network_version () >= min_version && (include_temporary_channels_a || !channel->temporary)) { result.insert (channel); } @@ -284,7 +284,7 @@ void nano::transport::tcp_channels::process_message (nano::message const & messa temporary_channel->set_network_version (message_a.header.version_using); temporary_channel->set_last_packet_received (std::chrono::steady_clock::now ()); temporary_channel->set_last_packet_sent (std::chrono::steady_clock::now ()); - temporary_channel->server = true; + temporary_channel->temporary = true; assert (type_a == nano::bootstrap_server_type::realtime || type_a == nano::bootstrap_server_type::realtime_response_server); // Don't insert temporary channels for response_server if (type_a == nano::bootstrap_server_type::realtime) @@ -449,12 +449,12 @@ void nano::transport::tcp_channels::ongoing_keepalive () }); } -void nano::transport::tcp_channels::list (std::deque> & deque_a, bool include_server_channels_a, uint8_t minimum_version_a) +void nano::transport::tcp_channels::list (std::deque> & deque_a, uint8_t minimum_version_a, bool include_temporary_channels_a) { nano::lock_guard lock (mutex); // clang-format off - nano::transform_if (channels.begin (), channels.end (), std::back_inserter (deque_a), - [include_server_channels_a, minimum_version_a](auto & channel_a) { return channel_a.channel->get_network_version () >= minimum_version_a && (include_server_channels_a || !channel_a.channel->server); }, + nano::transform_if (channels.get ().begin (), channels.get ().end (), std::back_inserter (deque_a), + [include_temporary_channels_a, minimum_version_a](auto & channel_a) { return channel_a.channel->get_network_version () >= minimum_version_a && (include_temporary_channels_a || !channel_a.channel->temporary); }, [](const auto & channel) { return channel.channel; }); // clang-format on } @@ -604,7 +604,7 @@ void nano::transport::tcp_channels::start_tcp_receive_node_id (std::shared_ptrnetwork.tcp_channels.find_node_id (node_id)); if (existing_channel) { - process = existing_channel->server; + process = existing_channel->temporary; } } if (process) diff --git a/nano/node/transport/tcp.hpp b/nano/node/transport/tcp.hpp index 30f3c3db2f..b733ae0393 100644 --- a/nano/node/transport/tcp.hpp +++ b/nano/node/transport/tcp.hpp @@ -40,7 +40,9 @@ namespace transport } std::weak_ptr socket; std::weak_ptr response_server; - bool server{ false }; + /* Mark for temporary channels. Usually remote ports of these channels are ephemeral and received from incoming connections to server. 
+ If the remote peer has an open listening port, the temporary channel will soon be replaced with a direct connection to that listening port. But if the other side is behind a NAT or firewall, this connection can be permanent. */ + std::atomic temporary{ false }; nano::endpoint get_endpoint () const override { @@ -85,7 +87,7 @@ namespace transport size_t size () const; std::shared_ptr find_channel (nano::tcp_endpoint const &) const; void random_fill (std::array &) const; - std::unordered_set> random_set (size_t, uint8_t = 0) const; + std::unordered_set> random_set (size_t, uint8_t = 0, bool = false) const; bool store_all (bool = true); std::shared_ptr find_node_id (nano::account const &); // Get the next peer for attempting a tcp connection @@ -101,7 +103,7 @@ namespace transport std::unique_ptr collect_container_info (std::string const &); void purge (std::chrono::steady_clock::time_point const &); void ongoing_keepalive (); - void list (std::deque> &, bool = true, uint8_t = 0); + void list (std::deque> &, uint8_t = 0, bool = true); void modify (std::shared_ptr, std::function)>); void update (nano::tcp_endpoint const &); // Connection start diff --git a/nano/rpc_test/rpc.cpp index 2b6700126e..e9274f82de 100644 --- a/nano/rpc_test/rpc.cpp +++ b/nano/rpc_test/rpc.cpp @@ -6391,7 +6391,7 @@ TEST (rpc, block_confirm_confirmed) // Check confirmation history auto confirmed (node->active.list_confirmed ()); ASSERT_EQ (1, confirmed.size ()); - ASSERT_EQ (genesis.hash (), confirmed.begin ()->winner->hash ()); + ASSERT_EQ (nano::genesis_hash, confirmed.begin ()->winner->hash ()); // Check callback system.deadline_set (5s); while (node->stats.count (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out) == 0) { @@ -6781,7 +6781,6 @@ TEST (rpc, wallet_history) { nano::system system; auto node = add_ipc_enabled_node (system); - nano::genesis genesis; system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv); auto timestamp1 (nano::seconds_since_epoch ()); auto send (system.wallet (0)->send_action (nano::test_genesis_key.pub, nano::test_genesis_key.pub, node->config.receive_minimum.number ())); @@ -6843,7 +6842,7 @@ TEST (rpc, wallet_history) ASSERT_EQ ("receive", std::get<0> (history_l[3])); ASSERT_EQ (nano::test_genesis_key.pub.to_account (), std::get<1> (history_l[3])); ASSERT_EQ (nano::genesis_amount.convert_to (), std::get<2> (history_l[3])); - ASSERT_EQ (genesis.hash ().to_string (), std::get<3> (history_l[3])); + ASSERT_EQ (nano::genesis_hash.to_string (), std::get<3> (history_l[3])); ASSERT_EQ (nano::test_genesis_key.pub.to_account (), std::get<4> (history_l[3])); } @@ -7415,7 +7414,6 @@ TEST (rpc_config, migrate) TEST (rpc, deprecated_account_format) { nano::system system; - nano::genesis genesis; auto node = add_ipc_enabled_node (system); nano::node_rpc_config node_rpc_config; nano::ipc::ipc_server ipc_server (*node, node_rpc_config); @@ -7447,7 +7445,7 @@ TEST (rpc, deprecated_account_format) } ASSERT_EQ (200, response2.status); std::string frontier (response.json.get ("frontier")); - ASSERT_EQ (genesis.hash ().to_string (), frontier); + ASSERT_EQ (nano::genesis_hash.to_string (), frontier); boost::optional deprecated_account_format2 (response2.json.get_optional ("deprecated_account_format")); ASSERT_TRUE (deprecated_account_format2.is_initialized ()); } @@ -7457,9 +7455,8 @@ TEST (rpc, epoch_upgrade) nano::system system; auto node = add_ipc_enabled_node (system); nano::keypair key1, key2, key3; - nano::genesis genesis; nano::keypair epoch_signer
(nano::test_genesis_key); - auto send1 (std::make_shared (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - 1, key1.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (genesis.hash ()))); // to opened account + auto send1 (std::make_shared (nano::test_genesis_key.pub, nano::genesis_hash, nano::test_genesis_key.pub, nano::genesis_amount - 1, key1.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (nano::genesis_hash))); // to opened account ASSERT_EQ (nano::process_result::progress, node->process (*send1).code); auto send2 (std::make_shared (nano::test_genesis_key.pub, send1->hash (), nano::test_genesis_key.pub, nano::genesis_amount - 2, key2.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (send1->hash ()))); // to unopened account (pending) ASSERT_EQ (nano::process_result::progress, node->process (*send2).code); @@ -7579,10 +7576,9 @@ TEST (rpc, account_lazy_start) nano::node_flags node_flags; node_flags.disable_legacy_bootstrap = true; auto node1 = system.add_node (node_flags); - nano::genesis genesis; nano::keypair key; // Generating test chain - auto send1 (std::make_shared (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio, key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (genesis.hash ()))); + auto send1 (std::make_shared (nano::test_genesis_key.pub, nano::genesis_hash, nano::test_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio, key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (nano::genesis_hash))); ASSERT_EQ (nano::process_result::progress, node1->process (*send1).code); auto open (std::make_shared (send1->hash (), key.pub, key.pub, key.prv, key.pub, *system.work.generate (key.pub))); ASSERT_EQ (nano::process_result::progress, node1->process (*open).code); @@ -7634,8 +7630,7 @@ TEST (rpc, receive) wallet->insert_adhoc (nano::test_genesis_key.prv); nano::keypair key1; wallet->insert_adhoc (key1.prv); - nano::genesis genesis; - auto send1 (wallet->send_action (nano::test_genesis_key.pub, key1.pub, node.config.receive_minimum.number (), *node.work_generate_blocking (genesis.hash ()))); + auto send1 (wallet->send_action (nano::test_genesis_key.pub, key1.pub, node.config.receive_minimum.number (), *node.work_generate_blocking (nano::genesis_hash))); system.deadline_set (5s); while (node.balance (nano::test_genesis_key.pub) == nano::genesis_amount) { @@ -7708,8 +7703,7 @@ TEST (rpc, receive_unopened) wallet->insert_adhoc (nano::test_genesis_key.prv); // Test receiving for unopened account nano::keypair key1; - nano::genesis genesis; - auto send1 (wallet->send_action (nano::test_genesis_key.pub, key1.pub, node.config.receive_minimum.number () - 1, *node.work_generate_blocking (genesis.hash ()))); + auto send1 (wallet->send_action (nano::test_genesis_key.pub, key1.pub, node.config.receive_minimum.number () - 1, *node.work_generate_blocking (nano::genesis_hash))); system.deadline_set (5s); while (node.balance (nano::test_genesis_key.pub) == nano::genesis_amount) { diff --git a/nano/secure/common.cpp b/nano/secure/common.cpp index 9adb292112..b8d126be8d 100644 --- a/nano/secure/common.cpp +++ b/nano/secure/common.cpp @@ -56,6 +56,14 @@ char const * live_genesis_data = R"%%%({ "work": "62f05417dd3fb691", "signature": 
"9F0C933C8ADE004D808EA1985FA746A7E95BA2A38F867640F53EC8F180BDFE9E2C1268DEAD7C2664F356E37ABA362BC58E46DBA03E523A7B5A19E4B6EB12BB02" })%%%"; + +std::shared_ptr parse_block_from_genesis_data (std::string const & genesis_data_a) +{ + boost::property_tree::ptree tree; + std::stringstream istream (genesis_data_a); + boost::property_tree::read_json (istream, tree); + return nano::deserialize_block_json (tree); +} } nano::network_params::network_params () : @@ -92,6 +100,7 @@ nano_beta_genesis (beta_genesis_data), nano_live_genesis (live_genesis_data), genesis_account (network_a == nano::nano_networks::nano_test_network ? nano_test_account : network_a == nano::nano_networks::nano_beta_network ? nano_beta_account : nano_live_account), genesis_block (network_a == nano::nano_networks::nano_test_network ? nano_test_genesis : network_a == nano::nano_networks::nano_beta_network ? nano_beta_genesis : nano_live_genesis), +genesis_hash (parse_block_from_genesis_data (genesis_block)->hash ()), genesis_amount (std::numeric_limits::max ()), burn_account (0) { @@ -161,6 +170,7 @@ nano::keypair const & nano::test_genesis_key (test_constants.test_genesis_key); nano::account const & nano::nano_test_account (test_constants.nano_test_account); std::string const & nano::nano_test_genesis (test_constants.nano_test_genesis); nano::account const & nano::genesis_account (test_constants.genesis_account); +nano::block_hash const & nano::genesis_hash (test_constants.genesis_hash); nano::uint128_t const & nano::genesis_amount (test_constants.genesis_amount); nano::account const & nano::burn_account (test_constants.burn_account); @@ -795,10 +805,7 @@ std::unique_ptr nano::collect_container_info (vo nano::genesis::genesis () { static nano::network_params network_params; - boost::property_tree::ptree tree; - std::stringstream istream (network_params.ledger.genesis_block); - boost::property_tree::read_json (istream, tree); - open = nano::deserialize_block_json (tree); + open = parse_block_from_genesis_data (network_params.ledger.genesis_block); assert (open != nullptr); } diff --git a/nano/secure/common.hpp b/nano/secure/common.hpp index 650bcbd9ee..6325e95c08 100644 --- a/nano/secure/common.hpp +++ b/nano/secure/common.hpp @@ -381,6 +381,7 @@ class ledger_constants std::string nano_live_genesis; nano::account genesis_account; std::string genesis_block; + nano::block_hash genesis_hash; nano::uint128_t genesis_amount; nano::account burn_account; nano::epochs epochs; diff --git a/nano/slow_test/node.cpp b/nano/slow_test/node.cpp index f45c36d47c..2ac2d3c28c 100644 --- a/nano/slow_test/node.cpp +++ b/nano/slow_test/node.cpp @@ -438,8 +438,7 @@ TEST (node, mass_vote_by_hash) { nano::system system (1); system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv); - nano::genesis genesis; - nano::block_hash previous (genesis.hash ()); + nano::block_hash previous (nano::genesis_hash); nano::keypair key; std::vector> blocks; for (auto i (0); i < 10000; ++i) From 07818f2fba4d50191629adb2a58a9eb1250d7527 Mon Sep 17 00:00:00 2001 From: Wesley Shillingford Date: Wed, 5 Feb 2020 09:42:07 +0000 Subject: [PATCH 07/12] Fix ASAN issue --- nano/core_test/node_telemetry.cpp | 4 ++-- nano/node/telemetry.cpp | 39 +++++++++---------------------- 2 files changed, 13 insertions(+), 30 deletions(-) diff --git a/nano/core_test/node_telemetry.cpp b/nano/core_test/node_telemetry.cpp index 2777375acc..0070438728 100644 --- a/nano/core_test/node_telemetry.cpp +++ b/nano/core_test/node_telemetry.cpp @@ -330,7 +330,7 @@ TEST (node_telemetry, many_nodes) for 
(auto i = 0; i < num_nodes; ++i) { nano::node_config node_config (nano::get_available_port (), system.logging); - // Make a metric completely different for each node so we can get afterwards that there are no duplicates + // Make a metric completely different for each node so we can check afterwards that there are no duplicates node_config.bandwidth_limit = 100000 + i; system.add_node (node_config); } @@ -386,7 +386,7 @@ TEST (node_telemetry, many_nodes) ASSERT_EQ (data.genesis_block, genesis.hash ()); } - // We gave some nodes different bandwidth caps, confirm they are not all the time + // We gave some nodes different bandwidth caps, confirm they are not all the same auto bandwidth_cap = all_telemetry_data_time_pairs.begin ()->second.data.bandwidth_cap; all_telemetry_data_time_pairs.erase (all_telemetry_data_time_pairs.begin ()); auto all_bandwidth_limits_same = std::all_of (all_telemetry_data_time_pairs.begin (), all_telemetry_data_time_pairs.end (), [bandwidth_cap](auto & telemetry_data_time_pair) { diff --git a/nano/node/telemetry.cpp b/nano/node/telemetry.cpp index 2324d2abad..91148b15dd 100644 --- a/nano/node/telemetry.cpp +++ b/nano/node/telemetry.cpp @@ -16,16 +16,6 @@ std::chrono::seconds constexpr nano::telemetry_impl::alarm_cutoff; -namespace -{ -// This class is just a wrapper to allow a recursive lambda while properly handling memory resources -class ongoing_func_wrapper -{ -public: - std::function ongoing_func; -}; -} - nano::telemetry::telemetry (nano::network & network_a, nano::alarm & alarm_a, nano::worker & worker_a) : network (network_a), alarm (alarm_a), @@ -96,9 +86,7 @@ void nano::telemetry::add (nano::telemetry_data const & telemetry_data_a, nano:: void nano::telemetry::ongoing_req_all_peers () { - auto wrapper = std::make_shared (); - // Keep calling ongoing_func while the peer is still being called - wrapper->ongoing_func = [this, telemetry_impl_w = std::weak_ptr (batch_request), wrapper]() { + alarm.add (std::chrono::steady_clock::now () + batch_request->cache_cutoff + batch_request->alarm_cutoff, [this, telemetry_impl_w = std::weak_ptr (batch_request)]() { if (auto batch_telemetry_impl = telemetry_impl_w.lock ()) { nano::lock_guard guard (this->mutex); @@ -117,12 +105,11 @@ void nano::telemetry::ongoing_req_all_peers () // Intentionally empty, just using to refresh the cache }); } - this->alarm.add (std::chrono::steady_clock::now () + batch_telemetry_impl->cache_cutoff + batch_telemetry_impl->alarm_cutoff, wrapper->ongoing_func); + + this->ongoing_req_all_peers (); } } - }; - - alarm.add (std::chrono::steady_clock::now () + batch_request->cache_cutoff + batch_request->alarm_cutoff, wrapper->ongoing_func); + }); } void nano::telemetry::get_metrics_peers_async (std::function const & callback_a) @@ -161,14 +148,11 @@ nano::telemetry_data_responses nano::telemetry::get_metrics_peers () // After a request is made to a single peer we want to remove it from the container after the peer has not been requested for a while (cache_cutoff). 
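Both ongoing_req_all_peers above and ongoing_single_request_cleanup below now re-arm themselves by calling the member function again from inside the alarm callback, rather than going through the removed ongoing_func_wrapper, whose recursive lambda had to hold a shared_ptr to its own wrapper. A minimal, self-contained sketch of that re-scheduling pattern, with a weak_ptr guard stopping the chain once the owner is destroyed, is shown below; toy_alarm, poller and the millisecond delays are illustrative stand-ins rather than code from this patch or the nano codebase.

#include <chrono>
#include <functional>
#include <iostream>
#include <memory>
#include <thread>

// Stand-in for the alarm service: runs the callback once after the given delay.
struct toy_alarm
{
	void add (std::chrono::milliseconds delay_a, std::function<void ()> callback_a)
	{
		std::thread ([delay_a, callback_a]() {
			std::this_thread::sleep_for (delay_a);
			callback_a ();
		})
		.detach ();
	}
};

// Re-scheduling without a self-owning wrapper: each tick re-enters the member
// function, and the weak_ptr guard stops the chain once the owner is gone.
class poller : public std::enable_shared_from_this<poller>
{
public:
	explicit poller (toy_alarm & alarm_a) :
	alarm (alarm_a)
	{
	}
	void ongoing_poll ()
	{
		std::weak_ptr<poller> this_w (shared_from_this ());
		alarm.add (std::chrono::milliseconds (100), [this_w]() {
			if (auto this_l = this_w.lock ())
			{
				std::cout << "polled" << std::endl; // the real code refreshes the telemetry cache here
				this_l->ongoing_poll (); // re-arm by calling the member function again
			}
		});
	}

private:
	toy_alarm & alarm;
};

int main ()
{
	toy_alarm alarm;
	{
		auto poller_l (std::make_shared<poller> (alarm));
		poller_l->ongoing_poll ();
		std::this_thread::sleep_for (std::chrono::milliseconds (350)); // roughly three ticks fire
	} // poller destroyed; the next weak_ptr lock fails and the chain ends
	std::this_thread::sleep_for (std::chrono::milliseconds (200)); // let the final, no-op tick finish before exiting
}

Because each pending alarm holds only a weak_ptr, it never extends the owner's lifetime, which appears to be the property the wrapper-based version obscured and what the ASAN report flagged.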
void nano::telemetry::ongoing_single_request_cleanup (nano::endpoint const & endpoint_a, nano::telemetry::single_request_data const & single_request_data_a) { - auto wrapper = std::make_shared (); - // Keep calling ongoing_func while the peer is still being called - const auto & last_updated = single_request_data_a.last_updated; - wrapper->ongoing_func = [this, telemetry_impl_w = std::weak_ptr (single_request_data_a.impl), &last_updated, &endpoint_a, wrapper]() { + alarm.add (std::chrono::steady_clock::now () + single_request_data_a.impl->cache_cutoff, [this, telemetry_impl_w = std::weak_ptr (single_request_data_a.impl), &single_request_data_a, &endpoint_a]() { if (auto telemetry_impl = telemetry_impl_w.lock ()) { nano::lock_guard guard (this->mutex); - if (std::chrono::steady_clock::now () - telemetry_impl->cache_cutoff > last_updated && telemetry_impl->callbacks.empty ()) + if (std::chrono::steady_clock::now () - telemetry_impl->cache_cutoff > single_request_data_a.last_updated && telemetry_impl->callbacks.empty ()) { // This will be picked up by the batch request next round this->finished_single_requests[endpoint_a] = telemetry_impl->cached_telemetry_data.begin ()->second; @@ -177,12 +161,10 @@ void nano::telemetry::ongoing_single_request_cleanup (nano::endpoint const & end else { // Request is still active, so call again - this->alarm.add (std::chrono::steady_clock::now () + telemetry_impl->cache_cutoff, wrapper->ongoing_func); + this->ongoing_single_request_cleanup (endpoint_a, single_request_data_a); } } - }; - - alarm.add (std::chrono::steady_clock::now () + single_request_data_a.impl->cache_cutoff, wrapper->ongoing_func); + }); } void nano::telemetry::update_cleanup_data (nano::endpoint const & endpoint_a, nano::telemetry::single_request_data & single_request_data_a, bool is_new_a) @@ -244,9 +226,10 @@ void nano::telemetry::get_metrics_single_peer_async (std::shared_ptrget_endpoint (), single_request_data{ std::make_shared (network, alarm, worker), std::chrono::steady_clock::now () }); - update_cleanup_data (pair.first->first, pair.first->second, pair.second); + auto & single_request_data_it = pair.first; + update_cleanup_data (single_request_data_it->first, single_request_data_it->second, pair.second); - pair.first->second.impl->get_metrics_async ({ channel_a }, [callback_a, channel_a](telemetry_data_responses const & telemetry_data_responses_a) { + single_request_data_it->second.impl->get_metrics_async ({ channel_a }, [callback_a, channel_a](telemetry_data_responses const & telemetry_data_responses_a) { // There should only be 1 response, so if this hasn't been received then conclude it is an error. 
auto const error = !telemetry_data_responses_a.all_received; if (!error) From 009ad45a8252dee1d49d505deb554075460198d9 Mon Sep 17 00:00:00 2001 From: Wesley Shillingford Date: Wed, 5 Feb 2020 09:42:30 +0000 Subject: [PATCH 08/12] Gui review comments --- nano/node/common.cpp | 19 +++++++++++++++++-- nano/node/common.hpp | 1 + nano/node/json_handler.cpp | 12 ++---------- 3 files changed, 20 insertions(+), 12 deletions(-) diff --git a/nano/node/common.cpp b/nano/node/common.cpp index aaf4d884d1..42527abea7 100644 --- a/nano/node/common.cpp +++ b/nano/node/common.cpp @@ -1365,6 +1365,22 @@ bool nano::parse_port (std::string const & string_a, uint16_t & port_a) return result; } +// Can handle ipv6 address with and without square brackets +bool nano::parse_address_ipv6 (std::string const & address_text_a, boost::asio::ip::address_v6 & address_a) +{ + auto result (false); + auto address_text = address_text_a; + if (!address_text.empty () && address_text.front () == '[' && address_text.back () == ']') + { + // Chop the square brackets off as make_address_v6 doesn't always like them + address_text = address_text.substr (1, address_text.size () - 2); + } + + boost::system::error_code address_ec; + address_a = boost::asio::ip::make_address_v6 (address_text, address_ec); + return !!address_ec; +} + bool nano::parse_address_port (std::string const & string, boost::asio::ip::address & address_a, uint16_t & port_a) { auto result (false); @@ -1433,8 +1449,7 @@ bool nano::parse_tcp_endpoint (std::string const & string, nano::tcp_endpoint & std::chrono::seconds nano::telemetry_cache_cutoffs::network_to_time (network_constants const & network_constants) { - // 15s for beta to allow for quicker diagnostics, 60s for live - return std::chrono::seconds{ network_constants.is_live_network () ? 60 : network_constants.is_beta_network () ? 15 : 2 }; + return std::chrono::seconds{ network_constants.is_live_network () ? live : network_constants.is_beta_network () ? 
beta : test }; } nano::node_singleton_memory_pool_purge_guard::node_singleton_memory_pool_purge_guard () : diff --git a/nano/node/common.hpp b/nano/node/common.hpp index aafe53f2ff..96c3cfc725 100644 --- a/nano/node/common.hpp +++ b/nano/node/common.hpp @@ -14,6 +14,7 @@ namespace nano { using endpoint = boost::asio::ip::udp::endpoint; bool parse_port (std::string const &, uint16_t &); +bool parse_address_ipv6 (std::string const &, boost::asio::ip::address_v6 &); bool parse_address_port (std::string const &, boost::asio::ip::address &, uint16_t &); using tcp_endpoint = boost::asio::ip::tcp::endpoint; bool parse_endpoint (std::string const &, nano::endpoint &); diff --git a/nano/node/json_handler.cpp b/nano/node/json_handler.cpp index a5ad885a64..64c3362f2b 100644 --- a/nano/node/json_handler.cpp +++ b/nano/node/json_handler.cpp @@ -3910,15 +3910,8 @@ void nano::json_handler::telemetry () uint16_t port; if (!nano::parse_port (*port_text, port)) { - if (!address_text->empty () && address_text->front () == '[' && address_text->back () == ']') - { - // Chop the square brackets off as make_address_v6 doesn't always like them - address_text = address_text->substr (1, address_text->size () - 2); - } - - boost::system::error_code address_ec; - auto address (boost::asio::ip::make_address_v6 (*address_text, address_ec)); - if (!address_ec) + boost::asio::ip::address_v6 address; + if (!nano::parse_address_ipv6 (*address_text, address)) { nano::endpoint endpoint (address, port); channel = node.network.find_channel (endpoint); @@ -3984,7 +3977,6 @@ void nano::json_handler::telemetry () node.telemetry.get_metrics_peers_async ([rpc_l, output_raw](auto const & batched_telemetry_metrics_a) { if (output_raw) { - std::unordered_map telemetry_data_time_pairs; boost::property_tree::ptree metrics; for (auto & telemetry_metrics : batched_telemetry_metrics_a.telemetry_data_time_pairs) { From af8e91588379c3becbc62591d7dc0bb2c035ce83 Mon Sep 17 00:00:00 2001 From: Wesley Shillingford Date: Fri, 7 Feb 2020 17:01:23 +0000 Subject: [PATCH 09/12] Make parse_address accept v4 and v6 ip addresses --- nano/node/common.cpp | 8 ++++---- nano/node/common.hpp | 2 +- nano/node/json_handler.cpp | 6 +++--- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/nano/node/common.cpp b/nano/node/common.cpp index 42527abea7..9e2cb2c682 100644 --- a/nano/node/common.cpp +++ b/nano/node/common.cpp @@ -1365,19 +1365,19 @@ bool nano::parse_port (std::string const & string_a, uint16_t & port_a) return result; } -// Can handle ipv6 address with and without square brackets -bool nano::parse_address_ipv6 (std::string const & address_text_a, boost::asio::ip::address_v6 & address_a) +// Can handle both ipv4 & ipv6 addresses (with and without square brackets) +bool nano::parse_address (std::string const & address_text_a, boost::asio::ip::address & address_a) { auto result (false); auto address_text = address_text_a; if (!address_text.empty () && address_text.front () == '[' && address_text.back () == ']') { - // Chop the square brackets off as make_address_v6 doesn't always like them + // Chop the square brackets off as make_address doesn't always like them address_text = address_text.substr (1, address_text.size () - 2); } boost::system::error_code address_ec; - address_a = boost::asio::ip::make_address_v6 (address_text, address_ec); + address_a = boost::asio::ip::make_address (address_text, address_ec); return !!address_ec; } diff --git a/nano/node/common.hpp b/nano/node/common.hpp index 96c3cfc725..44d037ba4c 100644 --- 
a/nano/node/common.hpp +++ b/nano/node/common.hpp @@ -14,7 +14,7 @@ namespace nano { using endpoint = boost::asio::ip::udp::endpoint; bool parse_port (std::string const &, uint16_t &); -bool parse_address_ipv6 (std::string const &, boost::asio::ip::address_v6 &); +bool parse_address (std::string const &, boost::asio::ip::address &); bool parse_address_port (std::string const &, boost::asio::ip::address &, uint16_t &); using tcp_endpoint = boost::asio::ip::tcp::endpoint; bool parse_endpoint (std::string const &, nano::endpoint &); diff --git a/nano/node/json_handler.cpp b/nano/node/json_handler.cpp index 34de357d81..f83799c407 100644 --- a/nano/node/json_handler.cpp +++ b/nano/node/json_handler.cpp @@ -3910,11 +3910,11 @@ void nano::json_handler::telemetry () uint16_t port; if (!nano::parse_port (*port_text, port)) { - boost::asio::ip::address_v6 address; - if (!nano::parse_address_ipv6 (*address_text, address)) + boost::asio::ip::address address; + if (!nano::parse_address (*address_text, address)) { nano::endpoint endpoint (address, port); - channel = node.network.find_channel (endpoint); + channel = node.network.find_channel (nano::transport::map_endpoint_to_v6 (endpoint)); if (!channel) { ec = nano::error_rpc::peer_not_found; From fd30f7bbd01d92e79d41f3c1d58b259cb7cacfdf Mon Sep 17 00:00:00 2001 From: Wesley Shillingford Date: Fri, 7 Feb 2020 17:01:45 +0000 Subject: [PATCH 10/12] Incorrect order of arguments --- nano/node/telemetry.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nano/node/telemetry.cpp b/nano/node/telemetry.cpp index 91148b15dd..ed65d6640a 100644 --- a/nano/node/telemetry.cpp +++ b/nano/node/telemetry.cpp @@ -92,7 +92,7 @@ void nano::telemetry::ongoing_req_all_peers () nano::lock_guard guard (this->mutex); if (!this->stopped) { - auto peers = this->network.list (std::numeric_limits::max (), false, network_params.protocol.telemetry_protocol_version_min); + auto peers = this->network.list (std::numeric_limits::max (), network_params.protocol.telemetry_protocol_version_min, false); // If exists in single_requests don't request because they will just be rejected by other peers until the next round auto const & single_requests = this->single_requests; peers.erase (std::remove_if (peers.begin (), peers.end (), [&single_requests](auto const & channel_a) { From defaf992ae366e5c57c28077496d9e24d70c48dd Mon Sep 17 00:00:00 2001 From: Wesley Shillingford Date: Fri, 7 Feb 2020 17:04:04 +0000 Subject: [PATCH 11/12] Use new cached genesis hash --- nano/core_test/node_telemetry.cpp | 2 +- nano/rpc_test/rpc.cpp | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/nano/core_test/node_telemetry.cpp b/nano/core_test/node_telemetry.cpp index 0070438728..54c304f335 100644 --- a/nano/core_test/node_telemetry.cpp +++ b/nano/core_test/node_telemetry.cpp @@ -1037,6 +1037,6 @@ void compare_default_test_result_data (nano::telemetry_data const & telemetry_da ASSERT_EQ (*telemetry_data_a.pre_release_version, nano::get_pre_release_node_version ()); ASSERT_EQ (*telemetry_data_a.maker, 0); ASSERT_LT (telemetry_data_a.uptime, 100); - ASSERT_EQ (telemetry_data_a.genesis_block, nano::genesis ().hash ()); + ASSERT_EQ (telemetry_data_a.genesis_block, node_server_a.network_params.ledger.genesis_hash); } } diff --git a/nano/rpc_test/rpc.cpp b/nano/rpc_test/rpc.cpp index 0d8f2c683a..b45e266876 100644 --- a/nano/rpc_test/rpc.cpp +++ b/nano/rpc_test/rpc.cpp @@ -7833,7 +7833,7 @@ void compare_default_test_result_data (test_response & response, nano::node cons ASSERT_EQ (1, 
response.json.get ("peer_count")); ASSERT_EQ (node_server_a.network_params.protocol.protocol_version, response.json.get ("protocol_version")); ASSERT_GE (100, response.json.get ("uptime")); - ASSERT_EQ (nano::genesis ().hash ().to_string (), response.json.get ("genesis_block")); + ASSERT_EQ (node_server_a.network_params.ledger.genesis_hash.to_string (), response.json.get ("genesis_block")); ASSERT_EQ (nano::get_major_node_version (), response.json.get ("major_version")); ASSERT_EQ (nano::get_minor_node_version (), response.json.get ("minor_version")); ASSERT_EQ (nano::get_patch_node_version (), response.json.get ("patch_version")); @@ -8028,7 +8028,7 @@ TEST (rpc, node_telemetry_all) ASSERT_EQ (1, metrics.peer_count); ASSERT_EQ (node->network_params.protocol.protocol_version, metrics.protocol_version); ASSERT_GE (100, metrics.uptime); - ASSERT_EQ (nano::genesis ().hash ().to_string (), metrics.genesis_block); + ASSERT_EQ (node1.network_params.ledger.genesis_hash.to_string (), metrics.genesis_block); ASSERT_EQ (nano::get_major_node_version (), metrics.major_version); ASSERT_EQ (nano::get_minor_node_version (), metrics.minor_version); ASSERT_EQ (nano::get_patch_node_version (), metrics.patch_version); From 68c739cdb105a78ce7ba4dac517c97dde0e3a4be Mon Sep 17 00:00:00 2001 From: Wesley Shillingford Date: Mon, 10 Feb 2020 09:48:27 +0000 Subject: [PATCH 12/12] Move last_telemetry_req to bootstrap_server --- nano/node/bootstrap/bootstrap_server.cpp | 16 ++++------------ nano/node/bootstrap/bootstrap_server.hpp | 1 + nano/node/transport/transport.hpp | 13 ------------- nano/node/transport/udp.hpp | 13 +++++++++++++ 4 files changed, 18 insertions(+), 25 deletions(-) diff --git a/nano/node/bootstrap/bootstrap_server.cpp b/nano/node/bootstrap/bootstrap_server.cpp index a2a7441ab3..f3de55dccc 100644 --- a/nano/node/bootstrap/bootstrap_server.cpp +++ b/nano/node/bootstrap/bootstrap_server.cpp @@ -241,20 +241,12 @@ void nano::bootstrap_server::receive_header_action (boost::system::error_code co { if (is_realtime_connection ()) { - auto find_channel (node->network.tcp_channels.find_channel (remote_endpoint)); - if (!find_channel) - { - find_channel = node->network.tcp_channels.find_node_id (remote_node_id); - } - // Only handle telemetry requests if they are outside of the cutoff time - auto is_very_first_message = find_channel->get_last_telemetry_req () == std::chrono::steady_clock::time_point{}; - auto cache_exceeded = std::chrono::steady_clock::now () >= find_channel->get_last_telemetry_req () + nano::telemetry_cache_cutoffs::network_to_time (node->network_params.network); - if (find_channel && (is_very_first_message || cache_exceeded)) + auto is_very_first_message = last_telemetry_req == std::chrono::steady_clock::time_point{}; + auto cache_exceeded = std::chrono::steady_clock::now () >= last_telemetry_req + nano::telemetry_cache_cutoffs::network_to_time (node->network_params.network); + if (is_very_first_message || cache_exceeded) { - node->network.tcp_channels.modify (find_channel, [](std::shared_ptr channel_a) { - channel_a->set_last_telemetry_req (std::chrono::steady_clock::now ()); - }); + last_telemetry_req = std::chrono::steady_clock::now (); add_request (std::make_unique (header)); } } diff --git a/nano/node/bootstrap/bootstrap_server.hpp b/nano/node/bootstrap/bootstrap_server.hpp index 63dfd9b0f3..9e520955bb 100644 --- a/nano/node/bootstrap/bootstrap_server.hpp +++ b/nano/node/bootstrap/bootstrap_server.hpp @@ -75,5 +75,6 @@ class bootstrap_server final : public std::enable_shared_from_this 
lk (channel_mutex); - return last_telemetry_req; - } - - void set_last_telemetry_req (std::chrono::steady_clock::time_point const time_a) - { - nano::lock_guard lk (channel_mutex); - last_telemetry_req = time_a; - } - boost::optional get_node_id_optional () const { nano::lock_guard lk (channel_mutex); @@ -156,7 +144,6 @@ namespace transport std::chrono::steady_clock::time_point last_bootstrap_attempt{ std::chrono::steady_clock::time_point () }; std::chrono::steady_clock::time_point last_packet_received{ std::chrono::steady_clock::time_point () }; std::chrono::steady_clock::time_point last_packet_sent{ std::chrono::steady_clock::time_point () }; - std::chrono::steady_clock::time_point last_telemetry_req{ std::chrono::steady_clock::time_point () }; boost::optional node_id{ boost::none }; std::atomic network_version{ 0 }; diff --git a/nano/node/transport/udp.hpp b/nano/node/transport/udp.hpp index 8f6a6b0898..27da1f64a1 100644 --- a/nano/node/transport/udp.hpp +++ b/nano/node/transport/udp.hpp @@ -54,9 +54,22 @@ namespace transport return nano::transport::transport_type::udp; } + std::chrono::steady_clock::time_point get_last_telemetry_req () + { + nano::lock_guard lk (channel_mutex); + return last_telemetry_req; + } + + void set_last_telemetry_req (std::chrono::steady_clock::time_point const time_a) + { + nano::lock_guard lk (channel_mutex); + last_telemetry_req = time_a; + } + private: nano::endpoint endpoint; nano::transport::udp_channels & channels; + std::chrono::steady_clock::time_point last_telemetry_req{ std::chrono::steady_clock::time_point () }; }; class udp_channels final {