Skip to content

Commit

Permalink
add missing check for static peers to allow them to exceed the connec…
Browse files Browse the repository at this point in the history
…tion limit (hyperledger#6316)

* add check for static peers to allow them to exceed the connection limit
* change logging disonnetction in EthPeers
* returning best peers only returns fully validated peers

Signed-off-by: stefan.pingel@consensys.net <stefan.pingel@consensys.net>
  • Loading branch information
pinges authored Dec 21, 2023
1 parent 175895d commit 2cf61d6
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -67,6 +66,7 @@ public class EthPeers {
Comparator.comparing(EthPeer::outstandingRequests)
.thenComparing(EthPeer::getLastRequestTimestamp);
public static final int NODE_ID_LENGTH = 64;
public static final int USEFULL_PEER_SCORE_THRESHOLD = 102;

private final Map<Bytes, EthPeer> completeConnections = new ConcurrentHashMap<>();

Expand Down Expand Up @@ -182,9 +182,8 @@ private List<PeerConnection> getIncompleteConnections(final Bytes id) {
}

public boolean registerDisconnect(final PeerConnection connection) {
final Bytes id = connection.getPeer().getId();
final EthPeer peer = completeConnections.get(id);
return registerDisconnect(id, peer, connection);
final EthPeer peer = peer(connection);
return registerDisconnect(peer.getId(), peer, connection);
}

private boolean registerDisconnect(
Expand All @@ -197,8 +196,11 @@ private boolean registerDisconnect(
disconnectCallbacks.forEach(callback -> callback.onDisconnect(peer));
peer.handleDisconnect();
abortPendingRequestsAssignedToDisconnectedPeers();
LOG.debug("Disconnected EthPeer {}", peer.getShortNodeId());
LOG.trace("Disconnected EthPeer {}", peer);
if (peer.getReputation().getScore() > USEFULL_PEER_SCORE_THRESHOLD) {
LOG.debug("Disonnected USEFULL peer {}", peer);
} else {
LOG.debug("Disconnected EthPeer {}", peer.getShortNodeId());
}
}
}
reattemptPendingPeerRequests();
Expand All @@ -220,16 +222,8 @@ private void abortPendingRequestsAssignedToDisconnectedPeers() {
}

public EthPeer peer(final PeerConnection connection) {
try {
return incompleteConnections.get(
connection, () -> completeConnections.get(connection.getPeer().getId()));
} catch (final ExecutionException e) {
throw new RuntimeException(e);
}
}

public EthPeer peer(final Bytes peerId) {
return completeConnections.get(peerId);
final EthPeer ethPeer = incompleteConnections.getIfPresent(connection);
return ethPeer != null ? ethPeer : completeConnections.get(connection.getPeer().getId());
}

public PendingPeerRequest executePeerRequest(
Expand Down Expand Up @@ -321,7 +315,9 @@ public Stream<EthPeer> streamAvailablePeers() {
}

public Stream<EthPeer> streamBestPeers() {
return streamAvailablePeers().sorted(getBestChainComparator().reversed());
return streamAvailablePeers()
.filter(EthPeer::isFullyValidated)
.sorted(getBestChainComparator().reversed());
}

public Optional<EthPeer> bestPeer() {
Expand Down Expand Up @@ -365,10 +361,10 @@ public Stream<PeerConnection> getAllConnections() {
}

public boolean shouldConnect(final Peer peer, final boolean inbound) {
if (peerCount() >= peerUpperBound) {
final Bytes id = peer.getId();
if (peerCount() >= peerUpperBound && !canExceedPeerLimits(id)) {
return false;
}
final Bytes id = peer.getId();
final EthPeer ethPeer = completeConnections.get(id);
if (ethPeer != null && !ethPeer.isDisconnected()) {
return false;
Expand Down Expand Up @@ -428,8 +424,8 @@ private void ethPeerStatusExchanged(final EthPeer peer) {
private int comparePeerPriorities(final EthPeer p1, final EthPeer p2) {
final PeerConnection a = p1.getConnection();
final PeerConnection b = p2.getConnection();
final boolean aCanExceedPeerLimits = canExceedPeerLimits(a);
final boolean bCanExceedPeerLimits = canExceedPeerLimits(b);
final boolean aCanExceedPeerLimits = canExceedPeerLimits(a.getPeer().getId());
final boolean bCanExceedPeerLimits = canExceedPeerLimits(b.getPeer().getId());
if (aCanExceedPeerLimits && !bCanExceedPeerLimits) {
return -1;
} else if (bCanExceedPeerLimits && !aCanExceedPeerLimits) {
Expand All @@ -441,11 +437,11 @@ private int comparePeerPriorities(final EthPeer p1, final EthPeer p2) {
}
}

private boolean canExceedPeerLimits(final PeerConnection a) {
private boolean canExceedPeerLimits(final Bytes peerId) {
if (rlpxAgent == null) {
return true;
return false;
}
return rlpxAgent.canExceedConnectionLimits(a.getPeer());
return rlpxAgent.canExceedConnectionLimits(peerId);
}

private int compareConnectionInitiationTimes(final PeerConnection a, final PeerConnection b) {
Expand All @@ -464,7 +460,7 @@ private void enforceRemoteConnectionLimits() {

getActivePrioritizedPeers()
.filter(p -> p.getConnection().inboundInitiated())
.filter(p -> !canExceedPeerLimits(p.getConnection()))
.filter(p -> !canExceedPeerLimits(p.getId()))
.skip(maxRemotelyInitiatedConnections)
.forEach(
conn -> {
Expand All @@ -488,10 +484,9 @@ private void enforceConnectionLimits() {
return;
}
getActivePrioritizedPeers()
.filter(p -> !p.isDisconnected())
.skip(peerUpperBound)
.map(EthPeer::getConnection)
.filter(c -> !canExceedPeerLimits(c))
.filter(c -> !canExceedPeerLimits(c.getPeer().getId()))
.forEach(
conn -> {
LOG.trace(
Expand All @@ -516,7 +511,7 @@ private long countUntrustedRemotelyInitiatedConnections() {
.map(ep -> ep.getConnection())
.filter(c -> c.inboundInitiated())
.filter(c -> !c.isDisconnected())
.filter(conn -> !canExceedPeerLimits(conn))
.filter(conn -> !canExceedPeerLimits(conn.getPeer().getId()))
.count();
}

Expand All @@ -542,7 +537,7 @@ private boolean addPeerToEthPeers(final EthPeer peer) {
final Bytes id = peer.getId();
if (!randomPeerPriority) {
// Disconnect if too many peers
if (!canExceedPeerLimits(connection) && peerCount() >= peerUpperBound) {
if (!canExceedPeerLimits(id) && peerCount() >= peerUpperBound) {
LOG.trace(
"Too many peers. Disconnect connection: {}, max connections {}",
connection,
Expand All @@ -552,7 +547,7 @@ private boolean addPeerToEthPeers(final EthPeer peer) {
}
// Disconnect if too many remotely-initiated connections
if (connection.inboundInitiated()
&& !canExceedPeerLimits(connection)
&& !canExceedPeerLimits(id)
&& remoteConnectionLimitReached()) {
LOG.trace(
"Too many remotely-initiated connections. Disconnect incoming connection: {}, maxRemote={}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ private void refreshPeers() {
failedPeers.stream()
.filter(peer -> !peer.isDisconnected())
.findAny()
.or(() -> peers.streamAvailablePeers().sorted(peers.getBestChainComparator()).findFirst())
.or(() -> peers.streamAvailablePeers().min(peers.getBestChainComparator()))
.ifPresent(
peer -> {
LOG.atDebug()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void validatePeer_requestBlockFromPeerBeingTested() {

final PeerValidator validator = createValidator(blockNumber, 0);

final int peerCount = 1000;
final int peerCount = 24;
final List<RespondingEthPeer> otherPeers =
Stream.generate(
() -> EthProtocolManagerTestUtil.createPeer(ethProtocolManager, blockNumber))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
*/
package org.hyperledger.besu.ethereum.p2p.peers;

import org.apache.tuweni.bytes.Bytes;

public class DefaultPeerPrivileges implements PeerPrivileges {
private final MaintainedPeers maintainedPeers;

Expand All @@ -22,7 +24,7 @@ public DefaultPeerPrivileges(final MaintainedPeers maintainedPeers) {
}

@Override
public boolean canExceedConnectionLimits(final Peer peer) {
return maintainedPeers.contains(peer);
public boolean canExceedConnectionLimits(final Bytes peerId) {
return maintainedPeers.streamPeers().anyMatch(p -> p.getId().equals(peerId));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,16 @@
*/
package org.hyperledger.besu.ethereum.p2p.peers;

import org.apache.tuweni.bytes.Bytes;

public interface PeerPrivileges {

/**
* If true, the given peer can connect or remain connected even if the max connection limit or the
* maximum remote connection limit has been reached or exceeded.
*
* @param peer The peer to be checked.
* @param peerId The peer id to be checked.
* @return {@code true} if the peer should be allowed to connect regardless of connection limits.
*/
boolean canExceedConnectionLimits(final Peer peer);
boolean canExceedConnectionLimits(final Bytes peerId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,8 @@ private CompletableFuture<PeerConnection> initiateOutboundConnection(final Peer
});
}

public boolean canExceedConnectionLimits(final Peer peer) {
return peerPrivileges.canExceedConnectionLimits(peer);
public boolean canExceedConnectionLimits(final Bytes peerId) {
return peerPrivileges.canExceedConnectionLimits(peerId);
}

private void handleIncomingConnection(final PeerConnection peerConnection) {
Expand Down

0 comments on commit 2cf61d6

Please sign in to comment.