-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Dynamic key reload for remote keymanager #8611
Changes from all commits
11ac537
f53b629
1d1d450
f071533
6a516e9
a77e147
dcca756
c09b0b3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,28 @@ | ||
package testutil | ||
|
||
import "github.com/prysmaticlabs/prysm/shared/bytesutil" | ||
import ( | ||
types "github.com/prysmaticlabs/eth2-types" | ||
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" | ||
"github.com/prysmaticlabs/prysm/shared/bytesutil" | ||
) | ||
|
||
// ActiveKey represents a public key whose status is ACTIVE. | ||
var ActiveKey = bytesutil.ToBytes48([]byte("active")) | ||
|
||
// GenerateMultipleValidatorStatusResponse prepares a response from the passed in keys. | ||
func GenerateMultipleValidatorStatusResponse(pubkeys [][]byte) *ethpb.MultipleValidatorStatusResponse { | ||
resp := ðpb.MultipleValidatorStatusResponse{ | ||
PublicKeys: make([][]byte, len(pubkeys)), | ||
Statuses: make([]*ethpb.ValidatorStatusResponse, len(pubkeys)), | ||
Indices: make([]types.ValidatorIndex, len(pubkeys)), | ||
} | ||
for i, key := range pubkeys { | ||
resp.PublicKeys[i] = key | ||
resp.Statuses[i] = ðpb.ValidatorStatusResponse{ | ||
Status: ethpb.ValidatorStatus_UNKNOWN_STATUS, | ||
} | ||
resp.Indices[i] = types.ValidatorIndex(i) | ||
} | ||
|
||
return resp | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,6 +12,7 @@ import ( | |
"github.com/prysmaticlabs/prysm/shared/params" | ||
"github.com/prysmaticlabs/prysm/shared/slotutil" | ||
"github.com/prysmaticlabs/prysm/shared/traceutil" | ||
"github.com/prysmaticlabs/prysm/validator/keymanager/remote" | ||
"go.opencensus.io/trace" | ||
) | ||
|
||
|
@@ -24,7 +25,7 @@ import ( | |
func (v *validator) WaitForActivation(ctx context.Context, accountsChangedChan chan [][48]byte) error { | ||
// Monitor the key manager for updates. | ||
if accountsChangedChan == nil { | ||
accountsChangedChan = make(chan [][48]byte) | ||
accountsChangedChan = make(chan [][48]byte, 1) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This change is required to prevent a deadlock. |
||
sub := v.GetKeymanager().SubscribeAccountChanges(accountsChangedChan) | ||
defer func() { | ||
sub.Unsubscribe() | ||
|
@@ -87,47 +88,87 @@ func (v *validator) waitForActivation(ctx context.Context, accountsChangedChan < | |
time.Sleep(time.Second * time.Duration(mathutil.Min(uint64(attempts), 60))) | ||
return v.waitForActivation(incrementRetries(ctx), accountsChangedChan) | ||
} | ||
for { | ||
select { | ||
case <-accountsChangedChan: | ||
// Accounts (keys) changed, restart the process. | ||
return v.waitForActivation(ctx, accountsChangedChan) | ||
default: | ||
res, err := stream.Recv() | ||
// If the stream is closed, we stop the loop. | ||
if errors.Is(err, io.EOF) { | ||
break | ||
} | ||
// If context is canceled we return from the function. | ||
|
||
remoteKm, ok := v.keyManager.(remote.RemoteKeymanager) | ||
if ok { | ||
Comment on lines
+92
to
+93
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The diff here looks like a mess, but I basically moved all existing code into the |
||
for range v.NextSlot() { | ||
if ctx.Err() == context.Canceled { | ||
return errors.Wrap(ctx.Err(), "context has been canceled so shutting down the loop") | ||
return errors.Wrap(ctx.Err(), "context canceled, not waiting for activation anymore") | ||
} | ||
|
||
validatingKeys, err = remoteKm.ReloadPublicKeys(ctx) | ||
if err != nil { | ||
traceutil.AnnotateError(span, err) | ||
attempts := streamAttempts(ctx) | ||
log.WithError(err).WithField("attempts", attempts). | ||
Error("Stream broken while waiting for activation. Reconnecting...") | ||
// Reconnection attempt backoff, up to 60s. | ||
time.Sleep(time.Second * time.Duration(mathutil.Min(uint64(attempts), 60))) | ||
return v.waitForActivation(incrementRetries(ctx), accountsChangedChan) | ||
return errors.Wrap(err, msgCouldNotFetchKeys) | ||
} | ||
|
||
statuses := make([]*validatorStatus, len(res.Statuses)) | ||
for i, s := range res.Statuses { | ||
statusRequestKeys := make([][]byte, len(validatingKeys)) | ||
for i := range validatingKeys { | ||
statusRequestKeys[i] = validatingKeys[i][:] | ||
} | ||
resp, err := v.validatorClient.MultipleValidatorStatus(ctx, ðpb.MultipleValidatorStatusRequest{ | ||
PublicKeys: statusRequestKeys, | ||
}) | ||
if err != nil { | ||
return err | ||
} | ||
statuses := make([]*validatorStatus, len(resp.Statuses)) | ||
for i, s := range resp.Statuses { | ||
statuses[i] = &validatorStatus{ | ||
publicKey: s.PublicKey, | ||
status: s.Status, | ||
index: s.Index, | ||
publicKey: resp.PublicKeys[i], | ||
status: s, | ||
index: resp.Indices[i], | ||
} | ||
} | ||
|
||
valActivated := v.checkAndLogValidatorStatus(statuses) | ||
if valActivated { | ||
logActiveValidatorStatus(statuses) | ||
} else { | ||
continue | ||
break | ||
} | ||
} | ||
} else { | ||
for { | ||
select { | ||
case <-accountsChangedChan: | ||
// Accounts (keys) changed, restart the process. | ||
return v.waitForActivation(ctx, accountsChangedChan) | ||
default: | ||
res, err := stream.Recv() | ||
// If the stream is closed, we stop the loop. | ||
if errors.Is(err, io.EOF) { | ||
break | ||
} | ||
// If context is canceled we return from the function. | ||
if ctx.Err() == context.Canceled { | ||
return errors.Wrap(ctx.Err(), "context has been canceled so shutting down the loop") | ||
} | ||
if err != nil { | ||
traceutil.AnnotateError(span, err) | ||
attempts := streamAttempts(ctx) | ||
log.WithError(err).WithField("attempts", attempts). | ||
Error("Stream broken while waiting for activation. Reconnecting...") | ||
// Reconnection attempt backoff, up to 60s. | ||
time.Sleep(time.Second * time.Duration(mathutil.Min(uint64(attempts), 60))) | ||
return v.waitForActivation(incrementRetries(ctx), accountsChangedChan) | ||
} | ||
|
||
statuses := make([]*validatorStatus, len(res.Statuses)) | ||
for i, s := range res.Statuses { | ||
statuses[i] = &validatorStatus{ | ||
publicKey: s.PublicKey, | ||
status: s.Status, | ||
index: s.Index, | ||
} | ||
} | ||
|
||
valActivated := v.checkAndLogValidatorStatus(statuses) | ||
if valActivated { | ||
logActiveValidatorStatus(statuses) | ||
} else { | ||
continue | ||
} | ||
} | ||
break | ||
} | ||
break | ||
} | ||
|
||
v.ticker = slotutil.NewSlotTicker(time.Unix(int64(v.genesisTime), 0), params.BeaconConfig().SecondsPerSlot) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
package keymanager | ||
|
||
const KeysReloaded = "Reloaded validator keys into keymanager" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change is required to prevent a deadlock.