Skip to content

Commit

Permalink
7311 add get headers from peer task (#7781)
Browse files Browse the repository at this point in the history
* 7311: spotless

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Fix broken BesuCommandTest

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: add class

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Move PeerTaskFeatureToggle to more appropriate location

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: add X prefix to peertask-system-enabled

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Move --Xpeertask-system-enabled out of BesuCommand and make hidden

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: spotless

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Add GetReceiptsFromPeerTask

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Move isPeerTaskSystemEnabled to SynchronizerOptions

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Fix javadoc issue

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Fix javadoc issue

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Move PeerTaskFeatureToggleTestHelper to TestUtil and fix RunnerTest

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: spotless

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Remove PeerTaskFeatureToggle in favor of including isPeerTaskSystemEnabled in SynchronizerConfiguration

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Adjust to the removal of PeerTaskFeatureToggle and use SynchronizerConfiguration to get the toggle instead

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Reduce timeout in PeerTaskRequestSender to 5s

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Refactor PeerManager to be an interface

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Fix up compile errors after merge

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Fix MetricsAcceptanceTest

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Fix MetricsAcceptanceTest

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Fix DownloadReceiptsStep when using peer task system

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Rename PeerManager to PeerSelector

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Reword PeerSelector javadoc to avoid implementation details

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Use ConcurrentHashMap in DefaultPeerSelector

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Reword trace log in DefaultPeerSelector

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Remove unused imports

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Use a 1 second delay between retries in PeerTaskExecutor to match old implementation

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Add testGetPeerButNoPeerMatchesFilter to DefaultPeerSelectorTest

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Add testGetPeerButNoPeerMatchesFilter to DefaultPeerSelectorTest

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: spotless

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Fix MetricsAcceptanceTest

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Fix MetricsAcceptanceTest

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Modify PeerTaskExecutor metric to include response time from peer

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Use SubProtocol instead of subprotocol name string in PeerTask

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: rename timing context to ignored to prevent intellij warnings

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Use constants for number of retries

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Convert PeerTaskExecutorResult to a record

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Rename PeerTaskBehavior to PeerTaskRetryBehavior

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Move peer selection logic to PeerSelector

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: spotless

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Fix up everything broken after merge

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Attempt to improve performance of peer task system in pipeline

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: fix compile check

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Fix broken workflow

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Reduce logging in JsonRpcExecutor to trace level

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: More changes in DownloadReceiptsStep

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Rework DownloadReceiptsStep

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Make changes as discussed in walkthrough meeting

Remove DefaultPeerSelector, make EthPeers implement PeerSelector interface, and add PeerTask.getPeerRequirementFilter

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Update after merge and make discussed changes from walkthrough discussion

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Change to regular HashMap

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Remove runtime exception again

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Rename getPeerTaskBehavior to getPeerTaskRetryBehavior

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Rename getPeerTaskBehavior to getPeerTaskRetryBehavior

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Rework PeerTaskExecutor retry system to be 0-based

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Fix up compile errors after merge

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Fix broken DownloadReceiptsStepTest test

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Move GetReceipts to services worker for parallelism

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Refactor peer task system usage in DownloadReceiptsStep to better match old system

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Remove unused async methods in PeerTaskExecutor

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Return Optional<EthPeer> in PeerSelector.getPeer and utilise existing peer selection behavior in EthPeers

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Update after merge

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Redo getPeer again to include hasAvailableRequestCapacity check

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Add protocol spec supplier to GetReceiptsFromPeerTask

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Rework getPeer again to use LEAST_TO_MOST_BUSY comparator

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Import PeerNotConnected class instead of using fully qualified class name

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Change to specifying retry counts in PeerTask instead of behavior enums

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: clean up after merge

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: clean up after merge

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Fix up javadoc

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Add additional metrics to PeerTaskExecutor

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Add Predicate to PeerTask to check for partial success

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Fix incorrect name on isPartialSuccessTest

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Implement isPartialSuccess and add unit tests

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Add partialSuccessCounter and inflightRequestGauge in PeerTaskExecutor

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Also filter by whether a peer is fully validated

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Remove unneeded throws in RunnerTest

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Fix up inflight requests gauge in PeerTaskExecutor

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Update plugin api hash

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Update plugin api hash

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Add javadoc to LabelledGauge.isLabelsObserved

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Update plugin-api hash

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Update changelog

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Implement GetHeadersFromPeerTask and use in DetermineCommonAncestorTask

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Handle headers with no receipts as a special case in DownloadReceiptsStep

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Complete merge

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Get DetermineCommonAncestorTask working with peer task system

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Use taskName instead of className for labelNames

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Use snake_case for metric names

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Use _total metric name suffix

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: rework partial success handling

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Update GetReceiptsFromPeerTask with partialSuccess changes

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Update GetHeadersFromPeerTask with partialSuccess changes

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Add default implementation to LabelledGauge.isLabelsObserved

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Use Peer task systems GetHeadersFromPeerTask in GetBlockFromPeerTask

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Fix broken unit test

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Remove unused constructor from AbstractPeerBlockValidator

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Use GetHeadersFromPeerTask in AbstractPeerBlockValidator

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Use peer task executor in SyncTargetManager

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Fix javadoc on BesuControllerBuilder

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Remove logs used to confirm operation

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Implement GetHeadersFromPeerTask in FastSyncActions and PivotBlockConfirmer

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Rename parseResponse to processResponse

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Wrap peer task system usage in ethScheduler call to match other usages

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: apply spotless

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Move check for empty trie hash into GetReceiptsFromPeerTask and update unit test to test for this functionality

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: spotless

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Fix compile issue after merge

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Fix compile issue after merge

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Remove BodyValidator and update code and test to match

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Implement GetHeadersForPeerTask usage in DownloadHeadersStep

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: spotless

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: remove unneeded logs

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: spotless

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Fix up pre-fill and add test to test failure scenario

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Use ProtocolSchedule.anyMatch to find if any ProtocolSpecs are PoS, remove new usages of currentProtocolSpecSupplier

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Only attempt to remove headers on successful requests

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: clean up after merge

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: clean up after merge

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Use peer task system in RangeHeadersFetcher

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Use peer task system in DownloadHeaderSequenceTask

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Fix GetHeadersFromPeerTask mocking in CheckPointSyncChainDownloaderTest

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Extract peer task executor answer for getHeaders to separate class for reuse in tests

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: spotless

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Implement peer task system usage in BackwardSyncStep

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Implement peer task system usage in ChainHeadTracker

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Implement peer task system usage in PivotSelectorFromSafeBlock and improve logging

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Implement unit test for GetHeadersFromPeerTask

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Fix up merge compile error

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Ensure FastSyncActions and PivotSelectorFromSafeBlock retry getting headers for all peers, matching RetryingGetHeaderFromPeerByHashTask

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Change PeerTaskExecutorResult.ethPeer to an Optional

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Use CancellationException instead of InterruptedException in PivotBlockConfirmer

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Use PivotBlockRetriever.MAX_QUERY_RETRIES_PER_PEER to set retries for GetHeadersFromPeerTask

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: spotless

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Add PeerTask.shouldDisconnectPeer and ensure functionality matches old code

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Remove old info logs

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Fix broken test by correctly including peer in PeerTaskExecutorResults in test classes

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Fix incorrect equality tests

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Fix broken test

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: spotless

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Move PeerTaskExecutor into EthContext to reduce plumbing changes

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: spotless

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Remove protocol check from GetHeadersFromPeerTask.getPeerRequirementFilter

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Fix broken test

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Fix broken integration test

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Refactor peer task validation

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Refactor peer task validation

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Use peer count for retry count when getting headers in BackwardSyncStep, FastSyncActions, and PivotSelectorFromSafeBlock

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: spotless

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Move chainstate update into GetHeadersFromPeerTask.postProcessResult

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Fix compile errors

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

* 7311: Update after merge

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>

---------

Signed-off-by: Matilda Clerke <matilda.clerke@consensys.net>
Signed-off-by: Matilda-Clerke <matilda.clerke@consensys.net>
Co-authored-by: Sally MacFarlane <macfarla.github@gmail.com>
  • Loading branch information
Matilda-Clerke and macfarla authored Dec 11, 2024
1 parent 4435f75 commit 657efff
Show file tree
Hide file tree
Showing 75 changed files with 2,812 additions and 545 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -690,9 +690,10 @@ public BesuController build() {
.build());
}

