Skip to content

Commit

Permalink
Begin Dynamic Rescan of Validating Keys (#6963)
Browse files Browse the repository at this point in the history
* begin on dynamic key rescan
* Merge branch 'master' into dynamic-rescan
* begin dynamic rescan
* fsnotify to listen for rescan dir changes
* recheck for slashing protection
* lint
* Merge branch 'master' into dynamic-rescan
* less aggressive recheck interval
* imports
* Merge branch 'dynamic-rescan' of github.com:prysmaticlabs/prysm into dynamic-rescan
* resolve confs
* listen for file changes for accounts file
* reload accounts from keystore
* begin fixing rescan test
* add event feed
* fix confs
* fix conf
* fix broken tests
* Merge branch 'master' into dynamic-rescan
* simplify lines
* do nothing if no subscribers
* Merge branch 'dynamic-rescan' of github.com:prysmaticlabs/prysm into dynamic-rescan
* Merge refs/heads/master into dynamic-rescan
* Merge refs/heads/master into dynamic-rescan
* Merge refs/heads/master into dynamic-rescan
* Merge refs/heads/master into dynamic-rescan
* Merge refs/heads/master into dynamic-rescan
* Merge refs/heads/master into dynamic-rescan
* Merge refs/heads/master into dynamic-rescan
* Merge refs/heads/master into dynamic-rescan
* fix tests
* Merge refs/heads/master into dynamic-rescan
* Merge refs/heads/master into dynamic-rescan
* Merge refs/heads/master into dynamic-rescan
* Merge refs/heads/master into dynamic-rescan
* Merge refs/heads/master into dynamic-rescan
* gaz
* Merge branch 'dynamic-rescan' of github.com:prysmaticlabs/prysm into dynamic-rescan
* ident
* Update WORKSPACE
* gaz
* Merge branch 'dynamic-rescan' of github.com:prysmaticlabs/prysm into dynamic-rescan
* add keys on service start
* Merge refs/heads/master into dynamic-rescan
* Merge refs/heads/master into dynamic-rescan
* Merge refs/heads/master into dynamic-rescan
* Merge refs/heads/master into dynamic-rescan
* Merge refs/heads/master into dynamic-rescan
* Merge refs/heads/master into dynamic-rescan
* Merge refs/heads/master into dynamic-rescan
* Merge refs/heads/master into dynamic-rescan
* ensure debounce util works
* Merge branch 'dynamic-rescan' of github.com:prysmaticlabs/prysm into dynamic-rescan
* Merge refs/heads/master into dynamic-rescan
* complete refresh, debounce test, and ensure works at runtime
* Merge branch 'dynamic-rescan' of github.com:prysmaticlabs/prysm into dynamic-rescan
* imports and remove log
* Merge refs/heads/master into dynamic-rescan
* Merge refs/heads/master into dynamic-rescan
* Merge refs/heads/master into dynamic-rescan
* Merge refs/heads/master into dynamic-rescan
* Merge refs/heads/master into dynamic-rescan
* Merge refs/heads/master into dynamic-rescan
* Merge refs/heads/master into dynamic-rescan
* Merge refs/heads/master into dynamic-rescan
* Merge refs/heads/master into dynamic-rescan
* Merge refs/heads/master into dynamic-rescan
* Merge refs/heads/master into dynamic-rescan
* Merge refs/heads/master into dynamic-rescan
* Merge refs/heads/master into dynamic-rescan
* resolve confs
* fix up e2e tests
* Merge branch 'dynamic-rescan' of github.com:prysmaticlabs/prysm into dynamic-rescan
* fix up e2e
* Merge refs/heads/master into dynamic-rescan
  • Loading branch information
rauljordan authored Aug 31, 2020
1 parent 381b5be commit ecbab20
Show file tree
Hide file tree
Showing 22 changed files with 376 additions and 85 deletions.
6 changes: 6 additions & 0 deletions deps.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -3614,3 +3614,9 @@ def prysm_deps():
sum = "h1:DP+LD/t0njgoPBvT5MJLeliUIVQR03hiKR6vezdwHlc=",
version = "v0.7.1",
)
go_repository(
name = "com_github_go_fsnotify_fsnotify",
importpath = "github.com/go-fsnotify/fsnotify",
sum = "h1:PeVNzgTRtWGm6fVic5i21t+n5ptPGCZuMcSPVMyTWjs=",
version = "v0.0.0-20180321022601-755488143dae",
)
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
github.com/fatih/color v1.9.0 // indirect
github.com/ferranbt/fastssz v0.0.0-20200826142241-3a913c5a1313
github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5
github.com/fsnotify/fsnotify v1.4.7
github.com/gballet/go-libpcsclite v0.0.0-20191108122812-4678299bea08 // indirect
github.com/ghodss/yaml v1.0.0
github.com/go-yaml/yaml v2.1.0+incompatible
Expand Down
15 changes: 15 additions & 0 deletions shared/asyncutil/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
load("@io_bazel_rules_go//go:def.bzl", "go_test")
load("@prysm//tools/go:def.bzl", "go_library")

