Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve sync HA leader election #184

Merged
merged 2 commits into from
Apr 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.22

require (
github.com/Conflux-Chain/go-conflux-sdk v1.5.9
github.com/Conflux-Chain/go-conflux-util v0.2.1-0.20240401035823-007ed13c1155
github.com/Conflux-Chain/go-conflux-util v0.2.1-0.20240415073045-459b298cdcbd
github.com/Conflux-Chain/web3pay-service v0.0.0-20230609030113-dc3c4d42820a
github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce
github.com/buraksezer/consistent v0.9.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ github.com/Conflux-Chain/go-conflux-sdk v1.0.29/go.mod h1:eUYR736AUkH29rjeiPCLR2
github.com/Conflux-Chain/go-conflux-sdk v1.5.9 h1:S61iF4G+uicWJ+k62FDpfCvUciBkKEhWPhPAzWY/yhI=
github.com/Conflux-Chain/go-conflux-sdk v1.5.9/go.mod h1:L1DiAH5zBCZmA4Eq/XLC3JA3d5ecaPczhqrSzk6y+XA=
github.com/Conflux-Chain/go-conflux-util v0.0.0-20220907035343-2d1233bccd70/go.mod h1:dcfcQp/A6V0nu9LLqdcg8yv+lSg378JktLtuPUVcm4k=
github.com/Conflux-Chain/go-conflux-util v0.2.1-0.20240401035823-007ed13c1155 h1:4R2nwLTqzFOhDd+GB7EFYcj22r8927m9aK4auwI2V1A=
github.com/Conflux-Chain/go-conflux-util v0.2.1-0.20240401035823-007ed13c1155/go.mod h1:8WdtAuUTRWLDPnyUgAqw5WPr6hM+WCp4R857nFc1l04=
github.com/Conflux-Chain/go-conflux-util v0.2.1-0.20240415073045-459b298cdcbd h1:zgY7RyJ1Rux70Z0/xf3ZHX+iRruLOApCq/I0ps/DZWk=
github.com/Conflux-Chain/go-conflux-util v0.2.1-0.20240415073045-459b298cdcbd/go.mod h1:8WdtAuUTRWLDPnyUgAqw5WPr6hM+WCp4R857nFc1l04=
github.com/Conflux-Chain/web3pay-service v0.0.0-20230609030113-dc3c4d42820a h1:3Rr/2Z/dCXx1PVh+VQns/R8f8niJ2jvWMViSGHPupJg=
github.com/Conflux-Chain/web3pay-service v0.0.0-20230609030113-dc3c4d42820a/go.mod h1:YCr1LGh6g2mgceQ5K+YBLxWK1Ahq2IgdTDoeZkXbgBY=
github.com/DATA-DOG/go-sqlmock v1.3.3/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
Expand Down
7 changes: 3 additions & 4 deletions sync/catchup/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,10 +261,7 @@ func (s *Syncer) persist(ctx context.Context, state *persistState, bmarker *benc

start := time.Now()
err := s.db.PushnWithFinalizer(state.epochs, func(d *gorm.DB) error {
if !s.elm.Extend(ctx) {
return store.ErrLeaderRenewal
}
return nil
return s.elm.Extend(ctx)
})

if err != nil {
Expand All @@ -288,6 +285,8 @@ func (s *Syncer) nextSyncRange() (uint64, uint64, error) {

if ok {
start++
} else {
start = 0
}

status, err := s.cfx.GetStatus()
Expand Down
53 changes: 28 additions & 25 deletions sync/election/leader_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ package election

import (
"context"
"errors"
"sync"
"sync/atomic"
"time"

"github.com/Conflux-Chain/confura/store"
"github.com/Conflux-Chain/go-conflux-util/dlock"
logutil "github.com/Conflux-Chain/go-conflux-util/log"
"github.com/Conflux-Chain/go-conflux-util/viper"
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -42,11 +43,11 @@ type LeaderManager interface {
// Wait until being elected as leader or context canceled
Await(ctx context.Context) bool
// Extend extends the leadership lease
Extend(ctx context.Context) bool
Extend(ctx context.Context) error
// Campaign starts the leader election process
Campaign(ctx context.Context)
// Stop stops the leader election process
Stop(ctx context.Context) error
Stop() error
// OnElected registers a leader elected callback function.
OnElected(cb ElectedCallback)
// OnOusted registers a leader ousted callback function.
Expand All @@ -57,22 +58,21 @@ type LeaderManager interface {

// MustNewLeaderManagerFromViper creates a new LeaderManager with the given LockManager and election key.
func MustNewLeaderManagerFromViper(dlm *dlock.LockManager, elecKey string) LeaderManager {
var conf struct {
Config
Enabled bool // leader election enabled or not?
}
var conf Config
viper.MustUnmarshalKey("sync.election", &conf)

if conf.Enabled {
logrus.WithField("config", conf.Config).Info("HA leader election enabled")
return NewDlockLeaderManager(dlm, conf.Config, elecKey)
logrus.WithField("config", conf).Info("HA leader election enabled")
return NewDlockLeaderManager(dlm, conf, elecKey)
}

return &noopLeaderManager{}
}

// Config holds the configuration for the leader election.
type Config struct {
// Leader election enabled or not?
Enabled bool
// Unique identifier for the leader
ID string `default:"leader"`
// Duration of the leader term
Expand Down Expand Up @@ -102,7 +102,7 @@ type DlockLeaderManager struct {

// NewDlockLeaderManager creates a new `LeaderManager` with the provided configuration and election key.
func NewDlockLeaderManager(dlm *dlock.LockManager, conf Config, elecKey string) *DlockLeaderManager {
conf.ID += uuid.NewString()[:8] // append a unique suffix to the `Id`
conf.ID += "#" + uuid.NewString()[:8] // append a unique suffix to the `Id`
return &DlockLeaderManager{
Config: conf,
lockMan: dlm,
Expand Down Expand Up @@ -135,13 +135,16 @@ func (l *DlockLeaderManager) Await(ctx context.Context) bool {
}

// Extend extends leadership lease.
func (l *DlockLeaderManager) Extend(ctx context.Context) bool {
func (l *DlockLeaderManager) Extend(ctx context.Context) error {
if atomic.LoadInt32(&l.electionStatus) == StatusElected {
// acquire the lock to extend the lease
return l.acquireLock(ctx) == nil
if err := l.acquireLock(ctx); err != nil {
return errors.WithMessage(store.ErrLeaderRenewal, err.Error())
}

return nil
}

return false
return store.ErrLeaderRenewal
}

// Campaign starts the election process, which will run in a goroutine until contex canceled.
Expand Down Expand Up @@ -195,12 +198,12 @@ func (l *DlockLeaderManager) Campaign(ctx context.Context) {
}

// Stop stops the leader election process and resigns from the leadership if appliable.
func (l *DlockLeaderManager) Stop(ctx context.Context) error {
func (l *DlockLeaderManager) Stop() error {
if l.cancel != nil {
l.cancel()
}

return l.lockMan.Release(ctx, l.lockIntent())
return l.lockMan.Release(context.Background(), l.lockIntent())
}

// retryLock consistently retries to acquire a lock until success.
Expand Down Expand Up @@ -309,7 +312,7 @@ func (l *DlockLeaderManager) onOusted(ctx context.Context) {
logrus.WithFields(logrus.Fields{
"electionKey": l.electionKey,
"leaderID": l.ID,
}).Warn("Leader ousted")
}).Info("Leader ousted")

atomic.StoreInt32(&l.electionStatus, int32(StatusOusted))

Expand Down Expand Up @@ -348,11 +351,11 @@ func (l *DlockLeaderManager) onError(ctx context.Context, err error) {
// noopLeaderManager dummy leader manager that does nothing at all.
type noopLeaderManager struct{}

func (l *noopLeaderManager) Identity() string { return "noop" }
func (l *noopLeaderManager) Extend(ctx context.Context) bool { return true }
func (l *noopLeaderManager) Await(ctx context.Context) bool { return true }
func (l *noopLeaderManager) Campaign(ctx context.Context) { /* do nothing */ }
func (l *noopLeaderManager) Stop(ctx context.Context) error { return nil }
func (l *noopLeaderManager) OnElected(cb ElectedCallback) { /* do nothing */ }
func (l *noopLeaderManager) OnOusted(cb OustedCallback) { /* do nothing */ }
func (l *noopLeaderManager) OnError(cb ErrorCallback) { /* do nothing */ }
func (l *noopLeaderManager) Identity() string { return "noop" }
func (l *noopLeaderManager) Extend(ctx context.Context) error { return nil }
func (l *noopLeaderManager) Await(ctx context.Context) bool { return true }
func (l *noopLeaderManager) Campaign(ctx context.Context) { /* do nothing */ }
func (l *noopLeaderManager) Stop() error { return nil }
func (l *noopLeaderManager) OnElected(cb ElectedCallback) { /* do nothing */ }
func (l *noopLeaderManager) OnOusted(cb OustedCallback) { /* do nothing */ }
func (l *noopLeaderManager) OnError(cb ErrorCallback) { /* do nothing */ }
67 changes: 34 additions & 33 deletions sync/sync_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func MustNewDatabaseSyncer(cfx sdk.ClientOperator, db *mysql.MysqlStore) *Databa
var conf syncConfig
viperutil.MustUnmarshalKey("sync", &conf)

dlm := dlock.NewLockManager(dlock.NewMySQLBackend(db.DB()))
syncer := &DatabaseSyncer{
conf: &conf,
cfx: cfx,
Expand All @@ -75,13 +76,17 @@ func MustNewDatabaseSyncer(cfx sdk.ClientOperator, db *mysql.MysqlStore) *Databa
syncIntervalNormal: time.Second,
syncIntervalCatchUp: time.Millisecond,
epochPivotWin: newEpochPivotWindow(syncPivotInfoWinCapacity),
elm: election.MustNewLeaderManagerFromViper(dlm, "sync.cfx"),
}

dlm := dlock.NewLockManager(dlock.NewMySQLBackend(db.DB()))
elm := election.MustNewLeaderManagerFromViper(dlm, "sync.cfx")
elm.OnElected(syncer.onLeadershipChanged)
elm.OnOusted(syncer.onLeadershipChanged)
syncer.elm = elm
// Register leader election callbacks
syncer.elm.OnElected(func(ctx context.Context, lm election.LeaderManager) {
syncer.onLeadershipChanged(ctx, lm, true)
})

syncer.elm.OnOusted(func(ctx context.Context, lm election.LeaderManager) {
syncer.onLeadershipChanged(ctx, lm, false)
})

// Ensure epoch data validity in database
if err := ensureStoreEpochDataOk(cfx, db); err != nil {
Expand All @@ -102,7 +107,7 @@ func (syncer *DatabaseSyncer) Sync(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()

go syncer.elm.Campaign(ctx)
defer syncer.elm.Stop(ctx)
defer syncer.elm.Stop()

syncer.fastCatchup(ctx)

Expand Down Expand Up @@ -137,28 +142,28 @@ func (syncer *DatabaseSyncer) fastCatchup(ctx context.Context) {

// Load last sync epoch from databse to continue synchronization.
func (syncer *DatabaseSyncer) mustLoadLastSyncEpoch() {
loaded, err := syncer.loadLastSyncEpoch()
if err != nil {
if err := syncer.loadLastSyncEpoch(); err != nil {
logrus.WithError(err).Fatal("Failed to load last sync epoch range from db")
}

// Load db sync start epoch config on initial loading if necessary.
if !loaded && syncer.conf != nil {
syncer.epochFrom = syncer.conf.FromEpoch
}
}

func (syncer *DatabaseSyncer) loadLastSyncEpoch() (loaded bool, err error) {
func (syncer *DatabaseSyncer) loadLastSyncEpoch() error {
// Load last sync epoch from databse
maxEpoch, ok, err := syncer.db.MaxEpoch()
if err != nil {
return false, errors.WithMessage(err, "failed to get max epoch from epoch to block mapping")
return errors.WithMessage(err, "failed to get max epoch from epoch to block mapping")
}

if ok {
if ok { // continue from the last sync epoch
syncer.epochFrom = maxEpoch + 1
} else { // start from genesis or configured start epoch
syncer.epochFrom = 0
if syncer.conf != nil {
syncer.epochFrom = syncer.conf.FromEpoch
}
}

return ok, nil
return nil
}

// Sync data once and return true if catch up to the latest confirmed epoch, otherwise false.
Expand All @@ -172,7 +177,7 @@ func (syncer *DatabaseSyncer) syncOnce(ctx context.Context) (bool, error) {
}

// Load latest sync epoch from database
if _, err := syncer.loadLastSyncEpoch(); err != nil {
if err := syncer.loadLastSyncEpoch(); err != nil {
return false, errors.WithMessage(err, "failed to load last sync epoch")
}

Expand Down Expand Up @@ -265,17 +270,13 @@ func (syncer *DatabaseSyncer) syncOnce(ctx context.Context) (bool, error) {
}

err = syncer.db.PushnWithFinalizer(epochDataSlice, func(d *gorm.DB) error {
if !syncer.elm.Extend(ctx) {
return store.ErrLeaderRenewal
}

return nil
return syncer.elm.Extend(ctx)
})

if err != nil {
if errors.Is(err, store.ErrLeaderRenewal) {
logger.
WithField("leaderIdentity", syncer.elm.Identity()).
logger.WithField("leaderIdentity", syncer.elm.Identity()).
WithError(err).
Info("Db syncer failed to renew leadership on pushing epoch data to db")
return false, nil
}
Expand Down Expand Up @@ -356,17 +357,13 @@ func (syncer *DatabaseSyncer) pivotSwitchRevert(ctx context.Context, revertTo ui

// remove epoch data from database due to pivot switch
err := syncer.db.PopnWithFinalizer(revertTo, func(tx *gorm.DB) error {
if !syncer.elm.Extend(ctx) {
return store.ErrLeaderRenewal
}

return nil
return syncer.elm.Extend(ctx)
})

if err != nil {
if errors.Is(err, store.ErrLeaderRenewal) {
logger.
WithField("leaderIdentity", syncer.elm.Identity()).
logger.WithField("leaderIdentity", syncer.elm.Identity()).
WithError(err).
Info("Db syncer failed to renew leadership on popping epoch data from db")
return nil
}
Expand Down Expand Up @@ -409,6 +406,10 @@ func (syncer *DatabaseSyncer) latestStoreEpoch() uint64 {
return 0
}

func (syncer *DatabaseSyncer) onLeadershipChanged(ctx context.Context, lm election.LeaderManager) {
func (syncer *DatabaseSyncer) onLeadershipChanged(
ctx context.Context, lm election.LeaderManager, gainedOrLost bool) {
syncer.epochPivotWin.Reset()
if !gainedOrLost && ctx.Err() != context.Canceled {
logrus.WithField("leaderID", lm.Identity()).Warn("DB syncer lost HA leadership")
}
}
Loading
Loading