final EthContext ethContext = new EthContext(ethPeers, ethMessages, snapMessages, scheduler);
final PeerTaskExecutor peerTaskExecutor =
new PeerTaskExecutor(ethPeers, new PeerTaskRequestSender(), metricsSystem);
final EthContext ethContext =
new EthContext(ethPeers, ethMessages, snapMessages, scheduler, peerTaskExecutor);
final boolean fullSyncDisabled = !SyncMode.isFullSync(syncConfig.getSyncMode());
final SyncState syncState = new SyncState(blockchain, ethPeers, fullSyncDisabled, checkpoint);

Expand All @@ -718,7 +719,8 @@ public BesuController build() {
besuComponent.map(BesuComponent::getBlobCache).orElse(new BlobCache()),
miningConfiguration);

final List<PeerValidator> peerValidators = createPeerValidators(protocolSchedule);
final List<PeerValidator> peerValidators =
createPeerValidators(protocolSchedule, peerTaskExecutor);

final EthProtocolManager ethProtocolManager =
createEthProtocolManager(
Expand Down Expand Up @@ -947,6 +949,7 @@ private PivotBlockSelector createPivotSelector(
ethContext,
metricsSystem,
genesisConfigOptions,
syncConfig,
unverifiedForkchoiceSupplier,
unsubscribeForkchoiceListener);
} else {
Expand Down Expand Up @@ -1179,29 +1182,42 @@ private ChainDataPruner createChainPruner(final BlockchainStorage blockchainStor
* Create peer validators list.
*
* @param protocolSchedule the protocol schedule
* @param peerTaskExecutor the peer task executor
* @return the list
*/
protected List<PeerValidator> createPeerValidators(final ProtocolSchedule protocolSchedule) {
protected List<PeerValidator> createPeerValidators(
final ProtocolSchedule protocolSchedule, final PeerTaskExecutor peerTaskExecutor) {
final List<PeerValidator> validators = new ArrayList<>();

final OptionalLong daoBlock = genesisConfigOptions.getDaoForkBlock();
if (daoBlock.isPresent()) {
// Setup dao validator
validators.add(
new DaoForkPeerValidator(protocolSchedule, metricsSystem, daoBlock.getAsLong()));
new DaoForkPeerValidator(
protocolSchedule, peerTaskExecutor, syncConfig, metricsSystem, daoBlock.getAsLong()));
}

final OptionalLong classicBlock = genesisConfigOptions.getClassicForkBlock();
// setup classic validator
if (classicBlock.isPresent()) {
validators.add(
new ClassicForkPeerValidator(protocolSchedule, metricsSystem, classicBlock.getAsLong()));
new ClassicForkPeerValidator(
protocolSchedule,
peerTaskExecutor,
syncConfig,
metricsSystem,
classicBlock.getAsLong()));
}

for (final Map.Entry<Long, Hash> requiredBlock : requiredBlocks.entrySet()) {
validators.add(
new RequiredBlocksPeerValidator(
protocolSchedule, metricsSystem, requiredBlock.getKey(), requiredBlock.getValue()));
protocolSchedule,
peerTaskExecutor,
syncConfig,
metricsSystem,
requiredBlock.getKey(),
requiredBlock.getValue()));
}

