Skip to content

Commit

Permalink
feature/dpos-pbft-bos-upgrade bugfix and snapshot support (EOSIO#94)
Browse files Browse the repository at this point in the history
* bugfix: incorrect position alignment in fork_db migration

* bug fix: return default block id when trying to fetch block num 0; update pbft status during ctrl init.

* add migration log

* improve: change algorithm in LIB upgrade fork database migration, huge performance boost when fork db size is large

* use schedule in ucb when there is no stable checkpoint.

* update snapshot migration logic to reflect struct change in block header state

* add forkdb block states into snapshot after pbft is enabled; fix replay bug.

* change block producing log
  • Loading branch information
qianxiaofeng authored May 11, 2019
1 parent 76fc46f commit 9407bcf
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 76 deletions.
143 changes: 94 additions & 49 deletions libraries/chain/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -361,21 +361,23 @@ struct controller_impl {

read_from_snapshot( snapshot );

//do upgrade migration if necessary;
migrate_upgrade(); //compatiable for snapshot integrity test
//do upgrade migration if necessary;
migrate_upgrade(); //compatiable for snapshot integrity test

auto end = blog.read_head();
if( !end ) {
blog.reset( conf.genesis, signed_block_ptr(), head->block_num + 1 );
auto reset_block_num = head->block_num + 1;
if (pbft_enabled) reset_block_num = head->pbft_stable_checkpoint_blocknum;
blog.reset( conf.genesis, signed_block_ptr(), reset_block_num );
} else if( end->block_num() > head->block_num ) {
replay( shutdown );
} else {
EOS_ASSERT( end->block_num() == head->block_num, fork_database_exception,
"Block log is provided with snapshot but does not contain the head block from the snapshot" );
}
} else {
//do upgrade migration if necessary;
migrate_upgrade(); //compatiable for snapshot integrity test
//do upgrade migration if necessary;
migrate_upgrade(); //compatiable for snapshot integrity test
if( !head ) {
initialize_fork_db(); // set head to genesis state
}
Expand All @@ -388,7 +390,7 @@ struct controller_impl {
report_integrity_hash = true;
}
}

if( shutdown() ) return;

const auto& ubi = reversible_blocks.get_index<reversible_block_index,by_num>();
Expand Down Expand Up @@ -430,11 +432,39 @@ struct controller_impl {
//generate upo.
try {
db.get<upgrade_property_object>();
if (pbft_enabled) wlog("pbft enabled");
} catch( const boost::exception& e) {
wlog("no upo found, generating...");
db.create<upgrade_property_object>([](auto&){});
}
update_pbft_status();
}

void update_pbft_status() {
auto utb = optional<block_num_type>{};
auto& upo = db.get<upgrade_property_object>();
if (upo.upgrade_target_block_num > 0) utb = upo.upgrade_target_block_num;

auto ucb = optional<block_num_type>{};
if (upo.upgrade_complete_block_num > 0) ucb = upo.upgrade_complete_block_num;


if (utb && !ucb && head->dpos_irreversible_blocknum >= *utb) {
db.modify( upo, [&]( auto& up ) {
up.upgrade_complete_block_num = head->block_num;
});
if (!replaying) wlog("pbft will be working after the block ${b}", ("b", head->block_num));
}

if ( !pbft_enabled && utb && head->block_num >= *utb) {
if (!pbft_upgrading) pbft_upgrading = true;

// new version starts from the next block of ucb, this is to avoid inconsistency after pre calculation inside schedule loop.
if (ucb && head->block_num > *ucb) {
if (pbft_upgrading) pbft_upgrading = false;
pbft_enabled = true;
}
}
}

~controller_impl() {
Expand Down Expand Up @@ -527,9 +557,21 @@ struct controller_impl {
section.add_row(batch_pbft_snapshot_migration{}, db);
});

snapshot->write_section<block_state>([this]( auto &section ){
section.template add_row<block_header_state>(*fork_db.head(), db);
});
if (pbft_enabled) {
snapshot->write_section<batch_pbft_enabled>([this]( auto &section ) {
section.add_row(batch_pbft_enabled{}, db);
});
snapshot->write_section<branch_type>([this](auto &section) {
auto bid = fork_db.get_block_in_current_chain_by_num(fork_db.head()->pbft_stable_checkpoint_blocknum)->id;
EOS_ASSERT(bid != block_id_type{}, snapshot_exception, "cannot find lscb block");
auto bss = fork_db.fetch_branch_from(fork_db.head()->id, bid).first;
section.template add_row<branch_type>(bss, db);
});
} else {
snapshot->write_section<block_state>([this]( auto &section ){
section.template add_row<block_header_state>(*fork_db.head(), db);
});
}

controller_index_set::walk_indices([this, &snapshot]( auto utils ){
using value_t = typename decltype(utils)::index_t::value_type;
Expand Down Expand Up @@ -560,19 +602,42 @@ struct controller_impl {
});

bool migrated = snapshot->has_section<batch_pbft_snapshot_migration>();
if(migrated) {
snapshot->read_section<block_state>([this](auto &section) {
block_header_state head_header_state;
section.read_row(head_header_state, db);

auto head_state = std::make_shared<block_state>(head_header_state);
fork_db.set(head_state);
fork_db.set_validity(head_state, true);
fork_db.mark_in_current_chain(head_state, true);
head = head_state;
snapshot_head_block = head->block_num;
});
}else{
if (migrated) {
auto upgraded = snapshot->has_section<batch_pbft_enabled>();
if (upgraded) {
snapshot->read_section<branch_type>([this](auto &section) {
branch_type bss;
section.template read_row<branch_type>(bss, db);
EOS_ASSERT(!bss.empty(), snapshot_exception, "no last stable checkpoint block in the snapshot");

wlog("${n} reversible blocks found in the snapshot", ("n", bss.size()));

for (auto i = bss.rbegin(); i != bss.rend(); ++i ) {
if (i == bss.rbegin()) {
fork_db.set(*i);
snapshot_head_block = (*i)->block_num;
} else {
fork_db.add((*i), true, true);
}
fork_db.set_validity((*i), true);
fork_db.mark_in_current_chain((*i), true);
}
head = fork_db.head();
});
} else {
snapshot->read_section<block_state>([this](auto &section) {
block_header_state head_header_state;
section.read_row(head_header_state, db);

auto head_state = std::make_shared<block_state>(head_header_state);
fork_db.set(head_state);
fork_db.set_validity(head_state, true);
fork_db.mark_in_current_chain(head_state, true);
head = head_state;
snapshot_head_block = head->block_num;
});
}
} else {
snapshot->read_section<block_state>([this](snapshot_reader::section_reader &section) {
block_header_state head_header_state;
section.read_pbft_migrate_row(head_header_state, db);
Expand Down Expand Up @@ -1338,30 +1403,7 @@ struct controller_impl {
pending.emplace(maybe_session());
}

auto utb = optional<block_num_type>{};
auto& upo = db.get<upgrade_property_object>();
if (upo.upgrade_target_block_num > 0) utb = upo.upgrade_target_block_num;

auto ucb = optional<block_num_type>{};
if (upo.upgrade_complete_block_num > 0) ucb = upo.upgrade_complete_block_num;


if (utb && !ucb && head->dpos_irreversible_blocknum >= *utb) {
db.modify( upo, [&]( auto& up ) {
up.upgrade_complete_block_num = head->block_num;
});
if (!replaying) wlog("pbft will be working after the block ${b}", ("b", head->block_num));
}

if ( !pbft_enabled && utb && head->block_num >= *utb) {
if (!pbft_upgrading) pbft_upgrading = true;

// new version starts from the next block of ucb, this is to avoid inconsistency after pre calculation inside schedule loop.
if (ucb && head->block_num > *ucb) {
if (pbft_upgrading) pbft_upgrading = false;
pbft_enabled = true;
}
}
update_pbft_status();

pending->_block_status = s;
pending->_producer_block_id = producer_block_id;
Expand Down Expand Up @@ -1616,6 +1658,9 @@ struct controller_impl {

EOS_ASSERT(!pending, block_validate_exception, "it is not valid to push a block when there is a pending block");

set_pbft_lib();
set_pbft_lscb();

try {
EOS_ASSERT( b, block_validate_exception, "trying to push empty block" );
EOS_ASSERT( (s == controller::block_status::irreversible || s == controller::block_status::validated),
Expand All @@ -1636,9 +1681,7 @@ struct controller_impl {
for (const auto &extn: b->block_extensions) {
if (extn.first == static_cast<uint16_t>(block_extension_type::pbft_stable_checkpoint)) {
pbft_commit_local(b->id());
set_pbft_lib();
set_pbft_latest_checkpoint(b->id());
set_pbft_lscb();
break;
}
}
Expand Down Expand Up @@ -2330,7 +2373,9 @@ block_id_type controller::last_stable_checkpoint_block_id() const {
if( block_header::num_from_id(tapos_block_summary.block_id) == lscb_num )
return tapos_block_summary.block_id;

return fetch_block_by_number(lscb_num)->id();
auto b = fetch_block_by_number(lscb_num);
if (b) return b->id();
return block_id_type{};
}


Expand Down
36 changes: 25 additions & 11 deletions libraries/chain/fork_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,29 +67,43 @@ namespace eosio { namespace chain {
bool is_version_1 = version_label != "version";
if(is_version_1){
/* start upgrade migration; this is a hack and inefficient, but luckily we only need to do it once */

wlog("doing LIB upgrade migration");
auto start = ds.pos();
unsigned_int size; fc::raw::unpack( ds, size );
auto skipped_size_pos = ds.pos();

vector<char> data(content.begin()+(skipped_size_pos - start), content.end());

data.insert(data.end(),{0,0,0,0});//append 4 bytes for the very last block state, avoid underflow in case
fc::datastream<const char*> tmp_ds(data.data(), data.size());

for( uint32_t i = 0, n = size.value; i < n; ++i ) {
vector<char> tmp = data;
tmp.insert(tmp.begin(), {0,0,0,0});
fc::datastream<const char*> tmp_ds(tmp.data(), tmp.size());
block_state s;
fc::raw::unpack( tmp_ds, s );
//prepend 4bytes for pbft_stable_checkpoint_blocknum and append 2 bytes for pbft_prepared and pbft_my_prepare
auto tmp_data_length = tmp_ds.tellp() - 6;
data.erase(data.begin(),data.begin()+tmp_data_length);
wlog("processing block state in fork database ${i} of ${size}", ("i",i+1)("size",n));
block_header_state h;
fc::raw::unpack( tmp_ds, h );
h.pbft_stable_checkpoint_blocknum = 0;

//move pos backward 4 bytes for pbft_stable_checkpoint_blocknum
auto tmp_accumulated_data_length = tmp_ds.tellp() - 4;
tmp_ds.seekp(tmp_accumulated_data_length);

signed_block_ptr b;
fc::raw::unpack( tmp_ds, b );
bool validated;
fc::raw::unpack( tmp_ds, validated );
bool in_current_chain;
fc::raw::unpack( tmp_ds, in_current_chain );
block_state s{h};
s.block = b;
s.validated = validated;
s.in_current_chain = in_current_chain;

s.pbft_prepared = false;
s.pbft_my_prepare = false;
set( std::make_shared<block_state>( move( s ) ) );
}
fc::datastream<const char*> head_id_stream(data.data(), data.size());
block_id_type head_id;
fc::raw::unpack( head_id_stream, head_id );
fc::raw::unpack( tmp_ds, head_id );

my->head = get_block( head_id );
/*end upgrade migration*/
Expand Down
6 changes: 3 additions & 3 deletions libraries/chain/include/eosio/chain/block_header_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ namespace eosio { namespace chain {
* @brief defines the minimum state necessary to validate transaction headers
*/
struct block_header_state {
uint32_t pbft_stable_checkpoint_blocknum = 0;
block_id_type id;
uint32_t block_num = 0;
signed_block_header header;
Expand All @@ -28,6 +27,7 @@ struct block_header_state {
public_key_type block_signing_key;
vector<uint8_t> confirm_count;
vector<header_confirmation> confirmations;
uint32_t pbft_stable_checkpoint_blocknum = 0;

block_header_state next( const signed_block_header& h, bool trust = false, bool new_version = false)const;
block_header_state generate_next( block_timestamp_type when, bool new_version = false )const;
Expand Down Expand Up @@ -62,10 +62,10 @@ struct block_header_state {
} } /// namespace eosio::chain

FC_REFLECT( eosio::chain::block_header_state,
(pbft_stable_checkpoint_blocknum)
(id)(block_num)(header)(dpos_proposed_irreversible_blocknum)(dpos_irreversible_blocknum)(bft_irreversible_blocknum)

(pending_schedule_lib_num)(pending_schedule_hash)
(pending_schedule)(active_schedule)(blockroot_merkle)
(producer_to_last_produced)(producer_to_last_implied_irb)(block_signing_key)
(confirm_count)(confirmations) )
(confirm_count)(confirmations)
(pbft_stable_checkpoint_blocknum))
5 changes: 5 additions & 0 deletions libraries/chain/include/eosio/chain/chain_snapshot.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,12 @@ struct batch_pbft_snapshot_migration{
bool migrated = true;
};

// Marker row type for a snapshot section; a default-constructed value has enabled == true.
struct batch_pbft_enabled {
   bool enabled{true};
};

} }

FC_REFLECT(eosio::chain::chain_snapshot_header,(version))
FC_REFLECT(eosio::chain::batch_pbft_snapshot_migration,(migrated))
FC_REFLECT(eosio::chain::batch_pbft_enabled,(enabled))
4 changes: 2 additions & 2 deletions libraries/chain/include/eosio/chain/snapshot.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,9 +232,9 @@ namespace eosio { namespace chain {
std::ostringstream sstream;
sstream << in.rdbuf();
std::string str(sstream.str());
//prepend uint32_t 0
//append uint32_t 0
std::vector<char> tmp(str.begin(), str.end());
tmp.insert(tmp.begin(), {0,0,0,0});
tmp.insert(tmp.end(), {0,0,0,0});
fc::datastream<const char*> tmp_ds(tmp.data(), tmp.size());
fc::raw::unpack(tmp_ds, data);
auto original_data_length = tmp_ds.tellp() - 4;
Expand Down
19 changes: 12 additions & 7 deletions libraries/chain/pbft_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1173,7 +1173,7 @@ namespace eosio {
const auto& ucb = ctrl.get_upgrade_properties().upgrade_complete_block_num;
if (!ctrl.is_pbft_enabled()) return false;
return in >= ucb
&& (in % 100 == 1 || std::find(prepare_watermarks.begin(), prepare_watermarks.end(), in) != prepare_watermarks.end());
&& (in == ucb + 1 || in % 100 == 1 || std::find(prepare_watermarks.begin(), prepare_watermarks.end(), in) != prepare_watermarks.end());
};

for (auto i = psp->block_num;
Expand Down Expand Up @@ -1413,14 +1413,19 @@ namespace eosio {
}

/**
 * Returns the producer schedule taken from the last stable checkpoint block.
 *
 * Block selection:
 *  - the last stable checkpoint block number, if it is non-zero;
 *  - otherwise the upgrade-complete block number, if that is non-zero;
 *  - otherwise no block is looked up and the initial schedule is returned.
 *
 * If the selected block state cannot be fetched, the initial schedule is
 * returned. For a fetched block state, its pending schedule is returned
 * unless that schedule has no producers, in which case the active schedule
 * is returned.
 */
producer_schedule_type pbft_database::lscb_active_producers() const {
   auto num = ctrl.last_stable_checkpoint_block_num();

   if (num == 0) {
      // No stable checkpoint yet: fall back to the upgrade-complete block.
      const auto& ucb = ctrl.get_upgrade_properties().upgrade_complete_block_num;
      if (ucb == 0) return ctrl.initial_schedule();
      num = ucb;
   }

   const auto bs = ctrl.fetch_block_state_by_number(num);
   if (!bs) return ctrl.initial_schedule();

   if (bs->pending_schedule.producers.empty()) return bs->active_schedule;
   return bs->pending_schedule;
}

chain_id_type pbft_database::chain_id() {
Expand Down
16 changes: 12 additions & 4 deletions plugins/producer_plugin/producer_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1607,10 +1607,18 @@ void producer_plugin_impl::produce_block() {
block_state_ptr new_bs = chain.head_block_state();
_producer_watermarks[new_bs->header.producer] = chain.head_block_num();

ilog("Produced block ${id}... #${n} @ ${t} signed by ${p} [trxs: ${count}, lib: ${lib}, lscb: ${lscb}]",
("p",new_bs->header.producer)("id",fc::variant(new_bs->id).as_string().substr(0,16))
("n",new_bs->block_num)("t",new_bs->header.timestamp)
("count",new_bs->block->transactions.size())("lib",chain.last_irreversible_block_num())("lscb", chain.last_stable_checkpoint_block_num()));
if (chain.is_pbft_enabled()) {
ilog("Produced block ${id}... #${n} @ ${t} signed by ${p} [trxs: ${count}, lib: ${lib}, lscb: ${lscb}]",
("p", new_bs->header.producer)("id", fc::variant(new_bs->id).as_string().substr(0, 16))
("n", new_bs->block_num)("t", new_bs->header.timestamp)
("count", new_bs->block->transactions.size())
("lib", chain.last_irreversible_block_num())("lscb", chain.last_stable_checkpoint_block_num()));
} else {
ilog("Produced block ${id}... #${n} @ ${t} signed by ${p} [trxs: ${count}, lib: ${lib}, confirmed: ${confs}]",
("p",new_bs->header.producer)("id",fc::variant(new_bs->id).as_string().substr(0,16))
("n",new_bs->block_num)("t",new_bs->header.timestamp)
("count",new_bs->block->transactions.size())("lib",chain.last_irreversible_block_num())("confs", new_bs->header.confirmed));
}
}

} // namespace eosio

0 comments on commit 9407bcf

Please sign in to comment.