From b1657a60e9eb7ae8083b774b46c6df986fa6d220 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Thu, 17 Jun 2021 02:10:46 +0000 Subject: [PATCH] Reorg events (#2090) ## Issue Addressed Resolves #2088 ## Proposed Changes Add the `chain_reorg` SSE event topic ## Additional Info Co-authored-by: realbigsean Co-authored-by: Paul Hauner --- beacon_node/beacon_chain/src/beacon_chain.rs | 109 ++++++++++++- beacon_node/beacon_chain/src/errors.rs | 4 + beacon_node/beacon_chain/src/events.rs | 15 ++ beacon_node/beacon_chain/tests/tests.rs | 67 +++++++- beacon_node/http_api/src/lib.rs | 3 + beacon_node/http_api/tests/tests.rs | 41 +++++ common/eth2/src/types.rs | 20 +++ .../src/per_slot_processing.rs | 2 +- consensus/types/src/beacon_state.rs | 9 ++ consensus/types/src/beacon_state/iter.rs | 151 ++++++++++++++++++ 10 files changed, 412 insertions(+), 9 deletions(-) create mode 100644 consensus/types/src/beacon_state/iter.rs diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index fb726c16189..42ea3652e51 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -32,7 +32,7 @@ use crate::validator_pubkey_cache::ValidatorPubkeyCache; use crate::BeaconForkChoiceStore; use crate::BeaconSnapshot; use crate::{metrics, BeaconChainError}; -use eth2::types::{EventKind, SseBlock, SseFinalizedCheckpoint, SseHead}; +use eth2::types::{EventKind, SseBlock, SseChainReorg, SseFinalizedCheckpoint, SseHead}; use fork_choice::ForkChoice; use futures::channel::mpsc::Sender; use itertools::process_results; @@ -455,6 +455,77 @@ impl BeaconChain { .map(|result| result.map_err(|e| e.into()))) } + /// Iterate through the current chain to find the slot intersecting with the given beacon state. + /// The maximum depth this will search is `SLOTS_PER_HISTORICAL_ROOT`, and if that depth is reached + /// and no intersection is found, the finalized slot will be returned. + pub fn find_reorg_slot( + &self, + new_state: &BeaconState, + new_block_root: Hash256, + ) -> Result { + self.with_head(|snapshot| { + let old_state = &snapshot.beacon_state; + let old_block_root = snapshot.beacon_block_root; + + // The earliest slot for which the two chains may have a common history. + let lowest_slot = std::cmp::min(new_state.slot, old_state.slot); + + // Create an iterator across `$state`, assuming that the block at `$state.slot` has the + // block root of `$block_root`. + // + // The iterator will be skipped until the next value returns `lowest_slot`. + // + // This is a macro instead of a function or closure due to the complex types invloved + // in all the iterator wrapping. + macro_rules! aligned_roots_iter { + ($state: ident, $block_root: ident) => { + std::iter::once(Ok(($state.slot, $block_root))) + .chain($state.rev_iter_block_roots(&self.spec)) + .skip_while(|result| { + result + .as_ref() + .map_or(false, |(slot, _)| *slot > lowest_slot) + }) + }; + } + + // Create iterators across old/new roots where iterators both start at the same slot. + let mut new_roots = aligned_roots_iter!(new_state, new_block_root); + let mut old_roots = aligned_roots_iter!(old_state, old_block_root); + + // Whilst *both* of the iterators are still returning values, try and find a common + // ancestor between them. + while let (Some(old), Some(new)) = (old_roots.next(), new_roots.next()) { + let (old_slot, old_root) = old?; + let (new_slot, new_root) = new?; + + // Sanity check to detect programming errors. + if old_slot != new_slot { + return Err(Error::InvalidReorgSlotIter { new_slot, old_slot }); + } + + if old_root == new_root { + // A common ancestor has been found. + return Ok(old_slot); + } + } + + // If no common ancestor is found, declare that the re-org happened at the previous + // finalized slot. + // + // Sometimes this will result in the return slot being *lower* than the actual reorg + // slot. However, assuming we don't re-org through a finalized slot, it will never be + // *higher*. + // + // We provide this potentially-inaccurate-but-safe information to avoid onerous + // database reads during times of deep reorgs. + Ok(old_state + .finalized_checkpoint + .epoch + .start_slot(T::EthSpec::slots_per_epoch())) + }) + } + /// Iterates across all `(state_root, slot)` pairs from the head of the chain (inclusive) to /// the earliest reachable ancestor (may or may not be genesis). /// @@ -2270,14 +2341,25 @@ impl BeaconChain { // Note: this will declare a re-org if we skip `SLOTS_PER_HISTORICAL_ROOT` blocks // between calls to fork choice without swapping between chains. This seems like an // extreme-enough scenario that a warning is fine. - let is_reorg = current_head.block_root - != new_head - .beacon_state - .get_block_root(current_head.slot) - .map(|root| *root) - .unwrap_or_else(|_| Hash256::random()); + let is_reorg = new_head + .beacon_state + .get_block_root(current_head.slot) + .map_or(true, |root| *root != current_head.block_root); + + let mut reorg_distance = Slot::new(0); if is_reorg { + match self.find_reorg_slot(&new_head.beacon_state, new_head.beacon_block_root) { + Ok(slot) => reorg_distance = current_head.slot.saturating_sub(slot), + Err(e) => { + warn!( + self.log, + "Could not find re-org depth"; + "error" => format!("{:?}", e), + ); + } + } + metrics::inc_counter(&metrics::FORK_CHOICE_REORG_COUNT); warn!( self.log, @@ -2287,6 +2369,7 @@ impl BeaconChain { "new_head_parent" => %new_head.beacon_block.parent_root(), "new_head" => %beacon_block_root, "new_slot" => new_head.beacon_block.slot(), + "reorg_distance" => reorg_distance, ); } else { debug!( @@ -2452,6 +2535,18 @@ impl BeaconChain { ); } } + + if is_reorg && event_handler.has_reorg_subscribers() { + event_handler.register(EventKind::ChainReorg(SseChainReorg { + slot: head_slot, + depth: reorg_distance.as_u64(), + old_head_block: current_head.block_root, + old_head_state: current_head.state_root, + new_head_block: beacon_block_root, + new_head_state: state_root, + epoch: head_slot.epoch(T::EthSpec::slots_per_epoch()), + })); + } } Ok(()) diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index dabe96c8afd..d5cc5eda66f 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -117,6 +117,10 @@ pub enum BeaconChainError { request_slot: Slot, slot: Slot, }, + InvalidReorgSlotIter { + old_slot: Slot, + new_slot: Slot, + }, } easy_from_to!(SlotProcessingError, BeaconChainError); diff --git a/beacon_node/beacon_chain/src/events.rs b/beacon_node/beacon_chain/src/events.rs index b679b99776d..b71236bc591 100644 --- a/beacon_node/beacon_chain/src/events.rs +++ b/beacon_node/beacon_chain/src/events.rs @@ -12,6 +12,7 @@ pub struct ServerSentEventHandler { finalized_tx: Sender>, head_tx: Sender>, exit_tx: Sender>, + chain_reorg: Sender>, log: Logger, } @@ -22,6 +23,7 @@ impl ServerSentEventHandler { let (finalized_tx, _) = broadcast::channel(DEFAULT_CHANNEL_CAPACITY); let (head_tx, _) = broadcast::channel(DEFAULT_CHANNEL_CAPACITY); let (exit_tx, _) = broadcast::channel(DEFAULT_CHANNEL_CAPACITY); + let (chain_reorg, _) = broadcast::channel(DEFAULT_CHANNEL_CAPACITY); Self { attestation_tx, @@ -29,6 +31,7 @@ impl ServerSentEventHandler { finalized_tx, head_tx, exit_tx, + chain_reorg, log, } } @@ -39,6 +42,7 @@ impl ServerSentEventHandler { let (finalized_tx, _) = broadcast::channel(capacity); let (head_tx, _) = broadcast::channel(capacity); let (exit_tx, _) = broadcast::channel(capacity); + let (chain_reorg, _) = broadcast::channel(capacity); Self { attestation_tx, @@ -46,6 +50,7 @@ impl ServerSentEventHandler { finalized_tx, head_tx, exit_tx, + chain_reorg, log, } } @@ -65,6 +70,8 @@ impl ServerSentEventHandler { .map(|count| trace!(self.log, "Registering server-sent head event"; "receiver_count" => count)), EventKind::VoluntaryExit(exit) => self.exit_tx.send(EventKind::VoluntaryExit(exit)) .map(|count| trace!(self.log, "Registering server-sent voluntary exit event"; "receiver_count" => count)), + EventKind::ChainReorg(reorg) => self.chain_reorg.send(EventKind::ChainReorg(reorg)) + .map(|count| trace!(self.log, "Registering server-sent chain reorg event"; "receiver_count" => count)), }; if let Err(SendError(event)) = result { trace!(self.log, "No receivers registered to listen for event"; "event" => ?event); @@ -91,6 +98,10 @@ impl ServerSentEventHandler { self.exit_tx.subscribe() } + pub fn subscribe_reorgs(&self) -> Receiver> { + self.chain_reorg.subscribe() + } + pub fn has_attestation_subscribers(&self) -> bool { self.attestation_tx.receiver_count() > 0 } @@ -110,4 +121,8 @@ impl ServerSentEventHandler { pub fn has_exit_subscribers(&self) -> bool { self.exit_tx.receiver_count() > 0 } + + pub fn has_reorg_subscribers(&self) -> bool { + self.chain_reorg.receiver_count() > 0 + } } diff --git a/beacon_node/beacon_chain/tests/tests.rs b/beacon_node/beacon_chain/tests/tests.rs index daa306659de..a8166300391 100644 --- a/beacon_node/beacon_chain/tests/tests.rs +++ b/beacon_node/beacon_chain/tests/tests.rs @@ -9,7 +9,7 @@ use beacon_chain::{ AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType, OP_POOL_DB_KEY, }, - WhenSlotSkipped, + StateSkipConfig, WhenSlotSkipped, }; use operation_pool::PersistedOperationPool; use state_processing::{ @@ -139,6 +139,71 @@ fn iterators() { ); } +#[test] +fn find_reorgs() { + let num_blocks_produced = MinimalEthSpec::slots_per_historical_root() + 1; + + let harness = get_harness(VALIDATOR_COUNT); + + harness.extend_chain( + num_blocks_produced as usize, + BlockStrategy::OnCanonicalHead, + // No need to produce attestations for this test. + AttestationStrategy::SomeValidators(vec![]), + ); + + let head_state = harness.chain.head_beacon_state().unwrap(); + let head_slot = head_state.slot; + let genesis_state = harness + .chain + .state_at_slot(Slot::new(0), StateSkipConfig::WithStateRoots) + .unwrap(); + + // because genesis is more than `SLOTS_PER_HISTORICAL_ROOT` away, this should return with the + // finalized slot. + assert_eq!( + harness + .chain + .find_reorg_slot(&genesis_state, harness.chain.genesis_block_root) + .unwrap(), + head_state + .finalized_checkpoint + .epoch + .start_slot(MinimalEthSpec::slots_per_epoch()) + ); + + // test head + assert_eq!( + harness + .chain + .find_reorg_slot( + &head_state, + harness.chain.head_beacon_block().unwrap().canonical_root() + ) + .unwrap(), + head_slot + ); + + // Re-org back to the slot prior to the head. + let prev_slot = head_slot - Slot::new(1); + let prev_state = harness + .chain + .state_at_slot(prev_slot, StateSkipConfig::WithStateRoots) + .unwrap(); + let prev_block_root = harness + .chain + .block_root_at_slot(prev_slot, WhenSlotSkipped::None) + .unwrap() + .unwrap(); + assert_eq!( + harness + .chain + .find_reorg_slot(&prev_state, prev_block_root) + .unwrap(), + prev_slot + ); +} + #[test] fn chooses_fork() { let harness = get_harness(VALIDATOR_COUNT); diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 30b8c8d90bd..74789496cd4 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -2153,6 +2153,9 @@ pub fn serve( api_types::EventTopic::FinalizedCheckpoint => { event_handler.subscribe_finalized() } + api_types::EventTopic::ChainReorg => { + event_handler.subscribe_reorgs() + } }; receivers.push(BroadcastStream::new(receiver).map(|msg| { diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 515a337d73a..22c4a88a976 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -60,6 +60,7 @@ struct ApiTester { chain: Arc>>, client: BeaconNodeHttpClient, next_block: SignedBeaconBlock, + reorg_block: SignedBeaconBlock, attestations: Vec>, attester_slashing: AttesterSlashing, proposer_slashing: ProposerSlashing, @@ -105,6 +106,10 @@ impl ApiTester { let (next_block, _next_state) = harness.make_block(head.beacon_state.clone(), harness.chain.slot().unwrap()); + // `make_block` adds random graffiti, so this will produce an alternate block + let (reorg_block, _reorg_state) = + harness.make_block(head.beacon_state.clone(), harness.chain.slot().unwrap()); + let head_state_root = head.beacon_state_root(); let attestations = harness .get_unaggregated_attestations( @@ -213,6 +218,7 @@ impl ApiTester { chain, client, next_block, + reorg_block, attestations, attester_slashing, proposer_slashing, @@ -238,6 +244,10 @@ impl ApiTester { let (next_block, _next_state) = harness.make_block(head.beacon_state.clone(), harness.chain.slot().unwrap()); + // `make_block` adds random graffiti, so this will produce an alternate block + let (reorg_block, _reorg_state) = + harness.make_block(head.beacon_state.clone(), harness.chain.slot().unwrap()); + let head_state_root = head.beacon_state_root(); let attestations = harness .get_unaggregated_attestations( @@ -320,6 +330,7 @@ impl ApiTester { chain, client, next_block, + reorg_block, attestations, attester_slashing, proposer_slashing, @@ -2233,6 +2244,36 @@ impl ApiTester { &[expected_block, expected_finalized, expected_head] ); + // Test a reorg event + let mut chain_reorg_event_future = self + .client + .get_events::(&[EventTopic::ChainReorg]) + .await + .unwrap(); + + let expected_reorg = EventKind::ChainReorg(SseChainReorg { + slot: self.next_block.slot(), + depth: 1, + old_head_block: self.next_block.canonical_root(), + old_head_state: self.next_block.state_root(), + new_head_block: self.reorg_block.canonical_root(), + new_head_state: self.reorg_block.state_root(), + epoch: self.next_block.slot().epoch(E::slots_per_epoch()), + }); + + self.client + .post_beacon_blocks(&self.reorg_block) + .await + .unwrap(); + + let reorg_event = poll_events( + &mut chain_reorg_event_future, + 1, + Duration::from_millis(10000), + ) + .await; + assert_eq!(reorg_event.as_slice(), &[expected_reorg]); + self } diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index a00d1e76995..d31407645d7 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -689,6 +689,18 @@ pub struct SseHead { pub epoch_transition: bool, } +#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)] +pub struct SseChainReorg { + pub slot: Slot, + #[serde(with = "serde_utils::quoted_u64")] + pub depth: u64, + pub old_head_block: Hash256, + pub old_head_state: Hash256, + pub new_head_block: Hash256, + pub new_head_state: Hash256, + pub epoch: Epoch, +} + #[derive(PartialEq, Debug, Serialize, Clone)] #[serde(bound = "T: EthSpec", untagged)] pub enum EventKind { @@ -697,6 +709,7 @@ pub enum EventKind { FinalizedCheckpoint(SseFinalizedCheckpoint), Head(SseHead), VoluntaryExit(SignedVoluntaryExit), + ChainReorg(SseChainReorg), } impl EventKind { @@ -707,6 +720,7 @@ impl EventKind { EventKind::Attestation(_) => "attestation", EventKind::VoluntaryExit(_) => "voluntary_exit", EventKind::FinalizedCheckpoint(_) => "finalized_checkpoint", + EventKind::ChainReorg(_) => "chain_reorg", } } @@ -735,6 +749,9 @@ impl EventKind { "block" => Ok(EventKind::Block(serde_json::from_str(data).map_err( |e| ServerError::InvalidServerSentEvent(format!("Block: {:?}", e)), )?)), + "chain_reorg" => Ok(EventKind::ChainReorg(serde_json::from_str(data).map_err( + |e| ServerError::InvalidServerSentEvent(format!("Chain Reorg: {:?}", e)), + )?)), "finalized_checkpoint" => Ok(EventKind::FinalizedCheckpoint( serde_json::from_str(data).map_err(|e| { ServerError::InvalidServerSentEvent(format!("Finalized Checkpoint: {:?}", e)) @@ -768,6 +785,7 @@ pub enum EventTopic { Attestation, VoluntaryExit, FinalizedCheckpoint, + ChainReorg, } impl FromStr for EventTopic { @@ -780,6 +798,7 @@ impl FromStr for EventTopic { "attestation" => Ok(EventTopic::Attestation), "voluntary_exit" => Ok(EventTopic::VoluntaryExit), "finalized_checkpoint" => Ok(EventTopic::FinalizedCheckpoint), + "chain_reorg" => Ok(EventTopic::ChainReorg), _ => Err("event topic cannot be parsed.".to_string()), } } @@ -793,6 +812,7 @@ impl fmt::Display for EventTopic { EventTopic::Attestation => write!(f, "attestation"), EventTopic::VoluntaryExit => write!(f, "voluntary_exit"), EventTopic::FinalizedCheckpoint => write!(f, "finalized_checkpoint"), + EventTopic::ChainReorg => write!(f, "chain_reorg"), } } } diff --git a/consensus/state_processing/src/per_slot_processing.rs b/consensus/state_processing/src/per_slot_processing.rs index a818bde52bf..82cf5abfe19 100644 --- a/consensus/state_processing/src/per_slot_processing.rs +++ b/consensus/state_processing/src/per_slot_processing.rs @@ -55,7 +55,7 @@ fn cache_state( // Note: increment the state slot here to allow use of our `state_root` and `block_root` // getter/setter functions. // - // This is a bit hacky, however it gets the job safely without lots of code. + // This is a bit hacky, however it gets the job done safely without lots of code. let previous_slot = state.slot; state.slot.safe_add_assign(1)?; diff --git a/consensus/types/src/beacon_state.rs b/consensus/types/src/beacon_state.rs index 6f1b59552b3..fd1dba881b9 100644 --- a/consensus/types/src/beacon_state.rs +++ b/consensus/types/src/beacon_state.rs @@ -23,12 +23,14 @@ use tree_hash_derive::TreeHash; pub use self::committee_cache::CommitteeCache; pub use clone_config::CloneConfig; pub use eth_spec::*; +pub use iter::BlockRootsIter; pub use tree_hash_cache::BeaconTreeHashCache; #[macro_use] mod committee_cache; mod clone_config; mod exit_cache; +mod iter; mod pubkey_cache; mod tests; mod tree_hash_cache; @@ -640,6 +642,13 @@ impl BeaconState { } } + /// Returns an iterator across the past block roots of `state` in descending slot-order. + /// + /// See the docs for `BlockRootsIter` for more detail. + pub fn rev_iter_block_roots<'a>(&'a self, spec: &ChainSpec) -> BlockRootsIter<'a, T> { + BlockRootsIter::new(self, spec.genesis_slot) + } + /// Return the block root at a recent `slot`. /// /// Spec v0.12.1 diff --git a/consensus/types/src/beacon_state/iter.rs b/consensus/types/src/beacon_state/iter.rs new file mode 100644 index 00000000000..5f858dd8af8 --- /dev/null +++ b/consensus/types/src/beacon_state/iter.rs @@ -0,0 +1,151 @@ +use crate::*; + +/// Returns an iterator across the past block roots of `state` in descending slot-order. +/// +/// The iterator has the following characteristics: +/// +/// - Will only return *at most* `state.block_roots.len()` entries. +/// - Will not return slots prior to the genesis_slot. +/// - Each call to next will result in a slot one less than the prior one (or `None`). +/// - Skipped slots will contain the block root from the prior non-skipped slot. +pub struct BlockRootsIter<'a, T: EthSpec> { + state: &'a BeaconState, + genesis_slot: Slot, + prev: Slot, +} + +impl<'a, T: EthSpec> BlockRootsIter<'a, T> { + /// Instantiates a new iterator, returning roots for slots earlier that `state.slot`. + /// + /// See the struct-level documentation for more details. + pub fn new(state: &'a BeaconState, genesis_slot: Slot) -> Self { + Self { + state, + genesis_slot, + prev: state.slot, + } + } +} + +impl<'a, T: EthSpec> Iterator for BlockRootsIter<'a, T> { + type Item = Result<(Slot, Hash256), Error>; + + fn next(&mut self) -> Option { + if self.prev > self.genesis_slot + && self.prev + > self + .state + .slot + .saturating_sub(self.state.block_roots.len() as u64) + { + self.prev = self.prev.saturating_sub(1_u64); + Some( + self.state + .get_block_root(self.prev) + .map(|root| (self.prev, *root)), + ) + } else { + None + } + } +} + +#[cfg(test)] +mod test { + use crate::*; + + type E = MinimalEthSpec; + + fn root_slot(i: usize) -> (Slot, Hash256) { + (Slot::from(i), Hash256::from_low_u64_be(i as u64)) + } + + fn all_roots(state: &BeaconState, spec: &ChainSpec) -> Vec<(Slot, Hash256)> { + state + .rev_iter_block_roots(spec) + .collect::>() + .unwrap() + } + + #[test] + fn block_roots_iter() { + let spec = E::default_spec(); + + let mut state: BeaconState = BeaconState::new(0, <_>::default(), &spec); + + for i in 0..state.block_roots.len() { + state.block_roots[i] = root_slot(i).1; + } + + assert_eq!( + state.slot, spec.genesis_slot, + "test assume a genesis slot state" + ); + assert_eq!( + all_roots(&state, &spec), + vec![], + "state at genesis slot has no history" + ); + + state.slot = Slot::new(1); + assert_eq!( + all_roots(&state, &spec), + vec![root_slot(0)], + "first slot after genesis has one slot history" + ); + + state.slot = Slot::new(2); + assert_eq!( + all_roots(&state, &spec), + vec![root_slot(1), root_slot(0)], + "second slot after genesis has two slot history" + ); + + state.slot = Slot::from(state.block_roots.len() + 2); + let expected = (2..state.block_roots.len() + 2) + .rev() + .map(|i| (Slot::from(i), *state.get_block_root(Slot::from(i)).unwrap())) + .collect::>(); + assert_eq!( + all_roots(&state, &spec), + expected, + "slot higher than the block roots history" + ); + } + + #[test] + fn block_roots_iter_non_zero_genesis() { + let mut spec = E::default_spec(); + spec.genesis_slot = Slot::new(4); + + let mut state: BeaconState = BeaconState::new(0, <_>::default(), &spec); + + for i in 0..state.block_roots.len() { + state.block_roots[i] = root_slot(i).1; + } + + assert_eq!( + state.slot, spec.genesis_slot, + "test assume a genesis slot state" + ); + assert_eq!( + all_roots(&state, &spec), + vec![], + "state at genesis slot has no history" + ); + + state.slot = Slot::new(5); + assert_eq!( + all_roots(&state, &spec), + vec![root_slot(4)], + "first slot after genesis has one slot history" + ); + + state.slot = Slot::new(6); + assert_eq!( + all_roots(&state, &spec), + vec![root_slot(5), root_slot(4)], + "second slot after genesis has two slot history" + ); + } +}