Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update state adv. timer for tree-states #4415

Closed
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
.max_by_key(|f| f.finalized_checkpoint.epoch);

// Do a bit of state reconstruction first if required.
if let Some(_) = reconstruction_notif {
if reconstruction_notif.is_some() {
let timer = std::time::Instant::now();

match db.reconstruct_historic_states(Some(BLOCKS_PER_RECONSTRUCTION)) {
Expand Down
209 changes: 124 additions & 85 deletions beacon_node/beacon_chain/src/state_advance_timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,17 @@ use std::sync::{
};
use task_executor::TaskExecutor;
use tokio::time::{sleep, sleep_until, Instant};
use types::{AttestationShufflingId, BeaconStateError, EthSpec, Hash256, RelativeEpoch, Slot};
use types::{
AttestationShufflingId, BeaconBlockRef, BeaconState, BeaconStateError, EthSpec, Hash256,
RelativeEpoch, Slot,
};

/// If the head slot is more than `MAX_ADVANCE_DISTANCE` from the current slot, then don't perform
/// the state advancement.
///
/// This avoids doing unnecessary work whilst the node is syncing or has perhaps been put to sleep
/// for some period of time.
const MAX_ADVANCE_DISTANCE: u64 = 4;
const MAX_ADVANCE_DISTANCE: u64 = 16;

/// Similarly for fork choice: avoid the fork choice lookahead during sync.
///
Expand All @@ -43,23 +46,18 @@ const MAX_ADVANCE_DISTANCE: u64 = 4;
/// impact whilst having 8 epochs without a block is a comfortable grace period.
const MAX_FORK_CHOICE_DISTANCE: u64 = 256;

enum AdvanceStateOutcome {
// A state advance was performed.
StateAdvanced,
// A state advance was deemed unnecessary and was not performed.
Noop,
}

#[derive(Debug)]
enum Error {
BeaconChain(BeaconChainError),
BeaconState(BeaconStateError),
Store(store::Error),
HeadMissingFromSnapshotCache(Hash256),
MaxDistanceExceeded {
current_slot: Slot,
head_slot: Slot,
},
StateAlreadyAdvanced {
block_root: Hash256,
},
BadStateSlot {
_state_slot: Slot,
_current_slot: Slot,
},
}

impl From<BeaconChainError> for Error {
Expand Down Expand Up @@ -135,9 +133,9 @@ async fn state_advance_timer<T: BeaconChainTypes>(
}
};

// Run the state advance 3/4 of the way through the slot (9s on mainnet).
// Start running state advance functions 3/4 of the way through the slot (9s on mainnet).
let state_advance_offset = slot_duration / 4;
let state_advance_instant = if duration_to_next_slot > state_advance_offset {
let state_advance_window_start = if duration_to_next_slot > state_advance_offset {
Instant::now() + duration_to_next_slot - state_advance_offset
} else {
// Skip the state advance for the current slot and wait until the next one.
Expand All @@ -153,8 +151,17 @@ async fn state_advance_timer<T: BeaconChainTypes>(
Instant::now() + duration_to_next_slot + slot_duration - fork_choice_offset
};

// Stop running state advance functions 500ms before we start to run
// fork choice. Although we don't *start* state advance functions before
// running fork choice, long-running instances might still be going.
// 500ms is a buffer to ensure the two are separate.
let state_advance_window_end = fork_choice_instant
.checked_sub(slot_duration / (FORK_CHOICE_LOOKAHEAD_FACTOR - 1))
// Just default to the `fork_choice_instant` if things will overflow.
.unwrap_or(fork_choice_instant);

// Wait for the state advance.
sleep_until(state_advance_instant).await;
sleep_until(state_advance_window_start).await;

// Compute the current slot here at approx 3/4 through the slot. Even though this slot is
// only used by fork choice we need to calculate it here rather than after the state
Expand All @@ -181,32 +188,14 @@ async fn state_advance_timer<T: BeaconChainTypes>(

executor.spawn_blocking(
move || {
match advance_head(&beacon_chain, &log) {
match advance_heads(&beacon_chain, current_slot, state_advance_window_end, &log)
{
Ok(()) => (),
Err(Error::BeaconChain(e)) => error!(
Err(e) => error!(
log,
"Failed to advance head state";
"error" => ?e
),
Err(Error::StateAlreadyAdvanced { block_root }) => debug!(
log,
"State already advanced on slot";
"block_root" => ?block_root
),
Err(Error::MaxDistanceExceeded {
current_slot,
head_slot,
}) => debug!(
log,
"Refused to advance head state";
"head_slot" => head_slot,
"current_slot" => current_slot,
),
other => warn!(
log,
"Did not advance head state";
"reason" => ?other
),
};

// Permit this blocking task to spawn again, next time the timer fires.
Expand Down Expand Up @@ -275,59 +264,108 @@ async fn state_advance_timer<T: BeaconChainTypes>(
}
}

fn advance_head<T: BeaconChainTypes>(
fn advance_heads<T: BeaconChainTypes>(
beacon_chain: &Arc<BeaconChain<T>>,
current_slot: Slot,
state_advance_window_end: Instant,
log: &Logger,
) -> Result<(), Error> {
let current_slot = beacon_chain.slot()?;
let head_snapshot = beacon_chain.head_snapshot();

// These brackets ensure that the `head_slot` value is dropped before we run fork choice and
// potentially invalidate it.
// Prune all advanced states, except for those that descend from the head.
//
// Fork-choice is not run *before* this function to avoid unnecessary calls whilst syncing.
{
let head_slot = beacon_chain.best_slot();

// Don't run this when syncing or if lagging too far behind.
if head_slot + MAX_ADVANCE_DISTANCE < current_slot {
return Err(Error::MaxDistanceExceeded {
current_slot,
head_slot,
});
// Note: it's important to ensure that any states advanced in this function
// are included in this list to prevent them being pruned.
beacon_chain
.store
.retain_advanced_states(&[head_snapshot.beacon_block_root]);

// Try to advance the head state to the current slot. Always attempt to
// advance it one slot, then only keep attempting more advances while
// there's still time left in the state advance window.
for i in 0..MAX_ADVANCE_DISTANCE {
// Stop advancing the state if the head block has changed.
if beacon_chain.head_snapshot().beacon_block_root != head_snapshot.beacon_block_root {
break;
}

// Advance the state of the block at the head of the chain.
let outcome = advance_state(
beacon_chain,
current_slot,
head_snapshot.beacon_block_root,
head_snapshot.beacon_block.message(),
&head_snapshot.beacon_state,
log,
)?;

debug!(
log,
"Advanced head state";
"iteration" => i
);

match outcome {
// The state was advanced and there's till time left. Try to advance
// the head state again.
AdvanceStateOutcome::StateAdvanced if Instant::now() < state_advance_window_end => (),
// The state was advanced and there's not enough time left, or there
// was no state advance performed. Break and try to do something
// else.
AdvanceStateOutcome::StateAdvanced | AdvanceStateOutcome::Noop => break,
}
}

let (head_block_root, head_block_state_root) = {
let snapshot = beacon_chain.head_snapshot();
(snapshot.beacon_block_root, snapshot.beacon_state_root())
};
Ok(())
}

let (head_state_root, mut state) = beacon_chain
fn advance_state<T: BeaconChainTypes>(
beacon_chain: &Arc<BeaconChain<T>>,
current_slot: Slot,
block_root: Hash256,
block: BeaconBlockRef<T::EthSpec>,
state: &BeaconState<T::EthSpec>,
log: &Logger,
) -> Result<AdvanceStateOutcome, Error> {
let (existing_state_root, existing_state) = beacon_chain
.store
.get_advanced_state(head_block_root, current_slot, head_block_state_root)?
.ok_or(Error::HeadMissingFromSnapshotCache(head_block_root))?;

if state.slot() == current_slot + 1 {
return Err(Error::StateAlreadyAdvanced {
block_root: head_block_root,
});
} else if state.slot() != current_slot {
// Protect against advancing a state more than a single slot.
//
// Advancing more than one slot without storing the intermediate state would corrupt the
// database. Future works might store temporary, intermediate states inside this function.
return Err(Error::BadStateSlot {
_state_slot: state.slot(),
_current_slot: current_slot,
});
.get_advanced_state_cached_only(block_root, current_slot)
.unwrap_or_else(|| (block.state_root(), state.clone()));
let existing_slot = existing_state.slot();

// There's nothing to do if the state is already at the current slot.
if existing_slot >= current_slot {
debug!(
log,
"State advance unnecessary";
"info" => "state already_advanced",
"state_root" => ?existing_state_root,
"block_root" => ?block_root,
"state_slot" => ?existing_slot,
"current_slot" => ?current_slot,
);
return Ok(AdvanceStateOutcome::Noop);
}

// Don't run this when syncing or if lagging too far behind.
if existing_slot + MAX_ADVANCE_DISTANCE < current_slot {
debug!(
log,
"Refused to advance state";
"existing_slot" => existing_slot,
"current_slot" => current_slot,
);
return Ok(AdvanceStateOutcome::Noop);
}

// Re-assign the `state` variable to prevent confusion between the `state` and `existing_state`.
let mut state = existing_state;
let initial_slot = state.slot();
let initial_epoch = state.current_epoch();

// Advance the state a single slot.
if let Some(summary) =
per_slot_processing(&mut state, Some(head_state_root), &beacon_chain.spec)
per_slot_processing(&mut state, Some(existing_state_root), &beacon_chain.spec)
.map_err(BeaconChainError::from)?
{
// Expose Prometheus metrics.
Expand Down Expand Up @@ -361,8 +399,8 @@ fn advance_head<T: BeaconChainTypes>(

debug!(
log,
"Advanced head state one slot";
"head_block_root" => ?head_block_root,
"Advanced a state by one slot";
"block_root" => ?block_root,
"state_slot" => state.slot(),
"current_slot" => current_slot,
);
Expand All @@ -388,7 +426,7 @@ fn advance_head<T: BeaconChainTypes>(
.lock()
.insert(
state.current_epoch(),
head_block_root,
block_root,
state
.get_beacon_proposer_indices(&beacon_chain.spec)
.map_err(BeaconChainError::from)?,
Expand All @@ -397,9 +435,8 @@ fn advance_head<T: BeaconChainTypes>(
.map_err(BeaconChainError::from)?;

// Update the attester cache.
let shuffling_id =
AttestationShufflingId::new(head_block_root, &state, RelativeEpoch::Next)
.map_err(BeaconChainError::from)?;
let shuffling_id = AttestationShufflingId::new(block_root, &state, RelativeEpoch::Next)
.map_err(BeaconChainError::from)?;
let committee_cache = state
.committee_cache(RelativeEpoch::Next)
.map_err(BeaconChainError::from)?;
Expand All @@ -412,7 +449,7 @@ fn advance_head<T: BeaconChainTypes>(
debug!(
log,
"Primed proposer and attester caches";
"head_block_root" => ?head_block_root,
"block_root" => ?block_root,
"next_epoch_shuffling_root" => ?shuffling_id.shuffling_decision_block,
"state_epoch" => state.current_epoch(),
"current_epoch" => current_slot.epoch(T::EthSpec::slots_per_epoch()),
Expand All @@ -422,24 +459,26 @@ fn advance_head<T: BeaconChainTypes>(
// Apply the state to the attester cache, if the cache deems it interesting.
beacon_chain
.attester_cache
.maybe_cache_state(&state, head_block_root, &beacon_chain.spec)
.maybe_cache_state(&state, block_root, &beacon_chain.spec)
.map_err(BeaconChainError::from)?;

let final_slot = state.slot();

// Write the advanced state to the database.
let advanced_state_root = state.update_tree_hash_cache()?;
beacon_chain.store.put_state(&advanced_state_root, &state)?;
beacon_chain
.store
.put_advanced_state(block_root, block.slot(), advanced_state_root, state)?;

debug!(
log,
"Completed state advance";
"head_block_root" => ?head_block_root,
"block_root" => ?block_root,
"advanced_slot" => final_slot,
"initial_slot" => initial_slot,
);

Ok(())
Ok(AdvanceStateOutcome::StateAdvanced)
}

#[cfg(test)]
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/sync_committee_rewards.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.zip(sync_aggregate.sync_committee_bits.iter())
{
let participant_balance = balances
.get_mut(&validator_index)
.get_mut(validator_index)
.ok_or(BeaconChainError::SyncCommitteeRewardsSyncError)?;

if participant_bit {
Expand Down
3 changes: 2 additions & 1 deletion beacon_node/genesis/src/interop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,8 @@ mod test {
}

for (index, v) in state.validators().iter().enumerate() {
let creds = v.withdrawal_credentials.as_bytes();
let withdrawal_credientials = v.withdrawal_credentials();
let creds = withdrawal_credientials.as_bytes();
if index % 2 == 0 {
assert_eq!(
creds[0], spec.bls_withdrawal_prefix_byte,
Expand Down
4 changes: 4 additions & 0 deletions beacon_node/store/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ pub enum Error {
Hdiff(hdiff::Error),
InconsistentFork(InconsistentFork),
ZeroCacheSize,
AdvancedStateMissesSlot {
previous_slot: Slot,
state_slot: Slot,
},
}

pub trait HandleUnavailable<T> {
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/store/src/forwards_iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ impl<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>
// of the pre iterator.
None => {
let continuation_data = continuation_data.take();
let start_slot = Slot::from(iter.limit);
let start_slot = iter.limit;

*self = PostFinalizationLazy {
continuation_data,
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/store/src/hdiff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ mod tests {

let xor_diff = XorDiff::compute(&x_values, &y_values).unwrap();

let mut y_from_xor = x_values.clone();
let mut y_from_xor = x_values;
xor_diff.apply(&mut y_from_xor).unwrap();

assert_eq!(y_values, y_from_xor);
Expand Down
Loading