From 0ace00e9d685b994b17c68d6217e313f28958602 Mon Sep 17 00:00:00 2001 From: YangKeao Date: Tue, 13 Dec 2022 20:08:46 +0800 Subject: [PATCH] reschedule scan tasks after update task state Signed-off-by: YangKeao --- ttl/ttlworker/job_manager.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/ttl/ttlworker/job_manager.go b/ttl/ttlworker/job_manager.go index 7b80fa3165c50..b523baab3c05d 100644 --- a/ttl/ttlworker/job_manager.go +++ b/ttl/ttlworker/job_manager.go @@ -143,9 +143,13 @@ func (m *JobManager) jobLoop() error { } cancel() case <-updateScanTaskStateTicker: - m.updateTaskState() + if m.updateTaskState() { + m.rescheduleJobs(se, now) + } case <-m.notifyStateCh: - m.updateTaskState() + if m.updateTaskState() { + m.rescheduleJobs(se, now) + } case <-jobCheckTicker: m.checkFinishedJob(se, now) m.checkNotOwnJob() @@ -229,7 +233,8 @@ func (m *JobManager) resizeWorkers(workers []worker, count int, factory func() w return workers, nil } -func (m *JobManager) updateTaskState() { +// updateTaskState polls the result from scan worker and returns whether there are result polled +func (m *JobManager) updateTaskState() bool { results := m.pollScanWorkerResults() for _, result := range results { job := findJobWithTableID(m.runningJobs, result.task.tbl.ID) @@ -240,6 +245,8 @@ func (m *JobManager) updateTaskState() { job.scanTaskErr = multierr.Append(job.scanTaskErr, result.err) } } + + return len(results) > 0 } func (m *JobManager) pollScanWorkerResults() []*ttlScanTaskExecResult {