Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Update ship's file format #7113

Merged
merged 9 commits into from
Apr 12, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <fstream>
#include <stdint.h>

#include <eosio/chain/block_header.hpp>
#include <eosio/chain/exceptions.hpp>
#include <eosio/chain/types.hpp>
#include <fc/log/logger.hpp>
Expand All @@ -21,39 +22,29 @@ namespace eosio {
* +---------+----------------+-----------+------------------+-----+---------+----------------+
*
* *.index:
* +-----------+-------------+-----+-----------+
* | Summary i | Summary i+1 | ... | Summary z |
* +-----------+-------------+-----+-----------+
* +----------------+------------------+-----+----------------+
* | Pos of Entry i | Pos of Entry i+1 | ... | Pos of Entry z |
* +----------------+------------------+-----+----------------+
*
* each entry:
* uint32_t block_num
* block_id_type block_id
* uint64_t size of payload
* uint8_t version
* payload
*
* each summary:
* uint64_t position of entry in *.log
*
* state payload:
* uint32_t size of deltas
* char[] deltas
* state_history_log_header
* payload
*/

// todo: look into switching this to serialization instead of memcpy
// todo: consider reworking versioning
// todo: consider dropping block_num since it's included in block_id
// todo: currently only checks version on the first record. Need in recover_blocks
inline uint64_t ship_magic(uint32_t version) { return N(ship) | version; }
inline bool is_ship(uint64_t magic) { return (magic & 0xffff'ffff'0000'0000) == N(ship); }
inline uint32_t get_ship_version(uint64_t magic) { return magic; }
inline bool is_ship_supported_version(uint64_t magic) { return is_ship(magic) && get_ship_version(magic) == 0; }
inline bool is_ship_unsupported_version(uint64_t magic) { return is_ship(magic) && get_ship_version(magic) != 0; }

struct state_history_log_header {
uint32_t block_num = 0;
chain::block_id_type block_id;
uint64_t magic = ship_magic(0);
chain::block_id_type block_id = {};
uint64_t payload_size = 0;
uint8_t version = 0;
};

struct state_history_summary {
uint64_t pos = 0;
};
static const int state_history_log_header_serial_size = sizeof(state_history_log_header::magic) +
sizeof(state_history_log_header::block_id) +
sizeof(state_history_log_header::payload_size);

class state_history_log {
private:
Expand All @@ -78,40 +69,58 @@ class state_history_log {
uint32_t begin_block() const { return _begin_block; }
uint32_t end_block() const { return _end_block; }

void read_header(state_history_log_header& header, bool assert_version = true) {
char bytes[state_history_log_header_serial_size];
log.read(bytes, sizeof(bytes));
fc::datastream<const char*> ds(bytes, sizeof(bytes));
fc::raw::unpack(ds, header);
EOS_ASSERT(!ds.remaining(), chain::plugin_exception, "state_history_log_header_serial_size mismatch");
EOS_ASSERT(!assert_version || is_ship_supported_version(header.magic), chain::plugin_exception,
"corrupt ${name}.log (0)", ("name", name));
}

void write_header(const state_history_log_header& header) {
char bytes[state_history_log_header_serial_size];
fc::datastream<char*> ds(bytes, sizeof(bytes));
fc::raw::pack(ds, header);
EOS_ASSERT(!ds.remaining(), chain::plugin_exception, "state_history_log_header_serial_size mismatch");
log.write(bytes, sizeof(bytes));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even though it should be the same in this case, it is better use ds.tellp() rather than sizeof(bytes) here.

}

template <typename F>
void write_entry(const state_history_log_header& header, const chain::block_id_type& prev_id, F write_payload) {
EOS_ASSERT(_begin_block == _end_block || header.block_num <= _end_block, chain::plugin_exception,
auto block_num = chain::block_header::num_from_id(header.block_id);
EOS_ASSERT(_begin_block == _end_block || block_num <= _end_block, chain::plugin_exception,
"missed a block in ${name}.log", ("name", name));

if (_begin_block != _end_block && header.block_num > _begin_block) {
if (header.block_num == _end_block) {
if (_begin_block != _end_block && block_num > _begin_block) {
if (block_num == _end_block) {
EOS_ASSERT(prev_id == last_block_id, chain::plugin_exception, "missed a fork change in ${name}.log",
("name", name));
} else {
state_history_log_header prev;
get_entry(header.block_num - 1, prev);
get_entry(block_num - 1, prev);
EOS_ASSERT(prev_id == prev.block_id, chain::plugin_exception, "missed a fork change in ${name}.log",
("name", name));
}
}

if (header.block_num < _end_block)
truncate(header.block_num);
if (block_num < _end_block)
truncate(block_num);
log.seekg(0, std::ios_base::end);
uint64_t pos = log.tellg();
log.write((char*)&header, sizeof(header));
write_header(header);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could store the position right after writing the header but before writing the payload:

uint64_t payload_start_pos = log.tellg();

write_payload(log);
uint64_t end = log.tellg();
EOS_ASSERT(end == pos + sizeof(header) + header.payload_size, chain::plugin_exception,
EOS_ASSERT(end == pos + state_history_log_header_serial_size + header.payload_size, chain::plugin_exception,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then here the condition could be end == payload_start_pos + header.payload_size.

"wrote payload with incorrect size to ${name}.log", ("name", name));
log.write((char*)&pos, sizeof(pos));

index.seekg(0, std::ios_base::end);
state_history_summary summary{.pos = pos};
index.write((char*)&summary, sizeof(summary));
index.write((char*)&pos, sizeof(pos));
if (_begin_block == _end_block)
_begin_block = header.block_num;
_end_block = header.block_num + 1;
_begin_block = block_num;
_end_block = block_num + 1;
last_block_id = header.block_id;
}

Expand All @@ -120,7 +129,7 @@ class state_history_log {
EOS_ASSERT(block_num >= _begin_block && block_num < _end_block, chain::plugin_exception,
"read non-existing block in ${name}.log", ("name", name));
log.seekg(get_pos(block_num));
log.read((char*)&header, sizeof(header));
read_header(header);
return log;
}

Expand All @@ -136,17 +145,18 @@ class state_history_log {
uint64_t suffix;
log.seekg(size - sizeof(suffix));
log.read((char*)&suffix, sizeof(suffix));
if (suffix > size || suffix + sizeof(header) > size) {
if (suffix > size || suffix + state_history_log_header_serial_size > size) {
elog("corrupt ${name}.log (2)", ("name", name));
return false;
}
log.seekg(suffix);
log.read((char*)&header, sizeof(header));
if (suffix + sizeof(header) + header.payload_size + sizeof(suffix) != size) {
read_header(header, false);
if (!is_ship_supported_version(header.magic) ||
suffix + state_history_log_header_serial_size + header.payload_size + sizeof(suffix) != size) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Last part of condition could be log.tellg() + header.payload_size + sizeof(suffix) != size.

elog("corrupt ${name}.log (3)", ("name", name));
return false;
}
_end_block = header.block_num + 1;
_end_block = chain::block_header::num_from_id(header.block_id) + 1;
last_block_id = header.block_id;
if (_begin_block >= _end_block) {
elog("corrupt ${name}.log (4)", ("name", name));
Expand All @@ -161,18 +171,22 @@ class state_history_log {
uint32_t num_found = 0;
while (true) {
state_history_log_header header;
if (pos + sizeof(header) > size)
if (pos + state_history_log_header_serial_size > size)
break;
log.seekg(pos);
log.read((char*)&header, sizeof(header));
read_header(header, false);
uint64_t suffix;
if (header.payload_size > size || pos + sizeof(header) + header.payload_size + sizeof(suffix) > size)
if (!is_ship_supported_version(header.magic) || header.payload_size > size ||
pos + state_history_log_header_serial_size + header.payload_size + sizeof(suffix) > size) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Last part of condition could be log.tellg() + header.payload_size + sizeof(suffix) > size.

EOS_ASSERT(!is_ship_unsupported_version(header.magic), chain::plugin_exception,
"${name}.log has an unsupported version", ("name", name));
break;
log.seekg(pos + sizeof(header) + header.payload_size);
}
log.seekg(pos + state_history_log_header_serial_size + header.payload_size);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replace with log.seekg( log.tellg() + header.payload_size ).

log.read((char*)&suffix, sizeof(suffix));
if (suffix != pos)
break;
pos = pos + sizeof(header) + header.payload_size + sizeof(suffix);
pos = pos + state_history_log_header_serial_size + header.payload_size + sizeof(suffix);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replace with pos = log.tellg().

if (!(++num_found % 10000)) {
printf("%10u blocks found, log pos=%12llu\r", (unsigned)num_found, (unsigned long long)pos);
fflush(stdout);
Expand All @@ -188,13 +202,14 @@ class state_history_log {
log.open(log_filename, std::ios_base::binary | std::ios_base::in | std::ios_base::out | std::ios_base::app);
log.seekg(0, std::ios_base::end);
uint64_t size = log.tellg();
if (size >= sizeof(state_history_log_header)) {
if (size >= state_history_log_header_serial_size) {
state_history_log_header header;
log.seekg(0);
log.read((char*)&header, sizeof(header));
EOS_ASSERT(header.version == 0 && sizeof(header) + header.payload_size + sizeof(uint64_t) <= size,
read_header(header, false);
EOS_ASSERT(is_ship_supported_version(header.magic) &&
state_history_log_header_serial_size + header.payload_size + sizeof(uint64_t) <= size,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Last part of condition could be log.tellg() + header.payload_size + sizeof(uint64_t) <= size.

chain::plugin_exception, "corrupt ${name}.log (1)", ("name", name));
_begin_block = header.block_num;
_begin_block = chain::block_header::num_from_id(header.block_id);
last_block_id = header.block_id;
if (!get_last_block(size))
recover_blocks(size);
Expand All @@ -208,7 +223,7 @@ class state_history_log {
void open_index() {
index.open(index_filename, std::ios_base::binary | std::ios_base::in | std::ios_base::out | std::ios_base::app);
index.seekg(0, std::ios_base::end);
if (index.tellg() == (static_cast<int>(_end_block) - _begin_block) * sizeof(state_history_summary))
if (index.tellg() == (static_cast<int>(_end_block) - _begin_block) * sizeof(uint64_t))
return;
ilog("Regenerate ${name}.index", ("name", name));
index.close();
Expand All @@ -220,21 +235,21 @@ class state_history_log {
uint32_t num_found = 0;
while (pos < size) {
state_history_log_header header;
EOS_ASSERT(pos + sizeof(header) <= size, chain::plugin_exception, "corrupt ${name}.log (6)", ("name", name));
EOS_ASSERT(pos + state_history_log_header_serial_size <= size, chain::plugin_exception,
"corrupt ${name}.log (6)", ("name", name));
log.seekg(pos);
log.read((char*)&header, sizeof(header));
uint64_t suffix_pos = pos + sizeof(header) + header.payload_size;
read_header(header, false);
uint64_t suffix_pos = pos + state_history_log_header_serial_size + header.payload_size;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replace with uint64_t suffix_pos = log.tellg() + header.payload_size;.

uint64_t suffix;
EOS_ASSERT(suffix_pos + sizeof(suffix) <= size, chain::plugin_exception, "corrupt ${name}.log (7)",
("name", name));
EOS_ASSERT(is_ship_supported_version(header.magic) && suffix_pos + sizeof(suffix) <= size,
chain::plugin_exception, "corrupt ${name}.log (7)", ("name", name));
log.seekg(suffix_pos);
log.read((char*)&suffix, sizeof(suffix));
// ilog("block ${b} at ${pos}-${end} suffix=${suffix} file_size=${fs}",
// ("b", header.block_num)("pos", pos)("end", suffix_pos + sizeof(suffix))("suffix", suffix)("fs", size));
EOS_ASSERT(suffix == pos, chain::plugin_exception, "corrupt ${name}.log (8)", ("name", name));

state_history_summary summary{.pos = pos};
index.write((char*)&summary, sizeof(summary));
index.write((char*)&pos, sizeof(pos));
pos = suffix_pos + sizeof(suffix);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could also just be pos = log.tellg();.

if (!(++num_found % 10000)) {
printf("%10u blocks found, log pos=%12llu\r", (unsigned)num_found, (unsigned long long)pos);
Expand All @@ -244,10 +259,10 @@ class state_history_log {
}

uint64_t get_pos(uint32_t block_num) {
state_history_summary summary;
index.seekg((block_num - _begin_block) * sizeof(summary));
index.read((char*)&summary, sizeof(summary));
return summary.pos;
uint64_t pos;
index.seekg((block_num - _begin_block) * sizeof(pos));
index.read((char*)&pos, sizeof(pos));
return pos;
}

void truncate(uint32_t block_num) {
Expand All @@ -267,7 +282,7 @@ class state_history_log {
log.seekg(0);
index.seekg(0);
boost::filesystem::resize_file(log_filename, pos);
boost::filesystem::resize_file(index_filename, (block_num - _begin_block) * sizeof(state_history_summary));
boost::filesystem::resize_file(index_filename, (block_num - _begin_block) * sizeof(uint64_t));
_end_block = block_num;
}
log.sync();
Expand All @@ -277,3 +292,5 @@ class state_history_log {
}; // state_history_log

} // namespace eosio

FC_REFLECT(eosio::state_history_log_header, (magic)(block_id)(payload_size))
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,6 @@ datastream<ST>& operator<<(datastream<ST>& ds, const history_serial_wrapper<eosi
fc::raw::pack(ds, fc::unsigned_int(0));
fc::raw::pack(ds, as_type<fc::unsigned_int>(obj.obj.action_ordinal));
fc::raw::pack(ds, as_type<fc::unsigned_int>(obj.obj.creator_action_ordinal));
fc::raw::pack(ds, as_type<fc::unsigned_int>(obj.obj.parent_action_ordinal));
fc::raw::pack(ds, bool(obj.obj.receipt));
if (obj.obj.receipt) {
fc::raw::pack(ds, make_history_serial_wrapper(obj.db, as_type<eosio::chain::action_receipt>(*obj.obj.receipt)));
Expand Down
6 changes: 2 additions & 4 deletions plugins/state_history_plugin/state_history_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,8 +392,7 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl
auto traces_bin = zlib_compress_bytes(fc::raw::pack(make_history_serial_wrapper(db, traces)));
EOS_ASSERT(traces_bin.size() == (uint32_t)traces_bin.size(), plugin_exception, "traces is too big");

state_history_log_header header{.block_num = block_state->block->block_num(),
.block_id = block_state->block->id(),
state_history_log_header header{.block_id = block_state->block->id(),
.payload_size = sizeof(uint32_t) + traces_bin.size()};
trace_log->write_entry(header, block_state->block->previous, [&](auto& stream) {
uint32_t s = (uint32_t)traces_bin.size();
Expand Down Expand Up @@ -487,8 +486,7 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl

auto deltas_bin = zlib_compress_bytes(fc::raw::pack(deltas));
EOS_ASSERT(deltas_bin.size() == (uint32_t)deltas_bin.size(), plugin_exception, "deltas is too big");
state_history_log_header header{.block_num = block_state->block->block_num(),
.block_id = block_state->block->id(),
state_history_log_header header{.block_id = block_state->block->id(),
.payload_size = sizeof(uint32_t) + deltas_bin.size()};
chain_state_log->write_entry(header, block_state->block->previous, [&](auto& stream) {
uint32_t s = (uint32_t)deltas_bin.size();
Expand Down
1 change: 0 additions & 1 deletion plugins/state_history_plugin/state_history_plugin_abi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ extern const char* const state_history_plugin_abi = R"({
"name": "action_trace_v0", "fields": [
{ "name": "action_ordinal", "type": "varuint32" },
{ "name": "creator_action_ordinal", "type": "varuint32" },
{ "name": "parent_action_ordinal", "type": "varuint32" },
{ "name": "receipt", "type": "action_receipt?" },
{ "name": "receiver", "type": "name" },
{ "name": "act", "type": "action" },
Expand Down