Skip to content

Commit

Permalink
Initial implementation of leader schedule cache
Browse files Browse the repository at this point in the history
  • Loading branch information
yewman committed Sep 10, 2024
1 parent b4e53c8 commit 3d5732d
Show file tree
Hide file tree
Showing 8 changed files with 450 additions and 278 deletions.
71 changes: 1 addition & 70 deletions src/accountsdb/genesis_config.zig
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const Account = sig.core.Account;
const Pubkey = sig.core.Pubkey;
const Slot = sig.core.Slot;
const Epoch = sig.core.Epoch;
const EpochSchedule = sig.core.EpochSchedule;

pub const UnixTimestamp = i64;
pub const String = std.ArrayList(u8);
Expand Down Expand Up @@ -124,76 +125,6 @@ pub const Inflation = struct {
}
};

/// Analogous to [EpochSchedule](https://github.com/anza-xyz/agave/blob/5a9906ebf4f24cd2a2b15aca638d609ceed87797/sdk/program/src/epoch_schedule.rs#L35)
/// Analogous to [EpochSchedule](https://github.com/anza-xyz/agave/blob/5a9906ebf4f24cd2a2b15aca638d609ceed87797/sdk/program/src/epoch_schedule.rs#L35)
///
/// Describes how slots are grouped into epochs: an optional "warmup" period
/// where epochs double in length starting from MINIMUM_SLOTS_PER_EPOCH,
/// followed by fixed-length epochs of `slots_per_epoch` slots.
pub const EpochSchedule = extern struct {
    /// The maximum number of slots in each epoch.
    slots_per_epoch: u64,

    /// A number of slots before beginning of an epoch to calculate
    /// a leader schedule for that epoch.
    leader_schedule_slot_offset: u64,

    /// Whether epochs start short and grow.
    warmup: bool,

    /// The first epoch after the warmup period.
    ///
    /// Basically: `log2(slots_per_epoch) - log2(MINIMUM_SLOTS_PER_EPOCH)`.
    first_normal_epoch: Epoch,

    /// The first slot after the warmup period.
    ///
    /// Basically: `MINIMUM_SLOTS_PER_EPOCH * (2.pow(first_normal_epoch) - 1)`.
    first_normal_slot: Slot,

    /// Returns the epoch that contains `slot`.
    pub fn getEpoch(self: *const EpochSchedule, slot: Slot) Epoch {
        return self.getEpochAndSlotIndex(slot)[0];
    }

    /// Returns the epoch that contains `slot`, and the index of `slot` within
    /// that epoch. Saturating operators (`+|`, `-|`) are used throughout so
    /// that values near the u64 limits clamp instead of trapping.
    pub fn getEpochAndSlotIndex(self: *const EpochSchedule, slot: Slot) struct { Epoch, Slot } {
        if (slot < self.first_normal_slot) {
            // Warmup: epoch lengths are MIN, 2*MIN, 4*MIN, ... so the epoch
            // number is log2(ceilPowerOfTwo(slot + MIN + 1)) - log2(MIN) - 1.
            // (@ctz of a power of two is its log2.)
            var epoch = slot +| MINIMUM_SLOTS_PER_EPOCH +| 1;
            epoch = @ctz(std.math.ceilPowerOfTwo(u64, epoch) catch {
                std.debug.panic("failed to ceil power of two: {d}", .{epoch});
            }) -| @ctz(MINIMUM_SLOTS_PER_EPOCH) -| 1;

            // This warmup epoch spans 2^(epoch + log2(MIN)) slots.
            const exponent = epoch +| @ctz(MINIMUM_SLOTS_PER_EPOCH);
            const epoch_len = std.math.powi(u64, 2, exponent) catch std.math.maxInt(u64);

            // Slots before this epoch sum to epoch_len - MIN (geometric series),
            // so subtracting that from `slot` yields the in-epoch index.
            const slot_index = slot -| (epoch_len -| MINIMUM_SLOTS_PER_EPOCH);

            return .{ epoch, slot_index };
        } else {
            // Post-warmup: fixed-size epochs of `slots_per_epoch` slots.
            // `catch 0` guards against slots_per_epoch == 0 (division by zero).
            const normal_slot_index = slot -| self.first_normal_slot;
            const normal_epoch_index = std.math.divTrunc(u64, normal_slot_index, self.slots_per_epoch) catch 0;

            const epoch = self.first_normal_epoch +| normal_epoch_index;
            const slot_index = std.math.rem(u64, normal_slot_index, self.slots_per_epoch) catch 0;

            return .{ epoch, slot_index };
        }
    }

    /// get the length of the given epoch (in slots)
    pub fn getSlotsInEpoch(self: *const EpochSchedule, epoch: Epoch) Slot {
        comptime std.debug.assert(std.math.isPowerOfTwo(MINIMUM_SLOTS_PER_EPOCH));
        // Warmup epoch `e` has MIN * 2^e slots; `<<|` saturates on overflow.
        return if (epoch < self.first_normal_epoch)
            @as(Slot, 1) <<| epoch +| @ctz(MINIMUM_SLOTS_PER_EPOCH)
        else
            self.slots_per_epoch;
    }

    /// Builds an EpochSchedule with arbitrary field values, for tests/fuzzing.
    /// NOTE(review): fields are independently random, so the result is usually
    /// internally inconsistent (e.g. first_normal_slot unrelated to
    /// first_normal_epoch) — fine for serialization tests only.
    pub fn random(rand: std.Random) EpochSchedule {
        return .{
            .slots_per_epoch = rand.int(u64),
            .leader_schedule_slot_offset = rand.int(u64),
            .warmup = rand.boolean(),
            .first_normal_epoch = rand.int(Epoch),
            .first_normal_slot = rand.int(Slot),
        };
    }
};

