Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(blockstore): benchmarks #275

Draft
wants to merge 13 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions build.zig
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ pub fn build(b: *Build) void {
benchmark_exe.root_module.addImport("zig-network", zig_network_module);
benchmark_exe.root_module.addImport("httpz", httpz_mod);
benchmark_exe.root_module.addImport("zstd", zstd_mod);
benchmark_exe.root_module.addImport("rocksdb", rocksdb_mod);
benchmark_exe.linkLibC();

const benchmark_exe_run = b.addRunArtifact(benchmark_exe);
Expand Down
Binary file not shown.
Binary file not shown.
8 changes: 8 additions & 0 deletions src/benchmarks.zig
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,14 @@ pub fn main() !void {
TimeUnits.microseconds,
);
}

if (std.mem.startsWith(u8, filter, "ledger") or run_all_benchmarks) {
try benchmark(
@import("ledger/tests.zig").BenchmarLegger,
max_time_per_bench,
TimeUnits.microseconds,
);
}
}

const TimeUnits = enum {
Expand Down
1 change: 0 additions & 1 deletion src/ledger/rocksdb.zig
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,6 @@ pub fn RocksDB(comptime column_families: []const ColumnFamily) type {
defer key_bytes.deinit();
const val_bytes = try value_serializer.serializeToRef(self.allocator, value);
defer val_bytes.deinit();

self.inner.put(
self.cf_handles[cf.find(column_families)],
key_bytes.data,
Expand Down
189 changes: 189 additions & 0 deletions src/ledger/tests.zig
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
const std = @import("std");
const sig = @import("../sig.zig");
const ledger = @import("lib.zig");
const transaction_status = @import("./transaction_status.zig");

const Allocator = std.mem.Allocator;

Expand All @@ -12,6 +13,10 @@ const Shred = ledger.shred.Shred;
const Slot = sig.core.Slot;
const SlotMeta = ledger.meta.SlotMeta;
const VersionedTransactionWithStatusMeta = ledger.reader.VersionedTransactionWithStatusMeta;
const Reward = transaction_status.Reward;
const Rewards = transaction_status.Rewards;
const RewardType = transaction_status.RewardType;
const Pubkey = sig.core.Pubkey;

const comptimePrint = std.fmt.comptimePrint;

Expand Down Expand Up @@ -252,6 +257,13 @@ fn testShreds(comptime filename: []const u8) ![]const Shred {
return loadShredsFromFile(std.testing.allocator, path);
}

/// Loads shreds for benchmarks from a file under `test_shreds_dir`.
/// Differs from testShreds in that it uses the std.heap.c_allocator instead
/// of the test allocator, so it can run outside the test runner.
fn benchShreds(comptime filename: []const u8) ![]const Shred {
    const shred_path = comptimePrint("{s}/{s}", .{ test_shreds_dir, filename });
    return loadShredsFromFile(std.heap.c_allocator, shred_path);
}

/// Read shreds from binary file structured like this:
/// [shred0_len: u64(little endian)][shred0_payload][shred1_len...
///
Expand Down Expand Up @@ -321,6 +333,18 @@ pub fn deinitShreds(allocator: Allocator, shreds: []const Shred) void {

/// Read entries from binary file structured like this:
/// [entry0_len: u64(little endian)][entry0_bincode][entry1_len...
///
/// loadEntriesFromFile can read entries produced by this rust function:
/// ```rust
/// fn save_entries_to_file(entries: &[Entry], path: &str) {
/// let mut file = std::fs::File::create(path).unwrap();
/// for entry in &entries {
/// let payload = bincode::serialize(&entry).unwrap();
/// file.write(&payload.len().to_le_bytes()).unwrap();
/// file.write(&*payload).unwrap();
/// }
/// }
/// ```
pub fn loadEntriesFromFile(allocator: Allocator, path: []const u8) ![]const Entry {
const file = try std.fs.cwd().openFile(path, .{});
const reader = file.reader();
Expand All @@ -336,6 +360,22 @@ pub fn loadEntriesFromFile(allocator: Allocator, path: []const u8) ![]const Entr
return entries.toOwnedSlice();
}

/// Builds `count` deterministic pseudo-random rewards for benchmarking.
/// Caller owns the returned list; each `pubkey` is heap-allocated with
/// `allocator` and must outlive the list's use.
///
/// NOTE: snake_case name kept (not renamed to camelCase) because callers in
/// this file reference `create_rewards` directly.
fn create_rewards(allocator: std.mem.Allocator, count: usize) !Rewards {
    var rng = std.Random.DefaultPrng.init(100);
    const rand = rng.random();
    var rewards: Rewards = Rewards.init(allocator);
    // Frees the list (not the duped pubkeys) if an append fails mid-loop.
    errdefer rewards.deinit();
    for (0..count) |i| {
        // Copy the pubkey bytes to the heap: `&Pubkey.random(rand).data`
        // points at a temporary whose lifetime ends with the loop body, so
        // storing that pointer in the reward would dangle after return.
        try rewards.append(Reward{
            .pubkey = try allocator.dupe(u8, &Pubkey.random(rand).data),
            .lamports = @intCast(42 + i),
            .post_balance = std.math.maxInt(u64),
            .reward_type = RewardType.Fee,
            .commission = null,
        });
    }
    return rewards;
}

const State = TestState("global");
const DB = TestDB("global");

Expand Down Expand Up @@ -370,6 +410,20 @@ pub fn TestState(scope: []const u8) type {
return self;
}

/// Differs from init in that it uses the std.heap.c_allocator instead
/// of the test allocator.
///
/// Allocations are released on every error path (the original leaked the
/// `Self` allocation — and `db`/`registry` — if a later step failed).
pub fn initBench(comptime test_name: []const u8) !*Self {
    // Allocate the leak-check marker first so that once the fallible db
    // init below succeeds, nothing after it can fail and leak.
    const leak_check = try std.heap.c_allocator.create(u8);
    errdefer std.heap.c_allocator.destroy(leak_check);

    const self = try _allocator.create(Self);
    errdefer _allocator.destroy(self);

    self.* = .{
        .db = try TestDB(scope).initCustom(_allocator, test_name),
        .registry = sig.prometheus.Registry(.{}).init(_allocator),
        .lowest_cleanup_slot = sig.sync.RwMux(Slot).init(0),
        .max_root = std.atomic.Value(Slot).init(0),
        ._leak_check = leak_check,
    };
    return self;
}

pub fn allocator(_: Self) Allocator {
return _allocator;
}
Expand Down Expand Up @@ -407,6 +461,14 @@ pub fn TestState(scope: []const u8) type {
_allocator.destroy(self);
_ = gpa.detectLeaks();
}

/// Counterpart of `initBench`: tears down the db and registry, frees the
/// bench-only `_leak_check` byte with the same std.heap.c_allocator that
/// allocated it, then frees `self` before running the leak check.
pub fn deinitBench(self: *Self) void {
    self.db.deinit();
    self.registry.deinit();
    std.heap.c_allocator.destroy(self._leak_check);
    // `self` was created with `_allocator` in initBench; destroy it last
    // so detectLeaks below does not report it as live.
    _allocator.destroy(self);
    _ = gpa.detectLeaks();
}
};
}

Expand All @@ -425,3 +487,130 @@ pub fn TestDB(scope: []const u8) type {
}
};
}

