Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
tbenr committed Oct 23, 2024
1 parent ec10dc9 commit fb0f4a4
Show file tree
Hide file tree
Showing 16 changed files with 164 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import com.google.common.annotations.VisibleForTesting;
import java.util.Optional;
import java.util.function.Function;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.ssz.SszData;
Expand All @@ -34,13 +36,16 @@
import tech.pegasys.teku.storage.client.RecentChainData;

public abstract class AbstractGossipManager<T extends SszData> implements GossipManager {
private static final Logger LOG = LogManager.getLogger();

private final GossipNetwork gossipNetwork;
private final GossipEncoding gossipEncoding;

private final Eth2TopicHandler<T> topicHandler;

private Optional<TopicChannel> channel = Optional.empty();
private final GossipFailureLogger gossipFailureLogger;
private final Function<T, Optional<UInt64>> getSlotForMessage;

protected AbstractGossipManager(
final RecentChainData recentChainData,
Expand All @@ -51,6 +56,7 @@ protected AbstractGossipManager(
final ForkInfo forkInfo,
final OperationProcessor<T> processor,
final SszSchema<T> gossipType,
final Function<T, Optional<UInt64>> getSlotForMessage,
final Function<T, UInt64> getEpochForMessage,
final NetworkingSpecConfig networkingConfig,
final DebugDataDumper debugDataDumper) {
Expand All @@ -69,6 +75,8 @@ protected AbstractGossipManager(
networkingConfig,
debugDataDumper);
this.gossipEncoding = gossipEncoding;
this.gossipFailureLogger = new GossipFailureLogger(topicName.toString());
this.getSlotForMessage = getSlotForMessage;
}

@VisibleForTesting
Expand All @@ -77,11 +85,29 @@ public Eth2TopicHandler<T> getTopicHandler() {
}

/**
 * Publishes {@code message} on this manager's gossip topic without handing the outcome back to
 * the caller.
 *
 * <p>The message is published exactly once, through {@link #publishMessageWithFeedback}. Any
 * exception that still reaches the returned future is passed to
 * {@code ifExceptionGetsHereRaiseABug()}.
 *
 * @param message the message to gossip
 */
protected void publishMessage(final T message) {
  publishMessageWithFeedback(message).ifExceptionGetsHereRaiseABug();
}

/**
 * Gossips {@code message} on this manager's topic and reports when the attempt has finished.
 *
 * <p>The returned future completes normally both when the gossip succeeded and when it failed.
 * A failure, including the gossip channel not being available, is logged through
 * {@code gossipFailureLogger} and is not propagated to the caller.
 *
 * @param message the message to encode and gossip
 * @return a future that completes once the publish attempt has either succeeded or failed
 */
protected SafeFuture<Void> publishMessageWithFeedback(final T message) {
  return channel
      .map(c -> c.gossip(gossipEncoding.encode(message)))
      // Built lazily so that no exception is allocated when the channel is present.
      .orElseGet(
          () -> SafeFuture.failedFuture(new IllegalStateException("Gossip channel not available")))
      .handle(
          (__, err) -> {
            if (err != null) {
              gossipFailureLogger.logWithSuppression(err, getSlotForMessage.apply(message));
            } else {
              // Suppliers keep the root/topic computation off the hot path unless TRACE is on.
              LOG.trace(
                  "Successfully gossiped message with root {} on {}",
                  message::hashTreeRoot,
                  topicHandler::getTopic);
            }
            return null;
          });
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package tech.pegasys.teku.networking.eth2.gossip;

import java.util.Optional;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.networking.eth2.gossip.encoding.GossipEncoding;
import tech.pegasys.teku.networking.eth2.gossip.topics.GossipTopicName;
Expand Down Expand Up @@ -51,6 +52,7 @@ public AggregateGossipManager(
spec.atEpoch(forkInfo.getFork().getEpoch())
.getSchemaDefinitions()
.getSignedAggregateAndProofSchema(),
message -> Optional.of(message.getMessage().getAggregate().getData().getSlot()),
message -> spec.computeEpochAtSlot(message.getMessage().getAggregate().getData().getSlot()),
spec.getNetworkingConfig(),
debugDataDumper);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package tech.pegasys.teku.networking.eth2.gossip;

import java.util.Optional;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.hyperledger.besu.plugin.services.MetricsSystem;
Expand Down Expand Up @@ -62,7 +63,8 @@ public void onNewAttestation(final ValidatableAttestation validatableAttestation
attestationPublishSuccessCounter.inc();
},
error -> {
gossipFailureLogger.logWithSuppression(error, attestation.getData().getSlot());
gossipFailureLogger.logWithSuppression(
error, Optional.of(attestation.getData().getSlot()));
attestationPublishFailureCounter.inc();
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package tech.pegasys.teku.networking.eth2.gossip;

import java.util.Optional;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.networking.eth2.gossip.encoding.GossipEncoding;
import tech.pegasys.teku.networking.eth2.gossip.topics.GossipTopicName;
Expand Down Expand Up @@ -46,6 +47,7 @@ public AttesterSlashingGossipManager(
spec.atEpoch(forkInfo.getFork().getEpoch())
.getSchemaDefinitions()
.getAttesterSlashingSchema(),
message -> Optional.of(message.getAttestation1().getData().getSlot()),
message -> spec.computeEpochAtSlot(message.getAttestation1().getData().getSlot()),
spec.getNetworkingConfig(),
debugDataDumper);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,19 @@

package tech.pegasys.teku.networking.eth2.gossip;

import static tech.pegasys.teku.networking.eth2.gossip.topics.GossipTopicName.getBlobSidecarSubnetTopicName;

import com.google.common.annotations.VisibleForTesting;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import java.util.Optional;
import java.util.stream.IntStream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.networking.eth2.gossip.encoding.GossipEncoding;
import tech.pegasys.teku.networking.eth2.gossip.topics.GossipTopicName;
import tech.pegasys.teku.networking.eth2.gossip.topics.OperationMilestoneValidator;
import tech.pegasys.teku.networking.eth2.gossip.topics.OperationProcessor;
import tech.pegasys.teku.networking.eth2.gossip.topics.topichandlers.Eth2TopicHandler;
Expand All @@ -40,11 +43,13 @@
import tech.pegasys.teku.storage.client.RecentChainData;

public class BlobSidecarGossipManager implements GossipManager {
private static final Logger LOG = LogManager.getLogger();

private final Spec spec;
private final GossipNetwork gossipNetwork;
private final GossipEncoding gossipEncoding;
private final Int2ObjectMap<Eth2TopicHandler<BlobSidecar>> subnetIdToTopicHandler;
private final GossipFailureLogger gossipFailureLogger;

private final Int2ObjectMap<TopicChannel> subnetIdToChannel = new Int2ObjectOpenHashMap<>();

Expand Down Expand Up @@ -81,25 +86,47 @@ public static BlobSidecarGossipManager create(
subnetIdToTopicHandler.put(subnetId, topicHandler);
});
return new BlobSidecarGossipManager(
spec, gossipNetwork, gossipEncoding, subnetIdToTopicHandler);
spec,
gossipNetwork,
gossipEncoding,
subnetIdToTopicHandler,
new GossipFailureLogger("blob sidecar"));
}

/**
 * Creates the manager from already-built collaborators; instances are obtained through the
 * static {@code create(...)} factory.
 *
 * @param spec used to compute the subnet a blob sidecar is published on
 * @param gossipNetwork the gossip network this manager operates on
 * @param gossipEncoding used to encode blob sidecars before they are gossiped
 * @param subnetIdToTopicHandler the topic handler for each blob sidecar subnet id
 * @param gossipFailureLogger used to log failed publish attempts
 */
private BlobSidecarGossipManager(
    final Spec spec,
    final GossipNetwork gossipNetwork,
    final GossipEncoding gossipEncoding,
    final Int2ObjectMap<Eth2TopicHandler<BlobSidecar>> subnetIdToTopicHandler,
    final GossipFailureLogger gossipFailureLogger) {
  this.spec = spec;
  this.gossipNetwork = gossipNetwork;
  this.gossipEncoding = gossipEncoding;
  this.subnetIdToTopicHandler = subnetIdToTopicHandler;
  this.gossipFailureLogger = gossipFailureLogger;
}

/**
* This method is designed to return a future that only completes successfully whenever the gossip
* was succeeded (sent to at least one peer) or failed.
*/
public SafeFuture<Void> publishBlobSidecar(final BlobSidecar message) {
final int subnetId = spec.computeSubnetForBlobSidecar(message).intValue();
return Optional.ofNullable(subnetIdToChannel.get(subnetId))
.map(channel -> channel.gossip(gossipEncoding.encode(message)))
.orElse(SafeFuture.COMPLETE);
.orElse(SafeFuture.failedFuture(new IllegalStateException("Gossip channel not available")))
.handle(
(__, err) -> {
if (err != null) {
gossipFailureLogger.logWithSuppression(err, Optional.of(message.getSlot()));
} else {
LOG.trace(
"Successfully gossiped blob sidecar {} on {}",
() -> message.getSlotAndBlockRoot().toLogString(),
() -> getBlobSidecarSubnetTopicName(subnetId));
}
return null;
});
}

@VisibleForTesting
Expand Down Expand Up @@ -147,7 +174,7 @@ private static Eth2TopicHandler<BlobSidecar> createBlobSidecarTopicHandler(
new TopicSubnetIdAwareOperationProcessor(spec, subnetId, processor),
gossipEncoding,
forkInfo.getForkDigest(spec),
GossipTopicName.getBlobSidecarSubnetTopicName(subnetId),
getBlobSidecarSubnetTopicName(subnetId),
new OperationMilestoneValidator<>(
spec,
forkInfo.getFork(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package tech.pegasys.teku.networking.eth2.gossip;

import java.util.Optional;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.networking.eth2.gossip.encoding.GossipEncoding;
Expand Down Expand Up @@ -47,6 +48,7 @@ public BlockGossipManager(
spec.atEpoch(forkInfo.getFork().getEpoch())
.getSchemaDefinitions()
.getSignedBeaconBlockSchema(),
block -> Optional.of(block.getSlot()),
block -> spec.computeEpochAtSlot(block.getSlot()),
spec.getNetworkingConfig(),
debugDataDumper);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.libp2p.core.SemiDuplexNoOutboundStreamException;
import io.libp2p.pubsub.MessageAlreadySeenException;
import io.libp2p.pubsub.NoPeersForOutboundMessageException;
import java.util.Optional;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -26,46 +27,55 @@ public class GossipFailureLogger {
private static final Logger LOG = LogManager.getLogger();

private final String messageType;
private UInt64 lastErroredSlot;
private Optional<UInt64> lastErroredSlot;
private Throwable lastRootCause;

/**
 * Creates a failure logger for one kind of gossip message.
 *
 * @param messageType label substituted into every "Failed to publish {}(s)..." log line
 */
public GossipFailureLogger(final String messageType) {
this.messageType = messageType;
}

public synchronized void logWithSuppression(final Throwable error, final UInt64 slot) {
public synchronized void logWithSuppression(
final Throwable error, final Optional<UInt64> maybeSlot) {
final Throwable rootCause = Throwables.getRootCause(error);

final boolean suppress =
slot.equals(lastErroredSlot) && rootCause.getClass().equals(lastRootCause.getClass());
final boolean suppress;
if (maybeSlot.isEmpty()) {
suppress = false;
} else {
suppress =
maybeSlot.equals(lastErroredSlot)
&& rootCause.getClass().equals(lastRootCause.getClass());
}

lastErroredSlot = slot;
lastErroredSlot = maybeSlot;
lastRootCause = rootCause;

final String slotLog = maybeSlot.map(slot -> " for slot " + slot).orElse("");

if (lastRootCause instanceof MessageAlreadySeenException) {
LOG.debug(
"Failed to publish {}(s) for slot {} because the message has already been seen",
"Failed to publish {}(s){} because the message has already been seen",
messageType,
lastErroredSlot);
slotLog);
} else if (lastRootCause instanceof NoPeersForOutboundMessageException) {
LOG.log(
suppress ? Level.DEBUG : Level.WARN,
"Failed to publish {}(s) for slot {}; {}",
"Failed to publish {}(s){}; {}",
messageType,
lastErroredSlot,
rootCause.getMessage());
slotLog,
lastRootCause.getMessage());
} else if (lastRootCause instanceof SemiDuplexNoOutboundStreamException) {
LOG.log(
suppress ? Level.DEBUG : Level.WARN,
"Failed to publish {}(s) for slot {} because no active outbound stream for the required gossip topic",
"Failed to publish {}(s){} because no active outbound stream for the required gossip topic",
messageType,
lastErroredSlot);
slotLog);
} else {
LOG.log(
suppress ? Level.DEBUG : Level.ERROR,
"Failed to publish {}(s) for slot {}",
"Failed to publish {}(s){}",
messageType,
lastErroredSlot,
slotLog,
error);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package tech.pegasys.teku.networking.eth2.gossip;

import java.util.Optional;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.networking.eth2.gossip.encoding.GossipEncoding;
import tech.pegasys.teku.networking.eth2.gossip.topics.GossipTopicName;
Expand Down Expand Up @@ -44,6 +45,7 @@ public ProposerSlashingGossipManager(
forkInfo,
processor,
ProposerSlashing.SSZ_SCHEMA,
message -> Optional.of(message.getHeader1().getMessage().getSlot()),
message ->
recentChainData
.getSpec()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package tech.pegasys.teku.networking.eth2.gossip;

import java.util.Optional;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.networking.eth2.gossip.encoding.GossipEncoding;
import tech.pegasys.teku.networking.eth2.gossip.topics.GossipTopicName;
Expand Down Expand Up @@ -47,6 +48,7 @@ public SignedBlsToExecutionChangeGossipManager(
forkInfo,
processor,
schemaDefinitions.getSignedBlsToExecutionChangeSchema(),
message -> Optional.empty(),
// BLS changes don't have a fork they apply to so are always considered to match the fork
// of the topic they arrived on (ie disable fork checking at this level)
message -> forkInfo.getFork().getEpoch(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package tech.pegasys.teku.networking.eth2.gossip;

import java.util.Optional;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.networking.eth2.gossip.encoding.GossipEncoding;
import tech.pegasys.teku.networking.eth2.gossip.topics.GossipTopicName;
Expand Down Expand Up @@ -47,6 +48,7 @@ public SignedContributionAndProofGossipManager(
forkInfo,
processor,
schemaDefinitions.getSignedContributionAndProofSchema(),
message -> Optional.of(message.getMessage().getContribution().getSlot()),
message ->
recentChainData
.getSpec()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ private void publish(final SyncCommitteeMessage message, final int subnetId) {
publishSuccessCounter.inc();
},
error -> {
gossipFailureLogger.logWithSuppression(error, message.getSlot());
gossipFailureLogger.logWithSuppression(error, Optional.of(message.getSlot()));
publishFailureCounter.inc();
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package tech.pegasys.teku.networking.eth2.gossip;

import java.util.Optional;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.networking.eth2.gossip.encoding.GossipEncoding;
import tech.pegasys.teku.networking.eth2.gossip.topics.GossipTopicName;
Expand Down Expand Up @@ -44,6 +45,7 @@ public VoluntaryExitGossipManager(
forkInfo,
processor,
SignedVoluntaryExit.SSZ_SCHEMA,
exit -> Optional.empty(),
exit -> exit.getMessage().getEpoch(),
networkingConfig,
debugDataDumper);
Expand Down
Loading

0 comments on commit fb0f4a4

Please sign in to comment.