Skip to content

Commit

Permalink
Merge branch 'feature/efm-recovery' of github.com:onflow/flow-go into…
Browse files Browse the repository at this point in the history
… khalil/5732-adjust-blocktime-controller
  • Loading branch information
kc1116 committed Jul 8, 2024
2 parents 66d0275 + 9c3ed48 commit 36b3e03
Show file tree
Hide file tree
Showing 63 changed files with 919 additions and 238 deletions.
1 change: 1 addition & 0 deletions cmd/consensus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,7 @@ func main() {
}).
Component("consensus participant", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
mutableProtocolState := protocol_state.NewMutableProtocolState(
node.Logger,
node.Storage.EpochProtocolStateEntries,
node.Storage.ProtocolKVStore,
node.State.Params(),
Expand Down
2 changes: 1 addition & 1 deletion cmd/dynamic_startup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func getMockSnapshot(t *testing.T, epochCounter uint64, phase flow.EpochPhase) *

snapshot := new(protocolmock.Snapshot)
snapshot.On("Epochs").Return(epochQuery)
snapshot.On("Phase").Return(phase, nil)
snapshot.On("EpochPhase").Return(phase, nil)
snapshot.On("Head").Return(unittest.BlockHeaderFixture(), nil)

return snapshot
Expand Down
24 changes: 21 additions & 3 deletions cmd/util/cmd/common/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,12 @@ func GetSnapshotAtEpochAndPhase(ctx context.Context, log zerolog.Logger, startup
return fmt.Errorf("failed to get the current epoch counter: %w", err)
}

currEpochPhase, err := snapshot.Phase()
currEpochPhase, err := snapshot.EpochPhase()
if err != nil {
return fmt.Errorf("failed to get the current epoch phase: %w", err)
}

// check if we are in or past the target epoch and phase
if currEpochCounter > startupEpoch || (currEpochCounter == startupEpoch && currEpochPhase >= startupEpochPhase) {
if shouldStartAtEpochPhase(currEpochCounter, startupEpoch, currEpochPhase, startupEpochPhase) {
head, err := snapshot.Head()
if err != nil {
return fmt.Errorf("could not get Dynamic Startup snapshot header: %w", err)
Expand Down Expand Up @@ -120,3 +119,22 @@ func GetSnapshotAtEpochAndPhase(ctx context.Context, log zerolog.Logger, startup

return snapshot, nil
}

// shouldStartAtEpochPhase determines whether Dynamic Startup should start up the node, based on a
// target epoch/phase and a current epoch/phase.
func shouldStartAtEpochPhase(currentEpoch, targetEpoch uint64, currentPhase, targetPhase flow.EpochPhase) bool {
// if the current epoch is after the target epoch, start up regardless of phase
if currentEpoch > targetEpoch {
return true
}
// if the current epoch is before the target epoch, do not start up regardless of phase
if currentEpoch < targetEpoch {
return false
}
// if the target phase is EpochPhaseFallback, only start up if the current phase exactly matches
if targetPhase == flow.EpochPhaseFallback {
return currentPhase == flow.EpochPhaseFallback
}
// for any other target phase, start up if current phase is >= target
return currentPhase >= targetPhase
}
5 changes: 3 additions & 2 deletions consensus/hotstuff/committees/consensus_committee.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ func NewConsensusCommittee(state protocol.State, me flow.Identifier) (*Consensus
epochs = append(epochs, final.Epochs().Current())

// we prepare the next epoch, if it is committed
phase, err := final.Phase()
// TODO(EFM, #5730): update phase logic if needed
phase, err := final.EpochPhase()
if err != nil {
return nil, fmt.Errorf("could not check epoch phase: %w", err)
}
Expand All @@ -183,7 +184,7 @@ func NewConsensusCommittee(state protocol.State, me flow.Identifier) (*Consensus
}

// if epoch fallback mode was triggered, inject the fallback epoch
// TODO(EFM, #6020): consider replacing with phase check when it's available
// TODO(EFM, #5730): consider replacing with phase check when it's available
if epochStateSnapshot.EpochFallbackTriggered() {
err = com.onEpochFallbackModeTriggered()
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions consensus/hotstuff/committees/consensus_committee_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (suite *ConsensusSuite) SetupTest() {
func() error { return nil },
)
suite.snapshot.On("EpochProtocolState").Return(suite.epochProtocolState, nil)
suite.snapshot.On("Phase").Return(
suite.snapshot.On("EpochPhase").Return(
func() flow.EpochPhase { return suite.phase },
func() error { return nil },
)
Expand Down Expand Up @@ -624,7 +624,7 @@ func TestRemoveOldEpochs(t *testing.T) {
epochQuery := mocks.NewEpochQuery(t, currentEpochCounter, epoch1)
snapshot.On("Epochs").Return(epochQuery)
currentEpochPhase := flow.EpochPhaseStaking
snapshot.On("Phase").Return(
snapshot.On("EpochPhase").Return(
func() flow.EpochPhase { return currentEpochPhase },
func() error { return nil },
)
Expand Down
2 changes: 1 addition & 1 deletion consensus/hotstuff/cruisectl/block_time_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (ctl *BlockTimeController) initEpochInfo() error {
}
ctl.curEpochTargetEndTime = curEpochTargetEndTime

phase, err := finalSnapshot.Phase()
phase, err := finalSnapshot.EpochPhase()
if err != nil {
return fmt.Errorf("could not check snapshot phase: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion consensus/hotstuff/cruisectl/block_time_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func setupMocks(bs *BlockTimeControllerSuite) {
func() error { return nil },
)
bs.snapshot.On("EpochProtocolState").Return(&bs.epochProtocolState, nil)
bs.snapshot.On("Phase").Return(
bs.snapshot.On("EpochPhase").Return(
func() flow.EpochPhase { return bs.epochs.Phase() },
func() error { return nil })
bs.snapshot.On("Head").Return(unittest.BlockHeaderFixture(unittest.HeaderWithView(bs.initialView+11)), nil).Maybe()
Expand Down
1 change: 1 addition & 0 deletions consensus/integration/nodes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ func createNode(
seals := stdmap.NewIncorporatedResultSeals(sealLimit)

mutableProtocolState := protocol_state.NewMutableProtocolState(
log,
protocolStateDB,
protocokKVStoreDB,
state.Params(),
Expand Down
2 changes: 1 addition & 1 deletion engine/collection/epochmgr/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (e *Engine) checkShouldStartPreviousEpochComponentsOnStartup(engineCtx irre
func (e *Engine) checkShouldVoteOnStartup(finalSnapshot protocol.Snapshot) error {
// check the current phase on startup, in case we are in setup phase
// and haven't yet voted for the next root QC
phase, err := finalSnapshot.Phase()
phase, err := finalSnapshot.EpochPhase()
if err != nil {
return fmt.Errorf("could not get epoch phase for finalized snapshot: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion engine/collection/epochmgr/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (suite *Suite) SetupTest() {
suite.snap.On("Head").Return(
func() *flow.Header { return suite.header },
func() error { return nil })
suite.snap.On("Phase").Return(
suite.snap.On("EpochPhase").Return(
func() flow.EpochPhase { return suite.phase },
func() error { return nil })

Expand Down
1 change: 1 addition & 0 deletions engine/collection/test/cluster_switchover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func NewClusterSwitchoverTestCase(t *testing.T, conf ClusterSwitchoverTestConf)
// take first collection node and use its storage as data source for stateMutator
refNode := tc.nodes[0]
stateMutator := protocol_state.NewMutableProtocolState(
refNode.Log,
refNode.EpochProtocolState,
refNode.ProtocolKVStore,
refNode.State.Params(),
Expand Down
2 changes: 1 addition & 1 deletion engine/consensus/dkg/reactor_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (e *ReactorEngine) Ready() <-chan struct{} {
// and fail this epoch's DKG.
snap := e.State.Final()

phase, err := snap.Phase()
phase, err := snap.EpochPhase()
if err != nil {
// unexpected storage-level error
// TODO use irrecoverable context
Expand Down
12 changes: 6 additions & 6 deletions engine/consensus/dkg/reactor_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (suite *ReactorEngineSuite_SetupPhase) TestRunDKG_PhaseTransition() {
func (suite *ReactorEngineSuite_SetupPhase) TestRunDKG_StartupInSetupPhase() {

// we are in the EpochSetup phase
suite.snapshot.On("Phase").Return(flow.EpochPhaseSetup, nil).Once()
suite.snapshot.On("EpochPhase").Return(flow.EpochPhaseSetup, nil).Once()
// the dkg for this epoch has not been started
suite.dkgState.On("GetDKGStarted", suite.NextEpochCounter()).Return(false, nil).Once()

Expand Down Expand Up @@ -238,7 +238,7 @@ func (suite *ReactorEngineSuite_SetupPhase) TestRunDKG_StartupInSetupPhase() {
func (suite *ReactorEngineSuite_SetupPhase) TestRunDKG_StartupInSetupPhase_DKGAlreadyStarted() {

// we are in the EpochSetup phase
suite.snapshot.On("Phase").Return(flow.EpochPhaseSetup, nil).Once()
suite.snapshot.On("EpochPhase").Return(flow.EpochPhaseSetup, nil).Once()
// the dkg for this epoch has been started
suite.dkgState.On("GetDKGStarted", suite.NextEpochCounter()).Return(true, nil).Once()

Expand Down Expand Up @@ -435,7 +435,7 @@ func (suite *ReactorEngineSuite_CommittedPhase) TestLocalDKGFailure() {
func (suite *ReactorEngineSuite_CommittedPhase) TestStartupInCommittedPhase_DKGSuccess() {

// we are in the EpochSetup phase
suite.snap.On("Phase").Return(flow.EpochPhaseCommitted, nil).Once()
suite.snap.On("EpochPhase").Return(flow.EpochPhaseCommitted, nil).Once()
// the dkg for this epoch has been started but not ended
suite.dkgState.On("GetDKGStarted", suite.NextEpochCounter()).Return(true, nil).Once()
suite.dkgState.On("GetDKGEndState", suite.NextEpochCounter()).Return(flow.DKGEndStateUnknown, storerr.ErrNotFound).Once()
Expand All @@ -458,7 +458,7 @@ func (suite *ReactorEngineSuite_CommittedPhase) TestStartupInCommittedPhase_DKGS
func (suite *ReactorEngineSuite_CommittedPhase) TestStartupInCommittedPhase_DKGEndStateAlreadySet() {

// we are in the EpochSetup phase
suite.snap.On("Phase").Return(flow.EpochPhaseCommitted, nil).Once()
suite.snap.On("EpochPhase").Return(flow.EpochPhaseCommitted, nil).Once()
// the dkg for this epoch has been started and ended
suite.dkgState.On("GetDKGStarted", suite.NextEpochCounter()).Return(true, nil).Once()
suite.dkgState.On("GetDKGEndState", suite.NextEpochCounter()).Return(flow.DKGEndStateNoKey, nil).Once()
Expand All @@ -479,7 +479,7 @@ func (suite *ReactorEngineSuite_CommittedPhase) TestStartupInCommittedPhase_DKGE
func (suite *ReactorEngineSuite_CommittedPhase) TestStartupInCommittedPhase_InconsistentKey() {

// we are in the EpochSetup phase
suite.snap.On("Phase").Return(flow.EpochPhaseCommitted, nil).Once()
suite.snap.On("EpochPhase").Return(flow.EpochPhaseCommitted, nil).Once()
// the dkg for this epoch has been started but not ended
suite.dkgState.On("GetDKGStarted", suite.NextEpochCounter()).Return(true, nil).Once()
suite.dkgState.On("GetDKGEndState", suite.NextEpochCounter()).Return(flow.DKGEndStateUnknown, storerr.ErrNotFound).Once()
Expand All @@ -505,7 +505,7 @@ func (suite *ReactorEngineSuite_CommittedPhase) TestStartupInCommittedPhase_Inco
func (suite *ReactorEngineSuite_CommittedPhase) TestStartupInCommittedPhase_MissingKey() {

// we are in the EpochSetup phase
suite.snap.On("Phase").Return(flow.EpochPhaseCommitted, nil).Once()
suite.snap.On("EpochPhase").Return(flow.EpochPhaseCommitted, nil).Once()
// the dkg for this epoch has been started but not ended
suite.dkgState.On("GetDKGStarted", suite.NextEpochCounter()).Return(true, nil).Once()
suite.dkgState.On("GetDKGEndState", suite.NextEpochCounter()).Return(flow.DKGEndStateUnknown, storerr.ErrNotFound).Once()
Expand Down
1 change: 1 addition & 0 deletions engine/verification/utils/unittest/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,7 @@ func bootstrapSystem(
identities = append(identities, verID.Identity())

mutableProtocolState := protocol_state.NewMutableProtocolState(
log,
stateFixture.Storage.EpochProtocolStateEntries,
stateFixture.Storage.ProtocolKVStore,
stateFixture.State.Params(),
Expand Down
2 changes: 1 addition & 1 deletion integration/dkg/dkg_whiteboard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func createNode(
epochQuery.Add(nextEpoch)
snapshot := new(protocolmock.Snapshot)
snapshot.On("Epochs").Return(epochQuery)
snapshot.On("Phase").Return(flow.EpochPhaseStaking, nil)
snapshot.On("EpochPhase").Return(flow.EpochPhaseStaking, nil)
snapshot.On("Head").Return(firstBlock, nil)
state := new(protocolmock.ParticipantState)
state.On("AtBlockID", firstBlock.ID()).Return(snapshot)
Expand Down
2 changes: 1 addition & 1 deletion integration/dkg/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (n *node) setEpochs(t *testing.T, currentSetup flow.EpochSetup, nextSetup f
epochQuery.Add(nextEpoch)
snapshot := new(protocolmock.Snapshot)
snapshot.On("Epochs").Return(epochQuery)
snapshot.On("Phase").Return(flow.EpochPhaseStaking, nil)
snapshot.On("EpochPhase").Return(flow.EpochPhaseStaking, nil)
snapshot.On("Head").Return(firstBlock, nil)
state := new(protocolmock.ParticipantState)
state.On("AtBlockID", firstBlock.ID()).Return(snapshot)
Expand Down
2 changes: 1 addition & 1 deletion integration/epochs/epoch_qc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (s *Suite) TestEpochQuorumCertificate() {
hotSigner.On("CreateVote", mock.Anything).Return(vote, nil)

snapshot := &protomock.Snapshot{}
snapshot.On("Phase").Return(flow.EpochPhaseSetup, nil)
snapshot.On("EpochPhase").Return(flow.EpochPhaseSetup, nil)

state := &protomock.State{}
state.On("CanonicalRootBlock").Return(rootBlock)
Expand Down
2 changes: 1 addition & 1 deletion integration/tests/epochs/base_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (s *BaseSuite) AwaitEpochPhase(ctx context.Context, expectedEpoch uint64, e

actualEpoch, err = snapshot.Epochs().Current().Counter()
require.NoError(s.T(), err)
actualPhase, err = snapshot.Phase()
actualPhase, err = snapshot.EpochPhase()
require.NoError(s.T(), err)

return actualEpoch == expectedEpoch && actualPhase == expectedPhase
Expand Down
2 changes: 1 addition & 1 deletion integration/tests/epochs/dynamic_epoch_transition_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ func (s *DynamicEpochTransitionSuite) AssertInEpochPhase(ctx context.Context, ex
require.NoError(s.T(), err)
actualEpoch, err := snapshot.Epochs().Current().Counter()
require.NoError(s.T(), err)
actualPhase, err := snapshot.Phase()
actualPhase, err := snapshot.EpochPhase()
require.NoError(s.T(), err)
require.Equal(s.T(), expectedPhase, actualPhase, "not in correct phase")
require.Equal(s.T(), expectedEpoch, actualEpoch, "not in correct epoch")
Expand Down
2 changes: 1 addition & 1 deletion integration/tests/lib/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func LogStatus(t *testing.T, ctx context.Context, log zerolog.Logger, client *te
sealed := sealingSegment.Sealed()
finalized := sealingSegment.Finalized()

phase, err := snapshot.Phase()
phase, err := snapshot.EpochPhase()
require.NoError(t, err)
epoch := snapshot.Epochs().Current()
counter, err := epoch.Counter()
Expand Down
Loading

0 comments on commit 36b3e03

Please sign in to comment.