diff --git a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/discovery/PeerDiscoveryAgent.java b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/discovery/PeerDiscoveryAgent.java index 6731f9d971..cb4b565501 100644 --- a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/discovery/PeerDiscoveryAgent.java +++ b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/discovery/PeerDiscoveryAgent.java @@ -26,6 +26,7 @@ import tech.pegasys.pantheon.ethereum.p2p.discovery.PeerDiscoveryEvent.PeerDroppedEvent; import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.Packet; import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.PeerDiscoveryController; +import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.PeerDiscoveryController.AsyncExecutor; import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.PeerRequirement; import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.PeerTable; import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.PingPacketData; @@ -115,6 +116,8 @@ public PeerDiscoveryAgent( protected abstract TimerUtil createTimer(); + protected abstract AsyncExecutor createWorkerExecutor(); + protected abstract CompletableFuture listenForConnections(); protected abstract CompletableFuture sendOutgoingPacket( @@ -162,6 +165,7 @@ private PeerDiscoveryController createController() { bootstrapPeers, this::handleOutgoingPacket, createTimer(), + createWorkerExecutor(), PEER_REFRESH_INTERVAL_MS, peerRequirement, peerBlacklist, diff --git a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/discovery/VertxPeerDiscoveryAgent.java b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/discovery/VertxPeerDiscoveryAgent.java index f9574c6d4d..3fd7e6caf4 100644 --- a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/discovery/VertxPeerDiscoveryAgent.java +++ b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/discovery/VertxPeerDiscoveryAgent.java @@ -18,6 +18,7 @@ import tech.pegasys.pantheon.ethereum.p2p.config.DiscoveryConfiguration; import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.Packet; import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.PeerDiscoveryController; +import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.PeerDiscoveryController.AsyncExecutor; import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.PeerRequirement; import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.TimerUtil; import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.VertxTimerUtil; @@ -34,6 +35,7 @@ import java.util.Optional; import java.util.OptionalInt; import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; import io.vertx.core.AsyncResult; import io.vertx.core.Vertx; @@ -67,6 +69,11 @@ protected TimerUtil createTimer() { return new VertxTimerUtil(vertx); } + @Override + protected AsyncExecutor createWorkerExecutor() { + return new VertxAsyncExecutor(); + } + @Override protected CompletableFuture listenForConnections() { CompletableFuture future = new CompletableFuture<>(); @@ -194,4 +201,29 @@ private void handlePacket(final DatagramPacket datagram) { LOG.error("Encountered error while handling packet", t); } } + + private class VertxAsyncExecutor implements AsyncExecutor { + + @Override + public CompletableFuture execute(final Supplier action) { + final CompletableFuture result = new CompletableFuture<>(); + vertx.executeBlocking( + future -> { + try { + future.complete(action.get()); + } catch (final Throwable t) { + future.fail(t); + } + }, + false, + event -> { + if (event.succeeded()) { + result.complete(event.result()); + } else { + result.completeExceptionally(event.cause()); + } + }); + return result; + } + } } diff --git a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/PeerDiscoveryController.java b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/PeerDiscoveryController.java index 0a54b12a0f..86bbd60daf 100644 --- a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/PeerDiscoveryController.java +++ b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/PeerDiscoveryController.java @@ -44,6 +44,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.Predicate; +import java.util.function.Supplier; import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; @@ -119,6 +120,7 @@ public class PeerDiscoveryController { private RetryDelayFunction retryDelayFunction = RetryDelayFunction.linear(1.5, 2000, 60000); + private final AsyncExecutor workerExecutor; private final long tableRefreshIntervalMs; private final PeerRequirement peerRequirement; @@ -140,6 +142,7 @@ public PeerDiscoveryController( final Collection bootstrapNodes, final OutboundMessageHandler outboundMessageHandler, final TimerUtil timerUtil, + final AsyncExecutor workerExecutor, final long tableRefreshIntervalMs, final PeerRequirement peerRequirement, final PeerBlacklist peerBlacklist, @@ -151,6 +154,7 @@ public PeerDiscoveryController( this.localPeer = localPeer; this.bootstrapNodes = bootstrapNodes; this.peerTable = peerTable; + this.workerExecutor = workerExecutor; this.tableRefreshIntervalMs = tableRefreshIntervalMs; this.peerRequirement = peerRequirement; this.peerBlacklist = peerBlacklist; @@ -391,19 +395,23 @@ void bond(final DiscoveryPeer peer) { interaction -> { final PingPacketData data = PingPacketData.create(localPeer.getEndpoint(), peer.getEndpoint()); - final Packet pingPacket = createPacket(PacketType.PING, data); - - final BytesValue pingHash = pingPacket.getHash(); - // Update the matching filter to only accept the PONG if it echoes the hash of our PING. - final Predicate newFilter = - packet -> - packet - .getPacketData(PongPacketData.class) - .map(pong -> pong.getPingHash().equals(pingHash)) - .orElse(false); - interaction.updateFilter(newFilter); - - sendPacket(peer, pingPacket); + createPacket( + PacketType.PING, + data, + pingPacket -> { + final BytesValue pingHash = pingPacket.getHash(); + // Update the matching filter to only accept the PONG if it echoes the hash of our + // PING. + final Predicate newFilter = + packet -> + packet + .getPacketData(PongPacketData.class) + .map(pong -> pong.getPingHash().equals(pingHash)) + .orElse(false); + interaction.updateFilter(newFilter); + + sendPacket(peer, pingPacket); + }); }; // The filter condition will be updated as soon as the action is performed. @@ -413,9 +421,13 @@ void bond(final DiscoveryPeer peer) { } private void sendPacket(final DiscoveryPeer peer, final PacketType type, final PacketData data) { - Packet packet = createPacket(type, data); - logSendingPacket(peer, packet); - outboundMessageHandler.send(peer, packet); + createPacket( + type, + data, + packet -> { + logSendingPacket(peer, packet); + outboundMessageHandler.send(peer, packet); + }); } private void sendPacket(final DiscoveryPeer peer, final Packet packet) { @@ -424,8 +436,17 @@ private void sendPacket(final DiscoveryPeer peer, final Packet packet) { } @VisibleForTesting - Packet createPacket(final PacketType type, final PacketData data) { - return Packet.create(type, data, keypair); + void createPacket(final PacketType type, final PacketData data, final Consumer handler) { + // Creating packets is quite expensive because they have to be cryptographically signed + // So ensure the work is done on a worker thread to avoid blocking the vertx event thread. + workerExecutor + .execute(() -> Packet.create(type, data, keypair)) + .thenAccept(handler) + .exceptionally( + error -> { + LOG.error("Error while creating packet", error); + return null; + }); } /** @@ -563,4 +584,8 @@ void cancelTimers() { timerId.ifPresent(timerUtil::cancelTimer); } } + + public interface AsyncExecutor { + CompletableFuture execute(Supplier action); + } } diff --git a/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/PeerDiscoveryTimestampsTest.java b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/PeerDiscoveryTimestampsTest.java index fd65d3e84f..7292be1bfc 100644 --- a/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/PeerDiscoveryTimestampsTest.java +++ b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/PeerDiscoveryTimestampsTest.java @@ -17,6 +17,7 @@ import static org.mockito.Mockito.when; import tech.pegasys.pantheon.crypto.SECP256K1.KeyPair; +import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.BlockingAsyncExecutor; import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.MockPeerDiscoveryAgent; import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.MockTimerUtil; import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.OutboundMessageHandler; @@ -58,6 +59,7 @@ public void lastSeenAndFirstDiscoveredTimestampsUpdatedOnMessage() { Collections.emptyList(), OutboundMessageHandler.NOOP, new MockTimerUtil(), + new BlockingAsyncExecutor(), TimeUnit.HOURS.toMillis(1), () -> true, new PeerBlacklist(), diff --git a/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/BlockingAsyncExecutor.java b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/BlockingAsyncExecutor.java new file mode 100644 index 0000000000..9b77590aef --- /dev/null +++ b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/BlockingAsyncExecutor.java @@ -0,0 +1,26 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.p2p.discovery.internal; + +import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.PeerDiscoveryController.AsyncExecutor; + +import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; + +public class BlockingAsyncExecutor implements AsyncExecutor { + + @Override + public CompletableFuture execute(final Supplier action) { + return CompletableFuture.completedFuture(action.get()); + } +} diff --git a/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/MockPeerDiscoveryAgent.java b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/MockPeerDiscoveryAgent.java index 79907f91aa..15d19e2f41 100644 --- a/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/MockPeerDiscoveryAgent.java +++ b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/MockPeerDiscoveryAgent.java @@ -16,6 +16,7 @@ import tech.pegasys.pantheon.ethereum.p2p.config.DiscoveryConfiguration; import tech.pegasys.pantheon.ethereum.p2p.discovery.DiscoveryPeer; import tech.pegasys.pantheon.ethereum.p2p.discovery.PeerDiscoveryAgent; +import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.PeerDiscoveryController.AsyncExecutor; import tech.pegasys.pantheon.ethereum.p2p.peers.PeerBlacklist; import tech.pegasys.pantheon.ethereum.permissioning.NodeWhitelistController; import tech.pegasys.pantheon.util.bytes.BytesValue; @@ -93,6 +94,11 @@ protected TimerUtil createTimer() { return new MockTimerUtil(); } + @Override + protected AsyncExecutor createWorkerExecutor() { + return new BlockingAsyncExecutor(); + } + @Override public CompletableFuture stop() { return CompletableFuture.completedFuture(null); diff --git a/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/PeerDiscoveryControllerTest.java b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/PeerDiscoveryControllerTest.java index 1d47042576..97307f012b 100644 --- a/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/PeerDiscoveryControllerTest.java +++ b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/PeerDiscoveryControllerTest.java @@ -19,6 +19,7 @@ import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -123,7 +124,7 @@ public void bootstrapPeersRetriesSent() { final PingPacketData mockPing = PingPacketData.create(localPeer.getEndpoint(), peers.get(0).getEndpoint()); final Packet mockPacket = Packet.create(PacketType.PING, mockPing, keyPairs.get(0)); - doReturn(mockPacket).when(controller).createPacket(eq(PacketType.PING), any()); + mockPingPacketCreation(mockPacket); controller.start(); @@ -152,6 +153,27 @@ public void bootstrapPeersRetriesSent() { .forEach(p -> assertThat(p.getStatus()).isEqualTo(PeerDiscoveryStatus.BONDING)); } + private void mockPingPacketCreation(final Packet mockPacket) { + mockPacketCreation(PacketType.PING, Optional.empty(), mockPacket); + } + + private void mockPacketCreation( + final PacketType type, final DiscoveryPeer peer, final Packet mockPacket) { + mockPacketCreation(type, Optional.of(peer), mockPacket); + } + + private void mockPacketCreation( + final PacketType type, final Optional peer, final Packet mockPacket) { + doAnswer( + invocation -> { + final Consumer handler = invocation.getArgument(2); + handler.accept(mockPacket); + return null; + }) + .when(controller) + .createPacket(eq(type), peer.isPresent() ? matchPingDataForPeer(peer.get()) : any(), any()); + } + @Test public void bootstrapPeersRetriesStoppedUponResponse() { // Create peers. @@ -172,7 +194,7 @@ public void bootstrapPeersRetriesStoppedUponResponse() { final PingPacketData mockPing = PingPacketData.create(localPeer.getEndpoint(), peers.get(0).getEndpoint()); final Packet mockPacket = Packet.create(PacketType.PING, mockPing, keyPairs.get(0)); - doReturn(mockPacket).when(controller).createPacket(eq(PacketType.PING), any()); + mockPingPacketCreation(mockPacket); controller.start(); @@ -226,7 +248,7 @@ public void shouldStopRetryingInteractionWhenLimitIsReached() { final PingPacketData mockPing = PingPacketData.create(localPeer.getEndpoint(), peers.get(0).getEndpoint()); final Packet mockPacket = Packet.create(PacketType.PING, mockPing, keyPairs.get(0)); - doReturn(mockPacket).when(controller).createPacket(eq(PacketType.PING), any()); + mockPingPacketCreation(mockPacket); controller.start(); @@ -261,7 +283,7 @@ public void bootstrapPeersPongReceived_HashMatched() { final PingPacketData mockPing = PingPacketData.create(localPeer.getEndpoint(), peers.get(0).getEndpoint()); final Packet mockPacket = Packet.create(PacketType.PING, mockPing, keyPairs.get(0)); - doReturn(mockPacket).when(controller).createPacket(eq(PacketType.PING), any()); + mockPingPacketCreation(mockPacket); controller.start(); @@ -317,7 +339,7 @@ public void bootstrapPeersPongReceived_HashUnmatched() { final PingPacketData mockPing = PingPacketData.create(localPeer.getEndpoint(), peers.get(0).getEndpoint()); final Packet mockPacket = Packet.create(PacketType.PING, mockPing, keyPairs.get(0)); - doReturn(mockPacket).when(controller).createPacket(eq(PacketType.PING), any()); + mockPingPacketCreation(mockPacket); controller.start(); @@ -364,7 +386,7 @@ public void findNeighborsSentAfterBondingFinished() { final PingPacketData mockPing = PingPacketData.create(localPeer.getEndpoint(), peers.get(0).getEndpoint()); final Packet mockPacket = Packet.create(PacketType.PING, mockPing, keyPairs.get(0)); - doReturn(mockPacket).when(controller).createPacket(eq(PacketType.PING), any()); + mockPingPacketCreation(mockPacket); controller.setRetryDelayFunction((prev) -> 999999999L); controller.start(); @@ -429,7 +451,7 @@ public void peerSeenTwice() throws InterruptedException { PingPacketData.create(localPeer.getEndpoint(), peers.get(0).getEndpoint()); final Packet pingPacket = Packet.create(PacketType.PING, pingPacketData, keyPairs.get(0)); - doReturn(pingPacket).when(controller).createPacket(eq(PacketType.PING), any()); + mockPingPacketCreation(pingPacket); controller.setRetryDelayFunction((prev) -> 999999999L); controller.start(); @@ -589,9 +611,7 @@ public void shouldNotAddNewPeerWhenReceivedPongFromBlacklistedPeer() { List keyPairs = PeerDiscoveryTestHelper.generateKeyPairs(1); PingPacketData pingPacketData = PingPacketData.create(localEndpoint, discoPeer.getEndpoint()); final Packet discoPeerPing = Packet.create(PacketType.PING, pingPacketData, keyPairs.get(0)); - doReturn(discoPeerPing) - .when(controller) - .createPacket(eq(PacketType.PING), matchPingDataForPeer(discoPeer)); + mockPacketCreation(PacketType.PING, discoPeer, discoPeerPing); controller.start(); verify(outboundMessageHandler, times(1)) @@ -608,17 +628,13 @@ public void shouldNotAddNewPeerWhenReceivedPongFromBlacklistedPeer() { keyPairs = PeerDiscoveryTestHelper.generateKeyPairs(1); pingPacketData = PingPacketData.create(localEndpoint, otherPeer.getEndpoint()); final Packet pingPacket = Packet.create(PacketType.PING, pingPacketData, keyPairs.get(0)); - doReturn(pingPacket) - .when(controller) - .createPacket(eq(PacketType.PING), matchPingDataForPeer(otherPeer)); + mockPacketCreation(PacketType.PING, otherPeer, pingPacket); // Setup ping to be sent to otherPeer2 after neighbors packet is received keyPairs = PeerDiscoveryTestHelper.generateKeyPairs(1); pingPacketData = PingPacketData.create(localEndpoint, otherPeer2.getEndpoint()); final Packet pingPacket2 = Packet.create(PacketType.PING, pingPacketData, keyPairs.get(0)); - doReturn(pingPacket2) - .when(controller) - .createPacket(eq(PacketType.PING), matchPingDataForPeer(otherPeer2)); + mockPacketCreation(PacketType.PING, otherPeer2, pingPacket2); final Packet neighborsPacket = MockPacketDataFactory.mockNeighborsPacket(discoPeer, otherPeer, otherPeer2); @@ -673,9 +689,7 @@ public void shouldNotBondWithBlacklistedPeer() { List keyPairs = PeerDiscoveryTestHelper.generateKeyPairs(1); PingPacketData pingPacketData = PingPacketData.create(localEndpoint, discoPeer.getEndpoint()); final Packet discoPeerPing = Packet.create(PacketType.PING, pingPacketData, keyPairs.get(0)); - doReturn(discoPeerPing) - .when(controller) - .createPacket(eq(PacketType.PING), matchPingDataForPeer(discoPeer)); + mockPacketCreation(PacketType.PING, discoPeer, discoPeerPing); controller.start(); verify(outboundMessageHandler, times(1)).send(any(), matchPacketOfType(PacketType.PING)); @@ -691,17 +705,13 @@ public void shouldNotBondWithBlacklistedPeer() { keyPairs = PeerDiscoveryTestHelper.generateKeyPairs(1); pingPacketData = PingPacketData.create(localEndpoint, otherPeer.getEndpoint()); final Packet pingPacket = Packet.create(PacketType.PING, pingPacketData, keyPairs.get(0)); - doReturn(pingPacket) - .when(controller) - .createPacket(eq(PacketType.PING), matchPingDataForPeer(otherPeer)); + mockPacketCreation(PacketType.PING, otherPeer, pingPacket); // Setup ping to be sent to otherPeer2 after neighbors packet is received keyPairs = PeerDiscoveryTestHelper.generateKeyPairs(1); pingPacketData = PingPacketData.create(localEndpoint, otherPeer2.getEndpoint()); final Packet pingPacket2 = Packet.create(PacketType.PING, pingPacketData, keyPairs.get(0)); - doReturn(pingPacket2) - .when(controller) - .createPacket(eq(PacketType.PING), matchPingDataForPeer(otherPeer2)); + mockPacketCreation(PacketType.PING, otherPeer2, pingPacket2); // Blacklist peer blacklist.add(otherPeer); @@ -737,9 +747,7 @@ public void shouldRespondToNeighborsRequestFromKnownPeer() final PingPacketData pingPacketData = PingPacketData.create(localEndpoint, discoPeer.getEndpoint()); final Packet discoPeerPing = Packet.create(PacketType.PING, pingPacketData, keyPairs.get(0)); - doReturn(discoPeerPing) - .when(controller) - .createPacket(eq(PacketType.PING), matchPingDataForPeer(discoPeer)); + mockPacketCreation(PacketType.PING, discoPeer, discoPeerPing); controller.start(); verify(outboundMessageHandler, times(1)).send(any(), matchPacketOfType(PacketType.PING)); @@ -782,9 +790,7 @@ public void shouldNotRespondToNeighborsRequestFromUnknownPeer() final PingPacketData pingPacketData = PingPacketData.create(localEndpoint, discoPeer.getEndpoint()); final Packet discoPeerPing = Packet.create(PacketType.PING, pingPacketData, keyPairs.get(0)); - doReturn(discoPeerPing) - .when(controller) - .createPacket(eq(PacketType.PING), matchPingDataForPeer(discoPeer)); + mockPacketCreation(PacketType.PING, discoPeer, discoPeerPing); controller.start(); verify(outboundMessageHandler, times(1)).send(any(), matchPacketOfType(PacketType.PING)); @@ -825,9 +831,7 @@ public void shouldNotRespondToNeighborsRequestFromBlacklistedPeer() { final PingPacketData pingPacketData = PingPacketData.create(localEndpoint, discoPeer.getEndpoint()); final Packet discoPeerPing = Packet.create(PacketType.PING, pingPacketData, keyPairs.get(0)); - doReturn(discoPeerPing) - .when(controller) - .createPacket(eq(PacketType.PING), matchPingDataForPeer(discoPeer)); + mockPacketCreation(PacketType.PING, discoPeer, discoPeerPing); controller.start(); verify(outboundMessageHandler, times(1)).send(any(), matchPacketOfType(PacketType.PING)); @@ -863,7 +867,7 @@ public void shouldAddNewPeerWhenReceivedPongAndPeerTableBucketIsNotFull() { .peers(peers.get(0)) .outboundMessageHandler(outboundMessageHandler) .build(); - doReturn(pingPacket).when(controller).createPacket(eq(PacketType.PING), any()); + mockPingPacketCreation(pingPacket); controller.setRetryDelayFunction((prev) -> 999999999L); controller.start(); @@ -895,7 +899,7 @@ public void shouldAddNewPeerWhenReceivedPongAndPeerTableBucketIsFull() { final PingPacketData pingPacketData = PingPacketData.create(localPeer.getEndpoint(), peers.get(0).getEndpoint()); final Packet pingPacket = Packet.create(PacketType.PING, pingPacketData, keyPairs.get(0)); - doReturn(pingPacket).when(controller).createPacket(eq(PacketType.PING), any()); + mockPingPacketCreation(pingPacket); controller.start(); @@ -961,7 +965,7 @@ public void shouldNotAddPeerInNeighborsPacketWithoutBonding() { .peers(peers.get(0)) .outboundMessageHandler(outboundMessageHandler) .build(); - doReturn(pingPacket).when(controller).createPacket(eq(PacketType.PING), any()); + mockPingPacketCreation(pingPacket); controller.start(); verify(outboundMessageHandler, times(1)) @@ -1009,9 +1013,7 @@ public void shouldNotBondWithNonWhitelistedPeer() throws IOException { List keyPairs = PeerDiscoveryTestHelper.generateKeyPairs(1); PingPacketData pingPacketData = PingPacketData.create(localEndpoint, discoPeer.getEndpoint()); final Packet discoPeerPing = Packet.create(PacketType.PING, pingPacketData, keyPairs.get(0)); - doReturn(discoPeerPing) - .when(controller) - .createPacket(eq(PacketType.PING), matchPingDataForPeer(discoPeer)); + mockPacketCreation(PacketType.PING, discoPeer, discoPeerPing); controller.start(); verify(outboundMessageHandler, times(1)).send(any(), matchPacketOfType(PacketType.PING)); @@ -1027,17 +1029,13 @@ public void shouldNotBondWithNonWhitelistedPeer() throws IOException { keyPairs = PeerDiscoveryTestHelper.generateKeyPairs(1); pingPacketData = PingPacketData.create(localEndpoint, otherPeer.getEndpoint()); final Packet pingPacket = Packet.create(PacketType.PING, pingPacketData, keyPairs.get(0)); - doReturn(pingPacket) - .when(controller) - .createPacket(eq(PacketType.PING), matchPingDataForPeer(otherPeer)); + mockPacketCreation(PacketType.PING, otherPeer, pingPacket); // Setup ping to be sent to otherPeer2 after neighbors packet is received keyPairs = PeerDiscoveryTestHelper.generateKeyPairs(1); pingPacketData = PingPacketData.create(localEndpoint, otherPeer2.getEndpoint()); final Packet pingPacket2 = Packet.create(PacketType.PING, pingPacketData, keyPairs.get(0)); - doReturn(pingPacket2) - .when(controller) - .createPacket(eq(PacketType.PING), matchPingDataForPeer(otherPeer2)); + mockPacketCreation(PacketType.PING, otherPeer2, pingPacket2); final Packet neighborsPacket = MockPacketDataFactory.mockNeighborsPacket(discoPeer, otherPeer, otherPeer2); @@ -1308,6 +1306,7 @@ PeerDiscoveryController build() { discoPeers, outboundMessageHandler, timerUtil, + new BlockingAsyncExecutor(), TABLE_REFRESH_INTERVAL_MS, PEER_REQUIREMENT, blacklist, diff --git a/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/PeerDiscoveryTableRefreshTest.java b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/PeerDiscoveryTableRefreshTest.java index c5c9794dab..06ab99c64e 100644 --- a/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/PeerDiscoveryTableRefreshTest.java +++ b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/PeerDiscoveryTableRefreshTest.java @@ -60,6 +60,7 @@ public void tableRefreshSingleNode() { emptyList(), outboundMessageHandler, timer, + new BlockingAsyncExecutor(), 0, () -> true, new PeerBlacklist(),