/// Analogous to [ClusterType](https://github.com/anza-xyz/agave/blob/cadba689cb44db93e9c625770cafd2fc0ae89e33/sdk/src/genesis_config.rs#L46)
pub const ClusterType = enum(u8) {
Testnet,
Expand Down
7 changes: 6 additions & 1 deletion src/accountsdb/snapshots.zig
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const Slot = sig.core.time.Slot;

const FileId = sig.accounts_db.accounts_file.FileId;

const EpochSchedule = sig.accounts_db.genesis_config.EpochSchedule;
const EpochSchedule = sig.core.EpochSchedule;
const FeeRateGovernor = sig.accounts_db.genesis_config.FeeRateGovernor;
const Inflation = sig.accounts_db.genesis_config.Inflation;
const Rent = sig.accounts_db.genesis_config.Rent;
Expand Down Expand Up @@ -889,6 +889,11 @@ pub const BankFields = struct {
.is_delta = rand.boolean(),
};
}

/// Looks up the stakes recorded for `epoch` and returns the map of
/// node pubkey -> stake weight derived from its vote accounts.
/// Fails with `error.NoEpochStakes` when the bank has no stake entry
/// for that epoch.
pub fn getStakedNodes(self: *const BankFields, allocator: std.mem.Allocator, epoch: Epoch) !*const std.AutoArrayHashMapUnmanaged(Pubkey, u64) {
    if (self.epoch_stakes.getPtr(epoch)) |stakes_entry| {
        return stakes_entry.stakes.vote_accounts.stakedNodes(allocator);
    }
    return error.NoEpochStakes;
}
};

/// Analogous to [SerializableAccountStorageEntry](https://github.com/anza-xyz/agave/blob/cadba689cb44db93e9c625770cafd2fc0ae89e33/runtime/src/serde_snapshot/storage.rs#L11)
Expand Down
43 changes: 28 additions & 15 deletions src/cmd/cmd.zig
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,20 @@ const IpAddr = sig.net.IpAddr;
const Logger = sig.trace.Logger;
const Pubkey = sig.core.Pubkey;
const ShredCollectorDependencies = sig.shred_collector.ShredCollectorDependencies;
const SingleEpochLeaderSchedule = sig.core.leader_schedule.SingleEpochLeaderSchedule;
const LeaderSchedule = sig.core.leader_schedule.LeaderSchedule;
const SnapshotFiles = sig.accounts_db.SnapshotFiles;
const SocketAddr = sig.net.SocketAddr;
const StatusCache = sig.accounts_db.StatusCache;
const EpochSchedule = sig.core.EpochSchedule;
const LeaderScheduleCache = sig.core.leader_schedule.LeaderScheduleCache;

