Skip to content

Commit

Permalink
SnapshotBundle uses schema
Browse files Browse the repository at this point in the history
  • Loading branch information
battlmonstr committed Nov 6, 2024
1 parent 2c5f896 commit f296b66
Show file tree
Hide file tree
Showing 16 changed files with 223 additions and 238 deletions.
44 changes: 22 additions & 22 deletions cmd/capi/execute.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,54 +151,54 @@ std::vector<SilkwormChainSnapshot> collect_all_snapshots(const SnapshotRepositor
std::vector<SilkwormTransactionsSnapshot> transactions_snapshot_sequence;

for (const auto& bundle_ptr : repository.view_bundles()) {
const auto& bundle = *bundle_ptr;
db::blocks::BundleDataRef bundle{**bundle_ptr};
{
{
SilkwormHeadersSnapshot raw_headers_snapshot{
.segment{
.file_path = make_path(bundle.header_segment.path()),
.memory_address = bundle.header_segment.memory_file_region().data(),
.memory_length = bundle.header_segment.memory_file_region().size(),
.file_path = make_path(bundle.header_segment().path()),
.memory_address = bundle.header_segment().memory_file_region().data(),
.memory_length = bundle.header_segment().memory_file_region().size(),
},
.header_hash_index{
.file_path = make_path(bundle.idx_header_hash.path()),
.memory_address = bundle.idx_header_hash.memory_file_region().data(),
.memory_length = bundle.idx_header_hash.memory_file_region().size(),
.file_path = make_path(bundle.idx_header_hash().path()),
.memory_address = bundle.idx_header_hash().memory_file_region().data(),
.memory_length = bundle.idx_header_hash().memory_file_region().size(),
},
};
headers_snapshot_sequence.push_back(raw_headers_snapshot);
}
{
SilkwormBodiesSnapshot raw_bodies_snapshot{
.segment{
.file_path = make_path(bundle.body_segment.path()),
.memory_address = bundle.body_segment.memory_file_region().data(),
.memory_length = bundle.body_segment.memory_file_region().size(),
.file_path = make_path(bundle.body_segment().path()),
.memory_address = bundle.body_segment().memory_file_region().data(),
.memory_length = bundle.body_segment().memory_file_region().size(),
},
.block_num_index{
.file_path = make_path(bundle.idx_body_number.path()),
.memory_address = bundle.idx_body_number.memory_file_region().data(),
.memory_length = bundle.idx_body_number.memory_file_region().size(),
.file_path = make_path(bundle.idx_body_number().path()),
.memory_address = bundle.idx_body_number().memory_file_region().data(),
.memory_length = bundle.idx_body_number().memory_file_region().size(),
},
};
bodies_snapshot_sequence.push_back(raw_bodies_snapshot);
}
{
SilkwormTransactionsSnapshot raw_transactions_snapshot{
.segment{
.file_path = make_path(bundle.txn_segment.path()),
.memory_address = bundle.txn_segment.memory_file_region().data(),
.memory_length = bundle.txn_segment.memory_file_region().size(),
.file_path = make_path(bundle.txn_segment().path()),
.memory_address = bundle.txn_segment().memory_file_region().data(),
.memory_length = bundle.txn_segment().memory_file_region().size(),
},
.tx_hash_index{
.file_path = make_path(bundle.idx_txn_hash.path()),
.memory_address = bundle.idx_txn_hash.memory_file_region().data(),
.memory_length = bundle.idx_txn_hash.memory_file_region().size(),
.file_path = make_path(bundle.idx_txn_hash().path()),
.memory_address = bundle.idx_txn_hash().memory_file_region().data(),
.memory_length = bundle.idx_txn_hash().memory_file_region().size(),
},
.tx_hash_2_block_index{
.file_path = make_path(bundle.idx_txn_hash_2_block.path()),
.memory_address = bundle.idx_txn_hash_2_block.memory_file_region().data(),
.memory_length = bundle.idx_txn_hash_2_block.memory_file_region().size(),
.file_path = make_path(bundle.idx_txn_hash_2_block().path()),
.memory_address = bundle.idx_txn_hash_2_block().memory_file_region().data(),
.memory_length = bundle.idx_txn_hash_2_block().memory_file_region().size(),
},
};
transactions_snapshot_sequence.push_back(raw_transactions_snapshot);
Expand Down
8 changes: 4 additions & 4 deletions cmd/dev/snapshots.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,8 @@ BodyCounters count_bodies_in_all(const SnapshotSubcommandSettings& settings) {
int num_bodies = 0;
uint64_t num_txns = 0;
for (const auto& bundle_ptr : repository.view_bundles()) {
const auto& bundle = *bundle_ptr;
const auto [body_count, txn_count] = count_bodies_in_one(settings, bundle.body_segment);
db::blocks::BundleDataRef bundle{**bundle_ptr};
const auto [body_count, txn_count] = count_bodies_in_one(settings, bundle.body_segment());
num_bodies += body_count;
num_txns += txn_count;
}
Expand Down Expand Up @@ -376,8 +376,8 @@ int count_headers_in_all(const SnapshotSubcommandSettings& settings) {
auto repository = make_repository(settings.settings);
int num_headers{0};
for (const auto& bundle_ptr : repository.view_bundles()) {
const auto& bundle = *bundle_ptr;
const auto header_count = count_headers_in_one(settings, bundle.header_segment);
db::blocks::BundleDataRef bundle{**bundle_ptr};
const auto header_count = count_headers_in_one(settings, bundle.header_segment());
num_headers += header_count;
}
return num_headers;
Expand Down
28 changes: 17 additions & 11 deletions silkworm/capi/silkworm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -378,19 +378,25 @@ SILKWORM_EXPORT int silkworm_add_snapshot(SilkwormHandle handle, SilkwormChainSn
snapshots::Index idx_txn_hash{transactions_segment_path->related_path(snapshots::SnapshotType::transactions, snapshots::kIdxExtension), make_region(ts.tx_hash_index)};
snapshots::Index idx_txn_hash_2_block{transactions_segment_path->related_path(snapshots::SnapshotType::transactions_to_block, snapshots::kIdxExtension), make_region(ts.tx_hash_2_block_index)};

snapshots::SnapshotBundle bundle{
headers_segment_path->step_range(),
{
.header_segment = std::move(header_segment),
.idx_header_hash = std::move(idx_header_hash),
snapshots::SnapshotBundleData bundle_data = [&]() {
snapshots::SnapshotBundleData data;

.body_segment = std::move(body_segment),
.idx_body_number = std::move(idx_body_number),
data.segments.emplace(db::blocks::kHeaderSegmentName, std::move(header_segment));
data.rec_split_indexes.emplace(db::blocks::kIdxHeaderHashName, std::move(idx_header_hash));

.txn_segment = std::move(txn_segment),
.idx_txn_hash = std::move(idx_txn_hash),
.idx_txn_hash_2_block = std::move(idx_txn_hash_2_block),
},
data.segments.emplace(db::blocks::kBodySegmentName, std::move(body_segment));
data.rec_split_indexes.emplace(db::blocks::kIdxBodyNumberName, std::move(idx_body_number));

data.segments.emplace(db::blocks::kTxnSegmentName, std::move(txn_segment));
data.rec_split_indexes.emplace(db::blocks::kIdxTxnHashName, std::move(idx_txn_hash));
data.rec_split_indexes.emplace(db::blocks::kIdxTxnHash2BlockName, std::move(idx_txn_hash_2_block));

return data;
}();

snapshots::SnapshotBundle bundle{
headers_segment_path->step_range(),
std::move(bundle_data),
};
handle->repository->add_snapshot_bundle(std::move(bundle));
return SILKWORM_OK;
Expand Down
14 changes: 14 additions & 0 deletions silkworm/db/blocks/schema_config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,18 @@ inline constexpr datastore::EntityName kIdxTxnHashName{"transactions"};
//! Index transaction_hash -> block_num
inline constexpr datastore::EntityName kIdxTxnHash2BlockName{"transactions_to_block"};

//! Read-only view over a SnapshotBundleData that exposes the block-related
//! segments and indexes through named accessors instead of raw entity-name lookups.
//! Each accessor looks up its entry with `.at()` on the corresponding container.
//! NOTE(review): presumably `.at()` throws when the entity is absent from the bundle — confirm
//! against the container type used by SnapshotBundleData.
struct BundleDataRef {
    //! Referenced bundle data. This struct stores only a reference, so the
    //! referenced object must outlive this view.
    const snapshots::SnapshotBundleData& data;

    //! Headers segment, looked up by kHeaderSegmentName
    const snapshots::SegmentFileReader& header_segment() const {
        const auto& segments = data.segments;
        return segments.at(kHeaderSegmentName);
    }

    //! Header hash index, looked up by kIdxHeaderHashName
    const snapshots::Index& idx_header_hash() const {
        const auto& indexes = data.rec_split_indexes;
        return indexes.at(kIdxHeaderHashName);
    }

    //! Bodies segment, looked up by kBodySegmentName
    const snapshots::SegmentFileReader& body_segment() const {
        const auto& segments = data.segments;
        return segments.at(kBodySegmentName);
    }

    //! Body number index, looked up by kIdxBodyNumberName
    const snapshots::Index& idx_body_number() const {
        const auto& indexes = data.rec_split_indexes;
        return indexes.at(kIdxBodyNumberName);
    }

    //! Transactions segment, looked up by kTxnSegmentName
    const snapshots::SegmentFileReader& txn_segment() const {
        const auto& segments = data.segments;
        return segments.at(kTxnSegmentName);
    }

    //! Transaction hash index, looked up by kIdxTxnHashName
    const snapshots::Index& idx_txn_hash() const {
        const auto& indexes = data.rec_split_indexes;
        return indexes.at(kIdxTxnHashName);
    }

    //! Index transaction_hash -> block_num, looked up by kIdxTxnHash2BlockName
    const snapshots::Index& idx_txn_hash_2_block() const {
        const auto& indexes = data.rec_split_indexes;
        return indexes.at(kIdxTxnHash2BlockName);
    }
};

} // namespace silkworm::db::blocks
9 changes: 5 additions & 4 deletions silkworm/db/blocks/transactions/txn_queries.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <silkworm/core/types/hash.hpp>
#include <silkworm/db/datastore/snapshots/basic_queries.hpp>

#include "../schema_config.hpp"
#include "txn_segment.hpp"

namespace silkworm::snapshots {
Expand Down Expand Up @@ -59,10 +60,10 @@ class TransactionBlockNumByTxnHashRepoQuery {

std::optional<BlockNum> exec(const Hash& hash) {
for (const TBundle& bundle_ptr : bundles_) {
const auto& bundle = *bundle_ptr;
const SegmentFileReader& segment = bundle.txn_segment;
const Index& idx_txn_hash = bundle.idx_txn_hash;
const Index& idx_txn_hash_2_block = bundle.idx_txn_hash_2_block;
db::blocks::BundleDataRef bundle{**bundle_ptr};
const SegmentFileReader& segment = bundle.txn_segment();
const Index& idx_txn_hash = bundle.idx_txn_hash();
const Index& idx_txn_hash_2_block = bundle.idx_txn_hash_2_block();

TransactionFindByHashQuery cross_check_query{{segment, idx_txn_hash}};
TransactionBlockNumByTxnHashQuery query{idx_txn_hash_2_block, cross_check_query};
Expand Down
45 changes: 32 additions & 13 deletions silkworm/db/datastore/snapshots/common/snapshot_path.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ namespace silkworm::snapshots {

namespace fs = std::filesystem;

std::optional<SnapshotPath> SnapshotPath::parse(fs::path path) {
std::optional<StepRange> SnapshotPath::parse_step_range(fs::path path) {
const std::string filename_no_ext = path.stem().string();

// Expected stem format: <version>-<6_digit_block_from>-<6_digit_block_to>-<tag>
Expand All @@ -43,17 +43,6 @@ std::optional<SnapshotPath> SnapshotPath::parse(fs::path path) {

const auto [ver, from, to, tag] = std::tie(tokens[0], tokens[1], tokens[2], tokens[3]);

// Expected version format: v<x> (hence check length, check first char and parse w/ offset by one)
if (ver.empty() || ver[0] != 'v') {
return std::nullopt;
}

uint8_t ver_num = 0;
const auto ver_result = std::from_chars(ver.data() + 1, ver.data() + ver.size(), ver_num);
if (ver_result.ec == std::errc::invalid_argument) {
return std::nullopt;
}

// Expected scaled block format: <dddddd>
if (from.size() != 6 || to.size() != 6) {
return std::nullopt;
Expand All @@ -76,6 +65,36 @@ std::optional<SnapshotPath> SnapshotPath::parse(fs::path path) {
return std::nullopt;
}

return StepRange{step_from, step_to};
}

std::optional<SnapshotPath> SnapshotPath::parse(fs::path path) {
const std::string filename_no_ext = path.stem().string();

// Expected stem format: <version>-<6_digit_block_from>-<6_digit_block_to>-<tag>
const std::vector<absl::string_view> tokens = absl::StrSplit(filename_no_ext, absl::MaxSplits('-', 3));
if (tokens.size() != 4) {
return std::nullopt;
}

const auto [ver, from, to, tag] = std::tie(tokens[0], tokens[1], tokens[2], tokens[3]);

// Expected version format: v<x> (hence check length, check first char and parse w/ offset by one)
if (ver.empty() || ver[0] != 'v') {
return std::nullopt;
}

uint8_t ver_num = 0;
const auto ver_result = std::from_chars(ver.data() + 1, ver.data() + ver.size(), ver_num);
if (ver_result.ec == std::errc::invalid_argument) {
return std::nullopt;
}

auto step_range = parse_step_range(path);
if (!step_range) {
return std::nullopt;
}

// Expected tag format: headers|bodies|transactions|transactions-to-block
// parsing relies on magic_enum, so SnapshotType items must match exactly
std::string tag_str{tag.data(), tag.size()};
Expand All @@ -85,7 +104,7 @@ std::optional<SnapshotPath> SnapshotPath::parse(fs::path path) {
return std::nullopt;
}

return SnapshotPath{std::move(path), ver_num, {step_from, step_to}, *type};
return SnapshotPath{std::move(path), ver_num, *step_range, *type};
}

SnapshotPath SnapshotPath::make(
Expand Down
1 change: 1 addition & 0 deletions silkworm/db/datastore/snapshots/common/snapshot_path.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ inline constexpr uint8_t kSnapshotV1{1};
class SnapshotPath {
public:
static std::optional<SnapshotPath> parse(std::filesystem::path path);
static std::optional<StepRange> parse_step_range(std::filesystem::path path);

static SnapshotPath make(
const std::filesystem::path& dir,
Expand Down
62 changes: 42 additions & 20 deletions silkworm/db/datastore/snapshots/snapshot_bundle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,57 +16,79 @@

#include "snapshot_bundle.hpp"

#include <magic_enum.hpp>

#include <silkworm/infra/common/ensure.hpp>

namespace silkworm::snapshots {

//! Assemble the data of a snapshot bundle from a repository schema.
//! The schema is asked first for the segments and then for the rec-split indexes,
//! each time with the same dir_path and step_range.
//! \param schema repository definition providing make_segments / make_rec_split_indexes
//! \param dir_path directory path forwarded unchanged to the schema
//! \param step_range step range forwarded unchanged to the schema
//! \return bundle data holding the created segments and rec-split indexes, in that order
SnapshotBundleData make_bundle_data(
    const Schema::RepositoryDef& schema,
    const std::filesystem::path& dir_path,
    StepRange step_range) {
    // Braced initialization evaluates its elements left to right: segments before indexes.
    SnapshotBundleData bundle_data{
        schema.make_segments(dir_path, step_range),
        schema.make_rec_split_indexes(dir_path, step_range),
    };
    return bundle_data;
}

//! Destroying a bundle closes it: see close() for what is released.
//! NOTE(review): close() also invokes the on-close callback when one is set, so that
//! callback runs during destruction — confirm it cannot throw.
SnapshotBundle::~SnapshotBundle() {
    this->close();
}

void SnapshotBundle::reopen() {
for (auto& segment_ref : segments()) {
segment_ref.get().reopen_segment();
ensure(!segment_ref.get().empty(), [&]() {
return "invalid empty snapshot " + segment_ref.get().fs_path().string();
for (auto& entry : data_.segments) {
SegmentFileReader& segment = entry.second;
segment.reopen_segment();
ensure(!segment.empty(), [&]() {
return "invalid empty snapshot " + segment.fs_path().string();
});
}
for (auto& index_ref : indexes()) {
index_ref.get().reopen_index();
for (auto& entry : data_.rec_split_indexes) {
Index& index = entry.second;
index.reopen_index();
}
}

void SnapshotBundle::close() {
for (auto& index_ref : indexes()) {
index_ref.get().close_index();
for (auto& entry : data_.rec_split_indexes) {
Index& index = entry.second;
index.close_index();
}
for (auto& segment_ref : segments()) {
segment_ref.get().close();
for (auto& entry : data_.segments) {
SegmentFileReader& segment = entry.second;
segment.close();
}
if (on_close_callback_) {
on_close_callback_(*this);
}
}

//! Look up the segment whose entity name equals the enumerator name of the given type.
//! \param type snapshot type; its magic_enum name is used as the lookup key
//! \return the matching segment stored in this bundle's data
const SegmentFileReader& SnapshotBundle::segment(SnapshotType type) const {
    // The key is derived from the enumerator spelling, so SnapshotType names must match entity names.
    return data_.segments.at(datastore::EntityName{magic_enum::enum_name(type)});
}

//! Look up the rec-split index whose entity name equals the enumerator name of the given type.
//! \param type snapshot type; its magic_enum name is used as the lookup key
//! \return the matching index stored in this bundle's data
const Index& SnapshotBundle::index(SnapshotType type) const {
    // The key is derived from the enumerator spelling, so SnapshotType names must match entity names.
    return data_.rec_split_indexes.at(datastore::EntityName{magic_enum::enum_name(type)});
}

std::vector<std::filesystem::path> SnapshotBundle::files() {
std::vector<std::filesystem::path> files;
files.reserve(kSnapshotsCount + kIndexesCount);

for (auto& segment_ref : segments()) {
files.push_back(segment_ref.get().path().path());
for (const SegmentFileReader& segment : segments()) {
files.push_back(segment.path().path());
}
for (auto& index_ref : indexes()) {
files.push_back(index_ref.get().path().path());
for (const Index& index : rec_split_indexes()) {
files.push_back(index.path().path());
}
return files;
}

std::vector<SnapshotPath> SnapshotBundle::segment_paths() {
std::vector<SnapshotPath> paths;
paths.reserve(kSnapshotsCount);

for (auto& segment_ref : segments()) {
paths.push_back(segment_ref.get().path());
for (const SegmentFileReader& segment : segments()) {
paths.push_back(segment.path());
}
return paths;
}
Expand Down
Loading

0 comments on commit f296b66

Please sign in to comment.