Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use persistent data structures in fork choice #2059

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions beacon_node/beacon_chain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,4 @@ regex = "1.3.9"
exit-future = "0.2.0"
slasher = { path = "../../slasher" }
eth2 = { path = "../../common/eth2" }
im = "15.0.0"
14 changes: 10 additions & 4 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1639,7 +1639,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}

let mut fork_choice = self.fork_choice.write();
// Perform a cheap clone of the fork choice data structure, which is comprised of smaller
// persistent (copy-on-write) data structures.
// The clone will be enshrined as the official version if all operations succeed.
let mut fork_choice_lock = self.fork_choice.write();
let mut fork_choice = fork_choice_lock.clone();

// Do not import a block that doesn't descend from the finalized root.
let signed_block =
Expand Down Expand Up @@ -1724,9 +1728,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.store.do_atomically(ops)?;
drop(txn_lock);

// The fork choice write-lock is dropped *after* the on-disk database has been updated.
// This prevents inconsistency between the two at the expense of concurrency.
drop(fork_choice);
// Update fork choice atomically after the database write has succeeded. This prevents
// fork choice becoming inconsistent with the on-disk database in the event of a
// write failure, or a concurrent mutation.
*fork_choice_lock = fork_choice;
drop(fork_choice_lock);

let parent_root = block.parent_root;
let slot = block.slot;
Expand Down
27 changes: 7 additions & 20 deletions beacon_node/beacon_chain/src/beacon_fork_choice_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
//! reads when fork choice requires the validator balances of the justified state.

