diff --git a/.gitignore b/.gitignore index 4c6f4f1a0f8..8bf88545cc3 100644 --- a/.gitignore +++ b/.gitignore @@ -87,3 +87,6 @@ var/lib/node_* .DS_Store !*.swagger.* + +node_modules +package-lock.json diff --git a/README.md b/README.md index 33d875d540f..0ae3cd8b9b4 100644 --- a/README.md +++ b/README.md @@ -30,6 +30,7 @@ apt-get update && apt-get install \ libusb-1.0-0-dev \ libzstd-dev \ llvm-11-dev \ + npm \ ninja-build \ pkg-config \ time @@ -66,6 +67,7 @@ I highly recommend the ccache options. They don't speed up the first clean build ``` cd build +npm install # Runs parallelizable tests in parallel. This runs much faster when # -DDISABLE_WASM_SPEC_TESTS=yes is used. diff --git a/libraries/CMakeLists.txt b/libraries/CMakeLists.txt index f417a3da84c..2bb99cb7913 100644 --- a/libraries/CMakeLists.txt +++ b/libraries/CMakeLists.txt @@ -7,6 +7,7 @@ add_subdirectory( appbase ) add_subdirectory( chain ) add_subdirectory( testing ) add_subdirectory( version ) +add_subdirectory( state_history ) set(USE_EXISTING_SOFTFLOAT ON CACHE BOOL "use pre-exisiting softfloat lib") set(ENABLE_TOOLS OFF CACHE BOOL "Build tools") diff --git a/libraries/state_history/.clang-format b/libraries/state_history/.clang-format new file mode 100644 index 00000000000..42dd5b7832c --- /dev/null +++ b/libraries/state_history/.clang-format @@ -0,0 +1,8 @@ +BasedOnStyle: LLVM +IndentWidth: 3 +ColumnLimit: 120 +PointerAlignment: Left +AlwaysBreakTemplateDeclarations: true +AlignConsecutiveAssignments: true +AlignConsecutiveDeclarations: true +BreakConstructorInitializers: BeforeComma diff --git a/libraries/state_history/CMakeLists.txt b/libraries/state_history/CMakeLists.txt new file mode 100644 index 00000000000..5a27b819f19 --- /dev/null +++ b/libraries/state_history/CMakeLists.txt @@ -0,0 +1,17 @@ +file(GLOB HEADERS "include/eosio/state-history/*.hpp") + +add_library( state_history + abi.cpp + compression.cpp + create_deltas.cpp + trace_converter.cpp + ${HEADERS} + ) + +target_link_libraries( state_history + PUBLIC eosio_chain fc chainbase softfloat + ) + +target_include_directories( state_history + PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include" "${CMAKE_CURRENT_SOURCE_DIR}/../wasm-jit/Include" + ) diff --git a/plugins/state_history_plugin/state_history_plugin_abi.cpp b/libraries/state_history/abi.cpp similarity index 99% rename from plugins/state_history_plugin/state_history_plugin_abi.cpp rename to libraries/state_history/abi.cpp index 766c747d3d1..628836bae71 100644 --- a/plugins/state_history_plugin/state_history_plugin_abi.cpp +++ b/libraries/state_history/abi.cpp @@ -17,7 +17,8 @@ extern const char* const state_history_plugin_abi = R"({ { "name": "trace_begin_block", "type": "uint32" }, { "name": "trace_end_block", "type": "uint32" }, { "name": "chain_state_begin_block", "type": "uint32" }, - { "name": "chain_state_end_block", "type": "uint32" } + { "name": "chain_state_end_block", "type": "uint32" }, + { "name": "chain_id", "type": "checksum256$" } ] }, { diff --git a/libraries/state_history/compression.cpp b/libraries/state_history/compression.cpp new file mode 100644 index 00000000000..a19238a8f82 --- /dev/null +++ b/libraries/state_history/compression.cpp @@ -0,0 +1,32 @@ +#include + +#include +#include +#include + +namespace eosio { +namespace state_history { + +namespace bio = boost::iostreams; +bytes zlib_compress_bytes(const bytes& in) { + bytes out; + bio::filtering_ostream comp; + comp.push(bio::zlib_compressor(bio::zlib::default_compression)); + comp.push(bio::back_inserter(out)); + bio::write(comp, 
in.data(), in.size()); + bio::close(comp); + return out; +} + +bytes zlib_decompress(const bytes& in) { + bytes out; + bio::filtering_ostream decomp; + decomp.push(bio::zlib_decompressor()); + decomp.push(bio::back_inserter(out)); + bio::write(decomp, in.data(), in.size()); + bio::close(decomp); + return out; +} + +} // namespace state_history +} // namespace eosio diff --git a/libraries/state_history/create_deltas.cpp b/libraries/state_history/create_deltas.cpp new file mode 100644 index 00000000000..74e62d21c67 --- /dev/null +++ b/libraries/state_history/create_deltas.cpp @@ -0,0 +1,141 @@ +#include +#include + +namespace eosio { +namespace state_history { + +template +bool include_delta(const T& old, const T& curr) { + return true; +} + +bool include_delta(const chain::table_id_object& old, const chain::table_id_object& curr) { + return old.payer != curr.payer; +} + +bool include_delta(const chain::resource_limits::resource_limits_object& old, + const chain::resource_limits::resource_limits_object& curr) { + return // + old.net_weight != curr.net_weight || // + old.cpu_weight != curr.cpu_weight || // + old.ram_bytes != curr.ram_bytes; +} + +bool include_delta(const chain::resource_limits::resource_limits_state_object& old, + const chain::resource_limits::resource_limits_state_object& curr) { + return // + old.average_block_net_usage.last_ordinal != curr.average_block_net_usage.last_ordinal || // + old.average_block_net_usage.value_ex != curr.average_block_net_usage.value_ex || // + old.average_block_net_usage.consumed != curr.average_block_net_usage.consumed || // + old.average_block_cpu_usage.last_ordinal != curr.average_block_cpu_usage.last_ordinal || // + old.average_block_cpu_usage.value_ex != curr.average_block_cpu_usage.value_ex || // + old.average_block_cpu_usage.consumed != curr.average_block_cpu_usage.consumed || // + old.total_net_weight != curr.total_net_weight || // + old.total_cpu_weight != curr.total_cpu_weight || // + old.total_ram_bytes != curr.total_ram_bytes || // + old.virtual_net_limit != curr.virtual_net_limit || // + old.virtual_cpu_limit != curr.virtual_cpu_limit; +} + +bool include_delta(const chain::account_metadata_object& old, const chain::account_metadata_object& curr) { + return // + old.name != curr.name || // + old.is_privileged() != curr.is_privileged() || // + old.last_code_update != curr.last_code_update || // + old.vm_type != curr.vm_type || // + old.vm_version != curr.vm_version || // + old.code_hash != curr.code_hash; +} + +bool include_delta(const chain::code_object& old, const chain::code_object& curr) { // + return false; +} + +bool include_delta(const chain::protocol_state_object& old, const chain::protocol_state_object& curr) { + return old.activated_protocol_features != curr.activated_protocol_features; +} + +std::vector create_deltas(const chainbase::database& db, bool full_snapshot) { + std::vector deltas; + const auto& table_id_index = db.get_index(); + std::map removed_table_id; + for (auto& rem : table_id_index.stack().back().removed_values) + removed_table_id[rem.first._id] = &rem.second; + + auto get_table_id = [&](uint64_t tid) -> const chain::table_id_object& { + auto obj = table_id_index.find(tid); + if (obj) + return *obj; + auto it = removed_table_id.find(tid); + EOS_ASSERT(it != removed_table_id.end(), chain::plugin_exception, "can not found table id ${tid}", ("tid", tid)); + return *it->second; + }; + + auto pack_row = [&](auto& row) { return fc::raw::pack(make_history_serial_wrapper(db, row)); }; + auto pack_contract_row = 
[&](auto& row) { + return fc::raw::pack(make_history_context_wrapper(db, get_table_id(row.t_id._id), row)); + }; + + auto process_table = [&](auto* name, auto& index, auto& pack_row) { + if (full_snapshot) { + if (index.indices().empty()) + return; + deltas.push_back({}); + auto& delta = deltas.back(); + delta.name = name; + for (auto& row : index.indices()) + delta.rows.obj.emplace_back(true, pack_row(row)); + } else { + if (index.stack().empty()) + return; + auto& undo = index.stack().back(); + if (undo.old_values.empty() && undo.new_ids.empty() && undo.removed_values.empty()) + return; + deltas.push_back({}); + auto& delta = deltas.back(); + delta.name = name; + for (auto& old : undo.old_values) { + auto& row = index.get(old.first); + if (include_delta(old.second, row)) + delta.rows.obj.emplace_back(true, pack_row(row)); + } + for (auto& old : undo.removed_values) + delta.rows.obj.emplace_back(false, pack_row(old.second)); + for (auto id : undo.new_ids) { + auto& row = index.get(id); + delta.rows.obj.emplace_back(true, pack_row(row)); + } + } + }; + + process_table("account", db.get_index(), pack_row); + process_table("account_metadata", db.get_index(), pack_row); + process_table("code", db.get_index(), pack_row); + + process_table("contract_table", db.get_index(), pack_row); + process_table("contract_row", db.get_index(), pack_contract_row); + process_table("contract_index64", db.get_index(), pack_contract_row); + process_table("contract_index128", db.get_index(), pack_contract_row); + process_table("contract_index256", db.get_index(), pack_contract_row); + process_table("contract_index_double", db.get_index(), pack_contract_row); + process_table("contract_index_long_double", db.get_index(), pack_contract_row); + + process_table("global_property", db.get_index(), pack_row); + process_table("generated_transaction", db.get_index(), pack_row); + process_table("protocol_state", db.get_index(), pack_row); + + process_table("permission", db.get_index(), pack_row); + process_table("permission_link", db.get_index(), pack_row); + + process_table("resource_limits", db.get_index(), pack_row); + process_table("resource_usage", db.get_index(), pack_row); + process_table("resource_limits_state", db.get_index(), + pack_row); + process_table("resource_limits_config", db.get_index(), + pack_row); + + return deltas; +} + +} // namespace state_history +} // namespace eosio diff --git a/libraries/state_history/include/eosio/state_history/compression.hpp b/libraries/state_history/include/eosio/state_history/compression.hpp new file mode 100644 index 00000000000..20748ee42ac --- /dev/null +++ b/libraries/state_history/include/eosio/state_history/compression.hpp @@ -0,0 +1,14 @@ +#pragma once + +#include + +namespace eosio { +namespace state_history { + +using chain::bytes; + +bytes zlib_compress_bytes(const bytes& in); +bytes zlib_decompress(const bytes& in); + +} // namespace state_history +} // namespace eosio diff --git a/libraries/state_history/include/eosio/state_history/create_deltas.hpp b/libraries/state_history/include/eosio/state_history/create_deltas.hpp new file mode 100644 index 00000000000..81e4969f197 --- /dev/null +++ b/libraries/state_history/include/eosio/state_history/create_deltas.hpp @@ -0,0 +1,11 @@ +#pragma once + +#include + +namespace eosio { +namespace state_history { + +std::vector create_deltas(const chainbase::database& db, bool full_snapshot); + +} // namespace state_history +} // namespace eosio diff --git 
a/plugins/state_history_plugin/include/eosio/state_history_plugin/state_history_log.hpp b/libraries/state_history/include/eosio/state_history/log.hpp similarity index 80% rename from plugins/state_history_plugin/include/eosio/state_history_plugin/state_history_log.hpp rename to libraries/state_history/include/eosio/state_history/log.hpp index 0848ad9037e..a14bdaaafc4 100644 --- a/plugins/state_history_plugin/include/eosio/state_history_plugin/state_history_log.hpp +++ b/libraries/state_history/include/eosio/state_history/log.hpp @@ -7,9 +7,9 @@ #include #include #include -#include #include -#include +#include + namespace eosio { /* @@ -87,52 +87,39 @@ class state_history_log { template void write_entry(const state_history_log_header& header, const chain::block_id_type& prev_id, F write_payload) { - try { - auto block_num = chain::block_header::num_from_id(header.block_id); - EOS_ASSERT(_begin_block == _end_block || block_num <= _end_block, chain::plugin_exception, - "missed a block in ${name}.log", ("name", name)); + auto block_num = chain::block_header::num_from_id(header.block_id); + EOS_ASSERT(_begin_block == _end_block || block_num <= _end_block, chain::plugin_exception, + "missed a block in ${name}.log", ("name", name)); - if (_begin_block != _end_block && block_num > _begin_block) { - if (block_num == _end_block) { - EOS_ASSERT(prev_id == last_block_id, chain::plugin_exception, "missed a fork change in ${name}.log", - ("name", name)); - } else { - state_history_log_header prev; - get_entry(block_num - 1, prev); - EOS_ASSERT(prev_id == prev.block_id, chain::plugin_exception, "missed a fork change in ${name}.log", - ("name", name)); - } + if (_begin_block != _end_block && block_num > _begin_block) { + if (block_num == _end_block) { + EOS_ASSERT(prev_id == last_block_id, chain::plugin_exception, "missed a fork change in ${name}.log", + ("name", name)); + } else { + state_history_log_header prev; + get_entry(block_num - 1, prev); + EOS_ASSERT(prev_id == prev.block_id, chain::plugin_exception, "missed a fork change in ${name}.log", + ("name", name)); } + } - if (block_num < _end_block) - truncate(block_num); - log.seek_end(0); - uint64_t pos = log.tellp(); - write_header(header); - write_payload(log); - uint64_t end = log.tellp(); - EOS_ASSERT(end == pos + state_history_log_header_serial_size + header.payload_size, chain::plugin_exception, - "wrote payload with incorrect size to ${name}.log", ("name", name)); - log.write((char*)&pos, sizeof(pos)); + if (block_num < _end_block) + truncate(block_num); + log.seek_end(0); + uint64_t pos = log.tellp(); + write_header(header); + write_payload(log); + uint64_t end = log.tellp(); + EOS_ASSERT(end == pos + state_history_log_header_serial_size + header.payload_size, chain::plugin_exception, + "wrote payload with incorrect size to ${name}.log", ("name", name)); + log.write((char*)&pos, sizeof(pos)); - index.seek_end(0); - index.write((char*)&pos, sizeof(pos)); - if (_begin_block == _end_block) - _begin_block = block_num; - _end_block = block_num + 1; - last_block_id = header.block_id; - } - catch(const chain::plugin_exception& e) { - elog( "chain::plugin_exception: ${details}", ("details", e.to_detail_string()) ); - // Both app().quit() and exception throwing are required. Without app().quit(), - // the exception would be caught and drop before reaching main(). The exception is - // to ensure the block won't be commited. 
- appbase::app().quit(); - EOS_THROW( - chain::state_history_write_exception, - "State history encountered an Error which it cannot recover from. Please resolve the error and relaunch " - "the process"); - } + index.seek_end(0); + index.write((char*)&pos, sizeof(pos)); + if (_begin_block == _end_block) + _begin_block = block_num; + _end_block = block_num + 1; + last_block_id = header.block_id; } // returns cfile positioned at payload @@ -210,8 +197,8 @@ class state_history_log { } void open_log() { - log.set_file_path( log_filename ); - log.open( "a+b" ); // std::ios_base::binary | std::ios_base::in | std::ios_base::out | std::ios_base::app + log.set_file_path(log_filename); + log.open("a+b"); // std::ios_base::binary | std::ios_base::in | std::ios_base::out | std::ios_base::app log.seek_end(0); uint64_t size = log.tellp(); if (size >= state_history_log_header_serial_size) { @@ -233,14 +220,14 @@ class state_history_log { } void open_index() { - index.set_file_path( index_filename ); - index.open( "a+b" ); // std::ios_base::binary | std::ios_base::in | std::ios_base::out | std::ios_base::app + index.set_file_path(index_filename); + index.open("a+b"); // std::ios_base::binary | std::ios_base::in | std::ios_base::out | std::ios_base::app index.seek_end(0); if (index.tellp() == (static_cast(_end_block) - _begin_block) * sizeof(uint64_t)) return; ilog("Regenerate ${name}.index", ("name", name)); index.close(); - index.open( "w+b" ); // std::ios_base::binary | std::ios_base::in | std::ios_base::out | std::ios_base::trunc + index.open("w+b"); // std::ios_base::binary | std::ios_base::in | std::ios_base::out | std::ios_base::trunc log.seek_end(0); uint64_t size = log.tellp(); diff --git a/plugins/state_history_plugin/include/eosio/state_history_plugin/state_history_serialization.hpp b/libraries/state_history/include/eosio/state_history/serialization.hpp similarity index 93% rename from plugins/state_history_plugin/include/eosio/state_history_plugin/state_history_serialization.hpp rename to libraries/state_history/include/eosio/state_history/serialization.hpp index bbde064e093..2a598c06a3c 100644 --- a/plugins/state_history_plugin/include/eosio/state_history_plugin/state_history_serialization.hpp +++ b/libraries/state_history/include/eosio/state_history/serialization.hpp @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include #include @@ -11,8 +12,7 @@ #include #include #include -#include -#include +#include #include @@ -35,8 +35,8 @@ struct history_context_wrapper { }; template -history_context_wrapper, std::decay_t> -make_history_context_wrapper(const chainbase::database& db, const P& context, const T& obj) { +history_context_wrapper, std::decay_t> make_history_context_wrapper(const chainbase::database& db, + const P& context, const T& obj) { return {db, context, obj}; } @@ -77,7 +77,7 @@ datastream& history_context_serialize_container(datastream& ds, const ch } template -datastream& operator<<(datastream& ds, const history_serial_big_vector_wrapper& obj) { +datastream& operator<<(datastream& ds, const eosio::state_history::big_vector_wrapper& obj) { FC_ASSERT(obj.obj.size() <= 1024 * 1024 * 1024); fc::raw::pack(ds, unsigned_int((uint32_t)obj.obj.size())); for (auto& x : obj.obj) @@ -258,34 +258,34 @@ operator<<(datastream& } template -datastream& operator<<( - datastream& ds, - const history_context_wrapper& obj) { +datastream& +operator<<(datastream& ds, + const history_context_wrapper& obj) { return serialize_secondary_index(ds, obj.context, obj.obj); } template -datastream& 
operator<<(datastream& ds, +datastream& operator<<(datastream& ds, const history_serial_wrapper& obj) { fc::raw::pack(ds, as_type(obj.obj.threshold)); history_serialize_container(ds, obj.db, as_type>(obj.obj.keys)); - } template -datastream& operator<<(datastream& ds, const history_serial_wrapper& obj) { +datastream& operator<<(datastream& ds, + const history_serial_wrapper& obj) { fc::raw::pack(ds, as_type(obj.obj.producer_name.to_uint64_t())); fc::raw::pack(ds, as_type(obj.obj.authority)); return ds; } template -datastream& operator<<(datastream& ds, +datastream& operator<<(datastream& ds, const history_serial_wrapper& obj) { fc::raw::pack(ds, as_type(obj.obj.version)); - history_serialize_container(ds, obj.db, - as_type>(obj.obj.producers)); + history_serialize_container( + ds, obj.db, as_type>(obj.obj.producers)); return ds; } @@ -525,7 +525,7 @@ datastream& operator<<(datastream& ds, const history_serial_wrapper(obj.obj.account.to_uint64_t())); fc::raw::pack(ds, as_type(obj.obj.name.to_uint64_t())); history_serialize_container(ds, obj.db, as_type>(obj.obj.authorization)); - fc::raw::pack(ds, as_type(obj.obj.data)); + fc::raw::pack(ds, as_type(obj.obj.data)); return ds; } @@ -536,7 +536,7 @@ datastream& operator<<(datastream& ds, const history_serial_wrapper(obj.obj.act_digest)); fc::raw::pack(ds, as_type(obj.obj.global_sequence)); fc::raw::pack(ds, as_type(obj.obj.recv_sequence)); - history_serialize_container(ds, obj.db, as_type>(obj.obj.auth_sequence)); + history_serialize_container(ds, obj.db, as_type>(obj.obj.auth_sequence)); fc::raw::pack(ds, as_type(obj.obj.code_sequence)); fc::raw::pack(ds, as_type(obj.obj.abi_sequence)); return ds; @@ -549,10 +549,11 @@ datastream& operator<<(datastream& ds, const history_serial_wrapper cap_error_code( const fc::optional& error_code ) { +inline fc::optional cap_error_code(const fc::optional& error_code) { fc::optional result; - if (!error_code) return result; + if (!error_code) + return result; const uint64_t upper_limit = static_cast(eosio::chain::system_error_code::generic_system_error); @@ -567,7 +568,7 @@ inline fc::optional cap_error_code( const fc::optional& erro template datastream& operator<<(datastream& ds, const history_context_wrapper& obj) { - bool debug_mode = obj.context; + bool debug_mode = obj.context; fc::raw::pack(ds, fc::unsigned_int(0)); fc::raw::pack(ds, as_type(obj.obj.action_ordinal)); fc::raw::pack(ds, as_type(obj.obj.creator_action_ordinal)); @@ -593,17 +594,17 @@ datastream& operator<<(datastream& ds, const history_context_wrapper>(e)); - fc::raw::pack(ds, as_type>(debug_mode ? obj.obj.error_code - : cap_error_code(obj.obj.error_code))); + fc::raw::pack(ds, + as_type>(debug_mode ? obj.obj.error_code : cap_error_code(obj.obj.error_code))); return ds; } template -datastream& operator<<(datastream& ds, - const history_context_wrapper, - eosio::augmented_transaction_trace>& obj) { - auto& trace = *obj.obj.trace; +datastream& operator<<( + datastream& ds, + const history_context_wrapper, eosio::state_history::augmented_transaction_trace>& obj) { + auto& trace = *obj.obj.trace; bool debug_mode = obj.context.second; fc::raw::pack(ds, fc::unsigned_int(0)); fc::raw::pack(ds, as_type(trace.id)); @@ -639,8 +640,7 @@ datastream& operator<<(datastream& e = "Y"; } fc::raw::pack(ds, as_type>(e)); - fc::raw::pack(ds, as_type>(debug_mode ? trace.error_code - : cap_error_code(trace.error_code))); + fc::raw::pack(ds, as_type>(debug_mode ? 
trace.error_code : cap_error_code(trace.error_code))); fc::raw::pack(ds, bool(trace.failed_dtrx_trace)); if (trace.failed_dtrx_trace) { @@ -650,7 +650,8 @@ datastream& operator<<(datastream& std::pair context = std::make_pair(stat, debug_mode); fc::raw::pack( // ds, make_history_context_wrapper( - obj.db, context, eosio::augmented_transaction_trace{trace.failed_dtrx_trace, obj.obj.partial})); + obj.db, context, + eosio::state_history::augmented_transaction_trace{trace.failed_dtrx_trace, obj.obj.partial})); } bool include_partial = obj.obj.partial && !trace.failed_dtrx_trace; @@ -666,22 +667,23 @@ datastream& operator<<(datastream& fc::raw::pack(ds, as_type(partial.delay_sec)); fc::raw::pack(ds, as_type(partial.transaction_extensions)); fc::raw::pack(ds, as_type>(partial.signatures)); - fc::raw::pack(ds, as_type>(partial.context_free_data)); + fc::raw::pack(ds, as_type>(partial.context_free_data)); } return ds; } template -datastream& operator<<(datastream& ds, - const history_context_wrapper& obj) { +datastream& +operator<<(datastream& ds, + const history_context_wrapper& obj) { std::pair context = std::make_pair(eosio::chain::transaction_receipt_header::hard_fail, obj.context); ds << make_history_context_wrapper(obj.db, context, obj.obj); return ds; } template -datastream& operator<<(datastream& ds, const eosio::get_blocks_result_v0& obj) { +datastream& operator<<(datastream& ds, const eosio::state_history::get_blocks_result_v0& obj) { fc::raw::pack(ds, obj.head); fc::raw::pack(ds, obj.last_irreversible); fc::raw::pack(ds, obj.this_block); diff --git a/libraries/state_history/include/eosio/state_history/trace_converter.hpp b/libraries/state_history/include/eosio/state_history/trace_converter.hpp new file mode 100644 index 00000000000..7ae614ee2a6 --- /dev/null +++ b/libraries/state_history/include/eosio/state_history/trace_converter.hpp @@ -0,0 +1,21 @@ +#pragma once + +#include +#include + +namespace eosio { +namespace state_history { + +using chain::block_state_ptr; +using chain::transaction_id_type; + +struct trace_converter { + std::map cached_traces; + fc::optional onblock_trace; + + void add_transaction(const transaction_trace_ptr& trace, const signed_transaction& transaction); + bytes pack(const chainbase::database& db, bool trace_debug_mode, const block_state_ptr& block_state); +}; + +} // namespace state_history +} // namespace eosio diff --git a/libraries/state_history/include/eosio/state_history/types.hpp b/libraries/state_history/include/eosio/state_history/types.hpp new file mode 100644 index 00000000000..7c7c080a49d --- /dev/null +++ b/libraries/state_history/include/eosio/state_history/types.hpp @@ -0,0 +1,127 @@ +#pragma once + +#include + +namespace eosio { +namespace state_history { + +using chain::block_id_type; +using chain::bytes; +using chain::extensions_type; +using chain::signature_type; +using chain::signed_transaction; +using chain::transaction_trace_ptr; + +template +struct big_vector_wrapper { + T obj; +}; + +struct partial_transaction { + fc::time_point_sec expiration = {}; + uint16_t ref_block_num = {}; + uint32_t ref_block_prefix = {}; + fc::unsigned_int max_net_usage_words = {}; + uint8_t max_cpu_usage_ms = {}; + fc::unsigned_int delay_sec = {}; + extensions_type transaction_extensions = {}; + std::vector signatures = {}; + std::vector context_free_data = {}; + + partial_transaction(const signed_transaction& t) + : expiration(t.expiration) + , ref_block_num(t.ref_block_num) + , ref_block_prefix(t.ref_block_prefix) + , 
max_net_usage_words(t.max_net_usage_words) + , max_cpu_usage_ms(t.max_cpu_usage_ms) + , delay_sec(t.delay_sec) + , transaction_extensions(t.transaction_extensions) + , signatures(t.signatures) + , context_free_data(t.context_free_data) {} +}; + +struct augmented_transaction_trace { + transaction_trace_ptr trace; + std::shared_ptr partial; + + augmented_transaction_trace() = default; + augmented_transaction_trace(const augmented_transaction_trace&) = default; + augmented_transaction_trace(augmented_transaction_trace&&) = default; + + augmented_transaction_trace(const transaction_trace_ptr& trace) + : trace{trace} {} + + augmented_transaction_trace(const transaction_trace_ptr& trace, const std::shared_ptr& partial) + : trace{trace} + , partial{partial} {} + + augmented_transaction_trace(const transaction_trace_ptr& trace, const signed_transaction& t) + : trace{trace} + , partial{std::make_shared(t)} {} + + augmented_transaction_trace& operator=(const augmented_transaction_trace&) = default; + augmented_transaction_trace& operator=(augmented_transaction_trace&&) = default; +}; + +struct table_delta { + fc::unsigned_int struct_version = 0; + std::string name{}; + state_history::big_vector_wrapper>> rows{}; +}; + +struct block_position { + uint32_t block_num = 0; + block_id_type block_id = {}; +}; + +struct get_status_request_v0 {}; + +struct get_status_result_v0 { + block_position head = {}; + block_position last_irreversible = {}; + uint32_t trace_begin_block = 0; + uint32_t trace_end_block = 0; + uint32_t chain_state_begin_block = 0; + uint32_t chain_state_end_block = 0; + fc::sha256 chain_id = {}; +}; + +struct get_blocks_request_v0 { + uint32_t start_block_num = 0; + uint32_t end_block_num = 0; + uint32_t max_messages_in_flight = 0; + std::vector have_positions = {}; + bool irreversible_only = false; + bool fetch_block = false; + bool fetch_traces = false; + bool fetch_deltas = false; +}; + +struct get_blocks_ack_request_v0 { + uint32_t num_messages = 0; +}; + +struct get_blocks_result_v0 { + block_position head; + block_position last_irreversible; + fc::optional this_block; + fc::optional prev_block; + fc::optional block; + fc::optional traces; + fc::optional deltas; +}; + +using state_request = fc::static_variant; +using state_result = fc::static_variant; + +} // namespace state_history +} // namespace eosio + +// clang-format off +FC_REFLECT(eosio::state_history::table_delta, (struct_version)(name)(rows)); +FC_REFLECT(eosio::state_history::block_position, (block_num)(block_id)); +FC_REFLECT_EMPTY(eosio::state_history::get_status_request_v0); +FC_REFLECT(eosio::state_history::get_status_result_v0, (head)(last_irreversible)(trace_begin_block)(trace_end_block)(chain_state_begin_block)(chain_state_end_block)(chain_id)); +FC_REFLECT(eosio::state_history::get_blocks_request_v0, (start_block_num)(end_block_num)(max_messages_in_flight)(have_positions)(irreversible_only)(fetch_block)(fetch_traces)(fetch_deltas)); +FC_REFLECT(eosio::state_history::get_blocks_ack_request_v0, (num_messages)); +// clang-format on diff --git a/libraries/state_history/trace_converter.cpp b/libraries/state_history/trace_converter.cpp new file mode 100644 index 00000000000..460f94dec60 --- /dev/null +++ b/libraries/state_history/trace_converter.cpp @@ -0,0 +1,43 @@ +#include +#include + +namespace eosio { +namespace state_history { + +using eosio::chain::packed_transaction; +using eosio::chain::plugin_exception; + +void trace_converter::add_transaction(const transaction_trace_ptr& trace, const signed_transaction& 
transaction) { + if (trace->receipt) { + if (chain::is_onblock(*trace)) + onblock_trace.emplace(trace, transaction); + else if (trace->failed_dtrx_trace) + cached_traces[trace->failed_dtrx_trace->id] = augmented_transaction_trace{trace, transaction}; + else + cached_traces[trace->id] = augmented_transaction_trace{trace, transaction}; + } +} + +bytes trace_converter::pack(const chainbase::database& db, bool trace_debug_mode, const block_state_ptr& block_state) { + std::vector traces; + if (onblock_trace) + traces.push_back(*onblock_trace); + for (auto& r : block_state->block->transactions) { + transaction_id_type id; + if (r.trx.contains()) + id = r.trx.get(); + else + id = r.trx.get().id(); + auto it = cached_traces.find(id); + EOS_ASSERT(it != cached_traces.end() && it->second.trace->receipt, plugin_exception, + "missing trace for transaction ${id}", ("id", id)); + traces.push_back(it->second); + } + cached_traces.clear(); + onblock_trace.reset(); + + return fc::raw::pack(make_history_context_wrapper(db, trace_debug_mode, traces)); +} + +} // namespace state_history +} // namespace eosio diff --git a/package.json b/package.json new file mode 100644 index 00000000000..7eed3e23fba --- /dev/null +++ b/package.json @@ -0,0 +1,11 @@ +{ + "name": "eosio", + "version": "1.0.0", + "dependencies": { + "eosjs": "20.0.0", + "ws": "7.2.0", + "commander": "4.0.1", + "zlib": "1.0.5", + "node-fetch": "2.6.0" + } +} diff --git a/plugins/state_history_plugin/CMakeLists.txt b/plugins/state_history_plugin/CMakeLists.txt index 21f0e10a900..8a72f9d12f3 100644 --- a/plugins/state_history_plugin/CMakeLists.txt +++ b/plugins/state_history_plugin/CMakeLists.txt @@ -1,8 +1,7 @@ file(GLOB HEADERS "include/eosio/state_history_plugin/*.hpp") add_library( state_history_plugin state_history_plugin.cpp - state_history_plugin_abi.cpp ${HEADERS} ) -target_link_libraries( state_history_plugin chain_plugin eosio_chain appbase ) +target_link_libraries( state_history_plugin state_history chain_plugin eosio_chain appbase ) target_include_directories( state_history_plugin PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include" ) diff --git a/plugins/state_history_plugin/include/eosio/state_history_plugin/state_history_plugin.hpp b/plugins/state_history_plugin/include/eosio/state_history_plugin/state_history_plugin.hpp index 66f8de30b53..29e4035019c 100644 --- a/plugins/state_history_plugin/include/eosio/state_history_plugin/state_history_plugin.hpp +++ b/plugins/state_history_plugin/include/eosio/state_history_plugin/state_history_plugin.hpp @@ -2,11 +2,7 @@ #include #include - -template -struct history_serial_big_vector_wrapper { - T obj; -}; +#include namespace fc { class variant; @@ -18,103 +14,6 @@ using std::shared_ptr; typedef shared_ptr state_history_ptr; -struct partial_transaction { - chain::time_point_sec expiration = {}; - uint16_t ref_block_num = {}; - uint32_t ref_block_prefix = {}; - fc::unsigned_int max_net_usage_words = {}; - uint8_t max_cpu_usage_ms = {}; - fc::unsigned_int delay_sec = {}; - chain::extensions_type transaction_extensions = {}; - vector signatures = {}; - vector context_free_data = {}; - - partial_transaction(const chain::signed_transaction& t) - : expiration(t.expiration) - , ref_block_num(t.ref_block_num) - , ref_block_prefix(t.ref_block_prefix) - , max_net_usage_words(t.max_net_usage_words) - , max_cpu_usage_ms(t.max_cpu_usage_ms) - , delay_sec(t.delay_sec) - , transaction_extensions(t.transaction_extensions) - , signatures(t.signatures) - , context_free_data(t.context_free_data) {} -}; - -struct 
augmented_transaction_trace { - chain::transaction_trace_ptr trace; - std::shared_ptr partial; - - augmented_transaction_trace() = default; - augmented_transaction_trace(const augmented_transaction_trace&) = default; - augmented_transaction_trace(augmented_transaction_trace&&) = default; - - augmented_transaction_trace(const chain::transaction_trace_ptr& trace) - : trace{trace} {} - - augmented_transaction_trace(const chain::transaction_trace_ptr& trace, - const std::shared_ptr& partial) - : trace{trace} - , partial{partial} {} - - augmented_transaction_trace(const chain::transaction_trace_ptr& trace, const chain::signed_transaction& t) - : trace{trace} - , partial{std::make_shared(t)} {} - - augmented_transaction_trace& operator=(const augmented_transaction_trace&) = default; - augmented_transaction_trace& operator=(augmented_transaction_trace&&) = default; -}; - -struct table_delta { - fc::unsigned_int struct_version = 0; - std::string name{}; - history_serial_big_vector_wrapper>> rows{}; -}; - -struct block_position { - uint32_t block_num = 0; - chain::block_id_type block_id = {}; -}; - -struct get_status_request_v0 {}; - -struct get_status_result_v0 { - block_position head = {}; - block_position last_irreversible = {}; - uint32_t trace_begin_block = 0; - uint32_t trace_end_block = 0; - uint32_t chain_state_begin_block = 0; - uint32_t chain_state_end_block = 0; -}; - -struct get_blocks_request_v0 { - uint32_t start_block_num = 0; - uint32_t end_block_num = 0; - uint32_t max_messages_in_flight = 0; - std::vector have_positions = {}; - bool irreversible_only = false; - bool fetch_block = false; - bool fetch_traces = false; - bool fetch_deltas = false; -}; - -struct get_blocks_ack_request_v0 { - uint32_t num_messages = 0; -}; - -struct get_blocks_result_v0 { - block_position head; - block_position last_irreversible; - fc::optional this_block; - fc::optional prev_block; - fc::optional block; - fc::optional traces; - fc::optional deltas; -}; - -using state_request = fc::static_variant; -using state_result = fc::static_variant; - class state_history_plugin : public plugin { public: APPBASE_PLUGIN_REQUIRES((chain_plugin)) @@ -133,12 +32,3 @@ class state_history_plugin : public plugin { }; } // namespace eosio - -// clang-format off -FC_REFLECT(eosio::table_delta, (struct_version)(name)(rows)); -FC_REFLECT(eosio::block_position, (block_num)(block_id)); -FC_REFLECT_EMPTY(eosio::get_status_request_v0); -FC_REFLECT(eosio::get_status_result_v0, (head)(last_irreversible)(trace_begin_block)(trace_end_block)(chain_state_begin_block)(chain_state_end_block)); -FC_REFLECT(eosio::get_blocks_request_v0, (start_block_num)(end_block_num)(max_messages_in_flight)(have_positions)(irreversible_only)(fetch_block)(fetch_traces)(fetch_deltas)); -FC_REFLECT(eosio::get_blocks_ack_request_v0, (num_messages)); -// clang-format on diff --git a/plugins/state_history_plugin/state_history_plugin.cpp b/plugins/state_history_plugin/state_history_plugin.cpp index 3c892b2f041..e0558548f3d 100644 --- a/plugins/state_history_plugin/state_history_plugin.cpp +++ b/plugins/state_history_plugin/state_history_plugin.cpp @@ -1,6 +1,10 @@ #include -#include -#include +#include +#include +#include +#include +#include +#include #include #include @@ -8,9 +12,6 @@ #include #include #include -#include -#include -#include #include using tcp = boost::asio::ip::tcp; @@ -20,6 +21,7 @@ extern const char* const state_history_plugin_abi; namespace eosio { using namespace chain; +using namespace state_history; using 
boost::signals2::scoped_connection; static appbase::abstract_plugin& _state_history_plugin = app().register_plugin(); @@ -37,93 +39,19 @@ auto catch_and_log(F f) { } } -namespace bio = boost::iostreams; -static bytes zlib_compress_bytes(bytes in) { - bytes out; - bio::filtering_ostream comp; - comp.push(bio::zlib_compressor(bio::zlib::default_compression)); - comp.push(bio::back_inserter(out)); - bio::write(comp, in.data(), in.size()); - bio::close(comp); - return out; -} - -static bytes zlib_decompress(const bytes& in) { - bytes out; - bio::filtering_ostream decomp; - decomp.push(bio::zlib_decompressor()); - decomp.push(bio::back_inserter(out)); - bio::write(decomp, in.data(), in.size()); - bio::close(decomp); - return out; -} - -template -bool include_delta(const T& old, const T& curr) { - return true; -} - -bool include_delta(const eosio::chain::table_id_object& old, const eosio::chain::table_id_object& curr) { - return old.payer != curr.payer; -} - -bool include_delta(const eosio::chain::resource_limits::resource_limits_object& old, - const eosio::chain::resource_limits::resource_limits_object& curr) { - return // - old.net_weight != curr.net_weight || // - old.cpu_weight != curr.cpu_weight || // - old.ram_bytes != curr.ram_bytes; -} - -bool include_delta(const eosio::chain::resource_limits::resource_limits_state_object& old, - const eosio::chain::resource_limits::resource_limits_state_object& curr) { - return // - old.average_block_net_usage.last_ordinal != curr.average_block_net_usage.last_ordinal || // - old.average_block_net_usage.value_ex != curr.average_block_net_usage.value_ex || // - old.average_block_net_usage.consumed != curr.average_block_net_usage.consumed || // - old.average_block_cpu_usage.last_ordinal != curr.average_block_cpu_usage.last_ordinal || // - old.average_block_cpu_usage.value_ex != curr.average_block_cpu_usage.value_ex || // - old.average_block_cpu_usage.consumed != curr.average_block_cpu_usage.consumed || // - old.total_net_weight != curr.total_net_weight || // - old.total_cpu_weight != curr.total_cpu_weight || // - old.total_ram_bytes != curr.total_ram_bytes || // - old.virtual_net_limit != curr.virtual_net_limit || // - old.virtual_cpu_limit != curr.virtual_cpu_limit; -} - -bool include_delta(const eosio::chain::account_metadata_object& old, - const eosio::chain::account_metadata_object& curr) { - return // - old.name != curr.name || // - old.is_privileged() != curr.is_privileged() || // - old.last_code_update != curr.last_code_update || // - old.vm_type != curr.vm_type || // - old.vm_version != curr.vm_version || // - old.code_hash != curr.code_hash; -} - -bool include_delta(const eosio::chain::code_object& old, const eosio::chain::code_object& curr) { // - return false; -} - -bool include_delta(const eosio::chain::protocol_state_object& old, const eosio::chain::protocol_state_object& curr) { - return old.activated_protocol_features != curr.activated_protocol_features; -} - struct state_history_plugin_impl : std::enable_shared_from_this { - chain_plugin* chain_plug = nullptr; - fc::optional trace_log; - fc::optional chain_state_log; - bool trace_debug_mode = false; - bool stopping = false; - fc::optional applied_transaction_connection; - fc::optional block_start_connection; - fc::optional accepted_block_connection; - string endpoint_address = "0.0.0.0"; - uint16_t endpoint_port = 8080; - std::unique_ptr acceptor; - std::map cached_traces; - fc::optional onblock_trace; + chain_plugin* chain_plug = nullptr; + fc::optional trace_log; + fc::optional 
chain_state_log; + bool trace_debug_mode = false; + bool stopping = false; + fc::optional applied_transaction_connection; + fc::optional block_start_connection; + fc::optional accepted_block_connection; + string endpoint_address = "0.0.0.0"; + uint16_t endpoint_port = 8080; + std::unique_ptr acceptor; + state_history::trace_converter trace_converter; void get_log_entry(state_history_log& log, uint32_t block_num, fc::optional& result) { if (block_num < log.begin_block() || block_num >= log.end_block()) @@ -135,7 +63,7 @@ struct state_history_plugin_impl : std::enable_shared_from_this& result) { @@ -225,8 +153,8 @@ struct state_history_plugin_impl : std::enable_shared_from_thisbinary(sent_abi); sent_abi = true; - socket_stream->async_write( // - boost::asio::buffer(send_queue[0]), + socket_stream->async_write( // + boost::asio::buffer(send_queue[0]), // [self = shared_from_this()](boost::system::error_code ec, size_t) { self->callback(ec, "async_write", [self] { self->send_queue.erase(self->send_queue.begin()); @@ -242,6 +170,7 @@ struct state_history_plugin_impl : std::enable_shared_from_thistrace_log) { result.trace_begin_block = plugin->trace_log->begin_block(); result.trace_end_block = plugin->trace_log->end_block(); @@ -277,10 +206,10 @@ struct state_history_plugin_impl : std::enable_shared_from_thismax_messages_in_flight) return; - auto& chain = plugin->chain_plug->chain(); + auto& chain = plugin->chain_plug->chain(); result.last_irreversible = {chain.last_irreversible_block_num(), chain.last_irreversible_block_id()}; uint32_t current = - current_request->irreversible_only ? result.last_irreversible.block_num : result.head.block_num; + current_request->irreversible_only ? result.last_irreversible.block_num : result.head.block_num; if (current_request->start_block_num <= current && current_request->start_block_num < current_request->end_block_num) { auto block_id = plugin->get_block_id(current_request->start_block_num); @@ -319,7 +248,7 @@ struct state_history_plugin_impl : std::enable_shared_from_thismax_messages_in_flight) return; - auto& chain = plugin->chain_plug->chain(); + auto& chain = plugin->chain_plug->chain(); get_blocks_result_v0 result; result.head = {chain.head_block_num(), chain.head_block_id()}; send_update(std::move(result)); @@ -343,13 +272,13 @@ struct state_history_plugin_impl : std::enable_shared_from_this void callback(boost::system::error_code ec, const char* what, F f) { - app().post( priority::medium, [=]() { - if( plugin->stopping ) + app().post(priority::medium, [=]() { + if (plugin->stopping) return; - if( ec ) - return on_fail( ec, what ); - catch_and_close( f ); - } ); + if (ec) + return on_fail(ec, what); + catch_and_close(f); + }); } void on_fail(boost::system::error_code ec, const char* what) { @@ -412,19 +341,26 @@ struct state_history_plugin_impl : std::enable_shared_from_thisreceipt && trace_log) { - if (chain::is_onblock(*p)) - onblock_trace.emplace(p, t); - else if (p->failed_dtrx_trace) - cached_traces[p->failed_dtrx_trace->id] = augmented_transaction_trace{p, t}; - else - cached_traces[p->id] = augmented_transaction_trace{p, t}; - } + if (trace_log) + trace_converter.add_transaction(p, t); } void on_accepted_block(const block_state_ptr& block_state) { - store_traces(block_state); - store_chain_state(block_state); + try { + store_traces(block_state); + store_chain_state(block_state); + } catch (const fc::exception& e) { + elog("fc::exception: ${details}", ("details", e.to_detail_string())); + // Both app().quit() and exception throwing are 
required. Without app().quit(), + // the exception would be caught and drop before reaching main(). The exception is + // to ensure the block won't be commited. + appbase::app().quit(); + EOS_THROW( + chain::state_history_write_exception, + "State history encountered an Error which it cannot recover from. Please resolve the error and relaunch " + "the process"); + } + for (auto& s : sessions) { auto& p = s.second; if (p) { @@ -435,36 +371,19 @@ struct state_history_plugin_impl : std::enable_shared_from_this traces; - if (onblock_trace) - traces.push_back(*onblock_trace); - for (auto& r : block_state->block->transactions) { - transaction_id_type id; - if (r.trx.contains()) - id = r.trx.get(); - else - id = r.trx.get().id(); - auto it = cached_traces.find(id); - EOS_ASSERT(it != cached_traces.end() && it->second.trace->receipt, plugin_exception, - "missing trace for transaction ${id}", ("id", id)); - traces.push_back(it->second); - } - clear_caches(); + auto traces_bin = state_history::zlib_compress_bytes( + trace_converter.pack(chain_plug->chain().db(), trace_debug_mode, block_state)); - auto& db = chain_plug->chain().db(); - auto traces_bin = zlib_compress_bytes(fc::raw::pack(make_history_context_wrapper(db, trace_debug_mode, traces))); EOS_ASSERT(traces_bin.size() == (uint32_t)traces_bin.size(), plugin_exception, "traces is too big"); state_history_log_header header{.magic = ship_magic(ship_current_version), @@ -485,86 +404,8 @@ struct state_history_plugin_impl : std::enable_shared_from_thisblock->block_num())); - std::vector deltas; - auto& db = chain_plug->chain().db(); - - const auto& table_id_index = db.get_index(); - std::map removed_table_id; - for (auto& rem : table_id_index.stack().back().removed_values) - removed_table_id[rem.first._id] = &rem.second; - - auto get_table_id = [&](uint64_t tid) -> const table_id_object& { - auto obj = table_id_index.find(tid); - if (obj) - return *obj; - auto it = removed_table_id.find(tid); - EOS_ASSERT(it != removed_table_id.end(), chain::plugin_exception, "can not found table id ${tid}", - ("tid", tid)); - return *it->second; - }; - - auto pack_row = [&](auto& row) { return fc::raw::pack(make_history_serial_wrapper(db, row)); }; - auto pack_contract_row = [&](auto& row) { - return fc::raw::pack(make_history_context_wrapper(db, get_table_id(row.t_id._id), row)); - }; - - auto process_table = [&](auto* name, auto& index, auto& pack_row) { - if (fresh) { - if (index.indices().empty()) - return; - deltas.push_back({}); - auto& delta = deltas.back(); - delta.name = name; - for (auto& row : index.indices()) - delta.rows.obj.emplace_back(true, pack_row(row)); - } else { - if (index.stack().empty()) - return; - auto& undo = index.stack().back(); - if (undo.old_values.empty() && undo.new_ids.empty() && undo.removed_values.empty()) - return; - deltas.push_back({}); - auto& delta = deltas.back(); - delta.name = name; - for (auto& old : undo.old_values) { - auto& row = index.get(old.first); - if (include_delta(old.second, row)) - delta.rows.obj.emplace_back(true, pack_row(row)); - } - for (auto& old : undo.removed_values) - delta.rows.obj.emplace_back(false, pack_row(old.second)); - for (auto id : undo.new_ids) { - auto& row = index.get(id); - delta.rows.obj.emplace_back(true, pack_row(row)); - } - } - }; - - process_table("account", db.get_index(), pack_row); - process_table("account_metadata", db.get_index(), pack_row); - process_table("code", db.get_index(), pack_row); - - process_table("contract_table", db.get_index(), pack_row); - 
process_table("contract_row", db.get_index(), pack_contract_row); - process_table("contract_index64", db.get_index(), pack_contract_row); - process_table("contract_index128", db.get_index(), pack_contract_row); - process_table("contract_index256", db.get_index(), pack_contract_row); - process_table("contract_index_double", db.get_index(), pack_contract_row); - process_table("contract_index_long_double", db.get_index(), pack_contract_row); - - process_table("global_property", db.get_index(), pack_row); - process_table("generated_transaction", db.get_index(), pack_row); - process_table("protocol_state", db.get_index(), pack_row); - - process_table("permission", db.get_index(), pack_row); - process_table("permission_link", db.get_index(), pack_row); - - process_table("resource_limits", db.get_index(), pack_row); - process_table("resource_usage", db.get_index(), pack_row); - process_table("resource_limits_state", db.get_index(), pack_row); - process_table("resource_limits_config", db.get_index(), pack_row); - - auto deltas_bin = zlib_compress_bytes(fc::raw::pack(deltas)); + std::vector deltas = state_history::create_deltas(chain_plug->chain().db(), fresh); + auto deltas_bin = state_history::zlib_compress_bytes(fc::raw::pack(deltas)); EOS_ASSERT(deltas_bin.size() == (uint32_t)deltas_bin.size(), plugin_exception, "deltas is too big"); state_history_log_header header{.magic = ship_magic(ship_current_version), .block_id = block_state->block->id(), @@ -593,8 +434,7 @@ void state_history_plugin::set_program_options(options_description& cli, options options("state-history-endpoint", bpo::value()->default_value("127.0.0.1:8080"), "the endpoint upon which to listen for incoming connections. Caution: only expose this port to " "your internal network."); - options("trace-history-debug-mode", bpo::bool_switch()->default_value(false), - "enable debug mode for trace history"); + options("trace-history-debug-mode", bpo::bool_switch()->default_value(false), "enable debug mode for trace history"); } void state_history_plugin::plugin_initialize(const variables_map& options) { diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index e62d5718d31..19eaa9900fd 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -1,4 +1,4 @@ - +find_program( NODE_FOUND node ) find_package( Gperftools QUIET ) if( GPERFTOOLS_FOUND ) @@ -54,6 +54,8 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/full-version-label.sh ${CMAKE_CURRENT configure_file(${CMAKE_CURRENT_SOURCE_DIR}/print-build-info.sh ${CMAKE_CURRENT_BINARY_DIR}/print-build-info.sh COPYONLY) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/nodeos_producer_watermark_test.py ${CMAKE_CURRENT_BINARY_DIR}/nodeos_producer_watermark_test.py COPYONLY) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/cli_test.py ${CMAKE_CURRENT_BINARY_DIR}/cli_test.py COPYONLY) +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ship_test.py ${CMAKE_CURRENT_BINARY_DIR}/ship_test.py COPYONLY) +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ship_client.js ${CMAKE_CURRENT_BINARY_DIR}/ship_client.js COPYONLY) #To run plugin_test with all log from blockchain displayed, put --verbose after --, i.e. 
plugin_test -- --verbose add_test(NAME plugin_test COMMAND plugin_test --report_level=detailed --color_output) @@ -65,6 +67,11 @@ set_property(TEST nodeos_run_test PROPERTY LABELS nonparallelizable_tests) add_test(NAME block_log_util_test COMMAND tests/block_log_util_test.py -v --clean-run --dump-error-detail WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) set_property(TEST block_log_util_test PROPERTY LABELS nonparallelizable_tests) +if(NODE_FOUND) + add_test(NAME ship_test COMMAND tests/ship_test.py -v --num-clients 1 --num-requests 5000 --clean-run --dump-error-detail WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) + set_property(TEST ship_test PROPERTY LABELS nonparallelizable_tests) +endif(NODE_FOUND) + add_test(NAME p2p_dawn515_test COMMAND tests/p2p_tests/dawn_515/test.sh WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) set_property(TEST p2p_dawn515_test PROPERTY LABELS nonparallelizable_tests) if(BUILD_MONGO_DB_PLUGIN) diff --git a/tests/ship_client.js b/tests/ship_client.js new file mode 100644 index 00000000000..ff3a522bf96 --- /dev/null +++ b/tests/ship_client.js @@ -0,0 +1,154 @@ +const { TextDecoder, TextEncoder } = require('util'); +const { Serialize } = require('eosjs'); +const commander = require('commander'); +const fetch = require('node-fetch'); +const zlib = require('zlib'); +const WebSocket = require('ws'); + +class Connection { + // Connect to the State-History Plugin + constructor(socketAddress, numRequests) { + // Protocol ABI + this.abi = null; + + // Types in the protocol ABI + this.types = null; + + // Tables in the protocol ABI + this.tables = new Map; + + this.startRequestArray = true; + this.endRequestArray = false; + this.numRequests = numRequests; + this.blockNumChangeTime = null; + this.createTime = Date.now(); + + // The socket + var status = { "status" : "construct" } + status["time"] = Date.now() + console.error('['); + console.error(JSON.stringify(status, null, 4)); + this.ws = new WebSocket(socketAddress, { perMessageDeflate: false }); + this.ws.on('message', data => this.onMessage(data)); + } + + // Convert JSON to binary. type identifies one of the types in this.types. + serialize(type, value) { + const buffer = new Serialize.SerialBuffer({ textEncoder: new TextEncoder, textDecoder: new TextDecoder }); + Serialize.getType(this.types, type).serialize(buffer, value); + return buffer.asUint8Array(); + } + + // Convert binary to JSON. type identifies one of the types in this.types. 
+ deserialize(type, array) { + const buffer = new Serialize.SerialBuffer({ textEncoder: new TextEncoder, textDecoder: new TextDecoder, array }); + return Serialize.getType(this.types, type).deserialize(buffer, new Serialize.SerializerState({ bytesAsUint8Array: true })); + } + + // Serialize a request and send it to the plugin + send(request) { + this.ws.send(this.serialize('request', request)); + } + + // Receive a message + onMessage(data) { + try { + if (!this.abi) { + this.abiTime = Date.now(); + var status = { "status" : "set_abi" } + status["time"] = Date.now() + console.error(', '); + console.error(JSON.stringify(status, null, 4)); + // First message is the protocol ABI + this.abi = JSON.parse(data); + this.types = Serialize.getTypesFromAbi(Serialize.createInitialTypes(), this.abi); + for (const table of this.abi.tables) + this.tables.set(table.name, table.type); + + // Send the first request + this.send(['get_status_request_v0', {}]); + } else { + // Deserialize and dispatch a message + const [type, response] = this.deserialize('result', data); + this[type](response); + } + } catch (e) { + var status = { "status" : "error" } + status["time"] = Date.now() + status["msg"] = e.toString(); + console.error(', '); + console.error(JSON.stringify(status, null, 4)); + console.error(']'); + process.exit(1); + } + } + + // Report status + get_status_result_v0(response) { + if (this.startRequestArray) { + this.startRequestArray = false; + this.startTime = Date.now(); + console.log('['); + response["start_time"] = this.startTime; + this.blockNum = response["head"]["block_num"]; + this.initialBlockNum = this.blockNum; + this.blockNumChangeTime = this.startTime; + this.lastResponseTime = this.startTime; + this.max = { "blockNum": 0, "time": 0.0 }; + } else { + console.log(',' + "\n"); + } + console.log('{ \"get_status_result_v0\":'); + response["resp_num"] = this.numRequests; + var currentTime = Date.now(); + response["resp_time"] = (currentTime - this.startTime) / 1000; + var timeSinceBlockNumChange = (currentTime - this.blockNumChangeTime) / 1000; + response["time_since_block_num_change"] = timeSinceBlockNumChange; + var blockNum = response["head"]["block_num"] + if (blockNum != this.blockNum) { + this.blockNum = response["head"]["block_num"]; + if (this.max["time"] < timeSinceBlockNumChange) { + this.max["time"] = timeSinceBlockNumChange; + this.max["blockNum"] = blockNum; + } + this.blockNumChangeTime = currentTime; + } + response["time_since_last_block"] = (currentTime - this.lastResponseTime) / 1000; + this.lastResponseTime = currentTime; + console.log(JSON.stringify(response, null, 4)); + console.log('}'); + if (this.numRequests < 0) { // request forever + this.send(['get_status_request_v0', {}]); + } else if (--this.numRequests > 0 ) { + this.send(['get_status_request_v0', {}]); + } else { + console.log(']'); + var status = { "status" : "done" } + console.error(' ,'); + status["time"] = Date.now() + if (this.max["time"] < timeSinceBlockNumChange) { + this.max["time"] = timeSinceBlockNumChange; + this.max["blockNum"] = blockNum; + } + status["max_time"] = {"block_num": this.max["blockNum"], "time_delta": this.max["time"] }; + status["first_block_num"] = this.initialBlockNum; + status["last_block_num"] = blockNum; + status["abi_rcv_delta_time"] = (this.abiTime - this.createTime)/1000; + status["req_rcv_delta_time"] = (this.startTime - this.createTime)/1000; + status["done_delta_time"] = (currentTime - this.createTime)/1000; + + console.error(JSON.stringify(status, null, 4)); + 
console.error(']'); + process.exit(0); + } + } +} // Connection + +commander + .option('-a, --socket-address [addr]', 'Socket address', 'ws://localhost:8080/') + .option('-f, --first-block [num]', 'Socket address', 2) + .option('-n, --num-requests [num]', 'number of requests to make', 1) + .parse(process.argv); + +const connection = new Connection(commander.socketAddress, commander.numRequests); + diff --git a/tests/ship_test.py b/tests/ship_test.py new file mode 100755 index 00000000000..577a7770738 --- /dev/null +++ b/tests/ship_test.py @@ -0,0 +1,223 @@ +#!/usr/bin/env python3 + +from testUtils import Utils +from datetime import datetime +from datetime import timedelta +import time +from Cluster import Cluster +from WalletMgr import WalletMgr +from TestHelper import TestHelper +from TestHelper import AppArgs + +import json +import os +import re +import shutil +import signal +import sys + +############################################################### +# ship_test +# +# This test sets up <-p> producing node(s) and <-n - -p> +# non-producing node(s). One of the non-producing nodes +# is configured with the state_history_plugin. An instance +# of node will be started with a client javascript to exercise +# the SHiP API. +# +############################################################### + +Print=Utils.Print + +appArgs = AppArgs() +extraArgs = appArgs.add(flag="--num-requests", type=int, help="How many requests that each ship_client requests", default=1) +extraArgs = appArgs.add(flag="--num-clients", type=int, help="How many ship_clients should be started", default=1) +args = TestHelper.parse_args({"-p", "-n","--dump-error-details","--keep-logs","-v","--leave-running","--clean-run"}, applicationSpecificArgs=appArgs) + +Utils.Debug=args.v +totalProducerNodes=args.p +totalNodes=args.n +if totalNodes<=totalProducerNodes: + totalNodes=totalProducerNodes+1 +totalNonProducerNodes=totalNodes-totalProducerNodes +totalProducers=totalProducerNodes +cluster=Cluster(walletd=True) +dumpErrorDetails=args.dump_error_details +keepLogs=args.keep_logs +dontKill=args.leave_running +killAll=args.clean_run +walletPort=TestHelper.DEFAULT_WALLET_PORT + +walletMgr=WalletMgr(True, port=walletPort) +testSuccessful=False +killEosInstances=not dontKill +killWallet=not dontKill + +WalletdName=Utils.EosWalletName +ClientName="cleos" +shipTempDir=None + +try: + TestHelper.printSystemInfo("BEGIN") + + cluster.setWalletMgr(walletMgr) + cluster.killall(allInstances=killAll) + cluster.cleanup() + Print("Stand up cluster") + specificExtraNodeosArgs={} + # non-producing nodes are at the end of the cluster's nodes, so reserving the last one for state_history_plugin + shipNodeNum = totalNodes - 1 + specificExtraNodeosArgs[shipNodeNum]="--plugin eosio::state_history_plugin --disable-replay-opts --sync-fetch-span 200 --plugin eosio::net_api_plugin " + + if cluster.launch(pnodes=totalProducerNodes, + totalNodes=totalNodes, totalProducers=totalProducers, + useBiosBootFile=False, specificExtraNodeosArgs=specificExtraNodeosArgs) is False: + Utils.cmdError("launcher") + Utils.errorExit("Failed to stand up eos cluster.") + + # *** identify each node (producers and non-producing node) *** + + shipNode = cluster.getNode(shipNodeNum) + prodNode = cluster.getNode(0) + + #verify nodes are in sync and advancing + cluster.waitOnClusterSync(blockAdvancing=5) + Print("Cluster in Sync") + + javascriptClient = "tests/ship_client.js" + cmd = "node %s --num-requests %d" % (javascriptClient, args.num_requests) + if Utils.Debug: Utils.Print("cmd: %s" 
+    clients = []
+    files = []
+    shipTempDir = os.path.join(Utils.DataDir, "ship")
+    os.makedirs(shipTempDir, exist_ok = True)
+    shipClientFilePrefix = os.path.join(shipTempDir, "client")
+
+    starts = []
+    for i in range(0, args.num_clients):
+        start = time.perf_counter()
+        outFile = open("%s%d.out" % (shipClientFilePrefix, i), "w")
+        errFile = open("%s%d.err" % (shipClientFilePrefix, i), "w")
+        Print("Start client %d" % (i))
+        popen=Utils.delayedCheckOutput(cmd, stdout=outFile, stderr=errFile)
+        starts.append(time.perf_counter())
+        clients.append((popen, cmd))
+        files.append((outFile, errFile))
+        Print("Client %d started, Ship node head is: %s" % (i, shipNode.getBlockNum()))
+
+    Print("Stopping all %d clients" % (args.num_clients))
+
+    for index, (popen, _), (out, err), start in zip(range(len(clients)), clients, files, starts):
+        popen.wait()
+        Print("Stopped client %d. Ran for %.3f seconds." % (index, time.perf_counter() - start))
+        out.close()
+        err.close()
+
+    Print("Shutdown state_history_plugin nodeos")
+    shipNode.kill(signal.SIGTERM)
+
+    files = None
+
+    maxFirstBN = -1
+    minLastBN = sys.maxsize
+    for index in range(0, len(clients)):
+        done = False
+        shipClientErrorFile = "%s%d.err" % (shipClientFilePrefix, index)
+        with open(shipClientErrorFile, "r") as errFile:
+            statuses = None
+            lines = errFile.readlines()
+            missingModules = []
+            for line in lines:
+                match = re.search(r"Error: Cannot find module '(\w+)'", line)
+                if match:
+                    missingModules.append(match.group(1))
+            if len(missingModules) > 0:
+                Utils.errorExit("Javascript client #%d threw an exception, it was missing modules: %s" % (index, ", ".join(missingModules)))
+
+            try:
+                statuses = json.loads(" ".join(lines))
+            except json.decoder.JSONDecodeError as er:
+                Utils.errorExit("javascript client output was malformed in %s. Exception: %s" % (shipClientErrorFile, er))
+
+            for status in statuses:
+                statusDesc = status["status"]
+                if statusDesc == "done":
+                    done = True
+                    firstBlockNum = status["first_block_num"]
+                    lastBlockNum = status["last_block_num"]
+                    maxFirstBN = max(maxFirstBN, firstBlockNum)
+                    minLastBN = min(minLastBN, lastBlockNum)
+                if statusDesc == "error":
+                    Utils.errorExit("javascript client reported an error, see: %s." % (shipClientErrorFile))
+
+        assert done, Print("ERROR: Did not find a \"done\" status for client %d" % (index))
+
+    Print("All clients active from block num: %s to block num: %s." % (maxFirstBN, minLastBN))
+
+    stderrFile=Utils.getNodeDataDir(shipNodeNum, "stderr.txt")
+    biggestDelta = timedelta(seconds=0)
+    totalDelta = timedelta(seconds=0)
+    timeCount = 0
+    with open(stderrFile, 'r') as f:
+        line = f.readline()
+        while line:
+            match = re.search(r'info\s+([0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{3})\s.+Received\sblock\s+.+\s#([0-9]+)\s@\s([0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{3})',
+                              line)
+            if match:
+                rcvTimeStr = match.group(1)
+                prodTimeStr = match.group(3)
+                blockNum = int(match.group(2))
+                if blockNum > maxFirstBN:
+                    # ship requests can only affect time after clients started
+                    timeFmt = '%Y-%m-%dT%H:%M:%S.%f'
+                    rcvTime = datetime.strptime(rcvTimeStr, timeFmt)
+                    prodTime = datetime.strptime(prodTimeStr, timeFmt)
+                    delta = rcvTime - prodTime
+                    biggestDelta = max(delta, biggestDelta)
+
+                    totalDelta += delta
+                    timeCount += 1
+                    assert delta < timedelta(seconds=0.500), Print("ERROR: block_num: %s took %.3f seconds to be received." % (blockNum, delta.total_seconds()))
+
+            line = f.readline()
+
+    avg = totalDelta.total_seconds() / timeCount if timeCount > 0 else 0.0
+    Print("Greatest delay time: %.3f seconds, average delay time: %.3f seconds" % (biggestDelta.total_seconds(), avg))
+
+    testSuccessful = True
finally:
+    TestHelper.shutdown(cluster, walletMgr, testSuccessful=testSuccessful, killEosInstances=killEosInstances, killWallet=killWallet, keepLogs=keepLogs, cleanRun=killAll, dumpErrorDetails=dumpErrorDetails)
+    if shipTempDir is not None:
+        if dumpErrorDetails and not testSuccessful:
+            def printTruncatedFile(filename, maxLines):
+                Print(Utils.FileDivider)
+                with open(filename, "r") as f:
+                    Print("Contents of %s" % (filename))
+                    line = f.readline()
+                    lineCount = 0
+                    while line and lineCount < maxLines:
+                        Print(line)
+                        lineCount += 1
+                        line = f.readline()
+                    if line:
+                        Print("... CONTENT TRUNCATED AT %d lines" % (maxLines))
+
+            for index in range(0, args.num_clients):
+                # error file should not contain much content, so if there are lots of lines it is likely useless
+                printTruncatedFile("%s%d.err" % (shipClientFilePrefix, index), maxLines=1000)
+                # output file should have lots of output, but if the user passes in a huge number of requests, these could go on forever
+                printTruncatedFile("%s%d.out" % (shipClientFilePrefix, index), maxLines=20000)
+
+        if not keepLogs:
+            shutil.rmtree(shipTempDir, ignore_errors=True)
+    if not testSuccessful and dumpErrorDetails:
+        Print(Utils.FileDivider)
+        Print("Compare Blocklog")
+        cluster.compareBlockLogs()
+        Print(Utils.FileDivider)
+        Print("Print Blocklog")
+        cluster.printBlockLog()
+        Print(Utils.FileDivider)
+
+errorCode = 0 if testSuccessful else 1
+exit(errorCode)
diff --git a/tests/testUtils.py b/tests/testUtils.py
index 4deb5b12fac..f0d2162b8f3 100755
--- a/tests/testUtils.py
+++ b/tests/testUtils.py
@@ -156,10 +156,21 @@ def getChainStrategies():
 
     @staticmethod
     def checkOutput(cmd, ignoreError=False):
+        popen = Utils.delayedCheckOutput(cmd)
+        return Utils.checkDelayedOutput(popen, cmd, ignoreError=ignoreError)
+
+    @staticmethod
+    def delayedCheckOutput(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE):
         if (isinstance(cmd, list)):
-            popen=subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+            popen=subprocess.Popen(cmd, stdout=stdout, stderr=stderr)
         else:
-            popen=subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
+            popen=subprocess.Popen(cmd, stdout=stdout, stderr=stderr, shell=True)
+        return popen
+
+    @staticmethod
+    def checkDelayedOutput(popen, cmd, ignoreError=False):
+        assert isinstance(popen, subprocess.Popen)
+        assert isinstance(cmd, (str,list))
         (output,error)=popen.communicate()
         Utils.CheckOutputDeque.append((output,error,cmd))
         if popen.returncode != 0 and not ignoreError:
@@ -214,6 +225,12 @@ def waitForBool(lam, timeout=None, sleepTime=3, reporter=None):
         ret=Utils.waitForObj(myLam, timeout, sleepTime, reporter=reporter)
         return False if ret is None else ret
 
+    @staticmethod
+    def waitForBoolWithArg(lam, arg, timeout=None, sleepTime=3, reporter=None):
+        myLam = lambda: True if lam(arg, timeout) else None
+        ret=Utils.waitForObj(myLam, timeout, sleepTime, reporter=reporter)
+        return False if ret is None else ret
+
     @staticmethod
     def filterJsonObjectOrArray(data):
         firstObjIdx=data.find('{')
@@ -257,7 +274,6 @@ def runCmdReturnStr(cmd, trace=False):
         cmdArr=shlex.split(cmd)
         return Utils.runCmdArrReturnStr(cmdArr)
 
-    @staticmethod
     def runCmdArrReturnStr(cmdArr, trace=False):
         retStr=Utils.checkOutput(cmdArr)
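
Reviewer note: the split of `Utils.checkOutput` into `Utils.delayedCheckOutput` plus `Utils.checkDelayedOutput` is what lets ship_test.py start several node.js clients concurrently and collect their results afterwards. The sketch below is illustrative only and is not part of the diff; the command string is a placeholder, and it simply assumes the helpers behave as defined in the hunks above.

```
# Illustrative sketch only -- not part of this change.
from testUtils import Utils

cmd = "node tests/ship_client.js --num-requests 1"   # placeholder command

# Old behavior, still available: start the process and block until it exits.
# output = Utils.checkOutput(cmd)

# New split behavior: launch without blocking ...
popen = Utils.delayedCheckOutput(cmd)

# ... do other work while the client runs (ship_test.py starts several
# clients this way, redirecting stdout/stderr to per-client files) ...

# ... then collect the result. This calls popen.communicate(), records
# (output, error, cmd) in Utils.CheckOutputDeque, and treats a non-zero
# exit code as an error unless ignoreError=True.
output = Utils.checkDelayedOutput(popen, cmd)

# waitForBoolWithArg (also added above) follows the same pattern of reuse:
# it adapts a predicate taking (arg, timeout) so Utils.waitForObj can poll it.
```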