Skip to content

Commit

Permalink
Poll all nodes, remove outliers, ddos protection & amend RPC response…
Browse files Browse the repository at this point in the history
… with endpoint (#2521)

* Poll all nodes and remove some metrics from bounds when consolidating

* Update out of date comment

* Formatting

* Fix clang build on actions with long std::tuple

* Allow square brackets in ipv6 address in RPC

* Merge with develop

* Fix ASAN issue

* Gui review comments

* Make parse_address accept v4 and v6 ip addresses

* Incorrect order of arguments

* Use new cached genesis hash

* Move last_telemetry_req to bootstrap_server
  • Loading branch information
wezrule authored Feb 10, 2020
1 parent ad0396e commit bd93581
Show file tree
Hide file tree
Showing 17 changed files with 1,188 additions and 533 deletions.
591 changes: 349 additions & 242 deletions nano/core_test/node_telemetry.cpp

Large diffs are not rendered by default.

15 changes: 14 additions & 1 deletion nano/lib/utility.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,21 @@ std::unique_ptr<container_info_component> collect_container_info (observer_set<T

void remove_all_files_in_dir (boost::filesystem::path const & dir);
void move_all_files_to_dir (boost::filesystem::path const & from, boost::filesystem::path const & to);

template <class InputIt, class OutputIt, class Pred, class Func>
void transform_if (InputIt first, InputIt last, OutputIt dest, Pred pred, Func transform)
{
while (first != last)
{
if (pred (*first))
{
*dest++ = transform (*first);
}

++first;
}
}
}
// Have our own async_write which we must use?

void release_assert_internal (bool check, const char * check_expr, const char * file, unsigned int line);
#define release_assert(check) release_assert_internal (check, #check, __FILE__, __LINE__)
10 changes: 8 additions & 2 deletions nano/node/bootstrap/bootstrap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,10 +239,16 @@ void nano::bootstrap_server::receive_header_action (boost::system::error_code co
}
case nano::message_type::telemetry_req:
{
node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::telemetry_req, nano::stat::dir::in);
if (is_realtime_connection ())
{
add_request (std::make_unique<nano::telemetry_req> (header));
// Only handle telemetry requests if they are outside of the cutoff time
auto is_very_first_message = last_telemetry_req == std::chrono::steady_clock::time_point{};
auto cache_exceeded = std::chrono::steady_clock::now () >= last_telemetry_req + nano::telemetry_cache_cutoffs::network_to_time (node->network_params.network);
if (is_very_first_message || cache_exceeded)
{
last_telemetry_req = std::chrono::steady_clock::now ();
add_request (std::make_unique<nano::telemetry_req> (header));
}
}
receive ();
break;
Expand Down
1 change: 1 addition & 0 deletions nano/node/bootstrap/bootstrap_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,5 +75,6 @@ class bootstrap_server final : public std::enable_shared_from_this<nano::bootstr
// Remote enpoint used to remove response channel even after socket closing
nano::tcp_endpoint remote_endpoint{ boost::asio::ip::address_v6::any (), 0 };
nano::account remote_node_id{ 0 };
std::chrono::steady_clock::time_point last_telemetry_req{ std::chrono::steady_clock::time_point () };
};
}
162 changes: 32 additions & 130 deletions nano/node/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,19 @@
#include <nano/node/wallet.hpp>
#include <nano/secure/buffer.hpp>

#include <boost/algorithm/string.hpp>
#include <boost/endian/conversion.hpp>
#include <boost/pool/pool_alloc.hpp>
#include <boost/variant/get.hpp>

#include <numeric>

std::bitset<16> constexpr nano::message_header::block_type_mask;
std::bitset<16> constexpr nano::message_header::count_mask;

std::chrono::seconds constexpr nano::telemetry_cache_cutoffs::test;
std::chrono::seconds constexpr nano::telemetry_cache_cutoffs::beta;
std::chrono::seconds constexpr nano::telemetry_cache_cutoffs::live;

