Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract pruning class #4825

Merged
merged 3 commits into from
Jan 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <nano/node/make_store.hpp>
#include <nano/node/online_reps.hpp>
#include <nano/node/portmapping.hpp>
#include <nano/node/pruning.hpp>
#include <nano/node/scheduler/component.hpp>
#include <nano/node/scheduler/manual.hpp>
#include <nano/node/scheduler/priority.hpp>
Expand Down Expand Up @@ -3456,13 +3457,13 @@ TEST (node, DISABLED_pruning_age)
ASSERT_EQ (3, node1.ledger.block_count ());

// Pruning with default age 1 day
node1.ledger_pruning (1, true);
node1.pruning.ledger_pruning (1, true);
ASSERT_EQ (0, node1.ledger.pruned_count ());
ASSERT_EQ (3, node1.ledger.block_count ());

// Pruning with max age 0
node1.config.max_pruning_age = std::chrono::seconds{ 0 };
node1.ledger_pruning (1, true);
node1.pruning.ledger_pruning (1, true);
ASSERT_EQ (1, node1.ledger.pruned_count ());
ASSERT_EQ (3, node1.ledger.block_count ());

Expand Down Expand Up @@ -3517,13 +3518,13 @@ TEST (node, DISABLED_pruning_depth)
ASSERT_EQ (3, node1.ledger.block_count ());

// Pruning with default depth (unlimited)
node1.ledger_pruning (1, true);
node1.pruning.ledger_pruning (1, true);
ASSERT_EQ (0, node1.ledger.pruned_count ());
ASSERT_EQ (3, node1.ledger.block_count ());

// Pruning with max depth 1
node1.config.max_pruning_depth = 1;
node1.ledger_pruning (1, true);
node1.pruning.ledger_pruning (1, true);
ASSERT_EQ (1, node1.ledger.pruned_count ());
ASSERT_EQ (3, node1.ledger.block_count ());

Expand Down
2 changes: 1 addition & 1 deletion nano/lib/logging_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ enum class type
tcp_server,
tcp_listener,
tcp_channels,
prunning,
pruning,
conf_processor_bounded,
conf_processor_unbounded,
distributed_work,
Expand Down
7 changes: 7 additions & 0 deletions nano/lib/stats_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ enum class type
message_processor_type,
process_confirmed,
online_reps,
pruning,

_last // Must be the last enum
};
Expand Down Expand Up @@ -643,6 +644,12 @@ enum class detail
block_confirmed,
large_backlog,

// pruning
ledger_pruning,
pruning_target,
pruned_count,
collect_targets,

_last // Must be the last enum
};

Expand Down
3 changes: 3 additions & 0 deletions nano/lib/thread_roles.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::http_callbacks:
thread_role_name_string = "HTTP callbacks";
break;
case nano::thread_role::name::pruning:
thread_role_name_string = "Pruning";
break;
default:
debug_assert (false && "nano::thread_role::get_string unhandled thread role");
}
Expand Down
1 change: 1 addition & 0 deletions nano/lib/thread_roles.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ enum class name
online_reps,
monitor,
http_callbacks,
pruning,
};

