Skip to content

Commit

Permalink
Normalize backfill logs/errors (#13642)
Browse files Browse the repository at this point in the history
* Normalize backfill logs

* improve flag desc

* review
  • Loading branch information
rkapka authored Feb 22, 2024
1 parent f795e09 commit 7a9608e
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 40 deletions.
1 change: 1 addition & 0 deletions beacon-chain/sync/backfill/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
"batch.go",
"batcher.go",
"blobs.go",
"log.go",
"metrics.go",
"pool.go",
"service.go",
Expand Down
12 changes: 6 additions & 6 deletions beacon-chain/sync/backfill/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
log "github.com/sirupsen/logrus"
"github.com/sirupsen/logrus"
)

// ErrChainBroken indicates a backfill batch can't be imported to the db because it is not known to be the ancestor
Expand Down Expand Up @@ -73,9 +73,9 @@ type batch struct {
bs *blobSync
}

func (b batch) logFields() log.Fields {
func (b batch) logFields() logrus.Fields {
return map[string]interface{}{
"batch_id": b.id(),
"batchId": b.id(),
"state": b.state.String(),
"scheduled": b.scheduled.String(),
"seq": b.seq,
Expand Down Expand Up @@ -139,7 +139,7 @@ func (b batch) withResults(results verifiedROBlocks, bs *blobSync) batch {

func (b batch) postBlobSync() batch {
if b.blobsNeeded() > 0 {
log.WithFields(b.logFields()).WithField("blobs_missing", b.blobsNeeded()).Error("batch still missing blobs after downloading from peer")
log.WithFields(b.logFields()).WithField("blobsMissing", b.blobsNeeded()).Error("Batch still missing blobs after downloading from peer")
b.bs = nil
b.results = []blocks.ROBlock{}
return b.withState(batchErrRetryable)
Expand All @@ -153,14 +153,14 @@ func (b batch) withState(s batchState) batch {
switch b.state {
case batchErrRetryable:
b.retries += 1
log.WithFields(b.logFields()).Info("sequencing batch for retry")
log.WithFields(b.logFields()).Info("Sequencing batch for retry")
case batchInit, batchNil:
b.firstScheduled = b.scheduled
}
}
if s == batchImportComplete {
backfillBatchTimeRoundtrip.Observe(float64(time.Since(b.firstScheduled).Milliseconds()))
log.WithFields(b.logFields()).Debug("Backfill batch imported.")
log.WithFields(b.logFields()).Debug("Backfill batch imported")
}
b.state = s
b.seq += 1
Expand Down
5 changes: 5 additions & 0 deletions beacon-chain/sync/backfill/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package backfill

import "github.com/sirupsen/logrus"

var log = logrus.WithField("prefix", "backfill")
1 change: 0 additions & 1 deletion beacon-chain/sync/backfill/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/sync"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
log "github.com/sirupsen/logrus"
)

type batchWorkerPool interface {
Expand Down
45 changes: 22 additions & 23 deletions beacon-chain/sync/backfill/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/prysmaticlabs/prysm/v5/proto/dbval"
"github.com/prysmaticlabs/prysm/v5/runtime"
"github.com/prysmaticlabs/prysm/v5/time/slots"
log "github.com/sirupsen/logrus"
)

type Service struct {
Expand Down Expand Up @@ -149,12 +148,12 @@ func (s *Service) initVerifier(ctx context.Context) (*verifier, sync.ContextByte
}
keys, err := cps.PublicKeys()
if err != nil {
return nil, nil, errors.Wrap(err, "Unable to retrieve public keys for all validators in the origin state")
return nil, nil, errors.Wrap(err, "unable to retrieve public keys for all validators in the origin state")
}
vr := cps.GenesisValidatorsRoot()
ctxMap, err := sync.ContextByteVersionsForValRoot(bytesutil.ToBytes32(vr))
if err != nil {
return nil, nil, errors.Wrapf(err, "unable to initialize context version map using genesis validator root = %#x", vr)
return nil, nil, errors.Wrapf(err, "unable to initialize context version map using genesis validator root %#x", vr)
}
v, err := newBackfillVerifier(vr, keys)
return v, ctxMap, err
Expand All @@ -164,10 +163,10 @@ func (s *Service) updateComplete() bool {
b, err := s.pool.complete()
if err != nil {
if errors.Is(err, errEndSequence) {
log.WithField("backfill_slot", b.begin).Info("Backfill is complete.")
log.WithField("backfillSlot", b.begin).Info("Backfill is complete")
return true
}
log.WithError(err).Error("Backfill service received unhandled error from worker pool.")
log.WithError(err).Error("Backfill service received unhandled error from worker pool")
return true
}
s.batchSeq.update(b)
Expand All @@ -187,11 +186,11 @@ func (s *Service) importBatches(ctx context.Context) {
for i := range importable {
ib := importable[i]
if len(ib.results) == 0 {
log.WithFields(ib.logFields()).Error("Batch with no results, skipping importer.")
log.WithFields(ib.logFields()).Error("Batch with no results, skipping importer")
}
_, err := s.batchImporter(ctx, current, ib, s.store)
if err != nil {
log.WithError(err).WithFields(ib.logFields()).Debug("Backfill batch failed to import.")
log.WithError(err).WithFields(ib.logFields()).Debug("Backfill batch failed to import")
s.downscore(ib)
s.batchSeq.update(ib.withState(batchErrRetryable))
// If a batch fails, the subsequent batches are no longer considered importable.
Expand All @@ -204,8 +203,8 @@ func (s *Service) importBatches(ctx context.Context) {

nt := s.batchSeq.numTodo()
log.WithField("imported", imported).WithField("importable", len(importable)).
WithField("batches_remaining", nt).
Info("Backfill batches processed.")
WithField("batchesRemaining", nt).
Info("Backfill batches processed")

backfillRemainingBatches.Set(float64(nt))
}
Expand All @@ -220,7 +219,7 @@ func (s *Service) scheduleTodos() {
// and then we'll have the parent_root expected by 90 to ensure it matches the root for 89,
// at which point we know we can process [80..90).
if errors.Is(err, errMaxBatches) {
log.Debug("Backfill batches waiting for descendent batch to complete.")
log.Debug("Backfill batches waiting for descendent batch to complete")
return
}
}
Expand All @@ -232,57 +231,57 @@ func (s *Service) scheduleTodos() {
// Start begins the runloop of backfill.Service in the current goroutine.
func (s *Service) Start() {
if !s.enabled {
log.Info("Backfill service not enabled.")
log.Info("Backfill service not enabled")
return
}
ctx, cancel := context.WithCancel(s.ctx)
defer func() {
log.Info("Backfill service is shutting down.")
log.Info("Backfill service is shutting down")
cancel()
}()
clock, err := s.cw.WaitForClock(ctx)
if err != nil {
log.WithError(err).Error("Backfill service failed to start while waiting for genesis data.")
log.WithError(err).Error("Backfill service failed to start while waiting for genesis data")
return
}
s.clock = clock
v, err := s.verifierWaiter.WaitForInitializer(ctx)
s.newBlobVerifier = newBlobVerifierFromInitializer(v)

if err != nil {
log.WithError(err).Error("Could not initialize blob verifier in backfill service.")
log.WithError(err).Error("Could not initialize blob verifier in backfill service")
return
}

if s.store.isGenesisSync() {
log.Info("Backfill short-circuit; node synced from genesis.")
log.Info("Backfill short-circuit; node synced from genesis")
return
}
status := s.store.status()
// Exit early if there aren't going to be any batches to backfill.
if primitives.Slot(status.LowSlot) <= s.ms(s.clock.CurrentSlot()) {
log.WithField("minimum_required_slot", s.ms(s.clock.CurrentSlot())).
WithField("backfill_lowest_slot", status.LowSlot).
Info("Exiting backfill service; minimum block retention slot > lowest backfilled block.")
log.WithField("minimumRequiredSlot", s.ms(s.clock.CurrentSlot())).
WithField("backfillLowestSlot", status.LowSlot).
Info("Exiting backfill service; minimum block retention slot > lowest backfilled block")
return
}
s.verifier, s.ctxMap, err = s.initVerifier(ctx)
if err != nil {
log.WithError(err).Error("Unable to initialize backfill verifier.")
log.WithError(err).Error("Unable to initialize backfill verifier")
return
}

if s.initSyncWaiter != nil {
log.Info("Backfill service waiting for initial-sync to reach head before starting.")
log.Info("Backfill service waiting for initial-sync to reach head before starting")
if err := s.initSyncWaiter(); err != nil {
log.WithError(err).Error("Error waiting for init-sync to complete.")
log.WithError(err).Error("Error waiting for init-sync to complete")
return
}
}
s.pool.spawn(ctx, s.nWorkers, clock, s.pa, s.verifier, s.ctxMap, s.newBlobVerifier, s.blobStore)
s.batchSeq = newBatchSequencer(s.nWorkers, s.ms(s.clock.CurrentSlot()), primitives.Slot(status.LowSlot), primitives.Slot(s.batchSize))
if err = s.initBatches(); err != nil {
log.WithError(err).Error("Non-recoverable error in backfill service.")
log.WithError(err).Error("Non-recoverable error in backfill service")
return
}

Expand All @@ -296,7 +295,7 @@ func (s *Service) Start() {
s.importBatches(ctx)
batchesWaiting.Set(float64(s.batchSeq.countWithState(batchImportable)))
if err := s.batchSeq.moveMinimum(s.ms(s.clock.CurrentSlot())); err != nil {
log.WithError(err).Error("Non-recoverable error while adjusting backfill minimum slot.")
log.WithError(err).Error("Non-recoverable error while adjusting backfill minimum slot")
}
s.scheduleTodos()
}
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/sync/backfill/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/proto/dbval"
)

var errBatchDisconnected = errors.New("Highest block root in backfill batch doesn't match next parent_root")
var errBatchDisconnected = errors.New("highest block root in backfill batch doesn't match next parent_root")

// NewUpdater correctly initializes a StatusUpdater value with the required database value.
func NewUpdater(ctx context.Context, store BeaconDB) (*Store, error) {
Expand Down
9 changes: 4 additions & 5 deletions beacon-chain/sync/backfill/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/startup"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/sync"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
log "github.com/sirupsen/logrus"
)

type workerId int
Expand All @@ -31,14 +30,14 @@ func (w *p2pWorker) run(ctx context.Context) {
for {
select {
case b := <-w.todo:
log.WithFields(b.logFields()).WithField("backfill_worker", w.id).Debug("Backfill worker received batch.")
log.WithFields(b.logFields()).WithField("backfillWorker", w.id).Debug("Backfill worker received batch")
if b.state == batchBlobSync {
w.done <- w.handleBlobs(ctx, b)
} else {
w.done <- w.handleBlocks(ctx, b)
}
case <-ctx.Done():
log.WithField("backfill_worker", w.id).Info("Backfill worker exiting after context canceled.")
log.WithField("backfillWorker", w.id).Info("Backfill worker exiting after context canceled")
return
}
}
Expand Down Expand Up @@ -73,7 +72,7 @@ func (w *p2pWorker) handleBlocks(ctx context.Context, b batch) batch {
bdl += vb[i].SizeSSZ()
}
backfillBlocksApproximateBytes.Add(float64(bdl))
log.WithFields(b.logFields()).WithField("dlbytes", bdl).Debug("backfill batch block bytes downloaded")
log.WithFields(b.logFields()).WithField("dlbytes", bdl).Debug("Backfill batch block bytes downloaded")
bs, err := newBlobSync(cs, vb, &blobSyncConfig{retentionStart: blobRetentionStart, nbv: w.nbv, store: w.bfs})
if err != nil {
return b.withRetryableError(err)
Expand All @@ -97,7 +96,7 @@ func (w *p2pWorker) handleBlobs(ctx context.Context, b batch) batch {
// All blobs are the same size, so we can compute 1 and use it for all in the batch.
sz := blobs[0].SizeSSZ() * len(blobs)
backfillBlobsApproximateBytes.Add(float64(sz))
log.WithFields(b.logFields()).WithField("dlbytes", sz).Debug("backfill batch blob bytes downloaded")
log.WithFields(b.logFields()).WithField("dlbytes", sz).Debug("Backfill batch blob bytes downloaded")
}
return b.postBlobSync()
}
Expand Down
8 changes: 4 additions & 4 deletions cmd/beacon-chain/sync/backfill/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ var (
// This flag will be removed onced backfill is enabled by default.
EnableExperimentalBackfill = &cli.BoolFlag{
Name: "enable-experimental-backfill",
Usage: "Backfill is still experimental at this time." +
Usage: "Backfill is still experimental at this time. " +
"It will only be enabled if this flag is specified and the node was started using checkpoint sync.",
}
// BackfillBatchSize allows users to tune block backfill request sizes to maximize network utilization
Expand All @@ -21,7 +21,7 @@ var (
Name: backfillBatchSizeName,
Usage: "Number of blocks per backfill batch. " +
"A larger number will request more blocks at once from peers, but also consume more system memory to " +
"hold batches in memory during processing. This has a multiplicative effect with " + backfillWorkerCountName,
"hold batches in memory during processing. This has a multiplicative effect with " + backfillWorkerCountName + ".",
Value: 64,
}
// BackfillWorkerCount allows users to tune the number of concurrent backfill batches to download, to maximize
Expand All @@ -30,9 +30,9 @@ var (
Name: backfillWorkerCountName,
Usage: "Number of concurrent backfill batch requests. " +
"A larger number will better utilize network resources, up to a system-dependent limit, but will also " +
"consume more system memory to hold batches in memory during processing. Multiply by backfill-batch-size and " +
"consume more system memory to hold batches in memory during processing. Multiply by " + backfillBatchSizeName + " and " +
"average block size (~2MB before deneb) to find the right number for your system. " +
"This has a multiplicatice effect with " + backfillBatchSizeName,
"This has a multiplicative effect with " + backfillBatchSizeName + ".",
Value: 2,
}
)

0 comments on commit 7a9608e

Please sign in to comment.