namespace
{
nano::protocol_constants const & get_protocol_constants ()
Expand Down Expand Up @@ -1176,135 +1181,6 @@ bool nano::telemetry_ack::is_empty_payload () const
return size () == 0;
}

nano::telemetry_data nano::telemetry_data::consolidate (std::vector<nano::telemetry_data> const & telemetry_data_responses_a)
{
if (telemetry_data_responses_a.empty ())
{
return {};
}
else if (telemetry_data_responses_a.size () == 1)
{
// Only 1 element in the collection, so just return it.
return telemetry_data_responses_a.front ();
}

nano::uint128_t account_sum{ 0 };
nano::uint128_t block_sum{ 0 };
nano::uint128_t cemented_sum{ 0 };
nano::uint128_t peer_sum{ 0 };
nano::uint128_t unchecked_sum{ 0 };
nano::uint128_t uptime_sum{ 0 };
nano::uint128_t bandwidth_sum{ 0 };

std::unordered_map<uint8_t, int> protocol_versions;
std::unordered_map<std::string, int> vendor_versions;
std::unordered_map<uint64_t, int> bandwidth_caps;
std::unordered_map<nano::block_hash, int> genesis_blocks;

nano::uint128_t account_average{ 0 };

for (auto const & telemetry_data : telemetry_data_responses_a)
{
account_sum += telemetry_data.account_count;
block_sum += telemetry_data.block_count;
cemented_sum += telemetry_data.cemented_count;

std::ostringstream ss;
ss << telemetry_data.major_version;
if (telemetry_data.minor_version.is_initialized ())
{
ss << "." << *telemetry_data.minor_version;
if (telemetry_data.patch_version.is_initialized ())
{
ss << "." << *telemetry_data.patch_version;
if (telemetry_data.pre_release_version.is_initialized ())
{
ss << "." << *telemetry_data.pre_release_version;
if (telemetry_data.maker.is_initialized ())
{
ss << "." << *telemetry_data.maker;
}
}
}
}

++vendor_versions[ss.str ()];
++protocol_versions[telemetry_data.protocol_version];
peer_sum += telemetry_data.peer_count;

// 0 has a special meaning (unlimited), don't include it in the average as it will be heavily skewed
if (telemetry_data.bandwidth_cap != 0)
{
bandwidth_sum += telemetry_data.bandwidth_cap;
}
++bandwidth_caps[telemetry_data.bandwidth_cap];
unchecked_sum += telemetry_data.unchecked_count;
uptime_sum += telemetry_data.uptime;
++genesis_blocks[telemetry_data.genesis_block];
}

nano::telemetry_data consolidated_data;
auto size = telemetry_data_responses_a.size ();
consolidated_data.account_count = boost::numeric_cast<decltype (account_count)> (account_sum / size);
consolidated_data.block_count = boost::numeric_cast<decltype (block_count)> (block_sum / size);
consolidated_data.cemented_count = boost::numeric_cast<decltype (cemented_count)> (cemented_sum / size);
consolidated_data.peer_count = boost::numeric_cast<decltype (peer_count)> (peer_sum / size);
consolidated_data.uptime = boost::numeric_cast<decltype (uptime)> (uptime_sum / size);
consolidated_data.unchecked_count = boost::numeric_cast<decltype (unchecked_count)> (unchecked_sum / size);

auto set_mode_or_average = [](auto const & collection, auto & var, auto const & sum, size_t size) {
auto max = std::max_element (collection.begin (), collection.end (), [](auto const & lhs, auto const & rhs) {
return lhs.second < rhs.second;
});
if (max->second > 1)
{
var = max->first;
}
else
{
var = (sum / size).template convert_to<std::remove_reference_t<decltype (var)>> ();
}
};

auto set_mode = [](auto const & collection, auto & var, size_t size) {
auto max = std::max_element (collection.begin (), collection.end (), [](auto const & lhs, auto const & rhs) {
return lhs.second < rhs.second;
});
if (max->second > 1)
{
var = max->first;
}
else
{
// Just pick the first one
var = collection.begin ()->first;
}
};

// Use the mode of protocol version, vendor version and bandwidth cap if there is 2 or more using it
set_mode_or_average (bandwidth_caps, consolidated_data.bandwidth_cap, bandwidth_sum, size);
set_mode (protocol_versions, consolidated_data.protocol_version, size);
set_mode (genesis_blocks, consolidated_data.genesis_block, size);

// Vendor version, needs to be parsed out of the string
std::string version;
set_mode (vendor_versions, version, size);

// May only have major version, but check for optional parameters as well, only output if all are used
std::vector<std::string> version_fragments;
boost::split (version_fragments, version, boost::is_any_of ("."));
assert (!version_fragments.empty () && version_fragments.size () <= 5);
consolidated_data.major_version = boost::lexical_cast<uint8_t> (version_fragments.front ());
if (version_fragments.size () == 5)
{
consolidated_data.minor_version = boost::lexical_cast<uint8_t> (version_fragments[1]);
consolidated_data.patch_version = boost::lexical_cast<uint8_t> (version_fragments[2]);
consolidated_data.pre_release_version = boost::lexical_cast<uint8_t> (version_fragments[3]);
consolidated_data.maker = boost::lexical_cast<uint8_t> (version_fragments[4]);
}
return consolidated_data;
}

