diff --git a/ttl/cache/task.go b/ttl/cache/task.go
index 2d06958ddb8da..c88a3fe0abb2e 100644
--- a/ttl/cache/task.go
+++ b/ttl/cache/task.go
@@ -47,11 +47,21 @@ const insertIntoTTLTask = `INSERT LOW_PRIORITY INTO mysql.tidb_ttl_task SET
 	expire_time = %?,
 	created_time = %?`
 
-// SelectFromTTLTaskWithID returns an SQL statement to get all tasks of the specified job in mysql.tidb_ttl_task
-func SelectFromTTLTaskWithID(jobID string) (string, []interface{}) {
+// SelectFromTTLTaskWithJobID returns an SQL statement to get all tasks of the specified job in mysql.tidb_ttl_task
+func SelectFromTTLTaskWithJobID(jobID string) (string, []interface{}) {
 	return selectFromTTLTask + " WHERE job_id = %?", []interface{}{jobID}
 }
 
+// SelectFromTTLTaskWithID returns an SQL statement to get the task with the specified job ID and scan ID in mysql.tidb_ttl_task
+func SelectFromTTLTaskWithID(jobID string, scanID int64) (string, []interface{}) {
+	return selectFromTTLTask + " WHERE job_id = %? AND scan_id = %?", []interface{}{jobID, scanID}
+}
+
+// PeekWaitingTTLTask returns an SQL statement to get up to `limit` TTL tasks that are in the waiting status or whose owner heartbeat has expired
+func PeekWaitingTTLTask(limit int, hbExpire time.Time) (string, []interface{}) {
+	return selectFromTTLTask + " WHERE status = 'waiting' OR owner_hb_time < %? ORDER BY created_time ASC LIMIT %?", []interface{}{hbExpire.Format("2006-01-02 15:04:05"), limit}
+}
+
 // InsertIntoTTLTask returns an SQL statement to insert a ttl task into mysql.tidb_ttl_task
 func InsertIntoTTLTask(sctx sessionctx.Context, jobID string, tableID int64, scanID int, scanRangeStart []types.Datum, scanRangeEnd []types.Datum, expireTime time.Time, createdTime time.Time) (string, []interface{}, error) {
 	rangeStart, err := codec.EncodeKey(sctx.GetSessionVars().StmtCtx, []byte{}, scanRangeStart...)
@@ -115,8 +125,8 @@ func RowToTTLTask(sctx sessionctx.Context, row chunk.Row) (*TTLTask, error) {
 	}
 	if !row.IsNull(3) {
 		scanRangeStartBuf := row.GetBytes(3)
-		// it's still posibble to be nil even this column is not NULL
-		if scanRangeStartBuf != nil {
+		// it's still possible to be empty even if this column is not NULL
+		if len(scanRangeStartBuf) > 0 {
 			task.ScanRangeStart, err = codec.Decode(scanRangeStartBuf, len(scanRangeStartBuf))
 			if err != nil {
 				return nil, err
@@ -125,8 +135,8 @@ func RowToTTLTask(sctx sessionctx.Context, row chunk.Row) (*TTLTask, error) {
 	}
 	if !row.IsNull(4) {
 		scanRangeEndBuf := row.GetBytes(4)
-		// it's still posibble to be nil even this column is not NULL
-		if scanRangeEndBuf != nil {
+		// it's still possible to be empty even if this column is not NULL
+		if len(scanRangeEndBuf) > 0 {
 			task.ScanRangeEnd, err = codec.Decode(scanRangeEndBuf, len(scanRangeEndBuf))
 			if err != nil {
 				return nil, err
diff --git a/ttl/cache/task_test.go b/ttl/cache/task_test.go
index 555ad8566f887..ad1f71278b102 100644
--- a/ttl/cache/task_test.go
+++ b/ttl/cache/task_test.go
@@ -41,7 +41,7 @@ func newTaskGetter(ctx context.Context, t *testing.T, tk *testkit.TestKit) *task
 }
 
 func (tg *taskGetter) mustGetTestTask() *cache.TTLTask {
-	sql, args := cache.SelectFromTTLTaskWithID("test-job")
+	sql, args := cache.SelectFromTTLTaskWithJobID("test-job")
 	rs, err := tg.tk.Session().ExecuteInternal(tg.ctx, sql, args...)
require.NoError(tg.t, err) rows, err := session.GetRows4Test(context.Background(), tg.tk.Session(), rs) diff --git a/ttl/session/session.go b/ttl/session/session.go index 4fa8bc198c3ac..8fe2d1674e722 100644 --- a/ttl/session/session.go +++ b/ttl/session/session.go @@ -30,6 +30,16 @@ import ( "github.com/pingcap/tidb/util/sqlexec" ) +// TxnMode represents using optimistic or pessimistic mode in the transaction +type TxnMode int + +const ( + // TxnModeOptimistic means using the optimistic transaction with "BEGIN OPTIMISTIC" + TxnModeOptimistic TxnMode = iota + // TxnModePessimistic means using the pessimistic transaction with "BEGIN PESSIMISTIC" + TxnModePessimistic +) + // Session is used to execute queries for TTL case type Session interface { sessionctx.Context @@ -38,7 +48,7 @@ type Session interface { // ExecuteSQL executes the sql ExecuteSQL(ctx context.Context, sql string, args ...interface{}) ([]chunk.Row, error) // RunInTxn executes the specified function in a txn - RunInTxn(ctx context.Context, fn func() error) (err error) + RunInTxn(ctx context.Context, fn func() error, mode TxnMode) (err error) // ResetWithGlobalTimeZone resets the session time zone to global time zone ResetWithGlobalTimeZone(ctx context.Context) error // Close closes the session @@ -94,12 +104,21 @@ func (s *session) ExecuteSQL(ctx context.Context, sql string, args ...interface{ } // RunInTxn executes the specified function in a txn -func (s *session) RunInTxn(ctx context.Context, fn func() error) (err error) { +func (s *session) RunInTxn(ctx context.Context, fn func() error, txnMode TxnMode) (err error) { tracer := metrics.PhaseTracerFromCtx(ctx) defer tracer.EnterPhase(tracer.Phase()) tracer.EnterPhase(metrics.PhaseBeginTxn) - if _, err = s.ExecuteSQL(ctx, "BEGIN OPTIMISTIC"); err != nil { + sql := "BEGIN " + switch txnMode { + case TxnModeOptimistic: + sql += "OPTIMISTIC" + case TxnModePessimistic: + sql += "PESSIMISTIC" + default: + return errors.New("unknown transaction mode") + } + if _, err = s.ExecuteSQL(ctx, sql); err != nil { return err } tracer.EnterPhase(metrics.PhaseOther) diff --git a/ttl/session/session_test.go b/ttl/session/session_test.go index ecabbc8683158..607c7a2319342 100644 --- a/ttl/session/session_test.go +++ b/ttl/session/session_test.go @@ -36,20 +36,20 @@ func TestSessionRunInTxn(t *testing.T) { require.NoError(t, se.RunInTxn(context.TODO(), func() error { tk.MustExec("insert into t values (1, 10)") return nil - })) + }, session.TxnModeOptimistic)) tk2.MustQuery("select * from t order by id asc").Check(testkit.Rows("1 10")) err := se.RunInTxn(context.TODO(), func() error { tk.MustExec("insert into t values (2, 20)") return errors.New("mockErr") - }) + }, session.TxnModeOptimistic) require.EqualError(t, err, "mockErr") tk2.MustQuery("select * from t order by id asc").Check(testkit.Rows("1 10")) require.NoError(t, se.RunInTxn(context.TODO(), func() error { tk.MustExec("insert into t values (3, 30)") return nil - })) + }, session.TxnModeOptimistic)) tk2.MustQuery("select * from t order by id asc").Check(testkit.Rows("1 10", "3 30")) } diff --git a/ttl/ttlworker/BUILD.bazel b/ttl/ttlworker/BUILD.bazel index 48e6a411b1826..f314eca5c01ed 100644 --- a/ttl/ttlworker/BUILD.bazel +++ b/ttl/ttlworker/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "job_manager.go", "scan.go", "session.go", + "task_manager.go", "worker.go", ], importpath = "github.com/pingcap/tidb/ttl/ttlworker", @@ -27,7 +28,6 @@ go_library( "//types", "//util", "//util/chunk", - "//util/hack", "//util/logutil", "//util/sqlexec", 
"//util/timeutil", @@ -46,16 +46,17 @@ go_test( name = "ttlworker_test", srcs = [ "del_test.go", - "job_integration_test.go", "job_manager_integration_test.go", "job_manager_test.go", - "job_test.go", "scan_test.go", "session_test.go", + "task_manager_integration_test.go", + "task_manager_test.go", ], embed = [":ttlworker"], flaky = True, deps = [ + "//domain", "//infoschema", "//kv", "//parser/ast", diff --git a/ttl/ttlworker/config.go b/ttl/ttlworker/config.go index 55d005a82e6c2..c1774bc667348 100644 --- a/ttl/ttlworker/config.go +++ b/ttl/ttlworker/config.go @@ -30,6 +30,9 @@ const resizeWorkersInterval = 30 * time.Second const splitScanCount = 64 const ttlJobTimeout = 6 * time.Hour +const taskManagerLoopTickerInterval = time.Minute +const ttlTaskHeartBeatTickerInterval = time.Minute + func getUpdateInfoSchemaCacheInterval() time.Duration { failpoint.Inject("update-info-schema-cache-interval", func(val failpoint.Value) time.Duration { return time.Duration(val.(int)) @@ -50,3 +53,10 @@ func getResizeWorkersInterval() time.Duration { }) return resizeWorkersInterval } + +func getTaskManagerLoopTickerInterval() time.Duration { + failpoint.Inject("task-manager-loop-interval", func(val failpoint.Value) time.Duration { + return time.Duration(val.(int)) + }) + return taskManagerLoopTickerInterval +} diff --git a/ttl/ttlworker/job.go b/ttl/ttlworker/job.go index 4b5ffb147bf1d..f2a78e7ef0270 100644 --- a/ttl/ttlworker/job.go +++ b/ttl/ttlworker/job.go @@ -16,14 +16,12 @@ package ttlworker import ( "context" - "encoding/json" "sync" "time" "github.com/pingcap/errors" "github.com/pingcap/tidb/ttl/cache" "github.com/pingcap/tidb/ttl/session" - "github.com/pingcap/tidb/util/hack" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" ) @@ -44,7 +42,6 @@ const finishJobTemplate = `UPDATE mysql.tidb_ttl_table_status current_job_status = NULL, current_job_status_update_time = NULL WHERE table_id = %? AND current_job_id = %?` -const updateJobStateTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_state = %? WHERE table_id = %? AND current_job_id = %? AND current_job_owner_id = %?" const removeTaskForJobTemplate = "DELETE FROM mysql.tidb_ttl_task WHERE job_id = %?" 
func updateJobCurrentStatusSQL(tableID int64, oldStatus cache.JobStatus, newStatus cache.JobStatus, jobID string) (string, []interface{}) { @@ -55,10 +52,6 @@ func finishJobSQL(tableID int64, finishTime time.Time, summary string, jobID str return finishJobTemplate, []interface{}{finishTime.Format(timeFormat), summary, tableID, jobID} } -func updateJobState(tableID int64, currentJobID string, currentJobState string, currentJobOwnerID string) (string, []interface{}) { - return updateJobStateTemplate, []interface{}{currentJobState, tableID, currentJobID, currentJobOwnerID} -} - func removeTaskForJob(jobID string) (string, []interface{}) { return removeTaskForJobTemplate, []interface{}{jobID} } @@ -67,78 +60,23 @@ type ttlJob struct { id string ownerID string - ctx context.Context - cancel func() - createTime time.Time tbl *cache.PhysicalTable - tasks []*ttlScanTask - taskIter int - finishedScanTaskCounter int - scanTaskErr error - // status is the only field which should be protected by a mutex, as `Cancel` may be called at any time, and will // change the status statusMutex sync.Mutex status cache.JobStatus - - statistics *ttlStatistics -} - -// changeStatus updates the state of this job -func (job *ttlJob) changeStatus(ctx context.Context, se session.Session, status cache.JobStatus) error { - job.statusMutex.Lock() - oldStatus := job.status - job.status = status - job.statusMutex.Unlock() - - sql, args := updateJobCurrentStatusSQL(job.tbl.ID, oldStatus, status, job.id) - _, err := se.ExecuteSQL(ctx, sql, args...) - if err != nil { - return errors.Wrapf(err, "execute sql: %s", sql) - } - - return nil -} - -func (job *ttlJob) updateState(ctx context.Context, se session.Session) error { - summary, err := job.summary() - if err != nil { - logutil.Logger(job.ctx).Warn("fail to generate summary for ttl job", zap.Error(err)) - } - sql, args := updateJobState(job.tbl.ID, job.id, summary, job.ownerID) - _, err = se.ExecuteSQL(ctx, sql, args...) - if err != nil { - return errors.Wrapf(err, "execute sql: %s", sql) - } - - return nil -} - -// peekScanTask returns the next scan task, but doesn't promote the iterator -func (job *ttlJob) peekScanTask() *ttlScanTask { - return job.tasks[job.taskIter] -} - -// nextScanTask promotes the iterator -func (job *ttlJob) nextScanTask() { - job.taskIter += 1 } // finish turns current job into last job, and update the error message and statistics summary -func (job *ttlJob) finish(se session.Session, now time.Time) { - summary, err := job.summary() - if err != nil { - logutil.Logger(job.ctx).Warn("fail to generate summary for ttl job", zap.Error(err)) - } - +func (job *ttlJob) finish(se session.Session, now time.Time, summary string) { // at this time, the job.ctx may have been canceled (to cancel this job) // even when it's canceled, we'll need to update the states, so use another context - err = se.RunInTxn(context.TODO(), func() error { + err := se.RunInTxn(context.TODO(), func() error { sql, args := finishJobSQL(job.tbl.ID, now, summary, job.id) - _, err = se.ExecuteSQL(context.TODO(), sql, args...) + _, err := se.ExecuteSQL(context.TODO(), sql, args...) 
if err != nil { return errors.Wrapf(err, "execute sql: %s", sql) } @@ -150,94 +88,9 @@ func (job *ttlJob) finish(se session.Session, now time.Time) { } return nil - }) + }, session.TxnModeOptimistic) if err != nil { - logutil.Logger(job.ctx).Error("fail to finish a ttl job", zap.Error(err), zap.Int64("tableID", job.tbl.ID), zap.String("jobID", job.id)) + logutil.BgLogger().Error("fail to finish a ttl job", zap.Error(err), zap.Int64("tableID", job.tbl.ID), zap.String("jobID", job.id)) } } - -// AllSpawned returns whether all scan tasks have been dumped out -// **This function will be called concurrently, in many workers' goroutine** -func (job *ttlJob) AllSpawned() bool { - return job.taskIter == len(job.tasks) && len(job.tasks) != 0 -} - -// Timeout will return whether the job has timeout, if it is, it will be killed -func (job *ttlJob) Timeout(ctx context.Context, se session.Session, now time.Time) bool { - if !job.createTime.Add(ttlJobTimeout).Before(now) { - return false - } - - err := job.changeStatus(ctx, se, cache.JobStatusTimeout) - if err != nil { - logutil.BgLogger().Info("fail to update status of ttl job", zap.String("jobID", job.id), zap.Error(err)) - } - - return true -} - -// Finished returns whether the job is finished -func (job *ttlJob) Finished() bool { - job.statusMutex.Lock() - defer job.statusMutex.Unlock() - // in three condition, a job is considered finished: - // 1. It's cancelled manually - // 2. All scan tasks have been finished, and all selected rows succeed or in error state - // 3. The job is created one hour ago. It's a timeout. - return job.status == cache.JobStatusCancelled || (job.AllSpawned() && job.finishedScanTaskCounter == len(job.tasks) && job.statistics.TotalRows.Load() == job.statistics.ErrorRows.Load()+job.statistics.SuccessRows.Load()) -} - -// Cancel cancels the job context -func (job *ttlJob) Cancel(ctx context.Context, se session.Session) error { - if job.cancel != nil { - job.cancel() - } - // TODO: wait until all tasks have been finished - return job.changeStatus(ctx, se, cache.JobStatusCancelled) -} - -func findJobWithTableID(jobs []*ttlJob, id int64) *ttlJob { - for _, j := range jobs { - if j.tbl.ID == id { - return j - } - } - - return nil -} - -type ttlSummary struct { - TotalRows uint64 `json:"total_rows"` - SuccessRows uint64 `json:"success_rows"` - ErrorRows uint64 `json:"error_rows"` - - TotalScanTask int `json:"total_scan_task"` - ScheduledScanTask int `json:"scheduled_scan_task"` - FinishedScanTask int `json:"finished_scan_task"` - - ScanTaskErr string `json:"scan_task_err,omitempty"` -} - -func (job *ttlJob) summary() (string, error) { - summary := &ttlSummary{ - TotalRows: job.statistics.TotalRows.Load(), - SuccessRows: job.statistics.SuccessRows.Load(), - ErrorRows: job.statistics.ErrorRows.Load(), - - TotalScanTask: len(job.tasks), - ScheduledScanTask: job.taskIter, - FinishedScanTask: job.finishedScanTaskCounter, - } - - if job.scanTaskErr != nil { - summary.ScanTaskErr = job.scanTaskErr.Error() - } - - summaryJSON, err := json.Marshal(summary) - if err != nil { - return "", err - } - - return string(hack.String(summaryJSON)), nil -} diff --git a/ttl/ttlworker/job_integration_test.go b/ttl/ttlworker/job_integration_test.go deleted file mode 100644 index b90c4a550d858..0000000000000 --- a/ttl/ttlworker/job_integration_test.go +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package ttlworker_test - -import ( - "context" - "testing" - - dbsession "github.com/pingcap/tidb/session" - "github.com/pingcap/tidb/testkit" - "github.com/pingcap/tidb/ttl/cache" - "github.com/pingcap/tidb/ttl/session" - "github.com/pingcap/tidb/ttl/ttlworker" - "github.com/stretchr/testify/require" -) - -func TestChangeStatus(t *testing.T) { - store := testkit.CreateMockStore(t) - tk := testkit.NewTestKit(t, store) - - dbSession, err := dbsession.CreateSession4Test(store) - require.NoError(t, err) - se := session.NewSession(dbSession, dbSession, nil) - - job := ttlworker.NewTTLJob(&cache.PhysicalTable{ID: 0}, "0", cache.JobStatusWaiting) - tk.MustExec("insert into mysql.tidb_ttl_table_status(table_id,current_job_id,current_job_status) VALUES(0, '0', 'waiting')") - require.NoError(t, job.ChangeStatus(context.Background(), se, cache.JobStatusRunning)) - tk.MustQuery("select current_job_status from mysql.tidb_ttl_table_status").Check(testkit.Rows("running")) -} diff --git a/ttl/ttlworker/job_manager.go b/ttl/ttlworker/job_manager.go index 910038666c1b6..223128b52f26c 100644 --- a/ttl/ttlworker/job_manager.go +++ b/ttl/ttlworker/job_manager.go @@ -49,6 +49,12 @@ const setTableStatusOwnerTemplate = `UPDATE mysql.tidb_ttl_table_status current_job_owner_hb_time = %? WHERE table_id = %?` const updateHeartBeatTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_owner_hb_time = %? WHERE table_id = %? AND current_job_owner_id = %?" +const taskGCTemplate = `DELETE task FROM + mysql.tidb_ttl_task task + left join + mysql.tidb_ttl_table_status job + ON task.job_id = job.current_job_id + WHERE job.table_id IS NULL` const timeFormat = "2006-01-02 15:04:05" @@ -79,11 +85,6 @@ type JobManager struct { store kv.Storage cmdCli client.CommandClient - // the workers are shared between the loop goroutine and other sessions (e.g. manually resize workers through - // setting variables) - scanWorkers []worker - delWorkers []worker - // infoSchemaCache and tableStatusCache are a cache stores the information from info schema and the tidb_ttl_table_status // table. They don't need to be protected by mutex, because they are only used in job loop goroutine. infoSchemaCache *cache.InfoSchemaCache @@ -95,8 +96,7 @@ type JobManager struct { // to poll scan tasks from the job in the future when there are scan workers in idle. 
runningJobs []*ttlJob - delCh chan *ttlDeleteTask - notifyStateCh chan interface{} + taskManager *taskManager } // NewJobManager creates a new ttl job manager @@ -105,11 +105,9 @@ func NewJobManager(id string, sessPool sessionPool, store kv.Storage, etcdCli *c manager.id = id manager.store = store manager.sessPool = sessPool - manager.delCh = make(chan *ttlDeleteTask) - manager.notifyStateCh = make(chan interface{}, 1) manager.init(manager.jobLoop) - manager.ctx = logutil.WithKeyValue(manager.ctx, "ttl-worker", "manager") + manager.ctx = logutil.WithKeyValue(manager.ctx, "ttl-worker", "job-manager") manager.infoSchemaCache = cache.NewInfoSchemaCache(getUpdateInfoSchemaCacheInterval()) manager.tableStatusCache = cache.NewTableStatusCache(getUpdateTTLTableStatusCacheInterval()) @@ -120,6 +118,8 @@ func NewJobManager(id string, sessPool sessionPool, store kv.Storage, etcdCli *c manager.cmdCli = client.NewMockCommandClient() } + manager.taskManager = newTaskManager(manager.ctx, sessPool, manager.infoSchemaCache, id) + return } @@ -130,25 +130,33 @@ func (m *JobManager) jobLoop() error { } defer func() { - err = multierr.Combine(err, multierr.Combine(m.resizeScanWorkers(0), m.resizeDelWorkers(0))) + err = multierr.Combine(err, multierr.Combine(m.taskManager.resizeScanWorkers(0), m.taskManager.resizeDelWorkers(0))) se.Close() logutil.Logger(m.ctx).Info("ttlJobManager loop exited.") }() - scheduleTicker := time.Tick(jobManagerLoopTickerInterval) - updateHeartBeatTicker := time.Tick(jobManagerLoopTickerInterval) - jobCheckTicker := time.Tick(jobManagerLoopTickerInterval) - updateScanTaskStateTicker := time.Tick(jobManagerLoopTickerInterval) infoSchemaCacheUpdateTicker := time.Tick(m.infoSchemaCache.GetInterval()) tableStatusCacheUpdateTicker := time.Tick(m.tableStatusCache.GetInterval()) resizeWorkersTicker := time.Tick(getResizeWorkersInterval()) + taskGC := time.Tick(jobManagerLoopTickerInterval) + + scheduleJobTicker := time.Tick(jobManagerLoopTickerInterval) + jobCheckTicker := time.Tick(jobManagerLoopTickerInterval) + updateJobHeartBeatTicker := time.Tick(jobManagerLoopTickerInterval) + + scheduleTaskTicker := time.Tick(getTaskManagerLoopTickerInterval()) + updateTaskHeartBeatTicker := time.Tick(ttlTaskHeartBeatTickerInterval) + taskCheckTicker := time.Tick(getTaskManagerLoopTickerInterval()) + checkScanTaskFinishedTicker := time.Tick(getTaskManagerLoopTickerInterval()) + cmdWatcher := m.cmdCli.WatchCommand(m.ctx) - m.resizeWorkersWithSysVar() + m.taskManager.resizeWorkersWithSysVar() for { m.reportMetrics() now := se.Now() select { + // misc case <-m.ctx.Done(): return nil case <-infoSchemaCacheUpdateTicker: @@ -161,29 +169,25 @@ func (m *JobManager) jobLoop() error { if err != nil { logutil.Logger(m.ctx).Warn("fail to update table status cache", zap.Error(err)) } - case <-updateHeartBeatTicker: - updateHeartBeatCtx, cancel := context.WithTimeout(m.ctx, ttlInternalSQLTimeout) - err = m.updateHeartBeat(updateHeartBeatCtx, se) + case <-taskGC: + taskGCCtx, cancel := context.WithTimeout(m.ctx, ttlInternalSQLTimeout) + _, err = se.ExecuteSQL(taskGCCtx, taskGCTemplate) if err != nil { - logutil.Logger(m.ctx).Warn("fail to update heart beat", zap.Error(err)) + logutil.Logger(m.ctx).Warn("fail to gc redundant scan task", zap.Error(err)) } cancel() - case <-updateScanTaskStateTicker: - if m.updateTaskState() { - m.checkFinishedJob(se, now) - m.rescheduleJobs(se, now) - } - case <-m.notifyStateCh: - if m.updateTaskState() { - m.checkFinishedJob(se, now) - m.rescheduleJobs(se, now) + // Job Schedule 
loop: + case <-updateJobHeartBeatTicker: + updateHeartBeatCtx, cancel := context.WithTimeout(m.ctx, ttlInternalSQLTimeout) + err = m.updateHeartBeat(updateHeartBeatCtx, se, now) + if err != nil { + logutil.Logger(m.ctx).Warn("fail to update job heart beat", zap.Error(err)) } + cancel() case <-jobCheckTicker: - m.checkFinishedJob(se, now) + m.checkFinishedJob(se) m.checkNotOwnJob() - case <-resizeWorkersTicker: - m.resizeWorkersWithSysVar() - case <-scheduleTicker: + case <-scheduleJobTicker: m.rescheduleJobs(se, now) case cmd, ok := <-cmdWatcher: if !ok { @@ -200,18 +204,31 @@ func (m *JobManager) jobLoop() error { m.triggerTTLJob(cmd.RequestID, triggerJobCmd, se) m.rescheduleJobs(se, now) } - } - } -} -func (m *JobManager) resizeWorkersWithSysVar() { - err := m.resizeScanWorkers(int(variable.TTLScanWorkerCount.Load())) - if err != nil { - logutil.Logger(m.ctx).Warn("fail to resize scan workers", zap.Error(err)) - } - err = m.resizeDelWorkers(int(variable.TTLDeleteWorkerCount.Load())) - if err != nil { - logutil.Logger(m.ctx).Warn("fail to resize delete workers", zap.Error(err)) + // Task Manager Loop + case <-scheduleTaskTicker: + m.taskManager.rescheduleTasks(se, now) + case <-taskCheckTicker: + m.taskManager.checkInvalidTask(se) + m.taskManager.checkFinishedTask(se, now) + case <-resizeWorkersTicker: + m.taskManager.resizeWorkersWithSysVar() + case <-updateTaskHeartBeatTicker: + updateHeartBeatCtx, cancel := context.WithTimeout(m.ctx, ttlInternalSQLTimeout) + err = m.taskManager.updateHeartBeat(updateHeartBeatCtx, se, now) + if err != nil { + logutil.Logger(m.ctx).Warn("fail to update task heart beat", zap.Error(err)) + } + cancel() + case <-checkScanTaskFinishedTicker: + if m.taskManager.handleScanFinishedTask() { + m.taskManager.rescheduleTasks(se, now) + } + case <-m.taskManager.notifyStateCh: + if m.taskManager.handleScanFinishedTask() { + m.taskManager.rescheduleTasks(se, now) + } + } } } @@ -325,272 +342,101 @@ func (m *JobManager) reportMetrics() { metrics.CancellingJobsCnt.Set(cancellingJobs) } -func (m *JobManager) resizeScanWorkers(count int) error { - var err error - var canceledWorkers []worker - m.scanWorkers, canceledWorkers, err = m.resizeWorkers(m.scanWorkers, count, func() worker { - return newScanWorker(m.delCh, m.notifyStateCh, m.sessPool) - }) - for _, w := range canceledWorkers { - s := w.(scanWorker) - - var tableID int64 - var scanErr error - result := s.PollTaskResult() - if result != nil { - tableID = result.task.tbl.ID - scanErr = result.err - } else { - // if the scan worker failed to poll the task, it's possible that the `WaitStopped` has timeout - // we still consider the scan task as finished - curTask := s.CurrentTask() - if curTask == nil { - continue - } - tableID = curTask.tbl.ID - scanErr = errors.New("timeout to cancel scan task") - } - - job := findJobWithTableID(m.runningJobs, tableID) - if job == nil { - logutil.Logger(m.ctx).Warn("task state changed but job not found", zap.Int64("tableID", tableID)) - continue - } - logutil.Logger(m.ctx).Debug("scan task finished", zap.String("jobID", job.id)) - job.finishedScanTaskCounter += 1 - job.scanTaskErr = multierr.Append(job.scanTaskErr, scanErr) - } - return err -} - -func (m *JobManager) resizeDelWorkers(count int) error { - var err error - m.delWorkers, _, err = m.resizeWorkers(m.delWorkers, count, func() worker { - return newDeleteWorker(m.delCh, m.sessPool) - }) - return err -} - -// resizeWorkers scales the worker, and returns the full set of workers as the first return value. 
If there are workers -// stopped, return the stopped worker in the second return value -func (m *JobManager) resizeWorkers(workers []worker, count int, factory func() worker) ([]worker, []worker, error) { - if count < len(workers) { - logutil.Logger(m.ctx).Info("shrink ttl worker", zap.Int("originalCount", len(workers)), zap.Int("newCount", count)) - - for _, w := range workers[count:] { - w.Stop() - } - - var errs error - // don't use `m.ctx` here, because when shutdown the server, `m.ctx` has already been cancelled - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - for _, w := range workers[count:] { - err := w.WaitStopped(ctx, 30*time.Second) - if err != nil { - logutil.Logger(m.ctx).Warn("fail to stop ttl worker", zap.Error(err)) - errs = multierr.Append(errs, err) - } - } - cancel() - - // remove the existing workers, and keep the left workers - return workers[:count], workers[count:], errs - } - - if count > len(workers) { - logutil.Logger(m.ctx).Info("scale ttl worker", zap.Int("originalCount", len(workers)), zap.Int("newCount", count)) - - for i := len(workers); i < count; i++ { - w := factory() - w.Start() - workers = append(workers, w) - } - return workers, nil, nil - } - - return workers, nil, nil -} - -// updateTaskState polls the result from scan worker and returns whether there are result polled -func (m *JobManager) updateTaskState() bool { - results := m.pollScanWorkerResults() - for _, result := range results { - logger := logutil.Logger(m.ctx).With(zap.Int64("tableID", result.task.tbl.ID)) - if result.err != nil { - logger = logger.With(zap.Error(result.err)) - } - - job := findJobWithTableID(m.runningJobs, result.task.tbl.ID) - if job == nil { - logger.Warn("task state changed but job not found", zap.Int64("tableID", result.task.tbl.ID)) - continue - } - logger.Info("scan task finished", zap.String("jobID", job.id)) - - job.finishedScanTaskCounter += 1 - job.scanTaskErr = multierr.Append(job.scanTaskErr, result.err) - } - - return len(results) > 0 -} - -func (m *JobManager) pollScanWorkerResults() []*ttlScanTaskExecResult { - results := make([]*ttlScanTaskExecResult, 0, len(m.scanWorkers)) - for _, w := range m.scanWorkers { - worker := w.(scanWorker) - result := worker.PollTaskResult() - if result != nil { - results = append(results, result) - } - } - - return results -} - // checkNotOwnJob removes the job whose current job owner is not yourself func (m *JobManager) checkNotOwnJob() { for _, job := range m.runningJobs { tableStatus := m.tableStatusCache.Tables[job.tbl.ID] if tableStatus == nil || tableStatus.CurrentJobOwnerID != m.id { - logger := logutil.Logger(m.ctx).With(zap.String("jobID", job.id), zap.String("statistics", job.statistics.String())) + logger := logutil.Logger(m.ctx).With(zap.String("jobID", job.id)) if tableStatus != nil { logger.With(zap.String("newJobOwnerID", tableStatus.CurrentJobOwnerID)) } logger.Info("job has been taken over by another node") m.removeJob(job) - job.cancel() } } } -func (m *JobManager) checkFinishedJob(se session.Session, now time.Time) { +func (m *JobManager) checkFinishedJob(se session.Session) { +j: for _, job := range m.runningJobs { timeoutJobCtx, cancel := context.WithTimeout(m.ctx, ttlInternalSQLTimeout) - if job.Finished() { - logutil.Logger(m.ctx).Info("job has finished", zap.String("jobID", job.id), zap.String("statistics", job.statistics.String())) - m.removeJob(job) - job.finish(se, se.Now()) - } else if job.Timeout(timeoutJobCtx, se, now) { - logutil.Logger(m.ctx).Info("job is timeout", 
zap.String("jobID", job.id), zap.String("statistics", job.statistics.String())) - m.removeJob(job) - err := job.Cancel(timeoutJobCtx, se) + + sql, args := cache.SelectFromTTLTaskWithJobID(job.id) + rows, err := se.ExecuteSQL(timeoutJobCtx, sql, args...) + cancel() + if err != nil { + logutil.Logger(m.ctx).Warn("fail to execute sql", zap.String("sql", sql), zap.Any("args", args), zap.Error(err)) + continue + } + + allFinished := true + allTasks := make([]*cache.TTLTask, 0, len(rows)) + for _, r := range rows { + task, err := cache.RowToTTLTask(se, r) + if err != nil { + logutil.Logger(m.ctx).Warn("fail to read task", zap.Error(err)) + continue j + } + allTasks = append(allTasks, task) + + if task.Status != "finished" { + allFinished = false + } + } + + if allFinished { + logutil.Logger(m.ctx).Info("job has finished", zap.String("jobID", job.id)) + summary, err := summarizeTaskResult(allTasks) if err != nil { - logutil.Logger(m.ctx).Warn("fail to cancel job", zap.Error(err)) + logutil.Logger(m.ctx).Info("fail to summarize job", zap.Error(err)) } - job.finish(se, se.Now()) + m.removeJob(job) + job.finish(se, se.Now(), summary) } cancel() } } func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) { - if !timeutil.WithinDayTimePeriod(variable.TTLJobScheduleWindowStartTime.Load(), variable.TTLJobScheduleWindowEndTime.Load(), now) { - // Local jobs will also not run, but as the server is still sending heartbeat, - // and keep the job in memory, it could start the left task in the next window. - return - } - if !variable.EnableTTLJob.Load() { + if !variable.EnableTTLJob.Load() || !timeutil.WithinDayTimePeriod(variable.TTLJobScheduleWindowStartTime.Load(), variable.TTLJobScheduleWindowEndTime.Load(), now) { if len(m.runningJobs) > 0 { - ctx, cancel := context.WithTimeout(m.ctx, ttlInternalSQLTimeout) - for _, job := range m.runningJobs { - logutil.Logger(m.ctx).Info("cancel job because tidb_ttl_job_enable turned off", zap.String("jobID", job.id), zap.String("statistics", job.statistics.String())) - m.removeJob(job) - err := job.Cancel(ctx, se) + logutil.Logger(m.ctx).Info("cancel job because tidb_ttl_job_enable turned off", zap.String("jobID", job.id)) + + summary, err := summarizeErr(errors.New("ttl job is disabled")) if err != nil { - logutil.Logger(m.ctx).Warn("fail to cancel job", zap.Error(err)) + logutil.Logger(m.ctx).Info("fail to summarize job", zap.Error(err)) } - job.finish(se, se.Now()) + m.removeJob(job) + job.finish(se, now, summary) } - - cancel() } return } - idleScanWorkers := m.idleScanWorkers() - if len(idleScanWorkers) == 0 { + // don't lock job if there's no free scan workers in local + // it's a mechanism to avoid too many scan tasks waiting in the ttl_tasks table. 
+	if len(m.taskManager.idleScanWorkers()) == 0 {
 		return
 	}
 
-	localJobs := m.localJobs()
 	newJobTables := m.readyForNewJobTables(now)
 	// TODO: also consider to resume tables, but it's fine to left them there, as other nodes will take this job
 	// when the heart beat is not sent
-	for len(idleScanWorkers) > 0 && (len(newJobTables) > 0 || len(localJobs) > 0) {
-		var job *ttlJob
-		var err error
-
-		switch {
-		case len(localJobs) > 0:
-			job = localJobs[0]
-			localJobs = localJobs[1:]
-		case len(newJobTables) > 0:
-			table := newJobTables[0]
-			newJobTables = newJobTables[1:]
-			logutil.Logger(m.ctx).Info("try lock new job", zap.Int64("tableID", table.ID))
-			job, err = m.lockNewJob(m.ctx, se, table, now, false)
-			if job != nil {
-				logutil.Logger(m.ctx).Info("append new running job", zap.String("jobID", job.id), zap.Int64("tableID", job.tbl.ID))
-				m.appendJob(job)
-			}
+	for _, table := range newJobTables {
+		logutil.Logger(m.ctx).Info("try lock new job", zap.Int64("tableID", table.ID))
+		job, err := m.lockNewJob(m.ctx, se, table, now, false)
+		if job != nil {
+			logutil.Logger(m.ctx).Info("append new running job", zap.String("jobID", job.id), zap.Int64("tableID", job.tbl.ID))
+			m.appendJob(job)
 		}
 		if err != nil {
 			logutil.Logger(m.ctx).Warn("fail to create new job", zap.Error(err))
 		}
-
-		if job == nil {
-			continue
-		}
-
-		for !job.AllSpawned() {
-			task := job.peekScanTask()
-			logger := logutil.Logger(m.ctx).With(zap.String("jobID", job.id), zap.String("table", task.tbl.TableInfo.Name.L))
-			if task.tbl.PartitionDef != nil {
-				logger = logger.With(zap.String("partition", task.tbl.PartitionDef.Name.L))
-			}
-
-			for len(idleScanWorkers) > 0 {
-				idleWorker := idleScanWorkers[0]
-				idleScanWorkers = idleScanWorkers[1:]
-
-				err := idleWorker.Schedule(task)
-				if err != nil {
-					logger.Info("fail to schedule task", zap.Error(err))
-					continue
-				}
-
-				ctx, cancel := context.WithTimeout(m.ctx, ttlInternalSQLTimeout)
-				err = job.changeStatus(ctx, se, cache.JobStatusRunning)
-				if err != nil {
-					// not a big problem, current logic doesn't depend on the job status to promote
-					// the routine, so we could just print a log here
-					logger.Error("change ttl job status", zap.Error(err), zap.String("id", job.id))
-				}
-				cancel()
-
-				logger.Info("scheduled ttl task")
-
-				job.nextScanTask()
-				break
-			}
-
-			if len(idleScanWorkers) == 0 {
-				break
-			}
-		}
 	}
 }
 
-func (m *JobManager) idleScanWorkers() []scanWorker {
-	workers := make([]scanWorker, 0, len(m.scanWorkers))
-	for _, w := range m.scanWorkers {
-		if w.(scanWorker).Idle() {
-			workers = append(workers, w.(scanWorker))
-		}
-	}
-	return workers
-}
-
 func (m *JobManager) localJobs() []*ttlJob {
 	jobs := make([]*ttlJob, 0, len(m.runningJobs))
 	for _, job := range m.runningJobs {
@@ -675,7 +521,10 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
 	err := se.RunInTxn(ctx, func() error {
 		sql, args := cache.SelectFromTTLTableStatusWithID(table.ID)
-		rows, err := se.ExecuteSQL(ctx, sql, args...)
+		// use `FOR UPDATE NOWAIT` so that, if the new job has already been locked by another node, the statement returns:
+		// [tikv:3572]Statement aborted because lock(s) could not be acquired immediately and NOWAIT is set.
+		// This way, this TiDB node doesn't waste resources calculating the scan ranges.
+		rows, err := se.ExecuteSQL(ctx, sql+" FOR UPDATE NOWAIT", args...)
if err != nil {
 			return errors.Wrapf(err, "execute sql: %s", sql)
 		}
@@ -687,7 +536,7 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
 			return errors.Wrapf(err, "execute sql: %s", sql)
 		}
 		sql, args = cache.SelectFromTTLTableStatusWithID(table.ID)
-		rows, err = se.ExecuteSQL(ctx, sql, args...)
+		rows, err = se.ExecuteSQL(ctx, sql+" FOR UPDATE NOWAIT", args...)
 		if err != nil {
 			return errors.Wrapf(err, "execute sql: %s", sql)
 		}
@@ -709,6 +558,14 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
 		}
 
 		jobID := uuid.New().String()
+		jobExist := false
+		if len(tableStatus.CurrentJobID) > 0 {
+			// don't create a new job if there is already one running,
+			// so the running tasks don't need to be cancelled
+			jobID = tableStatus.CurrentJobID
+			expireTime = tableStatus.CurrentJobTTLExpire
+			jobExist = true
+		}
 		failpoint.Inject("set-job-uuid", func(val failpoint.Value) {
 			jobID = val.(string)
 		})
@@ -719,6 +576,11 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
 			return errors.Wrapf(err, "execute sql: %s", sql)
 		}
 
+		// if the job already exists, there's no need to submit scan tasks
+		if jobExist {
+			return nil
+		}
+
 		ranges, err := table.SplitScanRanges(ctx, m.store, splitScanCount)
 		if err != nil {
 			return errors.Wrap(err, "split scan ranges")
@@ -735,7 +597,7 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
 		}
 
 		return nil
-	})
+	}, session.TxnModePessimistic)
 	if err != nil {
 		return nil, err
 	}
@@ -749,64 +611,44 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
 	if err != nil {
 		return nil, err
 	}
-	return m.createNewJob(expireTime, now, table)
+	return m.createNewJob(now, table)
 }
 
-func (m *JobManager) createNewJob(expireTime time.Time, now time.Time, table *cache.PhysicalTable) (*ttlJob, error) {
+func (m *JobManager) createNewJob(now time.Time, table *cache.PhysicalTable) (*ttlJob, error) {
 	id := m.tableStatusCache.Tables[table.ID].CurrentJobID
 
-	statistics := &ttlStatistics{}
-
-	ranges, err := table.SplitScanRanges(m.ctx, m.store, splitScanCount)
-	if err != nil {
-		return nil, err
-	}
-
-	jobCtx, cancel := context.WithCancel(m.ctx)
-
-	scanTasks := make([]*ttlScanTask, 0, len(ranges))
-	for _, r := range ranges {
-		scanTasks = append(scanTasks, &ttlScanTask{
-			ctx:        jobCtx,
-			tbl:        table,
-			expire:     expireTime,
-			scanRange:  r,
-			statistics: statistics,
-		})
-	}
-
 	return &ttlJob{
 		id:      id,
 		ownerID: m.id,
 
-		ctx:    jobCtx,
-		cancel: cancel,
-
 		createTime: now,
 		// at least, the info schema cache and table status cache are consistent in table id, so it's safe to get table
 		// information from schema cache directly
-		tbl:   table,
-		tasks: scanTasks,
+		tbl: table,
 
-		status:     cache.JobStatusWaiting,
-		statistics: statistics,
+		status: cache.JobStatusWaiting,
 	}, nil
}
 
 // updateHeartBeat updates the heartbeat for all task with current instance as owner
-func (m *JobManager) updateHeartBeat(ctx context.Context, se session.Session) error {
-	now := se.Now()
+func (m *JobManager) updateHeartBeat(ctx context.Context, se session.Session, now time.Time) error {
 	for _, job := range m.localJobs() {
+		if job.createTime.Add(ttlJobTimeout).Before(now) {
+			logutil.Logger(m.ctx).Info("job is timeout", zap.String("jobID", job.id))
+			summary, err := summarizeErr(errors.New("job is timeout"))
+			if err != nil {
+				logutil.Logger(m.ctx).Info("fail to summarize job", zap.Error(err))
+			}
+			m.removeJob(job)
+			job.finish(se, now, summary)
+			continue
+		}
+
 		sql, args :=
updateHeartBeatSQL(job.tbl.ID, now, m.id) _, err := se.ExecuteSQL(ctx, sql, args...) if err != nil { return errors.Wrapf(err, "execute sql: %s", sql) } - // also updates some internal state for this job - err = job.updateState(ctx, se) - if err != nil { - logutil.Logger(m.ctx).Warn("fail to update state of the job", zap.String("jobID", job.id)) - } } return nil } @@ -840,25 +682,63 @@ func (m *JobManager) appendJob(job *ttlJob) { m.runningJobs = append(m.runningJobs, job) } -// CancelJob cancels a job -// TODO: the delete task is not controlled by the context now (but controlled by the worker context), so cancel -// doesn't work for delete tasks. -func (m *JobManager) CancelJob(ctx context.Context, jobID string) error { - se, err := getSession(m.sessPool) +// GetCommandCli returns the command client +func (m *JobManager) GetCommandCli() client.CommandClient { + return m.cmdCli +} + +type ttlSummary struct { + TotalRows uint64 `json:"total_rows"` + SuccessRows uint64 `json:"success_rows"` + ErrorRows uint64 `json:"error_rows"` + + TotalScanTask int `json:"total_scan_task"` + ScheduledScanTask int `json:"scheduled_scan_task"` + FinishedScanTask int `json:"finished_scan_task"` + + ScanTaskErr string `json:"scan_task_err,omitempty"` +} + +func summarizeErr(err error) (string, error) { + summary := &ttlSummary{ + ScanTaskErr: err.Error(), + } + + buf, err := json.Marshal(summary) if err != nil { - return err + return "", err } + return string(buf), nil +} - for _, job := range m.runningJobs { - if job.id == jobID { - return job.Cancel(ctx, se) +func summarizeTaskResult(tasks []*cache.TTLTask) (string, error) { + summary := &ttlSummary{} + var allErr error + for _, t := range tasks { + if t.State != nil { + summary.TotalRows += t.State.TotalRows + summary.SuccessRows += t.State.SuccessRows + summary.ErrorRows += t.State.ErrorRows + if len(t.State.ScanTaskErr) > 0 { + allErr = multierr.Append(allErr, errors.New(t.State.ScanTaskErr)) + } } - } - return errors.Errorf("cannot find the job with id: %s", jobID) -} + summary.TotalScanTask += 1 + if t.Status != cache.TaskStatusWaiting { + summary.ScheduledScanTask += 1 + } + if t.Status == cache.TaskStatusFinished { + summary.FinishedScanTask += 1 + } + } + if allErr != nil { + summary.ScanTaskErr = allErr.Error() + } -// GetCommandCli returns the command client -func (m *JobManager) GetCommandCli() client.CommandClient { - return m.cmdCli + buf, err := json.Marshal(summary) + if err != nil { + return "", err + } + return string(buf), nil } diff --git a/ttl/ttlworker/job_manager_integration_test.go b/ttl/ttlworker/job_manager_integration_test.go index a1e284540a594..c763e1363aecd 100644 --- a/ttl/ttlworker/job_manager_integration_test.go +++ b/ttl/ttlworker/job_manager_integration_test.go @@ -22,8 +22,8 @@ import ( "testing" "time" - "github.com/pingcap/errors" "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/model" @@ -56,7 +56,8 @@ func sessionFactory(t *testing.T, store kv.Storage) func() session.Session { } func TestParallelLockNewJob(t *testing.T) { - store := testkit.CreateMockStore(t) + store, dom := testkit.CreateMockStoreAndDomain(t) + waitAndStopTTLManager(t, dom) sessionFactory := sessionFactory(t, store) @@ -68,7 +69,7 @@ func TestParallelLockNewJob(t *testing.T) { se := sessionFactory() job, err := m.LockNewJob(context.Background(), se, testTable, time.Now(), false) require.NoError(t, err) - job.Finish(se, time.Now()) + 
job.Finish(se, time.Now(), "") // lock one table in parallel, only one of them should lock successfully testTimes := 100 @@ -102,12 +103,13 @@ func TestParallelLockNewJob(t *testing.T) { wg.Wait() require.Equal(t, uint64(1), successCounter.Load()) - successJob.Finish(se, time.Now()) + successJob.Finish(se, time.Now(), "") } } func TestFinishJob(t *testing.T) { - store := testkit.CreateMockStore(t) + store, dom := testkit.CreateMockStoreAndDomain(t) + waitAndStopTTLManager(t, dom) tk := testkit.NewTestKit(t, store) sessionFactory := sessionFactory(t, store) @@ -122,10 +124,10 @@ func TestFinishJob(t *testing.T) { se := sessionFactory() job, err := m.LockNewJob(context.Background(), se, testTable, time.Now(), false) require.NoError(t, err) - job.SetScanErr(errors.New(`"'an error message contains both single and double quote'"`)) - job.Finish(se, time.Now()) - tk.MustQuery("select table_id, last_job_summary from mysql.tidb_ttl_table_status").Check(testkit.Rows("2 {\"total_rows\":0,\"success_rows\":0,\"error_rows\":0,\"total_scan_task\":1,\"scheduled_scan_task\":0,\"finished_scan_task\":0,\"scan_task_err\":\"\\\"'an error message contains both single and double quote'\\\"\"}")) + summary := `{"total_rows":0,"scan_task_err":"\"'an error message contains both single and double quote'\""}` + job.Finish(se, time.Now(), summary) + tk.MustQuery("select table_id, last_job_summary from mysql.tidb_ttl_table_status").Check(testkit.Rows(`2 {"total_rows":0,"scan_task_err":"\"'an error message contains both single and double quote'\""}`)) tk.MustQuery("select * from mysql.tidb_ttl_task").Check(testkit.Rows()) } @@ -136,6 +138,8 @@ func TestTTLAutoAnalyze(t *testing.T) { defer failpoint.Disable("github.com/pingcap/tidb/ttl/ttlworker/update-status-table-cache-interval") failpoint.Enable("github.com/pingcap/tidb/ttl/ttlworker/resize-workers-interval", fmt.Sprintf("return(%d)", time.Second)) defer failpoint.Disable("github.com/pingcap/tidb/ttl/ttlworker/resize-workers-interval") + failpoint.Enable("github.com/pingcap/tidb/ttl/ttlworker/task-manager-loop-interval", fmt.Sprintf("return(%d)", time.Second)) + defer failpoint.Disable("github.com/pingcap/tidb/ttl/ttlworker/task-manager-loop-interval") originAutoAnalyzeMinCnt := handle.AutoAnalyzeMinCnt handle.AutoAnalyzeMinCnt = 0 @@ -189,6 +193,9 @@ func TestTTLAutoAnalyze(t *testing.T) { } func TestTriggerTTLJob(t *testing.T) { + failpoint.Enable("github.com/pingcap/tidb/ttl/ttlworker/task-manager-loop-interval", fmt.Sprintf("return(%d)", time.Second)) + defer failpoint.Disable("github.com/pingcap/tidb/ttl/ttlworker/task-manager-loop-interval") + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Minute) defer cancel() @@ -306,3 +313,113 @@ func TestTTLJobDisable(t *testing.T) { } require.False(t, deleted) } + +func TestRescheduleJobs(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + sessionFactory := sessionFactory(t, store) + + waitAndStopTTLManager(t, dom) + + now := time.Now() + tk.MustExec("create table test.t (id int, created_at datetime) ttl = `created_at` + interval 1 minute ttl_job_interval = '1m'") + table, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnTTL) + + se := sessionFactory() + m := ttlworker.NewJobManager("manager-1", nil, store, nil) + m.TaskManager().ResizeWorkersWithSysVar() + require.NoError(t, m.InfoSchemaCache().Update(se)) + // schedule jobs + 
m.RescheduleJobs(se, now)
+	sql, args := cache.SelectFromTTLTableStatusWithID(table.Meta().ID)
+	rows, err := se.ExecuteSQL(ctx, sql, args...)
+	require.NoError(t, err)
+	tableStatus, err := cache.RowToTableStatus(se, rows[0])
+	require.NoError(t, err)
+
+	originalJobID := tableStatus.CurrentJobID
+	require.NotEmpty(t, originalJobID)
+	require.Equal(t, "manager-1", tableStatus.CurrentJobOwnerID)
+	// there is already a task
+	tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1"))
+
+	// another manager should take over this job if the heartbeat is not updated
+	anotherManager := ttlworker.NewJobManager("manager-2", nil, store, nil)
+	anotherManager.TaskManager().ResizeWorkersWithSysVar()
+	require.NoError(t, anotherManager.InfoSchemaCache().Update(se))
+	anotherManager.RescheduleJobs(se, now.Add(time.Hour))
+	sql, args = cache.SelectFromTTLTableStatusWithID(table.Meta().ID)
+	rows, err = se.ExecuteSQL(ctx, sql, args...)
+	require.NoError(t, err)
+	tableStatus, err = cache.RowToTableStatus(se, rows[0])
+	require.NoError(t, err)
+
+	// but the original job should be inherited
+	require.NotEmpty(t, tableStatus.CurrentJobID)
+	require.Equal(t, "manager-2", tableStatus.CurrentJobOwnerID)
+	require.Equal(t, originalJobID, tableStatus.CurrentJobID)
+
+	// once the current time falls outside the schedule window, the job will be finished
+	tk.MustExec("set global tidb_ttl_job_schedule_window_start_time='23:58'")
+	tk.MustExec("set global tidb_ttl_job_schedule_window_end_time='23:59'")
+	anotherManager.RescheduleJobs(se, time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, now.Nanosecond(), now.Location()))
+	tk.MustQuery("select last_job_summary->>'$.scan_task_err' from mysql.tidb_ttl_table_status").Check(testkit.Rows("ttl job is disabled"))
+}
+
+func TestJobTimeout(t *testing.T) {
+	store, dom := testkit.CreateMockStoreAndDomain(t)
+	tk := testkit.NewTestKit(t, store)
+	sessionFactory := sessionFactory(t, store)
+
+	waitAndStopTTLManager(t, dom)
+
+	now := time.Now()
+	tk.MustExec("create table test.t (id int, created_at datetime) ttl = `created_at` + interval 1 minute ttl_job_interval = '1m'")
+	table, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
+	require.NoError(t, err)
+	ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnTTL)
+
+	se := sessionFactory()
+	m := ttlworker.NewJobManager("manager-1", nil, store, nil)
+	m.TaskManager().ResizeWorkersWithSysVar()
+	require.NoError(t, m.InfoSchemaCache().Update(se))
+	// schedule jobs
+	m.RescheduleJobs(se, now)
+	// set the workers to be empty, so none of the tasks will be scheduled
+	m.TaskManager().SetScanWorkers4Test([]ttlworker.Worker{})
+
+	sql, args := cache.SelectFromTTLTableStatusWithID(table.Meta().ID)
+	rows, err := se.ExecuteSQL(ctx, sql, args...)
+ require.NoError(t, err) + tableStatus, err := cache.RowToTableStatus(se, rows[0]) + require.NoError(t, err) + + require.NotEmpty(t, tableStatus.CurrentJobID) + require.Equal(t, "manager-1", tableStatus.CurrentJobOwnerID) + // there is already a task + tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("1")) + + // the timeout will be checked while updating heartbeat + require.NoError(t, m.UpdateHeartBeat(ctx, se, now.Add(7*time.Hour))) + tk.MustQuery("select last_job_summary->>'$.scan_task_err' from mysql.tidb_ttl_table_status").Check(testkit.Rows("job is timeout")) + tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0")) +} + +func waitAndStopTTLManager(t *testing.T, dom *domain.Domain) { + maxWaitTime := 30 + for { + maxWaitTime-- + if maxWaitTime < 0 { + require.Fail(t, "fail to stop ttl manager") + } + if dom.TTLJobManager() != nil { + dom.TTLJobManager().Stop() + require.NoError(t, dom.TTLJobManager().WaitStopped(context.Background(), time.Second*10)) + return + } + time.Sleep(time.Second) + continue + } +} diff --git a/ttl/ttlworker/job_manager_test.go b/ttl/ttlworker/job_manager_test.go index 97a84b3cb82a2..9e0211410591b 100644 --- a/ttl/ttlworker/job_manager_test.go +++ b/ttl/ttlworker/job_manager_test.go @@ -138,10 +138,6 @@ func newTTLTableStatusRows(status ...*cache.TableStatus) []chunk.Row { var updateStatusSQL = "SELECT LOW_PRIORITY table_id,parent_table_id,table_statistics,last_job_id,last_job_start_time,last_job_finish_time,last_job_ttl_expire,last_job_summary,current_job_id,current_job_owner_id,current_job_owner_addr,current_job_owner_hb_time,current_job_start_time,current_job_ttl_expire,current_job_state,current_job_status,current_job_status_update_time FROM mysql.tidb_ttl_table_status" -func (m *JobManager) SetScanWorkers4Test(workers []worker) { - m.scanWorkers = workers -} - // TTLJob exports the ttlJob for test type TTLJob = ttlJob @@ -160,21 +156,31 @@ func (m *JobManager) InfoSchemaCache() *cache.InfoSchemaCache { return m.infoSchemaCache } -func (j *ttlJob) Finish(se session.Session, now time.Time) { - j.finish(se, now) +// RescheduleJobs is an exported version of rescheduleJobs for test +func (m *JobManager) RescheduleJobs(se session.Session, now time.Time) { + m.rescheduleJobs(se, now) } -func (j *ttlJob) ID() string { - return j.id +// TaskManager is an exported getter of task manager for test +func (m *JobManager) TaskManager() *taskManager { + return m.taskManager +} + +// UpdateHeartBeat is an exported version of updateHeartBeat for test +func (m *JobManager) UpdateHeartBeat(ctx context.Context, se session.Session, now time.Time) error { + return m.updateHeartBeat(ctx, se, now) +} + +func (j *ttlJob) Finish(se session.Session, now time.Time, summary string) { + j.finish(se, now, summary) } -func (j *ttlJob) SetScanErr(err error) { - j.scanTaskErr = err +func (j *ttlJob) ID() string { + return j.id } func newMockTTLJob(tbl *cache.PhysicalTable, status cache.JobStatus) *ttlJob { - statistics := &ttlStatistics{} - return &ttlJob{tbl: tbl, ctx: context.Background(), statistics: statistics, status: status, tasks: []*ttlScanTask{{ctx: context.Background(), tbl: tbl, statistics: statistics}}} + return &ttlJob{tbl: tbl, status: status} } func TestReadyForNewJobTables(t *testing.T) { @@ -250,6 +256,12 @@ func TestLockNewTable(t *testing.T) { args, } } + getExecuteInfoForUpdate := func(sql string, args []interface{}) executeInfo { + return executeInfo{ + sql + " FOR UPDATE NOWAIT", + args, + } + } 
getExecuteInfoWithErr := func(sql string, args []interface{}, err error) executeInfo { require.NoError(t, err) return executeInfo{ @@ -275,7 +287,7 @@ func TestLockNewTable(t *testing.T) { }{ {"normal lock table", testPhysicalTable, []sqlExecute{ { - getExecuteInfo(cache.SelectFromTTLTableStatusWithID(1)), + getExecuteInfoForUpdate(cache.SelectFromTTLTableStatusWithID(1)), newTTLTableStatusRows(&cache.TableStatus{TableID: 1}), nil, }, { @@ -293,7 +305,7 @@ func TestLockNewTable(t *testing.T) { }, true, false}, {"select nothing", testPhysicalTable, []sqlExecute{ { - getExecuteInfo(cache.SelectFromTTLTableStatusWithID(1)), + getExecuteInfoForUpdate(cache.SelectFromTTLTableStatusWithID(1)), nil, nil, }, { @@ -301,7 +313,7 @@ func TestLockNewTable(t *testing.T) { nil, nil, }, { - getExecuteInfo(cache.SelectFromTTLTableStatusWithID(1)), + getExecuteInfoForUpdate(cache.SelectFromTTLTableStatusWithID(1)), newTTLTableStatusRows(&cache.TableStatus{TableID: 1}), nil, }, { @@ -319,7 +331,7 @@ func TestLockNewTable(t *testing.T) { }, true, false}, {"return error", testPhysicalTable, []sqlExecute{ { - getExecuteInfo(cache.SelectFromTTLTableStatusWithID(1)), + getExecuteInfoForUpdate(cache.SelectFromTTLTableStatusWithID(1)), newTTLTableStatusRows(&cache.TableStatus{TableID: 1}), nil, }, { @@ -366,63 +378,6 @@ func TestLockNewTable(t *testing.T) { } } -func TestResizeWorkers(t *testing.T) { - tbl := newMockTTLTbl(t, "t1") - - // scale workers - scanWorker1 := newMockScanWorker(t) - scanWorker1.Start() - scanWorker2 := newMockScanWorker(t) - - m := NewJobManager("test-id", nil, nil, nil) - m.sessPool = newMockSessionPool(t, tbl) - m.SetScanWorkers4Test([]worker{ - scanWorker1, - }) - newWorkers, _, err := m.resizeWorkers(m.scanWorkers, 2, func() worker { - return scanWorker2 - }) - assert.NoError(t, err) - assert.Len(t, newWorkers, 2) - scanWorker1.checkWorkerStatus(workerStatusRunning, true, nil) - scanWorker2.checkWorkerStatus(workerStatusRunning, true, nil) - - // shrink scan workers - scanWorker1 = newMockScanWorker(t) - scanWorker1.Start() - scanWorker2 = newMockScanWorker(t) - scanWorker2.Start() - - m = NewJobManager("test-id", nil, nil, nil) - m.sessPool = newMockSessionPool(t, tbl) - m.SetScanWorkers4Test([]worker{ - scanWorker1, - scanWorker2, - }) - - assert.NoError(t, m.resizeScanWorkers(1)) - scanWorker2.checkWorkerStatus(workerStatusStopped, false, nil) - - // shrink scan workers after job is run - scanWorker1 = newMockScanWorker(t) - scanWorker1.Start() - scanWorker2 = newMockScanWorker(t) - scanWorker2.Start() - - m = NewJobManager("test-id", nil, nil, nil) - m.sessPool = newMockSessionPool(t, tbl) - m.SetScanWorkers4Test([]worker{ - scanWorker1, - scanWorker2, - }) - m.runningJobs = append(m.runningJobs, &ttlJob{tbl: tbl}) - - scanWorker2.curTaskResult = &ttlScanTaskExecResult{task: &ttlScanTask{tbl: tbl}} - assert.NoError(t, m.resizeScanWorkers(1)) - scanWorker2.checkWorkerStatus(workerStatusStopped, false, nil) - assert.Equal(t, m.runningJobs[0].finishedScanTaskCounter, 1) -} - func TestLocalJobs(t *testing.T) { tbl1 := newMockTTLTbl(t, "t1") tbl1.ID = 1 @@ -431,7 +386,7 @@ func TestLocalJobs(t *testing.T) { m := NewJobManager("test-id", nil, nil, nil) m.sessPool = newMockSessionPool(t, tbl1, tbl2) - m.runningJobs = []*ttlJob{{tbl: tbl1, id: "1", ctx: context.Background()}, {tbl: tbl2, id: "2", ctx: context.Background()}} + m.runningJobs = []*ttlJob{{tbl: tbl1, id: "1"}, {tbl: tbl2, id: "2"}} m.tableStatusCache.Tables = map[int64]*cache.TableStatus{ tbl1.ID: { CurrentJobOwnerID: m.id, 
@@ -444,74 +399,22 @@ func TestLocalJobs(t *testing.T) { assert.Equal(t, m.localJobs()[0].id, "1") } -func TestRescheduleJobs(t *testing.T) { - tbl := newMockTTLTbl(t, "t1") - se := newMockSession(t, tbl) - - scanWorker1 := newMockScanWorker(t) - scanWorker1.Start() - scanWorker1.setOneRowResult(tbl, 2022) - scanWorker2 := newMockScanWorker(t) - scanWorker2.Start() - scanWorker2.setOneRowResult(tbl, 2022) - - m := NewJobManager("test-id", nil, nil, nil) - m.sessPool = newMockSessionPool(t, tbl) - m.SetScanWorkers4Test([]worker{ - scanWorker1, - scanWorker2, - }) - - // schedule local running job - m.tableStatusCache.Tables = map[int64]*cache.TableStatus{ - tbl.ID: { - CurrentJobOwnerID: m.id, - }, - } - m.runningJobs = []*ttlJob{newMockTTLJob(tbl, cache.JobStatusWaiting)} - m.rescheduleJobs(se, se.Now()) - scanWorker1.checkWorkerStatus(workerStatusRunning, false, m.runningJobs[0].tasks[0]) - scanWorker1.checkPollResult(false, "") - scanWorker2.checkWorkerStatus(workerStatusRunning, true, nil) - scanWorker2.checkPollResult(false, "") - - // then run reschedule multiple times, no job will be scheduled - m.rescheduleJobs(se, se.Now()) - m.rescheduleJobs(se, se.Now()) - scanWorker1.checkWorkerStatus(workerStatusRunning, false, m.runningJobs[0].tasks[0]) - scanWorker1.checkPollResult(false, "") - scanWorker2.checkWorkerStatus(workerStatusRunning, true, nil) - scanWorker2.checkPollResult(false, "") - - del := scanWorker1.pollDelTask() - assert.Equal(t, 1, len(del.rows)) - assert.Equal(t, 1, len(del.rows[0])) - assert.Equal(t, int64(2022), del.rows[0][0].GetInt64()) - - // then the task ends - msg := scanWorker1.waitNotifyScanTaskEnd() - assert.Same(t, m.runningJobs[0].tasks[0], msg.result.task) - assert.NoError(t, msg.result.err) - scanWorker1.checkWorkerStatus(workerStatusRunning, false, m.runningJobs[0].tasks[0]) - scanWorker1.checkPollResult(true, "") - scanWorker2.checkWorkerStatus(workerStatusRunning, true, nil) - scanWorker2.checkPollResult(false, "") -} - func TestRescheduleJobsOutOfWindow(t *testing.T) { + // TODO: use failpoint to mock return job, and schedule + tbl := newMockTTLTbl(t, "t1") se := newMockSession(t, tbl) - scanWorker1 := newMockScanWorker(t) + scanWorker1 := NewMockScanWorker(t) scanWorker1.Start() scanWorker1.setOneRowResult(tbl, 2022) - scanWorker2 := newMockScanWorker(t) + scanWorker2 := NewMockScanWorker(t) scanWorker2.Start() scanWorker2.setOneRowResult(tbl, 2022) m := NewJobManager("test-id", nil, nil, nil) m.sessPool = newMockSessionPool(t, tbl) - m.SetScanWorkers4Test([]worker{ + m.taskManager.SetScanWorkers4Test([]worker{ scanWorker1, scanWorker2, }) @@ -544,68 +447,8 @@ func TestRescheduleJobsOutOfWindow(t *testing.T) { // jobs will be scheduled within the time window now, _ = time.ParseInLocation(variable.FullDayTimeFormat, "12:02 +0000", time.UTC) m.rescheduleJobs(se, now) - scanWorker1.checkWorkerStatus(workerStatusRunning, false, m.runningJobs[0].tasks[0]) + //scanWorker1.checkWorkerStatus(workerStatusRunning, false, m.runningJobs[0].tasks[0]) scanWorker1.checkPollResult(false, "") scanWorker2.checkWorkerStatus(workerStatusRunning, true, nil) scanWorker2.checkPollResult(false, "") } - -func TestCheckFinishedJob(t *testing.T) { - tbl := newMockTTLTbl(t, "t1") - se := newMockSession(t, tbl) - - // cancelled job will be regarded as finished - m := NewJobManager("test-id", nil, nil, nil) - m.sessPool = newMockSessionPool(t, tbl) - m.runningJobs = []*ttlJob{newMockTTLJob(tbl, cache.JobStatusCancelled)} - m.checkFinishedJob(se, se.Now()) - assert.Len(t, 
-
-	// a real finished job
-	finishedStatistics := &ttlStatistics{}
-	finishedStatistics.TotalRows.Store(1)
-	finishedStatistics.SuccessRows.Store(1)
-	m = NewJobManager("test-id", nil, nil, nil)
-	m.sessPool = newMockSessionPool(t, tbl)
-	m.runningJobs = []*ttlJob{newMockTTLJob(tbl, cache.JobStatusRunning)}
-	m.runningJobs[0].statistics = finishedStatistics
-	m.runningJobs[0].tasks[0].statistics = finishedStatistics
-	m.runningJobs[0].taskIter = 1
-	m.runningJobs[0].finishedScanTaskCounter = 1
-
-	// meetArg records whether the sql statement uses the arg
-	meetArg := false
-	now := se.Now()
-	jobID := m.runningJobs[0].id
-	se.executeSQL = func(ctx context.Context, sql string, args ...interface{}) ([]chunk.Row, error) {
-		if len(args) > 1 {
-			meetArg = true
-			expectedSQL, expectedArgs := finishJobSQL(tbl.ID, now, "{\"total_rows\":1,\"success_rows\":1,\"error_rows\":0,\"total_scan_task\":1,\"scheduled_scan_task\":1,\"finished_scan_task\":1}", jobID)
-			assert.Equal(t, expectedSQL, sql)
-			assert.Equal(t, expectedArgs, args)
-		}
-		return nil, nil
-	}
-	m.checkFinishedJob(se, now)
-	assert.Len(t, m.runningJobs, 0)
-	assert.Equal(t, true, meetArg)
-	se.executeSQL = nil
-
-	// check timeout job
-	now = se.Now()
-	createTime := now.Add(-20 * time.Hour)
-	m = NewJobManager("test-id", nil, nil, nil)
-	m.sessPool = newMockSessionPool(t, tbl)
-	m.runningJobs = []*ttlJob{
-		{
-			ctx:        context.Background(),
-			tbl:        tbl,
-			status:     cache.JobStatusRunning,
-			statistics: &ttlStatistics{},
-
-			createTime: createTime,
-		},
-	}
-	m.checkFinishedJob(se, now)
-	assert.Len(t, m.runningJobs, 0)
-}
diff --git a/ttl/ttlworker/job_test.go b/ttl/ttlworker/job_test.go
deleted file mode 100644
index 19075b905e22d..0000000000000
--- a/ttl/ttlworker/job_test.go
+++ /dev/null
@@ -1,82 +0,0 @@
-// Copyright 2022 PingCAP, Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package ttlworker
-
-import (
-	"context"
-	"testing"
-
-	"github.com/pingcap/errors"
-	"github.com/pingcap/tidb/ttl/cache"
-	"github.com/pingcap/tidb/ttl/session"
-	"github.com/stretchr/testify/assert"
-)
-
-func NewTTLJob(tbl *cache.PhysicalTable, id string, status cache.JobStatus) *ttlJob {
-	return &ttlJob{
-		tbl:    tbl,
-		id:     id,
-		status: status,
-	}
-}
-
-func (j *ttlJob) ChangeStatus(ctx context.Context, se session.Session, status cache.JobStatus) error {
-	return j.changeStatus(ctx, se, status)
-}
-
-func TestIterScanTask(t *testing.T) {
-	tbl := newMockTTLTbl(t, "t1")
-
-	job := &ttlJob{
-		tbl:   tbl,
-		tasks: []*ttlScanTask{{}},
-	}
-	scanTask := job.peekScanTask()
-	assert.NotNil(t, scanTask)
-	assert.Len(t, job.tasks, 1)
-
-	job.nextScanTask()
-	assert.True(t, job.AllSpawned())
-}
-
-func TestJobSummary(t *testing.T) {
-	statistics := &ttlStatistics{}
-	statistics.TotalRows.Store(1)
-	statistics.ErrorRows.Store(255)
-	statistics.SuccessRows.Store(128)
-
-	job := &ttlJob{
-		statistics: statistics,
-		tasks:      []*ttlScanTask{{}},
-	}
-	summary, err := job.summary()
-	assert.NoError(t, err)
-	assert.Equal(t, `{"total_rows":1,"success_rows":128,"error_rows":255,"total_scan_task":1,"scheduled_scan_task":0,"finished_scan_task":0}`, summary)
-
-	job.taskIter += 1
-	summary, err = job.summary()
-	assert.NoError(t, err)
-	assert.Equal(t, `{"total_rows":1,"success_rows":128,"error_rows":255,"total_scan_task":1,"scheduled_scan_task":1,"finished_scan_task":0}`, summary)
-
-	job.finishedScanTaskCounter += 1
-	summary, err = job.summary()
-	assert.NoError(t, err)
-	assert.Equal(t, `{"total_rows":1,"success_rows":128,"error_rows":255,"total_scan_task":1,"scheduled_scan_task":1,"finished_scan_task":1}`, summary)
-
-	job.scanTaskErr = errors.New("test error")
-	summary, err = job.summary()
-	assert.NoError(t, err)
-	assert.Equal(t, `{"total_rows":1,"success_rows":128,"error_rows":255,"total_scan_task":1,"scheduled_scan_task":1,"finished_scan_task":1,"scan_task_err":"test error"}`, summary)
-}
diff --git a/ttl/ttlworker/scan.go b/ttl/ttlworker/scan.go
index 48686ef87d2f3..ac1c1cd85ab50 100644
--- a/ttl/ttlworker/scan.go
+++ b/ttl/ttlworker/scan.go
@@ -74,9 +74,9 @@ func (s *ttlStatistics) String() string {
 
 type ttlScanTask struct {
 	ctx context.Context
+	*cache.TTLTask
+
 	tbl        *cache.PhysicalTable
-	expire     time.Time
-	scanRange  cache.ScanRange
 	statistics *ttlStatistics
 }
@@ -121,8 +121,8 @@ func (t *ttlScanTask) doScan(ctx context.Context, delCh chan<- *ttlDeleteTask, s
 		terror.Log(err)
 	}()
 
-	sess := newTableSession(rawSess, t.tbl, t.expire)
-	generator, err := sqlbuilder.NewScanQueryGenerator(t.tbl, t.expire, t.scanRange.Start, t.scanRange.End)
+	sess := newTableSession(rawSess, t.tbl, t.ExpireTime)
+	generator, err := sqlbuilder.NewScanQueryGenerator(t.tbl, t.ExpireTime, t.ScanRangeStart, t.ScanRangeEnd)
 	if err != nil {
 		return t.result(err)
 	}
@@ -195,7 +195,7 @@ func (t *ttlScanTask) doScan(ctx context.Context, delCh chan<- *ttlDeleteTask, s
 
 	delTask := &ttlDeleteTask{
 		tbl:        t.tbl,
-		expire:     t.expire,
+		expire:     t.ExpireTime,
 		rows:       lastResult,
 		statistics: t.statistics,
 	}
@@ -234,10 +234,11 @@ func newScanWorker(delCh chan<- *ttlDeleteTask, notifyStateCh chan<- interface{}
 	return w
 }
 
-func (w *ttlScanWorker) Idle() bool {
+func (w *ttlScanWorker) CouldSchedule() bool {
 	w.Lock()
 	defer w.Unlock()
-	return w.status == workerStatusRunning && w.curTask == nil
+	// see `Schedule`. If `worker.CouldSchedule()` returns true, `worker.Schedule` must succeed
+	return w.status == workerStatusRunning && w.curTask == nil && w.curTaskResult == nil
 }
 
 func (w *ttlScanWorker) Schedule(task *ttlScanTask) error {
@@ -335,7 +336,7 @@ func (w *ttlScanWorker) handleScanTask(tracer *metrics.PhaseTracer, task *ttlSca
 type scanWorker interface {
 	worker
 
-	Idle() bool
+	CouldSchedule() bool
 	Schedule(*ttlScanTask) error
 	PollTaskResult() *ttlScanTaskExecResult
 	CurrentTask() *ttlScanTask
diff --git a/ttl/ttlworker/scan_test.go b/ttl/ttlworker/scan_test.go
index 66582084b18f3..1c280a5b0f2b2 100644
--- a/ttl/ttlworker/scan_test.go
+++ b/ttl/ttlworker/scan_test.go
@@ -36,7 +36,7 @@ type mockScanWorker struct {
 	sessPoll *mockSessionPool
 }
 
-func newMockScanWorker(t *testing.T) *mockScanWorker {
+func NewMockScanWorker(t *testing.T) *mockScanWorker {
 	w := &mockScanWorker{
 		t:     t,
 		delCh: make(chan *ttlDeleteTask),
@@ -46,7 +46,7 @@ func newMockScanWorker(t *testing.T) *mockScanWorker {
 	w.ttlScanWorker = newScanWorker(w.delCh, w.notifyCh, w.sessPoll)
 	require.Equal(t, workerStatusCreated, w.Status())
-	require.False(t, w.Idle())
+	require.False(t, w.CouldSchedule())
 	result := w.PollTaskResult()
 	require.Nil(t, result)
 	return w
@@ -54,7 +54,7 @@ func newMockScanWorker(t *testing.T) *mockScanWorker {
 
 func (w *mockScanWorker) checkWorkerStatus(status workerStatus, idle bool, curTask *ttlScanTask) {
 	require.Equal(w.t, status, w.status)
-	require.Equal(w.t, idle, w.Idle())
+	require.Equal(w.t, idle, w.CouldSchedule())
 	require.Same(w.t, curTask, w.CurrentTask())
 }
 
@@ -98,7 +98,7 @@ func (w *mockScanWorker) pollDelTask() *ttlDeleteTask {
 		require.NotNil(w.t, del)
 		require.NotNil(w.t, del.statistics)
 		require.Same(w.t, w.curTask.tbl, del.tbl)
-		require.Equal(w.t, w.curTask.expire, del.expire)
+		require.Equal(w.t, w.curTask.ExpireTime, del.expire)
 		require.NotEqual(w.t, 0, len(del.rows))
 		return del
 	case <-time.After(10 * time.Second):
@@ -129,14 +129,16 @@ func TestScanWorkerSchedule(t *testing.T) {
 	defer variable.TTLScanBatchSize.Store(origLimit)
 
 	tbl := newMockTTLTbl(t, "t1")
-	w := newMockScanWorker(t)
+	w := NewMockScanWorker(t)
 	w.setOneRowResult(tbl, 7)
 	defer w.stopWithWait()
 
 	task := &ttlScanTask{
-		ctx:    context.Background(),
-		tbl:    tbl,
-		expire: time.UnixMilli(0),
+		ctx: context.Background(),
+		tbl: tbl,
+		TTLTask: &cache.TTLTask{
+			ExpireTime: time.UnixMilli(0),
+		},
 		statistics: &ttlStatistics{},
 	}
 
@@ -176,14 +178,16 @@ func TestScanWorkerScheduleWithFailedTask(t *testing.T) {
 	defer variable.TTLScanBatchSize.Store(origLimit)
 
 	tbl := newMockTTLTbl(t, "t1")
-	w := newMockScanWorker(t)
+	w := NewMockScanWorker(t)
 	w.clearInfoSchema()
 	defer w.stopWithWait()
 
 	task := &ttlScanTask{
-		ctx:    context.Background(),
-		tbl:    tbl,
-		expire: time.UnixMilli(0),
+		ctx: context.Background(),
+		tbl: tbl,
+		TTLTask: &cache.TTLTask{
+			ExpireTime: time.UnixMilli(0),
+		},
 		statistics: &ttlStatistics{},
 	}
 
@@ -221,11 +225,11 @@ func newMockScanTask(t *testing.T, sqlCnt int) *mockScanTask {
 	task := &mockScanTask{
 		t: t,
 		ttlScanTask: &ttlScanTask{
-			ctx:    context.Background(),
-			tbl:    tbl,
-			expire: time.UnixMilli(0),
-			scanRange: cache.ScanRange{
-				Start: []types.Datum{types.NewIntDatum(0)},
+			ctx: context.Background(),
+			tbl: tbl,
+			TTLTask: &cache.TTLTask{
+				ExpireTime:     time.UnixMilli(0),
+				ScanRangeStart: []types.Datum{types.NewIntDatum(0)},
 			},
 			statistics: &ttlStatistics{},
 		},
@@ -307,7 +311,7 @@ loop:
 	require.NotNil(t.t, del.statistics)
 	require.Same(t.t, t.statistics, del.statistics)
 	require.Same(t.t, t.tbl, del.tbl)
-	require.Equal(t.t, t.expire, del.expire)
+	require.Equal(t.t, t.ExpireTime, del.expire)
 	if i < len(t.sqlRetry)-1 {
 		require.Equal(t.t, 3, len(del.rows))
 		require.Equal(t.t, 1, len(del.rows[2]))
diff --git a/ttl/ttlworker/session.go b/ttl/ttlworker/session.go
index 4e694e32a22e1..d3e0c940e78c7 100644
--- a/ttl/ttlworker/session.go
+++ b/ttl/ttlworker/session.go
@@ -185,7 +185,7 @@ func (s *ttlTableSession) ExecuteSQLWithCheck(ctx context.Context, sql string) (
 
 		result = rows
 		return nil
-	})
+	}, session.TxnModeOptimistic)
 
 	if err != nil {
 		return nil, shouldRetry, err
diff --git a/ttl/ttlworker/session_test.go b/ttl/ttlworker/session_test.go
index eb7f03ca81276..335f6a5701a99 100644
--- a/ttl/ttlworker/session_test.go
+++ b/ttl/ttlworker/session_test.go
@@ -29,6 +29,7 @@ import (
 	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/ttl/cache"
+	"github.com/pingcap/tidb/ttl/session"
 	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/util/chunk"
 	"github.com/stretchr/testify/require"
@@ -186,7 +187,7 @@ func (s *mockSession) ExecuteSQL(ctx context.Context, sql string, args ...interf
 	return s.rows, s.execErr
 }
 
-func (s *mockSession) RunInTxn(_ context.Context, fn func() error) error {
+func (s *mockSession) RunInTxn(_ context.Context, fn func() error, _ session.TxnMode) error {
 	require.False(s.t, s.closed)
 	if err := fn(); err != nil {
 		return err
diff --git a/ttl/ttlworker/task_manager.go b/ttl/ttlworker/task_manager.go
new file mode 100644
index 0000000000000..f20e5cee4f3f6
--- /dev/null
+++ b/ttl/ttlworker/task_manager.go
@@ -0,0 +1,485 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ttlworker
+
+import (
+	"context"
+	"encoding/json"
+	"time"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/tidb/sessionctx/variable"
+	"github.com/pingcap/tidb/ttl/cache"
+	"github.com/pingcap/tidb/ttl/session"
+	"github.com/pingcap/tidb/util/logutil"
+	"go.uber.org/multierr"
+	"go.uber.org/zap"
+)
+
+const setTTLTaskOwnerTemplate = `UPDATE mysql.tidb_ttl_task
+	SET owner_id = %?,
+		owner_hb_time = %?,
+		status = 'running',
+		status_update_time = %?
+	WHERE job_id = %? AND scan_id = %?`
+
+func setTTLTaskOwnerSQL(jobID string, scanID int64, id string, now time.Time) (string, []interface{}) {
+	return setTTLTaskOwnerTemplate, []interface{}{id, now.Format(timeFormat), now.Format(timeFormat), jobID, scanID}
+}
+
+const setTTLTaskFinishedTemplate = `UPDATE mysql.tidb_ttl_task
+	SET status = 'finished',
+		status_update_time = %?,
+		state = %?
+	WHERE job_id = %? AND scan_id = %?`
+
+func setTTLTaskFinishedSQL(jobID string, scanID int64, state *cache.TTLTaskState, now time.Time) (string, []interface{}, error) {
+	stateStr, err := json.Marshal(state)
+	if err != nil {
+		return "", nil, err
+	}
+	return setTTLTaskFinishedTemplate, []interface{}{now.Format(timeFormat), string(stateStr), jobID, scanID}, nil
+}
+
+const updateTTLTaskHeartBeatTemplate = `UPDATE mysql.tidb_ttl_task
+	SET state = %?,
+		owner_hb_time = %?
+	WHERE job_id = %? AND scan_id = %?`
+
+func updateTTLTaskHeartBeatSQL(jobID string, scanID int64, now time.Time, state *cache.TTLTaskState) (string, []interface{}, error) {
+	stateStr, err := json.Marshal(state)
+	if err != nil {
+		return "", nil, err
+	}
+	return updateTTLTaskHeartBeatTemplate, []interface{}{string(stateStr), now.Format(timeFormat), jobID, scanID}, nil
+}
+
+// taskManager schedules and manages the ttl tasks on this instance
+type taskManager struct {
+	ctx      context.Context
+	sessPool sessionPool
+
+	id string
+
+	scanWorkers []worker
+	delWorkers  []worker
+
+	infoSchemaCache *cache.InfoSchemaCache
+	runningTasks    []*runningScanTask
+
+	delCh         chan *ttlDeleteTask
+	notifyStateCh chan interface{}
+}
+
+func newTaskManager(ctx context.Context, sessPool sessionPool, infoSchemaCache *cache.InfoSchemaCache, id string) *taskManager {
+	return &taskManager{
+		ctx:      logutil.WithKeyValue(ctx, "ttl-worker", "task-manager"),
+		sessPool: sessPool,
+
+		id: id,
+
+		scanWorkers: []worker{},
+		delWorkers:  []worker{},
+
+		infoSchemaCache: infoSchemaCache,
+		runningTasks:    []*runningScanTask{},
+
+		delCh:         make(chan *ttlDeleteTask),
+		notifyStateCh: make(chan interface{}, 1),
+	}
+}
+
+func (m *taskManager) resizeWorkersWithSysVar() {
+	err := m.resizeScanWorkers(int(variable.TTLScanWorkerCount.Load()))
+	if err != nil {
+		logutil.Logger(m.ctx).Warn("fail to resize scan workers", zap.Error(err))
+	}
+	err = m.resizeDelWorkers(int(variable.TTLDeleteWorkerCount.Load()))
+	if err != nil {
+		logutil.Logger(m.ctx).Warn("fail to resize delete workers", zap.Error(err))
+	}
+}
+
+func (m *taskManager) resizeScanWorkers(count int) error {
+	var err error
+	var canceledWorkers []worker
+	m.scanWorkers, canceledWorkers, err = m.resizeWorkers(m.scanWorkers, count, func() worker {
+		return newScanWorker(m.delCh, m.notifyStateCh, m.sessPool)
+	})
+	for _, w := range canceledWorkers {
+		s := w.(scanWorker)
+
+		var jobID string
+		var scanID int64
+		var scanErr error
+		result := s.PollTaskResult()
+		if result != nil {
+			jobID = result.task.JobID
+			scanID = result.task.ScanID
+
+			scanErr = result.err
+		} else {
+			// if the scan worker failed to poll the task, it's possible that `WaitStopped` timed out;
+			// we still consider the scan task as finished
+			curTask := s.CurrentTask()
+			if curTask == nil {
+				continue
+			}
+			jobID = curTask.JobID
+			scanID = curTask.ScanID
+			scanErr = errors.New("timeout to cancel scan task")
+		}
+
+		task := findTaskWithID(m.runningTasks, jobID, scanID)
+		if task == nil {
+			logutil.Logger(m.ctx).Warn("task state changed but task not found", zap.String("jobID", jobID), zap.Int64("scanID", scanID))
+			continue
+		}
+		logutil.Logger(m.ctx).Debug("scan task finished", zap.String("jobID", task.JobID), zap.Int64("taskID", task.ScanID), zap.Error(scanErr))
+
+		task.result = result
+	}
+	return err
+}
+
+func findTaskWithID(tasks []*runningScanTask, jobID string, scanID int64) *runningScanTask {
+	for _, t := range tasks {
+		if t.ScanID == scanID && t.JobID == jobID {
+			return t
+		}
+	}
+
+	return nil
+}
+
+func (m *taskManager) resizeDelWorkers(count int) error {
+	var err error
+	m.delWorkers, _, err = m.resizeWorkers(m.delWorkers, count, func() worker {
+		return newDeleteWorker(m.delCh, m.sessPool)
+	})
+	return err
+}
+
+// resizeWorkers scales the workers, and returns the full set of workers as the first return value. If any workers
+// are stopped, they are returned in the second return value
+func (m *taskManager) resizeWorkers(workers []worker, count int, factory func() worker) ([]worker, []worker, error) {
+	if count < len(workers) {
+		logutil.Logger(m.ctx).Info("shrink ttl worker", zap.Int("originalCount", len(workers)), zap.Int("newCount", count))
+
+		for _, w := range workers[count:] {
+			w.Stop()
+		}
+
+		var errs error
+		// don't use `m.ctx` here, because when the server shuts down, `m.ctx` has already been cancelled
+		ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+		for _, w := range workers[count:] {
+			err := w.WaitStopped(ctx, 30*time.Second)
+			if err != nil {
+				logutil.Logger(m.ctx).Warn("fail to stop ttl worker", zap.Error(err))
+				errs = multierr.Append(errs, err)
+			}
+		}
+		cancel()
+
+		// remove the stopped workers, and keep the remaining ones
+		return workers[:count], workers[count:], errs
+	}
+
+	if count > len(workers) {
+		logutil.Logger(m.ctx).Info("scale ttl worker", zap.Int("originalCount", len(workers)), zap.Int("newCount", count))
+
+		for i := len(workers); i < count; i++ {
+			w := factory()
+			w.Start()
+			workers = append(workers, w)
+		}
+		return workers, nil, nil
+	}
+
+	return workers, nil, nil
+}
+
+// handleScanFinishedTask polls the results from the scan workers and returns whether any results were polled
+func (m *taskManager) handleScanFinishedTask() bool {
+	results := m.pollScanWorkerResults()
+	for _, result := range results {
+		logger := logutil.Logger(m.ctx).With(zap.Int64("tableID", result.task.tbl.ID), zap.String("jobID", result.task.JobID), zap.Int64("scanID", result.task.ScanID))
+		if result.err != nil {
+			logger = logger.With(zap.Error(result.err))
+		}
+
+		task := findTaskWithID(m.runningTasks, result.task.JobID, result.task.ScanID)
+		if task == nil {
+			logger.Warn("task state changed but task not found")
+			continue
+		}
+		logger.Info("task scan finished")
+		task.result = result
+	}
+
+	return len(results) > 0
+}
+
+func (m *taskManager) pollScanWorkerResults() []*ttlScanTaskExecResult {
+	results := make([]*ttlScanTaskExecResult, 0, len(m.scanWorkers))
+	for _, w := range m.scanWorkers {
+		worker := w.(scanWorker)
+		result := worker.PollTaskResult()
+		if result != nil {
+			results = append(results, result)
+		}
+	}
+
+	return results
+}
+
+func (m *taskManager) idleScanWorkers() []scanWorker {
+	workers := make([]scanWorker, 0, len(m.scanWorkers))
+	for _, w := range m.scanWorkers {
+		if w.(scanWorker).CouldSchedule() {
+			workers = append(workers, w.(scanWorker))
+		}
+	}
+	return workers
+}
+
+func (m *taskManager) rescheduleTasks(se session.Session, now time.Time) {
+	idleScanWorkers := m.idleScanWorkers()
+	if len(idleScanWorkers) == 0 {
+		return
+	}
+
+	for len(idleScanWorkers) > 0 {
+		tasks, err := m.peekWaitingScanTasks(se, len(idleScanWorkers), now)
+		if err != nil {
+			logutil.Logger(m.ctx).Warn("fail to peek scan task", zap.Error(err))
+			return
+		}
+
+		if len(tasks) == 0 {
+			break
+		}
+
+		for _, t := range tasks {
+			logger := logutil.Logger(m.ctx).With(zap.String("jobID", t.JobID), zap.Int64("scanID", t.ScanID))
+
+			task, err := m.lockScanTask(se, t, now)
+			if err != nil {
+				// If another node locks the task, it will return an error. This is expected,
+				// so the log level is only `info`
+				logutil.Logger(m.ctx).Info("fail to lock scan task", zap.Error(err))
+				continue
+			}
+
+			idleWorker := idleScanWorkers[0]
+			idleScanWorkers = idleScanWorkers[1:]
+
+			err = idleWorker.Schedule(task.ttlScanTask)
+			if err != nil {
+				logger.Warn("fail to schedule task", zap.Error(err))
+				continue
+			}
+
+			logger.Info("scheduled ttl task")
+			m.runningTasks = append(m.runningTasks, task)
+		}
+	}
+}
+
+func (m *taskManager) peekWaitingScanTasks(se session.Session, limit int, now time.Time) ([]*cache.TTLTask, error) {
+	sql, args := cache.PeekWaitingTTLTask(limit, now.Add(-2*ttlTaskHeartBeatTickerInterval))
+	rows, err := se.ExecuteSQL(m.ctx, sql, args...)
+	if err != nil {
+		return nil, errors.Wrapf(err, "execute sql: %s", sql)
+	}
+
+	tasks := make([]*cache.TTLTask, 0, len(rows))
+	for _, r := range rows {
+		task, err := cache.RowToTTLTask(se, r)
+		if err != nil {
+			return nil, err
+		}
+		tasks = append(tasks, task)
+	}
+
+	return tasks, nil
+}
+
+func (m *taskManager) lockScanTask(se session.Session, task *cache.TTLTask, now time.Time) (*runningScanTask, error) {
+	ctx := m.ctx
+
+	table, ok := m.infoSchemaCache.Tables[task.TableID]
+	if !ok {
+		return nil, errors.Errorf("didn't find table with id: %d", task.TableID)
+	}
+
+	err := se.RunInTxn(ctx, func() error {
+		sql, args := cache.SelectFromTTLTaskWithID(task.JobID, task.ScanID)
+		rows, err := se.ExecuteSQL(ctx, sql+" FOR UPDATE NOWAIT", args...)
+		if err != nil {
+			return errors.Wrapf(err, "execute sql: %s", sql)
+		}
+		if len(rows) == 0 {
+			return errors.Errorf("didn't find task with jobID: %s, scanID: %d", task.JobID, task.ScanID)
+		}
+		task, err = cache.RowToTTLTask(se, rows[0])
+		if err != nil {
+			return err
+		}
+		if task.OwnerID != "" && !task.OwnerHBTime.Add(2*jobManagerLoopTickerInterval).Before(now) {
+			return errors.New("task is already scheduled")
+		}
+
+		sql, args = setTTLTaskOwnerSQL(task.JobID, task.ScanID, m.id, now)
+		_, err = se.ExecuteSQL(ctx, sql, args...)
+		if err != nil {
+			return errors.Wrapf(err, "execute sql: %s", sql)
+		}
+
+		return nil
+	}, session.TxnModePessimistic)
+	if err != nil {
+		return nil, err
+	}
+
+	ctx, cancel := context.WithCancel(m.ctx)
+	scanTask := &ttlScanTask{
+		ctx: ctx,
+
+		TTLTask: task,
+
+		tbl:        table,
+		statistics: &ttlStatistics{},
+	}
+	return &runningScanTask{
+		scanTask,
+		cancel,
+		nil,
+	}, nil
+}
+
+// updateHeartBeat updates the heartbeat for all tasks with the current instance as owner
+func (m *taskManager) updateHeartBeat(ctx context.Context, se session.Session, now time.Time) error {
+	for _, task := range m.runningTasks {
+		state := &cache.TTLTaskState{
+			TotalRows:   task.statistics.TotalRows.Load(),
+			SuccessRows: task.statistics.SuccessRows.Load(),
+			ErrorRows:   task.statistics.ErrorRows.Load(),
+		}
+		if task.result != nil && task.result.err != nil {
+			state.ScanTaskErr = task.result.err.Error()
+		}
+
+		sql, args, err := updateTTLTaskHeartBeatSQL(task.JobID, task.ScanID, now, state)
+		if err != nil {
+			return err
+		}
+		_, err = se.ExecuteSQL(ctx, sql, args...)
+		if err != nil {
+			return errors.Wrapf(err, "execute sql: %s", sql)
+		}
+	}
+	return nil
+}
+
+func (m *taskManager) checkFinishedTask(se session.Session, now time.Time) {
+	stillRunningTasks := make([]*runningScanTask, 0, len(m.runningTasks))
+	for _, task := range m.runningTasks {
+		if !task.finished() {
+			stillRunningTasks = append(stillRunningTasks, task)
+			continue
+		}
+		err := m.reportTaskFinished(se, now, task)
+		if err != nil {
+			logutil.Logger(m.ctx).Error("fail to report finished task", zap.Error(err))
+			stillRunningTasks = append(stillRunningTasks, task)
+			continue
+		}
+	}
+
+	m.runningTasks = stillRunningTasks
+}
+
+func (m *taskManager) reportTaskFinished(se session.Session, now time.Time, task *runningScanTask) error {
+	state := &cache.TTLTaskState{
+		TotalRows:   task.statistics.TotalRows.Load(),
+		SuccessRows: task.statistics.SuccessRows.Load(),
+		ErrorRows:   task.statistics.ErrorRows.Load(),
+	}
+	if task.result.err != nil {
+		state.ScanTaskErr = task.result.err.Error()
+	}
+
+	sql, args, err := setTTLTaskFinishedSQL(task.JobID, task.ScanID, state, now)
+	if err != nil {
+		return err
+	}
+
+	timeoutCtx, cancel := context.WithTimeout(m.ctx, ttlInternalSQLTimeout)
+	_, err = se.ExecuteSQL(timeoutCtx, sql, args...)
+	cancel()
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// checkInvalidTask removes tasks whose owner is not this instance, or which have disappeared
+func (m *taskManager) checkInvalidTask(se session.Session) {
+	// TODO: optimize this function through cache or something else
+	ownRunningTask := make([]*runningScanTask, 0, len(m.runningTasks))
+
+	for _, task := range m.runningTasks {
+		sql, args := cache.SelectFromTTLTaskWithID(task.JobID, task.ScanID)
+
+		timeoutCtx, cancel := context.WithTimeout(m.ctx, ttlInternalSQLTimeout)
+		rows, err := se.ExecuteSQL(timeoutCtx, sql, args...)
+		cancel()
+		if err != nil {
+			logutil.Logger(m.ctx).Warn("fail to execute sql", zap.String("sql", sql), zap.Any("args", args), zap.Error(err))
+			task.cancel()
+			continue
+		}
+		if len(rows) == 0 {
+			logutil.Logger(m.ctx).Warn("didn't find task", zap.String("jobID", task.JobID), zap.Int64("scanID", task.ScanID))
+			task.cancel()
+			continue
+		}
+		t, err := cache.RowToTTLTask(se, rows[0])
+		if err != nil {
+			logutil.Logger(m.ctx).Warn("fail to get task", zap.Error(err))
+			task.cancel()
+			continue
+		}
+
+		if t.OwnerID == m.id {
+			ownRunningTask = append(ownRunningTask, task)
+		}
+	}
+
+	m.runningTasks = ownRunningTask
+}
+
+type runningScanTask struct {
+	*ttlScanTask
+	cancel func()
+	result *ttlScanTaskExecResult
+}
+
+func (t *runningScanTask) finished() bool {
+	return t.result != nil && t.statistics.TotalRows.Load() == t.statistics.ErrorRows.Load()+t.statistics.SuccessRows.Load()
+}
diff --git a/ttl/ttlworker/task_manager_integration_test.go b/ttl/ttlworker/task_manager_integration_test.go
new file mode 100644
index 0000000000000..8b7d0df5257b0
--- /dev/null
+++ b/ttl/ttlworker/task_manager_integration_test.go
@@ -0,0 +1,187 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ttlworker_test
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/pingcap/tidb/infoschema"
+	"github.com/pingcap/tidb/kv"
+	"github.com/pingcap/tidb/parser/model"
+	"github.com/pingcap/tidb/testkit"
+	"github.com/pingcap/tidb/ttl/cache"
+	"github.com/pingcap/tidb/ttl/ttlworker"
+	"github.com/pingcap/tidb/util/logutil"
+	"github.com/stretchr/testify/require"
+	"go.uber.org/atomic"
+	"go.uber.org/zap"
+)
+
+func TestParallelLockNewTask(t *testing.T) {
+	store := testkit.CreateMockStore(t)
+	tk := testkit.NewTestKit(t, store)
+	ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnTTL)
+	tk.MustExec("create table test.t (id int, created_at datetime) TTL= created_at + interval 1 hour")
+	testTable, err := tk.Session().GetDomainInfoSchema().(infoschema.InfoSchema).TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
+	require.NoError(t, err)
+
+	sessionFactory := sessionFactory(t, store)
+	se := sessionFactory()
+
+	now := time.Now()
+
+	isc := cache.NewInfoSchemaCache(time.Minute)
+	require.NoError(t, isc.Update(se))
+	m := ttlworker.NewTaskManager(context.Background(), nil, isc, "test-id")
+
+	// insert and lock a new task
+	sql, args, err := cache.InsertIntoTTLTask(tk.Session(), "test-job", testTable.Meta().ID, 1, nil, nil, now, now)
+	require.NoError(t, err)
+	_, err = tk.Session().ExecuteInternal(ctx, sql, args...)
+	require.NoError(t, err)
+	_, err = m.LockScanTask(se, &cache.TTLTask{
+		ScanID:  1,
+		JobID:   "test-job",
+		TableID: testTable.Meta().ID,
+	}, now)
+	require.NoError(t, err)
+	tk.MustExec("DELETE FROM mysql.tidb_ttl_task")
+
+	// lock the same task in parallel, only one of them should lock successfully
+	testTimes := 100
+	concurrency := 5
+	for i := 0; i < testTimes; i++ {
+		sql, args, err := cache.InsertIntoTTLTask(tk.Session(), "test-job", testTable.Meta().ID, 1, nil, nil, now, now)
+		require.NoError(t, err)
+		_, err = tk.Session().ExecuteInternal(ctx, sql, args...)
+		require.NoError(t, err)
+
+		successCounter := atomic.NewUint64(0)
+
+		now = now.Add(time.Hour * 48)
+
+		wg := sync.WaitGroup{}
+		for j := 0; j < concurrency; j++ {
+			scanManagerID := fmt.Sprintf("test-ttl-manager-%d", j)
+			wg.Add(1)
+			go func() {
+				se := sessionFactory()
+
+				isc := cache.NewInfoSchemaCache(time.Minute)
+				require.NoError(t, isc.Update(se))
+				m := ttlworker.NewTaskManager(context.Background(), nil, isc, scanManagerID)
+
+				_, err := m.LockScanTask(se, &cache.TTLTask{
+					ScanID:  1,
+					JobID:   "test-job",
+					TableID: testTable.Meta().ID,
+				}, now)
+				if err == nil {
+					successCounter.Add(1)
+				} else {
+					logutil.BgLogger().Error("lock new task with error", zap.Error(err))
+				}
+				wg.Done()
+			}()
+		}
+		wg.Wait()
+
+		require.Equal(t, uint64(1), successCounter.Load())
+		tk.MustExec("DELETE FROM mysql.tidb_ttl_task")
+	}
+}
+
+func TestParallelSchedule(t *testing.T) {
+	store, dom := testkit.CreateMockStoreAndDomain(t)
+	waitAndStopTTLManager(t, dom)
+	tk := testkit.NewTestKit(t, store)
+	sessionFactory := sessionFactory(t, store)
+
+	tk.MustExec("create table test.t(id int, created_at datetime) ttl=created_at + interval 1 day")
+	table, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
+	require.NoError(t, err)
+	// 16 tasks and 16 scan workers (in 4 task managers) should be able to be scheduled in a single "reschedule"
+	for i := 0; i < 16; i++ {
+		sql := fmt.Sprintf("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW(), NOW())", table.Meta().ID, i)
+		tk.MustExec(sql)
+	}
+	isc := cache.NewInfoSchemaCache(time.Second)
+	require.NoError(t, isc.Update(sessionFactory()))
+	now := time.Now()
+	scheduleWg := sync.WaitGroup{}
+	for i := 0; i < 4; i++ {
+		workers := []ttlworker.Worker{}
+		for j := 0; j < 4; j++ {
+			scanWorker := ttlworker.NewMockScanWorker(t)
+			scanWorker.Start()
+			workers = append(workers, scanWorker)
+		}
+
+		m := ttlworker.NewTaskManager(context.Background(), nil, isc, fmt.Sprintf("task-manager-%d", i))
+		m.SetScanWorkers4Test(workers)
+		scheduleWg.Add(1)
+		go func() {
+			se := sessionFactory()
+			m.RescheduleTasks(se, now)
+			scheduleWg.Done()
+		}()
+	}
+	scheduleWg.Wait()
+	// all tasks should have been scheduled
+	tk.MustQuery("select count(1) from mysql.tidb_ttl_task where status = 'running'").Check(testkit.Rows("16"))
+	for i := 0; i < 4; i++ {
+		sql := fmt.Sprintf("select count(1) from mysql.tidb_ttl_task where status = 'running' AND owner_id = 'task-manager-%d'", i)
+		tk.MustQuery(sql).Check(testkit.Rows("4"))
+	}
+}
+
+func TestTaskScheduleExpireHeartBeat(t *testing.T) {
+	store, dom := testkit.CreateMockStoreAndDomain(t)
+	waitAndStopTTLManager(t, dom)
+	tk := testkit.NewTestKit(t, store)
+	sessionFactory := sessionFactory(t, store)
+
+	// create table and scan task
+	tk.MustExec("create table test.t(id int, created_at datetime) ttl=created_at + interval 1 day")
+	table, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
+	require.NoError(t, err)
+	sql := fmt.Sprintf("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW(), NOW())", table.Meta().ID, 1)
+	tk.MustExec(sql)
+
+	// update the infoschema cache
+	isc := cache.NewInfoSchemaCache(time.Second)
+	require.NoError(t, isc.Update(sessionFactory()))
+	now := time.Now()
+
+	// schedule in a task manager
+	scanWorker := ttlworker.NewMockScanWorker(t)
+	scanWorker.Start()
+	m := ttlworker.NewTaskManager(context.Background(), nil, isc, "task-manager-1")
"task-manager-1") + m.SetScanWorkers4Test([]ttlworker.Worker{scanWorker}) + m.RescheduleTasks(sessionFactory(), now) + tk.MustQuery("select status,owner_id from mysql.tidb_ttl_task").Check(testkit.Rows("running task-manager-1")) + + // another task manager should fetch this task after heartbeat expire + scanWorker2 := ttlworker.NewMockScanWorker(t) + scanWorker2.Start() + m2 := ttlworker.NewTaskManager(context.Background(), nil, isc, "task-manager-2") + m2.SetScanWorkers4Test([]ttlworker.Worker{scanWorker2}) + m2.RescheduleTasks(sessionFactory(), now.Add(time.Hour)) + tk.MustQuery("select status,owner_id from mysql.tidb_ttl_task").Check(testkit.Rows("running task-manager-2")) +} diff --git a/ttl/ttlworker/task_manager_test.go b/ttl/ttlworker/task_manager_test.go new file mode 100644 index 0000000000000..9241146b719b3 --- /dev/null +++ b/ttl/ttlworker/task_manager_test.go @@ -0,0 +1,118 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ttlworker + +import ( + "context" + "testing" + "time" + + "github.com/pingcap/tidb/ttl/cache" + "github.com/pingcap/tidb/ttl/session" + "github.com/stretchr/testify/assert" +) + +// NewTaskManager is an exported version of newTaskManager for test +var NewTaskManager = newTaskManager + +// Worker is an exported version of worker +type Worker = worker + +func (m *taskManager) SetScanWorkers4Test(workers []worker) { + m.scanWorkers = workers +} + +// LockScanTask is an exported version of lockScanTask +func (m *taskManager) LockScanTask(se session.Session, task *cache.TTLTask, now time.Time) (*runningScanTask, error) { + return m.lockScanTask(se, task, now) +} + +// ResizeWorkersWithSysVar is an exported version of resizeWorkersWithSysVar +func (m *taskManager) ResizeWorkersWithSysVar() { + m.resizeWorkersWithSysVar() +} + +// RescheduleTasks is an exported version of rescheduleTasks +func (m *taskManager) RescheduleTasks(se session.Session, now time.Time) { + m.rescheduleTasks(se, now) +} + +func TestResizeWorkers(t *testing.T) { + tbl := newMockTTLTbl(t, "t1") + + // scale workers + scanWorker1 := NewMockScanWorker(t) + scanWorker1.Start() + scanWorker2 := NewMockScanWorker(t) + + m := newTaskManager(context.Background(), nil, nil, "test-id") + m.sessPool = newMockSessionPool(t, tbl) + m.SetScanWorkers4Test([]worker{ + scanWorker1, + }) + newWorkers, _, err := m.resizeWorkers(m.scanWorkers, 2, func() worker { + return scanWorker2 + }) + assert.NoError(t, err) + assert.Len(t, newWorkers, 2) + scanWorker1.checkWorkerStatus(workerStatusRunning, true, nil) + scanWorker2.checkWorkerStatus(workerStatusRunning, true, nil) + + // shrink scan workers + scanWorker1 = NewMockScanWorker(t) + scanWorker1.Start() + scanWorker2 = NewMockScanWorker(t) + scanWorker2.Start() + + m = newTaskManager(context.Background(), nil, nil, "test-id") + m.sessPool = newMockSessionPool(t, tbl) + m.SetScanWorkers4Test([]worker{ + scanWorker1, + scanWorker2, + }) + + assert.NoError(t, m.resizeScanWorkers(1)) + 
+	scanWorker2.checkWorkerStatus(workerStatusStopped, false, nil)
+
+	// shrink scan workers after job is run
+	scanWorker1 = NewMockScanWorker(t)
+	scanWorker1.Start()
+	scanWorker2 = NewMockScanWorker(t)
+	scanWorker2.Start()
+
+	m = newTaskManager(context.Background(), nil, nil, "test-id")
+	m.sessPool = newMockSessionPool(t, tbl)
+	m.SetScanWorkers4Test([]worker{
+		scanWorker1,
+		scanWorker2,
+	})
+	m.runningTasks = append(m.runningTasks, &runningScanTask{
+		ttlScanTask: &ttlScanTask{
+			tbl: tbl,
+			TTLTask: &cache.TTLTask{
+				JobID:  "test-job-id",
+				ScanID: 1,
+			},
+		},
+	})
+
+	scanWorker2.curTaskResult = &ttlScanTaskExecResult{task: &ttlScanTask{tbl: tbl, TTLTask: &cache.TTLTask{
+		JobID:  "test-job-id",
+		ScanID: 1,
+	}}}
+	assert.NoError(t, m.resizeScanWorkers(1))
+	scanWorker2.checkWorkerStatus(workerStatusStopped, false, nil)
+	assert.NotNil(t, m.runningTasks[0].result)
+}
diff --git a/ttl/ttlworker/worker.go b/ttl/ttlworker/worker.go
index 68ea0d9a1b952..c89d90cb1d061 100644
--- a/ttl/ttlworker/worker.go
+++ b/ttl/ttlworker/worker.go
@@ -125,7 +125,7 @@ func (w *baseWorker) loop() {
 	var err error
 	defer func() {
 		if r := recover(); r != nil {
-			logutil.BgLogger().Info("ttl worker panic", zap.Any("recover", r))
+			logutil.BgLogger().Info("ttl worker panic", zap.Any("recover", r), zap.Stack("stack"))
 		}
 		w.Lock()
 		w.toStopped(err)
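Note for reviewers: the hunk that wires the new taskManager into job_manager.go's loop is not included above. For orientation, below is a minimal sketch (not part of this patch) of how a caller could drive the manager's steps; the driver function name and ticker cadence are illustrative assumptions, while the methods and fields it calls (notifyStateCh, handleScanFinishedTask, resizeWorkersWithSysVar, checkInvalidTask, checkFinishedTask, rescheduleTasks, updateHeartBeat) are the ones added in task_manager.go.

// driveTaskManager is a hypothetical driver, not part of this patch: it only
// illustrates the intended call order of the taskManager steps added above.
// The real wiring lives in job_manager.go, which this diff does not show.
func driveTaskManager(ctx context.Context, m *taskManager, se session.Session) {
	ticker := time.NewTicker(ttlTaskHeartBeatTickerInterval) // assumed cadence
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-m.notifyStateCh:
			// a scan worker reported a state change; collect finished results early
			m.handleScanFinishedTask()
		case now := <-ticker.C:
			m.resizeWorkersWithSysVar() // follow the scan/delete worker count sysvars
			m.checkInvalidTask(se)      // drop tasks this instance no longer owns
			m.checkFinishedTask(se, now) // report finished tasks back to mysql.tidb_ttl_task
			m.rescheduleTasks(se, now)   // lock waiting tasks and schedule them onto idle workers
			if err := m.updateHeartBeat(ctx, se, now); err != nil {
				logutil.Logger(ctx).Warn("fail to update task heartbeat", zap.Error(err))
			}
		}
	}
}

The ordering matters: finished tasks are reported before rescheduling, so a worker freed by checkFinishedTask can pick up a waiting task in the same tick, which is also the behavior the TestParallelSchedule integration test above relies on.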