Skip to content

Commit

Permalink
Discv5.1 - the latest spec updates (#76)
Browse files Browse the repository at this point in the history
* Require sending record when enr-seq == 0
ethereum/devp2p@92bbbd2
* Add destination ID into id-sig-input
ethereum/devp2p@d19fbe7
* Put version into protocol-id
ethereum/devp2p@5b59e88
* Rename 'iv' to 'maskingIV' according to spec changes
Part of ethereum/devp2p@60d9107
* Check min packet size
* TALKRESP protocol is Bytes not the String by the spec.
* Add FindNodeHandler unit test
* Rename create() to a more appropriate decode()
* Remove confusing DiscoveryV5Message wrapper and add DiscoveryMessageDecoder instead. Add an abstract Message class (aka for all discovery versions)
* Do not wrap DecryptException into DecodeException
* Initial refactor to reflect the latest Discv5.1 spec changes:
- packet header fields reorg
- maskingIV mixed up to message encryption associated data
- the whole WhoAreYou message is mixed for id-signature
- WhoAreYou has no more 'src-node-id' thus we need to find session via original-nonce
* Adjust tests to the new reality
* More type safety for Envelope
* Add Discovery test server
* Gradle: no need for 'application' plugin
* Get rid of 'import' which is highlighted by IDEA as ERROR
* Create DiscoveryServer outside of DiscoveryManagerImpl
* Add Field names for better debugging and logging
* Calling a discovery method with NodeRecord should work even this node is not in a Table yet
* Add DiscoveryManager tests and test helpers to simulate and control network interchange
* If another ordinary message received after WhoAreYou packet was sent, we should again send another WhoAreYou packet
* If a session was dropped on remote host we are sending packets instead of putting them into await queue. So when the WhoAreYou packet is received 'unexpectedly' we should take a SENT packet for resending
* Add missing field to StaticHeader.toString()
* Handshake ephemeral public key should be in compressed form
Co-authored-by: mbaxter <meredith.baxter@consensys.net>
  • Loading branch information
Nashatyrev authored Oct 13, 2020
1 parent 3f46afa commit 82b1630
Show file tree
Hide file tree
Showing 62 changed files with 1,612 additions and 972 deletions.
24 changes: 14 additions & 10 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import net.ltgt.gradle.errorprone.*

//discovery v5

plugins {
Expand Down Expand Up @@ -90,23 +88,23 @@ tasks.withType(JavaCompile) {
options.errorprone {
disableWarningsInGeneratedCode
// Our equals need to be symmetric, this checker doesn't respect that.
check('EqualsGetClass', CheckSeverity.OFF)
check('EqualsGetClass', net.ltgt.gradle.errorprone.CheckSeverity.OFF)
// We like to use futures with no return values.
check('FutureReturnValueIgnored', CheckSeverity.OFF)
check('FutureReturnValueIgnored', net.ltgt.gradle.errorprone.CheckSeverity.OFF)
// We use the JSR-305 annotations instead of the Google annotations.
check('ImmutableEnumChecker', CheckSeverity.OFF)
check('ImmutableEnumChecker', net.ltgt.gradle.errorprone.CheckSeverity.OFF)

check('FieldCanBeFinal', CheckSeverity.OFF)
check('InsecureCryptoUsage', CheckSeverity.WARN)
check('WildcardImport', CheckSeverity.WARN)
check('FieldCanBeFinal', net.ltgt.gradle.errorprone.CheckSeverity.OFF)
check('InsecureCryptoUsage', net.ltgt.gradle.errorprone.CheckSeverity.WARN)
check('WildcardImport', net.ltgt.gradle.errorprone.CheckSeverity.WARN)

// This check is broken in Java 12. See https://github.com/google/error-prone/issues/1257
if (JavaVersion.current() == JavaVersion.VERSION_12) {
check('Finally', CheckSeverity.OFF)
check('Finally', net.ltgt.gradle.errorprone.CheckSeverity.OFF)
}
// This check is broken after Java 12. See https://github.com/google/error-prone/issues/1352
if (JavaVersion.current() > JavaVersion.VERSION_12) {
check('TypeParameterUnusedInFormals', CheckSeverity.OFF)
check('TypeParameterUnusedInFormals', net.ltgt.gradle.errorprone.CheckSeverity.OFF)
}
}
options.encoding = 'UTF-8'
Expand Down Expand Up @@ -147,6 +145,12 @@ task sourcesJar(type: Jar, dependsOn: classes) {
from sourceSets.main.allSource
}

task runTestDiscovery(type:JavaExec) {
main = 'org.ethereum.beacon.discovery.app.DiscoveryTestServer'
classpath = sourceSets.main.runtimeClasspath + sourceSets.test.runtimeClasspath
systemProperty "log4j.configurationFile", "log4j2-test-discovery.xml"
}

def resolvedVersion = calculateVersion()
def bintrayUser = project.hasProperty('bintrayUser') ? project.property('bintrayUser') : System.getenv('BINTRAY_USER')
def bintrayKey = project.hasProperty('bintrayApiKey') ? project.property('bintrayApiKey') : System.getenv('BINTRAY_KEY')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@
package org.ethereum.beacon.discovery;

import com.google.common.annotations.VisibleForTesting;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -18,7 +16,6 @@
import org.ethereum.beacon.discovery.network.DiscoveryClient;
import org.ethereum.beacon.discovery.network.NettyDiscoveryClientImpl;
import org.ethereum.beacon.discovery.network.NettyDiscoveryServer;
import org.ethereum.beacon.discovery.network.NettyDiscoveryServerImpl;
import org.ethereum.beacon.discovery.network.NetworkParcel;
import org.ethereum.beacon.discovery.pipeline.Envelope;
import org.ethereum.beacon.discovery.pipeline.Field;
Expand All @@ -38,17 +35,19 @@
import org.ethereum.beacon.discovery.pipeline.handler.UnauthorizedMessagePacketHandler;
import org.ethereum.beacon.discovery.pipeline.handler.UnknownPacketTagToSender;
import org.ethereum.beacon.discovery.pipeline.handler.WhoAreYouPacketHandler;
import org.ethereum.beacon.discovery.pipeline.handler.WhoAreYouSessionResolver;
import org.ethereum.beacon.discovery.pipeline.info.FindNodeResponseHandler;
import org.ethereum.beacon.discovery.pipeline.info.MultiPacketResponseHandler;
import org.ethereum.beacon.discovery.pipeline.info.Request;
import org.ethereum.beacon.discovery.scheduler.ExpirationSchedulerFactory;
import org.ethereum.beacon.discovery.scheduler.Scheduler;
import org.ethereum.beacon.discovery.schema.NodeRecord;
import org.ethereum.beacon.discovery.schema.NodeRecordFactory;
import org.ethereum.beacon.discovery.storage.AuthTagRepository;
import org.ethereum.beacon.discovery.schema.NodeRecordInfo;
import org.ethereum.beacon.discovery.storage.LocalNodeRecordStore;
import org.ethereum.beacon.discovery.storage.NodeBucketStorage;
import org.ethereum.beacon.discovery.storage.NodeTable;
import org.ethereum.beacon.discovery.storage.NonceRepository;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
Expand All @@ -62,10 +61,11 @@ public class DiscoveryManagerImpl implements DiscoveryManager {
private final Pipeline incomingPipeline = new PipelineImpl();
private final Pipeline outgoingPipeline = new PipelineImpl();
private final LocalNodeRecordStore localNodeRecordStore;
private final NodeTable nodeTable;
private volatile DiscoveryClient discoveryClient;

public DiscoveryManagerImpl(
Optional<InetSocketAddress> listenAddress,
NettyDiscoveryServer discoveryServer,
NodeTable nodeTable,
NodeBucketStorage nodeBucketStorage,
LocalNodeRecordStore localNodeRecordStore,
Expand All @@ -74,29 +74,24 @@ public DiscoveryManagerImpl(
Scheduler taskScheduler,
ExpirationSchedulerFactory expirationSchedulerFactory,
TalkHandler talkHandler) {
this.nodeTable = nodeTable;
this.localNodeRecordStore = localNodeRecordStore;
final NodeRecord homeNodeRecord = localNodeRecordStore.getLocalNodeRecord();
AuthTagRepository authTagRepo = new AuthTagRepository();

this.discoveryServer =
new NettyDiscoveryServerImpl(
listenAddress
.or(homeNodeRecord::getUdpAddress)
.orElseThrow(
() ->
new IllegalArgumentException(
"Local node record must contain an IP and UDP port")));
NonceRepository nonceRepository = new NonceRepository();

this.discoveryServer = discoveryServer;
NodeIdToSession nodeIdToSession =
new NodeIdToSession(
localNodeRecordStore,
homeNodePrivateKey,
nodeBucketStorage,
authTagRepo,
nonceRepository,
nodeTable,
outgoingPipeline,
expirationSchedulerFactory);
incomingPipeline
.addHandler(new IncomingDataPacker(homeNodeRecord.getNodeId()))
.addHandler(new WhoAreYouSessionResolver(nonceRepository))
.addHandler(new UnknownPacketTagToSender())
.addHandler(nodeIdToSession)
.addHandler(new PacketDispatcherHandler())
Expand Down Expand Up @@ -156,8 +151,15 @@ private <T> CompletableFuture<T> executeTaskImpl(NodeRecord nodeRecord, Request<
return request.getResultPromise();
}

private void addNode(NodeRecord nodeRecord) {
if (nodeTable.getNode(nodeRecord.getNodeId()).isEmpty()) {
nodeTable.save(NodeRecordInfo.createDefault(nodeRecord));
}
}

@Override
public CompletableFuture<Void> findNodes(NodeRecord nodeRecord, List<Integer> distances) {
addNode(nodeRecord);
Request<Void> request =
new Request<>(
new CompletableFuture<>(),
Expand All @@ -168,6 +170,7 @@ public CompletableFuture<Void> findNodes(NodeRecord nodeRecord, List<Integer> di

@Override
public CompletableFuture<Void> ping(NodeRecord nodeRecord) {
addNode(nodeRecord);
Request<Void> request =
new Request<>(
new CompletableFuture<>(),
Expand All @@ -178,6 +181,7 @@ public CompletableFuture<Void> ping(NodeRecord nodeRecord) {

@Override
public CompletableFuture<Bytes> talk(NodeRecord nodeRecord, Bytes protocol, Bytes requestBytes) {
addNode(nodeRecord);
Request<Bytes> request =
new Request<>(
new CompletableFuture<>(),
Expand All @@ -190,4 +194,14 @@ public CompletableFuture<Bytes> talk(NodeRecord nodeRecord, Bytes protocol, Byte
public Publisher<NetworkParcel> getOutgoingMessages() {
return outgoingMessages;
}

@VisibleForTesting
public Pipeline getIncomingPipeline() {
return incomingPipeline;
}

@VisibleForTesting
public Pipeline getOutgoingPipeline() {
return outgoingPipeline;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@

import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.Arrays.asList;
import static java.util.Objects.requireNonNullElseGet;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.net.InetSocketAddress;
import java.time.Duration;
Expand All @@ -18,6 +20,8 @@
import java.util.stream.Stream;
import org.apache.tuweni.bytes.Bytes;
import org.ethereum.beacon.discovery.database.Database;
import org.ethereum.beacon.discovery.network.NettyDiscoveryServer;
import org.ethereum.beacon.discovery.network.NettyDiscoveryServerImpl;
import org.ethereum.beacon.discovery.scheduler.ExpirationSchedulerFactory;
import org.ethereum.beacon.discovery.scheduler.Schedulers;
import org.ethereum.beacon.discovery.schema.NodeRecord;
Expand All @@ -34,6 +38,7 @@
import org.ethereum.beacon.discovery.task.DiscoveryTaskManager;

public class DiscoverySystemBuilder {

private static final AtomicInteger COUNTER = new AtomicInteger();
private List<NodeRecord> bootnodes = Collections.emptyList();
private Optional<InetSocketAddress> listenAddress = Optional.empty();
Expand All @@ -46,6 +51,7 @@ public class DiscoverySystemBuilder {
private Duration retryTimeout = DiscoveryTaskManager.DEFAULT_RETRY_TIMEOUT;
private Duration lifeCheckInterval = DiscoveryTaskManager.DEFAULT_LIVE_CHECK_INTERVAL;
private TalkHandler talkHandler = TalkHandler.NOOP;
private NettyDiscoveryServer discoveryServer = null;

public DiscoverySystemBuilder localNodeRecord(final NodeRecord localNodeRecord) {
this.localNodeRecord = localNodeRecord;
Expand All @@ -71,6 +77,11 @@ public DiscoverySystemBuilder bootnodes(final String... enrs) {
return this;
}

public DiscoverySystemBuilder bootnodes(final List<NodeRecord> records) {
bootnodes = records;
return this;
}

public DiscoverySystemBuilder bootnodes(final NodeRecord... records) {
bootnodes = asList(records);
return this;
Expand Down Expand Up @@ -106,42 +117,68 @@ public DiscoverySystemBuilder talkHandler(TalkHandler talkHandler) {
return this;
}

public DiscoverySystem build() {
checkNotNull(localNodeRecord, "Missing local node record");
checkNotNull(privateKey, "Missing private key");
public DiscoverySystemBuilder discoveryServer(NettyDiscoveryServer discoveryServer) {
this.discoveryServer = discoveryServer;
return this;
}

if (database == null) {
database = Database.inMemoryDB();
}
final NodeTableStorageFactory nodeTableStorageFactory = new NodeTableStorageFactoryImpl();
final NodeSerializerFactory serializerFactory = new NodeSerializerFactory(nodeRecordFactory);
final NodeTableStorage nodeTableStorage =
nodeTableStorageFactory.createTable(
database, serializerFactory, oldSeq -> localNodeRecord, () -> bootnodes);
final NodeTable nodeTable = nodeTableStorage.get();
if (schedulers == null) {
schedulers = Schedulers.createDefault();
}
final NodeBucketStorage nodeBucketStorage =
new NodeBucketStorageImpl(database, serializerFactory, localNodeRecord);
final int clientNumber = COUNTER.incrementAndGet();
final LocalNodeRecordStore localNodeRecordStore =
new LocalNodeRecordStore(localNodeRecord, privateKey, localNodeRecordListener);
final ExpirationSchedulerFactory expirationSchedulerFactory =
new ExpirationSchedulerFactory(
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("discovery-expiration-%d").build()));
final DiscoveryManager discoveryManager =
new DiscoveryManagerImpl(
listenAddress,
nodeTable,
private void createDefaults() {
database = requireNonNullElseGet(database, () -> Database.inMemoryDB());
schedulers = requireNonNullElseGet(schedulers, () -> Schedulers.createDefault());
discoveryServer =
requireNonNullElseGet(
discoveryServer,
() ->
new NettyDiscoveryServerImpl(
listenAddress
.or(localNodeRecord::getUdpAddress)
.orElseThrow(
() ->
new IllegalArgumentException(
"Local node record must contain an IP and UDP port"))));

nodeTableStorage =
requireNonNullElseGet(
nodeTableStorage,
() ->
nodeTableStorageFactory.createTable(
database, serializerFactory, oldSeq -> localNodeRecord, () -> bootnodes));
nodeTable = requireNonNullElseGet(nodeTable, () -> nodeTableStorage.get());
nodeBucketStorage =
requireNonNullElseGet(
nodeBucketStorage,
() -> new NodeBucketStorageImpl(database, serializerFactory, localNodeRecord));
localNodeRecordStore =
requireNonNullElseGet(
localNodeRecordStore,
privateKey,
nodeRecordFactory,
schedulers.newSingleThreadDaemon("discovery-client-" + clientNumber),
() -> new LocalNodeRecordStore(localNodeRecord, privateKey, localNodeRecordListener));
expirationSchedulerFactory =
requireNonNullElseGet(
expirationSchedulerFactory,
talkHandler);
() ->
new ExpirationSchedulerFactory(
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setNameFormat("discovery-expiration-%d")
.build())));
}

final NodeTableStorageFactory nodeTableStorageFactory = new NodeTableStorageFactoryImpl();
final NodeSerializerFactory serializerFactory = new NodeSerializerFactory(nodeRecordFactory);
final int clientNumber = COUNTER.incrementAndGet();

NodeTableStorage nodeTableStorage;
NodeTable nodeTable;
NodeBucketStorage nodeBucketStorage;
LocalNodeRecordStore localNodeRecordStore;
ExpirationSchedulerFactory expirationSchedulerFactory;

public DiscoverySystem build() {
checkNotNull(localNodeRecord, "Missing local node record");
checkNotNull(privateKey, "Missing private key");
createDefaults();

final DiscoveryManager discoveryManager = buildDiscoveryManager();

final DiscoveryTaskManager discoveryTaskManager =
new DiscoveryTaskManager(
Expand All @@ -158,4 +195,19 @@ public DiscoverySystem build() {
return new DiscoverySystem(
discoveryManager, discoveryTaskManager, expirationSchedulerFactory, nodeTable, bootnodes);
}

@VisibleForTesting
DiscoveryManagerImpl buildDiscoveryManager() {
createDefaults();
return new DiscoveryManagerImpl(
discoveryServer,
nodeTable,
nodeBucketStorage,
localNodeRecordStore,
privateKey,
nodeRecordFactory,
schedulers.newSingleThreadDaemon("discovery-client-" + clientNumber),
expirationSchedulerFactory,
talkHandler);
}
}
17 changes: 1 addition & 16 deletions src/main/java/org/ethereum/beacon/discovery/packet/AuthData.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,5 @@
*/
package org.ethereum.beacon.discovery.packet;

import org.ethereum.beacon.discovery.type.Bytes12;
import org.ethereum.beacon.discovery.util.DecodeException;

/** AuthData part of any {@link Packet}'s {@link Header} */
public interface AuthData extends BytesSerializable {

Bytes12 getAesGcmNonce();

@Override
default void validate() throws DecodeException {
DecodeException.wrap(
() -> "Couldn't decode AuthData nonce: " + getBytes(),
() -> {
getAesGcmNonce();
});
}
}
public interface AuthData extends BytesSerializable {}
Loading

0 comments on commit 82b1630

Please sign in to comment.