From 9407bcfc4c2cacab4b0ac3f05fe1c11ed0581deb Mon Sep 17 00:00:00 2001 From: Qian Xiaofeng Date: Sat, 11 May 2019 19:45:01 +0800 Subject: [PATCH] feature/dpos-pbft-bos-upgrade bugfix and snapshot support (#94) * bugfix: fork_db migration incorrect pos align * bug fix: return default block id when trying to fetch block num 0; update pbft status during ctrl init. * add migration log * improve: change algorithm in LIB upgrade fork database migration, huge performance boost when fork db size is large * use schedule in ucb when there is no stable checkpoint. * update snapshot migration logic to reflect struct change in block header state * add forkdb block states into snapshot after pbft is enabled; fix replay bug. * change block producing log --- libraries/chain/controller.cpp | 143 ++++++++++++------ libraries/chain/fork_database.cpp | 36 +++-- .../eosio/chain/block_header_state.hpp | 6 +- .../include/eosio/chain/chain_snapshot.hpp | 5 + .../chain/include/eosio/chain/snapshot.hpp | 4 +- libraries/chain/pbft_database.cpp | 19 ++- plugins/producer_plugin/producer_plugin.cpp | 16 +- 7 files changed, 153 insertions(+), 76 deletions(-) diff --git a/libraries/chain/controller.cpp b/libraries/chain/controller.cpp index 55f3a939acd..c6f611de03c 100644 --- a/libraries/chain/controller.cpp +++ b/libraries/chain/controller.cpp @@ -361,12 +361,14 @@ struct controller_impl { read_from_snapshot( snapshot ); - //do upgrade migration if necessary; - migrate_upgrade(); //compatiable for snapshot integrity test + //do upgrade migration if necessary; + migrate_upgrade(); //compatiable for snapshot integrity test auto end = blog.read_head(); if( !end ) { - blog.reset( conf.genesis, signed_block_ptr(), head->block_num + 1 ); + auto reset_block_num = head->block_num + 1; + if (pbft_enabled) reset_block_num = head->pbft_stable_checkpoint_blocknum; + blog.reset( conf.genesis, signed_block_ptr(), reset_block_num ); } else if( end->block_num() > head->block_num ) { replay( shutdown ); } else { @@ -374,8 +376,8 @@ struct controller_impl { "Block log is provided with snapshot but does not contain the head block from the snapshot" ); } } else { - //do upgrade migration if necessary; - migrate_upgrade(); //compatiable for snapshot integrity test + //do upgrade migration if necessary; + migrate_upgrade(); //compatiable for snapshot integrity test if( !head ) { initialize_fork_db(); // set head to genesis state } @@ -388,7 +390,7 @@ struct controller_impl { report_integrity_hash = true; } } - + if( shutdown() ) return; const auto& ubi = reversible_blocks.get_index(); @@ -430,11 +432,39 @@ struct controller_impl { //generate upo. try { db.get(); + if (pbft_enabled) wlog("pbft enabled"); } catch( const boost::exception& e) { wlog("no upo found, generating..."); db.create([](auto&){}); } + update_pbft_status(); + } + + void update_pbft_status() { + auto utb = optional{}; + auto& upo = db.get(); + if (upo.upgrade_target_block_num > 0) utb = upo.upgrade_target_block_num; + auto ucb = optional{}; + if (upo.upgrade_complete_block_num > 0) ucb = upo.upgrade_complete_block_num; + + + if (utb && !ucb && head->dpos_irreversible_blocknum >= *utb) { + db.modify( upo, [&]( auto& up ) { + up.upgrade_complete_block_num = head->block_num; + }); + if (!replaying) wlog("pbft will be working after the block ${b}", ("b", head->block_num)); + } + + if ( !pbft_enabled && utb && head->block_num >= *utb) { + if (!pbft_upgrading) pbft_upgrading = true; + + // new version starts from the next block of ucb, this is to avoid inconsistency after pre calculation inside schedule loop. + if (ucb && head->block_num > *ucb) { + if (pbft_upgrading) pbft_upgrading = false; + pbft_enabled = true; + } + } } ~controller_impl() { @@ -527,9 +557,21 @@ struct controller_impl { section.add_row(batch_pbft_snapshot_migration{}, db); }); - snapshot->write_section([this]( auto §ion ){ - section.template add_row(*fork_db.head(), db); - }); + if (pbft_enabled) { + snapshot->write_section([this]( auto §ion ) { + section.add_row(batch_pbft_enabled{}, db); + }); + snapshot->write_section([this](auto §ion) { + auto bid = fork_db.get_block_in_current_chain_by_num(fork_db.head()->pbft_stable_checkpoint_blocknum)->id; + EOS_ASSERT(bid != block_id_type{}, snapshot_exception, "cannot find lscb block"); + auto bss = fork_db.fetch_branch_from(fork_db.head()->id, bid).first; + section.template add_row(bss, db); + }); + } else { + snapshot->write_section([this]( auto §ion ){ + section.template add_row(*fork_db.head(), db); + }); + } controller_index_set::walk_indices([this, &snapshot]( auto utils ){ using value_t = typename decltype(utils)::index_t::value_type; @@ -560,19 +602,42 @@ struct controller_impl { }); bool migrated = snapshot->has_section(); - if(migrated) { - snapshot->read_section([this](auto §ion) { - block_header_state head_header_state; - section.read_row(head_header_state, db); - - auto head_state = std::make_shared(head_header_state); - fork_db.set(head_state); - fork_db.set_validity(head_state, true); - fork_db.mark_in_current_chain(head_state, true); - head = head_state; - snapshot_head_block = head->block_num; - }); - }else{ + if (migrated) { + auto upgraded = snapshot->has_section(); + if (upgraded) { + snapshot->read_section([this](auto §ion) { + branch_type bss; + section.template read_row(bss, db); + EOS_ASSERT(!bss.empty(), snapshot_exception, "no last stable checkpoint block in the snapshot"); + + wlog("${n} reversible blocks found in the snapshot", ("n", bss.size())); + + for (auto i = bss.rbegin(); i != bss.rend(); ++i ) { + if (i == bss.rbegin()) { + fork_db.set(*i); + snapshot_head_block = (*i)->block_num; + } else { + fork_db.add((*i), true, true); + } + fork_db.set_validity((*i), true); + fork_db.mark_in_current_chain((*i), true); + } + head = fork_db.head(); + }); + } else { + snapshot->read_section([this](auto §ion) { + block_header_state head_header_state; + section.read_row(head_header_state, db); + + auto head_state = std::make_shared(head_header_state); + fork_db.set(head_state); + fork_db.set_validity(head_state, true); + fork_db.mark_in_current_chain(head_state, true); + head = head_state; + snapshot_head_block = head->block_num; + }); + } + } else { snapshot->read_section([this](snapshot_reader::section_reader §ion) { block_header_state head_header_state; section.read_pbft_migrate_row(head_header_state, db); @@ -1338,30 +1403,7 @@ struct controller_impl { pending.emplace(maybe_session()); } - auto utb = optional{}; - auto& upo = db.get(); - if (upo.upgrade_target_block_num > 0) utb = upo.upgrade_target_block_num; - - auto ucb = optional{}; - if (upo.upgrade_complete_block_num > 0) ucb = upo.upgrade_complete_block_num; - - - if (utb && !ucb && head->dpos_irreversible_blocknum >= *utb) { - db.modify( upo, [&]( auto& up ) { - up.upgrade_complete_block_num = head->block_num; - }); - if (!replaying) wlog("pbft will be working after the block ${b}", ("b", head->block_num)); - } - - if ( !pbft_enabled && utb && head->block_num >= *utb) { - if (!pbft_upgrading) pbft_upgrading = true; - - // new version starts from the next block of ucb, this is to avoid inconsistency after pre calculation inside schedule loop. - if (ucb && head->block_num > *ucb) { - if (pbft_upgrading) pbft_upgrading = false; - pbft_enabled = true; - } - } + update_pbft_status(); pending->_block_status = s; pending->_producer_block_id = producer_block_id; @@ -1616,6 +1658,9 @@ struct controller_impl { EOS_ASSERT(!pending, block_validate_exception, "it is not valid to push a block when there is a pending block"); + set_pbft_lib(); + set_pbft_lscb(); + try { EOS_ASSERT( b, block_validate_exception, "trying to push empty block" ); EOS_ASSERT( (s == controller::block_status::irreversible || s == controller::block_status::validated), @@ -1636,9 +1681,7 @@ struct controller_impl { for (const auto &extn: b->block_extensions) { if (extn.first == static_cast(block_extension_type::pbft_stable_checkpoint)) { pbft_commit_local(b->id()); - set_pbft_lib(); set_pbft_latest_checkpoint(b->id()); - set_pbft_lscb(); break; } } @@ -2330,7 +2373,9 @@ block_id_type controller::last_stable_checkpoint_block_id() const { if( block_header::num_from_id(tapos_block_summary.block_id) == lscb_num ) return tapos_block_summary.block_id; - return fetch_block_by_number(lscb_num)->id(); + auto b = fetch_block_by_number(lscb_num); + if (b) return b->id(); + return block_id_type{}; } diff --git a/libraries/chain/fork_database.cpp b/libraries/chain/fork_database.cpp index 21cfd79dbc2..d6624b54f04 100644 --- a/libraries/chain/fork_database.cpp +++ b/libraries/chain/fork_database.cpp @@ -67,29 +67,43 @@ namespace eosio { namespace chain { bool is_version_1 = version_label != "version"; if(is_version_1){ /*start upgrade migration and this is a hack and ineffecient, but lucky we only need to do it once */ - + wlog("doing LIB upgrade migration"); auto start = ds.pos(); unsigned_int size; fc::raw::unpack( ds, size ); auto skipped_size_pos = ds.pos(); vector data(content.begin()+(skipped_size_pos - start), content.end()); + data.insert(data.end(),{0,0,0,0});//append 4 bytes for the very last block state, avoid underflow in case + fc::datastream tmp_ds(data.data(), data.size()); + for( uint32_t i = 0, n = size.value; i < n; ++i ) { - vector tmp = data; - tmp.insert(tmp.begin(), {0,0,0,0}); - fc::datastream tmp_ds(tmp.data(), tmp.size()); - block_state s; - fc::raw::unpack( tmp_ds, s ); - //prepend 4bytes for pbft_stable_checkpoint_blocknum and append 2 bytes for pbft_prepared and pbft_my_prepare - auto tmp_data_length = tmp_ds.tellp() - 6; - data.erase(data.begin(),data.begin()+tmp_data_length); + wlog("processing block state in fork database ${i} of ${size}", ("i",i+1)("size",n)); + block_header_state h; + fc::raw::unpack( tmp_ds, h ); + h.pbft_stable_checkpoint_blocknum = 0; + + //move pos backward 4 bytes for pbft_stable_checkpoint_blocknum + auto tmp_accumulated_data_length = tmp_ds.tellp() - 4; + tmp_ds.seekp(tmp_accumulated_data_length); + + signed_block_ptr b; + fc::raw::unpack( tmp_ds, b ); + bool validated; + fc::raw::unpack( tmp_ds, validated ); + bool in_current_chain; + fc::raw::unpack( tmp_ds, in_current_chain ); + block_state s{h}; + s.block = b; + s.validated = validated; + s.in_current_chain = in_current_chain; + s.pbft_prepared = false; s.pbft_my_prepare = false; set( std::make_shared( move( s ) ) ); } - fc::datastream head_id_stream(data.data(), data.size()); block_id_type head_id; - fc::raw::unpack( head_id_stream, head_id ); + fc::raw::unpack( tmp_ds, head_id ); my->head = get_block( head_id ); /*end upgrade migration*/ diff --git a/libraries/chain/include/eosio/chain/block_header_state.hpp b/libraries/chain/include/eosio/chain/block_header_state.hpp index 26ba42cc9f8..a443532013a 100644 --- a/libraries/chain/include/eosio/chain/block_header_state.hpp +++ b/libraries/chain/include/eosio/chain/block_header_state.hpp @@ -10,7 +10,6 @@ namespace eosio { namespace chain { * @brief defines the minimum state necessary to validate transaction headers */ struct block_header_state { - uint32_t pbft_stable_checkpoint_blocknum = 0; block_id_type id; uint32_t block_num = 0; signed_block_header header; @@ -28,6 +27,7 @@ struct block_header_state { public_key_type block_signing_key; vector confirm_count; vector confirmations; + uint32_t pbft_stable_checkpoint_blocknum = 0; block_header_state next( const signed_block_header& h, bool trust = false, bool new_version = false)const; block_header_state generate_next( block_timestamp_type when, bool new_version = false )const; @@ -62,10 +62,10 @@ struct block_header_state { } } /// namespace eosio::chain FC_REFLECT( eosio::chain::block_header_state, - (pbft_stable_checkpoint_blocknum) (id)(block_num)(header)(dpos_proposed_irreversible_blocknum)(dpos_irreversible_blocknum)(bft_irreversible_blocknum) (pending_schedule_lib_num)(pending_schedule_hash) (pending_schedule)(active_schedule)(blockroot_merkle) (producer_to_last_produced)(producer_to_last_implied_irb)(block_signing_key) - (confirm_count)(confirmations) ) + (confirm_count)(confirmations) + (pbft_stable_checkpoint_blocknum)) diff --git a/libraries/chain/include/eosio/chain/chain_snapshot.hpp b/libraries/chain/include/eosio/chain/chain_snapshot.hpp index 884293360a5..43801333d2f 100644 --- a/libraries/chain/include/eosio/chain/chain_snapshot.hpp +++ b/libraries/chain/include/eosio/chain/chain_snapshot.hpp @@ -33,7 +33,12 @@ struct batch_pbft_snapshot_migration{ bool migrated = true; }; +struct batch_pbft_enabled { + bool enabled = true; +}; + } } FC_REFLECT(eosio::chain::chain_snapshot_header,(version)) FC_REFLECT(eosio::chain::batch_pbft_snapshot_migration,(migrated)) +FC_REFLECT(eosio::chain::batch_pbft_enabled,(enabled)) \ No newline at end of file diff --git a/libraries/chain/include/eosio/chain/snapshot.hpp b/libraries/chain/include/eosio/chain/snapshot.hpp index 70486b747f9..52b986510bd 100644 --- a/libraries/chain/include/eosio/chain/snapshot.hpp +++ b/libraries/chain/include/eosio/chain/snapshot.hpp @@ -232,9 +232,9 @@ namespace eosio { namespace chain { std::ostringstream sstream; sstream << in.rdbuf(); std::string str(sstream.str()); - //prepend uint32_t 0 + //append uint32_t 0 std::vector tmp(str.begin(), str.end()); - tmp.insert(tmp.begin(), {0,0,0,0}); + tmp.insert(tmp.end(), {0,0,0,0}); fc::datastream tmp_ds(tmp.data(), tmp.size()); fc::raw::unpack(tmp_ds, data); auto original_data_length = tmp_ds.tellp() - 4; diff --git a/libraries/chain/pbft_database.cpp b/libraries/chain/pbft_database.cpp index dea1e58da90..3d7c736285e 100644 --- a/libraries/chain/pbft_database.cpp +++ b/libraries/chain/pbft_database.cpp @@ -1173,7 +1173,7 @@ namespace eosio { const auto& ucb = ctrl.get_upgrade_properties().upgrade_complete_block_num; if (!ctrl.is_pbft_enabled()) return false; return in >= ucb - && (in % 100 == 1 || std::find(prepare_watermarks.begin(), prepare_watermarks.end(), in) != prepare_watermarks.end()); + && (in == ucb + 1 || in % 100 == 1 || std::find(prepare_watermarks.begin(), prepare_watermarks.end(), in) != prepare_watermarks.end()); }; for (auto i = psp->block_num; @@ -1413,14 +1413,19 @@ namespace eosio { } producer_schedule_type pbft_database::lscb_active_producers() const { - auto lscb_num = ctrl.last_stable_checkpoint_block_num(); - if (lscb_num == 0) return ctrl.initial_schedule(); + auto num = ctrl.last_stable_checkpoint_block_num(); + + if (num == 0) { + const auto &ucb = ctrl.get_upgrade_properties().upgrade_complete_block_num; + if (ucb == 0) return ctrl.initial_schedule(); + num = ucb; + } - auto lscb_state = ctrl.fetch_block_state_by_number(lscb_num); - if (!lscb_state) return ctrl.initial_schedule(); + auto bs = ctrl.fetch_block_state_by_number(num); + if (!bs) return ctrl.initial_schedule(); - if (lscb_state->pending_schedule.producers.empty()) return lscb_state->active_schedule; - return lscb_state->pending_schedule; + if (bs->pending_schedule.producers.empty()) return bs->active_schedule; + return bs->pending_schedule; } chain_id_type pbft_database::chain_id() { diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp index b41e159a6db..e614e84d398 100644 --- a/plugins/producer_plugin/producer_plugin.cpp +++ b/plugins/producer_plugin/producer_plugin.cpp @@ -1607,10 +1607,18 @@ void producer_plugin_impl::produce_block() { block_state_ptr new_bs = chain.head_block_state(); _producer_watermarks[new_bs->header.producer] = chain.head_block_num(); - ilog("Produced block ${id}... #${n} @ ${t} signed by ${p} [trxs: ${count}, lib: ${lib}, lscb: ${lscb}]", - ("p",new_bs->header.producer)("id",fc::variant(new_bs->id).as_string().substr(0,16)) - ("n",new_bs->block_num)("t",new_bs->header.timestamp) - ("count",new_bs->block->transactions.size())("lib",chain.last_irreversible_block_num())("lscb", chain.last_stable_checkpoint_block_num())); + if (chain.is_pbft_enabled()) { + ilog("Produced block ${id}... #${n} @ ${t} signed by ${p} [trxs: ${count}, lib: ${lib}, lscb: ${lscb}]", + ("p", new_bs->header.producer)("id", fc::variant(new_bs->id).as_string().substr(0, 16)) + ("n", new_bs->block_num)("t", new_bs->header.timestamp) + ("count", new_bs->block->transactions.size()) + ("lib", chain.last_irreversible_block_num())("lscb", chain.last_stable_checkpoint_block_num())); + } else { + ilog("Produced block ${id}... #${n} @ ${t} signed by ${p} [trxs: ${count}, lib: ${lib}, confirmed: ${confs}]", + ("p",new_bs->header.producer)("id",fc::variant(new_bs->id).as_string().substr(0,16)) + ("n",new_bs->block_num)("t",new_bs->header.timestamp) + ("count",new_bs->block->transactions.size())("lib",chain.last_irreversible_block_num())("confs", new_bs->header.confirmed)); + } } } // namespace eosio