Skip to content

Commit

Permalink
Import old attestation store (#7466)
Browse files Browse the repository at this point in the history
* import attestation to new data structure

* add tests

* add failure massages

* added signing root to data

* added signing root to data

* public keys 48 length

* remove redundant loop

* fix proposals

* fix manage dir name

* Omit redundant nil check on slices

* nishant feedback

* add test
  • Loading branch information
shayzluf authored Oct 12, 2020
1 parent 7cc32c4 commit 3d0fc8b
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 29 deletions.
50 changes: 49 additions & 1 deletion validator/db/kv/attestation_history_new.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"

"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/params"
bolt "go.etcd.io/bbolt"
Expand Down Expand Up @@ -82,7 +83,7 @@ func (hd EncHistoryData) getTargetData(ctx context.Context, target uint64) (*His

history.Source = bytesutil.FromBytes8(hd[cursor : cursor+sourceSize])
sr := make([]byte, 32)
copy(hd[cursor+sourceSize:cursor+historySize], sr)
copy(sr, hd[cursor+sourceSize:cursor+historySize])
history.SigningRoot = sr
return history, nil
}
Expand Down Expand Up @@ -150,3 +151,50 @@ func (store *Store) SaveAttestationHistoryNewForPubKeys(ctx context.Context, his
})
return err
}

// ImportOldAttestationFormat import old attestation format data into the new attestation format
func (store *Store) ImportOldAttestationFormat(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "Validator.ImportOldAttestationFormat")
defer span.End()
var allKeys [][48]byte

if err := store.db.View(func(tx *bolt.Tx) error {
attestationsBucket := tx.Bucket(historicAttestationsBucket)
if err := attestationsBucket.ForEach(func(pubKey, _ []byte) error {
var pubKeyCopy [48]byte
copy(pubKeyCopy[:], pubKey)
allKeys = append(allKeys, pubKeyCopy)
return nil
}); err != nil {
return errors.Wrapf(err, "could not retrieve attestations for source in %s", store.databasePath)
}

return nil
}); err != nil {
return err
}
allKeys = removeDuplicateKeys(allKeys)
attMap, err := store.AttestationHistoryForPubKeys(ctx, allKeys)
if err != nil {
return errors.Wrapf(err, "could not retrieve data for public keys %v", allKeys)
}
dataMap := make(map[[48]byte]EncHistoryData)
for key, atts := range attMap {
dataMap[key] = newAttestationHistoryArray(atts.LatestEpochWritten)
dataMap[key], err = dataMap[key].setLatestEpochWritten(ctx, atts.LatestEpochWritten)
if err != nil {
return err
}
for target, source := range atts.TargetToSource {
dataMap[key], err = dataMap[key].setTargetData(ctx, target, &HistoryData{
Source: source,
SigningRoot: []byte{1},
})
if err != nil {
return err
}
}
}
err = store.SaveAttestationHistoryNewForPubKeys(ctx, dataMap)
return err
}
79 changes: 76 additions & 3 deletions validator/db/kv/attestation_history_new_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"context"
"testing"

slashpb "github.com/prysmaticlabs/prysm/proto/slashing"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
bolt "go.etcd.io/bbolt"
)

func TestNewAttestationHistoryArray(t *testing.T) {
Expand Down Expand Up @@ -113,11 +115,15 @@ func TestSetTargetData(t *testing.T) {
})
if tt.error == "" {
require.NoError(t, err)

} else {
assert.ErrorContains(t, tt.error, err)
td, err := enc.getTargetData(ctx, tt.target)
require.NoError(t, err)
require.DeepEqual(t, bytesutil.PadTo(tt.signingRoot, 32), td.SigningRoot)
require.Equal(t, tt.source, td.Source)
return
}
assert.ErrorContains(t, tt.error, err)
require.DeepEqual(t, tt.expected, enc)

})
}