const downloadSnapshotsFromGossip = sig.accounts_db.downloadSnapshotsFromGossip;
const getOrInitIdentity = helpers.getOrInitIdentity;
const globalRegistry = sig.prometheus.globalRegistry;
const getWallclockMs = sig.time.getWallclockMs;
const leaderScheduleFromBank = sig.core.leader_schedule.leaderScheduleFromBank;
const parallelUnpackZstdTarBall = sig.accounts_db.parallelUnpackZstdTarBall;
const parseLeaderSchedule = sig.core.leader_schedule.parseLeaderSchedule;
const requestIpEcho = sig.net.requestIpEcho;
const spawnMetrics = sig.prometheus.spawnMetrics;
const writeLeaderSchedule = sig.core.leader_schedule.writeLeaderSchedule;

const BlockstoreReader = sig.ledger.BlockstoreReader;
const BlockstoreWriter = sig.ledger.BlockstoreWriter;
Expand Down Expand Up @@ -639,10 +638,16 @@ fn validator() !void {
geyser_writer,
);

// leader schedule
var leader_schedule = try getLeaderScheduleFromCli(allocator) orelse
try leaderScheduleFromBank(allocator, snapshot.bank.bank_fields);
const leader_provider = leader_schedule.provider();
// leader schedule cache
var leader_schedule_cache = LeaderScheduleCache.init(allocator, snapshot.bank.bank_fields.epoch_schedule);
if (try getLeaderScheduleFromCli(allocator) orelse null) |leader_schedule| {
try leader_schedule_cache.insertLeaderSchedule(snapshot.bank.bank_fields.epoch, leader_schedule);
} else {
_ = try leader_schedule_cache.getSlotLeaderMaybeCompute(snapshot.bank.bank_fields.slot, snapshot.bank.bank_fields);
}
// This provider will fail at epoch boundary unless another thread updated the leader schedule cache
// i.e. called leader_schedule_cache.getSlotLeaderMaybeCompute(slot, bank_fields);
const leader_provider = leader_schedule_cache.getSlotLeaderProvider();

// blockstore
const blockstore_db = try sig.ledger.BlockstoreDB.open(
Expand Down Expand Up @@ -739,8 +744,16 @@ fn shredCollector() !void {

// leader schedule
// NOTE: leader schedule is needed for the shred collector because we skip accounts-db setup
var leader_schedule = try getLeaderScheduleFromCli(allocator) orelse @panic("No leader schedule found");
const leader_provider = leader_schedule.provider();
var leader_schedule_cache = LeaderScheduleCache.init(allocator, try EpochSchedule.default());

// This is a sort of hack to get the epoch of the leader schedule and then insert into the cache
    // We should aim to use the leader schedule cache instead of the leader schedule, since the latter
    // cannot transition between epochs.
const leader_schedule = try getLeaderScheduleFromCli(allocator) orelse @panic("No leader schedule found");
const leader_schedule_epoch = leader_schedule_cache.epoch_schedule.getEpoch(leader_schedule.first_slot.?); // first_slot is non null iff leader schedule is built from cli
try leader_schedule_cache.insertLeaderSchedule(leader_schedule_epoch, leader_schedule);

const leader_provider = leader_schedule_cache.getSlotLeaderProvider();

// blockstore
const blockstore_db = try sig.ledger.BlockstoreDB.open(
Expand Down Expand Up @@ -964,20 +977,20 @@ fn printLeaderSchedule() !void {
return err;
}
};
break :b try leaderScheduleFromBank(allocator, loaded_snapshot.bank.bank_fields);
break :b try LeaderSchedule.fromBank(allocator, loaded_snapshot.bank.bank_fields.epoch, loaded_snapshot.bank.bank_fields);
};

var stdout = std.io.bufferedWriter(std.io.getStdOut().writer());
try writeLeaderSchedule(leader_schedule, stdout.writer());
try leader_schedule.write(stdout.writer(), leader_schedule.first_slot.?);
try stdout.flush();
}