use crate::{metrics, BeaconSnapshot};
use derivative::Derivative;
use fork_choice::ForkChoiceStore;
use im::Vector;
use ssz_derive::{Decode, Encode};
use std::marker::PhantomData;
use std::sync::Arc;
Expand Down Expand Up @@ -72,7 +74,7 @@ struct CacheItem {
/// It is effectively a mapping of `epoch_boundary_block_root -> state.balances`.
#[derive(PartialEq, Clone, Default, Debug, Encode, Decode)]
struct BalancesCache {
items: Vec<CacheItem>,
items: Vector<CacheItem>,
}

impl BalancesCache {
Expand Down Expand Up @@ -109,7 +111,7 @@ impl BalancesCache {
self.items.remove(0);
}

self.items.push(item);
self.items.push_back(item);
}

Ok(())
Expand Down Expand Up @@ -157,8 +159,10 @@ impl BalancesCache {

/// Implements `fork_choice::ForkChoiceStore` in order to provide a persistent backing to the
/// `fork_choice::ForkChoice` struct.
#[derive(Debug)]
#[derive(Debug, Derivative)]
#[derivative(PartialEq, Clone(bound = ""))]
pub struct BeaconForkChoiceStore<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
#[derivative(PartialEq = "ignore")]
store: Arc<HotColdDB<E, Hot, Cold>>,
balances_cache: BalancesCache,
time: Slot,
Expand All @@ -169,23 +173,6 @@ pub struct BeaconForkChoiceStore<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<
_phantom: PhantomData<E>,
}

impl<E, Hot, Cold> PartialEq for BeaconForkChoiceStore<E, Hot, Cold>
where
E: EthSpec,
Hot: ItemStore<E>,
Cold: ItemStore<E>,
{
/// This implementation ignores the `store` and `slot_clock`.
fn eq(&self, other: &Self) -> bool {
self.balances_cache == other.balances_cache
&& self.time == other.time
&& self.finalized_checkpoint == other.finalized_checkpoint
&& self.justified_checkpoint == other.justified_checkpoint
&& self.justified_balances == other.justified_balances
&& self.best_justified_checkpoint == other.best_justified_checkpoint
}
}

impl<E, Hot, Cold> BeaconForkChoiceStore<E, Hot, Cold>
where
E: EthSpec,
Expand Down
4 changes: 2 additions & 2 deletions consensus/fork_choice/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ version = "0.1.0"
authors = ["Paul Hauner <paul@paulhauner.com>"]
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
derivative = "2.1.1"
im = "15.0.0"
types = { path = "../types" }
proto_array = { path = "../proto_array" }
eth2_ssz = "0.1.2"
Expand Down
46 changes: 18 additions & 28 deletions consensus/fork_choice/src/fork_choice.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use std::marker::PhantomData;

use crate::ForkChoiceStore;
use derivative::Derivative;
use im::Vector;
use proto_array::{Block as ProtoBlock, ProtoArrayForkChoice};
use ssz_derive::{Decode, Encode};
use std::cmp::Ordering;
use std::marker::PhantomData;
use types::{
BeaconBlock, BeaconState, BeaconStateError, Checkpoint, Epoch, EthSpec, Hash256,
IndexedAttestation, RelativeEpoch, ShufflingId, Slot,
};

use crate::ForkChoiceStore;
use std::cmp::Ordering;

/// Defined here:
///
/// https://github.com/ethereum/eth2.0-specs/blob/v0.12.1/specs/phase0/fork-choice.md#configuration
Expand Down Expand Up @@ -188,8 +188,8 @@ impl<E: EthSpec> From<&IndexedAttestation<E>> for QueuedAttestation {
/// current slot. Also removes those values from `self.queued_attestations`.
fn dequeue_attestations(
current_slot: Slot,
queued_attestations: &mut Vec<QueuedAttestation>,
) -> Vec<QueuedAttestation> {
queued_attestations: &mut Vector<QueuedAttestation>,
) -> Vector<QueuedAttestation> {
let remaining = queued_attestations.split_off(
queued_attestations
.iter()
Expand All @@ -210,28 +210,18 @@ fn dequeue_attestations(
///
/// - Management of the justified state and caching of balances.
/// - Queuing of attestations from the current slot.
#[derive(Clone, Derivative)]
#[derivative(PartialEq(bound = "T: ForkChoiceStore<E> + PartialEq, E: EthSpec"))]
pub struct ForkChoice<T, E> {
/// Storage for `ForkChoice`, modelled off the spec `Store` object.
fc_store: T,
/// The underlying representation of the block DAG.
proto_array: ProtoArrayForkChoice,
/// Attestations that arrived at the current slot and must be queued for later processing.
queued_attestations: Vec<QueuedAttestation>,
queued_attestations: Vector<QueuedAttestation>,
_phantom: PhantomData<E>,
}

impl<T, E> PartialEq for ForkChoice<T, E>
where
T: ForkChoiceStore<E> + PartialEq,
E: EthSpec,
{
fn eq(&self, other: &Self) -> bool {
self.fc_store == other.fc_store
&& self.proto_array == other.proto_array
&& self.queued_attestations == other.queued_attestations
}
}

impl<T, E> ForkChoice<T, E>
where
T: ForkChoiceStore<E>,
Expand Down Expand Up @@ -266,7 +256,7 @@ where
Ok(Self {
fc_store,
proto_array,
queued_attestations: vec![],
queued_attestations: Vector::new(),
_phantom: PhantomData,
})
}
Expand All @@ -278,7 +268,7 @@ where
pub fn from_components(
fc_store: T,
proto_array: ProtoArrayForkChoice,
queued_attestations: Vec<QueuedAttestation>,
queued_attestations: Vector<QueuedAttestation>,
) -> Self {
Self {
fc_store,
Expand Down Expand Up @@ -711,7 +701,7 @@ where
// Delay consideration in the fork choice until their slot is in the past.
// ```
self.queued_attestations
.push(QueuedAttestation::from(attestation));
.push_back(QueuedAttestation::from(attestation));
}

Ok(())
Expand Down Expand Up @@ -800,7 +790,7 @@ where
}

/// Returns a reference to the currently queued attestations.
pub fn queued_attestations(&self) -> &[QueuedAttestation] {
pub fn queued_attestations(&self) -> &Vector<QueuedAttestation> {
&self.queued_attestations
}

Expand Down Expand Up @@ -835,7 +825,7 @@ where
pub fn to_persisted(&self) -> PersistedForkChoice {
PersistedForkChoice {
proto_array_bytes: self.proto_array().as_bytes(),
queued_attestations: self.queued_attestations().to_vec(),
queued_attestations: self.queued_attestations().clone(),
}
}
}
Expand All @@ -846,7 +836,7 @@ where
#[derive(Encode, Decode, Clone)]
pub struct PersistedForkChoice {
proto_array_bytes: Vec<u8>,
queued_attestations: Vec<QueuedAttestation>,
queued_attestations: Vector<QueuedAttestation>,
}

#[cfg(test)]
Expand Down Expand Up @@ -877,7 +867,7 @@ mod tests {
}
}

fn get_queued_attestations() -> Vec<QueuedAttestation> {
fn get_queued_attestations() -> Vector<QueuedAttestation> {
(1..4)
.into_iter()
.map(|i| QueuedAttestation {
Expand All @@ -889,7 +879,7 @@ mod tests {
.collect()
}

fn get_slots(queued_attestations: &[QueuedAttestation]) -> Vec<u64> {
fn get_slots(queued_attestations: &Vector<QueuedAttestation>) -> Vec<u64> {
queued_attestations.iter().map(|a| a.slot.into()).collect()
}

Expand Down
2 changes: 1 addition & 1 deletion consensus/fork_choice/src/fork_choice_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use types::{BeaconBlock, BeaconState, Checkpoint, EthSpec, Hash256, Slot};
/// The primary motivation for defining this as a trait to be implemented upstream rather than a
/// concrete struct is to allow this crate to be free from "impure" on-disk database logic,
/// hopefully making auditing easier.
pub trait ForkChoiceStore<T: EthSpec>: Sized {
pub trait ForkChoiceStore<T: EthSpec>: Sized + Clone {
type Error;

/// Returns the last value passed to `Self::update_time`.
Expand Down
3 changes: 2 additions & 1 deletion consensus/fork_choice/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use fork_choice::{
ForkChoiceStore, InvalidAttestation, InvalidBlock, QueuedAttestation,
SAFE_SLOTS_TO_UPDATE_JUSTIFIED,
};
use im::Vector;
use std::fmt;
use std::sync::Mutex;
use store::{MemoryStore, StoreConfig};
Expand Down Expand Up @@ -141,7 +142,7 @@ impl ForkChoiceTest {
/// Inspect the queued attestations in fork choice.
pub fn inspect_queued_attestations<F>(self, mut func: F) -> Self
where
F: FnMut(&[QueuedAttestation]),
F: FnMut(&Vector<QueuedAttestation>),
{
self.harness
.chain
Expand Down
5 changes: 3 additions & 2 deletions consensus/proto_array/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ name = "proto_array"
path = "src/bin.rs"

[dependencies]
im = { version = "15.0.0", features = ["serde"] }
types = { path = "../types" }
eth2_ssz = "0.1.2"
eth2_ssz = { version = "0.1.2", features = ["arc", "im"] }
eth2_ssz_derive = "0.1.0"
serde = "1.0.116"
serde = { version = "1.0.116", features = ["rc"] }
serde_derive = "1.0.116"
serde_yaml = "0.8.13"
Loading