Expand Down Expand Up @@ -166,3 +172,70 @@ func TestAttestationHistoryForPubKeysNew_OK(t *testing.T) {
require.NoError(t, err)
require.DeepEqual(t, setAttHistoryForPubKeys, historyForPubKeys, "Expected attestation history epoch bits to be empty")
}
func TestStore_ImportOldAttestationFormatBadSourceFormat(t *testing.T) {
ctx := context.Background()
pubKeys := [][48]byte{{3}, {4}}
db := setupDB(t, pubKeys)
err := db.update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(historicAttestationsBucket)
for _, pubKey := range pubKeys {
if err := bucket.Put(pubKey[:], []byte{1}); err != nil {
return err
}
}
return nil
})
require.NoError(t, err)
require.ErrorContains(t, "could not retrieve data for public keys", db.ImportOldAttestationFormat(ctx))
}

func TestStore_ImportOldAttestationFormat(t *testing.T) {
ctx := context.Background()
pubKeys := [][48]byte{{3}, {4}}
db := setupDB(t, pubKeys)

farFuture := params.BeaconConfig().FarFutureEpoch
newMap := make(map[uint64]uint64)
// The validator attested at target epoch 2 but had no attestations for target epochs 0 and 1.
newMap[0] = farFuture
newMap[1] = farFuture
newMap[2] = 1
history := &slashpb.AttestationHistory{
TargetToSource: newMap,
LatestEpochWritten: 2,
}

newMap2 := make(map[uint64]uint64)
// The validator attested at target epoch 1 and 3 but had no attestations for target epochs 0 and 2.
newMap2[0] = farFuture
newMap2[1] = 0
newMap2[2] = farFuture
newMap2[3] = 2
history2 := &slashpb.AttestationHistory{
TargetToSource: newMap2,
LatestEpochWritten: 3,
}

attestationHistory := make(map[[48]byte]*slashpb.AttestationHistory)
attestationHistory[pubKeys[0]] = history
attestationHistory[pubKeys[1]] = history2

require.NoError(t, db.SaveAttestationHistoryForPubKeys(context.Background(), attestationHistory), "Saving attestation history failed")
require.NoError(t, db.ImportOldAttestationFormat(ctx), "Import attestation history failed")

attHis, err := db.AttestationHistoryNewForPubKeys(ctx, pubKeys)
require.NoError(t, err)
for pk, encHis := range attHis {
his, ok := attestationHistory[pk]
require.Equal(t, true, ok, "Missing public key in the original data")
lew, err := encHis.getLatestEpochWritten(ctx)
require.NoError(t, err, "Failed to get latest epoch written")
require.Equal(t, his.LatestEpochWritten, lew, "LatestEpochWritten is not equal to the source data value")
for target, source := range his.TargetToSource {
hd, err := encHis.getTargetData(ctx, target)
require.NoError(t, err, "Failed to get target data for epoch: %d", target)
require.Equal(t, source, hd.Source, "Source epoch is different")
require.DeepEqual(t, bytesutil.PadTo([]byte{1}, 32), hd.SigningRoot, "Signing root differs in imported data")
}
}
}
39 changes: 19 additions & 20 deletions validator/db/kv/manage.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package kv

import (
"bytes"
"context"
"encoding/hex"
"path/filepath"
Expand All @@ -20,12 +19,12 @@ type epochProposals struct {
}

type pubKeyProposals struct {
PubKey []byte
PubKey [48]byte
Proposals []epochProposals
}

type pubKeyAttestations struct {
PubKey []byte
PubKey [48]byte
Attestations []byte
}

Expand Down Expand Up @@ -54,13 +53,13 @@ func Split(ctx context.Context, sourceStore *Store, targetDirectory string) erro
return createSplitTargetStores(targetDirectory, allProposals, allAttestations)
}

