Skip to content

Commit

Permalink
Improve performance during CheckAttesterDoubleVotes (#8927)
Browse files Browse the repository at this point in the history
* de-normalize attestationDataRootsBucket to improve performace in CheckAttesterDoubleVotes

* parallize processing of every attestation

* fix double vote test case to take care of jumbled double votes due to parallel processing

* remove attestationRecordsBucket totally and take care of pruning

* missed out build file

* fix review comments

Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>
  • Loading branch information
jmozah and rauljordan authored May 25, 2021
1 parent aa0fb05 commit a958dd2
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 67 deletions.
2 changes: 1 addition & 1 deletion beacon-chain/db/slasherkv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ go_library(
"//proto/beacon/rpc/v1:go_default_library",
"//shared/bytesutil:go_default_library",
"//shared/fileutil:go_default_library",
"//shared/hashutil:go_default_library",
"//shared/params:go_default_library",
"@com_github_ferranbt_fastssz//:go_default_library",
"@com_github_golang_snappy//:go_default_library",
Expand All @@ -32,6 +31,7 @@ go_library(
"@com_github_sirupsen_logrus//:go_default_library",
"@io_etcd_go_bbolt//:go_default_library",
"@io_opencensus_go//trace:go_default_library",
"@org_golang_x_sync//errgroup:go_default_library",
],
)

Expand Down
1 change: 0 additions & 1 deletion beacon-chain/db/slasherkv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ func NewKVStore(ctx context.Context, dirPath string, config *Config) (*Store, er
tx,
// Slasher buckets.
attestedEpochsByValidator,
attestationRecordsBucket,
attestationDataRootsBucket,
proposalRecordsBucket,
slasherChunksBucket,
Expand Down
6 changes: 1 addition & 5 deletions beacon-chain/db/slasherkv/pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,11 @@ func (s *Store) PruneAttestations(
log.Debugf("Pruned %d/%d epochs worth of attestations", epochAtCursor, endPruneEpoch-1)
if err := s.db.Update(func(tx *bolt.Tx) error {
rootsBkt := tx.Bucket(attestationDataRootsBucket)
attsBkt := tx.Bucket(attestationRecordsBucket)
c := rootsBkt.Cursor()

var lastPrunedEpoch, epochsPruned types.Epoch
// We begin a pruning iteration at starting from the first item in the bucket.
for k, v := c.First(); k != nil; k, v = c.Next() {
for k, _ := c.First(); k != nil; k, _ = c.Next() {
// We check the epoch from the current key in the database.
// If we have hit an epoch that is greater than the end epoch of the pruning process,
// we then completely exit the process as we are done.
Expand All @@ -174,9 +173,6 @@ func (s *Store) PruneAttestations(
if err := rootsBkt.Delete(k); err != nil {
return err
}
if err := attsBkt.Delete(v); err != nil {
return err
}
if epochAtCursor == 0 {
epochsPruned = 1
} else if epochAtCursor > lastPrunedEpoch {
Expand Down
1 change: 0 additions & 1 deletion beacon-chain/db/slasherkv/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ package slasherkv
var (
// Slasher buckets.
attestedEpochsByValidator = []byte("attested-epochs-by-validator")
attestationRecordsBucket = []byte("attestation-records")
attestationDataRootsBucket = []byte("attestation-data-roots")
proposalRecordsBucket = []byte("proposal-records")
slasherChunksBucket = []byte("slasher-chunks")
Expand Down
120 changes: 61 additions & 59 deletions beacon-chain/db/slasherkv/slasher.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/binary"
"fmt"
"sort"
"sync"

ssz "github.com/ferranbt/fastssz"
"github.com/golang/snappy"
Expand All @@ -15,9 +16,9 @@ import (
slashertypes "github.com/prysmaticlabs/prysm/beacon-chain/slasher/types"
slashpb "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/hashutil"
bolt "go.etcd.io/bbolt"
"go.opencensus.io/trace"
"golang.org/x/sync/errgroup"
)

const (
Expand Down Expand Up @@ -92,43 +93,58 @@ func (s *Store) CheckAttesterDoubleVotes(
ctx, span := trace.StartSpan(ctx, "BeaconDB.CheckAttesterDoubleVotes")
defer span.End()
doubleVotes := make([]*slashertypes.AttesterDoubleVote, 0)
err := s.db.View(func(tx *bolt.Tx) error {
signingRootsBkt := tx.Bucket(attestationDataRootsBucket)
attRecordsBkt := tx.Bucket(attestationRecordsBucket)
for _, att := range attestations {
encEpoch := encodeTargetEpoch(att.IndexedAttestation.Data.Target.Epoch)
for _, valIdx := range att.IndexedAttestation.AttestingIndices {
encIdx := encodeValidatorIndex(types.ValidatorIndex(valIdx))
validatorEpochKey := append(encEpoch, encIdx...)
attRecordsKey := signingRootsBkt.Get(validatorEpochKey)

// An attestation record key is comprised of a signing root (32 bytes)
// and a fast sum hash of the attesting indices (8 bytes).
if len(attRecordsKey) < attestationRecordKeySize {
continue
doubleVotesMu := sync.Mutex{}
eg, egctx := errgroup.WithContext(ctx)
for _, att := range attestations {
// Copy the iteration instance to a local variable to give each go-routine its own copy to play with.
// See https://golang.org/doc/faq#closures_and_goroutines for more details.
attToProcess := att
// process every attestation parallelly.
eg.Go(func() error {
err := s.db.View(func(tx *bolt.Tx) error {
signingRootsBkt := tx.Bucket(attestationDataRootsBucket)
encEpoch := encodeTargetEpoch(attToProcess.IndexedAttestation.Data.Target.Epoch)
localDoubleVotes := make([]*slashertypes.AttesterDoubleVote, 0)
for _, valIdx := range attToProcess.IndexedAttestation.AttestingIndices {
encIdx := encodeValidatorIndex(types.ValidatorIndex(valIdx))
validatorEpochKey := append(encEpoch, encIdx...)
encExistingAttRecord := signingRootsBkt.Get(validatorEpochKey)
if encExistingAttRecord == nil {
continue
}
existingSigningRoot := bytesutil.ToBytes32(encExistingAttRecord[:signingRootSize])
if existingSigningRoot != attToProcess.SigningRoot {
existingAttRecord, err := decodeAttestationRecord(encExistingAttRecord[signingRootSize:])
if err != nil {
return err
}
slashAtt := &slashertypes.AttesterDoubleVote{
ValidatorIndex: types.ValidatorIndex(valIdx),
Target: attToProcess.IndexedAttestation.Data.Target.Epoch,
PrevAttestationWrapper: existingAttRecord,
AttestationWrapper: attToProcess,
}
localDoubleVotes = append(localDoubleVotes, slashAtt)
}
}
encExistingAttRecord := attRecordsBkt.Get(attRecordsKey)
if encExistingAttRecord == nil {
continue
// if any routine is cancelled, then cancel this routine too
select {
case <-egctx.Done():
return egctx.Err()
default:
}
existingSigningRoot := bytesutil.ToBytes32(attRecordsKey[:signingRootSize])
if existingSigningRoot != att.SigningRoot {
existingAttRecord, err := decodeAttestationRecord(encExistingAttRecord)
if err != nil {
return err
}
doubleVotes = append(doubleVotes, &slashertypes.AttesterDoubleVote{
ValidatorIndex: types.ValidatorIndex(valIdx),
Target: att.IndexedAttestation.Data.Target.Epoch,
PrevAttestationWrapper: existingAttRecord,
AttestationWrapper: att,
})
// if there are any doible votes in this attestation, add it to the global double votes
if len(localDoubleVotes) > 0 {
doubleVotesMu.Lock()
defer doubleVotesMu.Unlock()
doubleVotes = append(doubleVotes, localDoubleVotes...)
}
}
}
return nil
})
return doubleVotes, err
return nil
})
return err
})
}
return doubleVotes, eg.Wait()
}

// AttestationRecordForValidator given a validator index and a target epoch,
Expand All @@ -144,16 +160,11 @@ func (s *Store) AttestationRecordForValidator(
key := append(encEpoch, encIdx...)
err := s.db.View(func(tx *bolt.Tx) error {
signingRootsBkt := tx.Bucket(attestationDataRootsBucket)
attRecordKey := signingRootsBkt.Get(key)
if attRecordKey == nil {
return nil
}
attRecordsBkt := tx.Bucket(attestationRecordsBucket)
indexedAttBytes := attRecordsBkt.Get(attRecordKey)
indexedAttBytes := signingRootsBkt.Get(key)
if indexedAttBytes == nil {
return nil
}
decoded, err := decodeAttestationRecord(indexedAttBytes)
decoded, err := decodeAttestationRecord(indexedAttBytes[signingRootSize:])
if err != nil {
return err
}
Expand Down Expand Up @@ -189,23 +200,15 @@ func (s *Store) SaveAttestationRecordsForValidators(
encodedRecords[i] = value
}
return s.db.Update(func(tx *bolt.Tx) error {
attRecordsBkt := tx.Bucket(attestationRecordsBucket)
signingRootsBkt := tx.Bucket(attestationDataRootsBucket)
for i, att := range attestations {
// An attestation record key is comprised of the signing root (32 bytes)
// and a fastsum64 of the attesting indices (8 bytes). This is used
// to have a more optimal schema
attIndicesHash := hashutil.FastSum64(encodedIndices[i])
attRecordKey := append(
att.SigningRoot[:], ssz.MarshalUint64(make([]byte, 0), attIndicesHash)...,
)
if err := attRecordsBkt.Put(attRecordKey, encodedRecords[i]); err != nil {
return err
}
for _, valIdx := range att.IndexedAttestation.AttestingIndices {
encIdx := encodeValidatorIndex(types.ValidatorIndex(valIdx))
key := append(encodedTargetEpoch[i], encIdx...)
if err := signingRootsBkt.Put(key, attRecordKey); err != nil {
// signature is prefixed so that double votest can be checked without decoding attestations
// and save some cpu cycles
value := append(att.SigningRoot[:], encodedRecords[i]...)
if err := signingRootsBkt.Put(key, value); err != nil {
return err
}
}
Expand Down Expand Up @@ -361,18 +364,17 @@ func (s *Store) HighestAttestations(
history := make([]*slashpb.HighestAttestation, 0, len(encodedIndices))
err = s.db.View(func(tx *bolt.Tx) error {
signingRootsBkt := tx.Bucket(attestationDataRootsBucket)
attRecordsBkt := tx.Bucket(attestationRecordsBucket)
for i := 0; i < len(encodedIndices); i++ {
c := signingRootsBkt.Cursor()
for k, v := c.Last(); k != nil; k, v = c.Prev() {
if suffixForAttestationRecordsKey(k, encodedIndices[i]) {
encodedAttRecord := attRecordsBkt.Get(v)
encodedAttRecord := v[signingRootSize:]
if encodedAttRecord == nil {
continue
}
attWrapper, err := decodeAttestationRecord(encodedAttRecord)
if err != nil {
return err
attWrapper, decodeErr := decodeAttestationRecord(encodedAttRecord)
if decodeErr != nil {
return decodeErr
}
highestAtt := &slashpb.HighestAttestation{
ValidatorIndex: uint64(indices[i]),
Expand Down
40 changes: 40 additions & 0 deletions beacon-chain/db/slasherkv/slasher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ package slasherkv
import (
"context"
"encoding/binary"
"math/rand"
"reflect"
"sort"
"testing"
"time"

ssz "github.com/ferranbt/fastssz"
types "github.com/prysmaticlabs/eth2-types"
Expand Down Expand Up @@ -110,6 +113,10 @@ func TestStore_CheckAttesterDoubleVotes(t *testing.T) {
}
doubleVotes, err := beaconDB.CheckAttesterDoubleVotes(ctx, slashableAtts)
require.NoError(t, err)

sort.SliceStable(doubleVotes, func(i, j int) bool {
return uint64(doubleVotes[i].ValidatorIndex) < uint64(doubleVotes[j].ValidatorIndex)
})
require.DeepEqual(t, wanted, doubleVotes)
}

Expand Down Expand Up @@ -442,6 +449,39 @@ func BenchmarkHighestAttestations(b *testing.B) {
}
}

func BenchmarkStore_CheckDoubleBlockProposals(b *testing.B) {
b.StopTimer()
count := 10000
valsPerAtt := 100
indicesPerAtt := make([][]uint64, count)
for i := 0; i < count; i++ {
indicesForAtt := make([]uint64, valsPerAtt)
for r := i * count; r < valsPerAtt*(i+1); r++ {
indicesForAtt[i] = uint64(r)
}
indicesPerAtt[i] = indicesForAtt
}
atts := make([]*slashertypes.IndexedAttestationWrapper, count)
for i := 0; i < count; i++ {
atts[i] = createAttestationWrapper(types.Epoch(i), types.Epoch(i+2), indicesPerAtt[i], []byte{})
}

ctx := context.Background()
beaconDB := setupDB(b)
require.NoError(b, beaconDB.SaveAttestationRecordsForValidators(ctx, atts))

// shuffle attestations
rand.Seed(time.Now().UnixNano())
rand.Shuffle(count, func(i, j int) { atts[i], atts[j] = atts[j], atts[i] })

b.ReportAllocs()
b.StartTimer()
for i := 0; i < b.N; i++ {
_, err := beaconDB.CheckAttesterDoubleVotes(ctx, atts)
require.NoError(b, err)
}
}

func createProposalWrapper(t *testing.T, slot types.Slot, proposerIndex types.ValidatorIndex, signingRoot []byte) *slashertypes.SignedBlockHeaderWrapper {
header := &ethpb.BeaconBlockHeader{
Slot: slot,
Expand Down

0 comments on commit a958dd2

Please sign in to comment.