Skip to content

Commit

Permalink
revert a couple of commits
Browse files Browse the repository at this point in the history
Signed-off-by: Stefan <stefan.pingel@consensys.net>
  • Loading branch information
pinges committed Jun 16, 2022
1 parent 36cda04 commit 1b5f84e
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ protected boolean removeEldestEntry(final Map.Entry<Hash, Boolean> eldest) {
private final String protocolName;
private final Clock clock;
private final List<NodeMessagePermissioningProvider> permissioningProviders;
private ChainState chainHeadState = new ChainState();
private final ChainState chainHeadState = new ChainState();
private final AtomicBoolean statusHasBeenSentToPeer = new AtomicBoolean(false);
private final AtomicBoolean statusHasBeenReceivedFromPeer = new AtomicBoolean(false);
private final AtomicBoolean fullyValidated = new AtomicBoolean(false);
Expand Down Expand Up @@ -588,10 +588,6 @@ public Optional<BlockHeader> getCheckpointHeader() {
return checkpointHeader;
}

public void setChainState(final ChainState chainState) {
this.chainHeadState = chainState;
}

@FunctionalInterface
public interface DisconnectCallback {
void onDisconnect(EthPeer peer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import org.hyperledger.besu.ethereum.eth.manager.EthPeer.DisconnectCallback;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.permissioning.NodeMessagePermissioningProvider;
Expand All @@ -37,7 +36,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -72,8 +70,7 @@ public class EthPeers {
private final Subscribers<ConnectCallback> connectCallbacks = Subscribers.create();
private final Subscribers<DisconnectCallback> disconnectCallbacks = Subscribers.create();
private final Collection<PendingPeerRequest> pendingRequests = new CopyOnWriteArrayList<>();
private final ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor(); // We might want to schedule using the ctx
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
;

public EthPeers(
Expand Down Expand Up @@ -118,8 +115,8 @@ public EthPeer preStatusExchangedConnection(
callWhenStatusesExchanged(peerConnection, peerToAdd, throwable);
});
preStatusExchangedPeers.put(peerConnection, peer);
// if the status messages has not been received within 20s remove the entry
scheduler.schedule(() -> preStatusExchangedPeers.remove(peerConnection), 20, TimeUnit.SECONDS);
// if the status messages has not been received within 30s remove the entry
scheduler.schedule(() -> preStatusExchangedPeers.remove(peerConnection), 30, TimeUnit.SECONDS);

return peer;
}
Expand All @@ -135,36 +132,22 @@ private void callWhenStatusesExchanged(
"Adding peer {} with connection {}",
peerConnection.getPeer().getId(),
System.identityHashCode(peerConnection));
final AtomicReference<ChainState> chainStateFromPrevPeer = new AtomicReference<>();
connections.compute(
peerConnection.getPeer().getId(),
(id, prevPeer) -> {
if (prevPeer != null) {
previouslyUsedPeers.put(peerConnection, prevPeer);
chainStateFromPrevPeer.set(prevPeer.chainState());
// TODO: When moving a previous eth peer out of the connections map we
// might have to copy validationStatus and/or chainHeadState or similar
// to the new member
// remove this entry after 30s. We have to keep it for a bit to make sure that
// remove this entry after 40s. We have to keep it for a bit to make sure that
// requests we might have made with
// this peer can be used. Makes sure that we do not flag these messages as unsolicited
// messages.
scheduler.schedule(
() -> {
previouslyUsedPeers.remove(peerConnection);
peerConnection.disconnect(DisconnectMessage.DisconnectReason.ALREADY_CONNECTED);
},
30,
TimeUnit.SECONDS);
() -> previouslyUsedPeers.remove(peerConnection), 40, TimeUnit.SECONDS);
}
return peerToAdd;
});
final ChainState chainState = chainStateFromPrevPeer.get();
if (chainState != null) {
peerToAdd.setChainState(chainState);
}
}
preStatusExchangedPeers.remove(peerConnection);
preStatusExchangedPeers.remove(peerConnection.getPeer().getId());
}

public void registerDisconnect(final PeerConnection connection) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ protected void disconnectPeer(final EthPeer ethPeer) {
}

