Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store computed custody subnets in PeerDB and fix custody lookup test #6218

Merged
merged 2 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion beacon_node/http_api/src/publish_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockConten
}

if let Some(gossip_verified_data_columns) = gossip_verified_data_columns {
let custody_columns_indices = network_globals.custody_columns(block.epoch(), &chain.spec);
let custody_columns_indices = network_globals.custody_columns(block.epoch());

let custody_columns = gossip_verified_data_columns
.into_iter()
Expand Down
1 change: 1 addition & 0 deletions beacon_node/http_api/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ pub async fn create_api_server<T: BeaconChainTypes>(
vec![],
false,
&log,
chain.spec.clone(),
));

// Only a peer manager can add peers, so we create a dummy manager.
Expand Down
1 change: 1 addition & 0 deletions beacon_node/lighthouse_network/src/discovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1232,6 +1232,7 @@ mod tests {
vec![],
false,
&log,
spec.clone(),
);
let keypair = keypair.into();
Discovery::new(keypair, &config, Arc::new(globals), &log, &spec)
Expand Down
6 changes: 4 additions & 2 deletions beacon_node/lighthouse_network/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1383,7 +1383,8 @@ mod tests {
..Default::default()
};
let log = build_log(slog::Level::Debug, false);
let globals = NetworkGlobals::new_test_globals(vec![], &log);
let spec = E::default_spec();
let globals = NetworkGlobals::new_test_globals(vec![], &log, spec);
PeerManager::new(config, Arc::new(globals), &log).unwrap()
}

Expand All @@ -1397,7 +1398,8 @@ mod tests {
..Default::default()
};
let log = build_log(slog::Level::Debug, false);
let globals = NetworkGlobals::new_test_globals(trusted_peers, &log);
let spec = E::default_spec();
let globals = NetworkGlobals::new_test_globals(trusted_peers, &log, spec);
PeerManager::new(config, Arc::new(globals), &log).unwrap()
}

Expand Down
66 changes: 55 additions & 11 deletions beacon_node/lighthouse_network/src/peer_manager/peerdb.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::discovery::enr::PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY;
use crate::discovery::CombinedKey;
use crate::{metrics, multiaddr::Multiaddr, types::Subnet, Enr, Gossipsub, PeerId};
use crate::{
metrics, multiaddr::Multiaddr, types::Subnet, Enr, EnrExt, Eth2Enr, Gossipsub, PeerId,
};
use peer_info::{ConnectionDirection, PeerConnectionStatus, PeerInfo};
use rand::seq::SliceRandom;
use score::{PeerAction, ReportSource, Score, ScoreState};
Expand All @@ -13,7 +15,7 @@ use std::{
fmt::Formatter,
};
use sync_status::SyncStatus;
use types::{ChainSpec, EthSpec};
use types::{ChainSpec, DataColumnSubnetId, EthSpec};

pub mod client;
pub mod peer_info;
Expand Down Expand Up @@ -45,10 +47,16 @@ pub struct PeerDB<E: EthSpec> {
disable_peer_scoring: bool,
/// PeerDB's logger
log: slog::Logger,
spec: ChainSpec,
}

