diff --git a/ttl/cache/task.go b/ttl/cache/task.go
index 3ef1e2ebee811..5ba2f427f67c8 100644
--- a/ttl/cache/task.go
+++ b/ttl/cache/task.go
@@ -58,8 +58,8 @@ func SelectFromTTLTaskWithID(jobID string, scanID int64) (string, []interface{})
 }
 
 // PeekWaitingTTLTask returns an SQL statement to get `limit` waiting ttl task
-func PeekWaitingTTLTask(limit int, hbExpire time.Time) (string, []interface{}) {
-	return selectFromTTLTask + " WHERE status = 'waiting' OR (owner_hb_time < %? AND status = 'running') ORDER BY created_time ASC LIMIT %?", []interface{}{hbExpire.Format("2006-01-02 15:04:05"), limit}
+func PeekWaitingTTLTask(hbExpire time.Time) (string, []interface{}) {
+	return selectFromTTLTask + " WHERE status = 'waiting' OR (owner_hb_time < %? AND status = 'running') ORDER BY created_time ASC", []interface{}{hbExpire.Format("2006-01-02 15:04:05")}
 }
 
 // InsertIntoTTLTask returns an SQL statement to insert a ttl task into mysql.tidb_ttl_task
diff --git a/ttl/ttlworker/job_manager_integration_test.go b/ttl/ttlworker/job_manager_integration_test.go
index 45fd9e69e2bf8..b223eb54a43e1 100644
--- a/ttl/ttlworker/job_manager_integration_test.go
+++ b/ttl/ttlworker/job_manager_integration_test.go
@@ -99,7 +99,7 @@ func TestParallelLockNewJob(t *testing.T) {
 				successCounter.Add(1)
 				successJob = job
 			} else {
-				logutil.BgLogger().Error("lock new job with error", zap.Error(err))
+				logutil.BgLogger().Info("lock new job with error", zap.Error(err))
 			}
 			wg.Done()
 		}()
diff --git a/ttl/ttlworker/task_manager.go b/ttl/ttlworker/task_manager.go
index 0db5405034c25..ee245aa1c8c29 100644
--- a/ttl/ttlworker/task_manager.go
+++ b/ttl/ttlworker/task_manager.go
@@ -263,45 +263,43 @@ func (m *taskManager) rescheduleTasks(se session.Session, now time.Time) {
 		return
 	}
 
