Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Block publishing performance #8181

Merged
merged 6 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import tech.pegasys.teku.beacon.sync.events.SyncState;
import tech.pegasys.teku.beacon.sync.events.SyncStateProvider;
import tech.pegasys.teku.beacon.sync.events.SyncStateTracker;
import tech.pegasys.teku.ethereum.performance.trackers.BlockProductionPerformanceFactory;
import tech.pegasys.teku.ethereum.performance.trackers.BlockProductionAndPublishingPerformanceFactory;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.metrics.StubMetricsSystem;
import tech.pegasys.teku.infrastructure.metrics.Validator.ValidatorDutyMetricUtils;
Expand Down Expand Up @@ -121,7 +121,8 @@ public class ValidatorApiHandlerIntegrationTest {
syncCommitteeMessagePool,
syncCommitteeContributionPool,
syncCommitteeSubscriptionManager,
new BlockProductionPerformanceFactory(new SystemTimeProvider(), true, 0));
new BlockProductionAndPublishingPerformanceFactory(
new SystemTimeProvider(), __ -> UInt64.ZERO, true, 0, 0));

@BeforeEach
public void setup() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.bls.BLSSignature;
import tech.pegasys.teku.ethereum.performance.trackers.BlockProductionPerformance;
import tech.pegasys.teku.ethereum.performance.trackers.BlockPublishingPerformance;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
Expand All @@ -37,7 +38,9 @@ SafeFuture<BlockContainerAndMetaData> createUnsignedBlock(
Optional<UInt64> requestedBuilderBoostFactor,
BlockProductionPerformance blockProductionPerformance);

SafeFuture<SignedBeaconBlock> unblindSignedBlockIfBlinded(SignedBeaconBlock maybeBlindedBlock);
SafeFuture<SignedBeaconBlock> unblindSignedBlockIfBlinded(
SignedBeaconBlock maybeBlindedBlock, BlockPublishingPerformance blockPublishingPerformance);

List<BlobSidecar> createBlobSidecars(SignedBlockContainer blockContainer);
List<BlobSidecar> createBlobSidecars(
SignedBlockContainer blockContainer, BlockPublishingPerformance blockPublishingPerformance);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.bls.BLSSignature;
import tech.pegasys.teku.ethereum.performance.trackers.BlockProductionPerformance;
import tech.pegasys.teku.ethereum.performance.trackers.BlockPublishingPerformance;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.Spec;
Expand Down Expand Up @@ -70,8 +71,12 @@ public SafeFuture<BlockContainerAndMetaData> createUnsignedBlock(
}