std::string_view to_string (name);
Expand Down
3 changes: 2 additions & 1 deletion nano/nano_node/entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <nano/node/json_handler.hpp>
#include <nano/node/node.hpp>
#include <nano/node/online_reps.hpp>
#include <nano/node/pruning.hpp>
#include <nano/node/transport/inproc.hpp>
#include <nano/secure/ledger.hpp>
#include <nano/secure/ledger_set_any.hpp>
Expand Down Expand Up @@ -1923,7 +1924,7 @@ int main (int argc, char * const * argv)
nano::update_flags (node_flags, vm);
nano::inactive_node inactive_node (data_path, node_flags);
auto node = inactive_node.node;
node->ledger_pruning (node_flags.block_processor_batch_size != 0 ? node_flags.block_processor_batch_size : 16 * 1024, true);
node->pruning.ledger_pruning (node_flags.block_processor_batch_size != 0 ? node_flags.block_processor_batch_size : 16 * 1024, true);
}
else if (vm.count ("debug_stacktrace"))
{
Expand Down
2 changes: 2 additions & 0 deletions nano/node/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ add_library(
portmapping.cpp
process_live_dispatcher.cpp
process_live_dispatcher.hpp
pruning.hpp
pruning.cpp
recently_cemented_cache.cpp
recently_cemented_cache.hpp
recently_confirmed_cache.cpp
Expand Down
1 change: 1 addition & 0 deletions nano/node/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class node_config;
class node_flags;
class node_observers;
class online_reps;
class pruning;
class recently_cemented_cache;
class recently_confirmed_cache;
class rep_crawler;
Expand Down
117 changes: 6 additions & 111 deletions nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <nano/node/online_reps.hpp>
#include <nano/node/peer_history.hpp>
#include <nano/node/portmapping.hpp>
#include <nano/node/pruning.hpp>
#include <nano/node/request_aggregator.hpp>
#include <nano/node/rpc_callbacks.hpp>
#include <nano/node/scheduler/component.hpp>
Expand Down Expand Up @@ -196,6 +197,8 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
monitor{ *monitor_impl },
http_callbacks_impl{ std::make_unique<nano::http_callbacks> (*this) },
http_callbacks{ *http_callbacks_impl },
pruning_impl{ std::make_unique<nano::pruning> (config, flags, ledger, stats, logger) },
pruning{ *pruning_impl },
startup_time{ std::chrono::steady_clock::now () },
node_seq{ seq }
{
Expand Down Expand Up @@ -491,13 +494,6 @@ void nano::node::start ()
network.start ();
message_processor.start ();

if (flags.enable_pruning)
{
auto this_l (shared ());
workers.post ([this_l] () {
this_l->ongoing_ledger_pruning ();
});
}
if (!flags.disable_rep_crawler)
{
rep_crawler.start ();
Expand Down Expand Up @@ -559,6 +555,7 @@ void nano::node::start ()
online_reps.start ();
monitor.start ();
http_callbacks.start ();
pruning.start ();

add_initial_peers ();
}
Expand Down Expand Up @@ -607,6 +604,7 @@ void nano::node::stop ()
network.stop ();
monitor.stop ();
http_callbacks.stop ();
pruning.stop ();

bootstrap_workers.stop ();
wallet_workers.stop ();
Expand Down Expand Up @@ -702,110 +700,6 @@ void nano::node::search_receivable_all ()
});
}

bool nano::node::collect_ledger_pruning_targets (std::deque<nano::block_hash> & pruning_targets_a, nano::account & last_account_a, uint64_t const batch_read_size_a, uint64_t const max_depth_a, uint64_t const cutoff_time_a)
{
uint64_t read_operations (0);
bool finish_transaction (false);
auto transaction = ledger.tx_begin_read ();
for (auto i (store.confirmation_height.begin (transaction, last_account_a)), n (store.confirmation_height.end (transaction)); i != n && !finish_transaction;)
{
++read_operations;
auto const & account (i->first);
nano::block_hash hash (i->second.frontier);
uint64_t depth (0);
while (!hash.is_zero () && depth < max_depth_a)
{
auto block = ledger.any.block_get (transaction, hash);
if (block != nullptr)
{
if (block->sideband ().timestamp > cutoff_time_a || depth == 0)
{
hash = block->previous ();
}
else
{
break;
}
}
else
{
release_assert (depth != 0);
hash = 0;
}
if (++depth % batch_read_size_a == 0)
{
// FIXME: This is triggering an assertion where the iterator is still used after transaction is refreshed
transaction.refresh ();
}
}
if (!hash.is_zero ())
{
pruning_targets_a.push_back (hash);
}
read_operations += depth;
if (read_operations >= batch_read_size_a)
{
last_account_a = inc_sat (account.number ());
finish_transaction = true;
}
else
{
++i;
}
}
return !finish_transaction || last_account_a.is_zero ();
}

