diff --git a/libraries/chain/block_log.cpp b/libraries/chain/block_log.cpp index b83d7599cc..946e4764b8 100644 --- a/libraries/chain/block_log.cpp +++ b/libraries/chain/block_log.cpp @@ -185,6 +185,16 @@ namespace eosio { namespace chain { return block; } + template + std::vector read_serialized_block(Stream&& ds, uint64_t block_size) { + std::vector buff; + buff.resize(block_size); + + ds.read(buff.data(), block_size); + + return buff; + } + template signed_block_header read_block_header(Stream&& ds, uint32_t expect_block_num) { signed_block_header bh; @@ -197,6 +207,7 @@ namespace eosio { namespace chain { return bh; } + /// Provide the read only view of the blocks.log file class block_log_data : public chain::log_data_base { block_log_preamble preamble; @@ -245,10 +256,12 @@ namespace eosio { namespace chain { // block_id_type previous; //bytes 14:45, low 4 bytes is big endian block number // of previous block - EOS_ASSERT(position <= size(), block_log_exception, "Invalid block position ${position}", - ("position", position)); + int blknum_offset = 14; + + EOS_ASSERT(position + blknum_offset + sizeof(uint32_t) <= size(), block_log_exception, + "Read outside of file: position ${position}, blknum_offset ${o}, file size ${s}", + ("position", position)("o", blknum_offset)("s", size())); - int blknum_offset = 14; uint32_t prev_block_num = read_data_at(file, position + blknum_offset); return fc::endian_reverse_u32(prev_block_num) + 1; } @@ -485,6 +498,7 @@ namespace eosio { namespace chain { virtual void flush() = 0; virtual signed_block_ptr read_block_by_num(uint32_t block_num) = 0; + virtual std::vector read_serialized_block_by_num(uint32_t block_num) = 0; virtual std::optional read_block_header_by_num(uint32_t block_num) = 0; virtual uint32_t version() const = 0; @@ -518,6 +532,7 @@ namespace eosio { namespace chain { void flush() final {} signed_block_ptr read_block_by_num(uint32_t block_num) final { return {}; }; + std::vector 
read_serialized_block_by_num(uint32_t block_num) final { return {}; }; std::optional read_block_header_by_num(uint32_t block_num) final { return {}; }; uint32_t version() const final { return 0; } @@ -562,6 +577,7 @@ namespace eosio { namespace chain { virtual uint32_t working_block_file_first_block_num() { return preamble.first_block_num; } virtual void post_append(uint64_t pos) {} virtual signed_block_ptr retry_read_block_by_num(uint32_t block_num) { return {}; } + virtual std::vector retry_read_serialized_block_by_num(uint32_t block_num) { return {}; } virtual std::optional retry_read_block_header_by_num(uint32_t block_num) { return {}; } void append(const signed_block_ptr& b, const block_id_type& id, @@ -603,6 +619,46 @@ namespace eosio { namespace chain { return pos; } + block_pos_size_t get_block_position_and_size(uint32_t block_num) { + uint64_t pos = get_block_pos(block_num); + + if (pos == block_log::npos) { + return block_pos_size_t {.position = block_log::npos, .size = 0}; + } + + assert(head); + uint32_t last_block_num = block_header::num_from_id(head->id); + EOS_ASSERT(block_num <= last_block_num, block_log_exception, + "block_num ${bn} should not be greater than last_block_num ${lbn}", + ("bn", block_num)("lbn", last_block_num)); + + uint64_t block_size = 0; + constexpr uint32_t block_pos_size = sizeof(uint64_t); // size of block position field in the block log file + + if (block_num < last_block_num) { + // current block is not the last block in the log file. + uint64_t next_block_pos = get_block_pos(block_num + 1); + + EOS_ASSERT(next_block_pos > pos + block_pos_size, block_log_exception, + "next block position ${np} should be greater than current block position ${p} plus block position field size ${bps}", + ("np", next_block_pos)("p", pos)("bps", block_pos_size)); + + block_size = next_block_pos - pos - block_pos_size; + } else { + // current block is the last block in the file. 
+ + block_file.seek_end(0); + auto file_size = block_file.tellp(); + EOS_ASSERT(file_size > pos + block_pos_size, block_log_exception, + "block log file size ${fs} should be greater than current block position ${p} plus block position field size ${bps}", + ("fs", file_size)("p", pos)("bps", block_pos_size)); + + block_size = file_size - pos - block_pos_size; + } + + return block_pos_size_t {.position = pos, .size = block_size}; + } + signed_block_ptr read_block_by_num(uint32_t block_num) final { try { uint64_t pos = get_block_pos(block_num); @@ -615,6 +671,18 @@ namespace eosio { namespace chain { FC_LOG_AND_RETHROW() } + std::vector read_serialized_block_by_num(uint32_t block_num) final { + try { + auto [ position, size ] = get_block_position_and_size(block_num); + if (position != block_log::npos) { + block_file.seek(position); + return read_serialized_block(block_file, size); + } + return retry_read_serialized_block_by_num(block_num); + } + FC_LOG_AND_RETHROW() + } + std::optional read_block_header_by_num(uint32_t block_num) final { try { uint64_t pos = get_block_pos(block_num); @@ -1032,6 +1100,16 @@ namespace eosio { namespace chain { return {}; } + std::vector retry_read_serialized_block_by_num(uint32_t block_num) final { + uint64_t block_size = 0; + + auto ds = catalog.ro_stream_and_size_for_block(block_num, block_size); + if (ds) { + return read_serialized_block(*ds, block_size); + } + return {}; + } + std::optional retry_read_block_header_by_num(uint32_t block_num) final { auto ds = catalog.ro_stream_for_block(block_num); if (ds) @@ -1230,6 +1308,11 @@ namespace eosio { namespace chain { return my->read_block_by_num(block_num); } + std::vector block_log::read_serialized_block_by_num(uint32_t block_num) const { + std::lock_guard g(my->mtx); + return my->read_serialized_block_by_num(block_num); + } + std::optional block_log::read_block_header_by_num(uint32_t block_num) const { std::lock_guard g(my->mtx); return my->read_block_header_by_num(block_num); diff 
--git a/libraries/chain/controller.cpp b/libraries/chain/controller.cpp index 68b88c78a0..012bfd8a4d 100644 --- a/libraries/chain/controller.cpp +++ b/libraries/chain/controller.cpp @@ -5404,6 +5404,14 @@ signed_block_ptr controller::fetch_block_by_number( uint32_t block_num )const { return my->blog.read_block_by_num(block_num); } FC_CAPTURE_AND_RETHROW( (block_num) ) } +std::vector controller::fetch_serialized_block_by_number( uint32_t block_num)const { try { + if (signed_block_ptr b = my->fork_db_fetch_block_on_best_branch_by_num(block_num)) { + return fc::raw::pack(*b); + } + + return my->blog.read_serialized_block_by_num(block_num); +} FC_CAPTURE_AND_RETHROW( (block_num) ) } + std::optional controller::fetch_block_header_by_number( uint32_t block_num )const { try { auto b = my->fork_db_fetch_block_on_best_branch_by_num(block_num); if (b) diff --git a/libraries/chain/include/eosio/chain/block_log.hpp b/libraries/chain/include/eosio/chain/block_log.hpp index be539a5ddd..e933cb7920 100644 --- a/libraries/chain/include/eosio/chain/block_log.hpp +++ b/libraries/chain/include/eosio/chain/block_log.hpp @@ -53,7 +53,8 @@ namespace eosio { namespace chain { void reset( const genesis_state& gs, const signed_block_ptr& genesis_block ); void reset( const chain_id_type& chain_id, uint32_t first_block_num ); - signed_block_ptr read_block_by_num(uint32_t block_num)const; + signed_block_ptr read_block_by_num(uint32_t block_num)const; + std::vector read_serialized_block_by_num(uint32_t block_num)const; std::optional read_block_header_by_num(uint32_t block_num)const; std::optional read_block_id_by_num(uint32_t block_num)const; diff --git a/libraries/chain/include/eosio/chain/controller.hpp b/libraries/chain/include/eosio/chain/controller.hpp index de3b83130b..6f4b4de0d8 100644 --- a/libraries/chain/include/eosio/chain/controller.hpp +++ b/libraries/chain/include/eosio/chain/controller.hpp @@ -342,6 +342,8 @@ namespace eosio::chain { signed_block_ptr fetch_block_by_number( 
uint32_t block_num )const; // thread-safe signed_block_ptr fetch_block_by_id( const block_id_type& id )const; + // thread-safe, retrieves serialized signed block + std::vector fetch_serialized_block_by_number( uint32_t block_num)const; // thread-safe bool block_exists(const block_id_type& id) const; bool validated_block_exists(const block_id_type& id) const; diff --git a/libraries/chain/include/eosio/chain/log_catalog.hpp b/libraries/chain/include/eosio/chain/log_catalog.hpp index 70a60574f7..332bd2a0b0 100644 --- a/libraries/chain/include/eosio/chain/log_catalog.hpp +++ b/libraries/chain/include/eosio/chain/log_catalog.hpp @@ -31,6 +31,11 @@ struct null_verifier { void verify(const LogData&, const std::filesystem::path&) {} }; +struct block_pos_size_t { + uint64_t position = 0; // start position of the block in block log + uint64_t size = 0; // size of the block +}; + template struct log_catalog { using block_num_t = uint32_t; @@ -159,6 +164,26 @@ struct log_catalog { } } + std::optional get_block_position_and_size(uint32_t block_num) { + std::optional pos = get_block_position(block_num); + + if (!pos) { + return {}; + } + + constexpr uint32_t block_pos_size = sizeof(uint64_t); + uint64_t block_size = 0; + assert(block_num <= log_data.last_block_num()); + if (block_num < log_data.last_block_num()) { + uint64_t next_block_pos = log_index.nth_block_position(block_num + 1 - log_data.first_block_num()); + block_size = next_block_pos - *pos - block_pos_size; + } else { + block_size = log_data.size() - *pos - block_pos_size; + } + + return block_pos_size_t { .position = *pos, .size = block_size }; + } + fc::datastream* ro_stream_for_block(uint32_t block_num) { auto pos = get_block_position(block_num); if (pos) { @@ -176,6 +201,15 @@ struct log_catalog { return {}; } + fc::datastream* ro_stream_and_size_for_block(uint32_t block_num, uint64_t& block_size) { + auto pos_size = get_block_position_and_size(block_num); + if (pos_size) { + block_size = pos_size->size; + return 
&log_data.ro_stream_at(pos_size->position); + } + return nullptr; + } + std::optional id_for_block(uint32_t block_num) { auto pos = get_block_position(block_num); if (pos) { diff --git a/libraries/state_history/abi.cpp b/libraries/state_history/abi.cpp index 095b150886..fb5cf7f3fe 100644 --- a/libraries/state_history/abi.cpp +++ b/libraries/state_history/abi.cpp @@ -588,6 +588,13 @@ extern const char* const state_history_plugin_abi = R"({ { "type": "uint32", "name": "account_net_usage_average_window" } ] }, + { + "name": "finalizer_authority", "fields": [ + { "name": "description", "type": "string" }, + { "name": "weight", "type": "uint64" }, + { "name": "public_key", "type": "bytes" } + ] + }, { "name": "finalizer_authority_with_string_key", "fields": [ { "name": "description", "type": "string" }, @@ -616,6 +623,55 @@ extern const char* const state_history_plugin_abi = R"({ { "name": "pending_finalizer_policy", "type": "finalizer_policy_with_string_key?" }, { "name": "last_pending_finalizer_policy_generation", "type": "uint32" } ] + }, + { + "name": "protocol_feature_activation_extension", "fields": [ + { "name": "protocol_features", "type": "checksum256[]" } + ] + }, + { + "name": "producer_schedule_change_extension", "base": "producer_authority_schedule", "fields": [] + }, + { + "name": "qc_claim", "fields": [ + { "name": "block_num", "type": "uint32" }, + { "name": "is_strong_qc", "type": "bool" } + ] + }, + { + "name": "insert_finalizer_policy_index_pair", "fields": [ + { "name": "index", "type": "uint16" }, + { "name": "value", "type": "finalizer_authority" } + ] + }, + { + "name": "finalizer_policy_diff", "fields": [ + { "name": "generation", "type": "uint32" }, + { "name": "threshold", "type": "uint64" }, + { "name": "remove_indexes", "type": "uint16[]" }, + { "name": "insert_indexes", "type": "insert_finalizer_policy_index_pair[]" } + ] + }, + { + "name": "insert_proposer_policy_index_pair", "fields": [ + { "name": "index", "type": "uint16" }, + { "name": 
"value", "type": "producer_authority" } + ] + }, + { + "name": "proposer_policy_diff", "fields": [ + { "name": "version", "type": "uint32" }, + { "name": "proposal_time", "type": "block_timestamp_type" }, + { "name": "remove_indexes", "type": "uint16[]" }, + { "name": "insert_indexes", "type": "insert_proposer_policy_index_pair[]" } + ] + }, + { + "name": "finality_extension", "fields": [ + { "name": "qc_claim", "type": "qc_claim" }, + { "name": "new_finalizer_policy_diff", "type": "finalizer_policy_diff?" }, + { "name": "new_proposer_policy_diff", "type": "proposer_policy_diff?" } + ] } ], "types": [ @@ -657,7 +713,8 @@ extern const char* const state_history_plugin_abi = R"({ { "name": "resource_limits_ratio", "types": ["resource_limits_ratio_v0"] }, { "name": "elastic_limit_parameters", "types": ["elastic_limit_parameters_v0"] }, { "name": "resource_limits_config", "types": ["resource_limits_config_v0"] }, - { "name": "block_signing_authority", "types": ["block_signing_authority_v0"] } + { "name": "block_signing_authority", "types": ["block_signing_authority_v0"] }, + { "name": "block_header_extension", "types": ["protocol_feature_activation_extension", "producer_schedule_change_extension", "finality_extension"] } ], "tables": [ { "name": "account", "type": "account", "key_names": ["name"] }, diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index cd0138df83..6e3ba6b0e3 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -971,10 +971,9 @@ namespace eosio { void blk_send_branch( const block_id_type& msg_head_id ); void blk_send_branch( uint32_t msg_head_num, uint32_t lib_num, uint32_t head_num ); - void blk_send(const block_id_type& blkid); void enqueue( const net_message &msg ); - size_t enqueue_block( const signed_block_ptr& sb, bool to_sync_queue = false); + size_t enqueue_block( const std::vector& sb, uint32_t block_num, bool to_sync_queue = false); void enqueue_buffer( const 
std::shared_ptr>& send_buffer, go_away_reason close_after_send, bool to_sync_queue = false); @@ -1470,25 +1469,6 @@ namespace eosio { } } - // called from connection strand - void connection::blk_send( const block_id_type& blkid ) { - try { - controller& cc = my_impl->chain_plug->chain(); - signed_block_ptr b = cc.fetch_block_by_id( blkid ); // thread-safe - if( b ) { - peer_dlog( this, "fetch_block_by_id num ${n}", ("n", b->block_num()) ); - enqueue_block( b ); - } else { - peer_ilog( this, "fetch block by id returned null, id ${id}", ("id", blkid) ); - } - } catch( const assert_exception& ex ) { - // possible corrupted block log - peer_elog( this, "caught assert on fetch_block_by_id, ${ex}, id ${id}", ("ex", ex.to_string())("id", blkid) ); - } catch( ... ) { - peer_elog( this, "caught other exception fetching block id ${id}", ("id", blkid) ); - } - } - void connection::send_handshake() { if (closed()) return; @@ -1656,11 +1636,11 @@ namespace eosio { uint32_t num = peer_requested->last + 1; controller& cc = my_impl->chain_plug->chain(); - signed_block_ptr sb; + std::vector sb; try { - sb = cc.fetch_block_by_number( num ); // thread-safe + sb = cc.fetch_serialized_block_by_number( num ); // thread-safe } FC_LOG_AND_DROP(); - if( sb ) { + if( !sb.empty() ) { // Skip transmitting block this loop if threshold exceeded if (block_sync_send_start == 0ns) { // start of enqueue blocks block_sync_send_start = get_time(); @@ -1679,7 +1659,7 @@ namespace eosio { } } block_sync_throttling = false; - auto sent = enqueue_block( sb, true ); + auto sent = enqueue_block( sb, num, true ); block_sync_total_bytes_sent += sent; block_sync_frame_bytes_sent += sent; ++peer_requested->last; @@ -1748,6 +1728,25 @@ namespace eosio { return send_buffer; } + static send_buffer_type create_send_buffer_from_serialized_block( const std::vector& v ) { + static_assert( signed_block_which == fc::get_index() ); + + // match net_message static_variant pack + const uint32_t which_size = 
fc::raw::pack_size( unsigned_int( signed_block_which ) ); + const uint32_t payload_size = which_size + v.size(); + + const char* const header = reinterpret_cast(&payload_size); // avoid variable size encoding of uint32_t + const size_t buffer_size = message_header_size + payload_size; + + auto send_buffer = std::make_shared>( buffer_size ); + fc::datastream ds( send_buffer->data(), buffer_size ); + ds.write( header, message_header_size ); + fc::raw::pack( ds, unsigned_int( signed_block_which ) ); + ds.write( v.data(), v.size() ); + + return send_buffer; + } + }; struct block_buffer_factory : public buffer_factory { @@ -1760,6 +1759,13 @@ namespace eosio { return send_buffer; } + const send_buffer_type& get_send_buffer( const std::vector& sb ) { + if( !send_buffer ) { + send_buffer = create_send_buffer( sb ); + } + return send_buffer; + } + private: static std::shared_ptr> create_send_buffer( const signed_block_ptr& sb ) { @@ -1769,6 +1775,12 @@ namespace eosio { fc_dlog( logger, "sending block ${bn}", ("bn", sb->block_num()) ); return buffer_factory::create_send_buffer( signed_block_which, *sb ); } + + static std::shared_ptr> create_send_buffer( const std::vector& ssb ) { // ssb: serialized signed block + // this implementation is to avoid copy of signed_block to net_message + // matches which of net_message for signed_block + return buffer_factory::create_send_buffer_from_serialized_block( ssb ); + } }; struct trx_buffer_factory : public buffer_factory { @@ -1807,8 +1819,8 @@ namespace eosio { } // called from connection strand - size_t connection::enqueue_block( const signed_block_ptr& b, bool to_sync_queue) { - peer_dlog( this, "enqueue block ${num}", ("num", b->block_num()) ); + size_t connection::enqueue_block( const std::vector& b, uint32_t block_num, bool to_sync_queue ) { + peer_dlog( this, "enqueue block ${num}", ("num", block_num) ); verify_strand_in_this_thread( strand, __func__, __LINE__ ); block_buffer_factory buff_factory; @@ -2103,10 +2115,12 @@ 
namespace eosio { auto forkdb_head = cc.fork_db_head(); auto calculated_lib = forkdb_head.irreversible_blocknum(); auto num_blocks_that_can_be_applied = calculated_lib > head_num ? calculated_lib - head_num : 0; + // add blocks that can potentially be applied as they are not in the forkdb yet + num_blocks_that_can_be_applied += blk_num > forkdb_head.block_num() ? blk_num - forkdb_head.block_num() : 0; if (num_blocks_that_can_be_applied < sync_fetch_span) { if (head_num ) - fc_ilog(logger, "sync ahead allowed past sync-fetch-span ${sp}, block ${bn} for paused LIB ${l}, chain_lib ${cl}, forkdb size ${s}", - ("bn", blk_num)("sp", sync_fetch_span)("l", calculated_lib)("cl", head_num)("s", cc.fork_db_size())); + fc_ilog(logger, "sync ahead allowed past sync-fetch-span ${sp}, block ${bn} for paused LIB ${l}, chain_lib ${cl}, forkdb size ${s}", + ("bn", blk_num)("sp", sync_fetch_span)("l", calculated_lib)("cl", head_num)("s", cc.fork_db_size())); return true; } } @@ -3611,11 +3625,9 @@ namespace eosio { blk_send_branch( msg.req_blocks.ids.empty() ? 
block_id_type() : msg.req_blocks.ids.back() ); break; case normal : - peer_dlog( this, "received request_message:normal" ); - if( !msg.req_blocks.ids.empty() ) { - blk_send( msg.req_blocks.ids.back() ); - } - break; + peer_wlog( this, "Invalid request_message, req_blocks.mode = normal" ); + close(); + return; default:; } @@ -3627,14 +3639,15 @@ namespace eosio { if( msg.req_blocks.mode == none ) { peer_syncing_from_us = false; } - // no break - case normal : if( !msg.req_trx.ids.empty() ) { - peer_wlog( this, "Invalid request_message, req_trx.ids.size ${s}", ("s", msg.req_trx.ids.size()) ); + peer_wlog( this, "Invalid request_message, req_trx.mode=none, req_trx.ids.size ${s}", ("s", msg.req_trx.ids.size()) ); close(); - return; } break; + case normal : + peer_wlog( this, "Invalid request_message, req_trx.mode=normal" ); + close(); + break; default:; } } diff --git a/plugins/producer_plugin/test/test_block_timing_util.cpp b/plugins/producer_plugin/test/test_block_timing_util.cpp index 1c9108710e..dd53c809a7 100644 --- a/plugins/producer_plugin/test/test_block_timing_util.cpp +++ b/plugins/producer_plugin/test/test_block_timing_util.cpp @@ -151,7 +151,7 @@ BOOST_AUTO_TEST_CASE(test_calculate_producer_wake_up_time) { for (uint32_t i = 0; i < static_cast(config::producer_repetitions * active_schedule.size() * 3); ++i) { // 3 rounds to test boundaries block_timestamp_type block_timestamp(prod_round_1st_block_slot + i); auto block_time = block_timestamp.to_time_point(); - BOOST_CHECK_EQUAL(calculate_producer_wake_up_time(full_cpu_effort, 2, block_timestamp, producers, active_schedule, empty_watermarks), block_time); + BOOST_CHECK_EQUAL(calculate_producer_wake_up_time(full_cpu_effort, 2, block_timestamp, producers, active_schedule, empty_watermarks), std::optional{block_time}); } } { // We have all producers in active_schedule configured, we should produce every block @@ -161,7 +161,7 @@ BOOST_AUTO_TEST_CASE(test_calculate_producer_wake_up_time) { for (uint32_t i = 0; i < 
static_cast(config::producer_repetitions * active_schedule.size() * 3); ++i) { // 3 rounds to test boundaries block_timestamp_type block_timestamp(prod_round_1st_block_slot + i); auto block_time = block_timestamp.to_time_point(); - BOOST_CHECK_EQUAL(calculate_producer_wake_up_time(full_cpu_effort, 2, block_timestamp, producers, active_schedule, empty_watermarks), block_time); + BOOST_CHECK_EQUAL(calculate_producer_wake_up_time(full_cpu_effort, 2, block_timestamp, producers, active_schedule, empty_watermarks), std::optional{block_time}); } } { // We have all producers in active_schedule of 21 (plus a couple of extra producers configured), we should produce every block @@ -179,7 +179,7 @@ BOOST_AUTO_TEST_CASE(test_calculate_producer_wake_up_time) { for (uint32_t i = 0; i < static_cast(config::producer_repetitions * active_schedule.size() * 3); ++i) { // 3 rounds to test boundaries block_timestamp_type block_timestamp(prod_round_1st_block_slot + i); auto block_time = block_timestamp.to_time_point(); - BOOST_CHECK_EQUAL(calculate_producer_wake_up_time(full_cpu_effort, 2, block_timestamp, producers, active_schedule, empty_watermarks), block_time); + BOOST_CHECK_EQUAL(calculate_producer_wake_up_time(full_cpu_effort, 2, block_timestamp, producers, active_schedule, empty_watermarks), std::optional{block_time}); } } { // Tests for when we only have a subset of all active producers, we do not produce all blocks, only produce blocks for our round @@ -194,48 +194,48 @@ BOOST_AUTO_TEST_CASE(test_calculate_producer_wake_up_time) { std::set producers = { "initb"_n }; block_timestamp_type block_timestamp(prod_round_1st_block_slot); auto expected_block_time = block_timestamp_type(prod_round_1st_block_slot + config::producer_repetitions).to_time_point(); - BOOST_CHECK_EQUAL(calculate_producer_wake_up_time(full_cpu_effort, 2, block_timestamp_type{block_timestamp.slot-1}, producers, active_schedule, empty_watermarks), expected_block_time); // same - 
BOOST_CHECK_EQUAL(calculate_producer_wake_up_time(full_cpu_effort, 2, block_timestamp_type{block_timestamp.slot+config::producer_repetitions-1}, producers, active_schedule, empty_watermarks), expected_block_time); // same - BOOST_CHECK_EQUAL(calculate_producer_wake_up_time(full_cpu_effort, 2, block_timestamp_type{block_timestamp.slot+config::producer_repetitions-2}, producers, active_schedule, empty_watermarks), expected_block_time); // same - BOOST_CHECK_EQUAL(calculate_producer_wake_up_time(full_cpu_effort, 2, block_timestamp_type{block_timestamp.slot+config::producer_repetitions-3}, producers, active_schedule, empty_watermarks), expected_block_time); // same + BOOST_CHECK_EQUAL(calculate_producer_wake_up_time(full_cpu_effort, 2, block_timestamp_type{block_timestamp.slot-1}, producers, active_schedule, empty_watermarks), std::optional{expected_block_time}); // same + BOOST_CHECK_EQUAL(calculate_producer_wake_up_time(full_cpu_effort, 2, block_timestamp_type{block_timestamp.slot+config::producer_repetitions-1}, producers, active_schedule, empty_watermarks), std::optional{expected_block_time}); // same + BOOST_CHECK_EQUAL(calculate_producer_wake_up_time(full_cpu_effort, 2, block_timestamp_type{block_timestamp.slot+config::producer_repetitions-2}, producers, active_schedule, empty_watermarks), std::optional{expected_block_time}); // same + BOOST_CHECK_EQUAL(calculate_producer_wake_up_time(full_cpu_effort, 2, block_timestamp_type{block_timestamp.slot+config::producer_repetitions-3}, producers, active_schedule, empty_watermarks), std::optional{expected_block_time}); // same // current which gives same expected - BOOST_CHECK_EQUAL(calculate_producer_wake_up_time(full_cpu_effort, 2, block_timestamp_type{block_timestamp.slot+config::producer_repetitions}, producers, active_schedule, empty_watermarks), expected_block_time); + BOOST_CHECK_EQUAL(calculate_producer_wake_up_time(full_cpu_effort, 2, block_timestamp_type{block_timestamp.slot+config::producer_repetitions}, 
producers, active_schedule, empty_watermarks), std::optional{expected_block_time}); expected_block_time += fc::microseconds(config::block_interval_us); - BOOST_CHECK_EQUAL(calculate_producer_wake_up_time(full_cpu_effort, 2, block_timestamp_type{block_timestamp.slot+config::producer_repetitions+1}, producers, active_schedule, empty_watermarks), expected_block_time); + BOOST_CHECK_EQUAL(calculate_producer_wake_up_time(full_cpu_effort, 2, block_timestamp_type{block_timestamp.slot+config::producer_repetitions+1}, producers, active_schedule, empty_watermarks), std::optional{expected_block_time}); // inita is first in the schedule, prod_round_1st_block_slot is block time of the first block, so will return the next block time as that is when current should be produced producers = std::set{ "inita"_n }; block_timestamp = block_timestamp_type{prod_round_1st_block_slot}; expected_block_time = block_timestamp.to_time_point(); - BOOST_CHECK_EQUAL(calculate_producer_wake_up_time(full_cpu_effort, 2, block_timestamp_type{block_timestamp.slot-1}, producers, active_schedule, empty_watermarks), expected_block_time); // same - BOOST_CHECK_EQUAL(calculate_producer_wake_up_time(full_cpu_effort, 2, block_timestamp_type{block_timestamp.slot-2}, producers, active_schedule, empty_watermarks), expected_block_time); // same - BOOST_CHECK_EQUAL(calculate_producer_wake_up_time(full_cpu_effort, 2, block_timestamp_type{block_timestamp.slot-3}, producers, active_schedule, empty_watermarks), expected_block_time); // same + BOOST_CHECK_EQUAL(calculate_producer_wake_up_time(full_cpu_effort, 2, block_timestamp_type{block_timestamp.slot-1}, producers, active_schedule, empty_watermarks), std::optional{expected_block_time}); // same + BOOST_CHECK_EQUAL(calculate_producer_wake_up_time(full_cpu_effort, 2, block_timestamp_type{block_timestamp.slot-2}, producers, active_schedule, empty_watermarks), std::optional{expected_block_time}); // same + 
BOOST_CHECK_EQUAL(calculate_producer_wake_up_time(full_cpu_effort, 2, block_timestamp_type{block_timestamp.slot-3}, producers, active_schedule, empty_watermarks), std::optional{expected_block_time}); // same for (size_t i = 0; i < config::producer_repetitions; ++i) { expected_block_time = block_timestamp_type(prod_round_1st_block_slot+i).to_time_point(); - BOOST_CHECK_EQUAL(calculate_producer_wake_up_time(full_cpu_effort, 2, block_timestamp, producers, active_schedule, empty_watermarks), expected_block_time); + BOOST_CHECK_EQUAL(calculate_producer_wake_up_time(full_cpu_effort, 2, block_timestamp, producers, active_schedule, empty_watermarks), std::optional{expected_block_time}); block_timestamp = block_timestamp.next(); } expected_block_time = block_timestamp.to_time_point(); - BOOST_CHECK_NE(calculate_producer_wake_up_time(full_cpu_effort, 2, block_timestamp, producers, active_schedule, empty_watermarks), expected_block_time); // end of round, so not the next + BOOST_CHECK_NE(calculate_producer_wake_up_time(full_cpu_effort, 2, block_timestamp, producers, active_schedule, empty_watermarks), std::optional{expected_block_time}); // end of round, so not the next // initc is third in the schedule, verify its wake-up time is as expected producers = std::set{ "initc"_n }; block_timestamp = block_timestamp_type(prod_round_1st_block_slot); // expect 2*producer_repetitions since we expect wake-up time to be after the first two rounds expected_block_time = block_timestamp_type(prod_round_1st_block_slot + 2*config::producer_repetitions).to_time_point(); - BOOST_CHECK_EQUAL(calculate_producer_wake_up_time(full_cpu_effort, 2, block_timestamp, producers, active_schedule, empty_watermarks), expected_block_time); + BOOST_CHECK_EQUAL(calculate_producer_wake_up_time(full_cpu_effort, 2, block_timestamp, producers, active_schedule, empty_watermarks), std::optional{expected_block_time}); // inith, initk - configured for 2 of the 21 producers. 
inith is 8th in schedule, initk is 11th in schedule producers = std::set{ "inith"_n, "initk"_n }; block_timestamp = block_timestamp_type(prod_round_1st_block_slot); // expect to produce after 7 rounds since inith is 8th expected_block_time = block_timestamp_type(prod_round_1st_block_slot + 7*config::producer_repetitions).to_time_point(); - BOOST_CHECK_EQUAL(calculate_producer_wake_up_time(full_cpu_effort, 2, block_timestamp, producers, active_schedule, empty_watermarks), expected_block_time); + BOOST_CHECK_EQUAL(calculate_producer_wake_up_time(full_cpu_effort, 2, block_timestamp, producers, active_schedule, empty_watermarks), std::optional{expected_block_time}); // give it a time after inith otherwise would return inith time block_timestamp = block_timestamp_type(prod_round_1st_block_slot + 8*config::producer_repetitions); // after inith round // expect to produce after 10 rounds since inith is 11th expected_block_time = block_timestamp_type(prod_round_1st_block_slot + 10*config::producer_repetitions).to_time_point(); - BOOST_CHECK_EQUAL(calculate_producer_wake_up_time(full_cpu_effort, 2, block_timestamp, producers, active_schedule, empty_watermarks), expected_block_time); + BOOST_CHECK_EQUAL(calculate_producer_wake_up_time(full_cpu_effort, 2, block_timestamp, producers, active_schedule, empty_watermarks), std::optional{expected_block_time}); // cpu_effort at 50%, initc constexpr fc::microseconds half_cpu_effort = fc::microseconds{eosio::chain::config::block_interval_us / 2u}; @@ -243,18 +243,18 @@ BOOST_AUTO_TEST_CASE(test_calculate_producer_wake_up_time) { block_timestamp = block_timestamp_type(prod_round_1st_block_slot); expected_block_time = block_timestamp_type(prod_round_1st_block_slot + 2*config::producer_repetitions).to_time_point(); // first in round is not affected by cpu effort - BOOST_CHECK_EQUAL(calculate_producer_wake_up_time(half_cpu_effort, 2, block_timestamp, producers, active_schedule, empty_watermarks), expected_block_time); + 
BOOST_CHECK_EQUAL(calculate_producer_wake_up_time(half_cpu_effort, 2, block_timestamp, producers, active_schedule, empty_watermarks), std::optional{expected_block_time}); block_timestamp = block_timestamp_type(prod_round_1st_block_slot + 2*config::producer_repetitions + 1); // second in round is 50% sooner expected_block_time = block_timestamp.to_time_point(); expected_block_time -= fc::microseconds(half_cpu_effort); - BOOST_CHECK_EQUAL(calculate_producer_wake_up_time(half_cpu_effort, 2, block_timestamp, producers, active_schedule, empty_watermarks), expected_block_time); + BOOST_CHECK_EQUAL(calculate_producer_wake_up_time(half_cpu_effort, 2, block_timestamp, producers, active_schedule, empty_watermarks), std::optional{expected_block_time}); // third in round is 2*50% sooner block_timestamp = block_timestamp_type(prod_round_1st_block_slot + 2*config::producer_repetitions + 2); // second in round is 50% sooner expected_block_time = block_timestamp.to_time_point(); expected_block_time -= fc::microseconds(2*half_cpu_effort.count()); - BOOST_CHECK_EQUAL(calculate_producer_wake_up_time(half_cpu_effort, 2, block_timestamp, producers, active_schedule, empty_watermarks), expected_block_time); + BOOST_CHECK_EQUAL(calculate_producer_wake_up_time(half_cpu_effort, 2, block_timestamp, producers, active_schedule, empty_watermarks), std::optional{expected_block_time}); } { // test watermark std::vector active_schedule{ // 21 @@ -270,15 +270,15 @@ BOOST_AUTO_TEST_CASE(test_calculate_producer_wake_up_time) { // initc, with no watermarks producers = std::set{ "initc"_n }; auto expected_block_time = block_timestamp_type(prod_round_1st_block_slot + 2*config::producer_repetitions).to_time_point(); // without watermark - BOOST_CHECK_EQUAL(calculate_producer_wake_up_time(full_cpu_effort, 2, block_timestamp, producers, active_schedule, empty_watermarks), expected_block_time); + BOOST_CHECK_EQUAL(calculate_producer_wake_up_time(full_cpu_effort, 2, block_timestamp, producers, 
active_schedule, empty_watermarks), std::optional{expected_block_time}); // add watermark at first block, first block should not be allowed, wake-up time should be after first block of initc prod_watermarks.consider_new_watermark("initc"_n, 2, block_timestamp_type((prod_round_1st_block_slot + 2*config::producer_repetitions + 1))); // +1 since watermark is in block production time expected_block_time = block_timestamp_type(prod_round_1st_block_slot + 2*config::producer_repetitions + 1).to_time_point(); // with watermark, wait until next - BOOST_CHECK_EQUAL(calculate_producer_wake_up_time(full_cpu_effort, 2, block_timestamp, producers, active_schedule, prod_watermarks), expected_block_time); + BOOST_CHECK_EQUAL(calculate_producer_wake_up_time(full_cpu_effort, 2, block_timestamp, producers, active_schedule, prod_watermarks), std::optional{expected_block_time}); // add watermark at first 2 blocks, first & second block should not be allowed, wake-up time should be after second block of initc prod_watermarks.consider_new_watermark("initc"_n, 2, block_timestamp_type((prod_round_1st_block_slot + 2*config::producer_repetitions + 1 + 1))); expected_block_time = block_timestamp_type(prod_round_1st_block_slot + 2*config::producer_repetitions + 2).to_time_point(); // with watermark, wait until next - BOOST_CHECK_EQUAL(calculate_producer_wake_up_time(full_cpu_effort, 2, block_timestamp, producers, active_schedule, prod_watermarks), expected_block_time); + BOOST_CHECK_EQUAL(calculate_producer_wake_up_time(full_cpu_effort, 2, block_timestamp, producers, active_schedule, prod_watermarks), std::optional{expected_block_time}); } { // actual example that caused multiple start blocks producer_watermarks prod_watermarks; diff --git a/plugins/state_history_plugin/include/eosio/state_history_plugin/session.hpp b/plugins/state_history_plugin/include/eosio/state_history_plugin/session.hpp index 572c2d3ec6..324c5db818 100644 --- 
a/plugins/state_history_plugin/include/eosio/state_history_plugin/session.hpp +++ b/plugins/state_history_plugin/include/eosio/state_history_plugin/session.hpp @@ -327,7 +327,7 @@ class session final : public session_base { std::optional& chain_state_log; std::optional& finality_data_log; - GetBlockID get_block_id; + GetBlockID get_block_id; // call from main app thread GetBlock get_block; ///these items might be used on either the strand or main thread diff --git a/plugins/state_history_plugin/state_history_plugin.cpp b/plugins/state_history_plugin/state_history_plugin.cpp index 89eb78b9d6..df6b839a31 100644 --- a/plugins/state_history_plugin/state_history_plugin.cpp +++ b/plugins/state_history_plugin/state_history_plugin.cpp @@ -100,22 +100,24 @@ struct state_history_plugin_impl { template void create_listener(const std::string& address) { const boost::posix_time::milliseconds accept_timeout(200); - // connections set must only be modified by main thread; run listener on main thread to avoid needing another post() - fc::create_listener(app().get_io_service(), _log, accept_timeout, address, "", [this](Protocol::socket&& socket) { - catch_and_log([this, &socket]() { - connections.emplace(new session(std::move(socket), boost::asio::make_strand(thread_pool.get_executor()), chain_plug->chain(), - trace_log, chain_state_log, finality_data_log, - [this](const chain::block_num_type block_num) { - return get_block_id(block_num); - }, - [this](const chain::block_id_type& block_id) { - return chain_plug->chain().fetch_block_by_id(block_id); - }, - [this](session_base* conn) { - boost::asio::post(app().get_io_service(), [conn, this]() { - connections.erase(connections.find(conn)); - }); - }, _log)); + // connections set must only be modified by main thread; run listener on ship thread so sockets use default executor of the ship thread + fc::create_listener(thread_pool.get_executor(), _log, accept_timeout, address, "", [this](Protocol::socket&& socket) { + 
boost::asio::post(app().get_io_service(), [this, socket{std::move(socket)}]() mutable { + catch_and_log([this, &socket]() { + connections.emplace(new session(std::move(socket), boost::asio::make_strand(thread_pool.get_executor()), chain_plug->chain(), + trace_log, chain_state_log, finality_data_log, + [this](const chain::block_num_type block_num) { + return get_block_id(block_num); + }, + [this](const chain::block_id_type& block_id) { + return chain_plug->chain().fetch_block_by_id(block_id); + }, + [this](session_base* conn) { + boost::asio::post(app().get_io_service(), [conn, this]() { + connections.erase(connections.find(conn)); + }); + }, _log)); + }); }); }); } diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index be8d73d774..a99d2b7d96 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -55,6 +55,7 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/cli_test.py ${CMAKE_CURRENT_BINARY_DI configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ship_reqs_across_svnn_test.py ${CMAKE_CURRENT_BINARY_DIR}/ship_reqs_across_svnn_test.py COPYONLY) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ship_test.py ${CMAKE_CURRENT_BINARY_DIR}/ship_test.py COPYONLY) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ship_streamer_test.py ${CMAKE_CURRENT_BINARY_DIR}/ship_streamer_test.py COPYONLY) +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ship_kill_client_test.py ${CMAKE_CURRENT_BINARY_DIR}/ship_kill_client_test.py COPYONLY) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/bridge_for_fork_test_shape.json ${CMAKE_CURRENT_BINARY_DIR}/bridge_for_fork_test_shape.json COPYONLY) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/lib_advance_test.py ${CMAKE_CURRENT_BINARY_DIR}/lib_advance_test.py COPYONLY) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/http_plugin_test.py ${CMAKE_CURRENT_BINARY_DIR}/http_plugin_test.py COPYONLY) @@ -187,6 +188,8 @@ add_test(NAME ship_streamer_if_test COMMAND tests/ship_streamer_test.py -v --num set_property(TEST ship_streamer_if_test PROPERTY LABELS long_running_tests) 
add_test(NAME ship_streamer_if_fetch_finality_data_test COMMAND tests/ship_streamer_test.py -v --num-clients 10 --activate-if --finality-data-history ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) set_property(TEST ship_streamer_if_fetch_finality_data_test PROPERTY LABELS long_running_tests) +add_test(NAME ship_kill_client_test COMMAND tests/ship_kill_client_test.py -v --num-clients 20 ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) +set_property(TEST ship_kill_client_test PROPERTY LABELS nonparallelizable_tests) add_test(NAME p2p_dawn515_test COMMAND tests/p2p_tests/dawn_515/test.sh WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) set_property(TEST p2p_dawn515_test PROPERTY LABELS nonparallelizable_tests) diff --git a/tests/TestHarness/Node.py b/tests/TestHarness/Node.py index 0ccedfda0c..8cfab82bac 100644 --- a/tests/TestHarness/Node.py +++ b/tests/TestHarness/Node.py @@ -687,6 +687,27 @@ def findInLog(self, searchStr): return True return False + def linesInLog(self, searchStr): + dataDir=Utils.getNodeDataDir(self.nodeId) + files=Node.findStderrFiles(dataDir) + lines=[] + for file in files: + with open(file, 'r') as f: + for line in f: + if searchStr in line: + lines.append(line) + return lines + + def countInLog(self, searchStr) -> int: + dataDir=Utils.getNodeDataDir(self.nodeId) + files=Node.findStderrFiles(dataDir) + count = 0 + for file in files: + with open(file, 'r') as f: + contents = f.read() + count += contents.count(searchStr) + return count + # verify only one or two 'Starting block' per block number unless block is restarted def verifyStartingBlockMessages(self): dataDir=Utils.getNodeDataDir(self.nodeId) diff --git a/tests/abieos b/tests/abieos index 5bd9a54824..4663982fc2 160000 --- a/tests/abieos +++ b/tests/abieos @@ -1 +1 @@ -Subproject commit 5bd9a5482411e1a3054792f4210ce2bebe29d9a5 +Subproject commit 4663982fc2c61d85a5d56e2277862ee46699ac3b diff --git a/tests/nodeos_startup_catchup.py b/tests/nodeos_startup_catchup.py index 760715f3e6..669e1d7233 
100755 --- a/tests/nodeos_startup_catchup.py +++ b/tests/nodeos_startup_catchup.py @@ -135,9 +135,9 @@ def waitForNodeStarted(node): transactionsPerBlock=targetTpsPerGenerator*trxGeneratorCnt*timePerBlock/1000 steadyStateWait=20 startBlockNum=blockNum+steadyStateWait - numBlocks=20 + numBlocks=400 endBlockNum=startBlockNum+numBlocks - waitForBlock(node0, endBlockNum) + waitForBlock(node0, endBlockNum, timeout=numBlocks) steadyStateWindowTrxs=0 steadyStateAvg=0 steadyStateWindowBlks=0 @@ -182,23 +182,18 @@ def waitForNodeStarted(node): Print("Shutdown catchup node and validate exit code") catchupNode.interruptAndVerifyExitStatus(60) - # every other catchup make a lib catchup - if catchup_num % 2 == 0: - Print(f"Wait for producer to advance lib past head of catchup {catchupHead}") - # catchupHead+5 to allow for advancement of head during shutdown of catchupNode - waitForBlock(node0, catchupHead+5, timeout=twoRoundsTimeout*2, blockType=BlockType.lib) + Print(f"Wait for producer to advance lib past head of catchup {catchupHead}") + # catchupHead+5 to allow for advancement of head during shutdown of catchupNode + waitForBlock(node0, catchupHead+5, timeout=twoRoundsTimeout*2, blockType=BlockType.lib) Print("Restart catchup node") - addSwapFlags = None - if catchup_num % 3 == 0: - addSwapFlags = {"--block-log-retain-blocks": "0", "--delete-all": ""} - catchupNode.relaunch(skipGenesis=False, addSwapFlags=addSwapFlags) + catchupNode.relaunch(skipGenesis=False) waitForNodeStarted(catchupNode) lastCatchupLibNum=lib(catchupNode) Print("Verify catchup node is advancing") # verify catchup node is advancing to producer - waitForBlock(catchupNode, lastCatchupLibNum+1, timeout=twoRoundsTimeout, blockType=BlockType.lib) + waitForBlock(catchupNode, lastLibNum, timeout=twoRoundsTimeout, blockType=BlockType.lib) Print("Verify producer is still advancing LIB") lastLibNum=lib(node0) @@ -209,20 +204,56 @@ def waitForNodeStarted(node): # verify catchup node is advancing to producer 
waitForBlock(catchupNode, lastLibNum, timeout=(numBlocksToCatchup/2 + 60), blockType=BlockType.lib) catchupNode.interruptAndVerifyExitStatus(60) + + Print("Verify catchup without a block log") + addSwapFlags = {"--block-log-retain-blocks": "0", "--delete-all": ""} + catchupNode.relaunch(skipGenesis=False, addSwapFlags=addSwapFlags) + waitForNodeStarted(catchupNode) + lastCatchupLibNum=lib(catchupNode) + + Print("Verify catchup node is advancing without block log") + # verify catchup node is advancing to producer + waitForBlock(catchupNode, lastLibNum+1, timeout=twoRoundsTimeout, blockType=BlockType.lib) + catchupNode.interruptAndVerifyExitStatus(60) catchupNode.popenProc=None - logFile = Utils.getNodeDataDir(catchupNodeNum) + "/stderr.txt" - f = open(logFile) - contents = f.read() + # Verify not syncing ahead of sync-fetch-span + sync_fetch_span = 1000 # default + irreversible = False + if catchupNode.nodeId in specificExtraNodeosArgs: + m = re.search(r"sync-fetch-span (\d+)", specificExtraNodeosArgs[catchupNode.nodeId]) + if m is not None: + sync_fetch_span = int(m.group(1)) + irreversible = re.search(r"irreversible", specificExtraNodeosArgs[catchupNode.nodeId]) is not None + Print(f"Verify request span for sync-fetch-span {sync_fetch_span} of {catchupNode.data_dir}") + lines = catchupNode.linesInLog("requesting range") + for line in lines: + m = re.search(r"requesting range (\d+) to (\d+), fhead (\d+), lib (\d+)", line) + if m is not None: + startBlockNum=int(m.group(1)) + endBlockNum=int(m.group(2)) + fhead=int(m.group(3)) + libNum=int(m.group(4)) + if endBlockNum-startBlockNum > sync_fetch_span: + errorExit(f"Requested range exceeds sync-fetch-span {sync_fetch_span}: {line}") + if irreversible: + # for now just use a larger tolerance, later when the logs include calculated lib this can be more precise + # See https://github.com/AntelopeIO/spring/issues/806 + if endBlockNum > fhead and fhead > libNum and endBlockNum - fhead > (sync_fetch_span*10): + 
errorExit(f"Requested range too far head of fork head {fhead} in irreversible mode, sync-fetch-span {sync_fetch_span}: {line}") + else: + if endBlockNum > fhead and fhead > libNum and endBlockNum - fhead > (sync_fetch_span*2-1): + errorExit(f"Requested range too far head of fork head {fhead} sync-fetch-span {sync_fetch_span}: {line}") + + # See https://github.com/AntelopeIO/spring/issues/81 for fix to reduce the number of expected unlinkable blocks # Test verifies LIB is advancing, check to see that not too many unlinkable block exceptions are generated # while syncing up to head. - numUnlinkable = contents.count("unlinkable_block") + numUnlinkable = catchupNode.countInLog("unlinkable_block") numUnlinkableAllowed = 500 - Print(f"Node{catchupNodeNum} has {numUnlinkable} unlinkable_block in {logFile}") + Print(f"Node{catchupNodeNum} has {numUnlinkable} unlinkable_block in {catchupNode.data_dir}") if numUnlinkable > numUnlinkableAllowed: errorExit(f"Node{catchupNodeNum} has {numUnlinkable} which is more than the configured " - f"allowed {numUnlinkableAllowed} unlinkable blocks: {logFile}.") + f"allowed {numUnlinkableAllowed} unlinkable blocks: {catchupNode.data_dir}.") testSuccessful=True diff --git a/tests/ship_kill_client_test.py b/tests/ship_kill_client_test.py new file mode 100755 index 0000000000..a050271063 --- /dev/null +++ b/tests/ship_kill_client_test.py @@ -0,0 +1,131 @@ +#!/usr/bin/env python3 + +import time +import json +import os +import shutil +import signal +import sys + +from TestHarness import Account, Cluster, TestHelper, Utils, WalletMgr +from TestHarness.TestHelper import AppArgs + +############################################################### +# ship_kill_client_test +# +# Setup a nodeos with SHiP (state_history_plugin). +# Connect a number of clients and then kill the clients and shutdown nodeos. +# nodeos should exit cleanly and not hang or SEGFAULT. 
+# +############################################################### + +Print=Utils.Print + +appArgs = AppArgs() +extraArgs = appArgs.add(flag="--num-clients", type=int, help="How many ship_streamers should be started", default=1) +args = TestHelper.parse_args({"--dump-error-details","--keep-logs","-v","--leave-running","--unshared"}, applicationSpecificArgs=appArgs) + +Utils.Debug=args.v +cluster=Cluster(unshared=args.unshared, keepRunning=args.leave_running, keepLogs=args.keep_logs) +dumpErrorDetails=args.dump_error_details +walletPort=TestHelper.DEFAULT_WALLET_PORT + +# simpler to have two producer nodes then setup different accounts for trx generator +totalProducerNodes=2 +totalNonProducerNodes=1 +totalNodes=totalProducerNodes+totalNonProducerNodes + +walletMgr=WalletMgr(True, port=walletPort) +testSuccessful=False + +WalletdName=Utils.EosWalletName +shipTempDir=None + +try: + TestHelper.printSystemInfo("BEGIN") + + cluster.setWalletMgr(walletMgr) + Print("Stand up cluster") + + shipNodeNum = 2 + specificExtraNodeosArgs={} + specificExtraNodeosArgs[shipNodeNum]="--plugin eosio::state_history_plugin --trace-history --chain-state-history --finality-data-history --state-history-stride 200 --plugin eosio::net_api_plugin --plugin eosio::producer_api_plugin " + + if cluster.launch(pnodes=totalProducerNodes, loadSystemContract=False, + totalNodes=totalNodes, totalProducers=totalProducerNodes, activateIF=True, biosFinalizer=False, + specificExtraNodeosArgs=specificExtraNodeosArgs) is False: + Utils.cmdError("launcher") + Utils.errorExit("Failed to stand up cluster.") + + # verify nodes are in sync and advancing + cluster.waitOnClusterSync(blockAdvancing=5) + Print("Cluster in Sync") + + prodNode0 = cluster.getNode(0) + prodNode1 = cluster.getNode(1) + shipNode = cluster.getNode(shipNodeNum) + + # cluster.waitOnClusterSync(blockAdvancing=3) + start_block_num = shipNode.getBlockNum() + + #verify nodes are in sync and advancing + cluster.waitOnClusterSync(blockAdvancing=3) 
+ Print("Shutdown unneeded bios node") + cluster.biosNode.kill(signal.SIGTERM) + + Print("Configure and launch txn generators") + targetTpsPerGenerator = 10 + testTrxGenDurationSec=60*60 + numTrxGenerators=2 + cluster.launchTrxGenerators(contractOwnerAcctName=cluster.eosioAccount.name, acctNamesList=[cluster.defproduceraAccount.name, cluster.defproducerbAccount.name], + acctPrivKeysList=[cluster.defproduceraAccount.activePrivateKey,cluster.defproducerbAccount.activePrivateKey], nodeId=prodNode1.nodeId, + tpsPerGenerator=targetTpsPerGenerator, numGenerators=numTrxGenerators, durationSec=testTrxGenDurationSec, + waitToComplete=False) + + status = cluster.waitForTrxGeneratorsSpinup(nodeId=prodNode1.nodeId, numGenerators=numTrxGenerators) + assert status is not None and status is not False, "ERROR: Failed to spinup Transaction Generators" + + prodNode1.waitForProducer("defproducera") + + block_range = 100000 # we are going to kill the client, so just make this a huge number + end_block_num = start_block_num + block_range + + shipClient = "tests/ship_streamer" + cmd = f"{shipClient} --start-block-num {start_block_num} --end-block-num {end_block_num} --fetch-block --fetch-traces --fetch-deltas --fetch-finality-data" + if Utils.Debug: Utils.Print(f"cmd: {cmd}") + clients = [] + files = [] + shipTempDir = os.path.join(Utils.DataDir, "ship") + os.makedirs(shipTempDir, exist_ok = True) + shipClientFilePrefix = os.path.join(shipTempDir, "client") + + for i in range(0, args.num_clients): + outFile = open(f"{shipClientFilePrefix}{i}.out", "w") + errFile = open(f"{shipClientFilePrefix}{i}.err", "w") + Print(f"Start client {i}") + popen=Utils.delayedCheckOutput(cmd, stdout=outFile, stderr=errFile) + clients.append((popen, cmd)) + files.append((outFile, errFile)) + Print(f"Client {i} started, Ship node head is: {shipNode.getBlockNum()}") + + + # allow time for all clients to connect + shipNode.waitForHeadToAdvance(5) + shipNode.waitForLibToAdvance() + + Print(f"Kill all 
{args.num_clients} clients and ship node") + for index, (popen, _) in zip(range(len(clients)), clients): + popen.kill() + if index == len(clients)/2: + shipNode.kill(signal.SIGTERM) + assert not shipNode.verifyAlive(), "ship node did not shutdown" + + testSuccessful = True +finally: + TestHelper.shutdown(cluster, walletMgr, testSuccessful=testSuccessful, dumpErrorDetails=dumpErrorDetails) + if shipTempDir is not None: + if testSuccessful and not args.keep_logs: + shutil.rmtree(shipTempDir, ignore_errors=True) + +errorCode = 0 if testSuccessful else 1 +exit(errorCode) diff --git a/unittests/block_log_get_block_tests.cpp b/unittests/block_log_get_block_tests.cpp new file mode 100644 index 0000000000..61019859af --- /dev/null +++ b/unittests/block_log_get_block_tests.cpp @@ -0,0 +1,112 @@ +#include +#include // for fc_exception_message_contains + +#include + +#include + +using namespace eosio::chain; +using namespace eosio::testing; + +struct block_log_get_block_fixture { + block_log_get_block_fixture() { + block_dir = dir.path(); + + log.emplace(block_dir); + + log->reset(genesis_state(), std::make_shared()); + BOOST_REQUIRE_EQUAL(log->first_block_num(), 1u); + BOOST_REQUIRE_EQUAL(log->head()->block_num(), 1u); + + for(uint32_t i = 2; i < last_block_num + 1; ++i) { + auto p = std::make_shared(); + p->previous._hash[0] = fc::endian_reverse_u32(i-1); + log->append(p, p->calculate_id()); + } + BOOST_REQUIRE_EQUAL(log->head()->block_num(), last_block_num); + }; + + void test_read_serialized_block(const block_log& blog, uint32_t block_num) { + // read the serialized block + auto serialized_block = blog.read_serialized_block_by_num(block_num); + + // the serialized block can be deserialized + BOOST_REQUIRE_NO_THROW(fc::raw::unpack(serialized_block)); + + // read the signed block by regular read_block_by_num + signed_block_ptr block = blog.read_block_by_num(block_num); + BOOST_REQUIRE(block); + + // the serialized block should match the signed block's serialized form + 
BOOST_REQUIRE(serialized_block == fc::raw::pack(*block)); + } + + fc::temp_directory dir; + std::filesystem::path block_dir; + std::optional log; + static constexpr uint32_t last_block_num = 50; +}; + +BOOST_AUTO_TEST_SUITE(block_log_get_block_tests) + +BOOST_FIXTURE_TEST_CASE(basic_block_log, block_log_get_block_fixture) try { + // test reading a non-last-block + test_read_serialized_block(*log, last_block_num - 2); + + // test reading last block + test_read_serialized_block(*log, last_block_num); +} FC_LOG_AND_RETHROW() + +BOOST_FIXTURE_TEST_CASE(splitted_block_log, block_log_get_block_fixture) try { + uint32_t stride = last_block_num / 2; + auto retained_dir = block_dir / "retained"; + + block_log::split_blocklog(block_dir, retained_dir, stride); + + std::filesystem::remove(block_dir / "blocks.log"); + std::filesystem::remove(block_dir / "blocks.index"); + + block_log blog(block_dir, partitioned_blocklog_config{ .retained_dir = retained_dir }); + + // test reading a block in the first partitioned log + test_read_serialized_block(blog, stride - 1); + + // test reading a block in the second partitioned log + test_read_serialized_block(blog, stride + 1); +} FC_LOG_AND_RETHROW() + +BOOST_FIXTURE_TEST_CASE(nonexisting_block_num, block_log_get_block_fixture) try { + // read a non-existing block + auto serialized_block = log->read_serialized_block_by_num(last_block_num + 1); + + // should return an empty vector of char + BOOST_REQUIRE(serialized_block.empty()); +} FC_LOG_AND_RETHROW() + +BOOST_FIXTURE_TEST_CASE(corrupted_next_block_position, block_log_get_block_fixture) try { + // intentionally modify block position for next block (which is the last block) + uint64_t bad_pos = sizeof(uint64_t) * (last_block_num); + fc::datastream index_file; + index_file.set_file_path(block_dir / "blocks.index"); + index_file.open(fc::cfile::update_rw_mode); + index_file.seek_end(-sizeof(uint64_t)); + index_file.write((char*)&bad_pos, sizeof(bad_pos)); + index_file.flush(); + 
index_file.close(); + + BOOST_CHECK_EXCEPTION(log->read_serialized_block_by_num(last_block_num - 1), + block_log_exception, + fc_exception_message_contains("next block position")); +} FC_LOG_AND_RETHROW() + +BOOST_FIXTURE_TEST_CASE(corrupted_file_size, block_log_get_block_fixture) try { + // corrupt file size by truncating it + auto new_size = log->get_block_pos(last_block_num) + sizeof(uint64_t); + std::filesystem::resize_file(block_dir / "blocks.log", new_size); + + BOOST_CHECK_EXCEPTION(log->read_serialized_block_by_num(last_block_num), + block_log_exception, + fc_exception_message_contains("block log file size")); +} FC_LOG_AND_RETHROW() + +BOOST_AUTO_TEST_SUITE_END() diff --git a/unittests/finalizer_tests.cpp b/unittests/finalizer_tests.cpp index e44f228138..d0fc4ee5f5 100644 --- a/unittests/finalizer_tests.cpp +++ b/unittests/finalizer_tests.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include using namespace eosio; @@ -95,6 +96,16 @@ void set_fsi(my_finalizers_t& fset, const std::vector& keys, const F ((fset.set_fsi(keys[I].pubkey, fsi[I])), ...); } +// sleep for n periods of the file clock +// -------------------------------------- +void sleep_for_n_file_clock_periods(uint32_t n) { + using file_clock = std::chrono::file_clock; + auto n_periods = file_clock::duration(n); + auto sleep_duration = std::chrono::duration_cast(n_periods); + + std::this_thread::sleep_for(sleep_duration); +} + BOOST_AUTO_TEST_SUITE(finalizer_tests) BOOST_AUTO_TEST_CASE( basic_finalizer_safety_file_io ) try { @@ -228,32 +239,37 @@ BOOST_AUTO_TEST_CASE( finalizer_safety_file_io ) try { } FC_LOG_AND_RETHROW() +namespace fs = std::filesystem; -BOOST_AUTO_TEST_CASE( finalizer_safety_file_versioning ) try { - namespace fs = std::filesystem; +void create_fsi_reference(my_finalizers_t& fset) { + std::vector keys = create_keys(3); + std::vector fsi = create_random_fsi(3); - fs::path test_data_path { UNITTEST_TEST_DATA_DIR }; - auto fsi_reference_dir = test_data_path / "fsi"; + 
bls_pub_priv_key_map_t local_finalizers = create_local_finalizers<0, 1, 2>(keys); + fset.set_keys(local_finalizers); + set_fsi(fset, keys, fsi); +} - auto create_fsi_reference = [&](my_finalizers_t& fset) { - std::vector keys = create_keys(3); - std::vector fsi = create_random_fsi(3); +void create_fsi_reference_file(const fs::path& safety_file_path) { + my_finalizers_t fset{safety_file_path}; + create_fsi_reference(fset); + fset.save_finalizer_safety_info(); +} - bls_pub_priv_key_map_t local_finalizers = create_local_finalizers<0, 1, 2>(keys); - fset.set_keys(local_finalizers); - set_fsi(fset, keys, fsi); - }; +fs::path mk_versioned_fsi_file_path(uint32_t v) { + fs::path test_data_path { UNITTEST_TEST_DATA_DIR }; + auto fsi_reference_dir = test_data_path / "fsi"; - auto create_fsi_reference_file = [&](const fs::path& safety_file_path) { - my_finalizers_t fset{safety_file_path}; - create_fsi_reference(fset); - fset.save_finalizer_safety_info(); - }; + return fsi_reference_dir / ("safety_v"s + std::to_string(v) + ".dat"); +} - auto mk_versioned_fsi_file_path = [&](uint32_t v) { - return fsi_reference_dir / ("safety_v"s + std::to_string(v) + ".dat"); - }; +std::string read_file(const fs::path& path) { + std::string res; + fc::read_file_contents(path, res); + return res; +} +BOOST_AUTO_TEST_CASE( finalizer_safety_file_versioning ) try { auto current_version = my_finalizers_t::current_safety_file_version; // run this unittest with the option `-- --save-fsi-ref` to save ref file for the current version. 
@@ -288,7 +304,8 @@ BOOST_AUTO_TEST_CASE( finalizer_safety_file_versioning ) try { auto ref_path = mk_versioned_fsi_file_path(i); auto copy_path = tempdir.path() / ref_path.filename(); fs::copy_file(ref_path, copy_path, fs::copy_options::none); - std::this_thread::sleep_for(std::chrono::milliseconds{10}); + + sleep_for_n_file_clock_periods(2); // first load the reference file in the old format, and then save it in the new version format // ------------------------------------------------------------------------------------------- @@ -307,5 +324,52 @@ BOOST_AUTO_TEST_CASE( finalizer_safety_file_versioning ) try { } FC_LOG_AND_RETHROW() +// Verify that we have not changed the fsi file serialization +// ------------------------------------------------------------ +BOOST_AUTO_TEST_CASE( finalizer_safety_file_serialization_unchanged ) try { + auto current_version = my_finalizers_t::current_safety_file_version; + auto ref_path = mk_versioned_fsi_file_path(current_version); // the saved file for current_version + + fc::temp_directory tempdir; + auto tmp_path = tempdir.path() / "new_safety.dat"; + create_fsi_reference_file(tmp_path); // save a new file in tmp_path + + BOOST_REQUIRE(read_file(ref_path) == read_file(tmp_path)); + +} FC_LOG_AND_RETHROW() + + +// Verify that the current version of safety.dat file committed to the repo can be loaded on +// nodeos startup (it is not saved until we actually vote, and voting would change the fsi).
+// ----------------------------------------------------------------------------------------- +BOOST_AUTO_TEST_CASE( finalizer_safety_file_serialization_io ) try { + fc::temp_directory tempdir; + auto [cfg, genesis_state] = tester::default_config(tempdir); + + fs::path tmp_path = cfg.finalizers_dir / config::safety_filename; + + auto current_version = my_finalizers_t::current_safety_file_version; + fs::path ref_path = mk_versioned_fsi_file_path(current_version); // the saved file for current_version + + tester t( tempdir, true ); + + fs::create_directory(cfg.finalizers_dir); + fs::copy_file(ref_path, tmp_path, fs::copy_options::none); + auto initial_time = fs::last_write_time(tmp_path); + + sleep_for_n_file_clock_periods(2); + + // set finalizer, so that the file is overwritten. set the last one so that order is unchanged. + std::vector keys = create_keys(3); + bls_pub_priv_key_map_t local_finalizer_keys; + local_finalizer_keys[keys.back().pubkey_str] = keys.back().privkey_str; + t.control->set_node_finalizer_keys(local_finalizer_keys); + + // Since we didn't vote, the file time should not have changed. + auto last_time = fs::last_write_time(tmp_path); + BOOST_REQUIRE(last_time == initial_time); + +} FC_LOG_AND_RETHROW() + BOOST_AUTO_TEST_SUITE_END()