diff --git a/gpbft/chain.go b/gpbft/chain.go index 83a75164..a759e481 100644 --- a/gpbft/chain.go +++ b/gpbft/chain.go @@ -288,7 +288,7 @@ func (c ECChain) Validate() error { // Returns an identifier for the chain suitable for use as a map key. // This must completely determine the sequence of tipsets in the chain. func (c ECChain) Key() ChainKey { - ln := len(c) * (8 + 32 + 4) // epoch + commitement + ts length + ln := len(c) * (8 + 32 + 4) // epoch + commitment + ts length for i := range c { ln += len(c[i].Key) + len(c[i].PowerTable) } diff --git a/gpbft/gpbft.go b/gpbft/gpbft.go index 20a1e695..7c0b53fb 100644 --- a/gpbft/gpbft.go +++ b/gpbft/gpbft.go @@ -184,17 +184,23 @@ type instance struct { // Supplemental data that all participants must agree on ahead of time. Messages that // propose supplemental data that differs with our supplemental data will be discarded. supplementalData *SupplementalData + // initialProposal represents the unmodified initial input chain of this + // instance. Constructed when the instance is initialized, the initialProposal is + // utilized to update candidates during the reception of late-arriving QUALITY + // messages to aid faster convergence. + initialProposal ECChain // This instance's proposal for the current round. Never bottom. // This is set after the QUALITY phase, and changes only at the end of a full round. proposal ECChain // The value to be transmitted at the next phase, which may be bottom. // This value may change away from the proposal between phases. value ECChain - // The set of values that are acceptable candidates to this instance. - // This includes the base chain, all prefixes of proposal that found a strong quorum - // of support in the QUALITY phase, and any chains that could possibly have been - // decided by another participant. - candidates []ECChain + // candidates contains a set of values that are acceptable candidates to this + // instance. This includes the base chain, all prefixes of proposal that found a + // strong quorum of support in the QUALITY phase or late arriving quality + // messages, including any chains that could possibly have been decided by + // another participant. + candidates map[ChainKey]struct{} // The final termination value of the instance, for communication to the participant. // This field is an alternative to plumbing an optional decision value out through // all the method calls, or holding a callback handle to receive it here. @@ -235,11 +241,14 @@ func newInstance( round: 0, phase: INITIAL_PHASE, supplementalData: data, + initialProposal: input, proposal: input, broadcasted: newBroadcastState(), value: ECChain{}, - candidates: []ECChain{input.BaseChain()}, - quality: newQuorumState(powerTable), + candidates: map[ChainKey]struct{}{ + input.BaseChain().Key(): {}, + }, + quality: newQuorumState(powerTable), rounds: map[uint64]*roundState{ 0: newRoundState(powerTable), }, @@ -351,11 +360,9 @@ func (i *instance) receiveOne(msg *GMessage) (bool, error) { if i.phase == TERMINATED_PHASE { return false, nil // No-op } - // Ignore QUALITY messages after exiting the QUALITY phase. // Ignore CONVERGE and PREPARE messages for prior rounds. forPriorRound := msg.Vote.Round < i.round - if (msg.Vote.Step == QUALITY_PHASE && i.phase != QUALITY_PHASE) || - (forPriorRound && msg.Vote.Step == CONVERGE_PHASE) || + if (forPriorRound && msg.Vote.Step == CONVERGE_PHASE) || (forPriorRound && msg.Vote.Step == PREPARE_PHASE) { return false, nil } @@ -373,7 +380,8 @@ func (i *instance) receiveOne(msg *GMessage) (bool, error) { msgRound := i.getRound(msg.Vote.Round) switch msg.Vote.Step { case QUALITY_PHASE: - // Receive each prefix of the proposal independently. + // Receive each prefix of the proposal independently, which is accepted at any + // round/phase. i.quality.ReceiveEachPrefix(msg.Sender, msg.Vote.Value) case CONVERGE_PHASE: if err := msgRound.converged.Receive(msg.Sender, i.powerTable, msg.Vote.Value, msg.Ticket, msg.Justification); err != nil { @@ -498,12 +506,8 @@ func (i *instance) tryQuality() error { } if foundQuorum || timeoutExpired { - // Add prefixes with quorum to candidates (skipping base chain, which is already there). - for l := range i.proposal { - if l > 0 { - i.candidates = append(i.candidates, i.proposal.Prefix(l)) - } - } + // Add prefixes with quorum to candidates. + i.addCandidatePrefixes(i.proposal) i.value = i.proposal i.log("adopting proposal/value %s", &i.proposal) i.beginPrepare(nil) @@ -544,6 +548,17 @@ func (i *instance) tryConverge() error { return nil } + // Find the first initial proposal prefix that has strong quorum as a result of + // arriving QUALITY messages and update candidates for its consecutive prefixes. + for p := range i.initialProposal { + initialProposalPrefix := i.initialProposal.Prefix(p) + if foundQuorum := i.quality.HasStrongQuorumFor(initialProposalPrefix.Key()); foundQuorum { + i.log("expanding candidates for proposal %s due to late QUALITY quorum for %s", i.proposal, &initialProposalPrefix) + i.addCandidatePrefixes(initialProposalPrefix) + break + } + } + winner := i.getRound(i.round).converged.FindMaxTicketProposal(i.powerTable) if !winner.IsValid() { return fmt.Errorf("no values at CONVERGE") @@ -555,11 +570,11 @@ func (i *instance) tryConverge() error { // in the last round, consider it a candidate. if !i.isCandidate(winner.Chain) && winner.Justification.Vote.Step == PREPARE_PHASE && possibleDecisionLastRound { i.log("⚠️ swaying from %s to %s by CONVERGE", &i.proposal, &winner.Chain) - i.candidates = append(i.candidates, winner.Chain) + i.addCandidate(winner.Chain) } if i.isCandidate(winner.Chain) { i.proposal = winner.Chain - i.log("adopting proposal %s after converge", &winner.Chain) + i.log("adopting proposal %s after CONVERGE", &winner.Chain) } else { // Else preserve own proposal. // This could alternatively loop to next lowest ticket as an optimisation to increase the @@ -648,7 +663,8 @@ func (i *instance) tryCommit(round uint64) error { committed := i.getRound(round).committed quorumValue, foundStrongQuorum := committed.FindStrongQuorumValue() timedOut := atOrAfter(i.participant.host.Time(), i.phaseTimeout) - phaseComplete := timedOut && committed.ReceivedFromStrongQuorum() + receivedFromStrongQuorum := committed.ReceivedFromStrongQuorum() + phaseComplete := timedOut && receivedFromStrongQuorum if foundStrongQuorum && !quorumValue.IsZero() { // A participant may be forced to decide a value that's not its preferred chain. @@ -668,7 +684,7 @@ func (i *instance) tryCommit(round uint64) error { if !v.IsZero() { if !i.isCandidate(v) { i.log("⚠️ swaying from %s to %s by COMMIT", &i.input, &v) - i.candidates = append(i.candidates, v) + i.addCandidate(v) } if !v.Eq(i.proposal) { i.proposal = v @@ -786,7 +802,7 @@ func (i *instance) skipToRound(round uint64, chain ECChain, justification *Justi if justification.Vote.Step == PREPARE_PHASE { i.log("⚠️ swaying from %s to %s by skip to round %d", &i.proposal, chain, i.round) - i.candidates = append(i.candidates, chain) + i.addCandidate(chain) i.proposal = chain } i.beginConverge(justification) @@ -795,12 +811,21 @@ func (i *instance) skipToRound(round uint64, chain ECChain, justification *Justi // Returns whether a chain is acceptable as a proposal for this instance to vote for. // This is "EC Compatible" in the pseudocode. func (i *instance) isCandidate(c ECChain) bool { - for _, candidate := range i.candidates { - if c.Eq(candidate) { - return true - } + _, exists := i.candidates[c.Key()] + return exists +} + +func (i *instance) addCandidatePrefixes(c ECChain) { + for l := range c { + i.addCandidate(c.Prefix(l)) + } +} + +func (i *instance) addCandidate(c ECChain) { + key := c.Key() + if _, exists := i.candidates[key]; !exists { + i.candidates[key] = struct{}{} } - return false } func (i *instance) terminate(decision *Justification) { @@ -1279,9 +1304,9 @@ func newConvergeState() *convergeState { } } -// SetSelfValue sets the participant's locally-proposed converge value. -// This means the participant need not rely on messages broadcast to be received by itself. -// See HasSelfValue. +// SetSelfValue sets the participant's locally-proposed converge value. This +// means the participant need not to rely on messages broadcast to be received by +// itself. func (c *convergeState) SetSelfValue(value ECChain, justification *Justification) { // any converge for the given value is better than self-reported // as self-reported has no ticket