nano::error nano::telemetry_data::serialize_json (nano::jsonconfig & json) const
{
json.put ("block_count", block_count);
Expand Down Expand Up @@ -1369,6 +1245,11 @@ bool nano::telemetry_data::operator== (nano::telemetry_data const & data_a) cons
return (block_count == data_a.block_count && cemented_count == data_a.cemented_count && unchecked_count == data_a.unchecked_count && account_count == data_a.account_count && bandwidth_cap == data_a.bandwidth_cap && uptime == data_a.uptime && peer_count == data_a.peer_count && protocol_version == data_a.protocol_version && genesis_block == data_a.genesis_block && major_version == data_a.major_version && minor_version == data_a.minor_version && patch_version == data_a.patch_version && pre_release_version == data_a.pre_release_version && maker == data_a.maker);
}

bool nano::telemetry_data::operator!= (nano::telemetry_data const & data_a) const
{
return !(*this == data_a);
}

nano::node_id_handshake::node_id_handshake (bool & error_a, nano::stream & stream_a, nano::message_header const & header_a) :
message (header_a),
query (boost::none),
Expand Down Expand Up @@ -1484,6 +1365,22 @@ bool nano::parse_port (std::string const & string_a, uint16_t & port_a)
return result;
}

// Can handle both ipv4 & ipv6 addresses (with and without square brackets)
bool nano::parse_address (std::string const & address_text_a, boost::asio::ip::address & address_a)
{
auto result (false);
auto address_text = address_text_a;
if (!address_text.empty () && address_text.front () == '[' && address_text.back () == ']')
{
// Chop the square brackets off as make_address doesn't always like them
address_text = address_text.substr (1, address_text.size () - 2);
}

boost::system::error_code address_ec;
address_a = boost::asio::ip::make_address (address_text, address_ec);
return !!address_ec;
}

