Skip to content

Commit

Permalink
migrated connect_to_unbanned_peer test (#8196)
Browse files Browse the repository at this point in the history
Migrated connect_to_unbanned_peer test from integration-tests to near-network.
It was flaky when executed in presubmit. I've made it more synchronized and faked out time, so now it shouldn't cause problems any more. I've tested it locally for flakiness (multiple concurrent runs in a loop) and observed no problems.

To accomodate the test logic, I reorganized a bit the implementation of some PeerStore methods, but no semantic change was introduced there.
  • Loading branch information
pompon0 authored Dec 12, 2022
1 parent 1f0c57e commit 783658f
Show file tree
Hide file tree
Showing 6 changed files with 181 additions and 131 deletions.
4 changes: 2 additions & 2 deletions chain/network/src/peer/peer_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ pub(crate) enum ClosingReason {
#[error("Received a message of type not allowed on this connection.")]
DisallowedMessage,
#[error("PeerManager requested to close the connection")]
PeerManager,
PeerManagerRequest,
#[error("Received DisconnectMessage from peer")]
DisconnectMessage,
#[error("Peer clock skew exceeded {MAX_CLOCK_SKEW}")]
Expand Down Expand Up @@ -1551,7 +1551,7 @@ impl actix::Handler<WithSpanContext<Stop>> for PeerActor {
ctx,
match msg.ban_reason {
Some(reason) => ClosingReason::Ban(reason),
None => ClosingReason::PeerManager,
None => ClosingReason::PeerManagerRequest,
},
);
}
Expand Down
9 changes: 1 addition & 8 deletions chain/network/src/peer_manager/peer_manager_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,10 +498,7 @@ impl PeerManagerActor {
let _timer =
metrics::PEER_MANAGER_TRIGGER_TIME.with_label_values(&["monitor_peers"]).start_timer();

self.state.peer_store.unban(&self.clock);
if let Err(err) = self.state.peer_store.update_connected_peers_last_seen(&self.clock) {
tracing::error!(target: "network", ?err, "Failed to update peers last seen time.");
}
self.state.peer_store.update(&self.clock);

if self.is_outbound_bootstrap_needed() {
let tier2 = self.state.tier2.load();
Expand Down Expand Up @@ -547,10 +544,6 @@ impl PeerManagerActor {
// If there are too many active connections try to remove some connections
self.maybe_stop_active_connection();

if let Err(err) = self.state.peer_store.remove_expired(&self.clock) {
tracing::error!(target: "network", ?err, "Failed to remove expired peers");
};

// Find peers that are not reliable (too much behind) - and make sure that we're not routing messages through them.
let unreliable_peers = self.unreliable_peers();
metrics::PEER_UNRELIABLE.set(unreliable_peers.len() as i64);
Expand Down
165 changes: 81 additions & 84 deletions chain/network/src/peer_manager/peer_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ mod testonly;
#[cfg(test)]
mod tests;

/// How often to update the KnownPeerState.last_seen in storage.
const UPDATE_LAST_SEEN_INTERVAL: time::Duration = time::Duration::minutes(1);

/// Level of trust we have about a new (PeerId, Addr) pair.
#[derive(Eq, PartialEq, Debug, Clone, Copy)]
enum TrustLevel {
Expand Down Expand Up @@ -224,16 +227,73 @@ impl Inner {
}
Ok(())
}

/// Removes peers that are not responding for expiration period.
fn remove_expired(&mut self, now: time::Utc) {
let mut to_remove = vec![];
for (peer_id, peer_status) in self.peer_states.iter() {
if peer_status.status != KnownPeerStatus::Connected
&& now > peer_status.last_seen + self.config.peer_expiration_duration
{
tracing::debug!(target: "network", "Removing peer: last seen {:?} ago", now-peer_status.last_seen);
to_remove.push(peer_id.clone());
}
}
if let Err(err) = self.delete_peers(&to_remove) {
tracing::error!(target: "network", ?err, "Failed to remove expired peers");
}
}

fn unban(&mut self, now: time::Utc) {
let mut to_unban = vec![];
for (peer_id, peer_state) in &self.peer_states {
if let KnownPeerStatus::Banned(_, ban_time) = peer_state.status {
if now < ban_time + self.config.ban_window {
continue;
}
tracing::info!(target: "network", unbanned = ?peer_id, ?ban_time, "unbanning a peer");
to_unban.push(peer_id.clone());
}
}
for peer_id in &to_unban {
if let Err(err) = self.peer_unban(&peer_id) {
tracing::error!(target: "network", ?peer_id, ?err, "Failed to unban a peer");
}
}
}

/// Update the 'last_seen' time for all the peers that we're currently connected to.
fn update_last_seen(&mut self, now: time::Utc) {
for (peer_id, peer_state) in self.peer_states.iter_mut() {
if peer_state.status == KnownPeerStatus::Connected
&& now > peer_state.last_seen + UPDATE_LAST_SEEN_INTERVAL
{
peer_state.last_seen = now;
if let Err(err) = self.store.set_peer_state(peer_id, peer_state) {
tracing::error!(target: "network", ?peer_id, ?err, "Failed to update peers last seen time.");
}
}
}
}

/// Cleans up the state of the PeerStore, due to passing time.
/// * it unbans a peer if config.ban_window has passed
/// * it updates KnownPeerStatus.last_seen of the connected peers
/// * it removes peers which were not seen for config.peer_expiration_duration
/// This function should be called periodically.
pub fn update(&mut self, clock: &time::Clock) {
let now = clock.now_utc();
// TODO(gprusak): these operations could be put into a single DB write transaction.
self.unban(now);
self.update_last_seen(now);
self.remove_expired(now);
}
}

pub(crate) struct PeerStore(Mutex<Inner>);

impl PeerStore {
pub(crate) fn new(
clock: &time::Clock,
config: Config,
store: store::Store,
) -> anyhow::Result<Self> {
pub fn new(clock: &time::Clock, config: Config, store: store::Store) -> anyhow::Result<Self> {
let boot_nodes: HashSet<_> = config.boot_nodes.iter().map(|p| p.id.clone()).collect();
// A mapping from `PeerId` to `KnownPeerState`.
let mut peerid_2_state = HashMap::default();
Expand Down Expand Up @@ -345,29 +405,29 @@ impl PeerStore {
self.0.lock().config.blacklist.contains(*addr)
}

pub(crate) fn len(&self) -> usize {
pub fn len(&self) -> usize {
self.0.lock().peer_states.len()
}

pub(crate) fn is_banned(&self, peer_id: &PeerId) -> bool {
pub fn is_banned(&self, peer_id: &PeerId) -> bool {
self.0.lock().peer_states.get(peer_id).map_or(false, |s| s.status.is_banned())
}

pub(crate) fn count_banned(&self) -> usize {
pub fn count_banned(&self) -> usize {
self.0.lock().peer_states.values().filter(|st| st.status.is_banned()).count()
}

pub fn update(&self, clock: &time::Clock) {
self.0.lock().update(clock)
}

#[allow(dead_code)]
/// Returns the state of the current peer in memory.
pub(crate) fn get_peer_state(&self, peer_id: &PeerId) -> Option<KnownPeerState> {
pub fn get_peer_state(&self, peer_id: &PeerId) -> Option<KnownPeerState> {
self.0.lock().peer_states.get(peer_id).cloned()
}

pub(crate) fn peer_connected(
&self,
clock: &time::Clock,
peer_info: &PeerInfo,
) -> anyhow::Result<()> {
pub fn peer_connected(&self, clock: &time::Clock, peer_info: &PeerInfo) -> anyhow::Result<()> {
let mut inner = self.0.lock();
inner.add_signed_peer(clock, peer_info.clone())?;
let mut store = inner.store.clone();
Expand All @@ -377,29 +437,7 @@ impl PeerStore {
Ok(store.set_peer_state(&peer_info.id, entry)?)
}

/// Update the 'last_seen' time for all the peers that we're currently connected to.
pub(crate) fn update_connected_peers_last_seen(
&self,
clock: &time::Clock,
) -> anyhow::Result<()> {
let mut inner = self.0.lock();
let mut store = inner.store.clone();
for (peer_id, peer_state) in inner.peer_states.iter_mut() {
if peer_state.status == KnownPeerStatus::Connected
&& clock.now_utc() > peer_state.last_seen.saturating_add(time::Duration::minutes(1))
{
peer_state.last_seen = clock.now_utc();
store.set_peer_state(peer_id, peer_state)?
}
}
Ok(())
}

pub(crate) fn peer_disconnected(
&self,
clock: &time::Clock,
peer_id: &PeerId,
) -> anyhow::Result<()> {
pub fn peer_disconnected(&self, clock: &time::Clock, peer_id: &PeerId) -> anyhow::Result<()> {
let mut inner = self.0.lock();
let mut store = inner.store.clone();
if let Some(peer_state) = inner.peer_states.get_mut(peer_id) {
Expand All @@ -414,7 +452,7 @@ impl PeerStore {

/// Records the last attempt to connect to peer.
/// Marks the peer as Unknown (as we failed to connect to it).
pub(crate) fn peer_connection_attempt(
pub fn peer_connection_attempt(
&self,
clock: &time::Clock,
peer_id: &PeerId,
Expand All @@ -437,7 +475,7 @@ impl PeerStore {
Ok(())
}

pub(crate) fn peer_ban(
pub fn peer_ban(
&self,
clock: &time::Clock,
peer_id: &PeerId,
Expand All @@ -459,7 +497,7 @@ impl PeerStore {

/// Return unconnected or peers with unknown status that we can try to connect to.
/// Peers with unknown addresses are filtered out.
pub(crate) fn unconnected_peer(
pub fn unconnected_peer(
&self,
ignore_fn: impl Fn(&KnownPeerState) -> bool,
prefer_previously_connected_peer: bool,
Expand Down Expand Up @@ -499,37 +537,20 @@ impl PeerStore {
}

/// Return healthy known peers up to given amount.
pub(crate) fn healthy_peers(&self, max_count: usize) -> Vec<PeerInfo> {
pub fn healthy_peers(&self, max_count: usize) -> Vec<PeerInfo> {
self.0
.lock()
.find_peers(|p| matches!(p.status, KnownPeerStatus::Banned(_, _)).not(), max_count)
}

/// Removes peers that are not responding for expiration period.
pub(crate) fn remove_expired(&self, clock: &time::Clock) -> anyhow::Result<()> {
let mut inner = self.0.lock();
let now = clock.now_utc();
let mut to_remove = vec![];
for (peer_id, peer_status) in inner.peer_states.iter() {
let diff = now - peer_status.last_seen;
if peer_status.status != KnownPeerStatus::Connected
&& diff > inner.config.peer_expiration_duration
{
tracing::debug!(target: "network", "Removing peer: last seen {:?} ago", diff);
to_remove.push(peer_id.clone());
}
}
inner.delete_peers(&to_remove)
}

/// Adds peers we’ve learned about from other peers.
///
/// Identities of the nodes hasn’t been verified in any way. We don’t even
/// know if there is anything running at given addresses and even if there
/// are nodes there we haven’t received signatures of their peer ID.
///
/// See also [`Self::add_direct_peer`] and [`Self::add_signed_peer`].
pub(crate) fn add_indirect_peers(
pub fn add_indirect_peers(
&self,
clock: &time::Clock,
peers: impl Iterator<Item = PeerInfo>,
Expand Down Expand Up @@ -561,34 +582,10 @@ impl PeerStore {
/// confirming that identity yet.
///
/// See also [`Self::add_indirect_peers`] and [`Self::add_signed_peer`].
pub(crate) fn add_direct_peer(
&self,
clock: &time::Clock,
peer_info: PeerInfo,
) -> anyhow::Result<()> {
pub fn add_direct_peer(&self, clock: &time::Clock, peer_info: PeerInfo) -> anyhow::Result<()> {
self.0.lock().add_peer(clock, peer_info, TrustLevel::Direct)
}

pub fn unban(&self, clock: &time::Clock) {
let mut inner = self.0.lock();
let now = clock.now_utc();
let mut to_unban = vec![];
for (peer_id, peer_state) in &inner.peer_states {
if let KnownPeerStatus::Banned(_, ban_time) = peer_state.status {
if now < ban_time + inner.config.ban_window {
continue;
}
tracing::info!(target: "network", unbanned = ?peer_id, ?ban_time, "unbanning a peer");
to_unban.push(peer_id.clone());
}
}
for peer_id in &to_unban {
if let Err(err) = inner.peer_unban(&peer_id) {
tracing::error!(target: "network", ?err, "Failed to unban a peer");
}
}
}

pub fn load(&self) -> HashMap<PeerId, KnownPeerState> {
self.0.lock().peer_states.clone()
}
Expand Down
34 changes: 29 additions & 5 deletions chain/network/src/peer_manager/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::testonly::fake_client;
use crate::time;
use crate::types::{
AccountKeys, ChainInfo, KnownPeerStatus, NetworkRequests, PeerManagerMessageRequest,
ReasonForBan,
};
use crate::PeerManagerActor;
use near_o11y::WithSpanContextExt;
Expand Down Expand Up @@ -151,7 +152,7 @@ impl ActorHandler {
&self,
peer_info: &PeerInfo,
tier: tcp::Tier,
) -> impl 'static + Send + Future<Output = ()> {
) -> impl 'static + Send + Future<Output = tcp::StreamId> {
let addr = self.actix.addr.clone();
let events = self.events.clone();
let peer_info = peer_info.clone();
Expand All @@ -165,7 +166,7 @@ impl ActorHandler {
Event::PeerManager(PME::HandshakeCompleted(ev))
if ev.stream_id == stream_id =>
{
Some(())
Some(stream_id)
}
Event::PeerManager(PME::ConnectionClosed(ev)) if ev.stream_id == stream_id => {
panic!("PeerManager rejected the handshake")
Expand Down Expand Up @@ -324,6 +325,25 @@ impl ActorHandler {
self.with_state(move |s| async move { s.tier1_advertise_proxies(&clock).await }).await
}

pub async fn disconnect_and_ban(
&self,
clock: &time::Clock,
peer_id: &PeerId,
reason: ReasonForBan,
) {
// TODO(gprusak): make it wait asynchronously for the connection to get closed.
// TODO(gprusak): figure out how to await for both ends to disconnect.
let clock = clock.clone();
let peer_id = peer_id.clone();
self.with_state(move |s| async move { s.disconnect_and_ban(&clock, &peer_id, reason) })
.await
}

pub async fn peer_store_update(&self, clock: &time::Clock) {
let clock = clock.clone();
self.with_state(move |s| async move { s.peer_store.update(&clock) }).await;
}

pub async fn send_ping(&self, nonce: u64, target: PeerId) {
self.actix
.addr
Expand Down Expand Up @@ -452,7 +472,7 @@ pub(crate) async fn start(
cfg: config::NetworkConfig,
chain: Arc<data::Chain>,
) -> ActorHandler {
let (send, recv) = broadcast::unbounded_channel();
let (send, mut recv) = broadcast::unbounded_channel();
let actix = ActixSystem::spawn({
let mut cfg = cfg.clone();
let chain = chain.clone();
Expand All @@ -464,9 +484,13 @@ pub(crate) async fn start(
}
})
.await;
let mut h = ActorHandler { cfg, actix, events: recv };
let h = ActorHandler { cfg, actix, events: recv.clone() };
// Wait for the server to start.
assert_eq!(Event::PeerManager(PME::ServerStarted), h.events.recv().await);
recv.recv_until(|ev| match ev {
Event::PeerManager(PME::ServerStarted) => Some(()),
_ => None,
})
.await;
h.set_chain_info(chain.get_chain_info()).await;
h
}
Loading

0 comments on commit 783658f

Please sign in to comment.