diff --git a/ttl/ttlworker/BUILD.bazel b/ttl/ttlworker/BUILD.bazel index bf00989883d4e..49085a387530c 100644 --- a/ttl/ttlworker/BUILD.bazel +++ b/ttl/ttlworker/BUILD.bazel @@ -59,7 +59,7 @@ go_test( embed = [":ttlworker"], flaky = True, race = "on", - shard_count = 37, + shard_count = 38, deps = [ "//domain", "//infoschema", @@ -88,6 +88,7 @@ go_test( "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//testutils", + "@org_golang_x_exp//slices", "@org_golang_x_time//rate", "@org_uber_go_atomic//:atomic", "@org_uber_go_zap//:zap", diff --git a/ttl/ttlworker/del.go b/ttl/ttlworker/del.go index a578f75adbd1e..ece8424a82cf0 100644 --- a/ttl/ttlworker/del.go +++ b/ttl/ttlworker/del.go @@ -91,6 +91,12 @@ func (t *ttlDeleteTask) doDelete(ctx context.Context, rawSe session.Session) (re tracer.EnterPhase(metrics.PhaseOther) leftRows := t.rows + defer func() { + if len(leftRows) > 0 { + t.statistics.IncErrorRows(len(leftRows)) + } + }() + se := newTableSession(rawSe, t.tbl, t.expire) for len(leftRows) > 0 { maxBatch := variable.TTLDeleteBatchSize.Load() @@ -207,6 +213,18 @@ func (b *ttlDelRetryBuffer) DoRetry(do func(*ttlDeleteTask) [][]types.Datum) tim return b.retryInterval } +// Drain drains a retry buffer. +func (b *ttlDelRetryBuffer) Drain() { + for ele := b.list.Front(); ele != nil; ele = ele.Next() { + if item, ok := ele.Value.(*ttlDelRetryItem); ok { + item.task.statistics.IncErrorRows(len(item.task.rows)) + } else { + logutil.BgLogger().Error(fmt.Sprintf("invalid retry buffer item type: %T", ele)) + } + } + b.list = list.New() +} + func (b *ttlDelRetryBuffer) recordRetryItem(task *ttlDeleteTask, retryRows [][]types.Datum, retryCnt int) bool { if len(retryRows) == 0 { return false @@ -276,6 +294,8 @@ func (w *ttlDeleteWorker) loop() error { timer := time.NewTimer(w.retryBuffer.retryInterval) defer timer.Stop() + // drain retry buffer to make sure the statistics are correct + defer w.retryBuffer.Drain() for w.Status() == workerStatusRunning { tracer.EnterPhase(metrics.PhaseIdle) select { diff --git a/ttl/ttlworker/del_test.go b/ttl/ttlworker/del_test.go index 524b45c9c8806..648fc2fa4ed58 100644 --- a/ttl/ttlworker/del_test.go +++ b/ttl/ttlworker/del_test.go @@ -17,6 +17,9 @@ package ttlworker import ( "context" "errors" + "fmt" + "math" + "strconv" "strings" "testing" "time" @@ -26,6 +29,7 @@ import ( "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/stretchr/testify/require" + "golang.org/x/exp/slices" "golang.org/x/time/rate" ) @@ -163,48 +167,56 @@ func TestTTLDelRetryBuffer(t *testing.T) { // test task should be immutable require.Equal(t, 10, len(task5.rows)) + + // test drain + require.Equal(t, 0, buffer.Len()) + task6, rows6, statics6 := createTask("t6") + buffer.RecordTaskResult(task6, rows6[:7]) + require.Equal(t, 1, buffer.Len()) + require.Equal(t, uint64(0), statics6.SuccessRows.Load()) + require.Equal(t, uint64(0), statics6.ErrorRows.Load()) + buffer.Drain() + require.Equal(t, 0, buffer.Len()) + require.Equal(t, uint64(0), statics6.SuccessRows.Load()) + require.Equal(t, uint64(7), statics6.ErrorRows.Load()) } func TestTTLDeleteTaskDoDelete(t *testing.T) { origBatchSize := variable.TTLDeleteBatchSize.Load() - variable.TTLDeleteBatchSize.Store(3) + delBatch := 3 + variable.TTLDeleteBatchSize.Store(int64(delBatch)) defer variable.TTLDeleteBatchSize.Store(origBatchSize) t1 := newMockTTLTbl(t, "t1") - t2 := newMockTTLTbl(t, "t2") - t3 := newMockTTLTbl(t, "t3") - t4 := newMockTTLTbl(t, "t4") s := 
newMockSession(t) - invokes := 0 - s.executeSQL = func(ctx context.Context, sql string, args ...interface{}) ([]chunk.Row, error) { - invokes++ - s.sessionInfoSchema = newMockInfoSchema(t1.TableInfo, t2.TableInfo, t3.TableInfo, t4.TableInfo) - if strings.Contains(sql, "`t1`") { - return nil, nil + var sqls []string + var retryErrBatches []int + var nonRetryBatches []int + var afterExecuteSQL func() + s.executeSQL = func(ctx context.Context, sql string, args ...any) ([]chunk.Row, error) { + s.sessionInfoSchema = newMockInfoSchema(t1.TableInfo) + sqls = append(sqls, sql) + + if !strings.Contains(sql, "`t1`") { + require.FailNow(t, "") } - if strings.Contains(sql, "`t2`") { + defer func() { + if afterExecuteSQL != nil { + afterExecuteSQL() + } + }() + + if slices.Contains(retryErrBatches, len(sqls)-1) { return nil, errors.New("mockErr") } - if strings.Contains(sql, "`t3`") { + if slices.Contains(nonRetryBatches, len(sqls)-1) { + // set an infoschema that contains no table to make an error that cannot retry s.sessionInfoSchema = newMockInfoSchema() return nil, nil } - if strings.Contains(sql, "`t4`") { - switch invokes { - case 1: - return nil, nil - case 2, 4: - return nil, errors.New("mockErr") - case 3: - s.sessionInfoSchema = newMockInfoSchema() - return nil, nil - } - } - - require.FailNow(t, "") return nil, nil } @@ -218,63 +230,117 @@ func TestTTLDeleteTaskDoDelete(t *testing.T) { return rows } - delTask := func(t *cache.PhysicalTable) *ttlDeleteTask { + delTask := func(batchCnt int) *ttlDeleteTask { task := &ttlDeleteTask{ - tbl: t, + tbl: t1, expire: time.UnixMilli(0), - rows: nRows(10), + rows: nRows(batchCnt * delBatch), statistics: &ttlStatistics{}, } - task.statistics.TotalRows.Add(10) + task.statistics.TotalRows.Add(uint64(batchCnt * delBatch)) return task } cases := []struct { - task *ttlDeleteTask - retryRows []int - successRows int - errorRows int + batchCnt int + retryErrBatches []int + noRetryErrBatches []int + cancelCtx bool + cancelCtxBatch int }{ { - task: delTask(t1), - retryRows: nil, - successRows: 10, - errorRows: 0, + // all success + batchCnt: 10, + }, + { + // all retries + batchCnt: 10, + retryErrBatches: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, }, { - task: delTask(t2), - retryRows: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, - successRows: 0, - errorRows: 0, + // all errors without retry + batchCnt: 10, + noRetryErrBatches: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, }, { - task: delTask(t3), - retryRows: nil, - successRows: 0, - errorRows: 10, + // some retries and some not + batchCnt: 10, + noRetryErrBatches: []int{3, 8, 9}, + retryErrBatches: []int{1, 2, 4}, }, { - task: delTask(t4), - retryRows: []int{3, 4, 5, 9}, - successRows: 3, - errorRows: 3, + // some retries and some not + batchCnt: 10, + noRetryErrBatches: []int{3, 8, 9}, + retryErrBatches: []int{1, 2, 4}, + cancelCtx: true, + cancelCtxBatch: 6, }, } for _, c := range cases { - invokes = 0 - retryRows := c.task.doDelete(context.Background(), s) - require.Equal(t, 4, invokes) - if c.retryRows == nil { - require.Nil(t, retryRows) + ctx, cancel := context.WithCancel(context.Background()) + if c.cancelCtx && c.cancelCtxBatch == 0 { + cancel() } - require.Equal(t, len(c.retryRows), len(retryRows)) - for i, row := range retryRows { - require.Equal(t, int64(c.retryRows[i]), row[0].GetInt64()) + + afterExecuteSQL = func() { + if c.cancelCtx { + if len(sqls) == c.cancelCtxBatch { + cancel() + } + } + } + + task := delTask(c.batchCnt) + require.Equal(t, len(task.rows), c.batchCnt*delBatch) + sqls = make([]string, 0, c.batchCnt) 
+ retryErrBatches = c.retryErrBatches + nonRetryBatches = c.noRetryErrBatches + retryRows := task.doDelete(ctx, s) + realBatchCnt := c.batchCnt + if c.cancelCtx { + realBatchCnt = c.cancelCtxBatch } - require.Equal(t, uint64(10), c.task.statistics.TotalRows.Load()) - require.Equal(t, uint64(c.successRows), c.task.statistics.SuccessRows.Load()) - require.Equal(t, uint64(c.errorRows), c.task.statistics.ErrorRows.Load()) + require.LessOrEqual(t, realBatchCnt, c.batchCnt) + + // check SQLs + require.Equal(t, realBatchCnt, len(sqls)) + expectedSQLs := make([]string, 0, len(sqls)) + for i := 0; i < realBatchCnt; i++ { + batch := task.rows[i*delBatch : (i+1)*delBatch] + idList := make([]string, 0, delBatch) + for _, row := range batch { + idList = append(idList, strconv.FormatInt(row[0].GetInt64(), 10)) + } + sql := fmt.Sprintf("DELETE LOW_PRIORITY FROM `test`.`t1` "+ + "WHERE `_tidb_rowid` IN (%s) AND `time` < FROM_UNIXTIME(0) LIMIT %d", + strings.Join(idList, ", "), + delBatch, + ) + expectedSQLs = append(expectedSQLs, sql) + } + require.Equal(t, strings.Join(expectedSQLs, "\n"), strings.Join(sqls, "\n")) + + // check retry rows + var expectedRetryRows [][]types.Datum + for i := 0; i < realBatchCnt; i++ { + if slices.Contains(c.retryErrBatches, i) { + expectedRetryRows = append(expectedRetryRows, task.rows[i*delBatch:(i+1)*delBatch]...) + } + } + require.Equal(t, expectedRetryRows, retryRows) + + // check statistics + var expectedErrRows uint64 + for i := 0; i < c.batchCnt; i++ { + if i >= realBatchCnt || slices.Contains(c.noRetryErrBatches, i) { + expectedErrRows += uint64(delBatch) + } + } + expectedSuccessRows := uint64(len(task.rows)) - expectedErrRows - uint64(len(expectedRetryRows)) + require.Equal(t, expectedSuccessRows, task.statistics.SuccessRows.Load()) + require.Equal(t, expectedErrRows, task.statistics.ErrorRows.Load()) } } @@ -317,30 +383,49 @@ func TestTTLDeleteTaskWorker(t *testing.T) { t1 := newMockTTLTbl(t, "t1") t2 := newMockTTLTbl(t, "t2") t3 := newMockTTLTbl(t, "t3") + t4 := newMockTTLTbl(t, "t4") s := newMockSession(t) pool := newMockSessionPool(t) pool.se = s - sqlMap := make(map[string]struct{}) - s.executeSQL = func(ctx context.Context, sql string, args ...interface{}) ([]chunk.Row, error) { - pool.lastSession.sessionInfoSchema = newMockInfoSchema(t1.TableInfo, t2.TableInfo, t3.TableInfo) + sqlMap := make(map[string]int) + t3Retried := make(chan struct{}) + t4Retried := make(chan struct{}) + s.executeSQL = func(ctx context.Context, sql string, args ...any) ([]chunk.Row, error) { + pool.lastSession.sessionInfoSchema = newMockInfoSchema(t1.TableInfo, t2.TableInfo, t3.TableInfo, t4.TableInfo) if strings.Contains(sql, "`t1`") { + // success return nil, nil } if strings.Contains(sql, "`t2`") { + // first error, retry success if _, ok := sqlMap[sql]; ok { + close(t3Retried) return nil, nil } - sqlMap[sql] = struct{}{} + sqlMap[sql] = 1 return nil, errors.New("mockErr") } if strings.Contains(sql, "`t3`") { + // error no retry pool.lastSession.sessionInfoSchema = newMockInfoSchema() return nil, nil } + if strings.Contains(sql, "`t4`") { + // error and retry still error + // this is to test the retry buffer should be drained after the delete worker stopped + i := sqlMap[sql] + if i >= 2 { + // i >= 2 means t4 has retried once and records in retry buffer + close(t4Retried) + } + sqlMap[sql] = i + 1 + return nil, errors.New("mockErr") + } + require.FailNow(t, "") return nil, nil } @@ -348,6 +433,7 @@ func TestTTLDeleteTaskWorker(t *testing.T) { delCh := make(chan *ttlDeleteTask) w 
:= newDeleteWorker(delCh, pool) w.retryBuffer.retryInterval = time.Millisecond + w.retryBuffer.maxRetry = math.MaxInt require.Equal(t, workerStatusCreated, w.Status()) w.Start() require.Equal(t, workerStatusRunning, w.Status()) @@ -357,7 +443,7 @@ func TestTTLDeleteTaskWorker(t *testing.T) { }() tasks := make([]*ttlDeleteTask, 0) - for _, tbl := range []*cache.PhysicalTable{t1, t2, t3} { + for _, tbl := range []*cache.PhysicalTable{t1, t2, t3, t4} { task := &ttlDeleteTask{ tbl: tbl, expire: time.UnixMilli(0), @@ -377,7 +463,23 @@ func TestTTLDeleteTaskWorker(t *testing.T) { } } - time.Sleep(time.Millisecond * 100) + select { + case <-t3Retried: + case <-time.After(time.Second): + require.FailNow(t, "") + } + + select { + case <-t4Retried: + case <-time.After(time.Second): + require.FailNow(t, "") + } + + // before stop, t4 should always retry without any error rows + require.Equal(t, uint64(0), tasks[3].statistics.ErrorRows.Load()) + w.Stop() + require.NoError(t, w.WaitStopped(context.Background(), 10*time.Second)) + require.Equal(t, uint64(3), tasks[0].statistics.SuccessRows.Load()) require.Equal(t, uint64(0), tasks[0].statistics.ErrorRows.Load()) @@ -386,4 +488,7 @@ func TestTTLDeleteTaskWorker(t *testing.T) { require.Equal(t, uint64(0), tasks[2].statistics.SuccessRows.Load()) require.Equal(t, uint64(3), tasks[2].statistics.ErrorRows.Load()) + + require.Equal(t, uint64(0), tasks[3].statistics.SuccessRows.Load()) + require.Equal(t, uint64(3), tasks[3].statistics.ErrorRows.Load()) } diff --git a/ttl/ttlworker/scan.go b/ttl/ttlworker/scan.go index 37855d0f2e238..aa0f93712f04a 100644 --- a/ttl/ttlworker/scan.go +++ b/ttl/ttlworker/scan.go @@ -81,12 +81,13 @@ type ttlScanTask struct { } type ttlScanTaskExecResult struct { + time time.Time task *ttlScanTask err error } func (t *ttlScanTask) result(err error) *ttlScanTaskExecResult { - return &ttlScanTaskExecResult{task: t, err: err} + return &ttlScanTaskExecResult{time: time.Now(), task: t, err: err} } func (t *ttlScanTask) getDatumRows(rows []chunk.Row) [][]types.Datum { diff --git a/ttl/ttlworker/task_manager.go b/ttl/ttlworker/task_manager.go index 20f70d7fc49ac..a3fa943761d41 100644 --- a/ttl/ttlworker/task_manager.go +++ b/ttl/ttlworker/task_manager.go @@ -72,6 +72,10 @@ func updateTTLTaskHeartBeatSQL(jobID string, scanID int64, now time.Time, state const countRunningTasks = "SELECT count(1) FROM mysql.tidb_ttl_task WHERE status = 'running'" +// waitTaskProcessRowTimeout is the timeout for waiting the task to process all rows after a scan task finished. +// If not all rows are processed after this timeout, the task will still be marked as finished. 
+const waitTaskProcessRowsTimeout = 5 * time.Minute + var errAlreadyScheduled = errors.New("task is already scheduled") var errTooManyRunningTasks = errors.New("there are too many running tasks") @@ -454,7 +458,7 @@ func (m *taskManager) updateHeartBeat(ctx context.Context, se session.Session, n func (m *taskManager) checkFinishedTask(se session.Session, now time.Time) { stillRunningTasks := make([]*runningScanTask, 0, len(m.runningTasks)) for _, task := range m.runningTasks { - if !task.finished() { + if !task.finished(logutil.Logger(m.ctx)) { stillRunningTasks = append(stillRunningTasks, task) continue } @@ -587,6 +591,57 @@ func (t *runningScanTask) Context() context.Context { return t.ctx } -func (t *runningScanTask) finished() bool { - return t.result != nil && t.statistics.TotalRows.Load() == t.statistics.ErrorRows.Load()+t.statistics.SuccessRows.Load() +func (t *runningScanTask) finished(logger *zap.Logger) bool { + if t.result == nil { + // Scan task isn't finished + return false + } + + logger = logger.With( + zap.String("jobID", t.JobID), + zap.Int64("scanID", t.ScanID), + zap.String("table", t.tbl.Name.O), + ) + + totalRows := t.statistics.TotalRows.Load() + errRows := t.statistics.ErrorRows.Load() + successRows := t.statistics.SuccessRows.Load() + processedRows := successRows + errRows + if processedRows == totalRows { + // All rows are processed. + logger.Info( + "mark TTL task finished because all scanned rows are processed", + zap.Uint64("totalRows", totalRows), + zap.Uint64("successRows", successRows), + zap.Uint64("errorRows", errRows), + ) + return true + } + + if processedRows > totalRows { + // All rows are processed but processed rows are more than total rows. + // We still think it is finished. + logger.Warn( + "mark TTL task finished but processed rows are more than total rows", + zap.Uint64("totalRows", totalRows), + zap.Uint64("successRows", successRows), + zap.Uint64("errorRows", errRows), + ) + return true + } + + if time.Since(t.result.time) > waitTaskProcessRowsTimeout { + // If the scan task is finished and not all rows are processed, we should wait a certain time to report the task. + // After a certain time, if the rows are still not processed, we need to mark the task finished anyway to make + // sure the TTL job does not hang. 
+ logger.Info( + "mark TTL task finished because timeout for waiting all scanned rows processed after scan task done", + zap.Uint64("totalRows", totalRows), + zap.Uint64("successRows", successRows), + zap.Uint64("errorRows", errRows), + ) + return true + } + + return false } diff --git a/ttl/ttlworker/task_manager_test.go b/ttl/ttlworker/task_manager_test.go index 624ca4c752c2c..8ac40abc0941c 100644 --- a/ttl/ttlworker/task_manager_test.go +++ b/ttl/ttlworker/task_manager_test.go @@ -19,9 +19,12 @@ import ( "testing" "time" + "github.com/pingcap/errors" "github.com/pingcap/tidb/ttl/cache" "github.com/pingcap/tidb/ttl/session" + "github.com/pingcap/tidb/util/logutil" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) // NewTaskManager is an exported version of newTaskManager for test @@ -71,10 +74,7 @@ func (m *taskManager) MeetTTLRunningTasks(count int, taskStatus cache.TaskStatus // ReportTaskFinished is an exported version of reportTaskFinished func (t *runningScanTask) SetResult(err error) { - t.result = &ttlScanTaskExecResult{ - task: t.ttlScanTask, - err: err, - } + t.result = t.ttlScanTask.result(err) } func TestResizeWorkers(t *testing.T) { @@ -136,11 +136,56 @@ func TestResizeWorkers(t *testing.T) { }, }) - scanWorker2.curTaskResult = &ttlScanTaskExecResult{task: &ttlScanTask{tbl: tbl, TTLTask: &cache.TTLTask{ + task := &ttlScanTask{tbl: tbl, TTLTask: &cache.TTLTask{ JobID: "test-job-id", ScanID: 1, - }}} + }} + scanWorker2.curTaskResult = task.result(nil) assert.NoError(t, m.resizeScanWorkers(1)) scanWorker2.checkWorkerStatus(workerStatusStopped, false, nil) assert.NotNil(t, m.runningTasks[0].result) } + +func TestTaskFinishedCondition(t *testing.T) { + tbl := newMockTTLTbl(t, "t1") + task := runningScanTask{ + ttlScanTask: &ttlScanTask{ + tbl: tbl, + TTLTask: &cache.TTLTask{ + JobID: "test-job-id", + ScanID: 1, + }, + statistics: &ttlStatistics{}, + }, + } + logger := logutil.BgLogger() + + // result == nil means it is not finished, even if all rows processed + require.Nil(t, task.result) + require.False(t, task.finished(logger)) + task.statistics.TotalRows.Store(10) + task.statistics.SuccessRows.Store(10) + require.False(t, task.finished(logger)) + + for _, resultErr := range []error{nil, errors.New("mockErr")} { + // result != nil but not all rows processed means it is not finished + task.statistics.SuccessRows.Store(0) + task.statistics.ErrorRows.Store(0) + task.result = task.ttlScanTask.result(resultErr) + require.InDelta(t, task.result.time.Unix(), time.Now().Unix(), 5) + require.False(t, task.finished(logger)) + task.statistics.SuccessRows.Store(8) + task.statistics.ErrorRows.Store(1) + require.False(t, task.finished(logger)) + + // result != nil but time out means it is finished + task.result = task.ttlScanTask.result(resultErr) + task.result.time = time.Now().Add(-waitTaskProcessRowsTimeout - time.Second) + require.True(t, task.finished(logger)) + + // result != nil and processed rows are more that total rows means it is finished + task.statistics.SuccessRows.Store(8) + task.statistics.ErrorRows.Store(3) + require.True(t, task.finished(logger)) + } +}
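
Note (illustrative only, not part of the patch): the sketch below restates the completion rule that runningScanTask.finished implements and that TestTaskFinishedCondition exercises. A task is reported finished once every scanned row has been accounted for as success or error, or once waitTaskProcessRowsTimeout has elapsed after the scan result was produced, so unprocessed rows cannot hang the TTL job. The names taskState, scanDoneAt and graceWait are placeholders invented for this sketch; the real patch keeps the counters in ttlStatistics and the scan-finish time in ttlScanTaskExecResult.time.

package main

import (
	"fmt"
	"time"
)

// taskState is a stand-in for the per-task bookkeeping used in the patch.
type taskState struct {
	scanDoneAt  *time.Time    // nil while the scan phase is still running
	totalRows   uint64        // rows handed to the delete workers
	successRows uint64        // rows deleted successfully
	errorRows   uint64        // rows given up on (failed batches, drained retry buffer)
	graceWait   time.Duration // how long to wait for stragglers; 5 minutes in the patch
}

// finished mirrors the decision order of runningScanTask.finished: not done
// while the scan is running; done when every scanned row is accounted for
// (the patch logs a warning if the count overshoots); otherwise done only
// after the grace period since the scan completed has expired.
func (s *taskState) finished(now time.Time) bool {
	if s.scanDoneAt == nil {
		return false // scan still running
	}
	if s.successRows+s.errorRows >= s.totalRows {
		return true // all scanned rows processed (or over-counted)
	}
	return now.Sub(*s.scanDoneAt) > s.graceWait
}

func main() {
	doneAt := time.Now().Add(-10 * time.Minute)
	s := &taskState{scanDoneAt: &doneAt, totalRows: 9, successRows: 6, graceWait: 5 * time.Minute}
	// Prints true: three rows are still unaccounted for, but the grace period
	// after the scan finished has expired, so the task is reported as done.
	fmt.Println(s.finished(time.Now()))
}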