protected void scheduleNextCheck(final EthPeer ethPeer) {
final Duration timeout = peerValidator.nextValidationCheckTimeout(ethPeer);
Duration timeout = peerValidator.nextValidationCheckTimeout(ethPeer);
ethContext.getScheduler().scheduleFutureTask(() -> checkPeer(ethPeer), timeout);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,7 @@ public class SyncTarget {
private final BlockHeader commonAncestor;

public SyncTarget(final EthPeer peer, final BlockHeader commonAncestor) {
this.peer =
peer; // TODO we might want to change that to peer id, so if the current target EthPeer is
// replaced by one with an EthPeer with a different connection with the same peer we
// can continue syncing against that peer
this.peer = peer;
this.commonAncestor = commonAncestor;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import com.google.common.annotations.VisibleForTesting;
import org.apache.tuweni.bytes.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -430,48 +429,27 @@ && getConnectionCount() >= maxConnections) {
dispatchConnect(peerConnection);
}

@VisibleForTesting
public boolean callOnConnectionReady(
private boolean callOnConnectionReady(
final PeerConnection peerConnection, final boolean isInboundConnection) {

// Track this new connection, deduplicating existing connection if necessary
final AtomicBoolean newConnectionAccepted = new AtomicBoolean(false);
final Peer peer = peerConnection.getPeer();
final RlpxConnection newConnection =
isInboundConnection
? RlpxConnection.inboundConnection(peerConnection)
: RlpxConnection.outboundConnection(
peer, CompletableFuture.completedFuture(peerConnection));
peerConnection.getPeer(), CompletableFuture.completedFuture(peerConnection));
// Our disconnect handler runs connectionsById.compute(), so don't actually execute the
// disconnect command until we've returned from our compute() calculation
// Disconnect if too many peers

if (!randomPeerPriority) {
if (!peerPrivileges.canExceedConnectionLimits(peer)
&& getConnectionCount() >= maxConnections) {
LOG.debug("Too many peers. Disconnect incoming connection: {}", peerConnection);
peerConnection.disconnect(DisconnectReason.TOO_MANY_PEERS);
return false;
}
// Disconnect if too many remotely-initiated connections
if (!peerPrivileges.canExceedConnectionLimits(peer) && remoteConnectionLimitReached()) {
LOG.debug(
"Too many remotely-initiated connections. Disconnect incoming connection: {}",
peerConnection);
peerConnection.disconnect(DisconnectReason.TOO_MANY_PEERS);
return false;
}
}

final AtomicReference<Runnable> disconnectAction = new AtomicReference<>();
connectionsById.compute(
peer.getId(),
peerConnection.getPeer().getId(),
(nodeId, existingConnection) -> {
if (existingConnection == null) {
// The new connection is unique, set it and return
LOG.info(
"Ready connection established with {}, connection {}",
peer.getId(),
peerConnection.getPeer().getId(),
System.identityHashCode(newConnection.getPeerConnection()));
newConnectionAccepted.set(true);
return newConnection;
Expand All @@ -483,7 +461,7 @@ && getConnectionCount() >= maxConnections) {
"Duplicate connection detected, disconnecting existing connection {} in favor of connection {} for peer: {}",
System.identityHashCode(existingConnection.getPeerConnection()),
System.identityHashCode(newConnection.getPeerConnection()),
peer.getId());
peerConnection.getPeer().getId());
disconnectAction.set(
() -> existingConnection.disconnect(DisconnectReason.ALREADY_CONNECTED));
newConnectionAccepted.set(true);
Expand All @@ -494,7 +472,7 @@ && getConnectionCount() >= maxConnections) {
"Duplicate connection detected, disconnecting connection {} in favor of connection {} for peer: {}",
System.identityHashCode(newConnection.getPeerConnection()),
System.identityHashCode(existingConnection.getPeerConnection()),
peer.getId());
peerConnection.getPeer().getId());
disconnectAction.set(
() -> newConnection.disconnect(DisconnectReason.ALREADY_CONNECTED));
return existingConnection;
Expand All @@ -519,7 +497,7 @@ private boolean shouldLimitRemoteConnections() {

private boolean remoteConnectionLimitReached() {
return shouldLimitRemoteConnections()
&& countUntrustedRemotelyInitiatedConnections() > maxRemotelyInitiatedConnections;
&& countUntrustedRemotelyInitiatedConnections() >= maxRemotelyInitiatedConnections;
}

private long countUntrustedRemotelyInitiatedConnections() {
Expand Down Expand Up @@ -647,9 +625,7 @@ public void subscribeDisconnect(final DisconnectCallback callback) {
}

private void dispatchConnect(final PeerConnection connection) {
connectedPeersCounter
.inc(); // TODO: this probably has to happen in the onReady callback, when we are adding the
// connection to the hashmap
connectedPeersCounter.inc();
connectSubscribers.forEach(c -> c.onConnect(connection));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ public void preventMultipleConnections() throws Exception {
final NodeKey listenNodeKey = NodeKeyUtils.generate();
try (final P2PNetwork listener = builder().nodeKey(listenNodeKey).build();
final P2PNetwork connector = builder().build()) {
connector.subscribeConnect((conn) -> conn.onPeerConnectionReady());

listener.start();
connector.start();
Expand All @@ -106,11 +105,14 @@ public void preventMultipleConnections() throws Exception {
connector.connect(createPeer(listenId, listenPort));
final CompletableFuture<PeerConnection> secondFuture =
connector.connect(createPeer(listenId, listenPort));
secondFuture.whenComplete(
(c, e) -> Assertions.assertThat(e.getMessage()).isEqualTo("Already trying to connect"));

final PeerConnection firstConnection = firstFuture.get(30L, TimeUnit.SECONDS);
final PeerConnection secondConnection = secondFuture.get(30L, TimeUnit.SECONDS);
Assertions.assertThat(firstConnection.getPeerInfo().getNodeId()).isEqualTo(listenId);

// Connections should reference the same instance - i.e. we shouldn't create 2 distinct
// connections
assertThat(firstConnection == secondConnection).isTrue();
}
}

Expand All @@ -135,7 +137,6 @@ public void limitMaxPeers() throws Exception {
try (final P2PNetwork listener = builder().nodeKey(nodeKey).config(listenerConfig).build();
final P2PNetwork connector1 = builder().build();
final P2PNetwork connector2 = builder().build()) {
listener.subscribeConnect((conn) -> conn.onPeerConnectionReady());

// Setup listener and first connection
listener.start();
Expand Down Expand Up @@ -169,7 +170,7 @@ public void limitMaxPeers() throws Exception {
.getPeerInfo()
.getNodeId())
.isEqualTo(listenId);
assertThat(peerFuture.get(30L, TimeUnit.SECONDS).getPeerInfo().getNodeId())
Assertions.assertThat(peerFuture.get(30L, TimeUnit.SECONDS).getPeerInfo().getNodeId())
.isEqualTo(listenId);
assertThat(reasonFuture.get(30L, TimeUnit.SECONDS))
.isEqualByComparingTo(DisconnectReason.TOO_MANY_PEERS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,14 @@ public void preventMultipleConnections() throws Exception {
connector.connect(createPeer(listenId, listenPort));
final CompletableFuture<PeerConnection> secondFuture =
connector.connect(createPeer(listenId, listenPort));
secondFuture.whenComplete(
(c, e) -> Assertions.assertThat(e.getMessage()).isEqualTo("Already trying to connect"));

final PeerConnection firstConnection = firstFuture.get(30L, TimeUnit.SECONDS);
final PeerConnection secondConnection = secondFuture.get(30L, TimeUnit.SECONDS);
Assertions.assertThat(firstConnection.getPeerInfo().getNodeId()).isEqualTo(listenId);

// Connections should reference the same instance - i.e. we shouldn't create 2 distinct
// connections
assertThat(firstConnection == secondConnection).isTrue();
}
}

Expand All @@ -146,7 +149,6 @@ public void limitMaxPeers() throws Exception {
builder("partner1client1").nodeKey(nodeKey).config(listenerConfig).build();
final P2PNetwork connector1 = builder("partner1client1").build();
final P2PNetwork connector2 = builder("partner2client1").build()) {
listener.subscribeConnect((conn) -> conn.onPeerConnectionReady());

// Setup listener and first connection
listener.start();
Expand Down

0 comments on commit 1b5f84e

Please sign in to comment.