diff --git a/ttl/ttlworker/job_manager.go b/ttl/ttlworker/job_manager.go index 8a9a3c17d48cf..9a437d1cdf8d1 100644 --- a/ttl/ttlworker/job_manager.go +++ b/ttl/ttlworker/job_manager.go @@ -353,6 +353,24 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) { // and keep the job in memory, it could start the left task in the next window. return } + if !variable.EnableTTLJob.Load() { + if len(m.runningJobs) > 0 { + ctx, cancel := context.WithTimeout(m.ctx, ttlInternalSQLTimeout) + + for _, job := range m.runningJobs { + logutil.Logger(m.ctx).Info("cancel job because tidb_ttl_job_enable turned off", zap.String("jobID", job.id), zap.String("statistics", job.statistics.String())) + m.removeJob(job) + err := job.Cancel(ctx, se) + if err != nil { + logutil.Logger(m.ctx).Warn("fail to cancel job", zap.Error(err)) + } + job.finish(se, se.Now()) + } + + cancel() + } + return + } idleScanWorkers := m.idleScanWorkers() if len(idleScanWorkers) == 0 { diff --git a/ttl/ttlworker/job_manager_integration_test.go b/ttl/ttlworker/job_manager_integration_test.go index b4814bed9483a..172754f8dd96a 100644 --- a/ttl/ttlworker/job_manager_integration_test.go +++ b/ttl/ttlworker/job_manager_integration_test.go @@ -17,6 +17,7 @@ package ttlworker_test import ( "context" "fmt" + "strconv" "sync" "testing" "time" @@ -128,8 +129,12 @@ func TestFinishJob(t *testing.T) { func TestTTLAutoAnalyze(t *testing.T) { failpoint.Enable("github.com/pingcap/tidb/ttl/ttlworker/update-info-schema-cache-interval", fmt.Sprintf("return(%d)", time.Second)) + defer failpoint.Disable("github.com/pingcap/tidb/ttl/ttlworker/update-info-schema-cache-interval") failpoint.Enable("github.com/pingcap/tidb/ttl/ttlworker/update-status-table-cache-interval", fmt.Sprintf("return(%d)", time.Second)) + defer failpoint.Disable("github.com/pingcap/tidb/ttl/ttlworker/update-status-table-cache-interval") failpoint.Enable("github.com/pingcap/tidb/ttl/ttlworker/resize-workers-interval", fmt.Sprintf("return(%d)", time.Second)) + defer failpoint.Disable("github.com/pingcap/tidb/ttl/ttlworker/resize-workers-interval") + originAutoAnalyzeMinCnt := handle.AutoAnalyzeMinCnt handle.AutoAnalyzeMinCnt = 0 defer func() { @@ -180,3 +185,56 @@ func TestTTLAutoAnalyze(t *testing.T) { require.NoError(t, h.Update(is)) require.True(t, h.HandleAutoAnalyze(is)) } + +func TestTTLJobDisable(t *testing.T) { + failpoint.Enable("github.com/pingcap/tidb/ttl/ttlworker/update-info-schema-cache-interval", fmt.Sprintf("return(%d)", time.Second)) + defer failpoint.Disable("github.com/pingcap/tidb/ttl/ttlworker/update-info-schema-cache-interval") + failpoint.Enable("github.com/pingcap/tidb/ttl/ttlworker/update-status-table-cache-interval", fmt.Sprintf("return(%d)", time.Second)) + defer failpoint.Disable("github.com/pingcap/tidb/ttl/ttlworker/update-status-table-cache-interval") + failpoint.Enable("github.com/pingcap/tidb/ttl/ttlworker/resize-workers-interval", fmt.Sprintf("return(%d)", time.Second)) + defer failpoint.Disable("github.com/pingcap/tidb/ttl/ttlworker/resize-workers-interval") + + originAutoAnalyzeMinCnt := handle.AutoAnalyzeMinCnt + handle.AutoAnalyzeMinCnt = 0 + defer func() { + handle.AutoAnalyzeMinCnt = originAutoAnalyzeMinCnt + }() + + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + + tk.MustExec("use test") + tk.MustExec("create table t (id int, created_at datetime) ttl = `created_at` + interval 1 day") + + // insert ten rows, the 2,3,4,6,9,10 of them are expired + for i := 1; i <= 10; i++ { + t := time.Now() + if i%2 == 0 || i%3 == 0 { + t = t.Add(-time.Hour * 48) + } + + tk.MustExec("insert into t values(?, ?)", i, t.Format(time.RFC3339)) + } + // turn off the `tidb_ttl_job_enable` + tk.MustExec("set global tidb_ttl_job_enable = 'OFF'") + defer tk.MustExec("set global tidb_ttl_job_enable = 'ON'") + + retryTime := 15 + retryInterval := time.Second * 2 + deleted := false + for retryTime >= 0 { + retryTime-- + time.Sleep(retryInterval) + + rows := tk.MustQuery("select count(*) from t").Rows() + count, err := strconv.Atoi(rows[0][0].(string)) + require.NoError(t, err) + if count < 10 { + deleted = true + break + } + + require.Len(t, dom.TTLJobManager().RunningJobs(), 0) + } + require.False(t, deleted) +} diff --git a/ttl/ttlworker/job_manager_test.go b/ttl/ttlworker/job_manager_test.go index c218c7ee81a08..82a222b088c64 100644 --- a/ttl/ttlworker/job_manager_test.go +++ b/ttl/ttlworker/job_manager_test.go @@ -148,6 +148,11 @@ func (m *JobManager) LockNewJob(ctx context.Context, se session.Session, table * return m.lockNewJob(ctx, se, table, now) } +// RunningJobs returns the running jobs inside ttl job manager +func (m *JobManager) RunningJobs() []*TTLJob { + return m.runningJobs +} + func (j *ttlJob) Finish(se session.Session, now time.Time) { j.finish(se, now) }