Skip to content

Commit

Permalink
Gm/ssv 137 add pre and post consensus logs (#1387)
Browse files Browse the repository at this point in the history
* add more logs to ProcessPreConsensus

* add logs to ProcessConsensus and ProcessPostConsensus to give more sense about time consuming by diff components

* change gitlab ci to test on stage

* add logs [attester] to ProcessConsensus and ProcessPostConsensus to give more sense about time consuming by diff components

* change github ci in order to test on stage

* add symbols to validator registration logs

* fix typo mistake

* fix text to align spec

* align all runners logs

* add got decided to attester runner

* add logs to attester runner

* add logs to sync_committee runner

* pushing to nodes 61-64

* refactors (still WIP)

* fixes

* rename

* log

* fix

* refactor

* refactors

* add slots to logs

* bugfix

* deploy stage + logs PR to 1-60

* approve spec diffs

* revert gitlab

* deploy stage + logs PR to nodes 1-60

* revert gitlab

* review

* fixed log

* improve aggregators logs

* deploy to 13-60

* fixed agg consensus time

* approve spec diffs

* fix logs

* log block hash in submission errors

* fix block hash log

* approve spec diff

* revert gitlab

---------

Co-authored-by: guy muroch <guym@blox.io>
Co-authored-by: moshe-blox <moshe@blox.io>
  • Loading branch information
3 people authored May 26, 2024
1 parent 26a7672 commit cbe6254
Show file tree
Hide file tree
Showing 14 changed files with 301 additions and 120 deletions.
45 changes: 43 additions & 2 deletions logging/fields/fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,15 @@ const (
FieldClusterIndex = "cluster_index"
FieldConfig = "config"
FieldConnectionID = "connection_id"
FieldPreConsensusTime = "pre_consensus_time"
FieldConsensusTime = "consensus_time"
FieldPostConsensusTime = "post_consensus_time"
FieldQuorumTime = "quorum_time"
FieldDecidedTime = "decided_time"
FieldBlockTime = "block_time"
FieldBeaconDataTime = "beacon_data_time"
FieldBlockRootTime = "block_root_time"
FieldBroadcastTime = "broadcast_time"
FieldSubmissionTime = "submission_time"
FieldCount = "count"
FieldTook = "took"
Expand Down Expand Up @@ -268,12 +276,45 @@ func Topic(val string) zap.Field {
return zap.String(FieldTopic, val)
}

func PreConsensusTime(val time.Duration) zap.Field {
return zap.String(FieldPreConsensusTime, FormatDuration(val))
}

func ConsensusTime(val time.Duration) zap.Field {
return zap.String(FieldConsensusTime, strconv.FormatFloat(val.Seconds(), 'f', 5, 64))
return zap.String(FieldConsensusTime, FormatDuration(val))
}

func PostConsensusTime(val time.Duration) zap.Field {
return zap.String(FieldPostConsensusTime, FormatDuration(val))
}

func QuorumTime(val time.Duration) zap.Field {
return zap.String(FieldQuorumTime, FormatDuration(val))
}
func DecidedTime(val time.Duration) zap.Field {
return zap.String(FieldDecidedTime, FormatDuration(val))
}

func BlockTime(val time.Duration) zap.Field {
return zap.String(FieldBlockTime, FormatDuration(val))
}
func BeaconDataTime(val time.Duration) zap.Field {
return zap.String(FieldBeaconDataTime, FormatDuration(val))
}
func BlockRootTime(val time.Duration) zap.Field {
return zap.String(FieldBlockRootTime, FormatDuration(val))
}

func SubmissionTime(val time.Duration) zap.Field {
return zap.String(FieldSubmissionTime, strconv.FormatFloat(val.Seconds(), 'f', 5, 64))
return zap.String(FieldSubmissionTime, FormatDuration(val))
}

func BroadcastTime(val time.Duration) zap.Field {
return zap.String(FieldBroadcastTime, FormatDuration(val))
}

func FormatDuration(val time.Duration) string {
return strconv.FormatFloat(val.Seconds(), 'f', 5, 64)
}

func DutyID(val string) zap.Field {
Expand Down
4 changes: 3 additions & 1 deletion protocol/v2/qbft/instance/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package instance
import (
"bytes"
"sort"
"time"

"github.com/pkg/errors"
specqbft "github.com/ssvlabs/ssv-spec/qbft"
Expand Down Expand Up @@ -47,7 +48,8 @@ func (i *Instance) UponCommit(logger *zap.Logger, signedCommit *specqbft.SignedM
logger.Debug("🎯 got commit quorum",
fields.Round(i.State.Round),
zap.Any("agg-signers", agg.Signers),
fields.Root(signedCommit.Message.Root))
fields.Root(signedCommit.Message.Root),
fields.QuorumTime(time.Since(i.started)))

i.metrics.EndStageCommit()

Expand Down
3 changes: 3 additions & 0 deletions protocol/v2/qbft/instance/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package instance
import (
"encoding/json"
"sync"
"time"

"github.com/ssvlabs/ssv/logging/fields"

Expand All @@ -26,6 +27,7 @@ type Instance struct {
forceStop bool
StartValue []byte

started time.Time
metrics *metrics
}

Expand Down Expand Up @@ -65,6 +67,7 @@ func (i *Instance) Start(logger *zap.Logger, value []byte, height specqbft.Heigh
i.bumpToRound(specqbft.FirstRound)
i.State.Height = height
i.metrics.StartStage()
i.started = time.Now()

i.config.GetTimer().TimeoutForRound(height, specqbft.FirstRound)

Expand Down
5 changes: 4 additions & 1 deletion protocol/v2/qbft/instance/prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package instance

import (
"bytes"
"time"

"github.com/pkg/errors"
specqbft "github.com/ssvlabs/ssv-spec/qbft"
Expand Down Expand Up @@ -49,7 +50,9 @@ func (i *Instance) uponPrepare(logger *zap.Logger, signedPrepare *specqbft.Signe
logger.Debug("🎯 got prepare quorum",
fields.Round(i.State.Round),
zap.Any("prepare-signers", allSigners(prepareMsgContainer.MessagesForRound(i.State.Round))),
fields.Root(proposedRoot))
fields.Root(proposedRoot),
fields.QuorumTime(time.Since(i.started)),
)

commitMsg, err := CreateCommit(i.State, i.config, proposedRoot)
if err != nil {
Expand Down
40 changes: 22 additions & 18 deletions protocol/v2/ssv/runner/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package runner

import (
"crypto/sha256"
"encoding/hex"
"encoding/json"
"time"

"github.com/attestantio/go-eth2-client/spec/phase0"
ssz "github.com/ferranbt/fastssz"
Expand All @@ -18,8 +20,7 @@ import (
)

type AggregatorRunner struct {
BaseRunner *BaseRunner

BaseRunner *BaseRunner
beacon specssv.BeaconNode
network specssv.Network
signer spectypes.KeyManager
Expand Down Expand Up @@ -80,8 +81,6 @@ func (r *AggregatorRunner) ProcessPreConsensus(logger *zap.Logger, signedMsg *sp
return nil
}

r.metrics.EndPreConsensus()

// only 1 root, verified by basePreConsensusMsgProcessing
root := roots[0]
// reconstruct selection proof sig
Expand All @@ -91,24 +90,16 @@ func (r *AggregatorRunner) ProcessPreConsensus(logger *zap.Logger, signedMsg *sp
r.BaseRunner.FallBackAndVerifyEachSignature(r.GetState().PreConsensusContainer, root)
return errors.Wrap(err, "got pre-consensus quorum but it has invalid signatures")
}

duty := r.GetState().StartingDuty

logger.Debug("🧩 got partial signature quorum",
zap.Any("signer", signedMsg.Signer),
fields.Slot(duty.Slot),
)
r.metrics.EndPreConsensus()

r.metrics.PauseDutyFullFlow()

// get block data
duty := r.GetState().StartingDuty
res, ver, err := r.GetBeaconNode().SubmitAggregateSelectionProof(duty.Slot, duty.CommitteeIndex, duty.CommitteeLength, duty.ValidatorIndex, fullSig)
if err != nil {
return errors.Wrap(err, "failed to submit aggregate and proof")
}

r.metrics.ContinueDutyFullFlow()
r.metrics.StartConsensus()

byts, err := res.MarshalSSZ()
if err != nil {
Expand All @@ -120,6 +111,7 @@ func (r *AggregatorRunner) ProcessPreConsensus(logger *zap.Logger, signedMsg *sp
DataSSZ: byts,
}

r.metrics.StartConsensus()
if err := r.BaseRunner.decide(logger, r, input); err != nil {
return errors.Wrap(err, "can't start new duty runner instance for duty")
}
Expand Down Expand Up @@ -193,7 +185,6 @@ func (r *AggregatorRunner) ProcessPostConsensus(logger *zap.Logger, signedMsg *s
if !quorum {
return nil
}

r.metrics.EndPostConsensus()

for _, root := range roots {
Expand All @@ -218,18 +209,31 @@ func (r *AggregatorRunner) ProcessPostConsensus(logger *zap.Logger, signedMsg *s
Signature: specSig,
}

proofSubmissionEnd := r.metrics.StartBeaconSubmission()
start := time.Now()
endSubmission := r.metrics.StartBeaconSubmission()

logger = logger.With(
zap.Uint64s("signers", getPostConsensusSigners(r.GetState(), root)),
fields.PreConsensusTime(r.metrics.GetPreConsensusTime()),
fields.ConsensusTime(r.metrics.GetConsensusTime()),
fields.PostConsensusTime(r.metrics.GetPostConsensusTime()),
zap.String("block_root", hex.EncodeToString(msg.Message.Aggregate.Data.BeaconBlockRoot[:])),
)
if err := r.GetBeaconNode().SubmitSignedAggregateSelectionProof(msg); err != nil {
r.metrics.RoleSubmissionFailed()
logger.Error("❌ could not submit to Beacon chain reconstructed contribution and proof",
fields.SubmissionTime(time.Since(start)),
zap.Error(err))
return errors.Wrap(err, "could not submit to Beacon chain reconstructed signed aggregate")
}

proofSubmissionEnd()
endSubmission()
r.metrics.EndDutyFullFlow(r.GetState().RunningInstance.State.Round)
r.metrics.RoleSubmitted()

logger.Debug("✅ successful submitted aggregate")
logger.Debug("✅ successful submitted aggregate",
fields.SubmissionTime(time.Since(start)),
)
}
r.GetState().Finished = true

Expand Down
49 changes: 25 additions & 24 deletions protocol/v2/ssv/runner/attester.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ type AttesterRunner struct {
operatorSigner spectypes.OperatorSigner
valCheck specqbft.ProposedValueCheckF

started time.Time
metrics metrics.ConsensusMetrics
}

Expand Down Expand Up @@ -86,7 +85,6 @@ func (r *AttesterRunner) ProcessConsensus(logger *zap.Logger, signedMsg *specqbf
if !decided {
return nil
}

r.metrics.EndConsensus()
r.metrics.StartPostConsensus()

Expand Down Expand Up @@ -139,17 +137,13 @@ func (r *AttesterRunner) ProcessPostConsensus(logger *zap.Logger, signedMsg *spe
return errors.Wrap(err, "failed processing post consensus message")
}

duty := r.GetState().DecidedValue.Duty
logger = logger.With(fields.Slot(duty.Slot))
logger.Debug("🧩 got partial signatures",
logger.Debug("🧩 got partial signature",
zap.Uint64("signer", signedMsg.Signer))

if !quorum {
return nil
}

r.metrics.EndPostConsensus()

attestationData, err := r.GetState().DecidedValue.GetAttestationData()
if err != nil {
return errors.Wrap(err, "could not get attestation data")
Expand All @@ -166,10 +160,12 @@ func (r *AttesterRunner) ProcessPostConsensus(logger *zap.Logger, signedMsg *spe
}
specSig := phase0.BLSSignature{}
copy(specSig[:], sig)
r.metrics.EndPostConsensus()

logger.Debug("🧩 reconstructed partial signatures",
zap.Uint64s("signers", getPostConsensusSigners(r.GetState(), root)))
endSubmission := r.metrics.StartBeaconSubmission()
startSubmissionTime := time.Now()

duty := r.GetState().DecidedValue.Duty
aggregationBitfield := bitfield.NewBitlist(r.GetState().DecidedValue.Duty.CommitteeLength)
aggregationBitfield.SetBitAt(duty.ValidatorCommitteeIndex, true)
signedAtt := &phase0.Attestation{
Expand All @@ -178,27 +174,30 @@ func (r *AttesterRunner) ProcessPostConsensus(logger *zap.Logger, signedMsg *spe
AggregationBits: aggregationBitfield,
}

attestationSubmissionEnd := r.metrics.StartBeaconSubmission()
consensusDuration := time.Since(r.started)

// Submit it to the BN.
start := time.Now()
logger = logger.With(
zap.Uint64s("signers", getPostConsensusSigners(r.GetState(), root)),
fields.BeaconDataTime(r.metrics.GetBeaconDataTime()),
fields.ConsensusTime(r.metrics.GetConsensusTime()),
fields.PostConsensusTime(r.metrics.GetPostConsensusTime()),
fields.Height(r.BaseRunner.QBFTController.Height),
fields.Round(r.GetState().RunningInstance.State.Round),
zap.String("block_root", hex.EncodeToString(signedAtt.Data.BeaconBlockRoot[:])),
)
if err := r.beacon.SubmitAttestation(signedAtt); err != nil {
r.metrics.RoleSubmissionFailed()
logger.Error("❌ failed to submit attestation", zap.Error(err))
logger.Error("❌ failed to submit attestation",
fields.SubmissionTime(time.Since(startSubmissionTime)),
zap.Error(err))
return errors.Wrap(err, "could not submit to Beacon chain reconstructed attestation")
}

attestationSubmissionEnd()
endSubmission()
r.metrics.EndDutyFullFlow(r.GetState().RunningInstance.State.Round)
r.metrics.RoleSubmitted()

logger.Info("✅ successfully submitted attestation",
zap.String("block_root", hex.EncodeToString(signedAtt.Data.BeaconBlockRoot[:])),
fields.ConsensusTime(consensusDuration),
fields.SubmissionTime(time.Since(start)),
fields.Height(r.BaseRunner.QBFTController.Height),
fields.Round(r.GetState().RunningInstance.State.Round))
fields.SubmissionTime(time.Since(startSubmissionTime)))
}
r.GetState().Finished = true

Expand Down Expand Up @@ -226,14 +225,15 @@ func (r *AttesterRunner) expectedPostConsensusRootsAndDomain() ([]ssz.HashRoot,
// 4) collect 2f+1 partial sigs, reconstruct and broadcast valid attestation sig to the BN
func (r *AttesterRunner) executeDuty(logger *zap.Logger, duty *spectypes.Duty) error {
start := time.Now()
r.metrics.StartBeaconData()
attData, ver, err := r.GetBeaconNode().GetAttestationData(duty.Slot, duty.CommitteeIndex)
if err != nil {
logger.Error("❌ failed to get attestation data",
fields.BeaconDataTime(time.Since(start)),
zap.Error(err))
return errors.Wrap(err, "failed to get attestation data")
}
logger = logger.With(zap.Duration("attestation_data_time", time.Since(start)))

r.started = time.Now()

r.metrics.EndBeaconData()
r.metrics.StartDutyFullFlow()
r.metrics.StartConsensus()

Expand All @@ -248,6 +248,7 @@ func (r *AttesterRunner) executeDuty(logger *zap.Logger, duty *spectypes.Duty) e
DataSSZ: attDataByts,
}

logger = logger.With(fields.BeaconDataTime(r.metrics.GetBeaconDataTime()))
if err := r.BaseRunner.decide(logger, r, input); err != nil {
return errors.Wrap(err, "can't start new duty runner instance for duty")
}
Expand Down
Loading

0 comments on commit cbe6254

Please sign in to comment.