diff --git a/go.mod b/go.mod index a53156ce..3c63c92e 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.22 require ( github.com/Conflux-Chain/go-conflux-sdk v1.5.9 - github.com/Conflux-Chain/go-conflux-util v0.2.1-0.20240401035823-007ed13c1155 + github.com/Conflux-Chain/go-conflux-util v0.2.1-0.20240415073045-459b298cdcbd github.com/Conflux-Chain/web3pay-service v0.0.0-20230609030113-dc3c4d42820a github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce github.com/buraksezer/consistent v0.9.0 diff --git a/go.sum b/go.sum index 249265f3..e4e7a4cd 100644 --- a/go.sum +++ b/go.sum @@ -71,8 +71,8 @@ github.com/Conflux-Chain/go-conflux-sdk v1.0.29/go.mod h1:eUYR736AUkH29rjeiPCLR2 github.com/Conflux-Chain/go-conflux-sdk v1.5.9 h1:S61iF4G+uicWJ+k62FDpfCvUciBkKEhWPhPAzWY/yhI= github.com/Conflux-Chain/go-conflux-sdk v1.5.9/go.mod h1:L1DiAH5zBCZmA4Eq/XLC3JA3d5ecaPczhqrSzk6y+XA= github.com/Conflux-Chain/go-conflux-util v0.0.0-20220907035343-2d1233bccd70/go.mod h1:dcfcQp/A6V0nu9LLqdcg8yv+lSg378JktLtuPUVcm4k= -github.com/Conflux-Chain/go-conflux-util v0.2.1-0.20240401035823-007ed13c1155 h1:4R2nwLTqzFOhDd+GB7EFYcj22r8927m9aK4auwI2V1A= -github.com/Conflux-Chain/go-conflux-util v0.2.1-0.20240401035823-007ed13c1155/go.mod h1:8WdtAuUTRWLDPnyUgAqw5WPr6hM+WCp4R857nFc1l04= +github.com/Conflux-Chain/go-conflux-util v0.2.1-0.20240415073045-459b298cdcbd h1:zgY7RyJ1Rux70Z0/xf3ZHX+iRruLOApCq/I0ps/DZWk= +github.com/Conflux-Chain/go-conflux-util v0.2.1-0.20240415073045-459b298cdcbd/go.mod h1:8WdtAuUTRWLDPnyUgAqw5WPr6hM+WCp4R857nFc1l04= github.com/Conflux-Chain/web3pay-service v0.0.0-20230609030113-dc3c4d42820a h1:3Rr/2Z/dCXx1PVh+VQns/R8f8niJ2jvWMViSGHPupJg= github.com/Conflux-Chain/web3pay-service v0.0.0-20230609030113-dc3c4d42820a/go.mod h1:YCr1LGh6g2mgceQ5K+YBLxWK1Ahq2IgdTDoeZkXbgBY= github.com/DATA-DOG/go-sqlmock v1.3.3/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= diff --git a/sync/catchup/syncer.go b/sync/catchup/syncer.go index 4dd4e4da..4c0c918c 100644 --- a/sync/catchup/syncer.go +++ b/sync/catchup/syncer.go @@ -261,10 +261,7 @@ func (s *Syncer) persist(ctx context.Context, state *persistState, bmarker *benc start := time.Now() err := s.db.PushnWithFinalizer(state.epochs, func(d *gorm.DB) error { - if !s.elm.Extend(ctx) { - return store.ErrLeaderRenewal - } - return nil + return s.elm.Extend(ctx) }) if err != nil { @@ -288,6 +285,8 @@ func (s *Syncer) nextSyncRange() (uint64, uint64, error) { if ok { start++ + } else { + start = 0 } status, err := s.cfx.GetStatus() diff --git a/sync/election/leader_manager.go b/sync/election/leader_manager.go index 1444bca3..c9af9e0e 100644 --- a/sync/election/leader_manager.go +++ b/sync/election/leader_manager.go @@ -2,15 +2,16 @@ package election import ( "context" - "errors" "sync" "sync/atomic" "time" + "github.com/Conflux-Chain/confura/store" "github.com/Conflux-Chain/go-conflux-util/dlock" logutil "github.com/Conflux-Chain/go-conflux-util/log" "github.com/Conflux-Chain/go-conflux-util/viper" "github.com/google/uuid" + "github.com/pkg/errors" "github.com/sirupsen/logrus" ) @@ -42,7 +43,7 @@ type LeaderManager interface { // Wait until being elected as leader or context canceled Await(ctx context.Context) bool // Extend extends the leadership lease - Extend(ctx context.Context) bool + Extend(ctx context.Context) error // Campaign starts the leader election process Campaign(ctx context.Context) // Stop stops the leader election process @@ -57,15 +58,12 @@ type LeaderManager interface { // MustNewLeaderManagerFromViper creates a new LeaderManager with the given LockManager and election key. func MustNewLeaderManagerFromViper(dlm *dlock.LockManager, elecKey string) LeaderManager { - var conf struct { - Config - Enabled bool // leader election enabled or not? - } + var conf Config viper.MustUnmarshalKey("sync.election", &conf) if conf.Enabled { - logrus.WithField("config", conf.Config).Info("HA leader election enabled") - return NewDlockLeaderManager(dlm, conf.Config, elecKey) + logrus.WithField("config", conf).Info("HA leader election enabled") + return NewDlockLeaderManager(dlm, conf, elecKey) } return &noopLeaderManager{} @@ -73,6 +71,8 @@ func MustNewLeaderManagerFromViper(dlm *dlock.LockManager, elecKey string) Leade // Config holds the configuration for the leader election. type Config struct { + // Leader election enabled or not? + Enabled bool // Unique identifier for the leader ID string `default:"leader"` // Duration of the leader term @@ -102,7 +102,7 @@ type DlockLeaderManager struct { // NewDlockLeaderManager creates a new `LeaderManager` with the provided configuration and election key. func NewDlockLeaderManager(dlm *dlock.LockManager, conf Config, elecKey string) *DlockLeaderManager { - conf.ID += uuid.NewString()[:8] // append a unique suffix to the `Id` + conf.ID += "#" + uuid.NewString()[:8] // append a unique suffix to the `Id` return &DlockLeaderManager{ Config: conf, lockMan: dlm, @@ -135,13 +135,16 @@ func (l *DlockLeaderManager) Await(ctx context.Context) bool { } // Extend extends leadership lease. -func (l *DlockLeaderManager) Extend(ctx context.Context) bool { +func (l *DlockLeaderManager) Extend(ctx context.Context) error { if atomic.LoadInt32(&l.electionStatus) == StatusElected { - // acquire the lock to extend the lease - return l.acquireLock(ctx) == nil + if err := l.acquireLock(ctx); err != nil { + return errors.WithMessage(store.ErrLeaderRenewal, err.Error()) + } + + return nil } - return false + return store.ErrLeaderRenewal } // Campaign starts the election process, which will run in a goroutine until contex canceled. @@ -309,7 +312,7 @@ func (l *DlockLeaderManager) onOusted(ctx context.Context) { logrus.WithFields(logrus.Fields{ "electionKey": l.electionKey, "leaderID": l.ID, - }).Warn("Leader ousted") + }).Info("Leader ousted") atomic.StoreInt32(&l.electionStatus, int32(StatusOusted)) @@ -348,11 +351,11 @@ func (l *DlockLeaderManager) onError(ctx context.Context, err error) { // noopLeaderManager dummy leader manager that does nothing at all. type noopLeaderManager struct{} -func (l *noopLeaderManager) Identity() string { return "noop" } -func (l *noopLeaderManager) Extend(ctx context.Context) bool { return true } -func (l *noopLeaderManager) Await(ctx context.Context) bool { return true } -func (l *noopLeaderManager) Campaign(ctx context.Context) { /* do nothing */ } -func (l *noopLeaderManager) Stop(ctx context.Context) error { return nil } -func (l *noopLeaderManager) OnElected(cb ElectedCallback) { /* do nothing */ } -func (l *noopLeaderManager) OnOusted(cb OustedCallback) { /* do nothing */ } -func (l *noopLeaderManager) OnError(cb ErrorCallback) { /* do nothing */ } +func (l *noopLeaderManager) Identity() string { return "noop" } +func (l *noopLeaderManager) Extend(ctx context.Context) error { return nil } +func (l *noopLeaderManager) Await(ctx context.Context) bool { return true } +func (l *noopLeaderManager) Campaign(ctx context.Context) { /* do nothing */ } +func (l *noopLeaderManager) Stop(ctx context.Context) error { return nil } +func (l *noopLeaderManager) OnElected(cb ElectedCallback) { /* do nothing */ } +func (l *noopLeaderManager) OnOusted(cb OustedCallback) { /* do nothing */ } +func (l *noopLeaderManager) OnError(cb ErrorCallback) { /* do nothing */ } diff --git a/sync/sync_db.go b/sync/sync_db.go index bce4be49..b3eb4dfa 100644 --- a/sync/sync_db.go +++ b/sync/sync_db.go @@ -66,6 +66,7 @@ func MustNewDatabaseSyncer(cfx sdk.ClientOperator, db *mysql.MysqlStore) *Databa var conf syncConfig viperutil.MustUnmarshalKey("sync", &conf) + dlm := dlock.NewLockManager(dlock.NewMySQLBackend(db.DB())) syncer := &DatabaseSyncer{ conf: &conf, cfx: cfx, @@ -75,13 +76,17 @@ func MustNewDatabaseSyncer(cfx sdk.ClientOperator, db *mysql.MysqlStore) *Databa syncIntervalNormal: time.Second, syncIntervalCatchUp: time.Millisecond, epochPivotWin: newEpochPivotWindow(syncPivotInfoWinCapacity), + elm: election.MustNewLeaderManagerFromViper(dlm, "sync.cfx"), } - dlm := dlock.NewLockManager(dlock.NewMySQLBackend(db.DB())) - elm := election.MustNewLeaderManagerFromViper(dlm, "sync.cfx") - elm.OnElected(syncer.onLeadershipChanged) - elm.OnOusted(syncer.onLeadershipChanged) - syncer.elm = elm + // Register leader election callbacks + syncer.elm.OnElected(func(ctx context.Context, lm election.LeaderManager) { + syncer.onLeadershipChanged(ctx, lm, true) + }) + + syncer.elm.OnOusted(func(ctx context.Context, lm election.LeaderManager) { + syncer.onLeadershipChanged(ctx, lm, false) + }) // Ensure epoch data validity in database if err := ensureStoreEpochDataOk(cfx, db); err != nil { @@ -102,7 +107,7 @@ func (syncer *DatabaseSyncer) Sync(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() go syncer.elm.Campaign(ctx) - defer syncer.elm.Stop(ctx) + defer syncer.elm.Stop(context.Background()) syncer.fastCatchup(ctx) @@ -137,28 +142,28 @@ func (syncer *DatabaseSyncer) fastCatchup(ctx context.Context) { // Load last sync epoch from databse to continue synchronization. func (syncer *DatabaseSyncer) mustLoadLastSyncEpoch() { - loaded, err := syncer.loadLastSyncEpoch() - if err != nil { + if err := syncer.loadLastSyncEpoch(); err != nil { logrus.WithError(err).Fatal("Failed to load last sync epoch range from db") } - - // Load db sync start epoch config on initial loading if necessary. - if !loaded && syncer.conf != nil { - syncer.epochFrom = syncer.conf.FromEpoch - } } -func (syncer *DatabaseSyncer) loadLastSyncEpoch() (loaded bool, err error) { +func (syncer *DatabaseSyncer) loadLastSyncEpoch() error { + // Load last sync epoch from databse maxEpoch, ok, err := syncer.db.MaxEpoch() if err != nil { - return false, errors.WithMessage(err, "failed to get max epoch from epoch to block mapping") + return errors.WithMessage(err, "failed to get max epoch from epoch to block mapping") } - if ok { + if ok { // continue from the last sync epoch syncer.epochFrom = maxEpoch + 1 + } else { // start from genesis or configured start epoch + syncer.epochFrom = 0 + if syncer.conf != nil { + syncer.epochFrom = syncer.conf.FromEpoch + } } - return ok, nil + return nil } // Sync data once and return true if catch up to the latest confirmed epoch, otherwise false. @@ -172,7 +177,7 @@ func (syncer *DatabaseSyncer) syncOnce(ctx context.Context) (bool, error) { } // Load latest sync epoch from database - if _, err := syncer.loadLastSyncEpoch(); err != nil { + if err := syncer.loadLastSyncEpoch(); err != nil { return false, errors.WithMessage(err, "failed to load last sync epoch") } @@ -265,17 +270,13 @@ func (syncer *DatabaseSyncer) syncOnce(ctx context.Context) (bool, error) { } err = syncer.db.PushnWithFinalizer(epochDataSlice, func(d *gorm.DB) error { - if !syncer.elm.Extend(ctx) { - return store.ErrLeaderRenewal - } - - return nil + return syncer.elm.Extend(ctx) }) if err != nil { if errors.Is(err, store.ErrLeaderRenewal) { - logger. - WithField("leaderIdentity", syncer.elm.Identity()). + logger.WithField("leaderIdentity", syncer.elm.Identity()). + WithError(err). Info("Db syncer failed to renew leadership on pushing epoch data to db") return false, nil } @@ -356,17 +357,13 @@ func (syncer *DatabaseSyncer) pivotSwitchRevert(ctx context.Context, revertTo ui // remove epoch data from database due to pivot switch err := syncer.db.PopnWithFinalizer(revertTo, func(tx *gorm.DB) error { - if !syncer.elm.Extend(ctx) { - return store.ErrLeaderRenewal - } - - return nil + return syncer.elm.Extend(ctx) }) if err != nil { if errors.Is(err, store.ErrLeaderRenewal) { - logger. - WithField("leaderIdentity", syncer.elm.Identity()). + logger.WithField("leaderIdentity", syncer.elm.Identity()). + WithError(err). Info("Db syncer failed to renew leadership on popping epoch data from db") return nil } @@ -409,6 +406,10 @@ func (syncer *DatabaseSyncer) latestStoreEpoch() uint64 { return 0 } -func (syncer *DatabaseSyncer) onLeadershipChanged(ctx context.Context, lm election.LeaderManager) { +func (syncer *DatabaseSyncer) onLeadershipChanged( + ctx context.Context, lm election.LeaderManager, gainedOrLost bool) { syncer.epochPivotWin.Reset() + if !gainedOrLost && ctx.Err() != context.Canceled { + logrus.WithField("leaderID", lm.Identity()).Warn("DB syncer lost HA leadership") + } } diff --git a/sync/sync_eth.go b/sync/sync_eth.go index a2db1a1d..bdaee507 100644 --- a/sync/sync_eth.go +++ b/sync/sync_eth.go @@ -66,6 +66,7 @@ func MustNewEthSyncer(ethC *web3go.Client, db *mysql.MysqlStore) *EthSyncer { var ethConf syncEthConfig viperutil.MustUnmarshalKey("sync.eth", ðConf) + dlm := dlock.NewLockManager(dlock.NewMySQLBackend(db.DB())) syncer := &EthSyncer{ conf: ðConf, w3c: ethC, @@ -75,13 +76,17 @@ func MustNewEthSyncer(ethC *web3go.Client, db *mysql.MysqlStore) *EthSyncer { syncIntervalNormal: time.Second, syncIntervalCatchUp: time.Millisecond, epochPivotWin: newEpochPivotWindow(syncPivotInfoWinCapacity), + elm: election.MustNewLeaderManagerFromViper(dlm, "sync.eth"), } - dlm := dlock.NewLockManager(dlock.NewMySQLBackend(db.DB())) - elm := election.MustNewLeaderManagerFromViper(dlm, "sync.eth") - elm.OnElected(syncer.onLeadershipChanged) - elm.OnOusted(syncer.onLeadershipChanged) - syncer.elm = elm + // Register leader election callbacks + syncer.elm.OnElected(func(ctx context.Context, lm election.LeaderManager) { + syncer.onLeadershipChanged(ctx, lm, true) + }) + + syncer.elm.OnOusted(func(ctx context.Context, lm election.LeaderManager) { + syncer.onLeadershipChanged(ctx, lm, false) + }) // Load last sync block information syncer.mustLoadLastSyncBlock() @@ -97,7 +102,7 @@ func (syncer *EthSyncer) Sync(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() go syncer.elm.Campaign(ctx) - defer syncer.elm.Stop(ctx) + defer syncer.elm.Stop(context.Background()) ticker := time.NewTimer(syncer.syncIntervalCatchUp) defer ticker.Stop() @@ -156,7 +161,7 @@ func (syncer *EthSyncer) syncOnce(ctx context.Context) (bool, error) { } // Load latest sync block from database - if _, err := syncer.loadLastSyncBlock(); err != nil { + if err := syncer.loadLastSyncBlock(); err != nil { return false, errors.WithMessage(err, "failed to load last sync epoch") } @@ -262,16 +267,13 @@ func (syncer *EthSyncer) syncOnce(ctx context.Context) (bool, error) { } err = syncer.db.PushnWithFinalizer(epochDataSlice, func(d *gorm.DB) error { - if !syncer.elm.Extend(ctx) { - return store.ErrLeaderRenewal - } - return nil + return syncer.elm.Extend(ctx) }) if err != nil { if errors.Is(err, store.ErrLeaderRenewal) { - logger. - WithField("leaderIdentity", syncer.elm.Identity()). + logger.WithField("leaderIdentity", syncer.elm.Identity()). + WithError(err). Info("ETH syncer failed to renew leadership on pushing eth data to db") return false, nil } @@ -321,17 +323,13 @@ func (syncer *EthSyncer) reorgRevert(ctx context.Context, revertTo uint64) error // remove block data from database due to chain re-org err := syncer.db.PopnWithFinalizer(revertTo, func(d *gorm.DB) error { - if !syncer.elm.Extend(ctx) { - return store.ErrLeaderRenewal - } - - return nil + return syncer.elm.Extend(ctx) }) if err != nil { if errors.Is(err, store.ErrLeaderRenewal) { - logger. - WithField("leaderIdentity", syncer.elm.Identity()). + logger.WithField("leaderIdentity", syncer.elm.Identity()). + WithError(err). Info("ETH syncer failed to renew leadership on popping eth data from db") return nil } @@ -353,28 +351,28 @@ func (syncer *EthSyncer) reorgRevert(ctx context.Context, revertTo uint64) error // Load last sync block from databse to continue synchronization. func (syncer *EthSyncer) mustLoadLastSyncBlock() { - loaded, err := syncer.loadLastSyncBlock() - if err != nil { + if err := syncer.loadLastSyncBlock(); err != nil { logrus.WithError(err).Fatal("Failed to load last sync block range from ethdb") } - - // Load eth sync start block config on initial loading if necessary. - if !loaded && syncer.conf != nil { - syncer.fromBlock = syncer.conf.FromBlock - } } -func (syncer *EthSyncer) loadLastSyncBlock() (loaded bool, err error) { +func (syncer *EthSyncer) loadLastSyncBlock() error { + // load last sync block from databse maxBlock, ok, err := syncer.db.MaxEpoch() if err != nil { - return false, errors.WithMessage(err, "failed to get max block from e2b mapping") + return errors.WithMessage(err, "failed to get max block from e2b mapping") } - if ok { + if ok { // continue from the last sync epoch syncer.fromBlock = maxBlock + 1 + } else { // start from genesis or configured start block + syncer.fromBlock = 0 + if syncer.conf != nil { + syncer.fromBlock = syncer.conf.FromBlock + } } - return ok, nil + return nil } func (syncer *EthSyncer) getStoreLatestBlockHash() (string, error) { @@ -427,6 +425,10 @@ func (syncer *EthSyncer) latestStoreBlock() uint64 { return 0 } -func (syncer *EthSyncer) onLeadershipChanged(ctx context.Context, lm election.LeaderManager) { +func (syncer *EthSyncer) onLeadershipChanged( + ctx context.Context, lm election.LeaderManager, gainedOrLost bool) { syncer.epochPivotWin.Reset() + if !gainedOrLost && ctx.Err() != context.Canceled { + logrus.WithField("leaderID", lm.Identity()).Warn("ETH syncer lost HA leadership") + } }