Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Reorg events #2090

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
ec98fd4
add chain reorg sse
realbigsean Dec 14, 2020
1e97813
add re-org search limit
realbigsean Dec 16, 2020
71de4dd
add a re-org test
realbigsean Dec 18, 2020
f5781e3
Merge branch 'unstable' of https://github.com/sigp/lighthouse into re…
realbigsean Dec 18, 2020
7f1412d
add tests
realbigsean Dec 21, 2020
88b5a12
fix test
realbigsean Dec 21, 2020
90ef1a3
fix logic for if we can't find re-org depth
realbigsean Mar 1, 2021
f4c12a8
Merge branch 'unstable' of https://github.com/sigp/lighthouse into re…
realbigsean Mar 1, 2021
095c441
fix merge with unstable
realbigsean Mar 1, 2021
6a1a9b6
Merge branch 'unstable' of https://github.com/sigp/lighthouse into re…
realbigsean Mar 2, 2021
5714f23
Merge branch 'unstable' of https://github.com/sigp/lighthouse into re…
realbigsean Mar 29, 2021
5a3ad1b
Merge branch 'unstable' of https://github.com/sigp/lighthouse into re…
realbigsean Apr 23, 2021
00e1668
refactor the `find_reorgs` method to avoid state clone
realbigsean Apr 23, 2021
3634504
use `get_block_root` rather than iterating over block roots
realbigsean Apr 23, 2021
0878581
cargo fmt
realbigsean Apr 23, 2021
d70707c
Add iterator-based reorg finder
paulhauner Apr 25, 2021
d78a5c0
Merge branch 'unstable' of https://github.com/sigp/lighthouse into re…
realbigsean May 5, 2021
6fdb456
Merge branch 'unstable' of https://github.com/sigp/lighthouse into re…
realbigsean Jun 2, 2021
9ed4533
Use saturating sub for `Slot`.
realbigsean Jun 2, 2021
ad3cffa
Merge branch 'unstable' of https://github.com/sigp/lighthouse into re…
realbigsean Jun 15, 2021
e0b3e74
Fix test
realbigsean Jun 15, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 102 additions & 7 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::validator_pubkey_cache::ValidatorPubkeyCache;
use crate::BeaconForkChoiceStore;
use crate::BeaconSnapshot;
use crate::{metrics, BeaconChainError};
use eth2::types::{EventKind, SseBlock, SseFinalizedCheckpoint, SseHead};
use eth2::types::{EventKind, SseBlock, SseChainReorg, SseFinalizedCheckpoint, SseHead};
use fork_choice::ForkChoice;
use futures::channel::mpsc::Sender;
use itertools::process_results;
Expand Down Expand Up @@ -455,6 +455,77 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.map(|result| result.map_err(|e| e.into())))
}