fn getLeaderScheduleFromCli(allocator: Allocator) !?SingleEpochLeaderSchedule {
/// Reads a leader schedule from the path given on the CLI, or returns null
/// when no `--leader-schedule-path` was provided. The special path "--"
/// reads from stdin instead of a file.
fn getLeaderScheduleFromCli(allocator: Allocator) !?LeaderSchedule {
    const path = config.current.leader_schedule_path orelse return null;

    if (std.mem.eql(u8, "--", path)) {
        return try LeaderSchedule.read(allocator, std.io.getStdIn().reader());
    }

    const file = try std.fs.cwd().openFile(path, .{});
    // Fix: the original never closed this handle, leaking a file descriptor.
    defer file.close();
    return try LeaderSchedule.read(allocator, file.reader());
}
Expand Down
111 changes: 111 additions & 0 deletions src/core/epoch_schedule.zig
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
const std = @import("std");
const core = @import("lib.zig");

const Epoch = core.Epoch;
const Slot = core.Slot;

const DEFAULT_SLOTS_PER_EPOCH = core.time.DEFAULT_SLOTS_PER_EPOCH;

/// The default number of slots before an epoch starts to calculate the leader schedule.
pub const DEFAULT_LEADER_SCHEDULE_SLOT_OFFSET: u64 = DEFAULT_SLOTS_PER_EPOCH;

/// The minimum number of slots per epoch during the warmup period.
///
/// Based on `MAX_LOCKOUT_HISTORY` from `vote_program`.
pub const MINIMUM_SLOTS_PER_EPOCH: u64 = 32;

/// Analogous to [EpochSchedule](https://github.com/anza-xyz/agave/blob/5a9906ebf4f24cd2a2b15aca638d609ceed87797/sdk/program/src/epoch_schedule.rs#L35)
/// Analogous to [EpochSchedule](https://github.com/anza-xyz/agave/blob/5a9906ebf4f24cd2a2b15aca638d609ceed87797/sdk/program/src/epoch_schedule.rs#L35)
///
/// Describes how slots are grouped into epochs: an optional "warmup" period
/// where epochs double in length starting from MINIMUM_SLOTS_PER_EPOCH,
/// followed by fixed-length epochs of `slots_per_epoch` slots.
pub const EpochSchedule = extern struct {
    /// The maximum number of slots in each epoch.
    slots_per_epoch: u64,

    /// A number of slots before beginning of an epoch to calculate
    /// a leader schedule for that epoch.
    leader_schedule_slot_offset: u64,

    /// Whether epochs start short and grow.
    warmup: bool,

    /// The first epoch after the warmup period.
    ///
    /// Basically: `log2(slots_per_epoch) - log2(MINIMUM_SLOTS_PER_EPOCH)`.
    first_normal_epoch: core.Epoch,

    /// The first slot after the warmup period.
    ///
    /// Basically: `MINIMUM_SLOTS_PER_EPOCH * (2.pow(first_normal_epoch) - 1)`.
    first_normal_slot: core.Slot,

    /// Returns the epoch that contains `slot`.
    pub fn getEpoch(self: *const EpochSchedule, slot: Slot) Epoch {
        return self.getEpochAndSlotIndex(slot)[0];
    }

    /// Returns the epoch that contains `slot`, and the index of `slot` within
    /// that epoch. Saturating operators (`+|`, `-|`) are used throughout so
    /// that values near the u64 limits clamp instead of trapping.
    pub fn getEpochAndSlotIndex(self: *const EpochSchedule, slot: Slot) struct { Epoch, usize } {
        if (slot < self.first_normal_slot) {
            // Warmup: epoch lengths are MIN, 2*MIN, 4*MIN, ... so the epoch
            // number is log2(ceilPowerOfTwo(slot + MIN + 1)) - log2(MIN) - 1.
            // (@ctz of a power of two is its log2.)
            var epoch = slot +| MINIMUM_SLOTS_PER_EPOCH +| 1;
            epoch = @ctz(std.math.ceilPowerOfTwo(u64, epoch) catch {
                std.debug.panic("failed to ceil power of two: {d}", .{epoch});
            }) -| @ctz(MINIMUM_SLOTS_PER_EPOCH) -| 1;

            // This warmup epoch spans 2^(epoch + log2(MIN)) slots.
            const exponent = epoch +| @ctz(MINIMUM_SLOTS_PER_EPOCH);
            const epoch_len = std.math.powi(u64, 2, exponent) catch std.math.maxInt(u64);

            // Slots before this epoch sum to epoch_len - MIN (geometric series),
            // so subtracting that from `slot` yields the in-epoch index.
            const slot_index = slot -| (epoch_len -| MINIMUM_SLOTS_PER_EPOCH);

            // @intCast for consistency with the normal branch (u64 -> usize).
            return .{ epoch, @intCast(slot_index) };
        } else {
            // Post-warmup: fixed-size epochs of `slots_per_epoch` slots.
            // `catch 0` guards against slots_per_epoch == 0 (division by zero).
            const normal_slot_index = slot -| self.first_normal_slot;
            const normal_epoch_index = std.math.divTrunc(u64, normal_slot_index, self.slots_per_epoch) catch 0;

            const epoch = self.first_normal_epoch +| normal_epoch_index;
            const slot_index = std.math.rem(u64, normal_slot_index, self.slots_per_epoch) catch 0;

            return .{ epoch, @intCast(slot_index) };
        }
    }

    /// get the length of the given epoch (in slots)
    pub fn getSlotsInEpoch(self: *const EpochSchedule, epoch: Epoch) Slot {
        comptime std.debug.assert(std.math.isPowerOfTwo(MINIMUM_SLOTS_PER_EPOCH));
        // Warmup epoch `e` has MIN * 2^e slots; `<<|` saturates on overflow.
        return if (epoch < self.first_normal_epoch)
            @as(Slot, 1) <<| epoch +| @ctz(MINIMUM_SLOTS_PER_EPOCH)
        else
            self.slots_per_epoch;
    }

    /// Standard schedule: DEFAULT_SLOTS_PER_EPOCH per epoch, with warmup.
    pub fn default() !EpochSchedule {
        return EpochSchedule.custom(
            DEFAULT_SLOTS_PER_EPOCH,
            DEFAULT_LEADER_SCHEDULE_SLOT_OFFSET,
            true,
        );
    }

    /// Builds a consistent EpochSchedule from the three free parameters,
    /// deriving `first_normal_epoch`/`first_normal_slot` when warmup is on.
    /// Mirrors Agave's `EpochSchedule::custom`.
    pub fn custom(slots_per_epoch: u64, leader_schedule_slot_offset: u64, warmup: bool) !EpochSchedule {
        std.debug.assert(slots_per_epoch > MINIMUM_SLOTS_PER_EPOCH);
        var first_normal_epoch: Epoch = 0;
        var first_normal_slot: Slot = 0;
        if (warmup) {
            const next_power_of_two = try std.math.ceilPowerOfTwo(u64, slots_per_epoch);
            // BUG FIX: log2 of a power of two is its trailing-zero count
            // (@ctz), matching Agave's `trailing_zeros()`. The previous @clz
            // version always saturated to 0 (clz of the larger power of two
            // is the SMALLER value), so first_normal_epoch was always 0 and
            // the warmup boundary was wrong.
            const log2_slots_per_epoch = @ctz(next_power_of_two) -| @ctz(MINIMUM_SLOTS_PER_EPOCH);
            first_normal_epoch = @intCast(log2_slots_per_epoch);
            first_normal_slot = next_power_of_two -| MINIMUM_SLOTS_PER_EPOCH;
        }
        return .{
            .slots_per_epoch = slots_per_epoch,
            .leader_schedule_slot_offset = leader_schedule_slot_offset,
            .warmup = warmup,
            .first_normal_epoch = first_normal_epoch,
            .first_normal_slot = first_normal_slot,
        };
    }

    /// Builds an EpochSchedule with arbitrary field values, for tests/fuzzing.
    /// NOTE(review): fields are independently random, so the result is usually
    /// internally inconsistent — fine for serialization tests only.
    pub fn random(rand: std.Random) EpochSchedule {
        return .{
            .slots_per_epoch = rand.int(u64),
            .leader_schedule_slot_offset = rand.int(u64),
            .warmup = rand.boolean(),
            .first_normal_epoch = rand.int(Epoch),
            .first_normal_slot = rand.int(Slot),
        };
    }
};
Loading

0 comments on commit 3d5732d

Please sign in to comment.