Skip to content

Commit

Permalink
Store computed custody subnets in PeerDB and fix custody lookup test (#…
Browse files Browse the repository at this point in the history
…6218)

* Fix failing custody lookup tests.

* Store custody subnets in PeerDB, fix custody lookup test and refactor some methods.
  • Loading branch information
jimmygchen authored Aug 8, 2024
1 parent 8c78010 commit 90700fe
Show file tree
Hide file tree
Showing 13 changed files with 154 additions and 72 deletions.
2 changes: 1 addition & 1 deletion beacon_node/http_api/src/publish_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockConten
}

if let Some(gossip_verified_data_columns) = gossip_verified_data_columns {
let custody_columns_indices = network_globals.custody_columns(block.epoch(), &chain.spec);
let custody_columns_indices = network_globals.custody_columns(block.epoch());

let custody_columns = gossip_verified_data_columns
.into_iter()
Expand Down
1 change: 1 addition & 0 deletions beacon_node/http_api/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ pub async fn create_api_server<T: BeaconChainTypes>(
vec![],
false,
&log,
chain.spec.clone(),
));

// Only a peer manager can add peers, so we create a dummy manager.
Expand Down
1 change: 1 addition & 0 deletions beacon_node/lighthouse_network/src/discovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1232,6 +1232,7 @@ mod tests {
vec![],
false,
&log,
spec.clone(),
);
let keypair = keypair.into();
Discovery::new(keypair, &config, Arc::new(globals), &log, &spec)
Expand Down
6 changes: 4 additions & 2 deletions beacon_node/lighthouse_network/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1383,7 +1383,8 @@ mod tests {
..Default::default()
};
let log = build_log(slog::Level::Debug, false);
let globals = NetworkGlobals::new_test_globals(vec![], &log);
let spec = E::default_spec();
let globals = NetworkGlobals::new_test_globals(vec![], &log, spec);
PeerManager::new(config, Arc::new(globals), &log).unwrap()
}

Expand All @@ -1397,7 +1398,8 @@ mod tests {
..Default::default()
};
let log = build_log(slog::Level::Debug, false);
let globals = NetworkGlobals::new_test_globals(trusted_peers, &log);
let spec = E::default_spec();
let globals = NetworkGlobals::new_test_globals(trusted_peers, &log, spec);
PeerManager::new(config, Arc::new(globals), &log).unwrap()
}

Expand Down
66 changes: 55 additions & 11 deletions beacon_node/lighthouse_network/src/peer_manager/peerdb.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::discovery::enr::PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY;
use crate::discovery::CombinedKey;
use crate::{metrics, multiaddr::Multiaddr, types::Subnet, Enr, Gossipsub, PeerId};
use crate::{
metrics, multiaddr::Multiaddr, types::Subnet, Enr, EnrExt, Eth2Enr, Gossipsub, PeerId,
};
use peer_info::{ConnectionDirection, PeerConnectionStatus, PeerInfo};
use rand::seq::SliceRandom;
use score::{PeerAction, ReportSource, Score, ScoreState};
Expand All @@ -13,7 +15,7 @@ use std::{
fmt::Formatter,
};
use sync_status::SyncStatus;
use types::{ChainSpec, EthSpec};
use types::{ChainSpec, DataColumnSubnetId, EthSpec};

pub mod client;
pub mod peer_info;
Expand Down Expand Up @@ -45,10 +47,16 @@ pub struct PeerDB<E: EthSpec> {
disable_peer_scoring: bool,
/// PeerDB's logger
log: slog::Logger,
spec: ChainSpec,
}

