Skip to content

Commit

Permalink
Merge pull request #4642 from pwojcikdev/cemented-observers-optimize
Browse files Browse the repository at this point in the history
Optimize cemented callbacks
  • Loading branch information
pwojcikdev authored Jun 18, 2024
2 parents f874eae + ea84af9 commit 70148f2
Show file tree
Hide file tree
Showing 17 changed files with 286 additions and 104 deletions.
6 changes: 3 additions & 3 deletions nano/core_test/active_elections.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ TEST (active_elections, dropped_cleanup)
ASSERT_FALSE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ()));

// An election was recently dropped
ASSERT_EQ (1, node.stats.count (nano::stat::type::active_dropped, nano::stat::detail::manual));
ASSERT_EQ (1, node.stats.count (nano::stat::type::active_elections_dropped, nano::stat::detail::manual));

// Block cleared from active
ASSERT_FALSE (node.vote_router.active (hash));
Expand All @@ -684,7 +684,7 @@ TEST (active_elections, dropped_cleanup)
ASSERT_TRUE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ()));

// Not dropped
ASSERT_EQ (1, node.stats.count (nano::stat::type::active_dropped, nano::stat::detail::manual));
ASSERT_EQ (1, node.stats.count (nano::stat::type::active_elections_dropped, nano::stat::detail::manual));

// Block cleared from active
ASSERT_FALSE (node.vote_router.active (hash));
Expand Down Expand Up @@ -1387,7 +1387,7 @@ TEST (active_elections, limit_vote_hinted_elections)
ASSERT_TIMELY (5s, nano::test::active (node, { open1 }));

// Ensure there was no overflow of elections
ASSERT_EQ (0, node.stats.count (nano::stat::type::active_dropped, nano::stat::detail::priority));
ASSERT_EQ (0, node.stats.count (nano::stat::type::active_elections_dropped, nano::stat::detail::priority));
}

/*
Expand Down
8 changes: 4 additions & 4 deletions nano/core_test/confirming_set.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ using namespace std::chrono_literals;
TEST (confirming_set, construction)
{
auto ctx = nano::test::context::ledger_empty ();
nano::confirming_set confirming_set (ctx.ledger ());
nano::confirming_set confirming_set (ctx.ledger (), ctx.stats ());
}

TEST (confirming_set, add_exists)
{
auto ctx = nano::test::context::ledger_send_receive ();
nano::confirming_set confirming_set (ctx.ledger ());
nano::confirming_set confirming_set (ctx.ledger (), ctx.stats ());
auto send = ctx.blocks ()[0];
confirming_set.add (send->hash ());
ASSERT_TRUE (confirming_set.exists (send->hash ()));
Expand All @@ -34,7 +34,7 @@ TEST (confirming_set, add_exists)
TEST (confirming_set, process_one)
{
auto ctx = nano::test::context::ledger_send_receive ();
nano::confirming_set confirming_set (ctx.ledger ());
nano::confirming_set confirming_set (ctx.ledger (), ctx.stats ());
std::atomic<int> count = 0;
std::mutex mutex;
std::condition_variable condition;
Expand All @@ -50,7 +50,7 @@ TEST (confirming_set, process_one)
TEST (confirming_set, process_multiple)
{
auto ctx = nano::test::context::ledger_send_receive ();
nano::confirming_set confirming_set (ctx.ledger ());
nano::confirming_set confirming_set (ctx.ledger (), ctx.stats ());
std::atomic<int> count = 0;
std::mutex mutex;
std::condition_variable condition;
Expand Down
1 change: 1 addition & 0 deletions nano/lib/logging_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ enum class detail
// active_elections
active_started,
active_stopped,
active_cemented,

// election
election_confirmed,
Expand Down
37 changes: 33 additions & 4 deletions nano/lib/stats_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ enum class type
socket,
confirmation_height,
confirmation_observer,
confirming_set,
drop,
aggregator,
requests,
Expand All @@ -60,10 +61,12 @@ enum class type
bootstrap_server_response,
active,
active_elections,
active_started,
active_confirmed,
active_dropped,
active_timeout,
active_elections_started,
active_elections_stopped,
active_elections_confirmed,
active_elections_dropped,
active_elections_timeout,
active_elections_cemented,
backlog,
unchecked,
election_scheduler,
Expand Down Expand Up @@ -114,6 +117,11 @@ enum class detail
rebroadcast,
queue_overflow,
triggered,
notify,
duplicate,
confirmed,
unconfirmed,
cemented,

// processing queue
queue,
Expand Down Expand Up @@ -373,6 +381,10 @@ enum class detail
insert,
insert_failed,

// active_elections
started,
stopped,

// unchecked
put,
satisfied,
Expand Down Expand Up @@ -440,6 +452,23 @@ enum class detail
tier_2,
tier_3,

// confirming_set
notify_cemented,
notify_already_cemented,
already_cemented,

// election_state
passive,
active,
expired_confirmed,
expired_unconfirmed,

// election_status_type
ongoing,
active_confirmed_quorum,
active_confirmation_height,
inactive_confirmation_height,

_last // Must be the last enum
};

Expand Down
4 changes: 2 additions & 2 deletions nano/lib/thread_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ void nano::thread_pool::set_thread_names (nano::thread_role::name thread_name)
thread_names_latch.wait ();
}

std::unique_ptr<nano::container_info_component> nano::collect_container_info (thread_pool & thread_pool, std::string const & name)
std::unique_ptr<nano::container_info_component> nano::thread_pool::collect_container_info (std::string const & name) const
{
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "count", thread_pool.num_queued_tasks (), sizeof (std::function<void ()>) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "count", num_queued_tasks (), sizeof (std::function<void ()>) }));
return composite;
}
6 changes: 3 additions & 3 deletions nano/lib/thread_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ namespace nano
class thread_pool final
{
public:
explicit thread_pool (unsigned, nano::thread_role::name);
explicit thread_pool (unsigned num_threads, nano::thread_role::name);
~thread_pool ();

/** This will run when there is an available thread for execution */
Expand All @@ -37,6 +37,8 @@ class thread_pool final
/** Returns the number of tasks which are awaiting execution by the thread pool **/
uint64_t num_queued_tasks () const;