impl<E: EthSpec> PeerDB<E> {
pub fn new(trusted_peers: Vec<PeerId>, disable_peer_scoring: bool, log: &slog::Logger) -> Self {
pub fn new(
trusted_peers: Vec<PeerId>,
disable_peer_scoring: bool,
log: &slog::Logger,
spec: ChainSpec,
) -> Self {
// Initialize the peers hashmap with trusted peers
let peers = trusted_peers
.into_iter()
Expand All @@ -60,6 +68,7 @@ impl<E: EthSpec> PeerDB<E> {
banned_peers_count: BannedPeersCount::default(),
disable_peer_scoring,
peers,
spec,
}
}

Expand Down Expand Up @@ -247,6 +256,27 @@ impl<E: EthSpec> PeerDB<E> {
.map(|(peer_id, _)| peer_id)
}

pub fn good_custody_subnet_peer(
&self,
subnet: DataColumnSubnetId,
) -> impl Iterator<Item = &PeerId> {
self.peers
.iter()
.filter(move |(_, info)| {
// TODO(das): we currently consider peer to be a subnet peer if the peer is *either*
// subscribed to the subnet or assigned to the subnet.
// The first condition is currently required as we don't have custody count in
// metadata implemented yet, and therefore unable to reliably determine custody
// subnet count (ENR is not always available).
// This condition can be removed later so that we can identify peers that are not
// serving custody columns and penalise accordingly.
let is_custody_subnet_peer = info.on_subnet_gossipsub(&Subnet::DataColumn(subnet))
|| info.is_assigned_to_custody_subnet(&subnet);
info.is_connected() && info.is_good_gossipsub_peer() && is_custody_subnet_peer
})
.map(|(peer_id, _)| peer_id)
}

/// Gives the ids of all known disconnected peers.
pub fn disconnected_peers(&self) -> impl Iterator<Item = &PeerId> {
self.peers
Expand Down Expand Up @@ -676,12 +706,12 @@ impl<E: EthSpec> PeerDB<E> {
/// Updates the connection state. MUST ONLY BE USED IN TESTS.
pub fn __add_connected_peer_testing_only(
&mut self,
peer_id: &PeerId,
supernode: bool,
spec: &ChainSpec,
) -> Option<BanOperation> {
) -> PeerId {
let enr_key = CombinedKey::generate_secp256k1();
let mut enr = Enr::builder().build(&enr_key).unwrap();
let peer_id = enr.peer_id();

if supernode {
enr.insert(
Expand All @@ -693,13 +723,15 @@ impl<E: EthSpec> PeerDB<E> {
}

self.update_connection_state(
peer_id,
&peer_id,
NewConnectionState::Connected {
enr: Some(enr),
seen_address: Multiaddr::empty(),
direction: ConnectionDirection::Outgoing,
},
)
);

peer_id
}

/// The connection state of the peer has been changed. Modify the peer in the db to ensure all
Expand Down Expand Up @@ -762,8 +794,17 @@ impl<E: EthSpec> PeerDB<E> {
seen_address,
},
) => {
// Update the ENR if one exists
// Update the ENR if one exists, and compute the custody subnets
if let Some(enr) = enr {
let node_id = enr.node_id().raw().into();
let custody_subnet_count = enr.custody_subnet_count::<E>(&self.spec);
let custody_subnets = DataColumnSubnetId::compute_custody_subnets::<E>(
node_id,
custody_subnet_count,
&self.spec,
)
.collect::<HashSet<_>>();
info.set_custody_subnets(custody_subnets);
info.set_enr(enr);
}

Expand Down Expand Up @@ -1314,7 +1355,8 @@ mod tests {

fn get_db() -> PeerDB<M> {
let log = build_log(slog::Level::Debug, false);
PeerDB::new(vec![], false, &log)
let spec = M::default_spec();
PeerDB::new(vec![], false, &log, spec)
}

#[test]
Expand Down Expand Up @@ -2013,7 +2055,8 @@ mod tests {
fn test_trusted_peers_score() {
let trusted_peer = PeerId::random();
let log = build_log(slog::Level::Debug, false);
let mut pdb: PeerDB<M> = PeerDB::new(vec![trusted_peer], false, &log);
let spec = M::default_spec();
let mut pdb: PeerDB<M> = PeerDB::new(vec![trusted_peer], false, &log, spec);

pdb.connect_ingoing(&trusted_peer, "/ip4/0.0.0.0".parse().unwrap(), None);

Expand All @@ -2037,7 +2080,8 @@ mod tests {
fn test_disable_peer_scoring() {
let peer = PeerId::random();
let log = build_log(slog::Level::Debug, false);
let mut pdb: PeerDB<M> = PeerDB::new(vec![], true, &log);
let spec = M::default_spec();
let mut pdb: PeerDB<M> = PeerDB::new(vec![], true, &log, spec);

pdb.connect_ingoing(&peer, "/ip4/0.0.0.0".parse().unwrap(), None);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::collections::HashSet;
use std::net::IpAddr;
use std::time::Instant;
use strum::AsRefStr;
use types::EthSpec;
use types::{DataColumnSubnetId, EthSpec};
use PeerConnectionStatus::*;

/// Information about a given connected peer.
Expand All @@ -40,6 +40,11 @@ pub struct PeerInfo<E: EthSpec> {
meta_data: Option<MetaData<E>>,
/// Subnets the peer is connected to.
subnets: HashSet<Subnet>,
/// This is computed from either metadata or the ENR, and contains the subnets that the peer
/// is *assigned* to custody, rather than *connected* to (different to `self.subnets`).
/// Note: Another reason to keep this separate to `self.subnets` is an upcoming change to
/// decouple custody requirements from the actual subnets, i.e. changing this to `custody_groups`.
custody_subnets: HashSet<DataColumnSubnetId>,
/// The time we would like to retain this peer. After this time, the peer is no longer
/// necessary.
#[serde(skip)]
Expand All @@ -62,6 +67,7 @@ impl<E: EthSpec> Default for PeerInfo<E> {
listening_addresses: Vec::new(),
seen_multiaddrs: HashSet::new(),
subnets: HashSet::new(),
custody_subnets: HashSet::new(),
sync_status: SyncStatus::Unknown,
meta_data: None,
min_ttl: None,
Expand Down Expand Up @@ -210,6 +216,11 @@ impl<E: EthSpec> PeerInfo<E> {
self.subnets.contains(subnet)
}

/// Returns if the peer is assigned to a given `DataColumnSubnetId`.
pub fn is_assigned_to_custody_subnet(&self, subnet: &DataColumnSubnetId) -> bool {
self.custody_subnets.contains(subnet)
}

/// Returns true if the peer is connected to a long-lived subnet.
pub fn has_long_lived_subnet(&self) -> bool {
// Check the meta_data
Expand Down Expand Up @@ -362,6 +373,10 @@ impl<E: EthSpec> PeerInfo<E> {
self.connection_status = connection_status
}

pub(super) fn set_custody_subnets(&mut self, custody_subnets: HashSet<DataColumnSubnetId>) {
self.custody_subnets = custody_subnets
}

/// Sets the ENR of the peer if one is known.
pub(super) fn set_enr(&mut self, enr: Enr) {
self.enr = Some(enr)
Expand Down
1 change: 1 addition & 0 deletions beacon_node/lighthouse_network/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ impl<E: EthSpec> Network<E> {
trusted_peers,
config.disable_peer_scoring,
&log,
ctx.chain_spec.clone(),
);
Arc::new(globals)
};
Expand Down
48 changes: 29 additions & 19 deletions beacon_node/lighthouse_network/src/types/globals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
use crate::peer_manager::peerdb::PeerDB;
use crate::rpc::{MetaData, MetaDataV2};
use crate::types::{BackFillState, SyncState};
use crate::EnrExt;
use crate::{Client, Eth2Enr};
use crate::{Enr, GossipTopic, Multiaddr, PeerId};
use crate::{EnrExt, Subnet};
use parking_lot::RwLock;
use std::collections::HashSet;
use types::data_column_sidecar::ColumnIndex;
Expand All @@ -27,6 +27,7 @@ pub struct NetworkGlobals<E: EthSpec> {
pub sync_state: RwLock<SyncState>,
/// The current state of the backfill sync.
pub backfill_state: RwLock<BackFillState>,
spec: ChainSpec,
}

impl<E: EthSpec> NetworkGlobals<E> {
Expand All @@ -36,16 +37,23 @@ impl<E: EthSpec> NetworkGlobals<E> {
trusted_peers: Vec<PeerId>,
disable_peer_scoring: bool,
log: &slog::Logger,
spec: ChainSpec,
) -> Self {
NetworkGlobals {
local_enr: RwLock::new(enr.clone()),
peer_id: RwLock::new(enr.peer_id()),
listen_multiaddrs: RwLock::new(Vec::new()),
local_metadata: RwLock::new(local_metadata),
peers: RwLock::new(PeerDB::new(trusted_peers, disable_peer_scoring, log)),
peers: RwLock::new(PeerDB::new(
trusted_peers,
disable_peer_scoring,
log,
spec.clone(),
)),
gossipsub_subscriptions: RwLock::new(HashSet::new()),
sync_state: RwLock::new(SyncState::Stalled),
backfill_state: RwLock::new(BackFillState::NotRequired),
spec,
}
}

Expand Down Expand Up @@ -112,44 +120,45 @@ impl<E: EthSpec> NetworkGlobals<E> {
}

/// Compute custody data columns the node is assigned to custody.
pub fn custody_columns(&self, _epoch: Epoch, spec: &ChainSpec) -> Vec<ColumnIndex> {
pub fn custody_columns(&self, _epoch: Epoch) -> Vec<ColumnIndex> {
let enr = self.local_enr();
let node_id = enr.node_id().raw().into();
let custody_subnet_count = enr.custody_subnet_count::<E>(spec);
DataColumnSubnetId::compute_custody_columns::<E>(node_id, custody_subnet_count, spec)
let custody_subnet_count = enr.custody_subnet_count::<E>(&self.spec);
DataColumnSubnetId::compute_custody_columns::<E>(node_id, custody_subnet_count, &self.spec)
.collect()
}

/// Compute custody data column subnets the node is assigned to custody.
pub fn custody_subnets(&self, spec: &ChainSpec) -> impl Iterator<Item = DataColumnSubnetId> {
pub fn custody_subnets(&self) -> impl Iterator<Item = DataColumnSubnetId> {
let enr = self.local_enr();
let node_id = enr.node_id().raw().into();
let custody_subnet_count = enr.custody_subnet_count::<E>(spec);
DataColumnSubnetId::compute_custody_subnets::<E>(node_id, custody_subnet_count, spec)
let custody_subnet_count = enr.custody_subnet_count::<E>(&self.spec);
DataColumnSubnetId::compute_custody_subnets::<E>(node_id, custody_subnet_count, &self.spec)
}

/// Returns a connected peer that:
/// 1. is connected
/// 2. assigned to custody the column based on it's `custody_subnet_count` from metadata (WIP)
/// 2. assigned to custody the column based on it's `custody_subnet_count` from ENR or metadata (WIP)
/// 3. has a good score
/// 4. subscribed to the specified column - this condition can be removed later, so we can
/// identify and penalise peers that are supposed to custody the column.
pub fn custody_peers_for_column(
&self,
column_index: ColumnIndex,
spec: &ChainSpec,
) -> Vec<PeerId> {
pub fn custody_peers_for_column(&self, column_index: ColumnIndex) -> Vec<PeerId> {
self.peers
.read()
.good_peers_on_subnet(Subnet::DataColumn(
DataColumnSubnetId::from_column_index::<E>(column_index as usize, spec),
.good_custody_subnet_peer(DataColumnSubnetId::from_column_index::<E>(
column_index as usize,
&self.spec,
))
.cloned()
.collect::<Vec<_>>()
}

/// TESTING ONLY. Build a dummy NetworkGlobals instance.
pub fn new_test_globals(trusted_peers: Vec<PeerId>, log: &slog::Logger) -> NetworkGlobals<E> {
pub fn new_test_globals(
trusted_peers: Vec<PeerId>,
log: &slog::Logger,
spec: ChainSpec,
) -> NetworkGlobals<E> {
use crate::CombinedKeyExt;
let keypair = libp2p::identity::secp256k1::Keypair::generate();
let enr_key: discv5::enr::CombinedKey = discv5::enr::CombinedKey::from_secp256k1(&keypair);
Expand All @@ -164,6 +173,7 @@ impl<E: EthSpec> NetworkGlobals<E> {
trusted_peers,
false,
log,
spec,
)
}
}
Expand All @@ -180,9 +190,9 @@ mod test {
let default_custody_requirement_column_count = spec.number_of_columns as u64
/ spec.data_column_sidecar_subnet_count
* spec.custody_requirement;
let globals = NetworkGlobals::<E>::new_test_globals(vec![], &log);
let globals = NetworkGlobals::<E>::new_test_globals(vec![], &log, spec.clone());
let any_epoch = Epoch::new(0);
let columns = globals.custody_columns(any_epoch, &spec);
let columns = globals.custody_columns(any_epoch);
assert_eq!(
columns.len(),
default_custody_requirement_column_count as usize
Expand Down
Loading
Loading