/// Iterate through the current chain to find the slot intersecting with the given beacon state.
/// The maximum depth this will search is `SLOTS_PER_HISTORICAL_ROOT`, and if that depth is reached
/// and no intersection is found, the finalized slot will be returned.
pub fn find_reorg_slot(
paulhauner marked this conversation as resolved.
Show resolved Hide resolved
&self,
new_state: &BeaconState<T::EthSpec>,
new_block_root: Hash256,
) -> Result<Slot, Error> {
self.with_head(|snapshot| {
let old_state = &snapshot.beacon_state;
let old_block_root = snapshot.beacon_block_root;

// The earliest slot for which the two chains may have a common history.
let lowest_slot = std::cmp::min(new_state.slot, old_state.slot);

// Create an iterator across `$state`, assuming that the block at `$state.slot` has the
// block root of `$block_root`.
//
// The iterator will be skipped until the next value returns `lowest_slot`.
//
// This is a macro instead of a function or closure due to the complex types invloved
// in all the iterator wrapping.
macro_rules! aligned_roots_iter {
($state: ident, $block_root: ident) => {
std::iter::once(Ok(($state.slot, $block_root)))
.chain($state.rev_iter_block_roots(&self.spec))
.skip_while(|result| {
result
.as_ref()
.map_or(false, |(slot, _)| *slot > lowest_slot)
})
};
}

// Create iterators across old/new roots where iterators both start at the same slot.
let mut new_roots = aligned_roots_iter!(new_state, new_block_root);
let mut old_roots = aligned_roots_iter!(old_state, old_block_root);

// Whilst *both* of the iterators are still returning values, try and find a common
// ancestor between them.
while let (Some(old), Some(new)) = (old_roots.next(), new_roots.next()) {
let (old_slot, old_root) = old?;
let (new_slot, new_root) = new?;

// Sanity check to detect programming errors.
if old_slot != new_slot {
return Err(Error::InvalidReorgSlotIter { new_slot, old_slot });
}

if old_root == new_root {
// A common ancestor has been found.
return Ok(old_slot);
}
}

// If no common ancestor is found, declare that the re-org happened at the previous
// finalized slot.
//
// Sometimes this will result in the return slot being *lower* than the actual reorg
// slot. However, assuming we don't re-org through a finalized slot, it will never be
// *higher*.
//
// We provide this potentially-inaccurate-but-safe information to avoid onerous
// database reads during times of deep reorgs.
Ok(old_state
.finalized_checkpoint
.epoch
.start_slot(T::EthSpec::slots_per_epoch()))
})
}

/// Iterates across all `(state_root, slot)` pairs from the head of the chain (inclusive) to
/// the earliest reachable ancestor (may or may not be genesis).
///
Expand Down Expand Up @@ -2270,14 +2341,25 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// Note: this will declare a re-org if we skip `SLOTS_PER_HISTORICAL_ROOT` blocks
// between calls to fork choice without swapping between chains. This seems like an
// extreme-enough scenario that a warning is fine.
let is_reorg = current_head.block_root
!= new_head
.beacon_state
.get_block_root(current_head.slot)
.map(|root| *root)
.unwrap_or_else(|_| Hash256::random());
let is_reorg = new_head
.beacon_state
.get_block_root(current_head.slot)
.map_or(true, |root| *root != current_head.block_root);

let mut reorg_distance = Slot::new(0);

if is_reorg {
match self.find_reorg_slot(&new_head.beacon_state, new_head.beacon_block_root) {
Ok(slot) => reorg_distance = current_head.slot.saturating_sub(slot),
Err(e) => {
warn!(
self.log,
"Could not find re-org depth";
"error" => format!("{:?}", e),
);
}
}

metrics::inc_counter(&metrics::FORK_CHOICE_REORG_COUNT);
warn!(
self.log,
Expand All @@ -2287,6 +2369,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
"new_head_parent" => %new_head.beacon_block.parent_root(),
"new_head" => %beacon_block_root,
"new_slot" => new_head.beacon_block.slot(),
"reorg_distance" => reorg_distance,
);
} else {
debug!(
Expand Down Expand Up @@ -2452,6 +2535,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
);
}
}

if is_reorg && event_handler.has_reorg_subscribers() {
event_handler.register(EventKind::ChainReorg(SseChainReorg {
slot: head_slot,
depth: reorg_distance.as_u64(),
old_head_block: current_head.block_root,
old_head_state: current_head.state_root,
new_head_block: beacon_block_root,
new_head_state: state_root,
epoch: head_slot.epoch(T::EthSpec::slots_per_epoch()),
}));
}
}

Ok(())
Expand Down
4 changes: 4 additions & 0 deletions beacon_node/beacon_chain/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ pub enum BeaconChainError {
request_slot: Slot,
slot: Slot,
},
InvalidReorgSlotIter {
old_slot: Slot,
new_slot: Slot,
},
}

easy_from_to!(SlotProcessingError, BeaconChainError);
Expand Down
15 changes: 15 additions & 0 deletions beacon_node/beacon_chain/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub struct ServerSentEventHandler<T: EthSpec> {
finalized_tx: Sender<EventKind<T>>,
head_tx: Sender<EventKind<T>>,
exit_tx: Sender<EventKind<T>>,
chain_reorg: Sender<EventKind<T>>,
log: Logger,
}

