Skip to content

Commit

Permalink
ttl: remove TTL job when the table has been dropped (pingcap#51541) (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Mar 8, 2024
1 parent b244b1b commit 1e74617
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 5 deletions.
2 changes: 1 addition & 1 deletion pkg/ttl/ttlworker/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ go_test(
embed = [":ttlworker"],
flaky = True,
race = "on",
shard_count = 46,
shard_count = 48,
deps = [
"//pkg/domain",
"//pkg/infoschema",
Expand Down
49 changes: 47 additions & 2 deletions pkg/ttl/ttlworker/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ const taskGCTemplate = `DELETE task FROM
WHERE job.table_id IS NULL`

const ttlJobHistoryGCTemplate = `DELETE FROM mysql.tidb_ttl_job_history WHERE create_time < CURDATE() - INTERVAL 90 DAY`
const ttlTableStatusGCWithoutIDTemplate = `DELETE FROM mysql.tidb_ttl_table_status WHERE current_job_status IS NULL`
const ttlTableStatusGCWithIDTemplate = ttlTableStatusGCWithoutIDTemplate + ` AND table_id NOT IN (%s)`

const timeFormat = time.DateTime

Expand All @@ -76,6 +78,17 @@ func updateHeartBeatSQL(tableID int64, now time.Time, id string) (string, []inte
return updateHeartBeatTemplate, []interface{}{now.Format(timeFormat), tableID, id}
}

func gcTTLTableStatusGCSQL(existIDs []int64) string {
existIDStrs := make([]string, 0, len(existIDs))
for _, id := range existIDs {
existIDStrs = append(existIDStrs, strconv.Itoa(int(id)))
}
if len(existIDStrs) > 0 {
return fmt.Sprintf(ttlTableStatusGCWithIDTemplate, strings.Join(existIDStrs, ","))
}
return ttlTableStatusGCWithoutIDTemplate
}

// JobManager schedules and manages the ttl jobs on this instance
type JobManager struct {
// the `runningJobs`, `scanWorkers` and `delWorkers` should be protected by mutex:
Expand Down Expand Up @@ -203,7 +216,7 @@ func (m *JobManager) jobLoop() error {
}
case <-gcTicker:
gcCtx, cancel := context.WithTimeout(m.ctx, ttlInternalSQLTimeout)
DoGC(gcCtx, se)
m.DoGC(gcCtx, se)
cancel()
// Job Schedule loop:
case <-updateJobHeartBeatTicker:
Expand Down Expand Up @@ -561,6 +574,23 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) {
return
}

// if the table of a running job disappears, also cancel it
for _, job := range m.runningJobs {
_, ok := m.infoSchemaCache.Tables[job.tbl.ID]
if ok {
continue
}

// when the job is locked, it can be found in `infoSchemaCache`. Therefore, it must have been dropped.
logutil.Logger(m.ctx).Info("cancel job because the table has been dropped or it's no longer TTL table", zap.String("jobID", job.id), zap.Int64("tableID", job.tbl.ID))
summary, err := summarizeErr(errors.New("TTL table has been removed or the TTL on this table has been stopped"))
if err != nil {
logutil.Logger(m.ctx).Info("fail to summarize job", zap.Error(err))
}
m.removeJob(job)
job.finish(se, now, summary)
}

jobTables := m.readyForLockHBTimeoutJobTables(now)
// TODO: also consider to resume tables, but it's fine to left them there, as other nodes will take this job
// when the heart beat is not sent
Expand Down Expand Up @@ -938,7 +968,22 @@ func summarizeTaskResult(tasks []*cache.TTLTask) (*TTLSummary, error) {
}

// DoGC deletes some old TTL job histories and redundant scan tasks
func DoGC(ctx context.Context, se session.Session) {
func (m *JobManager) DoGC(ctx context.Context, se session.Session) {
// Remove the table not exist in info schema cache.
// Delete the table status before deleting the tasks. Therefore the related tasks
if err := m.updateInfoSchemaCache(se); err == nil {
// only remove table status after updating info schema without error
existIDs := make([]int64, 0, len(m.infoSchemaCache.Tables))
for id := range m.infoSchemaCache.Tables {
existIDs = append(existIDs, id)
}
if _, err := se.ExecuteSQL(ctx, gcTTLTableStatusGCSQL(existIDs)); err != nil {
logutil.Logger(ctx).Warn("fail to gc ttl table status", zap.Error(err))
}
} else {
logutil.Logger(m.ctx).Warn("failed to update info schema cache", zap.Error(err))
}

if _, err := se.ExecuteSQL(ctx, taskGCTemplate); err != nil {
logutil.Logger(ctx).Warn("fail to gc redundant scan task", zap.Error(err))
}
Expand Down
101 changes: 99 additions & 2 deletions pkg/ttl/ttlworker/job_manager_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,62 @@ func TestRescheduleJobs(t *testing.T) {
tk.MustQuery("select last_job_summary->>'$.scan_task_err' from mysql.tidb_ttl_table_status").Check(testkit.Rows("ttl job is disabled"))
}

func TestRescheduleJobsAfterTableDropped(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
sessionFactory := sessionFactory(t, store)

waitAndStopTTLManager(t, dom)

now := time.Now()
createTableSQL := "create table test.t (id int, created_at datetime) ttl = `created_at` + interval 1 minute ttl_job_interval = '1m'"
tk.MustExec("create table test.t (id int, created_at datetime) ttl = `created_at` + interval 1 minute ttl_job_interval = '1m'")
table, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnTTL)

removeBehaviors := []struct {
remove string
resume string
}{
{"drop table test.t", createTableSQL},
{"alter table test.t remove ttl", "alter table test.t ttl = `created_at` + interval 1 minute ttl_job_interval = '1m'"},
{"alter table test.t ttl_enable = 'OFF'", "alter table test.t ttl_enable = 'ON'"},
}
for i, rb := range removeBehaviors {
se := sessionFactory()
m := ttlworker.NewJobManager("manager-1", nil, store, nil, func() bool {
return true
})
m.TaskManager().ResizeWorkersWithSysVar()
require.NoError(t, m.InfoSchemaCache().Update(se))
require.NoError(t, m.TableStatusCache().Update(context.Background(), se))
// submit job
require.NoError(t, m.SubmitJob(se, table.Meta().ID, table.Meta().ID, fmt.Sprintf("request%d", i)))
sql, args := cache.SelectFromTTLTableStatusWithID(table.Meta().ID)
rows, err := se.ExecuteSQL(ctx, sql, args...)
require.NoError(t, err)
tableStatus, err := cache.RowToTableStatus(se, rows[0])
require.NoError(t, err)
require.Equal(t, "manager-1", tableStatus.CurrentJobOwnerID)
// there is already a task
tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))

// break the table
tk.MustExec(rb.remove)
require.NoError(t, m.InfoSchemaCache().Update(se))
require.NoError(t, m.TableStatusCache().Update(context.Background(), se))
m.RescheduleJobs(se, time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, now.Nanosecond(), now.Location()))
tk.MustQuery("select last_job_summary->>'$.scan_task_err' from mysql.tidb_ttl_table_status").Check(testkit.Rows("TTL table has been removed or the TTL on this table has been stopped"))

// resume the table
tk.MustExec(rb.resume)
table, err = dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)
m.DoGC(context.TODO(), se)
}
}

