//! Implementation of Lighthouse's peer management system.
use crate::discovery::enr_ext::EnrExt;
use crate::rpc::{GoodbyeReason, MetaData, Protocol, RPCError, RPCResponseErrorCode};
use crate::service::TARGET_SUBNET_PEERS;
use crate::{error, metrics, Gossipsub};
use crate::{NetworkGlobals, PeerId};
use crate::{Subnet, SubnetDiscovery};
use delay_map::HashSetDelay;
use discv5::Enr;
use libp2p::identify::Info as IdentifyInfo;
use lru_cache::LRUTimeCache;
use peerdb::{BanOperation, BanResult, ScoreUpdateResult};
use rand::seq::SliceRandom;
use slog::{debug, error, trace, warn};
use smallvec::SmallVec;
use std::{
sync::Arc,
time::{Duration, Instant},
};
use types::{EthSpec, SyncSubnetId};
pub use libp2p::core::Multiaddr;
pub use libp2p::identity::Keypair;
#[allow(clippy::mutable_key_type)] // PeerIds in hashmaps are no longer permitted by clippy
pub mod peerdb;
use crate::peer_manager::peerdb::client::ClientKind;
use libp2p::multiaddr;
pub use peerdb::peer_info::{
ConnectionDirection, PeerConnectionStatus, PeerConnectionStatus::*, PeerInfo,
};
use peerdb::score::{PeerAction, ReportSource};
pub use peerdb::sync_status::{SyncInfo, SyncStatus};
use std::collections::{hash_map::Entry, HashMap};
use std::net::IpAddr;
use strum::IntoEnumIterator;
pub mod config;
mod network_behaviour;
/// The heartbeat performs regular updates such as updating reputations and performing discovery
/// requests. This defines the interval in seconds.
const HEARTBEAT_INTERVAL: u64 = 30;
/// The minimum amount of time we allow peers to reconnect to us after a disconnect when we are
/// saturated with peers. This effectively looks like a swarm BAN for this amount of time.
pub const PEER_RECONNECTION_TIMEOUT: Duration = Duration::from_secs(600);
/// This is used in the pruning logic. We avoid pruning peers on sync-committees if doing so would
/// lower our peer count below this number. Instead we favour a non-uniform distribution of subnet
/// peers.
pub const MIN_SYNC_COMMITTEE_PEERS: u64 = 2;
/// A fraction of `PeerManager::target_peers` that we allow to connect to us in excess of
/// `PeerManager::target_peers`. For clarity, if `PeerManager::target_peers` is 50 and
/// PEER_EXCESS_FACTOR = 0.1, we allow 10% more nodes, i.e. 55.
pub const PEER_EXCESS_FACTOR: f32 = 0.1;
/// A fraction of `PeerManager::target_peers` that we want to be outbound-only connections.
pub const TARGET_OUTBOUND_ONLY_FACTOR: f32 = 0.3;
/// If our outbound-only peer count falls below this fraction of `PeerManager::target_peers`, we
/// start a discovery query to reach our target. MIN_OUTBOUND_ONLY_FACTOR must be < TARGET_OUTBOUND_ONLY_FACTOR.
pub const MIN_OUTBOUND_ONLY_FACTOR: f32 = 0.2;
/// The fraction of extra peers, beyond PEER_EXCESS_FACTOR, that we are allowed to dial when we
/// require subnet peers. More specifically, if our target peer limit is 50 and our excess peer
/// limit is 55, and we are at 55 peers, this parameter provisions a few extra slots for dialing
/// the priority peers we need for validator duties.
pub const PRIORITY_PEER_EXCESS: f32 = 0.2;
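// A rough worked example (illustrative only): with `target_peers = 50`, the helper methods
// defined further down derive the following limits from these constants:
//   max_peers()                  = ceil(50 * (1.0 + 0.1))       = 55
//   max_priority_peers()         = ceil(50 * (1.0 + 0.1 + 0.2)) = 65
//   min_outbound_only_peers()    = ceil(50 * 0.2)               = 10
//   target_outbound_peers()      = ceil(50 * 0.3)               = 15
//   max_outbound_dialing_peers() = ceil(50 * (1.0 + 0.1 + 0.1)) = 60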
/// The main struct that handles peers' reputations and connection statuses.
pub struct PeerManager<TSpec: EthSpec> {
/// Storage of network globals to access the `PeerDB`.
network_globals: Arc<NetworkGlobals<TSpec>>,
/// A queue of events that the `PeerManager` is waiting to produce.
events: SmallVec<[PeerManagerEvent; 16]>,
/// A collection of inbound-connected peers waiting to be Ping'd.
inbound_ping_peers: HashSetDelay<PeerId>,
/// A collection of outbound-connected peers waiting to be Ping'd.
outbound_ping_peers: HashSetDelay<PeerId>,
/// A collection of peers waiting to be Status'd.
status_peers: HashSetDelay<PeerId>,
/// The target number of peers we would like to connect to.
target_peers: usize,
/// Peers queued to be dialed.
peers_to_dial: Vec<Enr>,
/// The number of temporarily banned peers. This is used to prevent instantaneous
/// reconnection.
// NOTE: This just prevents re-connections. The state of the peer is otherwise unaffected. A
// peer can be in a disconnected state and new connections will be refused and logged as if the
// peer is banned without it being reflected in the peer's state.
// Also the banned state can out-last the peer's reference in the peer db. So peers that are
// unknown to us can still be temporarily banned. This is fundamentally a relationship with
// the swarm. Regardless of our knowledge of the peer in the db, it will be temporarily banned
// at the swarm layer.
// NOTE: An LRUTimeCache is used compared to a structure that needs to be polled to avoid very
// frequent polling to unban peers. Instead, this cache piggy-backs the PeerManager heartbeat
// to update and clear the cache. Therefore the PEER_RECONNECTION_TIMEOUT only has a resolution
// of the HEARTBEAT_INTERVAL.
temporary_banned_peers: LRUTimeCache<PeerId>,
/// A collection of sync committee subnets that we need to stay subscribed to.
/// Sync committee subnets are longer term (256 epochs). Hence, we need to re-run
/// discovery queries for subnet peers if we disconnect from existing sync
/// committee subnet peers.
sync_committee_subnets: HashMap<SyncSubnetId, Instant>,
/// The heartbeat interval to perform routine maintenance.
heartbeat: tokio::time::Interval,
/// Keeps track of whether the discovery service is enabled or not.
discovery_enabled: bool,
/// Keeps track of whether the current instance is reporting metrics or not.
metrics_enabled: bool,
/// Keeps track of whether the QUIC protocol is enabled or not.
quic_enabled: bool,
/// The logger associated with the `PeerManager`.
log: slog::Logger,
}
/// The events that the `PeerManager` outputs (requests).
#[derive(Debug)]
pub enum PeerManagerEvent {
/// A peer has dialed us.
PeerConnectedIncoming(PeerId),
/// A peer has been dialed.
PeerConnectedOutgoing(PeerId),
/// A peer has disconnected.
PeerDisconnected(PeerId),
/// Sends a STATUS to a peer.
Status(PeerId),
/// Sends a PING to a peer.
Ping(PeerId),
/// Request METADATA from a peer.
MetaData(PeerId),
/// The peer should be disconnected.
DisconnectPeer(PeerId, GoodbyeReason),
/// Inform the behaviour to ban this peer and associated ip addresses.
Banned(PeerId, Vec<IpAddr>),
/// The peer should be unbanned with the associated ip addresses.
UnBanned(PeerId, Vec<IpAddr>),
/// Request the behaviour to discover more peers and the amount of peers to discover.
DiscoverPeers(usize),
/// Request the behaviour to discover peers on subnets.
DiscoverSubnetPeers(Vec<SubnetDiscovery>),
}
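// A hedged sketch (not taken from this crate's actual behaviour code): a consumer that drains
// these events would typically match on each variant and turn it into a network action, e.g.:
//
//     match event {
//         PeerManagerEvent::Ping(peer_id) => { /* send an RPC PING to `peer_id` */ }
//         PeerManagerEvent::Status(peer_id) => { /* send a STATUS request */ }
//         PeerManagerEvent::DisconnectPeer(peer_id, reason) => { /* send GOODBYE, then close */ }
//         PeerManagerEvent::DiscoverPeers(n) => { /* ask discovery for roughly `n` more peers */ }
//         _ => { /* the remaining variants are handled similarly */ }
//     }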
impl<TSpec: EthSpec> PeerManager<TSpec> {
// NOTE: Must be run inside a tokio executor.
pub fn new(
cfg: config::Config,
network_globals: Arc<NetworkGlobals<TSpec>>,
log: &slog::Logger,
) -> error::Result<Self> {
let config::Config {
discovery_enabled,
metrics_enabled,
target_peer_count,
status_interval,
ping_interval_inbound,
ping_interval_outbound,
quic_enabled,
} = cfg;
// Set up the peer manager heartbeat interval
let heartbeat = tokio::time::interval(tokio::time::Duration::from_secs(HEARTBEAT_INTERVAL));
Ok(PeerManager {
network_globals,
events: SmallVec::new(),
peers_to_dial: Default::default(),
inbound_ping_peers: HashSetDelay::new(Duration::from_secs(ping_interval_inbound)),
outbound_ping_peers: HashSetDelay::new(Duration::from_secs(ping_interval_outbound)),
status_peers: HashSetDelay::new(Duration::from_secs(status_interval)),
target_peers: target_peer_count,
temporary_banned_peers: LRUTimeCache::new(PEER_RECONNECTION_TIMEOUT),
sync_committee_subnets: Default::default(),
heartbeat,
discovery_enabled,
metrics_enabled,
quic_enabled,
log: log.clone(),
})
}
/* Public accessible functions */
/// The application layer wants to disconnect from a peer for a particular reason.
///
/// All instant disconnections are fatal and we ban the associated peer.
///
/// This will send a goodbye and disconnect the peer if it is connected or dialing.
pub fn goodbye_peer(&mut self, peer_id: &PeerId, reason: GoodbyeReason, source: ReportSource) {
// Update the sync status if required
if let Some(info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
debug!(self.log, "Sending goodbye to peer"; "peer_id" => %peer_id, "reason" => %reason, "score" => %info.score());
if matches!(reason, GoodbyeReason::IrrelevantNetwork) {
info.update_sync_status(SyncStatus::IrrelevantPeer);
}
}
self.report_peer(
peer_id,
PeerAction::Fatal,
source,
Some(reason),
"goodbye_peer",
);
}
/// Reports a peer for some action.
///
/// If the peer doesn't exist, log a warning and insert defaults.
pub fn report_peer(
&mut self,
peer_id: &PeerId,
action: PeerAction,
source: ReportSource,
reason: Option<GoodbyeReason>,
msg: &'static str,
) {
let action = self
.network_globals
.peers
.write()
.report_peer(peer_id, action, source, msg);
self.handle_score_action(peer_id, action, reason);
}
/// Upon adjusting a Peer's score, there are times the peer manager must pass messages up to
/// libp2p. This function handles the conditional logic associated with each score update
/// result.
fn handle_score_action(
&mut self,
peer_id: &PeerId,
action: ScoreUpdateResult,
reason: Option<GoodbyeReason>,
) {
match action {
ScoreUpdateResult::Ban(ban_operation) => {
// The peer has been banned and we need to handle the banning operation
// NOTE: When we ban a peer, its IP address can be banned. We do not recursively search
// through all our connected peers banning all other peers that are using this IP address.
// If these peers are behaving fine, we permit their current connections. However, if any new
// nodes or current nodes try to reconnect on a banned IP, they will be instantly banned
// and disconnected.
self.handle_ban_operation(peer_id, ban_operation, reason);
}
ScoreUpdateResult::Disconnect => {
// The peer has transitioned to a disconnect state and has been marked as such in
// the peer db. We must inform libp2p to disconnect this peer.
self.inbound_ping_peers.remove(peer_id);
self.outbound_ping_peers.remove(peer_id);
self.events.push(PeerManagerEvent::DisconnectPeer(
*peer_id,
GoodbyeReason::BadScore,
));
}
ScoreUpdateResult::NoAction => {
// The report had no effect on the peer and there is nothing to do.
}
ScoreUpdateResult::Unbanned(unbanned_ips) => {
// Inform the Swarm to unban the peer
self.events
.push(PeerManagerEvent::UnBanned(*peer_id, unbanned_ips));
}
}
}
/// If a peer is being banned, this handles the banning operation.
fn handle_ban_operation(
&mut self,
peer_id: &PeerId,
ban_operation: BanOperation,
reason: Option<GoodbyeReason>,
) {
match ban_operation {
BanOperation::TemporaryBan => {
// The peer could be temporarily banned. We only do this in the case that
// we have currently reached our peer target limit.
if self.network_globals.connected_peers() >= self.target_peers {
// We have enough peers, prevent this reconnection.
self.temporary_banned_peers.raw_insert(*peer_id);
self.events.push(PeerManagerEvent::Banned(*peer_id, vec![]));
}
}
BanOperation::DisconnectThePeer => {
// The peer was currently connected, so we start a disconnection.
// Once the peer has disconnected, its connection state will transition to a
// banned state.
self.events.push(PeerManagerEvent::DisconnectPeer(
*peer_id,
reason.unwrap_or(GoodbyeReason::BadScore),
));
}
BanOperation::PeerDisconnecting => {
// The peer is currently being disconnected and will be banned once the
// disconnection completes.
}
BanOperation::ReadyToBan(banned_ips) => {
// The peer is not currently connected, we can safely ban it at the swarm
// level.
// If a peer is being banned, this trumps any temporary ban the peer might be
// under. We no longer track it in the temporary ban list.
if !self.temporary_banned_peers.raw_remove(peer_id) {
// If the peer is not already banned, inform the Swarm to ban the peer
self.events
.push(PeerManagerEvent::Banned(*peer_id, banned_ips));
// If the peer was in the process of being un-banned, remove it (a rare race
// condition)
self.events.retain(|event| {
if let PeerManagerEvent::UnBanned(unbanned_peer_id, _) = event {
unbanned_peer_id != peer_id // Remove matching peer ids
} else {
true
}
});
}
}
}
}
/// Peers that have been returned by discovery requests that are suitable for dialing are
/// returned here.
///
/// This function decides whether or not to dial these peers.
#[allow(clippy::mutable_key_type)]
pub fn peers_discovered(&mut self, results: HashMap<Enr, Option<Instant>>) {
let mut to_dial_peers = 0;
let connected_or_dialing = self.network_globals.connected_or_dialing_peers();
for (enr, min_ttl) in results {
// There are two conditions in deciding whether to dial this peer.
// 1. If we are less than our max connections. Discovery queries are executed to reach
// our target peers, so it's fine to dial up to our max peers (which will get pruned
// in the next heartbeat down to our target).
// 2. If the peer is one our validators require for a specific subnet, then it is
// considered a priority. We have pre-allocated some extra priority slots for these
// peers as specified by PRIORITY_PEER_EXCESS. Therefore we dial these peers, even
// if we are already at our max_peer limit.
if !self.peers_to_dial.contains(&enr)
&& ((min_ttl.is_some()
&& connected_or_dialing + to_dial_peers < self.max_priority_peers())
|| connected_or_dialing + to_dial_peers < self.max_peers())
{
// This should be updated with the peer dialing. In fact created once the peer is
// dialed
if let Some(min_ttl) = min_ttl {
self.network_globals
.peers
.write()
.update_min_ttl(&enr.peer_id(), min_ttl);
}
let peer_id = enr.peer_id();
if self.dial_peer(enr) {
debug!(self.log, "Dialing discovered peer"; "peer_id" => %peer_id);
to_dial_peers += 1;
}
}
}
// Queue another discovery if we need to
self.maintain_peer_count(to_dial_peers);
}
/// A STATUS message has been received from a peer. This resets the status timer.
pub fn peer_statusd(&mut self, peer_id: &PeerId) {
self.status_peers.insert(*peer_id);
}
/// Inserts the sync subnet into the list of long-lived sync committee subnets that we need to
/// maintain an adequate number of peers for.
pub fn add_sync_subnet(&mut self, subnet_id: SyncSubnetId, min_ttl: Instant) {
match self.sync_committee_subnets.entry(subnet_id) {
Entry::Vacant(_) => {
self.sync_committee_subnets.insert(subnet_id, min_ttl);
}
Entry::Occupied(old) => {
if *old.get() < min_ttl {
self.sync_committee_subnets.insert(subnet_id, min_ttl);
}
}
}
}
/// The maximum number of peers we allow to connect to us. This is `target_peers` * (1 +
/// PEER_EXCESS_FACTOR)
fn max_peers(&self) -> usize {
(self.target_peers as f32 * (1.0 + PEER_EXCESS_FACTOR)).ceil() as usize
}
/// The maximum number of peers we allow when dialing a priority peer (i.e. a peer that is
/// subscribed to subnets that our validators require). This is `target_peers` * (1 +
/// PEER_EXCESS_FACTOR + PRIORITY_PEER_EXCESS).
fn max_priority_peers(&self) -> usize {
(self.target_peers as f32 * (1.0 + PEER_EXCESS_FACTOR + PRIORITY_PEER_EXCESS)).ceil()
as usize
}
/// The minimum number of outbound peers that we reach before we start another discovery query.
fn min_outbound_only_peers(&self) -> usize {
(self.target_peers as f32 * MIN_OUTBOUND_ONLY_FACTOR).ceil() as usize
}
/// The target number of outbound-only peers we would like to maintain.
fn target_outbound_peers(&self) -> usize {
(self.target_peers as f32 * TARGET_OUTBOUND_ONLY_FACTOR).ceil() as usize
}
/// The maximum number of peers that are connected or dialing before we refuse to do another
/// discovery search for more outbound peers. We can use up to half the priority peer excess allocation.
fn max_outbound_dialing_peers(&self) -> usize {
(self.target_peers as f32 * (1.0 + PEER_EXCESS_FACTOR + PRIORITY_PEER_EXCESS / 2.0)).ceil()
as usize
}
/* Notifications from the Swarm */
/// A peer is being dialed.
/// Returns true if this peer will be dialed.
pub fn dial_peer(&mut self, peer: Enr) -> bool {
if self
.network_globals
.peers
.read()
.should_dial(&peer.peer_id())
{
self.peers_to_dial.push(peer);
true
} else {
false
}
}
/// Reports if a peer is banned or not.
///
/// This is used to determine if we should accept incoming connections.
pub fn ban_status(&self, peer_id: &PeerId) -> Option<BanResult> {
self.network_globals.peers.read().ban_status(peer_id)
}
pub fn is_connected(&self, peer_id: &PeerId) -> bool {
self.network_globals.peers.read().is_connected(peer_id)
}
/// Reports whether the peer limit is reached, in which case we stop allowing new incoming
/// connections.
pub fn peer_limit_reached(&self, count_dialing: bool) -> bool {
if count_dialing {
// This is an incoming connection so limit by the standard max peers
self.network_globals.connected_or_dialing_peers() >= self.max_peers()
} else {
// We dialed this peer, allow up to max_outbound_dialing_peers
self.network_globals.connected_peers() >= self.max_outbound_dialing_peers()
}
}
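// Illustrative thresholds (assuming `target_peers = 50`): with `count_dialing = true` the limit
// is `max_peers()` = 55 measured against connected-or-dialing peers; otherwise it is
// `max_outbound_dialing_peers()` = 60 measured against connected peers.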
/// Updates `PeerInfo` with `identify` information.
pub fn identify(&mut self, peer_id: &PeerId, info: &IdentifyInfo) {
if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
let previous_kind = peer_info.client().kind;
let previous_listening_addresses =
peer_info.set_listening_addresses(info.listen_addrs.clone());
peer_info.set_client(peerdb::client::Client::from_identify_info(info));
if previous_kind != peer_info.client().kind
|| *peer_info.listening_addresses() != previous_listening_addresses
{
debug!(self.log, "Identified Peer"; "peer" => %peer_id,
"protocol_version" => &info.protocol_version,
"agent_version" => &info.agent_version,
"listening_addresses" => ?info.listen_addrs,
"observed_address" => ?info.observed_addr,
"protocols" => ?info.protocols
);
}
} else {
error!(self.log, "Received an Identify response from an unknown peer"; "peer_id" => peer_id.to_string());
}
}
/// An error has occurred in the RPC.
///
/// This adjusts a peer's score based on the error.
pub fn handle_rpc_error(
&mut self,
peer_id: &PeerId,
protocol: Protocol,
err: &RPCError,
direction: ConnectionDirection,
) {
let client = self.network_globals.client(peer_id);
let score = self.network_globals.peers.read().score(peer_id);
debug!(self.log, "RPC Error"; "protocol" => %protocol, "err" => %err, "client" => %client,
"peer_id" => %peer_id, "score" => %score, "direction" => ?direction);
metrics::inc_counter_vec(
&metrics::TOTAL_RPC_ERRORS_PER_CLIENT,
&[
client.kind.as_ref(),
err.as_static_str(),
direction.as_ref(),
],
);
// Map this error to a `PeerAction` (if any)
let peer_action = match err {
RPCError::IncompleteStream => {
// They closed the stream early; this could indicate a poor connection
PeerAction::MidToleranceError
}
RPCError::InternalError(e) => {
debug!(self.log, "Internal RPC Error"; "error" => %e, "peer_id" => %peer_id);
return;
}
RPCError::HandlerRejected => PeerAction::Fatal,
RPCError::InvalidData(_) => {
// Peer is not complying with the protocol. This is considered a malicious action
PeerAction::Fatal
}
RPCError::IoError(_e) => {
// This could be their fault or ours, so we tolerate this
PeerAction::HighToleranceError
}
RPCError::ErrorResponse(code, _) => match code {
RPCResponseErrorCode::Unknown => PeerAction::HighToleranceError,
RPCResponseErrorCode::ResourceUnavailable => {
// Don't ban on this because we want to retry with a block by root request.
if matches!(protocol, Protocol::BlobsByRoot) {
return;
}
// NOTE: This error only makes sense for the `BlocksByRange` and `BlocksByRoot`
// protocols.
//
// If we are syncing, there is no point keeping these peers around and
// continually failing to request blocks. We instantly ban them and hope that
// by the time the ban lifts, the peers will have completed their backfill
// sync.
//
// TODO: Potentially a more graceful way of handling such peers, would be to
// implement a new sync type which tracks these peers and prevents the sync
// algorithms from requesting blocks from them (at least for a set period of
// time, multiple failures would then lead to a ban).
match direction {
// If the blocks request was initiated by us, then we have no use of this
// peer and so we ban it.
ConnectionDirection::Outgoing => PeerAction::Fatal,
// If the blocks request was initiated by the peer, then we let the peer decide if
// it wants to continue talking to us, we do not ban the peer.
ConnectionDirection::Incoming => return,
}
}
RPCResponseErrorCode::ServerError => PeerAction::MidToleranceError,
RPCResponseErrorCode::InvalidRequest => PeerAction::LowToleranceError,
RPCResponseErrorCode::RateLimited => match protocol {
Protocol::Ping => PeerAction::MidToleranceError,
Protocol::BlocksByRange => PeerAction::MidToleranceError,
Protocol::BlocksByRoot => PeerAction::MidToleranceError,
Protocol::BlobsByRange => PeerAction::MidToleranceError,
Protocol::LightClientBootstrap => PeerAction::LowToleranceError,
Protocol::BlobsByRoot => PeerAction::MidToleranceError,
Protocol::Goodbye => PeerAction::LowToleranceError,
Protocol::MetaData => PeerAction::LowToleranceError,
Protocol::Status => PeerAction::LowToleranceError,
},
RPCResponseErrorCode::BlobsNotFoundForBlock => PeerAction::LowToleranceError,
},
RPCError::SSZDecodeError(_) => PeerAction::Fatal,
RPCError::UnsupportedProtocol => {
// Not supporting a protocol shouldn't be considered a malicious action, but
// it is an action that in some cases will make the peer unfit to continue
// communicating.
match protocol {
Protocol::Ping => PeerAction::Fatal,
Protocol::BlocksByRange => return,
Protocol::BlocksByRoot => return,
Protocol::BlobsByRange => return,
Protocol::BlobsByRoot => return,
Protocol::Goodbye => return,
Protocol::LightClientBootstrap => return,
Protocol::MetaData => PeerAction::Fatal,
Protocol::Status => PeerAction::Fatal,
}
}
RPCError::StreamTimeout => match direction {
ConnectionDirection::Incoming => {
// There was a timeout responding to a peer.
debug!(self.log, "Timed out responding to RPC Request"; "peer_id" => %peer_id);
return;
}
ConnectionDirection::Outgoing => match protocol {
Protocol::Ping => PeerAction::LowToleranceError,
Protocol::BlocksByRange => PeerAction::MidToleranceError,
Protocol::BlocksByRoot => PeerAction::MidToleranceError,
Protocol::BlobsByRange => PeerAction::MidToleranceError,
Protocol::BlobsByRoot => PeerAction::MidToleranceError,
Protocol::LightClientBootstrap => return,
Protocol::Goodbye => return,
Protocol::MetaData => return,
Protocol::Status => return,
},
},
RPCError::NegotiationTimeout => PeerAction::LowToleranceError,
RPCError::Disconnected => return, // No penalty for a graceful disconnection
};
self.report_peer(
peer_id,
peer_action,
ReportSource::RPC,
None,
"handle_rpc_error",
);
}
/// A ping request has been received.
// NOTE: The behaviour responds with a PONG automatically
pub fn ping_request(&mut self, peer_id: &PeerId, seq: u64) {
if let Some(peer_info) = self.network_globals.peers.read().peer_info(peer_id) {
// received a ping
// reset the to-ping timer for this peer
trace!(self.log, "Received a ping request"; "peer_id" => %peer_id, "seq_no" => seq);
match peer_info.connection_direction() {
Some(ConnectionDirection::Incoming) => {
self.inbound_ping_peers.insert(*peer_id);
}
Some(ConnectionDirection::Outgoing) => {
self.outbound_ping_peers.insert(*peer_id);
}
None => {
warn!(self.log, "Received a ping from a peer with an unknown connection direction"; "peer_id" => %peer_id);
}
}
// If the peer's sequence number is newer than what we have stored, request updated metadata.
if let Some(meta_data) = &peer_info.meta_data() {
if *meta_data.seq_number() < seq {
trace!(self.log, "Requesting new metadata from peer";
"peer_id" => %peer_id, "known_seq_no" => meta_data.seq_number(), "ping_seq_no" => seq);
self.events.push(PeerManagerEvent::MetaData(*peer_id));
}
} else {
// if we don't know the meta-data, request it
debug!(self.log, "Requesting first metadata from peer";
"peer_id" => %peer_id);
self.events.push(PeerManagerEvent::MetaData(*peer_id));
}
} else {
error!(self.log, "Received a PING from an unknown peer";
"peer_id" => %peer_id);
}
}
/// A PONG has been returned from a peer.
pub fn pong_response(&mut self, peer_id: &PeerId, seq: u64) {
if let Some(peer_info) = self.network_globals.peers.read().peer_info(peer_id) {
// received a pong
// If the peer's sequence number is newer than what we have stored, request updated metadata.
if let Some(meta_data) = &peer_info.meta_data() {
if *meta_data.seq_number() < seq {
trace!(self.log, "Requesting new metadata from peer";
"peer_id" => %peer_id, "known_seq_no" => meta_data.seq_number(), "pong_seq_no" => seq);
self.events.push(PeerManagerEvent::MetaData(*peer_id));
}
} else {
// if we don't know the meta-data, request it
trace!(self.log, "Requesting first metadata from peer";
"peer_id" => %peer_id);
self.events.push(PeerManagerEvent::MetaData(*peer_id));
}
} else {
error!(self.log, "Received a PONG from an unknown peer"; "peer_id" => %peer_id);
}
}
/// Received a metadata response from a peer.
pub fn meta_data_response(&mut self, peer_id: &PeerId, meta_data: MetaData<TSpec>) {
if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
if let Some(known_meta_data) = &peer_info.meta_data() {
if *known_meta_data.seq_number() < *meta_data.seq_number() {
trace!(self.log, "Updating peer's metadata";
"peer_id" => %peer_id, "known_seq_no" => known_meta_data.seq_number(), "new_seq_no" => meta_data.seq_number());
} else {
trace!(self.log, "Received old metadata";
"peer_id" => %peer_id, "known_seq_no" => known_meta_data.seq_number(), "new_seq_no" => meta_data.seq_number());
// Updating metadata even in this case to prevent storing
// incorrect `attnets/syncnets` for a peer
}
} else {
// we have no meta-data for this peer, update
debug!(self.log, "Obtained peer's metadata";
"peer_id" => %peer_id, "new_seq_no" => meta_data.seq_number());
}
peer_info.set_meta_data(meta_data);
} else {
error!(self.log, "Received METADATA from an unknown peer";
"peer_id" => %peer_id);
}
}
/// Updates the gossipsub scores for all known peers in gossipsub.
pub(crate) fn update_gossipsub_scores(&mut self, gossipsub: &Gossipsub) {
let actions = self
.network_globals
.peers
.write()
.update_gossipsub_scores(self.target_peers, gossipsub);
for (peer_id, score_action) in actions {
self.handle_score_action(&peer_id, score_action, None);
}
}
/* Internal functions */
/// Sets a peer as connected, as long as their reputation allows it.
/// Returns whether the peer was accepted.
fn inject_connect_ingoing(
&mut self,
peer_id: &PeerId,
multiaddr: Multiaddr,
enr: Option<Enr>,
) -> bool {
self.inject_peer_connection(peer_id, ConnectingType::IngoingConnected { multiaddr }, enr)
}
/// Sets a peer as connected, as long as their reputation allows it.
/// Returns whether the peer was accepted.
fn inject_connect_outgoing(
&mut self,
peer_id: &PeerId,
multiaddr: Multiaddr,
enr: Option<Enr>,
) -> bool {
self.inject_peer_connection(
peer_id,
ConnectingType::OutgoingConnected { multiaddr },
enr,
)
}
/// Updates the state of the peer as disconnected.
///
/// This is also called when dialing a peer fails.
fn inject_disconnect(&mut self, peer_id: &PeerId) {
let (ban_operation, purged_peers) = self
.network_globals
.peers
.write()
.inject_disconnect(peer_id);
if let Some(ban_operation) = ban_operation {
// The peer was awaiting a ban, continue to ban the peer.
self.handle_ban_operation(peer_id, ban_operation, None);
}
// Remove the ping and status timer for the peer
self.inbound_ping_peers.remove(peer_id);
self.outbound_ping_peers.remove(peer_id);
self.status_peers.remove(peer_id);
self.events.extend(
purged_peers
.into_iter()
.map(|(peer_id, unbanned_ips)| PeerManagerEvent::UnBanned(peer_id, unbanned_ips)),
);
}
/// Registers a peer as connected. The `connection` parameter describes how the connection was
/// established (dialing, ingoing or outgoing).
///
/// This is called by `inject_connect_ingoing` and `inject_connect_outgoing`.
///
/// Returns whether the peer was accepted into the db.
fn inject_peer_connection(
&mut self,
peer_id: &PeerId,
connection: ConnectingType,
enr: Option<Enr>,
) -> bool {
{
let mut peerdb = self.network_globals.peers.write();
if peerdb.ban_status(peer_id).is_some() {
// don't connect if the peer is banned
error!(self.log, "Connection has been allowed to a banned peer"; "peer_id" => %peer_id);
}
match connection {
ConnectingType::Dialing => {
peerdb.dialing_peer(peer_id, enr);
return true;
}
ConnectingType::IngoingConnected { multiaddr } => {
peerdb.connect_ingoing(peer_id, multiaddr, enr);
// start a timer to ping inbound peers.
self.inbound_ping_peers.insert(*peer_id);
}
ConnectingType::OutgoingConnected { multiaddr } => {
peerdb.connect_outgoing(peer_id, multiaddr, enr);
// start a timer to ping outbound peers.
self.outbound_ping_peers.insert(*peer_id);
}
}
}
// start a ping and status timer for the peer
self.status_peers.insert(*peer_id);
// increment prometheus metrics
metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT);
true
}
// Gracefully disconnects a peer without banning them.
fn disconnect_peer(&mut self, peer_id: PeerId, reason: GoodbyeReason) {
self.events
.push(PeerManagerEvent::DisconnectPeer(peer_id, reason));
self.network_globals
.peers
.write()
.notify_disconnecting(&peer_id, false);
}
/// Runs a discovery query for additional sync committee peers if we fall below `TARGET_SUBNET_PEERS`
/// on any required sync committee subnet.
fn maintain_sync_committee_peers(&mut self) {
// Remove expired entries
self.sync_committee_subnets
.retain(|_, v| *v > Instant::now());
let subnets_to_discover: Vec<SubnetDiscovery> = self
.sync_committee_subnets
.iter()
.filter_map(|(k, v)| {
if self
.network_globals
.peers
.read()
.good_peers_on_subnet(Subnet::SyncCommittee(*k))
.count()
< TARGET_SUBNET_PEERS
{
Some(SubnetDiscovery {
subnet: Subnet::SyncCommittee(*k),
min_ttl: Some(*v),
})
} else {
None
}
})
.collect();
// request the subnet query from discovery
if !subnets_to_discover.is_empty() {
debug!(
self.log,
"Making subnet queries for maintaining sync committee peers";
"subnets" => ?subnets_to_discover.iter().map(|s| s.subnet).collect::<Vec<_>>()
);
self.events
.push(PeerManagerEvent::DiscoverSubnetPeers(subnets_to_discover));
}
}
/// Checks the status of our current peers and optionally requests a discovery query if we need
/// to find more peers to maintain our target peer count.
fn maintain_peer_count(&mut self, dialing_peers: usize) {
// Check if we need to do a discovery lookup
if self.discovery_enabled {
let peer_count = self.network_globals.connected_or_dialing_peers();
let outbound_only_peer_count = self.network_globals.connected_outbound_only_peers();
let wanted_peers = if peer_count < self.target_peers.saturating_sub(dialing_peers) {
// We need more peers in general.
self.max_peers().saturating_sub(dialing_peers) - peer_count
} else if outbound_only_peer_count < self.min_outbound_only_peers()
&& peer_count < self.max_outbound_dialing_peers()
{
self.max_outbound_dialing_peers()
.saturating_sub(dialing_peers)
.saturating_sub(peer_count)
} else {
0
};
if wanted_peers != 0 {
// We need more peers, re-queue a discovery lookup.
debug!(self.log, "Starting a new peer discovery query"; "connected" => peer_count, "target" => self.target_peers, "outbound" => outbound_only_peer_count, "wanted" => wanted_peers);
self.events
.push(PeerManagerEvent::DiscoverPeers(wanted_peers));
}
}
}
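// An illustrative walk-through of the arithmetic above (hypothetical numbers): with
// `target_peers = 50`, `dialing_peers = 2` and 40 connected-or-dialing peers, the first branch
// applies (40 < 48) and we request `max_peers().saturating_sub(2) - 40 = 55 - 2 - 40 = 13`
// more peers from discovery.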
/// Remove excess peers back down to our target values.
/// This prioritises peers with a good score and uniform distribution of peers across
/// subnets.
///
/// The logic for the peer pruning is as follows:
///
/// Global rules:
/// - Always maintain peers we need for a validator duty.
/// - Do not prune outbound peers to exceed our outbound target.
/// - Do not prune more peers than our target peer count.
/// - If we have an option to remove a number of peers, remove ones that have the least
/// long-lived subnets.
/// - When pruning peers based on subnet count, if multiple peers can be chosen, choose a peer
/// that is not subscribed to a long-lived sync committee subnet.
/// - When pruning peers based on subnet count, do not prune a peer that would lower us below the
/// MIN_SYNC_COMMITTEE_PEERS peer count. To keep it simple, we favour a minimum number of
/// sync-committee peers over uniformity of subnet peers. NOTE: We could apply more sophisticated
/// logic, but the code is simpler and easier to maintain if we take this approach. If we are
/// pruning subnet peers below MIN_SYNC_COMMITTEE_PEERS while maintaining the sync committee
/// peers, this should be fine, as subnet peers are more likely to be found than sync-committee
/// peers. Also, we're in a bit of trouble anyway if we have so few peers on subnets. The
/// MIN_SYNC_COMMITTEE_PEERS number should be set low, as an absolute lower bound for maintaining
/// peers on the sync committees.
/// - Do not prune trusted peers. NOTE: This means if a user has more trusted peers than the
/// excess peer limit, all of the following logic is subverted as we will not prune any peers.
/// Also, the more trusted peers a user has, the less room Lighthouse has to efficiently manage
/// its peers across the subnets.
///
/// Prune peers in the following order:
/// 1. Remove worst scoring peers
/// 2. Remove peers that are not subscribed to a subnet (they have less value)
/// 3. Remove peers of which we have too many on any particular subnet
/// 4. Randomly remove peers if all the above are satisfied
///
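/// As a brief illustration (hypothetical numbers): with `target_peers = 50` and 60 connected
/// peers, ten peers must be pruned. Negatively scored peers go first, then peers with no
/// long-lived subnet subscriptions, then peers on over-represented subnets (preferring to drop
/// those not on a long-lived sync committee subnet), and finally random peers; trusted peers
/// and peers needed for validator duties are never pruned.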
fn prune_excess_peers(&mut self) {
// The current number of connected peers.
let connected_peer_count = self.network_globals.connected_peers();
if connected_peer_count <= self.target_peers {
// No need to prune peers
return;
}
// Keep a list of peers we are pruning.
let mut peers_to_prune = std::collections::HashSet::new();
let connected_outbound_peer_count = self.network_globals.connected_outbound_only_peers();
// Keep track of the number of outbound peers we are pruning.
let mut outbound_peers_pruned = 0;
macro_rules! prune_peers {
($filter: expr) => {
let filter = $filter;
for (peer_id, info) in self
.network_globals
.peers
.read()
.worst_connected_peers()
.iter()
.filter(|(_, info)| {
!info.has_future_duty() && !info.is_trusted() && filter(*info)
})
{
if peers_to_prune.len()
>= connected_peer_count.saturating_sub(self.target_peers)
{
// We have found all the peers we need to drop, end.
break;
}
if peers_to_prune.contains(*peer_id) {
continue;
}
// Only remove up to the target outbound peer count.
if info.is_outbound_only() {
if self.target_outbound_peers() + outbound_peers_pruned
< connected_outbound_peer_count
{
outbound_peers_pruned += 1;
} else {
continue;
}
}
peers_to_prune.insert(**peer_id);
}
};
}
// 1. Look through peers that have the worst score (ignoring non-penalized scored peers).
prune_peers!(|info: &PeerInfo<TSpec>| { info.score().score() < 0.0 });
// 2. Attempt to remove peers that are not subscribed to a subnet, if we still need to
// prune more.
if peers_to_prune.len() < connected_peer_count.saturating_sub(self.target_peers) {
prune_peers!(|info: &PeerInfo<TSpec>| { !info.has_long_lived_subnet() });
}
// 3. and 4. Remove peers that are too grouped on any given subnet. If all subnets are
// uniformly distributed, remove random peers.
if peers_to_prune.len() < connected_peer_count.saturating_sub(self.target_peers) {
// Of our connected peers, build a map from subnet_id -> Vec<(PeerId, PeerInfo)>
let mut subnet_to_peer: HashMap<Subnet, Vec<(PeerId, PeerInfo<TSpec>)>> =
HashMap::new();
// These variables are used to track if a peer is in a long-lived sync-committee as we
// may wish to retain this peer over others when pruning.
let mut sync_committee_peer_count: HashMap<SyncSubnetId, u64> = HashMap::new();
let mut peer_to_sync_committee: HashMap<
PeerId,
std::collections::HashSet<SyncSubnetId>,
> = HashMap::new();