Skip to content

Commit

Permalink
feat(config): add initial sync timeout setting
Browse files Browse the repository at this point in the history
  • Loading branch information
lklimek committed Dec 19, 2024
1 parent b8f428b commit 6118718
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 5 deletions.
8 changes: 8 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,13 @@ type BaseConfig struct { //nolint: maligned
// Default: 0
DeadlockDetection time.Duration `mapstructure:"deadlock-detection"`

// SyncTimeout is the timeout for the initial sync process, before switching to consensus.
// If zero or empty, the default value is used.
//
// Default: 60s
SyncTimeout time.Duration `mapstructure:"sync-timeout"`

// Other options should be empty
Other map[string]interface{} `mapstructure:",remain"`
}

Expand All @@ -250,6 +257,7 @@ func DefaultBaseConfig() BaseConfig {
DBBackend: "goleveldb",
DBPath: "data",
DeadlockDetection: 0,
SyncTimeout: 60 * time.Second,
}
}

Expand Down
2 changes: 2 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ func TestDefaultConfig(t *testing.T) {

assert.Equal(t, "/foo/bar", cfg.GenesisFile())
assert.Equal(t, "/opt/data", cfg.DBDir())

assert.Equal(t, 60*time.Second, cfg.BaseConfig.SyncTimeout)
}

func TestConfigValidateBasic(t *testing.T) {
Expand Down
6 changes: 6 additions & 0 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,12 @@ filter-peers = {{ .BaseConfig.FilterPeers }}
# Default: 0
deadlock-detection = "{{ .BaseConfig.DeadlockDetection }}"
# Timeout for the initial sync process, before switching to consensus.
# If zero or empty, the default value is used.
#
# Default: 60s
sync-timeout = "{{ .BaseConfig.SyncTimeout }}"
#######################################################
### ABCI App Connection Options ###
#######################################################
Expand Down
16 changes: 14 additions & 2 deletions internal/blocksync/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const (
switchToConsensusIntervalSeconds = 1

// switch to consensus after this duration of inactivity
syncTimeout = 60 * time.Second
defaultSyncTimeout = 60 * time.Second
)

type consensusReactor interface {
Expand Down Expand Up @@ -62,6 +62,8 @@ type Reactor struct {
eventBus *eventbus.EventBus

syncStartTime time.Time
// syncTimeout defines how much time we will try to start sync before switching to consensus
syncTimeout time.Duration

nodeProTxHash types.ProTxHash

Expand Down Expand Up @@ -94,6 +96,7 @@ func NewReactor(
metrics: metrics,
eventBus: eventBus,
nodeProTxHash: nodeProTxHash,
syncTimeout: defaultSyncTimeout,
executor: newBlockApplier(
blockExec,
store,
Expand All @@ -106,6 +109,12 @@ func NewReactor(
return r
}

func (r *Reactor) WithSyncTimeout(timeout time.Duration) *Reactor {
r.syncTimeout = timeout

return r
}

// OnStart starts separate go routines for each p2p Channel and listens for
// envelopes on each. In addition, it also listens for peer updates and handles
// messages on that p2p channel accordingly. The caller must be sure to execute
Expand All @@ -130,7 +139,10 @@ func (r *Reactor) OnStart(ctx context.Context) error {
startHeight = state.InitialHeight
}

r.synchronizer = NewSynchronizer(startHeight, r.p2pClient, r.executor, WithLogger(r.logger))
r.synchronizer = NewSynchronizer(startHeight, r.p2pClient, r.executor,
WithLogger(r.logger),
WithSyncTimeout(r.syncTimeout),
)
if r.blockSyncFlag.Load() {
if err := r.synchronizer.Start(ctx); err != nil {
return err
Expand Down
13 changes: 11 additions & 2 deletions internal/blocksync/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ type (
logger log.Logger

lastAdvance time.Time
// syncTimeout defines how much time we will try to sync; defaults to 60 seconds
syncTimeout time.Duration

mtx sync.RWMutex

Expand Down Expand Up @@ -112,6 +114,12 @@ func WithClock(clock clockwork.Clock) OptionFunc {
}
}

func WithSyncTimeout(timeout time.Duration) OptionFunc {
return func(v *Synchronizer) {
v.syncTimeout = timeout
}
}

// NewSynchronizer returns a new Synchronizer with the height equal to start
func NewSynchronizer(start int64, client client.BlockClient, blockExec *blockApplier, opts ...OptionFunc) *Synchronizer {
peerStore := NewInMemPeerStore()
Expand All @@ -127,6 +135,7 @@ func NewSynchronizer(start int64, client client.BlockClient, blockExec *blockApp
height: start,
workerPool: workerpool.New(poolWorkerSize, workerpool.WithLogger(logger)),
pendingToApply: map[int64]BlockResponse{},
syncTimeout: defaultSyncTimeout,
}
for _, opt := range opts {
opt(bp)
Expand Down Expand Up @@ -239,14 +248,14 @@ func (s *Synchronizer) WaitForSync(ctx context.Context) {
lastAdvance = s.LastAdvance()
isCaughtUp = s.IsCaughtUp()
)
if isCaughtUp || time.Since(lastAdvance) > syncTimeout {
if isCaughtUp || time.Since(lastAdvance) > s.syncTimeout {
return
}
s.logger.Info(
"not caught up yet",
"height", height,
"max_peer_height", s.MaxPeerHeight(),
"timeout_in", syncTimeout-time.Since(lastAdvance),
"timeout_in", s.syncTimeout-time.Since(lastAdvance),
)
}
}
Expand Down
2 changes: 1 addition & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ func makeNode(
blockSync && !stateSync,
nodeMetrics.consensus,
eventBus,
)
).WithSyncTimeout(cfg.SyncTimeout)
node.services = append(node.services, bcReactor)
node.rpcEnv.BlockSyncReactor = bcReactor

Expand Down

0 comments on commit 6118718

Please sign in to comment.