From b1ce15965e71e1beaf7e05b57f2ea3220da1f3b3 Mon Sep 17 00:00:00 2001 From: Stefan Bratanov Date: Tue, 22 Aug 2023 22:35:34 +0100 Subject: [PATCH] Add yamux support (networking) --- acceptance-tests/build.gradle | 2 +- build.gradle | 1 + data/beaconrestapi/build.gradle | 2 +- .../allowed-licenses.json | 4 +- gradle/versions.gradle | 13 +- networking/eth2/build.gradle | 5 +- networking/p2p/build.gradle | 5 +- .../p2p/libp2p/LibP2PNetworkBuilder.java | 15 +- .../networking/p2p/libp2p/MplexFirewall.java | 128 ----------- .../networking/p2p/libp2p/MuxFirewall.java | 206 ++++++++++++++++++ .../p2p/network/config/NetworkConfig.java | 19 +- ...FirewallTest.java => MuxFirewallTest.java} | 191 +++++++++------- .../p2p/libp2p/PeerManagerTest.java | 3 +- services/beaconchain/build.gradle | 2 +- teku/build.gradle | 2 +- .../pegasys/teku/cli/options/P2POptions.java | 12 +- 16 files changed, 381 insertions(+), 229 deletions(-) delete mode 100644 networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/MplexFirewall.java create mode 100644 networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/MuxFirewall.java rename networking/p2p/src/test/java/tech/pegasys/teku/networking/p2p/libp2p/{MplexFirewallTest.java => MuxFirewallTest.java} (52%) diff --git a/acceptance-tests/build.gradle b/acceptance-tests/build.gradle index 556c138601f..93c1a6dbfbc 100644 --- a/acceptance-tests/build.gradle +++ b/acceptance-tests/build.gradle @@ -24,7 +24,7 @@ dependencies { testFixturesImplementation project(':infrastructure:time') testFixturesImplementation project(':infrastructure:bls-keystore') testFixturesImplementation 'com.squareup.okhttp3:okhttp' - testFixturesImplementation 'io.libp2p:jvm-libp2p-minimal' + testFixturesImplementation 'io.libp2p:jvm-libp2p' testFixturesImplementation 'org.apache.commons:commons-lang3' testFixturesImplementation 'commons-io:commons-io' testFixturesImplementation 'org.apache.tuweni:tuweni-bytes' diff --git a/build.gradle b/build.gradle index 3000b10ba12..7e15867b602 100644 --- a/build.gradle +++ b/build.gradle @@ -136,6 +136,7 @@ allprojects { repositories { mavenLocal() mavenCentral() + maven { url "https://jitpack.io" } maven { url "https://artifacts.consensys.net/public/maven/maven/" content { includeGroupByRegex('tech\\.pegasys($|\\..*)')} diff --git a/data/beaconrestapi/build.gradle b/data/beaconrestapi/build.gradle index b3ae95976c9..90d39c54151 100644 --- a/data/beaconrestapi/build.gradle +++ b/data/beaconrestapi/build.gradle @@ -41,7 +41,7 @@ dependencies { testImplementation testFixtures(project(':infrastructure:json')) - testCompileOnly 'io.libp2p:jvm-libp2p-minimal' + testCompileOnly 'io.libp2p:jvm-libp2p' integrationTestImplementation testFixtures(project(':infrastructure:bls')) integrationTestImplementation testFixtures(project(':ethereum:spec')) diff --git a/gradle/license-report-config/allowed-licenses.json b/gradle/license-report-config/allowed-licenses.json index 8ddcccb3f62..816a4ac9062 100644 --- a/gradle/license-report-config/allowed-licenses.json +++ b/gradle/license-report-config/allowed-licenses.json @@ -69,7 +69,7 @@ "moduleName": "org.json:json" }, { - "moduleName": "io.libp2p:jvm-libp2p-minimal" + "moduleName": "io.libp2p:jvm-libp2p" }, { "moduleName": "com.squareup.okio:okio" @@ -104,7 +104,7 @@ "moduleLicense": "Apache License, Version 2.0" }, { - "moduleName": "io.libp2p:jvm-libp2p-minimal", + "moduleName": "io.libp2p:jvm-libp2p", "moduleLicense": "Apache License, Version 2.0", "moduleLicenseUrl": "https://github.com/libp2p/jvm-libp2p/blob/develop/LICENSE-APACHE" }, diff --git a/gradle/versions.gradle b/gradle/versions.gradle index ff9e653f33d..d434231de3e 100644 --- a/gradle/versions.gradle +++ b/gradle/versions.gradle @@ -32,7 +32,7 @@ dependencyManagement { entry 'javalin-rendering' } - dependency 'io.libp2p:jvm-libp2p-minimal:0.10.0-RELEASE' + dependency 'io.libp2p:jvm-libp2p:1.0.0-RELEASE' dependency 'tech.pegasys:jblst:0.3.10' dependency 'tech.pegasys:jc-kzg-4844:0.7.2' @@ -58,10 +58,8 @@ dependencyManagement { } dependencySet(group: 'io.netty', version: '4.1.96.Final') { - entry('netty-all') { - exclude "io.netty:netty-transport-rxtx" - exclude "org.rxtx:rxtx" - } + entry 'netty-handler' + entry 'netty-codec-http' } dependencySet(group: 'io.vertx', version: '4.4.4') { @@ -136,10 +134,7 @@ dependencyManagement { dependency 'io.prometheus:simpleclient:0.9.0' dependencySet(group: 'org.hyperledger.besu.internal', version: '23.4.4') { - entry('metrics-core') { - // We include netty-all so omit the separated jars - exclude 'io.netty:netty-tcnative-boringssl-static' - } + entry('metrics-core') entry('core') entry('config') } diff --git a/networking/eth2/build.gradle b/networking/eth2/build.gradle index 5451170c05a..a001b17005d 100644 --- a/networking/eth2/build.gradle +++ b/networking/eth2/build.gradle @@ -17,7 +17,8 @@ dependencies { implementation project(':storage:api') implementation project(':infrastructure:serviceutils') - implementation 'io.libp2p:jvm-libp2p-minimal' + implementation 'io.libp2p:jvm-libp2p' + implementation 'io.netty:netty-codec-http' implementation 'org.apache.tuweni:tuweni-ssz' implementation 'org.xerial.snappy:snappy-java' @@ -53,7 +54,7 @@ dependencies { testFixturesImplementation project(':infrastructure:subscribers') testFixturesImplementation project(':infrastructure:serviceutils') - testFixturesImplementation 'io.libp2p:jvm-libp2p-minimal' + testFixturesImplementation 'io.libp2p:jvm-libp2p' testFixturesImplementation 'org.mockito:mockito-core' testFixturesImplementation 'org.hyperledger.besu:plugin-api' testFixturesImplementation 'org.hyperledger.besu.internal:metrics-core' diff --git a/networking/p2p/build.gradle b/networking/p2p/build.gradle index cbd9628e1e1..3117d70ee21 100644 --- a/networking/p2p/build.gradle +++ b/networking/p2p/build.gradle @@ -17,7 +17,8 @@ dependencies { implementation project(':storage') implementation project(':infrastructure:ssz') - implementation 'io.libp2p:jvm-libp2p-minimal' + implementation 'io.libp2p:jvm-libp2p' + implementation 'io.netty:netty-handler' implementation 'io.projectreactor:reactor-core' implementation 'org.apache.tuweni:tuweni-units' implementation 'org.apache.tuweni:tuweni-crypto' @@ -45,6 +46,6 @@ dependencies { testFixturesImplementation 'org.hyperledger.besu:plugin-api' testFixturesImplementation 'org.hyperledger.besu.internal:metrics-core' - testFixturesImplementation 'io.libp2p:jvm-libp2p-minimal' + testFixturesImplementation 'io.libp2p:jvm-libp2p' testFixturesImplementation 'org.apache.logging.log4j:log4j-core' } diff --git a/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/LibP2PNetworkBuilder.java b/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/LibP2PNetworkBuilder.java index 64691924e77..fecb8b52e56 100644 --- a/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/LibP2PNetworkBuilder.java +++ b/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/LibP2PNetworkBuilder.java @@ -76,8 +76,8 @@ public static LibP2PNetworkBuilder create() { protected GossipTopicFilter gossipTopicFilter; protected Firewall firewall = new Firewall(Duration.ofSeconds(30)); - protected MplexFirewall mplexFirewall = - new MplexFirewall(REMOTE_OPEN_STREAMS_RATE_LIMIT, REMOTE_PARALLEL_OPEN_STREAMS_COUNT_LIMIT); + protected MuxFirewall muxFirewall = + new MuxFirewall(REMOTE_OPEN_STREAMS_RATE_LIMIT, REMOTE_PARALLEL_OPEN_STREAMS_COUNT_LIMIT); protected LibP2PGossipNetwork gossipNetwork; @@ -157,6 +157,11 @@ protected Host createHost() { b.getIdentity().setFactory(() -> privKey); b.getTransports().add(TcpTransport::new); b.getSecureChannels().add(NoiseXXSecureChannel::new); + + if (config.isYamuxEnabled()) { + // yamux MUST take precedence during negotiation + b.getMuxers().add(StreamMuxerProtocol.getYamux()); + } b.getMuxers().add(StreamMuxerProtocol.getMplex()); b.getNetwork().listen(listenAddr.toString()); @@ -179,7 +184,7 @@ protected Host createHost() { b.getConnectionHandlers().add(peerManager); - b.getDebug().getMuxFramesHandler().addHandler(mplexFirewall); + b.getDebug().getMuxFramesHandler().addHandler(muxFirewall); }); } @@ -266,8 +271,8 @@ public LibP2PNetworkBuilder firewall(Firewall firewall) { return this; } - public LibP2PNetworkBuilder mplexFirewall(MplexFirewall mplexFirewall) { - this.mplexFirewall = mplexFirewall; + public LibP2PNetworkBuilder muxFirewall(MuxFirewall muxFirewall) { + this.muxFirewall = muxFirewall; return this; } } diff --git a/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/MplexFirewall.java b/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/MplexFirewall.java deleted file mode 100644 index 5710b144e36..00000000000 --- a/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/MplexFirewall.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Copyright Consensys Software Inc., 2022 - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package tech.pegasys.teku.networking.p2p.libp2p; - -import com.google.common.annotations.VisibleForTesting; -import io.libp2p.core.ChannelVisitor; -import io.libp2p.core.Connection; -import io.libp2p.etc.util.netty.mux.MuxId; -import io.libp2p.mux.MuxFrame; -import io.netty.channel.ChannelDuplexHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelPromise; -import java.util.HashSet; -import java.util.Set; -import java.util.function.Supplier; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.jetbrains.annotations.NotNull; -import tech.pegasys.teku.infrastructure.async.FutureUtil; - -public class MplexFirewall implements ChannelVisitor { - - private static final Logger LOG = LogManager.getLogger(); - private static final long ONE_SECOND = 1000; - - private final int remoteOpenStreamsRateLimit; - private final int remoteParallelOpenStreamsLimit; - private final Supplier currentTimeSupplier; - - public MplexFirewall(int remoteOpenStreamsRateLimit, int remoteParallelOpenStreamsLimit) { - this(remoteOpenStreamsRateLimit, remoteParallelOpenStreamsLimit, System::currentTimeMillis); - } - - @VisibleForTesting - MplexFirewall( - int remoteOpenStreamsRateLimit, - int remoteParallelOpenStreamsLimit, - Supplier currentTimeSupplier) { - this.remoteOpenStreamsRateLimit = remoteOpenStreamsRateLimit; - this.remoteParallelOpenStreamsLimit = remoteParallelOpenStreamsLimit; - this.currentTimeSupplier = currentTimeSupplier; - } - - protected void remoteParallelOpenStreamLimitExceeded(MplexFirewallHandler peerMplexHandler) { - LOG.debug("Abruptly closing peer connection due to exceeding parallel open streams limit"); - FutureUtil.ignoreFuture(peerMplexHandler.getConnection().close()); - } - - protected void remoteOpenFrameRateLimitExceeded(MplexFirewallHandler peerMplexHandler) { - LOG.debug("Abruptly closing peer connection due to exceeding open mplex frame rate limit"); - FutureUtil.ignoreFuture(peerMplexHandler.getConnection().close()); - } - - @Override - public void visit(@NotNull Connection connection) { - MplexFirewallHandler firewallHandler = new MplexFirewallHandler(connection); - connection.pushHandler(firewallHandler); - } - - private class MplexFirewallHandler extends ChannelDuplexHandler { - - private final Connection connection; - private int openFrameCounter = 0; - private long startCounterTime = 0; - private final Set remoteOpenedStreamIds = new HashSet<>(); - - public MplexFirewallHandler(Connection connection) { - this.connection = connection; - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) { - MuxFrame muxFrame = (MuxFrame) msg; - boolean blockFrame = false; - if (muxFrame.getFlag() == MuxFrame.Flag.OPEN) { - remoteOpenedStreamIds.add(muxFrame.getId()); - if (remoteOpenedStreamIds.size() > remoteParallelOpenStreamsLimit) { - remoteParallelOpenStreamLimitExceeded(this); - blockFrame = true; - } - - long curTime = currentTimeSupplier.get(); - if (curTime - startCounterTime > ONE_SECOND) { - startCounterTime = curTime; - openFrameCounter = 0; - } else { - openFrameCounter++; - if (openFrameCounter > remoteOpenStreamsRateLimit) { - remoteOpenFrameRateLimitExceeded(this); - blockFrame = true; - } - } - } else if (muxFrame.getFlag() == MuxFrame.Flag.CLOSE - || muxFrame.getFlag() == MuxFrame.Flag.RESET) { - remoteOpenedStreamIds.remove(muxFrame.getId()); - } - if (!blockFrame) { - ctx.fireChannelRead(msg); - } - } - - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { - MuxFrame muxFrame = (MuxFrame) msg; - if (muxFrame.getFlag() == MuxFrame.Flag.RESET) { - // Track only RESET since CLOSE from local doesn't close the stream for writing from remote - remoteOpenedStreamIds.remove(muxFrame.getId()); - } - // ignoring since the write() just returns `promise` instance - FutureUtil.ignoreFuture(ctx.write(msg, promise)); - } - - public Connection getConnection() { - return connection; - } - } -} diff --git a/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/MuxFirewall.java b/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/MuxFirewall.java new file mode 100644 index 00000000000..3841ae8366d --- /dev/null +++ b/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/libp2p/MuxFirewall.java @@ -0,0 +1,206 @@ +/* + * Copyright Consensys Software Inc., 2022 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.networking.p2p.libp2p; + +import com.google.common.annotations.VisibleForTesting; +import io.libp2p.core.ChannelVisitor; +import io.libp2p.core.Connection; +import io.libp2p.etc.util.netty.mux.MuxId; +import io.libp2p.mux.mplex.MplexFlag; +import io.libp2p.mux.mplex.MplexFrame; +import io.libp2p.mux.yamux.YamuxFlags; +import io.libp2p.mux.yamux.YamuxFrame; +import io.libp2p.mux.yamux.YamuxType; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import java.util.HashSet; +import java.util.Set; +import java.util.function.Supplier; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.jetbrains.annotations.NotNull; +import tech.pegasys.teku.infrastructure.async.FutureUtil; + +/** + * Adds additional upfront verification of connections. + * + *

Supports both {@link YamuxFrame} and {@link MplexFrame} + */ +public class MuxFirewall implements ChannelVisitor { + + private static final Logger LOG = LogManager.getLogger(); + private static final long ONE_SECOND = 1000; + + private final int remoteOpenStreamsRateLimit; + private final int remoteParallelOpenStreamsLimit; + private final Supplier currentTimeSupplier; + + public MuxFirewall( + final int remoteOpenStreamsRateLimit, final int remoteParallelOpenStreamsLimit) { + this(remoteOpenStreamsRateLimit, remoteParallelOpenStreamsLimit, System::currentTimeMillis); + } + + @VisibleForTesting + MuxFirewall( + final int remoteOpenStreamsRateLimit, + final int remoteParallelOpenStreamsLimit, + final Supplier currentTimeSupplier) { + this.remoteOpenStreamsRateLimit = remoteOpenStreamsRateLimit; + this.remoteParallelOpenStreamsLimit = remoteParallelOpenStreamsLimit; + this.currentTimeSupplier = currentTimeSupplier; + } + + protected void remoteParallelOpenStreamLimitExceeded(final MuxFirewallHandler peerMplexHandler) { + LOG.debug("Abruptly closing peer connection due to exceeding parallel open streams limit"); + FutureUtil.ignoreFuture(peerMplexHandler.getConnection().close()); + } + + protected void remoteOpenFrameRateLimitExceeded(final MuxFirewallHandler peerMplexHandler) { + LOG.debug("Abruptly closing peer connection due to exceeding open mux frame rate limit"); + FutureUtil.ignoreFuture(peerMplexHandler.getConnection().close()); + } + + @Override + public void visit(@NotNull final Connection connection) { + final MuxFirewallHandler firewallHandler = new MuxFirewallHandler(connection); + connection.pushHandler(firewallHandler); + } + + private class MuxFirewallHandler extends ChannelDuplexHandler { + + private final Connection connection; + private int openFrameCounter = 0; + private long startCounterTime = 0; + private final Set remoteOpenedStreamIds = new HashSet<>(); + + public MuxFirewallHandler(final Connection connection) { + this.connection = connection; + } + + @Override + public void channelRead(final ChannelHandlerContext ctx, final Object msg) { + final MuxFrame muxFrame = resolveMuxFrame(msg); + boolean blockFrame = false; + if (muxFrame.indicatesStreamOpening()) { + remoteOpenedStreamIds.add(muxFrame.getId()); + if (remoteOpenedStreamIds.size() > remoteParallelOpenStreamsLimit) { + remoteParallelOpenStreamLimitExceeded(this); + blockFrame = true; + } + + final long curTime = currentTimeSupplier.get(); + if (curTime - startCounterTime > ONE_SECOND) { + startCounterTime = curTime; + openFrameCounter = 0; + } else { + openFrameCounter++; + if (openFrameCounter > remoteOpenStreamsRateLimit) { + remoteOpenFrameRateLimitExceeded(this); + blockFrame = true; + } + } + } else if (muxFrame.indicatesStreamClosing() || muxFrame.indicatesStreamResetting()) { + remoteOpenedStreamIds.remove(muxFrame.getId()); + } + if (!blockFrame) { + ctx.fireChannelRead(msg); + } + } + + @Override + public void write( + final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) { + final MuxFrame muxFrame = resolveMuxFrame(msg); + if (muxFrame.indicatesStreamResetting()) { + // Track only RESET since CLOSE from local doesn't close the stream for writing from remote + remoteOpenedStreamIds.remove(muxFrame.getId()); + } + // ignoring since write() just returns `promise` instance + FutureUtil.ignoreFuture(ctx.write(msg, promise)); + } + + public Connection getConnection() { + return connection; + } + } + + private MuxFrame resolveMuxFrame(final Object msg) { + if (msg instanceof MplexFrame mplexFrame) { + return new MuxFrame() { + + private final MplexFlag mplexFlag = mplexFrame.getFlag(); + + @Override + public MuxId getId() { + return mplexFrame.getId(); + } + + @Override + public boolean indicatesStreamOpening() { + return mplexFlag.getType() == MplexFlag.Type.OPEN; + } + + @Override + public boolean indicatesStreamClosing() { + return mplexFlag.getType() == MplexFlag.Type.CLOSE; + } + + @Override + public boolean indicatesStreamResetting() { + return mplexFlag.getType() == MplexFlag.Type.RESET; + } + }; + } + if (msg instanceof YamuxFrame yamuxFrame) { + return new MuxFrame() { + + private final int yamuxFlags = yamuxFrame.getFlags(); + + @Override + public MuxId getId() { + return yamuxFrame.getId(); + } + + @Override + public boolean indicatesStreamOpening() { + return (yamuxFlags == YamuxFlags.SYN || yamuxFlags == YamuxFlags.ACK) + // ignore Ping type + && yamuxFrame.getType() != YamuxType.PING; + } + + @Override + public boolean indicatesStreamClosing() { + return yamuxFlags == YamuxFlags.FIN; + } + + @Override + public boolean indicatesStreamResetting() { + return yamuxFlags == YamuxFlags.RST; + } + }; + } + throw new IllegalArgumentException("Not supported type of mux frame: " + msg.getClass()); + } + + private interface MuxFrame { + MuxId getId(); + + boolean indicatesStreamOpening(); + + boolean indicatesStreamClosing(); + + boolean indicatesStreamResetting(); + } +} diff --git a/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/network/config/NetworkConfig.java b/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/network/config/NetworkConfig.java index 85b0e1ed50f..9b6866c1653 100644 --- a/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/network/config/NetworkConfig.java +++ b/networking/p2p/src/main/java/tech/pegasys/teku/networking/p2p/network/config/NetworkConfig.java @@ -45,6 +45,7 @@ public interface PrivateKeySource { public static final String DEFAULT_P2P_INTERFACE = "0.0.0.0"; public static final int DEFAULT_P2P_PORT = 9000; + public static final boolean DEFAULT_YAMUX_ENABLED = false; private final GossipConfig gossipConfig; private final WireLogsConfig wireLogsConfig; @@ -55,6 +56,7 @@ public interface PrivateKeySource { private final Optional advertisedIp; private final int listenPort; private final OptionalInt advertisedPort; + private final boolean yamuxEnabled; private NetworkConfig( final boolean isEnabled, @@ -64,7 +66,8 @@ private NetworkConfig( final String networkInterface, final Optional advertisedIp, final int listenPort, - final OptionalInt advertisedPort) { + final OptionalInt advertisedPort, + final boolean yamuxEnabled) { this.privateKeySource = privateKeySource; this.networkInterface = networkInterface; @@ -79,6 +82,7 @@ private NetworkConfig( this.listenPort = listenPort; this.advertisedPort = advertisedPort; + this.yamuxEnabled = yamuxEnabled; this.gossipConfig = gossipConfig; this.wireLogsConfig = wireLogsConfig; } @@ -115,6 +119,10 @@ public int getAdvertisedPort() { return advertisedPort.orElse(listenPort); } + public boolean isYamuxEnabled() { + return yamuxEnabled; + } + public GossipConfig getGossipConfig() { return gossipConfig; } @@ -175,6 +183,7 @@ public static class Builder { private Optional advertisedIp = Optional.empty(); private int listenPort = DEFAULT_P2P_PORT; private OptionalInt advertisedPort = OptionalInt.empty(); + private boolean yamuxEnabled = DEFAULT_YAMUX_ENABLED; private Builder() {} @@ -187,7 +196,8 @@ public NetworkConfig build() { networkInterface, advertisedIp, listenPort, - advertisedPort); + advertisedPort, + yamuxEnabled); } private Optional createFileKeySource() { @@ -254,6 +264,11 @@ public Builder advertisedPort(final OptionalInt advertisedPort) { this.advertisedPort = advertisedPort; return this; } + + public Builder yamuxEnabled(final boolean yamuxEnabled) { + this.yamuxEnabled = yamuxEnabled; + return this; + } } @VisibleForTesting diff --git a/networking/p2p/src/test/java/tech/pegasys/teku/networking/p2p/libp2p/MplexFirewallTest.java b/networking/p2p/src/test/java/tech/pegasys/teku/networking/p2p/libp2p/MuxFirewallTest.java similarity index 52% rename from networking/p2p/src/test/java/tech/pegasys/teku/networking/p2p/libp2p/MplexFirewallTest.java rename to networking/p2p/src/test/java/tech/pegasys/teku/networking/p2p/libp2p/MuxFirewallTest.java index 433768de38e..4589ca97597 100644 --- a/networking/p2p/src/test/java/tech/pegasys/teku/networking/p2p/libp2p/MplexFirewallTest.java +++ b/networking/p2p/src/test/java/tech/pegasys/teku/networking/p2p/libp2p/MuxFirewallTest.java @@ -19,8 +19,11 @@ import io.libp2p.core.Connection; import io.libp2p.core.transport.Transport; import io.libp2p.etc.util.netty.mux.MuxId; -import io.libp2p.mux.MuxFrame; -import io.libp2p.mux.MuxFrame.Flag; +import io.libp2p.mux.mplex.MplexFlag; +import io.libp2p.mux.mplex.MplexFrame; +import io.libp2p.mux.yamux.YamuxFlags; +import io.libp2p.mux.yamux.YamuxFrame; +import io.libp2p.mux.yamux.YamuxType; import io.libp2p.transport.implementation.ConnectionOverNetty; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -37,13 +40,14 @@ import java.util.Random; import java.util.concurrent.atomic.AtomicLong; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; -public class MplexFirewallTest { +public class MuxFirewallTest { private static final ChannelId DUMMY_CHANNEL_ID = DefaultChannelId.newInstance(); private final AtomicLong time = new AtomicLong(); - private final MplexFirewall firewall = new MplexFirewall(10, 20, time::get); + private final MuxFirewall firewall = new MuxFirewall(10, 20, time::get); private final ByteBuf data1K = Unpooled.buffer().writeBytes(new byte[1024]); private final EmbeddedChannel channel = new EmbeddedChannel(); private final Connection connectionOverNetty = @@ -58,13 +62,18 @@ void init() { .addLast( new ChannelInboundHandlerAdapter() { @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + public void channelRead(ChannelHandlerContext ctx, Object msg) { passedMessages.add(msg.toString()); } }); } - private void writeOneInbound(MuxFrame message) { + private enum MuxType { + MPLEX, + YAMUX + } + + private void writeOneInbound(Object message) { try { boolean res = channel.writeOneInbound(message).await(1000L); assertThat(res).isTrue(); @@ -73,13 +82,12 @@ private void writeOneInbound(MuxFrame message) { } } - @Test - void testThatDisconnectsOnRateLimitExceed() { + @ParameterizedTest + @EnumSource(MuxType.class) + void testThatDisconnectsOnRateLimitExceed(final MuxType muxType) { for (int i = 0; i < 20; i++) { - writeOneInbound( - new MuxFrame(new MuxId(DUMMY_CHANNEL_ID, i, true), Flag.OPEN, Unpooled.EMPTY_BUFFER)); - writeOneInbound( - new MuxFrame(new MuxId(DUMMY_CHANNEL_ID, i, true), Flag.CLOSE, Unpooled.EMPTY_BUFFER)); + writeOneInbound(createNewStreamFrame(muxType, i)); + writeOneInbound(createCloseReceiverFrame(muxType, i)); time.incrementAndGet(); } assertThat(channel.isOpen()).isFalse(); @@ -88,36 +96,32 @@ void testThatDisconnectsOnRateLimitExceed() { assertThat(passedMessages).hasSizeBetween(20, 22).doesNotHaveDuplicates(); } - @Test - void testThatDoesntDisconnectOnAllowedRate() { + @ParameterizedTest + @EnumSource(MuxType.class) + void testThatDoesNotDisconnectOnAllowedRate(final MuxType muxType) { for (int i = 0; i < 500; i++) { - writeOneInbound( - new MuxFrame(new MuxId(DUMMY_CHANNEL_ID, i, true), Flag.OPEN, Unpooled.EMPTY_BUFFER)); - writeOneInbound( - new MuxFrame(new MuxId(DUMMY_CHANNEL_ID, i, true), Flag.CLOSE, Unpooled.EMPTY_BUFFER)); + writeOneInbound(createNewStreamFrame(muxType, i)); + writeOneInbound(createCloseReceiverFrame(muxType, i)); time.addAndGet(112); // ~9 per sec } assertThat(channel.isOpen()).isTrue(); assertThat(passedMessages).hasSize(1000).doesNotHaveDuplicates(); } - @Test - void testThatDisconnectsOnExceededAfterAllowedRate() { + @ParameterizedTest + @EnumSource(MuxType.class) + void testThatDisconnectsOnExceededAfterAllowedRate(final MuxType muxType) { for (int i = 0; i < 100; i++) { - writeOneInbound( - new MuxFrame(new MuxId(DUMMY_CHANNEL_ID, i, true), Flag.OPEN, Unpooled.EMPTY_BUFFER)); - writeOneInbound( - new MuxFrame(new MuxId(DUMMY_CHANNEL_ID, i, true), Flag.CLOSE, Unpooled.EMPTY_BUFFER)); + writeOneInbound(createNewStreamFrame(muxType, i)); + writeOneInbound(createCloseReceiverFrame(muxType, i)); time.addAndGet(112); // ~9 per sec } assertThat(channel.isOpen()).isTrue(); assertThat(passedMessages).hasSize(200).doesNotHaveDuplicates(); for (int i = 100; i < 200; i++) { - writeOneInbound( - new MuxFrame(new MuxId(DUMMY_CHANNEL_ID, i, true), Flag.OPEN, Unpooled.EMPTY_BUFFER)); - writeOneInbound( - new MuxFrame(new MuxId(DUMMY_CHANNEL_ID, i, true), Flag.CLOSE, Unpooled.EMPTY_BUFFER)); + writeOneInbound(createNewStreamFrame(muxType, i)); + writeOneInbound(createCloseReceiverFrame(muxType, i)); time.addAndGet(80); // ~12 per sec } assertThat(channel.isOpen()).isFalse(); @@ -126,29 +130,27 @@ void testThatDisconnectsOnExceededAfterAllowedRate() { assertThat(passedMessages).hasSizeLessThan(220 + 2).doesNotHaveDuplicates(); } - @Test - void testThatDoesntDisconnectOnHighDataRate() { + @ParameterizedTest + @EnumSource(MuxType.class) + void testThatDoesntDisconnectOnHighDataRate(final MuxType muxType) { for (int i = 0; i < 500; i++) { - writeOneInbound( - new MuxFrame(new MuxId(DUMMY_CHANNEL_ID, i, true), Flag.OPEN, Unpooled.EMPTY_BUFFER)); + writeOneInbound(createNewStreamFrame(muxType, i)); for (int j = 0; j < 30; j++) { - writeOneInbound( - new MuxFrame(new MuxId(DUMMY_CHANNEL_ID, i, true), Flag.DATA, data1K.slice())); + writeOneInbound(createDataFrame(muxType, i)); } - writeOneInbound( - new MuxFrame(new MuxId(DUMMY_CHANNEL_ID, i, true), Flag.CLOSE, Unpooled.EMPTY_BUFFER)); + writeOneInbound(createCloseReceiverFrame(muxType, i)); time.addAndGet(112); // ~9 per sec } assertThat(channel.isOpen()).isTrue(); assertThat(passedMessages).hasSize(500 * (2 + 30)); } - @Test - void testThatDisconnectsOnExceedingParallelStreams() { + @ParameterizedTest + @EnumSource(MuxType.class) + void testThatDisconnectsOnExceedingParallelStreams(final MuxType muxType) { // opening 30 streams on normal open rate for (int i = 0; i < 30; i++) { - writeOneInbound( - new MuxFrame(new MuxId(DUMMY_CHANNEL_ID, i, true), Flag.OPEN, Unpooled.EMPTY_BUFFER)); + writeOneInbound(createNewStreamFrame(muxType, i)); time.addAndGet(112); // ~9 per sec } assertThat(channel.isOpen()).isFalse(); @@ -157,28 +159,25 @@ void testThatDisconnectsOnExceedingParallelStreams() { assertThat(passedMessages).hasSizeLessThan(20 + 1).doesNotHaveDuplicates(); } - @Test - void testThatDoesntDisconnectOnAllowedParallelStreams() { + @ParameterizedTest + @EnumSource(MuxType.class) + void testThatDoesntDisconnectOnAllowedParallelStreams(final MuxType muxType) { IntList openedIds = new IntArrayList(); for (int i = 0; i < 18; i++) { - writeOneInbound( - new MuxFrame(new MuxId(DUMMY_CHANNEL_ID, i, true), Flag.OPEN, Unpooled.EMPTY_BUFFER)); + writeOneInbound(createNewStreamFrame(muxType, i)); openedIds.add(i); time.addAndGet(112); // ~9 per sec } Random random = new Random(); for (int i = 18; i < 200; i++) { - writeOneInbound( - new MuxFrame(new MuxId(DUMMY_CHANNEL_ID, i, true), Flag.OPEN, Unpooled.EMPTY_BUFFER)); + writeOneInbound(createNewStreamFrame(muxType, i)); openedIds.add(i); IntLists.shuffle(openedIds, random); int toRemove = openedIds.removeInt(0); - writeOneInbound( - new MuxFrame( - new MuxId(DUMMY_CHANNEL_ID, toRemove, true), Flag.CLOSE, Unpooled.EMPTY_BUFFER)); + writeOneInbound(createCloseReceiverFrame(muxType, toRemove)); time.addAndGet(112); // ~9 per sec } @@ -186,16 +185,13 @@ void testThatDoesntDisconnectOnAllowedParallelStreams() { assertThat(passedMessages).hasSize(200 * 2 - 18).doesNotHaveDuplicates(); } - @Test - void testThatResetStreamFromLocalIsTracked() throws InterruptedException { + @ParameterizedTest + @EnumSource(MuxType.class) + void testThatResetStreamFromLocalIsTracked(final MuxType muxType) throws InterruptedException { for (int i = 0; i < 200; i++) { - writeOneInbound( - new MuxFrame(new MuxId(DUMMY_CHANNEL_ID, i, true), Flag.OPEN, Unpooled.EMPTY_BUFFER)); + writeOneInbound(createNewStreamFrame(muxType, i)); - channel - .writeAndFlush( - new MuxFrame(new MuxId(DUMMY_CHANNEL_ID, i, true), Flag.RESET, Unpooled.EMPTY_BUFFER)) - .await(1000); + channel.writeAndFlush(createResetInitiatorFrame(muxType, i)).await(1000); time.addAndGet(112); // ~9 per sec } @@ -206,14 +202,12 @@ void testThatResetStreamFromLocalIsTracked() throws InterruptedException { .doesNotHaveDuplicates(); } - @Test - void testThatResetStreamFromRemoteIsTracked() { + @ParameterizedTest + @EnumSource(MuxType.class) + void testThatResetStreamFromRemoteIsTracked(final MuxType muxType) { for (int i = 0; i < 200; i++) { - writeOneInbound( - new MuxFrame(new MuxId(DUMMY_CHANNEL_ID, i, true), Flag.OPEN, Unpooled.EMPTY_BUFFER)); - - writeOneInbound( - new MuxFrame(new MuxId(DUMMY_CHANNEL_ID, i, true), Flag.RESET, Unpooled.EMPTY_BUFFER)); + writeOneInbound(createNewStreamFrame(muxType, i)); + writeOneInbound(createResetReceiverFrame(muxType, i)); time.addAndGet(112); // ~9 per sec } @@ -221,19 +215,16 @@ void testThatResetStreamFromRemoteIsTracked() { assertThat(passedMessages).hasSize(200 * 2).doesNotHaveDuplicates(); } - @Test - void testThatDisconnectsOnLocalClose() throws InterruptedException { + @ParameterizedTest + @EnumSource(MuxType.class) + void testThatDisconnectsOnLocalClose(final MuxType muxType) throws InterruptedException { // CLOSE (unlike RESET) sent from local doesn't close the stream and still allows remote to // write // so it shouldn't affect the number of opened tracked streams for (int i = 0; i < 25; i++) { - writeOneInbound( - new MuxFrame(new MuxId(DUMMY_CHANNEL_ID, i, true), Flag.OPEN, Unpooled.EMPTY_BUFFER)); + writeOneInbound(createNewStreamFrame(muxType, i)); - channel - .writeAndFlush( - new MuxFrame(new MuxId(DUMMY_CHANNEL_ID, i, true), Flag.CLOSE, Unpooled.EMPTY_BUFFER)) - .await(1000); + channel.writeAndFlush(createCloseInitiatorFrame(muxType, i)).await(1000); time.addAndGet(112); // ~9 per sec } @@ -245,4 +236,58 @@ void testThatDisconnectsOnLocalClose() throws InterruptedException { .hasSizeBetween(20, 22) .doesNotHaveDuplicates(); } + + private Object createNewStreamFrame(final MuxType muxType, final long id) { + final MuxId muxId = createMuxId(id); + return switch (muxType) { + case MPLEX -> new MplexFrame(muxId, MplexFlag.NewStream, Unpooled.EMPTY_BUFFER); + case YAMUX -> new YamuxFrame(muxId, YamuxType.DATA, YamuxFlags.ACK, 0, Unpooled.EMPTY_BUFFER); + }; + } + + private Object createCloseInitiatorFrame(final MuxType muxType, final long id) { + final MuxId muxId = createMuxId(id); + return switch (muxType) { + case MPLEX -> new MplexFrame(muxId, MplexFlag.CloseInitiator, Unpooled.EMPTY_BUFFER); + case YAMUX -> new YamuxFrame(muxId, YamuxType.DATA, YamuxFlags.FIN, 0, Unpooled.EMPTY_BUFFER); + }; + } + + private Object createCloseReceiverFrame(final MuxType muxType, final long id) { + final MuxId muxId = createMuxId(id); + return switch (muxType) { + case MPLEX -> new MplexFrame(muxId, MplexFlag.CloseReceiver, Unpooled.EMPTY_BUFFER); + case YAMUX -> new YamuxFrame(muxId, YamuxType.DATA, YamuxFlags.FIN, 0, Unpooled.EMPTY_BUFFER); + }; + } + + private Object createDataFrame(final MuxType muxType, final long id) { + final MuxId muxId = createMuxId(id); + final ByteBuf slicedByteBuf = data1K.slice(); + return switch (muxType) { + case MPLEX -> new MplexFrame(muxId, MplexFlag.MessageReceiver, slicedByteBuf); + case YAMUX -> new YamuxFrame( + muxId, YamuxType.DATA, 0, slicedByteBuf.readableBytes(), slicedByteBuf); + }; + } + + private Object createResetInitiatorFrame(final MuxType muxType, final long id) { + final MuxId muxId = createMuxId(id); + return switch (muxType) { + case MPLEX -> new MplexFrame(muxId, MplexFlag.ResetInitiator, Unpooled.EMPTY_BUFFER); + case YAMUX -> new YamuxFrame(muxId, YamuxType.DATA, YamuxFlags.RST, 0, Unpooled.EMPTY_BUFFER); + }; + } + + private Object createResetReceiverFrame(final MuxType muxType, final long id) { + final MuxId muxId = createMuxId(id); + return switch (muxType) { + case MPLEX -> new MplexFrame(muxId, MplexFlag.ResetReceiver, Unpooled.EMPTY_BUFFER); + case YAMUX -> new YamuxFrame(muxId, YamuxType.DATA, YamuxFlags.RST, 0, Unpooled.EMPTY_BUFFER); + }; + } + + private MuxId createMuxId(final long id) { + return new MuxId(DUMMY_CHANNEL_ID, id, true); + } } diff --git a/networking/p2p/src/test/java/tech/pegasys/teku/networking/p2p/libp2p/PeerManagerTest.java b/networking/p2p/src/test/java/tech/pegasys/teku/networking/p2p/libp2p/PeerManagerTest.java index 0c473a2b3e4..6b749782d2b 100644 --- a/networking/p2p/src/test/java/tech/pegasys/teku/networking/p2p/libp2p/PeerManagerTest.java +++ b/networking/p2p/src/test/java/tech/pegasys/teku/networking/p2p/libp2p/PeerManagerTest.java @@ -146,7 +146,8 @@ public void shouldReportFailedConnectionsToReputationManager() { public void shouldReportSuccessfulConnectionsToReputationManager() { final Connection connection = mock(Connection.class); final Session secureSession = - new Session(PeerId.random(), PeerId.random(), EcdsaKt.generateEcdsaKeyPair().component2()); + new Session( + PeerId.random(), PeerId.random(), EcdsaKt.generateEcdsaKeyPair().component2(), null); when(connection.secureSession()).thenReturn(secureSession); when(connection.closeFuture()).thenReturn(new SafeFuture<>()); final Multiaddr multiaddr = Multiaddr.fromString("/ip4/127.0.0.1/tcp/9000"); diff --git a/services/beaconchain/build.gradle b/services/beaconchain/build.gradle index 4354d88fd4b..fdd1e4aedf2 100644 --- a/services/beaconchain/build.gradle +++ b/services/beaconchain/build.gradle @@ -40,5 +40,5 @@ dependencies { testImplementation testFixtures(project(':infrastructure:bls')) testImplementation testFixtures(project(':infrastructure:metrics')) - implementation 'io.libp2p:jvm-libp2p-minimal' + implementation 'io.libp2p:jvm-libp2p' } \ No newline at end of file diff --git a/teku/build.gradle b/teku/build.gradle index 3ec13ce858d..079540d695c 100644 --- a/teku/build.gradle +++ b/teku/build.gradle @@ -47,7 +47,7 @@ dependencies { implementation 'commons-io:commons-io' implementation 'com.squareup.okhttp3:okhttp' implementation 'info.picocli:picocli' - implementation 'io.libp2p:jvm-libp2p-minimal' + implementation 'io.libp2p:jvm-libp2p' implementation 'io.vertx:vertx-core' implementation 'org.apache.logging.log4j:log4j-slf4j-impl' implementation 'org.apache.logging.log4j:log4j-slf4j2-impl' diff --git a/teku/src/main/java/tech/pegasys/teku/cli/options/P2POptions.java b/teku/src/main/java/tech/pegasys/teku/cli/options/P2POptions.java index abffd683074..67d6ebcbab4 100644 --- a/teku/src/main/java/tech/pegasys/teku/cli/options/P2POptions.java +++ b/teku/src/main/java/tech/pegasys/teku/cli/options/P2POptions.java @@ -293,6 +293,15 @@ public class P2POptions { private boolean blsToExecutionChangesSubnetEnabled = P2PConfig.DEFAULT_BLS_TO_EXECUTION_CHANGES_SUBNET_ENABLED; + @Option( + names = {"--Xp2p-yamux-enabled"}, + paramLabel = "", + showDefaultValue = Visibility.ALWAYS, + description = "Enables yamux multiplexing", + arity = "0..1", + fallbackValue = "true") + private boolean yamuxEnabled = NetworkConfig.DEFAULT_YAMUX_ENABLED; + private int getP2pLowerBound() { if (p2pLowerBound > p2pUpperBound) { STATUS_LOG.adjustingP2pLowerBoundToUpperBound(p2pUpperBound); @@ -356,7 +365,8 @@ public void configure(final TekuConfiguration.Builder builder) { n.networkInterface(p2pInterface) .isEnabled(p2pEnabled) .listenPort(p2pPort) - .advertisedIp(Optional.ofNullable(p2pAdvertisedIp)); + .advertisedIp(Optional.ofNullable(p2pAdvertisedIp)) + .yamuxEnabled(yamuxEnabled); }) .sync( s ->