@Override
public List<BlobSidecar> createBlobSidecars(final SignedBlockContainer blockContainer) {
return operationSelector.createBlobSidecarsSelector().apply(blockContainer);
public List<BlobSidecar> createBlobSidecars(
final SignedBlockContainer blockContainer,
final BlockPublishingPerformance blockPublishingPerformance) {
return operationSelector
.createBlobSidecarsSelector(blockPublishingPerformance)
.apply(blockContainer);
}

private BlockContents createBlockContents(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.bls.BLSSignature;
import tech.pegasys.teku.ethereum.performance.trackers.BlockProductionPerformance;
import tech.pegasys.teku.ethereum.performance.trackers.BlockPublishingPerformance;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.Spec;
Expand Down Expand Up @@ -94,16 +95,20 @@ private BlockContainerAndMetaData beaconBlockAndStateToBlockContainerAndMetaData

@Override
public SafeFuture<SignedBeaconBlock> unblindSignedBlockIfBlinded(
final SignedBeaconBlock maybeBlindedBlock) {
final SignedBeaconBlock maybeBlindedBlock,
final BlockPublishingPerformance blockPublishingPerformance) {
if (maybeBlindedBlock.isBlinded()) {
return spec.unblindSignedBeaconBlock(
maybeBlindedBlock.getSignedBlock(), operationSelector.createBlockUnblinderSelector());
maybeBlindedBlock.getSignedBlock(),
operationSelector.createBlockUnblinderSelector(blockPublishingPerformance));
}
return SafeFuture.completedFuture(maybeBlindedBlock);
}

@Override
public List<BlobSidecar> createBlobSidecars(final SignedBlockContainer blockContainer) {
public List<BlobSidecar> createBlobSidecars(
final SignedBlockContainer blockContainer,
final BlockPublishingPerformance blockPublishingPerformance) {
return Collections.emptyList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.bls.BLSSignature;
import tech.pegasys.teku.ethereum.performance.trackers.BlockProductionPerformance;
import tech.pegasys.teku.ethereum.performance.trackers.BlockPublishingPerformance;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.ssz.SszList;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
Expand Down Expand Up @@ -409,7 +410,8 @@ private SafeFuture<Void> builderSetKzgCommitments(
.thenAccept(bodyBuilder::blobKzgCommitments);
}

public Consumer<SignedBeaconBlockUnblinder> createBlockUnblinderSelector() {
public Consumer<SignedBeaconBlockUnblinder> createBlockUnblinderSelector(
final BlockPublishingPerformance blockPublishingPerformance) {
return bodyUnblinder -> {
final SignedBeaconBlock signedBlindedBlock = bodyUnblinder.getSignedBlindedBeaconBlock();

Expand All @@ -432,7 +434,7 @@ public Consumer<SignedBeaconBlockUnblinder> createBlockUnblinderSelector() {
bodyUnblinder.setExecutionPayloadSupplier(
() ->
executionLayerBlockProductionManager
.getUnblindedPayload(signedBlindedBlock)
.getUnblindedPayload(signedBlindedBlock, blockPublishingPerformance)
.thenApply(BuilderPayload::getExecutionPayload));
}
};
Expand All @@ -442,7 +444,8 @@ public Function<BeaconBlock, SafeFuture<BlobsBundle>> createBlobsBundleSelector(
return block -> getCachedExecutionBlobsBundle(block.getSlot());
}

public Function<SignedBlockContainer, List<BlobSidecar>> createBlobSidecarsSelector() {
public Function<SignedBlockContainer, List<BlobSidecar>> createBlobSidecarsSelector(
final BlockPublishingPerformance blockPublishingPerformance) {
return blockContainer -> {
final UInt64 slot = blockContainer.getSlot();
final SignedBeaconBlock block = blockContainer.getSignedBlock();
Expand Down Expand Up @@ -479,12 +482,17 @@ public Function<SignedBlockContainer, List<BlobSidecar>> createBlobSidecarsSelec
proofs = blockContainer.getKzgProofs().orElseThrow();
}

return IntStream.range(0, blobs.size())
.mapToObj(
index ->
miscHelpersDeneb.constructBlobSidecar(
block, UInt64.valueOf(index), blobs.get(index), proofs.get(index)))
.toList();
final List<BlobSidecar> blobSidecars =
IntStream.range(0, blobs.size())
.mapToObj(
index ->
miscHelpersDeneb.constructBlobSidecar(
block, UInt64.valueOf(index), blobs.get(index), proofs.get(index)))
.toList();

blockPublishingPerformance.blobSidecarsPrepared();

return blobSidecars;
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.bls.BLSSignature;
import tech.pegasys.teku.ethereum.performance.trackers.BlockProductionPerformance;
import tech.pegasys.teku.ethereum.performance.trackers.BlockPublishingPerformance;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.Spec;
Expand Down Expand Up @@ -84,15 +85,22 @@ public SafeFuture<BlockContainerAndMetaData> createUnsignedBlock(

@Override
public SafeFuture<SignedBeaconBlock> unblindSignedBlockIfBlinded(
final SignedBeaconBlock maybeBlindedBlock) {
final SignedBeaconBlock maybeBlindedBlock,
final BlockPublishingPerformance blockPublishingPerformance) {
final SpecMilestone milestone = getMilestone(maybeBlindedBlock.getSlot());
return registeredFactories.get(milestone).unblindSignedBlockIfBlinded(maybeBlindedBlock);
return registeredFactories
.get(milestone)
.unblindSignedBlockIfBlinded(maybeBlindedBlock, blockPublishingPerformance);
}

@Override
public List<BlobSidecar> createBlobSidecars(final SignedBlockContainer blockContainer) {
public List<BlobSidecar> createBlobSidecars(
final SignedBlockContainer blockContainer,
BlockPublishingPerformance blockPublishingPerformance) {
final SpecMilestone milestone = getMilestone(blockContainer.getSlot());
return registeredFactories.get(milestone).createBlobSidecars(blockContainer);
return registeredFactories
.get(milestone)
.createBlobSidecars(blockContainer, blockPublishingPerformance);
}

private SpecMilestone getMilestone(final UInt64 slot) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import static tech.pegasys.teku.infrastructure.metrics.Validator.DutyType.ATTESTATION_PRODUCTION;
import static tech.pegasys.teku.infrastructure.metrics.Validator.ValidatorDutyMetricUtils.startTimer;
import static tech.pegasys.teku.infrastructure.metrics.Validator.ValidatorDutyMetricsSteps.CREATE;
import static tech.pegasys.teku.infrastructure.time.TimeUtilities.secondsToMillis;
import static tech.pegasys.teku.spec.config.SpecConfig.GENESIS_SLOT;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -54,8 +53,9 @@
import tech.pegasys.teku.ethereum.json.types.validator.SyncCommitteeDuties;
import tech.pegasys.teku.ethereum.json.types.validator.SyncCommitteeDuty;
import tech.pegasys.teku.ethereum.json.types.validator.SyncCommitteeSelectionProof;
import tech.pegasys.teku.ethereum.performance.trackers.BlockProductionAndPublishingPerformanceFactory;
import tech.pegasys.teku.ethereum.performance.trackers.BlockProductionPerformance;
import tech.pegasys.teku.ethereum.performance.trackers.BlockProductionPerformanceFactory;
import tech.pegasys.teku.ethereum.performance.trackers.BlockPublishingPerformance;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.ssz.SszList;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
Expand Down Expand Up @@ -116,7 +116,8 @@ public class ValidatorApiHandler implements ValidatorApiChannel {
*/
private static final int DUTY_EPOCH_TOLERANCE = 1;

private final BlockProductionPerformanceFactory blockProductionPerformanceFactory;
private final BlockProductionAndPublishingPerformanceFactory
blockProductionAndPublishingPerformanceFactory;
private final ChainDataProvider chainDataProvider;
private final NodeDataProvider nodeDataProvider;
private final CombinedChainDataClient combinedChainDataClient;
Expand Down Expand Up @@ -160,8 +161,10 @@ public ValidatorApiHandler(
final SyncCommitteeMessagePool syncCommitteeMessagePool,
final SyncCommitteeContributionPool syncCommitteeContributionPool,
final SyncCommitteeSubscriptionManager syncCommitteeSubscriptionManager,
final BlockProductionPerformanceFactory blockProductionPerformanceFactory) {
this.blockProductionPerformanceFactory = blockProductionPerformanceFactory;
final BlockProductionAndPublishingPerformanceFactory
blockProductionAndPublishingPerformanceFactory) {
this.blockProductionAndPublishingPerformanceFactory =
blockProductionAndPublishingPerformanceFactory;
this.chainDataProvider = chainDataProvider;
this.nodeDataProvider = nodeDataProvider;
this.combinedChainDataClient = combinedChainDataClient;
Expand Down Expand Up @@ -323,21 +326,14 @@ public SafeFuture<Optional<BlockContainerAndMetaData>> createUnsignedBlock(
return NodeSyncingException.failedFuture();
}
final BlockProductionPerformance blockProductionPerformance =
blockProductionPerformanceFactory.create(slot);
blockProductionAndPublishingPerformanceFactory.createForProduction(slot);
return forkChoiceTrigger
.prepareForBlockProduction(slot, blockProductionPerformance)
.thenCompose(
__ ->
combinedChainDataClient.getStateForBlockProduction(
slot, forkChoiceTrigger.isForkChoiceOverrideLateBlockEnabled()))
.thenPeek(
maybeState -> {
maybeState.ifPresent(
state ->
blockProductionPerformance.slotTime(
() -> secondsToMillis(spec.computeTimeAtSlot(state, slot))));
blockProductionPerformance.getStateAtSlot();
})
.thenPeek(__ -> blockProductionPerformance.getStateAtSlot())
.thenCompose(
blockSlotState ->
createBlock(
Expand Down Expand Up @@ -630,13 +626,18 @@ private SafeFuture<InternalValidationResult> processAggregateAndProof(
public SafeFuture<SendSignedBlockResult> sendSignedBlock(
final SignedBlockContainer maybeBlindedBlockContainer,
final BroadcastValidationLevel broadcastValidationLevel) {
final BlockPublishingPerformance blockPublishingPerformance =
blockProductionAndPublishingPerformanceFactory.createForPublishing(
maybeBlindedBlockContainer.getSlot());
return blockPublisher
.sendSignedBlock(maybeBlindedBlockContainer, broadcastValidationLevel)
.sendSignedBlock(
maybeBlindedBlockContainer, broadcastValidationLevel, blockPublishingPerformance)
.exceptionally(
ex -> {
final String reason = getRootCauseMessage(ex);
return SendSignedBlockResult.rejected(reason);
});
})
.alwaysRun(blockPublishingPerformance::complete);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import tech.pegasys.teku.ethereum.performance.trackers.BlockPublishingPerformance;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
Expand Down Expand Up @@ -55,18 +56,19 @@ public AbstractBlockPublisher(
@Override
public SafeFuture<SendSignedBlockResult> sendSignedBlock(
final SignedBlockContainer blockContainer,
final BroadcastValidationLevel broadcastValidationLevel) {
final BroadcastValidationLevel broadcastValidationLevel,
final BlockPublishingPerformance blockPublishingPerformance) {
return blockFactory
.unblindSignedBlockIfBlinded(blockContainer.getSignedBlock())
.unblindSignedBlockIfBlinded(blockContainer.getSignedBlock(), blockPublishingPerformance)
.thenPeek(performanceTracker::saveProducedBlock)
.thenCompose(
signedBlock -> {
// creating blob sidecars after unblinding the block to ensure in the blinded flow we
// will have the cached builder payload
final List<BlobSidecar> blobSidecars =
blockFactory.createBlobSidecars(blockContainer);
blockFactory.createBlobSidecars(blockContainer, blockPublishingPerformance);
return gossipAndImportUnblindedSignedBlockAndBlobSidecars(
signedBlock, blobSidecars, broadcastValidationLevel);
signedBlock, blobSidecars, broadcastValidationLevel, blockPublishingPerformance);
})
.thenCompose(result -> calculateResult(blockContainer, result));
}
Expand All @@ -75,28 +77,31 @@ public SafeFuture<SendSignedBlockResult> sendSignedBlock(
gossipAndImportUnblindedSignedBlockAndBlobSidecars(
final SignedBeaconBlock block,
final List<BlobSidecar> blobSidecars,
final BroadcastValidationLevel broadcastValidationLevel) {
final BroadcastValidationLevel broadcastValidationLevel,
final BlockPublishingPerformance blockPublishingPerformance) {

if (broadcastValidationLevel == BroadcastValidationLevel.NOT_REQUIRED) {
// when broadcast validation is disabled, we can publish the block (and blob sidecars)
// immediately and then import
publishBlockAndBlobSidecars(block, blobSidecars);
return importBlockAndBlobSidecars(block, blobSidecars, broadcastValidationLevel);
publishBlockAndBlobSidecars(block, blobSidecars, blockPublishingPerformance);
return importBlockAndBlobSidecars(
block, blobSidecars, broadcastValidationLevel, blockPublishingPerformance);
}

// when broadcast validation is enabled, we need to wait for the validation to complete before
// publishing the block (and blob sidecars)

final SafeFuture<BlockImportAndBroadcastValidationResults>
blockImportAndBroadcastValidationResults =
importBlockAndBlobSidecars(block, blobSidecars, broadcastValidationLevel);
importBlockAndBlobSidecars(
block, blobSidecars, broadcastValidationLevel, blockPublishingPerformance);

blockImportAndBroadcastValidationResults
.thenCompose(BlockImportAndBroadcastValidationResults::broadcastValidationResult)
.thenAccept(
broadcastValidationResult -> {
if (broadcastValidationResult == BroadcastValidationResult.SUCCESS) {
publishBlockAndBlobSidecars(block, blobSidecars);
publishBlockAndBlobSidecars(block, blobSidecars, blockPublishingPerformance);
LOG.debug("Block (and blob sidecars) publishing initiated");
} else {
LOG.warn(
Expand All @@ -118,10 +123,13 @@ public SafeFuture<SendSignedBlockResult> sendSignedBlock(
abstract SafeFuture<BlockImportAndBroadcastValidationResults> importBlockAndBlobSidecars(
SignedBeaconBlock block,
List<BlobSidecar> blobSidecars,
BroadcastValidationLevel broadcastValidationLevel);
BroadcastValidationLevel broadcastValidationLevel,
BlockPublishingPerformance blockPublishingPerformance);

abstract void publishBlockAndBlobSidecars(
SignedBeaconBlock block, List<BlobSidecar> blobSidecars);
SignedBeaconBlock block,
List<BlobSidecar> blobSidecars,
BlockPublishingPerformance blockPublishingPerformance);

private SafeFuture<SendSignedBlockResult> calculateResult(
final SignedBlockContainer maybeBlindedBlockContainer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package tech.pegasys.teku.validator.coordinator.publisher;

import tech.pegasys.teku.ethereum.performance.trackers.BlockPublishingPerformance;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockContainer;
import tech.pegasys.teku.spec.datastructures.validator.BroadcastValidationLevel;
Expand All @@ -21,5 +22,7 @@
/** Used to publish blocks (unblinded and blinded) and blob sidecars */
public interface BlockPublisher {
SafeFuture<SendSignedBlockResult> sendSignedBlock(
SignedBlockContainer blockContainer, BroadcastValidationLevel broadcastValidationLevel);
SignedBlockContainer blockContainer,
BroadcastValidationLevel broadcastValidationLevel,
BlockPublishingPerformance blockPublishingPerformance);
}
Loading