Skip to content

Commit

Permalink
ttl: don't fetch ttl scan task after it finished (#40919)
Browse files Browse the repository at this point in the history
close #40918
  • Loading branch information
YangKeao committed Feb 1, 2023
1 parent ff7d668 commit 4a38dee
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 1 deletion.
2 changes: 1 addition & 1 deletion ttl/cache/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func SelectFromTTLTaskWithID(jobID string, scanID int64) (string, []interface{})

// PeekWaitingTTLTask returns an SQL statement to get `limit` waiting ttl task
func PeekWaitingTTLTask(limit int, hbExpire time.Time) (string, []interface{}) {
return selectFromTTLTask + " WHERE status = 'waiting' OR owner_hb_time < %? ORDER BY created_time ASC LIMIT %?", []interface{}{hbExpire.Format("2006-01-02 15:04:05"), limit}
return selectFromTTLTask + " WHERE status = 'waiting' OR (owner_hb_time < %? AND status = 'running') ORDER BY created_time ASC LIMIT %?", []interface{}{hbExpire.Format("2006-01-02 15:04:05"), limit}
}

// InsertIntoTTLTask returns an SQL statement to insert a ttl task into mysql.tidb_ttl_task
Expand Down
11 changes: 11 additions & 0 deletions ttl/ttlworker/task_manager_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,17 @@ func TestTaskScheduleExpireHeartBeat(t *testing.T) {
m2.SetScanWorkers4Test([]ttlworker.Worker{scanWorker2})
m2.RescheduleTasks(sessionFactory(), now.Add(time.Hour))
tk.MustQuery("select status,owner_id from mysql.tidb_ttl_task").Check(testkit.Rows("running task-manager-2"))

// another task manager shouldn't fetch this task if it has finished
task := m2.GetRunningTasks()[0]
task.SetResult(nil)
m2.CheckFinishedTask(sessionFactory(), now)
scanWorker3 := ttlworker.NewMockScanWorker(t)
scanWorker3.Start()
m3 := ttlworker.NewTaskManager(context.Background(), nil, isc, "task-manager-3")
m3.SetScanWorkers4Test([]ttlworker.Worker{scanWorker3})
m3.RescheduleTasks(sessionFactory(), now.Add(time.Hour))
tk.MustQuery("select status,owner_id from mysql.tidb_ttl_task").Check(testkit.Rows("finished task-manager-2"))
}

func TestTaskMetrics(t *testing.T) {
Expand Down
18 changes: 18 additions & 0 deletions ttl/ttlworker/task_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,24 @@ func (m *taskManager) ReportMetrics() {
m.reportMetrics()
}

// CheckFinishedTask is an exported version of checkFinishedTask
func (m *taskManager) CheckFinishedTask(se session.Session, now time.Time) {
m.checkFinishedTask(se, now)
}

// ReportTaskFinished is an exported version of reportTaskFinished
func (m *taskManager) GetRunningTasks() []*runningScanTask {
return m.runningTasks
}

// ReportTaskFinished is an exported version of reportTaskFinished
func (t *runningScanTask) SetResult(err error) {
t.result = &ttlScanTaskExecResult{
task: t.ttlScanTask,
err: err,
}
}

func TestResizeWorkers(t *testing.T) {
tbl := newMockTTLTbl(t, "t1")

Expand Down

0 comments on commit 4a38dee

Please sign in to comment.