std::unique_ptr<nano::container_info_component> collect_container_info (std::string const & name) const;

private:
nano::mutex mutex;
std::atomic<bool> stopped{ false };
Expand All @@ -48,6 +50,4 @@ class thread_pool final
std::latch thread_names_latch;
void set_thread_names (nano::thread_role::name thread_name);
};

std::unique_ptr<nano::container_info_component> collect_container_info (thread_pool & thread_pool, std::string const & name);
} // namespace nano
9 changes: 9 additions & 0 deletions nano/lib/thread_roles.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,21 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::confirmation_height_processing:
thread_role_name_string = "Conf height";
break;
case nano::thread_role::name::confirmation_height_notifications:
thread_role_name_string = "Conf notif";
break;
case nano::thread_role::name::worker:
thread_role_name_string = "Worker";
break;
case nano::thread_role::name::bootstrap_worker:
thread_role_name_string = "Bootstrap work";
break;
case nano::thread_role::name::wallet_worker:
thread_role_name_string = "Wallet work";
break;
case nano::thread_role::name::election_worker:
thread_role_name_string = "Election work";
break;
case nano::thread_role::name::request_aggregator:
thread_role_name_string = "Req aggregator";
break;
Expand Down
3 changes: 3 additions & 0 deletions nano/lib/thread_roles.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@ enum class name
rpc_request_processor,
rpc_process_container,
confirmation_height_processing,
confirmation_height_notifications,
worker,
bootstrap_worker,
wallet_worker,
election_worker,
request_aggregator,
state_block_signature_verification,
epoch_upgrader,
Expand Down
95 changes: 68 additions & 27 deletions nano/node/active_elections.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <nano/lib/blocks.hpp>
#include <nano/lib/enum_util.hpp>
#include <nano/lib/numbers.hpp>
#include <nano/lib/threading.hpp>
#include <nano/node/active_elections.hpp>
Expand All @@ -18,25 +19,31 @@

using namespace std::chrono;

