Skip to content

Commit

Permalink
Do not query new channels on network thread in rep_crawler
Browse files Browse the repository at this point in the history
  • Loading branch information
pwojcikdev committed Oct 27, 2024
1 parent 251d3e7 commit 3544eeb
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 16 deletions.
32 changes: 22 additions & 10 deletions nano/node/repcrawler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@ nano::rep_crawler::rep_crawler (nano::rep_crawler_config const & config_a, nano:
network_constants{ node_a.network_params.network },
active{ node_a.active }
{
if (!node.flags.disable_rep_crawler)
{
node.observers.endpoint.add ([this] (std::shared_ptr<nano::transport::channel> const & channel) {
query (channel);
});
}
node.observers.endpoint.add ([this] (std::shared_ptr<nano::transport::channel> const & channel) {
if (!node.flags.disable_rep_crawler)
{
nano::lock_guard<nano::mutex> lock{ mutex };
prioritized.push_back (channel);
}
});
}

nano::rep_crawler::~rep_crawler ()
Expand Down Expand Up @@ -160,7 +161,7 @@ void nano::rep_crawler::run ()
lock.lock ();

condition.wait_for (lock, query_interval (sufficient_weight), [this, sufficient_weight] {
return stopped || query_predicate (sufficient_weight) || !responses.empty ();
return stopped || query_predicate (sufficient_weight) || !responses.empty () || !prioritized.empty ();
});

if (stopped)
Expand All @@ -179,6 +180,16 @@ void nano::rep_crawler::run ()

cleanup ();

if (!prioritized.empty ())
{
decltype (prioritized) prioritized_l;
prioritized_l.swap (prioritized);

lock.unlock ();
query (prioritized_l);
lock.lock ();
}

if (query_predicate (sufficient_weight))
{
last_query = std::chrono::steady_clock::now ();
Expand Down Expand Up @@ -229,7 +240,7 @@ void nano::rep_crawler::cleanup ()
});
}

std::vector<std::shared_ptr<nano::transport::channel>> nano::rep_crawler::prepare_crawl_targets (bool sufficient_weight) const
std::deque<std::shared_ptr<nano::transport::channel>> nano::rep_crawler::prepare_crawl_targets (bool sufficient_weight) const
{
debug_assert (!mutex.try_lock ());

Expand Down Expand Up @@ -330,7 +341,7 @@ bool nano::rep_crawler::track_rep_request (hash_root_t hash_root, std::shared_pt
return true;
}

void nano::rep_crawler::query (std::vector<std::shared_ptr<nano::transport::channel>> const & target_channels)
void nano::rep_crawler::query (std::deque<std::shared_ptr<nano::transport::channel>> const & target_channels)
{
auto maybe_hash_root = prepare_query_target ();
if (!maybe_hash_root)
Expand Down Expand Up @@ -376,7 +387,7 @@ void nano::rep_crawler::query (std::vector<std::shared_ptr<nano::transport::chan

void nano::rep_crawler::query (std::shared_ptr<nano::transport::channel> const & target_channel)
{
query (std::vector{ target_channel });
query (std::deque{ target_channel });
}

bool nano::rep_crawler::is_pr (std::shared_ptr<nano::transport::channel> const & channel) const
Expand Down Expand Up @@ -452,6 +463,7 @@ std::vector<nano::representative> nano::rep_crawler::representatives (std::size_
}

std::vector<nano::representative> result;
result.reserve (ordered.size ());
for (auto i = ordered.begin (), n = ordered.end (); i != n && result.size () < count; ++i)
{
auto const & [weight, rep] = *i;
Expand Down
15 changes: 9 additions & 6 deletions nano/node/repcrawler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class rep_crawler final
bool process (std::shared_ptr<nano::vote> const &, std::shared_ptr<nano::transport::channel> const &);

/** Attempt to determine if the peer manages one or more representative accounts */
void query (std::vector<std::shared_ptr<nano::transport::channel>> const & target_channels);
void query (std::deque<std::shared_ptr<nano::transport::channel>> const & target_channels);

/** Attempt to determine if the peer manages one or more representative accounts */
void query (std::shared_ptr<nano::transport::channel> const & target_channel);
Expand All @@ -74,12 +74,10 @@ class rep_crawler final
/** Get total available weight from representatives */
nano::uint128_t total_weight () const;

/** Request a list of the top \p count known representatives in descending order of weight, with at least \p weight_a voting weight, and optionally with a minimum version \p minimum_protocol_version
*/
/** Request a list of the top \p count known representatives in descending order of weight, with at least \p weight_a voting weight, and optionally with a minimum version \p minimum_protocol_version */
std::vector<representative> representatives (std::size_t count = std::numeric_limits<std::size_t>::max (), nano::uint128_t minimum_weight = 0, std::optional<decltype (nano::network_constants::protocol_version)> const & minimum_protocol_version = {}) const;

/** Request a list of the top \p count known principal representatives in descending order of weight, optionally with a minimum version \p minimum_protocol_version
*/
/** Request a list of the top \p count known principal representatives in descending order of weight, optionally with a minimum version \p minimum_protocol_version */
std::vector<representative> principal_representatives (std::size_t count = std::numeric_limits<std::size_t>::max (), std::optional<decltype (nano::network_constants::protocol_version)> const & minimum_protocol_version = {}) const;

/** Total number of representatives */
Expand All @@ -105,7 +103,8 @@ class rep_crawler final
using hash_root_t = std::pair<nano::block_hash, nano::root>;

/** Returns a list of endpoints to crawl. The total weight is passed in to avoid computing it twice. */
std::vector<std::shared_ptr<nano::transport::channel>> prepare_crawl_targets (bool sufficient_weight) const;
std::deque<std::shared_ptr<nano::transport::channel>> prepare_crawl_targets (bool sufficient_weight) const;
/** Return a random ledger block to query */
std::optional<hash_root_t> prepare_query_target ();
bool track_rep_request (hash_root_t hash_root, std::shared_ptr<nano::transport::channel> const & channel);

Expand Down Expand Up @@ -172,9 +171,13 @@ class rep_crawler final

private:
static size_t constexpr max_responses{ 1024 * 4 };

using response_t = std::pair<std::shared_ptr<nano::transport::channel>, std::shared_ptr<nano::vote>>;
boost::circular_buffer<response_t> responses{ max_responses };

// Freshly established connections that should be queried asap
std::deque<std::shared_ptr<nano::transport::channel>> prioritized;

std::chrono::steady_clock::time_point last_query{};

std::atomic<bool> stopped{ false };
Expand Down

0 comments on commit 3544eeb

Please sign in to comment.