Expand All @@ -22,13 +23,15 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
let (finalized_tx, _) = broadcast::channel(DEFAULT_CHANNEL_CAPACITY);
let (head_tx, _) = broadcast::channel(DEFAULT_CHANNEL_CAPACITY);
let (exit_tx, _) = broadcast::channel(DEFAULT_CHANNEL_CAPACITY);
let (chain_reorg, _) = broadcast::channel(DEFAULT_CHANNEL_CAPACITY);

Self {
attestation_tx,
block_tx,
finalized_tx,
head_tx,
exit_tx,
chain_reorg,
log,
}
}
Expand All @@ -39,13 +42,15 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
let (finalized_tx, _) = broadcast::channel(capacity);
let (head_tx, _) = broadcast::channel(capacity);
let (exit_tx, _) = broadcast::channel(capacity);
let (chain_reorg, _) = broadcast::channel(capacity);

Self {
attestation_tx,
block_tx,
finalized_tx,
head_tx,
exit_tx,
chain_reorg,
log,
}
}
Expand All @@ -65,6 +70,8 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
.map(|count| trace!(self.log, "Registering server-sent head event"; "receiver_count" => count)),
EventKind::VoluntaryExit(exit) => self.exit_tx.send(EventKind::VoluntaryExit(exit))
.map(|count| trace!(self.log, "Registering server-sent voluntary exit event"; "receiver_count" => count)),
EventKind::ChainReorg(reorg) => self.chain_reorg.send(EventKind::ChainReorg(reorg))
.map(|count| trace!(self.log, "Registering server-sent chain reorg event"; "receiver_count" => count)),
};
if let Err(SendError(event)) = result {
trace!(self.log, "No receivers registered to listen for event"; "event" => ?event);
Expand All @@ -91,6 +98,10 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
self.exit_tx.subscribe()
}

pub fn subscribe_reorgs(&self) -> Receiver<EventKind<T>> {
self.chain_reorg.subscribe()
}

pub fn has_attestation_subscribers(&self) -> bool {
self.attestation_tx.receiver_count() > 0
}
Expand All @@ -110,4 +121,8 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
pub fn has_exit_subscribers(&self) -> bool {
self.exit_tx.receiver_count() > 0
}

pub fn has_reorg_subscribers(&self) -> bool {
self.chain_reorg.receiver_count() > 0
}
}
67 changes: 66 additions & 1 deletion beacon_node/beacon_chain/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use beacon_chain::{
AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType,
OP_POOL_DB_KEY,
},
WhenSlotSkipped,
StateSkipConfig, WhenSlotSkipped,
};
use operation_pool::PersistedOperationPool;
use state_processing::{
Expand Down Expand Up @@ -139,6 +139,71 @@ fn iterators() {
);
}

#[test]
fn find_reorgs() {
let num_blocks_produced = MinimalEthSpec::slots_per_historical_root() + 1;

let harness = get_harness(VALIDATOR_COUNT);

harness.extend_chain(
num_blocks_produced as usize,
BlockStrategy::OnCanonicalHead,
// No need to produce attestations for this test.
AttestationStrategy::SomeValidators(vec![]),
);

let head_state = harness.chain.head_beacon_state().unwrap();
let head_slot = head_state.slot;
let genesis_state = harness
.chain
.state_at_slot(Slot::new(0), StateSkipConfig::WithStateRoots)
.unwrap();

// because genesis is more than `SLOTS_PER_HISTORICAL_ROOT` away, this should return with the
// finalized slot.
assert_eq!(
harness
.chain
.find_reorg_slot(&genesis_state, harness.chain.genesis_block_root)
.unwrap(),
head_state
.finalized_checkpoint
.epoch
.start_slot(MinimalEthSpec::slots_per_epoch())
);

// test head
assert_eq!(
harness
.chain
.find_reorg_slot(
&head_state,
harness.chain.head_beacon_block().unwrap().canonical_root()
)
.unwrap(),
head_slot
);

// Re-org back to the slot prior to the head.
let prev_slot = head_slot - Slot::new(1);
let prev_state = harness
.chain
.state_at_slot(prev_slot, StateSkipConfig::WithStateRoots)
.unwrap();
let prev_block_root = harness
.chain
.block_root_at_slot(prev_slot, WhenSlotSkipped::None)
.unwrap()
.unwrap();
assert_eq!(
harness
.chain
.find_reorg_slot(&prev_state, prev_block_root)
.unwrap(),
prev_slot
);
}