nano::active_elections::active_elections (nano::node & node_a, nano::confirming_set & confirming_set, nano::block_processor & block_processor_a) :
nano::active_elections::active_elections (nano::node & node_a, nano::confirming_set & confirming_set_a, nano::block_processor & block_processor_a) :
config{ node_a.config.active_elections },
node{ node_a },
confirming_set{ confirming_set },
confirming_set{ confirming_set_a },
block_processor{ block_processor_a },
recently_confirmed{ config.confirmation_cache },
recently_cemented{ config.confirmation_history_size },
election_time_to_live{ node_a.network_params.network.is_dev_network () ? 0s : 2s }
{
count_by_behavior.fill (0); // Zero initialize array

// Register a callback which will get called after a block is cemented
confirming_set.cemented_observers.add ([this] (std::shared_ptr<nano::block> const & callback_block_a) {
this->block_cemented_callback (callback_block_a);
});
confirming_set.batch_cemented.add ([this] (nano::confirming_set::cemented_notification const & notification) {
{
auto transaction = node.ledger.tx_begin_read ();
for (auto const & [block, confirmation_root] : notification.cemented)
{
transaction.refresh_if_needed ();

// Register a callback which will get called if a block is already cemented
confirming_set.block_already_cemented_observers.add ([this] (nano::block_hash const & hash_a) {
this->block_already_cemented_callback (hash_a);
block_cemented_callback (transaction, block, confirmation_root);
}
}
for (auto const & hash : notification.already_cemented)
{
block_already_cemented_callback (hash);
}
});

// Notify elections about alternative (forked) blocks
Expand Down Expand Up @@ -84,9 +91,10 @@ void nano::active_elections::stop ()
clear ();
}

void nano::active_elections::block_cemented_callback (std::shared_ptr<nano::block> const & block)
void nano::active_elections::block_cemented_callback (nano::secure::transaction const & transaction, std::shared_ptr<nano::block> const & block, nano::block_hash const & confirmation_root)
{
debug_assert (node.block_confirmed (block->hash ()));

if (auto election_l = election (block->qualified_root ()))
{
election_l->try_confirm (block->hash ());
Expand All @@ -100,7 +108,7 @@ void nano::active_elections::block_cemented_callback (std::shared_ptr<nano::bloc
status = election->get_status ();
votes = election->votes_with_weight ();
}
if (confirming_set.exists (block->hash ()))
if (block->hash () == confirmation_root)
{
status.type = nano::election_status_type::active_confirmed_quorum;
}
Expand All @@ -113,8 +121,14 @@ void nano::active_elections::block_cemented_callback (std::shared_ptr<nano::bloc
status.type = nano::election_status_type::inactive_confirmation_height;
}
recently_cemented.put (status);
auto transaction = node.ledger.tx_begin_read ();

node.stats.inc (nano::stat::type::active_elections, nano::stat::detail::cemented);
node.stats.inc (nano::stat::type::active_elections_cemented, to_stat_detail (status.type));

node.logger.trace (nano::log::type::active_elections, nano::log::detail::active_cemented, nano::log::arg{ "election", election });

notify_observers (transaction, status, votes);

bool cemented_bootstrap_count_reached = node.ledger.cemented_count () >= node.ledger.bootstrap_weight_max_blocks;
bool was_active = status.type == nano::election_status_type::active_confirmed_quorum || status.type == nano::election_status_type::active_confirmation_height;

Expand Down Expand Up @@ -296,7 +310,11 @@ void nano::active_elections::cleanup_election (nano::unique_lock<nano::mutex> &

roots.get<tag_root> ().erase (roots.get<tag_root> ().find (election->qualified_root));

node.stats.inc (completion_type (*election), to_stat_detail (election->behavior ()));
node.stats.inc (nano::stat::type::active_elections, nano::stat::detail::stopped);
node.stats.inc (nano::stat::type::active_elections, election->confirmed () ? nano::stat::detail::confirmed : nano::stat::detail::unconfirmed);
node.stats.inc (nano::stat::type::active_elections_stopped, to_stat_detail (election->state ()));
node.stats.inc (to_stat_type (election->state ()), to_stat_detail (election->behavior ()));

node.logger.trace (nano::log::type::active_elections, nano::log::detail::active_stopped, nano::log::arg{ "election", election });

node.logger.debug (nano::log::type::active_elections, "Erased election for blocks: {} (behavior: {}, state: {})",
Expand Down Expand Up @@ -326,19 +344,6 @@ void nano::active_elections::cleanup_election (nano::unique_lock<nano::mutex> &
}
}

nano::stat::type nano::active_elections::completion_type (nano::election const & election) const
{
if (election.confirmed ())
{
return nano::stat::type::active_confirmed;
}
if (election.failed ())
{
return nano::stat::type::active_timeout;
}
return nano::stat::type::active_dropped;
}

std::vector<std::shared_ptr<nano::election>> nano::active_elections::list_active (std::size_t max_a)
{
nano::lock_guard<nano::mutex> guard{ mutex };
Expand Down Expand Up @@ -415,7 +420,9 @@ nano::election_insertion_result nano::active_elections::insert (std::shared_ptr<
debug_assert (count_by_behavior[result.election->behavior ()] >= 0);
count_by_behavior[result.election->behavior ()]++;

node.stats.inc (nano::stat::type::active_started, to_stat_detail (election_behavior_a));
node.stats.inc (nano::stat::type::active_elections, nano::stat::detail::started);
node.stats.inc (nano::stat::type::active_elections_started, to_stat_detail (election_behavior_a));

node.logger.trace (nano::log::type::active_elections, nano::log::detail::active_started,
nano::log::arg{ "behavior", election_behavior_a },
nano::log::arg{ "election", result.election });
Expand Down Expand Up @@ -592,3 +599,37 @@ nano::error nano::active_elections_config::deserialize (nano::tomlconfig & toml)

return toml.get_error ();
}

/*
*
*/

nano::stat::type nano::to_stat_type (nano::election_state state)
{
switch (state)
{
case election_state::passive:
case election_state::active:
return nano::stat::type::active_elections_dropped;
break;
case election_state::confirmed:
case election_state::expired_confirmed:
return nano::stat::type::active_elections_confirmed;
break;
case election_state::expired_unconfirmed:
return nano::stat::type::active_elections_timeout;
break;
}
debug_assert (false);
return {};
}

nano::stat::detail nano::to_stat_detail (nano::election_state state)
{
return nano::enum_util::cast<nano::stat::detail> (state);
}

nano::stat::detail nano::to_stat_detail (nano::election_status_type type)
{
return nano::enum_util::cast<nano::stat::detail> (type);
}
Loading

0 comments on commit 70148f2

Please sign in to comment.