Skip to content

Commit

Permalink
Add Batch Method for Reading Validator Proposing Histories (#8378)
Browse files Browse the repository at this point in the history
* add in batch method

* add in new proposal history methods for efficiency and progress bars

* tests fixed to use the new methods

* add back get slot proposing history method

* add gaz
  • Loading branch information
rauljordan authored Feb 2, 2021
1 parent 3fd8c4c commit caac08d
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 73 deletions.
1 change: 1 addition & 0 deletions validator/db/iface/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type ValidatorDB interface {
// Proposer protection related methods.
HighestSignedProposal(ctx context.Context, publicKey [48]byte) (uint64, bool, error)
LowestSignedProposal(ctx context.Context, publicKey [48]byte) (uint64, bool, error)
ProposalHistoryForPubKey(ctx context.Context, publicKey [48]byte) ([]*kv.Proposal, error)
ProposalHistoryForSlot(ctx context.Context, publicKey [48]byte, slot uint64) ([32]byte, bool, error)
SaveProposalHistoryForSlot(ctx context.Context, pubKey [48]byte, slot uint64, signingRoot []byte) error
ProposedPublicKeys(ctx context.Context) ([][48]byte, error)
Expand Down
26 changes: 26 additions & 0 deletions validator/db/kv/proposer_protection.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,32 @@ func (s *Store) ProposalHistoryForSlot(ctx context.Context, publicKey [48]byte,
return signingRoot, proposalExists, err
}

// ProposalHistoryForPubKey returns the entire proposal history for a given public key.
func (s *Store) ProposalHistoryForPubKey(ctx context.Context, publicKey [48]byte) ([]*Proposal, error) {
ctx, span := trace.StartSpan(ctx, "Validator.ProposalHistoryForPubKey")
defer span.End()

proposals := make([]*Proposal, 0)
err := s.view(func(tx *bolt.Tx) error {
bucket := tx.Bucket(historicProposalsBucket)
valBucket := bucket.Bucket(publicKey[:])
if valBucket == nil {
return nil
}
return valBucket.ForEach(func(slotKey, signingRootBytes []byte) error {
slot := bytesutil.BytesToUint64BigEndian(slotKey)
sr := make([]byte, 32)
copy(sr, signingRootBytes)
proposals = append(proposals, &Proposal{
Slot: slot,
SigningRoot: sr,
})
return nil
})
})
return proposals, err
}

// SaveProposalHistoryForSlot saves the proposal history for the requested validator public key.
// We also check if the incoming proposal slot is lower than the lowest signed proposal slot
// for the validator and override its value on disk.
Expand Down
57 changes: 38 additions & 19 deletions validator/db/kv/proposer_protection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,37 +46,49 @@ func TestSaveProposalHistoryForSlot_OK(t *testing.T) {
require.DeepEqual(t, bytesutil.PadTo([]byte{1}, 32), signingRoot[:], "Expected DB to keep object the same")
}

func TestSaveProposalHistoryForSlot_Empty(t *testing.T) {
func TestNewProposalHistoryForPubKey_ReturnsEmptyIfNoHistory(t *testing.T) {
valPubkey := [48]byte{1, 2, 3}
db := setupDB(t, [][48]byte{})

proposalHistory, err := db.ProposalHistoryForPubKey(context.Background(), valPubkey)
require.NoError(t, err)
assert.DeepEqual(t, make([]*Proposal, 0), proposalHistory)
}

func TestSaveProposalHistoryForPubKey_OK(t *testing.T) {
pubkey := [48]byte{3}
db := setupDB(t, [][48]byte{pubkey})

slot := uint64(2)
emptySlot := uint64(120)
err := db.SaveProposalHistoryForSlot(context.Background(), pubkey, slot, []byte{1})

root := [32]byte{1}
err := db.SaveProposalHistoryForSlot(context.Background(), pubkey, slot, root[:])
require.NoError(t, err, "Saving proposal history failed: %v")
signingRoot, _, err := db.ProposalHistoryForSlot(context.Background(), pubkey, emptySlot)
proposalHistory, err := db.ProposalHistoryForPubKey(context.Background(), pubkey)
require.NoError(t, err, "Failed to get proposal history")

require.NotNil(t, signingRoot)
require.DeepEqual(t, bytesutil.PadTo([]byte{}, 32), signingRoot[:], "Expected DB to keep object the same")
require.NotNil(t, proposalHistory)
want := []*Proposal{
{
Slot: slot,
SigningRoot: root[:],
},
}
require.DeepEqual(t, want[0], proposalHistory[0])
}

func TestSaveProposalHistoryForSlot_Overwrites(t *testing.T) {
pubkey := [48]byte{0}
tests := []struct {
slot uint64
signingRoot []byte
}{
{
slot: uint64(1),
signingRoot: bytesutil.PadTo([]byte{1}, 32),
},
{
slot: uint64(2),
signingRoot: bytesutil.PadTo([]byte{2}, 32),
},
{
slot: uint64(1),
signingRoot: bytesutil.PadTo([]byte{3}, 32),
},
}
Expand All @@ -85,11 +97,11 @@ func TestSaveProposalHistoryForSlot_Overwrites(t *testing.T) {
db := setupDB(t, [][48]byte{pubkey})
err := db.SaveProposalHistoryForSlot(context.Background(), pubkey, 0, tt.signingRoot)
require.NoError(t, err, "Saving proposal history failed")
signingRoot, _, err := db.ProposalHistoryForSlot(context.Background(), pubkey, 0)
proposalHistory, err := db.ProposalHistoryForPubKey(context.Background(), pubkey)
require.NoError(t, err, "Failed to get proposal history")

require.NotNil(t, signingRoot)
require.DeepEqual(t, tt.signingRoot, signingRoot[:], "Expected DB to keep object the same")
require.NotNil(t, proposalHistory)
require.DeepEqual(t, tt.signingRoot, proposalHistory[0].SigningRoot, "Expected DB to keep object the same")
require.NoError(t, db.Close(), "Failed to close database")
}
}
Expand Down Expand Up @@ -143,15 +155,22 @@ func TestPruneProposalHistoryBySlot_OK(t *testing.T) {
require.NoError(t, err, "Saving proposal history failed")
}

signingRootsBySlot := make(map[uint64][]byte)
proposalHistory, err := db.ProposalHistoryForPubKey(context.Background(), pubKey)
require.NoError(t, err)

for _, hist := range proposalHistory {
signingRootsBySlot[hist.Slot] = hist.SigningRoot
}

for _, slot := range tt.removedSlots {
sr, _, err := db.ProposalHistoryForSlot(context.Background(), pubKey, slot)
require.NoError(t, err, "Failed to get proposal history")
require.DeepEqual(t, bytesutil.PadTo([]byte{}, 32), sr[:], "Unexpected difference in bytes for epoch %d", slot)
_, ok := signingRootsBySlot[slot]
require.Equal(t, false, ok)
}
for _, slot := range tt.storedSlots {
sr, _, err := db.ProposalHistoryForSlot(context.Background(), pubKey, slot)
require.NoError(t, err, "Failed to get proposal history")
require.DeepEqual(t, signedRoot, sr[:], "Unexpected difference in bytes for epoch %d", slot)
root, ok := signingRootsBySlot[slot]
require.Equal(t, true, ok)
require.DeepEqual(t, signedRoot, root, "Unexpected difference in bytes for epoch %d", slot)
}
require.NoError(t, db.Close(), "Failed to close database")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_library(
deps = [
"//shared/bytesutil:go_default_library",
"//shared/params:go_default_library",
"//shared/progressutil:go_default_library",
"//shared/slashutil:go_default_library",
"//validator/db:go_default_library",
"//validator/db/kv:go_default_library",
Expand All @@ -36,7 +37,6 @@ go_test(
],
embed = [":go_default_library"],
deps = [
"//shared/params:go_default_library",
"//shared/testutil/assert:go_default_library",
"//shared/testutil/require:go_default_library",
"//validator/db/kv:go_default_library",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/progressutil"
"github.com/prysmaticlabs/prysm/validator/db"
"github.com/prysmaticlabs/prysm/validator/slashing-protection/local/standard-protection-format/format"
)
Expand Down Expand Up @@ -40,6 +41,9 @@ func ExportStandardProtectionJSON(ctx context.Context, validatorDB db.Database)
dataByPubKey := make(map[[48]byte]*format.ProtectionData)

// Extract the signed proposals by public key.
progress := progressutil.InitializeProgressBar(
len(proposedPublicKeys), "Extracting signed blocks by validator public key",
)
for _, pubKey := range proposedPublicKeys {
pubKeyHex, err := pubKeyToHexString(pubKey[:])
if err != nil {
Expand All @@ -54,9 +58,15 @@ func ExportStandardProtectionJSON(ctx context.Context, validatorDB db.Database)
SignedBlocks: signedBlocks,
SignedAttestations: nil,
}
if err := progress.Add(1); err != nil {
return nil, err
}
}

// Extract the signed attestations by public key.
progress = progressutil.InitializeProgressBar(
len(proposedPublicKeys), "Extracting signed attestations by validator public key",
)
for _, pubKey := range attestedPublicKeys {
pubKeyHex, err := pubKeyToHexString(pubKey[:])
if err != nil {
Expand All @@ -75,6 +85,9 @@ func ExportStandardProtectionJSON(ctx context.Context, validatorDB db.Database)
SignedAttestations: signedAttestations,
}
}
if err := progress.Add(1); err != nil {
return nil, err
}
}

// Next we turn our map into a slice as expected by the EIP-3076 JSON standard.
Expand Down Expand Up @@ -129,39 +142,23 @@ func signedBlocksByPubKey(ctx context.Context, validatorDB db.Database, pubKey [
// in our database, we return nil. This way, a user will be able to export their
// slashing protection history even if one of their keys does not have a history
// of signed blocks.
lowestSignedSlot, exists, err := validatorDB.LowestSignedProposal(ctx, pubKey)
if err != nil {
return nil, err
}
if !exists {
return nil, nil
}
highestSignedSlot, exists, err := validatorDB.HighestSignedProposal(ctx, pubKey)
proposalHistory, err := validatorDB.ProposalHistoryForPubKey(ctx, pubKey)
if err != nil {
return nil, err
}
if !exists {
return nil, nil
}
signedBlocks := make([]*format.SignedBlock, 0)
for i := lowestSignedSlot; i <= highestSignedSlot; i++ {
for _, proposal := range proposalHistory {
if ctx.Err() != nil {
return nil, ctx.Err()
}
signingRoot, exists, err := validatorDB.ProposalHistoryForSlot(ctx, pubKey, i)
signingRootHex, err := rootToHexString(proposal.SigningRoot)
if err != nil {
return nil, err
}
if exists {
signingRootHex, err := rootToHexString(signingRoot[:])
if err != nil {
return nil, err
}
signedBlocks = append(signedBlocks, &format.SignedBlock{
Slot: fmt.Sprintf("%d", i),
SigningRoot: signingRootHex,
})
}
signedBlocks = append(signedBlocks, &format.SignedBlock{
Slot: fmt.Sprintf("%d", proposal.Slot),
SigningRoot: signingRootHex,
})
}
return signedBlocks, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"testing"

ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
"github.com/prysmaticlabs/prysm/validator/db/kv"
Expand Down Expand Up @@ -91,17 +90,14 @@ func TestStore_ImportInterchangeData_BadFormat_PreventsDBWrites(t *testing.T) {
require.NoError(t, err)
require.Equal(t, kv.NotSlashable, slashingKind)
}
proposals := proposalHistory[i].Proposals
for _, proposal := range proposals {
receivedProposalSigningRoot, _, err := validatorDB.ProposalHistoryForSlot(ctx, publicKeys[i], proposal.Slot)
require.NoError(t, err)
require.DeepEqual(
t,
params.BeaconConfig().ZeroHash,
receivedProposalSigningRoot,
"Imported proposal signing root is different than the empty default",
)
}
receivedHistory, err := validatorDB.ProposalHistoryForPubKey(ctx, publicKeys[i])
require.NoError(t, err)
require.DeepEqual(
t,
make([]*kv.Proposal, 0),
receivedHistory,
"Imported proposal signing root is different than the empty default",
)
}
}

Expand Down Expand Up @@ -150,12 +146,19 @@ func TestStore_ImportInterchangeData_OK(t *testing.T) {
}

proposals := proposalHistory[i].Proposals

receivedProposalHistory, err := validatorDB.ProposalHistoryForPubKey(ctx, publicKeys[i])
require.NoError(t, err)
rootsBySlot := make(map[uint64][]byte)
for _, proposal := range receivedProposalHistory {
rootsBySlot[proposal.Slot] = proposal.SigningRoot
}
for _, proposal := range proposals {
receivedProposalSigningRoot, _, err := validatorDB.ProposalHistoryForSlot(ctx, publicKeys[i], proposal.Slot)
require.NoError(t, err)
receivedRoot, ok := rootsBySlot[proposal.Slot]
require.DeepEqual(t, true, ok)
require.DeepEqual(
t,
receivedProposalSigningRoot[:],
receivedRoot,
proposal.SigningRoot,
"Imported proposals are different then the generated ones",
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"fmt"
"testing"

"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
"github.com/prysmaticlabs/prysm/validator/db/kv"
Expand Down Expand Up @@ -168,12 +167,18 @@ func TestImportInterchangeData_OK(t *testing.T) {
}

proposals := proposalHistory[i].Proposals
receivedProposalHistory, err := validatorDB.ProposalHistoryForPubKey(ctx, publicKeys[i])
require.NoError(t, err)
rootsBySlot := make(map[uint64][]byte)
for _, proposal := range receivedProposalHistory {
rootsBySlot[proposal.Slot] = proposal.SigningRoot
}
for _, proposal := range proposals {
receivedProposalSigningRoot, _, err := validatorDB.ProposalHistoryForSlot(ctx, publicKeys[i], proposal.Slot)
require.NoError(t, err)
receivedRoot, ok := rootsBySlot[proposal.Slot]
require.DeepEqual(t, true, ok)
require.DeepEqual(
t,
receivedProposalSigningRoot[:],
receivedRoot,
proposal.SigningRoot,
"Imported proposals are different then the generated ones",
)
Expand Down Expand Up @@ -309,16 +314,13 @@ func TestStore_ImportInterchangeData_BadFormat_PreventsDBWrites(t *testing.T) {
len(receivedAttestingHistory),
"Imported attestation protection history is different than the empty default",
)
proposals := proposalHistory[i].Proposals
for _, proposal := range proposals {
receivedProposalSigningRoot, _, err := validatorDB.ProposalHistoryForSlot(ctx, publicKeys[i], proposal.Slot)
require.NoError(t, err)
require.DeepEqual(
t,
params.BeaconConfig().ZeroHash,
receivedProposalSigningRoot,
"Imported proposal signing root is different than the empty default",
)
}
receivedHistory, err := validatorDB.ProposalHistoryForPubKey(ctx, publicKeys[i])
require.NoError(t, err)
require.DeepEqual(
t,
make([]*kv.Proposal, 0),
receivedHistory,
"Imported proposal signing root is different than the empty default",
)
}
}

0 comments on commit caac08d

Please sign in to comment.