Skip to content

ttl: fix the issue that one task losing heartbeat will block other tasks (#57919) #58710

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 32 additions & 25 deletions pkg/ttl/ttlworker/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,10 +231,7 @@ func (m *JobManager) jobLoop() error {
// Job Schedule loop:
case <-updateJobHeartBeatTicker:
updateHeartBeatCtx, cancel := context.WithTimeout(m.ctx, ttlInternalSQLTimeout)
err = m.updateHeartBeat(updateHeartBeatCtx, se, now)
if err != nil {
logutil.Logger(m.ctx).Warn("fail to update job heart beat", zap.Error(err))
}
m.updateHeartBeat(updateHeartBeatCtx, se, now)
cancel()
case <-jobCheckTicker:
m.checkFinishedJob(se)
Expand Down Expand Up @@ -277,10 +274,7 @@ func (m *JobManager) jobLoop() error {
m.taskManager.resizeWorkersWithSysVar()
case <-updateTaskHeartBeatTicker:
updateHeartBeatCtx, cancel := context.WithTimeout(m.ctx, ttlInternalSQLTimeout)
err = m.taskManager.updateHeartBeat(updateHeartBeatCtx, se, now)
if err != nil {
logutil.Logger(m.ctx).Warn("fail to update task heart beat", zap.Error(err))
}
m.taskManager.updateHeartBeat(updateHeartBeatCtx, se, now)
cancel()
case <-checkScanTaskFinishedTicker:
if m.taskManager.handleScanFinishedTask() {
Expand Down Expand Up @@ -897,29 +891,42 @@ func (m *JobManager) appendLockedJob(id string, se session.Session, createTime t
}

// updateHeartBeat updates the heartbeat for all task with current instance as owner
func (m *JobManager) updateHeartBeat(ctx context.Context, se session.Session, now time.Time) error {
func (m *JobManager) updateHeartBeat(ctx context.Context, se session.Session, now time.Time) {
for _, job := range m.localJobs() {
if job.createTime.Add(ttlJobTimeout).Before(now) {
logutil.Logger(m.ctx).Info("job is timeout", zap.String("jobID", job.id))
summary, err := summarizeErr(errors.New("job is timeout"))
if err != nil {
logutil.Logger(m.ctx).Warn("fail to summarize job", zap.Error(err))
}
err = job.finish(se, now, summary)
if err != nil {
logutil.Logger(m.ctx).Warn("fail to finish job", zap.Error(err))
continue
}
m.removeJob(job)
err := m.updateHeartBeatForJob(ctx, se, now, job)
if err != nil {
logutil.Logger(m.ctx).Warn("fail to update heartbeat for job", zap.Error(err), zap.String("jobID", job.id))
}
}
}

intest.Assert(se.GetSessionVars().TimeZone.String() == now.Location().String())
sql, args := updateHeartBeatSQL(job.tbl.ID, now, m.id)
_, err := se.ExecuteSQL(ctx, sql, args...)
func (m *JobManager) updateHeartBeatForJob(ctx context.Context, se session.Session, now time.Time, job *ttlJob) error {
if job.createTime.Add(ttlJobTimeout).Before(now) {
logutil.Logger(m.ctx).Info("job is timeout", zap.String("jobID", job.id))
summary, err := summarizeErr(errors.New("job is timeout"))
if err != nil {
return errors.Wrapf(err, "execute sql: %s", sql)
return errors.Wrapf(err, "fail to summarize job")
}
err = job.finish(se, now, summary)
if err != nil {
return errors.Wrapf(err, "fail to finish job")
}
m.removeJob(job)
return nil
}

intest.Assert(se.GetSessionVars().TimeZone.String() == now.Location().String())
sql, args := updateHeartBeatSQL(job.tbl.ID, now, m.id)
_, err := se.ExecuteSQL(ctx, sql, args...)
if err != nil {
return errors.Wrapf(err, "execute sql: %s", sql)
}

if se.GetSessionVars().StmtCtx.AffectedRows() != 1 {
return errors.Errorf("fail to update job heartbeat, maybe the owner is not myself (%s), affected rows: %d",
m.id, se.GetSessionVars().StmtCtx.AffectedRows())
}

return nil
}

Expand Down
45 changes: 44 additions & 1 deletion pkg/ttl/ttlworker/job_manager_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ func TestJobTimeout(t *testing.T) {
require.Equal(t, now.Format(time.RFC3339), newTableStatus.CurrentJobStatusUpdateTime.Format(time.RFC3339))

// the timeout will be checked while updating heartbeat
require.NoError(t, m2.UpdateHeartBeat(ctx, se, now.Add(7*time.Hour)))
require.NoError(t, m2.UpdateHeartBeatForJob(ctx, se, now.Add(7*time.Hour), m2.RunningJobs()[0]))
tk.MustQuery("select last_job_summary->>'$.scan_task_err' from mysql.tidb_ttl_table_status").Check(testkit.Rows("job is timeout"))
tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
}
Expand Down Expand Up @@ -1617,3 +1617,46 @@ func TestTimerJobAfterDropTable(t *testing.T) {
require.NotNil(t, job)
require.True(t, job.Finished)
}

func TestJobHeartBeatFailNotBlockOthers(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
waitAndStopTTLManager(t, dom)
tk := testkit.NewTestKit(t, store)

sessionFactory := sessionFactory(t, dom)
se := sessionFactory()

tk.MustExec("use test")
tk.MustExec("CREATE TABLE t1 (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR")
tk.MustExec("CREATE TABLE t2 (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR")
testTable1, err := dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t1"))
require.NoError(t, err)
testTable2, err := dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t2"))
require.NoError(t, err)

ctx := context.Background()
m := ttlworker.NewJobManager("test-ttl-job-manager-1", nil, store, nil, nil)

now := se.Now()
// acquire two jobs
require.NoError(t, m.InfoSchemaCache().Update(se))
require.NoError(t, m.TableStatusCache().Update(ctx, se))
_, err = m.LockJob(context.Background(), se, m.InfoSchemaCache().Tables[testTable1.Meta().ID], now, uuid.NewString(), false)
require.NoError(t, err)
_, err = m.LockJob(context.Background(), se, m.InfoSchemaCache().Tables[testTable2.Meta().ID], now, uuid.NewString(), false)
require.NoError(t, err)
tk.MustQuery("select current_job_status from mysql.tidb_ttl_table_status").Check(testkit.Rows("running", "running"))

// assign the first job to another manager
tk.MustExec("update mysql.tidb_ttl_table_status set current_job_owner_id = 'test-ttl-job-manager-2' where table_id = ?", testTable1.Meta().ID)
// the heartbeat of the first job will fail, but the second one will still success
now = now.Add(time.Hour)
require.Error(t, m.UpdateHeartBeatForJob(context.Background(), se, now, m.RunningJobs()[0]))
require.NoError(t, m.UpdateHeartBeatForJob(context.Background(), se, now, m.RunningJobs()[1]))

now = now.Add(time.Hour)
m.UpdateHeartBeat(ctx, se, now)
tk.MustQuery("select table_id, current_job_owner_hb_time from mysql.tidb_ttl_table_status").Sort().Check(testkit.Rows(
fmt.Sprintf("%d %s", testTable1.Meta().ID, now.Add(-2*time.Hour).Format(time.DateTime)),
fmt.Sprintf("%d %s", testTable2.Meta().ID, now.Format(time.DateTime))))
}
8 changes: 6 additions & 2 deletions pkg/ttl/ttlworker/job_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,12 @@ func (m *JobManager) TaskManager() *taskManager {
}

// UpdateHeartBeat is an exported version of updateHeartBeat for test
func (m *JobManager) UpdateHeartBeat(ctx context.Context, se session.Session, now time.Time) error {
return m.updateHeartBeat(ctx, se, now)
func (m *JobManager) UpdateHeartBeat(ctx context.Context, se session.Session, now time.Time) {
m.updateHeartBeat(ctx, se, now)
}

func (m *JobManager) UpdateHeartBeatForJob(ctx context.Context, se session.Session, now time.Time, job *ttlJob) error {
return m.updateHeartBeatForJob(ctx, se, now, job)
}

// ReportMetrics is an exported version of reportMetrics
Expand Down
50 changes: 29 additions & 21 deletions pkg/ttl/ttlworker/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,32 +439,40 @@ func (m *taskManager) syncTaskFromTable(se session.Session, jobID string, scanID
}

// updateHeartBeat updates the heartbeat for all tasks with current instance as owner
func (m *taskManager) updateHeartBeat(ctx context.Context, se session.Session, now time.Time) error {
func (m *taskManager) updateHeartBeat(ctx context.Context, se session.Session, now time.Time) {
for _, task := range m.runningTasks {
state := &cache.TTLTaskState{
TotalRows: task.statistics.TotalRows.Load(),
SuccessRows: task.statistics.SuccessRows.Load(),
ErrorRows: task.statistics.ErrorRows.Load(),
}
if task.result != nil && task.result.err != nil {
state.ScanTaskErr = task.result.err.Error()
}

intest.Assert(se.GetSessionVars().Location().String() == now.Location().String())
sql, args, err := updateTTLTaskHeartBeatSQL(task.JobID, task.ScanID, now, state, m.id)
err := m.updateHeartBeatForTask(ctx, se, now, task)
if err != nil {
return err
}
_, err = se.ExecuteSQL(ctx, sql, args...)
if err != nil {
return errors.Wrapf(err, "execute sql: %s", sql)
logutil.Logger(m.ctx).Warn("fail to update task heart beat", zap.Error(err), zap.String("jobID", task.JobID), zap.Int64("scanID", task.ScanID))
}
}
}

if se.GetSessionVars().StmtCtx.AffectedRows() != 1 {
return errors.Errorf("fail to update task status, maybe the owner is not myself (%s), affected rows: %d",
m.id, se.GetSessionVars().StmtCtx.AffectedRows())
}
func (m *taskManager) updateHeartBeatForTask(ctx context.Context, se session.Session, now time.Time, task *runningScanTask) error {
state := &cache.TTLTaskState{
TotalRows: task.statistics.TotalRows.Load(),
SuccessRows: task.statistics.SuccessRows.Load(),
ErrorRows: task.statistics.ErrorRows.Load(),
}
if task.result != nil && task.result.err != nil {
state.ScanTaskErr = task.result.err.Error()
}

intest.Assert(se.GetSessionVars().Location().String() == now.Location().String())
sql, args, err := updateTTLTaskHeartBeatSQL(task.JobID, task.ScanID, now, state, m.id)
if err != nil {
return err
}
_, err = se.ExecuteSQL(ctx, sql, args...)
if err != nil {
return errors.Wrapf(err, "execute sql: %s", sql)
}

if se.GetSessionVars().StmtCtx.AffectedRows() != 1 {
return errors.Errorf("fail to update task heartbeat, maybe the owner is not myself (%s), affected rows: %d",
m.id, se.GetSessionVars().StmtCtx.AffectedRows())
}

return nil
}

Expand Down
65 changes: 63 additions & 2 deletions pkg/ttl/ttlworker/task_manager_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,9 @@ func TestTaskCancelledAfterHeartbeatTimeout(t *testing.T) {
tk.MustQuery("select count(1) from mysql.tidb_ttl_task where status = 'running' and owner_id = 'task-manager-2'").Check(testkit.Rows("4"))

// Then m1 cannot update the heartbeat of its task
require.Error(t, m1.UpdateHeartBeat(context.Background(), se, now.Add(time.Hour)))
for i := 0; i < 4; i++ {
require.Error(t, m1.UpdateHeartBeatForTask(context.Background(), se, now.Add(time.Hour), m1.GetRunningTasks()[i]))
}
tk.MustQuery("select owner_hb_time from mysql.tidb_ttl_task").Check(testkit.Rows(
now.Format(time.DateTime),
now.Format(time.DateTime),
Expand All @@ -490,7 +492,9 @@ func TestTaskCancelledAfterHeartbeatTimeout(t *testing.T) {
))

// m2 can successfully update the heartbeat
require.NoError(t, m2.UpdateHeartBeat(context.Background(), se, now.Add(time.Hour)))
for i := 0; i < 4; i++ {
require.NoError(t, m2.UpdateHeartBeatForTask(context.Background(), se, now.Add(time.Hour), m2.GetRunningTasks()[i]))
}
tk.MustQuery("select owner_hb_time from mysql.tidb_ttl_task").Check(testkit.Rows(
now.Add(time.Hour).Format(time.DateTime),
now.Add(time.Hour).Format(time.DateTime),
Expand Down Expand Up @@ -522,3 +526,60 @@ func TestTaskCancelledAfterHeartbeatTimeout(t *testing.T) {
`finished {"total_rows":0,"success_rows":0,"error_rows":0,"scan_task_err":""} task-manager-2`,
))
}

func TestHeartBeatErrorNotBlockOthers(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
pool := wrapPoolForTest(dom.SysSessionPool())
defer pool.AssertNoSessionInUse(t)
waitAndStopTTLManager(t, dom)
tk := testkit.NewTestKit(t, store)
sessionFactory := sessionFactory(t, dom)

tk.MustExec("set global tidb_ttl_running_tasks = 32")

tk.MustExec("create table test.t(id int, created_at datetime) ttl=created_at + interval 1 day")
testTable, err := dom.InfoSchema().TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)
for id := 0; id < 4; id++ {
sql := fmt.Sprintf("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW() - INTERVAL 1 DAY, NOW())", testTable.Meta().ID, id)
tk.MustExec(sql)
}

se := sessionFactory()
now := se.Now()

isc := cache.NewInfoSchemaCache(time.Minute)
require.NoError(t, isc.Update(se))
m := ttlworker.NewTaskManager(context.Background(), pool, isc, "task-manager-1", store)
workers := []ttlworker.Worker{}
for j := 0; j < 4; j++ {
scanWorker := ttlworker.NewMockScanWorker(t)
scanWorker.Start()
workers = append(workers, scanWorker)
}
m.SetScanWorkers4Test(workers)
m.RescheduleTasks(se, now)

// All tasks should be scheduled to m1 and running
tk.MustQuery("select count(1) from mysql.tidb_ttl_task where status = 'running' and owner_id = 'task-manager-1'").Check(testkit.Rows("4"))

// Mock the situation that the owner of task 0 has changed
tk.MustExec("update mysql.tidb_ttl_task set owner_id = 'task-manager-2' where scan_id = 0")
tk.MustQuery("select count(1) from mysql.tidb_ttl_task where status = 'running' and owner_id = 'task-manager-1'").Check(testkit.Rows("3"))

now = now.Add(time.Hour)
require.Error(t, m.UpdateHeartBeatForTask(context.Background(), se, now, m.GetRunningTasks()[0]))
for i := 1; i < 4; i++ {
require.NoError(t, m.UpdateHeartBeatForTask(context.Background(), se, now, m.GetRunningTasks()[i]))
}

now = now.Add(time.Hour)
m.UpdateHeartBeat(context.Background(), se, now)
tk.MustQuery("select count(1) from mysql.tidb_ttl_task where status = 'running' and owner_id = 'task-manager-1'").Check(testkit.Rows("3"))
tk.MustQuery("select scan_id, owner_hb_time from mysql.tidb_ttl_task").Sort().Check(testkit.Rows(
fmt.Sprintf("0 %s", now.Add(-2*time.Hour).Format(time.DateTime)),
fmt.Sprintf("1 %s", now.Format(time.DateTime)),
fmt.Sprintf("2 %s", now.Format(time.DateTime)),
fmt.Sprintf("3 %s", now.Format(time.DateTime)),
))
}
15 changes: 10 additions & 5 deletions pkg/ttl/ttlworker/task_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,16 @@ func (t *runningScanTask) SetResult(err error) {
t.result = t.ttlScanTask.result(err)
}

// UpdateHeartBeat is an exported version of updateHeartBeat
func (m *taskManager) UpdateHeartBeat(ctx context.Context, se session.Session, now time.Time) {
m.updateHeartBeat(ctx, se, now)
}

// UpdateHeartBeatForTask is an exported version of updateHeartBeatForTask
func (m *taskManager) UpdateHeartBeatForTask(ctx context.Context, se session.Session, now time.Time, task *runningScanTask) error {
return m.updateHeartBeatForTask(ctx, se, now, task)
}

// SetCancel sets the cancel function of the task
func (t *runningScanTask) SetCancel(cancel func()) {
t.cancel = cancel
Expand All @@ -111,11 +121,6 @@ func (m *taskManager) CheckInvalidTask(se session.Session) {
m.checkInvalidTask(se)
}

// UpdateHeartBeat is an exported version of updateHeartBeat
func (m *taskManager) UpdateHeartBeat(ctx context.Context, se session.Session, now time.Time) error {
return m.updateHeartBeat(ctx, se, now)
}

func TestResizeWorkers(t *testing.T) {
tbl := newMockTTLTbl(t, "t1")

Expand Down