Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve performance during CheckAttesterDoubleVotes #8927

Merged
merged 11 commits into from
May 25, 2021
Merged
2 changes: 1 addition & 1 deletion beacon-chain/db/slasherkv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ go_library(
"//proto/beacon/rpc/v1:go_default_library",
"//shared/bytesutil:go_default_library",
"//shared/fileutil:go_default_library",
"//shared/hashutil:go_default_library",
"//shared/params:go_default_library",
"@com_github_ferranbt_fastssz//:go_default_library",
"@com_github_golang_snappy//:go_default_library",
Expand All @@ -32,6 +31,7 @@ go_library(
"@com_github_sirupsen_logrus//:go_default_library",
"@io_etcd_go_bbolt//:go_default_library",
"@io_opencensus_go//trace:go_default_library",
"@org_golang_x_sync//errgroup:go_default_library",
],
)

Expand Down
1 change: 0 additions & 1 deletion beacon-chain/db/slasherkv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ func NewKVStore(ctx context.Context, dirPath string, config *Config) (*Store, er
tx,
// Slasher buckets.
attestedEpochsByValidator,
attestationRecordsBucket,
attestationDataRootsBucket,
proposalRecordsBucket,
slasherChunksBucket,
Expand Down
6 changes: 1 addition & 5 deletions beacon-chain/db/slasherkv/pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,11 @@ func (s *Store) PruneAttestations(
log.Debugf("Pruned %d/%d epochs worth of attestations", epochAtCursor, endPruneEpoch-1)
if err := s.db.Update(func(tx *bolt.Tx) error {
rootsBkt := tx.Bucket(attestationDataRootsBucket)
attsBkt := tx.Bucket(attestationRecordsBucket)
c := rootsBkt.Cursor()

var lastPrunedEpoch, epochsPruned types.Epoch
// We begin a pruning iteration at starting from the first item in the bucket.
for k, v := c.First(); k != nil; k, v = c.Next() {
for k, _ := c.First(); k != nil; k, _ = c.Next() {
// We check the epoch from the current key in the database.
// If we have hit an epoch that is greater than the end epoch of the pruning process,
// we then completely exit the process as we are done.
Expand All @@ -174,9 +173,6 @@ func (s *Store) PruneAttestations(
if err := rootsBkt.Delete(k); err != nil {
return err
}
if err := attsBkt.Delete(v); err != nil {
return err
}
if epochAtCursor == 0 {
epochsPruned = 1
} else if epochAtCursor > lastPrunedEpoch {
Expand Down
1 change: 0 additions & 1 deletion beacon-chain/db/slasherkv/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ package slasherkv
var (
// Slasher buckets.
attestedEpochsByValidator = []byte("attested-epochs-by-validator")
attestationRecordsBucket = []byte("attestation-records")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add a database "migration" to delete this bucket to reclaim the space, if it is no longer used.

@rauljordan, what do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This db is unused at the moment, so we are safe, but we should add a migration in the future for slasher changes

attestationDataRootsBucket = []byte("attestation-data-roots")
proposalRecordsBucket = []byte("proposal-records")
slasherChunksBucket = []byte("slasher-chunks")
Expand Down
120 changes: 61 additions & 59 deletions beacon-chain/db/slasherkv/slasher.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/binary"
"fmt"
"sort"
"sync"

ssz "github.com/ferranbt/fastssz"
"github.com/golang/snappy"
Expand All @@ -15,9 +16,9 @@ import (
slashertypes "github.com/prysmaticlabs/prysm/beacon-chain/slasher/types"
slashpb "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/hashutil"
bolt "go.etcd.io/bbolt"
"go.opencensus.io/trace"
"golang.org/x/sync/errgroup"
)

const (
Expand Down Expand Up @@ -92,43 +93,58 @@ func (s *Store) CheckAttesterDoubleVotes(
ctx, span := trace.StartSpan(ctx, "BeaconDB.CheckAttesterDoubleVotes")
defer span.End()
doubleVotes := make([]*slashertypes.AttesterDoubleVote, 0)
err := s.db.View(func(tx *bolt.Tx) error {
signingRootsBkt := tx.Bucket(attestationDataRootsBucket)
attRecordsBkt := tx.Bucket(attestationRecordsBucket)
for _, att := range attestations {
encEpoch := encodeTargetEpoch(att.IndexedAttestation.Data.Target.Epoch)
for _, valIdx := range att.IndexedAttestation.AttestingIndices {
encIdx := encodeValidatorIndex(types.ValidatorIndex(valIdx))
validatorEpochKey := append(encEpoch, encIdx...)
attRecordsKey := signingRootsBkt.Get(validatorEpochKey)

// An attestation record key is comprised of a signing root (32 bytes)
// and a fast sum hash of the attesting indices (8 bytes).
if len(attRecordsKey) < attestationRecordKeySize {
continue
doubleVotesMu := sync.Mutex{}
eg, egctx := errgroup.WithContext(ctx)
for _, att := range attestations {
// Copy the iteration instance to a local variable to give each go-routine its own copy to play with.
// See https://golang.org/doc/faq#closures_and_goroutines for more details.
attToProcess := att
// process every attestation parallelly.
eg.Go(func() error {
err := s.db.View(func(tx *bolt.Tx) error {
signingRootsBkt := tx.Bucket(attestationDataRootsBucket)
encEpoch := encodeTargetEpoch(attToProcess.IndexedAttestation.Data.Target.Epoch)
localDoubleVotes := make([]*slashertypes.AttesterDoubleVote, 0)
for _, valIdx := range attToProcess.IndexedAttestation.AttestingIndices {
encIdx := encodeValidatorIndex(types.ValidatorIndex(valIdx))
validatorEpochKey := append(encEpoch, encIdx...)
encExistingAttRecord := signingRootsBkt.Get(validatorEpochKey)
if encExistingAttRecord == nil {
continue
}
existingSigningRoot := bytesutil.ToBytes32(encExistingAttRecord[:signingRootSize])
if existingSigningRoot != attToProcess.SigningRoot {
existingAttRecord, err := decodeAttestationRecord(encExistingAttRecord[signingRootSize:])
if err != nil {
return err
}
slashAtt := &slashertypes.AttesterDoubleVote{
ValidatorIndex: types.ValidatorIndex(valIdx),
Target: attToProcess.IndexedAttestation.Data.Target.Epoch,
PrevAttestationWrapper: existingAttRecord,
AttestationWrapper: attToProcess,
}
localDoubleVotes = append(localDoubleVotes, slashAtt)
}
}
encExistingAttRecord := attRecordsBkt.Get(attRecordsKey)
if encExistingAttRecord == nil {
continue
// if any routine is cancelled, then cancel this routine too
select {
case <-egctx.Done():
return egctx.Err()
default:
}
existingSigningRoot := bytesutil.ToBytes32(attRecordsKey[:signingRootSize])
if existingSigningRoot != att.SigningRoot {
existingAttRecord, err := decodeAttestationRecord(encExistingAttRecord)
if err != nil {
return err
}
doubleVotes = append(doubleVotes, &slashertypes.AttesterDoubleVote{
ValidatorIndex: types.ValidatorIndex(valIdx),
Target: att.IndexedAttestation.Data.Target.Epoch,
PrevAttestationWrapper: existingAttRecord,
AttestationWrapper: att,
})
// if there are any doible votes in this attestation, add it to the global double votes
if len(localDoubleVotes) > 0 {
doubleVotesMu.Lock()
defer doubleVotesMu.Unlock()
doubleVotes = append(doubleVotes, localDoubleVotes...)
}
}
}
return nil
})
return doubleVotes, err
return nil
})
return err
})
}
return doubleVotes, eg.Wait()
}

// AttestationRecordForValidator given a validator index and a target epoch,
Expand All @@ -144,16 +160,11 @@ func (s *Store) AttestationRecordForValidator(
key := append(encEpoch, encIdx...)
err := s.db.View(func(tx *bolt.Tx) error {
signingRootsBkt := tx.Bucket(attestationDataRootsBucket)
attRecordKey := signingRootsBkt.Get(key)
if attRecordKey == nil {
return nil
}
attRecordsBkt := tx.Bucket(attestationRecordsBucket)
indexedAttBytes := attRecordsBkt.Get(attRecordKey)
indexedAttBytes := signingRootsBkt.Get(key)
if indexedAttBytes == nil {
return nil
}
decoded, err := decodeAttestationRecord(indexedAttBytes)
decoded, err := decodeAttestationRecord(indexedAttBytes[signingRootSize:])
if err != nil {
return err
}
Expand Down Expand Up @@ -189,23 +200,15 @@ func (s *Store) SaveAttestationRecordsForValidators(
encodedRecords[i] = value
}
return s.db.Update(func(tx *bolt.Tx) error {
attRecordsBkt := tx.Bucket(attestationRecordsBucket)
signingRootsBkt := tx.Bucket(attestationDataRootsBucket)
for i, att := range attestations {
// An attestation record key is comprised of the signing root (32 bytes)
// and a fastsum64 of the attesting indices (8 bytes). This is used
// to have a more optimal schema
attIndicesHash := hashutil.FastSum64(encodedIndices[i])
attRecordKey := append(
att.SigningRoot[:], ssz.MarshalUint64(make([]byte, 0), attIndicesHash)...,
)
if err := attRecordsBkt.Put(attRecordKey, encodedRecords[i]); err != nil {
return err
}
for _, valIdx := range att.IndexedAttestation.AttestingIndices {
encIdx := encodeValidatorIndex(types.ValidatorIndex(valIdx))
key := append(encodedTargetEpoch[i], encIdx...)
if err := signingRootsBkt.Put(key, attRecordKey); err != nil {
// signature is prefixed so that double votest can be checked without decoding attestations
// and save some cpu cycles
value := append(att.SigningRoot[:], encodedRecords[i]...)
if err := signingRootsBkt.Put(key, value); err != nil {
return err
}
}
Expand Down Expand Up @@ -361,18 +364,17 @@ func (s *Store) HighestAttestations(
history := make([]*slashpb.HighestAttestation, 0, len(encodedIndices))
err = s.db.View(func(tx *bolt.Tx) error {
signingRootsBkt := tx.Bucket(attestationDataRootsBucket)
attRecordsBkt := tx.Bucket(attestationRecordsBucket)
for i := 0; i < len(encodedIndices); i++ {
c := signingRootsBkt.Cursor()
for k, v := c.Last(); k != nil; k, v = c.Prev() {
if suffixForAttestationRecordsKey(k, encodedIndices[i]) {
encodedAttRecord := attRecordsBkt.Get(v)
encodedAttRecord := v[signingRootSize:]
if encodedAttRecord == nil {
continue
}
attWrapper, err := decodeAttestationRecord(encodedAttRecord)
if err != nil {
return err
attWrapper, decodeErr := decodeAttestationRecord(encodedAttRecord)
if decodeErr != nil {
return decodeErr
}
highestAtt := &slashpb.HighestAttestation{
ValidatorIndex: uint64(indices[i]),
Expand Down
40 changes: 40 additions & 0 deletions beacon-chain/db/slasherkv/slasher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ package slasherkv
import (
"context"
"encoding/binary"
"math/rand"
"reflect"
"sort"
"testing"
"time"

ssz "github.com/ferranbt/fastssz"
types "github.com/prysmaticlabs/eth2-types"
Expand Down Expand Up @@ -110,6 +113,10 @@ func TestStore_CheckAttesterDoubleVotes(t *testing.T) {
}
doubleVotes, err := beaconDB.CheckAttesterDoubleVotes(ctx, slashableAtts)
require.NoError(t, err)

sort.SliceStable(doubleVotes, func(i, j int) bool {
return uint64(doubleVotes[i].ValidatorIndex) < uint64(doubleVotes[j].ValidatorIndex)
})
require.DeepEqual(t, wanted, doubleVotes)
}

Expand Down Expand Up @@ -442,6 +449,39 @@ func BenchmarkHighestAttestations(b *testing.B) {
}
}

func BenchmarkStore_CheckDoubleBlockProposals(b *testing.B) {
b.StopTimer()
count := 10000
valsPerAtt := 100
indicesPerAtt := make([][]uint64, count)
for i := 0; i < count; i++ {
indicesForAtt := make([]uint64, valsPerAtt)
for r := i * count; r < valsPerAtt*(i+1); r++ {
indicesForAtt[i] = uint64(r)
}
indicesPerAtt[i] = indicesForAtt
}
atts := make([]*slashertypes.IndexedAttestationWrapper, count)
for i := 0; i < count; i++ {
atts[i] = createAttestationWrapper(types.Epoch(i), types.Epoch(i+2), indicesPerAtt[i], []byte{})
}

ctx := context.Background()
beaconDB := setupDB(b)
require.NoError(b, beaconDB.SaveAttestationRecordsForValidators(ctx, atts))

// shuffle attestations
rand.Seed(time.Now().UnixNano())
rand.Shuffle(count, func(i, j int) { atts[i], atts[j] = atts[j], atts[i] })

b.ReportAllocs()
b.StartTimer()
for i := 0; i < b.N; i++ {
_, err := beaconDB.CheckAttesterDoubleVotes(ctx, atts)
require.NoError(b, err)
}
}

func createProposalWrapper(t *testing.T, slot types.Slot, proposerIndex types.ValidatorIndex, signingRoot []byte) *slashertypes.SignedBlockHeaderWrapper {
header := &ethpb.BeaconBlockHeader{
Slot: slot,
Expand Down