ttl: avoid endless loop in TTL task schedule (#41020)
close #41019
YangKeao authored Feb 3, 2023
1 parent f7d5db2 commit eb11fb2
Showing 4 changed files with 66 additions and 33 deletions.
ttl/cache/task.go (4 changes: 2 additions & 2 deletions)
@@ -58,8 +58,8 @@ func SelectFromTTLTaskWithID(jobID string, scanID int64) (string, []interface{})
 }
 
 // PeekWaitingTTLTask returns an SQL statement to get `limit` waiting ttl task
-func PeekWaitingTTLTask(limit int, hbExpire time.Time) (string, []interface{}) {
-	return selectFromTTLTask + " WHERE status = 'waiting' OR (owner_hb_time < %? AND status = 'running') ORDER BY created_time ASC LIMIT %?", []interface{}{hbExpire.Format("2006-01-02 15:04:05"), limit}
+func PeekWaitingTTLTask(hbExpire time.Time) (string, []interface{}) {
+	return selectFromTTLTask + " WHERE status = 'waiting' OR (owner_hb_time < %? AND status = 'running') ORDER BY created_time ASC", []interface{}{hbExpire.Format("2006-01-02 15:04:05")}
 }
 
 // InsertIntoTTLTask returns an SQL statement to insert a ttl task into mysql.tidb_ttl_task
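Dropping the LIMIT clause (and the `limit` parameter) means the peek query now returns every waiting task plus every running task whose heartbeat has expired; the caller bounds the work by how many idle workers it has instead. Below is a minimal standalone sketch of what the builder now produces; `selectFromTTLTask` here is only a stand-in for the package's unexported constant, not the real definition.

package main

import (
	"fmt"
	"time"
)

// selectFromTTLTask stands in for the unexported constant in ttl/cache.
const selectFromTTLTask = "SELECT job_id, scan_id, table_id, status FROM mysql.tidb_ttl_task"

// peekWaitingTTLTask mirrors the new signature: no limit argument, only the
// heartbeat-expiration cutoff is passed for the %? placeholder.
func peekWaitingTTLTask(hbExpire time.Time) (string, []interface{}) {
	sql := selectFromTTLTask + " WHERE status = 'waiting' OR (owner_hb_time < %? AND status = 'running') ORDER BY created_time ASC"
	return sql, []interface{}{hbExpire.Format("2006-01-02 15:04:05")}
}

func main() {
	sql, args := peekWaitingTTLTask(time.Now().Add(-2 * time.Minute))
	fmt.Println(sql)  // note: no trailing "LIMIT %?" anymore
	fmt.Println(args) // only the formatted cutoff time remains
}

The scheduler drains this list only until it runs out of idle workers, which replaces the bound that LIMIT used to provide.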
ttl/ttlworker/job_manager_integration_test.go (2 changes: 1 addition & 1 deletion)
@@ -99,7 +99,7 @@ func TestParallelLockNewJob(t *testing.T) {
 				successCounter.Add(1)
 				successJob = job
 			} else {
-				logutil.BgLogger().Error("lock new job with error", zap.Error(err))
+				logutil.BgLogger().Info("lock new job with error", zap.Error(err))
 			}
 			wg.Done()
 		}()
ttl/ttlworker/task_manager.go (56 changes: 27 additions & 29 deletions)
@@ -263,45 +263,43 @@ func (m *taskManager) rescheduleTasks(se session.Session, now time.Time) {
 		return
 	}
 
-	for len(idleScanWorkers) > 0 {
-		tasks, err := m.peekWaitingScanTasks(se, len(idleScanWorkers), now)
-		if err != nil {
-			logutil.Logger(m.ctx).Warn("fail to peek scan task", zap.Error(err))
-			return
-		}
-
-		if len(tasks) == 0 {
-			break
-		}
-
-		for _, t := range tasks {
-			logger := logutil.Logger(m.ctx).With(zap.String("jobID", t.JobID), zap.Int64("scanID", t.ScanID))
-
-			task, err := m.lockScanTask(se, t, now)
-			if err != nil {
-				// If other nodes lock the task, it will return an error. It's expected
-				// so the log level is only `info`
-				logutil.Logger(m.ctx).Info("fail to lock scan task", zap.Error(err))
-				continue
-			}
-
-			idleWorker := idleScanWorkers[0]
-			idleScanWorkers = idleScanWorkers[1:]
-
-			err = idleWorker.Schedule(task.ttlScanTask)
-			if err != nil {
-				logger.Warn("fail to schedule task", zap.Error(err))
-				continue
-			}
-
-			logger.Info("scheduled ttl task")
-			m.runningTasks = append(m.runningTasks, task)
+	tasks, err := m.peekWaitingScanTasks(se, now)
+	if err != nil {
+		logutil.Logger(m.ctx).Warn("fail to peek scan task", zap.Error(err))
+		return
+	}
+
+	for _, t := range tasks {
+		logger := logutil.Logger(m.ctx).With(zap.String("jobID", t.JobID), zap.Int64("scanID", t.ScanID))
+
+		task, err := m.lockScanTask(se, t, now)
+		if err != nil {
+			// If other nodes lock the task, it will return an error. It's expected
+			// so the log level is only `info`
+			logutil.Logger(m.ctx).Info("fail to lock scan task", zap.Error(err))
+			continue
+		}
+
+		idleWorker := idleScanWorkers[0]
+		idleScanWorkers = idleScanWorkers[1:]
+
+		err = idleWorker.Schedule(task.ttlScanTask)
+		if err != nil {
+			logger.Warn("fail to schedule task", zap.Error(err))
+			continue
+		}
+
+		logger.Info("scheduled ttl task")
+		m.runningTasks = append(m.runningTasks, task)
+
+		if len(idleScanWorkers) == 0 {
+			return
 		}
 	}
 }
 
-func (m *taskManager) peekWaitingScanTasks(se session.Session, limit int, now time.Time) ([]*cache.TTLTask, error) {
-	sql, args := cache.PeekWaitingTTLTask(limit, now.Add(-2*ttlTaskHeartBeatTickerInterval))
+func (m *taskManager) peekWaitingScanTasks(se session.Session, now time.Time) ([]*cache.TTLTask, error) {
+	sql, args := cache.PeekWaitingTTLTask(now.Add(-2 * ttlTaskHeartBeatTickerInterval))
 	rows, err := se.ExecuteSQL(m.ctx, sql, args...)
 	if err != nil {
 		return nil, errors.Wrapf(err, "execute sql: %s", sql)
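This restructuring is the core of the fix. The old code looped while idle workers remained and re-peeked tasks on every round; when none of the peeked tasks could be locked or scheduled (so no worker was consumed and the result set stayed non-empty), the same rows were fetched again and again and rescheduleTasks never returned. The new code peeks once, walks that single result set, and returns as soon as the idle workers are used up. A simplified, self-contained sketch of the fixed shape, using hypothetical types rather than the real taskManager:

package main

import "fmt"

type task struct{ id string }

// rescheduleOnce mirrors the fixed shape: peek once, iterate that one result
// set, and stop as soon as the idle workers are exhausted. Even if lock fails
// for every task, the loop ends after a single pass instead of re-peeking.
func rescheduleOnce(idleWorkers []string, peek func() []task, lock func(task) bool) {
	if len(idleWorkers) == 0 {
		return
	}
	for _, t := range peek() {
		if !lock(t) {
			// expected when another node owns the task or its table is gone
			continue
		}
		worker := idleWorkers[0]
		idleWorkers = idleWorkers[1:]
		fmt.Printf("scheduled %s on %s\n", t.id, worker)
		if len(idleWorkers) == 0 {
			return
		}
	}
}

func main() {
	peek := func() []task { return []task{{id: "t1"}, {id: "t2"}} }
	alwaysFail := func(_ task) bool { return false } // e.g. the task references a dropped table

	// Under the old `for len(idleWorkers) > 0 { tasks := peek(); ... }` shape this
	// input spins forever: peek keeps returning the same unlockable tasks and no
	// worker is ever consumed. The single-pass version terminates immediately.
	rescheduleOnce([]string{"w1", "w2"}, peek, alwaysFail)
	fmt.Println("done")
}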
ttl/ttlworker/task_manager_integration_test.go (37 changes: 36 additions & 1 deletion)
@@ -97,7 +97,7 @@ func TestParallelLockNewTask(t *testing.T) {
 			if err == nil {
 				successCounter.Add(1)
 			} else {
-				logutil.BgLogger().Error("lock new task with error", zap.Error(err))
+				logutil.BgLogger().Info("lock new task with error", zap.Error(err))
 			}
 			wg.Done()
 		}()
@@ -230,3 +230,38 @@ func TestTaskMetrics(t *testing.T) {
 	require.NoError(t, metrics.RunningTaskCnt.Write(out))
 	require.Equal(t, float64(1), out.GetGauge().GetValue())
 }
+
+func TestRescheduleWithError(t *testing.T) {
+	store, dom := testkit.CreateMockStoreAndDomain(t)
+	waitAndStopTTLManager(t, dom)
+	tk := testkit.NewTestKit(t, store)
+
+	sessionFactory := sessionFactory(t, store)
+	// insert a wrong scan task with random table id
+	sql := fmt.Sprintf("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW(), NOW())", 613, 1)
+	tk.MustExec(sql)
+
+	isc := cache.NewInfoSchemaCache(time.Second)
+	require.NoError(t, isc.Update(sessionFactory()))
+	now := time.Now()
+
+	// schedule in a task manager
+	scanWorker := ttlworker.NewMockScanWorker(t)
+	scanWorker.Start()
+	m := ttlworker.NewTaskManager(context.Background(), nil, isc, "task-manager-1")
+	m.SetScanWorkers4Test([]ttlworker.Worker{scanWorker})
+	notify := make(chan struct{})
+	go func() {
+		m.RescheduleTasks(sessionFactory(), now)
+		notify <- struct{}{}
+	}()
+	timeout, cancel := context.WithTimeout(context.Background(), time.Second*5)
+	defer cancel()
+
+	select {
+	case <-timeout.Done():
+		require.Fail(t, "reschedule didn't finish in time")
+	case <-notify:
+	}
+	tk.MustQuery("select status from mysql.tidb_ttl_task").Check(testkit.Rows("waiting"))
+}
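The new TestRescheduleWithError exercises exactly that failure mode: it inserts a task pointing at a non-existent table, runs RescheduleTasks under a 5-second watchdog, and requires that the call returns and leaves the task in 'waiting'. A generic version of the watchdog idiom it relies on, with a hypothetical finishesWithin helper:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// finishesWithin runs fn in a goroutine and reports whether it returned before
// the deadline; if fn hangs, the goroutine leaks, which is acceptable in a test
// that is about to fail anyway.
func finishesWithin(d time.Duration, fn func()) error {
	done := make(chan struct{})
	go func() {
		fn()
		close(done)
	}()

	ctx, cancel := context.WithTimeout(context.Background(), d)
	defer cancel()

	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return errors.New("did not finish in time")
	}
}

func main() {
	// A quick function passes the watchdog; a looping one would trip the timeout.
	fmt.Println(finishesWithin(time.Second, func() { time.Sleep(10 * time.Millisecond) }))
}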
