Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ttl: avoid endless loop in TTL task schedule #41020

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ttl/cache/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ func SelectFromTTLTaskWithID(jobID string, scanID int64) (string, []interface{})
}

// PeekWaitingTTLTask returns an SQL statement to get `limit` waiting ttl task
func PeekWaitingTTLTask(limit int, hbExpire time.Time) (string, []interface{}) {
return selectFromTTLTask + " WHERE status = 'waiting' OR (owner_hb_time < %? AND status = 'running') ORDER BY created_time ASC LIMIT %?", []interface{}{hbExpire.Format("2006-01-02 15:04:05"), limit}
func PeekWaitingTTLTask(hbExpire time.Time) (string, []interface{}) {
return selectFromTTLTask + " WHERE status = 'waiting' OR (owner_hb_time < %? AND status = 'running') ORDER BY created_time ASC", []interface{}{hbExpire.Format("2006-01-02 15:04:05")}
}

// InsertIntoTTLTask returns an SQL statement to insert a ttl task into mysql.tidb_ttl_task
Expand Down
2 changes: 1 addition & 1 deletion ttl/ttlworker/job_manager_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func TestParallelLockNewJob(t *testing.T) {
successCounter.Add(1)
successJob = job
} else {
logutil.BgLogger().Error("lock new job with error", zap.Error(err))
logutil.BgLogger().Info("lock new job with error", zap.Error(err))
}
wg.Done()
}()
Expand Down
56 changes: 27 additions & 29 deletions ttl/ttlworker/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,45 +263,43 @@ func (m *taskManager) rescheduleTasks(se session.Session, now time.Time) {
return
}

for len(idleScanWorkers) > 0 {
tasks, err := m.peekWaitingScanTasks(se, len(idleScanWorkers), now)
if err != nil {
logutil.Logger(m.ctx).Warn("fail to peek scan task", zap.Error(err))
return
}
tasks, err := m.peekWaitingScanTasks(se, now)
if err != nil {
logutil.Logger(m.ctx).Warn("fail to peek scan task", zap.Error(err))
return
}

if len(tasks) == 0 {
break
for _, t := range tasks {
logger := logutil.Logger(m.ctx).With(zap.String("jobID", t.JobID), zap.Int64("scanID", t.ScanID))

task, err := m.lockScanTask(se, t, now)
if err != nil {
// If other nodes lock the task, it will return an error. It's expected
// so the log level is only `info`
logutil.Logger(m.ctx).Info("fail to lock scan task", zap.Error(err))
continue
}

for _, t := range tasks {
logger := logutil.Logger(m.ctx).With(zap.String("jobID", t.JobID), zap.Int64("scanID", t.ScanID))
idleWorker := idleScanWorkers[0]
idleScanWorkers = idleScanWorkers[1:]

task, err := m.lockScanTask(se, t, now)
if err != nil {
// If other nodes lock the task, it will return an error. It's expected
// so the log level is only `info`
logutil.Logger(m.ctx).Info("fail to lock scan task", zap.Error(err))
continue
}

idleWorker := idleScanWorkers[0]
idleScanWorkers = idleScanWorkers[1:]
err = idleWorker.Schedule(task.ttlScanTask)
if err != nil {
logger.Warn("fail to schedule task", zap.Error(err))
continue
}

err = idleWorker.Schedule(task.ttlScanTask)
if err != nil {
logger.Warn("fail to schedule task", zap.Error(err))
continue
}
logger.Info("scheduled ttl task")
m.runningTasks = append(m.runningTasks, task)

logger.Info("scheduled ttl task")
m.runningTasks = append(m.runningTasks, task)
if len(idleScanWorkers) == 0 {
return
}
}
}

func (m *taskManager) peekWaitingScanTasks(se session.Session, limit int, now time.Time) ([]*cache.TTLTask, error) {
sql, args := cache.PeekWaitingTTLTask(limit, now.Add(-2*ttlTaskHeartBeatTickerInterval))
func (m *taskManager) peekWaitingScanTasks(se session.Session, now time.Time) ([]*cache.TTLTask, error) {
sql, args := cache.PeekWaitingTTLTask(now.Add(-2 * ttlTaskHeartBeatTickerInterval))
rows, err := se.ExecuteSQL(m.ctx, sql, args...)
if err != nil {
return nil, errors.Wrapf(err, "execute sql: %s", sql)
Expand Down
37 changes: 36 additions & 1 deletion ttl/ttlworker/task_manager_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func TestParallelLockNewTask(t *testing.T) {
if err == nil {
successCounter.Add(1)
} else {
logutil.BgLogger().Error("lock new task with error", zap.Error(err))
logutil.BgLogger().Info("lock new task with error", zap.Error(err))
}
wg.Done()
}()
Expand Down Expand Up @@ -230,3 +230,38 @@ func TestTaskMetrics(t *testing.T) {
require.NoError(t, metrics.RunningTaskCnt.Write(out))
require.Equal(t, float64(1), out.GetGauge().GetValue())
}

func TestRescheduleWithError(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
waitAndStopTTLManager(t, dom)
tk := testkit.NewTestKit(t, store)

sessionFactory := sessionFactory(t, store)
// insert a wrong scan task with random table id
sql := fmt.Sprintf("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW(), NOW())", 613, 1)
tk.MustExec(sql)

isc := cache.NewInfoSchemaCache(time.Second)
require.NoError(t, isc.Update(sessionFactory()))
now := time.Now()

// schedule in a task manager
scanWorker := ttlworker.NewMockScanWorker(t)
scanWorker.Start()
m := ttlworker.NewTaskManager(context.Background(), nil, isc, "task-manager-1")
m.SetScanWorkers4Test([]ttlworker.Worker{scanWorker})
notify := make(chan struct{})
go func() {
m.RescheduleTasks(sessionFactory(), now)
notify <- struct{}{}
}()
timeout, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

select {
case <-timeout.Done():
require.Fail(t, "reschedule didn't finish in time")
case <-notify:
}
tk.MustQuery("select status from mysql.tidb_ttl_task").Check(testkit.Rows("waiting"))
}