bool nano::parse_address_port (std::string const & string, boost::asio::ip::address & address_a, uint16_t & port_a)
{
auto result (false);
Expand Down Expand Up @@ -1550,6 +1447,11 @@ bool nano::parse_tcp_endpoint (std::string const & string, nano::tcp_endpoint &
return result;
}

std::chrono::seconds nano::telemetry_cache_cutoffs::network_to_time (network_constants const & network_constants)
{
return std::chrono::seconds{ network_constants.is_live_network () ? live : network_constants.is_beta_network () ? beta : test };
}

nano::node_singleton_memory_pool_purge_guard::node_singleton_memory_pool_purge_guard () :
cleanup_guard ({ nano::block_memory_pool_purge, nano::purge_singleton_pool_memory<nano::vote>, nano::purge_singleton_pool_memory<nano::election> })
{
Expand Down
13 changes: 12 additions & 1 deletion nano/node/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ namespace nano
{
using endpoint = boost::asio::ip::udp::endpoint;
bool parse_port (std::string const &, uint16_t &);
bool parse_address (std::string const &, boost::asio::ip::address &);
bool parse_address_port (std::string const &, boost::asio::ip::address &, uint16_t &);
using tcp_endpoint = boost::asio::ip::tcp::endpoint;
bool parse_endpoint (std::string const &, nano::endpoint &);
Expand Down Expand Up @@ -348,10 +349,10 @@ class telemetry_data
boost::optional<uint8_t> pre_release_version;
boost::optional<uint8_t> maker; // 0 for NF node

static nano::telemetry_data consolidate (std::vector<nano::telemetry_data> const & telemetry_data_responses);
nano::error serialize_json (nano::jsonconfig & json) const;
nano::error deserialize_json (nano::jsonconfig & json);
bool operator== (nano::telemetry_data const &) const;
bool operator!= (nano::telemetry_data const &) const;

static auto constexpr size_v0 = sizeof (block_count) + sizeof (cemented_count) + sizeof (unchecked_count) + sizeof (account_count) + sizeof (bandwidth_cap) + sizeof (peer_count) + sizeof (protocol_version) + sizeof (uptime) + sizeof (genesis_block) + sizeof (major_version);
static auto constexpr size = size_v0 + sizeof (decltype (minor_version)::value_type) + sizeof (decltype (patch_version)::value_type) + sizeof (decltype (pre_release_version)::value_type) + sizeof (decltype (maker)::value_type);
Expand Down Expand Up @@ -451,6 +452,16 @@ class message_visitor
virtual ~message_visitor ();
};

class telemetry_cache_cutoffs
{
public:
static std::chrono::seconds constexpr test{ 2 };
static std::chrono::seconds constexpr beta{ 15 };
static std::chrono::seconds constexpr live{ 60 };

static std::chrono::seconds network_to_time (network_constants const & network_constants);
};

/** Helper guard which contains all the necessary purge (remove all memory even if used) functions */
class node_singleton_memory_pool_purge_guard
{
Expand Down
32 changes: 20 additions & 12 deletions nano/node/json_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3910,12 +3910,11 @@ void nano::json_handler::telemetry ()
uint16_t port;
if (!nano::parse_port (*port_text, port))
{
boost::system::error_code address_ec;
auto address (boost::asio::ip::make_address_v6 (*address_text, address_ec));
if (!address_ec)
boost::asio::ip::address address;
if (!nano::parse_address (*address_text, address))
{
nano::endpoint endpoint (address, port);
channel = node.network.find_channel (endpoint);
channel = node.network.find_channel (nano::transport::map_endpoint_to_v6 (endpoint));
if (!channel)
{
ec = nano::error_rpc::peer_not_found;
Expand Down Expand Up @@ -3943,13 +3942,13 @@ void nano::json_handler::telemetry ()
if (!single_telemetry_metric_a.error)
{
nano::jsonconfig config_l;
auto err = single_telemetry_metric_a.data.serialize_json (config_l);
auto err = single_telemetry_metric_a.telemetry_data_time_pair.data.serialize_json (config_l);
config_l.put ("timestamp", std::chrono::duration_cast<std::chrono::seconds> (single_telemetry_metric_a.telemetry_data_time_pair.system_last_updated.time_since_epoch ()).count ());
auto const & ptree = config_l.get_tree ();

if (!err)
{
rpc_l->response_l.insert (rpc_l->response_l.begin (), ptree.begin (), ptree.end ());
rpc_l->response_l.put ("cached", single_telemetry_metric_a.is_cached);
}
else
{
Expand All @@ -3975,14 +3974,17 @@ void nano::json_handler::telemetry ()
// setting "raw" to true returns metrics from all nodes requested.
auto raw = request.get_optional<bool> ("raw");
auto output_raw = raw.value_or (false);
node.telemetry.get_metrics_random_peers_async ([rpc_l, output_raw](auto const & batched_telemetry_metrics_a) {
node.telemetry.get_metrics_peers_async ([rpc_l, output_raw](auto const & batched_telemetry_metrics_a) {
if (output_raw)
{
boost::property_tree::ptree metrics;
for (auto & telemetry_metrics : batched_telemetry_metrics_a.data)
for (auto & telemetry_metrics : batched_telemetry_metrics_a.telemetry_data_time_pairs)
{
nano::jsonconfig config_l;
auto err = telemetry_metrics.serialize_json (config_l);
auto err = telemetry_metrics.second.data.serialize_json (config_l);
config_l.put ("timestamp", std::chrono::duration_cast<std::chrono::seconds> (telemetry_metrics.second.system_last_updated.time_since_epoch ()).count ());
config_l.put ("address", telemetry_metrics.first.address ());
config_l.put ("port", telemetry_metrics.first.port ());
if (!err)
{
metrics.push_back (std::make_pair ("", config_l.get_tree ()));
Expand All @@ -3998,8 +4000,15 @@ void nano::json_handler::telemetry ()
else
{
nano::jsonconfig config_l;
auto average_telemetry_metrics = nano::telemetry_data::consolidate (batched_telemetry_metrics_a.data);
auto err = average_telemetry_metrics.serialize_json (config_l);
std::vector<nano::telemetry_data_time_pair> telemetry_data_time_pairs;
telemetry_data_time_pairs.reserve (batched_telemetry_metrics_a.telemetry_data_time_pairs.size ());
std::transform (batched_telemetry_metrics_a.telemetry_data_time_pairs.begin (), batched_telemetry_metrics_a.telemetry_data_time_pairs.end (), std::back_inserter (telemetry_data_time_pairs), [](auto const & telemetry_data_time_pair_a) {
return telemetry_data_time_pair_a.second;
});

auto average_telemetry_metrics = nano::consolidate_telemetry_data_time_pairs (telemetry_data_time_pairs);
auto err = average_telemetry_metrics.data.serialize_json (config_l);
config_l.put ("timestamp", std::chrono::duration_cast<std::chrono::seconds> (average_telemetry_metrics.system_last_updated.time_since_epoch ()).count ());
auto const & ptree = config_l.get_tree ();

if (!err)
Expand All @@ -4012,7 +4021,6 @@ void nano::json_handler::telemetry ()
}
}

rpc_l->response_l.put ("cached", batched_telemetry_metrics_a.is_cached);
rpc_l->response_errors ();
});
}
Expand Down
8 changes: 4 additions & 4 deletions nano/node/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ class network_message_visitor : public nano::message_visitor

telemetry_ack = nano::telemetry_ack (telemetry_data);
}
channel->send (telemetry_ack);
channel->send (telemetry_ack, nullptr, false);
}
void telemetry_ack (nano::telemetry_ack const & message_a) override
{
Expand Down Expand Up @@ -551,11 +551,11 @@ bool nano::network::reachout (nano::endpoint const & endpoint_a, bool allow_loca
return error;
}

std::deque<std::shared_ptr<nano::transport::channel>> nano::network::list (size_t count_a)
std::deque<std::shared_ptr<nano::transport::channel>> nano::network::list (size_t count_a, uint8_t minimum_version_a, bool include_tcp_temporary_channels_a)
{
std::deque<std::shared_ptr<nano::transport::channel>> result;
tcp_channels.list (result);
udp_channels.list (result);
tcp_channels.list (result, minimum_version_a, include_tcp_temporary_channels_a);
udp_channels.list (result, minimum_version_a);
nano::random_pool_shuffle (result.begin (), result.end ());
if (result.size () > count_a)
{
Expand Down
Loading

0 comments on commit bd93581

Please sign in to comment.