Skip to content

Commit

Permalink
Monitor attestations (#9901)
Browse files Browse the repository at this point in the history
Log attestation performance on the validator monitor
  • Loading branch information
potuz authored Nov 19, 2021
1 parent cae58bb commit 50159c2
Show file tree
Hide file tree
Showing 8 changed files with 596 additions and 1 deletion.
15 changes: 15 additions & 0 deletions beacon-chain/monitor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,26 @@ go_library(
name = "go_default_library",
srcs = [
"doc.go",
"process_attestation.go",
"process_block.go",
"process_exit.go",
"service.go",
],
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/monitor",
visibility = ["//beacon-chain:__subpackages__"],
deps = [
"//beacon-chain/core/altair:go_default_library",
"//beacon-chain/core/blocks:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/state:go_default_library",
"//beacon-chain/state/stategen:go_default_library",
"//config/params:go_default_library",
"//encoding/bytesutil:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//proto/prysm/v1alpha1/attestation:go_default_library",
"//proto/prysm/v1alpha1/block:go_default_library",
"//runtime/version:go_default_library",
"//time/slots:go_default_library",
"@com_github_prysmaticlabs_eth2_types//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
Expand All @@ -23,18 +32,24 @@ go_library(
go_test(
name = "go_default_test",
srcs = [
"process_attestation_test.go",
"process_block_test.go",
"process_exit_test.go",
"service_test.go",
],
embed = [":go_default_library"],
deps = [
"//beacon-chain/db/testing:go_default_library",
"//beacon-chain/state/stategen:go_default_library",
"//config/params:go_default_library",
"//encoding/bytesutil:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//proto/prysm/v1alpha1/wrapper:go_default_library",
"//testing/require:go_default_library",
"//testing/util:go_default_library",
"@com_github_prysmaticlabs_eth2_types//:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
],
)
213 changes: 213 additions & 0 deletions beacon-chain/monitor/process_attestation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
package monitor

import (
"context"
"fmt"

types "github.com/prysmaticlabs/eth2-types"
"github.com/prysmaticlabs/prysm/beacon-chain/core/altair"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/state"
"github.com/prysmaticlabs/prysm/config/params"
"github.com/prysmaticlabs/prysm/encoding/bytesutil"
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/attestation"
"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/block"
"github.com/prysmaticlabs/prysm/runtime/version"
"github.com/prysmaticlabs/prysm/time/slots"
"github.com/sirupsen/logrus"
)

// updatedPerformanceFromTrackedVal returns true if the validator is tracked and if the
// given slot is different than the last attested slot from this validator.
func (s *Service) updatedPerformanceFromTrackedVal(idx types.ValidatorIndex, slot types.Slot) bool {
if !s.TrackedIndex(types.ValidatorIndex(idx)) {
return false
}

if lp, ok := s.latestPerformance[types.ValidatorIndex(idx)]; ok {
return lp.attestedSlot != slot
}
return false
}

// attestingIndices returns the indices of validators that appear in the
// given aggregated atestation.
func attestingIndices(ctx context.Context, state state.BeaconState, att *ethpb.Attestation) ([]uint64, error) {
committee, err := helpers.BeaconCommitteeFromState(ctx, state, att.Data.Slot, att.Data.CommitteeIndex)
if err != nil {
return nil, err
}
return attestation.AttestingIndices(att.AggregationBits, committee)
}

// logMessageTimelyFlagsForIndex returns the log message with the basic
// performance indicators for the attestation (head, source, target)
func logMessageTimelyFlagsForIndex(idx types.ValidatorIndex, data *ethpb.AttestationData) logrus.Fields {
return logrus.Fields{
"ValidatorIndex": idx,
"Slot": data.Slot,
"Source": fmt.Sprintf("%#x", bytesutil.Trunc(data.Source.Root)),
"Target": fmt.Sprintf("%#x", bytesutil.Trunc(data.Target.Root)),
"Head": fmt.Sprintf("%#x", bytesutil.Trunc(data.BeaconBlockRoot)),
}
}

// processAttestations logs the event that one of our tracked validators'
// attestations was included in a block
func (s *Service) processAttestations(ctx context.Context, state state.BeaconState, blk block.BeaconBlock) {
if blk == nil || blk.Body() == nil {
return
}
for _, attestation := range blk.Body().Attestations() {
s.processIncludedAttestation(ctx, state, attestation)
}
}

// processIncludedAttestation logs in the event that one of our tracked validators'
// appears in the attesting indices and this included attestation was not included
// before.
func (s *Service) processIncludedAttestation(ctx context.Context, state state.BeaconState, att *ethpb.Attestation) {
attestingIndices, err := attestingIndices(ctx, state, att)
if err != nil {
log.WithError(err).Error("Could not get attesting indices")
return
}
for _, idx := range attestingIndices {
if s.updatedPerformanceFromTrackedVal(types.ValidatorIndex(idx), att.Data.Slot) {
logFields := logMessageTimelyFlagsForIndex(types.ValidatorIndex(idx), att.Data)
balance, err := state.BalanceAtIndex(types.ValidatorIndex(idx))
if err != nil {
log.WithError(err).Error("Could not get balance")
return
}

aggregatedPerf := s.aggregatedPerformance[types.ValidatorIndex(idx)]
aggregatedPerf.totalAttestedCount++
aggregatedPerf.totalRequestedCount++

latestPerf := s.latestPerformance[types.ValidatorIndex(idx)]
balanceChg := balance - latestPerf.balance
latestPerf.balanceChange = balanceChg
latestPerf.balance = balance
latestPerf.attestedSlot = att.Data.Slot
latestPerf.inclusionSlot = state.Slot()
aggregatedPerf.totalDistance += uint64(latestPerf.inclusionSlot - latestPerf.attestedSlot)

if state.Version() == version.Altair {
targetIdx := params.BeaconConfig().TimelyTargetFlagIndex
sourceIdx := params.BeaconConfig().TimelySourceFlagIndex
headIdx := params.BeaconConfig().TimelyHeadFlagIndex

var participation []byte
if slots.ToEpoch(latestPerf.inclusionSlot) ==
slots.ToEpoch(latestPerf.attestedSlot) {
participation, err = state.CurrentEpochParticipation()
if err != nil {
log.WithError(err).Error("Could not get current epoch participation")
return
}
} else {
participation, err = state.PreviousEpochParticipation()
if err != nil {
log.WithError(err).Error("Could not get previous epoch participation")
return
}
}
flags := participation[idx]
hasFlag, err := altair.HasValidatorFlag(flags, sourceIdx)
if err != nil {
log.WithError(err).Error("Could not get timely Source flag")
return
}
latestPerf.timelySource = hasFlag
hasFlag, err = altair.HasValidatorFlag(flags, headIdx)
if err != nil {
log.WithError(err).Error("Could not get timely Head flag")
return
}
latestPerf.timelyHead = hasFlag
hasFlag, err = altair.HasValidatorFlag(flags, targetIdx)
if err != nil {
log.WithError(err).Error("Could not get timely Target flag")
return
}
latestPerf.timelyTarget = hasFlag

if latestPerf.timelySource {
aggregatedPerf.totalCorrectSource++
}
if latestPerf.timelyHead {
aggregatedPerf.totalCorrectHead++
}
if latestPerf.timelyTarget {
aggregatedPerf.totalCorrectTarget++
}
}
logFields["CorrectHead"] = latestPerf.timelyHead
logFields["CorrectSource"] = latestPerf.timelySource
logFields["CorrectTarget"] = latestPerf.timelyTarget
logFields["InclusionSlot"] = latestPerf.inclusionSlot
logFields["NewBalance"] = balance
logFields["BalanceChange"] = balanceChg

s.latestPerformance[types.ValidatorIndex(idx)] = latestPerf
s.aggregatedPerformance[types.ValidatorIndex(idx)] = aggregatedPerf
log.WithFields(logFields).Info("Attestation included")
}
}
}

// processUnaggregatedAttestation logs when the beacon node sees an unaggregated attestation from one of our
// tracked validators
func (s *Service) processUnaggregatedAttestation(ctx context.Context, att *ethpb.Attestation) {
root := bytesutil.ToBytes32(att.Data.BeaconBlockRoot)
state := s.config.StateGen.StateByRootIfCachedNoCopy(root)
if state == nil {
log.Debug("Skipping unaggregated attestation due to state not found in cache")
return
}
attestingIndices, err := attestingIndices(ctx, state, att)
if err != nil {
log.WithError(err).Error("Could not get attesting indices")
return
}
for _, idx := range attestingIndices {
if s.updatedPerformanceFromTrackedVal(types.ValidatorIndex(idx), att.Data.Slot) {
logFields := logMessageTimelyFlagsForIndex(types.ValidatorIndex(idx), att.Data)
log.WithFields(logFields).Info("Processed unaggregated attestation")
}
}
}

// processAggregatedAttestation logs when we see an aggregation from one of our tracked validators or an aggregated
// attestation from one of our tracked validators
func (s *Service) processAggregatedAttestation(ctx context.Context, att *ethpb.AggregateAttestationAndProof) {
if s.TrackedIndex(att.AggregatorIndex) {
log.WithFields(logrus.Fields{
"ValidatorIndex": att.AggregatorIndex,
}).Info("Processed attestation aggregation")
aggregatedPerf := s.aggregatedPerformance[att.AggregatorIndex]
aggregatedPerf.totalAggregations++
s.aggregatedPerformance[att.AggregatorIndex] = aggregatedPerf
}

var root [32]byte
copy(root[:], att.Aggregate.Data.BeaconBlockRoot)
state := s.config.StateGen.StateByRootIfCachedNoCopy(root)
if state == nil {
log.Debug("Skipping agregated attestation due to state not found in cache")
return
}
attestingIndices, err := attestingIndices(ctx, state, att.Aggregate)
if err != nil {
log.WithError(err).Error("Could not get attesting indices")
return
}
for _, idx := range attestingIndices {
if s.updatedPerformanceFromTrackedVal(types.ValidatorIndex(idx), att.Aggregate.Data.Slot) {
logFields := logMessageTimelyFlagsForIndex(types.ValidatorIndex(idx), att.Aggregate.Data)
log.WithFields(logFields).Info("Processed aggregated attestation")
}
}
}
Loading

0 comments on commit 50159c2

Please sign in to comment.