Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Batch Method for Reading Validator Proposing Histories #8378

Merged
merged 8 commits into from
Feb 2, 2021
1 change: 1 addition & 0 deletions validator/db/iface/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type ValidatorDB interface {
// Proposer protection related methods.
HighestSignedProposal(ctx context.Context, publicKey [48]byte) (uint64, bool, error)
LowestSignedProposal(ctx context.Context, publicKey [48]byte) (uint64, bool, error)
ProposalHistoryForPubKey(ctx context.Context, publicKey [48]byte) ([]*kv.Proposal, error)
ProposalHistoryForSlot(ctx context.Context, publicKey [48]byte, slot uint64) ([32]byte, bool, error)
SaveProposalHistoryForSlot(ctx context.Context, pubKey [48]byte, slot uint64, signingRoot []byte) error
ProposedPublicKeys(ctx context.Context) ([][48]byte, error)
Expand Down
26 changes: 26 additions & 0 deletions validator/db/kv/proposer_protection.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,32 @@ func (s *Store) ProposalHistoryForSlot(ctx context.Context, publicKey [48]byte,
return signingRoot, proposalExists, err
}

// ProposalHistoryForPubKey returns the entire proposal history for a given public key.
func (s *Store) ProposalHistoryForPubKey(ctx context.Context, publicKey [48]byte) ([]*Proposal, error) {
ctx, span := trace.StartSpan(ctx, "Validator.ProposalHistoryForPubKey")
defer span.End()

proposals := make([]*Proposal, 0)
err := s.view(func(tx *bolt.Tx) error {
bucket := tx.Bucket(historicProposalsBucket)
valBucket := bucket.Bucket(publicKey[:])
if valBucket == nil {
return nil
}
return valBucket.ForEach(func(slotKey, signingRootBytes []byte) error {
slot := bytesutil.BytesToUint64BigEndian(slotKey)
sr := make([]byte, 32)
copy(sr, signingRootBytes)
proposals = append(proposals, &Proposal{
Slot: slot,
SigningRoot: sr,
})
return nil
})
})
return proposals, err
}

// SaveProposalHistoryForSlot saves the proposal history for the requested validator public key.
// We also check if the incoming proposal slot is lower than the lowest signed proposal slot
// for the validator and override its value on disk.
Expand Down
57 changes: 38 additions & 19 deletions validator/db/kv/proposer_protection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,37 +46,49 @@ func TestSaveProposalHistoryForSlot_OK(t *testing.T) {
require.DeepEqual(t, bytesutil.PadTo([]byte{1}, 32), signingRoot[:], "Expected DB to keep object the same")
}

func TestSaveProposalHistoryForSlot_Empty(t *testing.T) {
func TestNewProposalHistoryForPubKey_ReturnsEmptyIfNoHistory(t *testing.T) {
valPubkey := [48]byte{1, 2, 3}
db := setupDB(t, [][48]byte{})

proposalHistory, err := db.ProposalHistoryForPubKey(context.Background(), valPubkey)
require.NoError(t, err)
assert.DeepEqual(t, make([]*Proposal, 0), proposalHistory)
}

func TestSaveProposalHistoryForPubKey_OK(t *testing.T) {
pubkey := [48]byte{3}
db := setupDB(t, [][48]byte{pubkey})

slot := uint64(2)
emptySlot := uint64(120)
err := db.SaveProposalHistoryForSlot(context.Background(), pubkey, slot, []byte{1})

root := [32]byte{1}
err := db.SaveProposalHistoryForSlot(context.Background(), pubkey, slot, root[:])
require.NoError(t, err, "Saving proposal history failed: %v")
signingRoot, _, err := db.ProposalHistoryForSlot(context.Background(), pubkey, emptySlot)
proposalHistory, err := db.ProposalHistoryForPubKey(context.Background(), pubkey)
require.NoError(t, err, "Failed to get proposal history")

require.NotNil(t, signingRoot)
require.DeepEqual(t, bytesutil.PadTo([]byte{}, 32), signingRoot[:], "Expected DB to keep object the same")
require.NotNil(t, proposalHistory)
want := []*Proposal{
{
Slot: slot,
SigningRoot: root[:],
},
}
require.DeepEqual(t, want[0], proposalHistory[0])
}

func TestSaveProposalHistoryForSlot_Overwrites(t *testing.T) {
pubkey := [48]byte{0}
tests := []struct {
slot uint64
signingRoot []byte
}{
{
slot: uint64(1),
signingRoot: bytesutil.PadTo([]byte{1}, 32),
},
{
slot: uint64(2),
signingRoot: bytesutil.PadTo([]byte{2}, 32),
},
{
slot: uint64(1),
signingRoot: bytesutil.PadTo([]byte{3}, 32),
},
}
Expand All @@ -85,11 +97,11 @@ func TestSaveProposalHistoryForSlot_Overwrites(t *testing.T) {
db := setupDB(t, [][48]byte{pubkey})
err := db.SaveProposalHistoryForSlot(context.Background(), pubkey, 0, tt.signingRoot)
require.NoError(t, err, "Saving proposal history failed")
signingRoot, _, err := db.ProposalHistoryForSlot(context.Background(), pubkey, 0)
proposalHistory, err := db.ProposalHistoryForPubKey(context.Background(), pubkey)
require.NoError(t, err, "Failed to get proposal history")

require.NotNil(t, signingRoot)
require.DeepEqual(t, tt.signingRoot, signingRoot[:], "Expected DB to keep object the same")
require.NotNil(t, proposalHistory)
require.DeepEqual(t, tt.signingRoot, proposalHistory[0].SigningRoot, "Expected DB to keep object the same")
require.NoError(t, db.Close(), "Failed to close database")
}
}
Expand Down Expand Up @@ -143,15 +155,22 @@ func TestPruneProposalHistoryBySlot_OK(t *testing.T) {
require.NoError(t, err, "Saving proposal history failed")
}

signingRootsBySlot := make(map[uint64][]byte)
proposalHistory, err := db.ProposalHistoryForPubKey(context.Background(), pubKey)
require.NoError(t, err)

for _, hist := range proposalHistory {
signingRootsBySlot[hist.Slot] = hist.SigningRoot
}

for _, slot := range tt.removedSlots {
sr, _, err := db.ProposalHistoryForSlot(context.Background(), pubKey, slot)
require.NoError(t, err, "Failed to get proposal history")
require.DeepEqual(t, bytesutil.PadTo([]byte{}, 32), sr[:], "Unexpected difference in bytes for epoch %d", slot)
_, ok := signingRootsBySlot[slot]
require.Equal(t, false, ok)
}
for _, slot := range tt.storedSlots {
sr, _, err := db.ProposalHistoryForSlot(context.Background(), pubKey, slot)
require.NoError(t, err, "Failed to get proposal history")
require.DeepEqual(t, signedRoot, sr[:], "Unexpected difference in bytes for epoch %d", slot)
root, ok := signingRootsBySlot[slot]
require.Equal(t, true, ok)
require.DeepEqual(t, signedRoot, root, "Unexpected difference in bytes for epoch %d", slot)
}
require.NoError(t, db.Close(), "Failed to close database")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_library(
deps = [
"//shared/bytesutil:go_default_library",
"//shared/params:go_default_library",
"//shared/progressutil:go_default_library",
"//shared/slashutil:go_default_library",
"//validator/db:go_default_library",
"//validator/db/kv:go_default_library",
Expand All @@ -36,7 +37,6 @@ go_test(
],
embed = [":go_default_library"],
deps = [
"//shared/params:go_default_library",
"//shared/testutil/assert:go_default_library",
"//shared/testutil/require:go_default_library",
"//validator/db/kv:go_default_library",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/progressutil"
"github.com/prysmaticlabs/prysm/validator/db"
"github.com/prysmaticlabs/prysm/validator/slashing-protection/local/standard-protection-format/format"
)
Expand Down Expand Up @@ -40,6 +41,9 @@ func ExportStandardProtectionJSON(ctx context.Context, validatorDB db.Database)
dataByPubKey := make(map[[48]byte]*format.ProtectionData)

// Extract the signed proposals by public key.
progress := progressutil.InitializeProgressBar(
len(proposedPublicKeys), "Extracting signed blocks by validator public key",
)
for _, pubKey := range proposedPublicKeys {
pubKeyHex, err := pubKeyToHexString(pubKey[:])
if err != nil {
Expand All @@ -54,9 +58,15 @@ func ExportStandardProtectionJSON(ctx context.Context, validatorDB db.Database)
SignedBlocks: signedBlocks,
SignedAttestations: nil,
}
if err := progress.Add(1); err != nil {
return nil, err
}
}

// Extract the signed attestations by public key.
progress = progressutil.InitializeProgressBar(
len(proposedPublicKeys), "Extracting signed attestations by validator public key",
)
for _, pubKey := range attestedPublicKeys {
pubKeyHex, err := pubKeyToHexString(pubKey[:])
if err != nil {
Expand All @@ -75,6 +85,9 @@ func ExportStandardProtectionJSON(ctx context.Context, validatorDB db.Database)
SignedAttestations: signedAttestations,
}
}
if err := progress.Add(1); err != nil {
return nil, err
}
}

// Next we turn our map into a slice as expected by the EIP-3076 JSON standard.
Expand Down Expand Up @@ -129,39 +142,23 @@ func signedBlocksByPubKey(ctx context.Context, validatorDB db.Database, pubKey [
// in our database, we return nil. This way, a user will be able to export their
// slashing protection history even if one of their keys does not have a history
// of signed blocks.
lowestSignedSlot, exists, err := validatorDB.LowestSignedProposal(ctx, pubKey)
if err != nil {
return nil, err
}
if !exists {
return nil, nil
}
highestSignedSlot, exists, err := validatorDB.HighestSignedProposal(ctx, pubKey)
proposalHistory, err := validatorDB.ProposalHistoryForPubKey(ctx, pubKey)
if err != nil {
return nil, err
}
if !exists {
return nil, nil
}
signedBlocks := make([]*format.SignedBlock, 0)
for i := lowestSignedSlot; i <= highestSignedSlot; i++ {
for _, proposal := range proposalHistory {
if ctx.Err() != nil {
return nil, ctx.Err()
}
signingRoot, exists, err := validatorDB.ProposalHistoryForSlot(ctx, pubKey, i)
signingRootHex, err := rootToHexString(proposal.SigningRoot)
if err != nil {
return nil, err
}
if exists {
signingRootHex, err := rootToHexString(signingRoot[:])
if err != nil {
return nil, err
}
signedBlocks = append(signedBlocks, &format.SignedBlock{
Slot: fmt.Sprintf("%d", i),
SigningRoot: signingRootHex,
})
}
signedBlocks = append(signedBlocks, &format.SignedBlock{
Slot: fmt.Sprintf("%d", proposal.Slot),
SigningRoot: signingRootHex,
})
}
return signedBlocks, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"testing"

ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
"github.com/prysmaticlabs/prysm/validator/db/kv"
Expand Down Expand Up @@ -91,17 +90,14 @@ func TestStore_ImportInterchangeData_BadFormat_PreventsDBWrites(t *testing.T) {
require.NoError(t, err)
require.Equal(t, kv.NotSlashable, slashingKind)
}
proposals := proposalHistory[i].Proposals
for _, proposal := range proposals {
receivedProposalSigningRoot, _, err := validatorDB.ProposalHistoryForSlot(ctx, publicKeys[i], proposal.Slot)
require.NoError(t, err)
require.DeepEqual(
t,
params.BeaconConfig().ZeroHash,
receivedProposalSigningRoot,
"Imported proposal signing root is different than the empty default",
)
}
receivedHistory, err := validatorDB.ProposalHistoryForPubKey(ctx, publicKeys[i])
require.NoError(t, err)
require.DeepEqual(
t,
make([]*kv.Proposal, 0),
receivedHistory,
"Imported proposal signing root is different than the empty default",
)
}
}

Expand Down Expand Up @@ -150,12 +146,19 @@ func TestStore_ImportInterchangeData_OK(t *testing.T) {
}

proposals := proposalHistory[i].Proposals

receivedProposalHistory, err := validatorDB.ProposalHistoryForPubKey(ctx, publicKeys[i])
require.NoError(t, err)
rootsBySlot := make(map[uint64][]byte)
for _, proposal := range receivedProposalHistory {
rootsBySlot[proposal.Slot] = proposal.SigningRoot
}
for _, proposal := range proposals {
receivedProposalSigningRoot, _, err := validatorDB.ProposalHistoryForSlot(ctx, publicKeys[i], proposal.Slot)
require.NoError(t, err)
receivedRoot, ok := rootsBySlot[proposal.Slot]
require.DeepEqual(t, true, ok)
require.DeepEqual(
t,
receivedProposalSigningRoot[:],
receivedRoot,
proposal.SigningRoot,
"Imported proposals are different then the generated ones",
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"fmt"
"testing"

"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
"github.com/prysmaticlabs/prysm/validator/db/kv"
Expand Down Expand Up @@ -168,12 +167,18 @@ func TestImportInterchangeData_OK(t *testing.T) {
}

proposals := proposalHistory[i].Proposals
receivedProposalHistory, err := validatorDB.ProposalHistoryForPubKey(ctx, publicKeys[i])
require.NoError(t, err)
rootsBySlot := make(map[uint64][]byte)
for _, proposal := range receivedProposalHistory {
rootsBySlot[proposal.Slot] = proposal.SigningRoot
}
for _, proposal := range proposals {
receivedProposalSigningRoot, _, err := validatorDB.ProposalHistoryForSlot(ctx, publicKeys[i], proposal.Slot)
require.NoError(t, err)
receivedRoot, ok := rootsBySlot[proposal.Slot]
require.DeepEqual(t, true, ok)
require.DeepEqual(
t,
receivedProposalSigningRoot[:],
receivedRoot,
proposal.SigningRoot,
"Imported proposals are different then the generated ones",
)
Expand Down Expand Up @@ -309,16 +314,13 @@ func TestStore_ImportInterchangeData_BadFormat_PreventsDBWrites(t *testing.T) {
len(receivedAttestingHistory),
"Imported attestation protection history is different than the empty default",
)
proposals := proposalHistory[i].Proposals
for _, proposal := range proposals {
receivedProposalSigningRoot, _, err := validatorDB.ProposalHistoryForSlot(ctx, publicKeys[i], proposal.Slot)
require.NoError(t, err)
require.DeepEqual(
t,
params.BeaconConfig().ZeroHash,
receivedProposalSigningRoot,
"Imported proposal signing root is different than the empty default",
)
}
receivedHistory, err := validatorDB.ProposalHistoryForPubKey(ctx, publicKeys[i])
require.NoError(t, err)
require.DeepEqual(
t,
make([]*kv.Proposal, 0),
receivedHistory,
"Imported proposal signing root is different than the empty default",
)
}
}