Skip to content

Commit

Permalink
only pass extension to recompute leader selection
Browse files Browse the repository at this point in the history
  • Loading branch information
jordanschalm committed Jul 8, 2024
1 parent 2b0d7e2 commit 486c076
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 40 deletions.
59 changes: 32 additions & 27 deletions consensus/hotstuff/committees/consensus_committee.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
// Guaranteeing concurrency safety is delegated to the higher-level logic.
type epochInfo struct {
*leader.LeaderSelection // pre-computed leader selection for the epoch
randomSeed []byte
initialCommittee flow.IdentitySkeletonList
initialCommitteeMap map[flow.Identifier]*flow.IdentitySkeleton
weightThresholdForQC uint64 // computed based on initial committee weights
Expand All @@ -42,26 +43,22 @@ type epochInfo struct {
}

// recomputeLeaderSelectionForExtendedViewRange re-computes the LeaderSelection field
// for the input epoch's entire view range, including extensions. This must be called
// each time an extension is added to an epoch. This method is idempotent, i.e.
// repeated calls for the same final view are no-ops.
// for the input epoch's entire view range, including the new extension.
// This must be called each time an extension is added to an epoch.
// This method is idempotent, i.e. repeated calls for the same final view are no-ops.
// Caution, not concurrency safe.
// No errors are expected during normal operation.
func (e *epochInfo) recomputeLeaderSelectionForExtendedViewRange(epoch protocol.Epoch) error {
extendedFinalView, err := epoch.FinalView()
if err != nil {
return fmt.Errorf("could not get final view for extended epoch: %w", err)
}
func (e *epochInfo) recomputeLeaderSelectionForExtendedViewRange(extension flow.EpochExtension) error {
// sanity check: ensure the final view of the current epoch monotonically increases
lastViewOfLeaderSelection := e.FinalView()
if extendedFinalView < lastViewOfLeaderSelection {
return fmt.Errorf("final view of epoch must be monotonically increases, but is decreasing from %d to %d", lastViewOfLeaderSelection, extendedFinalView)
if extension.FinalView < lastViewOfLeaderSelection {
return fmt.Errorf("final view of epoch must be monotonically increases, but is decreasing from %d to %d", lastViewOfLeaderSelection, extension.FinalView)
}
if extendedFinalView == lastViewOfLeaderSelection {
if extension.FinalView == lastViewOfLeaderSelection {
return nil
}

leaderSelection, err := leader.SelectionForConsensus(epoch)
leaderSelection, err := leader.SelectionForConsensus(e.initialCommittee, e.randomSeed, e.FirstView(), extension.FinalView)
if err != nil {
return fmt.Errorf("could not re-compute leader selection for epoch after extension: %w", err)
}
Expand All @@ -73,14 +70,27 @@ func (e *epochInfo) recomputeLeaderSelectionForExtendedViewRange(epoch protocol.
// This can be cached and used for all by-view queries for this epoch.
// No errors are expected during normal operation.
func newEpochInfo(epoch protocol.Epoch) (*epochInfo, error) {
leaders, err := leader.SelectionForConsensus(epoch)
randomSeed, err := epoch.RandomSource()
if err != nil {
return nil, fmt.Errorf("could not get leader selection: %w", err)
return nil, fmt.Errorf("could not get epoch random source: %w", err)
}
firstView, err := epoch.FirstView()
if err != nil {
return nil, fmt.Errorf("could not get epoch first view: %w", err)
}
finalView, err := epoch.FinalView()
if err != nil {
return nil, fmt.Errorf("could not get epoch final view: %w", err)
}
initialIdentities, err := epoch.InitialIdentities()
if err != nil {
return nil, fmt.Errorf("could not initial identities: %w", err)
}
leaders, err := leader.SelectionForConsensus(initialIdentities, randomSeed, firstView, finalView)
if err != nil {
return nil, fmt.Errorf("could not get leader selection: %w", err)
}

initialCommittee := initialIdentities.Filter(filter.IsConsensusCommitteeMember)
dkg, err := epoch.DKG()
if err != nil {
Expand All @@ -90,6 +100,7 @@ func newEpochInfo(epoch protocol.Epoch) (*epochInfo, error) {
totalWeight := initialCommittee.TotalWeight()
ei := &epochInfo{
LeaderSelection: leaders,
randomSeed: randomSeed,
initialCommittee: initialCommittee,
initialCommitteeMap: initialCommittee.Lookup(),
weightThresholdForQC: WeightThresholdToBuildQC(totalWeight),
Expand Down Expand Up @@ -338,9 +349,9 @@ func (c *Consensus) EpochCommittedPhaseStarted(_ uint64, first *flow.Header) {

// EpochExtended informs `committees.Consensus` that a block including a new epoch extension has been finalized.
// This event consumer function enqueues an event handler function for the single event handler thread to execute.
func (c *Consensus) EpochExtended(_ uint64, refBlock *flow.Header, _ flow.EpochExtension) {
func (c *Consensus) EpochExtended(epochCounter uint64, _ *flow.Header, extension flow.EpochExtension) {
c.epochEvents <- func() error {
return c.handleEpochExtended(refBlock)
return c.handleEpochExtended(epochCounter, extension)
}
}

Expand All @@ -349,25 +360,19 @@ func (c *Consensus) EpochExtended(_ uint64, refBlock *flow.Header, _ flow.EpochE
// When an extension is observed, we re-compute leader selection for the current epoch, taking into
// account the most recent extension (included as of refBlock).
// No errors are expected during normal operation.
func (c *Consensus) handleEpochExtended(refBlock *flow.Header) error {
currentEpoch := c.state.AtHeight(refBlock.Height).Epochs().Current()
counter, err := currentEpoch.Counter()
if err != nil {
return fmt.Errorf("could not read current epoch info: %w", err)
}

func (c *Consensus) handleEpochExtended(epochCounter uint64, extension flow.EpochExtension) error {
c.mu.Lock()
defer c.mu.Unlock()

epochInf, ok := c.epochs[counter]
epochInf, ok := c.epochs[epochCounter]
if !ok {
return fmt.Errorf("sanity check failed: current epoch committee info does not exist")
}
// sanity check: we can only extend the current epoch, if the next epoch has not yet been committed:
if _, nextEpochCommitted := c.epochs[counter+1]; nextEpochCommitted {
return fmt.Errorf("sanity check failed: attempting to extend epoch %d, but subsequent epoch %d is already committed", counter, counter+1)
if _, nextEpochCommitted := c.epochs[epochCounter+1]; nextEpochCommitted {
return fmt.Errorf("sanity check failed: attempting to extend epoch %d, but subsequent epoch %d is already committed", epochCounter, epochCounter+1)
}
err = epochInf.recomputeLeaderSelectionForExtendedViewRange(currentEpoch)
err := epochInf.recomputeLeaderSelectionForExtendedViewRange(extension)
if err != nil {
return fmt.Errorf("could not recompute leader selection for current epoch upon extension: %w", err)
}
Expand Down
35 changes: 22 additions & 13 deletions consensus/hotstuff/committees/leader/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,24 @@ package leader
import (
"fmt"

"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/flow/filter"
"github.com/onflow/flow-go/state/protocol"
"github.com/onflow/flow-go/state/protocol/prg"
)

// SelectionForConsensus pre-computes and returns leaders for the consensus committee
// in the given epoch. The consensus committee spans multiple epochs and the leader
// selection returned here is only valid for the input epoch, so it is necessary to
// call this for each upcoming epoch.
func SelectionForConsensus(epoch protocol.Epoch) (*LeaderSelection, error) {
// SelectionForConsensusFromEpoch is a ...
func SelectionForConsensusFromEpoch(epoch protocol.Epoch) (*LeaderSelection, error) {

// pre-compute leader selection for the epoch
identities, err := epoch.InitialIdentities()
if err != nil {
return nil, fmt.Errorf("could not get epoch initial identities: %w", err)
}

// get the epoch source of randomness
randomSeed, err := epoch.RandomSource()
if err != nil {
return nil, fmt.Errorf("could not get epoch seed: %w", err)
}
// create random number generator from the seed and customizer
rng, err := prg.New(randomSeed, prg.ConsensusLeaderSelection, nil)
if err != nil {
return nil, fmt.Errorf("could not create rng: %w", err)
}
firstView, err := epoch.FirstView()
if err != nil {
return nil, fmt.Errorf("could not get epoch first view: %w", err)
Expand All @@ -39,11 +30,29 @@ func SelectionForConsensus(epoch protocol.Epoch) (*LeaderSelection, error) {
return nil, fmt.Errorf("could not get epoch final view: %w", err)
}

leaders, err := SelectionForConsensus(
identities,
randomSeed,
firstView,
finalView,
)
return leaders, err
}

// SelectionForConsensus pre-computes and returns leaders for the consensus committee
// in the given epoch. The consensus committee spans multiple epochs and the leader
// selection returned here is only valid for the input epoch, so it is necessary to
// call this for each upcoming epoch.
func SelectionForConsensus(initialIdentities flow.IdentitySkeletonList, randomSeed []byte, firstView, finalView uint64) (*LeaderSelection, error) {
rng, err := prg.New(randomSeed, prg.ConsensusLeaderSelection, nil)
if err != nil {
return nil, fmt.Errorf("could not create rng: %w", err)
}
leaders, err := ComputeLeaderSelection(
firstView,
rng,
int(finalView-firstView+1), // add 1 because both first/final view are inclusive
identities.Filter(filter.IsConsensusCommitteeMember),
initialIdentities.Filter(filter.IsConsensusCommitteeMember),
)
return leaders, err
}

0 comments on commit 486c076

Please sign in to comment.