From 1eb34c6814d99d60acd328c69bf8fe685e166081 Mon Sep 17 00:00:00 2001 From: Zahoor Mohamed Date: Sun, 23 May 2021 21:16:54 +0530 Subject: [PATCH 1/6] de-normalize attestationDataRootsBucket to improve performace in CheckAttesterDoubleVotes --- beacon-chain/db/slasherkv/slasher.go | 35 +++++++---------------- beacon-chain/db/slasherkv/slasher_test.go | 35 +++++++++++++++++++++++ 2 files changed, 46 insertions(+), 24 deletions(-) diff --git a/beacon-chain/db/slasherkv/slasher.go b/beacon-chain/db/slasherkv/slasher.go index d7b5eb47a0bf..c90bed64d803 100644 --- a/beacon-chain/db/slasherkv/slasher.go +++ b/beacon-chain/db/slasherkv/slasher.go @@ -94,26 +94,18 @@ func (s *Store) CheckAttesterDoubleVotes( doubleVotes := make([]*slashertypes.AttesterDoubleVote, 0) err := s.db.View(func(tx *bolt.Tx) error { signingRootsBkt := tx.Bucket(attestationDataRootsBucket) - attRecordsBkt := tx.Bucket(attestationRecordsBucket) for _, att := range attestations { encEpoch := encodeTargetEpoch(att.IndexedAttestation.Data.Target.Epoch) for _, valIdx := range att.IndexedAttestation.AttestingIndices { encIdx := encodeValidatorIndex(types.ValidatorIndex(valIdx)) validatorEpochKey := append(encEpoch, encIdx...) - attRecordsKey := signingRootsBkt.Get(validatorEpochKey) - - // An attestation record key is comprised of a signing root (32 bytes) - // and a fast sum hash of the attesting indices (8 bytes). - if len(attRecordsKey) < attestationRecordKeySize { - continue - } - encExistingAttRecord := attRecordsBkt.Get(attRecordsKey) + encExistingAttRecord := signingRootsBkt.Get(validatorEpochKey) if encExistingAttRecord == nil { continue } - existingSigningRoot := bytesutil.ToBytes32(attRecordsKey[:signingRootSize]) + existingSigningRoot := bytesutil.ToBytes32(encExistingAttRecord[:signingRootSize]) if existingSigningRoot != att.SigningRoot { - existingAttRecord, err := decodeAttestationRecord(encExistingAttRecord) + existingAttRecord, err := decodeAttestationRecord(encExistingAttRecord[signingRootSize:]) if err != nil { return err } @@ -144,16 +136,11 @@ func (s *Store) AttestationRecordForValidator( key := append(encEpoch, encIdx...) err := s.db.View(func(tx *bolt.Tx) error { signingRootsBkt := tx.Bucket(attestationDataRootsBucket) - attRecordKey := signingRootsBkt.Get(key) - if attRecordKey == nil { - return nil - } - attRecordsBkt := tx.Bucket(attestationRecordsBucket) - indexedAttBytes := attRecordsBkt.Get(attRecordKey) + indexedAttBytes := signingRootsBkt.Get(key) if indexedAttBytes == nil { return nil } - decoded, err := decodeAttestationRecord(indexedAttBytes) + decoded, err := decodeAttestationRecord(indexedAttBytes[signingRootSize:]) if err != nil { return err } @@ -205,7 +192,8 @@ func (s *Store) SaveAttestationRecordsForValidators( for _, valIdx := range att.IndexedAttestation.AttestingIndices { encIdx := encodeValidatorIndex(types.ValidatorIndex(valIdx)) key := append(encodedTargetEpoch[i], encIdx...) - if err := signingRootsBkt.Put(key, attRecordKey); err != nil { + value := append(att.SigningRoot[:], encodedRecords[i]...) + if err := signingRootsBkt.Put(key, value); err != nil { return err } } @@ -361,18 +349,17 @@ func (s *Store) HighestAttestations( history := make([]*slashpb.HighestAttestation, 0, len(encodedIndices)) err = s.db.View(func(tx *bolt.Tx) error { signingRootsBkt := tx.Bucket(attestationDataRootsBucket) - attRecordsBkt := tx.Bucket(attestationRecordsBucket) for i := 0; i < len(encodedIndices); i++ { c := signingRootsBkt.Cursor() for k, v := c.Last(); k != nil; k, v = c.Prev() { if suffixForAttestationRecordsKey(k, encodedIndices[i]) { - encodedAttRecord := attRecordsBkt.Get(v) + encodedAttRecord := v[signingRootSize:] if encodedAttRecord == nil { continue } - attWrapper, err := decodeAttestationRecord(encodedAttRecord) - if err != nil { - return err + attWrapper, err1 := decodeAttestationRecord(encodedAttRecord) + if err1 != nil { + return err1 } highestAtt := &slashpb.HighestAttestation{ ValidatorIndex: uint64(indices[i]), diff --git a/beacon-chain/db/slasherkv/slasher_test.go b/beacon-chain/db/slasherkv/slasher_test.go index aef779519e9f..a945d95fcb2e 100644 --- a/beacon-chain/db/slasherkv/slasher_test.go +++ b/beacon-chain/db/slasherkv/slasher_test.go @@ -3,8 +3,10 @@ package slasherkv import ( "context" "encoding/binary" + "math/rand" "reflect" "testing" + "time" ssz "github.com/ferranbt/fastssz" types "github.com/prysmaticlabs/eth2-types" @@ -442,6 +444,39 @@ func BenchmarkHighestAttestations(b *testing.B) { } } +func BenchmarkStore_CheckDoubleBlockProposals(b *testing.B) { + b.StopTimer() + count := 10000 + valsPerAtt := 100 + indicesPerAtt := make([][]uint64, count) + for i := 0; i < count; i++ { + indicesForAtt := make([]uint64, valsPerAtt) + for r := i * count; r < valsPerAtt*(i+1); r++ { + indicesForAtt[i] = uint64(r) + } + indicesPerAtt[i] = indicesForAtt + } + atts := make([]*slashertypes.IndexedAttestationWrapper, count) + for i := 0; i < count; i++ { + atts[i] = createAttestationWrapper(types.Epoch(i), types.Epoch(i+2), indicesPerAtt[i], []byte{}) + } + + ctx := context.Background() + beaconDB := setupDB(b) + require.NoError(b, beaconDB.SaveAttestationRecordsForValidators(ctx, atts)) + + // shuffle attestations + rand.Seed(time.Now().UnixNano()) + rand.Shuffle(count, func(i, j int) { atts[i], atts[j] = atts[j], atts[i] }) + + b.ReportAllocs() + b.StartTimer() + for i := 0; i < b.N; i++ { + _, err := beaconDB.CheckAttesterDoubleVotes(ctx, atts) + require.NoError(b, err) + } +} + func createProposalWrapper(t *testing.T, slot types.Slot, proposerIndex types.ValidatorIndex, signingRoot []byte) *slashertypes.SignedBlockHeaderWrapper { header := ðpb.BeaconBlockHeader{ Slot: slot, From 275bbebdfd19161a986303dda83936078c40b37e Mon Sep 17 00:00:00 2001 From: Zahoor Mohamed Date: Sun, 23 May 2021 23:53:06 +0530 Subject: [PATCH 2/6] parallize processing of every attestation --- beacon-chain/db/slasherkv/slasher.go | 78 ++++++++++++++++++---------- 1 file changed, 51 insertions(+), 27 deletions(-) diff --git a/beacon-chain/db/slasherkv/slasher.go b/beacon-chain/db/slasherkv/slasher.go index c90bed64d803..bbe1d7bac4e2 100644 --- a/beacon-chain/db/slasherkv/slasher.go +++ b/beacon-chain/db/slasherkv/slasher.go @@ -5,7 +5,9 @@ import ( "context" "encoding/binary" "fmt" + "golang.org/x/sync/errgroup" "sort" + "sync" ssz "github.com/ferranbt/fastssz" "github.com/golang/snappy" @@ -92,35 +94,55 @@ func (s *Store) CheckAttesterDoubleVotes( ctx, span := trace.StartSpan(ctx, "BeaconDB.CheckAttesterDoubleVotes") defer span.End() doubleVotes := make([]*slashertypes.AttesterDoubleVote, 0) - err := s.db.View(func(tx *bolt.Tx) error { - signingRootsBkt := tx.Bucket(attestationDataRootsBucket) - for _, att := range attestations { - encEpoch := encodeTargetEpoch(att.IndexedAttestation.Data.Target.Epoch) - for _, valIdx := range att.IndexedAttestation.AttestingIndices { - encIdx := encodeValidatorIndex(types.ValidatorIndex(valIdx)) - validatorEpochKey := append(encEpoch, encIdx...) - encExistingAttRecord := signingRootsBkt.Get(validatorEpochKey) - if encExistingAttRecord == nil { - continue - } - existingSigningRoot := bytesutil.ToBytes32(encExistingAttRecord[:signingRootSize]) - if existingSigningRoot != att.SigningRoot { - existingAttRecord, err := decodeAttestationRecord(encExistingAttRecord[signingRootSize:]) - if err != nil { - return err + doubleVotesMu := sync.Mutex{} + eg, egctx := errgroup.WithContext(ctx) + for _, att := range attestations { + attToProcess := att // https://golang.org/doc/faq#closures_and_goroutines + eg.Go(func() error { // process every attestation parallely + err := s.db.View(func(tx *bolt.Tx) error { + signingRootsBkt := tx.Bucket(attestationDataRootsBucket) + encEpoch := encodeTargetEpoch(attToProcess.IndexedAttestation.Data.Target.Epoch) + localDoubleVotes := make([]*slashertypes.AttesterDoubleVote, 0) + for _, valIdx := range attToProcess.IndexedAttestation.AttestingIndices { + encIdx := encodeValidatorIndex(types.ValidatorIndex(valIdx)) + validatorEpochKey := append(encEpoch, encIdx...) + encExistingAttRecord := signingRootsBkt.Get(validatorEpochKey) + if encExistingAttRecord == nil { + continue + } + existingSigningRoot := bytesutil.ToBytes32(encExistingAttRecord[:signingRootSize]) + if existingSigningRoot != attToProcess.SigningRoot { + existingAttRecord, err := decodeAttestationRecord(encExistingAttRecord[signingRootSize:]) + if err != nil { + return err + } + slashAtt := &slashertypes.AttesterDoubleVote{ + ValidatorIndex: types.ValidatorIndex(valIdx), + Target: attToProcess.IndexedAttestation.Data.Target.Epoch, + PrevAttestationWrapper: existingAttRecord, + AttestationWrapper: attToProcess, + } + localDoubleVotes = append(localDoubleVotes, slashAtt) } - doubleVotes = append(doubleVotes, &slashertypes.AttesterDoubleVote{ - ValidatorIndex: types.ValidatorIndex(valIdx), - Target: att.IndexedAttestation.Data.Target.Epoch, - PrevAttestationWrapper: existingAttRecord, - AttestationWrapper: att, - }) } - } - } - return nil - }) - return doubleVotes, err + // if any routine is cancelled, then cancel this routine too + select { + case <-egctx.Done(): + return egctx.Err() + default: + } + // if there are any doible votes in this attestation, add it to the global double votes + if len(localDoubleVotes) > 0 { + doubleVotesMu.Lock() + defer doubleVotesMu.Unlock() + doubleVotes = append(doubleVotes, localDoubleVotes...) + } + return nil + }) + return err + }) + } + return doubleVotes, eg.Wait() } // AttestationRecordForValidator given a validator index and a target epoch, @@ -192,6 +214,8 @@ func (s *Store) SaveAttestationRecordsForValidators( for _, valIdx := range att.IndexedAttestation.AttestingIndices { encIdx := encodeValidatorIndex(types.ValidatorIndex(valIdx)) key := append(encodedTargetEpoch[i], encIdx...) + // signature is prefixed so that double votest can be checked without decoding attestations + // and save some cpu cycles value := append(att.SigningRoot[:], encodedRecords[i]...) if err := signingRootsBkt.Put(key, value); err != nil { return err From 41d8c5dbf4a23d61101eab27393848c748fe427e Mon Sep 17 00:00:00 2001 From: Zahoor Mohamed Date: Mon, 24 May 2021 12:32:12 +0530 Subject: [PATCH 3/6] fix double vote test case to take care of jumbled double votes due to parallel processing --- beacon-chain/db/slasherkv/slasher_test.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/beacon-chain/db/slasherkv/slasher_test.go b/beacon-chain/db/slasherkv/slasher_test.go index a945d95fcb2e..8e40135041eb 100644 --- a/beacon-chain/db/slasherkv/slasher_test.go +++ b/beacon-chain/db/slasherkv/slasher_test.go @@ -5,6 +5,7 @@ import ( "encoding/binary" "math/rand" "reflect" + "sort" "testing" "time" @@ -112,6 +113,10 @@ func TestStore_CheckAttesterDoubleVotes(t *testing.T) { } doubleVotes, err := beaconDB.CheckAttesterDoubleVotes(ctx, slashableAtts) require.NoError(t, err) + + sort.SliceStable(doubleVotes, func(i, j int) bool { + return uint64(doubleVotes[i].ValidatorIndex) < uint64(doubleVotes[j].ValidatorIndex) + }) require.DeepEqual(t, wanted, doubleVotes) } From 982862f35d55e26f5f84f8701153e23da04d09aa Mon Sep 17 00:00:00 2001 From: Zahoor Mohamed Date: Mon, 24 May 2021 16:02:18 +0530 Subject: [PATCH 4/6] remove attestationRecordsBucket totally and take care of pruning --- beacon-chain/db/slasherkv/kv.go | 1 - beacon-chain/db/slasherkv/pruning.go | 6 +----- beacon-chain/db/slasherkv/schema.go | 1 - beacon-chain/db/slasherkv/slasher.go | 14 +------------- 4 files changed, 2 insertions(+), 20 deletions(-) diff --git a/beacon-chain/db/slasherkv/kv.go b/beacon-chain/db/slasherkv/kv.go index e660c605d066..a5b0d4a1d48f 100644 --- a/beacon-chain/db/slasherkv/kv.go +++ b/beacon-chain/db/slasherkv/kv.go @@ -76,7 +76,6 @@ func NewKVStore(ctx context.Context, dirPath string, config *Config) (*Store, er tx, // Slasher buckets. attestedEpochsByValidator, - attestationRecordsBucket, attestationDataRootsBucket, proposalRecordsBucket, slasherChunksBucket, diff --git a/beacon-chain/db/slasherkv/pruning.go b/beacon-chain/db/slasherkv/pruning.go index 8d4ea262fd1e..cddcb52d551c 100644 --- a/beacon-chain/db/slasherkv/pruning.go +++ b/beacon-chain/db/slasherkv/pruning.go @@ -149,12 +149,11 @@ func (s *Store) PruneAttestations( log.Debugf("Pruned %d/%d epochs worth of attestations", epochAtCursor, endPruneEpoch-1) if err := s.db.Update(func(tx *bolt.Tx) error { rootsBkt := tx.Bucket(attestationDataRootsBucket) - attsBkt := tx.Bucket(attestationRecordsBucket) c := rootsBkt.Cursor() var lastPrunedEpoch, epochsPruned types.Epoch // We begin a pruning iteration at starting from the first item in the bucket. - for k, v := c.First(); k != nil; k, v = c.Next() { + for k, _ := c.First(); k != nil; k, _ = c.Next() { // We check the epoch from the current key in the database. // If we have hit an epoch that is greater than the end epoch of the pruning process, // we then completely exit the process as we are done. @@ -174,9 +173,6 @@ func (s *Store) PruneAttestations( if err := rootsBkt.Delete(k); err != nil { return err } - if err := attsBkt.Delete(v); err != nil { - return err - } if epochAtCursor == 0 { epochsPruned = 1 } else if epochAtCursor > lastPrunedEpoch { diff --git a/beacon-chain/db/slasherkv/schema.go b/beacon-chain/db/slasherkv/schema.go index e4df7a603c09..8c52981bcd16 100644 --- a/beacon-chain/db/slasherkv/schema.go +++ b/beacon-chain/db/slasherkv/schema.go @@ -9,7 +9,6 @@ package slasherkv var ( // Slasher buckets. attestedEpochsByValidator = []byte("attested-epochs-by-validator") - attestationRecordsBucket = []byte("attestation-records") attestationDataRootsBucket = []byte("attestation-data-roots") proposalRecordsBucket = []byte("proposal-records") slasherChunksBucket = []byte("slasher-chunks") diff --git a/beacon-chain/db/slasherkv/slasher.go b/beacon-chain/db/slasherkv/slasher.go index bbe1d7bac4e2..471d9a466048 100644 --- a/beacon-chain/db/slasherkv/slasher.go +++ b/beacon-chain/db/slasherkv/slasher.go @@ -5,7 +5,6 @@ import ( "context" "encoding/binary" "fmt" - "golang.org/x/sync/errgroup" "sort" "sync" @@ -17,9 +16,9 @@ import ( slashertypes "github.com/prysmaticlabs/prysm/beacon-chain/slasher/types" slashpb "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1" "github.com/prysmaticlabs/prysm/shared/bytesutil" - "github.com/prysmaticlabs/prysm/shared/hashutil" bolt "go.etcd.io/bbolt" "go.opencensus.io/trace" + "golang.org/x/sync/errgroup" ) const ( @@ -198,19 +197,8 @@ func (s *Store) SaveAttestationRecordsForValidators( encodedRecords[i] = value } return s.db.Update(func(tx *bolt.Tx) error { - attRecordsBkt := tx.Bucket(attestationRecordsBucket) signingRootsBkt := tx.Bucket(attestationDataRootsBucket) for i, att := range attestations { - // An attestation record key is comprised of the signing root (32 bytes) - // and a fastsum64 of the attesting indices (8 bytes). This is used - // to have a more optimal schema - attIndicesHash := hashutil.FastSum64(encodedIndices[i]) - attRecordKey := append( - att.SigningRoot[:], ssz.MarshalUint64(make([]byte, 0), attIndicesHash)..., - ) - if err := attRecordsBkt.Put(attRecordKey, encodedRecords[i]); err != nil { - return err - } for _, valIdx := range att.IndexedAttestation.AttestingIndices { encIdx := encodeValidatorIndex(types.ValidatorIndex(valIdx)) key := append(encodedTargetEpoch[i], encIdx...) From 98c2892ccd1f6c90f73bb296d97d59cdf5459d2f Mon Sep 17 00:00:00 2001 From: Zahoor Mohamed Date: Mon, 24 May 2021 16:35:28 +0530 Subject: [PATCH 5/6] missed out build file --- beacon-chain/db/slasherkv/BUILD.bazel | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon-chain/db/slasherkv/BUILD.bazel b/beacon-chain/db/slasherkv/BUILD.bazel index 5b3753154adf..ce3cb4e66d76 100644 --- a/beacon-chain/db/slasherkv/BUILD.bazel +++ b/beacon-chain/db/slasherkv/BUILD.bazel @@ -20,7 +20,6 @@ go_library( "//proto/beacon/rpc/v1:go_default_library", "//shared/bytesutil:go_default_library", "//shared/fileutil:go_default_library", - "//shared/hashutil:go_default_library", "//shared/params:go_default_library", "@com_github_ferranbt_fastssz//:go_default_library", "@com_github_golang_snappy//:go_default_library", @@ -32,6 +31,7 @@ go_library( "@com_github_sirupsen_logrus//:go_default_library", "@io_etcd_go_bbolt//:go_default_library", "@io_opencensus_go//trace:go_default_library", + "@org_golang_x_sync//errgroup:go_default_library", ], ) From 754400dda98fd28476a5384cb28821c6db1b91f5 Mon Sep 17 00:00:00 2001 From: Zahoor Mohamed Date: Tue, 25 May 2021 00:45:32 +0530 Subject: [PATCH 6/6] fix review comments --- beacon-chain/db/slasherkv/slasher.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/beacon-chain/db/slasherkv/slasher.go b/beacon-chain/db/slasherkv/slasher.go index 471d9a466048..ac5436cfbbe1 100644 --- a/beacon-chain/db/slasherkv/slasher.go +++ b/beacon-chain/db/slasherkv/slasher.go @@ -96,8 +96,11 @@ func (s *Store) CheckAttesterDoubleVotes( doubleVotesMu := sync.Mutex{} eg, egctx := errgroup.WithContext(ctx) for _, att := range attestations { - attToProcess := att // https://golang.org/doc/faq#closures_and_goroutines - eg.Go(func() error { // process every attestation parallely + // Copy the iteration instance to a local variable to give each go-routine its own copy to play with. + // See https://golang.org/doc/faq#closures_and_goroutines for more details. + attToProcess := att + // process every attestation parallelly. + eg.Go(func() error { err := s.db.View(func(tx *bolt.Tx) error { signingRootsBkt := tx.Bucket(attestationDataRootsBucket) encEpoch := encodeTargetEpoch(attToProcess.IndexedAttestation.Data.Target.Epoch) @@ -369,9 +372,9 @@ func (s *Store) HighestAttestations( if encodedAttRecord == nil { continue } - attWrapper, err1 := decodeAttestationRecord(encodedAttRecord) - if err1 != nil { - return err1 + attWrapper, decodeErr := decodeAttestationRecord(encodedAttRecord) + if decodeErr != nil { + return decodeErr } highestAtt := &slashpb.HighestAttestation{ ValidatorIndex: uint64(indices[i]),