Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

replay ctrl-c support #6237

Merged
merged 5 commits into from
Nov 20, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion libraries/appbase
51 changes: 30 additions & 21 deletions libraries/chain/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,22 +307,25 @@ struct controller_impl {
}
}

void replay() {
void replay(std::function<bool()> shutdown) {
auto blog_head = blog.read_head();
auto blog_head_time = blog_head->timestamp.to_time_point();
replaying = true;
replay_head_time = blog_head_time;
ilog( "existing block log, attempting to replay ${n} blocks", ("n",blog_head->block_num()) );
auto start_block_num = head->block_num + 1;
ilog( "existing block log, attempting to replay from ${s} to ${n} blocks",
("s", start_block_num)("n", blog_head->block_num()) );

auto start = fc::time_point::now();
while( auto next = blog.read_block_by_num( head->block_num + 1 ) ) {
self.push_block( next, controller::block_status::irreversible );
if( next->block_num() % 100 == 0 ) {
std::cerr << std::setw(10) << next->block_num() << " of " << blog_head->block_num() <<"\r";
if( shutdown() ) break;
}
}
std::cerr<< "\n";
ilog( "${n} blocks replayed", ("n", head->block_num) );
ilog( "${n} blocks replayed", ("n", head->block_num - start_block_num) );

// if the irreverible log is played without undo sessions enabled, we need to sync the
// revision ordinal to the appropriate expected value here.
Expand All @@ -338,42 +341,48 @@ struct controller_impl {
ilog( "${n} reversible blocks replayed", ("n",rev) );
auto end = fc::time_point::now();
ilog( "replayed ${n} blocks in ${duration} seconds, ${mspb} ms/block",
("n", head->block_num)("duration", (end-start).count()/1000000)
("mspb", ((end-start).count()/1000.0)/head->block_num) );
("n", head->block_num - start_block_num)("duration", (end-start).count()/1000000)
("mspb", ((end-start).count()/1000.0)/(head->block_num-start_block_num)) );
replaying = false;
replay_head_time.reset();
}

void init(const snapshot_reader_ptr& snapshot) {
void init(std::function<bool()> shutdown, const snapshot_reader_ptr& snapshot) {

thread_pool.emplace( conf.thread_pool_size );

bool report_integrity_hash = !!snapshot;
if (snapshot) {
EOS_ASSERT(!head, fork_database_exception, "");
EOS_ASSERT( !head, fork_database_exception, "" );
snapshot->validate();

read_from_snapshot(snapshot);
read_from_snapshot( snapshot );

auto end = blog.read_head();
if( !end ) {
blog.reset(conf.genesis, signed_block_ptr(), head->block_num + 1);
} else if ( end->block_num() > head->block_num) {
replay();
blog.reset( conf.genesis, signed_block_ptr(), head->block_num + 1 );
} else if( end->block_num() > head->block_num ) {
replay( shutdown );
} else {
EOS_ASSERT(end->block_num() == head->block_num, fork_database_exception,
"Block log is provided with snapshot but does not contain the head block from the snapshot");
EOS_ASSERT( end->block_num() == head->block_num, fork_database_exception,
"Block log is provided with snapshot but does not contain the head block from the snapshot" );
}
} else {
if( !head ) {
initialize_fork_db(); // set head to genesis state
}
} else if( !head ) {
initialize_fork_db(); // set head to genesis state

auto end = blog.read_head();
if( end && end->block_num() > 1 ) {
replay();
} else if( !end ) {
if( !end ) {
blog.reset( conf.genesis, head->block );
} else if( end->block_num() > head->block_num ) {
replay( shutdown );
report_integrity_hash = true;
}
}

if( shutdown() ) return;

const auto& ubi = reversible_blocks.get_index<reversible_block_index,by_num>();
auto objitr = ubi.rbegin();
if( objitr != ubi.rend() ) {
Expand All @@ -399,7 +408,7 @@ struct controller_impl {
db.undo();
}

if( snapshot ) {
if( report_integrity_hash ) {
const auto hash = calculate_integrity_hash();
ilog( "database initialized with hash: ${hash}", ("hash", hash) );
}
Expand Down Expand Up @@ -1612,12 +1621,12 @@ void controller::add_indices() {
my->add_indices();
}

void controller::startup( const snapshot_reader_ptr& snapshot ) {
void controller::startup( std::function<bool()> shutdown, const snapshot_reader_ptr& snapshot ) {
my->head = my->fork_db.head();
if( !my->head ) {
elog( "No head block in fork db, perhaps we need to replay" );
}
my->init(snapshot);
my->init(shutdown, snapshot);
}

const chainbase::database& controller::db()const { return my->db; }
Expand Down
2 changes: 1 addition & 1 deletion libraries/chain/include/eosio/chain/controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ namespace eosio { namespace chain {
~controller();

void add_indices();
void startup( const snapshot_reader_ptr& snapshot = nullptr );
void startup( std::function<bool()> shutdown, const snapshot_reader_ptr& snapshot = nullptr );

/**
* Starts a new pending block session upon which new transactions can
Expand Down
6 changes: 3 additions & 3 deletions libraries/testing/include/eosio/testing/tester.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ namespace eosio { namespace testing {

validating_node = std::make_unique<controller>(vcfg);
validating_node->add_indices();
validating_node->startup();
validating_node->startup( []() { return false; } );

init(true);
}
Expand All @@ -366,7 +366,7 @@ namespace eosio { namespace testing {

validating_node = std::make_unique<controller>(vcfg);
validating_node->add_indices();
validating_node->startup();
validating_node->startup( []() { return false; } );

init(config);
}
Expand Down Expand Up @@ -411,7 +411,7 @@ namespace eosio { namespace testing {
validating_node.reset();
validating_node = std::make_unique<controller>(vcfg);
validating_node->add_indices();
validating_node->startup();
validating_node->startup( []() { return false; } );

return ok;
}
Expand Down
2 changes: 1 addition & 1 deletion libraries/testing/tester.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ namespace eosio { namespace testing {
void base_tester::open( const snapshot_reader_ptr& snapshot) {
control.reset( new controller(cfg) );
control->add_indices();
control->startup(snapshot);
control->startup( []() { return false; }, snapshot);
chain_transactions.clear();
control->accepted_block.connect([this]( const block_state_ptr& block_state ){
FC_ASSERT( block_state->block );
Expand Down
5 changes: 3 additions & 2 deletions plugins/chain_plugin/chain_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -703,13 +703,14 @@ void chain_plugin::plugin_initialize(const variables_map& options) {
void chain_plugin::plugin_startup()
{ try {
try {
auto shutdown = [](){ return app().is_quiting(); };
if (my->snapshot_path) {
auto infile = std::ifstream(my->snapshot_path->generic_string(), (std::ios::in | std::ios::binary));
auto reader = std::make_shared<istream_snapshot_reader>(infile);
my->chain->startup(reader);
my->chain->startup(shutdown, reader);
infile.close();
} else {
my->chain->startup();
my->chain->startup(shutdown);
}
} catch (const database_guard_exception& e) {
log_guard_exception(e);
Expand Down