Skip to content

Commit

Permalink
Improve sync HA leader election
Browse files Browse the repository at this point in the history
  • Loading branch information
wanliqun committed Apr 15, 2024
1 parent 3b098b4 commit 55ac577
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 94 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.22

require (
github.com/Conflux-Chain/go-conflux-sdk v1.5.9
github.com/Conflux-Chain/go-conflux-util v0.2.1-0.20240401035823-007ed13c1155
github.com/Conflux-Chain/go-conflux-util v0.2.1-0.20240415073045-459b298cdcbd
github.com/Conflux-Chain/web3pay-service v0.0.0-20230609030113-dc3c4d42820a
github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce
github.com/buraksezer/consistent v0.9.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ github.com/Conflux-Chain/go-conflux-sdk v1.0.29/go.mod h1:eUYR736AUkH29rjeiPCLR2
github.com/Conflux-Chain/go-conflux-sdk v1.5.9 h1:S61iF4G+uicWJ+k62FDpfCvUciBkKEhWPhPAzWY/yhI=
github.com/Conflux-Chain/go-conflux-sdk v1.5.9/go.mod h1:L1DiAH5zBCZmA4Eq/XLC3JA3d5ecaPczhqrSzk6y+XA=
github.com/Conflux-Chain/go-conflux-util v0.0.0-20220907035343-2d1233bccd70/go.mod h1:dcfcQp/A6V0nu9LLqdcg8yv+lSg378JktLtuPUVcm4k=
github.com/Conflux-Chain/go-conflux-util v0.2.1-0.20240401035823-007ed13c1155 h1:4R2nwLTqzFOhDd+GB7EFYcj22r8927m9aK4auwI2V1A=
github.com/Conflux-Chain/go-conflux-util v0.2.1-0.20240401035823-007ed13c1155/go.mod h1:8WdtAuUTRWLDPnyUgAqw5WPr6hM+WCp4R857nFc1l04=
github.com/Conflux-Chain/go-conflux-util v0.2.1-0.20240415073045-459b298cdcbd h1:zgY7RyJ1Rux70Z0/xf3ZHX+iRruLOApCq/I0ps/DZWk=
github.com/Conflux-Chain/go-conflux-util v0.2.1-0.20240415073045-459b298cdcbd/go.mod h1:8WdtAuUTRWLDPnyUgAqw5WPr6hM+WCp4R857nFc1l04=
github.com/Conflux-Chain/web3pay-service v0.0.0-20230609030113-dc3c4d42820a h1:3Rr/2Z/dCXx1PVh+VQns/R8f8niJ2jvWMViSGHPupJg=
github.com/Conflux-Chain/web3pay-service v0.0.0-20230609030113-dc3c4d42820a/go.mod h1:YCr1LGh6g2mgceQ5K+YBLxWK1Ahq2IgdTDoeZkXbgBY=
github.com/DATA-DOG/go-sqlmock v1.3.3/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
Expand Down
7 changes: 3 additions & 4 deletions sync/catchup/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,10 +261,7 @@ func (s *Syncer) persist(ctx context.Context, state *persistState, bmarker *benc

start := time.Now()
err := s.db.PushnWithFinalizer(state.epochs, func(d *gorm.DB) error {
if !s.elm.Extend(ctx) {
return store.ErrLeaderRenewal
}
return nil
return s.elm.Extend(ctx)
})

if err != nil {
Expand All @@ -288,6 +285,8 @@ func (s *Syncer) nextSyncRange() (uint64, uint64, error) {

if ok {
start++
} else {
start = 0
}

status, err := s.cfx.GetStatus()
Expand Down
47 changes: 25 additions & 22 deletions sync/election/leader_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ package election

import (
"context"
"errors"
"sync"
"sync/atomic"
"time"

"github.com/Conflux-Chain/confura/store"
"github.com/Conflux-Chain/go-conflux-util/dlock"
logutil "github.com/Conflux-Chain/go-conflux-util/log"
"github.com/Conflux-Chain/go-conflux-util/viper"
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -42,7 +43,7 @@ type LeaderManager interface {
// Wait until being elected as leader or context canceled
Await(ctx context.Context) bool
// Extend extends the leadership lease
Extend(ctx context.Context) bool
Extend(ctx context.Context) error
// Campaign starts the leader election process
Campaign(ctx context.Context)
// Stop stops the leader election process
Expand All @@ -57,22 +58,21 @@ type LeaderManager interface {

// MustNewLeaderManagerFromViper creates a new LeaderManager with the given LockManager and election key.
func MustNewLeaderManagerFromViper(dlm *dlock.LockManager, elecKey string) LeaderManager {
var conf struct {
Config
Enabled bool // leader election enabled or not?
}
var conf Config
viper.MustUnmarshalKey("sync.election", &conf)

if conf.Enabled {
logrus.WithField("config", conf.Config).Info("HA leader election enabled")
return NewDlockLeaderManager(dlm, conf.Config, elecKey)
logrus.WithField("config", conf).Info("HA leader election enabled")
return NewDlockLeaderManager(dlm, conf, elecKey)
}

return &noopLeaderManager{}
}

// Config holds the configuration for the leader election.
type Config struct {
// Leader election enabled or not?
Enabled bool
// Unique identifier for the leader
ID string `default:"leader"`
// Duration of the leader term
Expand Down Expand Up @@ -102,7 +102,7 @@ type DlockLeaderManager struct {

// NewDlockLeaderManager creates a new `LeaderManager` with the provided configuration and election key.
func NewDlockLeaderManager(dlm *dlock.LockManager, conf Config, elecKey string) *DlockLeaderManager {
conf.ID += uuid.NewString()[:8] // append a unique suffix to the `Id`
conf.ID += "#" + uuid.NewString()[:8] // append a unique suffix to the `Id`
return &DlockLeaderManager{
Config: conf,
lockMan: dlm,
Expand Down Expand Up @@ -135,13 +135,16 @@ func (l *DlockLeaderManager) Await(ctx context.Context) bool {
}

// Extend extends leadership lease.
func (l *DlockLeaderManager) Extend(ctx context.Context) bool {
func (l *DlockLeaderManager) Extend(ctx context.Context) error {
if atomic.LoadInt32(&l.electionStatus) == StatusElected {
// acquire the lock to extend the lease
return l.acquireLock(ctx) == nil
if err := l.acquireLock(ctx); err != nil {
return errors.WithMessage(store.ErrLeaderRenewal, err.Error())
}

return nil
}

return false
return store.ErrLeaderRenewal
}

// Campaign starts the election process, which will run in a goroutine until the context is canceled.
Expand Down Expand Up @@ -309,7 +312,7 @@ func (l *DlockLeaderManager) onOusted(ctx context.Context) {
logrus.WithFields(logrus.Fields{
"electionKey": l.electionKey,
"leaderID": l.ID,
}).Warn("Leader ousted")
}).Info("Leader ousted")

atomic.StoreInt32(&l.electionStatus, int32(StatusOusted))

Expand Down Expand Up @@ -348,11 +351,11 @@ func (l *DlockLeaderManager) onError(ctx context.Context, err error) {
// noopLeaderManager dummy leader manager that does nothing at all.
type noopLeaderManager struct{}

func (l *noopLeaderManager) Identity() string { return "noop" }
func (l *noopLeaderManager) Extend(ctx context.Context) bool { return true }
func (l *noopLeaderManager) Await(ctx context.Context) bool { return true }
func (l *noopLeaderManager) Campaign(ctx context.Context) { /* do nothing */ }
func (l *noopLeaderManager) Stop(ctx context.Context) error { return nil }
func (l *noopLeaderManager) OnElected(cb ElectedCallback) { /* do nothing */ }
func (l *noopLeaderManager) OnOusted(cb OustedCallback) { /* do nothing */ }
func (l *noopLeaderManager) OnError(cb ErrorCallback) { /* do nothing */ }
func (l *noopLeaderManager) Identity() string { return "noop" }
func (l *noopLeaderManager) Extend(ctx context.Context) error { return nil }
func (l *noopLeaderManager) Await(ctx context.Context) bool { return true }
func (l *noopLeaderManager) Campaign(ctx context.Context) { /* do nothing */ }
func (l *noopLeaderManager) Stop(ctx context.Context) error { return nil }
func (l *noopLeaderManager) OnElected(cb ElectedCallback) { /* do nothing */ }
func (l *noopLeaderManager) OnOusted(cb OustedCallback) { /* do nothing */ }
func (l *noopLeaderManager) OnError(cb ErrorCallback) { /* do nothing */ }
67 changes: 34 additions & 33 deletions sync/sync_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func MustNewDatabaseSyncer(cfx sdk.ClientOperator, db *mysql.MysqlStore) *Databa
var conf syncConfig
viperutil.MustUnmarshalKey("sync", &conf)

dlm := dlock.NewLockManager(dlock.NewMySQLBackend(db.DB()))
syncer := &DatabaseSyncer{
conf: &conf,
cfx: cfx,
Expand All @@ -75,13 +76,17 @@ func MustNewDatabaseSyncer(cfx sdk.ClientOperator, db *mysql.MysqlStore) *Databa
syncIntervalNormal: time.Second,
syncIntervalCatchUp: time.Millisecond,
epochPivotWin: newEpochPivotWindow(syncPivotInfoWinCapacity),
elm: election.MustNewLeaderManagerFromViper(dlm, "sync.cfx"),
}

dlm := dlock.NewLockManager(dlock.NewMySQLBackend(db.DB()))
elm := election.MustNewLeaderManagerFromViper(dlm, "sync.cfx")
elm.OnElected(syncer.onLeadershipChanged)
elm.OnOusted(syncer.onLeadershipChanged)
syncer.elm = elm
// Register leader election callbacks
syncer.elm.OnElected(func(ctx context.Context, lm election.LeaderManager) {
syncer.onLeadershipChanged(ctx, lm, true)
})

syncer.elm.OnOusted(func(ctx context.Context, lm election.LeaderManager) {
syncer.onLeadershipChanged(ctx, lm, false)
})

// Ensure epoch data validity in database
if err := ensureStoreEpochDataOk(cfx, db); err != nil {
Expand All @@ -102,7 +107,7 @@ func (syncer *DatabaseSyncer) Sync(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()

go syncer.elm.Campaign(ctx)
defer syncer.elm.Stop(ctx)
defer syncer.elm.Stop(context.Background())

syncer.fastCatchup(ctx)

Expand Down Expand Up @@ -137,28 +142,28 @@ func (syncer *DatabaseSyncer) fastCatchup(ctx context.Context) {

// Load last sync epoch from database to continue synchronization.
func (syncer *DatabaseSyncer) mustLoadLastSyncEpoch() {
loaded, err := syncer.loadLastSyncEpoch()
if err != nil {
if err := syncer.loadLastSyncEpoch(); err != nil {
logrus.WithError(err).Fatal("Failed to load last sync epoch range from db")
}

// Load db sync start epoch config on initial loading if necessary.
if !loaded && syncer.conf != nil {
syncer.epochFrom = syncer.conf.FromEpoch
}
}

func (syncer *DatabaseSyncer) loadLastSyncEpoch() (loaded bool, err error) {
func (syncer *DatabaseSyncer) loadLastSyncEpoch() error {
// Load last sync epoch from database
maxEpoch, ok, err := syncer.db.MaxEpoch()
if err != nil {
return false, errors.WithMessage(err, "failed to get max epoch from epoch to block mapping")
return errors.WithMessage(err, "failed to get max epoch from epoch to block mapping")
}

if ok {
if ok { // continue from the last sync epoch
syncer.epochFrom = maxEpoch + 1
} else { // start from genesis or configured start epoch
syncer.epochFrom = 0
if syncer.conf != nil {
syncer.epochFrom = syncer.conf.FromEpoch
}
}

return ok, nil
return nil
}

// Sync data once and return true if catch up to the latest confirmed epoch, otherwise false.
Expand All @@ -172,7 +177,7 @@ func (syncer *DatabaseSyncer) syncOnce(ctx context.Context) (bool, error) {
}

// Load latest sync epoch from database
if _, err := syncer.loadLastSyncEpoch(); err != nil {
if err := syncer.loadLastSyncEpoch(); err != nil {
return false, errors.WithMessage(err, "failed to load last sync epoch")
}

Expand Down Expand Up @@ -265,17 +270,13 @@ func (syncer *DatabaseSyncer) syncOnce(ctx context.Context) (bool, error) {
}

err = syncer.db.PushnWithFinalizer(epochDataSlice, func(d *gorm.DB) error {
if !syncer.elm.Extend(ctx) {
return store.ErrLeaderRenewal
}

return nil
return syncer.elm.Extend(ctx)
})

if err != nil {
if errors.Is(err, store.ErrLeaderRenewal) {
logger.
WithField("leaderIdentity", syncer.elm.Identity()).
logger.WithField("leaderIdentity", syncer.elm.Identity()).
WithError(err).
Info("Db syncer failed to renew leadership on pushing epoch data to db")
return false, nil
}
Expand Down Expand Up @@ -356,17 +357,13 @@ func (syncer *DatabaseSyncer) pivotSwitchRevert(ctx context.Context, revertTo ui

// remove epoch data from database due to pivot switch
err := syncer.db.PopnWithFinalizer(revertTo, func(tx *gorm.DB) error {
if !syncer.elm.Extend(ctx) {
return store.ErrLeaderRenewal
}

return nil
return syncer.elm.Extend(ctx)
})

if err != nil {
if errors.Is(err, store.ErrLeaderRenewal) {
logger.
WithField("leaderIdentity", syncer.elm.Identity()).
logger.WithField("leaderIdentity", syncer.elm.Identity()).
WithError(err).
Info("Db syncer failed to renew leadership on popping epoch data from db")
return nil
}
Expand Down Expand Up @@ -409,6 +406,10 @@ func (syncer *DatabaseSyncer) latestStoreEpoch() uint64 {
return 0
}

func (syncer *DatabaseSyncer) onLeadershipChanged(ctx context.Context, lm election.LeaderManager) {
func (syncer *DatabaseSyncer) onLeadershipChanged(
ctx context.Context, lm election.LeaderManager, gainedOrLost bool) {
syncer.epochPivotWin.Reset()
if !gainedOrLost && ctx.Err() != context.Canceled {
logrus.WithField("leaderID", lm.Identity()).Warn("DB syncer lost HA leadership")
}
}
Loading

0 comments on commit 55ac577

Please sign in to comment.