Skip to content

Commit

Permalink
Pack attestations into blocks in parallel (#2307)
Browse files Browse the repository at this point in the history
## Proposed Changes

Use two instances of max cover when packing attestations into blocks: one for the previous epoch, and one for the current epoch. This reduces the amount of computation done by roughly half due to the `O(n^2)` running time of max cover (`2 * (n/2)^2 = n^2/2`). This should help alleviate some load on block proposal, particularly on Prater.
  • Loading branch information
michaelsproul committed Apr 13, 2021
1 parent c1203f5 commit 3b901dc
Show file tree
Hide file tree
Showing 8 changed files with 186 additions and 73 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

52 changes: 36 additions & 16 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1160,6 +1160,26 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(signed_aggregate)
}

/// Filter an attestation from the op pool for shuffling compatibility.
///
/// Use the provided `filter_cache` map to memoize results.
pub fn filter_op_pool_attestation(
&self,
filter_cache: &mut HashMap<(Hash256, Epoch), bool>,
att: &Attestation<T::EthSpec>,
state: &BeaconState<T::EthSpec>,
) -> bool {
*filter_cache
.entry((att.data.beacon_block_root, att.data.target.epoch))
.or_insert_with(|| {
self.shuffling_is_compatible(
&att.data.beacon_block_root,
att.data.target.epoch,
&state,
)
})
}

/// Check that the shuffling at `block_root` is equal to one of the shufflings of `state`.
///
/// The `target_epoch` argument determines which shuffling to check compatibility with, it
Expand Down Expand Up @@ -1968,21 +1988,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.deposits_for_block_inclusion(&state, &eth1_data, &self.spec)?
.into();

// Map from attestation head block root to shuffling compatibility.
// Used to memoize the `attestation_shuffling_is_compatible` function.
let mut shuffling_filter_cache = HashMap::new();
let attestation_filter = |att: &&Attestation<T::EthSpec>| -> bool {
*shuffling_filter_cache
.entry((att.data.beacon_block_root, att.data.target.epoch))
.or_insert_with(|| {
self.shuffling_is_compatible(
&att.data.beacon_block_root,
att.data.target.epoch,
&state,
)
})
};

// Iterate through the naive aggregation pool and ensure all the attestations from there
// are included in the operation pool.
let unagg_import_timer =
Expand Down Expand Up @@ -2012,9 +2017,24 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

let attestation_packing_timer =
metrics::start_timer(&metrics::BLOCK_PRODUCTION_ATTESTATION_TIMES);

let mut prev_filter_cache = HashMap::new();
let prev_attestation_filter = |att: &&Attestation<T::EthSpec>| {
self.filter_op_pool_attestation(&mut prev_filter_cache, *att, &state)
};
let mut curr_filter_cache = HashMap::new();
let curr_attestation_filter = |att: &&Attestation<T::EthSpec>| {
self.filter_op_pool_attestation(&mut curr_filter_cache, *att, &state)
};

let attestations = self
.op_pool
.get_attestations(&state, attestation_filter, &self.spec)
.get_attestations(
&state,
prev_attestation_filter,
curr_attestation_filter,
&self.spec,
)
.map_err(BlockProductionError::OpPoolError)?
.into();
drop(attestation_packing_timer);
Expand Down
4 changes: 4 additions & 0 deletions beacon_node/operation_pool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,16 @@ authors = ["Michael Sproul <michael@sigmaprime.io>"]
edition = "2018"

[dependencies]
itertools = "0.10.0"
int_to_bytes = { path = "../../consensus/int_to_bytes" }
lazy_static = "1.4.0"
lighthouse_metrics = { path = "../../common/lighthouse_metrics" }
parking_lot = "0.11.0"
types = { path = "../../consensus/types" }
state_processing = { path = "../../consensus/state_processing" }
eth2_ssz = "0.1.2"
eth2_ssz_derive = "0.1.0"
rayon = "1.5.0"
serde = "1.0.116"
serde_derive = "1.0.116"
store = { path = "../store" }
Expand Down
7 changes: 3 additions & 4 deletions beacon_node/operation_pool/src/attestation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use state_processing::common::{get_attesting_indices, get_base_reward};
use std::collections::HashMap;
use types::{Attestation, BeaconState, BitList, ChainSpec, EthSpec};

