Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FullSync Future should stop when total terminal difficulty is reached #3423

Merged
merged 9 commits into from
Mar 2, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.hyperledger.besu.ethereum.eth.sync.DefaultSynchronizer;
import org.hyperledger.besu.ethereum.eth.sync.SyncMode;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.fullsync.SyncTerminationCondition;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration;
Expand Down Expand Up @@ -371,7 +372,8 @@ public BesuController build() {
syncState,
dataDirectory,
clock,
metricsSystem);
metricsSystem,
getFullSyncTerminationCondition(protocolContext.getBlockchain()));

final MiningCoordinator miningCoordinator =
createMiningCoordinator(
Expand Down Expand Up @@ -416,6 +418,14 @@ public BesuController build() {
additionalPluginServices);
}

protected SyncTerminationCondition getFullSyncTerminationCondition(final Blockchain blockchain) {
return genesisConfig
.getConfigOptions()
.getTerminalTotalDifficulty()
.map(difficulty -> SyncTerminationCondition.difficulty(difficulty, blockchain))
.orElse(SyncTerminationCondition.never());
}

protected void prepForBuild() {}

protected JsonRpcMethods createAdditionalJsonRpcMethodFactory(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@
import org.hyperledger.besu.plugin.services.BesuEvents;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;

/** Provides an interface to block synchronization processes. */
public interface Synchronizer {

// Default tolerance used to determine whether or not this node is "in sync"
long DEFAULT_IN_SYNC_TOLERANCE = 5;

void start();
CompletableFuture<Void> start();

void stop();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,16 @@ public Optional<MessageData> dispatch(final EthMessage ethMessage) {
messageResponseConstructor.response(ethMessage.getData()));
}

public void subscribe(final int messageCode, final MessageCallback callback) {
listenersByCode.computeIfAbsent(messageCode, key -> Subscribers.create()).subscribe(callback);
public long subscribe(final int messageCode, final MessageCallback callback) {
return listenersByCode
.computeIfAbsent(messageCode, key -> Subscribers.create())
.subscribe(callback);
}

public void unsubsribe(final long id) {
for (Subscribers<MessageCallback> subscribers : listenersByCode.values()) {
subscribers.unsubscribe(id);
}
}

public void registerResponseConstructor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ public class BlockPropagationManager {
private final Set<Long> requestedNonAnnouncedBlocks =
Collections.newSetFromMap(new ConcurrentHashMap<>());
private final PendingBlocksManager pendingBlocksManager;
private Optional<Long> onBlockAddedSId = Optional.empty();
private Optional<Long> newBlockSId;
private Optional<Long> newBlockHashesSId;

BlockPropagationManager(
final SynchronizerConfiguration config,
Expand Down Expand Up @@ -111,12 +114,37 @@ public void start() {
}
}

public void stop() {
if (started.get()) {
clearListeners();
started.set(false);
} else {
LOG.warn("Attempted to stop when we are not even running...");
}
}

private void setupListeners() {
protocolContext.getBlockchain().observeBlockAdded(this::onBlockAdded);
ethContext.getEthMessages().subscribe(EthPV62.NEW_BLOCK, this::handleNewBlockFromNetwork);
ethContext
.getEthMessages()
.subscribe(EthPV62.NEW_BLOCK_HASHES, this::handleNewBlockHashesFromNetwork);
onBlockAddedSId =
Optional.of(protocolContext.getBlockchain().observeBlockAdded(this::onBlockAdded));
newBlockSId =
Optional.of(
ethContext
.getEthMessages()
.subscribe(EthPV62.NEW_BLOCK, this::handleNewBlockFromNetwork));
newBlockHashesSId =
Optional.of(
ethContext
.getEthMessages()
.subscribe(EthPV62.NEW_BLOCK_HASHES, this::handleNewBlockHashesFromNetwork));
}

private void clearListeners() {
onBlockAddedSId.ifPresent(id -> protocolContext.getBlockchain().removeObserver(id));
newBlockSId.ifPresent(id -> ethContext.getEthMessages().unsubsribe(id));
newBlockHashesSId.ifPresent(id -> ethContext.getEthMessages().unsubsribe(id));
onBlockAddedSId = Optional.empty();
newBlockSId = Optional.empty();
newBlockHashesSId = Optional.empty();
}

private void onBlockAdded(final BlockAddedEvent blockAddedEvent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.sync.fullsync.SyncTerminationCondition;

import java.time.Duration;
import java.util.ArrayDeque;
Expand All @@ -45,6 +46,7 @@ public class CheckpointRangeSource implements Iterator<CheckpointRange> {
private final EthScheduler ethScheduler;
private final int checkpointTimeoutsPermitted;
private final Duration newHeaderWaitDuration;
private final SyncTerminationCondition terminationCondition;

private final Queue<CheckpointRange> retrievedRanges = new ArrayDeque<>();
private BlockHeader lastRangeEnd;
Expand All @@ -59,15 +61,17 @@ public CheckpointRangeSource(
final EthScheduler ethScheduler,
final EthPeer peer,
final BlockHeader commonAncestor,
final int checkpointTimeoutsPermitted) {
final int checkpointTimeoutsPermitted,
final SyncTerminationCondition terminationCondition) {
this(
checkpointFetcher,
syncTargetChecker,
ethScheduler,
peer,
commonAncestor,
checkpointTimeoutsPermitted,
Duration.ofSeconds(5));
Duration.ofSeconds(5),
terminationCondition);
}

CheckpointRangeSource(
Expand All @@ -77,22 +81,25 @@ public CheckpointRangeSource(
final EthPeer peer,
final BlockHeader commonAncestor,
final int checkpointTimeoutsPermitted,
final Duration newHeaderWaitDuration) {
final Duration newHeaderWaitDuration,
final SyncTerminationCondition terminationCondition) {
this.checkpointFetcher = checkpointFetcher;
this.syncTargetChecker = syncTargetChecker;
this.ethScheduler = ethScheduler;
this.peer = peer;
this.lastRangeEnd = commonAncestor;
this.checkpointTimeoutsPermitted = checkpointTimeoutsPermitted;
this.newHeaderWaitDuration = newHeaderWaitDuration;
this.terminationCondition = terminationCondition;
}

@Override
public boolean hasNext() {
return !retrievedRanges.isEmpty()
|| (requestFailureCount < checkpointTimeoutsPermitted
&& syncTargetChecker.shouldContinueDownloadingFromSyncTarget(peer, lastRangeEnd)
&& !reachedEndOfCheckpoints);
return terminationCondition.shouldContinueDownload()
&& (!retrievedRanges.isEmpty()
|| (requestFailureCount < checkpointTimeoutsPermitted
&& syncTargetChecker.shouldContinueDownloadingFromSyncTarget(peer, lastRangeEnd)
&& !reachedEndOfCheckpoints));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastDownloaderFactory;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncDownloader;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncException;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState;
import org.hyperledger.besu.ethereum.eth.sync.fullsync.FullSyncDownloader;
import org.hyperledger.besu.ethereum.eth.sync.fullsync.SyncTerminationCondition;
import org.hyperledger.besu.ethereum.eth.sync.state.PendingBlocksManager;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
Expand All @@ -33,11 +33,11 @@
import org.hyperledger.besu.plugin.data.SyncStatus;
import org.hyperledger.besu.plugin.services.BesuEvents.SyncStatusListener;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.util.ExceptionUtils;

import java.nio.file.Path;
import java.time.Clock;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

import org.slf4j.Logger;
Expand Down Expand Up @@ -66,7 +66,8 @@ public DefaultSynchronizer(
final SyncState syncState,
final Path dataDirectory,
final Clock clock,
final MetricsSystem metricsSystem) {
final MetricsSystem metricsSystem,
final SyncTerminationCondition terminationCondition) {
this.maybePruner = maybePruner;
this.syncState = syncState;

Expand All @@ -91,7 +92,13 @@ public DefaultSynchronizer(

this.fullSyncDownloader =
new FullSyncDownloader(
syncConfig, protocolSchedule, protocolContext, ethContext, syncState, metricsSystem);
syncConfig,
protocolSchedule,
protocolContext,
ethContext,
syncState,
metricsSystem,
terminationCondition);
this.fastSyncDownloader =
FastDownloaderFactory.create(
syncConfig,
Expand Down Expand Up @@ -123,25 +130,25 @@ private TrailingPeerRequirements calculateTrailingPeerRequirements() {
}

@Override
public void start() {
public CompletableFuture<Void> start() {
if (running.compareAndSet(false, true)) {
LOG.info("Starting synchronizer.");
blockPropagationManager.start();
CompletableFuture<Void> future;
if (fastSyncDownloader.isPresent()) {
fastSyncDownloader
.get()
.start()
.whenComplete(this::handleFastSyncResult)
.exceptionally(
ex -> {
LOG.warn("Exiting FastSync process");
System.exit(0);
return null;
});
future = fastSyncDownloader.get().start().thenCompose(this::handleFastSyncResult);

} else {
startFullSync();
future = startFullSync();
}
future =
future.thenApply(
unused -> {
blockPropagationManager.stop();
running.set(false);
return null;
});
return future;
} else {
throw new IllegalStateException("Attempt to start an already started synchronizer.");
}
Expand All @@ -154,6 +161,7 @@ public void stop() {
fastSyncDownloader.ifPresent(FastSyncDownloader::stop);
fullSyncDownloader.stop();
maybePruner.ifPresent(Pruner::stop);
blockPropagationManager.stop();
}
}

Expand All @@ -164,36 +172,37 @@ public void awaitStop() throws InterruptedException {
}
}

private void handleFastSyncResult(final FastSyncState result, final Throwable error) {
private CompletableFuture<Void> handleFastSyncResult(final FastSyncState result) {
if (!running.get()) {
// We've been shutdown which will have triggered the fast sync future to complete
return;
return CompletableFuture.completedFuture(null);
}
fastSyncDownloader.ifPresent(FastSyncDownloader::deleteFastSyncState);
final Throwable rootCause = ExceptionUtils.rootCause(error);
if (rootCause instanceof FastSyncException) {
LOG.error(
"Fast sync failed ({}), please try again.", ((FastSyncException) rootCause).getError());
throw new FastSyncException(rootCause);
} else if (error != null) {
LOG.error("Fast sync failed, please try again.", error);
throw new FastSyncException(error);
} else {
result
.getPivotBlockHeader()
.ifPresent(
blockHeader ->
protocolContext.getWorldStateArchive().setArchiveStateUnSafe(blockHeader));
LOG.info(
"Fast sync completed successfully with pivot block {}",
result.getPivotBlockNumber().getAsLong());
}
startFullSync();
result
.getPivotBlockHeader()
.ifPresent(
blockHeader ->
protocolContext.getWorldStateArchive().setArchiveStateUnSafe(blockHeader));
LOG.info(
"Fast sync completed successfully with pivot block {}",
result.getPivotBlockNumber().getAsLong());
return startFullSync();
}

private void startFullSync() {
private CompletableFuture<Void> startFullSync() {
maybePruner.ifPresent(Pruner::start);
fullSyncDownloader.start();
return fullSyncDownloader
.start()
.thenCompose(
unused -> {
maybePruner.ifPresent(Pruner::stop);
return null;
})
.thenApply(
o -> {
maybePruner.ifPresent(Pruner::stop);
return null;
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,9 @@ private synchronized CompletionStage<Void> startDownloadForSyncTarget(final Sync
return CompletableFuture.failedFuture(
new CancellationException("Chain download was cancelled"));
}
if (!syncTargetManager.shouldContinueDownloading()) {
return CompletableFuture.completedFuture(null);
}
syncState.setSyncTarget(target.peer(), target.commonAncestor());
currentDownloadPipeline = downloadPipelineFactory.createDownloadPipelineForSyncTarget(target);
debugLambda(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.hyperledger.besu.ethereum.eth.sync.DownloadHeadersStep;
import org.hyperledger.besu.ethereum.eth.sync.DownloadPipelineFactory;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.fullsync.SyncTerminationCondition;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncTarget;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.metrics.BesuMetricCategory;
Expand Down Expand Up @@ -105,7 +106,8 @@ public Pipeline<CheckpointRange> createDownloadPipelineForSyncTarget(final SyncT
ethContext.getScheduler(),
target.peer(),
target.commonAncestor(),
syncConfig.getDownloaderCheckpointTimeoutsPermitted());
syncConfig.getDownloaderCheckpointTimeoutsPermitted(),
SyncTerminationCondition.never());
final DownloadHeadersStep downloadHeadersStep =
new DownloadHeadersStep(
protocolSchedule,
Expand Down
Loading