#[test]
fn chooses_fork() {
let harness = get_harness(VALIDATOR_COUNT);
Expand Down
3 changes: 3 additions & 0 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2153,6 +2153,9 @@ pub fn serve<T: BeaconChainTypes>(
api_types::EventTopic::FinalizedCheckpoint => {
event_handler.subscribe_finalized()
}
api_types::EventTopic::ChainReorg => {
event_handler.subscribe_reorgs()
}
};

receivers.push(BroadcastStream::new(receiver).map(|msg| {
Expand Down
41 changes: 41 additions & 0 deletions beacon_node/http_api/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ struct ApiTester {
chain: Arc<BeaconChain<EphemeralHarnessType<E>>>,
client: BeaconNodeHttpClient,
next_block: SignedBeaconBlock<E>,
reorg_block: SignedBeaconBlock<E>,
attestations: Vec<Attestation<E>>,
attester_slashing: AttesterSlashing<E>,
proposer_slashing: ProposerSlashing,
Expand Down Expand Up @@ -105,6 +106,10 @@ impl ApiTester {
let (next_block, _next_state) =
harness.make_block(head.beacon_state.clone(), harness.chain.slot().unwrap());

// `make_block` adds random graffiti, so this will produce an alternate block
let (reorg_block, _reorg_state) =
harness.make_block(head.beacon_state.clone(), harness.chain.slot().unwrap());

let head_state_root = head.beacon_state_root();
let attestations = harness
.get_unaggregated_attestations(
Expand Down Expand Up @@ -213,6 +218,7 @@ impl ApiTester {
chain,
client,
next_block,
reorg_block,
attestations,
attester_slashing,
proposer_slashing,
Expand All @@ -238,6 +244,10 @@ impl ApiTester {
let (next_block, _next_state) =
harness.make_block(head.beacon_state.clone(), harness.chain.slot().unwrap());

// `make_block` adds random graffiti, so this will produce an alternate block
let (reorg_block, _reorg_state) =
harness.make_block(head.beacon_state.clone(), harness.chain.slot().unwrap());

let head_state_root = head.beacon_state_root();
let attestations = harness
.get_unaggregated_attestations(
Expand Down Expand Up @@ -320,6 +330,7 @@ impl ApiTester {
chain,
client,
next_block,
reorg_block,
attestations,
attester_slashing,
proposer_slashing,
Expand Down Expand Up @@ -2233,6 +2244,36 @@ impl ApiTester {
&[expected_block, expected_finalized, expected_head]
);

// Test a reorg event
let mut chain_reorg_event_future = self
.client
.get_events::<E>(&[EventTopic::ChainReorg])
.await
.unwrap();

let expected_reorg = EventKind::ChainReorg(SseChainReorg {
slot: self.next_block.slot(),
depth: 1,
old_head_block: self.next_block.canonical_root(),
old_head_state: self.next_block.state_root(),
new_head_block: self.reorg_block.canonical_root(),
new_head_state: self.reorg_block.state_root(),
epoch: self.next_block.slot().epoch(E::slots_per_epoch()),
});

self.client
.post_beacon_blocks(&self.reorg_block)
.await
.unwrap();

let reorg_event = poll_events(
&mut chain_reorg_event_future,
1,
Duration::from_millis(10000),
)
.await;
assert_eq!(reorg_event.as_slice(), &[expected_reorg]);

self
}

Expand Down
Loading