func TestJobTimeout(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
Expand Down Expand Up @@ -715,11 +771,48 @@ func TestGCScanTasks(t *testing.T) {
addScanTaskRecord(3, 2, 1)
addScanTaskRecord(3, 2, 2)

m := ttlworker.NewJobManager("manager-1", nil, store, nil, func() bool {
return true
})
se := session.NewSession(tk.Session(), tk.Session(), func(_ session.Session) {})
ttlworker.DoGC(context.TODO(), se)
m.DoGC(context.TODO(), se)
tk.MustQuery("select job_id, scan_id from mysql.tidb_ttl_task order by job_id, scan_id asc").Check(testkit.Rows("1 1", "1 2"))
}

func TestGCTableStatus(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)

// stop TTLJobManager to avoid unnecessary job schedule and make test stable
dom.TTLJobManager().Stop()
require.NoError(t, dom.TTLJobManager().WaitStopped(context.Background(), time.Minute))

// insert table status without corresponding table
tk.MustExec("INSERT INTO mysql.tidb_ttl_table_status (table_id,parent_table_id) VALUES (?, ?)", 2024, 2024)

m := ttlworker.NewJobManager("manager-1", nil, store, nil, func() bool {
return true
})
se := session.NewSession(tk.Session(), tk.Session(), func(_ session.Session) {})
m.DoGC(context.TODO(), se)
tk.MustQuery("select * from mysql.tidb_ttl_table_status").Check(nil)

// insert a running table status without corresponding table
tk.MustExec("INSERT INTO mysql.tidb_ttl_table_status (table_id,parent_table_id) VALUES (?, ?)", 2024, 2024)
tk.MustExec(`UPDATE mysql.tidb_ttl_table_status
SET current_job_id = ?,
current_job_owner_id = '12345',
current_job_start_time = NOW(),
current_job_status = 'running',
current_job_status_update_time = NOW(),
current_job_ttl_expire = NOW(),
current_job_owner_hb_time = NOW()
WHERE table_id = ?`, 1, 2024)
m.DoGC(context.TODO(), se)
// it'll not be removed
tk.MustQuery("select current_job_id from mysql.tidb_ttl_table_status").Check(testkit.Rows("1"))
}

func TestGCTTLHistory(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
Expand Down Expand Up @@ -757,8 +850,12 @@ func TestGCTTLHistory(t *testing.T) {
addHistory(5, 90)
addHistory(6, 91)
addHistory(7, 100)

m := ttlworker.NewJobManager("manager-1", nil, store, nil, func() bool {
return true
})
se := session.NewSession(tk.Session(), tk.Session(), func(_ session.Session) {})
ttlworker.DoGC(context.TODO(), se)
m.DoGC(context.TODO(), se)
tk.MustQuery("select job_id from mysql.tidb_ttl_job_history order by job_id asc").Check(testkit.Rows("1", "2", "3", "4", "5"))
}

Expand Down

0 comments on commit 1e74617

Please sign in to comment.