impl<E: EthSpec> PeerDB<E> {
pub fn new(trusted_peers: Vec<PeerId>, disable_peer_scoring: bool, log: &slog::Logger) -> Self {
pub fn new(
trusted_peers: Vec<PeerId>,
disable_peer_scoring: bool,
log: &slog::Logger,
spec: ChainSpec,
) -> Self {
// Initialize the peers hashmap with trusted peers
let peers = trusted_peers
.into_iter()
Expand All @@ -60,6 +68,7 @@ impl<E: EthSpec> PeerDB<E> {
banned_peers_count: BannedPeersCount::default(),
disable_peer_scoring,
peers,
spec,
}
}

Expand Down Expand Up @@ -247,6 +256,27 @@ impl<E: EthSpec> PeerDB<E> {
.map(|(peer_id, _)| peer_id)
}

pub fn good_custody_subnet_peer(
&self,
subnet: DataColumnSubnetId,
) -> impl Iterator<Item = &PeerId> {
self.peers
.iter()
.filter(move |(_, info)| {
// TODO(das): we currently consider peer to be a subnet peer if the peer is *either*
// subscribed to the subnet or assigned to the subnet.
// The first condition is currently required as we don't have custody count in
// metadata implemented yet, and therefore unable to reliably determine custody
// subnet count (ENR is not always available).
// This condition can be removed later so that we can identify peers that are not
// serving custody columns and penalise accordingly.
let is_custody_subnet_peer = info.on_subnet_gossipsub(&Subnet::DataColumn(subnet))
|| info.is_assigned_to_custody_subnet(&subnet);
info.is_connected() && info.is_good_gossipsub_peer() && is_custody_subnet_peer
})
.map(|(peer_id, _)| peer_id)
}

/// Gives the ids of all known disconnected peers.
pub fn disconnected_peers(&self) -> impl Iterator<Item = &PeerId> {
self.peers
Expand Down Expand Up @@ -676,12 +706,12 @@ impl<E: EthSpec> PeerDB<E> {
/// Updates the connection state. MUST ONLY BE USED IN TESTS.
pub fn __add_connected_peer_testing_only(
&mut self,
peer_id: &PeerId,
supernode: bool,
spec: &ChainSpec,
) -> Option<BanOperation> {
) -> PeerId {
let enr_key = CombinedKey::generate_secp256k1();
let mut enr = Enr::builder().build(&enr_key).unwrap();
let peer_id = enr.peer_id();

if supernode {
enr.insert(
Expand All @@ -693,13 +723,15 @@ impl<E: EthSpec> PeerDB<E> {
}

self.update_connection_state(
peer_id,
&peer_id,
NewConnectionState::Connected {
enr: Some(enr),
seen_address: Multiaddr::empty(),
direction: ConnectionDirection::Outgoing,
},
)
);

peer_id
}

/// The connection state of the peer has been changed. Modify the peer in the db to ensure all
Expand Down Expand Up @@ -762,8 +794,17 @@ impl<E: EthSpec> PeerDB<E> {
seen_address,
},
) => {
// Update the ENR if one exists
// Update the ENR if one exists, and compute the custody subnets
if let Some(enr) = enr {
let node_id = enr.node_id().raw().into();
let custody_subnet_count = enr.custody_subnet_count::<E>(&self.spec);
let custody_subnets = DataColumnSubnetId::compute_custody_subnets::<E>(
node_id,
custody_subnet_count,
&self.spec,
)
.collect::<HashSet<_>>();
info.set_custody_subnets(custody_subnets);
info.set_enr(enr);
}

Expand Down Expand Up @@ -1314,7 +1355,8 @@ mod tests {

fn get_db() -> PeerDB<M> {
let log = build_log(slog::Level::Debug, false);
PeerDB::new(vec![], false, &log)
let spec = M::default_spec();
PeerDB::new(vec![], false, &log, spec)
}

#[test]
Expand Down Expand Up @@ -2013,7 +2055,8 @@ mod tests {
fn test_trusted_peers_score() {
let trusted_peer = PeerId::random();
let log = build_log(slog::Level::Debug, false);
let mut pdb: PeerDB<M> = PeerDB::new(vec![trusted_peer], false, &log);
let spec = M::default_spec();
let mut pdb: PeerDB<M> = PeerDB::new(vec![trusted_peer], false, &log, spec);

pdb.connect_ingoing(&trusted_peer, "/ip4/0.0.0.0".parse().unwrap(), None);

Expand All @@ -2037,7 +2080,8 @@ mod tests {
fn test_disable_peer_scoring() {
let peer = PeerId::random();
let log = build_log(slog::Level::Debug, false);
let mut pdb: PeerDB<M> = PeerDB::new(vec![], true, &log);
let spec = M::default_spec();
let mut pdb: PeerDB<M> = PeerDB::new(vec![], true, &log, spec);

pdb.connect_ingoing(&peer, "/ip4/0.0.0.0".parse().unwrap(), None);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::collections::HashSet;
use std::net::IpAddr;
use std::time::Instant;
use strum::AsRefStr;
use types::EthSpec;
use types::{DataColumnSubnetId, EthSpec};
use PeerConnectionStatus::*;

/// Information about a given connected peer.
Expand All @@ -40,6 +40,11 @@ pub struct PeerInfo<E: EthSpec> {
meta_data: Option<MetaData<E>>,
/// Subnets the peer is connected to.
subnets: HashSet<Subnet>,
/// This is computed from either metadata or the ENR, and contains the subnets that the peer
/// is *assigned* to custody, rather than *connected* to (different to `self.subnets`).
/// Note: Another reason to keep this separate to `self.subnets` is an upcoming change to
/// decouple custody requirements from the actual subnets, i.e. changing this to `custody_groups`.
custody_subnets: HashSet<DataColumnSubnetId>,
/// The time we would like to retain this peer. After this time, the peer is no longer
/// necessary.
#[serde(skip)]
Expand All @@ -62,6 +67,7 @@ impl<E: EthSpec> Default for PeerInfo<E> {
listening_addresses: Vec::new(),
seen_multiaddrs: HashSet::new(),
subnets: HashSet::new(),
custody_subnets: HashSet::new(),
sync_status: SyncStatus::Unknown,
meta_data: None,
min_ttl: None,
Expand Down Expand Up @@ -210,6 +216,11 @@ impl<E: EthSpec> PeerInfo<E> {
self.subnets.contains(subnet)
}

/// Returns if the peer is assigned to a given `DataColumnSubnetId`.
pub fn is_assigned_to_custody_subnet(&self, subnet: &DataColumnSubnetId) -> bool {
self.custody_subnets.contains(subnet)
}

/// Returns true if the peer is connected to a long-lived subnet.
pub fn has_long_lived_subnet(&self) -> bool {
// Check the meta_data
Expand Down Expand Up @@ -362,6 +373,10 @@ impl<E: EthSpec> PeerInfo<E> {
self.connection_status = connection_status
}

pub(super) fn set_custody_subnets(&mut self, custody_subnets: HashSet<DataColumnSubnetId>) {
self.custody_subnets = custody_subnets
}

/// Sets the ENR of the peer if one is known.
pub(super) fn set_enr(&mut self, enr: Enr) {
self.enr = Some(enr)
Expand Down
1 change: 1 addition & 0 deletions beacon_node/lighthouse_network/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ impl<E: EthSpec> Network<E> {
trusted_peers,
config.disable_peer_scoring,
&log,
ctx.chain_spec.clone(),
);
Arc::new(globals)
};
Expand Down
48 changes: 29 additions & 19 deletions beacon_node/lighthouse_network/src/types/globals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
use crate::peer_manager::peerdb::PeerDB;
use crate::rpc::{MetaData, MetaDataV2};
use crate::types::{BackFillState, SyncState};
use crate::EnrExt;
use crate::{Client, Eth2Enr};
use crate::{Enr, GossipTopic, Multiaddr, PeerId};
use crate::{EnrExt, Subnet};
use parking_lot::RwLock;
use std::collections::HashSet;
use types::data_column_sidecar::ColumnIndex;
Expand All @@ -27,6 +27,7 @@ pub struct NetworkGlobals<E: EthSpec> {
pub sync_state: RwLock<SyncState>,
/// The current state of the backfill sync.
pub backfill_state: RwLock<BackFillState>,
spec: ChainSpec,
}

impl<E: EthSpec> NetworkGlobals<E> {
Expand All @@ -36,16 +37,23 @@ impl<E: EthSpec> NetworkGlobals<E> {
trusted_peers: Vec<PeerId>,
disable_peer_scoring: bool,
log: &slog::Logger,
spec: ChainSpec,
) -> Self {
NetworkGlobals {
local_enr: RwLock::new(enr.clone()),
peer_id: RwLock::new(enr.peer_id()),
listen_multiaddrs: RwLock::new(Vec::new()),
local_metadata: RwLock::new(local_metadata),
peers: RwLock::new(PeerDB::new(trusted_peers, disable_peer_scoring, log)),
peers: RwLock::new(PeerDB::new(
trusted_peers,
disable_peer_scoring,
log,
spec.clone(),
)),
gossipsub_subscriptions: RwLock::new(HashSet::new()),
sync_state: RwLock::new(SyncState::Stalled),
backfill_state: RwLock::new(BackFillState::NotRequired),
spec,
}
}

Expand Down Expand Up @@ -112,44 +120,45 @@ impl<E: EthSpec> NetworkGlobals<E> {
}

/// Compute custody data columns the node is assigned to custody.
pub fn custody_columns(&self, _epoch: Epoch, spec: &ChainSpec) -> Vec<ColumnIndex> {
pub fn custody_columns(&self, _epoch: Epoch) -> Vec<ColumnIndex> {
let enr = self.local_enr();
let node_id = enr.node_id().raw().into();
let custody_subnet_count = enr.custody_subnet_count::<E>(spec);
DataColumnSubnetId::compute_custody_columns::<E>(node_id, custody_subnet_count, spec)
let custody_subnet_count = enr.custody_subnet_count::<E>(&self.spec);
DataColumnSubnetId::compute_custody_columns::<E>(node_id, custody_subnet_count, &self.spec)
.collect()
}

/// Compute custody data column subnets the node is assigned to custody.
pub fn custody_subnets(&self, spec: &ChainSpec) -> impl Iterator<Item = DataColumnSubnetId> {
pub fn custody_subnets(&self) -> impl Iterator<Item = DataColumnSubnetId> {
let enr = self.local_enr();
let node_id = enr.node_id().raw().into();
let custody_subnet_count = enr.custody_subnet_count::<E>(spec);
DataColumnSubnetId::compute_custody_subnets::<E>(node_id, custody_subnet_count, spec)
let custody_subnet_count = enr.custody_subnet_count::<E>(&self.spec);
DataColumnSubnetId::compute_custody_subnets::<E>(node_id, custody_subnet_count, &self.spec)
}

/// Returns a connected peer that:
/// 1. is connected
/// 2. assigned to custody the column based on it's `custody_subnet_count` from metadata (WIP)
/// 2. assigned to custody the column based on it's `custody_subnet_count` from ENR or metadata (WIP)
/// 3. has a good score
/// 4. subscribed to the specified column - this condition can be removed later, so we can
/// identify and penalise peers that are supposed to custody the column.
pub fn custody_peers_for_column(
&self,
column_index: ColumnIndex,
spec: &ChainSpec,
) -> Vec<PeerId> {
pub fn custody_peers_for_column(&self, column_index: ColumnIndex) -> Vec<PeerId> {
self.peers
.read()
.good_peers_on_subnet(Subnet::DataColumn(
DataColumnSubnetId::from_column_index::<E>(column_index as usize, spec),
.good_custody_subnet_peer(DataColumnSubnetId::from_column_index::<E>(
column_index as usize,
&self.spec,
))
.cloned()
.collect::<Vec<_>>()
}

/// TESTING ONLY. Build a dummy NetworkGlobals instance.
pub fn new_test_globals(trusted_peers: Vec<PeerId>, log: &slog::Logger) -> NetworkGlobals<E> {
pub fn new_test_globals(
trusted_peers: Vec<PeerId>,
log: &slog::Logger,
spec: ChainSpec,
) -> NetworkGlobals<E> {
use crate::CombinedKeyExt;
let keypair = libp2p::identity::secp256k1::Keypair::generate();
let enr_key: discv5::enr::CombinedKey = discv5::enr::CombinedKey::from_secp256k1(&keypair);
Expand All @@ -164,6 +173,7 @@ impl<E: EthSpec> NetworkGlobals<E> {
trusted_peers,
false,
log,
spec,
)
}
}
Expand All @@ -180,9 +190,9 @@ mod test {
let default_custody_requirement_column_count = spec.number_of_columns as u64
/ spec.data_column_sidecar_subnet_count
* spec.custody_requirement;
let globals = NetworkGlobals::<E>::new_test_globals(vec![], &log);
let globals = NetworkGlobals::<E>::new_test_globals(vec![], &log, spec.clone());
let any_epoch = Epoch::new(0);
let columns = globals.custody_columns(any_epoch, &spec);
let columns = globals.custody_columns(any_epoch);
assert_eq!(
columns.len(),
default_custody_requirement_column_count as usize
Expand Down
Loading

0 comments on commit 90700fe

Please sign in to comment.