Skip to content

Commit

Permalink
Refactor op pool for speed and correctness (sigp#3312)
Browse files Browse the repository at this point in the history
## Proposed Changes

This PR has two aims: to speed up attestation packing in the op pool, and to fix bugs in the verification of attester slashings, proposer slashings and voluntary exits. The changes are bundled into a single database schema upgrade (v12).

Attestation packing is sped up by removing several inefficiencies: 

- No more recalculation of `attesting_indices` during packing.
- No (unnecessary) examination of the `ParticipationFlags`: a bitfield suffices. See `RewardCache`.
- No re-checking of attestation validity during packing: the `AttestationMap` provides attestations which are "correct by construction" (I have checked this using Hydra).
- No SSZ re-serialization for the clunky `AttestationId` type (it can be removed in a future release).

So far the speed-up seems to be roughly 2-10x, from 500ms down to 50-100ms.

Verification of attester slashings, proposer slashings and voluntary exits is fixed by:

- Tracking the `ForkVersion`s that were used to verify each message inside the `SigVerifiedOp`. This allows us to quickly re-verify that they match the head state's opinion of what the `ForkVersion` should be at the epoch(s) relevant to the message.
- Storing the `SigVerifiedOp` on disk rather than the raw operation. This allows us to continue track the fork versions after a reboot.

This is mostly contained in this commit 52bb184.

## Additional Info

The schema upgrade uses the justified state to re-verify attestations and compute `attesting_indices` for them. It will drop any attestations that fail to verify, by the logic that attestations are most valuable in the few slots after they're observed, and are probably stale and useless by the time a node restarts. Exits and proposer slashings and similarly re-verified to obtain `SigVerifiedOp`s.

This PR contains a runtime killswitch `--paranoid-block-proposal` which opts out of all the optimisations in favour of closely verifying every included message. Although I'm quite sure that the optimisations are correct this flag could be useful in the event of an unforeseen emergency.

Finally, you might notice that the `RewardCache` appears quite useless in its current form because it is only updated on the hot-path immediately before proposal. My hope is that in future we can shift calls to `RewardCache::update` into the background, e.g. while performing the state advance. It is also forward-looking to `tree-states` compatibility, where iterating and indexing `state.{previous,current}_epoch_participation` is expensive and needs to be minimised.
  • Loading branch information
michaelsproul committed Aug 29, 2022
1 parent 1c9ec42 commit 66eca1a
Show file tree
Hide file tree
Showing 37 changed files with 1,704 additions and 515 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion beacon_node/beacon_chain/src/attestation_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,10 +318,17 @@ impl<'a, T: BeaconChainTypes> Clone for IndexedUnaggregatedAttestation<'a, T> {

/// A helper trait implemented on wrapper types that can be progressed to a state where they can be
/// verified for application to fork choice.
pub trait VerifiedAttestation<T: BeaconChainTypes> {
pub trait VerifiedAttestation<T: BeaconChainTypes>: Sized {
fn attestation(&self) -> &Attestation<T::EthSpec>;

fn indexed_attestation(&self) -> &IndexedAttestation<T::EthSpec>;

// Inefficient default implementation. This is overridden for gossip verified attestations.
fn into_attestation_and_indices(self) -> (Attestation<T::EthSpec>, Vec<u64>) {
let attestation = self.attestation().clone();
let attesting_indices = self.indexed_attestation().attesting_indices.clone().into();
(attestation, attesting_indices)
}
}

impl<'a, T: BeaconChainTypes> VerifiedAttestation<T> for VerifiedAggregatedAttestation<'a, T> {
Expand Down
153 changes: 112 additions & 41 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,20 +63,23 @@ use fork_choice::{
use futures::channel::mpsc::Sender;
use itertools::process_results;
use itertools::Itertools;
use operation_pool::{OperationPool, PersistedOperationPool};
use operation_pool::{AttestationRef, OperationPool, PersistedOperationPool};
use parking_lot::{Mutex, RwLock};
use safe_arith::SafeArith;
use slasher::Slasher;
use slog::{crit, debug, error, info, trace, warn, Logger};
use slot_clock::SlotClock;
use ssz::Encode;
use state_processing::{
common::get_indexed_attestation,
common::{get_attesting_indices_from_state, get_indexed_attestation},
per_block_processing,
per_block_processing::errors::AttestationValidationError,
per_block_processing::{
errors::AttestationValidationError, verify_attestation_for_block_inclusion,
VerifySignatures,
},
per_slot_processing,
state_advance::{complete_state_advance, partial_state_advance},
BlockSignatureStrategy, SigVerifiedOp, VerifyBlockRoot,
BlockSignatureStrategy, SigVerifiedOp, VerifyBlockRoot, VerifyOperation,
};
use std::cmp::Ordering;
use std::collections::HashMap;
Expand Down Expand Up @@ -1904,25 +1907,22 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// Accepts a `VerifiedAttestation` and attempts to apply it to `self.op_pool`.
///
/// The op pool is used by local block producers to pack blocks with operations.
pub fn add_to_block_inclusion_pool(
pub fn add_to_block_inclusion_pool<A>(
&self,
verified_attestation: &impl VerifiedAttestation<T>,
) -> Result<(), AttestationError> {
verified_attestation: A,
) -> Result<(), AttestationError>
where
A: VerifiedAttestation<T>,
{
let _timer = metrics::start_timer(&metrics::ATTESTATION_PROCESSING_APPLY_TO_OP_POOL);

// If there's no eth1 chain then it's impossible to produce blocks and therefore
// useless to put things in the op pool.
if self.eth1_chain.is_some() {
let fork = self.canonical_head.cached_head().head_fork();

let (attestation, attesting_indices) =
verified_attestation.into_attestation_and_indices();
self.op_pool
.insert_attestation(
// TODO: address this clone.
verified_attestation.attestation().clone(),
&fork,
self.genesis_validators_root,
&self.spec,
)
.insert_attestation(attestation, attesting_indices)
.map_err(Error::from)?;
}

Expand Down Expand Up @@ -1955,15 +1955,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub fn filter_op_pool_attestation(
&self,
filter_cache: &mut HashMap<(Hash256, Epoch), bool>,
att: &Attestation<T::EthSpec>,
att: &AttestationRef<T::EthSpec>,
state: &BeaconState<T::EthSpec>,
) -> bool {
*filter_cache
.entry((att.data.beacon_block_root, att.data.target.epoch))
.entry((att.data.beacon_block_root, att.checkpoint.target_epoch))
.or_insert_with(|| {
self.shuffling_is_compatible(
&att.data.beacon_block_root,
att.data.target.epoch,
att.checkpoint.target_epoch,
state,
)
})
Expand Down Expand Up @@ -2045,7 +2045,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub fn verify_voluntary_exit_for_gossip(
&self,
exit: SignedVoluntaryExit,
) -> Result<ObservationOutcome<SignedVoluntaryExit>, Error> {
) -> Result<ObservationOutcome<SignedVoluntaryExit, T::EthSpec>, Error> {
// NOTE: this could be more efficient if it avoided cloning the head state
let wall_clock_state = self.wall_clock_state()?;
Ok(self
Expand All @@ -2066,7 +2066,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}

/// Accept a pre-verified exit and queue it for inclusion in an appropriate block.
pub fn import_voluntary_exit(&self, exit: SigVerifiedOp<SignedVoluntaryExit>) {
pub fn import_voluntary_exit(&self, exit: SigVerifiedOp<SignedVoluntaryExit, T::EthSpec>) {
if self.eth1_chain.is_some() {
self.op_pool.insert_voluntary_exit(exit)
}
Expand All @@ -2076,7 +2076,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub fn verify_proposer_slashing_for_gossip(
&self,
proposer_slashing: ProposerSlashing,
) -> Result<ObservationOutcome<ProposerSlashing>, Error> {
) -> Result<ObservationOutcome<ProposerSlashing, T::EthSpec>, Error> {
let wall_clock_state = self.wall_clock_state()?;
Ok(self.observed_proposer_slashings.lock().verify_and_observe(
proposer_slashing,
Expand All @@ -2086,7 +2086,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}

/// Accept some proposer slashing and queue it for inclusion in an appropriate block.
pub fn import_proposer_slashing(&self, proposer_slashing: SigVerifiedOp<ProposerSlashing>) {
pub fn import_proposer_slashing(
&self,
proposer_slashing: SigVerifiedOp<ProposerSlashing, T::EthSpec>,
) {
if self.eth1_chain.is_some() {
self.op_pool.insert_proposer_slashing(proposer_slashing)
}
Expand All @@ -2096,7 +2099,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub fn verify_attester_slashing_for_gossip(
&self,
attester_slashing: AttesterSlashing<T::EthSpec>,
) -> Result<ObservationOutcome<AttesterSlashing<T::EthSpec>>, Error> {
) -> Result<ObservationOutcome<AttesterSlashing<T::EthSpec>, T::EthSpec>, Error> {
let wall_clock_state = self.wall_clock_state()?;
Ok(self.observed_attester_slashings.lock().verify_and_observe(
attester_slashing,
Expand All @@ -2111,7 +2114,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// 2. Add it to the op pool.
pub fn import_attester_slashing(
&self,
attester_slashing: SigVerifiedOp<AttesterSlashing<T::EthSpec>>,
attester_slashing: SigVerifiedOp<AttesterSlashing<T::EthSpec>, T::EthSpec>,
) {
// Add to fork choice.
self.canonical_head
Expand All @@ -2120,10 +2123,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

// Add to the op pool (if we have the ability to propose blocks).
if self.eth1_chain.is_some() {
self.op_pool.insert_attester_slashing(
attester_slashing,
self.canonical_head.cached_head().head_fork(),
)
self.op_pool.insert_attester_slashing(attester_slashing)
}
}

Expand Down Expand Up @@ -3351,7 +3351,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
};

let (proposer_slashings, attester_slashings, voluntary_exits) =
let (mut proposer_slashings, mut attester_slashings, mut voluntary_exits) =
self.op_pool.get_slashings_and_exits(&state, &self.spec);

let eth1_data = eth1_chain.eth1_data_for_block_production(&state, &self.spec)?;
Expand All @@ -3362,12 +3362,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let unagg_import_timer =
metrics::start_timer(&metrics::BLOCK_PRODUCTION_UNAGGREGATED_TIMES);
for attestation in self.naive_aggregation_pool.read().iter() {
if let Err(e) = self.op_pool.insert_attestation(
attestation.clone(),
&state.fork(),
state.genesis_validators_root(),
&self.spec,
) {
let import = |attestation: &Attestation<T::EthSpec>| {
let attesting_indices = get_attesting_indices_from_state(&state, attestation)?;
self.op_pool
.insert_attestation(attestation.clone(), attesting_indices)
};
if let Err(e) = import(attestation) {
// Don't stop block production if there's an error, just create a log.
error!(
self.log,
Expand All @@ -3388,15 +3388,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
metrics::start_timer(&metrics::BLOCK_PRODUCTION_ATTESTATION_TIMES);

let mut prev_filter_cache = HashMap::new();
let prev_attestation_filter = |att: &&Attestation<T::EthSpec>| {
self.filter_op_pool_attestation(&mut prev_filter_cache, *att, &state)
let prev_attestation_filter = |att: &AttestationRef<T::EthSpec>| {
self.filter_op_pool_attestation(&mut prev_filter_cache, att, &state)
};
let mut curr_filter_cache = HashMap::new();
let curr_attestation_filter = |att: &&Attestation<T::EthSpec>| {
self.filter_op_pool_attestation(&mut curr_filter_cache, *att, &state)
let curr_attestation_filter = |att: &AttestationRef<T::EthSpec>| {
self.filter_op_pool_attestation(&mut curr_filter_cache, att, &state)
};

let attestations = self
let mut attestations = self
.op_pool
.get_attestations(
&state,
Expand All @@ -3407,6 +3407,77 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.map_err(BlockProductionError::OpPoolError)?;
drop(attestation_packing_timer);

// If paranoid mode is enabled re-check the signatures of every included message.
// This will be a lot slower but guards against bugs in block production and can be
// quickly rolled out without a release.
if self.config.paranoid_block_proposal {
attestations.retain(|att| {
verify_attestation_for_block_inclusion(
&state,
att,
VerifySignatures::True,
&self.spec,
)
.map_err(|e| {
warn!(
self.log,
"Attempted to include an invalid attestation";
"err" => ?e,
"block_slot" => state.slot(),
"attestation" => ?att
);
})
.is_ok()
});

proposer_slashings.retain(|slashing| {
slashing
.clone()
.validate(&state, &self.spec)
.map_err(|e| {
warn!(
self.log,
"Attempted to include an invalid proposer slashing";
"err" => ?e,
"block_slot" => state.slot(),
"slashing" => ?slashing
);
})
.is_ok()
});

attester_slashings.retain(|slashing| {
slashing
.clone()
.validate(&state, &self.spec)
.map_err(|e| {
warn!(
self.log,
"Attempted to include an invalid attester slashing";
"err" => ?e,
"block_slot" => state.slot(),
"slashing" => ?slashing
);
})
.is_ok()
});

voluntary_exits.retain(|exit| {
exit.clone()
.validate(&state, &self.spec)
.map_err(|e| {
warn!(
self.log,
"Attempted to include an invalid proposer slashing";
"err" => ?e,
"block_slot" => state.slot(),
"exit" => ?exit
);
})
.is_ok()
});
}

let slot = state.slot();
let proposer_index = state.get_beacon_proposer_index(state.slot(), &self.spec)? as u64;

Expand Down
33 changes: 27 additions & 6 deletions beacon_node/beacon_chain/src/block_reward.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use crate::{BeaconChain, BeaconChainError, BeaconChainTypes};
use eth2::lighthouse::{AttestationRewards, BlockReward, BlockRewardMeta};
use operation_pool::{AttMaxCover, MaxCover};
use state_processing::per_block_processing::altair::sync_committee::compute_sync_aggregate_rewards;
use operation_pool::{AttMaxCover, MaxCover, RewardCache, SplitAttestation};
use state_processing::{
common::get_attesting_indices_from_state,
per_block_processing::altair::sync_committee::compute_sync_aggregate_rewards,
};
use types::{BeaconBlockRef, BeaconState, EthSpec, ExecPayload, Hash256};

impl<T: BeaconChainTypes> BeaconChain<T> {
Expand All @@ -10,20 +13,38 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
block: BeaconBlockRef<'_, T::EthSpec, Payload>,
block_root: Hash256,
state: &BeaconState<T::EthSpec>,
reward_cache: &mut RewardCache,
include_attestations: bool,
) -> Result<BlockReward, BeaconChainError> {
if block.slot() != state.slot() {
return Err(BeaconChainError::BlockRewardSlotError);
}

reward_cache.update(state)?;

let total_active_balance = state.get_total_active_balance()?;
let mut per_attestation_rewards = block

let split_attestations = block
.body()
.attestations()
.iter()
.map(|att| {
AttMaxCover::new(att, state, total_active_balance, &self.spec)
.ok_or(BeaconChainError::BlockRewardAttestationError)
let attesting_indices = get_attesting_indices_from_state(state, att)?;
Ok(SplitAttestation::new(att.clone(), attesting_indices))
})
.collect::<Result<Vec<_>, BeaconChainError>>()?;

let mut per_attestation_rewards = split_attestations
.iter()
.map(|att| {
AttMaxCover::new(
att.as_ref(),
state,
reward_cache,
total_active_balance,
&self.spec,
)
.ok_or(BeaconChainError::BlockRewardAttestationError)
})
.collect::<Result<Vec<_>, _>>()?;

Expand All @@ -34,7 +55,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let latest_att = &updated[i];

for att in to_update {
att.update_covering_set(latest_att.object(), latest_att.covering_set());
att.update_covering_set(latest_att.intermediate(), latest_att.covering_set());
}
}

Expand Down
Loading

0 comments on commit 66eca1a

Please sign in to comment.