diff --git a/ddl/job_table.go b/ddl/job_table.go index e2caff2a74b08..fd1bb28ff0b13 100644 --- a/ddl/job_table.go +++ b/ddl/job_table.go @@ -266,7 +266,7 @@ func (d *ddl) startDispatchLoop() { notifyDDLJobByEtcdCh = d.etcdCli.Watch(d.ctx, addingDDLJobConcurrent) } ticker := time.NewTicker(dispatchLoopWaitingDuration) - if err := d.doCheckClusterState(false); err != nil { + if err := d.checkAndUpdateClusterState(true); err != nil { logutil.BgLogger().Fatal("dispatch loop get cluster state failed, it should not happen, please try restart TiDB", zap.Error(err)) } defer ticker.Stop() @@ -294,7 +294,7 @@ func (d *ddl) startDispatchLoop() { case <-d.ctx.Done(): return } - if err := d.needCheckClusterState(isOnce); err != nil { + if err := d.checkAndUpdateClusterState(isOnce); err != nil { continue } isOnce = false @@ -303,23 +303,17 @@ func (d *ddl) startDispatchLoop() { } } -func (d *ddl) needCheckClusterState(mustCheck bool) error { +func (d *ddl) checkAndUpdateClusterState(needUpdate bool) error { select { case _, ok := <-d.stateSyncer.WatchChan(): - return d.doCheckClusterState(!ok) + if !ok { + d.stateSyncer.Rewatch(d.ctx) + } default: - if mustCheck { - return d.doCheckClusterState(false) + if !needUpdate { + return nil } } - return nil -} - -func (d *ddl) doCheckClusterState(needRewatch bool) error { - if needRewatch { - d.stateSyncer.Rewatch(d.ctx) - return nil - } oldState := d.stateSyncer.IsUpgradingState() stateInfo, err := d.stateSyncer.GetGlobalState(d.ctx)