Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tracking for peer heartbeats #1356

Merged
merged 21 commits into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Description of the upcoming release here.

### Added

- [#1356](https://github.com/FuelLabs/fuel-core/pull/1356): Add peer reputation reporting to heartbeat code
- [#1355](https://github.com/FuelLabs/fuel-core/pull/1355): Added new metrics related to block importing, such as tps, sync delays etc
- [#1339](https://github.com/FuelLabs/fuel-core/pull/1339): Adds `baseAssetId` to `FeeParameters` in the GraphQL API.
- [#1331](https://github.com/FuelLabs/fuel-core/pull/1331): Add peer reputation reporting to block import code
Expand All @@ -20,6 +21,7 @@ Description of the upcoming release here.
- [#1286](https://github.com/FuelLabs/fuel-core/pull/1286): Include readable names for test cases where missing.
- [#1274](https://github.com/FuelLabs/fuel-core/pull/1274): Added tests to benchmark block synchronization.
- [#1263](https://github.com/FuelLabs/fuel-core/pull/1263): Add gas benchmarks for `ED19` and `ECR1` instructions.
- [#1331](https://github.com/FuelLabs/fuel-core/pull/1331): Add peer reputation reporting to block import code

### Changed

Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions bin/fuel-core/src/cli/run/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,18 @@ pub struct P2PArgs {
/// Cannot be zero.
#[clap(long = "heartbeat-max-failures", default_value = "5", env)]
pub heartbeat_max_failures: NonZeroU32,

/// For peer reputations, the interval at which to check heartbeat health for all peers
#[clap(long = "heartbeat-check-interval", default_value = "5", env)]
pub heartbeat_check_interval: u64,

/// For peer reputations, the maximum average interval between heartbeats for a peer before penalty
#[clap(long = "heartbeat-max-avg-interval", default_value = "20", env)]
pub heartbeat_max_avg_interval: u64,

/// For peer reputations, the maximum time since last heartbeat before penalty
#[clap(long = "heartbeat-max-time-since-last", default_value = "40", env)]
pub heartbeat_max_time_since_last: u64,
}

#[derive(Debug, Clone, Args)]
Expand Down Expand Up @@ -302,6 +314,13 @@ impl P2PArgs {
heartbeat_config,
set_request_timeout: Duration::from_secs(self.request_timeout),
set_connection_keep_alive: Duration::from_secs(self.connection_keep_alive),
heartbeat_check_interval: Duration::from_secs(self.heartbeat_check_interval),
heartbeat_max_avg_interval: Duration::from_secs(
self.heartbeat_max_avg_interval,
),
heartbeat_max_time_since_last: Duration::from_secs(
self.heartbeat_max_time_since_last,
),
info_interval: Some(Duration::from_secs(self.info_interval)),
identify_interval: Some(Duration::from_secs(self.identify_interval)),
metrics,
Expand Down
1 change: 1 addition & 0 deletions crates/services/p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ rand = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_with = "1.11"
sha2 = "0.10"
thiserror = "1.0.47"
tokio = { workspace = true, features = ["sync"] }
tracing = { workspace = true }

Expand Down
13 changes: 13 additions & 0 deletions crates/services/p2p/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,13 @@ pub struct Config<State = Initialized> {
/// Sets the keep-alive timeout of idle connections.
pub set_connection_keep_alive: Duration,

/// Time between checking heartbeat status for all peers
pub heartbeat_check_interval: Duration,
/// Max avg time between heartbeats for a given peer before getting reputation penalty
pub heartbeat_max_avg_interval: Duration,
/// Max time since a given peer has sent a heartbeat before getting reputation penalty
pub heartbeat_max_time_since_last: Duration,

/// Enables prometheus metrics for this fuel-service
pub metrics: bool,

Expand Down Expand Up @@ -176,6 +183,9 @@ impl Config<NotInitialized> {
heartbeat_config: self.heartbeat_config,
set_request_timeout: self.set_request_timeout,
set_connection_keep_alive: self.set_connection_keep_alive,
heartbeat_check_interval: self.heartbeat_check_interval,
heartbeat_max_avg_interval: self.heartbeat_max_time_since_last,
heartbeat_max_time_since_last: self.heartbeat_max_time_since_last,
metrics: self.metrics,
state: Initialized(()),
})
Expand Down Expand Up @@ -218,6 +228,9 @@ impl Config<NotInitialized> {
heartbeat_config: HeartbeatConfig::default(),
set_request_timeout: REQ_RES_TIMEOUT,
set_connection_keep_alive: REQ_RES_TIMEOUT,
heartbeat_check_interval: Duration::from_secs(10),
heartbeat_max_avg_interval: Duration::from_secs(20),
heartbeat_max_time_since_last: Duration::from_secs(40),
info_interval: Some(Duration::from_secs(3)),
identify_interval: Some(Duration::from_secs(5)),
metrics: false,
Expand Down
4 changes: 2 additions & 2 deletions crates/services/p2p/src/p2p_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ impl<Codec: NetworkCodec> FuelP2PService<Codec> {
.collect()
}

pub fn get_peers_ids(&self) -> impl Iterator<Item = &PeerId> {
pub fn get_peers_ids_iter(&self) -> impl Iterator<Item = &PeerId> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't it be clear enough from the method signature that it's an iter?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't feel strongly about it. I just don't think that someone would expect an iterator from that name.

self.peer_manager.get_peers_ids()
}

Expand Down Expand Up @@ -309,7 +309,7 @@ impl<Codec: NetworkCodec> FuelP2PService<Codec> {
let peer_id = match peer_id {
Some(peer_id) => peer_id,
_ => {
let peers = self.get_peers_ids();
let peers = self.get_peers_ids_iter();
let peers_count = self.peer_manager.total_peers_connected();

if peers_count == 0 {
Expand Down
111 changes: 66 additions & 45 deletions crates/services/p2p/src/peer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,23 +44,17 @@ pub struct PeerInfo {
pub score: AppScore,
}

impl Default for PeerInfo {
fn default() -> Self {
impl PeerInfo {
pub fn new(heartbeat_avg_window: u32) -> Self {
Self {
peer_addresses: HashSet::new(),
client_version: None,
heartbeat_data: HeartbeatData::new(heartbeat_avg_window),
score: DEFAULT_APP_SCORE,
client_version: Default::default(),
heartbeat_data: Default::default(),
peer_addresses: Default::default(),
}
}
}

enum PeerInfoInsert {
Addresses(Vec<Multiaddr>),
ClientVersion(String),
HeartbeatData(HeartbeatData),
}

/// Manages Peers and their events
#[derive(Debug)]
pub struct PeerManager {
Expand Down Expand Up @@ -117,14 +111,13 @@ impl PeerManager {
) {
if let Some(time_elapsed) = self
.get_peer_info(peer_id)
.and_then(|info| info.heartbeat_data.seconds_since_last_heartbeat())
.map(|info| info.heartbeat_data.duration_since_last_heartbeat())
{
debug!(target: "fuel-p2p", "Previous heartbeat happened {:?} seconds ago", time_elapsed);
debug!(target: "fuel-p2p", "Previous heartbeat happened {:?} milliseconds ago", time_elapsed.as_millis());
}

let heartbeat_data = HeartbeatData::new(block_height);

self.insert_peer_info(peer_id, PeerInfoInsert::HeartbeatData(heartbeat_data));
let peers = self.get_relevant_peers_mut(peer_id);
update_heartbeat(peers, peer_id, block_height);
}

/// Returns `true` signaling that the peer should be disconnected
Expand All @@ -137,7 +130,8 @@ impl PeerManager {
if initial_connection {
self.handle_initial_connection(peer_id, addresses)
} else {
self.insert_peer_info(peer_id, PeerInfoInsert::Addresses(addresses));
let peers = self.get_relevant_peers_mut(peer_id);
insert_peer_addresses(peers, peer_id, addresses);
false
}
}
Expand All @@ -148,8 +142,9 @@ impl PeerManager {
addresses: Vec<Multiaddr>,
agent_version: String,
) {
self.insert_peer_info(peer_id, PeerInfoInsert::ClientVersion(agent_version));
self.insert_peer_info(peer_id, PeerInfoInsert::Addresses(addresses));
let peers = self.get_relevant_peers_mut(peer_id);
insert_client_version(peers, peer_id, agent_version);
insert_peer_addresses(peers, peer_id, addresses);
}

pub fn batch_update_score_with_decay(&mut self) {
Expand Down Expand Up @@ -197,6 +192,12 @@ impl PeerManager {
self.non_reserved_connected_peers.get(peer_id)
}

pub fn get_all_peers(&self) -> impl Iterator<Item = (&PeerId, &PeerInfo)> {
self.non_reserved_connected_peers
.iter()
.chain(self.reserved_connected_peers.iter())
}

pub fn get_disconnected_reserved_peers(&self) -> impl Iterator<Item = &PeerId> {
self.reserved_peers
.iter()
Expand Down Expand Up @@ -255,6 +256,8 @@ impl PeerManager {
peer_id: &PeerId,
addresses: Vec<Multiaddr>,
) -> bool {
const HEARTBEAT_AVG_WINDOW: u32 = 10;

// if the connected Peer is not from the reserved peers
if !self.reserved_peers.contains(peer_id) {
let non_reserved_peers_connected = self.non_reserved_connected_peers.len();
Expand All @@ -272,15 +275,16 @@ impl PeerManager {
}

self.non_reserved_connected_peers
.insert(*peer_id, PeerInfo::default());
.insert(*peer_id, PeerInfo::new(HEARTBEAT_AVG_WINDOW));
} else {
self.reserved_connected_peers
.insert(*peer_id, PeerInfo::default());
.insert(*peer_id, PeerInfo::new(HEARTBEAT_AVG_WINDOW));

self.send_reserved_peers_update();
}

self.insert_peer_info(peer_id, PeerInfoInsert::Addresses(addresses));
let peers = self.get_relevant_peers_mut(peer_id);
insert_peer_addresses(peers, peer_id, addresses);

false
}
Expand All @@ -291,22 +295,14 @@ impl PeerManager {
.send(self.reserved_connected_peers.len());
}

fn insert_peer_info(&mut self, peer_id: &PeerId, data: PeerInfoInsert) {
let peers = if self.reserved_peers.contains(peer_id) {
fn get_relevant_peers_mut(
MitchTurner marked this conversation as resolved.
Show resolved Hide resolved
&mut self,
peer_id: &PeerId,
) -> &mut HashMap<PeerId, PeerInfo> {
if self.reserved_peers.contains(peer_id) {
&mut self.reserved_connected_peers
} else {
&mut self.non_reserved_connected_peers
};
match data {
PeerInfoInsert::Addresses(addresses) => {
insert_peer_addresses(peers, peer_id, addresses)
}
PeerInfoInsert::ClientVersion(client_version) => {
insert_client_version(peers, peer_id, client_version)
}
PeerInfoInsert::HeartbeatData(block_height) => {
insert_heartbeat_data(peers, peer_id, block_height)
}
}
}
}
Expand All @@ -316,22 +312,47 @@ pub struct ConnectionState {
peers_allowed: bool,
}

#[derive(Debug, Clone, Default)]
#[derive(Debug, Clone)]
pub struct HeartbeatData {
pub block_height: Option<BlockHeight>,
pub last_heartbeat: Option<Instant>,
pub last_heartbeat: Instant,
// Size of moving average window
pub window: u32,
// How many heartbeats into the first window have been received
pub count: u32,
// Moving average of duration between heartbeats
pub moving_average: Duration,
}

impl HeartbeatData {
pub fn new(block_height: BlockHeight) -> Self {
pub fn new(window: u32) -> Self {
Self {
block_height: Some(block_height),
last_heartbeat: Some(Instant::now()),
block_height: None,
last_heartbeat: Instant::now(),
window,
count: 0,
moving_average: Duration::from_secs(0),
}
}

pub fn seconds_since_last_heartbeat(&self) -> Option<Duration> {
self.last_heartbeat.map(|time| time.elapsed())
pub fn duration_since_last_heartbeat(&self) -> Duration {
self.last_heartbeat.elapsed()
}

pub fn average_time_between_heartbeats(&self) -> Duration {
self.moving_average
}

pub fn update(&mut self, block_height: BlockHeight) {
self.block_height = Some(block_height);
let old_hearbeat = self.last_heartbeat;
self.last_heartbeat = Instant::now();
let new_duration = self.last_heartbeat - old_hearbeat;
if self.count < self.window {
self.count += 1;
}
self.moving_average =
(self.moving_average * (self.count - 1) + new_duration) / self.count;
}
}

Expand Down Expand Up @@ -369,13 +390,13 @@ fn insert_peer_addresses(
}
}

fn insert_heartbeat_data(
fn update_heartbeat(
peers: &mut HashMap<PeerId, PeerInfo>,
peer_id: &PeerId,
heartbeat_data: HeartbeatData,
block_height: BlockHeight,
) {
if let Some(peer) = peers.get_mut(peer_id) {
peer.heartbeat_data = heartbeat_data;
peer.heartbeat_data.update(block_height);
} else {
log_missing_peer(peer_id);
}
Expand Down
9 changes: 7 additions & 2 deletions crates/services/p2p/src/request_response/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use serde_with::{
serde_as,
FromInto,
};
use thiserror::Error;
use tokio::sync::oneshot;

pub(crate) const REQUEST_RESPONSE_PROTOCOL_ID: &[u8] = b"/fuel/req_res/0.0.1";
Expand Down Expand Up @@ -80,14 +81,18 @@ pub enum OutboundResponse {
Transactions(Option<Arc<Vec<Transaction>>>),
}

#[derive(Debug)]
#[derive(Debug, Error)]
pub enum RequestError {
#[error("Not currently connected to any peers")]
NoPeersConnected,
}

#[derive(Debug, Eq, PartialEq)]
#[derive(Debug, Eq, PartialEq, Error)]
pub enum ResponseError {
#[error("Response channel does not exist")]
ResponseChannelDoesNotExist,
#[error("Failed to send response")]
SendingResponseFailed,
#[error("Failed to convert response to intermediate format")]
ConversionToIntermediateFailed,
}
Loading