diff --git a/beacon_node/operation_pool/src/attestation.rs b/beacon_node/operation_pool/src/attestation.rs index 207e2c65e45..91fd00a3979 100644 --- a/beacon_node/operation_pool/src/attestation.rs +++ b/beacon_node/operation_pool/src/attestation.rs @@ -144,6 +144,13 @@ impl<'a, E: EthSpec> MaxCover for AttMaxCover<'a, E> { /// because including two attestations on chain to satisfy different participation bits is /// impossible without the validator double voting. I.e. it is only suboptimal in the presence /// of slashable voting, which is rare. + /// + /// Post-Electra this optimisation is still OK. The `self.att.data.index` will always be 0 for + /// all Electra attestations, so when a new attestation is added to the solution, we will + /// remove its validators from all attestations at the same slot. It may happen that the + /// included attestation and the attestation being updated have no validators in common, in + /// which case the `retain` will be a no-op. We could consider optimising this in future by only + /// executing the `retain` when the `committee_bits` of the two attestations intersect. fn update_covering_set( &mut self, best_att: &AttestationRef<'a, E>, diff --git a/beacon_node/operation_pool/src/attestation_storage.rs b/beacon_node/operation_pool/src/attestation_storage.rs index 00fbcbe4b01..f06da2afb17 100644 --- a/beacon_node/operation_pool/src/attestation_storage.rs +++ b/beacon_node/operation_pool/src/attestation_storage.rs @@ -1,10 +1,10 @@ use crate::AttestationStats; use itertools::Itertools; -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; use types::{ attestation::{AttestationBase, AttestationElectra}, superstruct, AggregateSignature, Attestation, AttestationData, BeaconState, BitList, BitVector, - Checkpoint, Epoch, EthSpec, Hash256, Slot, + Checkpoint, Epoch, EthSpec, Hash256, Slot, Unsigned, }; #[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)] @@ -30,7 +30,6 @@ pub struct CompactIndexedAttestation { #[superstruct(only(Electra), partial_getter(rename = "aggregation_bits_electra"))] pub aggregation_bits: BitList, pub signature: AggregateSignature, - pub index: u64, #[superstruct(only(Electra))] pub committee_bits: BitVector, } @@ -42,6 +41,7 @@ pub struct SplitAttestation { pub indexed: CompactIndexedAttestation, } +// TODO(electra): rename this type #[derive(Debug, Clone)] pub struct AttestationRef<'a, E: EthSpec> { pub checkpoint: &'a CheckpointKey, @@ -78,7 +78,6 @@ impl SplitAttestation { attesting_indices, aggregation_bits: attn.aggregation_bits, signature: attestation.signature().clone(), - index: data.index, }) } Attestation::Electra(attn) => { @@ -86,7 +85,6 @@ impl SplitAttestation { attesting_indices, aggregation_bits: attn.aggregation_bits, signature: attestation.signature().clone(), - index: data.index, committee_bits: attn.committee_bits, }) } @@ -159,15 +157,15 @@ impl CheckpointKey { } impl CompactIndexedAttestation { - pub fn signers_disjoint_from(&self, other: &Self) -> bool { + pub fn should_aggregate(&self, other: &Self) -> bool { match (self, other) { (CompactIndexedAttestation::Base(this), CompactIndexedAttestation::Base(other)) => { - this.signers_disjoint_from(other) + this.should_aggregate(other) } ( CompactIndexedAttestation::Electra(this), CompactIndexedAttestation::Electra(other), - ) => this.signers_disjoint_from(other), + ) => this.should_aggregate(other), // TODO(electra) is a mix of electra and base compact indexed attestations an edge case we need to deal with? _ => false, } @@ -181,7 +179,7 @@ impl CompactIndexedAttestation { ( CompactIndexedAttestation::Electra(this), CompactIndexedAttestation::Electra(other), - ) => this.aggregate(other), + ) => this.aggregate_same_committee(other), // TODO(electra) is a mix of electra and base compact indexed attestations an edge case we need to deal with? _ => (), } @@ -189,7 +187,7 @@ impl CompactIndexedAttestation { } impl CompactIndexedAttestationBase { - pub fn signers_disjoint_from(&self, other: &Self) -> bool { + pub fn should_aggregate(&self, other: &Self) -> bool { self.aggregation_bits .intersection(&other.aggregation_bits) .is_zero() @@ -208,24 +206,82 @@ impl CompactIndexedAttestationBase { } impl CompactIndexedAttestationElectra { - // TODO(electra) update to match spec requirements - pub fn signers_disjoint_from(&self, other: &Self) -> bool { - self.aggregation_bits - .intersection(&other.aggregation_bits) - .is_zero() + pub fn should_aggregate(&self, other: &Self) -> bool { + // For Electra, only aggregate attestations in the same committee. + self.committee_bits == other.committee_bits + && self + .aggregation_bits + .intersection(&other.aggregation_bits) + .is_zero() } - // TODO(electra) update to match spec requirements - pub fn aggregate(&mut self, other: &Self) { + pub fn aggregate_same_committee(&mut self, other: &Self) { + // TODO(electra): remove assert in favour of Result + assert_eq!(self.committee_bits, other.committee_bits); + self.aggregation_bits = self.aggregation_bits.union(&other.aggregation_bits); + self.attesting_indices = self + .attesting_indices + .drain(..) + .merge(other.attesting_indices.iter().copied()) + .dedup() + .collect(); + self.signature.add_assign_aggregate(&other.signature); + } + + pub fn aggregate_with_disjoint_committees(&mut self, other: &Self) { + // TODO(electra): remove asserts or use Result + assert!(self + .committee_bits + .intersection(&other.committee_bits) + .is_zero(),); + // The attestation being aggregated in must only have 1 committee bit set. + assert_eq!(other.committee_bits.num_set_bits(), 1); + // Check we are aggregating in increasing committee index order (so we can append + // aggregation bits). + assert!(self.committee_bits.highest_set_bit() < other.committee_bits.highest_set_bit()); + + self.committee_bits = self.committee_bits.union(&other.committee_bits); + self.aggregation_bits = + bitlist_extend(&self.aggregation_bits, &other.aggregation_bits).unwrap(); self.attesting_indices = self .attesting_indices .drain(..) .merge(other.attesting_indices.iter().copied()) .dedup() .collect(); - self.aggregation_bits = self.aggregation_bits.union(&other.aggregation_bits); self.signature.add_assign_aggregate(&other.signature); } + + pub fn committee_index(&self) -> u64 { + *self.get_committee_indices().first().unwrap_or(&0u64) + } + + pub fn get_committee_indices(&self) -> Vec { + self.committee_bits + .iter() + .enumerate() + .filter_map(|(index, bit)| if bit { Some(index as u64) } else { None }) + .collect() + } +} + +// TODO(electra): upstream this or a more efficient implementation +fn bitlist_extend(list1: &BitList, list2: &BitList) -> Option> { + let new_length = list1.len() + list2.len(); + let mut list = BitList::::with_capacity(new_length).ok()?; + + // Copy bits from list1. + for (i, bit) in list1.iter().enumerate() { + list.set(i, bit).ok()?; + } + + // Copy bits from list2, starting from the end of list1. + let offset = list1.len(); + for (i, bit) in list2.iter().enumerate() { + list.set(offset + i, bit).ok()?; + } + + Some(list) } impl AttestationMap { @@ -239,34 +295,106 @@ impl AttestationMap { let attestation_map = self.checkpoint_map.entry(checkpoint).or_default(); let attestations = attestation_map.attestations.entry(data).or_default(); - // TODO(electra): // Greedily aggregate the attestation with all existing attestations. // NOTE: this is sub-optimal and in future we will remove this in favour of max-clique // aggregation. let mut aggregated = false; - match attestation { - Attestation::Base(_) => { - for existing_attestation in attestations.iter_mut() { - if existing_attestation.signers_disjoint_from(&indexed) { - existing_attestation.aggregate(&indexed); - aggregated = true; - } else if *existing_attestation == indexed { - aggregated = true; - } - } + for existing_attestation in attestations.iter_mut() { + if existing_attestation.should_aggregate(&indexed) { + existing_attestation.aggregate(&indexed); + aggregated = true; + } else if *existing_attestation == indexed { + aggregated = true; } - // TODO(electra) in order to be devnet ready, we can skip - // aggregating here for now. this will result in "poorly" - // constructed blocks, but that should be fine for devnet - Attestation::Electra(_) => (), - }; + } if !aggregated { attestations.push(indexed); } } + /// Aggregate Electra attestations for the same attestation data signed by different + /// committees. + /// + /// Non-Electra attestations are left as-is. + pub fn aggregate_across_committees(&mut self, checkpoint_key: CheckpointKey) { + let Some(attestation_map) = self.checkpoint_map.get_mut(&checkpoint_key) else { + return; + }; + for compact_indexed_attestations in attestation_map.attestations.values_mut() { + let unaggregated_attestations = std::mem::take(compact_indexed_attestations); + let mut aggregated_attestations: Vec> = vec![]; + + // Aggregate the best attestations for each committee and leave the rest. + let mut best_attestations_by_committee: BTreeMap< + u64, + CompactIndexedAttestationElectra, + > = BTreeMap::new(); + + for committee_attestation in unaggregated_attestations { + let mut electra_attestation = match committee_attestation { + CompactIndexedAttestation::Electra(att) + if att.committee_bits.num_set_bits() == 1 => + { + att + } + CompactIndexedAttestation::Electra(att) => { + // Aggregate already covers multiple committees, leave it as-is. + aggregated_attestations.push(CompactIndexedAttestation::Electra(att)); + continue; + } + CompactIndexedAttestation::Base(att) => { + // Leave as-is. + aggregated_attestations.push(CompactIndexedAttestation::Base(att)); + continue; + } + }; + let committee_index = electra_attestation.committee_index(); + if let Some(existing_attestation) = + best_attestations_by_committee.get_mut(&committee_index) + { + // Search for the best (most aggregation bits) attestation for this committee + // index. + if electra_attestation.aggregation_bits.num_set_bits() + > existing_attestation.aggregation_bits.num_set_bits() + { + // New attestation is better than the previously known one for this + // committee. Replace it. + std::mem::swap(existing_attestation, &mut electra_attestation); + } + // Put the inferior attestation into the list of aggregated attestations + // without performing any cross-committee aggregation. + aggregated_attestations + .push(CompactIndexedAttestation::Electra(electra_attestation)); + } else { + // First attestation seen for this committee. Place it in the map + // provisionally. + best_attestations_by_committee.insert(committee_index, electra_attestation); + } + } + + if let Some(on_chain_aggregate) = + Self::compute_on_chain_aggregate(best_attestations_by_committee) + { + aggregated_attestations + .push(CompactIndexedAttestation::Electra(on_chain_aggregate)); + } + + *compact_indexed_attestations = aggregated_attestations; + } + } + + pub fn compute_on_chain_aggregate( + mut attestations_by_committee: BTreeMap>, + ) -> Option> { + let (_, mut on_chain_aggregate) = attestations_by_committee.pop_first()?; + for (_, attestation) in attestations_by_committee { + on_chain_aggregate.aggregate_with_disjoint_committees(&attestation); + } + Some(on_chain_aggregate) + } + /// Iterate all attestations matching the given `checkpoint_key`. pub fn get_attestations<'a>( &'a self, diff --git a/beacon_node/operation_pool/src/lib.rs b/beacon_node/operation_pool/src/lib.rs index 6645416d4b0..daddbf76652 100644 --- a/beacon_node/operation_pool/src/lib.rs +++ b/beacon_node/operation_pool/src/lib.rs @@ -40,7 +40,7 @@ use std::ptr; use types::{ sync_aggregate::Error as SyncAggregateError, typenum::Unsigned, AbstractExecPayload, Attestation, AttestationData, AttesterSlashing, BeaconState, BeaconStateError, ChainSpec, - Epoch, EthSpec, ProposerSlashing, SignedBeaconBlock, SignedBlsToExecutionChange, + Epoch, EthSpec, ForkName, ProposerSlashing, SignedBeaconBlock, SignedBlsToExecutionChange, SignedVoluntaryExit, Slot, SyncAggregate, SyncCommitteeContribution, Validator, }; @@ -256,6 +256,7 @@ impl OperationPool { curr_epoch_validity_filter: impl for<'a> FnMut(&AttestationRef<'a, E>) -> bool + Send, spec: &ChainSpec, ) -> Result>, OpPoolError> { + let fork_name = state.fork_name_unchecked(); if !matches!(state, BeaconState::Base(_)) { // Epoch cache must be initialized to fetch base reward values in the max cover `score` // function. Currently max cover ignores items on errors. If epoch cache is not @@ -267,7 +268,6 @@ impl OperationPool { // Attestations for the current fork, which may be from the current or previous epoch. let (prev_epoch_key, curr_epoch_key) = CheckpointKey::keys_for_state(state); - let all_attestations = self.attestations.read(); let total_active_balance = state .get_total_active_balance() .map_err(OpPoolError::GetAttestationsTotalBalanceError)?; @@ -284,6 +284,16 @@ impl OperationPool { let mut num_prev_valid = 0_i64; let mut num_curr_valid = 0_i64; + // TODO(electra): Work out how to do this more elegantly. This is a bit of a hack. + let mut all_attestations = self.attestations.write(); + + if fork_name >= ForkName::Electra { + all_attestations.aggregate_across_committees(prev_epoch_key); + all_attestations.aggregate_across_committees(curr_epoch_key); + } + + let all_attestations = parking_lot::RwLockWriteGuard::downgrade(all_attestations); + let prev_epoch_att = self .get_valid_attestations_for_epoch( &prev_epoch_key, @@ -307,6 +317,11 @@ impl OperationPool { ) .inspect(|_| num_curr_valid += 1); + let curr_epoch_limit = if fork_name < ForkName::Electra { + E::MaxAttestations::to_usize() + } else { + E::MaxAttestationsElectra::to_usize() + }; let prev_epoch_limit = if let BeaconState::Base(base_state) = state { std::cmp::min( E::MaxPendingAttestations::to_usize() @@ -314,7 +329,7 @@ impl OperationPool { E::MaxAttestations::to_usize(), ) } else { - E::MaxAttestations::to_usize() + curr_epoch_limit }; let (prev_cover, curr_cover) = rayon::join( @@ -329,11 +344,7 @@ impl OperationPool { }, move || { let _timer = metrics::start_timer(&metrics::ATTESTATION_CURR_EPOCH_PACKING_TIME); - maximum_cover( - curr_epoch_att, - E::MaxAttestations::to_usize(), - "curr_epoch_attestations", - ) + maximum_cover(curr_epoch_att, curr_epoch_limit, "curr_epoch_attestations") }, ); @@ -343,7 +354,7 @@ impl OperationPool { Ok(max_cover::merge_solutions( curr_cover, prev_cover, - E::MaxAttestations::to_usize(), + curr_epoch_limit, )) } @@ -1237,7 +1248,17 @@ mod release_tests { let num_big = target_committee_size / big_step_size; let stats = op_pool.attestation_stats(); - assert_eq!(stats.num_attestation_data, committees.len()); + let fork_name = state.fork_name_unchecked(); + + match fork_name { + ForkName::Electra => { + assert_eq!(stats.num_attestation_data, 1); + } + _ => { + assert_eq!(stats.num_attestation_data, committees.len()); + } + }; + assert_eq!( stats.num_attestations, (num_small + num_big) * committees.len() @@ -1248,11 +1269,27 @@ mod release_tests { let best_attestations = op_pool .get_attestations(&state, |_| true, |_| true, spec) .expect("should have best attestations"); - assert_eq!(best_attestations.len(), max_attestations); + match fork_name { + ForkName::Electra => { + assert_eq!(best_attestations.len(), 8); + } + _ => { + assert_eq!(best_attestations.len(), max_attestations); + } + }; // All the best attestations should be signed by at least `big_step_size` (4) validators. for att in &best_attestations { - assert!(att.num_set_aggregation_bits() >= big_step_size); + match fork_name { + ForkName::Electra => { + // TODO(electra) some attestations only have 2 or 3 agg bits set + // others have 5 + assert!(att.num_set_aggregation_bits() >= 2); + } + _ => { + assert!(att.num_set_aggregation_bits() >= big_step_size); + } + }; } } @@ -1331,11 +1368,20 @@ mod release_tests { let num_small = target_committee_size / small_step_size; let num_big = target_committee_size / big_step_size; + let fork_name = state.fork_name_unchecked(); + + match fork_name { + ForkName::Electra => { + assert_eq!(op_pool.attestation_stats().num_attestation_data, 1); + } + _ => { + assert_eq!( + op_pool.attestation_stats().num_attestation_data, + committees.len() + ); + } + }; - assert_eq!( - op_pool.attestation_stats().num_attestation_data, - committees.len() - ); assert_eq!( op_pool.num_attestations(), (num_small + num_big) * committees.len() @@ -1346,7 +1392,15 @@ mod release_tests { let best_attestations = op_pool .get_attestations(&state, |_| true, |_| true, spec) .expect("should have valid best attestations"); - assert_eq!(best_attestations.len(), max_attestations); + + match fork_name { + ForkName::Electra => { + assert_eq!(best_attestations.len(), 8); + } + _ => { + assert_eq!(best_attestations.len(), max_attestations); + } + }; let total_active_balance = state.get_total_active_balance().unwrap(); diff --git a/consensus/types/src/attestation.rs b/consensus/types/src/attestation.rs index 9dd4f1c9241..8c8a81b90f2 100644 --- a/consensus/types/src/attestation.rs +++ b/consensus/types/src/attestation.rs @@ -248,6 +248,9 @@ impl<'a, E: EthSpec> AttestationRef<'a, E> { impl AttestationElectra { /// Are the aggregation bitfields of these attestations disjoint? + // TODO(electra): check whether the definition from CompactIndexedAttestation::should_aggregate + // is useful where this is used, i.e. only consider attestations disjoint when their committees + // match AND their aggregation bits do not intersect. pub fn signers_disjoint_from(&self, other: &Self) -> bool { self.aggregation_bits .intersection(&other.aggregation_bits)