From 213eaf4b176fffd839a7e043c8665741304f436b Mon Sep 17 00:00:00 2001 From: Liqun Date: Wed, 18 Dec 2024 18:01:27 +0800 Subject: [PATCH] Improve sync fault tolerance with multiple nodes support (#261) --- cmd/sync.go | 4 ++-- cmd/util/data_context.go | 16 ++++++------- config/config.yml | 8 +++---- sync/catchup/syncer.go | 28 ++++++++++++---------- sync/monitor/monitor.go | 37 ++++++++++++++++++++++++---- sync/sync_db.go | 52 ++++++++++++++++++++++++++++++---------- sync/sync_db_test.go | 6 ++--- sync/sync_eth.go | 50 ++++++++++++++++++++++++++++---------- util/rpc/client_cfx.go | 7 ++++-- util/rpc/client_eth.go | 7 ++++-- util/rpc/config.go | 2 +- 11 files changed, 152 insertions(+), 65 deletions(-) diff --git a/cmd/sync.go b/cmd/sync.go index 381279e5..b6197269 100644 --- a/cmd/sync.go +++ b/cmd/sync.go @@ -81,7 +81,7 @@ func startSyncServiceAdaptively(ctx context.Context, wg *sync.WaitGroup, syncCtx func startSyncCfxDatabase(ctx context.Context, wg *sync.WaitGroup, syncCtx util.SyncContext) *cisync.DatabaseSyncer { logrus.Info("Start to sync core space blockchain data into database") - syncer := cisync.MustNewDatabaseSyncer(syncCtx.SyncCfx, syncCtx.CfxDB) + syncer := cisync.MustNewDatabaseSyncer(syncCtx.SyncCfxs, syncCtx.CfxDB) go syncer.Sync(ctx, wg) // start core space db prune @@ -93,7 +93,7 @@ func startSyncCfxDatabase(ctx context.Context, wg *sync.WaitGroup, syncCtx util. 
func startSyncEthDatabase(ctx context.Context, wg *sync.WaitGroup, syncCtx util.SyncContext) { logrus.Info("Start to sync evm space blockchain data into database") - ethSyncer := cisync.MustNewEthSyncer(syncCtx.SyncEth, syncCtx.EthDB) + ethSyncer := cisync.MustNewEthSyncer(syncCtx.SyncEths, syncCtx.EthDB) go ethSyncer.Sync(ctx, wg) // start evm space db prune diff --git a/cmd/util/data_context.go b/cmd/util/data_context.go index 170ca232..54de775e 100644 --- a/cmd/util/data_context.go +++ b/cmd/util/data_context.go @@ -74,19 +74,19 @@ func (ctx *StoreContext) GetMysqlStore(network string) (store *mysql.MysqlStore, type SyncContext struct { StoreContext - SyncCfx *sdk.Client - SyncEth *web3go.Client + SyncCfxs []*sdk.Client + SyncEths []*web3go.Client } func MustInitSyncContext(storeCtx StoreContext) SyncContext { sc := SyncContext{StoreContext: storeCtx} if storeCtx.CfxDB != nil || storeCtx.CfxCache != nil { - sc.SyncCfx = rpc.MustNewCfxClientFromViper(rpc.WithClientHookMetrics(true)) + sc.SyncCfxs = rpc.MustNewCfxClientsFromViper(rpc.WithClientHookMetrics(true)) } if storeCtx.EthDB != nil { - sc.SyncEth = rpc.MustNewEthClientFromViper(rpc.WithClientHookMetrics(true)) + sc.SyncEths = rpc.MustNewEthClientsFromViper(rpc.WithClientHookMetrics(true)) } return sc @@ -96,11 +96,11 @@ func (ctx *SyncContext) Close() { // Usually, storeContext will be defer closed by itself // ctx.storeContext.Close() - if ctx.SyncCfx != nil { - ctx.SyncCfx.Close() + for _, client := range ctx.SyncCfxs { + client.Close() } - if ctx.SyncEth != nil { - ctx.SyncEth.Close() + for _, client := range ctx.SyncEths { + client.Close() } } diff --git a/config/config.yml b/config/config.yml index 7aa58dfa..f52d8949 100644 --- a/config/config.yml +++ b/config/config.yml @@ -43,8 +43,8 @@ ethrpc: # Core space SDK client configurations cfx: - # Fullnode HTTP endpoint - http: http://test.confluxrpc.com + # Fullnode HTTP endpoints + http: [http://test.confluxrpc.com] # Retry times if failure, if 0 never 
retry: 0 # Interval duration before each retry @@ -67,8 +67,8 @@ cfx: # EVM space SDK client configurations eth: - # Fullnode HTTP endpoint - http: http://evmtestnet.confluxrpc.com + # Fullnode HTTP endpoints + http: [http://evmtestnet.confluxrpc.com] # Retry times if failure, if 0 never retry: 0 # Interval duration before each retry diff --git a/sync/catchup/syncer.go b/sync/catchup/syncer.go index ed3ecb43..293845c1 100644 --- a/sync/catchup/syncer.go +++ b/sync/catchup/syncer.go @@ -10,7 +10,6 @@ import ( "github.com/Conflux-Chain/confura/store/mysql" "github.com/Conflux-Chain/confura/sync/election" "github.com/Conflux-Chain/confura/sync/monitor" - "github.com/Conflux-Chain/confura/util" sdk "github.com/Conflux-Chain/go-conflux-sdk" logutil "github.com/Conflux-Chain/go-conflux-util/log" viperutil "github.com/Conflux-Chain/go-conflux-util/viper" @@ -24,8 +23,8 @@ import ( type Syncer struct { // goroutine workers to fetch epoch data concurrently workers []*worker - // conflux sdk client delegated to get network status - cfx sdk.ClientOperator + // conflux sdk clients delegated to get network status + cfxs []*sdk.Client // db store to persist epoch data db *mysql.MysqlStore // min num of db rows per batch persistence @@ -68,7 +67,7 @@ func WithBenchmark(benchmark bool) SyncOption { } func MustNewSyncer( - cfx sdk.ClientOperator, + cfxClients []*sdk.Client, db *mysql.MysqlStore, elm election.LeaderManager, opts ...SyncOption) *Syncer { @@ -89,13 +88,13 @@ func MustNewSyncer( WithWorkers(workers), ) - return newSyncer(cfx, db, elm, append(newOpts, opts...)...) + return newSyncer(cfxClients, db, elm, append(newOpts, opts...)...) 
} func newSyncer( - cfx sdk.ClientOperator, db *mysql.MysqlStore, + cfxClients []*sdk.Client, db *mysql.MysqlStore, elm election.LeaderManager, opts ...SyncOption) *Syncer { - syncer := &Syncer{elm: elm, db: db, cfx: cfx, minBatchDbRows: 1500} + syncer := &Syncer{elm: elm, db: db, cfxs: cfxClients, minBatchDbRows: 1500} for _, opt := range opts { opt(syncer) } @@ -296,13 +295,16 @@ func (s *Syncer) nextSyncRange() (uint64, uint64, error) { start = 0 } - status, err := s.cfx.GetStatus() - if err != nil { - return 0, 0, errors.WithMessage(err, "failed to get network status") + var retErr error + for _, cfx := range s.cfxs { + status, err := cfx.GetStatus() + if err == nil { + end := max(status.LatestFinalized, status.LatestCheckpoint) + return start, uint64(end), nil + } + retErr = err } - - end := util.MaxUint64(uint64(status.LatestFinalized), uint64(status.LatestCheckpoint)) - return start, end, nil + return 0, 0, errors.WithMessage(retErr, "failed to get network status") } // countDbRows count total db rows and to be stored db row from epoch data. diff --git a/sync/monitor/monitor.go b/sync/monitor/monitor.go index e2b5bc3c..2a951e86 100644 --- a/sync/monitor/monitor.go +++ b/sync/monitor/monitor.go @@ -14,6 +14,17 @@ var ( errSyncHeightNotGrowing = errors.New("sync height not growing") ) +type HealthState string + +const ( + Healthy HealthState = "Healthy" + Unhealthy HealthState = "Unhealthy" +) + +type HealthObserver interface { + OnStateChange(state HealthState, details ...string) +} + // Config holds the configuration parameters for the sync monitor. 
type Config struct { // Health check configuration @@ -30,7 +41,7 @@ func NewConfig() (conf Config) { return Config{ MaxAllowedLag: 200, MaxStalledDuration: 5 * time.Minute, - Health: health.CounterConfig{Remind: 5}, + Health: health.CounterConfig{Threshold: 2, Remind: 5}, } } @@ -44,6 +55,7 @@ type Monitor struct { lastAdvancedAt time.Time getLatestHeight func() (uint64, error) + observer HealthObserver healthStatus health.Counter } @@ -89,6 +101,10 @@ func (m *Monitor) run(ctx context.Context) { } } +func (m *Monitor) SetObserver(observer HealthObserver) { + m.observer = observer +} + // Update updates the current sync height. func (m *Monitor) Update(newHeight uint64) { m.mu.Lock() @@ -150,16 +166,27 @@ func (m *Monitor) onSuccess() { if recovered { logrus.WithFields(logrus.Fields{ "failures": failures, - }).Info("Sync monitor recovered after failures") + }).Warn("Sync process recovered from failures") + if m.observer != nil { + m.observer.OnStateChange(Healthy) + } } } func (m *Monitor) onFailure(err error, heights ...uint64) { - _, unrecovered, failures := m.healthStatus.OnFailure(m.Health) - if unrecovered { + unhealthy, unrecovered, failures := m.healthStatus.OnFailure(m.Health) + if unhealthy { + logrus.WithFields(logrus.Fields{ + "ctxHeights": heights, + "failures": failures, + }).WithError(err).Error("Sync process becomes unhealthy") + if m.observer != nil { + m.observer.OnStateChange(Unhealthy) + } + } else if unrecovered { logrus.WithFields(logrus.Fields{ "ctxHeights": heights, "failures": failures, - }).WithError(err).Error("Sync monitor not recovered after failures") + }).WithError(err).Error("Sync process still not recovered after failures") } } diff --git a/sync/sync_db.go b/sync/sync_db.go index 4b3e401e..1744e885 100644 --- a/sync/sync_db.go +++ b/sync/sync_db.go @@ -3,6 +3,7 @@ package sync import ( "context" "sync" + "sync/atomic" "time" "github.com/Conflux-Chain/confura/store" @@ -44,8 +45,10 @@ type syncSubConfig struct { // against the latest 
confirmed epoch. type DatabaseSyncer struct { conf *syncConfig - // conflux sdk client - cfx sdk.ClientOperator + // conflux sdk clients + cfxs []*sdk.Client + // selected sdk client index + cfxIdx atomic.Uint32 // db store db *mysql.MysqlStore // epoch number to sync data from @@ -65,22 +68,33 @@ type DatabaseSyncer struct { } // MustNewDatabaseSyncer creates an instance of DatabaseSyncer to sync blockchain data. -func MustNewDatabaseSyncer(cfx sdk.ClientOperator, db *mysql.MysqlStore) *DatabaseSyncer { +func MustNewDatabaseSyncer(cfxClients []*sdk.Client, db *mysql.MysqlStore) *DatabaseSyncer { + if len(cfxClients) == 0 { + logrus.Fatal("No sdk client provided") + } + var conf syncConfig viperutil.MustUnmarshalKey("sync", &conf) dlm := dlock.NewLockManager(dlock.NewMySQLBackend(db.DB())) - monitor := monitor.NewMonitor(monitor.NewConfig(), func() (uint64, error) { - epoch, err := cfx.GetEpochNumber(types.EpochLatestConfirmed) - if err != nil { - return 0, err + monitor := monitor.NewMonitor(monitor.NewConfig(), func() (latestEpochNum uint64, retErr error) { + for _, cfx := range cfxClients { + epoch, err := cfx.GetEpochNumber(types.EpochLatestConfirmed) + if err == nil { + latestEpochNum = max(latestEpochNum, epoch.ToInt().Uint64()) + } else { + retErr = err + } + } + if latestEpochNum > 0 { + return latestEpochNum, nil } - return epoch.ToInt().Uint64(), nil + return 0, retErr }) syncer := &DatabaseSyncer{ conf: &conf, - cfx: cfx, + cfxs: cfxClients, db: db, epochFrom: 0, maxSyncEpochs: conf.MaxEpochs, @@ -90,6 +104,7 @@ func MustNewDatabaseSyncer(cfx sdk.ClientOperator, db *mysql.MysqlStore) *Databa epochPivotWin: newEpochPivotWindow(syncPivotInfoWinCapacity), elm: election.MustNewLeaderManagerFromViper(dlm, "sync.cfx"), } + monitor.SetObserver(syncer) // Register leader election callbacks syncer.elm.OnElected(func(ctx context.Context, lm election.LeaderManager) { @@ -103,7 +118,7 @@ func MustNewDatabaseSyncer(cfx sdk.ClientOperator, db *mysql.MysqlStore) 
*Databa }) // Ensure epoch data validity in database - if err := ensureStoreEpochDataOk(cfx, db); err != nil { + if err := ensureStoreEpochDataOk(cfxClients[0], db); err != nil { logrus.WithError(err).Fatal("Db sync failed to ensure epoch data validity in db") } @@ -148,7 +163,7 @@ func (syncer *DatabaseSyncer) Sync(ctx context.Context, wg *sync.WaitGroup) { // fast catch-up until the latest stable epoch // (maximum between the latest finalized and checkpoint epoch) func (syncer *DatabaseSyncer) fastCatchup(ctx context.Context) { - catchUpSyncer := catchup.MustNewSyncer(syncer.cfx, syncer.db, syncer.elm) + catchUpSyncer := catchup.MustNewSyncer(syncer.cfxs, syncer.db, syncer.elm) defer catchUpSyncer.Close() catchUpSyncer.Sync(ctx) @@ -182,8 +197,10 @@ func (syncer *DatabaseSyncer) loadLastSyncEpoch() error { // Sync data once and return true if catch up to the latest confirmed epoch, otherwise false. func (syncer *DatabaseSyncer) syncOnce(ctx context.Context) (bool, error) { + cfx := syncer.cfxs[syncer.cfxIdx.Load()] + // Fetch latest confirmed epoch from blockchain - epoch, err := syncer.cfx.GetEpochNumber(types.EpochLatestConfirmed) + epoch, err := cfx.GetEpochNumber(types.EpochLatestConfirmed) if err != nil { return false, errors.WithMessage( err, "failed to query the latest confirmed epoch number", @@ -217,7 +234,7 @@ func (syncer *DatabaseSyncer) syncOnce(ctx context.Context) (bool, error) { epochNo := syncer.epochFrom + uint64(i) eplogger := logger.WithField("epoch", epochNo) - data, err := store.QueryEpochData(syncer.cfx, epochNo, syncer.conf.UseBatch) + data, err := store.QueryEpochData(cfx, epochNo, syncer.conf.UseBatch) // If epoch pivot chain switched, stop the querying right now since it's pointless to query epoch data // that will be reverted late. 
@@ -428,3 +445,12 @@ func (syncer *DatabaseSyncer) onLeadershipChanged( logrus.WithField("leaderID", lm.Identity()).Warn("DB syncer lost HA leadership") } } + +func (syncer *DatabaseSyncer) OnStateChange(state monitor.HealthState, details ...string) { + if len(syncer.cfxs) > 1 && state == monitor.Unhealthy { + // Switch to the next cfx client if the sync progress is not healthy + newCfxIdx := (syncer.cfxIdx.Load() + 1) % uint32(len(syncer.cfxs)) + syncer.cfxIdx.Store(newCfxIdx) + logrus.WithField("cfxIndex", newCfxIdx).Info("Switched to the next cfx client") + } +} diff --git a/sync/sync_db_test.go b/sync/sync_db_test.go index ca0403f5..ab62de3e 100644 --- a/sync/sync_db_test.go +++ b/sync/sync_db_test.go @@ -56,7 +56,7 @@ func TestFindFirstRevertedEpochInRange(t *testing.T) { } return false, nil } - res, err := findFirstRevertedEpochInRange(syncer.cfx, syncer.db, tc.epochRange, checker) + res, err := findFirstRevertedEpochInRange(nil, syncer.db, tc.epochRange, checker) assert.Nil(t, err) assert.Equal(t, tc.expected, res) } @@ -109,13 +109,13 @@ func TestEnsureEpochRangeNotRerverted(t *testing.T) { return false, nil } searcher := func(cfx sdk.ClientOperator, s store.StackOperable, epochRange citypes.RangeUint64) (uint64, error) { - return findFirstRevertedEpochInRange(syncer.cfx, syncer.db, epochRange, checker) + return findFirstRevertedEpochInRange(nil, syncer.db, epochRange, checker) } pruner := func(s store.StackOperable, epochRange citypes.RangeUint64) error { assert.Equal(t, tc.expectedPrunedEpochFrom, epochRange.From) return nil } - err := ensureEpochRangeNotRerverted(syncer.cfx, syncer.db, tc.epochRange, searcher, pruner) + err := ensureEpochRangeNotRerverted(nil, syncer.db, tc.epochRange, searcher, pruner) assert.Nil(t, err) } } diff --git a/sync/sync_eth.go b/sync/sync_eth.go index 2bc8f765..aa707eee 100644 --- a/sync/sync_eth.go +++ b/sync/sync_eth.go @@ -3,6 +3,7 @@ package sync import ( "context" "sync" + "sync/atomic" "time" 
"github.com/Conflux-Chain/confura/rpc/cfxbridge" @@ -31,8 +32,10 @@ type syncEthConfig struct { // EthSyncer is used to synchronize evm space blockchain data into db store. type EthSyncer struct { conf *syncEthConfig - // EVM space ETH client - w3c *web3go.Client + // EVM space web3go clients + w3cs []*web3go.Client + // Selected web3go client index + w3cIdx atomic.Uint32 // EVM space chain id chainId uint32 // db store @@ -54,8 +57,12 @@ type EthSyncer struct { } // MustNewEthSyncer creates an instance of EthSyncer to sync Conflux EVM space chaindata. -func MustNewEthSyncer(ethC *web3go.Client, db *mysql.MysqlStore) *EthSyncer { - ethChainId, err := ethC.Eth.ChainId() +func MustNewEthSyncer(ethClients []*web3go.Client, db *mysql.MysqlStore) *EthSyncer { + if len(ethClients) == 0 { + logrus.Fatal("No web3go client provided") + } + + ethChainId, err := ethClients[0].Eth.ChainId() if err != nil { logrus.WithError(err).Fatal("Failed to get chain ID from eth space") } @@ -64,17 +71,24 @@ func MustNewEthSyncer(ethC *web3go.Client, db *mysql.MysqlStore) *EthSyncer { viperutil.MustUnmarshalKey("sync.eth", &ethConf) dlm := dlock.NewLockManager(dlock.NewMySQLBackend(db.DB())) - monitor := monitor.NewMonitor(monitor.NewConfig(), func() (uint64, error) { - block, err := ethC.Eth.BlockByNumber(ethtypes.SafeBlockNumber, false) - if err != nil { - return 0, err + monitor := monitor.NewMonitor(monitor.NewConfig(), func() (latestBlockNum uint64, retErr error) { + for _, ethC := range ethClients { + block, err := ethC.Eth.BlockByNumber(ethtypes.SafeBlockNumber, false) + if err == nil { + latestBlockNum = max(latestBlockNum, block.Number.Uint64()) + } else { + retErr = err + } + } + if latestBlockNum > 0 { + return latestBlockNum, nil } - return block.Number.Uint64(), nil + return 0, retErr }) syncer := &EthSyncer{ conf: &ethConf, - w3c: ethC, + w3cs: ethClients, chainId: uint32(*ethChainId), db: db, maxSyncBlocks: ethConf.MaxBlocks, @@ -84,6 +98,7 @@ func MustNewEthSyncer(ethC
*web3go.Client, db *mysql.MysqlStore) *EthSyncer { epochPivotWin: newEpochPivotWindow(syncPivotInfoWinCapacity), elm: election.MustNewLeaderManagerFromViper(dlm, "sync.eth"), } + monitor.SetObserver(syncer) // Register leader election callbacks syncer.elm.OnElected(func(ctx context.Context, lm election.LeaderManager) { @@ -158,7 +173,9 @@ func (syncer *EthSyncer) nextBlockTo(maxBlockTo uint64) (uint64, uint64) { // Sync data once and return true if catch up to the most recent block, otherwise false. func (syncer *EthSyncer) syncOnce(ctx context.Context) (bool, error) { - latestBlock, err := syncer.w3c.Eth.BlockByNumber(ethtypes.SafeBlockNumber, false) + w3c := syncer.w3cs[syncer.w3cIdx.Load()] + + latestBlock, err := w3c.Eth.BlockByNumber(ethtypes.SafeBlockNumber, false) if err != nil { return false, errors.WithMessage(err, "failed to query the latest block number") } @@ -196,7 +213,7 @@ func (syncer *EthSyncer) syncOnce(ctx context.Context) (bool, error) { blockNo := syncer.fromBlock + uint64(i) blogger := logger.WithField("block", blockNo) - data, err := store.QueryEthData(ctx, syncer.w3c, blockNo) + data, err := store.QueryEthData(ctx, w3c, blockNo) // If chain re-orged, stop the querying right now since it's pointless to query data // that will be reverted late. 
@@ -452,3 +469,12 @@ func (syncer *EthSyncer) onLeadershipChanged( logrus.WithField("leaderID", lm.Identity()).Warn("ETH syncer lost HA leadership") } } + +func (syncer *EthSyncer) OnStateChange(state monitor.HealthState, details ...string) { + if len(syncer.w3cs) > 1 && state == monitor.Unhealthy { + // Switch to the next web3go client if the sync progress is not healthy + newW3cIdx := (syncer.w3cIdx.Load() + 1) % uint32(len(syncer.w3cs)) + syncer.w3cIdx.Store(newW3cIdx) + logrus.WithField("w3cIndex", newW3cIdx).Info("Switched to the next web3go client") + } +} diff --git a/util/rpc/client_cfx.go b/util/rpc/client_cfx.go index 87faf26c..eb654693 100644 --- a/util/rpc/client_cfx.go +++ b/util/rpc/client_cfx.go @@ -37,8 +37,11 @@ func (o *cfxClientOption) SetCircuitBreaker(maxFail int, failTimeWindow, openCol } } -func MustNewCfxClientFromViper(options ...ClientOption) *sdk.Client { - return MustNewCfxClient(cfxClientCfg.Http, options...) +func MustNewCfxClientsFromViper(options ...ClientOption) (clients []*sdk.Client) { + for _, url := range cfxClientCfg.Http { + clients = append(clients, MustNewCfxClient(url, options...)) + } + return clients } func MustNewCfxClient(url string, options ...ClientOption) *sdk.Client { diff --git a/util/rpc/client_eth.go b/util/rpc/client_eth.go index 16231a31..73699e2c 100644 --- a/util/rpc/client_eth.go +++ b/util/rpc/client_eth.go @@ -37,8 +37,11 @@ func (o *ethClientOption) SetCircuitBreaker(maxFail int, failTimeWindow, openCol }) } -func MustNewEthClientFromViper(options ...ClientOption) *web3go.Client { - return MustNewEthClient(ethClientCfg.Http, options...)
+func MustNewEthClientsFromViper(options ...ClientOption) (clients []*web3go.Client) { + for _, url := range ethClientCfg.Http { + clients = append(clients, MustNewEthClient(url, options...)) + } + return clients } func MustNewEthClient(url string, options ...ClientOption) *web3go.Client { diff --git a/util/rpc/config.go b/util/rpc/config.go index 27f2e428..4e316301 100644 --- a/util/rpc/config.go +++ b/util/rpc/config.go @@ -19,7 +19,7 @@ type circuitBreakerConfig struct { } type clientConfig struct { - Http string + Http []string Retry int RetryInterval time.Duration `default:"1s"` RequestTimeout time.Duration `default:"3s"`