Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EFM] Recoverable Random Beacon State Machine follow up updates #6815

Open
wants to merge 11 commits into
base: feature/efm-recovery
Choose a base branch
from
Open
8 changes: 6 additions & 2 deletions cmd/consensus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ func main() {
return fmt.Errorf("could not load beacon key file: %w", err)
}

rootEpoch := node.State.AtBlockID(node.FinalizedRootBlock.ID()).Epochs().Current()
rootEpoch := rootSnapshot.Epochs().Current()
epochCounter, err := rootEpoch.Counter()
if err != nil {
return fmt.Errorf("could not get root epoch counter: %w", err)
Expand Down Expand Up @@ -346,7 +346,11 @@ func main() {
// perform this only if state machine is in initial state
if !started {
// store my beacon key for the first epoch post-spork
err = myBeaconKeyStateMachine.UpsertMyBeaconPrivateKey(epochCounter, beaconPrivateKey.PrivateKey)
epochProtocolState, err := rootSnapshot.EpochProtocolState()
if err != nil {
return fmt.Errorf("could not get epoch protocol state for root snapshot: %w", err)
}
err = myBeaconKeyStateMachine.UpsertMyBeaconPrivateKey(epochCounter, beaconPrivateKey.PrivateKey, epochProtocolState.EpochCommit())
if err != nil {
return fmt.Errorf("could not upsert my beacon private key for root epoch %d: %w", epochCounter, err)
}
Expand Down
11 changes: 9 additions & 2 deletions engine/consensus/dkg/reactor_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,12 +288,13 @@ func (e *ReactorEngine) handleEpochCommittedPhaseStarted(currentEpochCounter uin
log.Warn().Msgf("checking beacon key consistency: exiting because dkg didn't reach completed state: %s", currentState.String())
return
}
snapshot := e.State.AtBlockID(firstBlock.ID())

// Since epoch phase transitions are emitted when the first block of the new
// phase is finalized, the block's snapshot is guaranteed to already be
// accessible in the protocol state at this point (even though the Badger
// transaction finalizing the block has not been committed yet).
nextDKG, err := e.State.AtBlockID(firstBlock.ID()).Epochs().Next().DKG()
nextDKG, err := snapshot.Epochs().Next().DKG()
if err != nil {
// CAUTION: this should never happen, indicates a storage failure or corruption
// TODO use irrecoverable context
Expand Down Expand Up @@ -339,7 +340,13 @@ func (e *ReactorEngine) handleEpochCommittedPhaseStarted(currentEpochCounter uin
return
}

err = e.dkgState.SetDKGState(nextEpochCounter, flow.RandomBeaconKeyCommitted)
epochProtocolState, err := snapshot.EpochProtocolState()
if err != nil {
// TODO use irrecoverable context
log.Fatal().Err(err).Msg("failed to retrieve epoch protocol state")
return
}
err = e.dkgState.CommitMyBeaconPrivateKey(nextEpochCounter, epochProtocolState.Entry().NextEpochCommit)
if err != nil {
// TODO use irrecoverable context
e.log.Fatal().Err(err).Msg("failed to set dkg current state")
Expand Down
34 changes: 25 additions & 9 deletions engine/consensus/dkg/reactor_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,11 +378,18 @@ func (suite *ReactorEngineSuite_CommittedPhase) SetupTest() {
// * set the DKG end state to Success
func (suite *ReactorEngineSuite_CommittedPhase) TestDKGSuccess() {

// no change to suite - this is the happy path

entry := unittest.EpochStateFixture(unittest.WithNextEpochProtocolState(), func(entry *flow.RichEpochStateEntry) {
entry.NextEpochCommit.Counter = suite.NextEpochCounter()
entry.NextEpoch.CommitID = entry.NextEpochCommit.ID()
})
epochProtocolState := protocol.NewEpochProtocolState(suite.T())
epochProtocolState.On("Entry").Return(entry)
suite.snap.On("EpochProtocolState").Return(epochProtocolState, nil)
suite.dkgState.On("CommitMyBeaconPrivateKey", suite.NextEpochCounter(), entry.NextEpochCommit).Return(nil).Once()
suite.engine.EpochCommittedPhaseStarted(suite.epochCounter, suite.firstBlock)
suite.Require().Equal(0, suite.warnsLogged)
suite.Assert().Equal(flow.RandomBeaconKeyCommitted, suite.DKGState)
// ensure we commit my beacon private key
suite.dkgState.AssertCalled(suite.T(), "CommitMyBeaconPrivateKey", suite.NextEpochCounter(), entry.NextEpochCommit)
}

// TestInconsistentKey tests the path where we are checking the global DKG
Expand Down Expand Up @@ -438,7 +445,16 @@ func (suite *ReactorEngineSuite_CommittedPhase) TestStartupInCommittedPhase_DKGS
suite.snap.On("EpochPhase").Return(flow.EpochPhaseCommitted, nil).Once()
// the dkg for this epoch has been started but not ended
suite.dkgState.On("IsDKGStarted", suite.NextEpochCounter()).Return(true, nil).Once()
suite.dkgState.On("GetDKGState", suite.NextEpochCounter()).Return(flow.DKGStateUninitialized, storerr.ErrNotFound).Once()
suite.DKGState = flow.DKGStateCompleted

entry := unittest.EpochStateFixture(unittest.WithNextEpochProtocolState(), func(entry *flow.RichEpochStateEntry) {
entry.NextEpochCommit.Counter = suite.NextEpochCounter()
entry.NextEpoch.CommitID = entry.NextEpochCommit.ID()
})
epochProtocolState := protocol.NewEpochProtocolState(suite.T())
epochProtocolState.On("Entry").Return(entry)
suite.snap.On("EpochProtocolState").Return(epochProtocolState, nil)
suite.dkgState.On("CommitMyBeaconPrivateKey", suite.NextEpochCounter(), entry.NextEpochCommit).Return(nil).Once()

// start up the engine
unittest.AssertClosesBefore(suite.T(), suite.engine.Ready(), time.Second)
Expand All @@ -449,22 +465,22 @@ func (suite *ReactorEngineSuite_CommittedPhase) TestStartupInCommittedPhase_DKGS
mock.Anything,
mock.Anything,
)
// should set DKG end state
suite.Assert().Equal(flow.RandomBeaconKeyCommitted, suite.DKGState)
// ensure we commit my beacon private key
suite.dkgState.AssertCalled(suite.T(), "CommitMyBeaconPrivateKey", suite.NextEpochCounter(), entry.NextEpochCommit)
}

// TestStartupInCommittedPhase_DKGSuccess tests that the dkg end state is correctly
// set when starting in EpochCommitted phase and the DKG end state is already set.
func (suite *ReactorEngineSuite_CommittedPhase) TestStartupInCommittedPhase_DKGStateAlreadySet() {

// we are in the EpochSetup phase
// we are in the Epoch Commit phase
suite.snap.On("EpochPhase").Return(flow.EpochPhaseCommitted, nil).Once()
// the dkg for this epoch has been started and ended
suite.dkgState.On("IsDKGStarted", suite.NextEpochCounter()).Return(true, nil).Once()
suite.dkgState.On("GetDKGState", suite.NextEpochCounter()).Return(flow.DKGStateFailure, nil).Once()
suite.DKGState = flow.DKGStateFailure

// start up the engine
unittest.AssertClosesBefore(suite.T(), suite.engine.Ready(), time.Second)
unittest.AssertClosesBefore(suite.T(), suite.engine.Ready(), 100*time.Second)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it really not starting up within a second? 100s seems like a lot.. Maybe 5s or 10s?


// we should not have instantiated the DKG
suite.factory.AssertNotCalled(suite.T(), "Create",
Expand Down
2 changes: 1 addition & 1 deletion module/dkg/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (b *BeaconKeyRecovery) recoverMyBeaconPrivateKey(final protocol.Snapshot) e
return fmt.Errorf("could not get beacon key share for my node(%x): %w", b.local.NodeID(), err)
}
if beaconPubKey.Equals(myBeaconPrivateKey.PublicKey()) {
err := b.localDKGState.UpsertMyBeaconPrivateKey(nextEpochCounter, myBeaconPrivateKey)
err := b.localDKGState.UpsertMyBeaconPrivateKey(nextEpochCounter, myBeaconPrivateKey, epochProtocolState.Entry().NextEpochCommit)
if err != nil {
return fmt.Errorf("could not overwrite my beacon private key for the next epoch: %w", err)
}
Expand Down
11 changes: 9 additions & 2 deletions module/dkg/recovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type BeaconKeyRecoverySuite struct {
currentEpochCounter uint64
nextEpochCounter uint64
currentEpochPhase flow.EpochPhase
nextEpochCommit *flow.EpochCommit
}

func (s *BeaconKeyRecoverySuite) SetupTest() {
Expand All @@ -53,10 +54,16 @@ func (s *BeaconKeyRecoverySuite) SetupTest() {
s.currentEpochPhase = flow.EpochPhaseCommitted
s.currentEpochCounter = uint64(0)
s.nextEpochCounter = uint64(1)
entry := unittest.EpochStateFixture(unittest.WithNextEpochProtocolState(), func(entry *flow.RichEpochStateEntry) {
entry.NextEpochCommit.Counter = s.nextEpochCounter
entry.NextEpoch.CommitID = entry.NextEpochCommit.ID()
})
s.nextEpochCommit = entry.NextEpochCommit

s.local.On("NodeID").Return(unittest.IdentifierFixture()).Maybe()
s.epochProtocolState.On("Epoch").Return(s.currentEpochCounter).Maybe()
s.epochProtocolState.On("EpochPhase").Return(func() flow.EpochPhase { return s.currentEpochPhase }).Maybe()
s.epochProtocolState.On("Entry").Return(entry, nil).Maybe()
s.nextEpoch.On("Counter").Return(s.nextEpochCounter, nil).Maybe()

epochs := mockprotocol.NewEpochQuery(s.T())
Expand Down Expand Up @@ -307,7 +314,7 @@ func (s *BeaconKeyRecoverySuite) TestNewBeaconKeyRecovery_RecoverKey() {
dkg.On("KeyShare", s.local.NodeID()).Return(myBeaconKey.PublicKey(), nil).Once()
s.nextEpoch.On("DKG").Return(dkg, nil).Once()

dkgState.On("UpsertMyBeaconPrivateKey", s.nextEpochCounter, myBeaconKey).Return(nil).Once()
dkgState.On("UpsertMyBeaconPrivateKey", s.nextEpochCounter, myBeaconKey, s.nextEpochCommit).Return(nil).Once()

recovery, err := NewBeaconKeyRecovery(unittest.Logger(), s.local, s.state, dkgState)
require.NoError(s.T(), err)
Expand Down Expand Up @@ -363,7 +370,7 @@ func (s *BeaconKeyRecoverySuite) TestEpochFallbackModeExited() {
dkg.On("KeyShare", s.local.NodeID()).Return(myBeaconKey.PublicKey(), nil).Once()
s.nextEpoch.On("DKG").Return(dkg, nil).Once()

s.dkgState.On("UpsertMyBeaconPrivateKey", s.nextEpochCounter, myBeaconKey).Return(nil).Once()
s.dkgState.On("UpsertMyBeaconPrivateKey", s.nextEpochCounter, myBeaconKey, s.nextEpochCommit).Return(nil).Once()

recovery.EpochFallbackModeExited(s.currentEpochCounter, s.head)
s.dkgState.AssertNumberOfCalls(s.T(), "UpsertMyBeaconPrivateKey", 1)
Expand Down
126 changes: 107 additions & 19 deletions storage/badger/dkg_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
var allowedStateTransitions = map[flow.DKGState][]flow.DKGState{
flow.DKGStateStarted: {flow.DKGStateCompleted, flow.DKGStateFailure, flow.RandomBeaconKeyCommitted},
flow.DKGStateCompleted: {flow.RandomBeaconKeyCommitted, flow.DKGStateFailure},
flow.RandomBeaconKeyCommitted: {flow.RandomBeaconKeyCommitted},
flow.RandomBeaconKeyCommitted: {},
flow.DKGStateFailure: {flow.RandomBeaconKeyCommitted, flow.DKGStateFailure},
flow.DKGStateUninitialized: {flow.DKGStateStarted, flow.DKGStateFailure, flow.RandomBeaconKeyCommitted},
}
Expand Down Expand Up @@ -88,11 +88,15 @@ func (ds *RecoverablePrivateBeaconKeyStateMachine) InsertMyBeaconPrivateKey(epoc
}
encodableKey := &encodable.RandomBeaconPrivKey{PrivateKey: key}
return operation.RetryOnConflictTx(ds.db, transaction.Update, func(tx *transaction.Tx) error {
err := ds.keyCache.PutTx(epochCounter, encodableKey)(tx)
currentState, err := retrieveCurrentStateTx(epochCounter)(tx.DBTxn)
if err != nil {
return err
}
return ds.processStateTransition(epochCounter, flow.DKGStateCompleted)(tx)
err = ds.keyCache.PutTx(epochCounter, encodableKey)(tx)
if err != nil {
return err
}
return ds.processStateTransition(epochCounter, currentState, flow.DKGStateCompleted)(tx)
})
}

Expand Down Expand Up @@ -129,31 +133,32 @@ func (ds *RecoverablePrivateBeaconKeyStateMachine) IsDKGStarted(epochCounter uin
// Error returns:
// - [storage.InvalidDKGStateTransitionError] - if the requested state transition is invalid.
func (ds *RecoverablePrivateBeaconKeyStateMachine) SetDKGState(epochCounter uint64, newState flow.DKGState) error {
return operation.RetryOnConflictTx(ds.db, transaction.Update, ds.processStateTransition(epochCounter, newState))
return operation.RetryOnConflictTx(ds.db, transaction.Update, func(tx *transaction.Tx) error {
currentState, err := retrieveCurrentStateTx(epochCounter)(tx.DBTxn)
if err != nil {
return err
}

if newState == flow.RandomBeaconKeyCommitted {
return storage.NewInvalidDKGStateTransitionErrorf(currentState, newState, "cannot transition directly to committed state without evidence")
} else {
return operation.RetryOnConflictTx(ds.db, transaction.Update, ds.processStateTransition(epochCounter, currentState, newState))
}
})
}

// Error returns:
// - storage.InvalidDKGStateTransitionError - if the requested state transition is invalid
func (ds *RecoverablePrivateBeaconKeyStateMachine) processStateTransition(epochCounter uint64, newState flow.DKGState) func(*transaction.Tx) error {
func (ds *RecoverablePrivateBeaconKeyStateMachine) processStateTransition(epochCounter uint64, currentState, newState flow.DKGState) func(*transaction.Tx) error {
return func(tx *transaction.Tx) error {
var currentState flow.DKGState
err := operation.RetrieveDKGStateForEpoch(epochCounter, &currentState)(tx.DBTxn)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
currentState = flow.DKGStateUninitialized
} else {
return fmt.Errorf("could not retrieve current state for epoch %d: %w", epochCounter, err)
}
}

allowedStates := allowedStateTransitions[currentState]
if slices.Index(allowedStates, newState) < 0 {
return storage.NewInvalidDKGStateTransitionErrorf(currentState, newState, "not allowed")
}

// ensure invariant holds and we still have a valid private key stored
if newState == flow.RandomBeaconKeyCommitted || newState == flow.DKGStateCompleted {
_, err = ds.keyCache.Get(epochCounter)(tx.DBTxn)
_, err := ds.keyCache.Get(epochCounter)(tx.DBTxn)
if err != nil {
return storage.NewInvalidDKGStateTransitionErrorf(currentState, newState, "cannot transition without a valid random beacon key: %w", err)
}
Expand Down Expand Up @@ -219,21 +224,74 @@ func (ds *RecoverablePrivateBeaconKeyStateMachine) RetrieveMyBeaconPrivateKey(ep
return
}

// CommitMyBeaconPrivateKey commits the previously inserted random beacon private key for an epoch.
// Effectively, this method transitions the state machine into the [flow.RandomBeaconKeyCommitted] state if the current state is [flow.DKGStateCompleted].
// Caller needs to supply the [flow.EpochCommit] which is an evidence that the key has been indeed included for the given epoch.
// No errors are expected during normal operations.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// No errors are expected during normal operations.
// If the current state is already [flow.RandomBeaconKeyCommitted], this function is a no-op regardless of input.
// No errors are expected during normal operations.

Describing the behaviour on line 238.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

regarding Jordan's suggestion:

If the current state is already [flow.RandomBeaconKeyCommitted], this function is a no-op regardless of input.

This is probably the most minimal implementation, but I am not sure it would be the most robust. I think there are two non-happy path scenarios I think we should consider for the CommitMyBeaconPrivateKey:

  1. A random beacon key was previously committed (i.e. we are in the flow.RandomBeaconKeyCommitted state). If there is a call to CommitMyBeaconPrivateKey with an flow.EpochCommit that is inconsistent with the committed key, we have a clear failure condition.
  2. A random beacon key was previously committed (i.e. we are in the flow.RandomBeaconKeyCommitted state). The state machine receives a call to CommitMyBeaconPrivateKey with an flow.EpochCommit that is consistent with the committed key. Here, we could either return an exception and say that repeated calls are in principle not allowed. Alternatively, we can treat this call as confirming information that the state machine already has and ignore this call (as information is idempotent). The latter is my preference.

What makes me nervous is taking the response strategy for 2. and also applying it to 1., because we would be proceeding in a clear failure case. I think it would be comparatively easy to be more strict in this case.

func (ds *RecoverablePrivateBeaconKeyStateMachine) CommitMyBeaconPrivateKey(epochCounter uint64, commit *flow.EpochCommit) error {
return operation.RetryOnConflictTx(ds.db, transaction.Update, func(tx *transaction.Tx) error {
currentState, err := retrieveCurrentStateTx(epochCounter)(tx.DBTxn)
if err != nil {
return err
}
// if we are in committed state then there is nothing to do
if currentState == flow.RandomBeaconKeyCommitted {
return nil
}
key, err := ds.keyCache.Get(epochCounter)(tx.DBTxn)
if err != nil {
return storage.NewInvalidDKGStateTransitionErrorf(currentState, flow.RandomBeaconKeyCommitted, "cannot transition without a valid random beacon key: %w", err)
}

// verify that the key is part of the EpochCommit
if err = ensureKeyIncludedInEpoch(epochCounter, key, commit); err != nil {
return storage.NewInvalidDKGStateTransitionErrorf(currentState, flow.RandomBeaconKeyCommitted,
"previously storred key has not been found in epoch commit event: %w", err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"previously storred key has not been found in epoch commit event: %w", err)
"previously stored key has not been found in epoch commit event: %w", err)

}
return ds.processStateTransition(epochCounter, currentState, flow.RandomBeaconKeyCommitted)(tx)
})
}

// UpsertMyBeaconPrivateKey overwrites the random beacon private key for the epoch that recovers the protocol
// from Epoch Fallback Mode. State transitions are allowed if and only if the current state is not equal to
// [flow.RandomBeaconKeyCommitted]. The resulting state of this method call is [flow.RandomBeaconKeyCommitted].
// No errors are expected during normal operations.
func (ds *RecoverablePrivateBeaconKeyStateMachine) UpsertMyBeaconPrivateKey(epochCounter uint64, key crypto.PrivateKey) error {
func (ds *RecoverablePrivateBeaconKeyStateMachine) UpsertMyBeaconPrivateKey(epochCounter uint64, key crypto.PrivateKey, commit *flow.EpochCommit) error {
if key == nil {
return fmt.Errorf("will not store nil beacon key")
}
encodableKey := &encodable.RandomBeaconPrivKey{PrivateKey: key}
err := operation.RetryOnConflictTx(ds.db, transaction.Update, func(tx *transaction.Tx) error {
err := operation.UpsertMyBeaconPrivateKey(epochCounter, encodableKey)(tx.DBTxn)
currentState, err := retrieveCurrentStateTx(epochCounter)(tx.DBTxn)
if err != nil {
return err
}
return ds.processStateTransition(epochCounter, flow.RandomBeaconKeyCommitted)(tx)
// verify that the key is part of the EpochCommit
if err = ensureKeyIncludedInEpoch(epochCounter, key, commit); err != nil {
return storage.NewInvalidDKGStateTransitionErrorf(currentState, flow.RandomBeaconKeyCommitted,
"previously storred key has not been found in epoch commit event: %w", err)
}

// if we are in committed state, we cannot overwrite the key, but we can ignore this input iff the provided key is the same
if currentState == flow.RandomBeaconKeyCommitted {
// check if the stored key is equal to the provided key
storedKey, err := ds.keyCache.Get(epochCounter)(tx.DBTxn)
if err != nil {
return irrecoverable.NewExceptionf("could not retrieve a previously committed beacon key for epoch %d: %v", epochCounter, err)
}
if key.Equals(storedKey.PrivateKey) {
return nil
} else {
return storage.NewInvalidDKGStateTransitionErrorf(currentState, flow.RandomBeaconKeyCommitted,
"cannot overwrite previously committed key for epoch: %d", epochCounter)
}
}

err = operation.UpsertMyBeaconPrivateKey(epochCounter, encodableKey)(tx.DBTxn)
if err != nil {
return err
}
return ds.processStateTransition(epochCounter, currentState, flow.RandomBeaconKeyCommitted)(tx)
})
if err != nil {
return fmt.Errorf("could not overwrite beacon key for epoch %d: %w", epochCounter, err)
Expand All @@ -242,3 +300,33 @@ func (ds *RecoverablePrivateBeaconKeyStateMachine) UpsertMyBeaconPrivateKey(epoc
ds.keyCache.Insert(epochCounter, encodableKey)
return nil
}

// ensureKeyIncludedInEpoch performs a sanity check that the key is included in the epoch commit.
// The key is expected to be part of the commit.
// No errors are expected during normal operations.
func ensureKeyIncludedInEpoch(epochCounter uint64, key crypto.PrivateKey, commit *flow.EpochCommit) error {
if commit.Counter != epochCounter {
return fmt.Errorf("commit counter does not match epoch counter: %d != %d", epochCounter, commit.Counter)
}
publicKey := key.PublicKey()
if slices.IndexFunc(commit.DKGParticipantKeys, func(lhs crypto.PublicKey) bool {
return lhs.Equals(publicKey)
}) < 0 {
return fmt.Errorf("key not included in epoch commit: %s", publicKey)
}
return nil
}

// retrieveCurrentStateTx prepares a badger tx which retrieves the current state for the given epoch.
// No errors are expected during normal operations.
func retrieveCurrentStateTx(epochCounter uint64) func(*badger.Txn) (flow.DKGState, error) {
return func(txn *badger.Txn) (flow.DKGState, error) {
currentState := flow.DKGStateUninitialized
err := operation.RetrieveDKGStateForEpoch(epochCounter, &currentState)(txn)
if err != nil && !errors.Is(err, storage.ErrNotFound) {
return currentState, fmt.Errorf("could not retrieve current state for epoch %d: %w", epochCounter, err)

}
return currentState, nil
}
}
Loading
Loading