go_library(
name = "go_default_library",
srcs = ["debounce.go"],
importpath = "github.com/prysmaticlabs/prysm/shared/asyncutil",
visibility = ["//visibility:public"],
)

go_test(
name = "go_default_test",
srcs = ["debounce_test.go"],
embed = [":go_default_library"],
)
29 changes: 29 additions & 0 deletions shared/asyncutil/debounce.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package asyncutil

import (
"context"
"time"
)

// Debounce events fired over a channel by a specified duration, ensuring no events
// are handled until a certain interval of time has passed.
func Debounce(ctx context.Context, interval time.Duration, eventsChan <-chan interface{}, handler func(interface{})) {
for event := range eventsChan {
loop:
for {
// If an event is received, wait the specified interval before calling the
// handler.
// If another event is received before the interval has passed, store
// it and reset the timer.
select {
// Do nothing until we can handle the events after the debounce interval.
case event = <-eventsChan:
case <-time.After(interval):
handler(event)
break loop
case <-ctx.Done():
return
}
}
}
}
27 changes: 27 additions & 0 deletions shared/asyncutil/debounce_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package asyncutil

import (
"context"
"testing"
"time"
)

func TestDebounce(t *testing.T) {
eventsChan := make(chan interface{}, 100)
ctx, cancel := context.WithCancel(context.Background())
interval := time.Second
timesHandled := 0
go Debounce(ctx, interval, eventsChan, func(event interface{}) {
timesHandled++
})
for i := 0; i < 100; i++ {
eventsChan <- struct{}{}
}
time.Sleep(interval)
cancel()
// We should expect 100 rapid fire changes to only have caused
// 1 handler to trigger after the debouncing period.
if timesHandled != 1 {
t.Errorf("Expected 1 handler call, received %d", timesHandled)
}
}
1 change: 0 additions & 1 deletion validator/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ go_library(
importpath = "github.com/prysmaticlabs/prysm/validator",
visibility = ["//validator:__subpackages__"],
deps = [
"//shared/bytesutil:go_default_library",
"//shared/cmd:go_default_library",
"//shared/debug:go_default_library",
"//shared/featureconfig:go_default_library",
Expand Down
1 change: 1 addition & 0 deletions validator/accounts/v2/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ go_library(
deps = [
"//shared/bls:go_default_library",
"//shared/bytesutil:go_default_library",
"//shared/event:go_default_library",
"//shared/featureconfig:go_default_library",
"//shared/fileutil:go_default_library",
"//shared/params:go_default_library",
Expand Down
1 change: 1 addition & 0 deletions validator/accounts/v2/cmd_accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ this command outputs a deposit data string which is required to become a validat
"list of hex string public keys",
Flags: []cli.Flag{
flags.WalletDirFlag,
flags.WalletPasswordFileFlag,
flags.BackupDirFlag,
flags.BackupPublicKeysFlag,
flags.BackupPasswordFile,
Expand Down
14 changes: 8 additions & 6 deletions validator/accounts/v2/wallet.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/gofrs/flock"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/shared/event"
"github.com/prysmaticlabs/prysm/shared/fileutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/promptutil"
Expand Down Expand Up @@ -67,12 +68,13 @@ type WalletConfig struct {
// and providing secure access to eth2 secrets depending on an
// associated keymanager (either direct, derived, or remote signing enabled).
type Wallet struct {
walletDir string
accountsPath string
configFilePath string
walletPassword string
walletFileLock *flock.Flock
keymanagerKind v2keymanager.Kind
walletDir string
accountsPath string
configFilePath string
walletPassword string
walletFileLock *flock.Flock
keymanagerKind v2keymanager.Kind
accountsChangedFeed *event.Feed
}

// WalletExists check if a wallet at the specified directory
Expand Down
1 change: 1 addition & 0 deletions validator/client/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ go_library(
"//validator/db:go_default_library",
"//validator/keymanager/v1:go_default_library",
"//validator/keymanager/v2:go_default_library",
"//validator/keymanager/v2/direct:go_default_library",
"//validator/slashing-protection:go_default_library",
"@com_github_dgraph_io_ristretto//:go_default_library",
"@com_github_ferranbt_fastssz//:go_default_library",
Expand Down
47 changes: 44 additions & 3 deletions validator/client/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/prysmaticlabs/prysm/validator/db"
keymanager "github.com/prysmaticlabs/prysm/validator/keymanager/v1"
v2 "github.com/prysmaticlabs/prysm/validator/keymanager/v2"
"github.com/prysmaticlabs/prysm/validator/keymanager/v2/direct"
slashingprotection "github.com/prysmaticlabs/prysm/validator/slashing-protection"
"github.com/sirupsen/logrus"
"go.opencensus.io/plugin/ocgrpc"
Expand Down Expand Up @@ -51,7 +52,6 @@ type ValidatorService struct {
logValidatorBalances bool
emitAccountMetrics bool
maxCallRecvMsgSize int
validatingPubKeys [][48]byte
grpcRetries uint
grpcRetryDelay time.Duration
grpcHeaders []string
Expand All @@ -64,7 +64,6 @@ type Config struct {
DataDir string
CertFlag string
GraffitiFlag string
ValidatingPubKeys [][48]byte
KeyManager keymanager.KeyManager
KeyManagerV2 v2.IKeymanager
LogValidatorBalances bool
Expand All @@ -91,7 +90,6 @@ func NewValidatorService(ctx context.Context, cfg *Config) (*ValidatorService, e
graffiti: []byte(cfg.GraffitiFlag),
keyManager: cfg.KeyManager,
keyManagerV2: cfg.KeyManagerV2,
validatingPubKeys: cfg.ValidatingPubKeys,
logValidatorBalances: cfg.LogValidatorBalances,
emitAccountMetrics: cfg.EmitAccountMetrics,
maxCallRecvMsgSize: cfg.GrpcMaxCallRecvMsgSizeFlag,
Expand Down Expand Up @@ -169,7 +167,26 @@ func (v *ValidatorService) Start() {
protector: v.protector,
voteStats: voteStats{startEpoch: ^uint64(0)},
}
var validatingKeys [][48]byte
go run(v.ctx, v.validator)
if featureconfig.Get().EnableAccountsV2 {
validatingKeys, err = v.keyManagerV2.FetchValidatingPublicKeys(v.ctx)
if err != nil {
log.WithError(err).Debug("Could not fetch validating keys")
}
if err := v.db.UpdatePublicKeysBuckets(validatingKeys); err != nil {
log.WithError(err).Debug("Could not update public keys buckets")
}
go recheckValidatingKeysBucket(v.ctx, v.db, v.keyManagerV2)
} else {
validatingKeys, err = v.keyManager.FetchValidatingKeys()
if err != nil {
log.WithError(err).Debug("Could not fetch validating keys")
}
if err := v.db.UpdatePublicKeysBuckets(validatingKeys); err != nil {
log.WithError(err).Debug("Could not update public keys buckets")
}
}
}

// Stop the validator service.
Expand Down Expand Up @@ -293,6 +310,30 @@ func ConstructDialOptions(
return dialOpts
}

// Reloads the validating keys upon receiving an event over a feed subscription
// to accounts changes in the keymanager, then updates those keys'
// buckets in bolt DB if a bucket for a key does not exist.
func recheckValidatingKeysBucket(ctx context.Context, valDB db.Database, km v2.IKeymanager) {
directKeymanager, ok := km.(*direct.Keymanager)
if !ok {
return
}
validatingPubKeysChan := make(chan [][48]byte, 1)
sub := directKeymanager.SubscribeAccountChanges(validatingPubKeysChan)
defer sub.Unsubscribe()
for {
select {
case keys := <-validatingPubKeysChan:
if err := valDB.UpdatePublicKeysBuckets(keys); err != nil {
log.WithError(err).Debug("Could not update public keys buckets")
continue
}
case <-ctx.Done():
return
}
}
}

// ValidatorBalances returns the validator balances mapping keyed by public keys.
func (v *ValidatorService) ValidatorBalances(ctx context.Context) map[[48]byte]uint64 {
return v.validator.BalancesByPubkeys(ctx)
Expand Down
1 change: 1 addition & 0 deletions validator/db/iface/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type ValidatorDB interface {
io.Closer
DatabasePath() string
ClearDB() error
UpdatePublicKeysBuckets(publicKeys [][48]byte) error
// Proposer protection related methods.
ProposalHistoryForEpoch(ctx context.Context, publicKey []byte, epoch uint64) (bitfield.Bitlist, error)
SaveProposalHistoryForEpoch(ctx context.Context, publicKey []byte, epoch uint64, history bitfield.Bitlist) error
Expand Down
2 changes: 1 addition & 1 deletion validator/db/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func NewKVStore(dirPath string, pubKeys [][48]byte) (*Store, error) {
}

// Initialize the required public keys into the DB to ensure they're not empty.
if err := kv.initializeSubBuckets(pubKeys); err != nil {
if err := kv.UpdatePublicKeysBuckets(pubKeys); err != nil {
return nil, err
}

Expand Down
25 changes: 13 additions & 12 deletions validator/db/kv/proposal_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,19 @@ func (store *Store) SaveProposalHistoryForEpoch(ctx context.Context, pubKey []by
return err
}

// UpdatePublicKeysBuckets for a specified list of keys.
func (store *Store) UpdatePublicKeysBuckets(pubKeys [][48]byte) error {
return store.update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(historicProposalsBucket)
for _, pubKey := range pubKeys {
if _, err := bucket.CreateBucketIfNotExists(pubKey[:]); err != nil {
return errors.Wrap(err, "failed to create proposal history bucket")
}
}
return nil
})
}

func pruneProposalHistory(valBucket *bolt.Bucket, newestEpoch uint64) error {
c := valBucket.Cursor()
for k, _ := c.First(); k != nil; k, _ = c.First() {
Expand All @@ -77,15 +90,3 @@ func pruneProposalHistory(valBucket *bolt.Bucket, newestEpoch uint64) error {
}
return nil
}

func (store *Store) initializeSubBuckets(pubKeys [][48]byte) error {
return store.update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(historicProposalsBucket)
for _, pubKey := range pubKeys {
if _, err := bucket.CreateBucketIfNotExists(pubKey[:]); err != nil {
return errors.Wrap(err, "failed to create proposal history bucket")
}
}
return nil
})
}
7 changes: 7 additions & 0 deletions validator/keymanager/v2/direct/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ go_library(
"direct.go",
"doc.go",
"import.go",
"refresh.go",
],
importpath = "github.com/prysmaticlabs/prysm/validator/keymanager/v2/direct",
visibility = [
Expand All @@ -17,15 +18,19 @@ go_library(
deps = [
"//beacon-chain/core/helpers:go_default_library",
"//proto/validator/accounts/v2:go_default_library",
"//shared/asyncutil:go_default_library",
"//shared/bls:go_default_library",
"//shared/bytesutil:go_default_library",
"//shared/depositutil:go_default_library",
"//shared/event:go_default_library",
"//shared/fileutil:go_default_library",
"//shared/interop:go_default_library",
"//shared/params:go_default_library",
"//shared/petnames:go_default_library",
"//shared/promptutil:go_default_library",
"//validator/accounts/v2/iface:go_default_library",
"//validator/keymanager/v2:go_default_library",
"@com_github_fsnotify_fsnotify//:go_default_library",
"@com_github_google_uuid//:go_default_library",
"@com_github_k0kubun_go_ansi//:go_default_library",
"@com_github_logrusorgru_aurora//:go_default_library",
Expand All @@ -42,12 +47,14 @@ go_test(
"backup_test.go",
"direct_test.go",
"import_test.go",
"refresh_test.go",
],
embed = [":go_default_library"],
deps = [
"//proto/validator/accounts/v2:go_default_library",
"//shared/bls:go_default_library",
"//shared/bytesutil:go_default_library",
"//shared/event:go_default_library",
"//shared/testutil/assert:go_default_library",
"//shared/testutil/require:go_default_library",
"//validator/accounts/v2/testing:go_default_library",
Expand Down
Loading

0 comments on commit ecbab20

Please sign in to comment.