Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Create peer discovery packets on a worker thread #955

Merged
merged 3 commits into from
Feb 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import tech.pegasys.pantheon.ethereum.p2p.discovery.PeerDiscoveryEvent.PeerDroppedEvent;
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.Packet;
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.PeerDiscoveryController;
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.PeerDiscoveryController.AsyncExecutor;
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.PeerRequirement;
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.PeerTable;
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.PingPacketData;
Expand Down Expand Up @@ -115,6 +116,8 @@ public PeerDiscoveryAgent(

protected abstract TimerUtil createTimer();

protected abstract AsyncExecutor createWorkerExecutor();

protected abstract CompletableFuture<InetSocketAddress> listenForConnections();

protected abstract CompletableFuture<Void> sendOutgoingPacket(
Expand Down Expand Up @@ -162,6 +165,7 @@ private PeerDiscoveryController createController() {
bootstrapPeers,
this::handleOutgoingPacket,
createTimer(),
createWorkerExecutor(),
PEER_REFRESH_INTERVAL_MS,
peerRequirement,
peerBlacklist,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import tech.pegasys.pantheon.ethereum.p2p.config.DiscoveryConfiguration;
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.Packet;
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.PeerDiscoveryController;
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.PeerDiscoveryController.AsyncExecutor;
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.PeerRequirement;
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.TimerUtil;
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.VertxTimerUtil;
Expand All @@ -34,6 +35,7 @@
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

import io.vertx.core.AsyncResult;
import io.vertx.core.Vertx;
Expand Down Expand Up @@ -67,6 +69,11 @@ protected TimerUtil createTimer() {
return new VertxTimerUtil(vertx);
}

@Override
protected AsyncExecutor createWorkerExecutor() {
return new VertxAsyncExecutor();
}

@Override
protected CompletableFuture<InetSocketAddress> listenForConnections() {
CompletableFuture<InetSocketAddress> future = new CompletableFuture<>();
Expand Down Expand Up @@ -194,4 +201,29 @@ private void handlePacket(final DatagramPacket datagram) {
LOG.error("Encountered error while handling packet", t);
}
}

private class VertxAsyncExecutor implements AsyncExecutor {

@Override
public <T> CompletableFuture<T> execute(final Supplier<T> action) {
final CompletableFuture<T> result = new CompletableFuture<>();
vertx.<T>executeBlocking(
future -> {
try {
future.complete(action.get());
} catch (final Throwable t) {
future.fail(t);
}
},
false,
event -> {
if (event.succeeded()) {
result.complete(event.result());
} else {
result.completeExceptionally(event.cause());
}
});
return result;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -119,6 +120,7 @@ public class PeerDiscoveryController {

private RetryDelayFunction retryDelayFunction = RetryDelayFunction.linear(1.5, 2000, 60000);

private final AsyncExecutor workerExecutor;
private final long tableRefreshIntervalMs;

private final PeerRequirement peerRequirement;
Expand All @@ -140,6 +142,7 @@ public PeerDiscoveryController(
final Collection<DiscoveryPeer> bootstrapNodes,
final OutboundMessageHandler outboundMessageHandler,
final TimerUtil timerUtil,
final AsyncExecutor workerExecutor,
final long tableRefreshIntervalMs,
final PeerRequirement peerRequirement,
final PeerBlacklist peerBlacklist,
Expand All @@ -151,6 +154,7 @@ public PeerDiscoveryController(
this.localPeer = localPeer;
this.bootstrapNodes = bootstrapNodes;
this.peerTable = peerTable;
this.workerExecutor = workerExecutor;
this.tableRefreshIntervalMs = tableRefreshIntervalMs;
this.peerRequirement = peerRequirement;
this.peerBlacklist = peerBlacklist;
Expand Down Expand Up @@ -391,19 +395,23 @@ void bond(final DiscoveryPeer peer) {
interaction -> {
final PingPacketData data =
PingPacketData.create(localPeer.getEndpoint(), peer.getEndpoint());
final Packet pingPacket = createPacket(PacketType.PING, data);

final BytesValue pingHash = pingPacket.getHash();
// Update the matching filter to only accept the PONG if it echoes the hash of our PING.
final Predicate<Packet> newFilter =
packet ->
packet
.getPacketData(PongPacketData.class)
.map(pong -> pong.getPingHash().equals(pingHash))
.orElse(false);
interaction.updateFilter(newFilter);

sendPacket(peer, pingPacket);
createPacket(
PacketType.PING,
data,
pingPacket -> {
final BytesValue pingHash = pingPacket.getHash();
// Update the matching filter to only accept the PONG if it echoes the hash of our
// PING.
final Predicate<Packet> newFilter =
packet ->
packet
.getPacketData(PongPacketData.class)
.map(pong -> pong.getPingHash().equals(pingHash))
.orElse(false);
interaction.updateFilter(newFilter);

sendPacket(peer, pingPacket);
});
};

// The filter condition will be updated as soon as the action is performed.
Expand All @@ -413,9 +421,13 @@ void bond(final DiscoveryPeer peer) {
}

private void sendPacket(final DiscoveryPeer peer, final PacketType type, final PacketData data) {
Packet packet = createPacket(type, data);
logSendingPacket(peer, packet);
outboundMessageHandler.send(peer, packet);
createPacket(
type,
data,
packet -> {
logSendingPacket(peer, packet);
outboundMessageHandler.send(peer, packet);
});
}

private void sendPacket(final DiscoveryPeer peer, final Packet packet) {
Expand All @@ -424,8 +436,17 @@ private void sendPacket(final DiscoveryPeer peer, final Packet packet) {
}

@VisibleForTesting
Packet createPacket(final PacketType type, final PacketData data) {
return Packet.create(type, data, keypair);
void createPacket(final PacketType type, final PacketData data, final Consumer<Packet> handler) {
// Creating packets is quite expensive because they have to be cryptographically signed
// So ensure the work is done on a worker thread to avoid blocking the vertx event thread.
workerExecutor
.execute(() -> Packet.create(type, data, keypair))
.thenAccept(handler)
.exceptionally(
error -> {
LOG.error("Error while creating packet", error);
return null;
});
}

/**
Expand Down Expand Up @@ -563,4 +584,8 @@ void cancelTimers() {
timerId.ifPresent(timerUtil::cancelTimer);
}
}

public interface AsyncExecutor {
<T> CompletableFuture<T> execute(Supplier<T> action);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import static org.mockito.Mockito.when;

import tech.pegasys.pantheon.crypto.SECP256K1.KeyPair;
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.BlockingAsyncExecutor;
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.MockPeerDiscoveryAgent;
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.MockTimerUtil;
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.OutboundMessageHandler;
Expand Down Expand Up @@ -58,6 +59,7 @@ public void lastSeenAndFirstDiscoveredTimestampsUpdatedOnMessage() {
Collections.emptyList(),
OutboundMessageHandler.NOOP,
new MockTimerUtil(),
new BlockingAsyncExecutor(),
TimeUnit.HOURS.toMillis(1),
() -> true,
new PeerBlacklist(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.p2p.discovery.internal;

import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.PeerDiscoveryController.AsyncExecutor;

import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class BlockingAsyncExecutor implements AsyncExecutor {

@Override
public <T> CompletableFuture<T> execute(final Supplier<T> action) {
return CompletableFuture.completedFuture(action.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import tech.pegasys.pantheon.ethereum.p2p.config.DiscoveryConfiguration;
import tech.pegasys.pantheon.ethereum.p2p.discovery.DiscoveryPeer;
import tech.pegasys.pantheon.ethereum.p2p.discovery.PeerDiscoveryAgent;
import tech.pegasys.pantheon.ethereum.p2p.discovery.internal.PeerDiscoveryController.AsyncExecutor;
import tech.pegasys.pantheon.ethereum.p2p.peers.PeerBlacklist;
import tech.pegasys.pantheon.ethereum.permissioning.NodeWhitelistController;
import tech.pegasys.pantheon.util.bytes.BytesValue;
Expand Down Expand Up @@ -93,6 +94,11 @@ protected TimerUtil createTimer() {
return new MockTimerUtil();
}

@Override
protected AsyncExecutor createWorkerExecutor() {
return new BlockingAsyncExecutor();
}

@Override
public CompletableFuture<?> stop() {
return CompletableFuture.completedFuture(null);
Expand Down
Loading