Improve sync fault tolerance with multiple nodes support (#261)
wanliqun authored Dec 18, 2024
1 parent 1508b84 commit 213eaf4
Showing 11 changed files with 152 additions and 65 deletions.
4 changes: 2 additions & 2 deletions cmd/sync.go
@@ -81,7 +81,7 @@ func startSyncServiceAdaptively(ctx context.Context, wg *sync.WaitGroup, syncCtx
func startSyncCfxDatabase(ctx context.Context, wg *sync.WaitGroup, syncCtx util.SyncContext) *cisync.DatabaseSyncer {
logrus.Info("Start to sync core space blockchain data into database")

syncer := cisync.MustNewDatabaseSyncer(syncCtx.SyncCfx, syncCtx.CfxDB)
syncer := cisync.MustNewDatabaseSyncer(syncCtx.SyncCfxs, syncCtx.CfxDB)
go syncer.Sync(ctx, wg)

// start core space db prune
@@ -93,7 +93,7 @@ func startSyncCfxDatabase(ctx context.Context, wg *sync.WaitGroup, syncCtx util.
func startSyncEthDatabase(ctx context.Context, wg *sync.WaitGroup, syncCtx util.SyncContext) {
logrus.Info("Start to sync evm space blockchain data into database")

ethSyncer := cisync.MustNewEthSyncer(syncCtx.SyncEth, syncCtx.EthDB)
ethSyncer := cisync.MustNewEthSyncer(syncCtx.SyncEths, syncCtx.EthDB)
go ethSyncer.Sync(ctx, wg)

// start evm space db prune
16 changes: 8 additions & 8 deletions cmd/util/data_context.go
@@ -74,19 +74,19 @@ func (ctx *StoreContext) GetMysqlStore(network string) (store *mysql.MysqlStore,
type SyncContext struct {
StoreContext

SyncCfx *sdk.Client
SyncEth *web3go.Client
SyncCfxs []*sdk.Client
SyncEths []*web3go.Client
}

func MustInitSyncContext(storeCtx StoreContext) SyncContext {
sc := SyncContext{StoreContext: storeCtx}

if storeCtx.CfxDB != nil || storeCtx.CfxCache != nil {
sc.SyncCfx = rpc.MustNewCfxClientFromViper(rpc.WithClientHookMetrics(true))
sc.SyncCfxs = rpc.MustNewCfxClientsFromViper(rpc.WithClientHookMetrics(true))
}

if storeCtx.EthDB != nil {
sc.SyncEth = rpc.MustNewEthClientFromViper(rpc.WithClientHookMetrics(true))
sc.SyncEths = rpc.MustNewEthClientsFromViper(rpc.WithClientHookMetrics(true))
}

return sc
@@ -96,11 +96,11 @@ func (ctx *SyncContext) Close() {
// Usually, storeContext will be defer closed by itself
// ctx.storeContext.Close()

if ctx.SyncCfx != nil {
ctx.SyncCfx.Close()
for _, client := range ctx.SyncCfxs {
client.Close()
}

if ctx.SyncEth != nil {
ctx.SyncEth.Close()
for _, client := range ctx.SyncEths {
client.Close()
}
}
8 changes: 4 additions & 4 deletions config/config.yml
@@ -43,8 +43,8 @@ ethrpc:

# Core space SDK client configurations
cfx:
# Fullnode HTTP endpoint
http: http://test.confluxrpc.com
# Fullnode HTTP endpoints
http: [http://test.confluxrpc.com]
# Retry times if failure, if 0 never
retry: 0
# Interval duration before each retry
@@ -67,8 +67,8 @@ cfx:

# EVM space SDK client configurations
eth:
# Fullnode HTTP endpoint
http: http://evmtestnet.confluxrpc.com
# Fullnode HTTP endpoints
http: [http://evmtestnet.confluxrpc.com]
# Retry times if failure, if 0 never
retry: 0
# Interval duration before each retry
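Note: with http now accepting a list of endpoints, the new rpc.MustNewCfxClientsFromViper and rpc.MustNewEthClientsFromViper constructors used in cmd/util/data_context.go presumably build one SDK client per configured URL. The Go sketch below only illustrates that idea; the helper name, package, and error handling are assumptions, not the repository's actual implementation.

package rpcutil

import (
	sdk "github.com/Conflux-Chain/go-conflux-sdk"
	"github.com/sirupsen/logrus"
	"github.com/spf13/viper"
)

// mustNewCfxClients is a hypothetical stand-in for rpc.MustNewCfxClientsFromViper:
// it creates one core space SDK client per "cfx.http" endpoint and aborts if no
// endpoint is configured or any client fails to initialize.
func mustNewCfxClients() []*sdk.Client {
	endpoints := viper.GetStringSlice("cfx.http")
	if len(endpoints) == 0 {
		logrus.Fatal("no cfx.http endpoint configured")
	}

	clients := make([]*sdk.Client, 0, len(endpoints))
	for _, url := range endpoints {
		client, err := sdk.NewClient(url)
		if err != nil {
			logrus.WithError(err).WithField("url", url).Fatal("failed to create cfx sdk client")
		}
		clients = append(clients, client)
	}
	return clients
}

SyncContext.Close in cmd/util/data_context.go then iterates the resulting slice and closes every client, as the diff above shows.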
28 changes: 15 additions & 13 deletions sync/catchup/syncer.go
@@ -10,7 +10,6 @@ import (
"github.com/Conflux-Chain/confura/store/mysql"
"github.com/Conflux-Chain/confura/sync/election"
"github.com/Conflux-Chain/confura/sync/monitor"
"github.com/Conflux-Chain/confura/util"
sdk "github.com/Conflux-Chain/go-conflux-sdk"
logutil "github.com/Conflux-Chain/go-conflux-util/log"
viperutil "github.com/Conflux-Chain/go-conflux-util/viper"
@@ -24,8 +23,8 @@ import (
type Syncer struct {
// goroutine workers to fetch epoch data concurrently
workers []*worker
// conflux sdk client delegated to get network status
cfx sdk.ClientOperator
// conflux sdk clients delegated to get network status
cfxs []*sdk.Client
// db store to persist epoch data
db *mysql.MysqlStore
// min num of db rows per batch persistence
@@ -68,7 +67,7 @@ func WithBenchmark(benchmark bool) SyncOption {
}

func MustNewSyncer(
cfx sdk.ClientOperator,
cfxClients []*sdk.Client,
db *mysql.MysqlStore,
elm election.LeaderManager,
opts ...SyncOption) *Syncer {
@@ -89,13 +88,13 @@ func MustNewSyncer(
WithWorkers(workers),
)

return newSyncer(cfx, db, elm, append(newOpts, opts...)...)
return newSyncer(cfxClients, db, elm, append(newOpts, opts...)...)
}

func newSyncer(
cfx sdk.ClientOperator, db *mysql.MysqlStore,
cfxClients []*sdk.Client, db *mysql.MysqlStore,
elm election.LeaderManager, opts ...SyncOption) *Syncer {
syncer := &Syncer{elm: elm, db: db, cfx: cfx, minBatchDbRows: 1500}
syncer := &Syncer{elm: elm, db: db, cfxs: cfxClients, minBatchDbRows: 1500}
for _, opt := range opts {
opt(syncer)
}
@@ -296,13 +295,13 @@ func (s *Syncer) nextSyncRange() (uint64, uint64, error) {
start = 0
}

status, err := s.cfx.GetStatus()
if err != nil {
return 0, 0, errors.WithMessage(err, "failed to get network status")
var retErr error
for _, cfx := range s.cfxs {
status, err := cfx.GetStatus()
if err == nil {
end := max(status.LatestFinalized, status.LatestCheckpoint)
return start, uint64(end), nil
}
retErr = err
}

end := util.MaxUint64(uint64(status.LatestFinalized), uint64(status.LatestCheckpoint))
return start, end, nil
return 0, 0, errors.WithMessage(retErr, "failed to get network status")
}

// countDbRows count total db rows and to be stored db row from epoch data.
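Note: the reworked nextSyncRange above queries each configured node in turn and settles on the first successful GetStatus response, returning the last error only when every node fails. The same pattern can be expressed as a small standalone helper; this is an illustrative sketch, not code from this commit.

package catchuputil

import (
	sdk "github.com/Conflux-Chain/go-conflux-sdk"
	"github.com/Conflux-Chain/go-conflux-sdk/types"
	"github.com/pkg/errors"
)

// getStatusFromAnyNode mirrors the failover loop in nextSyncRange: it asks each
// node for its status and returns the first success, or the last error seen if
// every node fails.
func getStatusFromAnyNode(clients []*sdk.Client) (types.Status, error) {
	if len(clients) == 0 {
		return types.Status{}, errors.New("no sdk client provided")
	}

	var lastErr error
	for _, cfx := range clients {
		status, err := cfx.GetStatus()
		if err == nil {
			return status, nil
		}
		lastErr = err
	}
	return types.Status{}, errors.WithMessage(lastErr, "failed to get network status from all nodes")
}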
37 changes: 32 additions & 5 deletions sync/monitor/monitor.go
@@ -14,6 +14,17 @@ var (
errSyncHeightNotGrowing = errors.New("sync height not growing")
)

type HealthState string

const (
Healthy HealthState = "Healthy"
Unhealthy HealthState = "Unhealthy"
)

type HealthObserver interface {
OnStateChange(state HealthState, details ...string)
}

// Config holds the configuration parameters for the sync monitor.
type Config struct {
// Health check configuration
@@ -30,7 +41,7 @@ func NewConfig() (conf Config) {
return Config{
MaxAllowedLag: 200,
MaxStalledDuration: 5 * time.Minute,
Health: health.CounterConfig{Remind: 5},
Health: health.CounterConfig{Threshold: 2, Remind: 5},
}
}

@@ -44,6 +55,7 @@ type Monitor struct {
lastAdvancedAt time.Time
getLatestHeight func() (uint64, error)

observer HealthObserver
healthStatus health.Counter
}

@@ -89,6 +101,10 @@ func (m *Monitor) run(ctx context.Context) {
}
}

func (m *Monitor) SetObserver(observer HealthObserver) {
m.observer = observer
}

// Update updates the current sync height.
func (m *Monitor) Update(newHeight uint64) {
m.mu.Lock()
@@ -150,16 +166,27 @@ func (m *Monitor) onSuccess() {
if recovered {
logrus.WithFields(logrus.Fields{
"failures": failures,
}).Info("Sync monitor recovered after failures")
}).Warn("Sync process recovered from failures")
if m.observer != nil {
m.observer.OnStateChange(Healthy)
}
}
}

func (m *Monitor) onFailure(err error, heights ...uint64) {
_, unrecovered, failures := m.healthStatus.OnFailure(m.Health)
if unrecovered {
unhealthy, unrecovered, failures := m.healthStatus.OnFailure(m.Health)
if unhealthy {
logrus.WithFields(logrus.Fields{
"ctxHeights": heights,
"failures": failures,
}).WithError(err).Error("Sync process becomes unhealthy")
if m.observer != nil {
m.observer.OnStateChange(Unhealthy)
}
} else if unrecovered {
logrus.WithFields(logrus.Fields{
"ctxHeights": heights,
"failures": failures,
}).WithError(err).Error("Sync monitor not recovered after failures")
}).WithError(err).Error("Sync process still not recovered after failures")
}
}
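Note: the monitor now models an explicit health state. With the counter configured as Threshold: 2 and Remind: 5, the state presumably flips to Unhealthy once consecutive failures reach the threshold, reminder logs are emitted while it stays unhealthy, and a recovery flips it back to Healthy; each transition is forwarded to the registered HealthObserver. Any component can hook into this. Below is a minimal example observer for illustration only; the DatabaseSyncer in sync/sync_db.go is the real consumer and reacts by switching fullnodes.

package monitorutil

import (
	"github.com/Conflux-Chain/confura/sync/monitor"
	"github.com/sirupsen/logrus"
)

// loggingObserver is a minimal example implementation of monitor.HealthObserver.
type loggingObserver struct{}

// OnStateChange records every health transition reported by the monitor.
func (o *loggingObserver) OnStateChange(state monitor.HealthState, details ...string) {
	logrus.WithFields(logrus.Fields{
		"state":   state,
		"details": details,
	}).Info("Sync health state changed")
}

// Wiring sketch, following the pattern used in sync/sync_db.go
// (getLatestHeight is a placeholder callback):
//   m := monitor.NewMonitor(monitor.NewConfig(), getLatestHeight)
//   m.SetObserver(&loggingObserver{})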
52 changes: 39 additions & 13 deletions sync/sync_db.go
@@ -3,6 +3,7 @@ package sync
import (
"context"
"sync"
"sync/atomic"
"time"

"github.com/Conflux-Chain/confura/store"
@@ -44,8 +45,10 @@ type syncSubConfig struct {
// against the latest confirmed epoch.
type DatabaseSyncer struct {
conf *syncConfig
// conflux sdk client
cfx sdk.ClientOperator
// conflux sdk clients
cfxs []*sdk.Client
// selected sdk client index
cfxIdx atomic.Uint32
// db store
db *mysql.MysqlStore
// epoch number to sync data from
@@ -65,22 +68,33 @@ type DatabaseSyncer struct {
}

// MustNewDatabaseSyncer creates an instance of DatabaseSyncer to sync blockchain data.
func MustNewDatabaseSyncer(cfx sdk.ClientOperator, db *mysql.MysqlStore) *DatabaseSyncer {
func MustNewDatabaseSyncer(cfxClients []*sdk.Client, db *mysql.MysqlStore) *DatabaseSyncer {
if len(cfxClients) == 0 {
logrus.Fatal("No sdk client provided")
}

var conf syncConfig
viperutil.MustUnmarshalKey("sync", &conf)

dlm := dlock.NewLockManager(dlock.NewMySQLBackend(db.DB()))
monitor := monitor.NewMonitor(monitor.NewConfig(), func() (uint64, error) {
epoch, err := cfx.GetEpochNumber(types.EpochLatestConfirmed)
if err != nil {
return 0, err
monitor := monitor.NewMonitor(monitor.NewConfig(), func() (latestEpochNum uint64, retErr error) {
for _, cfx := range cfxClients {
epoch, err := cfx.GetEpochNumber(types.EpochLatestConfirmed)
if err == nil {
latestEpochNum = max(latestEpochNum, epoch.ToInt().Uint64())
} else {
retErr = err
}
}
if latestEpochNum > 0 {
return latestEpochNum, nil
}
return epoch.ToInt().Uint64(), nil
return 0, retErr
})

syncer := &DatabaseSyncer{
conf: &conf,
cfx: cfx,
cfxs: cfxClients,
db: db,
epochFrom: 0,
maxSyncEpochs: conf.MaxEpochs,
@@ -90,6 +104,7 @@ func MustNewDatabaseSyncer(cfx sdk.ClientOperator, db *mysql.MysqlStore) *Databa
epochPivotWin: newEpochPivotWindow(syncPivotInfoWinCapacity),
elm: election.MustNewLeaderManagerFromViper(dlm, "sync.cfx"),
}
monitor.SetObserver(syncer)

// Register leader election callbacks
syncer.elm.OnElected(func(ctx context.Context, lm election.LeaderManager) {
@@ -103,7 +118,7 @@ func MustNewDatabaseSyncer(cfx sdk.ClientOperator, db *mysql.MysqlStore) *Databa
})

// Ensure epoch data validity in database
if err := ensureStoreEpochDataOk(cfx, db); err != nil {
if err := ensureStoreEpochDataOk(cfxClients[0], db); err != nil {
logrus.WithError(err).Fatal("Db sync failed to ensure epoch data validity in db")
}

Expand Down Expand Up @@ -148,7 +163,7 @@ func (syncer *DatabaseSyncer) Sync(ctx context.Context, wg *sync.WaitGroup) {
// fast catch-up until the latest stable epoch
// (maximum between the latest finalized and checkpoint epoch)
func (syncer *DatabaseSyncer) fastCatchup(ctx context.Context) {
catchUpSyncer := catchup.MustNewSyncer(syncer.cfx, syncer.db, syncer.elm)
catchUpSyncer := catchup.MustNewSyncer(syncer.cfxs, syncer.db, syncer.elm)
defer catchUpSyncer.Close()

catchUpSyncer.Sync(ctx)
@@ -182,8 +197,10 @@ func (syncer *DatabaseSyncer) loadLastSyncEpoch() error {

// Sync data once and return true if catch up to the latest confirmed epoch, otherwise false.
func (syncer *DatabaseSyncer) syncOnce(ctx context.Context) (bool, error) {
cfx := syncer.cfxs[syncer.cfxIdx.Load()]

// Fetch latest confirmed epoch from blockchain
epoch, err := syncer.cfx.GetEpochNumber(types.EpochLatestConfirmed)
epoch, err := cfx.GetEpochNumber(types.EpochLatestConfirmed)
if err != nil {
return false, errors.WithMessage(
err, "failed to query the latest confirmed epoch number",
@@ -217,7 +234,7 @@ func (syncer *DatabaseSyncer) syncOnce(ctx context.Context) (bool, error) {
epochNo := syncer.epochFrom + uint64(i)
eplogger := logger.WithField("epoch", epochNo)

data, err := store.QueryEpochData(syncer.cfx, epochNo, syncer.conf.UseBatch)
data, err := store.QueryEpochData(cfx, epochNo, syncer.conf.UseBatch)

// If epoch pivot chain switched, stop the querying right now since it's pointless to query epoch data
// that will be reverted late.
Expand Down Expand Up @@ -428,3 +445,12 @@ func (syncer *DatabaseSyncer) onLeadershipChanged(
logrus.WithField("leaderID", lm.Identity()).Warn("DB syncer lost HA leadership")
}
}

func (syncer *DatabaseSyncer) OnStateChange(state monitor.HealthState, details ...string) {
if len(syncer.cfxs) > 1 && state == monitor.Unhealthy {
// Switch to the next cfx client if the sync progress is not healthy
newCfxIdx := (syncer.cfxIdx.Load() + 1) % uint32(len(syncer.cfxs))
syncer.cfxIdx.Store(newCfxIdx)
logrus.WithField("cfxIndex", newCfxIdx).Info("Switched to the next cfx client")
}
}
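Note: OnStateChange above makes the DatabaseSyncer itself an observer of its sync monitor. When the state turns Unhealthy and more than one node is configured, it advances cfxIdx so the next syncOnce iteration reads from a different fullnode; no locking is needed because the index is read once per round. The selection pattern, distilled into a hypothetical helper type for illustration (not code from this commit):

package failover

import (
	"sync/atomic"

	sdk "github.com/Conflux-Chain/go-conflux-sdk"
)

// clientPool distills the node-switching pattern used by DatabaseSyncer:
// reads go through the currently selected client, and an unhealthy signal
// rotates the selection to the next node round-robin.
type clientPool struct {
	clients []*sdk.Client
	idx     atomic.Uint32
}

// current returns the client to use for the next sync round.
func (p *clientPool) current() *sdk.Client {
	return p.clients[p.idx.Load()%uint32(len(p.clients))]
}

// rotate advances the selection to the next client and returns its index;
// intended to be called when the sync monitor reports Unhealthy.
func (p *clientPool) rotate() uint32 {
	next := (p.idx.Load() + 1) % uint32(len(p.clients))
	p.idx.Store(next)
	return next
}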
6 changes: 3 additions & 3 deletions sync/sync_db_test.go
@@ -56,7 +56,7 @@ func TestFindFirstRevertedEpochInRange(t *testing.T) {
}
return false, nil
}
res, err := findFirstRevertedEpochInRange(syncer.cfx, syncer.db, tc.epochRange, checker)
res, err := findFirstRevertedEpochInRange(nil, syncer.db, tc.epochRange, checker)
assert.Nil(t, err)
assert.Equal(t, tc.expected, res)
}
@@ -109,13 +109,13 @@ func TestEnsureEpochRangeNotRerverted(t *testing.T) {
return false, nil
}
searcher := func(cfx sdk.ClientOperator, s store.StackOperable, epochRange citypes.RangeUint64) (uint64, error) {
return findFirstRevertedEpochInRange(syncer.cfx, syncer.db, epochRange, checker)
return findFirstRevertedEpochInRange(nil, syncer.db, epochRange, checker)
}
pruner := func(s store.StackOperable, epochRange citypes.RangeUint64) error {
assert.Equal(t, tc.expectedPrunedEpochFrom, epochRange.From)
return nil
}
err := ensureEpochRangeNotRerverted(syncer.cfx, syncer.db, tc.epochRange, searcher, pruner)
err := ensureEpochRangeNotRerverted(nil, syncer.db, tc.epochRange, searcher, pruner)
assert.Nil(t, err)
}
}