diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index 535a533ff3..2102685b84 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include @@ -3456,13 +3457,13 @@ TEST (node, DISABLED_pruning_age) ASSERT_EQ (3, node1.ledger.block_count ()); // Pruning with default age 1 day - node1.ledger_pruning (1, true); + node1.pruning.ledger_pruning (1, true); ASSERT_EQ (0, node1.ledger.pruned_count ()); ASSERT_EQ (3, node1.ledger.block_count ()); // Pruning with max age 0 node1.config.max_pruning_age = std::chrono::seconds{ 0 }; - node1.ledger_pruning (1, true); + node1.pruning.ledger_pruning (1, true); ASSERT_EQ (1, node1.ledger.pruned_count ()); ASSERT_EQ (3, node1.ledger.block_count ()); @@ -3517,13 +3518,13 @@ TEST (node, DISABLED_pruning_depth) ASSERT_EQ (3, node1.ledger.block_count ()); // Pruning with default depth (unlimited) - node1.ledger_pruning (1, true); + node1.pruning.ledger_pruning (1, true); ASSERT_EQ (0, node1.ledger.pruned_count ()); ASSERT_EQ (3, node1.ledger.block_count ()); // Pruning with max depth 1 node1.config.max_pruning_depth = 1; - node1.ledger_pruning (1, true); + node1.pruning.ledger_pruning (1, true); ASSERT_EQ (1, node1.ledger.pruned_count ()); ASSERT_EQ (3, node1.ledger.block_count ()); diff --git a/nano/lib/logging_enums.hpp b/nano/lib/logging_enums.hpp index dd5e88fe32..64297dbb55 100644 --- a/nano/lib/logging_enums.hpp +++ b/nano/lib/logging_enums.hpp @@ -59,7 +59,7 @@ enum class type tcp_server, tcp_listener, tcp_channels, - prunning, + pruning, conf_processor_bounded, conf_processor_unbounded, distributed_work, diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index 4f7f0fbb35..497166bf8f 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -115,6 +115,7 @@ enum class type message_processor_type, process_confirmed, online_reps, + pruning, _last // Must be the last enum }; @@ -643,6 +644,12 @@ enum class detail block_confirmed, large_backlog, + // pruning + ledger_pruning, + pruning_target, + pruned_count, + collect_targets, + _last // Must be the last enum }; diff --git a/nano/lib/thread_roles.cpp b/nano/lib/thread_roles.cpp index 8b4c0314a9..d18901bc8f 100644 --- a/nano/lib/thread_roles.cpp +++ b/nano/lib/thread_roles.cpp @@ -193,6 +193,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role) case nano::thread_role::name::http_callbacks: thread_role_name_string = "HTTP callbacks"; break; + case nano::thread_role::name::pruning: + thread_role_name_string = "Pruning"; + break; default: debug_assert (false && "nano::thread_role::get_string unhandled thread role"); } diff --git a/nano/lib/thread_roles.hpp b/nano/lib/thread_roles.hpp index 349c02f1bd..6ed5479e6e 100644 --- a/nano/lib/thread_roles.hpp +++ b/nano/lib/thread_roles.hpp @@ -69,6 +69,7 @@ enum class name online_reps, monitor, http_callbacks, + pruning, }; std::string_view to_string (name); diff --git a/nano/nano_node/entry.cpp b/nano/nano_node/entry.cpp index b096cc7eaa..b2aa9d1393 100644 --- a/nano/nano_node/entry.cpp +++ b/nano/nano_node/entry.cpp @@ -16,6 +16,7 @@ #include #include #include +#include #include #include #include @@ -1923,7 +1924,7 @@ int main (int argc, char * const * argv) nano::update_flags (node_flags, vm); nano::inactive_node inactive_node (data_path, node_flags); auto node = inactive_node.node; - node->ledger_pruning (node_flags.block_processor_batch_size != 0 ? node_flags.block_processor_batch_size : 16 * 1024, true); + node->pruning.ledger_pruning (node_flags.block_processor_batch_size != 0 ? node_flags.block_processor_batch_size : 16 * 1024, true); } else if (vm.count ("debug_stacktrace")) { diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index 396fe33cd1..11940bbd4e 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -125,6 +125,8 @@ add_library( portmapping.cpp process_live_dispatcher.cpp process_live_dispatcher.hpp + pruning.hpp + pruning.cpp recently_cemented_cache.cpp recently_cemented_cache.hpp recently_confirmed_cache.cpp diff --git a/nano/node/fwd.hpp b/nano/node/fwd.hpp index 8aa4674085..1e723b1dc2 100644 --- a/nano/node/fwd.hpp +++ b/nano/node/fwd.hpp @@ -29,6 +29,7 @@ class node_config; class node_flags; class node_observers; class online_reps; +class pruning; class recently_cemented_cache; class recently_confirmed_cache; class rep_crawler; diff --git a/nano/node/node.cpp b/nano/node/node.cpp index f3f7e679b8..b4e30be2bc 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -29,6 +29,7 @@ #include #include #include +#include #include #include #include @@ -196,6 +197,8 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy monitor{ *monitor_impl }, http_callbacks_impl{ std::make_unique (*this) }, http_callbacks{ *http_callbacks_impl }, + pruning_impl{ std::make_unique (config, flags, ledger, stats, logger) }, + pruning{ *pruning_impl }, startup_time{ std::chrono::steady_clock::now () }, node_seq{ seq } { @@ -491,13 +494,6 @@ void nano::node::start () network.start (); message_processor.start (); - if (flags.enable_pruning) - { - auto this_l (shared ()); - workers.post ([this_l] () { - this_l->ongoing_ledger_pruning (); - }); - } if (!flags.disable_rep_crawler) { rep_crawler.start (); @@ -559,6 +555,7 @@ void nano::node::start () online_reps.start (); monitor.start (); http_callbacks.start (); + pruning.start (); add_initial_peers (); } @@ -607,6 +604,7 @@ void nano::node::stop () network.stop (); monitor.stop (); http_callbacks.stop (); + pruning.stop (); bootstrap_workers.stop (); wallet_workers.stop (); @@ -702,110 +700,6 @@ void nano::node::search_receivable_all () }); } -bool nano::node::collect_ledger_pruning_targets (std::deque & pruning_targets_a, nano::account & last_account_a, uint64_t const batch_read_size_a, uint64_t const max_depth_a, uint64_t const cutoff_time_a) -{ - uint64_t read_operations (0); - bool finish_transaction (false); - auto transaction = ledger.tx_begin_read (); - for (auto i (store.confirmation_height.begin (transaction, last_account_a)), n (store.confirmation_height.end (transaction)); i != n && !finish_transaction;) - { - ++read_operations; - auto const & account (i->first); - nano::block_hash hash (i->second.frontier); - uint64_t depth (0); - while (!hash.is_zero () && depth < max_depth_a) - { - auto block = ledger.any.block_get (transaction, hash); - if (block != nullptr) - { - if (block->sideband ().timestamp > cutoff_time_a || depth == 0) - { - hash = block->previous (); - } - else - { - break; - } - } - else - { - release_assert (depth != 0); - hash = 0; - } - if (++depth % batch_read_size_a == 0) - { - // FIXME: This is triggering an assertion where the iterator is still used after transaction is refreshed - transaction.refresh (); - } - } - if (!hash.is_zero ()) - { - pruning_targets_a.push_back (hash); - } - read_operations += depth; - if (read_operations >= batch_read_size_a) - { - last_account_a = inc_sat (account.number ()); - finish_transaction = true; - } - else - { - ++i; - } - } - return !finish_transaction || last_account_a.is_zero (); -} - -void nano::node::ledger_pruning (uint64_t const batch_size_a, bool bootstrap_weight_reached_a) -{ - uint64_t const max_depth (config.max_pruning_depth != 0 ? config.max_pruning_depth : std::numeric_limits::max ()); - uint64_t const cutoff_time (bootstrap_weight_reached_a ? nano::seconds_since_epoch () - config.max_pruning_age.count () : std::numeric_limits::max ()); - uint64_t pruned_count (0); - uint64_t transaction_write_count (0); - nano::account last_account (1); // 0 Burn account is never opened. So it can be used to break loop - std::deque pruning_targets; - bool target_finished (false); - while ((transaction_write_count != 0 || !target_finished) && !stopped) - { - // Search pruning targets - while (pruning_targets.size () < batch_size_a && !target_finished && !stopped) - { - target_finished = collect_ledger_pruning_targets (pruning_targets, last_account, batch_size_a * 2, max_depth, cutoff_time); - } - // Pruning write operation - transaction_write_count = 0; - if (!pruning_targets.empty () && !stopped) - { - auto write_transaction = ledger.tx_begin_write (nano::store::writer::pruning); - while (!pruning_targets.empty () && transaction_write_count < batch_size_a && !stopped) - { - auto const & pruning_hash (pruning_targets.front ()); - auto account_pruned_count (ledger.pruning_action (write_transaction, pruning_hash, batch_size_a)); - transaction_write_count += account_pruned_count; - pruning_targets.pop_front (); - } - pruned_count += transaction_write_count; - - logger.debug (nano::log::type::prunning, "Pruned blocks: {}", pruned_count); - } - } - - logger.debug (nano::log::type::prunning, "Total recently pruned block count: {}", pruned_count); -} - -void nano::node::ongoing_ledger_pruning () -{ - auto bootstrap_weight_reached (ledger.block_count () >= ledger.bootstrap_weight_max_blocks); - ledger_pruning (flags.block_processor_batch_size != 0 ? flags.block_processor_batch_size : 2 * 1024, bootstrap_weight_reached); - auto const ledger_pruning_interval (bootstrap_weight_reached ? config.max_pruning_age : std::min (config.max_pruning_age, std::chrono::seconds (15 * 60))); - auto this_l (shared ()); - workers.post_delayed (ledger_pruning_interval, [this_l] () { - this_l->workers.post ([this_l] () { - this_l->ongoing_ledger_pruning (); - }); - }); -} - uint64_t nano::node::default_difficulty (nano::work_version const version_a) const { uint64_t result{ std::numeric_limits::max () }; @@ -1078,6 +972,7 @@ nano::container_info nano::node::container_info () const info.add ("backlog_scan", backlog_scan.container_info ()); info.add ("bounded_backlog", backlog.container_info ()); info.add ("http_callbacks", http_callbacks.container_info ()); + info.add ("pruning", pruning.container_info ()); return info; } diff --git a/nano/node/node.hpp b/nano/node/node.hpp index f25098be8e..a263f806c9 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -61,9 +61,6 @@ class node final : public std::enable_shared_from_this nano::uint128_t minimum_principal_weight (); void backup_wallet (); void search_receivable_all (); - bool collect_ledger_pruning_targets (std::deque &, nano::account &, uint64_t const, uint64_t const, uint64_t const); - void ledger_pruning (uint64_t const, bool); - void ongoing_ledger_pruning (); // The default difficulty updates to base only when the first epoch_2 block is processed uint64_t default_difficulty (nano::work_version const) const; uint64_t default_receive_difficulty (nano::work_version const) const; @@ -202,6 +199,8 @@ class node final : public std::enable_shared_from_this nano::monitor & monitor; std::unique_ptr http_callbacks_impl; nano::http_callbacks & http_callbacks; + std::unique_ptr pruning_impl; + nano::pruning & pruning; public: std::chrono::steady_clock::time_point const startup_time; diff --git a/nano/node/pruning.cpp b/nano/node/pruning.cpp new file mode 100644 index 0000000000..ce06b9d38a --- /dev/null +++ b/nano/node/pruning.cpp @@ -0,0 +1,152 @@ +#include +#include +#include +#include + +nano::pruning::pruning (nano::node_config const & config_a, nano::node_flags const & flags_a, nano::ledger & ledger_a, nano::stats & stats_a, nano::logger & logger_a) : + config{ config_a }, + flags{ flags_a }, + ledger{ ledger_a }, + stats{ stats_a }, + logger{ logger_a }, + workers{ /* single threaded */ 1, nano::thread_role::name::pruning } +{ +} + +nano::pruning::~pruning () +{ + // Must be stopped before destruction + debug_assert (stopped); +} + +void nano::pruning::start () +{ + if (flags.enable_pruning) + { + workers.start (); + workers.post ([this] () { + ongoing_ledger_pruning (); + }); + } +} + +void nano::pruning::stop () +{ + stopped = true; + workers.stop (); +} + +void nano::pruning::ongoing_ledger_pruning () +{ + auto bootstrap_weight_reached (ledger.block_count () >= ledger.bootstrap_weight_max_blocks); + ledger_pruning (flags.block_processor_batch_size != 0 ? flags.block_processor_batch_size : 2 * 1024, bootstrap_weight_reached); + auto const ledger_pruning_interval (bootstrap_weight_reached ? config.max_pruning_age : std::min (config.max_pruning_age, std::chrono::seconds (15 * 60))); + workers.post_delayed (ledger_pruning_interval, [this] () { + workers.post ([this] () { + ongoing_ledger_pruning (); + }); + }); +} + +void nano::pruning::ledger_pruning (uint64_t const batch_size_a, bool bootstrap_weight_reached_a) +{ + stats.inc (nano::stat::type::pruning, nano::stat::detail::ledger_pruning); + + uint64_t const max_depth (config.max_pruning_depth != 0 ? config.max_pruning_depth : std::numeric_limits::max ()); + uint64_t const cutoff_time (bootstrap_weight_reached_a ? nano::seconds_since_epoch () - config.max_pruning_age.count () : std::numeric_limits::max ()); + uint64_t pruned_count (0); + uint64_t transaction_write_count (0); + nano::account last_account (1); // 0 Burn account is never opened. So it can be used to break loop + std::deque pruning_targets; + bool target_finished (false); + while ((transaction_write_count != 0 || !target_finished) && !stopped) + { + // Search pruning targets + while (pruning_targets.size () < batch_size_a && !target_finished && !stopped) + { + stats.inc (nano::stat::type::pruning, nano::stat::detail::collect_targets); + target_finished = collect_ledger_pruning_targets (pruning_targets, last_account, batch_size_a * 2, max_depth, cutoff_time); + } + // Pruning write operation + transaction_write_count = 0; + if (!pruning_targets.empty () && !stopped) + { + auto write_transaction = ledger.tx_begin_write (nano::store::writer::pruning); + while (!pruning_targets.empty () && transaction_write_count < batch_size_a && !stopped) + { + stats.inc (nano::stat::type::pruning, nano::stat::detail::pruning_target); + + auto const & pruning_hash (pruning_targets.front ()); + auto account_pruned_count (ledger.pruning_action (write_transaction, pruning_hash, batch_size_a)); + transaction_write_count += account_pruned_count; + pruning_targets.pop_front (); + + stats.add (nano::stat::type::pruning, nano::stat::detail::pruned_count, account_pruned_count); + } + pruned_count += transaction_write_count; + + logger.debug (nano::log::type::pruning, "Pruned blocks: {}", pruned_count); + } + } + + logger.debug (nano::log::type::pruning, "Total recently pruned block count: {}", pruned_count); +} + +bool nano::pruning::collect_ledger_pruning_targets (std::deque & pruning_targets_a, nano::account & last_account_a, uint64_t const batch_read_size_a, uint64_t const max_depth_a, uint64_t const cutoff_time_a) +{ + uint64_t read_operations (0); + bool finish_transaction (false); + auto transaction = ledger.tx_begin_read (); + for (auto i (ledger.store.confirmation_height.begin (transaction, last_account_a)), n (ledger.store.confirmation_height.end (transaction)); i != n && !finish_transaction;) + { + ++read_operations; + auto const & account (i->first); + nano::block_hash hash (i->second.frontier); + uint64_t depth (0); + while (!hash.is_zero () && depth < max_depth_a) + { + auto block = ledger.any.block_get (transaction, hash); + if (block != nullptr) + { + if (block->sideband ().timestamp > cutoff_time_a || depth == 0) + { + hash = block->previous (); + } + else + { + break; + } + } + else + { + release_assert (depth != 0); + hash = 0; + } + if (++depth % batch_read_size_a == 0) + { + // FIXME: This is triggering an assertion where the iterator is still used after transaction is refreshed + transaction.refresh (); + } + } + if (!hash.is_zero ()) + { + pruning_targets_a.push_back (hash); + } + read_operations += depth; + if (read_operations >= batch_read_size_a) + { + last_account_a = inc_sat (account.number ()); + finish_transaction = true; + } + else + { + ++i; + } + } + return !finish_transaction || last_account_a.is_zero (); +} + +nano::container_info nano::pruning::container_info () const +{ + return workers.container_info (); +} \ No newline at end of file diff --git a/nano/node/pruning.hpp b/nano/node/pruning.hpp new file mode 100644 index 0000000000..5688336758 --- /dev/null +++ b/nano/node/pruning.hpp @@ -0,0 +1,39 @@ +#pragma once + +#include +#include + +#include +#include + +namespace nano +{ +class pruning final +{ +public: + pruning (nano::node_config const &, nano::node_flags const &, nano::ledger &, nano::stats &, nano::logger &); + ~pruning (); + + void start (); + void stop (); + + nano::container_info container_info () const; + + void ongoing_ledger_pruning (); + void ledger_pruning (uint64_t batch_size, bool bootstrap_weight_reached); + bool collect_ledger_pruning_targets (std::deque & pruning_targets_out, nano::account & last_account_out, uint64_t batch_read_size, uint64_t max_depth, uint64_t cutoff_time); + +private: // Dependencies + nano::node_config const & config; + nano::node_flags const & flags; + nano::ledger & ledger; + nano::stats & stats; + nano::logger & logger; + +private: + void run (); + + std::atomic stopped{ false }; + nano::thread_pool workers; +}; +} \ No newline at end of file