From 2cf61d662137a7489e560ce733fc4080d062ddba Mon Sep 17 00:00:00 2001 From: Stefan Pingel <16143240+pinges@users.noreply.github.com> Date: Thu, 21 Dec 2023 10:22:35 +1000 Subject: [PATCH] add missing check for static peers to allow them to exceed the connection limit (#6316) * add check for static peers to allow them to exceed the connection limit * change logging disonnetction in EthPeers * returning best peers only returns fully validated peers Signed-off-by: stefan.pingel@consensys.net --- .../besu/ethereum/eth/manager/EthPeers.java | 55 +++++++++---------- .../AbstractRetryingSwitchingPeerTask.java | 2 +- .../AbstractPeerBlockValidatorTest.java | 2 +- .../p2p/peers/DefaultPeerPrivileges.java | 6 +- .../ethereum/p2p/peers/PeerPrivileges.java | 6 +- .../besu/ethereum/p2p/rlpx/RlpxAgent.java | 4 +- 6 files changed, 37 insertions(+), 38 deletions(-) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java index a5af86551bb..1c7da3fd2b6 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeers.java @@ -37,7 +37,6 @@ import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ExecutionException; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -67,6 +66,7 @@ public class EthPeers { Comparator.comparing(EthPeer::outstandingRequests) .thenComparing(EthPeer::getLastRequestTimestamp); public static final int NODE_ID_LENGTH = 64; + public static final int USEFULL_PEER_SCORE_THRESHOLD = 102; private final Map completeConnections = new ConcurrentHashMap<>(); @@ -182,9 +182,8 @@ private List getIncompleteConnections(final Bytes id) { } public boolean registerDisconnect(final PeerConnection connection) { - final Bytes id = connection.getPeer().getId(); - final EthPeer peer = completeConnections.get(id); - return registerDisconnect(id, peer, connection); + final EthPeer peer = peer(connection); + return registerDisconnect(peer.getId(), peer, connection); } private boolean registerDisconnect( @@ -197,8 +196,11 @@ private boolean registerDisconnect( disconnectCallbacks.forEach(callback -> callback.onDisconnect(peer)); peer.handleDisconnect(); abortPendingRequestsAssignedToDisconnectedPeers(); - LOG.debug("Disconnected EthPeer {}", peer.getShortNodeId()); - LOG.trace("Disconnected EthPeer {}", peer); + if (peer.getReputation().getScore() > USEFULL_PEER_SCORE_THRESHOLD) { + LOG.debug("Disonnected USEFULL peer {}", peer); + } else { + LOG.debug("Disconnected EthPeer {}", peer.getShortNodeId()); + } } } reattemptPendingPeerRequests(); @@ -220,16 +222,8 @@ private void abortPendingRequestsAssignedToDisconnectedPeers() { } public EthPeer peer(final PeerConnection connection) { - try { - return incompleteConnections.get( - connection, () -> completeConnections.get(connection.getPeer().getId())); - } catch (final ExecutionException e) { - throw new RuntimeException(e); - } - } - - public EthPeer peer(final Bytes peerId) { - return completeConnections.get(peerId); + final EthPeer ethPeer = incompleteConnections.getIfPresent(connection); + return ethPeer != null ? ethPeer : completeConnections.get(connection.getPeer().getId()); } public PendingPeerRequest executePeerRequest( @@ -321,7 +315,9 @@ public Stream streamAvailablePeers() { } public Stream streamBestPeers() { - return streamAvailablePeers().sorted(getBestChainComparator().reversed()); + return streamAvailablePeers() + .filter(EthPeer::isFullyValidated) + .sorted(getBestChainComparator().reversed()); } public Optional bestPeer() { @@ -365,10 +361,10 @@ public Stream getAllConnections() { } public boolean shouldConnect(final Peer peer, final boolean inbound) { - if (peerCount() >= peerUpperBound) { + final Bytes id = peer.getId(); + if (peerCount() >= peerUpperBound && !canExceedPeerLimits(id)) { return false; } - final Bytes id = peer.getId(); final EthPeer ethPeer = completeConnections.get(id); if (ethPeer != null && !ethPeer.isDisconnected()) { return false; @@ -428,8 +424,8 @@ private void ethPeerStatusExchanged(final EthPeer peer) { private int comparePeerPriorities(final EthPeer p1, final EthPeer p2) { final PeerConnection a = p1.getConnection(); final PeerConnection b = p2.getConnection(); - final boolean aCanExceedPeerLimits = canExceedPeerLimits(a); - final boolean bCanExceedPeerLimits = canExceedPeerLimits(b); + final boolean aCanExceedPeerLimits = canExceedPeerLimits(a.getPeer().getId()); + final boolean bCanExceedPeerLimits = canExceedPeerLimits(b.getPeer().getId()); if (aCanExceedPeerLimits && !bCanExceedPeerLimits) { return -1; } else if (bCanExceedPeerLimits && !aCanExceedPeerLimits) { @@ -441,11 +437,11 @@ private int comparePeerPriorities(final EthPeer p1, final EthPeer p2) { } } - private boolean canExceedPeerLimits(final PeerConnection a) { + private boolean canExceedPeerLimits(final Bytes peerId) { if (rlpxAgent == null) { - return true; + return false; } - return rlpxAgent.canExceedConnectionLimits(a.getPeer()); + return rlpxAgent.canExceedConnectionLimits(peerId); } private int compareConnectionInitiationTimes(final PeerConnection a, final PeerConnection b) { @@ -464,7 +460,7 @@ private void enforceRemoteConnectionLimits() { getActivePrioritizedPeers() .filter(p -> p.getConnection().inboundInitiated()) - .filter(p -> !canExceedPeerLimits(p.getConnection())) + .filter(p -> !canExceedPeerLimits(p.getId())) .skip(maxRemotelyInitiatedConnections) .forEach( conn -> { @@ -488,10 +484,9 @@ private void enforceConnectionLimits() { return; } getActivePrioritizedPeers() - .filter(p -> !p.isDisconnected()) .skip(peerUpperBound) .map(EthPeer::getConnection) - .filter(c -> !canExceedPeerLimits(c)) + .filter(c -> !canExceedPeerLimits(c.getPeer().getId())) .forEach( conn -> { LOG.trace( @@ -516,7 +511,7 @@ private long countUntrustedRemotelyInitiatedConnections() { .map(ep -> ep.getConnection()) .filter(c -> c.inboundInitiated()) .filter(c -> !c.isDisconnected()) - .filter(conn -> !canExceedPeerLimits(conn)) + .filter(conn -> !canExceedPeerLimits(conn.getPeer().getId())) .count(); } @@ -542,7 +537,7 @@ private boolean addPeerToEthPeers(final EthPeer peer) { final Bytes id = peer.getId(); if (!randomPeerPriority) { // Disconnect if too many peers - if (!canExceedPeerLimits(connection) && peerCount() >= peerUpperBound) { + if (!canExceedPeerLimits(id) && peerCount() >= peerUpperBound) { LOG.trace( "Too many peers. Disconnect connection: {}, max connections {}", connection, @@ -552,7 +547,7 @@ private boolean addPeerToEthPeers(final EthPeer peer) { } // Disconnect if too many remotely-initiated connections if (connection.inboundInitiated() - && !canExceedPeerLimits(connection) + && !canExceedPeerLimits(id) && remoteConnectionLimitReached()) { LOG.trace( "Too many remotely-initiated connections. Disconnect incoming connection: {}, maxRemote={}", diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingSwitchingPeerTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingSwitchingPeerTask.java index f9400a6dac6..06c92c76204 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingSwitchingPeerTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingSwitchingPeerTask.java @@ -140,7 +140,7 @@ private void refreshPeers() { failedPeers.stream() .filter(peer -> !peer.isDisconnected()) .findAny() - .or(() -> peers.streamAvailablePeers().sorted(peers.getBestChainComparator()).findFirst()) + .or(() -> peers.streamAvailablePeers().min(peers.getBestChainComparator())) .ifPresent( peer -> { LOG.atDebug() diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/peervalidation/AbstractPeerBlockValidatorTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/peervalidation/AbstractPeerBlockValidatorTest.java index 1781428531a..4ebdcb69d4b 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/peervalidation/AbstractPeerBlockValidatorTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/peervalidation/AbstractPeerBlockValidatorTest.java @@ -71,7 +71,7 @@ public void validatePeer_requestBlockFromPeerBeingTested() { final PeerValidator validator = createValidator(blockNumber, 0); - final int peerCount = 1000; + final int peerCount = 24; final List otherPeers = Stream.generate( () -> EthProtocolManagerTestUtil.createPeer(ethProtocolManager, blockNumber)) diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/peers/DefaultPeerPrivileges.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/peers/DefaultPeerPrivileges.java index 535a49f2ac6..c52b24f61cb 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/peers/DefaultPeerPrivileges.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/peers/DefaultPeerPrivileges.java @@ -14,6 +14,8 @@ */ package org.hyperledger.besu.ethereum.p2p.peers; +import org.apache.tuweni.bytes.Bytes; + public class DefaultPeerPrivileges implements PeerPrivileges { private final MaintainedPeers maintainedPeers; @@ -22,7 +24,7 @@ public DefaultPeerPrivileges(final MaintainedPeers maintainedPeers) { } @Override - public boolean canExceedConnectionLimits(final Peer peer) { - return maintainedPeers.contains(peer); + public boolean canExceedConnectionLimits(final Bytes peerId) { + return maintainedPeers.streamPeers().anyMatch(p -> p.getId().equals(peerId)); } } diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/peers/PeerPrivileges.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/peers/PeerPrivileges.java index e99b1a39729..6447522e287 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/peers/PeerPrivileges.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/peers/PeerPrivileges.java @@ -14,14 +14,16 @@ */ package org.hyperledger.besu.ethereum.p2p.peers; +import org.apache.tuweni.bytes.Bytes; + public interface PeerPrivileges { /** * If true, the given peer can connect or remain connected even if the max connection limit or the * maximum remote connection limit has been reached or exceeded. * - * @param peer The peer to be checked. + * @param peerId The peer id to be checked. * @return {@code true} if the peer should be allowed to connect regardless of connection limits. */ - boolean canExceedConnectionLimits(final Peer peer); + boolean canExceedConnectionLimits(final Bytes peerId); } diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/RlpxAgent.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/RlpxAgent.java index 6cf250d715b..98a1f60df37 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/RlpxAgent.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/rlpx/RlpxAgent.java @@ -310,8 +310,8 @@ private CompletableFuture initiateOutboundConnection(final Peer }); } - public boolean canExceedConnectionLimits(final Peer peer) { - return peerPrivileges.canExceedConnectionLimits(peer); + public boolean canExceedConnectionLimits(final Bytes peerId) { + return peerPrivileges.canExceedConnectionLimits(peerId); } private void handleIncomingConnection(final PeerConnection peerConnection) {