Skip to content

Store a cached NodeId for each Peer #2022

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion lightning/src/ln/chan_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use bitcoin::hash_types::{Txid, PubkeyHash};

use crate::ln::{PaymentHash, PaymentPreimage};
use crate::ln::msgs::DecodeError;
use crate::routing::gossip::NodeId;
use crate::util::ser::{Readable, Writeable, Writer};
use crate::util::transaction_utils;

Expand Down
53 changes: 29 additions & 24 deletions lightning/src/ln/peer_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,9 @@ const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32;

struct Peer {
channel_encryptor: PeerChannelEncryptor,
their_node_id: Option<PublicKey>,
/// We cache a `NodeId` here to avoid serializing peers' keys every time we forward gossip
/// messages in `PeerManager`. Use `Peer::set_their_node_id` to modify this field.
their_node_id: Option<(PublicKey, NodeId)>,
their_features: Option<InitFeatures>,
their_net_address: Option<NetAddress>,

Expand Down Expand Up @@ -481,6 +483,10 @@ impl Peer {
total_outbound_buffered > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP ||
self.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
}

fn set_their_node_id(&mut self, node_id: PublicKey) {
self.their_node_id = Some((node_id, NodeId::from_pubkey(&node_id)));
}
}

/// SimpleArcPeerManager is useful when you need a PeerManager with a static lifetime, e.g.
Expand Down Expand Up @@ -651,10 +657,10 @@ impl<Descriptor: SocketDescriptor, RM: Deref, L: Deref, NS: Deref> PeerManager<D
/// This works around `format!()` taking a reference to each argument, preventing
/// `if let Some(node_id) = peer.their_node_id { format!(.., node_id) } else { .. }` from compiling
/// due to lifetime errors.
struct OptionalFromDebugger<'a>(&'a Option<PublicKey>);
struct OptionalFromDebugger<'a>(&'a Option<(PublicKey, NodeId)>);
impl core::fmt::Display for OptionalFromDebugger<'_> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> Result<(), core::fmt::Error> {
if let Some(node_id) = self.0 { write!(f, " from {}", log_pubkey!(node_id)) } else { Ok(()) }
if let Some((node_id, _)) = self.0 { write!(f, " from {}", log_pubkey!(node_id)) } else { Ok(()) }
}
}

Expand Down Expand Up @@ -741,7 +747,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
return None;
}
p.their_node_id
}).collect()
}).map(|(node_id, _)| node_id).collect()
}

