Skip to content

Commit

Permalink
Dynamic key reload for remote keymanager (#8611)
Browse files Browse the repository at this point in the history
* Dynamic key reload for remote keymanager

* fix failing keymanager test

* keymanager tests

* define RemoteKeymanager interface

* WaitForActivation tests

* gzl

* handle error in test
  • Loading branch information
rkapka authored Mar 16, 2021
1 parent 9fea981 commit 1f8171d
Show file tree
Hide file tree
Showing 14 changed files with 355 additions and 68 deletions.
3 changes: 3 additions & 0 deletions validator/client/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ go_library(
"//validator/graffiti:go_default_library",
"//validator/keymanager:go_default_library",
"//validator/keymanager/imported:go_default_library",
"//validator/keymanager/remote:go_default_library",
"//validator/slashing-protection/iface:go_default_library",
"@com_github_dgraph_io_ristretto//:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
Expand Down Expand Up @@ -107,6 +108,7 @@ go_test(
"//shared/mock:go_default_library",
"//shared/params:go_default_library",
"//shared/slotutil:go_default_library",
"//shared/slotutil/testing:go_default_library",
"//shared/testutil:go_default_library",
"//shared/testutil/assert:go_default_library",
"//shared/testutil/require:go_default_library",
Expand All @@ -117,6 +119,7 @@ go_test(
"//validator/db/testing:go_default_library",
"//validator/graffiti:go_default_library",
"//validator/keymanager/derived:go_default_library",
"//validator/keymanager/remote:go_default_library",
"//validator/slashing-protection/local/standard-protection-format:go_default_library",
"//validator/testing:go_default_library",
"@com_github_gogo_protobuf//types:go_default_library",
Expand Down
23 changes: 3 additions & 20 deletions validator/client/key_reload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import (

"github.com/golang/mock/gomock"
"github.com/pkg/errors"
types "github.com/prysmaticlabs/eth2-types"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/bls"
"github.com/prysmaticlabs/prysm/shared/mock"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
"github.com/prysmaticlabs/prysm/validator/client/testutil"
logTest "github.com/sirupsen/logrus/hooks/test"
)

Expand Down Expand Up @@ -42,7 +42,7 @@ func TestValidator_HandleKeyReload(t *testing.T) {
genesisTime: 1,
}

resp := generateResponse([][]byte{inactivePubKey[:], activePubKey[:]})
resp := testutil.GenerateMultipleValidatorStatusResponse([][]byte{inactivePubKey[:], activePubKey[:]})
resp.Statuses[0].Status = ethpb.ValidatorStatus_UNKNOWN_STATUS
resp.Statuses[1].Status = ethpb.ValidatorStatus_ACTIVE
client.EXPECT().MultipleValidatorStatus(
Expand Down Expand Up @@ -78,7 +78,7 @@ func TestValidator_HandleKeyReload(t *testing.T) {
genesisTime: 1,
}

resp := generateResponse([][]byte{inactivePubKey[:]})
resp := testutil.GenerateMultipleValidatorStatusResponse([][]byte{inactivePubKey[:]})
resp.Statuses[0].Status = ethpb.ValidatorStatus_UNKNOWN_STATUS
client.EXPECT().MultipleValidatorStatus(
gomock.Any(),
Expand Down Expand Up @@ -122,20 +122,3 @@ func TestValidator_HandleKeyReload(t *testing.T) {
assert.ErrorContains(t, "error", err)
})
}

func generateResponse(pubkeys [][]byte) *ethpb.MultipleValidatorStatusResponse {
resp := &ethpb.MultipleValidatorStatusResponse{
PublicKeys: make([][]byte, len(pubkeys)),
Statuses: make([]*ethpb.ValidatorStatusResponse, len(pubkeys)),
Indices: make([]types.ValidatorIndex, len(pubkeys)),
}
for i, key := range pubkeys {
resp.PublicKeys[i] = key
resp.Statuses[i] = &ethpb.ValidatorStatusResponse{
Status: ethpb.ValidatorStatus_UNKNOWN_STATUS,
}
resp.Indices[i] = types.ValidatorIndex(i)
}

return resp
}
2 changes: 1 addition & 1 deletion validator/client/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func run(ctx context.Context, v iface.Validator) {
handleAssignmentError(err, headSlot)
}

accountsChangedChan := make(chan [][48]byte)
accountsChangedChan := make(chan [][48]byte, 1)
sub := v.GetKeymanager().SubscribeAccountChanges(accountsChangedChan)
for {
slotCtx, cancel := context.WithCancel(ctx)
Expand Down
24 changes: 23 additions & 1 deletion validator/client/testutil/helper.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,28 @@
package testutil

import "github.com/prysmaticlabs/prysm/shared/bytesutil"
import (
types "github.com/prysmaticlabs/eth2-types"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
)

// ActiveKey represents a public key whose status is ACTIVE.
var ActiveKey = bytesutil.ToBytes48([]byte("active"))

// GenerateMultipleValidatorStatusResponse prepares a response from the passed in keys.
func GenerateMultipleValidatorStatusResponse(pubkeys [][]byte) *ethpb.MultipleValidatorStatusResponse {
resp := &ethpb.MultipleValidatorStatusResponse{
PublicKeys: make([][]byte, len(pubkeys)),
Statuses: make([]*ethpb.ValidatorStatusResponse, len(pubkeys)),
Indices: make([]types.ValidatorIndex, len(pubkeys)),
}
for i, key := range pubkeys {
resp.PublicKeys[i] = key
resp.Statuses[i] = &ethpb.ValidatorStatusResponse{
Status: ethpb.ValidatorStatus_UNKNOWN_STATUS,
}
resp.Indices[i] = types.ValidatorIndex(i)
}

return resp
}
2 changes: 1 addition & 1 deletion validator/client/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type validator struct {
highestValidSlot types.Slot
domainDataCache *ristretto.Cache
aggregatedSlotCommitteeIDCache *lru.Cache
ticker *slotutil.SlotTicker
ticker slotutil.Ticker
prevBalance map[[48]byte]uint64
duties *ethpb.DutiesResponse
startBalances map[[48]byte]uint64
Expand Down
101 changes: 71 additions & 30 deletions validator/client/wait_for_activation.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/slotutil"
"github.com/prysmaticlabs/prysm/shared/traceutil"
"github.com/prysmaticlabs/prysm/validator/keymanager/remote"
"go.opencensus.io/trace"
)

Expand All @@ -24,7 +25,7 @@ import (
func (v *validator) WaitForActivation(ctx context.Context, accountsChangedChan chan [][48]byte) error {
// Monitor the key manager for updates.
if accountsChangedChan == nil {
accountsChangedChan = make(chan [][48]byte)
accountsChangedChan = make(chan [][48]byte, 1)
sub := v.GetKeymanager().SubscribeAccountChanges(accountsChangedChan)
defer func() {
sub.Unsubscribe()
Expand Down Expand Up @@ -87,47 +88,87 @@ func (v *validator) waitForActivation(ctx context.Context, accountsChangedChan <
time.Sleep(time.Second * time.Duration(mathutil.Min(uint64(attempts), 60)))
return v.waitForActivation(incrementRetries(ctx), accountsChangedChan)
}
for {
select {
case <-accountsChangedChan:
// Accounts (keys) changed, restart the process.
return v.waitForActivation(ctx, accountsChangedChan)
default:
res, err := stream.Recv()
// If the stream is closed, we stop the loop.
if errors.Is(err, io.EOF) {
break
}
// If context is canceled we return from the function.

remoteKm, ok := v.keyManager.(remote.RemoteKeymanager)
if ok {
for range v.NextSlot() {
if ctx.Err() == context.Canceled {
return errors.Wrap(ctx.Err(), "context has been canceled so shutting down the loop")
return errors.Wrap(ctx.Err(), "context canceled, not waiting for activation anymore")
}

validatingKeys, err = remoteKm.ReloadPublicKeys(ctx)
if err != nil {
traceutil.AnnotateError(span, err)
attempts := streamAttempts(ctx)
log.WithError(err).WithField("attempts", attempts).
Error("Stream broken while waiting for activation. Reconnecting...")
// Reconnection attempt backoff, up to 60s.
time.Sleep(time.Second * time.Duration(mathutil.Min(uint64(attempts), 60)))
return v.waitForActivation(incrementRetries(ctx), accountsChangedChan)
return errors.Wrap(err, msgCouldNotFetchKeys)
}

statuses := make([]*validatorStatus, len(res.Statuses))
for i, s := range res.Statuses {
statusRequestKeys := make([][]byte, len(validatingKeys))
for i := range validatingKeys {
statusRequestKeys[i] = validatingKeys[i][:]
}
resp, err := v.validatorClient.MultipleValidatorStatus(ctx, &ethpb.MultipleValidatorStatusRequest{
PublicKeys: statusRequestKeys,
})
if err != nil {
return err
}
statuses := make([]*validatorStatus, len(resp.Statuses))
for i, s := range resp.Statuses {
statuses[i] = &validatorStatus{
publicKey: s.PublicKey,
status: s.Status,
index: s.Index,
publicKey: resp.PublicKeys[i],
status: s,
index: resp.Indices[i],
}
}

valActivated := v.checkAndLogValidatorStatus(statuses)
if valActivated {
logActiveValidatorStatus(statuses)
} else {
continue
break
}
}
} else {
for {
select {
case <-accountsChangedChan:
// Accounts (keys) changed, restart the process.
return v.waitForActivation(ctx, accountsChangedChan)
default:
res, err := stream.Recv()
// If the stream is closed, we stop the loop.
if errors.Is(err, io.EOF) {
break
}
// If context is canceled we return from the function.
if ctx.Err() == context.Canceled {
return errors.Wrap(ctx.Err(), "context has been canceled so shutting down the loop")
}
if err != nil {
traceutil.AnnotateError(span, err)
attempts := streamAttempts(ctx)
log.WithError(err).WithField("attempts", attempts).
Error("Stream broken while waiting for activation. Reconnecting...")
// Reconnection attempt backoff, up to 60s.
time.Sleep(time.Second * time.Duration(mathutil.Min(uint64(attempts), 60)))
return v.waitForActivation(incrementRetries(ctx), accountsChangedChan)
}

statuses := make([]*validatorStatus, len(res.Statuses))
for i, s := range res.Statuses {
statuses[i] = &validatorStatus{
publicKey: s.PublicKey,
status: s.Status,
index: s.Index,
}
}

valActivated := v.checkAndLogValidatorStatus(statuses)
if valActivated {
logActiveValidatorStatus(statuses)
} else {
continue
}
}
break
}
break
}

v.ticker = slotutil.NewSlotTicker(time.Unix(int64(v.genesisTime), 0), params.BeaconConfig().SecondsPerSlot)
Expand Down
81 changes: 81 additions & 0 deletions validator/client/wait_for_activation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,18 @@ import (

"github.com/golang/mock/gomock"
"github.com/pkg/errors"
types "github.com/prysmaticlabs/eth2-types"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/bls"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/mock"
slotutilmock "github.com/prysmaticlabs/prysm/shared/slotutil/testing"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
walletMock "github.com/prysmaticlabs/prysm/validator/accounts/testing"
"github.com/prysmaticlabs/prysm/validator/client/testutil"
"github.com/prysmaticlabs/prysm/validator/keymanager/derived"
"github.com/prysmaticlabs/prysm/validator/keymanager/remote"
constant "github.com/prysmaticlabs/prysm/validator/testing"
logTest "github.com/sirupsen/logrus/hooks/test"
"github.com/tyler-smith/go-bip39"
Expand Down Expand Up @@ -378,3 +383,79 @@ func TestWaitForActivation_AccountsChanged(t *testing.T) {
assert.LogsContain(t, hook, "Validator activated")
})
}

func TestWaitForActivation_RemoteKeymanager(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

client := mock.NewMockBeaconNodeValidatorClient(ctrl)
stream := mock.NewMockBeaconNodeValidator_WaitForActivationClient(ctrl)
client.EXPECT().WaitForActivation(
gomock.Any(),
gomock.Any(),
).Return(stream, nil /* err */).AnyTimes()

inactiveKey := bytesutil.ToBytes48([]byte("inactive"))
activeKey := bytesutil.ToBytes48([]byte("active"))
km := &remote.MockKeymanager{
PublicKeys: [][48]byte{inactiveKey, activeKey},
}
slot := types.Slot(0)

t.Run("activated", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
hook := logTest.NewGlobal()

tickerChan := make(chan types.Slot)
ticker := &slotutilmock.MockTicker{
Channel: tickerChan,
}
v := validator{
validatorClient: client,
keyManager: km,
ticker: ticker,
}
go func() {
tickerChan <- slot
// Cancel after timeout to avoid waiting on channel forever in case test goes wrong.
time.Sleep(time.Second)
cancel()
}()

resp := testutil.GenerateMultipleValidatorStatusResponse([][]byte{inactiveKey[:], activeKey[:]})
resp.Statuses[0].Status = ethpb.ValidatorStatus_UNKNOWN_STATUS
resp.Statuses[1].Status = ethpb.ValidatorStatus_ACTIVE
client.EXPECT().MultipleValidatorStatus(
gomock.Any(),
&ethpb.MultipleValidatorStatusRequest{
PublicKeys: [][]byte{inactiveKey[:], activeKey[:]},
},
).Return(resp, nil /* err */)

err := v.waitForActivation(ctx, nil /* accountsChangedChan */)
require.NoError(t, err)
assert.LogsContain(t, hook, "Waiting for deposit to be observed by beacon node")
assert.LogsContain(t, hook, "Validator activated")
})

t.Run("cancelled", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

tickerChan := make(chan types.Slot)
ticker := &slotutilmock.MockTicker{
Channel: tickerChan,
}
v := validator{
validatorClient: client,
keyManager: km,
ticker: ticker,
}
go func() {
cancel()
tickerChan <- slot
}()

err := v.waitForActivation(ctx, nil /* accountsChangedChan */)
assert.ErrorContains(t, "context canceled, not waiting for activation anymore", err)
})
}
5 changes: 4 additions & 1 deletion validator/keymanager/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ load("@prysm//tools/go:def.bzl", "go_library")

go_library(
name = "go_default_library",
srcs = ["types.go"],
srcs = [
"constants.go",
"types.go",
],
importpath = "github.com/prysmaticlabs/prysm/validator/keymanager",
visibility = [
"//tools/keystores:__pkg__",
Expand Down
3 changes: 3 additions & 0 deletions validator/keymanager/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package keymanager

const KeysReloaded = "Reloaded validator keys into keymanager"
3 changes: 2 additions & 1 deletion validator/keymanager/imported/refresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/fileutil"
"github.com/prysmaticlabs/prysm/validator/keymanager"
keystorev4 "github.com/wealdtech/go-eth2-wallet-encryptor-keystorev4"
)

Expand Down Expand Up @@ -118,7 +119,7 @@ func (km *Keymanager) reloadAccountsFromKeystore(keystore *AccountsKeystoreRepre
if err := km.initializeKeysCachesFromKeystore(); err != nil {
return err
}
log.Info("Reloaded validator keys into keymanager")
log.Info(keymanager.KeysReloaded)
km.accountsChangedFeed.Send(pubKeys)
return nil
}
Loading

0 comments on commit 1f8171d

Please sign in to comment.