Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Poll all nodes, remove outliers, ddos protection & amend RPC response with endpoint #2521

Merged
merged 15 commits into from
Feb 10, 2020
Merged
591 changes: 349 additions & 242 deletions nano/core_test/node_telemetry.cpp

Large diffs are not rendered by default.

15 changes: 14 additions & 1 deletion nano/lib/utility.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,21 @@ std::unique_ptr<container_info_component> collect_container_info (observer_set<T

void remove_all_files_in_dir (boost::filesystem::path const & dir);
void move_all_files_to_dir (boost::filesystem::path const & from, boost::filesystem::path const & to);

template <class InputIt, class OutputIt, class Pred, class Func>
void transform_if (InputIt first, InputIt last, OutputIt dest, Pred pred, Func transform)
{
while (first != last)
{
if (pred (*first))
{
*dest++ = transform (*first);
}

++first;
}
}
}
// Have our own async_write which we must use?

void release_assert_internal (bool check, const char * check_expr, const char * file, unsigned int line);
#define release_assert(check) release_assert_internal (check, #check, __FILE__, __LINE__)
10 changes: 8 additions & 2 deletions nano/node/bootstrap/bootstrap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,10 +239,16 @@ void nano::bootstrap_server::receive_header_action (boost::system::error_code co
}
case nano::message_type::telemetry_req:
{
node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::telemetry_req, nano::stat::dir::in);
if (is_realtime_connection ())
{
add_request (std::make_unique<nano::telemetry_req> (header));
// Only handle telemetry requests if they are outside of the cutoff time
auto is_very_first_message = last_telemetry_req == std::chrono::steady_clock::time_point{};
auto cache_exceeded = std::chrono::steady_clock::now () >= last_telemetry_req + nano::telemetry_cache_cutoffs::network_to_time (node->network_params.network);
if (is_very_first_message || cache_exceeded)
{
last_telemetry_req = std::chrono::steady_clock::now ();
add_request (std::make_unique<nano::telemetry_req> (header));
}
}
receive ();
break;
Expand Down
1 change: 1 addition & 0 deletions nano/node/bootstrap/bootstrap_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,5 +75,6 @@ class bootstrap_server final : public std::enable_shared_from_this<nano::bootstr
// Remote enpoint used to remove response channel even after socket closing
nano::tcp_endpoint remote_endpoint{ boost::asio::ip::address_v6::any (), 0 };
nano::account remote_node_id{ 0 };
std::chrono::steady_clock::time_point last_telemetry_req{ std::chrono::steady_clock::time_point () };
};
}
162 changes: 32 additions & 130 deletions nano/node/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,19 @@
#include <nano/node/wallet.hpp>
#include <nano/secure/buffer.hpp>

#include <boost/algorithm/string.hpp>
#include <boost/endian/conversion.hpp>
#include <boost/pool/pool_alloc.hpp>
#include <boost/variant/get.hpp>

#include <numeric>

std::bitset<16> constexpr nano::message_header::block_type_mask;
std::bitset<16> constexpr nano::message_header::count_mask;

std::chrono::seconds constexpr nano::telemetry_cache_cutoffs::test;
std::chrono::seconds constexpr nano::telemetry_cache_cutoffs::beta;
std::chrono::seconds constexpr nano::telemetry_cache_cutoffs::live;

namespace
{
nano::protocol_constants const & get_protocol_constants ()
Expand Down Expand Up @@ -1176,135 +1181,6 @@ bool nano::telemetry_ack::is_empty_payload () const
return size () == 0;
}

