From 4a02d5686ace372a91fd20c5a4df27440463c353 Mon Sep 17 00:00:00 2001 From: pk910 Date: Sat, 8 Feb 2025 00:27:03 +0100 Subject: [PATCH 01/18] commit --- clients/consensus/chainspec.go | 77 +++---- db/consolidation_requests.go | 12 +- .../pgsql/20250207221132_request-results.sql | 11 + .../sqlite/20250207221132_request-results.sql | 11 + dbtypes/dbtypes.go | 25 +++ indexer/beacon/block_helper.go | 24 +++ indexer/beacon/epochstate.go | 18 +- indexer/beacon/epochstats.go | 18 ++ indexer/beacon/writedb_state.go | 191 ++++++++++++++++++ 9 files changed, 340 insertions(+), 47 deletions(-) create mode 100644 db/schema/pgsql/20250207221132_request-results.sql create mode 100644 db/schema/sqlite/20250207221132_request-results.sql create mode 100644 indexer/beacon/writedb_state.go diff --git a/clients/consensus/chainspec.go b/clients/consensus/chainspec.go index 3bb2b347..e40badec 100644 --- a/clients/consensus/chainspec.go +++ b/clients/consensus/chainspec.go @@ -20,44 +20,45 @@ type ForkVersion struct { // https://github.com/ethereum/consensus-specs/blob/dev/configs/mainnet.yaml type ChainSpec struct { - PresetBase string `yaml:"PRESET_BASE"` - ConfigName string `yaml:"CONFIG_NAME" check-if:"false"` - MinGenesisTime time.Time `yaml:"MIN_GENESIS_TIME"` - GenesisForkVersion phase0.Version `yaml:"GENESIS_FORK_VERSION"` - AltairForkVersion phase0.Version `yaml:"ALTAIR_FORK_VERSION"` - AltairForkEpoch *uint64 `yaml:"ALTAIR_FORK_EPOCH"` - BellatrixForkVersion phase0.Version `yaml:"BELLATRIX_FORK_VERSION"` - BellatrixForkEpoch *uint64 `yaml:"BELLATRIX_FORK_EPOCH"` - CapellaForkVersion phase0.Version `yaml:"CAPELLA_FORK_VERSION"` - CapellaForkEpoch *uint64 `yaml:"CAPELLA_FORK_EPOCH"` - DenebForkVersion phase0.Version `yaml:"DENEB_FORK_VERSION"` - DenebForkEpoch *uint64 `yaml:"DENEB_FORK_EPOCH"` - ElectraForkVersion phase0.Version `yaml:"ELECTRA_FORK_VERSION" check-if-fork:"ElectraForkEpoch"` - ElectraForkEpoch *uint64 `yaml:"ELECTRA_FORK_EPOCH"` - Eip7594ForkVersion phase0.Version `yaml:"EIP7594_FORK_VERSION" check-if-fork:"Eip7594ForkEpoch"` - Eip7594ForkEpoch *uint64 `yaml:"EIP7594_FORK_EPOCH"` - SecondsPerSlot time.Duration `yaml:"SECONDS_PER_SLOT"` - SlotsPerEpoch uint64 `yaml:"SLOTS_PER_EPOCH"` - EpochsPerHistoricalVector uint64 `yaml:"EPOCHS_PER_HISTORICAL_VECTOR"` - EpochsPerSlashingVector uint64 `yaml:"EPOCHS_PER_SLASHINGS_VECTOR"` - EpochsPerSyncCommitteePeriod uint64 `yaml:"EPOCHS_PER_SYNC_COMMITTEE_PERIOD"` - MinSeedLookahead uint64 `yaml:"MIN_SEED_LOOKAHEAD"` - ShuffleRoundCount uint64 `yaml:"SHUFFLE_ROUND_COUNT"` - MaxEffectiveBalance uint64 `yaml:"MAX_EFFECTIVE_BALANCE"` - MaxEffectiveBalanceElectra uint64 `yaml:"MAX_EFFECTIVE_BALANCE_ELECTRA" check-if-fork:"ElectraForkEpoch"` - TargetCommitteeSize uint64 `yaml:"TARGET_COMMITTEE_SIZE"` - MaxCommitteesPerSlot uint64 `yaml:"MAX_COMMITTEES_PER_SLOT"` - MinPerEpochChurnLimit uint64 `yaml:"MIN_PER_EPOCH_CHURN_LIMIT"` - ChurnLimitQuotient uint64 `yaml:"CHURN_LIMIT_QUOTIENT"` - DomainBeaconProposer phase0.DomainType `yaml:"DOMAIN_BEACON_PROPOSER"` - DomainBeaconAttester phase0.DomainType `yaml:"DOMAIN_BEACON_ATTESTER"` - DomainSyncCommittee phase0.DomainType `yaml:"DOMAIN_SYNC_COMMITTEE"` - SyncCommitteeSize uint64 `yaml:"SYNC_COMMITTEE_SIZE"` - DepositContractAddress []byte `yaml:"DEPOSIT_CONTRACT_ADDRESS"` - MaxConsolidationRequestsPerPayload uint64 `yaml:"MAX_CONSOLIDATION_REQUESTS_PER_PAYLOAD" check-if-fork:"ElectraForkEpoch"` - MaxWithdrawalRequestsPerPayload uint64 `yaml:"MAX_WITHDRAWAL_REQUESTS_PER_PAYLOAD" check-if-fork:"ElectraForkEpoch"` - DepositChainId uint64 `yaml:"DEPOSIT_CHAIN_ID"` - MinActivationBalance uint64 `yaml:"MIN_ACTIVATION_BALANCE"` + PresetBase string `yaml:"PRESET_BASE"` + ConfigName string `yaml:"CONFIG_NAME" check-if:"false"` + MinGenesisTime time.Time `yaml:"MIN_GENESIS_TIME"` + GenesisForkVersion phase0.Version `yaml:"GENESIS_FORK_VERSION"` + AltairForkVersion phase0.Version `yaml:"ALTAIR_FORK_VERSION"` + AltairForkEpoch *uint64 `yaml:"ALTAIR_FORK_EPOCH"` + BellatrixForkVersion phase0.Version `yaml:"BELLATRIX_FORK_VERSION"` + BellatrixForkEpoch *uint64 `yaml:"BELLATRIX_FORK_EPOCH"` + CapellaForkVersion phase0.Version `yaml:"CAPELLA_FORK_VERSION"` + CapellaForkEpoch *uint64 `yaml:"CAPELLA_FORK_EPOCH"` + DenebForkVersion phase0.Version `yaml:"DENEB_FORK_VERSION"` + DenebForkEpoch *uint64 `yaml:"DENEB_FORK_EPOCH"` + ElectraForkVersion phase0.Version `yaml:"ELECTRA_FORK_VERSION" check-if-fork:"ElectraForkEpoch"` + ElectraForkEpoch *uint64 `yaml:"ELECTRA_FORK_EPOCH"` + Eip7594ForkVersion phase0.Version `yaml:"EIP7594_FORK_VERSION" check-if-fork:"Eip7594ForkEpoch"` + Eip7594ForkEpoch *uint64 `yaml:"EIP7594_FORK_EPOCH"` + SecondsPerSlot time.Duration `yaml:"SECONDS_PER_SLOT"` + SlotsPerEpoch uint64 `yaml:"SLOTS_PER_EPOCH"` + EpochsPerHistoricalVector uint64 `yaml:"EPOCHS_PER_HISTORICAL_VECTOR"` + EpochsPerSlashingVector uint64 `yaml:"EPOCHS_PER_SLASHINGS_VECTOR"` + EpochsPerSyncCommitteePeriod uint64 `yaml:"EPOCHS_PER_SYNC_COMMITTEE_PERIOD"` + MinSeedLookahead uint64 `yaml:"MIN_SEED_LOOKAHEAD"` + ShuffleRoundCount uint64 `yaml:"SHUFFLE_ROUND_COUNT"` + MaxEffectiveBalance uint64 `yaml:"MAX_EFFECTIVE_BALANCE"` + MaxEffectiveBalanceElectra uint64 `yaml:"MAX_EFFECTIVE_BALANCE_ELECTRA" check-if-fork:"ElectraForkEpoch"` + TargetCommitteeSize uint64 `yaml:"TARGET_COMMITTEE_SIZE"` + MaxCommitteesPerSlot uint64 `yaml:"MAX_COMMITTEES_PER_SLOT"` + MinPerEpochChurnLimit uint64 `yaml:"MIN_PER_EPOCH_CHURN_LIMIT"` + ChurnLimitQuotient uint64 `yaml:"CHURN_LIMIT_QUOTIENT"` + DomainBeaconProposer phase0.DomainType `yaml:"DOMAIN_BEACON_PROPOSER"` + DomainBeaconAttester phase0.DomainType `yaml:"DOMAIN_BEACON_ATTESTER"` + DomainSyncCommittee phase0.DomainType `yaml:"DOMAIN_SYNC_COMMITTEE"` + SyncCommitteeSize uint64 `yaml:"SYNC_COMMITTEE_SIZE"` + DepositContractAddress []byte `yaml:"DEPOSIT_CONTRACT_ADDRESS"` + MaxConsolidationRequestsPerPayload uint64 `yaml:"MAX_CONSOLIDATION_REQUESTS_PER_PAYLOAD" check-if-fork:"ElectraForkEpoch"` + MaxWithdrawalRequestsPerPayload uint64 `yaml:"MAX_WITHDRAWAL_REQUESTS_PER_PAYLOAD" check-if-fork:"ElectraForkEpoch"` + DepositChainId uint64 `yaml:"DEPOSIT_CHAIN_ID"` + MinActivationBalance uint64 `yaml:"MIN_ACTIVATION_BALANCE"` + MaxPendingPartialsPerWithdrawalsSweep uint64 `yaml:"MAX_PENDING_PARTIALS_PER_WITHDRAWALS_SWEEP"` // EIP7594: PeerDAS NumberOfColumns *uint64 `yaml:"NUMBER_OF_COLUMNS" check-if-fork:"Eip7594ForkEpoch"` diff --git a/db/consolidation_requests.go b/db/consolidation_requests.go index 9c054d86..15ab3257 100644 --- a/db/consolidation_requests.go +++ b/db/consolidation_requests.go @@ -15,11 +15,11 @@ func InsertConsolidationRequests(consolidations []*dbtypes.ConsolidationRequest, dbtypes.DBEnginePgsql: "INSERT INTO consolidation_requests ", dbtypes.DBEngineSqlite: "INSERT OR REPLACE INTO consolidation_requests ", }), - "(slot_number, slot_root, slot_index, orphaned, fork_id, source_address, source_index, source_pubkey, target_index, target_pubkey, tx_hash, block_number)", + "(slot_number, slot_root, slot_index, orphaned, fork_id, source_address, source_index, source_pubkey, target_index, target_pubkey, tx_hash, block_number, result)", " VALUES ", ) argIdx := 0 - fieldCount := 12 + fieldCount := 13 args := make([]interface{}, len(consolidations)*fieldCount) for i, consolidation := range consolidations { @@ -47,10 +47,11 @@ func InsertConsolidationRequests(consolidations []*dbtypes.ConsolidationRequest, args[argIdx+9] = consolidation.TargetPubkey[:] args[argIdx+10] = consolidation.TxHash[:] args[argIdx+11] = consolidation.BlockNumber + args[argIdx+12] = consolidation.Result argIdx += fieldCount } fmt.Fprint(&sql, EngineQuery(map[dbtypes.DBEngineType]string{ - dbtypes.DBEnginePgsql: " ON CONFLICT (slot_root, slot_index) DO UPDATE SET orphaned = excluded.orphaned, fork_id = excluded.fork_id", + dbtypes.DBEnginePgsql: " ON CONFLICT (slot_root, slot_index) DO UPDATE SET orphaned = excluded.orphaned, fork_id = excluded.fork_id, result = excluded.result", dbtypes.DBEngineSqlite: "", })) _, err := tx.Exec(sql.String(), args...) @@ -66,7 +67,7 @@ func GetConsolidationRequestsFiltered(offset uint64, limit uint32, canonicalFork fmt.Fprint(&sql, ` WITH cte AS ( SELECT - slot_number, slot_root, slot_index, orphaned, fork_id, source_address, source_index, source_pubkey, target_index, target_pubkey, tx_hash, block_number + slot_number, slot_root, slot_index, orphaned, fork_id, source_address, source_index, source_pubkey, target_index, target_pubkey, tx_hash, block_number, result FROM consolidation_requests `) @@ -173,7 +174,8 @@ func GetConsolidationRequestsFiltered(offset uint64, limit uint32, canonicalFork 0 AS target_index, null AS target_pubkey, null AS tx_hash, - 0 AS block_number + 0 AS block_number, + 0 AS result FROM cte UNION ALL SELECT * FROM ( SELECT * FROM cte diff --git a/db/schema/pgsql/20250207221132_request-results.sql b/db/schema/pgsql/20250207221132_request-results.sql new file mode 100644 index 00000000..2f2cec85 --- /dev/null +++ b/db/schema/pgsql/20250207221132_request-results.sql @@ -0,0 +1,11 @@ +-- +goose Up +-- +goose StatementBegin + +ALTER TABLE public."consolidation_requests" + ADD "result" TINYINT NOT NULL DEFAULT 0; + +-- +goose StatementEnd +-- +goose Down +-- +goose StatementBegin +SELECT 'NOT SUPPORTED'; +-- +goose StatementEnd diff --git a/db/schema/sqlite/20250207221132_request-results.sql b/db/schema/sqlite/20250207221132_request-results.sql new file mode 100644 index 00000000..2b5f8d22 --- /dev/null +++ b/db/schema/sqlite/20250207221132_request-results.sql @@ -0,0 +1,11 @@ +-- +goose Up +-- +goose StatementBegin + +ALTER TABLE "consolidation_requests" + ADD "result" TINYINT NOT NULL DEFAULT 0; + +-- +goose StatementEnd +-- +goose Down +-- +goose StatementBegin +SELECT 'NOT SUPPORTED'; +-- +goose StatementEnd diff --git a/dbtypes/dbtypes.go b/dbtypes/dbtypes.go index 49c78504..b2bb2205 100644 --- a/dbtypes/dbtypes.go +++ b/dbtypes/dbtypes.go @@ -251,6 +251,30 @@ type Slashing struct { ForkId uint64 `db:"fork_id"` } +const ( + ConsolidationRequestResultUnknown uint8 = 0 + ConsolidationRequestResultSuccess uint8 = 1 + + // global errors + ConsolidationRequestResultTotalBalanceTooLow uint8 = 10 + ConsolidationRequestResultQueueFull uint8 = 11 + + // source validator errors + ConsolidationRequestResultSrcNotFound uint8 = 20 + ConsolidationRequestResultSrcInvalidCredentials uint8 = 21 + ConsolidationRequestResultSrcInvalidSender uint8 = 22 + ConsolidationRequestResultSrcNotActive uint8 = 23 + ConsolidationRequestResultSrcNotOldEnough uint8 = 24 + ConsolidationRequestResultSrcHasPendingWithdrawal uint8 = 25 + + // target validator errors + ConsolidationRequestResultTgtNotFound uint8 = 30 + ConsolidationRequestResultTgtInvalidCredentials uint8 = 31 + ConsolidationRequestResultTgtInvalidSender uint8 = 32 + ConsolidationRequestResultTgtNotCompounding uint8 = 33 + ConsolidationRequestResultTgtNotActive uint8 = 34 +) + type ConsolidationRequest struct { SlotNumber uint64 `db:"slot_number"` SlotRoot []byte `db:"slot_root"` @@ -264,6 +288,7 @@ type ConsolidationRequest struct { TargetPubkey []byte `db:"target_pubkey"` TxHash []byte `db:"tx_hash"` BlockNumber uint64 `db:"block_number"` + Result uint8 `db:"result"` } type ConsolidationRequestTx struct { diff --git a/indexer/beacon/block_helper.go b/indexer/beacon/block_helper.go index 843a1100..7fd92544 100644 --- a/indexer/beacon/block_helper.go +++ b/indexer/beacon/block_helper.go @@ -325,3 +325,27 @@ func getStateCurrentSyncCommittee(v *spec.VersionedBeaconState) ([]phase0.BLSPub return nil, errors.New("unknown version") } } + +// getStatePendingWithdrawals returns the current sync committee from a versioned beacon state. +func getStatePendingWithdrawals(v *spec.VersionedBeaconState) ([]*electra.PendingPartialWithdrawal, error) { + switch v.Version { + case spec.DataVersionPhase0: + return nil, errors.New("no pending withdrawals in phase0") + case spec.DataVersionAltair: + return nil, errors.New("no pending withdrawals in altair") + case spec.DataVersionBellatrix: + return nil, errors.New("no pending withdrawals in bellatrix") + case spec.DataVersionCapella: + return nil, errors.New("no pending withdrawals in capella") + case spec.DataVersionDeneb: + return nil, errors.New("no pending withdrawals in deneb") + case spec.DataVersionElectra: + if v.Electra == nil || v.Electra.PendingPartialWithdrawals == nil { + return nil, errors.New("no electra block") + } + + return v.Electra.PendingPartialWithdrawals, nil + default: + return nil, errors.New("unknown version") + } +} diff --git a/indexer/beacon/epochstate.go b/indexer/beacon/epochstate.go index fde0d5fe..902927c0 100644 --- a/indexer/beacon/epochstate.go +++ b/indexer/beacon/epochstate.go @@ -7,6 +7,7 @@ import ( "time" "github.com/attestantio/go-eth2-client/spec" + "github.com/attestantio/go-eth2-client/spec/electra" "github.com/attestantio/go-eth2-client/spec/phase0" ) @@ -22,10 +23,11 @@ type epochState struct { readyChan chan bool highPriority bool - validatorBalances []phase0.Gwei - randaoMixes []phase0.Root - depositIndex uint64 - syncCommittee []phase0.ValidatorIndex + validatorBalances []phase0.Gwei + randaoMixes []phase0.Root + depositIndex uint64 + syncCommittee []phase0.ValidatorIndex + pendingPartialWithdrawals []*electra.PendingPartialWithdrawal } // newEpochState creates a new epochState instance with the root of the state to be loaded. @@ -192,5 +194,13 @@ func (s *epochState) processState(state *spec.VersionedBeaconState, cache *epoch s.syncCommittee = []phase0.ValidatorIndex{} } + if state.Version >= spec.DataVersionElectra { + pendingPartialWithdrawals, err := getStatePendingWithdrawals(state) + if err != nil { + return fmt.Errorf("error getting pending withdrawal indices from state %v: %v", s.slotRoot.String(), err) + } + s.pendingPartialWithdrawals = pendingPartialWithdrawals + } + return nil } diff --git a/indexer/beacon/epochstats.go b/indexer/beacon/epochstats.go index 77f27189..ddf911c2 100644 --- a/indexer/beacon/epochstats.go +++ b/indexer/beacon/epochstats.go @@ -56,6 +56,7 @@ type EpochStatsValues struct { ActiveBalance phase0.Gwei EffectiveBalance phase0.Gwei FirstDepositIndex uint64 + PendingWithdrawals []EpochStatsPendingWithdrawals } // EpochStatsPacked holds the packed values for the epoch-specific information. @@ -68,6 +69,7 @@ type EpochStatsPacked struct { TotalBalance phase0.Gwei ActiveBalance phase0.Gwei FirstDepositIndex uint64 + PendingWithdrawals []EpochStatsPendingWithdrawals } // EpochStatsPackedValidator holds the packed values for an active validator. @@ -76,6 +78,11 @@ type EpochStatsPackedValidator struct { EffectiveBalanceEth uint16 // effective balance in full ETH } +type EpochStatsPendingWithdrawals struct { + ValidatorIndex phase0.ValidatorIndex + Epoch phase0.Epoch +} + // newEpochStats creates a new EpochStats instance. func newEpochStats(epoch phase0.Epoch, dependentRoot phase0.Root) *EpochStats { stats := &EpochStats{ @@ -167,6 +174,7 @@ func (es *EpochStats) buildPackedSSZ(dynSsz *dynssz.DynSsz) ([]byte, error) { TotalBalance: es.values.TotalBalance, ActiveBalance: es.values.ActiveBalance, FirstDepositIndex: es.values.FirstDepositIndex, + PendingWithdrawals: es.values.PendingWithdrawals, } lastValidatorIndex := phase0.ValidatorIndex(0) @@ -221,6 +229,7 @@ func (es *EpochStats) parsePackedSSZ(dynSsz *dynssz.DynSsz, chainState *consensu ActiveBalance: packedValues.ActiveBalance, EffectiveBalance: 0, FirstDepositIndex: packedValues.FirstDepositIndex, + PendingWithdrawals: packedValues.PendingWithdrawals, } lastValidatorIndex := phase0.ValidatorIndex(0) @@ -289,6 +298,7 @@ func (es *EpochStats) pruneValues() { ActiveBalance: es.values.ActiveBalance, EffectiveBalance: es.values.EffectiveBalance, FirstDepositIndex: es.values.FirstDepositIndex, + PendingWithdrawals: nil, // prune } es.values = nil @@ -342,6 +352,14 @@ func (es *EpochStats) processState(indexer *Indexer, validatorSet []*phase0.Vali ActiveBalance: 0, EffectiveBalance: 0, FirstDepositIndex: es.dependentState.depositIndex, + PendingWithdrawals: make([]EpochStatsPendingWithdrawals, len(es.dependentState.pendingPartialWithdrawals)), + } + + for i, pendingPartialWithdrawal := range es.dependentState.pendingPartialWithdrawals { + values.PendingWithdrawals[i] = EpochStatsPendingWithdrawals{ + ValidatorIndex: pendingPartialWithdrawal.ValidatorIndex, + Epoch: pendingPartialWithdrawal.WithdrawableEpoch, + } } if validatorSet == nil { diff --git a/indexer/beacon/writedb_state.go b/indexer/beacon/writedb_state.go new file mode 100644 index 00000000..aae13a7c --- /dev/null +++ b/indexer/beacon/writedb_state.go @@ -0,0 +1,191 @@ +package beacon + +import ( + "slices" + + "github.com/attestantio/go-eth2-client/spec/phase0" + "github.com/ethpandaops/dora/dbtypes" +) + +func (dbw *dbWriter) getParentBlocks(block *Block) []*Block { + chainState := dbw.indexer.consensusPool.GetChainState() + minSlot := chainState.EpochToSlot(chainState.EpochOfSlot(block.Slot)) + parentBlocks := []*Block{} + + for { + parentBlockRoot := block.GetParentRoot() + if parentBlockRoot == nil { + break + } + + parentBlock := dbw.indexer.GetBlockByRoot(*parentBlockRoot) + if parentBlock == nil { + break + } + + if parentBlock.Slot < minSlot { + break + } + + parentBlocks = append(parentBlocks, parentBlock) + } + + slices.Reverse(parentBlocks) + + return parentBlocks +} + +func (dbw *dbWriter) replayPendingWithdrawalsMap(epochStats *EpochStats, parentBlocks []*Block) map[phase0.ValidatorIndex]bool { + withdrawalsMap := map[phase0.ValidatorIndex]bool{} + + chainState := dbw.indexer.consensusPool.GetChainState() + chainSpec := chainState.GetSpecs() + if chainSpec.ElectraForkEpoch == nil || epochStats.epoch < phase0.Epoch(*chainSpec.ElectraForkEpoch) { + return withdrawalsMap + } + + var pendingWithdrawals []EpochStatsPendingWithdrawals + epochStatsValues := epochStats.GetValues(false) + if epochStatsValues == nil || epochStatsValues.PendingWithdrawals == nil { + pendingWithdrawals = epochStatsValues.PendingWithdrawals + } else { + pendingWithdrawals = []EpochStatsPendingWithdrawals{} + } + + maxDequeuePerSweep := chainSpec.MaxPendingPartialsPerWithdrawalsSweep + + for _, block := range parentBlocks { + // dequeue pending withdrawals + dequeued := uint64(0) + for _, pendingWithdrawal := range pendingWithdrawals { + if pendingWithdrawal.Epoch > epochStats.epoch || dequeued >= maxDequeuePerSweep { + break + } + dequeued++ + } + pendingWithdrawals = pendingWithdrawals[dequeued:] + + // get additional withdrawal requests from block + blockBody := block.GetBlock() + if blockBody == nil { + continue + } + + requests, err := blockBody.ExecutionRequests() + if err != nil { + continue + } + + for _, withdrawal := range requests.Withdrawals { + + } + } + + return nil +} + +type replayBlockConsolidationsState struct { + block *Block + pendingWithdrawals []EpochStatsPendingWithdrawals + additionalWithdrawals []phase0.ValidatorIndex + validatorMap map[phase0.ValidatorIndex]*phase0.Validator +} + +func (dbw *dbWriter) replayBlockConsolidations(epochStats *EpochStats, block *Block, prevState *replayBlockConsolidationsState) ([]uint8, *replayBlockConsolidationsState) { + parentBlocks := dbw.getParentBlocks(block) + if prevState == nil || prevState.block.Slot < block.Slot { + epochStatsValues := epochStats.GetValues(false) + if epochStatsValues == nil { + return nil, nil + } + + pendingWithdrawals := epochStatsValues.PendingWithdrawals + if pendingWithdrawals == nil { + pendingWithdrawals = []EpochStatsPendingWithdrawals{} + } + + prevState = &replayBlockConsolidationsState{ + block: parentBlocks[0], + pendingWithdrawals: pendingWithdrawals, + additionalWithdrawals: []phase0.ValidatorIndex{}, + validatorMap: map[phase0.ValidatorIndex]*phase0.Validator{}, + } + } + + // get consolidations from block + blockBody := block.GetBlock() + if blockBody == nil { + return nil, nil + } + + blockRequests, err := blockBody.ExecutionRequests() + if err != nil { + return nil, nil + } + + // get relevant indices and prepare results array + relevantIndices := []phase0.ValidatorIndex{} + requestResults := make([]uint8, len(blockRequests.Consolidations)) + for i, consolidation := range blockRequests.Consolidations { + sourceIndice, sourceFound := dbw.indexer.validatorCache.getValidatorIndexByPubkey(consolidation.SourcePubkey) + if !sourceFound { + requestResults[i] = dbtypes.ConsolidationRequestResultSrcNotFound + continue + } + + targetIndice, targetFound := dbw.indexer.validatorCache.getValidatorIndexByPubkey(consolidation.TargetPubkey) + if !targetFound { + requestResults[i] = dbtypes.ConsolidationRequestResultTgtNotFound + continue + } + + relevantIndices = append(relevantIndices, sourceIndice, targetIndice) + } + + validatorMap := map[phase0.ValidatorIndex]*phase0.Validator{} + getValidator := func(index phase0.ValidatorIndex) *phase0.Validator { + if validator, ok := validatorMap[index]; ok { + return validator + } + + validator, err := dbw.indexer.validatorCache.getValidatorByIndex(index, nil) + if err != nil { + return nil + } + } + + // replay parent blocks up to the current block and apply all operations for relevant indices + for _, parentBlock := range parentBlocks { + blockBody := parentBlock.GetBlock() + if blockBody == nil { + return nil, nil + } + + // get bls changes + blsChanges, err := blockBody.BLSToExecutionChanges() + if err != nil { + return nil, nil + } + + for _, blsChange := range blsChanges { + validatorIndex := blsChange.Message.ValidatorIndex + if slices.Contains(relevantIndices, validatorIndex) { + prevState.validatorMap[validatorIndex] = &blsChange.Message.Validator + } + + if blsChange.ValidatorIndex == prevState.validatorMap[blsChange.ValidatorIndex].Index { + prevState.validatorMap[blsChange.ValidatorIndex].WithdrawalCredentials = blsChange.WithdrawalCredentials + } + } + + // get execution requests + requests, err := blockBody.ExecutionRequests() + requests, err := blockBody.ExecutionRequests() + if err != nil { + return nil + } + } + + return prevState + +} From c238cba617daca09c9800d2351ae1781073727d9 Mon Sep 17 00:00:00 2001 From: pk910 Date: Sat, 8 Feb 2025 03:12:52 +0100 Subject: [PATCH 02/18] commit --- clients/consensus/chainspec.go | 8 +- .../pgsql/20250207221132_request-results.sql | 3 + .../sqlite/20250207221132_request-results.sql | 3 + db/withdrawal_requests.go | 12 +- dbtypes/dbtypes.go | 19 + indexer/beacon/block.go | 4 +- indexer/beacon/block_helper.go | 26 +- indexer/beacon/epochcache.go | 13 + indexer/beacon/epochstate.go | 9 + indexer/beacon/epochstats.go | 135 ++--- indexer/beacon/finalization.go | 8 +- indexer/beacon/pruning.go | 6 +- indexer/beacon/state_sim.go | 476 ++++++++++++++++++ indexer/beacon/validatorcache.go | 11 + indexer/beacon/writedb.go | 60 ++- indexer/beacon/writedb_state.go | 191 ------- 16 files changed, 707 insertions(+), 277 deletions(-) create mode 100644 indexer/beacon/state_sim.go delete mode 100644 indexer/beacon/writedb_state.go diff --git a/clients/consensus/chainspec.go b/clients/consensus/chainspec.go index e40badec..bda692d4 100644 --- a/clients/consensus/chainspec.go +++ b/clients/consensus/chainspec.go @@ -58,7 +58,13 @@ type ChainSpec struct { MaxWithdrawalRequestsPerPayload uint64 `yaml:"MAX_WITHDRAWAL_REQUESTS_PER_PAYLOAD" check-if-fork:"ElectraForkEpoch"` DepositChainId uint64 `yaml:"DEPOSIT_CHAIN_ID"` MinActivationBalance uint64 `yaml:"MIN_ACTIVATION_BALANCE"` - MaxPendingPartialsPerWithdrawalsSweep uint64 `yaml:"MAX_PENDING_PARTIALS_PER_WITHDRAWALS_SWEEP"` + MaxPendingPartialsPerWithdrawalsSweep uint64 `yaml:"MAX_PENDING_PARTIALS_PER_WITHDRAWALS_SWEEP" check-if-fork:"ElectraForkEpoch"` + PendingPartialWithdrawalsLimit uint64 `yaml:"PENDING_PARTIAL_WITHDRAWALS_LIMIT" check-if-fork:"ElectraForkEpoch"` + PendingConsolidationsLimit uint64 `yaml:"PENDING_CONSOLIDATIONS_LIMIT" check-if-fork:"ElectraForkEpoch"` + MinPerEpochChurnLimitElectra uint64 `yaml:"MIN_PER_EPOCH_CHURN_LIMIT_ELECTRA" check-if-fork:"ElectraForkEpoch"` + MaxPerEpochActivationExitChurnLimit uint64 `yaml:"MAX_PER_EPOCH_ACTIVATION_EXIT_CHURN_LIMIT" check-if-fork:"ElectraForkEpoch"` + EffectiveBalanceIncrement uint64 `yaml:"EFFECTIVE_BALANCE_INCREMENT"` + ShardCommitteePeriod uint64 `yaml:"SHARD_COMMITTEE_PERIOD"` // EIP7594: PeerDAS NumberOfColumns *uint64 `yaml:"NUMBER_OF_COLUMNS" check-if-fork:"Eip7594ForkEpoch"` diff --git a/db/schema/pgsql/20250207221132_request-results.sql b/db/schema/pgsql/20250207221132_request-results.sql index 2f2cec85..a85525ff 100644 --- a/db/schema/pgsql/20250207221132_request-results.sql +++ b/db/schema/pgsql/20250207221132_request-results.sql @@ -4,6 +4,9 @@ ALTER TABLE public."consolidation_requests" ADD "result" TINYINT NOT NULL DEFAULT 0; +ALTER TABLE public."withdrawal_requests" + ADD "result" TINYINT NOT NULL DEFAULT 0; + -- +goose StatementEnd -- +goose Down -- +goose StatementBegin diff --git a/db/schema/sqlite/20250207221132_request-results.sql b/db/schema/sqlite/20250207221132_request-results.sql index 2b5f8d22..3a579fd5 100644 --- a/db/schema/sqlite/20250207221132_request-results.sql +++ b/db/schema/sqlite/20250207221132_request-results.sql @@ -4,6 +4,9 @@ ALTER TABLE "consolidation_requests" ADD "result" TINYINT NOT NULL DEFAULT 0; +ALTER TABLE "withdrawal_requests" + ADD "result" TINYINT NOT NULL DEFAULT 0; + -- +goose StatementEnd -- +goose Down -- +goose StatementBegin diff --git a/db/withdrawal_requests.go b/db/withdrawal_requests.go index dab0a5f6..12d422ff 100644 --- a/db/withdrawal_requests.go +++ b/db/withdrawal_requests.go @@ -15,11 +15,11 @@ func InsertWithdrawalRequests(elRequests []*dbtypes.WithdrawalRequest, tx *sqlx. dbtypes.DBEnginePgsql: "INSERT INTO withdrawal_requests ", dbtypes.DBEngineSqlite: "INSERT OR REPLACE INTO withdrawal_requests ", }), - "(slot_number, slot_root, slot_index, orphaned, fork_id, source_address, validator_index, validator_pubkey, amount, tx_hash, block_number)", + "(slot_number, slot_root, slot_index, orphaned, fork_id, source_address, validator_index, validator_pubkey, amount, tx_hash, block_number, result)", " VALUES ", ) argIdx := 0 - fieldCount := 11 + fieldCount := 12 args := make([]any, len(elRequests)*fieldCount) for i, elRequest := range elRequests { @@ -47,10 +47,11 @@ func InsertWithdrawalRequests(elRequests []*dbtypes.WithdrawalRequest, tx *sqlx. args[argIdx+8] = elRequest.Amount args[argIdx+9] = elRequest.TxHash args[argIdx+10] = elRequest.BlockNumber + args[argIdx+11] = elRequest.Result argIdx += fieldCount } fmt.Fprint(&sql, EngineQuery(map[dbtypes.DBEngineType]string{ - dbtypes.DBEnginePgsql: " ON CONFLICT (slot_root, slot_index) DO UPDATE SET orphaned = excluded.orphaned, tx_hash = excluded.tx_hash", + dbtypes.DBEnginePgsql: " ON CONFLICT (slot_root, slot_index) DO UPDATE SET orphaned = excluded.orphaned, tx_hash = excluded.tx_hash, result = excluded.result", dbtypes.DBEngineSqlite: "", })) @@ -67,7 +68,7 @@ func GetWithdrawalRequestsFiltered(offset uint64, limit uint32, canonicalForkIds fmt.Fprint(&sql, ` WITH cte AS ( SELECT - slot_number, slot_index, slot_root, orphaned, fork_id, source_address, validator_index, validator_pubkey, CAST(amount AS BIGINT), tx_hash, block_number + slot_number, slot_index, slot_root, orphaned, fork_id, source_address, validator_index, validator_pubkey, CAST(amount AS BIGINT), tx_hash, block_number, result FROM withdrawal_requests `) @@ -159,7 +160,8 @@ func GetWithdrawalRequestsFiltered(offset uint64, limit uint32, canonicalForkIds null AS validator_pubkey, CAST(0 AS BIGINT) AS amount, null AS tx_hash, - 0 AS block_number + 0 AS block_number, + 0 AS result FROM cte UNION ALL SELECT * FROM ( SELECT * FROM cte diff --git a/dbtypes/dbtypes.go b/dbtypes/dbtypes.go index c1200528..abb0b941 100644 --- a/dbtypes/dbtypes.go +++ b/dbtypes/dbtypes.go @@ -308,6 +308,24 @@ type ConsolidationRequestTx struct { DequeueBlock uint64 `db:"dequeue_block"` } +const ( + WithdrawalRequestResultUnknown uint8 = 0 + WithdrawalRequestResultSuccess uint8 = 1 + + // global errors + WithdrawalRequestResultQueueFull uint8 = 10 + + // validator errors + WithdrawalRequestResultValidatorNotFound uint8 = 20 + WithdrawalRequestResultValidatorInvalidCredentials uint8 = 21 + WithdrawalRequestResultValidatorInvalidSender uint8 = 22 + WithdrawalRequestResultValidatorNotActive uint8 = 23 + WithdrawalRequestResultValidatorNotOldEnough uint8 = 24 + WithdrawalRequestResultValidatorNotCompounding uint8 = 25 + WithdrawalRequestResultValidatorHasPendingWithdrawal uint8 = 26 + WithdrawalRequestResultValidatorBalanceTooLow uint8 = 27 +) + type WithdrawalRequest struct { SlotNumber uint64 `db:"slot_number"` SlotRoot []byte `db:"slot_root"` @@ -320,6 +338,7 @@ type WithdrawalRequest struct { Amount int64 `db:"amount"` TxHash []byte `db:"tx_hash"` BlockNumber uint64 `db:"block_number"` + Result uint8 `db:"result"` } type WithdrawalRequestTx struct { diff --git a/indexer/beacon/block.go b/indexer/beacon/block.go index 4ca8c8d6..0d123103 100644 --- a/indexer/beacon/block.go +++ b/indexer/beacon/block.go @@ -431,7 +431,7 @@ func (block *Block) GetDbWithdrawalRequests(indexer *Indexer, isCanonical bool) return nil } - return indexer.dbWriter.buildDbWithdrawalRequests(block, !isCanonical, nil) + return indexer.dbWriter.buildDbWithdrawalRequests(block, !isCanonical, nil, nil) } // GetDbConsolidationRequests returns the database representation of the consolidation requests in this block. @@ -440,7 +440,7 @@ func (block *Block) GetDbConsolidationRequests(indexer *Indexer, isCanonical boo return nil } - return indexer.dbWriter.buildDbConsolidationRequests(block, !isCanonical, nil) + return indexer.dbWriter.buildDbConsolidationRequests(block, !isCanonical, nil, nil) } // GetForkId returns the fork ID of this block. diff --git a/indexer/beacon/block_helper.go b/indexer/beacon/block_helper.go index 7fd92544..66ddfa03 100644 --- a/indexer/beacon/block_helper.go +++ b/indexer/beacon/block_helper.go @@ -326,7 +326,7 @@ func getStateCurrentSyncCommittee(v *spec.VersionedBeaconState) ([]phase0.BLSPub } } -// getStatePendingWithdrawals returns the current sync committee from a versioned beacon state. +// getStatePendingWithdrawals returns the pending withdrawals from a versioned beacon state. func getStatePendingWithdrawals(v *spec.VersionedBeaconState) ([]*electra.PendingPartialWithdrawal, error) { switch v.Version { case spec.DataVersionPhase0: @@ -349,3 +349,27 @@ func getStatePendingWithdrawals(v *spec.VersionedBeaconState) ([]*electra.Pendin return nil, errors.New("unknown version") } } + +// getStatePendingConsolidations returns the pending consolidations from a versioned beacon state. +func getStatePendingConsolidations(v *spec.VersionedBeaconState) ([]*electra.PendingConsolidation, error) { + switch v.Version { + case spec.DataVersionPhase0: + return nil, errors.New("no pending consolidations in phase0") + case spec.DataVersionAltair: + return nil, errors.New("no pending consolidations in altair") + case spec.DataVersionBellatrix: + return nil, errors.New("no pending consolidations in bellatrix") + case spec.DataVersionCapella: + return nil, errors.New("no pending consolidations in capella") + case spec.DataVersionDeneb: + return nil, errors.New("no pending consolidations in deneb") + case spec.DataVersionElectra: + if v.Electra == nil || v.Electra.PendingConsolidations == nil { + return nil, errors.New("no electra block") + } + + return v.Electra.PendingConsolidations, nil + default: + return nil, errors.New("unknown version") + } +} diff --git a/indexer/beacon/epochcache.go b/indexer/beacon/epochcache.go index edbdf5cd..29524b10 100644 --- a/indexer/beacon/epochcache.go +++ b/indexer/beacon/epochcache.go @@ -154,6 +154,19 @@ func (cache *epochCache) getEpochStatsByEpoch(epoch phase0.Epoch) []*EpochStats return statsList } +func (cache *epochCache) getEpochStatsByEpochAndRoot(epoch phase0.Epoch, blockRoot phase0.Root) *EpochStats { + cache.cacheMutex.RLock() + defer cache.cacheMutex.RUnlock() + + for _, stats := range cache.statsMap { + if stats.epoch == epoch && cache.indexer.blockCache.isCanonicalBlock(stats.dependentRoot, blockRoot) { + return stats + } + } + + return nil +} + func (cache *epochCache) getEpochStatsBeforeEpoch(epoch phase0.Epoch) []*EpochStats { cache.cacheMutex.RLock() defer cache.cacheMutex.RUnlock() diff --git a/indexer/beacon/epochstate.go b/indexer/beacon/epochstate.go index 65b2cda6..d4be4e2b 100644 --- a/indexer/beacon/epochstate.go +++ b/indexer/beacon/epochstate.go @@ -28,6 +28,7 @@ type epochState struct { depositIndex uint64 syncCommittee []phase0.ValidatorIndex pendingPartialWithdrawals []*electra.PendingPartialWithdrawal + pendingConsolidations []*electra.PendingConsolidation } // newEpochState creates a new epochState instance with the root of the state to be loaded. @@ -198,6 +199,14 @@ func (s *epochState) processState(state *spec.VersionedBeaconState, cache *epoch return fmt.Errorf("error getting pending withdrawal indices from state %v: %v", s.slotRoot.String(), err) } s.pendingPartialWithdrawals = pendingPartialWithdrawals + + pendingConsolidations, err := getStatePendingConsolidations(state) + if err != nil { + return fmt.Errorf("error getting pending consolidation indices from state %v: %v", s.slotRoot.String(), err) + } + + // apply epoch transition to get remaining pending consolidations + s.pendingConsolidations = pendingConsolidations } return nil diff --git a/indexer/beacon/epochstats.go b/indexer/beacon/epochstats.go index 05a1c68d..0ebdc407 100644 --- a/indexer/beacon/epochstats.go +++ b/indexer/beacon/epochstats.go @@ -9,6 +9,7 @@ import ( "sync" "time" + "github.com/attestantio/go-eth2-client/spec/electra" "github.com/attestantio/go-eth2-client/spec/phase0" "github.com/ethpandaops/dora/clients/consensus" "github.com/ethpandaops/dora/db" @@ -44,32 +45,34 @@ type EpochStats struct { // EpochStatsValues holds the values for the epoch-specific information. type EpochStatsValues struct { - RandaoMix phase0.Hash32 - NextRandaoMix phase0.Hash32 - ActiveIndices []phase0.ValidatorIndex - EffectiveBalances []uint16 - ProposerDuties []phase0.ValidatorIndex - AttesterDuties [][][]duties.ActiveIndiceIndex - SyncCommitteeDuties []phase0.ValidatorIndex - ActiveValidators uint64 - TotalBalance phase0.Gwei - ActiveBalance phase0.Gwei - EffectiveBalance phase0.Gwei - FirstDepositIndex uint64 - PendingWithdrawals []EpochStatsPendingWithdrawals + RandaoMix phase0.Hash32 + NextRandaoMix phase0.Hash32 + ActiveIndices []phase0.ValidatorIndex + EffectiveBalances []uint16 + ProposerDuties []phase0.ValidatorIndex + AttesterDuties [][][]duties.ActiveIndiceIndex + SyncCommitteeDuties []phase0.ValidatorIndex + ActiveValidators uint64 + TotalBalance phase0.Gwei + ActiveBalance phase0.Gwei + EffectiveBalance phase0.Gwei + FirstDepositIndex uint64 + PendingWithdrawals []EpochStatsPendingWithdrawals + PendingConsolidations []electra.PendingConsolidation } // EpochStatsPacked holds the packed values for the epoch-specific information. type EpochStatsPacked struct { - ActiveValidators []EpochStatsPackedValidator - ProposerDuties []phase0.ValidatorIndex - SyncCommitteeDuties []phase0.ValidatorIndex - RandaoMix phase0.Hash32 - NextRandaoMix phase0.Hash32 - TotalBalance phase0.Gwei - ActiveBalance phase0.Gwei - FirstDepositIndex uint64 - PendingWithdrawals []EpochStatsPendingWithdrawals + ActiveValidators []EpochStatsPackedValidator + ProposerDuties []phase0.ValidatorIndex + SyncCommitteeDuties []phase0.ValidatorIndex + RandaoMix phase0.Hash32 + NextRandaoMix phase0.Hash32 + TotalBalance phase0.Gwei + ActiveBalance phase0.Gwei + FirstDepositIndex uint64 + PendingWithdrawals []EpochStatsPendingWithdrawals + PendingConsolidations []electra.PendingConsolidation } // EpochStatsPackedValidator holds the packed values for an active validator. @@ -166,15 +169,16 @@ func (es *EpochStats) buildPackedSSZ(dynSsz *dynssz.DynSsz) ([]byte, error) { } packedValues := &EpochStatsPacked{ - ActiveValidators: make([]EpochStatsPackedValidator, es.values.ActiveValidators), - ProposerDuties: es.values.ProposerDuties, - SyncCommitteeDuties: es.values.SyncCommitteeDuties, - RandaoMix: es.values.RandaoMix, - NextRandaoMix: es.values.NextRandaoMix, - TotalBalance: es.values.TotalBalance, - ActiveBalance: es.values.ActiveBalance, - FirstDepositIndex: es.values.FirstDepositIndex, - PendingWithdrawals: es.values.PendingWithdrawals, + ActiveValidators: make([]EpochStatsPackedValidator, es.values.ActiveValidators), + ProposerDuties: es.values.ProposerDuties, + SyncCommitteeDuties: es.values.SyncCommitteeDuties, + RandaoMix: es.values.RandaoMix, + NextRandaoMix: es.values.NextRandaoMix, + TotalBalance: es.values.TotalBalance, + ActiveBalance: es.values.ActiveBalance, + FirstDepositIndex: es.values.FirstDepositIndex, + PendingWithdrawals: es.values.PendingWithdrawals, + PendingConsolidations: es.values.PendingConsolidations, } lastValidatorIndex := phase0.ValidatorIndex(0) @@ -219,17 +223,18 @@ func (es *EpochStats) parsePackedSSZ(dynSsz *dynssz.DynSsz, chainState *consensu } values := &EpochStatsValues{ - RandaoMix: packedValues.RandaoMix, - NextRandaoMix: packedValues.NextRandaoMix, - ActiveIndices: make([]phase0.ValidatorIndex, len(packedValues.ActiveValidators)), - EffectiveBalances: make([]uint16, len(packedValues.ActiveValidators)), - ProposerDuties: packedValues.ProposerDuties, - SyncCommitteeDuties: packedValues.SyncCommitteeDuties, - TotalBalance: packedValues.TotalBalance, - ActiveBalance: packedValues.ActiveBalance, - EffectiveBalance: 0, - FirstDepositIndex: packedValues.FirstDepositIndex, - PendingWithdrawals: packedValues.PendingWithdrawals, + RandaoMix: packedValues.RandaoMix, + NextRandaoMix: packedValues.NextRandaoMix, + ActiveIndices: make([]phase0.ValidatorIndex, len(packedValues.ActiveValidators)), + EffectiveBalances: make([]uint16, len(packedValues.ActiveValidators)), + ProposerDuties: packedValues.ProposerDuties, + SyncCommitteeDuties: packedValues.SyncCommitteeDuties, + TotalBalance: packedValues.TotalBalance, + ActiveBalance: packedValues.ActiveBalance, + EffectiveBalance: 0, + FirstDepositIndex: packedValues.FirstDepositIndex, + PendingWithdrawals: packedValues.PendingWithdrawals, + PendingConsolidations: packedValues.PendingConsolidations, } lastValidatorIndex := phase0.ValidatorIndex(0) @@ -287,18 +292,19 @@ func (es *EpochStats) pruneValues() { } es.prunedValues = &EpochStatsValues{ - RandaoMix: es.values.RandaoMix, - NextRandaoMix: es.values.NextRandaoMix, - EffectiveBalances: nil, // prune - ProposerDuties: es.values.ProposerDuties, - AttesterDuties: nil, // prune - SyncCommitteeDuties: es.values.SyncCommitteeDuties, - ActiveValidators: es.values.ActiveValidators, - TotalBalance: es.values.TotalBalance, - ActiveBalance: es.values.ActiveBalance, - EffectiveBalance: es.values.EffectiveBalance, - FirstDepositIndex: es.values.FirstDepositIndex, - PendingWithdrawals: nil, // prune + RandaoMix: es.values.RandaoMix, + NextRandaoMix: es.values.NextRandaoMix, + EffectiveBalances: nil, // prune + ProposerDuties: es.values.ProposerDuties, + AttesterDuties: nil, // prune + SyncCommitteeDuties: es.values.SyncCommitteeDuties, + ActiveValidators: es.values.ActiveValidators, + TotalBalance: es.values.TotalBalance, + ActiveBalance: es.values.ActiveBalance, + EffectiveBalance: es.values.EffectiveBalance, + FirstDepositIndex: es.values.FirstDepositIndex, + PendingWithdrawals: nil, // prune + PendingConsolidations: nil, // prune } es.values = nil @@ -345,14 +351,15 @@ func (es *EpochStats) processState(indexer *Indexer, validatorSet []*phase0.Vali chainState := indexer.consensusPool.GetChainState() values := &EpochStatsValues{ - ActiveIndices: make([]phase0.ValidatorIndex, 0), - EffectiveBalances: make([]uint16, 0), - SyncCommitteeDuties: es.dependentState.syncCommittee, - TotalBalance: 0, - ActiveBalance: 0, - EffectiveBalance: 0, - FirstDepositIndex: es.dependentState.depositIndex, - PendingWithdrawals: make([]EpochStatsPendingWithdrawals, len(es.dependentState.pendingPartialWithdrawals)), + ActiveIndices: make([]phase0.ValidatorIndex, 0), + EffectiveBalances: make([]uint16, 0), + SyncCommitteeDuties: es.dependentState.syncCommittee, + TotalBalance: 0, + ActiveBalance: 0, + EffectiveBalance: 0, + FirstDepositIndex: es.dependentState.depositIndex, + PendingWithdrawals: make([]EpochStatsPendingWithdrawals, len(es.dependentState.pendingPartialWithdrawals)), + PendingConsolidations: make([]electra.PendingConsolidation, len(es.dependentState.pendingConsolidations)), } for i, pendingPartialWithdrawal := range es.dependentState.pendingPartialWithdrawals { @@ -362,6 +369,10 @@ func (es *EpochStats) processState(indexer *Indexer, validatorSet []*phase0.Vali } } + for i, pendingConsolidation := range es.dependentState.pendingConsolidations { + values.PendingConsolidations[i] = *pendingConsolidation + } + if validatorSet != nil { for index, validator := range validatorSet { values.TotalBalance += es.dependentState.validatorBalances[index] diff --git a/indexer/beacon/finalization.go b/indexer/beacon/finalization.go index 9aae391c..a248ec6f 100644 --- a/indexer/beacon/finalization.go +++ b/indexer/beacon/finalization.go @@ -311,11 +311,17 @@ func (indexer *Indexer) finalizeEpoch(epoch phase0.Epoch, justifiedRoot phase0.R dependentBlock := indexer.blockCache.getDependentBlock(chainState, block, client) var epochStats *EpochStats + if dependentBlock != nil { epochStats = indexer.epochCache.getEpochStats(epoch, dependentBlock.Root) } - if _, err := indexer.dbWriter.persistBlockData(tx, block, epochStats, nil, true, nil); err != nil { + var sim *stateSimulator + if epochStats != nil { + sim = newStateSimulator(indexer, epochStats) + } + + if _, err := indexer.dbWriter.persistBlockData(tx, block, epochStats, nil, true, nil, sim); err != nil { return fmt.Errorf("failed persisting orphaned slot %v (%v): %v", block.Slot, block.Root.String(), err) } diff --git a/indexer/beacon/pruning.go b/indexer/beacon/pruning.go index 65033d7f..99a447f1 100644 --- a/indexer/beacon/pruning.go +++ b/indexer/beacon/pruning.go @@ -194,13 +194,14 @@ func (indexer *Indexer) processEpochPruning(pruneEpoch phase0.Epoch) (uint64, ui persistedBlocks := map[phase0.Root]bool{} for _, epochData := range epochData { + sim := newStateSimulator(indexer, epochData.epochStats) dbEpoch := indexer.dbWriter.buildDbEpoch(pruneEpoch, epochData.chain, epochData.epochStats, epochData.epochVotes, func(block *Block, depositIndex *uint64) { if persistedBlocks[block.Root] { return } // persist pruned block data as orphaned here, the canonical blocks will be updated by the finalization or synchronization process later - _, err := indexer.dbWriter.persistBlockData(tx, block, epochData.epochStats, depositIndex, true, nil) + _, err := indexer.dbWriter.persistBlockData(tx, block, epochData.epochStats, depositIndex, true, nil, sim) if err != nil { indexer.logger.Errorf("error persisting pruned slot %v: %v", block.Root.String(), err) } @@ -342,7 +343,8 @@ func (indexer *Indexer) processCachePruning(prunedEpochStats, prunedEpochStates if len(pruningData) > 0 { err := db.RunDBTransaction(func(tx *sqlx.Tx) error { for _, pruneBlock := range pruningData { - _, err := indexer.dbWriter.persistBlockData(tx, pruneBlock.block, pruneBlock.epochStats, nil, true, nil) + sim := newStateSimulator(indexer, pruneBlock.epochStats) + _, err := indexer.dbWriter.persistBlockData(tx, pruneBlock.block, pruneBlock.epochStats, nil, true, nil, sim) if err != nil { indexer.logger.Errorf("error persisting old pruned slot %v: %v", pruneBlock.block.Root.String(), err) } diff --git a/indexer/beacon/state_sim.go b/indexer/beacon/state_sim.go new file mode 100644 index 00000000..b6a1daf5 --- /dev/null +++ b/indexer/beacon/state_sim.go @@ -0,0 +1,476 @@ +package beacon + +import ( + "bytes" + "slices" + + "github.com/attestantio/go-eth2-client/spec/electra" + "github.com/attestantio/go-eth2-client/spec/phase0" + "github.com/ethpandaops/dora/dbtypes" + "github.com/juliangruber/go-intersect" +) + +type stateSimulator struct { + indexer *Indexer + epochStats *EpochStats + epochStatsValues *EpochStatsValues + prevState *stateSimulatorState +} + +type stateSimulatorState struct { + epochRoot phase0.Root + block *Block + pendingWithdrawals []EpochStatsPendingWithdrawals + additionalWithdrawals []phase0.ValidatorIndex + pendingConsolidationCount uint64 + validatorMap map[phase0.ValidatorIndex]*phase0.Validator + blockResults [][]uint8 +} + +func newStateSimulator(indexer *Indexer, epochStats *EpochStats) *stateSimulator { + sim := &stateSimulator{ + indexer: indexer, + epochStats: epochStats, + epochStatsValues: epochStats.GetValues(false), + prevState: nil, + } + + if sim.epochStatsValues == nil { + return nil + } + + return sim +} + +func (sim *stateSimulator) getParentBlocks(block *Block) []*Block { + chainState := sim.indexer.consensusPool.GetChainState() + minSlot := chainState.EpochToSlot(chainState.EpochOfSlot(block.Slot)) + parentBlocks := []*Block{} + + for { + parentBlockRoot := block.GetParentRoot() + if parentBlockRoot == nil { + break + } + + parentBlock := sim.indexer.GetBlockByRoot(*parentBlockRoot) + if parentBlock == nil { + break + } + + if parentBlock.Slot < minSlot { + break + } + + parentBlocks = append(parentBlocks, parentBlock) + block = parentBlock + } + + slices.Reverse(parentBlocks) + + return parentBlocks +} + +func (sim *stateSimulator) resetState(block *Block) *stateSimulatorState { + pendingWithdrawals := sim.epochStatsValues.PendingWithdrawals + if pendingWithdrawals == nil { + pendingWithdrawals = []EpochStatsPendingWithdrawals{} + } + + state := &stateSimulatorState{ + block: nil, + epochRoot: block.Root, + pendingWithdrawals: pendingWithdrawals, + pendingConsolidationCount: 0, + additionalWithdrawals: []phase0.ValidatorIndex{}, + validatorMap: map[phase0.ValidatorIndex]*phase0.Validator{}, + } + sim.prevState = state + + // get pending consolidations from state + processedConsolidations := uint64(0) + for _, pendingConsolidation := range sim.epochStatsValues.PendingConsolidations { + srcValidator := sim.getValidator(pendingConsolidation.SourceIndex) + if srcValidator == nil { + return nil + } + + if srcValidator.WithdrawableEpoch > sim.epochStats.epoch { + break + } + + processedConsolidations++ + } + + state.pendingConsolidationCount = uint64(len(sim.epochStatsValues.PendingConsolidations)) - processedConsolidations + + // get pending withdrawals from state + state.pendingWithdrawals = sim.epochStatsValues.PendingWithdrawals + + return state +} + +func (sim *stateSimulator) getValidator(index phase0.ValidatorIndex) *phase0.Validator { + if validator, ok := sim.prevState.validatorMap[index]; ok { + return validator + } + + validator := sim.indexer.validatorCache.getValidatorByIndexAndRoot(index, sim.prevState.epochRoot) + sim.prevState.validatorMap[index] = validator + return validator +} + +func (sim *stateSimulator) applyConsolidation(consolidation *electra.ConsolidationRequest) uint8 { + chainState := sim.indexer.consensusPool.GetChainState() + chainSpec := chainState.GetSpecs() + + // this follows the spec logic: + // https://github.com/ethereum/consensus-specs/blob/dev/specs/electra/beacon-chain.md#new-process_consolidation_request + + sourceIndice, sourceFound := sim.indexer.pubkeyCache.Get(consolidation.SourcePubkey) + if !sourceFound { + return dbtypes.ConsolidationRequestResultSrcNotFound + } + + targetIndice, targetFound := sim.indexer.pubkeyCache.Get(consolidation.TargetPubkey) + if !targetFound { + return dbtypes.ConsolidationRequestResultTgtNotFound + } + + srcValidator := sim.getValidator(sourceIndice) + if srcValidator == nil { + return dbtypes.ConsolidationRequestResultSrcNotFound + } + + tgtValidator := sim.getValidator(targetIndice) + if tgtValidator == nil { + return dbtypes.ConsolidationRequestResultTgtNotFound + } + + srcWithdrawalCreds := srcValidator.WithdrawalCredentials + if srcWithdrawalCreds[0] == 0x00 { + return dbtypes.ConsolidationRequestResultSrcInvalidCredentials + } + + if !bytes.Equal(srcWithdrawalCreds[12:], consolidation.SourceAddress[:]) { + return dbtypes.ConsolidationRequestResultSrcInvalidCredentials + } + + tgtWithdrawalCreds := tgtValidator.WithdrawalCredentials + if tgtWithdrawalCreds[0] == 0x00 { + return dbtypes.ConsolidationRequestResultTgtInvalidCredentials + } + + if !bytes.Equal(tgtWithdrawalCreds[12:], consolidation.SourceAddress[:]) { + return dbtypes.ConsolidationRequestResultTgtInvalidCredentials + } + + if srcValidator == tgtValidator { + // self consolidation, set withdrawal credentials to 0x02 + srcValidator.WithdrawalCredentials[0] = 0x02 + return dbtypes.ConsolidationRequestResultSuccess + } + + if sim.prevState.pendingConsolidationCount >= chainSpec.PendingConsolidationsLimit { + return dbtypes.ConsolidationRequestResultQueueFull + } + + if tgtValidator.WithdrawalCredentials[0] != 0x02 { + return dbtypes.ConsolidationRequestResultTgtNotCompounding + } + + // get_balance_churn_limit + balanceChurnLimit := uint64(sim.epochStatsValues.EffectiveBalance) / chainSpec.ChurnLimitQuotient + if chainSpec.MinPerEpochChurnLimitElectra > balanceChurnLimit { + balanceChurnLimit = chainSpec.MinPerEpochChurnLimitElectra + } + balanceChurnLimit = balanceChurnLimit - (balanceChurnLimit % chainSpec.EffectiveBalanceIncrement) + + // get_activation_exit_churn_limit + activationExitChurnLimit := balanceChurnLimit + if chainSpec.MaxPerEpochActivationExitChurnLimit < activationExitChurnLimit { + activationExitChurnLimit = chainSpec.MaxPerEpochActivationExitChurnLimit + } + + // get_consolidation_churn_limit + consolidationChurnLimit := balanceChurnLimit - activationExitChurnLimit + + // check consolidationChurnLimit + if consolidationChurnLimit <= chainSpec.MinActivationBalance { + return dbtypes.ConsolidationRequestResultTotalBalanceTooLow + } + + if srcValidator.ActivationEpoch == FarFutureEpoch || srcValidator.ActivationEpoch > sim.epochStats.epoch || srcValidator.ExitEpoch < FarFutureEpoch { + return dbtypes.ConsolidationRequestResultSrcNotActive + } + + if tgtValidator.ActivationEpoch == FarFutureEpoch || tgtValidator.ActivationEpoch > sim.epochStats.epoch || tgtValidator.ExitEpoch < FarFutureEpoch { + return dbtypes.ConsolidationRequestResultTgtNotActive + } + + if sim.epochStats.epoch < srcValidator.ActivationEpoch+phase0.Epoch(chainSpec.ShardCommitteePeriod) { + return dbtypes.ConsolidationRequestResultSrcNotOldEnough + } + + // check pending withdrawals for source + pendingWithdrawals := 0 + for _, pendingWithdrawal := range sim.prevState.pendingWithdrawals { + if pendingWithdrawal.ValidatorIndex == sourceIndice { + pendingWithdrawals++ + break + } + } + + if pendingWithdrawals > 0 || slices.Contains(sim.prevState.additionalWithdrawals, sourceIndice) { + return dbtypes.ConsolidationRequestResultSrcHasPendingWithdrawal + } + + sim.prevState.pendingConsolidationCount++ + srcValidator.ExitEpoch = FarFutureEpoch - 1 // dummy value to indicate the validator is exiting, but we don't know when exactly + return dbtypes.ConsolidationRequestResultSuccess +} + +func (sim *stateSimulator) applyWithdrawal(withdrawal *electra.WithdrawalRequest) uint8 { + chainState := sim.indexer.consensusPool.GetChainState() + chainSpec := chainState.GetSpecs() + + // this follows the spec logic: + // https://github.com/ethereum/consensus-specs/blob/dev/specs/electra/beacon-chain.md#new-process_withdrawal_request + + if uint64(len(sim.prevState.pendingWithdrawals)+len(sim.prevState.additionalWithdrawals)) >= chainSpec.PendingPartialWithdrawalsLimit { + return dbtypes.WithdrawalRequestResultQueueFull + } + + validatorIndice, validatorFound := sim.indexer.pubkeyCache.Get(withdrawal.ValidatorPubkey) + if !validatorFound { + return dbtypes.WithdrawalRequestResultValidatorNotFound + } + + validator := sim.getValidator(validatorIndice) + if validator == nil { + return dbtypes.WithdrawalRequestResultValidatorNotFound + } + + srcWithdrawalCreds := validator.WithdrawalCredentials + if srcWithdrawalCreds[0] == 0x00 { + return dbtypes.WithdrawalRequestResultValidatorInvalidCredentials + } + + if !bytes.Equal(srcWithdrawalCreds[12:], withdrawal.SourceAddress[:]) { + return dbtypes.WithdrawalRequestResultValidatorInvalidCredentials + } + + if validator.ActivationEpoch == FarFutureEpoch || validator.ActivationEpoch > sim.epochStats.epoch || validator.ExitEpoch < FarFutureEpoch { + return dbtypes.WithdrawalRequestResultValidatorNotActive + } + + if sim.epochStats.epoch < validator.ActivationEpoch+phase0.Epoch(chainSpec.ShardCommitteePeriod) { + return dbtypes.WithdrawalRequestResultValidatorNotOldEnough + } + + if withdrawal.Amount == 0 { + pendingWithdrawals := 0 + for _, pendingWithdrawal := range sim.prevState.pendingWithdrawals { + if pendingWithdrawal.ValidatorIndex == validatorIndice { + pendingWithdrawals++ + break + } + } + + if pendingWithdrawals > 0 || slices.Contains(sim.prevState.additionalWithdrawals, validatorIndice) { + return dbtypes.WithdrawalRequestResultValidatorHasPendingWithdrawal + } + + validator.ExitEpoch = FarFutureEpoch - 1 // dummy value to indicate the validator is exiting, but we don't know when exactly + return dbtypes.WithdrawalRequestResultSuccess + } + + if validator.WithdrawalCredentials[0] != 0x02 { + return dbtypes.WithdrawalRequestResultValidatorNotCompounding + } + + if validator.EffectiveBalance < phase0.Gwei(chainSpec.MinActivationBalance) { + return dbtypes.WithdrawalRequestResultValidatorBalanceTooLow + } + + sim.prevState.additionalWithdrawals = append(sim.prevState.additionalWithdrawals, validatorIndice) + + return dbtypes.WithdrawalRequestResultSuccess +} + +func (sim *stateSimulator) applyBlock(block *Block) [][]uint8 { + if sim.prevState.block != nil && sim.prevState.block.Slot >= block.Slot { + return nil + } + + blockBody := block.GetBlock() + if blockBody == nil { + return nil + } + + // process pending withdrawals + chainState := sim.indexer.consensusPool.GetChainState() + chainSpec := chainState.GetSpecs() + processedWithdrawals := uint64(0) + skippedWithdrawals := uint64(0) + for _, pendingWithdrawal := range sim.prevState.pendingWithdrawals { + if pendingWithdrawal.Epoch > sim.epochStats.epoch { + break + } + + srcValidator := sim.getValidator(pendingWithdrawal.ValidatorIndex) + if srcValidator == nil { + return nil + } + + if srcValidator.ExitEpoch != FarFutureEpoch || srcValidator.EffectiveBalance < phase0.Gwei(chainSpec.MinActivationBalance) { + skippedWithdrawals++ + continue + } + + processedWithdrawals++ + if processedWithdrawals >= chainSpec.MaxPendingPartialsPerWithdrawalsSweep { + break + } + } + if processedWithdrawals+skippedWithdrawals > 0 { + sim.prevState.pendingWithdrawals = sim.prevState.pendingWithdrawals[processedWithdrawals+skippedWithdrawals:] + } + + // apply bls changes + blsChanges, err := blockBody.BLSToExecutionChanges() + if err != nil { + return nil + } + for _, blsChange := range blsChanges { + validatorIndex := blsChange.Message.ValidatorIndex + + validator := sim.getValidator(validatorIndex) + if validator == nil { + return nil + } + + wdcredsPrefix := []byte{0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00} + validator.WithdrawalCredentials = append(wdcredsPrefix, blsChange.Message.ToExecutionAddress[:]...) + } + + // apply slashings + proposerSlashing, err := blockBody.ProposerSlashings() + if err != nil { + return nil + } + for _, proposerSlashing := range proposerSlashing { + proposerIndex := proposerSlashing.SignedHeader1.Message.ProposerIndex + validator := sim.getValidator(proposerIndex) + if validator == nil { + return nil + } + + validator.Slashed = true + validator.ExitEpoch = FarFutureEpoch - 1 // dummy value to indicate the validator is exiting, but we don't know when exactly + } + + attesterSlashing, err := blockBody.AttesterSlashings() + if err != nil { + return nil + } + for _, attesterSlashing := range attesterSlashing { + att1, _ := attesterSlashing.Attestation1() + att2, _ := attesterSlashing.Attestation2() + if att1 == nil || att2 == nil { + continue + } + + att1AttestingIndices, _ := att1.AttestingIndices() + att2AttestingIndices, _ := att2.AttestingIndices() + if att1AttestingIndices == nil || att2AttestingIndices == nil { + continue + } + + inter := intersect.Simple(att1AttestingIndices, att2AttestingIndices) + for _, j := range inter { + valIdx := j.(uint64) + + validator := sim.getValidator(phase0.ValidatorIndex(valIdx)) + if validator == nil { + return nil + } + + validator.Slashed = true + validator.ExitEpoch = FarFutureEpoch - 1 // dummy value to indicate the validator is exiting, but we don't know when exactly + } + } + + // apply voluntary exits + voluntaryExits, err := blockBody.VoluntaryExits() + if err != nil { + return nil + } + for _, voluntaryExit := range voluntaryExits { + validator := sim.getValidator(voluntaryExit.Message.ValidatorIndex) + if validator == nil { + return nil + } + + validator.ExitEpoch = FarFutureEpoch - 1 // dummy value to indicate the validator is exiting, but we don't know when exactly + } + + // get execution requests + requests, err := blockBody.ExecutionRequests() + if err != nil { + return nil + } + + results := make([][]uint8, 2) + + // apply withdrawal requests + results[0] = make([]uint8, len(requests.Withdrawals)) + for i, withdrawal := range requests.Withdrawals { + results[0][i] = sim.applyWithdrawal(withdrawal) + } + + // apply consolidation requests + results[1] = make([]uint8, len(requests.Consolidations)) + for i, consolidation := range requests.Consolidations { + results[1][i] = sim.applyConsolidation(consolidation) + } + + sim.prevState.block = block + + return results +} + +func (sim *stateSimulator) replayBlockResults(block *Block) [][]uint8 { + chainState := sim.indexer.consensusPool.GetChainState() + chainSpec := chainState.GetSpecs() + if chainSpec.ElectraForkEpoch == nil || sim.epochStats.epoch < phase0.Epoch(*chainSpec.ElectraForkEpoch) { + return nil + } + + parentBlocks := sim.getParentBlocks(block) + if sim.prevState == nil || (sim.prevState.block != nil && sim.prevState.block.Slot < block.Slot) { + state := sim.resetState(block) + if state == nil { + return nil + } + } + + if sim.prevState.block == block { + return sim.prevState.blockResults + } + + // replay parent blocks up to the current block and apply all operations for relevant indices + for _, parentBlock := range parentBlocks { + sim.applyBlock(parentBlock) + } + + // get consolidations from block + blockBody := block.GetBlock() + if blockBody == nil { + return nil + } + + results := sim.applyBlock(block) + sim.prevState.blockResults = results + return results +} diff --git a/indexer/beacon/validatorcache.go b/indexer/beacon/validatorcache.go index 6f896ac6..06fce8ae 100644 --- a/indexer/beacon/validatorcache.go +++ b/indexer/beacon/validatorcache.go @@ -665,6 +665,17 @@ func (cache *validatorCache) getValidatorByIndexAndRoot(index phase0.ValidatorIn if dbValidator := db.GetValidatorByIndex(index); dbValidator != nil { validator = UnwrapDbValidator(dbValidator) } + } else { + validator = &phase0.Validator{ + PublicKey: validator.PublicKey, + WithdrawalCredentials: validator.WithdrawalCredentials, + EffectiveBalance: validator.EffectiveBalance, + Slashed: validator.Slashed, + ActivationEligibilityEpoch: validator.ActivationEligibilityEpoch, + ActivationEpoch: validator.ActivationEpoch, + ExitEpoch: validator.ExitEpoch, + WithdrawableEpoch: validator.WithdrawableEpoch, + } } return validator diff --git a/indexer/beacon/writedb.go b/indexer/beacon/writedb.go index d273a482..419c8fd8 100644 --- a/indexer/beacon/writedb.go +++ b/indexer/beacon/writedb.go @@ -58,7 +58,7 @@ func (dbw *dbWriter) persistMissedSlots(tx *sqlx.Tx, epoch phase0.Epoch, blocks return nil } -func (dbw *dbWriter) persistBlockData(tx *sqlx.Tx, block *Block, epochStats *EpochStats, depositIndex *uint64, orphaned bool, overrideForkId *ForkKey) (*dbtypes.Slot, error) { +func (dbw *dbWriter) persistBlockData(tx *sqlx.Tx, block *Block, epochStats *EpochStats, depositIndex *uint64, orphaned bool, overrideForkId *ForkKey, sim *stateSimulator) (*dbtypes.Slot, error) { // insert block dbBlock := dbw.buildDbBlock(block, epochStats, overrideForkId) if dbBlock == nil { @@ -78,7 +78,7 @@ func (dbw *dbWriter) persistBlockData(tx *sqlx.Tx, block *Block, epochStats *Epo // insert child objects if block.Slot > 0 { - err = dbw.persistBlockChildObjects(tx, block, depositIndex, orphaned, overrideForkId) + err = dbw.persistBlockChildObjects(tx, block, depositIndex, orphaned, overrideForkId, sim) if err != nil { return nil, err } @@ -87,7 +87,7 @@ func (dbw *dbWriter) persistBlockData(tx *sqlx.Tx, block *Block, epochStats *Epo return dbBlock, nil } -func (dbw *dbWriter) persistBlockChildObjects(tx *sqlx.Tx, block *Block, depositIndex *uint64, orphaned bool, overrideForkId *ForkKey) error { +func (dbw *dbWriter) persistBlockChildObjects(tx *sqlx.Tx, block *Block, depositIndex *uint64, orphaned bool, overrideForkId *ForkKey, sim *stateSimulator) error { var err error // insert deposits (pre/early electra) @@ -115,13 +115,13 @@ func (dbw *dbWriter) persistBlockChildObjects(tx *sqlx.Tx, block *Block, deposit } // insert consolidation requests - err = dbw.persistBlockConsolidationRequests(tx, block, orphaned, overrideForkId) + err = dbw.persistBlockConsolidationRequests(tx, block, orphaned, overrideForkId, sim) if err != nil { return err } // insert withdrawal requests - err = dbw.persistBlockWithdrawalRequests(tx, block, orphaned, overrideForkId) + err = dbw.persistBlockWithdrawalRequests(tx, block, orphaned, overrideForkId, sim) if err != nil { return err } @@ -137,8 +137,10 @@ func (dbw *dbWriter) persistEpochData(tx *sqlx.Tx, epoch phase0.Epoch, blocks [] } canonicalForkId := ForkKey(0) + sim := newStateSimulator(dbw.indexer, epochStats) + dbEpoch := dbw.buildDbEpoch(epoch, blocks, epochStats, epochVotes, func(block *Block, depositIndex *uint64) { - _, err := dbw.persistBlockData(tx, block, epochStats, depositIndex, false, &canonicalForkId) + _, err := dbw.persistBlockData(tx, block, epochStats, depositIndex, false, &canonicalForkId, sim) if err != nil { dbw.indexer.logger.Errorf("error persisting slot: %v", err) } @@ -665,9 +667,9 @@ func (dbw *dbWriter) buildDbSlashings(block *Block, orphaned bool, overrideForkI return dbSlashings } -func (dbw *dbWriter) persistBlockConsolidationRequests(tx *sqlx.Tx, block *Block, orphaned bool, overrideForkId *ForkKey) error { +func (dbw *dbWriter) persistBlockConsolidationRequests(tx *sqlx.Tx, block *Block, orphaned bool, overrideForkId *ForkKey, sim *stateSimulator) error { // insert consolidation requests - dbConsolidations := dbw.buildDbConsolidationRequests(block, orphaned, overrideForkId) + dbConsolidations := dbw.buildDbConsolidationRequests(block, orphaned, overrideForkId, sim) if orphaned { for idx := range dbConsolidations { dbConsolidations[idx].Orphaned = true @@ -684,7 +686,7 @@ func (dbw *dbWriter) persistBlockConsolidationRequests(tx *sqlx.Tx, block *Block return nil } -func (dbw *dbWriter) buildDbConsolidationRequests(block *Block, orphaned bool, overrideForkId *ForkKey) []*dbtypes.ConsolidationRequest { +func (dbw *dbWriter) buildDbConsolidationRequests(block *Block, orphaned bool, overrideForkId *ForkKey, sim *stateSimulator) []*dbtypes.ConsolidationRequest { blockBody := block.GetBlock() if blockBody == nil { return nil @@ -695,6 +697,19 @@ func (dbw *dbWriter) buildDbConsolidationRequests(block *Block, orphaned bool, o return nil } + if sim == nil { + chainState := dbw.indexer.consensusPool.GetChainState() + epochStats := dbw.indexer.epochCache.getEpochStatsByEpochAndRoot(chainState.EpochOfSlot(block.Slot), block.Root) + if epochStats != nil { + sim = newStateSimulator(dbw.indexer, epochStats) + } + } + + var blockResults [][]uint8 + if sim != nil { + blockResults = sim.replayBlockResults(block) + } + consolidations := requests.Consolidations if len(consolidations) == 0 { @@ -728,15 +743,19 @@ func (dbw *dbWriter) buildDbConsolidationRequests(block *Block, orphaned bool, o dbConsolidation.TargetIndex = &targetIdx } + if blockResults != nil { + dbConsolidation.Result = blockResults[1][idx] + } + dbConsolidations[idx] = dbConsolidation } return dbConsolidations } -func (dbw *dbWriter) persistBlockWithdrawalRequests(tx *sqlx.Tx, block *Block, orphaned bool, overrideForkId *ForkKey) error { +func (dbw *dbWriter) persistBlockWithdrawalRequests(tx *sqlx.Tx, block *Block, orphaned bool, overrideForkId *ForkKey, sim *stateSimulator) error { // insert deposits - dbWithdrawalRequests := dbw.buildDbWithdrawalRequests(block, orphaned, overrideForkId) + dbWithdrawalRequests := dbw.buildDbWithdrawalRequests(block, orphaned, overrideForkId, sim) if len(dbWithdrawalRequests) > 0 { err := db.InsertWithdrawalRequests(dbWithdrawalRequests, tx) @@ -748,7 +767,7 @@ func (dbw *dbWriter) persistBlockWithdrawalRequests(tx *sqlx.Tx, block *Block, o return nil } -func (dbw *dbWriter) buildDbWithdrawalRequests(block *Block, orphaned bool, overrideForkId *ForkKey) []*dbtypes.WithdrawalRequest { +func (dbw *dbWriter) buildDbWithdrawalRequests(block *Block, orphaned bool, overrideForkId *ForkKey, sim *stateSimulator) []*dbtypes.WithdrawalRequest { blockBody := block.GetBlock() if blockBody == nil { return nil @@ -759,6 +778,19 @@ func (dbw *dbWriter) buildDbWithdrawalRequests(block *Block, orphaned bool, over return nil } + if sim == nil { + chainState := dbw.indexer.consensusPool.GetChainState() + epochStats := dbw.indexer.epochCache.getEpochStatsByEpochAndRoot(chainState.EpochOfSlot(block.Slot), block.Root) + if epochStats != nil { + sim = newStateSimulator(dbw.indexer, epochStats) + } + } + + var blockResults [][]uint8 + if sim != nil { + blockResults = sim.replayBlockResults(block) + } + withdrawalRequests := requests.Withdrawals if len(withdrawalRequests) == 0 { @@ -788,6 +820,10 @@ func (dbw *dbWriter) buildDbWithdrawalRequests(block *Block, orphaned bool, over dbWithdrawalRequest.ValidatorIndex = &validatorIdx } + if blockResults != nil { + dbWithdrawalRequest.Result = blockResults[0][idx] + } + dbWithdrawalRequests[idx] = dbWithdrawalRequest } diff --git a/indexer/beacon/writedb_state.go b/indexer/beacon/writedb_state.go deleted file mode 100644 index aae13a7c..00000000 --- a/indexer/beacon/writedb_state.go +++ /dev/null @@ -1,191 +0,0 @@ -package beacon - -import ( - "slices" - - "github.com/attestantio/go-eth2-client/spec/phase0" - "github.com/ethpandaops/dora/dbtypes" -) - -func (dbw *dbWriter) getParentBlocks(block *Block) []*Block { - chainState := dbw.indexer.consensusPool.GetChainState() - minSlot := chainState.EpochToSlot(chainState.EpochOfSlot(block.Slot)) - parentBlocks := []*Block{} - - for { - parentBlockRoot := block.GetParentRoot() - if parentBlockRoot == nil { - break - } - - parentBlock := dbw.indexer.GetBlockByRoot(*parentBlockRoot) - if parentBlock == nil { - break - } - - if parentBlock.Slot < minSlot { - break - } - - parentBlocks = append(parentBlocks, parentBlock) - } - - slices.Reverse(parentBlocks) - - return parentBlocks -} - -func (dbw *dbWriter) replayPendingWithdrawalsMap(epochStats *EpochStats, parentBlocks []*Block) map[phase0.ValidatorIndex]bool { - withdrawalsMap := map[phase0.ValidatorIndex]bool{} - - chainState := dbw.indexer.consensusPool.GetChainState() - chainSpec := chainState.GetSpecs() - if chainSpec.ElectraForkEpoch == nil || epochStats.epoch < phase0.Epoch(*chainSpec.ElectraForkEpoch) { - return withdrawalsMap - } - - var pendingWithdrawals []EpochStatsPendingWithdrawals - epochStatsValues := epochStats.GetValues(false) - if epochStatsValues == nil || epochStatsValues.PendingWithdrawals == nil { - pendingWithdrawals = epochStatsValues.PendingWithdrawals - } else { - pendingWithdrawals = []EpochStatsPendingWithdrawals{} - } - - maxDequeuePerSweep := chainSpec.MaxPendingPartialsPerWithdrawalsSweep - - for _, block := range parentBlocks { - // dequeue pending withdrawals - dequeued := uint64(0) - for _, pendingWithdrawal := range pendingWithdrawals { - if pendingWithdrawal.Epoch > epochStats.epoch || dequeued >= maxDequeuePerSweep { - break - } - dequeued++ - } - pendingWithdrawals = pendingWithdrawals[dequeued:] - - // get additional withdrawal requests from block - blockBody := block.GetBlock() - if blockBody == nil { - continue - } - - requests, err := blockBody.ExecutionRequests() - if err != nil { - continue - } - - for _, withdrawal := range requests.Withdrawals { - - } - } - - return nil -} - -type replayBlockConsolidationsState struct { - block *Block - pendingWithdrawals []EpochStatsPendingWithdrawals - additionalWithdrawals []phase0.ValidatorIndex - validatorMap map[phase0.ValidatorIndex]*phase0.Validator -} - -func (dbw *dbWriter) replayBlockConsolidations(epochStats *EpochStats, block *Block, prevState *replayBlockConsolidationsState) ([]uint8, *replayBlockConsolidationsState) { - parentBlocks := dbw.getParentBlocks(block) - if prevState == nil || prevState.block.Slot < block.Slot { - epochStatsValues := epochStats.GetValues(false) - if epochStatsValues == nil { - return nil, nil - } - - pendingWithdrawals := epochStatsValues.PendingWithdrawals - if pendingWithdrawals == nil { - pendingWithdrawals = []EpochStatsPendingWithdrawals{} - } - - prevState = &replayBlockConsolidationsState{ - block: parentBlocks[0], - pendingWithdrawals: pendingWithdrawals, - additionalWithdrawals: []phase0.ValidatorIndex{}, - validatorMap: map[phase0.ValidatorIndex]*phase0.Validator{}, - } - } - - // get consolidations from block - blockBody := block.GetBlock() - if blockBody == nil { - return nil, nil - } - - blockRequests, err := blockBody.ExecutionRequests() - if err != nil { - return nil, nil - } - - // get relevant indices and prepare results array - relevantIndices := []phase0.ValidatorIndex{} - requestResults := make([]uint8, len(blockRequests.Consolidations)) - for i, consolidation := range blockRequests.Consolidations { - sourceIndice, sourceFound := dbw.indexer.validatorCache.getValidatorIndexByPubkey(consolidation.SourcePubkey) - if !sourceFound { - requestResults[i] = dbtypes.ConsolidationRequestResultSrcNotFound - continue - } - - targetIndice, targetFound := dbw.indexer.validatorCache.getValidatorIndexByPubkey(consolidation.TargetPubkey) - if !targetFound { - requestResults[i] = dbtypes.ConsolidationRequestResultTgtNotFound - continue - } - - relevantIndices = append(relevantIndices, sourceIndice, targetIndice) - } - - validatorMap := map[phase0.ValidatorIndex]*phase0.Validator{} - getValidator := func(index phase0.ValidatorIndex) *phase0.Validator { - if validator, ok := validatorMap[index]; ok { - return validator - } - - validator, err := dbw.indexer.validatorCache.getValidatorByIndex(index, nil) - if err != nil { - return nil - } - } - - // replay parent blocks up to the current block and apply all operations for relevant indices - for _, parentBlock := range parentBlocks { - blockBody := parentBlock.GetBlock() - if blockBody == nil { - return nil, nil - } - - // get bls changes - blsChanges, err := blockBody.BLSToExecutionChanges() - if err != nil { - return nil, nil - } - - for _, blsChange := range blsChanges { - validatorIndex := blsChange.Message.ValidatorIndex - if slices.Contains(relevantIndices, validatorIndex) { - prevState.validatorMap[validatorIndex] = &blsChange.Message.Validator - } - - if blsChange.ValidatorIndex == prevState.validatorMap[blsChange.ValidatorIndex].Index { - prevState.validatorMap[blsChange.ValidatorIndex].WithdrawalCredentials = blsChange.WithdrawalCredentials - } - } - - // get execution requests - requests, err := blockBody.ExecutionRequests() - requests, err := blockBody.ExecutionRequests() - if err != nil { - return nil - } - } - - return prevState - -} From 39c37611612702c855900895834b3e5990e57aac Mon Sep 17 00:00:00 2001 From: pk910 Date: Sat, 8 Feb 2025 15:19:19 +0100 Subject: [PATCH 03/18] show simulated request results on UI --- handlers/el_consolidations.go | 75 +++++++++++++++++++ handlers/el_withdrawals.go | 32 ++++++++ handlers/validator.go | 4 + .../el_consolidations/el_consolidations.html | 8 +- templates/el_withdrawals/el_withdrawals.html | 8 +- .../validator/consolidationRequests.html | 8 +- types/models/el_consolidations.go | 2 + types/models/el_withdrawals.go | 2 + types/models/validator.go | 4 + 9 files changed, 140 insertions(+), 3 deletions(-) diff --git a/handlers/el_consolidations.go b/handlers/el_consolidations.go index 367a71db..04dbe336 100644 --- a/handlers/el_consolidations.go +++ b/handlers/el_consolidations.go @@ -8,7 +8,9 @@ import ( "github.com/attestantio/go-eth2-client/spec/phase0" "github.com/ethereum/go-ethereum/common" + "github.com/ethpandaops/dora/clients/consensus" "github.com/ethpandaops/dora/dbtypes" + "github.com/ethpandaops/dora/indexer/beacon" "github.com/ethpandaops/dora/services" "github.com/ethpandaops/dora/templates" "github.com/ethpandaops/dora/types/models" @@ -230,6 +232,8 @@ func buildFilteredElConsolidationsPageData(pageIdx uint64, pageSize uint64, minS elConsolidationData.SlotRoot = request.SlotRoot elConsolidationData.Time = chainState.SlotToTime(phase0.Slot(request.SlotNumber)) elConsolidationData.Status = uint64(1) + elConsolidationData.Result = request.Result + elConsolidationData.ResultMessage = getConsolidationResultMessage(request.Result, chainState.GetSpecs()) if consolidation.RequestOrphaned { elConsolidationData.Status = uint64(2) } @@ -288,3 +292,74 @@ func buildFilteredElConsolidationsPageData(pageIdx uint64, pageSize uint64, minS return pageData } + +func getConsolidationResultMessage(result uint8, specs *consensus.ChainSpec) string { + switch result { + case dbtypes.ConsolidationRequestResultUnknown: + return "Unknown result" + case dbtypes.ConsolidationRequestResultSuccess: + return "Success" + case dbtypes.ConsolidationRequestResultTotalBalanceTooLow: + requiredBalance := getConsolidationRequiredBalance(specs) + return fmt.Sprintf("Error: Total active balance too low (required: %v ETH)", requiredBalance/beacon.EtherGweiFactor) + case dbtypes.ConsolidationRequestResultQueueFull: + return "Error: Queue is full" + case dbtypes.ConsolidationRequestResultSrcNotFound: + return "Error: Source validator not found" + case dbtypes.ConsolidationRequestResultSrcInvalidCredentials: + return "Error: Source validator has invalid credentials" + case dbtypes.ConsolidationRequestResultSrcInvalidSender: + return "Error: Source validator withdrawal address does not match tx sender" + case dbtypes.ConsolidationRequestResultSrcNotActive: + return "Error: Source validator is not active" + case dbtypes.ConsolidationRequestResultSrcNotOldEnough: + return fmt.Sprintf("Error: Source validator is not old enough (min. %v epochs)", specs.ShardCommitteePeriod) + case dbtypes.ConsolidationRequestResultSrcHasPendingWithdrawal: + return "Error: Source validator has pending partial withdrawal" + case dbtypes.ConsolidationRequestResultTgtNotFound: + return "Error: Target validator not found" + case dbtypes.ConsolidationRequestResultTgtInvalidCredentials: + return "Error: Target validator has invalid credentials" + case dbtypes.ConsolidationRequestResultTgtInvalidSender: + return "Error: Target validator withdrawal address does not match tx sender" + case dbtypes.ConsolidationRequestResultTgtNotCompounding: + return "Error: Target validator is not compounding" + case dbtypes.ConsolidationRequestResultTgtNotActive: + return "Error: Target validator is not active" + default: + return fmt.Sprintf("Unknown error code: %d", result) + } +} + +func getConsolidationRequiredBalance(chainSpec *consensus.ChainSpec) phase0.Gwei { + // (c) claude-3.5-sonnet + // We need: consolidationChurnLimit > chainSpec.MinActivationBalance + // Where: consolidationChurnLimit = balanceChurnLimit - activationExitChurnLimit + // And: balanceChurnLimit = max(totalActiveBalance/ChurnLimitQuotient, MinPerEpochChurnLimitElectra) + // And: activationExitChurnLimit = min(balanceChurnLimit, MaxPerEpochActivationExitChurnLimit) + + // Work backwards: + // 1. balanceChurnLimit - activationExitChurnLimit > MinActivationBalance + // 2. balanceChurnLimit - min(balanceChurnLimit, MaxPerEpochActivationExitChurnLimit) > MinActivationBalance + // 3. For the minimum valid totalActiveBalance, these will be equal: + // balanceChurnLimit - MaxPerEpochActivationExitChurnLimit = MinActivationBalance + // 4. Therefore: balanceChurnLimit = MinActivationBalance + MaxPerEpochActivationExitChurnLimit + + requiredBalanceChurnLimit := chainSpec.MinActivationBalance + chainSpec.MaxPerEpochActivationExitChurnLimit + + // Round up to next increment + if requiredBalanceChurnLimit%chainSpec.EffectiveBalanceIncrement != 0 { + requiredBalanceChurnLimit += chainSpec.EffectiveBalanceIncrement - (requiredBalanceChurnLimit % chainSpec.EffectiveBalanceIncrement) + } + + // Now solve for totalActiveBalance: + // balanceChurnLimit = max(totalActiveBalance/ChurnLimitQuotient, MinPerEpochChurnLimitElectra) + // Therefore: totalActiveBalance = balanceChurnLimit * ChurnLimitQuotient + + // But first ensure we meet the minimum churn limit + if requiredBalanceChurnLimit < chainSpec.MinPerEpochChurnLimitElectra { + requiredBalanceChurnLimit = chainSpec.MinPerEpochChurnLimitElectra + } + + return phase0.Gwei(requiredBalanceChurnLimit * chainSpec.ChurnLimitQuotient) +} diff --git a/handlers/el_withdrawals.go b/handlers/el_withdrawals.go index 0d3d82f0..a1909f88 100644 --- a/handlers/el_withdrawals.go +++ b/handlers/el_withdrawals.go @@ -8,6 +8,7 @@ import ( "github.com/attestantio/go-eth2-client/spec/phase0" "github.com/ethereum/go-ethereum/common" + "github.com/ethpandaops/dora/clients/consensus" "github.com/ethpandaops/dora/dbtypes" "github.com/ethpandaops/dora/services" "github.com/ethpandaops/dora/templates" @@ -216,6 +217,8 @@ func buildFilteredElWithdrawalsPageData(pageIdx uint64, pageSize uint64, minSlot elWithdrawalData.SlotRoot = request.SlotRoot elWithdrawalData.Time = chainState.SlotToTime(phase0.Slot(request.SlotNumber)) elWithdrawalData.Status = uint64(1) + elWithdrawalData.Result = request.Result + elWithdrawalData.ResultMessage = getWithdrawalResultMessage(request.Result, chainState.GetSpecs()) if elWithdrawal.RequestOrphaned { elWithdrawalData.Status = uint64(2) } @@ -274,3 +277,32 @@ func buildFilteredElWithdrawalsPageData(pageIdx uint64, pageSize uint64, minSlot return pageData } + +func getWithdrawalResultMessage(result uint8, specs *consensus.ChainSpec) string { + switch result { + case dbtypes.WithdrawalRequestResultUnknown: + return "Unknown result" + case dbtypes.WithdrawalRequestResultSuccess: + return "Success" + case dbtypes.WithdrawalRequestResultQueueFull: + return "Error: Queue is full" + case dbtypes.WithdrawalRequestResultValidatorNotFound: + return "Error: Validator not found" + case dbtypes.WithdrawalRequestResultValidatorInvalidCredentials: + return "Error: Validator has invalid credentials" + case dbtypes.WithdrawalRequestResultValidatorInvalidSender: + return "Error: Validator withdrawal address does not match tx sender" + case dbtypes.WithdrawalRequestResultValidatorNotActive: + return "Error: Validator is not active" + case dbtypes.WithdrawalRequestResultValidatorNotOldEnough: + return fmt.Sprintf("Error: Validator is not old enough (min. %v epochs)", specs.ShardCommitteePeriod) + case dbtypes.WithdrawalRequestResultValidatorNotCompounding: + return "Error: Validator is not compounding" + case dbtypes.WithdrawalRequestResultValidatorHasPendingWithdrawal: + return "Error: Validator has pending partial withdrawal" + case dbtypes.WithdrawalRequestResultValidatorBalanceTooLow: + return "Error: Validator balance too low" + default: + return fmt.Sprintf("Unknown error code: %d", result) + } +} diff --git a/handlers/validator.go b/handlers/validator.go index 252f1f7b..78599c30 100644 --- a/handlers/validator.go +++ b/handlers/validator.go @@ -476,6 +476,8 @@ func buildValidatorPageData(validatorIndex uint64, tabView string) (*models.Vali elWithdrawalData.SlotRoot = request.SlotRoot elWithdrawalData.Time = chainState.SlotToTime(phase0.Slot(request.SlotNumber)) elWithdrawalData.Status = uint64(1) + elWithdrawalData.Result = request.Result + elWithdrawalData.ResultMessage = getWithdrawalResultMessage(request.Result, chainState.GetSpecs()) if elWithdrawal.RequestOrphaned { elWithdrawalData.Status = uint64(2) } @@ -563,6 +565,8 @@ func buildValidatorPageData(validatorIndex uint64, tabView string) (*models.Vali elConsolidationData.SlotRoot = request.SlotRoot elConsolidationData.Time = chainState.SlotToTime(phase0.Slot(request.SlotNumber)) elConsolidationData.Status = uint64(1) + elConsolidationData.Result = request.Result + elConsolidationData.ResultMessage = getConsolidationResultMessage(request.Result, chainState.GetSpecs()) if consolidation.RequestOrphaned { elConsolidationData.Status = uint64(2) } diff --git a/templates/el_consolidations/el_consolidations.html b/templates/el_consolidations/el_consolidations.html index 84762b3f..c785fac6 100644 --- a/templates/el_consolidations/el_consolidations.html +++ b/templates/el_consolidations/el_consolidations.html @@ -239,7 +239,13 @@

{{ if eq $request.Status 0 }} Req. Pending {{ else if eq $request.Status 1 }} - Req. Included + {{ if eq $request.Result 0 }} + Req. Included + {{ else if eq $request.Result 1 }} + Req. Processed + {{ else }} + Req. Failed + {{ end }} {{ else if eq $request.Status 2 }} Req. Orphaned {{ end }} diff --git a/templates/el_withdrawals/el_withdrawals.html b/templates/el_withdrawals/el_withdrawals.html index 29cc775a..deacd4eb 100644 --- a/templates/el_withdrawals/el_withdrawals.html +++ b/templates/el_withdrawals/el_withdrawals.html @@ -222,7 +222,13 @@

{{ if eq $request.Status 0 }} Req. Pending {{ else if eq $request.Status 1 }} - Req. Included + {{ if eq $request.Result 0 }} + Req. Included + {{ else if eq $request.Result 1 }} + Req. Included + {{ else }} + Req. Failed + {{ end }} {{ else if eq $request.Status 2 }} Req. Orphaned {{ end }} diff --git a/templates/validator/consolidationRequests.html b/templates/validator/consolidationRequests.html index 5b0debbb..afb0e213 100644 --- a/templates/validator/consolidationRequests.html +++ b/templates/validator/consolidationRequests.html @@ -84,7 +84,13 @@ {{ if eq $request.Status 0 }} Req. Pending {{ else if eq $request.Status 1 }} - Req. Included + {{ if eq $request.Result 0 }} + Req. Included + {{ else if eq $request.Result 1 }} + Req. Processed + {{ else }} + Req. Failed + {{ end }} {{ else if eq $request.Status 2 }} Req. Orphaned {{ end }} diff --git a/types/models/el_consolidations.go b/types/models/el_consolidations.go index e7bddc1f..47abeb76 100644 --- a/types/models/el_consolidations.go +++ b/types/models/el_consolidations.go @@ -43,6 +43,8 @@ type ElConsolidationsPageDataConsolidation struct { SlotRoot []byte `json:"slot_root"` Time time.Time `json:"time"` Status uint64 `json:"status"` + Result uint8 `json:"result"` + ResultMessage string `json:"result_message"` TxStatus uint64 `json:"tx_status"` SourceAddr []byte `json:"src_addr"` SourceValidatorValid bool `json:"src_vvalid"` diff --git a/types/models/el_withdrawals.go b/types/models/el_withdrawals.go index 02a8fcd8..fe35c33c 100644 --- a/types/models/el_withdrawals.go +++ b/types/models/el_withdrawals.go @@ -41,6 +41,8 @@ type ElWithdrawalsPageDataWithdrawal struct { SlotRoot []byte `json:"slot_root"` Time time.Time `json:"time"` Status uint64 `json:"status"` + Result uint8 `json:"result"` + ResultMessage string `json:"result_message"` TxStatus uint64 `json:"tx_status"` SourceAddr []byte `json:"source_addr"` Amount uint64 `json:"amount"` diff --git a/types/models/validator.go b/types/models/validator.go index 7155d9ee..9d0a6e8b 100644 --- a/types/models/validator.go +++ b/types/models/validator.go @@ -115,6 +115,8 @@ type ValidatorPageDataConsolidation struct { SlotRoot []byte `json:"slot_root"` Time time.Time `json:"time"` Status uint64 `json:"status"` + Result uint8 `json:"result"` + ResultMessage string `json:"result_message"` TxStatus uint64 `json:"tx_status"` SourceAddr []byte `json:"src_addr"` SourceValidatorValid bool `json:"src_vvalid"` @@ -146,6 +148,8 @@ type ValidatorPageDataWithdrawal struct { SlotRoot []byte `json:"slot_root"` Time time.Time `json:"time"` Status uint64 `json:"status"` + Result uint8 `json:"result"` + ResultMessage string `json:"result_message"` TxStatus uint64 `json:"tx_status"` SourceAddr []byte `json:"source_addr"` Amount uint64 `json:"amount"` From fad994195091a9d041a384019d681beadec8e339 Mon Sep 17 00:00:00 2001 From: pk910 Date: Sat, 8 Feb 2025 15:46:19 +0100 Subject: [PATCH 04/18] fix result simulation in synchronizer --- indexer/beacon/finalization.go | 2 +- indexer/beacon/state_sim.go | 10 +++++++++- indexer/beacon/synchronizer.go | 26 ++++++++++++++------------ indexer/beacon/writedb.go | 8 +++++--- 4 files changed, 29 insertions(+), 17 deletions(-) diff --git a/indexer/beacon/finalization.go b/indexer/beacon/finalization.go index a248ec6f..9ea4be8e 100644 --- a/indexer/beacon/finalization.go +++ b/indexer/beacon/finalization.go @@ -302,7 +302,7 @@ func (indexer *Indexer) finalizeEpoch(epoch phase0.Epoch, justifiedRoot phase0.R deleteBeforeSlot := chainState.EpochToSlot(epoch + 1) err := db.RunDBTransaction(func(tx *sqlx.Tx) error { // persist canonical epoch data - if err := indexer.dbWriter.persistEpochData(tx, epoch, canonicalBlocks, epochStats, epochVotes); err != nil { + if err := indexer.dbWriter.persistEpochData(tx, epoch, canonicalBlocks, epochStats, epochVotes, nil); err != nil { return fmt.Errorf("failed persisting epoch data for epoch %v: %v", epoch, err) } diff --git a/indexer/beacon/state_sim.go b/indexer/beacon/state_sim.go index b6a1daf5..77b27e8a 100644 --- a/indexer/beacon/state_sim.go +++ b/indexer/beacon/state_sim.go @@ -15,6 +15,7 @@ type stateSimulator struct { epochStats *EpochStats epochStatsValues *EpochStatsValues prevState *stateSimulatorState + validatorSet []*phase0.Validator } type stateSimulatorState struct { @@ -115,7 +116,14 @@ func (sim *stateSimulator) getValidator(index phase0.ValidatorIndex) *phase0.Val return validator } - validator := sim.indexer.validatorCache.getValidatorByIndexAndRoot(index, sim.prevState.epochRoot) + var validator *phase0.Validator + + if sim.validatorSet != nil && len(sim.validatorSet) > int(index) { + validator = sim.validatorSet[index] + } else { + validator = sim.indexer.validatorCache.getValidatorByIndexAndRoot(index, sim.prevState.epochRoot) + } + sim.prevState.validatorMap[index] = validator return validator } diff --git a/indexer/beacon/synchronizer.go b/indexer/beacon/synchronizer.go index bf434db2..78a92753 100644 --- a/indexer/beacon/synchronizer.go +++ b/indexer/beacon/synchronizer.go @@ -355,22 +355,21 @@ func (sync *synchronizer) syncEpoch(syncEpoch phase0.Epoch, client *Client, last return false, fmt.Errorf("error fetching epoch %v state: %v", syncEpoch, err) } + var validatorSet []*phase0.Validator + if state == nil { + sync.logger.Warnf("state for epoch %v not found", syncEpoch) + } else { + validatorSet, err = state.Validators() + if err != nil { + sync.logger.Warnf("error getting validator set from state %v: %v", dependentRoot.String(), err) + } + } + var epochStats *EpochStats var epochStatsValues *EpochStatsValues if epochState != nil && epochState.loadingStatus == 2 { epochStats = newEpochStats(syncEpoch, dependentRoot) epochStats.dependentState = epochState - - var validatorSet []*phase0.Validator - if state == nil { - sync.logger.Warnf("state for epoch %v not found", syncEpoch) - } else { - validatorSet, err = state.Validators() - if err != nil { - sync.logger.Warnf("error getting validator set from state %v: %v", epochStats.dependentRoot.String(), err) - } - } - epochStats.processState(sync.indexer, validatorSet) epochStatsValues = epochStats.GetValues(false) } @@ -391,9 +390,12 @@ func (sync *synchronizer) syncEpoch(syncEpoch phase0.Epoch, client *Client, last } } + sim := newStateSimulator(sync.indexer, epochStats) + sim.validatorSet = validatorSet + // save blocks err = db.RunDBTransaction(func(tx *sqlx.Tx) error { - err = sync.indexer.dbWriter.persistEpochData(tx, syncEpoch, canonicalBlocks, epochStats, epochVotes) + err = sync.indexer.dbWriter.persistEpochData(tx, syncEpoch, canonicalBlocks, epochStats, epochVotes, sim) if err != nil { return fmt.Errorf("error persisting epoch data to db: %v", err) } diff --git a/indexer/beacon/writedb.go b/indexer/beacon/writedb.go index 419c8fd8..9da28a4d 100644 --- a/indexer/beacon/writedb.go +++ b/indexer/beacon/writedb.go @@ -129,15 +129,17 @@ func (dbw *dbWriter) persistBlockChildObjects(tx *sqlx.Tx, block *Block, deposit return nil } -func (dbw *dbWriter) persistEpochData(tx *sqlx.Tx, epoch phase0.Epoch, blocks []*Block, epochStats *EpochStats, epochVotes *EpochVotes) error { +func (dbw *dbWriter) persistEpochData(tx *sqlx.Tx, epoch phase0.Epoch, blocks []*Block, epochStats *EpochStats, epochVotes *EpochVotes, sim *stateSimulator) error { if tx == nil { return db.RunDBTransaction(func(tx *sqlx.Tx) error { - return dbw.persistEpochData(tx, epoch, blocks, epochStats, epochVotes) + return dbw.persistEpochData(tx, epoch, blocks, epochStats, epochVotes, sim) }) } canonicalForkId := ForkKey(0) - sim := newStateSimulator(dbw.indexer, epochStats) + if sim == nil { + sim = newStateSimulator(dbw.indexer, epochStats) + } dbEpoch := dbw.buildDbEpoch(epoch, blocks, epochStats, epochVotes, func(block *Block, depositIndex *uint64) { _, err := dbw.persistBlockData(tx, block, epochStats, depositIndex, false, &canonicalForkId, sim) From 040084f3013a396893aedd35ab396488efc3c383 Mon Sep 17 00:00:00 2001 From: pk910 Date: Sat, 8 Feb 2025 16:50:50 +0100 Subject: [PATCH 05/18] cache block operation results & clean up code --- indexer/beacon/block.go | 2 ++ indexer/beacon/blockcache.go | 5 +---- indexer/beacon/finalization.go | 2 ++ indexer/beacon/pruning.go | 1 + indexer/beacon/state_sim.go | 10 +++++++++- indexer/beacon/writedb.go | 20 ++++++++++---------- 6 files changed, 25 insertions(+), 15 deletions(-) diff --git a/indexer/beacon/block.go b/indexer/beacon/block.go index 0d123103..fc434a7a 100644 --- a/indexer/beacon/block.go +++ b/indexer/beacon/block.go @@ -37,6 +37,8 @@ type Block struct { seenMutex sync.RWMutex seenMap map[uint16]*Client processedActivity uint8 + blockResults [][]uint8 + blockResultsMutex sync.Mutex } // BlockBodyIndex holds important block propoerties that are used as index for cache lookups. diff --git a/indexer/beacon/blockcache.go b/indexer/beacon/blockcache.go index 0eece406..b5a326e4 100644 --- a/indexer/beacon/blockcache.go +++ b/indexer/beacon/blockcache.go @@ -379,9 +379,6 @@ func (cache *blockCache) isCanonicalBlock(blockRoot phase0.Root, head phase0.Roo // It returns a boolean indicating whether the block with blockRoot is a canonical block, and the distance between the two blocks. func (cache *blockCache) getCanonicalDistance(blockRoot phase0.Root, head phase0.Root, maxDistance uint64) (bool, uint64) { block := cache.getBlockByRoot(blockRoot) - if block == nil { - return false, 0 - } canonicalBlock := cache.getBlockByRoot(head) if canonicalBlock == nil { @@ -394,7 +391,7 @@ func (cache *blockCache) getCanonicalDistance(blockRoot phase0.Root, head phase0 } for canonicalBlock != nil { - if canonicalBlock.Slot < block.Slot { + if block != nil && canonicalBlock.Slot < block.Slot { return false, 0 } diff --git a/indexer/beacon/finalization.go b/indexer/beacon/finalization.go index 9ea4be8e..d4f40792 100644 --- a/indexer/beacon/finalization.go +++ b/indexer/beacon/finalization.go @@ -293,6 +293,8 @@ func (indexer *Indexer) finalizeEpoch(epoch phase0.Epoch, justifiedRoot phase0.R if blockIndex := block.GetBlockIndex(); blockIndex != nil { canonicalBlockHashes[i] = blockIndex.ExecutionHash[:] } + + block.blockResults = nil // force re-simulation of block results } t1dur := time.Since(t1) - t1loading diff --git a/indexer/beacon/pruning.go b/indexer/beacon/pruning.go index 99a447f1..f2d640d4 100644 --- a/indexer/beacon/pruning.go +++ b/indexer/beacon/pruning.go @@ -260,6 +260,7 @@ func (indexer *Indexer) processEpochPruning(pruneEpoch phase0.Epoch) (uint64, ui block.processingStatus = dbtypes.UnfinalizedBlockStatusPruned block.setBlockIndex(block.block) block.block = nil + block.blockResults = nil } // clean up epoch stats cache diff --git a/indexer/beacon/state_sim.go b/indexer/beacon/state_sim.go index 77b27e8a..950414e6 100644 --- a/indexer/beacon/state_sim.go +++ b/indexer/beacon/state_sim.go @@ -455,8 +455,15 @@ func (sim *stateSimulator) replayBlockResults(block *Block) [][]uint8 { return nil } + block.blockResultsMutex.Lock() + defer block.blockResultsMutex.Unlock() + + if len(block.blockResults) > 0 { + return block.blockResults + } + parentBlocks := sim.getParentBlocks(block) - if sim.prevState == nil || (sim.prevState.block != nil && sim.prevState.block.Slot < block.Slot) { + if sim.prevState == nil || (sim.prevState.block != nil && sim.prevState.block.Slot > block.Slot) { state := sim.resetState(block) if state == nil { return nil @@ -480,5 +487,6 @@ func (sim *stateSimulator) replayBlockResults(block *Block) [][]uint8 { results := sim.applyBlock(block) sim.prevState.blockResults = results + block.blockResults = results return results } diff --git a/indexer/beacon/writedb.go b/indexer/beacon/writedb.go index 9da28a4d..747bc8bd 100644 --- a/indexer/beacon/writedb.go +++ b/indexer/beacon/writedb.go @@ -707,17 +707,17 @@ func (dbw *dbWriter) buildDbConsolidationRequests(block *Block, orphaned bool, o } } - var blockResults [][]uint8 - if sim != nil { - blockResults = sim.replayBlockResults(block) - } - consolidations := requests.Consolidations if len(consolidations) == 0 { return []*dbtypes.ConsolidationRequest{} } + var blockResults [][]uint8 + if sim != nil { + blockResults = sim.replayBlockResults(block) + } + blockNumber, _ := blockBody.ExecutionBlockNumber() dbConsolidations := make([]*dbtypes.ConsolidationRequest, len(consolidations)) @@ -788,17 +788,17 @@ func (dbw *dbWriter) buildDbWithdrawalRequests(block *Block, orphaned bool, over } } - var blockResults [][]uint8 - if sim != nil { - blockResults = sim.replayBlockResults(block) - } - withdrawalRequests := requests.Withdrawals if len(withdrawalRequests) == 0 { return []*dbtypes.WithdrawalRequest{} } + var blockResults [][]uint8 + if sim != nil { + blockResults = sim.replayBlockResults(block) + } + blockNumber, _ := blockBody.ExecutionBlockNumber() dbWithdrawalRequests := make([]*dbtypes.WithdrawalRequest, len(withdrawalRequests)) From 8adbcf0088232fa4662cb90f0632f6ae6bfec6f2 Mon Sep 17 00:00:00 2001 From: pk910 Date: Sat, 8 Feb 2025 17:35:42 +0100 Subject: [PATCH 06/18] fix table size for consolidations/withdrawals list --- templates/el_consolidations/el_consolidations.html | 2 +- templates/el_withdrawals/el_withdrawals.html | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/templates/el_consolidations/el_consolidations.html b/templates/el_consolidations/el_consolidations.html index c785fac6..8b5c4a6e 100644 --- a/templates/el_consolidations/el_consolidations.html +++ b/templates/el_consolidations/el_consolidations.html @@ -184,7 +184,7 @@

{{ formatRecentTimeShort $request.Time }}
- {{ ethAddressLink $request.SourceAddr }} + {{ ethAddressLink $request.SourceAddr }}
diff --git a/templates/el_withdrawals/el_withdrawals.html b/templates/el_withdrawals/el_withdrawals.html index deacd4eb..bbe3de3c 100644 --- a/templates/el_withdrawals/el_withdrawals.html +++ b/templates/el_withdrawals/el_withdrawals.html @@ -171,7 +171,7 @@

{{ formatRecentTimeShort $request.Time }}
- {{ ethAddressLink $request.SourceAddr }} + {{ ethAddressLink $request.SourceAddr }}
From f30b64ab7e981402d3e69ef91f2d7d5fffff0f5e Mon Sep 17 00:00:00 2001 From: pk910 Date: Sat, 8 Feb 2025 17:38:04 +0100 Subject: [PATCH 07/18] init controls on lazy loaded content --- templates/validator/validator.html | 1 + 1 file changed, 1 insertion(+) diff --git a/templates/validator/validator.html b/templates/validator/validator.html index 8fb9810c..d338d716 100644 --- a/templates/validator/validator.html +++ b/templates/validator/validator.html @@ -284,6 +284,7 @@

Validator {{ forma $.get(event.target.getAttribute('href') + "&lazy=true", function(data) { $(tabEl).html(data); $(tabEl).data("loaded", true); + explorer.initControls(); }); } From 04164a5cd24bb50d1270ec261b126c5fe820f705 Mon Sep 17 00:00:00 2001 From: pk910 Date: Sat, 8 Feb 2025 18:00:58 +0100 Subject: [PATCH 08/18] show withdrawal request status on validator details page --- templates/validator/withdrawalRequests.html | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/templates/validator/withdrawalRequests.html b/templates/validator/withdrawalRequests.html index 482dd4c6..917e4709 100644 --- a/templates/validator/withdrawalRequests.html +++ b/templates/validator/withdrawalRequests.html @@ -67,7 +67,13 @@ {{ if eq $request.Status 0 }} Req. Pending {{ else if eq $request.Status 1 }} - Req. Included + {{ if eq $request.Result 0 }} + Req. Included + {{ else if eq $request.Result 1 }} + Req. Processed + {{ else }} + Req. Failed + {{ end }} {{ else if eq $request.Status 2 }} Req. Orphaned {{ end }} From c251d314388771acbd3868685ae9e1e8e6b79e22 Mon Sep 17 00:00:00 2001 From: pk910 Date: Sat, 8 Feb 2025 18:02:22 +0100 Subject: [PATCH 09/18] fix show additional withdrawal requests link on validator details page --- templates/validator/withdrawalRequests.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/templates/validator/withdrawalRequests.html b/templates/validator/withdrawalRequests.html index 917e4709..5bd45e01 100644 --- a/templates/validator/withdrawalRequests.html +++ b/templates/validator/withdrawalRequests.html @@ -83,7 +83,7 @@ {{ if gt .AdditionalWithdrawalRequestCount 0 }} - View {{ .AdditionalWithdrawalRequestCount }} more withdrawal requests + View {{ .AdditionalWithdrawalRequestCount }} more withdrawal requests {{ end }} From e037a1745ac111695bcf0f278afb9381e6a77fed Mon Sep 17 00:00:00 2001 From: pk910 Date: Sat, 8 Feb 2025 18:52:51 +0100 Subject: [PATCH 10/18] avoid incomplete spec warnings for non-scheduled forks --- clients/consensus/chainspec.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/clients/consensus/chainspec.go b/clients/consensus/chainspec.go index bda692d4..34ea1ef8 100644 --- a/clients/consensus/chainspec.go +++ b/clients/consensus/chainspec.go @@ -33,9 +33,9 @@ type ChainSpec struct { DenebForkVersion phase0.Version `yaml:"DENEB_FORK_VERSION"` DenebForkEpoch *uint64 `yaml:"DENEB_FORK_EPOCH"` ElectraForkVersion phase0.Version `yaml:"ELECTRA_FORK_VERSION" check-if-fork:"ElectraForkEpoch"` - ElectraForkEpoch *uint64 `yaml:"ELECTRA_FORK_EPOCH"` + ElectraForkEpoch *uint64 `yaml:"ELECTRA_FORK_EPOCH" check-if-fork:"ElectraForkEpoch"` Eip7594ForkVersion phase0.Version `yaml:"EIP7594_FORK_VERSION" check-if-fork:"Eip7594ForkEpoch"` - Eip7594ForkEpoch *uint64 `yaml:"EIP7594_FORK_EPOCH"` + Eip7594ForkEpoch *uint64 `yaml:"EIP7594_FORK_EPOCH" check-if-fork:"Eip7594ForkEpoch"` SecondsPerSlot time.Duration `yaml:"SECONDS_PER_SLOT"` SlotsPerEpoch uint64 `yaml:"SLOTS_PER_EPOCH"` EpochsPerHistoricalVector uint64 `yaml:"EPOCHS_PER_HISTORICAL_VECTOR"` From d20a4d367d4649889b2cf7a4b40d6f4a125c55c9 Mon Sep 17 00:00:00 2001 From: pk910 Date: Sun, 9 Feb 2025 17:26:42 +0100 Subject: [PATCH 11/18] fix state pullution from state simulator --- indexer/beacon/state_sim.go | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/indexer/beacon/state_sim.go b/indexer/beacon/state_sim.go index 950414e6..e46d9562 100644 --- a/indexer/beacon/state_sim.go +++ b/indexer/beacon/state_sim.go @@ -119,7 +119,17 @@ func (sim *stateSimulator) getValidator(index phase0.ValidatorIndex) *phase0.Val var validator *phase0.Validator if sim.validatorSet != nil && len(sim.validatorSet) > int(index) { - validator = sim.validatorSet[index] + valdata := sim.validatorSet[index] + validator = &phase0.Validator{ + PublicKey: valdata.PublicKey, + WithdrawalCredentials: valdata.WithdrawalCredentials, + EffectiveBalance: valdata.EffectiveBalance, + Slashed: valdata.Slashed, + ActivationEligibilityEpoch: valdata.ActivationEligibilityEpoch, + ActivationEpoch: valdata.ActivationEpoch, + ExitEpoch: valdata.ExitEpoch, + WithdrawableEpoch: valdata.WithdrawableEpoch, + } } else { validator = sim.indexer.validatorCache.getValidatorByIndexAndRoot(index, sim.prevState.epochRoot) } @@ -175,7 +185,12 @@ func (sim *stateSimulator) applyConsolidation(consolidation *electra.Consolidati if srcValidator == tgtValidator { // self consolidation, set withdrawal credentials to 0x02 - srcValidator.WithdrawalCredentials[0] = 0x02 + if srcValidator.WithdrawalCredentials[0] == 0x01 { + withdrawalCreds := make([]byte, 32) + copy(withdrawalCreds, srcValidator.WithdrawalCredentials) + withdrawalCreds[0] = 0x02 + srcValidator.WithdrawalCredentials = withdrawalCreds + } return dbtypes.ConsolidationRequestResultSuccess } From aa2c4e21da47c920720523040c3950e2ebff53c5 Mon Sep 17 00:00:00 2001 From: pk910 Date: Sun, 9 Feb 2025 17:40:12 +0100 Subject: [PATCH 12/18] avoid reusing cached simulation results for orphaned blocks on same slot --- indexer/beacon/state_sim.go | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/indexer/beacon/state_sim.go b/indexer/beacon/state_sim.go index e46d9562..a20e053c 100644 --- a/indexer/beacon/state_sim.go +++ b/indexer/beacon/state_sim.go @@ -478,7 +478,22 @@ func (sim *stateSimulator) replayBlockResults(block *Block) [][]uint8 { } parentBlocks := sim.getParentBlocks(block) - if sim.prevState == nil || (sim.prevState.block != nil && sim.prevState.block.Slot > block.Slot) { + + canReuseParentState := false + if sim.prevState != nil && sim.prevState.block != nil { + if sim.prevState.block.Slot == block.Slot { + canReuseParentState = bytes.Equal(sim.prevState.block.Root[:], block.Root[:]) + } else if sim.prevState.block.Slot < block.Slot { + for _, parentBlock := range parentBlocks { + if parentBlock.Slot == sim.prevState.block.Slot { + canReuseParentState = bytes.Equal(parentBlock.Root[:], sim.prevState.block.Root[:]) + break + } + } + } + } + + if !canReuseParentState { state := sim.resetState(block) if state == nil { return nil From 2468ba9d36f98c71ec11cb5fbae0889ea1212a26 Mon Sep 17 00:00:00 2001 From: pk910 Date: Sun, 9 Feb 2025 17:45:35 +0100 Subject: [PATCH 13/18] fix operation result error codes --- indexer/beacon/state_sim.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/indexer/beacon/state_sim.go b/indexer/beacon/state_sim.go index a20e053c..a65a1067 100644 --- a/indexer/beacon/state_sim.go +++ b/indexer/beacon/state_sim.go @@ -171,7 +171,7 @@ func (sim *stateSimulator) applyConsolidation(consolidation *electra.Consolidati } if !bytes.Equal(srcWithdrawalCreds[12:], consolidation.SourceAddress[:]) { - return dbtypes.ConsolidationRequestResultSrcInvalidCredentials + return dbtypes.ConsolidationRequestResultSrcInvalidSender } tgtWithdrawalCreds := tgtValidator.WithdrawalCredentials @@ -180,7 +180,7 @@ func (sim *stateSimulator) applyConsolidation(consolidation *electra.Consolidati } if !bytes.Equal(tgtWithdrawalCreds[12:], consolidation.SourceAddress[:]) { - return dbtypes.ConsolidationRequestResultTgtInvalidCredentials + return dbtypes.ConsolidationRequestResultTgtInvalidSender } if srcValidator == tgtValidator { @@ -280,7 +280,7 @@ func (sim *stateSimulator) applyWithdrawal(withdrawal *electra.WithdrawalRequest } if !bytes.Equal(srcWithdrawalCreds[12:], withdrawal.SourceAddress[:]) { - return dbtypes.WithdrawalRequestResultValidatorInvalidCredentials + return dbtypes.WithdrawalRequestResultValidatorInvalidSender } if validator.ActivationEpoch == FarFutureEpoch || validator.ActivationEpoch > sim.epochStats.epoch || validator.ExitEpoch < FarFutureEpoch { From 60fb06148cf28dd67257ecf9c92fed365ac98b4c Mon Sep 17 00:00:00 2001 From: pk910 Date: Sun, 9 Feb 2025 17:55:36 +0100 Subject: [PATCH 14/18] improve error messages --- handlers/el_consolidations.go | 2 +- handlers/el_withdrawals.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/handlers/el_consolidations.go b/handlers/el_consolidations.go index 04dbe336..652f43f9 100644 --- a/handlers/el_consolidations.go +++ b/handlers/el_consolidations.go @@ -303,7 +303,7 @@ func getConsolidationResultMessage(result uint8, specs *consensus.ChainSpec) str requiredBalance := getConsolidationRequiredBalance(specs) return fmt.Sprintf("Error: Total active balance too low (required: %v ETH)", requiredBalance/beacon.EtherGweiFactor) case dbtypes.ConsolidationRequestResultQueueFull: - return "Error: Queue is full" + return "Error: Consolidation queue is full" case dbtypes.ConsolidationRequestResultSrcNotFound: return "Error: Source validator not found" case dbtypes.ConsolidationRequestResultSrcInvalidCredentials: diff --git a/handlers/el_withdrawals.go b/handlers/el_withdrawals.go index a1909f88..54955730 100644 --- a/handlers/el_withdrawals.go +++ b/handlers/el_withdrawals.go @@ -285,7 +285,7 @@ func getWithdrawalResultMessage(result uint8, specs *consensus.ChainSpec) string case dbtypes.WithdrawalRequestResultSuccess: return "Success" case dbtypes.WithdrawalRequestResultQueueFull: - return "Error: Queue is full" + return "Error: Withdrawal queue is full" case dbtypes.WithdrawalRequestResultValidatorNotFound: return "Error: Validator not found" case dbtypes.WithdrawalRequestResultValidatorInvalidCredentials: From ac531180b4345ff3b5c7a874cb2e20805381ebeb Mon Sep 17 00:00:00 2001 From: pk910 Date: Thu, 13 Feb 2025 21:53:31 +0100 Subject: [PATCH 15/18] fix schema upgrade for pgsql --- db/consolidation_request_txs.go | 2 +- db/schema/pgsql/20250207221132_request-results.sql | 4 ++-- db/withdrawal_request_txs.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/db/consolidation_request_txs.go b/db/consolidation_request_txs.go index baef091a..4f23099b 100644 --- a/db/consolidation_request_txs.go +++ b/db/consolidation_request_txs.go @@ -53,7 +53,7 @@ func InsertConsolidationRequestTxs(consolidationTxs []*dbtypes.ConsolidationRequ argIdx += fieldCount } fmt.Fprint(&sql, EngineQuery(map[dbtypes.DBEngineType]string{ - dbtypes.DBEnginePgsql: " ON CONFLICT (block_root, block_index) DO UPDATE SET source_index = excluded.source_index, target_index = excluded.target_index, fork_id = excluded.fork_id", + dbtypes.DBEnginePgsql: " ON CONFLICT (block_root, block_index) DO UPDATE SET source_index = excluded.source_index, target_index = excluded.target_index, dequeue_block = excluded.dequeue_block, fork_id = excluded.fork_id", dbtypes.DBEngineSqlite: "", })) diff --git a/db/schema/pgsql/20250207221132_request-results.sql b/db/schema/pgsql/20250207221132_request-results.sql index a85525ff..cd5b7f8b 100644 --- a/db/schema/pgsql/20250207221132_request-results.sql +++ b/db/schema/pgsql/20250207221132_request-results.sql @@ -2,10 +2,10 @@ -- +goose StatementBegin ALTER TABLE public."consolidation_requests" - ADD "result" TINYINT NOT NULL DEFAULT 0; + ADD "result" smallint NOT NULL DEFAULT 0; ALTER TABLE public."withdrawal_requests" - ADD "result" TINYINT NOT NULL DEFAULT 0; + ADD "result" smallint NOT NULL DEFAULT 0; -- +goose StatementEnd -- +goose Down diff --git a/db/withdrawal_request_txs.go b/db/withdrawal_request_txs.go index 9da94427..1ded2643 100644 --- a/db/withdrawal_request_txs.go +++ b/db/withdrawal_request_txs.go @@ -52,7 +52,7 @@ func InsertWithdrawalRequestTxs(withdrawalTxs []*dbtypes.WithdrawalRequestTx, tx argIdx += fieldCount } fmt.Fprint(&sql, EngineQuery(map[dbtypes.DBEngineType]string{ - dbtypes.DBEnginePgsql: " ON CONFLICT (block_root, block_index) DO UPDATE SET validator_index = excluded.validator_index, fork_id = excluded.fork_id", + dbtypes.DBEnginePgsql: " ON CONFLICT (block_root, block_index) DO UPDATE SET validator_index = excluded.validator_index, dequeue_block = excluded.dequeue_block, fork_id = excluded.fork_id", dbtypes.DBEngineSqlite: "", })) From 54b1604f890feb075c290484421204060c7bc4bc Mon Sep 17 00:00:00 2001 From: pk910 Date: Sun, 16 Feb 2025 15:50:26 +0100 Subject: [PATCH 16/18] fix dequeue block calculation across multiple crawler calls (bad state preservation) --- indexer/execution/contract_indexer.go | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/indexer/execution/contract_indexer.go b/indexer/execution/contract_indexer.go index 55f4dd0d..5b61c43c 100644 --- a/indexer/execution/contract_indexer.go +++ b/indexer/execution/contract_indexer.go @@ -311,8 +311,9 @@ func (ci *contractIndexer[TxType]) processFinalizedBlocks(finalizedBlockNumber u } // calculate how many requests were dequeued at the end of the current block range - if ci.options.dequeueRate > 0 && queueBlock < toBlock { - dequeuedRequests := (toBlock - queueBlock) * ci.options.dequeueRate + if ci.options.dequeueRate > 0 { + // we need to add 1 to the block range as we want to preserve the queue state after the last block in the range + dequeuedRequests := (toBlock - queueBlock + 1) * ci.options.dequeueRate if dequeuedRequests > queueLength { queueLength = 0 } else { @@ -402,12 +403,6 @@ func (ci *contractIndexer[TxType]) processRecentBlocksForFork(headFork *forkWith }() queueBlock := startBlockNumber - // we start crawling from the next block, so we need to decrease the queue length for the current block - if queueLength > ci.options.dequeueRate { - queueLength -= ci.options.dequeueRate - } else { - queueLength = 0 - } // process blocks in range until the head el block is reached for startBlockNumber <= elHeadBlockNumber { @@ -527,17 +522,17 @@ func (ci *contractIndexer[TxType]) processRecentBlocksForFork(headFork *forkWith } // calculate how many requests were dequeued at the end of the current block range - if queueBlock < toBlock { - dequeuedRequests := (toBlock - queueBlock) * ci.options.dequeueRate + if ci.options.dequeueRate > 0 { + dequeuedRequests := (toBlock - queueBlock + 1) * ci.options.dequeueRate if dequeuedRequests > queueLength { queueLength = 0 } else { queueLength -= dequeuedRequests } - - queueBlock = toBlock } + queueBlock = toBlock + if len(requestTxs) > 0 { ci.logger.Infof("crawled recent contract logs for fork %v (%v-%v): %v events", headFork.forkId, startBlockNumber, toBlock, len(requestTxs)) } From 909d9d7126cb8cacddd7bdbe51e4f872687e6214 Mon Sep 17 00:00:00 2001 From: pk910 Date: Mon, 17 Feb 2025 01:48:16 +0100 Subject: [PATCH 17/18] cleanup dead code --- indexer/beacon/state_sim.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/indexer/beacon/state_sim.go b/indexer/beacon/state_sim.go index a65a1067..d6e86c24 100644 --- a/indexer/beacon/state_sim.go +++ b/indexer/beacon/state_sim.go @@ -504,19 +504,15 @@ func (sim *stateSimulator) replayBlockResults(block *Block) [][]uint8 { return sim.prevState.blockResults } - // replay parent blocks up to the current block and apply all operations for relevant indices + // replay parent blocks up to the current block and apply all relevant operations for _, parentBlock := range parentBlocks { sim.applyBlock(parentBlock) } - // get consolidations from block - blockBody := block.GetBlock() - if blockBody == nil { - return nil - } - + // apply current block and store results results := sim.applyBlock(block) sim.prevState.blockResults = results block.blockResults = results + return results } From 61d9eba712f2373767dac55857cfa27e25c8d8cf Mon Sep 17 00:00:00 2001 From: pk910 Date: Mon, 17 Feb 2025 01:54:18 +0100 Subject: [PATCH 18/18] fix show more consolidations link --- templates/validator/consolidationRequests.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/templates/validator/consolidationRequests.html b/templates/validator/consolidationRequests.html index afb0e213..b825bb2e 100644 --- a/templates/validator/consolidationRequests.html +++ b/templates/validator/consolidationRequests.html @@ -100,7 +100,7 @@ {{ if gt .AdditionalConsolidationRequestCount 0 }} - View {{ .AdditionalConsolidationRequestCount }} more consolidation requests + View {{ .AdditionalConsolidationRequestCount }} more consolidation requests {{ end }}