fn get_ephemeral_key(&self) -> SecretKey {
Expand Down Expand Up @@ -849,7 +855,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) {
while !peer.awaiting_write_event {
if peer.should_buffer_onion_message() {
if let Some(peer_node_id) = peer.their_node_id {
if let Some((peer_node_id, _)) = peer.their_node_id {
if let Some(next_onion_message) =
self.message_handler.onion_message_handler.next_onion_message_for_peer(peer_node_id) {
self.enqueue_message(peer, &next_onion_message);
Expand Down Expand Up @@ -978,9 +984,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
/// Append a message to a peer's pending outbound/write buffer
fn enqueue_message<M: wire::Type>(&self, peer: &mut Peer, message: &M) {
if is_gossip_msg(message.type_id()) {
log_gossip!(self.logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap()));
log_gossip!(self.logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap().0));
} else {
log_trace!(self.logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap()))
log_trace!(self.logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap().0))
}
peer.msgs_sent_since_pong += 1;
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(message));
Expand Down Expand Up @@ -1065,14 +1071,14 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM

macro_rules! insert_node_id {
() => {
match self.node_id_to_descriptor.lock().unwrap().entry(peer.their_node_id.unwrap()) {
match self.node_id_to_descriptor.lock().unwrap().entry(peer.their_node_id.unwrap().0) {
hash_map::Entry::Occupied(_) => {
log_trace!(self.logger, "Got second connection with {}, closing", log_pubkey!(peer.their_node_id.unwrap()));
log_trace!(self.logger, "Got second connection with {}, closing", log_pubkey!(peer.their_node_id.unwrap().0));
peer.their_node_id = None; // Unset so that we don't generate a peer_disconnected event
return Err(PeerHandleError{ no_connection_possible: false })
},
hash_map::Entry::Vacant(entry) => {
log_debug!(self.logger, "Finished noise handshake for connection with {}", log_pubkey!(peer.their_node_id.unwrap()));
log_debug!(self.logger, "Finished noise handshake for connection with {}", log_pubkey!(peer.their_node_id.unwrap().0));
entry.insert(peer_descriptor.clone())
},
};
Expand All @@ -1096,7 +1102,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
peer.pending_read_buffer = [0; 18].to_vec(); // Message length header is 18 bytes
peer.pending_read_is_header = true;

peer.their_node_id = Some(their_node_id);
peer.set_their_node_id(their_node_id);
insert_node_id!();
let features = self.message_handler.chan_handler.provided_init_features(&their_node_id)
.or(self.message_handler.route_handler.provided_init_features(&their_node_id))
Expand All @@ -1110,7 +1116,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
peer.channel_encryptor.process_act_three(&peer.pending_read_buffer[..]));
peer.pending_read_buffer = [0; 18].to_vec(); // Message length header is 18 bytes
peer.pending_read_is_header = true;
peer.their_node_id = Some(their_node_id);
peer.set_their_node_id(their_node_id);
insert_node_id!();
let features = self.message_handler.chan_handler.provided_init_features(&their_node_id)
.or(self.message_handler.route_handler.provided_init_features(&their_node_id))
Expand Down Expand Up @@ -1212,7 +1218,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
}

for msg in msgs_to_forward.drain(..) {
self.forward_broadcast_msg(&*peers, &msg, peer_node_id.as_ref());
self.forward_broadcast_msg(&*peers, &msg, peer_node_id.as_ref().map(|(pk, _)| pk));
}

Ok(pause_read)
Expand All @@ -1226,7 +1232,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
mut peer_lock: MutexGuard<Peer>,
message: wire::Message<<<CMH as core::ops::Deref>::Target as wire::CustomMessageReader>::CustomMessage>
) -> Result<Option<wire::Message<<<CMH as core::ops::Deref>::Target as wire::CustomMessageReader>::CustomMessage>>, MessageHandlingError> {
let their_node_id = peer_lock.their_node_id.clone().expect("We know the peer's public key by the time we receive messages");
let their_node_id = peer_lock.their_node_id.clone().expect("We know the peer's public key by the time we receive messages").0;
peer_lock.received_message_since_timer_tick = true;

// Need an Init as first message
Expand Down Expand Up @@ -1467,13 +1473,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
continue;
}
if let Some(their_node_id) = peer.their_node_id {
let their_node_id = NodeId::from_pubkey(&their_node_id);
if let Some((_, their_node_id)) = peer.their_node_id {
if their_node_id == msg.contents.node_id_1 || their_node_id == msg.contents.node_id_2 {
continue;
}
}
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
if except_node.is_some() && peer.their_node_id.as_ref().map(|(pk, _)| pk) == except_node {
continue;
}
self.enqueue_encoded_gossip_broadcast(&mut *peer, encoded_msg.clone());
Expand All @@ -1493,12 +1498,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
continue;
}
if let Some(their_node_id) = peer.their_node_id {
if NodeId::from_pubkey(&their_node_id) == msg.contents.node_id {
if let Some((_, their_node_id)) = peer.their_node_id {
if their_node_id == msg.contents.node_id {
continue;
}
}
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
if except_node.is_some() && peer.their_node_id.as_ref().map(|(pk, _)| pk) == except_node {
continue;
}
self.enqueue_encoded_gossip_broadcast(&mut *peer, encoded_msg.clone());
Expand All @@ -1518,7 +1523,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
continue;
}
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
if except_node.is_some() && peer.their_node_id.as_ref().map(|(pk, _)| pk) == except_node {
continue;
}
self.enqueue_encoded_gossip_broadcast(&mut *peer, encoded_msg.clone());
Expand Down Expand Up @@ -1837,7 +1842,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
},
Some(peer_lock) => {
let peer = peer_lock.lock().unwrap();
if let Some(node_id) = peer.their_node_id {
if let Some((node_id, _)) = peer.their_node_id {
log_trace!(self.logger,
"Handling disconnection of peer {}, with {}future connection to the peer possible.",
log_pubkey!(node_id), if no_connection_possible { "no " } else { "" });
Expand Down Expand Up @@ -1877,7 +1882,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
self.node_id_to_descriptor.lock().unwrap().clear();
let peers = &mut *peers_lock;
for (mut descriptor, peer) in peers.drain() {
if let Some(node_id) = peer.lock().unwrap().their_node_id {
if let Some((node_id, _)) = peer.lock().unwrap().their_node_id {
log_trace!(self.logger, "Disconnecting peer with id {} due to client request to disconnect all peers", node_id);
self.message_handler.chan_handler.peer_disconnected(&node_id, false);
self.message_handler.onion_message_handler.peer_disconnected(&node_id, false);
Expand Down Expand Up @@ -1967,7 +1972,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
let mut peers_lock = self.peers.write().unwrap();
for descriptor in descriptors_needing_disconnect.iter() {
if let Some(peer) = peers_lock.remove(descriptor) {
if let Some(node_id) = peer.lock().unwrap().their_node_id {
if let Some((node_id, _)) = peer.lock().unwrap().their_node_id {
log_trace!(self.logger, "Disconnecting peer with id {} due to ping timeout", node_id);
self.node_id_to_descriptor.lock().unwrap().remove(&node_id);
self.message_handler.chan_handler.peer_disconnected(&node_id, false);
Expand Down