diff --git a/ttl/cache/task.go b/ttl/cache/task.go index c88a3fe0abb2e..3ef1e2ebee811 100644 --- a/ttl/cache/task.go +++ b/ttl/cache/task.go @@ -59,7 +59,7 @@ func SelectFromTTLTaskWithID(jobID string, scanID int64) (string, []interface{}) // PeekWaitingTTLTask returns an SQL statement to get `limit` waiting ttl task func PeekWaitingTTLTask(limit int, hbExpire time.Time) (string, []interface{}) { - return selectFromTTLTask + " WHERE status = 'waiting' OR owner_hb_time < %? ORDER BY created_time ASC LIMIT %?", []interface{}{hbExpire.Format("2006-01-02 15:04:05"), limit} + return selectFromTTLTask + " WHERE status = 'waiting' OR (owner_hb_time < %? AND status = 'running') ORDER BY created_time ASC LIMIT %?", []interface{}{hbExpire.Format("2006-01-02 15:04:05"), limit} } // InsertIntoTTLTask returns an SQL statement to insert a ttl task into mysql.tidb_ttl_task diff --git a/ttl/ttlworker/task_manager_integration_test.go b/ttl/ttlworker/task_manager_integration_test.go index 9e3bad19b2acd..0419a14f4133f 100644 --- a/ttl/ttlworker/task_manager_integration_test.go +++ b/ttl/ttlworker/task_manager_integration_test.go @@ -186,6 +186,17 @@ func TestTaskScheduleExpireHeartBeat(t *testing.T) { m2.SetScanWorkers4Test([]ttlworker.Worker{scanWorker2}) m2.RescheduleTasks(sessionFactory(), now.Add(time.Hour)) tk.MustQuery("select status,owner_id from mysql.tidb_ttl_task").Check(testkit.Rows("running task-manager-2")) + + // another task manager shouldn't fetch this task if it has finished + task := m2.GetRunningTasks()[0] + task.SetResult(nil) + m2.CheckFinishedTask(sessionFactory(), now) + scanWorker3 := ttlworker.NewMockScanWorker(t) + scanWorker3.Start() + m3 := ttlworker.NewTaskManager(context.Background(), nil, isc, "task-manager-3") + m3.SetScanWorkers4Test([]ttlworker.Worker{scanWorker3}) + m3.RescheduleTasks(sessionFactory(), now.Add(time.Hour)) + tk.MustQuery("select status,owner_id from mysql.tidb_ttl_task").Check(testkit.Rows("finished task-manager-2")) } func TestTaskMetrics(t *testing.T) { diff --git a/ttl/ttlworker/task_manager_test.go b/ttl/ttlworker/task_manager_test.go index 37365cb9757f6..cffb8e071b62d 100644 --- a/ttl/ttlworker/task_manager_test.go +++ b/ttl/ttlworker/task_manager_test.go @@ -54,6 +54,24 @@ func (m *taskManager) ReportMetrics() { m.reportMetrics() } +// CheckFinishedTask is an exported version of checkFinishedTask +func (m *taskManager) CheckFinishedTask(se session.Session, now time.Time) { + m.checkFinishedTask(se, now) +} + +// ReportTaskFinished is an exported version of reportTaskFinished +func (m *taskManager) GetRunningTasks() []*runningScanTask { + return m.runningTasks +} + +// ReportTaskFinished is an exported version of reportTaskFinished +func (t *runningScanTask) SetResult(err error) { + t.result = &ttlScanTaskExecResult{ + task: t.ttlScanTask, + err: err, + } +} + func TestResizeWorkers(t *testing.T) { tbl := newMockTTLTbl(t, "t1")