func getPubKeyProposals(pubKey []byte, proposalsBucket *bolt.Bucket) (*pubKeyProposals, error) {
func getPubKeyProposals(pubKey [48]byte, proposalsBucket *bolt.Bucket) (*pubKeyProposals, error) {
pubKeyProposals := pubKeyProposals{
PubKey: pubKey,
Proposals: []epochProposals{},
}

pubKeyBucket := proposalsBucket.Bucket(pubKey)
pubKeyBucket := proposalsBucket.Bucket(pubKey[:])
if pubKeyBucket == nil {
return &pubKeyProposals, nil
}
Expand Down Expand Up @@ -104,7 +103,7 @@ func createMergeTargetStore(
err = newStore.update(func(tx *bolt.Tx) error {
allProposalsBucket := tx.Bucket(historicProposalsBucket)
for _, pubKeyProposals := range allProposals {
proposalsBucket, err := createProposalsBucket(allProposalsBucket, pubKeyProposals.PubKey)
proposalsBucket, err := createProposalsBucket(allProposalsBucket, pubKeyProposals.PubKey[:])
if err != nil {
return err
}
Expand Down Expand Up @@ -147,7 +146,7 @@ func createSplitTargetStores(
}()

for _, pubKeyProposals := range allProposals {
dirName := hex.EncodeToString(pubKeyProposals.PubKey)[:12]
dirName := hex.EncodeToString(pubKeyProposals.PubKey[:])[:12]
path := filepath.Join(targetDirectory, dirName)
newStore, err := NewKVStore(path, [][48]byte{})
if err != nil {
Expand All @@ -157,7 +156,7 @@ func createSplitTargetStores(

if err := newStore.update(func(tx *bolt.Tx) error {
allProposalsBucket := tx.Bucket(historicProposalsBucket)
proposalsBucket, err := createProposalsBucket(allProposalsBucket, pubKeyProposals.PubKey)
proposalsBucket, err := createProposalsBucket(allProposalsBucket, pubKeyProposals.PubKey[:])
if err != nil {
return err
}
Expand All @@ -167,7 +166,7 @@ func createSplitTargetStores(

attestationsBucket := tx.Bucket(historicAttestationsBucket)
for _, pubKeyAttestations := range allAttestations {
if string(pubKeyAttestations.PubKey) == string(pubKeyProposals.PubKey) {
if string(pubKeyAttestations.PubKey[:]) == string(pubKeyProposals.PubKey[:]) {
if err := addAttestations(attestationsBucket, pubKeyAttestations); err != nil {
return err
}
Expand All @@ -185,13 +184,13 @@ func createSplitTargetStores(
for _, pubKeyAttestations := range allAttestations {
var hasMatchingProposals = false
for _, pubKeyProposals := range allProposals {
if string(pubKeyAttestations.PubKey) == string(pubKeyProposals.PubKey) {
if string(pubKeyAttestations.PubKey[:]) == string(pubKeyProposals.PubKey[:]) {
hasMatchingProposals = true
break
}
}
if !hasMatchingProposals {
dirName := hex.EncodeToString(pubKeyAttestations.PubKey)[:12]
dirName := hex.EncodeToString(pubKeyAttestations.PubKey[:])[:12]
path := filepath.Join(targetDirectory, dirName)
newStore, err := NewKVStore(path, [][48]byte{})
if err != nil {
Expand All @@ -218,13 +217,13 @@ func getAllProposalsAndAllAttestations(stores []*Store) ([]pubKeyProposals, []pu
for _, store := range stores {
// Storing keys upfront will allow using several short transactions (one for every key)
// instead of one long-running transaction for all keys.
var allKeys [][]byte
var allKeys [][48]byte

if err := store.db.View(func(tx *bolt.Tx) error {
proposalsBucket := tx.Bucket(historicProposalsBucket)
if err := proposalsBucket.ForEach(func(pubKey, _ []byte) error {
pubKeyCopy := make([]byte, len(pubKey))
copy(pubKeyCopy, pubKey)
var pubKeyCopy [48]byte
copy(pubKeyCopy[:], pubKey)
allKeys = append(allKeys, pubKeyCopy)
return nil
}); err != nil {
Expand All @@ -233,8 +232,8 @@ func getAllProposalsAndAllAttestations(stores []*Store) ([]pubKeyProposals, []pu

attestationsBucket := tx.Bucket(historicAttestationsBucket)
if err := attestationsBucket.ForEach(func(pubKey, _ []byte) error {
pubKeyCopy := make([]byte, len(pubKey))
copy(pubKeyCopy, pubKey)
var pubKeyCopy [48]byte
copy(pubKeyCopy[:], pubKey)
allKeys = append(allKeys, pubKeyCopy)
return nil
}); err != nil {
Expand All @@ -258,7 +257,7 @@ func getAllProposalsAndAllAttestations(stores []*Store) ([]pubKeyProposals, []pu
allProposals = append(allProposals, *pubKeyProposals)

attestationsBucket := tx.Bucket(historicAttestationsBucket)
v := attestationsBucket.Get(pubKey)
v := attestationsBucket.Get(pubKey[:])
if v != nil {
attestations := pubKeyAttestations{
PubKey: pubKey,
Expand Down Expand Up @@ -296,7 +295,7 @@ func addEpochProposals(bucket *bolt.Bucket, proposals []epochProposals) error {
}

func addAttestations(bucket *bolt.Bucket, attestations pubKeyAttestations) error {
if err := bucket.Put(attestations.PubKey, attestations.Attestations); err != nil {
if err := bucket.Put(attestations.PubKey[:], attestations.Attestations); err != nil {
return errors.Wrapf(
err,
"could not add public key attestations for public key %x",
Expand All @@ -305,13 +304,13 @@ func addAttestations(bucket *bolt.Bucket, attestations pubKeyAttestations) error
return nil
}

func removeDuplicateKeys(keys [][]byte) [][]byte {
func removeDuplicateKeys(keys [][48]byte) [][48]byte {
last := 0

next:
for _, k1 := range keys {
for _, k2 := range keys[:last] {
if bytes.Equal(k1, k2) {
if k1 == k2 {
continue next
}
}
Expand Down
10 changes: 5 additions & 5 deletions validator/db/kv/new_proposal_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (store *Store) ProposalHistoryForSlot(ctx context.Context, publicKey []byte
return fmt.Errorf("validator history empty for public key %#x", publicKey)
}
sr := valBucket.Get(bytesutil.Uint64ToBytesBigEndian(slot))
if sr == nil || len(sr) == 0 {
if len(sr) == 0 {
return nil
}
copy(signingRoot, sr)
Expand Down Expand Up @@ -62,12 +62,12 @@ func (store *Store) ImportProposalHistory(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "Validator.ImportProposalHistory")
defer span.End()

var allKeys [][]byte
var allKeys [][48]byte
err := store.db.View(func(tx *bolt.Tx) error {
proposalsBucket := tx.Bucket(historicProposalsBucket)
if err := proposalsBucket.ForEach(func(pubKey, _ []byte) error {
pubKeyCopy := make([]byte, len(pubKey))
copy(pubKeyCopy, pubKey)
var pubKeyCopy [48]byte
copy(pubKeyCopy[:], pubKey)
allKeys = append(allKeys, pubKeyCopy)
return nil
}); err != nil {
Expand Down Expand Up @@ -97,7 +97,7 @@ func (store *Store) ImportProposalHistory(ctx context.Context) error {
err = store.db.Update(func(tx *bolt.Tx) error {
newProposalsBucket := tx.Bucket(newhistoricProposalsBucket)
for _, pr := range prs {
valBucket, err := newProposalsBucket.CreateBucketIfNotExists(pr.PubKey)
valBucket, err := newProposalsBucket.CreateBucketIfNotExists(pr.PubKey[:])
if err != nil {
return errors.Wrap(err, "could not could not create bucket for public key")
}
Expand Down

0 comments on commit 3d0fc8b

Please sign in to comment.