Skip to content

Commit

Permalink
Handle late arriving QUALITY messages in every round
Browse files Browse the repository at this point in the history
Change the core GPBFT protocol to:
 1) accept QUALITY messages in any phase or round.
 2) update candidate chains prior to executing CONVERGE phase based
    on the initial proposal.

This would permit participants with late-arriving/partially delivered
QUALITY messages to affect the candidate chains in CONVERGE phase and
sway toward the initial proposal or any of its prefixes should there be a
strong quorum. The changes here essentially execute the QUALITY phase
prior to `tryConverge` but without updating the current proposal.

The changes above, in conjunction with rebroadcast of QUALITY messages
introduced in #597 should significantly reduce the likelihood of lack of
progress due to partially seen QUALITY messages. See:

 - filecoin-project/FIPs#809
 #discussioncomment-10409902

As part of the changes introduced by this commit, the data structure
used by the instance to store candidate chains is changed to a map of
chain keys for a faster candidate set update (O(1)), and a more
concise way to assure uniqueness of the set of candidates.

Fixes #591
  • Loading branch information
masih committed Aug 30, 2024
1 parent 8e05800 commit 874a82d
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 31 deletions.
2 changes: 1 addition & 1 deletion gpbft/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func (c ECChain) Validate() error {
// Returns an identifier for the chain suitable for use as a map key.
// This must completely determine the sequence of tipsets in the chain.
func (c ECChain) Key() ChainKey {
ln := len(c) * (8 + 32 + 4) // epoch + commitement + ts length
ln := len(c) * (8 + 32 + 4) // epoch + commitment + ts length
for i := range c {
ln += len(c[i].Key) + len(c[i].PowerTable)
}
Expand Down
85 changes: 55 additions & 30 deletions gpbft/gpbft.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,17 +184,23 @@ type instance struct {
// Supplemental data that all participants must agree on ahead of time. Messages that
// propose supplemental data that differs with our supplemental data will be discarded.
supplementalData *SupplementalData
// initialProposal represents the unmodified initial input chain of this
// instance. Constructed when the instance is initialized, the initialProposal is
// utilized to update candidates during the reception of late-arriving QUALITY
// messages to aid faster convergence.
initialProposal ECChain
// This instance's proposal for the current round. Never bottom.
// This is set after the QUALITY phase, and changes only at the end of a full round.
proposal ECChain
// The value to be transmitted at the next phase, which may be bottom.
// This value may change away from the proposal between phases.
value ECChain
// The set of values that are acceptable candidates to this instance.
// This includes the base chain, all prefixes of proposal that found a strong quorum
// of support in the QUALITY phase, and any chains that could possibly have been
// decided by another participant.
candidates []ECChain
// candidates contains the set of values that are acceptable candidates to this
// instance. This includes the base chain, all prefixes of proposal that found a
// strong quorum of support in the QUALITY phase or through late-arriving QUALITY
// messages, and any chains that could possibly have been decided by another
// participant.
candidates map[ChainKey]struct{}
// The final termination value of the instance, for communication to the participant.
// This field is an alternative to plumbing an optional decision value out through
// all the method calls, or holding a callback handle to receive it here.
Expand Down Expand Up @@ -235,11 +241,14 @@ func newInstance(
round: 0,
phase: INITIAL_PHASE,
supplementalData: data,
initialProposal: input,
proposal: input,
broadcasted: newBroadcastState(),
value: ECChain{},
candidates: []ECChain{input.BaseChain()},
quality: newQuorumState(powerTable),
candidates: map[ChainKey]struct{}{
input.BaseChain().Key(): {},
},
quality: newQuorumState(powerTable),
rounds: map[uint64]*roundState{
0: newRoundState(powerTable),
},
Expand Down Expand Up @@ -351,11 +360,9 @@ func (i *instance) receiveOne(msg *GMessage) (bool, error) {
if i.phase == TERMINATED_PHASE {
return false, nil // No-op
}
// Ignore QUALITY messages after exiting the QUALITY phase.
// Ignore CONVERGE and PREPARE messages for prior rounds.
forPriorRound := msg.Vote.Round < i.round
if (msg.Vote.Step == QUALITY_PHASE && i.phase != QUALITY_PHASE) ||
(forPriorRound && msg.Vote.Step == CONVERGE_PHASE) ||
if (forPriorRound && msg.Vote.Step == CONVERGE_PHASE) ||
(forPriorRound && msg.Vote.Step == PREPARE_PHASE) {
return false, nil
}
Expand All @@ -373,7 +380,8 @@ func (i *instance) receiveOne(msg *GMessage) (bool, error) {
msgRound := i.getRound(msg.Vote.Round)
switch msg.Vote.Step {
case QUALITY_PHASE:
// Receive each prefix of the proposal independently.
// Receive each prefix of the proposal independently, which is accepted at any
// round/phase.
i.quality.ReceiveEachPrefix(msg.Sender, msg.Vote.Value)
case CONVERGE_PHASE:
if err := msgRound.converged.Receive(msg.Sender, i.powerTable, msg.Vote.Value, msg.Ticket, msg.Justification); err != nil {
Expand Down Expand Up @@ -498,12 +506,8 @@ func (i *instance) tryQuality() error {
}

if foundQuorum || timeoutExpired {
// Add prefixes with quorum to candidates (skipping base chain, which is already there).
for l := range i.proposal {
if l > 0 {
i.candidates = append(i.candidates, i.proposal.Prefix(l))
}
}
// Add prefixes with quorum to candidates.
i.addCandidatePrefixes(i.proposal)
i.value = i.proposal
i.log("adopting proposal/value %s", &i.proposal)
i.beginPrepare(nil)
Expand Down Expand Up @@ -544,6 +548,17 @@ func (i *instance) tryConverge() error {
return nil
}

// Find the first initial proposal prefix that has strong quorum as a result of
// arriving QUALITY messages and update candidates for its consecutive prefixes.
for p := range i.initialProposal {
initialProposalPrefix := i.initialProposal.Prefix(p)
if foundQuorum := i.quality.HasStrongQuorumFor(initialProposalPrefix.Key()); foundQuorum {
i.log("expanding candidates for proposal %s due to late QUALITY quorum for %s", i.proposal, &initialProposalPrefix)
i.addCandidatePrefixes(initialProposalPrefix)
break
}
}

winner := i.getRound(i.round).converged.FindMaxTicketProposal(i.powerTable)
if !winner.IsValid() {
return fmt.Errorf("no values at CONVERGE")
Expand All @@ -555,11 +570,11 @@ func (i *instance) tryConverge() error {
// in the last round, consider it a candidate.
if !i.isCandidate(winner.Chain) && winner.Justification.Vote.Step == PREPARE_PHASE && possibleDecisionLastRound {
i.log("⚠️ swaying from %s to %s by CONVERGE", &i.proposal, &winner.Chain)
i.candidates = append(i.candidates, winner.Chain)
i.addCandidate(winner.Chain)

Check warning on line 573 in gpbft/gpbft.go

View check run for this annotation

Codecov / codecov/patch

gpbft/gpbft.go#L573

Added line #L573 was not covered by tests
}
if i.isCandidate(winner.Chain) {
i.proposal = winner.Chain
i.log("adopting proposal %s after converge", &winner.Chain)
i.log("adopting proposal %s after CONVERGE", &winner.Chain)
} else {
// Else preserve own proposal.
// This could alternatively loop to next lowest ticket as an optimisation to increase the
Expand Down Expand Up @@ -648,7 +663,8 @@ func (i *instance) tryCommit(round uint64) error {
committed := i.getRound(round).committed
quorumValue, foundStrongQuorum := committed.FindStrongQuorumValue()
timedOut := atOrAfter(i.participant.host.Time(), i.phaseTimeout)
phaseComplete := timedOut && committed.ReceivedFromStrongQuorum()
receivedFromStrongQuorum := committed.ReceivedFromStrongQuorum()
phaseComplete := timedOut && receivedFromStrongQuorum

if foundStrongQuorum && !quorumValue.IsZero() {
// A participant may be forced to decide a value that's not its preferred chain.
Expand All @@ -668,7 +684,7 @@ func (i *instance) tryCommit(round uint64) error {
if !v.IsZero() {
if !i.isCandidate(v) {
i.log("⚠️ swaying from %s to %s by COMMIT", &i.input, &v)
i.candidates = append(i.candidates, v)
i.addCandidate(v)
}
if !v.Eq(i.proposal) {
i.proposal = v
Expand Down Expand Up @@ -786,7 +802,7 @@ func (i *instance) skipToRound(round uint64, chain ECChain, justification *Justi

if justification.Vote.Step == PREPARE_PHASE {
i.log("⚠️ swaying from %s to %s by skip to round %d", &i.proposal, chain, i.round)
i.candidates = append(i.candidates, chain)
i.addCandidate(chain)
i.proposal = chain
}
i.beginConverge(justification)
Expand All @@ -795,12 +811,21 @@ func (i *instance) skipToRound(round uint64, chain ECChain, justification *Justi
// Returns whether a chain is acceptable as a proposal for this instance to vote for.
// This is "EC Compatible" in the pseudocode.
func (i *instance) isCandidate(c ECChain) bool {
for _, candidate := range i.candidates {
if c.Eq(candidate) {
return true
}
_, exists := i.candidates[c.Key()]
return exists
}

// addCandidatePrefixes registers each prefix of c as a candidate. It calls
// addCandidate with c.Prefix(end) for every index end of c, in ascending order.
func (i *instance) addCandidatePrefixes(c ECChain) {
	for end := 0; end < len(c); end++ {
		prefix := c.Prefix(end)
		i.addCandidate(prefix)
	}
}

// addCandidate records the key of chain c in the candidates set. The key is
// only written when it is not already present, so adding the same chain more
// than once leaves the set unchanged.
func (i *instance) addCandidate(c ECChain) {
key := c.Key()
if _, exists := i.candidates[key]; !exists {
i.candidates[key] = struct{}{}
}
// NOTE(review): this `return false` looks like a removed line of the previous
// isCandidate body that the diff view shows inline; it does not appear to
// belong to addCandidate, which declares no return value. Confirm against the
// committed file.
return false
}

func (i *instance) terminate(decision *Justification) {
Expand Down Expand Up @@ -1279,9 +1304,9 @@ func newConvergeState() *convergeState {
}
}

// SetSelfValue sets the participant's locally-proposed converge value.
// This means the participant need not rely on messages broadcast to be received by itself.
// See HasSelfValue.
// SetSelfValue sets the participant's locally-proposed converge value. This
// means the participant need not rely on messages broadcast to be received by
// itself.
func (c *convergeState) SetSelfValue(value ECChain, justification *Justification) {
// any converge for the given value is better than self-reported
// as self-reported has no ticket
Expand Down

0 comments on commit 874a82d

Please sign in to comment.