final CheckpointConfigOptions checkpointConfigOptions =
Expand All @@ -1210,6 +1226,8 @@ protected List<PeerValidator> createPeerValidators(final ProtocolSchedule protoc
validators.add(
new CheckpointBlocksPeerValidator(
protocolSchedule,
peerTaskExecutor,
syncConfig,
metricsSystem,
checkpointConfigOptions.getNumber().orElseThrow(),
checkpointConfigOptions.getHash().map(Hash::fromHexString).orElseThrow()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.MergePeerFilter;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.peervalidation.RequiredBlocksPeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
Expand Down Expand Up @@ -79,6 +80,7 @@ protected MiningCoordinator createMiningCoordinator(
new BackwardSyncContext(
protocolContext,
protocolSchedule,
syncConfig,
metricsSystem,
ethProtocolManager.ethContext(),
syncState,
Expand Down Expand Up @@ -235,14 +237,17 @@ protected PluginServiceFactory createAdditionalPluginServices(
}

@Override
protected List<PeerValidator> createPeerValidators(final ProtocolSchedule protocolSchedule) {
List<PeerValidator> retval = super.createPeerValidators(protocolSchedule);
protected List<PeerValidator> createPeerValidators(
final ProtocolSchedule protocolSchedule, final PeerTaskExecutor peerTaskExecutor) {
List<PeerValidator> retval = super.createPeerValidators(protocolSchedule, peerTaskExecutor);
final OptionalLong powTerminalBlockNumber = genesisConfigOptions.getTerminalBlockNumber();
final Optional<Hash> powTerminalBlockHash = genesisConfigOptions.getTerminalBlockHash();
if (powTerminalBlockHash.isPresent() && powTerminalBlockNumber.isPresent()) {
retval.add(
new RequiredBlocksPeerValidator(
protocolSchedule,
peerTaskExecutor,
syncConfig,
metricsSystem,
powTerminalBlockNumber.getAsLong(),
powTerminalBlockHash.get(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ protected MiningCoordinator createMiningCoordinator(
new TransitionBackwardSyncContext(
protocolContext,
transitionProtocolSchedule,
syncConfig,
metricsSystem,
ethProtocolManager.ethContext(),
syncState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider;
import org.hyperledger.besu.ethereum.core.MiningConfiguration;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.mainnet.BlockHeaderValidator;
Expand Down Expand Up @@ -73,6 +74,7 @@ public class TransitionControllerBuilderTest {
@Mock ProtocolContext protocolContext;
@Mock MutableBlockchain mockBlockchain;
@Mock TransactionPool transactionPool;
@Mock PeerTaskExecutor peerTaskExecutor;
@Mock SyncState syncState;

@Mock(answer = Answers.RETURNS_DEEP_STUBS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardChain;
import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardSyncContext;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
Expand All @@ -43,13 +44,15 @@ public class TransitionBackwardSyncContext extends BackwardSyncContext {
public TransitionBackwardSyncContext(
final ProtocolContext protocolContext,
final TransitionProtocolSchedule transitionProtocolSchedule,
final SynchronizerConfiguration synchronizerConfiguration,
final MetricsSystem metricsSystem,
final EthContext ethContext,
final SyncState syncState,
final StorageProvider storageProvider) {
super(
protocolContext,
transitionProtocolSchedule,
synchronizerConfiguration,
metricsSystem,
ethContext,
syncState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
*/
package org.hyperledger.besu.ethereum.eth.manager;

import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;

import java.util.Optional;

public class EthContext {
Expand All @@ -22,24 +24,31 @@ public class EthContext {
private final EthMessages ethMessages;
private final Optional<EthMessages> snapMessages;
private final EthScheduler scheduler;
private final PeerTaskExecutor peerTaskExecutor;

public EthContext(
final EthPeers ethPeers,
final EthMessages ethMessages,
final EthMessages snapMessages,
final EthScheduler scheduler) {
final EthScheduler scheduler,
final PeerTaskExecutor peerTaskExecutor) {
this.ethPeers = ethPeers;
this.ethMessages = ethMessages;
this.snapMessages = Optional.of(snapMessages);
this.scheduler = scheduler;
this.peerTaskExecutor = peerTaskExecutor;
}

public EthContext(
final EthPeers ethPeers, final EthMessages ethMessages, final EthScheduler scheduler) {
final EthPeers ethPeers,
final EthMessages ethMessages,
final EthScheduler scheduler,
final PeerTaskExecutor peerTaskExecutor) {
this.ethPeers = ethPeers;
this.ethMessages = ethMessages;
this.snapMessages = Optional.empty();
this.scheduler = scheduler;
this.peerTaskExecutor = peerTaskExecutor;
}

public EthPeers getEthPeers() {
Expand All @@ -57,4 +66,8 @@ public Optional<EthMessages> getSnapMessages() {
public EthScheduler getScheduler() {
return scheduler;
}

public PeerTaskExecutor getPeerTaskExecutor() {
return peerTaskExecutor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ public InvalidPeerTaskResponseException() {
super();
}

public InvalidPeerTaskResponseException(final String message) {
super(message);
}

public InvalidPeerTaskResponseException(final Throwable cause) {
super(cause);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,13 @@ default int getRetriesWithSamePeer() {
Predicate<EthPeer> getPeerRequirementFilter();

/**
* Checks if the supplied result is considered a success
* Performs a high level check of the results, returning a PeerTaskValidationResponse to describe
* the result of the check
*
* @return true if the supplied result is considered a success
* @param result The results of the PeerTask, as returned by processResponse
* @return a PeerTaskValidationResponse to describe the result of the check
*/
boolean isSuccess(T result);
PeerTaskValidationResponse validateResult(T result);

default void postProcessResult(final PeerTaskExecutorResult<T> result) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,15 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Manages the execution of PeerTasks, respecting their PeerTaskRetryBehavior */
public class PeerTaskExecutor {
private static final Logger LOG = LoggerFactory.getLogger(PeerTaskExecutor.class);

private final PeerSelector peerSelector;
private final PeerTaskRequestSender requestSender;
Expand Down Expand Up @@ -98,8 +101,8 @@ public <T> PeerTaskExecutorResult<T> execute(final PeerTask<T> peerTask) {
if (peer.isEmpty()) {
executorResult =
new PeerTaskExecutorResult<>(
Optional.empty(), PeerTaskExecutorResponseCode.NO_PEER_AVAILABLE);
continue;
Optional.empty(), PeerTaskExecutorResponseCode.NO_PEER_AVAILABLE, Optional.empty());
break;
}
usedEthPeers.add(peer.get());
executorResult = executeAgainstPeer(peerTask, peer.get());
Expand Down Expand Up @@ -138,43 +141,59 @@ public <T> PeerTaskExecutorResult<T> executeAgainstPeer(
inflightRequestCountForThisTaskClass.decrementAndGet();
}

if (peerTask.isSuccess(result)) {
PeerTaskValidationResponse validationResponse = peerTask.validateResult(result);
if (validationResponse == PeerTaskValidationResponse.RESULTS_VALID_AND_GOOD) {
peer.recordUsefulResponse();
executorResult =
new PeerTaskExecutorResult<>(
Optional.ofNullable(result), PeerTaskExecutorResponseCode.SUCCESS);
Optional.ofNullable(result),
PeerTaskExecutorResponseCode.SUCCESS,
Optional.of(peer));
peerTask.postProcessResult(executorResult);
} else {
// At this point, the result is most likely empty. Technically, this is a valid result, so
// we don't penalise the peer, but it's also a useless result, so we return
// INVALID_RESPONSE code
LOG.debug(
"Invalid response found for {} from peer {}", taskClassName, peer.getLoggableId());
validationResponse
.getDisconnectReason()
.ifPresent((disconnectReason) -> peer.disconnect(disconnectReason));
executorResult =
new PeerTaskExecutorResult<>(
Optional.ofNullable(result), PeerTaskExecutorResponseCode.INVALID_RESPONSE);
Optional.ofNullable(result),
PeerTaskExecutorResponseCode.INVALID_RESPONSE,
Optional.of(peer));
}

} catch (PeerNotConnected e) {
executorResult =
new PeerTaskExecutorResult<>(
Optional.empty(), PeerTaskExecutorResponseCode.PEER_DISCONNECTED);
Optional.empty(),
PeerTaskExecutorResponseCode.PEER_DISCONNECTED,
Optional.of(peer));

} catch (InterruptedException | TimeoutException e) {
peer.recordRequestTimeout(requestMessageData.getCode());
timeoutCounter.labels(taskClassName).inc();
executorResult =
new PeerTaskExecutorResult<>(Optional.empty(), PeerTaskExecutorResponseCode.TIMEOUT);
new PeerTaskExecutorResult<>(
Optional.empty(), PeerTaskExecutorResponseCode.TIMEOUT, Optional.of(peer));

} catch (InvalidPeerTaskResponseException e) {
peer.recordUselessResponse(e.getMessage());
invalidResponseCounter.labels(taskClassName).inc();
LOG.debug(
"Invalid response found for {} from peer {}", taskClassName, peer.getLoggableId(), e);
executorResult =
new PeerTaskExecutorResult<>(
Optional.empty(), PeerTaskExecutorResponseCode.INVALID_RESPONSE);
Optional.empty(), PeerTaskExecutorResponseCode.INVALID_RESPONSE, Optional.of(peer));

} catch (ExecutionException e) {
} catch (Exception e) {
internalExceptionCounter.labels(taskClassName).inc();
LOG.error("Server error found for {} from peer {}", taskClassName, peer.getLoggableId(), e);
executorResult =
new PeerTaskExecutorResult<>(
Optional.empty(), PeerTaskExecutorResponseCode.INTERNAL_SERVER_ERROR);
Optional.empty(),
PeerTaskExecutorResponseCode.INTERNAL_SERVER_ERROR,
Optional.of(peer));
}
} while (retriesRemaining-- > 0
&& executorResult.responseCode() != PeerTaskExecutorResponseCode.SUCCESS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;

import org.hyperledger.besu.ethereum.eth.manager.EthPeer;

import java.util.Optional;

public record PeerTaskExecutorResult<T>(
Optional<T> result, PeerTaskExecutorResponseCode responseCode) {}
Optional<T> result, PeerTaskExecutorResponseCode responseCode, Optional<EthPeer> ethPeer) {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright contributors to Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;

import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage;

import java.util.Optional;

public enum PeerTaskValidationResponse {
NO_RESULTS_RETURNED(null),
TOO_MANY_RESULTS_RETURNED(null),
RESULTS_DO_NOT_MATCH_QUERY(null),
NON_SEQUENTIAL_HEADERS_RETURNED(
DisconnectMessage.DisconnectReason.BREACH_OF_PROTOCOL_NON_SEQUENTIAL_HEADERS),
RESULTS_VALID_AND_GOOD(null);
private final Optional<DisconnectMessage.DisconnectReason> disconnectReason;

PeerTaskValidationResponse(final DisconnectMessage.DisconnectReason disconnectReason) {
this.disconnectReason = Optional.ofNullable(disconnectReason);
}

public Optional<DisconnectMessage.DisconnectReason> getDisconnectReason() {
return disconnectReason;
}
}
Loading

0 comments on commit 657efff

Please sign in to comment.