Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Monitor attestations #9901

Merged
merged 11 commits into from
Nov 19, 2021
Merged
15 changes: 15 additions & 0 deletions beacon-chain/monitor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,26 @@ go_library(
name = "go_default_library",
srcs = [
"doc.go",
"process_attestation.go",
"process_block.go",
"process_exit.go",
"service.go",
],
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/monitor",
visibility = ["//beacon-chain:__subpackages__"],
deps = [
"//beacon-chain/core/altair:go_default_library",
"//beacon-chain/core/blocks:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/state:go_default_library",
"//beacon-chain/state/stategen:go_default_library",
"//config/params:go_default_library",
"//encoding/bytesutil:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//proto/prysm/v1alpha1/attestation:go_default_library",
"//proto/prysm/v1alpha1/block:go_default_library",
"//runtime/version:go_default_library",
"//time/slots:go_default_library",
"@com_github_prysmaticlabs_eth2_types//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
Expand All @@ -23,18 +32,24 @@ go_library(
go_test(
name = "go_default_test",
srcs = [
"process_attestation_test.go",
"process_block_test.go",
"process_exit_test.go",
"service_test.go",
],
embed = [":go_default_library"],
deps = [
"//beacon-chain/db/testing:go_default_library",
"//beacon-chain/state/stategen:go_default_library",
"//config/params:go_default_library",
"//encoding/bytesutil:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//proto/prysm/v1alpha1/wrapper:go_default_library",
"//testing/require:go_default_library",
"//testing/util:go_default_library",
"@com_github_prysmaticlabs_eth2_types//:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
],
)
213 changes: 213 additions & 0 deletions beacon-chain/monitor/process_attestation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
package monitor

import (
"context"
"fmt"

types "github.com/prysmaticlabs/eth2-types"
"github.com/prysmaticlabs/prysm/beacon-chain/core/altair"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/state"
"github.com/prysmaticlabs/prysm/config/params"
"github.com/prysmaticlabs/prysm/encoding/bytesutil"
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/attestation"
"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/block"
"github.com/prysmaticlabs/prysm/runtime/version"
"github.com/prysmaticlabs/prysm/time/slots"
"github.com/sirupsen/logrus"
)

// newerSlotFromTrackedVal returns true if the validator is tracked and if the
// given slot is different than the last attested slot from this validator.
func (s *Service) newerSlotFromTrackedVal(idx types.ValidatorIndex, slot types.Slot) bool {
potuz marked this conversation as resolved.
Show resolved Hide resolved
if !s.TrackedIndex(types.ValidatorIndex(idx)) {
return false
}

if lp, ok := s.latestPerformance[types.ValidatorIndex(idx)]; ok {
return lp.attestedSlot != slot
}
return false
}

// attestingIndices returns the indices of validators that appear in the
// given aggregated atestation.
func attestingIndices(ctx context.Context, state state.BeaconState, att *ethpb.Attestation) ([]uint64, error) {
committee, err := helpers.BeaconCommitteeFromState(ctx, state, att.Data.Slot, att.Data.CommitteeIndex)
if err != nil {
return nil, err
}
return attestation.AttestingIndices(att.AggregationBits, committee)
}

// logMessageTimelyFlagsForIndex returns the log message with the basic
// performance indicators for the attestation (head, source, target)
func logMessageTimelyFlagsForIndex(idx types.ValidatorIndex, data *ethpb.AttestationData) logrus.Fields {
prestonvanloon marked this conversation as resolved.
Show resolved Hide resolved
return logrus.Fields{
"ValidatorIndex": idx,
"Slot": data.Slot,
"Source": fmt.Sprintf("%#x", bytesutil.Trunc(data.Source.Root)),
"Target": fmt.Sprintf("%#x", bytesutil.Trunc(data.Target.Root)),
"Head": fmt.Sprintf("%#x", bytesutil.Trunc(data.BeaconBlockRoot)),
}
}

// processAttestations logs the event that one of our tracked validators'
// attestations was included in a block
func (s *Service) processAttestations(ctx context.Context, state state.BeaconState, blk block.BeaconBlock) {
if blk == nil || blk.Body() == nil {
return
}
for _, attestation := range blk.Body().Attestations() {
s.processIncludedAttestation(ctx, state, attestation)
}
}

// processIncludedAttestation logs in the event that one of our tracked validators'
// appears in the attesting indices and this included attestation was not included
// before.
func (s *Service) processIncludedAttestation(ctx context.Context, state state.BeaconState, att *ethpb.Attestation) {
attestingIndices, err := attestingIndices(ctx, state, att)
if err != nil {
log.WithError(err).Error("Could not get attesting indices")
return
}
for _, idx := range attestingIndices {
if s.newerSlotFromTrackedVal(types.ValidatorIndex(idx), att.Data.Slot) {
logFields := logMessageTimelyFlagsForIndex(types.ValidatorIndex(idx), att.Data)
balance, err := state.BalanceAtIndex(types.ValidatorIndex(idx))
if err != nil {
log.WithError(err).Error("Could not get balance")
return
}

aggregatedPerf := s.aggregatedPerformance[types.ValidatorIndex(idx)]
aggregatedPerf.totalAttestedCount++
aggregatedPerf.totalRequestedCount++

latestPerf := s.latestPerformance[types.ValidatorIndex(idx)]
balanceChg := balance - latestPerf.balance
latestPerf.balanceChange = balanceChg
latestPerf.balance = balance
latestPerf.attestedSlot = att.Data.Slot
latestPerf.inclusionSlot = state.Slot()
potuz marked this conversation as resolved.
Show resolved Hide resolved
aggregatedPerf.totalDistance += uint64(latestPerf.inclusionSlot - latestPerf.attestedSlot)

if state.Version() == version.Altair {
targetIdx := params.BeaconConfig().TimelyTargetFlagIndex
sourceIdx := params.BeaconConfig().TimelySourceFlagIndex
headIdx := params.BeaconConfig().TimelyHeadFlagIndex

var participation []byte
if slots.ToEpoch(latestPerf.inclusionSlot) ==
slots.ToEpoch(latestPerf.attestedSlot) {
participation, err = state.CurrentEpochParticipation()
if err != nil {
log.WithError(err).Error("Could not get current epoch participation")
return
}
} else {
participation, err = state.PreviousEpochParticipation()
if err != nil {
log.WithError(err).Error("Could not get previous epoch participation")
return
}
}
flags := participation[idx]
hasFlag, err := altair.HasValidatorFlag(flags, sourceIdx)
if err != nil {
log.WithError(err).Error("Could not get timely Source flag")
return
}
latestPerf.timelySource = hasFlag
hasFlag, err = altair.HasValidatorFlag(flags, headIdx)
if err != nil {
log.WithError(err).Error("Could not get timely Head flag")
return
}
latestPerf.timelyHead = hasFlag
hasFlag, err = altair.HasValidatorFlag(flags, targetIdx)
if err != nil {
log.WithError(err).Error("Could not get timely Target flag")
return
}
latestPerf.timelyTarget = hasFlag

if latestPerf.timelySource {
aggregatedPerf.totalCorrectSource++
}
if latestPerf.timelyHead {
aggregatedPerf.totalCorrectHead++
}
if latestPerf.timelyTarget {
aggregatedPerf.totalCorrectTarget++
}
}
logFields["CorrectHead"] = latestPerf.timelyHead
logFields["CorrectSource"] = latestPerf.timelySource
logFields["CorrectTarget"] = latestPerf.timelyTarget
logFields["Inclusion Slot"] = latestPerf.inclusionSlot
potuz marked this conversation as resolved.
Show resolved Hide resolved
logFields["NewBalance"] = balance
logFields["BalanceChange"] = balanceChg

s.latestPerformance[types.ValidatorIndex(idx)] = latestPerf
s.aggregatedPerformance[types.ValidatorIndex(idx)] = aggregatedPerf
log.WithFields(logFields).Info("Attestation included")
}
}
}

// processUnaggregatedAttestation logs when the beacon node sees an unaggregated attestation from one of our
// tracked validators
func (s *Service) processUnaggregatedAttestation(ctx context.Context, att *ethpb.Attestation) {
root := bytesutil.ToBytes32(att.Data.BeaconBlockRoot)
state := s.config.StateGen.StateByRootIfCachedNoCopy(root)
if state == nil {
log.Debug("Skipping unnagregated attestation due to state not found in cache")
potuz marked this conversation as resolved.
Show resolved Hide resolved
return
}
attestingIndices, err := attestingIndices(ctx, state, att)
if err != nil {
log.WithError(err).Error("Could not get attesting indices")
return
}
for _, idx := range attestingIndices {
if s.newerSlotFromTrackedVal(types.ValidatorIndex(idx), att.Data.Slot) {
logFields := logMessageTimelyFlagsForIndex(types.ValidatorIndex(idx), att.Data)
log.WithFields(logFields).Info("Processed unaggregated attestation")
}
}
}

// processAggregatedAttestation logs when we see an aggregation from one of our tracked validators or an aggregated
// attestation from one of our tracked validators
func (s *Service) processAggregatedAttestation(ctx context.Context, att *ethpb.AggregateAttestationAndProof) {
if s.TrackedIndex(att.AggregatorIndex) {
log.WithFields(logrus.Fields{
"ValidatorIndex": att.AggregatorIndex,
}).Info("Processed attestation aggregation")
aggregatedPerf := s.aggregatedPerformance[att.AggregatorIndex]
aggregatedPerf.totalAggregations++
s.aggregatedPerformance[att.AggregatorIndex] = aggregatedPerf
}

var root [32]byte
copy(root[:], att.Aggregate.Data.BeaconBlockRoot)
state := s.config.StateGen.StateByRootIfCachedNoCopy(root)
if state == nil {
log.Debug("Skipping agregated attestation due to state not found in cache")
return
}
attestingIndices, err := attestingIndices(ctx, state, att.Aggregate)
if err != nil {
log.WithError(err).Error("Could not get attesting indices")
return
}
for _, idx := range attestingIndices {
if s.newerSlotFromTrackedVal(types.ValidatorIndex(idx), att.Aggregate.Data.Slot) {
logFields := logMessageTimelyFlagsForIndex(types.ValidatorIndex(idx), att.Aggregate.Data)
log.WithFields(logFields).Info("Processed aggregated attestation")
}
}
}
Loading