@@ -389,7 +389,9 @@ const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32;
389
389
390
390
struct Peer {
391
391
channel_encryptor : PeerChannelEncryptor ,
392
- their_node_id : Option < PublicKey > ,
392
+ /// We cache a `NodeId` here to avoid serializing peers' keys every time we forward gossip
393
+ /// messages in `PeerManager`. Use `Peer::set_their_node_id` to modify this field.
394
+ their_node_id : Option < ( PublicKey , NodeId ) > ,
393
395
their_features : Option < InitFeatures > ,
394
396
their_net_address : Option < NetAddress > ,
395
397
@@ -481,6 +483,10 @@ impl Peer {
481
483
total_outbound_buffered > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP ||
482
484
self . msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
483
485
}
486
+
487
+ fn set_their_node_id ( & mut self , node_id : PublicKey ) {
488
+ self . their_node_id = Some ( ( node_id, NodeId :: from_pubkey ( & node_id) ) ) ;
489
+ }
484
490
}
485
491
486
492
/// SimpleArcPeerManager is useful when you need a PeerManager with a static lifetime, e.g.
@@ -651,10 +657,10 @@ impl<Descriptor: SocketDescriptor, RM: Deref, L: Deref, NS: Deref> PeerManager<D
651
657
/// This works around `format!()` taking a reference to each argument, preventing
652
658
/// `if let Some(node_id) = peer.their_node_id { format!(.., node_id) } else { .. }` from compiling
653
659
/// due to lifetime errors.
654
- struct OptionalFromDebugger < ' a > ( & ' a Option < PublicKey > ) ;
660
+ struct OptionalFromDebugger < ' a > ( & ' a Option < ( PublicKey , NodeId ) > ) ;
655
661
impl core:: fmt:: Display for OptionalFromDebugger < ' _ > {
656
662
fn fmt ( & self , f : & mut core:: fmt:: Formatter < ' _ > ) -> Result < ( ) , core:: fmt:: Error > {
657
- if let Some ( node_id) = self . 0 { write ! ( f, " from {}" , log_pubkey!( node_id) ) } else { Ok ( ( ) ) }
663
+ if let Some ( ( node_id, _ ) ) = self . 0 { write ! ( f, " from {}" , log_pubkey!( node_id) ) } else { Ok ( ( ) ) }
658
664
}
659
665
}
660
666
@@ -741,7 +747,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
741
747
return None ;
742
748
}
743
749
p. their_node_id
744
- } ) . collect ( )
750
+ } ) . map ( | ( node_id , _ ) | node_id ) . collect ( )
745
751
}
746
752
747
753
fn get_ephemeral_key ( & self ) -> SecretKey {
@@ -849,7 +855,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
849
855
fn do_attempt_write_data ( & self , descriptor : & mut Descriptor , peer : & mut Peer ) {
850
856
while !peer. awaiting_write_event {
851
857
if peer. should_buffer_onion_message ( ) {
852
- if let Some ( peer_node_id) = peer. their_node_id {
858
+ if let Some ( ( peer_node_id, _ ) ) = peer. their_node_id {
853
859
if let Some ( next_onion_message) =
854
860
self . message_handler . onion_message_handler . next_onion_message_for_peer ( peer_node_id) {
855
861
self . enqueue_message ( peer, & next_onion_message) ;
@@ -978,9 +984,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
978
984
/// Append a message to a peer's pending outbound/write buffer
979
985
fn enqueue_message < M : wire:: Type > ( & self , peer : & mut Peer , message : & M ) {
980
986
if is_gossip_msg ( message. type_id ( ) ) {
981
- log_gossip ! ( self . logger, "Enqueueing message {:?} to {}" , message, log_pubkey!( peer. their_node_id. unwrap( ) ) ) ;
987
+ log_gossip ! ( self . logger, "Enqueueing message {:?} to {}" , message, log_pubkey!( peer. their_node_id. unwrap( ) . 0 ) ) ;
982
988
} else {
983
- log_trace ! ( self . logger, "Enqueueing message {:?} to {}" , message, log_pubkey!( peer. their_node_id. unwrap( ) ) )
989
+ log_trace ! ( self . logger, "Enqueueing message {:?} to {}" , message, log_pubkey!( peer. their_node_id. unwrap( ) . 0 ) )
984
990
}
985
991
peer. msgs_sent_since_pong += 1 ;
986
992
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( message) ) ;
@@ -1065,14 +1071,14 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
1065
1071
1066
1072
macro_rules! insert_node_id {
1067
1073
( ) => {
1068
- match self . node_id_to_descriptor. lock( ) . unwrap( ) . entry( peer. their_node_id. unwrap( ) ) {
1074
+ match self . node_id_to_descriptor. lock( ) . unwrap( ) . entry( peer. their_node_id. unwrap( ) . 0 ) {
1069
1075
hash_map:: Entry :: Occupied ( _) => {
1070
- log_trace!( self . logger, "Got second connection with {}, closing" , log_pubkey!( peer. their_node_id. unwrap( ) ) ) ;
1076
+ log_trace!( self . logger, "Got second connection with {}, closing" , log_pubkey!( peer. their_node_id. unwrap( ) . 0 ) ) ;
1071
1077
peer. their_node_id = None ; // Unset so that we don't generate a peer_disconnected event
1072
1078
return Err ( PeerHandleError { no_connection_possible: false } )
1073
1079
} ,
1074
1080
hash_map:: Entry :: Vacant ( entry) => {
1075
- log_debug!( self . logger, "Finished noise handshake for connection with {}" , log_pubkey!( peer. their_node_id. unwrap( ) ) ) ;
1081
+ log_debug!( self . logger, "Finished noise handshake for connection with {}" , log_pubkey!( peer. their_node_id. unwrap( ) . 0 ) ) ;
1076
1082
entry. insert( peer_descriptor. clone( ) )
1077
1083
} ,
1078
1084
} ;
@@ -1096,7 +1102,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
1096
1102
peer. pending_read_buffer = [ 0 ; 18 ] . to_vec ( ) ; // Message length header is 18 bytes
1097
1103
peer. pending_read_is_header = true ;
1098
1104
1099
- peer. their_node_id = Some ( their_node_id) ;
1105
+ peer. set_their_node_id ( their_node_id) ;
1100
1106
insert_node_id ! ( ) ;
1101
1107
let features = self . message_handler . chan_handler . provided_init_features ( & their_node_id)
1102
1108
. or ( self . message_handler . route_handler . provided_init_features ( & their_node_id) )
@@ -1110,7 +1116,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
1110
1116
peer. channel_encryptor. process_act_three( & peer. pending_read_buffer[ ..] ) ) ;
1111
1117
peer. pending_read_buffer = [ 0 ; 18 ] . to_vec ( ) ; // Message length header is 18 bytes
1112
1118
peer. pending_read_is_header = true ;
1113
- peer. their_node_id = Some ( their_node_id) ;
1119
+ peer. set_their_node_id ( their_node_id) ;
1114
1120
insert_node_id ! ( ) ;
1115
1121
let features = self . message_handler . chan_handler . provided_init_features ( & their_node_id)
1116
1122
. or ( self . message_handler . route_handler . provided_init_features ( & their_node_id) )
@@ -1212,7 +1218,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
1212
1218
}
1213
1219
1214
1220
for msg in msgs_to_forward. drain ( ..) {
1215
- self . forward_broadcast_msg ( & * peers, & msg, peer_node_id. as_ref ( ) ) ;
1221
+ self . forward_broadcast_msg ( & * peers, & msg, peer_node_id. as_ref ( ) . map ( | ( pk , _ ) | pk ) ) ;
1216
1222
}
1217
1223
1218
1224
Ok ( pause_read)
@@ -1226,7 +1232,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
1226
1232
mut peer_lock : MutexGuard < Peer > ,
1227
1233
message : wire:: Message < <<CMH as core:: ops:: Deref >:: Target as wire:: CustomMessageReader >:: CustomMessage >
1228
1234
) -> Result < Option < wire:: Message < <<CMH as core:: ops:: Deref >:: Target as wire:: CustomMessageReader >:: CustomMessage > > , MessageHandlingError > {
1229
- let their_node_id = peer_lock. their_node_id . clone ( ) . expect ( "We know the peer's public key by the time we receive messages" ) ;
1235
+ let their_node_id = peer_lock. their_node_id . clone ( ) . expect ( "We know the peer's public key by the time we receive messages" ) . 0 ;
1230
1236
peer_lock. received_message_since_timer_tick = true ;
1231
1237
1232
1238
// Need an Init as first message
@@ -1467,13 +1473,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
1467
1473
log_gossip ! ( self . logger, "Skipping broadcast message to {:?} as its outbound buffer is full" , peer. their_node_id) ;
1468
1474
continue ;
1469
1475
}
1470
- if let Some ( their_node_id) = peer. their_node_id {
1471
- let their_node_id = NodeId :: from_pubkey ( & their_node_id) ;
1476
+ if let Some ( ( _, their_node_id) ) = peer. their_node_id {
1472
1477
if their_node_id == msg. contents . node_id_1 || their_node_id == msg. contents . node_id_2 {
1473
1478
continue ;
1474
1479
}
1475
1480
}
1476
- if except_node. is_some ( ) && peer. their_node_id . as_ref ( ) == except_node {
1481
+ if except_node. is_some ( ) && peer. their_node_id . as_ref ( ) . map ( | ( pk , _ ) | pk ) == except_node {
1477
1482
continue ;
1478
1483
}
1479
1484
self . enqueue_encoded_gossip_broadcast ( & mut * peer, encoded_msg. clone ( ) ) ;
@@ -1493,12 +1498,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
1493
1498
log_gossip ! ( self . logger, "Skipping broadcast message to {:?} as its outbound buffer is full" , peer. their_node_id) ;
1494
1499
continue ;
1495
1500
}
1496
- if let Some ( their_node_id) = peer. their_node_id {
1497
- if NodeId :: from_pubkey ( & their_node_id) == msg. contents . node_id {
1501
+ if let Some ( ( _ , their_node_id) ) = peer. their_node_id {
1502
+ if their_node_id == msg. contents . node_id {
1498
1503
continue ;
1499
1504
}
1500
1505
}
1501
- if except_node. is_some ( ) && peer. their_node_id . as_ref ( ) == except_node {
1506
+ if except_node. is_some ( ) && peer. their_node_id . as_ref ( ) . map ( | ( pk , _ ) | pk ) == except_node {
1502
1507
continue ;
1503
1508
}
1504
1509
self . enqueue_encoded_gossip_broadcast ( & mut * peer, encoded_msg. clone ( ) ) ;
@@ -1518,7 +1523,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
1518
1523
log_gossip ! ( self . logger, "Skipping broadcast message to {:?} as its outbound buffer is full" , peer. their_node_id) ;
1519
1524
continue ;
1520
1525
}
1521
- if except_node. is_some ( ) && peer. their_node_id . as_ref ( ) == except_node {
1526
+ if except_node. is_some ( ) && peer. their_node_id . as_ref ( ) . map ( | ( pk , _ ) | pk ) == except_node {
1522
1527
continue ;
1523
1528
}
1524
1529
self . enqueue_encoded_gossip_broadcast ( & mut * peer, encoded_msg. clone ( ) ) ;
@@ -1837,7 +1842,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
1837
1842
} ,
1838
1843
Some ( peer_lock) => {
1839
1844
let peer = peer_lock. lock ( ) . unwrap ( ) ;
1840
- if let Some ( node_id) = peer. their_node_id {
1845
+ if let Some ( ( node_id, _ ) ) = peer. their_node_id {
1841
1846
log_trace ! ( self . logger,
1842
1847
"Handling disconnection of peer {}, with {}future connection to the peer possible." ,
1843
1848
log_pubkey!( node_id) , if no_connection_possible { "no " } else { "" } ) ;
@@ -1877,7 +1882,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
1877
1882
self . node_id_to_descriptor . lock ( ) . unwrap ( ) . clear ( ) ;
1878
1883
let peers = & mut * peers_lock;
1879
1884
for ( mut descriptor, peer) in peers. drain ( ) {
1880
- if let Some ( node_id) = peer. lock ( ) . unwrap ( ) . their_node_id {
1885
+ if let Some ( ( node_id, _ ) ) = peer. lock ( ) . unwrap ( ) . their_node_id {
1881
1886
log_trace ! ( self . logger, "Disconnecting peer with id {} due to client request to disconnect all peers" , node_id) ;
1882
1887
self . message_handler . chan_handler . peer_disconnected ( & node_id, false ) ;
1883
1888
self . message_handler . onion_message_handler . peer_disconnected ( & node_id, false ) ;
@@ -1967,7 +1972,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
1967
1972
let mut peers_lock = self . peers . write ( ) . unwrap ( ) ;
1968
1973
for descriptor in descriptors_needing_disconnect. iter ( ) {
1969
1974
if let Some ( peer) = peers_lock. remove ( descriptor) {
1970
- if let Some ( node_id) = peer. lock ( ) . unwrap ( ) . their_node_id {
1975
+ if let Some ( ( node_id, _ ) ) = peer. lock ( ) . unwrap ( ) . their_node_id {
1971
1976
log_trace ! ( self . logger, "Disconnecting peer with id {} due to ping timeout" , node_id) ;
1972
1977
self . node_id_to_descriptor . lock ( ) . unwrap ( ) . remove ( & node_id) ;
1973
1978
self . message_handler . chan_handler . peer_disconnected ( & node_id, false ) ;
0 commit comments