diff --git a/ttl/ttlworker/job_manager.go b/ttl/ttlworker/job_manager.go index 7b80fa3165c50..b523baab3c05d 100644 --- a/ttl/ttlworker/job_manager.go +++ b/ttl/ttlworker/job_manager.go @@ -143,9 +143,13 @@ func (m *JobManager) jobLoop() error { } cancel() case <-updateScanTaskStateTicker: - m.updateTaskState() + if m.updateTaskState() { + m.rescheduleJobs(se, now) + } case <-m.notifyStateCh: - m.updateTaskState() + if m.updateTaskState() { + m.rescheduleJobs(se, now) + } case <-jobCheckTicker: m.checkFinishedJob(se, now) m.checkNotOwnJob() @@ -229,7 +233,8 @@ func (m *JobManager) resizeWorkers(workers []worker, count int, factory func() w return workers, nil } -func (m *JobManager) updateTaskState() { +// updateTaskState polls the result from scan worker and returns whether there are result polled +func (m *JobManager) updateTaskState() bool { results := m.pollScanWorkerResults() for _, result := range results { job := findJobWithTableID(m.runningJobs, result.task.tbl.ID) @@ -240,6 +245,8 @@ func (m *JobManager) updateTaskState() { job.scanTaskErr = multierr.Append(job.scanTaskErr, result.err) } } + + return len(results) > 0 } func (m *JobManager) pollScanWorkerResults() []*ttlScanTaskExecResult {