Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validator: Safer pending attestation records flushing #8433

Merged
merged 16 commits into from
Feb 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions validator/db/kv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ go_library(
visibility = ["//validator:__subpackages__"],
deps = [
"//beacon-chain/core/helpers:go_default_library",
"//shared/abool:go_default_library",
"//shared/bytesutil:go_default_library",
"//shared/event:go_default_library",
"//shared/fileutil:go_default_library",
Expand Down
87 changes: 76 additions & 11 deletions validator/db/kv/attester_protection.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kv
import (
"context"
"fmt"
"sync"
"time"

"github.com/pkg/errors"
Expand All @@ -26,6 +27,45 @@ type AttestationRecord struct {
SigningRoot [32]byte
}

// NewQueuedAttestationRecords constructor allocates the underlying slice and
// required attributes for managing pending attestation records.
func NewQueuedAttestationRecords() *QueuedAttestationRecords {
return &QueuedAttestationRecords{
records: make([]*AttestationRecord, 0, attestationBatchCapacity),
}
}

// QueuedAttestationRecords is a thread-safe struct for managing a queue of
// attestation records to save to validator database.
type QueuedAttestationRecords struct {
records []*AttestationRecord
lock sync.RWMutex
}

// Append a new attestation record to the queue.
func (p *QueuedAttestationRecords) Append(ar *AttestationRecord) {
p.lock.Lock()
defer p.lock.Unlock()
p.records = append(p.records, ar)
}

// Flush all records. This method returns the current pending records and resets
// the pending records slice.
func (p *QueuedAttestationRecords) Flush() []*AttestationRecord {
p.lock.Lock()
defer p.lock.Unlock()
recs := p.records
p.records = make([]*AttestationRecord, 0, attestationBatchCapacity)
return recs
}

// Len returns the current length of records.
func (p *QueuedAttestationRecords) Len() int {
p.lock.RLock()
defer p.lock.RUnlock()
return len(p.records)
}

// A wrapper over an error received from a background routine
// saving batched attestations for slashing protection.
// This wrapper allows us to send this response over event feeds,
Expand Down Expand Up @@ -97,6 +137,9 @@ func (s *Store) CheckSlashableAttestation(
defer span.End()
var slashKind SlashingKind
err := s.view(func(tx *bolt.Tx) error {
if ctx.Err() != nil {
return ctx.Err()
}
bucket := tx.Bucket(pubKeysBucket)
pkBucket := bucket.Bucket(pubKey[:])
if pkBucket == nil {
Expand Down Expand Up @@ -124,6 +167,10 @@ func (s *Store) CheckSlashableAttestation(
}
// Check for surround votes.
return sourceEpochsBucket.ForEach(func(sourceEpochBytes []byte, targetEpochsBytes []byte) error {
if ctx.Err() != nil {
return ctx.Err()
}

existingSourceEpoch := bytesutil.BytesToEpochBigEndian(sourceEpochBytes)

// There can be multiple target epochs attested per source epoch.
Expand Down Expand Up @@ -234,19 +281,23 @@ func (s *Store) batchAttestationWrites(ctx context.Context) {
for {
select {
case v := <-s.batchedAttestationsChan:
s.batchedAttestations = append(s.batchedAttestations, v)
if len(s.batchedAttestations) == attestationBatchCapacity {
log.WithField("numRecords", attestationBatchCapacity).Debug(
s.batchedAttestations.Append(v)
if numRecords := s.batchedAttestations.Len(); numRecords >= attestationBatchCapacity {
log.WithField("numRecords", numRecords).Debug(
"Reached max capacity of batched attestation records, flushing to DB",
)
s.flushAttestationRecords(ctx)
if s.batchedAttestationsFlushInProgress.IsNotSet() {
s.flushAttestationRecords(ctx, s.batchedAttestations.Flush())
}
}
case <-ticker.C:
if len(s.batchedAttestations) > 0 {
log.WithField("numRecords", len(s.batchedAttestations)).Debug(
if numRecords := s.batchedAttestations.Len(); numRecords > 0 {
log.WithField("numRecords", numRecords).Debug(
"Batched attestation records write interval reached, flushing to DB",
)
s.flushAttestationRecords(ctx)
if s.batchedAttestationsFlushInProgress.IsNotSet() {
s.flushAttestationRecords(ctx, s.batchedAttestations.Flush())
}
}
case <-ctx.Done():
return
Expand All @@ -258,13 +309,27 @@ func (s *Store) batchAttestationWrites(ctx context.Context) {
// and resets the list of batched attestations for future writes.
// This function notifies all subscribers for flushed attestations
// of the result of the save operation.
func (s *Store) flushAttestationRecords(ctx context.Context) {
func (s *Store) flushAttestationRecords(ctx context.Context, records []*AttestationRecord) {
if s.batchedAttestationsFlushInProgress.IsSet() {
// This should never happen. This method should not be called when a flush is already in
// progress. If you are seeing this log, check the atomic bool before calling this method.
log.Error("Attempted to flush attestation records when already in progress")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be at error level? maybe debug is appropriate. We should only have error logs when they are actionable to the user, and this is opaque from a user's perspective

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should never ever happen. If it is happening, then it's a development bug that was recently introduced. I will add a comment to reflect the significance

return
}
s.batchedAttestationsFlushInProgress.Set()
defer s.batchedAttestationsFlushInProgress.UnSet()

start := time.Now()
err := s.saveAttestationRecords(ctx, s.batchedAttestations)
// If there was no error, we reset the batched attestations slice.
err := s.saveAttestationRecords(ctx, records)
// If there was any error, retry the records since the TX would have been reverted.
if err == nil {
log.WithField("duration", time.Since(start)).Debug("Successfully flushed batched attestations to DB")
s.batchedAttestations = make([]*AttestationRecord, 0, attestationBatchCapacity)
} else {
// This should never happen.
log.WithError(err).Error("Failed to batch save attestation records, retrying in queue")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same ^

for _, ar := range records {
s.batchedAttestations.Append(ar)
}
}
// Forward the error, if any, to all subscribers via an event feed.
// We use a struct wrapper around the error as the event feed
Expand Down
40 changes: 37 additions & 3 deletions validator/db/kv/attester_protection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"sync"
"testing"

"github.com/prysmaticlabs/eth2-types"
types "github.com/prysmaticlabs/eth2-types"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
Expand All @@ -17,6 +17,31 @@ import (
bolt "go.etcd.io/bbolt"
)

func TestPendingAttestationRecords_Flush(t *testing.T) {
queue := NewQueuedAttestationRecords()

// Add 5 atts
num := 5
for i := 0; i < num; i++ {
queue.Append(&AttestationRecord{
Target: types.Epoch(i),
})
}

res := queue.Flush()
assert.Equal(t, len(res), num, "Wrong number of flushed attestations")
assert.Equal(t, len(queue.records), 0, "Records were not cleared/flushed")
}

func TestPendingAttestationRecords_Len(t *testing.T) {
queue := NewQueuedAttestationRecords()
assert.Equal(t, queue.Len(), 0)
queue.Append(&AttestationRecord{})
assert.Equal(t, queue.Len(), 1)
queue.Flush()
assert.Equal(t, queue.Len(), 0)
}

func TestStore_CheckSlashableAttestation_DoubleVote(t *testing.T) {
ctx := context.Background()
numValidators := 1
Expand Down Expand Up @@ -372,7 +397,7 @@ func TestSaveAttestationForPubKey_BatchWrites_FullCapacity(t *testing.T) {
require.LogsContain(t, hook, "Reached max capacity of batched attestation records")
require.LogsDoNotContain(t, hook, "Batched attestation records write interval reached")
require.LogsContain(t, hook, "Successfully flushed batched attestations to DB")
require.Equal(t, 0, len(validatorDB.batchedAttestations))
require.Equal(t, 0, validatorDB.batchedAttestations.Len())

// We then verify all the data we wanted to save is indeed saved to disk.
err := validatorDB.view(func(tx *bolt.Tx) error {
Expand Down Expand Up @@ -429,7 +454,7 @@ func TestSaveAttestationForPubKey_BatchWrites_LowCapacity_TimerReached(t *testin
require.LogsDoNotContain(t, hook, "Reached max capacity of batched attestation records")
require.LogsContain(t, hook, "Batched attestation records write interval reached")
require.LogsContain(t, hook, "Successfully flushed batched attestations to DB")
require.Equal(t, 0, len(validatorDB.batchedAttestations))
require.Equal(t, 0, validatorDB.batchedAttestations.Len())

// We then verify all the data we wanted to save is indeed saved to disk.
err := validatorDB.view(func(tx *bolt.Tx) error {
Expand Down Expand Up @@ -540,3 +565,12 @@ func createAttestation(source, target types.Epoch) *ethpb.IndexedAttestation {
},
}
}

func TestStore_flushAttestationRecords_InProgress(t *testing.T) {
s := &Store{}
s.batchedAttestationsFlushInProgress.Set()

hook := logTest.NewGlobal()
s.flushAttestationRecords(context.Background(), nil)
assert.LogsContain(t, hook, "Attempted to flush attestation records when already in progress")
}
14 changes: 8 additions & 6 deletions validator/db/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
prombolt "github.com/prysmaticlabs/prombbolt"
"github.com/prysmaticlabs/prysm/shared/abool"
"github.com/prysmaticlabs/prysm/shared/event"
"github.com/prysmaticlabs/prysm/shared/fileutil"
"github.com/prysmaticlabs/prysm/shared/params"
Expand Down Expand Up @@ -50,11 +51,12 @@ var blockedBuckets = [][]byte{
// Store defines an implementation of the Prysm Database interface
// using BoltDB as the underlying persistent kv-store for eth2.
type Store struct {
db *bolt.DB
databasePath string
batchedAttestations []*AttestationRecord
batchedAttestationsChan chan *AttestationRecord
batchAttestationsFlushedFeed *event.Feed
db *bolt.DB
databasePath string
batchedAttestations *QueuedAttestationRecords
batchedAttestationsChan chan *AttestationRecord
batchAttestationsFlushedFeed *event.Feed
batchedAttestationsFlushInProgress abool.AtomicBool
}

// Close closes the underlying boltdb database.
Expand Down Expand Up @@ -118,7 +120,7 @@ func NewKVStore(ctx context.Context, dirPath string, pubKeys [][48]byte) (*Store
kv := &Store{
db: boltDB,
databasePath: dirPath,
batchedAttestations: make([]*AttestationRecord, 0, attestationBatchCapacity),
batchedAttestations: NewQueuedAttestationRecords(),
batchedAttestationsChan: make(chan *AttestationRecord, attestationBatchCapacity),
batchAttestationsFlushedFeed: new(event.Feed),
}
Expand Down