nano::telemetry_data nano::telemetry_data::consolidate (std::vector<nano::telemetry_data> const & telemetry_data_responses_a)
{
if (telemetry_data_responses_a.empty ())
{
return {};
}
else if (telemetry_data_responses_a.size () == 1)
{
// Only 1 element in the collection, so just return it.
return telemetry_data_responses_a.front ();
}

nano::uint128_t account_sum{ 0 };
nano::uint128_t block_sum{ 0 };
nano::uint128_t cemented_sum{ 0 };
nano::uint128_t peer_sum{ 0 };
nano::uint128_t unchecked_sum{ 0 };
nano::uint128_t uptime_sum{ 0 };
nano::uint128_t bandwidth_sum{ 0 };

std::unordered_map<uint8_t, int> protocol_versions;
std::unordered_map<std::string, int> vendor_versions;
std::unordered_map<uint64_t, int> bandwidth_caps;
std::unordered_map<nano::block_hash, int> genesis_blocks;

nano::uint128_t account_average{ 0 };

for (auto const & telemetry_data : telemetry_data_responses_a)
{
account_sum += telemetry_data.account_count;
block_sum += telemetry_data.block_count;
cemented_sum += telemetry_data.cemented_count;

std::ostringstream ss;
ss << telemetry_data.major_version;
if (telemetry_data.minor_version.is_initialized ())
{
ss << "." << *telemetry_data.minor_version;
if (telemetry_data.patch_version.is_initialized ())
{
ss << "." << *telemetry_data.patch_version;
if (telemetry_data.pre_release_version.is_initialized ())
{
ss << "." << *telemetry_data.pre_release_version;
if (telemetry_data.maker.is_initialized ())
{
ss << "." << *telemetry_data.maker;
}
}
}
}

++vendor_versions[ss.str ()];
++protocol_versions[telemetry_data.protocol_version];
peer_sum += telemetry_data.peer_count;

// 0 has a special meaning (unlimited), don't include it in the average as it will be heavily skewed
if (telemetry_data.bandwidth_cap != 0)
{
bandwidth_sum += telemetry_data.bandwidth_cap;
}
++bandwidth_caps[telemetry_data.bandwidth_cap];
unchecked_sum += telemetry_data.unchecked_count;
uptime_sum += telemetry_data.uptime;
++genesis_blocks[telemetry_data.genesis_block];
}

nano::telemetry_data consolidated_data;
auto size = telemetry_data_responses_a.size ();
consolidated_data.account_count = boost::numeric_cast<decltype (account_count)> (account_sum / size);
consolidated_data.block_count = boost::numeric_cast<decltype (block_count)> (block_sum / size);
consolidated_data.cemented_count = boost::numeric_cast<decltype (cemented_count)> (cemented_sum / size);
consolidated_data.peer_count = boost::numeric_cast<decltype (peer_count)> (peer_sum / size);
consolidated_data.uptime = boost::numeric_cast<decltype (uptime)> (uptime_sum / size);
consolidated_data.unchecked_count = boost::numeric_cast<decltype (unchecked_count)> (unchecked_sum / size);

auto set_mode_or_average = [](auto const & collection, auto & var, auto const & sum, size_t size) {
auto max = std::max_element (collection.begin (), collection.end (), [](auto const & lhs, auto const & rhs) {
return lhs.second < rhs.second;
});
if (max->second > 1)
{
var = max->first;
}
else
{
var = (sum / size).template convert_to<std::remove_reference_t<decltype (var)>> ();
}
};

auto set_mode = [](auto const & collection, auto & var, size_t size) {
auto max = std::max_element (collection.begin (), collection.end (), [](auto const & lhs, auto const & rhs) {
return lhs.second < rhs.second;
});
if (max->second > 1)
{
var = max->first;
}
else
{
// Just pick the first one
var = collection.begin ()->first;
}
};

// Use the mode of protocol version, vendor version and bandwidth cap if there is 2 or more using it
set_mode_or_average (bandwidth_caps, consolidated_data.bandwidth_cap, bandwidth_sum, size);
set_mode (protocol_versions, consolidated_data.protocol_version, size);
set_mode (genesis_blocks, consolidated_data.genesis_block, size);

// Vendor version, needs to be parsed out of the string
std::string version;
set_mode (vendor_versions, version, size);

// May only have major version, but check for optional parameters as well, only output if all are used
std::vector<std::string> version_fragments;
boost::split (version_fragments, version, boost::is_any_of ("."));
assert (!version_fragments.empty () && version_fragments.size () <= 5);
consolidated_data.major_version = boost::lexical_cast<uint8_t> (version_fragments.front ());
if (version_fragments.size () == 5)
{
consolidated_data.minor_version = boost::lexical_cast<uint8_t> (version_fragments[1]);
consolidated_data.patch_version = boost::lexical_cast<uint8_t> (version_fragments[2]);
consolidated_data.pre_release_version = boost::lexical_cast<uint8_t> (version_fragments[3]);
consolidated_data.maker = boost::lexical_cast<uint8_t> (version_fragments[4]);
}
return consolidated_data;
}

nano::error nano::telemetry_data::serialize_json (nano::jsonconfig & json) const
{
json.put ("block_count", block_count);
Expand Down Expand Up @@ -1369,6 +1245,11 @@ bool nano::telemetry_data::operator== (nano::telemetry_data const & data_a) cons
return (block_count == data_a.block_count && cemented_count == data_a.cemented_count && unchecked_count == data_a.unchecked_count && account_count == data_a.account_count && bandwidth_cap == data_a.bandwidth_cap && uptime == data_a.uptime && peer_count == data_a.peer_count && protocol_version == data_a.protocol_version && genesis_block == data_a.genesis_block && major_version == data_a.major_version && minor_version == data_a.minor_version && patch_version == data_a.patch_version && pre_release_version == data_a.pre_release_version && maker == data_a.maker);
}

bool nano::telemetry_data::operator!= (nano::telemetry_data const & data_a) const
{
return !(*this == data_a);
}

