Skip to content

Commit

Permalink
Add tracking for peer heartbeats (#1356)
Browse files Browse the repository at this point in the history
FuelLabs/fuel-core#1348

In order to track peer heartbeats, this PR adds two new things:

- A concept of a rolling/moving average for the time between heartbeats
for each peer
    - This is updated every time a heartbeat is sent from that peer
- A periodic check in the P2P `Task`, run on a fixed interval, that verifies
each peer's heartbeat behavior is healthy

_Originally I was looking at only evaluating a peer when its heartbeat
arrived, but a peer that never sent one would then be exempt from checks._

There are ~3~ 2 cases in which we will want to punish peers for their
heartbeat behavior:

- ~They have never sent us a heartbeat~ (If we treat the setup as the
first heartbeat, then we can merge this with the next 👇)
- They haven't sent us a heartbeat in a long time
- They are sending heartbeats very sporadically

We can also use this data to determine which peers are performing the
best

---------

Co-authored-by: Brandon Kite <brandonkite92@gmail.com>
  • Loading branch information
crypto523 and Voxelot committed Sep 15, 2023
1 parent 7e6e12f commit aab8af2
Show file tree
Hide file tree
Showing 12 changed files with 727 additions and 90 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Description of the upcoming release here.

### Added

- [#1356](https://github.com/FuelLabs/fuel-core/pull/1356): Add peer reputation reporting to heartbeat code
- [#1355](https://github.com/FuelLabs/fuel-core/pull/1355): Added new metrics related to block importing, such as tps, sync delays etc
- [#1339](https://github.com/FuelLabs/fuel-core/pull/1339): Adds `baseAssetId` to `FeeParameters` in the GraphQL API.
- [#1331](https://github.com/FuelLabs/fuel-core/pull/1331): Add peer reputation reporting to block import code
Expand All @@ -20,6 +21,7 @@ Description of the upcoming release here.
- [#1286](https://github.com/FuelLabs/fuel-core/pull/1286): Include readable names for test cases where missing.
- [#1274](https://github.com/FuelLabs/fuel-core/pull/1274): Added tests to benchmark block synchronization.
- [#1263](https://github.com/FuelLabs/fuel-core/pull/1263): Add gas benchmarks for `ED19` and `ECR1` instructions.

### Changed

Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions bin/fuel-core/src/cli/run/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,18 @@ pub struct P2PArgs {
/// Cannot be zero.
#[clap(long = "heartbeat-max-failures", default_value = "5", env)]
pub heartbeat_max_failures: NonZeroU32,

/// For peer reputations, the interval at which to check heartbeat health for all peers
#[clap(long = "heartbeat-check-interval", default_value = "5", env)]
pub heartbeat_check_interval: u64,

/// For peer reputations, the maximum average interval between heartbeats for a peer before penalty
#[clap(long = "heartbeat-max-avg-interval", default_value = "20", env)]
pub heartbeat_max_avg_interval: u64,

/// For peer reputations, the maximum time since last heartbeat before penalty
#[clap(long = "heartbeat-max-time-since-last", default_value = "40", env)]
pub heartbeat_max_time_since_last: u64,
}

#[derive(Debug, Clone, Args)]
Expand Down Expand Up @@ -302,6 +314,13 @@ impl P2PArgs {
heartbeat_config,
set_request_timeout: Duration::from_secs(self.request_timeout),
set_connection_keep_alive: Duration::from_secs(self.connection_keep_alive),
heartbeat_check_interval: Duration::from_secs(self.heartbeat_check_interval),
heartbeat_max_avg_interval: Duration::from_secs(
self.heartbeat_max_avg_interval,
),
heartbeat_max_time_since_last: Duration::from_secs(
self.heartbeat_max_time_since_last,
),
info_interval: Some(Duration::from_secs(self.info_interval)),
identify_interval: Some(Duration::from_secs(self.identify_interval)),
metrics,
Expand Down
3 changes: 2 additions & 1 deletion crates/services/p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ rand = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_with = "1.11"
sha2 = "0.10"
thiserror = "1.0.47"
tokio = { workspace = true, features = ["sync"] }
tracing = { workspace = true }

Expand All @@ -70,7 +71,7 @@ fuel-core-types = { path = "../../types", features = [
"test-helpers"
] }
rand = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tokio = { workspace = true, features = ["full", "test-util"] }
tracing-attributes = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter"] }

Expand Down
13 changes: 13 additions & 0 deletions crates/services/p2p/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,13 @@ pub struct Config<State = Initialized> {
/// Sets the keep-alive timeout of idle connections.
pub set_connection_keep_alive: Duration,

/// Time between checking heartbeat status for all peers
pub heartbeat_check_interval: Duration,
/// Max avg time between heartbeats for a given peer before getting reputation penalty
pub heartbeat_max_avg_interval: Duration,
/// Max time since a given peer has sent a heartbeat before getting reputation penalty
pub heartbeat_max_time_since_last: Duration,

/// Enables prometheus metrics for this fuel-service
pub metrics: bool,

Expand Down Expand Up @@ -176,6 +183,9 @@ impl Config<NotInitialized> {
heartbeat_config: self.heartbeat_config,
set_request_timeout: self.set_request_timeout,
set_connection_keep_alive: self.set_connection_keep_alive,
// Carry each heartbeat setting over from its same-named source field; the
// average-interval limit must not be sourced from the time-since-last limit.
heartbeat_check_interval: self.heartbeat_check_interval,
heartbeat_max_avg_interval: self.heartbeat_max_avg_interval,
heartbeat_max_time_since_last: self.heartbeat_max_time_since_last,
metrics: self.metrics,
state: Initialized(()),
})
Expand Down Expand Up @@ -218,6 +228,9 @@ impl Config<NotInitialized> {
heartbeat_config: HeartbeatConfig::default(),
set_request_timeout: REQ_RES_TIMEOUT,
set_connection_keep_alive: REQ_RES_TIMEOUT,
heartbeat_check_interval: Duration::from_secs(10),
heartbeat_max_avg_interval: Duration::from_secs(20),
heartbeat_max_time_since_last: Duration::from_secs(40),
info_interval: Some(Duration::from_secs(3)),
identify_interval: Some(Duration::from_secs(5)),
metrics: false,
Expand Down
4 changes: 2 additions & 2 deletions crates/services/p2p/src/p2p_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ impl<Codec: NetworkCodec> FuelP2PService<Codec> {
.collect()
}

pub fn get_peers_ids(&self) -> impl Iterator<Item = &PeerId> {
pub fn get_peers_ids_iter(&self) -> impl Iterator<Item = &PeerId> {
self.peer_manager.get_peers_ids()
}

Expand Down Expand Up @@ -309,7 +309,7 @@ impl<Codec: NetworkCodec> FuelP2PService<Codec> {
let peer_id = match peer_id {
Some(peer_id) => peer_id,
_ => {
let peers = self.get_peers_ids();
let peers = self.get_peers_ids_iter();
let peers_count = self.peer_manager.total_peers_connected();

if peers_count == 0 {
Expand Down
100 changes: 40 additions & 60 deletions crates/services/p2p/src/peer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,18 @@ use std::{
Arc,
RwLock,
},
time::Duration,
};
use tokio::time::Instant;
use tracing::{
debug,
info,
};

use crate::gossipsub_config::GRAYLIST_THRESHOLD;
use crate::{
gossipsub_config::GRAYLIST_THRESHOLD,
peer_manager::heartbeat_data::HeartbeatData,
};

pub mod heartbeat_data;

/// At this point we better just ban the peer
const MIN_GOSSIPSUB_SCORE_BEFORE_BAN: AppScore = GRAYLIST_THRESHOLD;
Expand All @@ -44,23 +47,17 @@ pub struct PeerInfo {
pub score: AppScore,
}

impl Default for PeerInfo {
fn default() -> Self {
impl PeerInfo {
pub fn new(heartbeat_avg_window: u32) -> Self {
Self {
peer_addresses: HashSet::new(),
client_version: None,
heartbeat_data: HeartbeatData::new(heartbeat_avg_window),
score: DEFAULT_APP_SCORE,
client_version: Default::default(),
heartbeat_data: Default::default(),
peer_addresses: Default::default(),
}
}
}

/// Payload handed to `insert_peer_info`, selecting which piece of a peer's
/// info gets written.
enum PeerInfoInsert {
/// Network addresses to record for the peer.
Addresses(Vec<Multiaddr>),
/// Client (agent) version string reported for the peer.
ClientVersion(String),
/// Heartbeat data to store for the peer.
HeartbeatData(HeartbeatData),
}

/// Manages Peers and their events
#[derive(Debug)]
pub struct PeerManager {
Expand Down Expand Up @@ -117,14 +114,13 @@ impl PeerManager {
) {
if let Some(time_elapsed) = self
.get_peer_info(peer_id)
.and_then(|info| info.heartbeat_data.seconds_since_last_heartbeat())
.map(|info| info.heartbeat_data.duration_since_last_heartbeat())
{
debug!(target: "fuel-p2p", "Previous heartbeat happened {:?} seconds ago", time_elapsed);
debug!(target: "fuel-p2p", "Previous heartbeat happened {:?} milliseconds ago", time_elapsed.as_millis());
}

let heartbeat_data = HeartbeatData::new(block_height);

self.insert_peer_info(peer_id, PeerInfoInsert::HeartbeatData(heartbeat_data));
let peers = self.get_assigned_peer_table_mut(peer_id);
update_heartbeat(peers, peer_id, block_height);
}

/// Returns `true` signaling that the peer should be disconnected
Expand All @@ -137,7 +133,8 @@ impl PeerManager {
if initial_connection {
self.handle_initial_connection(peer_id, addresses)
} else {
self.insert_peer_info(peer_id, PeerInfoInsert::Addresses(addresses));
let peers = self.get_assigned_peer_table_mut(peer_id);
insert_peer_addresses(peers, peer_id, addresses);
false
}
}
Expand All @@ -148,8 +145,9 @@ impl PeerManager {
addresses: Vec<Multiaddr>,
agent_version: String,
) {
self.insert_peer_info(peer_id, PeerInfoInsert::ClientVersion(agent_version));
self.insert_peer_info(peer_id, PeerInfoInsert::Addresses(addresses));
let peers = self.get_assigned_peer_table_mut(peer_id);
insert_client_version(peers, peer_id, agent_version);
insert_peer_addresses(peers, peer_id, addresses);
}

pub fn batch_update_score_with_decay(&mut self) {
Expand Down Expand Up @@ -197,6 +195,12 @@ impl PeerManager {
self.non_reserved_connected_peers.get(peer_id)
}

/// Iterates over every connected peer paired with its info: the
/// non-reserved peers first, followed by the reserved ones.
pub fn get_all_peers(&self) -> impl Iterator<Item = (&PeerId, &PeerInfo)> {
    let non_reserved = self.non_reserved_connected_peers.iter();
    let reserved = self.reserved_connected_peers.iter();
    non_reserved.chain(reserved)
}

pub fn get_disconnected_reserved_peers(&self) -> impl Iterator<Item = &PeerId> {
self.reserved_peers
.iter()
Expand Down Expand Up @@ -255,6 +259,8 @@ impl PeerManager {
peer_id: &PeerId,
addresses: Vec<Multiaddr>,
) -> bool {
const HEARTBEAT_AVG_WINDOW: u32 = 10;

// if the connected Peer is not from the reserved peers
if !self.reserved_peers.contains(peer_id) {
let non_reserved_peers_connected = self.non_reserved_connected_peers.len();
Expand All @@ -272,15 +278,16 @@ impl PeerManager {
}

self.non_reserved_connected_peers
.insert(*peer_id, PeerInfo::default());
.insert(*peer_id, PeerInfo::new(HEARTBEAT_AVG_WINDOW));
} else {
self.reserved_connected_peers
.insert(*peer_id, PeerInfo::default());
.insert(*peer_id, PeerInfo::new(HEARTBEAT_AVG_WINDOW));

self.send_reserved_peers_update();
}

self.insert_peer_info(peer_id, PeerInfoInsert::Addresses(addresses));
let peers = self.get_assigned_peer_table_mut(peer_id);
insert_peer_addresses(peers, peer_id, addresses);

false
}
Expand All @@ -291,22 +298,14 @@ impl PeerManager {
.send(self.reserved_connected_peers.len());
}

fn insert_peer_info(&mut self, peer_id: &PeerId, data: PeerInfoInsert) {
let peers = if self.reserved_peers.contains(peer_id) {
fn get_assigned_peer_table_mut(
&mut self,
peer_id: &PeerId,
) -> &mut HashMap<PeerId, PeerInfo> {
if self.reserved_peers.contains(peer_id) {
&mut self.reserved_connected_peers
} else {
&mut self.non_reserved_connected_peers
};
match data {
PeerInfoInsert::Addresses(addresses) => {
insert_peer_addresses(peers, peer_id, addresses)
}
PeerInfoInsert::ClientVersion(client_version) => {
insert_client_version(peers, peer_id, client_version)
}
PeerInfoInsert::HeartbeatData(block_height) => {
insert_heartbeat_data(peers, peer_id, block_height)
}
}
}
}
Expand All @@ -316,25 +315,6 @@ pub struct ConnectionState {
peers_allowed: bool,
}

/// Heartbeat state kept per peer; both fields are `None` in the derived default.
#[derive(Debug, Clone, Default)]
pub struct HeartbeatData {
/// Block height passed to `HeartbeatData::new`, if any.
pub block_height: Option<BlockHeight>,
/// Instant captured when this value was built by `HeartbeatData::new`, if any.
pub last_heartbeat: Option<Instant>,
}

impl HeartbeatData {
    /// Builds heartbeat data for `block_height`, stamped with the current instant.
    pub fn new(block_height: BlockHeight) -> Self {
        let received_at = Instant::now();
        Self {
            last_heartbeat: Some(received_at),
            block_height: Some(block_height),
        }
    }

    /// Time elapsed since `last_heartbeat`, or `None` when no heartbeat is recorded.
    ///
    /// Despite the name, this yields a `Duration`, not a count of seconds.
    pub fn seconds_since_last_heartbeat(&self) -> Option<Duration> {
        let last = self.last_heartbeat?;
        Some(last.elapsed())
    }
}

impl ConnectionState {
pub fn new() -> Arc<RwLock<Self>> {
Arc::new(RwLock::new(Self {
Expand Down Expand Up @@ -369,13 +349,13 @@ fn insert_peer_addresses(
}
}

fn insert_heartbeat_data(
fn update_heartbeat(
peers: &mut HashMap<PeerId, PeerInfo>,
peer_id: &PeerId,
heartbeat_data: HeartbeatData,
block_height: BlockHeight,
) {
if let Some(peer) = peers.get_mut(peer_id) {
peer.heartbeat_data = heartbeat_data;
peer.heartbeat_data.update(block_height);
} else {
log_missing_peer(peer_id);
}
Expand Down
Loading

0 comments on commit aab8af2

Please sign in to comment.