-	for len(idleScanWorkers) > 0 {
-		tasks, err := m.peekWaitingScanTasks(se, len(idleScanWorkers), now)
-		if err != nil {
-			logutil.Logger(m.ctx).Warn("fail to peek scan task", zap.Error(err))
-			return
-		}
+	tasks, err := m.peekWaitingScanTasks(se, now)
+	if err != nil {
+		logutil.Logger(m.ctx).Warn("fail to peek scan task", zap.Error(err))
+		return
+	}
 
-		if len(tasks) == 0 {
-			break
+	for _, t := range tasks {
+		logger := logutil.Logger(m.ctx).With(zap.String("jobID", t.JobID), zap.Int64("scanID", t.ScanID))
+
+		task, err := m.lockScanTask(se, t, now)
+		if err != nil {
+			// If other nodes lock the task, it will return an error. It's expected
+			// so the log level is only `info`
+			logutil.Logger(m.ctx).Info("fail to lock scan task", zap.Error(err))
+			continue
 		}
 
-		for _, t := range tasks {
-			logger := logutil.Logger(m.ctx).With(zap.String("jobID", t.JobID), zap.Int64("scanID", t.ScanID))
+		idleWorker := idleScanWorkers[0]
+		idleScanWorkers = idleScanWorkers[1:]
 
-			task, err := m.lockScanTask(se, t, now)
-			if err != nil {
-				// If other nodes lock the task, it will return an error. It's expected
-				// so the log level is only `info`
-				logutil.Logger(m.ctx).Info("fail to lock scan task", zap.Error(err))
-				continue
-			}
-
-			idleWorker := idleScanWorkers[0]
-			idleScanWorkers = idleScanWorkers[1:]
+		err = idleWorker.Schedule(task.ttlScanTask)
+		if err != nil {
+			logger.Warn("fail to schedule task", zap.Error(err))
+			continue
+		}
 
-			err = idleWorker.Schedule(task.ttlScanTask)
-			if err != nil {
-				logger.Warn("fail to schedule task", zap.Error(err))
-				continue
-			}
+		logger.Info("scheduled ttl task")
+		m.runningTasks = append(m.runningTasks, task)
 
-			logger.Info("scheduled ttl task")
-			m.runningTasks = append(m.runningTasks, task)
+		if len(idleScanWorkers) == 0 {
+			return
 		}
 	}
 }
 
-func (m *taskManager) peekWaitingScanTasks(se session.Session, limit int, now time.Time) ([]*cache.TTLTask, error) {
-	sql, args := cache.PeekWaitingTTLTask(limit, now.Add(-2*ttlTaskHeartBeatTickerInterval))
+func (m *taskManager) peekWaitingScanTasks(se session.Session, now time.Time) ([]*cache.TTLTask, error) {
+	sql, args := cache.PeekWaitingTTLTask(now.Add(-2 * ttlTaskHeartBeatTickerInterval))
 	rows, err := se.ExecuteSQL(m.ctx, sql, args...)
 	if err != nil {
 		return nil, errors.Wrapf(err, "execute sql: %s", sql)
diff --git a/ttl/ttlworker/task_manager_integration_test.go b/ttl/ttlworker/task_manager_integration_test.go
index 0419a14f4133f..13278d66bf010 100644
--- a/ttl/ttlworker/task_manager_integration_test.go
+++ b/ttl/ttlworker/task_manager_integration_test.go
@@ -97,7 +97,7 @@ func TestParallelLockNewTask(t *testing.T) {
 			if err == nil {
 				successCounter.Add(1)
 			} else {
-				logutil.BgLogger().Error("lock new task with error", zap.Error(err))
+				logutil.BgLogger().Info("lock new task with error", zap.Error(err))
 			}
 			wg.Done()
 		}()
@@ -230,3 +230,38 @@ func TestTaskMetrics(t *testing.T) {
 	require.NoError(t, metrics.RunningTaskCnt.Write(out))
 	require.Equal(t, float64(1), out.GetGauge().GetValue())
 }
+
+func TestRescheduleWithError(t *testing.T) {
+	store, dom := testkit.CreateMockStoreAndDomain(t)
+	waitAndStopTTLManager(t, dom)
+	tk := testkit.NewTestKit(t, store)
+
+	sessionFactory := sessionFactory(t, store)
+	// insert a wrong scan task with random table id
+	sql := fmt.Sprintf("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW(), NOW())", 613, 1)
+	tk.MustExec(sql)
+
+	isc := cache.NewInfoSchemaCache(time.Second)
+	require.NoError(t, isc.Update(sessionFactory()))
+	now := time.Now()
+
+	// schedule in a task manager
+	scanWorker := ttlworker.NewMockScanWorker(t)
+	scanWorker.Start()
+	m := ttlworker.NewTaskManager(context.Background(), nil, isc, "task-manager-1")
+	m.SetScanWorkers4Test([]ttlworker.Worker{scanWorker})
+	notify := make(chan struct{})
+	go func() {
+		m.RescheduleTasks(sessionFactory(), now)
+		notify <- struct{}{}
+	}()
+	timeout, cancel := context.WithTimeout(context.Background(), time.Second*5)
+	defer cancel()
+
+	select {
+	case <-timeout.Done():
+		require.Fail(t, "reschedule didn't finish in time")
+	case <-notify:
+	}
+	tk.MustQuery("select status from mysql.tidb_ttl_task").Check(testkit.Rows("waiting"))
+}