void nano::node::ledger_pruning (uint64_t const batch_size_a, bool bootstrap_weight_reached_a)
{
uint64_t const max_depth (config.max_pruning_depth != 0 ? config.max_pruning_depth : std::numeric_limits<uint64_t>::max ());
uint64_t const cutoff_time (bootstrap_weight_reached_a ? nano::seconds_since_epoch () - config.max_pruning_age.count () : std::numeric_limits<uint64_t>::max ());
uint64_t pruned_count (0);
uint64_t transaction_write_count (0);
nano::account last_account (1); // 0 Burn account is never opened. So it can be used to break loop
std::deque<nano::block_hash> pruning_targets;
bool target_finished (false);
while ((transaction_write_count != 0 || !target_finished) && !stopped)
{
// Search pruning targets
while (pruning_targets.size () < batch_size_a && !target_finished && !stopped)
{
target_finished = collect_ledger_pruning_targets (pruning_targets, last_account, batch_size_a * 2, max_depth, cutoff_time);
}
// Pruning write operation
transaction_write_count = 0;
if (!pruning_targets.empty () && !stopped)
{
auto write_transaction = ledger.tx_begin_write (nano::store::writer::pruning);
while (!pruning_targets.empty () && transaction_write_count < batch_size_a && !stopped)
{
auto const & pruning_hash (pruning_targets.front ());
auto account_pruned_count (ledger.pruning_action (write_transaction, pruning_hash, batch_size_a));
transaction_write_count += account_pruned_count;
pruning_targets.pop_front ();
}
pruned_count += transaction_write_count;

logger.debug (nano::log::type::prunning, "Pruned blocks: {}", pruned_count);
}
}

logger.debug (nano::log::type::prunning, "Total recently pruned block count: {}", pruned_count);
}

void nano::node::ongoing_ledger_pruning ()
{
auto bootstrap_weight_reached (ledger.block_count () >= ledger.bootstrap_weight_max_blocks);
ledger_pruning (flags.block_processor_batch_size != 0 ? flags.block_processor_batch_size : 2 * 1024, bootstrap_weight_reached);
auto const ledger_pruning_interval (bootstrap_weight_reached ? config.max_pruning_age : std::min (config.max_pruning_age, std::chrono::seconds (15 * 60)));
auto this_l (shared ());
workers.post_delayed (ledger_pruning_interval, [this_l] () {
this_l->workers.post ([this_l] () {
this_l->ongoing_ledger_pruning ();
});
});
}

uint64_t nano::node::default_difficulty (nano::work_version const version_a) const
{
uint64_t result{ std::numeric_limits<uint64_t>::max () };
Expand Down Expand Up @@ -1078,6 +972,7 @@ nano::container_info nano::node::container_info () const
info.add ("backlog_scan", backlog_scan.container_info ());
info.add ("bounded_backlog", backlog.container_info ());
info.add ("http_callbacks", http_callbacks.container_info ());
info.add ("pruning", pruning.container_info ());
return info;
}

Expand Down
5 changes: 2 additions & 3 deletions nano/node/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,6 @@ class node final : public std::enable_shared_from_this<node>
nano::uint128_t minimum_principal_weight ();
void backup_wallet ();
void search_receivable_all ();
bool collect_ledger_pruning_targets (std::deque<nano::block_hash> &, nano::account &, uint64_t const, uint64_t const, uint64_t const);
void ledger_pruning (uint64_t const, bool);
void ongoing_ledger_pruning ();
// The default difficulty updates to base only when the first epoch_2 block is processed
uint64_t default_difficulty (nano::work_version const) const;
uint64_t default_receive_difficulty (nano::work_version const) const;
Expand Down Expand Up @@ -202,6 +199,8 @@ class node final : public std::enable_shared_from_this<node>
nano::monitor & monitor;
std::unique_ptr<nano::http_callbacks> http_callbacks_impl;
nano::http_callbacks & http_callbacks;
std::unique_ptr<nano::pruning> pruning_impl;
nano::pruning & pruning;

public:
std::chrono::steady_clock::time_point const startup_time;
Expand Down
Loading
Loading