nano::node_id_handshake::node_id_handshake (bool & error_a, nano::stream & stream_a, nano::message_header const & header_a) :
message (header_a),
query (boost::none),
Expand Down Expand Up @@ -1484,6 +1365,22 @@ bool nano::parse_port (std::string const & string_a, uint16_t & port_a)
return result;
}

// Can handle both ipv4 & ipv6 addresses (with and without square brackets)
bool nano::parse_address (std::string const & address_text_a, boost::asio::ip::address & address_a)
{
auto result (false);
auto address_text = address_text_a;
if (!address_text.empty () && address_text.front () == '[' && address_text.back () == ']')
{
// Chop the square brackets off as make_address doesn't always like them
address_text = address_text.substr (1, address_text.size () - 2);
}

boost::system::error_code address_ec;
address_a = boost::asio::ip::make_address (address_text, address_ec);
return !!address_ec;
}

bool nano::parse_address_port (std::string const & string, boost::asio::ip::address & address_a, uint16_t & port_a)
{
auto result (false);
Expand Down Expand Up @@ -1550,6 +1447,11 @@ bool nano::parse_tcp_endpoint (std::string const & string, nano::tcp_endpoint &
return result;
}

std::chrono::seconds nano::telemetry_cache_cutoffs::network_to_time (network_constants const & network_constants)
{
return std::chrono::seconds{ network_constants.is_live_network () ? live : network_constants.is_beta_network () ? beta : test };
}

nano::node_singleton_memory_pool_purge_guard::node_singleton_memory_pool_purge_guard () :
cleanup_guard ({ nano::block_memory_pool_purge, nano::purge_singleton_pool_memory<nano::vote>, nano::purge_singleton_pool_memory<nano::election> })
{
Expand Down
13 changes: 12 additions & 1 deletion nano/node/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ namespace nano
{
using endpoint = boost::asio::ip::udp::endpoint;
bool parse_port (std::string const &, uint16_t &);
bool parse_address (std::string const &, boost::asio::ip::address &);
bool parse_address_port (std::string const &, boost::asio::ip::address &, uint16_t &);
using tcp_endpoint = boost::asio::ip::tcp::endpoint;
bool parse_endpoint (std::string const &, nano::endpoint &);
Expand Down Expand Up @@ -348,10 +349,10 @@ class telemetry_data
boost::optional<uint8_t> pre_release_version;
boost::optional<uint8_t> maker; // 0 for NF node

static nano::telemetry_data consolidate (std::vector<nano::telemetry_data> const & telemetry_data_responses);
nano::error serialize_json (nano::jsonconfig & json) const;
nano::error deserialize_json (nano::jsonconfig & json);
bool operator== (nano::telemetry_data const &) const;
bool operator!= (nano::telemetry_data const &) const;

static auto constexpr size_v0 = sizeof (block_count) + sizeof (cemented_count) + sizeof (unchecked_count) + sizeof (account_count) + sizeof (bandwidth_cap) + sizeof (peer_count) + sizeof (protocol_version) + sizeof (uptime) + sizeof (genesis_block) + sizeof (major_version);
static auto constexpr size = size_v0 + sizeof (decltype (minor_version)::value_type) + sizeof (decltype (patch_version)::value_type) + sizeof (decltype (pre_release_version)::value_type) + sizeof (decltype (maker)::value_type);
Expand Down Expand Up @@ -451,6 +452,16 @@ class message_visitor
virtual ~message_visitor ();
};

class telemetry_cache_cutoffs
{
public:
static std::chrono::seconds constexpr test{ 2 };
static std::chrono::seconds constexpr beta{ 15 };
static std::chrono::seconds constexpr live{ 60 };

static std::chrono::seconds network_to_time (network_constants const & network_constants);
};

/** Helper guard which contains all the necessary purge (remove all memory even if used) functions */
class node_singleton_memory_pool_purge_guard
{
Expand Down
32 changes: 20 additions & 12 deletions nano/node/json_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3910,12 +3910,11 @@ void nano::json_handler::telemetry ()
uint16_t port;
if (!nano::parse_port (*port_text, port))
{
boost::system::error_code address_ec;
auto address (boost::asio::ip::make_address_v6 (*address_text, address_ec));
if (!address_ec)
boost::asio::ip::address address;
if (!nano::parse_address (*address_text, address))
{
nano::endpoint endpoint (address, port);
channel = node.network.find_channel (endpoint);
channel = node.network.find_channel (nano::transport::map_endpoint_to_v6 (endpoint));
if (!channel)
{
ec = nano::error_rpc::peer_not_found;
Expand Down Expand Up @@ -3943,13 +3942,13 @@ void nano::json_handler::telemetry ()
if (!single_telemetry_metric_a.error)
{
nano::jsonconfig config_l;
auto err = single_telemetry_metric_a.data.serialize_json (config_l);
auto err = single_telemetry_metric_a.telemetry_data_time_pair.data.serialize_json (config_l);
config_l.put ("timestamp", std::chrono::duration_cast<std::chrono::seconds> (single_telemetry_metric_a.telemetry_data_time_pair.system_last_updated.time_since_epoch ()).count ());
auto const & ptree = config_l.get_tree ();

if (!err)
{
rpc_l->response_l.insert (rpc_l->response_l.begin (), ptree.begin (), ptree.end ());
rpc_l->response_l.put ("cached", single_telemetry_metric_a.is_cached);
}
else
{
Expand All @@ -3975,14 +3974,17 @@ void nano::json_handler::telemetry ()
// setting "raw" to true returns metrics from all nodes requested.
auto raw = request.get_optional<bool> ("raw");
auto output_raw = raw.value_or (false);
node.telemetry.get_metrics_random_peers_async ([rpc_l, output_raw](auto const & batched_telemetry_metrics_a) {
node.telemetry.get_metrics_peers_async ([rpc_l, output_raw](auto const & batched_telemetry_metrics_a) {
if (output_raw)
{
boost::property_tree::ptree metrics;
for (auto & telemetry_metrics : batched_telemetry_metrics_a.data)
for (auto & telemetry_metrics : batched_telemetry_metrics_a.telemetry_data_time_pairs)
{
nano::jsonconfig config_l;
auto err = telemetry_metrics.serialize_json (config_l);
auto err = telemetry_metrics.second.data.serialize_json (config_l);
config_l.put ("timestamp", std::chrono::duration_cast<std::chrono::seconds> (telemetry_metrics.second.system_last_updated.time_since_epoch ()).count ());
config_l.put ("address", telemetry_metrics.first.address ());
config_l.put ("port", telemetry_metrics.first.port ());
if (!err)
{
metrics.push_back (std::make_pair ("", config_l.get_tree ()));
Expand All @@ -3998,8 +4000,15 @@ void nano::json_handler::telemetry ()
else
{
nano::jsonconfig config_l;
auto average_telemetry_metrics = nano::telemetry_data::consolidate (batched_telemetry_metrics_a.data);
auto err = average_telemetry_metrics.serialize_json (config_l);
std::vector<nano::telemetry_data_time_pair> telemetry_data_time_pairs;
telemetry_data_time_pairs.reserve (batched_telemetry_metrics_a.telemetry_data_time_pairs.size ());
std::transform (batched_telemetry_metrics_a.telemetry_data_time_pairs.begin (), batched_telemetry_metrics_a.telemetry_data_time_pairs.end (), std::back_inserter (telemetry_data_time_pairs), [](auto const & telemetry_data_time_pair_a) {
return telemetry_data_time_pair_a.second;
});

auto average_telemetry_metrics = nano::consolidate_telemetry_data_time_pairs (telemetry_data_time_pairs);
auto err = average_telemetry_metrics.data.serialize_json (config_l);
config_l.put ("timestamp", std::chrono::duration_cast<std::chrono::seconds> (average_telemetry_metrics.system_last_updated.time_since_epoch ()).count ());
auto const & ptree = config_l.get_tree ();

if (!err)
Expand All @@ -4012,7 +4021,6 @@ void nano::json_handler::telemetry ()
}
}

rpc_l->response_l.put ("cached", batched_telemetry_metrics_a.is_cached);
rpc_l->response_errors ();
});
}
Expand Down
8 changes: 4 additions & 4 deletions nano/node/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ class network_message_visitor : public nano::message_visitor

telemetry_ack = nano::telemetry_ack (telemetry_data);
}
channel->send (telemetry_ack);
channel->send (telemetry_ack, nullptr, false);
}
void telemetry_ack (nano::telemetry_ack const & message_a) override
{
Expand Down Expand Up @@ -631,11 +631,11 @@ bool nano::network::reachout (nano::endpoint const & endpoint_a, bool allow_loca
return error;
}

std::deque<std::shared_ptr<nano::transport::channel>> nano::network::list (size_t count_a)
std::deque<std::shared_ptr<nano::transport::channel>> nano::network::list (size_t count_a, uint8_t minimum_version_a, bool include_tcp_temporary_channels_a)
{
std::deque<std::shared_ptr<nano::transport::channel>> result;
tcp_channels.list (result);
udp_channels.list (result);
tcp_channels.list (result, minimum_version_a, include_tcp_temporary_channels_a);
udp_channels.list (result, minimum_version_a);
nano::random_pool_shuffle (result.begin (), result.end ());
if (result.size () > count_a)
{
Expand Down
Loading