#[derive(Debug, Clone)]
pub struct AttMaxCover<'a, T: EthSpec> {
/// Underlying attestation.
att: &'a Attestation<T>,
Expand Down Expand Up @@ -44,8 +45,8 @@ impl<'a, T: EthSpec> MaxCover for AttMaxCover<'a, T> {
type Object = Attestation<T>;
type Set = HashMap<u64, u64>;

fn object(&self) -> Attestation<T> {
self.att.clone()
fn object(&self) -> &Attestation<T> {
self.att
}

fn covering_set(&self) -> &HashMap<u64, u64> {
Expand Down Expand Up @@ -100,8 +101,6 @@ pub fn earliest_attestation_validators<T: EthSpec>(
state_attestations
.iter()
// In a single epoch, an attester should only be attesting for one slot and index.
// TODO: we avoid including slashable attestations in the state here,
// but maybe we should do something else with them (like construct slashings).
.filter(|existing_attestation| {
existing_attestation.data.slot == attestation.data.slot
&& existing_attestation.data.index == attestation.data.index
Expand Down
5 changes: 3 additions & 2 deletions beacon_node/operation_pool/src/attester_slashing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use state_processing::per_block_processing::get_slashable_indices_modular;
use std::collections::{HashMap, HashSet};
use types::{AttesterSlashing, BeaconState, ChainSpec, EthSpec};

#[derive(Debug, Clone)]
pub struct AttesterSlashingMaxCover<'a, T: EthSpec> {
slashing: &'a AttesterSlashing<T>,
effective_balances: HashMap<u64, u64>,
Expand Down Expand Up @@ -46,8 +47,8 @@ impl<'a, T: EthSpec> MaxCover for AttesterSlashingMaxCover<'a, T> {
type Set = HashMap<u64, u64>;

/// Extract an object for inclusion in a solution.
fn object(&self) -> AttesterSlashing<T> {
self.slashing.clone()
fn object(&self) -> &AttesterSlashing<T> {
self.slashing
}

/// Get the set of elements covered.
Expand Down
133 changes: 92 additions & 41 deletions beacon_node/operation_pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ mod attestation;
mod attestation_id;
mod attester_slashing;
mod max_cover;
mod metrics;
mod persistence;

pub use persistence::PersistedOperationPool;

use attestation::AttMaxCover;
use attestation_id::AttestationId;
use attester_slashing::AttesterSlashingMaxCover;
use max_cover::maximum_cover;
use max_cover::{maximum_cover, MaxCover};
use parking_lot::RwLock;
use state_processing::per_block_processing::errors::AttestationValidationError;
use state_processing::per_block_processing::{
Expand Down Expand Up @@ -96,6 +97,41 @@ impl<T: EthSpec> OperationPool<T> {
self.attestations.read().values().map(Vec::len).sum()
}

/// Return all valid attestations for the given epoch, for use in max cover.
fn get_valid_attestations_for_epoch<'a>(
&'a self,
epoch: Epoch,
all_attestations: &'a HashMap<AttestationId, Vec<Attestation<T>>>,
state: &'a BeaconState<T>,
total_active_balance: u64,
validity_filter: impl FnMut(&&Attestation<T>) -> bool + Send,
spec: &'a ChainSpec,
) -> impl Iterator<Item = AttMaxCover<'a, T>> + Send {
let domain_bytes = AttestationId::compute_domain_bytes(
epoch,
&state.fork,
state.genesis_validators_root,
spec,
);
all_attestations
.iter()
.filter(move |(key, _)| key.domain_bytes_match(&domain_bytes))
.flat_map(|(_, attestations)| attestations)
.filter(move |attestation| attestation.data.target.epoch == epoch)
.filter(move |attestation| {
// Ensure attestations are valid for block inclusion
verify_attestation_for_block_inclusion(
state,
attestation,
VerifySignatures::False,
spec,
)
.is_ok()
})
.filter(validity_filter)
.filter_map(move |att| AttMaxCover::new(att, state, total_active_balance, spec))
}

/// Get a list of attestations for inclusion in a block.
///
/// The `validity_filter` is a closure that provides extra filtering of the attestations
Expand All @@ -105,53 +141,65 @@ impl<T: EthSpec> OperationPool<T> {
pub fn get_attestations(
&self,
state: &BeaconState<T>,
validity_filter: impl FnMut(&&Attestation<T>) -> bool,
prev_epoch_validity_filter: impl FnMut(&&Attestation<T>) -> bool + Send,
curr_epoch_validity_filter: impl FnMut(&&Attestation<T>) -> bool + Send,
spec: &ChainSpec,
) -> Result<Vec<Attestation<T>>, OpPoolError> {
// Attestations for the current fork, which may be from the current or previous epoch.
let prev_epoch = state.previous_epoch();
let current_epoch = state.current_epoch();
let prev_domain_bytes = AttestationId::compute_domain_bytes(
prev_epoch,
&state.fork,
state.genesis_validators_root,
spec,
);
let curr_domain_bytes = AttestationId::compute_domain_bytes(
current_epoch,
&state.fork,
state.genesis_validators_root,
spec,
);
let reader = self.attestations.read();
let all_attestations = self.attestations.read();
let active_indices = state
.get_cached_active_validator_indices(RelativeEpoch::Current)
.map_err(OpPoolError::GetAttestationsTotalBalanceError)?;
let total_active_balance = state
.get_total_balance(&active_indices, spec)
.map_err(OpPoolError::GetAttestationsTotalBalanceError)?;
let valid_attestations = reader
.iter()
.filter(|(key, _)| {
key.domain_bytes_match(&prev_domain_bytes)
|| key.domain_bytes_match(&curr_domain_bytes)
})
.flat_map(|(_, attestations)| attestations)
// That are valid...
.filter(|attestation| {
verify_attestation_for_block_inclusion(
state,
attestation,
VerifySignatures::False,
spec,
)
.is_ok()
})
.filter(validity_filter)
.flat_map(|att| AttMaxCover::new(att, state, total_active_balance, spec));

Ok(maximum_cover(
valid_attestations,
// Split attestations for the previous & current epochs, so that we
// can optimise them individually in parallel.
let prev_epoch_att = self.get_valid_attestations_for_epoch(
prev_epoch,
&*all_attestations,
state,
total_active_balance,
prev_epoch_validity_filter,
spec,
);
let curr_epoch_att = self.get_valid_attestations_for_epoch(
current_epoch,
&*all_attestations,
state,
total_active_balance,
curr_epoch_validity_filter,
spec,
);

let prev_epoch_limit = std::cmp::min(
T::MaxPendingAttestations::to_usize()
.saturating_sub(state.previous_epoch_attestations.len()),
T::MaxAttestations::to_usize(),
);

let (prev_cover, curr_cover) = rayon::join(
move || {
let _timer = metrics::start_timer(&metrics::ATTESTATION_PREV_EPOCH_PACKING_TIME);
// If we're in the genesis epoch, just use the current epoch attestations.
if prev_epoch == current_epoch {
vec![]
} else {
maximum_cover(prev_epoch_att, prev_epoch_limit)
}
},
move || {
let _timer = metrics::start_timer(&metrics::ATTESTATION_CURR_EPOCH_PACKING_TIME);
maximum_cover(curr_epoch_att, T::MaxAttestations::to_usize())
},
);

Ok(max_cover::merge_solutions(
curr_cover,
prev_cover,
T::MaxAttestations::to_usize(),
))
}
Expand Down Expand Up @@ -231,7 +279,10 @@ impl<T: EthSpec> OperationPool<T> {
let attester_slashings = maximum_cover(
relevant_attester_slashings,
T::MaxAttesterSlashings::to_usize(),
);
)
.into_iter()
.map(|cover| cover.object().clone())
.collect();

(proposer_slashings, attester_slashings)
}
Expand Down Expand Up @@ -619,7 +670,7 @@ mod release_tests {
state.slot -= 1;
assert_eq!(
op_pool
.get_attestations(state, |_| true, spec)
.get_attestations(state, |_| true, |_| true, spec)
.expect("should have attestations")
.len(),
0
Expand All @@ -629,7 +680,7 @@ mod release_tests {
state.slot += spec.min_attestation_inclusion_delay;

let block_attestations = op_pool
.get_attestations(state, |_| true, spec)
.get_attestations(state, |_| true, |_| true, spec)
.expect("Should have block attestations");
assert_eq!(block_attestations.len(), committees.len());

Expand Down Expand Up @@ -799,7 +850,7 @@ mod release_tests {

state.slot += spec.min_attestation_inclusion_delay;
let best_attestations = op_pool
.get_attestations(state, |_| true, spec)
.get_attestations(state, |_| true, |_| true, spec)
.expect("should have best attestations");
assert_eq!(best_attestations.len(), max_attestations);

Expand Down Expand Up @@ -874,7 +925,7 @@ mod release_tests {

state.slot += spec.min_attestation_inclusion_delay;
let best_attestations = op_pool
.get_attestations(state, |_| true, spec)
.get_attestations(state, |_| true, |_| true, spec)
.expect("should have valid best attestations");
assert_eq!(best_attestations.len(), max_attestations);

Expand Down
Loading

0 comments on commit 3b901dc

Please sign in to comment.