bench: add verifySnapshot benchmark
Rexicon226 committed Sep 30, 2024
1 parent 7b5058d commit 17db117
Showing 5 changed files with 137 additions and 90 deletions.
153 changes: 106 additions & 47 deletions src/accountsdb/db.zig
@@ -230,7 +230,7 @@ pub const AccountsDB = struct {
break :blk .{ ptr, ptr.allocator() };
} else {
logger.infof("using ram index", .{});
break :blk .{ null, std.heap.page_allocator };
break :blk .{ null, allocator };
}
};
errdefer if (maybe_disk_allocator_ptr) |ptr| {
@@ -336,7 +336,7 @@ pub const AccountsDB = struct {
return snapshot_fields;
}

/// loads the account files and gernates the account index from a snapshot
/// loads the account files and generates the account index from a snapshot
pub fn loadFromSnapshot(
self: *Self,
/// Account file info map from the snapshot manifest.
@@ -355,13 +355,13 @@
var accounts_dir = try self.snapshot_dir.openDir("accounts", .{});
defer accounts_dir.close();

var timer = try sig.time.Timer.start();

const n_account_files = snapshot_manifest.file_map.count();
self.logger.infof("found {d} account files", .{n_account_files});

std.debug.assert(n_account_files > 0);

var timer = try sig.time.Timer.start();

{
const bhs, var bhs_lg = try self.getOrInitBankHashStats(snapshot_manifest.slot);
defer bhs_lg.unlock();
@@ -384,12 +384,8 @@

// setup the parallel indexing
const use_disk_index = self.config.use_disk_index;
var loading_threads = try ArrayList(AccountsDB).initCapacity(
self.allocator,
n_parse_threads,
);
for (0..n_parse_threads) |_| {
var thread_db = loading_threads.addOneAssumeCapacity();
const loading_threads = try self.allocator.alloc(AccountsDB, n_parse_threads);
for (loading_threads) |*thread_db| {
thread_db.* = try AccountsDB.init(
per_thread_allocator,
.noop, // dont spam the logs with init information (we set it after)
@@ -409,13 +405,14 @@
// at this defer point, there are three memory components we care about
// 1) the account references (AccountRef)
// 2) the hashmap of refs (Map(Pubkey, *AccountRef))
// and 3) the file maps Map(FileId, AccountFile)
// 3) the file maps Map(FileId, AccountFile)
//
// each loading thread will have its own copy of these
// what happens:
// 2) and 3) will be copied into the main index thread and so we can deinit them
// 1) will continue to exist on the heap and its ownership is given
// to the main accounts-db index
for (loading_threads.items) |*loading_thread| {
for (loading_threads) |*loading_thread| {
// NOTE: deinit hashmap, dont close the files
const file_map, var file_map_lg = loading_thread.file_map.writeWithLock();
defer file_map_lg.unlock();
@@ -424,9 +421,10 @@
// NOTE: the `false` is important: it keeps 1) (the account refs) alive
loading_thread.account_index.deinit(false);
}
loading_threads.deinit();
self.allocator.free(loading_threads);
}

var read_timer = try sig.time.Timer.start();
self.logger.infof("reading and indexing accounts...", .{});
{
var handles = std.ArrayList(std.Thread).init(self.allocator);
@@ -439,7 +437,7 @@
&handles,
loadAndVerifyAccountsFilesMultiThread,
.{
loading_threads.items,
loading_threads,
accounts_dir,
snapshot_manifest.file_map,
accounts_per_file_estimate,
@@ -448,13 +446,14 @@
n_parse_threads,
);
}
self.logger.infof("total time: {s}", .{timer.read()});
self.logger.infof("reading and index accounts took: {}", .{read_timer.read()});

self.logger.infof("combining thread accounts...", .{});
self.logger.infof("merging thread accounts...", .{});
var merge_timer = try sig.time.Timer.start();
try self.mergeMultipleDBs(loading_threads.items, n_combine_threads);
self.logger.debugf("combining thread indexes took: {s}", .{merge_timer.read()});
try self.mergeMultipleDBs(loading_threads, n_combine_threads);
self.logger.debugf("merging thread indexes took: {}", .{merge_timer.read()});

self.logger.infof("total time: {s}", .{timer.read()});
return timer.read();
}

@@ -555,13 +554,16 @@ pub const AccountsDB = struct {
var accounts_file = blk: {
const file_name_bounded = sig.utils.fmt.boundedFmt("{d}.{d}", .{ slot, file_info.id.toInt() });

const accounts_file_file = accounts_dir.openFile(file_name_bounded.constSlice(), .{ .mode = .read_write }) catch |err| {
self.logger.errf("Failed to open accounts/{s}: {s}", .{ file_name_bounded.constSlice(), @errorName(err) });
const accounts_file = accounts_dir.openFile(file_name_bounded.constSlice(), .{ .mode = .read_write }) catch |err| {
self.logger.errf("Failed to open {s}: {s}", .{
try accounts_dir.realpathAlloc(self.allocator, file_name_bounded.constSlice()),
@errorName(err),
});
return err;
};
errdefer accounts_file_file.close();
errdefer accounts_file.close();

break :blk AccountFile.init(accounts_file_file, file_info, slot) catch |err| {
break :blk AccountFile.init(accounts_file, file_info, slot) catch |err| {
self.logger.errf("failed to *open* AccountsFile {s}: {s}\n", .{ file_name_bounded.constSlice(), @errorName(err) });
return err;
};
@@ -596,7 +598,7 @@ pub const AccountsDB = struct {
// ! reset memory for the next slot
defer geyser_storage.reset();

const data_versioned = sig.geyser.core.VersionedAccountPayload{
const data_versioned: sig.geyser.core.VersionedAccountPayload = .{
.AccountPayloadV1 = .{
.accounts = geyser_storage.accounts.items,
.pubkeys = geyser_storage.pubkeys.items,
@@ -695,7 +697,7 @@ pub const AccountsDB = struct {
thread_dbs: []AccountsDB,
n_threads: usize,
) !void {
var handles = std.ArrayList(std.Thread).init(self.allocator);
var handles = try std.ArrayList(std.Thread).initCapacity(self.allocator, n_threads);
defer {
for (handles.items) |*h| h.join();
handles.deinit();
@@ -859,7 +861,6 @@ pub const AccountsDB = struct {
var timer = try std.time.Timer.start();
const n_threads = @as(u32, @truncate(try std.Thread.getCpuCount())) * 2;

// alloc the result
const hashes = try self.allocator.alloc(ArrayListUnmanaged(Hash), n_threads);
defer self.allocator.free(hashes);

@@ -894,13 +895,19 @@ pub const AccountsDB = struct {
);
}

self.logger.debugf("took: {s}", .{std.fmt.fmtDuration(timer.read())});
self.logger.debugf(
"collecting hashes from accounts took: {s}",
.{std.fmt.fmtDuration(timer.read())},
);
timer.reset();

self.logger.infof("computing the merkle root over accounts...", .{});
var hash_tree = NestedHashTree{ .hashes = hashes };
const accounts_hash = try hash_tree.computeMerkleRoot(MERKLE_FANOUT);
self.logger.debugf("took {s}", .{std.fmt.fmtDuration(timer.read())});
self.logger.debugf(
"computing the merkle root over accounts took {s}",
.{std.fmt.fmtDuration(timer.read())},
);
timer.reset();

var total_lamports: u64 = 0;
@@ -914,8 +921,8 @@ pub const AccountsDB = struct {
};
}

/// validates the accounts_db which was loaded from a snapshot (
/// including the accounts hash and total lamports matches the expected values)
/// validates the accounts_db which was loaded from a snapshot
/// (including that the accounts hash and total lamports match the expected values)
pub fn validateLoadFromSnapshot(
self: *Self,
// used to verify the incremental snapshot
@@ -1080,7 +1087,7 @@ pub const AccountsDB = struct {
.IncrementalAccountHash => Blake3.hash(&key.data, &account_hash.data, .{}),
}
} else {
// hashes arent always stored correctly in snapshots
// hashes aren't always stored correctly in snapshots
if (account_hash.order(&Hash.default()) == .eq) {
const account, var lock_guard = try self.getAccountFromRefWithReadLock(max_slot_ref);
defer lock_guard.unlock();
@@ -3989,26 +3996,26 @@ pub const BenchmarkAccountsDBSnapshotLoad = struct {

pub const args = [_]BenchArgs{
BenchArgs{
.name = "RAM index",
.name = "RAM index (8 threads)",
.use_disk = false,
.n_threads = 10,
.n_threads = 8,
},
// BenchArgs{
// .use_disk = true,
// .n_threads = 2,
// .name = "DISK (2 threads)",
// .name = "DISK index (2 threads)",
// },
};

pub fn loadSnapshot(bench_args: BenchArgs) !u64 {
const allocator = std.heap.page_allocator;
// NOTE: usually this will be an incremental snapshot
// renamed as a full snapshot (mv {inc-snap-fmt}.tar.zstd {full-snap-fmt}.tar.zstd)
// (because test snapshots are too small and full snapshots are too big)

// unpack the snapshot
// NOTE: usually this will be an incremental snapshot
// renamed as a full snapshot (mv {inc-snap-fmt}.tar.zstd {full-snap-fmt}.tar.zstd)
// (because test snapshots are too small and full snapshots are too big)
const dir_path = sig.TEST_DATA_DIR ++ "bench_snapshot/";
const accounts_path = dir_path ++ "accounts";
const dir_path = sig.TEST_DATA_DIR ++ "bench_snapshot/";
const accounts_path = dir_path ++ "accounts";

pub fn loadSnapshot(bench_args: BenchArgs) !u64 {
const allocator = std.heap.c_allocator;

// const logger = Logger{ .noop = {} };
const logger = Logger.init(allocator, .debug);
@@ -4019,9 +4026,9 @@ pub const BenchmarkAccountsDBSnapshotLoad = struct {
std.debug.print("need to setup a snapshot in {s} for this benchmark...\n", .{dir_path});
return 0;
};

const snapshot_files = try SnapshotFiles.find(allocator, snapshot_dir);

// unpack the snapshot
std.fs.cwd().access(accounts_path, .{}) catch {
const archive_file = try snapshot_dir.openFile(snapshot_files.full_snapshot.snapshotNameStr().constSlice(), .{});
defer archive_file.close();
@@ -4030,7 +4037,7 @@ pub const BenchmarkAccountsDBSnapshotLoad = struct {
logger,
archive_file,
snapshot_dir,
try std.Thread.getCpuCount() / 2,
bench_args.n_threads,
true,
);
};
Expand All @@ -4043,7 +4050,7 @@ pub const BenchmarkAccountsDBSnapshotLoad = struct {
.number_of_index_bins = 32,
.use_disk_index = bench_args.use_disk,
}, null);
defer accounts_db.deinit();
defer accounts_db.deinit(true);

var accounts_dir = try std.fs.cwd().openDir(accounts_path, .{ .iterate = true });
defer accounts_dir.close();
@@ -4057,14 +4064,66 @@ pub const BenchmarkAccountsDBSnapshotLoad = struct {

// sanity check
const accounts_hash, const total_lamports = try accounts_db.computeAccountHashesAndLamports(.{
.FullAccountHash = .{
.max_slot = accounts_db.largest_rooted_slot.load(.monotonic),
},
.FullAccountHash = .{ .max_slot = accounts_db.largest_rooted_slot.load(.monotonic) },
});
std.debug.print("r: hash: {}, lamports: {}\n", .{ accounts_hash, total_lamports });

return duration.asNanos();
}

pub fn verifySnapshot(bench_args: BenchArgs) !u64 {
const allocator = std.heap.c_allocator;
const logger = Logger.init(allocator, .debug);
defer logger.deinit();
logger.spawn();

const snapshot_dir = std.fs.cwd().openDir(dir_path, .{ .iterate = true }) catch {
std.debug.print("need to setup a snapshot in {s} for this benchmark...\n", .{dir_path});
return 0;
};
const snapshot_files = try SnapshotFiles.find(allocator, snapshot_dir);

std.fs.cwd().access(accounts_path, .{}) catch {
const archive_file = try snapshot_dir.openFile(snapshot_files.full_snapshot.snapshotNameStr().constSlice(), .{});
defer archive_file.close();
try parallelUnpackZstdTarBall(
allocator,
logger,
archive_file,
snapshot_dir,
bench_args.n_threads,
true,
);
};

var snapshots = try AllSnapshotFields.fromFiles(
allocator,
logger,
snapshot_dir,
snapshot_files,
);
defer snapshots.deinit(allocator);
const snapshot = try snapshots.collapse();

var accounts_db = try AccountsDB.init(allocator, logger, snapshot_dir, .{
.number_of_index_bins = 32,
.use_disk_index = bench_args.use_disk,
}, null);
defer accounts_db.deinit(true);

var accounts_dir = try std.fs.cwd().openDir(accounts_path, .{ .iterate = true });
defer accounts_dir.close();

const full_snapshot = snapshots.full;
const validate_duration = try accounts_db.validateLoadFromSnapshot(
snapshot.bank_fields_inc.snapshot_persistence,
full_snapshot.bank_fields.slot,
full_snapshot.bank_fields.capitalization,
snapshot.accounts_db_fields.bank_hash_info.accounts_hash,
);

return validate_duration.asNanos();
}
};

pub const BenchmarkAccountsDB = struct {
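A side note on the loadFromSnapshot changes above: the per-thread AccountsDB buffer moved from a capacity-reserved std.ArrayList (init, then addOneAssumeCapacity in a loop) to a plain allocator.alloc slice. A minimal sketch of the two idioms, with u32 as a stand-in element type (not code from the repository):

const std = @import("std");

// Old shape: reserve capacity up front, then append without rechecking it.
fn oldIdiom(allocator: std.mem.Allocator, n: usize) !void {
    var list = try std.ArrayList(u32).initCapacity(allocator, n);
    defer list.deinit();
    for (0..n) |_| {
        const item = list.addOneAssumeCapacity();
        item.* = 0; // initialize in place
    }
}

// New shape: when the count is fixed, allocate a bare slice directly.
fn newIdiom(allocator: std.mem.Allocator, n: usize) !void {
    const items = try allocator.alloc(u32, n);
    defer allocator.free(items);
    for (items) |*item| item.* = 0; // initialize in place
}

Since the element count never changes, the raw slice drops the list's capacity bookkeeping and makes ownership explicit: the cleanup path becomes allocator.free(loading_threads) instead of loading_threads.deinit().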
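Elsewhere in the diff, computeAccountHashesAndLamports gathers one hash list per thread and folds them via hash_tree.computeMerkleRoot(MERKLE_FANOUT). As a rough illustration of fanout-based merkle folding — a toy version over a single flat, non-empty slice, with Blake3 as a stand-in hasher (sig's NestedHashTree spans multiple lists, and its hash function may differ):

const std = @import("std");
const Blake3 = std.crypto.hash.Blake3;

const Hash = [32]u8;

// Hash each group of up to `fanout` nodes into one parent, in place,
// then repeat on the shrinking prefix until a single root remains.
// Assumes hashes.len >= 1.
fn computeMerkleRoot(hashes: []Hash, fanout: usize) Hash {
    var level = hashes;
    while (level.len > 1) {
        var parent_count: usize = 0;
        var i: usize = 0;
        while (i < level.len) : (i += fanout) {
            const end = @min(i + fanout, level.len);
            var hasher = Blake3.init(.{});
            for (level[i..end]) |h| hasher.update(&h);
            hasher.final(&level[parent_count]);
            parent_count += 1;
        }
        level = level[0..parent_count];
    }
    return level[0];
}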
14 changes: 6 additions & 8 deletions src/accountsdb/snapshots.zig
@@ -1649,19 +1649,16 @@ pub const SnapshotFiles = struct {

/// finds existing snapshots (full and matching incremental) by looking for .tar.zstd files
pub fn find(allocator: std.mem.Allocator, snapshot_directory: std.fs.Dir) !Self {
const snapshot_dir_iter = snapshot_directory.iterate();

const files = try readDirectory(allocator, snapshot_dir_iter);
var filenames = files.filenames;
const files = try readDirectory(allocator, snapshot_directory);
defer {
filenames.deinit();
allocator.free(files.filename_memory);
for (files) |file| allocator.free(file);
allocator.free(files);
}

// find the snapshots
var maybe_latest_full_snapshot: ?FullSnapshotFileInfo = null;
var count: usize = 0;
for (filenames.items) |filename| {
for (files) |filename| {
const snapshot = FullSnapshotFileInfo.fromString(filename) catch continue;
if (count == 0 or snapshot.slot > maybe_latest_full_snapshot.?.slot) {
maybe_latest_full_snapshot = snapshot;
@@ -1672,7 +1669,7 @@

count = 0;
var maybe_latest_incremental_snapshot: ?IncrementalSnapshotFileInfo = null;
for (filenames.items) |filename| {
for (files) |filename| {
const snapshot = IncrementalSnapshotFileInfo.fromString(filename) catch continue;
// need to match the base slot
if (snapshot.base_slot == latest_full_snapshot.slot and (count == 0 or
@@ -1771,6 +1768,7 @@ pub const AllSnapshotFields = struct {
// TODO: use a better allocator
const allocator = storages_map.allocator;
var slots_to_remove = std.ArrayList(Slot).init(allocator);
defer slots_to_remove.deinit();

// make sure there's no overlap in slots between full and incremental, and combine
var storages_entry_iter = storages_map.iterator();
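The snapshots.zig hunk above changes the readDirectory contract consumed by SnapshotFiles.find: the old result (a filenames list plus one shared filename_memory buffer) is replaced by what the new defer implies is a slice of individually allocated names. A hedged sketch of that implied shape — the body below is hypothetical; only the ownership contract is taken from the call site:

const std = @import("std");

// Returns a caller-owned slice of caller-owned file names.
// `dir` must have been opened with `.iterate = true`.
fn readDirectory(allocator: std.mem.Allocator, dir: std.fs.Dir) ![][]const u8 {
    var names = std.ArrayList([]const u8).init(allocator);
    errdefer {
        for (names.items) |name| allocator.free(name);
        names.deinit();
    }
    var iter = dir.iterate();
    while (try iter.next()) |entry| {
        if (entry.kind != .file) continue;
        const name = try allocator.dupe(u8, entry.name);
        errdefer allocator.free(name);
        try names.append(name);
    }
    return names.toOwnedSlice();
}

The caller then frees each name and the outer slice, which is exactly what the new defer block in find does.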
(diffs for the 3 remaining changed files not shown)
