Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

migrated connect_to_unbanned_peer test #8196

Merged
merged 8 commits into from
Dec 12, 2022
4 changes: 2 additions & 2 deletions chain/network/src/peer/peer_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ pub(crate) enum ClosingReason {
#[error("Received a message of type not allowed on this connection.")]
DisallowedMessage,
#[error("PeerManager requested to close the connection")]
PeerManager,
PeerManagerRequest,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

#[error("Received DisconnectMessage from peer")]
DisconnectMessage,
#[error("Peer clock skew exceeded {MAX_CLOCK_SKEW}")]
Expand Down Expand Up @@ -1551,7 +1551,7 @@ impl actix::Handler<WithSpanContext<Stop>> for PeerActor {
ctx,
match msg.ban_reason {
Some(reason) => ClosingReason::Ban(reason),
None => ClosingReason::PeerManager,
None => ClosingReason::PeerManagerRequest,
},
);
}
Expand Down
9 changes: 1 addition & 8 deletions chain/network/src/peer_manager/peer_manager_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,10 +498,7 @@ impl PeerManagerActor {
let _timer =
metrics::PEER_MANAGER_TRIGGER_TIME.with_label_values(&["monitor_peers"]).start_timer();

self.state.peer_store.unban(&self.clock);
if let Err(err) = self.state.peer_store.update_connected_peers_last_seen(&self.clock) {
tracing::error!(target: "network", ?err, "Failed to update peers last seen time.");
}
self.state.peer_store.update(&self.clock);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a nice simplification moving all the updates to one function.


if self.is_outbound_bootstrap_needed() {
let tier2 = self.state.tier2.load();
Expand Down Expand Up @@ -547,10 +544,6 @@ impl PeerManagerActor {
// If there are too many active connections try to remove some connections
self.maybe_stop_active_connection();

if let Err(err) = self.state.peer_store.remove_expired(&self.clock) {
tracing::error!(target: "network", ?err, "Failed to remove expired peers");
};

// Find peers that are not reliable (too much behind) - and make sure that we're not routing messages through them.
let unreliable_peers = self.unreliable_peers();
metrics::PEER_UNRELIABLE.set(unreliable_peers.len() as i64);
Expand Down
165 changes: 81 additions & 84 deletions chain/network/src/peer_manager/peer_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ mod testonly;
#[cfg(test)]
mod tests;

/// How often to update the KnownPeerState.last_seen in storage.
const UPDATE_LAST_SEEN_INTERVAL: time::Duration = time::Duration::minutes(1);

/// Level of trust we have about a new (PeerId, Addr) pair.
#[derive(Eq, PartialEq, Debug, Clone, Copy)]
enum TrustLevel {
Expand Down Expand Up @@ -224,16 +227,73 @@ impl Inner {
}
Ok(())
}

/// Removes peers that are not responding for expiration period.
fn remove_expired(&mut self, now: time::Utc) {
let mut to_remove = vec![];
for (peer_id, peer_status) in self.peer_states.iter() {
if peer_status.status != KnownPeerStatus::Connected
&& now > peer_status.last_seen + self.config.peer_expiration_duration
{
tracing::debug!(target: "network", "Removing peer: last seen {:?} ago", now-peer_status.last_seen);
to_remove.push(peer_id.clone());
}
}
if let Err(err) = self.delete_peers(&to_remove) {
tracing::error!(target: "network", ?err, "Failed to remove expired peers");
}
}

fn unban(&mut self, now: time::Utc) {
let mut to_unban = vec![];
for (peer_id, peer_state) in &self.peer_states {
if let KnownPeerStatus::Banned(_, ban_time) = peer_state.status {
if now < ban_time + self.config.ban_window {
continue;
}
tracing::info!(target: "network", unbanned = ?peer_id, ?ban_time, "unbanning a peer");
to_unban.push(peer_id.clone());
}
}
for peer_id in &to_unban {
if let Err(err) = self.peer_unban(&peer_id) {
tracing::error!(target: "network", ?peer_id, ?err, "Failed to unban a peer");
}
}
}

/// Update the 'last_seen' time for all the peers that we're currently connected to.
fn update_last_seen(&mut self, now: time::Utc) {
for (peer_id, peer_state) in self.peer_states.iter_mut() {
if peer_state.status == KnownPeerStatus::Connected
&& now > peer_state.last_seen + UPDATE_LAST_SEEN_INTERVAL
{
peer_state.last_seen = now;
if let Err(err) = self.store.set_peer_state(peer_id, peer_state) {
tracing::error!(target: "network", ?peer_id, ?err, "Failed to update peers last seen time.");
}
}
}
}

/// Cleans up the state of the PeerStore, due to passing time.
/// * it unbans a peer if config.ban_window has passed
/// * it updates KnownPeerStatus.last_seen of the connected peers
/// * it removes peers which were not seen for config.peer_expiration_duration
/// This function should be called periodically.
pub fn update(&mut self, clock: &time::Clock) {
pompon0 marked this conversation as resolved.
Show resolved Hide resolved
let now = clock.now_utc();
// TODO(gprusak): these operations could be put into a single DB write transaction.
self.unban(now);
self.update_last_seen(now);
self.remove_expired(now);
}
}

pub(crate) struct PeerStore(Mutex<Inner>);

impl PeerStore {
pub(crate) fn new(
clock: &time::Clock,
config: Config,
store: store::Store,
) -> anyhow::Result<Self> {
pub fn new(clock: &time::Clock, config: Config, store: store::Store) -> anyhow::Result<Self> {
let boot_nodes: HashSet<_> = config.boot_nodes.iter().map(|p| p.id.clone()).collect();
// A mapping from `PeerId` to `KnownPeerState`.
let mut peerid_2_state = HashMap::default();
Expand Down Expand Up @@ -345,29 +405,29 @@ impl PeerStore {
self.0.lock().config.blacklist.contains(*addr)
}

pub(crate) fn len(&self) -> usize {
pub fn len(&self) -> usize {
self.0.lock().peer_states.len()
}

pub(crate) fn is_banned(&self, peer_id: &PeerId) -> bool {
pub fn is_banned(&self, peer_id: &PeerId) -> bool {
self.0.lock().peer_states.get(peer_id).map_or(false, |s| s.status.is_banned())
}

pub(crate) fn count_banned(&self) -> usize {
pub fn count_banned(&self) -> usize {
self.0.lock().peer_states.values().filter(|st| st.status.is_banned()).count()
}

pub fn update(&self, clock: &time::Clock) {
self.0.lock().update(clock)
}

#[allow(dead_code)]
/// Returns the state of the current peer in memory.
pub(crate) fn get_peer_state(&self, peer_id: &PeerId) -> Option<KnownPeerState> {
pub fn get_peer_state(&self, peer_id: &PeerId) -> Option<KnownPeerState> {
self.0.lock().peer_states.get(peer_id).cloned()
}

pub(crate) fn peer_connected(
&self,
clock: &time::Clock,
peer_info: &PeerInfo,
) -> anyhow::Result<()> {
pub fn peer_connected(&self, clock: &time::Clock, peer_info: &PeerInfo) -> anyhow::Result<()> {
let mut inner = self.0.lock();
inner.add_signed_peer(clock, peer_info.clone())?;
let mut store = inner.store.clone();
Expand All @@ -377,29 +437,7 @@ impl PeerStore {
Ok(store.set_peer_state(&peer_info.id, entry)?)
}

/// Update the 'last_seen' time for all the peers that we're currently connected to.
pub(crate) fn update_connected_peers_last_seen(
&self,
clock: &time::Clock,
) -> anyhow::Result<()> {
let mut inner = self.0.lock();
let mut store = inner.store.clone();
for (peer_id, peer_state) in inner.peer_states.iter_mut() {
if peer_state.status == KnownPeerStatus::Connected
&& clock.now_utc() > peer_state.last_seen.saturating_add(time::Duration::minutes(1))
{
peer_state.last_seen = clock.now_utc();
store.set_peer_state(peer_id, peer_state)?
}
}
Ok(())
}

pub(crate) fn peer_disconnected(
&self,
clock: &time::Clock,
peer_id: &PeerId,
) -> anyhow::Result<()> {
pub fn peer_disconnected(&self, clock: &time::Clock, peer_id: &PeerId) -> anyhow::Result<()> {
let mut inner = self.0.lock();
let mut store = inner.store.clone();
if let Some(peer_state) = inner.peer_states.get_mut(peer_id) {
Expand All @@ -414,7 +452,7 @@ impl PeerStore {

/// Records the last attempt to connect to peer.
/// Marks the peer as Unknown (as we failed to connect to it).
pub(crate) fn peer_connection_attempt(
pub fn peer_connection_attempt(
&self,
clock: &time::Clock,
peer_id: &PeerId,
Expand All @@ -437,7 +475,7 @@ impl PeerStore {
Ok(())
}

pub(crate) fn peer_ban(
pub fn peer_ban(
&self,
clock: &time::Clock,
peer_id: &PeerId,
Expand All @@ -459,7 +497,7 @@ impl PeerStore {

/// Return unconnected or peers with unknown status that we can try to connect to.
/// Peers with unknown addresses are filtered out.
pub(crate) fn unconnected_peer(
pub fn unconnected_peer(
&self,
ignore_fn: impl Fn(&KnownPeerState) -> bool,
prefer_previously_connected_peer: bool,
Expand Down Expand Up @@ -499,37 +537,20 @@ impl PeerStore {
}

/// Return healthy known peers up to given amount.
pub(crate) fn healthy_peers(&self, max_count: usize) -> Vec<PeerInfo> {
pub fn healthy_peers(&self, max_count: usize) -> Vec<PeerInfo> {
self.0
.lock()
.find_peers(|p| matches!(p.status, KnownPeerStatus::Banned(_, _)).not(), max_count)
}

/// Removes peers that are not responding for expiration period.
pub(crate) fn remove_expired(&self, clock: &time::Clock) -> anyhow::Result<()> {
let mut inner = self.0.lock();
let now = clock.now_utc();
let mut to_remove = vec![];
for (peer_id, peer_status) in inner.peer_states.iter() {
let diff = now - peer_status.last_seen;
if peer_status.status != KnownPeerStatus::Connected
&& diff > inner.config.peer_expiration_duration
{
tracing::debug!(target: "network", "Removing peer: last seen {:?} ago", diff);
to_remove.push(peer_id.clone());
}
}
inner.delete_peers(&to_remove)
}

/// Adds peers we’ve learned about from other peers.
///
/// Identities of the nodes hasn’t been verified in any way. We don’t even
/// know if there is anything running at given addresses and even if there
/// are nodes there we haven’t received signatures of their peer ID.
///
/// See also [`Self::add_direct_peer`] and [`Self::add_signed_peer`].
pub(crate) fn add_indirect_peers(
pub fn add_indirect_peers(
&self,
clock: &time::Clock,
peers: impl Iterator<Item = PeerInfo>,
Expand Down Expand Up @@ -561,34 +582,10 @@ impl PeerStore {
/// confirming that identity yet.
///
/// See also [`Self::add_indirect_peers`] and [`Self::add_signed_peer`].
pub(crate) fn add_direct_peer(
&self,
clock: &time::Clock,
peer_info: PeerInfo,
) -> anyhow::Result<()> {
pub fn add_direct_peer(&self, clock: &time::Clock, peer_info: PeerInfo) -> anyhow::Result<()> {
self.0.lock().add_peer(clock, peer_info, TrustLevel::Direct)
}

pub fn unban(&self, clock: &time::Clock) {
let mut inner = self.0.lock();
let now = clock.now_utc();
let mut to_unban = vec![];
for (peer_id, peer_state) in &inner.peer_states {
if let KnownPeerStatus::Banned(_, ban_time) = peer_state.status {
if now < ban_time + inner.config.ban_window {
continue;
}
tracing::info!(target: "network", unbanned = ?peer_id, ?ban_time, "unbanning a peer");
to_unban.push(peer_id.clone());
}
}
for peer_id in &to_unban {
if let Err(err) = inner.peer_unban(&peer_id) {
tracing::error!(target: "network", ?err, "Failed to unban a peer");
}
}
}

pub fn load(&self) -> HashMap<PeerId, KnownPeerState> {
self.0.lock().peer_states.clone()
}
Expand Down
34 changes: 29 additions & 5 deletions chain/network/src/peer_manager/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::testonly::fake_client;
use crate::time;
use crate::types::{
AccountKeys, ChainInfo, KnownPeerStatus, NetworkRequests, PeerManagerMessageRequest,
ReasonForBan,
};
use crate::PeerManagerActor;
use near_o11y::WithSpanContextExt;
Expand Down Expand Up @@ -151,7 +152,7 @@ impl ActorHandler {
&self,
peer_info: &PeerInfo,
tier: tcp::Tier,
) -> impl 'static + Send + Future<Output = ()> {
) -> impl 'static + Send + Future<Output = tcp::StreamId> {
let addr = self.actix.addr.clone();
let events = self.events.clone();
let peer_info = peer_info.clone();
Expand All @@ -165,7 +166,7 @@ impl ActorHandler {
Event::PeerManager(PME::HandshakeCompleted(ev))
if ev.stream_id == stream_id =>
{
Some(())
Some(stream_id)
}
Event::PeerManager(PME::ConnectionClosed(ev)) if ev.stream_id == stream_id => {
panic!("PeerManager rejected the handshake")
Expand Down Expand Up @@ -324,6 +325,25 @@ impl ActorHandler {
self.with_state(move |s| async move { s.tier1_advertise_proxies(&clock).await }).await
}

pub async fn disconnect_and_ban(
&self,
clock: &time::Clock,
peer_id: &PeerId,
reason: ReasonForBan,
) {
// TODO(gprusak): make it wait asynchronously for the connection to get closed.
// TODO(gprusak): figure out how to await for both ends to disconnect.
let clock = clock.clone();
let peer_id = peer_id.clone();
self.with_state(move |s| async move { s.disconnect_and_ban(&clock, &peer_id, reason) })
.await
}

pub async fn peer_store_update(&self, clock: &time::Clock) {
let clock = clock.clone();
self.with_state(move |s| async move { s.peer_store.update(&clock) }).await;
}

pub async fn send_ping(&self, nonce: u64, target: PeerId) {
self.actix
.addr
Expand Down Expand Up @@ -452,7 +472,7 @@ pub(crate) async fn start(
cfg: config::NetworkConfig,
chain: Arc<data::Chain>,
) -> ActorHandler {
let (send, recv) = broadcast::unbounded_channel();
let (send, mut recv) = broadcast::unbounded_channel();
let actix = ActixSystem::spawn({
let mut cfg = cfg.clone();
let chain = chain.clone();
Expand All @@ -464,9 +484,13 @@ pub(crate) async fn start(
}
})
.await;
let mut h = ActorHandler { cfg, actix, events: recv };
let h = ActorHandler { cfg, actix, events: recv.clone() };
// Wait for the server to start.
assert_eq!(Event::PeerManager(PME::ServerStarted), h.events.recv().await);
recv.recv_until(|ev| match ev {
Event::PeerManager(PME::ServerStarted) => Some(()),
_ => None,
})
.await;
h.set_chain_info(chain.get_chain_info()).await;
h
}
Loading