Skip to content

Commit

Permalink
Log attestation performance on the validator monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
potuz committed Nov 13, 2021
1 parent b93b8db commit 28c90c9
Show file tree
Hide file tree
Showing 8 changed files with 578 additions and 1 deletion.
14 changes: 14 additions & 0 deletions beacon-chain/monitor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go_library(
name = "go_default_library",
srcs = [
"doc.go",
"process_attestation.go",
"process_block.go",
"process_exit.go",
"service.go",
Expand All @@ -12,9 +13,16 @@ go_library(
visibility = ["//beacon-chain:__subpackages__"],
deps = [
"//beacon-chain/core/blocks:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/state:go_default_library",
"//beacon-chain/state/stategen:go_default_library",
"//config/params:go_default_library",
"//encoding/bytesutil:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//proto/prysm/v1alpha1/attestation:go_default_library",
"//proto/prysm/v1alpha1/block:go_default_library",
"//runtime/version:go_default_library",
"//time/slots:go_default_library",
"@com_github_prysmaticlabs_eth2_types//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
Expand All @@ -23,18 +31,24 @@ go_library(
go_test(
name = "go_default_test",
srcs = [
"process_attestation_test.go",
"process_block_test.go",
"process_exit_test.go",
"service_test.go",
],
embed = [":go_default_library"],
deps = [
"//beacon-chain/db/testing:go_default_library",
"//beacon-chain/state/stategen:go_default_library",
"//config/params:go_default_library",
"//encoding/bytesutil:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//proto/prysm/v1alpha1/wrapper:go_default_library",
"//testing/require:go_default_library",
"//testing/util:go_default_library",
"@com_github_prysmaticlabs_eth2_types//:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
],
)
192 changes: 192 additions & 0 deletions beacon-chain/monitor/process_attestation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
package monitor

import (
"fmt"

types "github.com/prysmaticlabs/eth2-types"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/state"
"github.com/prysmaticlabs/prysm/config/params"
"github.com/prysmaticlabs/prysm/encoding/bytesutil"
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/attestation"
"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/block"
"github.com/prysmaticlabs/prysm/runtime/version"
"github.com/prysmaticlabs/prysm/time/slots"
"github.com/sirupsen/logrus"
)

// getAttestingIndices returns the indices of validators that appear in the
// given aggregated atestation.
func (s *Service) getAttestingIndices(state state.BeaconState, att *ethpb.Attestation) ([]uint64, error) {
committee, err := helpers.BeaconCommitteeFromState(s.ctx, state, att.Data.Slot, att.Data.CommitteeIndex)
if err != nil {
return nil, err
}
return attestation.AttestingIndices(att.AggregationBits, committee)
}

// logMessageTimelyFlagsForIndex returns the log message with the basic
// performance indicators for the attestation (head, source, target)
func logMessageTimelyFlagsForIndex(idx uint64, data *ethpb.AttestationData) logrus.Fields {
return logrus.Fields{
"ValidatorIndex": idx,
"Slot": data.Slot,
"Source": fmt.Sprintf("%#x", bytesutil.Trunc(data.Source.Root)),
"Target": fmt.Sprintf("%#x", bytesutil.Trunc(data.Target.Root)),
"Head": fmt.Sprintf("%#x", bytesutil.Trunc(data.BeaconBlockRoot)),
}
}

// processAttestations logs the event that one of our tracked validators'
// attestations was included in a block
func (s *Service) processAttestations(state state.BeaconState, blk block.BeaconBlock) {
for _, attestation := range blk.Body().Attestations() {
s.processIncludedAttestation(state, attestation)
}
}

// processIncludedAttestation logs in the event that one of our tracked validators'
// appears in the attesting indices and this included attestation was not included
// before.
func (s *Service) processIncludedAttestation(state state.BeaconState, att *ethpb.Attestation) {
attestingIndices, err := s.getAttestingIndices(state, att)
if err != nil {
log.WithError(err).Error("Could not get attesting indices")
return
}
for _, idx := range attestingIndices {
if s.TrackedIndex(types.ValidatorIndex(idx)) &&
s.latestPerformance[types.ValidatorIndex(idx)].attestedSlot != att.Data.Slot {
logFields := logMessageTimelyFlagsForIndex(idx, att.Data)
balance, err := state.BalanceAtIndex(types.ValidatorIndex(idx))
if err != nil {
log.WithError(err).Error("Could not get balance")
return
}

aggregatedPerf := s.aggregatedPerformance[types.ValidatorIndex(idx)]
aggregatedPerf.totalAttestedCount++
aggregatedPerf.totalRequestedCount++

latestPerf := s.latestPerformance[types.ValidatorIndex(idx)]
balanceChg := balance - latestPerf.balance
latestPerf.balanceChange = balanceChg
latestPerf.balance = balance
latestPerf.attestedSlot = att.Data.Slot
latestPerf.inclusionSlot = state.Slot()
aggregatedPerf.totalDistance += uint64(latestPerf.inclusionSlot - latestPerf.attestedSlot)

if state.Version() == version.Altair {
targetIdx := params.BeaconConfig().TimelyTargetFlagIndex
sourceIdx := params.BeaconConfig().TimelySourceFlagIndex
headIdx := params.BeaconConfig().TimelyHeadFlagIndex

var participation []byte
if slots.ToEpoch(latestPerf.inclusionSlot) ==
slots.ToEpoch(latestPerf.attestedSlot) {
participation, err = state.CurrentEpochParticipation()
if err != nil {
log.WithError(err).Error("Could not get current epoch participation")
return
}
} else {
participation, err = state.PreviousEpochParticipation()
if err != nil {
log.WithError(err).Error("Could not get previous epoch participation")
return
}
}
flags := participation[idx]
latestPerf.timelySource = ((flags >> sourceIdx) & 1) == 1
latestPerf.timelyHead = ((flags >> headIdx) & 1) == 1
latestPerf.timelyTarget = ((flags >> targetIdx) & 1) == 1

if latestPerf.timelySource {
aggregatedPerf.totalCorrectSource++
}
if latestPerf.timelyHead {
aggregatedPerf.totalCorrectHead++
}
if latestPerf.timelyTarget {
aggregatedPerf.totalCorrectTarget++
}
}
logFields["CorrectHead"] = latestPerf.timelyHead
logFields["CorrectSource"] = latestPerf.timelySource
logFields["CorrectTarget"] = latestPerf.timelyTarget
logFields["Inclusion Slot"] = latestPerf.inclusionSlot
logFields["NewBalance"] = balance
logFields["BalanceChange"] = balanceChg

s.latestPerformance[types.ValidatorIndex(idx)] = latestPerf
s.aggregatedPerformance[types.ValidatorIndex(idx)] = aggregatedPerf
log.WithFields(logFields).Info("Attestation included")
}
}
}

// processUnaggregatedAttestation logs when the beacon node sees an unaggregated attestation from one of our
// tracked validators
func (s *Service) processUnaggregatedAttestation(att *ethpb.Attestation) {
var root [32]byte
copy(root[:], att.Data.BeaconBlockRoot)
state, err := s.config.StateGen.StateByRootIfCached(s.ctx, root)
if err != nil {
log.WithError(err).Error("Could not query cache for state")
return
}
if state == nil {
log.Debug("Skipping unnagregated attestation due to state not found in cache")
return
}
attestingIndices, err := s.getAttestingIndices(state, att)
if err != nil {
log.WithError(err).Error("Could not get attesting indices")
return
}
for _, idx := range attestingIndices {
if s.TrackedIndex(types.ValidatorIndex(idx)) &&
s.latestPerformance[types.ValidatorIndex(idx)].attestedSlot != att.Data.Slot {
logFields := logMessageTimelyFlagsForIndex(idx, att.Data)
log.WithFields(logFields).Info("Processed unaggregated attestation")
}
}
}

// processAggregatedAttestation logs when we see an aggregation from one of our tracked validators or an aggregated
// attestation from one of our tracked validators
func (s *Service) processAggregatedAttestation(att *ethpb.AggregateAttestationAndProof) {
if s.TrackedIndex(att.AggregatorIndex) {
log.WithFields(logrus.Fields{
"ValidatorIndex": att.AggregatorIndex,
}).Info("Processed attestation aggregation")
aggregatedPerf := s.aggregatedPerformance[att.AggregatorIndex]
aggregatedPerf.totalAggregations++
s.aggregatedPerformance[att.AggregatorIndex] = aggregatedPerf
}

var root [32]byte
copy(root[:], att.Aggregate.Data.BeaconBlockRoot)
state, err := s.config.StateGen.StateByRootIfCached(s.ctx, root)
if err != nil {
log.WithError(err).Error("Could not query cache for state")
return
}
if state == nil {
log.Debug("Skipping agregated attestation due to state not found in cache")
return
}
attestingIndices, err := s.getAttestingIndices(state, att.Aggregate)
if err != nil {
log.WithError(err).Error("Could not get attesting indices")
return
}
for _, idx := range attestingIndices {
if s.TrackedIndex(types.ValidatorIndex(idx)) &&
s.latestPerformance[types.ValidatorIndex(idx)].attestedSlot != att.Aggregate.Data.Slot {
logFields := logMessageTimelyFlagsForIndex(idx, att.Aggregate.Data)
log.WithFields(logFields).Info("Processed aggregated attestation")
}
}
}
Loading

0 comments on commit 28c90c9

Please sign in to comment.