From bd93581a9bd07b1b38eba33a22b1068798af9ef8 Mon Sep 17 00:00:00 2001 From: Wesley Shillingford Date: Mon, 10 Feb 2020 15:39:53 +0000 Subject: [PATCH] Poll all nodes, remove outliers, ddos protection & amend RPC response with endpoint (#2521) * Poll all nodes and remove some metrics from bounds when consolidating * Update out of date comment * Formatting * Fix clang build on actions with long std::tuple * Allow square brackets in ipv6 address in RPC * Merge with develop * Fix ASAN issue * Gui review comments * Make parse_address accept v4 and v6 ip addresses * Incorrect order of arguments * Use new cached genesis hash * Move last_telemetry_req to bootstrap_server --- nano/core_test/node_telemetry.cpp | 591 +++++++++++++---------- nano/lib/utility.hpp | 15 +- nano/node/bootstrap/bootstrap_server.cpp | 10 +- nano/node/bootstrap/bootstrap_server.hpp | 1 + nano/node/common.cpp | 162 ++----- nano/node/common.hpp | 13 +- nano/node/json_handler.cpp | 32 +- nano/node/network.cpp | 8 +- nano/node/network.hpp | 3 +- nano/node/telemetry.cpp | 428 +++++++++++++--- nano/node/telemetry.hpp | 72 ++- nano/node/transport/tcp.cpp | 11 +- nano/node/transport/tcp.hpp | 3 +- nano/node/transport/udp.cpp | 31 +- nano/node/transport/udp.hpp | 19 +- nano/rpc_test/rpc.cpp | 83 ++-- nano/slow_test/node.cpp | 239 +++++++++ 17 files changed, 1188 insertions(+), 533 deletions(-) diff --git a/nano/core_test/node_telemetry.cpp b/nano/core_test/node_telemetry.cpp index 7bd76d413f..54c304f335 100644 --- a/nano/core_test/node_telemetry.cpp +++ b/nano/core_test/node_telemetry.cpp @@ -65,7 +65,7 @@ TEST (node_telemetry, consolidate_data) std::vector all_data{ data, data1, data2 }; - auto consolidated_telemetry_data = nano::telemetry_data::consolidate (all_data); + auto consolidated_telemetry_data = nano::consolidate_telemetry_data (all_data); ASSERT_EQ (consolidated_telemetry_data.account_count, 3); ASSERT_EQ (consolidated_telemetry_data.block_count, 3); ASSERT_EQ (consolidated_telemetry_data.cemented_count, 2); @@ -91,7 +91,7 @@ TEST (node_telemetry, consolidate_data) all_data[2].pre_release_version = 6; all_data[2].maker = 2; - auto consolidated_telemetry_data1 = nano::telemetry_data::consolidate (all_data); + auto consolidated_telemetry_data1 = nano::consolidate_telemetry_data (all_data); ASSERT_EQ (consolidated_telemetry_data1.major_version, 10); ASSERT_EQ (*consolidated_telemetry_data1.minor_version, 2); ASSERT_EQ (*consolidated_telemetry_data1.patch_version, 3); @@ -122,7 +122,7 @@ TEST (node_telemetry, consolidate_data_optional_data) nano::telemetry_data missing_all_optional; std::vector all_data{ data, data, missing_minor, missing_all_optional }; - auto consolidated_telemetry_data = nano::telemetry_data::consolidate (all_data); + auto consolidated_telemetry_data = nano::consolidate_telemetry_data (all_data); ASSERT_EQ (consolidated_telemetry_data.major_version, 20); ASSERT_EQ (*consolidated_telemetry_data.minor_version, 1); ASSERT_EQ (*consolidated_telemetry_data.patch_version, 4); @@ -174,15 +174,76 @@ TEST (node_telemetry, serialize_deserialize_json_optional) ASSERT_FALSE (no_optional_data1.maker.is_initialized ()); } +TEST (node_telemetry, consolidate_data_remove_outliers) +{ + nano::telemetry_data data; + data.account_count = 2; + data.block_count = 1; + data.cemented_count = 1; + data.protocol_version = 12; + data.peer_count = 2; + data.bandwidth_cap = 100; + data.unchecked_count = 3; + data.uptime = 6; + data.genesis_block = nano::block_hash (3); + data.major_version = 20; + data.minor_version = 1; + 
data.patch_version = 5;
+	data.pre_release_version = 2;
+	data.maker = 1;
+
+	// Insert 20 of these, and 2 outliers at each of the lower and upper bounds, which should get removed
+	std::vector<nano::telemetry_data> all_data (20, data);
+
+	// Insert some outliers
+	nano::telemetry_data outlier_data;
+	outlier_data.account_count = 1;
+	outlier_data.block_count = 0;
+	outlier_data.cemented_count = 0;
+	outlier_data.protocol_version = 11;
+	outlier_data.peer_count = 0;
+	outlier_data.bandwidth_cap = 8;
+	outlier_data.unchecked_count = 1;
+	outlier_data.uptime = 2;
+	outlier_data.genesis_block = nano::block_hash (2);
+	outlier_data.major_version = 11;
+	outlier_data.minor_version = 1;
+	outlier_data.patch_version = 1;
+	outlier_data.pre_release_version = 1;
+	outlier_data.maker = 1;
+	all_data.push_back (outlier_data);
+	all_data.push_back (outlier_data);
+
+	nano::telemetry_data outlier_data1;
+	outlier_data1.account_count = 99;
+	outlier_data1.block_count = 99;
+	outlier_data1.cemented_count = 99;
+	outlier_data1.protocol_version = 99;
+	outlier_data1.peer_count = 99;
+	outlier_data1.bandwidth_cap = 999;
+	outlier_data1.unchecked_count = 99;
+	outlier_data1.uptime = 999;
+	outlier_data1.genesis_block = nano::block_hash (99);
+	outlier_data1.major_version = 99;
+	outlier_data1.minor_version = 9;
+	outlier_data1.patch_version = 9;
+	outlier_data1.pre_release_version = 9;
+	outlier_data1.maker = 9;
+	all_data.push_back (outlier_data1);
+	all_data.push_back (outlier_data1);
+
+	auto consolidated_telemetry_data = nano::consolidate_telemetry_data (all_data);
+	ASSERT_EQ (data, consolidated_telemetry_data);
+}
+
 TEST (node_telemetry, no_peers)
 {
 	nano::system system (1);
 
 	std::atomic<bool> done{ false };
-	system.nodes[0]->telemetry.get_metrics_random_peers_async ([&done](nano::telemetry_data_responses const & responses_a) {
-		ASSERT_TRUE (responses_a.data.empty ());
+	system.nodes[0]->telemetry.get_metrics_peers_async ([&done](nano::telemetry_data_responses const & responses_a) {
+		ASSERT_TRUE (responses_a.telemetry_data_time_pairs.empty ());
 		ASSERT_FALSE (responses_a.all_received);
-		ASSERT_FALSE (responses_a.is_cached);
 		done = true;
 	});
@@ -205,13 +266,12 @@ TEST (node_telemetry, basic)
 	wait_peer_connections (system);
 
 	// Request telemetry metrics
-	std::vector<nano::telemetry_data> all_telemetry_data;
+	std::unordered_map<nano::endpoint, nano::telemetry_data_time_pair> all_telemetry_data_time_pairs;
 	{
 		std::atomic<bool> done{ false };
-		node_client->telemetry.get_metrics_random_peers_async ([&done, &all_telemetry_data](nano::telemetry_data_responses const & responses_a) {
-			ASSERT_FALSE (responses_a.is_cached);
+		node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_data_time_pairs](nano::telemetry_data_responses const & responses_a) {
 			ASSERT_TRUE (responses_a.all_received);
-			all_telemetry_data = responses_a.data;
+			all_telemetry_data_time_pairs = responses_a.telemetry_data_time_pairs;
 			done = true;
 		});
@@ -223,16 +283,14 @@
 	}
 
 	// Check the metrics are correct
-	ASSERT_EQ (all_telemetry_data.size (), 1);
-	auto & telemetry_data = all_telemetry_data.front ();
-	compare_default_test_result_data (telemetry_data, *node_server);
+	ASSERT_EQ (all_telemetry_data_time_pairs.size (), 1);
+	compare_default_test_result_data (all_telemetry_data_time_pairs.begin ()->second.data, *node_server);
 
 	// Call again straight away.
It should use the cache { std::atomic done{ false }; - node_client->telemetry.get_metrics_random_peers_async ([&done, &telemetry_data](nano::telemetry_data_responses const & responses_a) { - ASSERT_EQ (telemetry_data, responses_a.data.front ()); - ASSERT_TRUE (responses_a.is_cached); + node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_data_time_pairs](nano::telemetry_data_responses const & responses_a) { + ASSERT_EQ (all_telemetry_data_time_pairs, responses_a.telemetry_data_time_pairs); ASSERT_TRUE (responses_a.all_received); done = true; }); @@ -245,11 +303,13 @@ TEST (node_telemetry, basic) } // Wait the cache period and check cache is not used - std::this_thread::sleep_for (nano::telemetry_impl::cache_cutoff); + std::this_thread::sleep_for (nano::telemetry_cache_cutoffs::test); + // Arbitrarily change something so that we can confirm different metrics were used + node_server->ledger.cache.block_count = 100; std::atomic done{ false }; - node_client->telemetry.get_metrics_random_peers_async ([&done, &telemetry_data](nano::telemetry_data_responses const & responses_a) { - ASSERT_FALSE (responses_a.is_cached); + node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_data_time_pairs](nano::telemetry_data_responses const & responses_a) { + ASSERT_NE (all_telemetry_data_time_pairs, responses_a.telemetry_data_time_pairs); ASSERT_TRUE (responses_a.all_received); done = true; }); @@ -270,7 +330,7 @@ TEST (node_telemetry, many_nodes) for (auto i = 0; i < num_nodes; ++i) { nano::node_config node_config (nano::get_available_port (), system.logging); - // Make a metric completely different for each node so we can get afterwards that there are no duplicates + // Make a metric completely different for each node so we can check afterwards that there are no duplicates node_config.bandwidth_limit = 100000 + i; system.add_node (node_config); } @@ -291,11 +351,10 @@ TEST (node_telemetry, many_nodes) auto node_client = system.nodes.front (); std::atomic done{ false }; - std::vector all_telemetry_data; - node_client->telemetry.get_metrics_random_peers_async ([&done, &all_telemetry_data](nano::telemetry_data_responses const & responses_a) { - ASSERT_FALSE (responses_a.is_cached); + std::unordered_map all_telemetry_data_time_pairs; + node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_data_time_pairs](nano::telemetry_data_responses const & responses_a) { ASSERT_TRUE (responses_a.all_received); - all_telemetry_data = responses_a.data; + all_telemetry_data_time_pairs = responses_a.telemetry_data_time_pairs; done = true; }); @@ -307,8 +366,9 @@ TEST (node_telemetry, many_nodes) // Check the metrics nano::network_params params; - for (auto & data : all_telemetry_data) + for (auto & telemetry_data_time_pair : all_telemetry_data_time_pairs) { + auto & data = telemetry_data_time_pair.second.data; ASSERT_EQ (data.unchecked_count, 0); ASSERT_EQ (data.cemented_count, 1); ASSERT_LE (data.peer_count, 9); @@ -326,9 +386,11 @@ TEST (node_telemetry, many_nodes) ASSERT_EQ (data.genesis_block, genesis.hash ()); } - // We gave some nodes different bandwidth caps, confirm they are not all the time - auto all_bandwidth_limits_same = std::all_of (all_telemetry_data.begin () + 1, all_telemetry_data.end (), [bandwidth_cap = all_telemetry_data[0].bandwidth_cap](auto & telemetry) { - return telemetry.bandwidth_cap == bandwidth_cap; + // We gave some nodes different bandwidth caps, confirm they are not all the same + auto bandwidth_cap = all_telemetry_data_time_pairs.begin 
()->second.data.bandwidth_cap; + all_telemetry_data_time_pairs.erase (all_telemetry_data_time_pairs.begin ()); + auto all_bandwidth_limits_same = std::all_of (all_telemetry_data_time_pairs.begin (), all_telemetry_data_time_pairs.end (), [bandwidth_cap](auto & telemetry_data_time_pair) { + return telemetry_data_time_pair.second.data.bandwidth_cap == bandwidth_cap; }); ASSERT_FALSE (all_bandwidth_limits_same); } @@ -354,11 +416,10 @@ TEST (node_telemetry, over_udp) wait_peer_connections (system); std::atomic done{ false }; - std::vector all_telemetry_data; - node_client->telemetry.get_metrics_random_peers_async ([&done, &all_telemetry_data](nano::telemetry_data_responses const & responses_a) { - ASSERT_FALSE (responses_a.is_cached); + std::unordered_map all_telemetry_data_time_pairs; + node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_data_time_pairs](nano::telemetry_data_responses const & responses_a) { ASSERT_TRUE (responses_a.all_received); - all_telemetry_data = responses_a.data; + all_telemetry_data_time_pairs = responses_a.telemetry_data_time_pairs; done = true; }); @@ -368,8 +429,8 @@ TEST (node_telemetry, over_udp) ASSERT_NO_ERROR (system.poll ()); } - ASSERT_EQ (all_telemetry_data.size (), 1); - compare_default_test_result_data (all_telemetry_data.front (), *node_server); + ASSERT_EQ (all_telemetry_data_time_pairs.size (), 1); + compare_default_test_result_data (all_telemetry_data_time_pairs.begin ()->second.data, *node_server); // Check channels are indeed udp ASSERT_EQ (1, node_client->network.size ()); @@ -382,87 +443,6 @@ TEST (node_telemetry, over_udp) ASSERT_EQ (nano::transport::transport_type::udp, list2[0]->get_type ()); } -TEST (node_telemetry, simultaneous_random_requests) -{ - const auto num_nodes = 4; - nano::system system (num_nodes); - - // Wait until peers are stored as they are done in the background - wait_peer_connections (system); - - std::vector threads; - const auto num_threads = 4; - - std::atomic done{ false }; - class Data - { - public: - std::atomic awaiting_cache{ false }; - std::atomic keep_requesting_metrics{ true }; - std::shared_ptr node; - }; - - std::array all_data{}; - for (auto i = 0; i < num_nodes; ++i) - { - all_data[i].node = system.nodes[i]; - } - - std::atomic count{ 0 }; - std::promise promise; - std::shared_future shared_future (promise.get_future ()); - - // Create a few threads where each node sends out telemetry request messages to all other nodes continuously, until the cache it reached and subsequently expired. - // The test waits until all telemetry_ack messages have been received. 
- for (int i = 0; i < num_threads; ++i) - { - threads.emplace_back ([&all_data, &done, &count, &promise, &shared_future]() { - while (std::any_of (all_data.cbegin (), all_data.cend (), [](auto const & data) { return data.keep_requesting_metrics.load (); })) - { - for (auto & data : all_data) - { - // Keep calling get_metrics_async until the cache has been saved and then become outdated (after a certain period of time) for each node - if (data.keep_requesting_metrics) - { - ++count; - - data.node->telemetry.get_metrics_random_peers_async ([&promise, &done, &data, &all_data, &count](nano::telemetry_data_responses const & responses_a) { - if (data.awaiting_cache && !responses_a.is_cached) - { - data.keep_requesting_metrics = false; - } - if (responses_a.is_cached) - { - data.awaiting_cache = true; - } - if (--count == 0 && std::all_of (all_data.begin (), all_data.end (), [](auto const & data) { return !data.keep_requesting_metrics; })) - { - done = true; - promise.set_value (); - } - }); - } - std::this_thread::sleep_for (1ms); - } - } - - ASSERT_EQ (count, 0); - shared_future.wait (); - }); - } - - system.deadline_set (20s); - while (!done) - { - ASSERT_NO_ERROR (system.poll ()); - } - - for (auto & thread : threads) - { - thread.join (); - } -} - namespace nano { TEST (node_telemetry, single_request) @@ -476,14 +456,14 @@ TEST (node_telemetry, single_request) // Request telemetry metrics auto channel = node_client->network.find_channel (node_server->network.endpoint ()); - nano::telemetry_data telemetry_data; + nano::telemetry_data_time_pair telemetry_data_time_pair; { std::atomic done{ false }; - node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data](nano::telemetry_data_response const & response_a) { - ASSERT_FALSE (response_a.is_cached); + node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data_time_pair, &channel](nano::telemetry_data_response const & response_a) { ASSERT_FALSE (response_a.error); - telemetry_data = response_a.data; + ASSERT_EQ (channel->get_endpoint (), response_a.endpoint); + telemetry_data_time_pair = response_a.telemetry_data_time_pair; done = true; }); @@ -495,14 +475,13 @@ TEST (node_telemetry, single_request) } // Check the metrics are correct - compare_default_test_result_data (telemetry_data, *node_server); + compare_default_test_result_data (telemetry_data_time_pair.data, *node_server); // Call again straight away. 
It should use the cache { std::atomic done{ false }; - node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data](nano::telemetry_data_response const & response_a) { - ASSERT_EQ (telemetry_data, response_a.data); - ASSERT_TRUE (response_a.is_cached); + node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data_time_pair](nano::telemetry_data_response const & response_a) { + ASSERT_EQ (telemetry_data_time_pair, response_a.telemetry_data_time_pair); ASSERT_FALSE (response_a.error); done = true; }); @@ -515,11 +494,11 @@ TEST (node_telemetry, single_request) } // Wait the cache period and check cache is not used - std::this_thread::sleep_for (nano::telemetry_impl::cache_cutoff); + std::this_thread::sleep_for (nano::telemetry_cache_cutoffs::test); std::atomic done{ false }; - node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data](nano::telemetry_data_response const & response_a) { - ASSERT_FALSE (response_a.is_cached); + node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data_time_pair](nano::telemetry_data_response const & response_a) { + ASSERT_NE (telemetry_data_time_pair, response_a.telemetry_data_time_pair); ASSERT_FALSE (response_a.error); done = true; }); @@ -552,100 +531,6 @@ TEST (node_telemetry, single_request_invalid_channel) } } -TEST (node_telemetry, simultaneous_single_and_random_requests) -{ - const auto num_nodes = 4; - nano::system system (num_nodes); - - wait_peer_connections (system); - - std::vector threads; - const auto num_threads = 4; - - class data - { - public: - std::atomic awaiting_cache{ false }; - std::atomic keep_requesting_metrics{ true }; - std::shared_ptr node; - }; - - std::array node_data_single{}; - std::array node_data_random{}; - for (auto i = 0; i < num_nodes; ++i) - { - node_data_single[i].node = system.nodes[i]; - node_data_random[i].node = system.nodes[i]; - } - - class shared_data - { - public: - std::atomic done{ false }; - std::atomic count{ 0 }; - std::promise promise; - std::shared_future shared_future{ promise.get_future () }; - }; - - shared_data shared_data_single; - shared_data shared_data_random; - - // Create a few threads where each node sends out telemetry request messages to all other nodes continuously, until the cache it reached and subsequently expired. - // The test waits until all telemetry_ack messages have been received. 
- for (int i = 0; i < num_threads; ++i) - { - threads.emplace_back ([&node_data_single, &node_data_random, &shared_data_single, &shared_data_random]() { - auto func = [](auto & all_node_data_a, shared_data & shared_data_a) { - while (std::any_of (all_node_data_a.cbegin (), all_node_data_a.cend (), [](auto const & data) { return data.keep_requesting_metrics.load (); })) - { - for (auto & data : all_node_data_a) - { - // Keep calling get_metrics_async until the cache has been saved and then become outdated (after a certain period of time) for each node - if (data.keep_requesting_metrics) - { - ++shared_data_a.count; - - data.node->telemetry.get_metrics_random_peers_async ([& shared_data = shared_data_a, &data, &all_node_data = all_node_data_a](nano::telemetry_data_responses const & responses_a) { - if (data.awaiting_cache && !responses_a.is_cached) - { - data.keep_requesting_metrics = false; - } - if (responses_a.is_cached) - { - data.awaiting_cache = true; - } - if (--shared_data.count == 0 && std::all_of (all_node_data.begin (), all_node_data.end (), [](auto const & data) { return !data.keep_requesting_metrics; })) - { - shared_data.done = true; - shared_data.promise.set_value (); - } - }); - } - std::this_thread::sleep_for (1ms); - } - } - - ASSERT_EQ (shared_data_a.count, 0); - shared_data_a.shared_future.wait (); - }; - - func (node_data_single, shared_data_single); - func (node_data_random, shared_data_random); - }); - } - - system.deadline_set (20s); - while (!shared_data_single.done || !shared_data_random.done) - { - ASSERT_NO_ERROR (system.poll ()); - } - - for (auto & thread : threads) - { - thread.join (); - } -} - TEST (node_telemetry, blocking_single_and_random) { nano::system system (2); @@ -676,16 +561,15 @@ TEST (node_telemetry, blocking_single_and_random) node_client->worker.push_task (call_system_poll); // Blocking version of get_random_metrics_async - auto telemetry_data_responses = node_client->telemetry.get_metrics_random_peers (); - ASSERT_FALSE (telemetry_data_responses.is_cached); + auto telemetry_data_responses = node_client->telemetry.get_metrics_peers (); ASSERT_TRUE (telemetry_data_responses.all_received); - compare_default_test_result_data (telemetry_data_responses.data.front (), *node_server); + compare_default_test_result_data (telemetry_data_responses.telemetry_data_time_pairs.begin ()->second.data, *node_server); // Now try single request metric auto telemetry_data_response = node_client->telemetry.get_metrics_single_peer (node_client->network.find_channel (node_server->network.endpoint ())); - ASSERT_FALSE (telemetry_data_response.is_cached); ASSERT_FALSE (telemetry_data_response.error); - compare_default_test_result_data (telemetry_data_response.data, *node_server); + compare_default_test_result_data (telemetry_data_response.telemetry_data_time_pair.data, *node_server); + ASSERT_EQ (telemetry_data_response.telemetry_data_time_pair.last_updated, telemetry_data_responses.telemetry_data_time_pairs.begin ()->second.last_updated); done = true; promise.get_future ().wait (); @@ -710,15 +594,14 @@ TEST (node_telemetry, multiple_single_request_clearing) auto channel = node_client->network.find_channel (node_server->network.endpoint ()); std::atomic done{ false }; - node_client->telemetry.get_metrics_single_peer_async (channel, [&done](nano::telemetry_data_response const & response_a) { + std::chrono::steady_clock::time_point last_updated; + node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &last_updated](nano::telemetry_data_response const & 
response_a) { ASSERT_FALSE (response_a.error); - ASSERT_FALSE (response_a.is_cached); + last_updated = response_a.telemetry_data_time_pair.last_updated; done = true; }); ASSERT_EQ (1, node_client->telemetry.single_requests.size ()); - auto last_updated = node_client->telemetry.single_requests.begin ()->second.last_updated; - system.deadline_set (10s); while (!done) { @@ -726,11 +609,11 @@ TEST (node_telemetry, multiple_single_request_clearing) } done = false; - // Make another request to keep + // Make another request to keep the time updated system.deadline_set (10s); - node_client->telemetry.get_metrics_single_peer_async (channel, [&done](nano::telemetry_data_response const & response_a) { + node_client->telemetry.get_metrics_single_peer_async (channel, [&done, last_updated](nano::telemetry_data_response const & response_a) { ASSERT_FALSE (response_a.error); - ASSERT_TRUE (response_a.is_cached); + ASSERT_EQ (last_updated, response_a.telemetry_data_time_pair.last_updated); done = true; }); @@ -744,9 +627,10 @@ TEST (node_telemetry, multiple_single_request_clearing) done = false; auto channel1 = node_client->network.find_channel (node_server1->network.endpoint ()); - node_client->telemetry.get_metrics_single_peer_async (channel1, [&done](nano::telemetry_data_response const & response_a) { + node_client->telemetry.get_metrics_single_peer_async (channel1, [&done, &last_updated](nano::telemetry_data_response const & response_a) { ASSERT_FALSE (response_a.error); - ASSERT_FALSE (response_a.is_cached); + ASSERT_NE (last_updated, response_a.telemetry_data_time_pair.last_updated); + last_updated = response_a.telemetry_data_time_pair.last_updated; done = true; }); @@ -758,9 +642,9 @@ TEST (node_telemetry, multiple_single_request_clearing) } done = false; - node_client->telemetry.get_metrics_single_peer_async (channel1, [&done](nano::telemetry_data_response const & response_a) { + node_client->telemetry.get_metrics_single_peer_async (channel1, [&done, last_updated](nano::telemetry_data_response const & response_a) { ASSERT_FALSE (response_a.error); - ASSERT_TRUE (response_a.is_cached); + ASSERT_EQ (last_updated, response_a.telemetry_data_time_pair.last_updated); done = true; }); @@ -791,7 +675,7 @@ TEST (node_telemetry, disconnects) ASSERT_TRUE (channel); std::atomic done{ false }; - node_client->telemetry.get_metrics_random_peers_async ([&done](nano::telemetry_data_responses const & responses_a) { + node_client->telemetry.get_metrics_peers_async ([&done](nano::telemetry_data_responses const & responses_a) { ASSERT_FALSE (responses_a.all_received); done = true; }); @@ -815,6 +699,229 @@ TEST (node_telemetry, disconnects) } } +TEST (node_telemetry, batch_use_single_request_cache) +{ + nano::system system (2); + + auto node_client = system.nodes.front (); + auto node_server = system.nodes.back (); + + wait_peer_connections (system); + + // Request telemetry metrics + nano::telemetry_data_time_pair telemetry_data_time_pair; + { + std::atomic done{ false }; + auto channel = node_client->network.find_channel (node_server->network.endpoint ()); + node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data_time_pair](nano::telemetry_data_response const & response_a) { + telemetry_data_time_pair = response_a.telemetry_data_time_pair; + done = true; + }); + + system.deadline_set (10s); + while (!done) + { + ASSERT_NO_ERROR (system.poll ()); + } + } + + { + std::atomic done{ false }; + node_client->telemetry.get_metrics_peers_async ([&done, 
&telemetry_data_time_pair](nano::telemetry_data_responses const & responses_a) { + ASSERT_TRUE (responses_a.all_received); + ASSERT_EQ (telemetry_data_time_pair, responses_a.telemetry_data_time_pairs.begin ()->second); + done = true; + }); + + system.deadline_set (10s); + while (!done) + { + ASSERT_NO_ERROR (system.poll ()); + } + } + + // Confirm only 1 request was made + ASSERT_EQ (1, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in)); + ASSERT_EQ (0, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in)); + ASSERT_EQ (1, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out)); + ASSERT_EQ (0, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in)); + ASSERT_EQ (1, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in)); + ASSERT_EQ (0, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out)); + + // Wait until there is something pending + system.deadline_set (10s); + while (node_client->telemetry.finished_single_requests_size () == 0) + { + ASSERT_NO_ERROR (system.poll ()); + } + + system.deadline_set (10s); + std::atomic done{ false }; + node_client->telemetry.get_metrics_peers_async ([&done, &telemetry_data_time_pair](nano::telemetry_data_responses const & responses_a) { + ASSERT_EQ (1, responses_a.telemetry_data_time_pairs.size ()); + done = true; + }); + + system.deadline_set (10s); + while (!done) + { + ASSERT_NO_ERROR (system.poll ()); + } + + ASSERT_EQ (0, node_client->telemetry.finished_single_requests_size ()); + ASSERT_EQ (2, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in)); + ASSERT_EQ (0, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in)); + ASSERT_EQ (2, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out)); + ASSERT_EQ (0, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in)); + ASSERT_EQ (2, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in)); + ASSERT_EQ (0, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out)); +} + +TEST (node_telemetry, single_request_use_batch_cache) +{ + nano::system system (2); + + auto node_client = system.nodes.front (); + auto node_server = system.nodes.back (); + + wait_peer_connections (system); + + // Request batched metric first + std::unordered_map all_telemetry_data_time_pairs; + { + std::atomic done{ false }; + node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_data_time_pairs](nano::telemetry_data_responses const & responses_a) { + ASSERT_TRUE (responses_a.all_received); + ASSERT_EQ (1, responses_a.telemetry_data_time_pairs.size ()); + all_telemetry_data_time_pairs = responses_a.telemetry_data_time_pairs; + done = true; + }); + + system.deadline_set (10s); + while (!done) + { + ASSERT_NO_ERROR (system.poll ()); + } + } + + std::atomic done{ false }; + auto channel = node_client->network.find_channel (node_server->network.endpoint ()); + node_client->telemetry.get_metrics_single_peer_async (channel, [&done, 
&all_telemetry_data_time_pairs](nano::telemetry_data_response const & response_a) { + ASSERT_EQ (all_telemetry_data_time_pairs.begin ()->second, response_a.telemetry_data_time_pair); + ASSERT_FALSE (response_a.error); + done = true; + }); + + system.deadline_set (10s); + while (!done) + { + ASSERT_NO_ERROR (system.poll ()); + } + + // Confirm only 1 request was made + ASSERT_EQ (1, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in)); + ASSERT_EQ (0, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in)); + ASSERT_EQ (1, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out)); + ASSERT_EQ (0, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in)); + ASSERT_EQ (1, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in)); + ASSERT_EQ (0, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out)); +} + +TEST (node_telemetry, dos_tcp) +{ + // Confirm that telemetry_reqs are not processed + nano::system system (2); + + auto node_client = system.nodes.front (); + auto node_server = system.nodes.back (); + + wait_peer_connections (system); + + nano::telemetry_req message; + auto channel = node_client->network.tcp_channels.find_channel (nano::transport::map_endpoint_to_tcp (node_server->network.endpoint ())); + channel->send (message, [](boost::system::error_code const & ec, size_t size_a) { + ASSERT_FALSE (ec); + }); + + system.deadline_set (10s); + while (1 != node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in)) + { + ASSERT_NO_ERROR (system.poll ()); + } + + auto orig = std::chrono::steady_clock::now (); + for (int i = 0; i < 10; ++i) + { + channel->send (message, [](boost::system::error_code const & ec, size_t size_a) { + ASSERT_FALSE (ec); + }); + } + + system.deadline_set (10s); + while ((nano::telemetry_cache_cutoffs::test + orig) > std::chrono::steady_clock::now ()) + { + ASSERT_NO_ERROR (system.poll ()); + } + + // Should process no more telemetry_req messages + ASSERT_EQ (1, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in)); + + // Now spam messages waiting for it to be processed + while (node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in) == 1) + { + channel->send (message); + ASSERT_NO_ERROR (system.poll ()); + } +} + +TEST (node_telemetry, dos_udp) +{ + // Confirm that telemetry_reqs are not processed + nano::system system (2); + + auto node_client = system.nodes.front (); + auto node_server = system.nodes.back (); + + wait_peer_connections (system); + + nano::telemetry_req message; + auto channel (node_server->network.udp_channels.create (node_server->network.endpoint ())); + channel->send (message, [](boost::system::error_code const & ec, size_t size_a) { + ASSERT_FALSE (ec); + }); + + system.deadline_set (20s); + while (1 != node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in)) + { + ASSERT_NO_ERROR (system.poll ()); + } + + auto orig = std::chrono::steady_clock::now (); + for (int i = 0; i < 10; ++i) + { + channel->send (message, [](boost::system::error_code const & ec, size_t size_a) { + ASSERT_FALSE (ec); + }); + } + + 
system.deadline_set (20s); + while ((nano::telemetry_cache_cutoffs::test + orig) > std::chrono::steady_clock::now ()) + { + ASSERT_NO_ERROR (system.poll ()); + } + + // Should process no more telemetry_req messages + ASSERT_EQ (1, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in)); + + // Now spam messages waiting for it to be processed + system.deadline_set (20s); + while (node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in) == 1) + { + channel->send (message); + ASSERT_NO_ERROR (system.poll ()); + } +} + TEST (node_telemetry, disable_metrics_single) { nano::system system (1); @@ -846,7 +953,7 @@ TEST (node_telemetry, disable_metrics_single) auto channel1 = node_server->network.find_channel (node_client->network.endpoint ()); node_server->telemetry.get_metrics_single_peer_async (channel1, [&done, node_server](nano::telemetry_data_response const & response_a) { ASSERT_FALSE (response_a.error); - compare_default_test_result_data (response_a.data, *node_server); + compare_default_test_result_data (response_a.telemetry_data_time_pair.data, *node_server); done = true; }); @@ -872,7 +979,7 @@ TEST (node_telemetry, disable_metrics_batch) ASSERT_TRUE (channel); std::atomic done{ false }; - node_client->telemetry.get_metrics_random_peers_async ([&done](nano::telemetry_data_responses const & responses_a) { + node_client->telemetry.get_metrics_peers_async ([&done](nano::telemetry_data_responses const & responses_a) { ASSERT_FALSE (responses_a.all_received); done = true; }); @@ -885,9 +992,9 @@ TEST (node_telemetry, disable_metrics_batch) // It should still be able to receive metrics though done = false; - node_server->telemetry.get_metrics_random_peers_async ([&done, node_server](nano::telemetry_data_responses const & responses_a) { + node_server->telemetry.get_metrics_peers_async ([&done, node_server](nano::telemetry_data_responses const & responses_a) { ASSERT_TRUE (responses_a.all_received); - compare_default_test_result_data (responses_a.data.front (), *node_server); + compare_default_test_result_data (responses_a.telemetry_data_time_pairs.begin ()->second.data, *node_server); done = true; }); @@ -930,6 +1037,6 @@ void compare_default_test_result_data (nano::telemetry_data const & telemetry_da ASSERT_EQ (*telemetry_data_a.pre_release_version, nano::get_pre_release_node_version ()); ASSERT_EQ (*telemetry_data_a.maker, 0); ASSERT_LT (telemetry_data_a.uptime, 100); - ASSERT_EQ (telemetry_data_a.genesis_block, nano::genesis ().hash ()); + ASSERT_EQ (telemetry_data_a.genesis_block, node_server_a.network_params.ledger.genesis_hash); } } diff --git a/nano/lib/utility.hpp b/nano/lib/utility.hpp index 37dbca344f..4b66d3feb7 100644 --- a/nano/lib/utility.hpp +++ b/nano/lib/utility.hpp @@ -147,8 +147,21 @@ std::unique_ptr collect_container_info (observer_set +void transform_if (InputIt first, InputIt last, OutputIt dest, Pred pred, Func transform) +{ + while (first != last) + { + if (pred (*first)) + { + *dest++ = transform (*first); + } + + ++first; + } +} } -// Have our own async_write which we must use? 
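// Editor's aside (illustrative sketch, not part of the patch): the new
// nano::transform_if above fuses std::copy_if and std::transform into a single
// pass. Hypothetical usage with invented data; assumes <vector> and <iterator>:
void transform_if_example ()
{
	std::vector<int> in{ 1, 2, 3, 4, 5 };
	std::vector<int> out;
	nano::transform_if (in.begin (), in.end (), std::back_inserter (out),
	[](int i) { return i % 2 == 0; }, // pred: keep only the even values
	[](int i) { return i * i; }); // transform: square each value that was kept
	// out now holds { 4, 16 }
}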
void release_assert_internal (bool check, const char * check_expr, const char * file, unsigned int line); #define release_assert(check) release_assert_internal (check, #check, __FILE__, __LINE__) diff --git a/nano/node/bootstrap/bootstrap_server.cpp b/nano/node/bootstrap/bootstrap_server.cpp index 25bcc39332..f3de55dccc 100644 --- a/nano/node/bootstrap/bootstrap_server.cpp +++ b/nano/node/bootstrap/bootstrap_server.cpp @@ -239,10 +239,16 @@ void nano::bootstrap_server::receive_header_action (boost::system::error_code co } case nano::message_type::telemetry_req: { - node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::telemetry_req, nano::stat::dir::in); if (is_realtime_connection ()) { - add_request (std::make_unique (header)); + // Only handle telemetry requests if they are outside of the cutoff time + auto is_very_first_message = last_telemetry_req == std::chrono::steady_clock::time_point{}; + auto cache_exceeded = std::chrono::steady_clock::now () >= last_telemetry_req + nano::telemetry_cache_cutoffs::network_to_time (node->network_params.network); + if (is_very_first_message || cache_exceeded) + { + last_telemetry_req = std::chrono::steady_clock::now (); + add_request (std::make_unique (header)); + } } receive (); break; diff --git a/nano/node/bootstrap/bootstrap_server.hpp b/nano/node/bootstrap/bootstrap_server.hpp index 63dfd9b0f3..9e520955bb 100644 --- a/nano/node/bootstrap/bootstrap_server.hpp +++ b/nano/node/bootstrap/bootstrap_server.hpp @@ -75,5 +75,6 @@ class bootstrap_server final : public std::enable_shared_from_this #include -#include #include #include #include +#include + std::bitset<16> constexpr nano::message_header::block_type_mask; std::bitset<16> constexpr nano::message_header::count_mask; +std::chrono::seconds constexpr nano::telemetry_cache_cutoffs::test; +std::chrono::seconds constexpr nano::telemetry_cache_cutoffs::beta; +std::chrono::seconds constexpr nano::telemetry_cache_cutoffs::live; + namespace { nano::protocol_constants const & get_protocol_constants () @@ -1176,135 +1181,6 @@ bool nano::telemetry_ack::is_empty_payload () const return size () == 0; } -nano::telemetry_data nano::telemetry_data::consolidate (std::vector const & telemetry_data_responses_a) -{ - if (telemetry_data_responses_a.empty ()) - { - return {}; - } - else if (telemetry_data_responses_a.size () == 1) - { - // Only 1 element in the collection, so just return it. - return telemetry_data_responses_a.front (); - } - - nano::uint128_t account_sum{ 0 }; - nano::uint128_t block_sum{ 0 }; - nano::uint128_t cemented_sum{ 0 }; - nano::uint128_t peer_sum{ 0 }; - nano::uint128_t unchecked_sum{ 0 }; - nano::uint128_t uptime_sum{ 0 }; - nano::uint128_t bandwidth_sum{ 0 }; - - std::unordered_map protocol_versions; - std::unordered_map vendor_versions; - std::unordered_map bandwidth_caps; - std::unordered_map genesis_blocks; - - nano::uint128_t account_average{ 0 }; - - for (auto const & telemetry_data : telemetry_data_responses_a) - { - account_sum += telemetry_data.account_count; - block_sum += telemetry_data.block_count; - cemented_sum += telemetry_data.cemented_count; - - std::ostringstream ss; - ss << telemetry_data.major_version; - if (telemetry_data.minor_version.is_initialized ()) - { - ss << "." << *telemetry_data.minor_version; - if (telemetry_data.patch_version.is_initialized ()) - { - ss << "." << *telemetry_data.patch_version; - if (telemetry_data.pre_release_version.is_initialized ()) - { - ss << "." 
<< *telemetry_data.pre_release_version; - if (telemetry_data.maker.is_initialized ()) - { - ss << "." << *telemetry_data.maker; - } - } - } - } - - ++vendor_versions[ss.str ()]; - ++protocol_versions[telemetry_data.protocol_version]; - peer_sum += telemetry_data.peer_count; - - // 0 has a special meaning (unlimited), don't include it in the average as it will be heavily skewed - if (telemetry_data.bandwidth_cap != 0) - { - bandwidth_sum += telemetry_data.bandwidth_cap; - } - ++bandwidth_caps[telemetry_data.bandwidth_cap]; - unchecked_sum += telemetry_data.unchecked_count; - uptime_sum += telemetry_data.uptime; - ++genesis_blocks[telemetry_data.genesis_block]; - } - - nano::telemetry_data consolidated_data; - auto size = telemetry_data_responses_a.size (); - consolidated_data.account_count = boost::numeric_cast (account_sum / size); - consolidated_data.block_count = boost::numeric_cast (block_sum / size); - consolidated_data.cemented_count = boost::numeric_cast (cemented_sum / size); - consolidated_data.peer_count = boost::numeric_cast (peer_sum / size); - consolidated_data.uptime = boost::numeric_cast (uptime_sum / size); - consolidated_data.unchecked_count = boost::numeric_cast (unchecked_sum / size); - - auto set_mode_or_average = [](auto const & collection, auto & var, auto const & sum, size_t size) { - auto max = std::max_element (collection.begin (), collection.end (), [](auto const & lhs, auto const & rhs) { - return lhs.second < rhs.second; - }); - if (max->second > 1) - { - var = max->first; - } - else - { - var = (sum / size).template convert_to> (); - } - }; - - auto set_mode = [](auto const & collection, auto & var, size_t size) { - auto max = std::max_element (collection.begin (), collection.end (), [](auto const & lhs, auto const & rhs) { - return lhs.second < rhs.second; - }); - if (max->second > 1) - { - var = max->first; - } - else - { - // Just pick the first one - var = collection.begin ()->first; - } - }; - - // Use the mode of protocol version, vendor version and bandwidth cap if there is 2 or more using it - set_mode_or_average (bandwidth_caps, consolidated_data.bandwidth_cap, bandwidth_sum, size); - set_mode (protocol_versions, consolidated_data.protocol_version, size); - set_mode (genesis_blocks, consolidated_data.genesis_block, size); - - // Vendor version, needs to be parsed out of the string - std::string version; - set_mode (vendor_versions, version, size); - - // May only have major version, but check for optional parameters as well, only output if all are used - std::vector version_fragments; - boost::split (version_fragments, version, boost::is_any_of (".")); - assert (!version_fragments.empty () && version_fragments.size () <= 5); - consolidated_data.major_version = boost::lexical_cast (version_fragments.front ()); - if (version_fragments.size () == 5) - { - consolidated_data.minor_version = boost::lexical_cast (version_fragments[1]); - consolidated_data.patch_version = boost::lexical_cast (version_fragments[2]); - consolidated_data.pre_release_version = boost::lexical_cast (version_fragments[3]); - consolidated_data.maker = boost::lexical_cast (version_fragments[4]); - } - return consolidated_data; -} - nano::error nano::telemetry_data::serialize_json (nano::jsonconfig & json) const { json.put ("block_count", block_count); @@ -1369,6 +1245,11 @@ bool nano::telemetry_data::operator== (nano::telemetry_data const & data_a) cons return (block_count == data_a.block_count && cemented_count == data_a.cemented_count && unchecked_count == data_a.unchecked_count && 
account_count == data_a.account_count && bandwidth_cap == data_a.bandwidth_cap && uptime == data_a.uptime && peer_count == data_a.peer_count && protocol_version == data_a.protocol_version && genesis_block == data_a.genesis_block && major_version == data_a.major_version && minor_version == data_a.minor_version && patch_version == data_a.patch_version && pre_release_version == data_a.pre_release_version && maker == data_a.maker); } +bool nano::telemetry_data::operator!= (nano::telemetry_data const & data_a) const +{ + return !(*this == data_a); +} + nano::node_id_handshake::node_id_handshake (bool & error_a, nano::stream & stream_a, nano::message_header const & header_a) : message (header_a), query (boost::none), @@ -1484,6 +1365,22 @@ bool nano::parse_port (std::string const & string_a, uint16_t & port_a) return result; } +// Can handle both ipv4 & ipv6 addresses (with and without square brackets) +bool nano::parse_address (std::string const & address_text_a, boost::asio::ip::address & address_a) +{ + auto result (false); + auto address_text = address_text_a; + if (!address_text.empty () && address_text.front () == '[' && address_text.back () == ']') + { + // Chop the square brackets off as make_address doesn't always like them + address_text = address_text.substr (1, address_text.size () - 2); + } + + boost::system::error_code address_ec; + address_a = boost::asio::ip::make_address (address_text, address_ec); + return !!address_ec; +} + bool nano::parse_address_port (std::string const & string, boost::asio::ip::address & address_a, uint16_t & port_a) { auto result (false); @@ -1550,6 +1447,11 @@ bool nano::parse_tcp_endpoint (std::string const & string, nano::tcp_endpoint & return result; } +std::chrono::seconds nano::telemetry_cache_cutoffs::network_to_time (network_constants const & network_constants) +{ + return std::chrono::seconds{ network_constants.is_live_network () ? live : network_constants.is_beta_network () ? 
beta : test }; +} + nano::node_singleton_memory_pool_purge_guard::node_singleton_memory_pool_purge_guard () : cleanup_guard ({ nano::block_memory_pool_purge, nano::purge_singleton_pool_memory, nano::purge_singleton_pool_memory }) { diff --git a/nano/node/common.hpp b/nano/node/common.hpp index 1f4f4b4acb..44d037ba4c 100644 --- a/nano/node/common.hpp +++ b/nano/node/common.hpp @@ -14,6 +14,7 @@ namespace nano { using endpoint = boost::asio::ip::udp::endpoint; bool parse_port (std::string const &, uint16_t &); +bool parse_address (std::string const &, boost::asio::ip::address &); bool parse_address_port (std::string const &, boost::asio::ip::address &, uint16_t &); using tcp_endpoint = boost::asio::ip::tcp::endpoint; bool parse_endpoint (std::string const &, nano::endpoint &); @@ -348,10 +349,10 @@ class telemetry_data boost::optional pre_release_version; boost::optional maker; // 0 for NF node - static nano::telemetry_data consolidate (std::vector const & telemetry_data_responses); nano::error serialize_json (nano::jsonconfig & json) const; nano::error deserialize_json (nano::jsonconfig & json); bool operator== (nano::telemetry_data const &) const; + bool operator!= (nano::telemetry_data const &) const; static auto constexpr size_v0 = sizeof (block_count) + sizeof (cemented_count) + sizeof (unchecked_count) + sizeof (account_count) + sizeof (bandwidth_cap) + sizeof (peer_count) + sizeof (protocol_version) + sizeof (uptime) + sizeof (genesis_block) + sizeof (major_version); static auto constexpr size = size_v0 + sizeof (decltype (minor_version)::value_type) + sizeof (decltype (patch_version)::value_type) + sizeof (decltype (pre_release_version)::value_type) + sizeof (decltype (maker)::value_type); @@ -451,6 +452,16 @@ class message_visitor virtual ~message_visitor (); }; +class telemetry_cache_cutoffs +{ +public: + static std::chrono::seconds constexpr test{ 2 }; + static std::chrono::seconds constexpr beta{ 15 }; + static std::chrono::seconds constexpr live{ 60 }; + + static std::chrono::seconds network_to_time (network_constants const & network_constants); +}; + /** Helper guard which contains all the necessary purge (remove all memory even if used) functions */ class node_singleton_memory_pool_purge_guard { diff --git a/nano/node/json_handler.cpp b/nano/node/json_handler.cpp index eac194bd38..f83799c407 100644 --- a/nano/node/json_handler.cpp +++ b/nano/node/json_handler.cpp @@ -3910,12 +3910,11 @@ void nano::json_handler::telemetry () uint16_t port; if (!nano::parse_port (*port_text, port)) { - boost::system::error_code address_ec; - auto address (boost::asio::ip::make_address_v6 (*address_text, address_ec)); - if (!address_ec) + boost::asio::ip::address address; + if (!nano::parse_address (*address_text, address)) { nano::endpoint endpoint (address, port); - channel = node.network.find_channel (endpoint); + channel = node.network.find_channel (nano::transport::map_endpoint_to_v6 (endpoint)); if (!channel) { ec = nano::error_rpc::peer_not_found; @@ -3943,13 +3942,13 @@ void nano::json_handler::telemetry () if (!single_telemetry_metric_a.error) { nano::jsonconfig config_l; - auto err = single_telemetry_metric_a.data.serialize_json (config_l); + auto err = single_telemetry_metric_a.telemetry_data_time_pair.data.serialize_json (config_l); + config_l.put ("timestamp", std::chrono::duration_cast (single_telemetry_metric_a.telemetry_data_time_pair.system_last_updated.time_since_epoch ()).count ()); auto const & ptree = config_l.get_tree (); if (!err) { rpc_l->response_l.insert 
(rpc_l->response_l.begin (), ptree.begin (), ptree.end ()); - rpc_l->response_l.put ("cached", single_telemetry_metric_a.is_cached); } else { @@ -3975,14 +3974,17 @@ void nano::json_handler::telemetry () // setting "raw" to true returns metrics from all nodes requested. auto raw = request.get_optional ("raw"); auto output_raw = raw.value_or (false); - node.telemetry.get_metrics_random_peers_async ([rpc_l, output_raw](auto const & batched_telemetry_metrics_a) { + node.telemetry.get_metrics_peers_async ([rpc_l, output_raw](auto const & batched_telemetry_metrics_a) { if (output_raw) { boost::property_tree::ptree metrics; - for (auto & telemetry_metrics : batched_telemetry_metrics_a.data) + for (auto & telemetry_metrics : batched_telemetry_metrics_a.telemetry_data_time_pairs) { nano::jsonconfig config_l; - auto err = telemetry_metrics.serialize_json (config_l); + auto err = telemetry_metrics.second.data.serialize_json (config_l); + config_l.put ("timestamp", std::chrono::duration_cast (telemetry_metrics.second.system_last_updated.time_since_epoch ()).count ()); + config_l.put ("address", telemetry_metrics.first.address ()); + config_l.put ("port", telemetry_metrics.first.port ()); if (!err) { metrics.push_back (std::make_pair ("", config_l.get_tree ())); @@ -3998,8 +4000,15 @@ void nano::json_handler::telemetry () else { nano::jsonconfig config_l; - auto average_telemetry_metrics = nano::telemetry_data::consolidate (batched_telemetry_metrics_a.data); - auto err = average_telemetry_metrics.serialize_json (config_l); + std::vector telemetry_data_time_pairs; + telemetry_data_time_pairs.reserve (batched_telemetry_metrics_a.telemetry_data_time_pairs.size ()); + std::transform (batched_telemetry_metrics_a.telemetry_data_time_pairs.begin (), batched_telemetry_metrics_a.telemetry_data_time_pairs.end (), std::back_inserter (telemetry_data_time_pairs), [](auto const & telemetry_data_time_pair_a) { + return telemetry_data_time_pair_a.second; + }); + + auto average_telemetry_metrics = nano::consolidate_telemetry_data_time_pairs (telemetry_data_time_pairs); + auto err = average_telemetry_metrics.data.serialize_json (config_l); + config_l.put ("timestamp", std::chrono::duration_cast (average_telemetry_metrics.system_last_updated.time_since_epoch ()).count ()); auto const & ptree = config_l.get_tree (); if (!err) @@ -4012,7 +4021,6 @@ void nano::json_handler::telemetry () } } - rpc_l->response_l.put ("cached", batched_telemetry_metrics_a.is_cached); rpc_l->response_errors (); }); } diff --git a/nano/node/network.cpp b/nano/node/network.cpp index 2952897a50..7afd39efcf 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -476,7 +476,7 @@ class network_message_visitor : public nano::message_visitor telemetry_ack = nano::telemetry_ack (telemetry_data); } - channel->send (telemetry_ack); + channel->send (telemetry_ack, nullptr, false); } void telemetry_ack (nano::telemetry_ack const & message_a) override { @@ -551,11 +551,11 @@ bool nano::network::reachout (nano::endpoint const & endpoint_a, bool allow_loca return error; } -std::deque> nano::network::list (size_t count_a) +std::deque> nano::network::list (size_t count_a, uint8_t minimum_version_a, bool include_tcp_temporary_channels_a) { std::deque> result; - tcp_channels.list (result); - udp_channels.list (result); + tcp_channels.list (result, minimum_version_a, include_tcp_temporary_channels_a); + udp_channels.list (result, minimum_version_a); nano::random_pool_shuffle (result.begin (), result.end ()); if (result.size () > count_a) { diff --git 
a/nano/node/network.hpp b/nano/node/network.hpp index 0b506e9c57..e55790e154 100644
--- a/nano/node/network.hpp
+++ b/nano/node/network.hpp
@@ -130,11 +130,12 @@ class network final
 	bool not_a_peer (nano::endpoint const &, bool);
 	// Should we reach out to this endpoint with a keepalive message
 	bool reachout (nano::endpoint const &, bool = false);
-	std::deque<std::shared_ptr<nano::transport::channel>> list (size_t);
+	std::deque<std::shared_ptr<nano::transport::channel>> list (size_t, uint8_t = 0, bool = true);
 	std::deque<std::shared_ptr<nano::transport::channel>> list_non_pr (size_t);
 	// Desired fanout for a given scale
 	size_t fanout (float scale = 1.0f) const;
 	void random_fill (std::array<nano::endpoint, 8> &) const;
+	// Note: The minimum protocol version is used after the random selection, so the number of peers can be less than expected.
 	std::unordered_set<std::shared_ptr<nano::transport::channel>> random_set (size_t, uint8_t = 0, bool = false) const;
 	// Get the next peer for attempting a tcp bootstrap connection
 	nano::tcp_endpoint bootstrap_peer (bool = false);
diff --git a/nano/node/telemetry.cpp b/nano/node/telemetry.cpp
index f31fde1f3d..ed65d6640a 100644
--- a/nano/node/telemetry.cpp
+++ b/nano/node/telemetry.cpp
@@ -5,13 +5,16 @@
 #include
 #include
+#include
+
 #include
 #include
 #include
 #include
 #include
+#include
 
-std::chrono::milliseconds constexpr nano::telemetry_impl::cache_cutoff;
+std::chrono::seconds constexpr nano::telemetry_impl::alarm_cutoff;
 
 nano::telemetry::telemetry (nano::network & network_a, nano::alarm & alarm_a, nano::worker & worker_a) :
 network (network_a),
@@ -19,6 +22,44 @@ alarm (alarm_a),
 worker (worker_a),
 batch_request (std::make_shared<nano::telemetry_impl> (network, alarm, worker))
 {
+	// Before callbacks are called with the batch request, check if any of the single request data can be appended to it
+	batch_request->pre_callback_callback = [this](std::unordered_map<nano::endpoint, telemetry_data_time_pair> & data_a, std::mutex & mutex_a) {
+		nano::lock_guard<std::mutex> guard (this->mutex);
+		for (auto & single_request : single_requests)
+		{
+			nano::lock_guard<std::mutex> guard (single_request.second.impl->mutex);
+			if (!single_request.second.impl->cached_telemetry_data.empty ())
+			{
+				nano::lock_guard<std::mutex> batch_request_guard (mutex_a);
+				auto it = this->batch_request->cached_telemetry_data.find (single_request.first);
+				if (it != this->batch_request->cached_telemetry_data.cend () && single_request.second.last_updated > it->second.last_updated)
+				{
+					it->second = single_request.second.impl->cached_telemetry_data.begin ()->second;
+				}
+				else
+				{
+					data_a.emplace (single_request.first, single_request.second.impl->cached_telemetry_data.begin ()->second);
+				}
+			}
+		}
+
+		for (auto & pending : finished_single_requests)
+		{
+			nano::lock_guard<std::mutex> batch_request_guard (mutex_a);
+			auto it = this->batch_request->cached_telemetry_data.find (pending.first);
+			if (it != this->batch_request->cached_telemetry_data.cend () && pending.second.last_updated > it->second.last_updated)
+			{
+				it->second = pending.second;
+			}
+			else
+			{
+				data_a.emplace (pending.first, pending.second);
+			}
+		}
+		finished_single_requests.clear ();
+	};
+
+	ongoing_req_all_peers ();
 }
 
 void nano::telemetry::stop ()
@@ -43,28 +84,61 @@ void nano::telemetry::add (nano::telemetry_data const & telemetry_data_a, nano::
 	}
 }
 
-void nano::telemetry::get_metrics_random_peers_async (std::function<void(telemetry_data_responses const &)> const & callback_a)
+void nano::telemetry::ongoing_req_all_peers ()
+{
+	alarm.add (std::chrono::steady_clock::now () + batch_request->cache_cutoff + batch_request->alarm_cutoff, [this, telemetry_impl_w = std::weak_ptr<nano::telemetry_impl> (batch_request)]() {
+		if (auto batch_telemetry_impl = telemetry_impl_w.lock ())
+		{
+			nano::lock_guard<std::mutex> guard (this->mutex);
+			if (!this->stopped)
+			{
+				auto peers = this->network.list (std::numeric_limits<size_t>::max (), network_params.protocol.telemetry_protocol_version_min, false);
+				// If a peer exists in single_requests, don't request it here because the request would just be rejected by other peers until the next round
+				auto const & single_requests = this->single_requests;
+				peers.erase (std::remove_if (peers.begin (), peers.end (), [&single_requests](auto const & channel_a) {
+					return single_requests.count (channel_a->get_endpoint ()) > 0;
+				}),
+				peers.cend ());
+				if (!peers.empty ())
+				{
+					batch_telemetry_impl->get_metrics_async (peers, [](nano::telemetry_data_responses const &) {
+						// Intentionally empty, just used to refresh the cache
+					});
+				}
+
+				this->ongoing_req_all_peers ();
+			}
+		}
+	});
+}
+
+void nano::telemetry::get_metrics_peers_async (std::function<void(telemetry_data_responses const &)> const & callback_a)
 {
-	// These peers will only be used if there isn't an already ongoing batch telemetry request round
-	auto random_peers = network.random_set (network.size_sqrt (), network_params.protocol.telemetry_protocol_version_min, true);
+	auto peers = network.list (std::numeric_limits<size_t>::max (), network_params.protocol.telemetry_protocol_version_min, false);
 	nano::lock_guard<std::mutex> guard (mutex);
-	if (!stopped && !random_peers.empty ())
+	if (!stopped && !peers.empty ())
 	{
-		batch_request->get_metrics_async (random_peers, [callback_a](nano::telemetry_data_responses const & telemetry_data_responses) {
+		// If a peer exists in single_requests, don't request it because the request would just be rejected by other nodes; instead its data is merged in as additional values
+		peers.erase (std::remove_if (peers.begin (), peers.end (), [& single_requests = this->single_requests](auto const & channel_a) {
+			return single_requests.count (channel_a->get_endpoint ()) > 0;
+		}),
+		peers.cend ());
+
+		batch_request->get_metrics_async (peers, [callback_a](nano::telemetry_data_responses const & telemetry_data_responses) {
 			callback_a (telemetry_data_responses);
 		});
 	}
 	else
 	{
 		const auto all_received = false;
-		callback_a (nano::telemetry_data_responses{ {}, false, all_received });
+		callback_a (nano::telemetry_data_responses{ {}, all_received });
 	}
 }
 
-nano::telemetry_data_responses nano::telemetry::get_metrics_random_peers ()
+nano::telemetry_data_responses nano::telemetry::get_metrics_peers ()
 {
 	std::promise<telemetry_data_responses> promise;
-	get_metrics_random_peers_async ([&promise](telemetry_data_responses const & telemetry_data_responses_a) {
+	get_metrics_peers_async ([&promise](telemetry_data_responses const & telemetry_data_responses_a) {
 		promise.set_value (telemetry_data_responses_a);
 	});
 
@@ -74,38 +148,27 @@ nano::telemetry_data_responses nano::telemetry::get_metrics_random_peers ()
 // After a request is made to a single peer we want to remove it from the container after the peer has not been requested for a while (cache_cutoff).
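// Editor's aside (illustrative sketch, not part of the patch): ongoing_req_all_peers
// above and ongoing_single_request_cleanup below share the same self-rearming
// alarm pattern. Stripped to its skeleton, with hypothetical names (the alarm
// reference is assumed to outlive the chain of callbacks):
void ongoing_example (nano::alarm & alarm_a, std::chrono::seconds interval_a)
{
	alarm_a.add (std::chrono::steady_clock::now () + interval_a, [&alarm_a, interval_a]() {
		// ... periodic work goes here (refresh the cache, prune stale entries) ...
		ongoing_example (alarm_a, interval_a); // re-arm for the next round
	});
}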
@@ -74,38 +148,27 @@ nano::telemetry_data_responses nano::telemetry::get_metrics_random_peers ()
 // After a request is made to a single peer we want to remove it from the container after the peer has not been requested for a while (cache_cutoff).
 void nano::telemetry::ongoing_single_request_cleanup (nano::endpoint const & endpoint_a, nano::telemetry::single_request_data const & single_request_data_a)
 {
-	// This class is just a wrapper which allows the callback to reschedule itself with the alarm
-	class ongoing_func_wrapper
-	{
-	public:
-		std::function<void()> ongoing_func;
-	};
-
-	auto wrapper = std::make_shared<ongoing_func_wrapper> ();
-	// Keep calling ongoing_func while the peer is still being called
-	const auto & last_updated = single_request_data_a.last_updated;
-	wrapper->ongoing_func = [this, telemetry_impl_w = std::weak_ptr<nano::telemetry_impl> (single_request_data_a.impl), &last_updated, &endpoint_a, wrapper]() {
+	alarm.add (std::chrono::steady_clock::now () + single_request_data_a.impl->cache_cutoff, [this, telemetry_impl_w = std::weak_ptr<nano::telemetry_impl> (single_request_data_a.impl), &single_request_data_a, &endpoint_a]() {
 		if (auto telemetry_impl = telemetry_impl_w.lock ())
 		{
 			nano::lock_guard<std::mutex> guard (this->mutex);
-			if (std::chrono::steady_clock::now () - telemetry_impl->cache_cutoff > last_updated && telemetry_impl->callbacks.empty ())
+			if (std::chrono::steady_clock::now () - telemetry_impl->cache_cutoff > single_request_data_a.last_updated && telemetry_impl->callbacks.empty ())
 			{
+				// This will be picked up by the batch request next round
+				this->finished_single_requests[endpoint_a] = telemetry_impl->cached_telemetry_data.begin ()->second;
 				this->single_requests.erase (endpoint_a);
 			}
 			else
 			{
 				// Request is still active, so call again
-				this->alarm.add (std::chrono::steady_clock::now () + telemetry_impl->cache_cutoff, wrapper->ongoing_func);
+				this->ongoing_single_request_cleanup (endpoint_a, single_request_data_a);
 			}
 		}
-	};
-
-	alarm.add (std::chrono::steady_clock::now () + single_request_data_a.impl->cache_cutoff, wrapper->ongoing_func);
+	});
 }

 void nano::telemetry::update_cleanup_data (nano::endpoint const & endpoint_a, nano::telemetry::single_request_data & single_request_data_a, bool is_new_a)
 {
-	auto telemetry_impl = single_request_data_a.impl;
 	if (is_new_a)
 	{
 		// Clean this request up when it isn't being used anymore
@@ -120,10 +183,16 @@ void nano::telemetry::update_cleanup_data (nano::endpoint const & endpoint_a, nano::telemetry::single_request_data & single_request_data_a, bool is_new_a)

 void nano::telemetry::get_metrics_single_peer_async (std::shared_ptr<nano::transport::channel> const & channel_a, std::function<void(telemetry_data_response const &)> const & callback_a)
 {
-	auto invoke_callback_with_error = [&callback_a]() {
-		auto const is_cached = false;
-		auto const error = true;
-		callback_a ({ nano::telemetry_data (), is_cached, error });
+	auto invoke_callback_with_error = [&callback_a, &worker = this->worker, channel_a]() {
+		nano::endpoint endpoint;
+		if (channel_a)
+		{
+			endpoint = channel_a->get_endpoint ();
+		}
+		worker.push_task ([callback_a, endpoint]() {
+			auto const error = true;
+			callback_a ({ nano::telemetry_data_time_pair{}, endpoint, error });
+		});
 	};

 	nano::lock_guard<std::mutex> guard (mutex);
@@ -131,20 +200,47 @@ void nano::telemetry::get_metrics_single_peer_async (std::shared_ptr<nano::transport::channel> const & channel_a, std::function<void(telemetry_data_response const &)> const & callback_a)
 	if (!stopped && channel_a && (channel_a->get_network_version () >= network_params.protocol.telemetry_protocol_version_min))
 	{
+		auto add_callback_async = [& worker = this->worker, &callback_a](telemetry_data_time_pair const & telemetry_data_time_pair_a, nano::endpoint const & endpoint_a) {
+			telemetry_data_response telemetry_data_response_l{ telemetry_data_time_pair_a, endpoint_a, false };
+			worker.push_task ([telemetry_data_response_l, callback_a]() {
+				callback_a (telemetry_data_response_l);
+			});
+		};
+
+		// First check if the batched metrics have processed this endpoint.
+		{
+			nano::lock_guard<std::mutex> guard (batch_request->mutex);
+			auto it = batch_request->cached_telemetry_data.find (channel_a->get_endpoint ());
+			if (it != batch_request->cached_telemetry_data.cend ())
+			{
+				add_callback_async (it->second, it->first);
+				return;
+			}
+		}
+		// Next check single requests which finished and are awaiting batched requests
+		auto it = finished_single_requests.find (channel_a->get_endpoint ());
+		if (it != finished_single_requests.cend ())
+		{
+			add_callback_async (it->second, it->first);
+			return;
+		}
+
 		auto pair = single_requests.emplace (channel_a->get_endpoint (), single_request_data{ std::make_shared<nano::telemetry_impl> (network, alarm, worker), std::chrono::steady_clock::now () });
-		update_cleanup_data (pair.first->first, pair.first->second, pair.second);
+		auto & single_request_data_it = pair.first;
+		update_cleanup_data (single_request_data_it->first, single_request_data_it->second, pair.second);

-		pair.first->second.impl->get_metrics_async ({ channel_a }, [callback_a](telemetry_data_responses const & telemetry_data_responses_a) {
+		single_request_data_it->second.impl->get_metrics_async ({ channel_a }, [callback_a, channel_a](telemetry_data_responses const & telemetry_data_responses_a) {
 			// There should only be 1 response, so if this hasn't been received then conclude it is an error.
 			auto const error = !telemetry_data_responses_a.all_received;
 			if (!error)
 			{
-				assert (telemetry_data_responses_a.data.size () == 1);
-				callback_a ({ telemetry_data_responses_a.data.front (), telemetry_data_responses_a.is_cached, error });
+				assert (telemetry_data_responses_a.telemetry_data_time_pairs.size () == 1);
+				auto it = telemetry_data_responses_a.telemetry_data_time_pairs.begin ();
+				callback_a ({ it->second, it->first, error });
 			}
 			else
 			{
-				callback_a ({ nano::telemetry_data (), telemetry_data_responses_a.is_cached, error });
+				callback_a ({ nano::telemetry_data_time_pair{}, channel_a->get_endpoint (), error });
 			}
 		});
 	}
@@ -183,6 +279,12 @@ size_t nano::telemetry::telemetry_data_size ()
 	return total;
 }

+size_t nano::telemetry::finished_single_requests_size ()
+{
+	nano::lock_guard<std::mutex> guard (mutex);
+	return finished_single_requests.size ();
+}
+
 nano::telemetry_impl::telemetry_impl (nano::network & network_a, nano::alarm & alarm_a, nano::worker & worker_a) :
 network (network_a),
 alarm (alarm_a),
@@ -190,25 +292,30 @@ worker (worker_a)
 {
 }

-void nano::telemetry_impl::flush_callbacks (nano::unique_lock<std::mutex> & lk_a, bool cached_a)
+void nano::telemetry_impl::flush_callbacks_async ()
 {
-	// Invoke all callbacks, it's possible that during the mutex unlock other callbacks were added,
-	// so check again and invoke those too
-	assert (lk_a.owns_lock ());
-	invoking = true;
-	while (!callbacks.empty ())
-	{
-		lk_a.unlock ();
-		invoke_callbacks (cached_a);
-		lk_a.lock ();
-	}
-	invoking = false;
+	// Post to worker so that it's truly async and not on the calling thread (same problem as std::async otherwise)
+	worker.push_task ([this_w = std::weak_ptr<nano::telemetry_impl> (shared_from_this ())]() {
+		if (auto this_l = this_w.lock ())
+		{
+			nano::unique_lock<std::mutex> lk (this_l->mutex);
+			// Invoke all callbacks, it's possible that during the mutex unlock other callbacks were added,
+			// so check again and invoke those too
+			this_l->invoking = true;
+			while (!this_l->callbacks.empty ())
+			{
+				lk.unlock ();
+				this_l->invoke_callbacks ();
+				lk.lock ();
+			}
+			this_l->invoking = false;
+		}
+	});
 }

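In outline, the single-peer path above resolves a request from up to three sources. A rough standalone sketch of that resolution order (try_cached is a hypothetical helper; locking and the worker hand-off are elided):

    // Sketch only: the three-step lookup used by get_metrics_single_peer_async above.
    boost::optional<nano::telemetry_data_time_pair> try_cached (nano::endpoint const & endpoint_a, std::unordered_map<nano::endpoint, nano::telemetry_data_time_pair> const & batch_cache_a, std::unordered_map<nano::endpoint, nano::telemetry_data_time_pair> const & finished_a)
    {
        auto it = batch_cache_a.find (endpoint_a); // 1. The batch round already has it
        if (it != batch_cache_a.cend ())
        {
            return it->second;
        }
        auto it2 = finished_a.find (endpoint_a); // 2. A finished single request awaiting merge
        if (it2 != finished_a.cend ())
        {
            return it2->second;
        }
        return boost::none; // 3. Caller must fire a fresh telemetry_req
    }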
-void nano::telemetry_impl::get_metrics_async (std::unordered_set<std::shared_ptr<nano::transport::channel>> const & channels_a, std::function<void(telemetry_data_responses const &)> const & callback_a)
+void nano::telemetry_impl::get_metrics_async (std::deque<std::shared_ptr<nano::transport::channel>> const & channels_a, std::function<void(telemetry_data_responses const &)> const & callback_a)
 {
 	{
-		assert (!channels_a.empty ());
 		nano::unique_lock<std::mutex> lk (mutex);
 		callbacks.push_back (callback_a);
 		if (callbacks.size () > 1 || invoking)
@@ -218,21 +325,13 @@ void nano::telemetry_impl::get_metrics_async (std::unordered_set<std::shared_ptr<nano::transport::channel>> const & channels_a, std::function<void(telemetry_data_responses const &)> const & callback_a)
-			worker.push_task ([this_w = std::weak_ptr<nano::telemetry_impl> (shared_from_this ())]() {
-				if (auto this_l = this_w.lock ())
-				{
-					nano::unique_lock<std::mutex> lk (this_l->mutex);
-					const auto is_cached = true;
-					this_l->flush_callbacks (lk, is_cached);
-				}
-			});
+			flush_callbacks_async ();
 			return;
 		}

-		all_received = true;
+		failed.clear ();
 		assert (required_responses.empty ());
 		std::transform (channels_a.begin (), channels_a.end (), std::inserter (required_responses, required_responses.end ()), [](auto const & channel) {
 			return channel->get_endpoint ();
@@ -253,14 +352,15 @@ void nano::telemetry_impl::add (nano::telemetry_data const & telemetry_data_a, nano::endpoint const & endpoint_a, bool is_empty_a)

 	if (!is_empty_a)
 	{
-		current_telemetry_data_responses.push_back (telemetry_data_a);
+		current_telemetry_data_responses[endpoint_a] = { telemetry_data_a, std::chrono::steady_clock::now (), std::chrono::system_clock::now () };
 	}
 	channel_processed (lk, endpoint_a);
 }

-void nano::telemetry_impl::invoke_callbacks (bool cached_a)
+void nano::telemetry_impl::invoke_callbacks ()
 {
 	decltype (callbacks) callbacks_l;
+	bool all_received;
 	decltype (cached_telemetry_data) cached_telemetry_data_l;
 	{
 		// Copy callbacks so that they can be called outside of holding the lock
@@ -269,12 +369,18 @@ void nano::telemetry_impl::invoke_callbacks (bool cached_a)
 		cached_telemetry_data_l = cached_telemetry_data;
 		current_telemetry_data_responses.clear ();
 		callbacks.clear ();
+		all_received = failed.empty ();
+	}
+
+	if (pre_callback_callback)
+	{
+		pre_callback_callback (cached_telemetry_data_l, mutex);
 	}

 	// Need to account for nodes which disable telemetry data in responses
 	bool all_received_l = !cached_telemetry_data_l.empty () && all_received;
 	for (auto & callback : callbacks_l)
 	{
-		callback ({ cached_telemetry_data_l, cached_a, all_received_l });
+		callback ({ cached_telemetry_data_l, all_received_l });
 	}
 }

@@ -288,12 +394,11 @@ void nano::telemetry_impl::channel_processed (nano::unique_lock<std::mutex> & lk_a, nano::endpoint const & endpoint_a)
 		cached_telemetry_data = current_telemetry_data_responses;

 		last_time = std::chrono::steady_clock::now ();
-		auto const is_cached = false;
-		flush_callbacks (lk_a, is_cached);
+		flush_callbacks_async ();
 	}
 }

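The queueing logic in get_metrics_async above amounts to single-flight coalescing: one round of network requests serves every caller that arrives while the round is in flight. A self-contained sketch of that pattern under simplified types (int stands in for the response aggregate; assumes <functional>, <mutex> and <vector>):

    #include <functional>
    #include <mutex>
    #include <vector>

    // Sketch only: single-flight coalescing as used by telemetry_impl above.
    class single_flight
    {
    public:
        // Returns true only for the caller that should start the expensive work
        bool enqueue (std::function<void(int)> const & callback_a)
        {
            std::lock_guard<std::mutex> guard (mutex);
            callbacks.push_back (callback_a);
            return callbacks.size () == 1;
        }
        // Called once the round completes; replays every queued callback
        void finish (int result_a)
        {
            std::vector<std::function<void(int)>> callbacks_l;
            {
                std::lock_guard<std::mutex> guard (mutex);
                callbacks_l.swap (callbacks);
            }
            for (auto & callback : callbacks_l)
            {
                callback (result_a);
            }
        }
    private:
        std::mutex mutex;
        std::vector<std::function<void(int)>> callbacks;
    };

In the real class the replay additionally happens on the worker thread (flush_callbacks_async) so a caller's callback never runs on the calling thread.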
-void nano::telemetry_impl::fire_request_messages (std::unordered_set<std::shared_ptr<nano::transport::channel>> const & channels)
+void nano::telemetry_impl::fire_request_messages (std::deque<std::shared_ptr<nano::transport::channel>> const & channels)
 {
 	uint64_t round_l;
 	{
@@ -309,6 +414,7 @@ void nano::telemetry_impl::fire_request_messages (std::unordered_set<std::shared_ptr<nano::transport::channel>> const & channels)
 		assert (channel->get_network_version () >= network_params.protocol.telemetry_protocol_version_min);
 		std::weak_ptr<nano::telemetry_impl> this_w (shared_from_this ());
+		// clang-format off
 		channel->send (message, [this_w, endpoint = channel->get_endpoint ()](boost::system::error_code const & ec, size_t size_a) {
 			if (auto this_l = this_w.lock ())
 			{
@@ -316,20 +422,22 @@ void nano::telemetry_impl::fire_request_messages (std::unordered_set<std::shared_ptr<nano::transport::channel>> const & channels)
 				if (ec)
 				{
 					nano::unique_lock<std::mutex> lk (this_l->mutex);
-					this_l->all_received = false;
+					this_l->failed.push_back (endpoint);
 					this_l->channel_processed (lk, endpoint);
 				}
 			}
-		});
+		},
+		false);
+		// clang-format on

 		// If no response is seen after a certain period of time, remove it from the list of expected responses. However, only if it is part of the same round.
-		alarm.add (std::chrono::steady_clock::now () + cache_cutoff, [this_w, endpoint = channel->get_endpoint (), round_l]() {
+		alarm.add (std::chrono::steady_clock::now () + alarm_cutoff, [this_w, endpoint = channel->get_endpoint (), round_l]() {
 			if (auto this_l = this_w.lock ())
 			{
 				nano::unique_lock<std::mutex> lk (this_l->mutex);
 				if (this_l->round == round_l && this_l->required_responses.find (endpoint) != this_l->required_responses.cend ())
 				{
-					this_l->all_received = false;
+					this_l->failed.push_back (endpoint);
 					this_l->channel_processed (lk, endpoint);
 				}
 			}
@@ -343,6 +451,16 @@ size_t nano::telemetry_impl::telemetry_data_size ()
 	return current_telemetry_data_responses.size ();
 }

+bool nano::telemetry_data_time_pair::operator== (telemetry_data_time_pair const & telemetry_data_time_pair_a) const
+{
+	return data == telemetry_data_time_pair_a.data && last_updated == telemetry_data_time_pair_a.last_updated;
+}
+
+bool nano::telemetry_data_time_pair::operator!= (telemetry_data_time_pair const & telemetry_data_time_pair_a) const
+{
+	return !(*this == telemetry_data_time_pair_a);
+}
+
 std::unique_ptr<nano::container_info_component> nano::collect_container_info (telemetry & telemetry, const std::string & name)
 {
 	size_t single_requests_count;
@@ -357,6 +475,7 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (telemetry & telemetry, const std::string & name)
 		composite->add_component (collect_container_info (*telemetry.batch_request, "batch_request"));
 	}
 	composite->add_component (std::make_unique<container_info_leaf> (container_info{ "single_requests", single_requests_count, sizeof (decltype (telemetry.single_requests)::value_type) }));
+	composite->add_component (std::make_unique<container_info_leaf> (container_info{ "finished_single_requests", telemetry.finished_single_requests_size (), sizeof (decltype (telemetry.finished_single_requests)::value_type) }));
 	return composite;
 }

@@ -381,3 +500,172 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (telemetry_impl & telemetry_impl, const std::string & name)
 	composite->add_component (std::make_unique<container_info_leaf> (container_info{ "required_responses", required_responses_count, sizeof (decltype (telemetry_impl.required_responses)::value_type) }));
 	return composite;
 }
+
+nano::telemetry_data nano::consolidate_telemetry_data (std::vector<nano::telemetry_data> const & telemetry_datas)
+{
+	std::vector<nano::telemetry_data_time_pair> telemetry_data_time_pairs;
+	telemetry_data_time_pairs.reserve (telemetry_datas.size ());
+
+	std::transform (telemetry_datas.begin (), telemetry_datas.end (), std::back_inserter (telemetry_data_time_pairs), [](nano::telemetry_data const & telemetry_data_a) {
+		// Don't care about the timestamps here
+		return nano::telemetry_data_time_pair{ telemetry_data_a, {}, {} };
+	});
+
+	return consolidate_telemetry_data_time_pairs (telemetry_data_time_pairs).data;
+}
+
+nano::telemetry_data_time_pair nano::consolidate_telemetry_data_time_pairs (std::vector<nano::telemetry_data_time_pair> const & telemetry_data_time_pairs_a)
+{
+	if (telemetry_data_time_pairs_a.empty ())
+	{
+		return {};
+	}
+	else if (telemetry_data_time_pairs_a.size () == 1)
+	{
+		// Only 1 element in the collection, so just return it.
+		return telemetry_data_time_pairs_a.front ();
+	}
+
+	std::unordered_map<uint8_t, int> protocol_versions;
+	std::unordered_map<std::string, int> vendor_versions;
+	std::unordered_map<uint64_t, int> bandwidth_caps;
+	std::unordered_map<nano::block_hash, int> genesis_blocks;
+
+	// Use a trimmed average which excludes the upper and lower 10% of the results
+	std::multiset<uint64_t> account_counts;
+	std::multiset<uint64_t> block_counts;
+	std::multiset<uint64_t> cemented_counts;
+	std::multiset<uint32_t> peer_counts;
+	std::multiset<uint64_t> unchecked_counts;
+	std::multiset<uint64_t> uptime_counts;
+	std::multiset<uint64_t> bandwidth_counts;
+	std::multiset<uint64_t> timestamp_counts;
+
+	for (auto const & telemetry_data_time_pair : telemetry_data_time_pairs_a)
+	{
+		auto & telemetry_data = telemetry_data_time_pair.data;
+		account_counts.insert (telemetry_data.account_count);
+		block_counts.insert (telemetry_data.block_count);
+		cemented_counts.insert (telemetry_data.cemented_count);
+
+		std::ostringstream ss;
+		ss << telemetry_data.major_version;
+		if (telemetry_data.minor_version.is_initialized ())
+		{
+			ss << "." << *telemetry_data.minor_version;
+			if (telemetry_data.patch_version.is_initialized ())
+			{
+				ss << "." << *telemetry_data.patch_version;
+				if (telemetry_data.pre_release_version.is_initialized ())
+				{
+					ss << "." << *telemetry_data.pre_release_version;
+					if (telemetry_data.maker.is_initialized ())
+					{
+						ss << "." << *telemetry_data.maker;
+					}
+				}
+			}
+		}
+
+		++vendor_versions[ss.str ()];
+		++protocol_versions[telemetry_data.protocol_version];
+		peer_counts.insert (telemetry_data.peer_count);
+		unchecked_counts.insert (telemetry_data.unchecked_count);
+		uptime_counts.insert (telemetry_data.uptime);
+		// 0 has a special meaning (unlimited), don't include it in the average as it will be heavily skewed
+		if (telemetry_data.bandwidth_cap != 0)
+		{
+			bandwidth_counts.insert (telemetry_data.bandwidth_cap);
+		}
+
+		++bandwidth_caps[telemetry_data.bandwidth_cap];
+		++genesis_blocks[telemetry_data.genesis_block];
+
+		timestamp_counts.insert (std::chrono::time_point_cast<std::chrono::milliseconds> (telemetry_data_time_pair.system_last_updated).time_since_epoch ().count ());
+	}
+
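To make the trimming below concrete: with 24 responses, 24 / 10 = 2 entries are dropped from each end of every multiset, so each average is computed over 20 values, and with fewer than 10 responses nothing is dropped at all. The same computation in isolation, using plain uint64_t instead of nano::uint128_t (sketch only, assumes a non-empty set):

    #include <cstdint>
    #include <iterator>
    #include <numeric>
    #include <set>

    // Sketch only: 10% trimmed mean, mirroring strip_outliers_and_sum below.
    uint64_t trimmed_mean (std::multiset<uint64_t> counts)
    {
        auto to_remove = counts.size () / 10; // zero until at least 10 samples exist
        counts.erase (counts.begin (), std::next (counts.begin (), to_remove));
        counts.erase (std::next (counts.rbegin (), to_remove).base (), counts.end ());
        auto sum = std::accumulate (counts.begin (), counts.end (), uint64_t{ 0 });
        return sum / counts.size ();
    }
    // e.g. 20 copies of 6 plus outliers { 2, 2, 999, 999 }: both pairs are trimmed, result is 6.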
+	// Remove 10% of the results from the lower and upper bounds to catch any outliers. Need at least 10 responses before any are removed.
+	auto num_either_side_to_remove = telemetry_data_time_pairs_a.size () / 10;
+
+	auto strip_outliers_and_sum = [num_either_side_to_remove](auto & counts) {
+		counts.erase (counts.begin (), std::next (counts.begin (), num_either_side_to_remove));
+		counts.erase (std::next (counts.rbegin (), num_either_side_to_remove).base (), counts.end ());
+		return std::accumulate (counts.begin (), counts.end (), nano::uint128_t (0), [](nano::uint128_t total, auto count) {
+			return total += count;
+		});
+	};
+
+	auto account_sum = strip_outliers_and_sum (account_counts);
+	auto block_sum = strip_outliers_and_sum (block_counts);
+	auto cemented_sum = strip_outliers_and_sum (cemented_counts);
+	auto peer_sum = strip_outliers_and_sum (peer_counts);
+	auto unchecked_sum = strip_outliers_and_sum (unchecked_counts);
+	auto uptime_sum = strip_outliers_and_sum (uptime_counts);
+	auto bandwidth_sum = strip_outliers_and_sum (bandwidth_counts);
+
+	nano::telemetry_data consolidated_data;
+	auto size = telemetry_data_time_pairs_a.size () - num_either_side_to_remove * 2;
+	consolidated_data.account_count = boost::numeric_cast<decltype (consolidated_data.account_count)> (account_sum / size);
+	consolidated_data.block_count = boost::numeric_cast<decltype (consolidated_data.block_count)> (block_sum / size);
+	consolidated_data.cemented_count = boost::numeric_cast<decltype (consolidated_data.cemented_count)> (cemented_sum / size);
+	consolidated_data.peer_count = boost::numeric_cast<decltype (consolidated_data.peer_count)> (peer_sum / size);
+	consolidated_data.uptime = boost::numeric_cast<decltype (consolidated_data.uptime)> (uptime_sum / size);
+	consolidated_data.unchecked_count = boost::numeric_cast<decltype (consolidated_data.unchecked_count)> (unchecked_sum / size);
+
+	auto set_mode_or_average = [](auto const & collection, auto & var, auto const & sum, size_t size) {
+		auto max = std::max_element (collection.begin (), collection.end (), [](auto const & lhs, auto const & rhs) {
+			return lhs.second < rhs.second;
+		});
+		if (max->second > 1)
+		{
+			var = max->first;
+		}
+		else
+		{
+			var = (sum / size).template convert_to<std::remove_reference_t<decltype (var)>> ();
+		}
+	};
+
+	auto set_mode = [](auto const & collection, auto & var, size_t size) {
+		auto max = std::max_element (collection.begin (), collection.end (), [](auto const & lhs, auto const & rhs) {
+			return lhs.second < rhs.second;
+		});
+		if (max->second > 1)
+		{
+			var = max->first;
+		}
+		else
+		{
+			// Just pick the first one
+			var = collection.begin ()->first;
+		}
+	};
+
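The effect of set_mode_or_average above, in numbers: for bandwidth caps { 100, 100, 8, 999 } the value 100 repeats, so the mode 100 wins; for { 100, 200, 300 } nothing repeats and the trimmed average is used instead. The same decision in isolation (sketch only, assumes a non-empty occurrence map and int values for brevity):

    #include <algorithm>
    #include <unordered_map>

    // Sketch only: mode if any value repeats, otherwise fall back to the supplied average.
    int mode_or_average (std::unordered_map<int, int> const & occurrences_a, int average_a)
    {
        auto max = std::max_element (occurrences_a.begin (), occurrences_a.end (),
        [](auto const & lhs, auto const & rhs) { return lhs.second < rhs.second; });
        return max->second > 1 ? max->first : average_a;
    }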
+	// Use the mode of protocol version and vendor version. Also use it for the bandwidth cap if there are 2 or more of the same cap.
+	set_mode_or_average (bandwidth_caps, consolidated_data.bandwidth_cap, bandwidth_sum, size);
+	set_mode (protocol_versions, consolidated_data.protocol_version, size);
+	set_mode (genesis_blocks, consolidated_data.genesis_block, size);
+
+	// Vendor version, needs to be parsed out of the string
+	std::string version;
+	set_mode (vendor_versions, version, size);
+
+	// May only have major version, but check for optional parameters as well, only output if all are used
+	std::vector<std::string> version_fragments;
+	boost::split (version_fragments, version, boost::is_any_of ("."));
+	assert (!version_fragments.empty () && version_fragments.size () <= 5);
+	consolidated_data.major_version = boost::lexical_cast<unsigned> (version_fragments.front ());
+	if (version_fragments.size () == 5)
+	{
+		consolidated_data.minor_version = boost::lexical_cast<unsigned> (version_fragments[1]);
+		consolidated_data.patch_version = boost::lexical_cast<unsigned> (version_fragments[2]);
+		consolidated_data.pre_release_version = boost::lexical_cast<unsigned> (version_fragments[3]);
+		consolidated_data.maker = boost::lexical_cast<unsigned> (version_fragments[4]);
+	}
+
+	// Consolidate timestamps
+	auto timestamp_sum = strip_outliers_and_sum (timestamp_counts);
+	auto consolidated_timestamp = boost::numeric_cast<uint64_t> (timestamp_sum / size);
+
+	return telemetry_data_time_pair{ consolidated_data, std::chrono::steady_clock::time_point{}, std::chrono::system_clock::time_point (std::chrono::milliseconds (consolidated_timestamp)) };
+}
\ No newline at end of file
diff --git a/nano/node/telemetry.hpp b/nano/node/telemetry.hpp
index f1690b5770..e82d8c18be 100644
--- a/nano/node/telemetry.hpp
+++ b/nano/node/telemetry.hpp
@@ -18,15 +18,25 @@ namespace transport
 	class channel;
 }

+class telemetry_data_time_pair
+{
+public:
+	nano::telemetry_data data;
+	std::chrono::steady_clock::time_point last_updated;
+	std::chrono::system_clock::time_point system_last_updated;
+	bool operator== (telemetry_data_time_pair const &) const;
+	bool operator!= (telemetry_data_time_pair const &) const;
+};
+
 /*
  * Holds a response from a telemetry request
  */
 class telemetry_data_response
 {
 public:
-	nano::telemetry_data data;
-	bool is_cached;
-	bool error;
+	nano::telemetry_data_time_pair telemetry_data_time_pair;
+	nano::endpoint endpoint;
+	bool error{ true };
 };

 /*
@@ -35,9 +45,8 @@
 class telemetry_data_responses
 {
 public:
-	std::vector<nano::telemetry_data> data;
-	bool is_cached;
-	bool all_received;
+	std::unordered_map<nano::endpoint, telemetry_data_time_pair> telemetry_data_time_pairs;
+	bool all_received{ false };
 };

 /*
@@ -52,46 +61,58 @@ class telemetry_impl : public std::enable_shared_from_this<telemetry_impl>
 private:
 	// Class only available to the telemetry class
-	void get_metrics_async (std::unordered_set<std::shared_ptr<nano::transport::channel>> const & channels_a, std::function<void(telemetry_data_responses const &)> const & callback_a);
+	void get_metrics_async (std::deque<std::shared_ptr<nano::transport::channel>> const & channels_a, std::function<void(telemetry_data_responses const &)> const & callback_a);
 	void add (nano::telemetry_data const & telemetry_data_a, nano::endpoint const & endpoint_a, bool is_empty_a);
 	size_t telemetry_data_size ();
 	nano::network_params network_params;
-	// Anything older than this requires requesting metrics from other nodes
-	static std::chrono::milliseconds constexpr cache_cutoff{ 3000 };
+	// Anything older than this requires requesting metrics from other nodes.
+	std::chrono::seconds const cache_cutoff{ nano::telemetry_cache_cutoffs::network_to_time (network_params.network) };
+	static std::chrono::seconds constexpr alarm_cutoff{ 3 };

 	// All data in this chunk is protected by this mutex
 	std::mutex mutex;
 	std::vector<std::function<void(telemetry_data_responses const &)>> callbacks;
 	std::chrono::steady_clock::time_point last_time = std::chrono::steady_clock::now () - cache_cutoff;
 	/* The responses received during this request round */
-	std::vector<nano::telemetry_data> current_telemetry_data_responses;
+	std::unordered_map<nano::endpoint, telemetry_data_time_pair> current_telemetry_data_responses;
 	/* The metrics for the last request round */
-	std::vector<nano::telemetry_data> cached_telemetry_data;
+	std::unordered_map<nano::endpoint, telemetry_data_time_pair> cached_telemetry_data;
 	std::unordered_set<nano::endpoint> required_responses;
 	uint64_t round{ 0 };
 	/* Currently executing callbacks */
 	bool invoking{ false };
-
-	std::atomic<bool> all_received{ true };
+	std::vector<nano::endpoint> failed;

 	nano::network & network;
 	nano::alarm & alarm;
 	nano::worker & worker;

-	void invoke_callbacks (bool cached_a);
+	std::function<void(std::unordered_map<nano::endpoint, telemetry_data_time_pair> & data_a, std::mutex &)> pre_callback_callback;
+
+	void invoke_callbacks ();
 	void channel_processed (nano::unique_lock<std::mutex> & lk_a, nano::endpoint const & endpoint_a);
-	void flush_callbacks (nano::unique_lock<std::mutex> & lk_a, bool cached_a);
-	void fire_request_messages (std::unordered_set<std::shared_ptr<nano::transport::channel>> const & channels);
+	void flush_callbacks_async ();
+	void fire_request_messages (std::deque<std::shared_ptr<nano::transport::channel>> const & channels);

 	friend std::unique_ptr<container_info_component> collect_container_info (telemetry_impl &, const std::string &);
 	friend nano::telemetry;
 	friend class node_telemetry_single_request_Test;
 	friend class node_telemetry_basic_Test;
+	friend class node_telemetry_ongoing_requests_Test;
 };

 std::unique_ptr<container_info_component> collect_container_info (telemetry_impl & telemetry_impl, const std::string & name);

+/*
+ * This class has 2 main operations:
+ * Request metrics from specific single peers (single_requests)
+ *  - If this peer is in the batched request, it will use the value from that, otherwise send a telemetry_req message (non-droppable)
+ * Request metrics from all peers (batched_request)
+ *  - This is polled every minute.
+ *  - If a single request is currently underway, do not request because other peers will just reject if within a hotzone time
+ *  - This will be proactively added when callbacks are called inside pre_callback_callback
+ */
 class telemetry
 {
 public:
@@ -105,14 +126,14 @@ class telemetry
 	void add (nano::telemetry_data const & telemetry_data_a, nano::endpoint const & endpoint_a, bool is_empty_a);

 	/*
-	 * Collects metrics from square root number of peers and invokes the callback when complete.
+	 * Collects metrics from all known peers and invokes the callback when complete.
 	 */
-	void get_metrics_random_peers_async (std::function<void(telemetry_data_responses const &)> const & callback_a);
+	void get_metrics_peers_async (std::function<void(telemetry_data_responses const &)> const & callback_a);

 	/*
-	 * A blocking version of get_metrics_random_peers_async ().
+	 * A blocking version of get_metrics_peers_async ().
 	 */
-	telemetry_data_responses get_metrics_random_peers ();
+	telemetry_data_responses get_metrics_peers ();

 	/*
 	 * This makes a telemetry request to the specific channel
@@ -129,6 +150,11 @@ class telemetry
 	 */
 	size_t telemetry_data_size ();

+	/*
+	 * Return the number of finished_single_requests elements
+	 */
+	size_t finished_single_requests_size ();
+
 	/*
 	 * Stop the telemetry processor
 	 */
@@ -153,14 +179,20 @@ class telemetry
 	std::shared_ptr<nano::telemetry_impl> batch_request;
 	/* Any requests to specific individual peers are maintained here */
 	std::unordered_map<nano::endpoint, single_request_data> single_requests;
+	/* This holds data from single_requests after the cache is removed */
+	std::unordered_map<nano::endpoint, nano::telemetry_data_time_pair> finished_single_requests;
 	bool stopped{ false };

 	void update_cleanup_data (nano::endpoint const & endpoint_a, nano::telemetry::single_request_data & single_request_data_a, bool is_new_a);
 	void ongoing_single_request_cleanup (nano::endpoint const & endpoint_a, nano::telemetry::single_request_data const & single_request_data_a);
+	void ongoing_req_all_peers ();

 	friend class node_telemetry_multiple_single_request_clearing_Test;
 	friend std::unique_ptr<container_info_component> collect_container_info (telemetry &, const std::string &);
 };

 std::unique_ptr<container_info_component> collect_container_info (telemetry & telemetry, const std::string & name);
+
+nano::telemetry_data consolidate_telemetry_data (std::vector<nano::telemetry_data> const & telemetry_data);
+nano::telemetry_data_time_pair consolidate_telemetry_data_time_pairs (std::vector<nano::telemetry_data_time_pair> const & telemetry_data_time_pairs);
 }
\ No newline at end of file
diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp
index 434537960f..171433b2e3 100644
--- a/nano/node/transport/tcp.cpp
+++ b/nano/node/transport/tcp.cpp
@@ -449,13 +449,14 @@ void nano::transport::tcp_channels::ongoing_keepalive ()
 	});
 }

-void nano::transport::tcp_channels::list (std::deque<std::shared_ptr<nano::transport::channel>> & deque_a)
+void nano::transport::tcp_channels::list (std::deque<std::shared_ptr<nano::transport::channel>> & deque_a, uint8_t minimum_version_a, bool include_temporary_channels_a)
 {
 	nano::lock_guard<std::mutex> lock (mutex);
-	for (auto const & channel : channels.get<random_access_tag> ())
-	{
-		deque_a.push_back (channel.channel);
-	}
+	// clang-format off
+	nano::transform_if (channels.get<random_access_tag> ().begin (), channels.get<random_access_tag> ().end (), std::back_inserter (deque_a),
+	[include_temporary_channels_a, minimum_version_a](auto & channel_a) { return channel_a.channel->get_network_version () >= minimum_version_a && (include_temporary_channels_a || !channel_a.channel->temporary); },
+	[](const auto & channel) { return channel.channel; });
+	// clang-format on
 }

 void nano::transport::tcp_channels::modify (std::shared_ptr<nano::transport::channel_tcp> channel_a, std::function<void(std::shared_ptr<nano::transport::channel_tcp>)> modify_callback_a)
diff --git a/nano/node/transport/tcp.hpp b/nano/node/transport/tcp.hpp
index 03578e7ca5..b733ae0393 100644
--- a/nano/node/transport/tcp.hpp
+++ b/nano/node/transport/tcp.hpp
@@ -78,6 +78,7 @@ namespace transport
 	class tcp_channels final
 	{
 		friend class nano::transport::channel_tcp;
+		friend class node_telemetry_simultaneous_single_and_random_requests_Test;

 	public:
 		tcp_channels (nano::node &);
@@ -102,7 +103,7 @@ namespace transport
 		std::unique_ptr<container_info_component> collect_container_info (std::string const &);
 		void purge (std::chrono::steady_clock::time_point const &);
 		void ongoing_keepalive ();
-		void list (std::deque<std::shared_ptr<nano::transport::channel>> &);
+		void list (std::deque<std::shared_ptr<nano::transport::channel>> &, uint8_t = 0, bool = true);
 		void modify (std::shared_ptr<nano::transport::channel_tcp>, std::function<void(std::shared_ptr<nano::transport::channel_tcp>)>);
 		void update (nano::tcp_endpoint const &);
 		// Connection start
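nano::transform_if is supplied by nano/lib/utility.hpp, which this patch also touches (see the diffstat); its exact definition is not reproduced here, but a conventional copy_if-plus-transform combination of the kind used above would look like:

    // Sketch only: plausible shape of nano::transform_if as used in the list () overloads.
    template <class InputIt, class OutputIt, class Pred, class Func>
    OutputIt transform_if (InputIt first, InputIt last, OutputIt dest, Pred pred, Func transform)
    {
        for (; first != last; ++first)
        {
            if (pred (*first))
            {
                // Only elements passing the predicate are transformed and emitted
                *dest++ = transform (*first);
            }
        }
        return dest;
    }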
diff --git a/nano/node/transport/udp.cpp b/nano/node/transport/udp.cpp
index 20b382b747..5ef34c6544 100644
--- a/nano/node/transport/udp.cpp
+++ b/nano/node/transport/udp.cpp
@@ -434,7 +434,25 @@ class udp_message_visitor : public nano::message_visitor
 	}
 	void telemetry_req (nano::telemetry_req const & message_a) override
 	{
-		message (message_a);
+		auto find_channel (node.network.udp_channels.channel (endpoint));
+		if (find_channel)
+		{
+			auto is_very_first_message = find_channel->get_last_telemetry_req () == std::chrono::steady_clock::time_point{};
+			auto cache_exceeded = std::chrono::steady_clock::now () >= find_channel->get_last_telemetry_req () + nano::telemetry_cache_cutoffs::network_to_time (node.network_params.network);
+			if (is_very_first_message || cache_exceeded)
+			{
+				node.network.udp_channels.modify (find_channel, [](std::shared_ptr<nano::transport::channel_udp> channel_a) {
+					channel_a->set_last_telemetry_req (std::chrono::steady_clock::now ());
+				});
+				message (message_a);
+			}
+			else
+			{
+				node.network.udp_channels.modify (find_channel, [](std::shared_ptr<nano::transport::channel_udp> channel_a) {
+					channel_a->set_last_packet_received (std::chrono::steady_clock::now ());
+				});
+			}
+		}
 	}
 	void telemetry_ack (nano::telemetry_ack const & message_a) override
 	{
@@ -688,13 +706,14 @@ void nano::transport::udp_channels::ongoing_keepalive ()
 	});
 }

-void nano::transport::udp_channels::list (std::deque<std::shared_ptr<nano::transport::channel>> & deque_a)
+void nano::transport::udp_channels::list (std::deque<std::shared_ptr<nano::transport::channel>> & deque_a, uint8_t minimum_version_a)
 {
 	nano::lock_guard<std::mutex> lock (mutex);
-	for (auto const & channel : channels.get<random_access_tag> ())
-	{
-		deque_a.push_back (channel.channel);
-	}
+	// clang-format off
+	nano::transform_if (channels.begin (), channels.end (), std::back_inserter (deque_a),
+	[minimum_version_a](auto & channel_a) { return channel_a.channel->get_network_version () >= minimum_version_a; },
+	[](const auto & channel) { return channel.channel; });
+	// clang-format on
 }

 void nano::transport::udp_channels::modify (std::shared_ptr<nano::transport::channel_udp> channel_a, std::function<void(std::shared_ptr<nano::transport::channel_udp>)> modify_callback_a)
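The DDoS protection above is a per-channel cooldown: a telemetry_req is honoured only if it is the first ever seen on that channel or if the telemetry cache cutoff has elapsed since the last honoured one; anything else merely refreshes last_packet_received so the channel is not purged. The check in isolation (sketch only):

    #include <chrono>

    // Sketch only: the cooldown test used by the UDP telemetry_req handler above.
    bool should_process (std::chrono::steady_clock::time_point last_req_a, std::chrono::seconds cutoff_a)
    {
        auto const never_requested = last_req_a == std::chrono::steady_clock::time_point{};
        auto const cooldown_elapsed = std::chrono::steady_clock::now () >= last_req_a + cutoff_a;
        return never_requested || cooldown_elapsed;
    }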
ASSERT_FALSE (response.json.get ("cached")); ASSERT_EQ (1, response.json.get ("block_count")); ASSERT_EQ (1, response.json.get ("cemented_count")); ASSERT_EQ (0, response.json.get ("unchecked_count")); @@ -7834,12 +7833,13 @@ void compare_default_test_result_data (test_response & response, nano::node cons ASSERT_EQ (1, response.json.get ("peer_count")); ASSERT_EQ (node_server_a.network_params.protocol.protocol_version, response.json.get ("protocol_version")); ASSERT_GE (100, response.json.get ("uptime")); - ASSERT_EQ (nano::genesis ().hash ().to_string (), response.json.get ("genesis_block")); + ASSERT_EQ (node_server_a.network_params.ledger.genesis_hash.to_string (), response.json.get ("genesis_block")); ASSERT_EQ (nano::get_major_node_version (), response.json.get ("major_version")); ASSERT_EQ (nano::get_minor_node_version (), response.json.get ("minor_version")); ASSERT_EQ (nano::get_patch_node_version (), response.json.get ("patch_version")); ASSERT_EQ (nano::get_pre_release_node_version (), response.json.get ("pre_release_version")); ASSERT_EQ (0, response.json.get ("maker")); + ASSERT_GE (std::chrono::duration_cast (std::chrono::system_clock::now ().time_since_epoch ()).count (), response.json.get ("timestamp")); } } @@ -7941,7 +7941,7 @@ TEST (rpc, node_telemetry_single) } } -TEST (rpc, node_telemetry_random) +TEST (rpc, node_telemetry_all) { nano::system system (1); auto & node1 = *add_ipc_enabled_node (system); @@ -7974,22 +7974,7 @@ TEST (rpc, node_telemetry_random) { ASSERT_NO_ERROR (system.poll ()); } - ASSERT_EQ (200, response.status); - ASSERT_FALSE (response.json.get ("cached")); - ASSERT_EQ (1, response.json.get ("block_count")); - ASSERT_EQ (1, response.json.get ("cemented_count")); - ASSERT_EQ (0, response.json.get ("unchecked_count")); - ASSERT_EQ (1, response.json.get ("account_count")); - ASSERT_EQ (node->config.bandwidth_limit, response.json.get ("bandwidth_cap")); - ASSERT_EQ (1, response.json.get ("peer_count")); - ASSERT_EQ (node->network_params.protocol.protocol_version, response.json.get ("protocol_version")); - ASSERT_GE (100, response.json.get ("uptime")); - ASSERT_EQ (nano::genesis ().hash ().to_string (), response.json.get ("genesis_block")); - ASSERT_EQ (nano::get_major_node_version (), response.json.get ("major_version")); - ASSERT_EQ (nano::get_minor_node_version (), response.json.get ("minor_version")); - ASSERT_EQ (nano::get_patch_node_version (), response.json.get ("patch_version")); - ASSERT_EQ (nano::get_pre_release_node_version (), response.json.get ("pre_release_version")); - ASSERT_EQ (0, response.json.get ("maker")); + compare_default_test_result_data (response, *node); } request.put ("raw", "true"); @@ -8002,30 +7987,54 @@ TEST (rpc, node_telemetry_random) ASSERT_EQ (200, response.status); // This may fail if the response has taken longer than the cache cutoff time. 
- ASSERT_TRUE (response.json.get ("cached")); - auto & all_metrics = response.json.get_child ("metrics"); - std::vector> raw_metrics_json_l; + + class telemetry_response_data + { + public: + uint64_t block_count; + uint64_t cemented_count; + uint64_t unchecked_count; + uint64_t account_count; + uint64_t bandwidth_cap; + uint32_t peer_count; + uint8_t protocol_version; + uint64_t uptime; + std::string genesis_block; + uint8_t major_version; + uint8_t minor_version; + uint8_t patch_version; + uint8_t pre_release_version; + uint8_t maker; + uint64_t timestamp; + std::string address; + uint16_t port; + }; + + std::vector raw_metrics_json_l; for (auto & metrics_pair : all_metrics) { auto & metrics = metrics_pair.second; - raw_metrics_json_l.emplace_back (metrics.get ("block_count"), metrics.get ("cemented_count"), metrics.get ("unchecked_count"), metrics.get ("account_count"), metrics.get ("bandwidth_cap"), metrics.get ("peer_count"), metrics.get ("protocol_version"), metrics.get ("uptime"), metrics.get ("genesis_block"), metrics.get ("major_version"), metrics.get ("minor_version"), metrics.get ("patch_version"), metrics.get ("pre_release_version"), metrics.get ("maker")); + raw_metrics_json_l.push_back ({ metrics.get ("block_count"), metrics.get ("cemented_count"), metrics.get ("unchecked_count"), metrics.get ("account_count"), metrics.get ("bandwidth_cap"), metrics.get ("peer_count"), metrics.get ("protocol_version"), metrics.get ("uptime"), metrics.get ("genesis_block"), metrics.get ("major_version"), metrics.get ("minor_version"), metrics.get ("patch_version"), metrics.get ("pre_release_version"), metrics.get ("maker"), metrics.get ("timestamp"), metrics.get ("address"), metrics.get ("port") }); } ASSERT_EQ (1, raw_metrics_json_l.size ()); auto const & metrics = raw_metrics_json_l.front (); - ASSERT_EQ (1, std::get<0> (metrics)); - ASSERT_EQ (1, std::get<1> (metrics)); - ASSERT_EQ (0, std::get<2> (metrics)); - ASSERT_EQ (1, std::get<3> (metrics)); - ASSERT_EQ (node->config.bandwidth_limit, std::get<4> (metrics)); - ASSERT_EQ (1, std::get<5> (metrics)); - ASSERT_EQ (node->network_params.protocol.protocol_version, std::get<6> (metrics)); - ASSERT_GE (100, std::get<7> (metrics)); - ASSERT_EQ (nano::genesis ().hash ().to_string (), std::get<8> (metrics)); - ASSERT_EQ (nano::get_major_node_version (), std::get<9> (metrics)); - ASSERT_EQ (nano::get_minor_node_version (), std::get<10> (metrics)); - ASSERT_EQ (nano::get_patch_node_version (), std::get<11> (metrics)); - ASSERT_EQ (nano::get_pre_release_node_version (), std::get<12> (metrics)); - ASSERT_EQ (0, std::get<13> (metrics)); + ASSERT_EQ (1, metrics.block_count); + ASSERT_EQ (1, metrics.cemented_count); + ASSERT_EQ (0, metrics.unchecked_count); + ASSERT_EQ (1, metrics.account_count); + ASSERT_EQ (node->config.bandwidth_limit, metrics.bandwidth_cap); + ASSERT_EQ (1, metrics.peer_count); + ASSERT_EQ (node->network_params.protocol.protocol_version, metrics.protocol_version); + ASSERT_GE (100, metrics.uptime); + ASSERT_EQ (node1.network_params.ledger.genesis_hash.to_string (), metrics.genesis_block); + ASSERT_EQ (nano::get_major_node_version (), metrics.major_version); + ASSERT_EQ (nano::get_minor_node_version (), metrics.minor_version); + ASSERT_EQ (nano::get_patch_node_version (), metrics.patch_version); + ASSERT_EQ (nano::get_pre_release_node_version (), metrics.pre_release_version); + ASSERT_EQ (0, metrics.maker); + ASSERT_GE (std::chrono::duration_cast (std::chrono::system_clock::now ().time_since_epoch ()).count (), metrics.timestamp); + 
+	ASSERT_EQ (node->network.endpoint ().address ().to_string (), metrics.address);
+	ASSERT_EQ (node->network.endpoint ().port (), metrics.port);
 }
diff --git a/nano/slow_test/node.cpp b/nano/slow_test/node.cpp
index 7938176c57..45dd46ce37 100644
--- a/nano/slow_test/node.cpp
+++ b/nano/slow_test/node.cpp
@@ -7,6 +7,8 @@

 #include

+#include
+
 using namespace std::chrono_literals;

 TEST (system, generate_mass_activity)
@@ -791,3 +793,240 @@ TEST (confirmation_height, prioritize_frontiers_overwrite)
 		}
 	}
 }
+
+namespace
+{
+void wait_peer_connections (nano::system & system_a)
+{
+	system_a.deadline_set (10s);
+	auto peer_count = 0;
+	auto num_nodes = system_a.nodes.size ();
+	while (peer_count != num_nodes * (num_nodes - 1))
+	{
+		ASSERT_NO_ERROR (system_a.poll ());
+		peer_count = std::accumulate (system_a.nodes.cbegin (), system_a.nodes.cend (), 0, [](auto total, auto const & node) {
+			auto transaction = node->store.tx_begin_read ();
+			return total += node->store.peer_count (transaction);
+		});
+	}
+}
+
+class data
+{
+public:
+	std::atomic<bool> awaiting_cache{ false };
+	std::atomic<bool> keep_requesting_metrics{ true };
+	std::shared_ptr<nano::node> node;
+	std::chrono::steady_clock::time_point orig_time;
+	std::atomic_flag orig_time_set = ATOMIC_FLAG_INIT;
+};
+class shared_data
+{
+public:
+	std::atomic<bool> done{ false };
+	std::atomic<int> count{ 0 };
+	std::promise<void> promise;
+	std::shared_future<void> shared_future{ promise.get_future () };
+};
+
+template <typename T>
+void callback_process (shared_data & shared_data_a, data & data, T & all_node_data_a, std::chrono::steady_clock::time_point last_updated)
+{
+	if (!data.orig_time_set.test_and_set ())
+	{
+		data.orig_time = last_updated;
+	}
+
+	if (data.awaiting_cache && data.orig_time != last_updated)
+	{
+		data.keep_requesting_metrics = false;
+	}
+	if (data.orig_time != last_updated)
+	{
+		data.awaiting_cache = true;
+		data.orig_time = last_updated;
+	}
+	if (--shared_data_a.count == 0 && std::all_of (all_node_data_a.begin (), all_node_data_a.end (), [](auto const & data) { return !data.keep_requesting_metrics; }))
+	{
+		shared_data_a.done = true;
+		shared_data_a.promise.set_value ();
+	}
+};
+}
+
+namespace nano
+{
+TEST (node_telemetry, ongoing_requests)
+{
+	nano::system system (2);
+
+	auto node_client = system.nodes.front ();
+	auto node_server = system.nodes.back ();
+
+	wait_peer_connections (system);
+
+	ASSERT_EQ (0, node_client->telemetry.telemetry_data_size ());
+	ASSERT_EQ (0, node_server->telemetry.telemetry_data_size ());
+	ASSERT_EQ (0, node_client->stats.count (nano::stat::type::bootstrap, nano::stat::detail::telemetry_ack, nano::stat::dir::in));
+	ASSERT_EQ (0, node_client->stats.count (nano::stat::type::bootstrap, nano::stat::detail::telemetry_req, nano::stat::dir::out));
+
+	system.deadline_set (20s);
+	while (node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in) != 1 || node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out) != 1)
+	{
+		ASSERT_NO_ERROR (system.poll ());
+	}
+
+	// Wait until the next ongoing request is made, and add a 1s buffer for the actual processing
+	auto time = std::chrono::steady_clock::now ();
+	while (std::chrono::steady_clock::now () < (time + nano::telemetry_cache_cutoffs::test + nano::telemetry_impl::alarm_cutoff + 1s))
+	{
+		ASSERT_NO_ERROR (system.poll ());
+	}
+
+	ASSERT_EQ (2, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in));
+	ASSERT_EQ (2, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in));
+	ASSERT_EQ (2, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out));
+	ASSERT_EQ (2, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in));
+	ASSERT_EQ (2, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in));
+	ASSERT_EQ (2, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out));
+}
+}
+
+TEST (node_telemetry, simultaneous_random_requests)
+{
+	const auto num_nodes = 4;
+	nano::system system (num_nodes);
+
+	// Wait until peers are stored as they are done in the background
+	wait_peer_connections (system);
+
+	std::vector<std::thread> threads;
+	const auto num_threads = 4;
+
+	std::array<data, num_nodes> all_data{};
+	for (auto i = 0; i < num_nodes; ++i)
+	{
+		all_data[i].node = system.nodes[i];
+	}
+
+	shared_data shared_data;
+
+	// Create a few threads where each node sends out telemetry request messages to all other nodes continuously, until the cache is reached and subsequently expires.
+	// The test waits until all telemetry_ack messages have been received.
+	for (int i = 0; i < num_threads; ++i)
+	{
+		threads.emplace_back ([&all_data, &shared_data]() {
+			while (std::any_of (all_data.cbegin (), all_data.cend (), [](auto const & data) { return data.keep_requesting_metrics.load (); }))
+			{
+				for (auto & data : all_data)
+				{
+					// Keep requesting telemetry metrics until the cache has been saved and then becomes outdated (after a certain period of time) for each node
+					if (data.keep_requesting_metrics)
+					{
+						++shared_data.count;
+						data.node->telemetry.get_metrics_peers_async ([&shared_data, &data, &all_data](nano::telemetry_data_responses const & responses_a) {
+							callback_process (shared_data, data, all_data, responses_a.telemetry_data_time_pairs.begin ()->second.last_updated);
+						});
+					}
+					std::this_thread::sleep_for (1ms);
+				}
+			}
+
+			shared_data.shared_future.wait ();
+			ASSERT_EQ (shared_data.count, 0);
+		});
+	}
+
+	system.deadline_set (20s);
+	while (!shared_data.done)
+	{
+		ASSERT_NO_ERROR (system.poll ());
+	}
+
+	for (auto & thread : threads)
+	{
+		thread.join ();
+	}
+}
+
+namespace nano
+{
+namespace transport
+{
+	TEST (node_telemetry, simultaneous_single_and_random_requests)
+	{
+		const auto num_nodes = 4;
+		nano::system system (num_nodes);
+
+		wait_peer_connections (system);
+
+		std::vector<std::thread> threads;
+		const auto num_threads = 4;
+
+		std::array<data, num_nodes> node_data_single{};
+		std::array<data, num_nodes> node_data_random{};
+		for (auto i = 0; i < num_nodes; ++i)
+		{
+			node_data_single[i].node = system.nodes[i];
+			node_data_random[i].node = system.nodes[i];
+		}
+
+		shared_data shared_data_single;
+		shared_data shared_data_random;
+
+		// Create a few threads where each node sends out telemetry request messages to all other nodes continuously, until the cache is reached and subsequently expires.
+		// The test waits until all telemetry_ack messages have been received.
+		for (int i = 0; i < num_threads; ++i)
+		{
+			threads.emplace_back ([&node_data_single, &node_data_random, &shared_data_single, &shared_data_random]() {
+				auto func = [](auto & all_node_data_a, shared_data & shared_data_a, bool single_a) {
+					while (std::any_of (all_node_data_a.cbegin (), all_node_data_a.cend (), [](auto const & data) { return data.keep_requesting_metrics.load (); }))
+					{
+						for (auto & data : all_node_data_a)
+						{
+							// Keep calling get_metrics_async until the cache has been saved and then becomes outdated (after a certain period of time) for each node
+							if (data.keep_requesting_metrics)
+							{
+								++shared_data_a.count;
+
+								if (single_a)
+								{
+									// Pick the first peer to be consistent
+									auto peer = data.node->network.tcp_channels.channels[0].channel;
+									data.node->telemetry.get_metrics_single_peer_async (peer, [&shared_data_a, &data, &all_node_data_a](nano::telemetry_data_response const & telemetry_data_response_a) {
+										callback_process (shared_data_a, data, all_node_data_a, telemetry_data_response_a.telemetry_data_time_pair.last_updated);
+									});
+								}
+								else
+								{
+									data.node->telemetry.get_metrics_peers_async ([&shared_data_a, &data, &all_node_data_a](nano::telemetry_data_responses const & telemetry_data_responses_a) {
+										callback_process (shared_data_a, data, all_node_data_a, telemetry_data_responses_a.telemetry_data_time_pairs.begin ()->second.last_updated);
+									});
+								}
+							}
+							std::this_thread::sleep_for (1ms);
+						}
+					}
+
+					shared_data_a.shared_future.wait ();
+					ASSERT_EQ (shared_data_a.count, 0);
+				};
+
+				func (node_data_single, shared_data_single, true);
+				func (node_data_random, shared_data_random, false);
+			});
+		}
+
+		system.deadline_set (30s);
+		while (!shared_data_random.done || !shared_data_single.done)
+		{
+			ASSERT_NO_ERROR (system.poll ());
+		}
+
+		for (auto & thread : threads)
+		{
+			thread.join ();
+		}
+	}
+}
+}