/// Ledger/blockstore benchmarks mirroring agave's `ledger/benches`.
/// TODO(review): the name is a typo of "BenchmarkLedger"; kept as-is because
/// src/benchmarks.zig references this exact identifier.
pub const BenchmarLegger = struct {
    pub const min_iterations = 5;
    pub const max_iterations = 5;

    // Analogous to [bench_write_small](https://github.com/anza-xyz/agave/blob/cfd393654f84c36a3c49f15dbe25e16a0269008d/ledger/benches/blockstore.rs#L59)
    pub fn benchWriteSmall() !u64 {
        const allocator = std.heap.c_allocator;
        var state = try State.initBench("bench write small");
        defer state.deinitBench();
        var inserter = try state.shredInserter();

        const prefix = "agave.blockstore.bench_write_small.";
        const shreds = try benchShreds(prefix ++ "shreds.bin");
        defer deinitShreds(allocator, shreds);

        // No shred is a repair shred in this benchmark.
        const is_repairs = try inserter.allocator.alloc(bool, shreds.len);
        defer inserter.allocator.free(is_repairs);
        @memset(is_repairs, false);

        var timer = try std.time.Timer.start();
        _ = try inserter.insertShreds(shreds, is_repairs, null, false, null);
        return timer.read();
    }

    // Analogous to [bench_read_sequential](https://github.com/anza-xyz/agave/blob/cfd393654f84c36a3c49f15dbe25e16a0269008d/ledger/benches/blockstore.rs#L78)
    pub fn benchReadSequential() !u64 {
        const allocator = std.heap.c_allocator;
        var state = try State.initBench("bentch read sequential");
        defer state.deinitBench();
        var inserter = try state.shredInserter();
        var reader = try state.reader();

        const prefix = "agave.blockstore.bench_read.";
        const shreds = try benchShreds(prefix ++ "shreds.bin");
        defer deinitShreds(allocator, shreds);

        const total_shreds = shreds.len;
        _ = try ledger.insert_shred.insertShredsForTest(&inserter, shreds);

        const slot: u32 = 0;
        const num_reads = total_shreds / 15;

        var rng = std.Random.DefaultPrng.init(100);

        var timer = try std.time.Timer.start();
        // Exclusive upper bound: `total_shreds` itself is not a valid shred
        // index (agave uses gen_range(0..total_shreds), also exclusive).
        const start_index = rng.random().intRangeLessThan(u32, 0, @intCast(total_shreds));
        for (start_index..start_index + num_reads) |i| {
            const shred_index = i % total_shreds;
            _ = try reader.getDataShred(slot, shred_index);
        }
        return timer.read();
    }

    // Analogous to [bench_read_random](https://github.com/anza-xyz/agave/blob/92eca1192b055d896558a78759d4e79ab4721ff1/ledger/benches/blockstore.rs#L103)
    pub fn benchReadRandom() !u64 {
        const allocator = std.heap.c_allocator;
        var state = try State.initBench("bench read randmom");
        defer state.deinitBench();
        var inserter = try state.shredInserter();
        var reader = try state.reader();

        const prefix = "agave.blockstore.bench_read.";
        const shreds = try benchShreds(prefix ++ "shreds.bin");
        defer deinitShreds(allocator, shreds);

        const total_shreds = shreds.len;
        _ = try ledger.insert_shred.insertShredsForTest(&inserter, shreds);

        const slot: u32 = 0;

        var rng = std.Random.DefaultPrng.init(100);

        // Pre-generate the random read order so the timed loop below
        // measures reads only. `uintLessThan` keeps every index in
        // [0, total_shreds) — the original `uintAtMost` could produce the
        // out-of-range index `total_shreds`. (A `for` over a bare integer is
        // also invalid Zig; it must be a range.)
        var indices = try std.ArrayList(u32).initCapacity(inserter.allocator, total_shreds);
        defer indices.deinit();
        for (0..total_shreds) |_| {
            indices.appendAssumeCapacity(rng.random().uintLessThan(u32, @intCast(total_shreds)));
        }

        var timer = try std.time.Timer.start();
        for (indices.items) |shred_index| {
            _ = try reader.getDataShred(slot, shred_index);
        }
        return timer.read();
    }

    // Analogous to [bench_serialize_write_bincode](https://github.com/anza-xyz/agave/blob/9c2098450ca7e5271e3690277992fbc910be27d0/ledger/benches/protobuf.rs#L88)
    pub fn benchSerializeWriteBincode() !u64 {
        const allocator = std.heap.c_allocator;
        var state = try State.initBench("bench serialize write bincode");
        defer state.deinitBench();
        const slot: u32 = 0;

        var rewards: Rewards = try create_rewards(allocator, 100);
        // Take ownership outside the timed section so only serialization and
        // the db write are measured; free the slice afterwards (the db put
        // serializes the value — see RocksDB.put — so it keeps no reference).
        const rewards_slice = try rewards.toOwnedSlice();
        defer allocator.free(rewards_slice);

        var timer = try std.time.Timer.start();
        try state.db.put(schema.rewards, slot, .{
            .rewards = rewards_slice,
            .num_partitions = null,
        });
        return timer.read();
    }

    // Counterpart of benchSerializeWriteBincode: measures only the read path.
    pub fn benchReadBincode() !u64 {
        const allocator = std.heap.c_allocator;
        var state = try State.initBench("bench read bincode");
        defer state.deinitBench();
        const slot: u32 = 1;

        var rewards: Rewards = try create_rewards(allocator, 100);
        const rewards_slice = try rewards.toOwnedSlice();
        defer allocator.free(rewards_slice);
        try state.db.put(schema.rewards, slot, .{
            .rewards = rewards_slice,
            .num_partitions = null,
        });

        var timer = try std.time.Timer.start();
        // NOTE(review): the returned bytes are discarded without being freed;
        // tolerated here because the process-lifetime c_allocator is used.
        _ = try state.db.getBytes(schema.rewards, slot);
        return timer.read();
    }
};
Loading