diff --git a/metrics/grafana/tidb.json b/metrics/grafana/tidb.json index 25042b63a8bab..f8b125746a804 100644 --- a/metrics/grafana/tidb.json +++ b/metrics/grafana/tidb.json @@ -18049,6 +18049,108 @@ "align": false, "alignLevel": null } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The TTL task statuses in each worker", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 100 + }, + "hiddenSeries": false, + "id": 294, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.10", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [ + { + "alias": "running", + "color": "#5794F2" + } + ], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(tidb_server_ttl_task_status{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}) by (type, instance)", + "interval": "", + "legendFormat": "{{ instance }} {{ type }}", + "queryType": "randomWalk", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "TTL Task Count By Status", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": "0", + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } } ], "title": "TTL", diff --git a/metrics/metrics.go b/metrics/metrics.go index 633aa551564bc..68a2729f3483c 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -214,6 +214,7 @@ func RegisterMetrics() { prometheus.MustRegister(TTLQueryDuration) prometheus.MustRegister(TTLProcessedExpiredRowsCounter) prometheus.MustRegister(TTLJobStatus) + prometheus.MustRegister(TTLTaskStatus) prometheus.MustRegister(TTLPhaseTime) prometheus.MustRegister(EMACPUUsageGauge) diff --git a/metrics/ttl.go b/metrics/ttl.go index ab7e47e615e28..754744e93d1d8 100644 --- a/metrics/ttl.go +++ b/metrics/ttl.go @@ -43,6 +43,14 @@ var ( Help: "The jobs count in the specified status", }, []string{LblType}) + TTLTaskStatus = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "tidb", + Subsystem: "server", + Name: "ttl_task_status", + Help: "The tasks count in the specified status", + }, []string{LblType}) + TTLPhaseTime = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "tidb", diff --git a/ttl/metrics/metrics.go b/ttl/metrics/metrics.go index 3c8ceee213a14..8bc01551bc2a0 100644 --- a/ttl/metrics/metrics.go +++ b/ttl/metrics/metrics.go @@ -48,6 +48,8 @@ var ( RunningJobsCnt = metrics.TTLJobStatus.With(prometheus.Labels{metrics.LblType: "running"}) CancellingJobsCnt = metrics.TTLJobStatus.With(prometheus.Labels{metrics.LblType: "cancelling"}) + + RunningTaskCnt = metrics.TTLTaskStatus.With(prometheus.Labels{metrics.LblType: "running"}) ) func initWorkerPhases(workerType string) map[string]prometheus.Counter { diff --git a/ttl/ttlworker/BUILD.bazel b/ttl/ttlworker/BUILD.bazel index f314eca5c01ed..55fe5dffc2b8a 100644 --- a/ttl/ttlworker/BUILD.bazel +++ b/ttl/ttlworker/BUILD.bazel @@ -55,6 +55,7 @@ go_test( ], embed = [":ttlworker"], flaky = True, + race = "on", deps = [ "//domain", "//infoschema", @@ -69,6 +70,7 @@ go_test( "//testkit", "//ttl/cache", "//ttl/client", + "//ttl/metrics", "//ttl/session", "//types", "//util/chunk", @@ -76,6 +78,7 @@ go_test( "@com_github_ngaut_pools//:pools", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", + "@com_github_prometheus_client_model//go", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@org_golang_x_time//rate", diff --git a/ttl/ttlworker/job_manager.go b/ttl/ttlworker/job_manager.go index 0b427e64318ac..5f8b7bd038fc4 100644 --- a/ttl/ttlworker/job_manager.go +++ b/ttl/ttlworker/job_manager.go @@ -45,7 +45,7 @@ const setTableStatusOwnerTemplate = `UPDATE mysql.tidb_ttl_table_status SET current_job_id = %?, current_job_owner_id = %?, current_job_start_time = %?, - current_job_status = 'waiting', + current_job_status = 'running', current_job_status_update_time = %?, current_job_ttl_expire = %?, current_job_owner_hb_time = %? @@ -161,6 +161,7 @@ func (m *JobManager) jobLoop() error { m.taskManager.resizeWorkersWithSysVar() for { m.reportMetrics() + m.taskManager.reportMetrics() now := se.Now() select { @@ -651,7 +652,7 @@ func (m *JobManager) createNewJob(expireTime time.Time, now time.Time, table *ca // information from schema cache directly tbl: table, - status: cache.JobStatusWaiting, + status: cache.JobStatusRunning, } } diff --git a/ttl/ttlworker/job_manager_integration_test.go b/ttl/ttlworker/job_manager_integration_test.go index e2e864344fde3..45fd9e69e2bf8 100644 --- a/ttl/ttlworker/job_manager_integration_test.go +++ b/ttl/ttlworker/job_manager_integration_test.go @@ -34,9 +34,11 @@ import ( "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/ttl/cache" "github.com/pingcap/tidb/ttl/client" + "github.com/pingcap/tidb/ttl/metrics" "github.com/pingcap/tidb/ttl/session" "github.com/pingcap/tidb/ttl/ttlworker" "github.com/pingcap/tidb/util/logutil" + dto "github.com/prometheus/client_model/go" "github.com/stretchr/testify/require" "go.uber.org/atomic" "go.uber.org/zap" @@ -574,3 +576,41 @@ func TestGCTTLHistory(t *testing.T) { ttlworker.DoGC(context.TODO(), se) tk.MustQuery("select job_id from mysql.tidb_ttl_job_history order by job_id asc").Check(testkit.Rows("1", "2", "3", "4", "5")) } + +func TestJobMetrics(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + sessionFactory := sessionFactory(t, store) + + waitAndStopTTLManager(t, dom) + + now := time.Now() + tk.MustExec("create table test.t (id int, created_at datetime) ttl = `created_at` + interval 1 minute ttl_job_interval = '1m'") + table, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnTTL) + + se := sessionFactory() + m := ttlworker.NewJobManager("manager-1", nil, store, nil) + m.TaskManager().ResizeWorkersWithSysVar() + require.NoError(t, m.InfoSchemaCache().Update(se)) + // schedule jobs + m.RescheduleJobs(se, now) + // set the worker to be empty, so none of the tasks will be scheduled + m.TaskManager().SetScanWorkers4Test([]ttlworker.Worker{}) + + sql, args := cache.SelectFromTTLTableStatusWithID(table.Meta().ID) + rows, err := se.ExecuteSQL(ctx, sql, args...) + require.NoError(t, err) + tableStatus, err := cache.RowToTableStatus(se, rows[0]) + require.NoError(t, err) + + require.NotEmpty(t, tableStatus.CurrentJobID) + require.Equal(t, "manager-1", tableStatus.CurrentJobOwnerID) + require.Equal(t, cache.JobStatusRunning, tableStatus.CurrentJobStatus) + + m.ReportMetrics() + out := &dto.Metric{} + require.NoError(t, metrics.RunningJobsCnt.Write(out)) + require.Equal(t, float64(1), out.GetGauge().GetValue()) +} diff --git a/ttl/ttlworker/job_manager_test.go b/ttl/ttlworker/job_manager_test.go index bf7837fc2ee64..311a42072ea91 100644 --- a/ttl/ttlworker/job_manager_test.go +++ b/ttl/ttlworker/job_manager_test.go @@ -171,6 +171,11 @@ func (m *JobManager) UpdateHeartBeat(ctx context.Context, se session.Session, no return m.updateHeartBeat(ctx, se, now) } +// ReportMetrics is an exported version of reportMetrics +func (m *JobManager) ReportMetrics() { + m.reportMetrics() +} + func (j *ttlJob) Finish(se session.Session, now time.Time, summary *TTLSummary) { j.finish(se, now, summary) } diff --git a/ttl/ttlworker/task_manager.go b/ttl/ttlworker/task_manager.go index f20e5cee4f3f6..0db5405034c25 100644 --- a/ttl/ttlworker/task_manager.go +++ b/ttl/ttlworker/task_manager.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/ttl/cache" + "github.com/pingcap/tidb/ttl/metrics" "github.com/pingcap/tidb/ttl/session" "github.com/pingcap/tidb/util/logutil" "go.uber.org/multierr" @@ -327,15 +328,9 @@ func (m *taskManager) lockScanTask(se session.Session, task *cache.TTLTask, now } err := se.RunInTxn(ctx, func() error { - sql, args := cache.SelectFromTTLTaskWithID(task.JobID, task.ScanID) - rows, err := se.ExecuteSQL(ctx, sql+" FOR UPDATE NOWAIT", args...) - if err != nil { - return errors.Wrapf(err, "execute sql: %s", sql) - } - if len(rows) == 0 { - return errors.Errorf("didn't find task with jobID: %s, scanID: %d", task.JobID, task.ScanID) - } - task, err = cache.RowToTTLTask(se, rows[0]) + var err error + + task, err = m.syncTaskFromTable(se, task.JobID, task.ScanID, true) if err != nil { return err } @@ -343,7 +338,7 @@ func (m *taskManager) lockScanTask(se session.Session, task *cache.TTLTask, now return errors.New("task is already scheduled") } - sql, args = setTTLTaskOwnerSQL(task.JobID, task.ScanID, m.id, now) + sql, args := setTTLTaskOwnerSQL(task.JobID, task.ScanID, m.id, now) _, err = se.ExecuteSQL(ctx, sql, args...) if err != nil { return errors.Wrapf(err, "execute sql: %s", sql) @@ -355,6 +350,12 @@ func (m *taskManager) lockScanTask(se session.Session, task *cache.TTLTask, now return nil, err } + // update the task after setting status and owner + task, err = m.syncTaskFromTable(se, task.JobID, task.ScanID, false) + if err != nil { + return nil, err + } + ctx, cancel := context.WithCancel(m.ctx) scanTask := &ttlScanTask{ ctx: ctx, @@ -371,6 +372,28 @@ func (m *taskManager) lockScanTask(se session.Session, task *cache.TTLTask, now }, nil } +func (m *taskManager) syncTaskFromTable(se session.Session, jobID string, scanID int64, detectLock bool) (*cache.TTLTask, error) { + ctx := m.ctx + + sql, args := cache.SelectFromTTLTaskWithID(jobID, scanID) + if detectLock { + sql += " FOR UPDATE NOWAIT" + } + rows, err := se.ExecuteSQL(ctx, sql, args...) + if err != nil { + return nil, errors.Wrapf(err, "execute sql: %s", sql) + } + if len(rows) == 0 { + return nil, errors.Errorf("didn't find task with jobID: %s, scanID: %d", jobID, scanID) + } + task, err := cache.RowToTTLTask(se, rows[0]) + if err != nil { + return nil, err + } + + return task, nil +} + // updateHeartBeat updates the heartbeat for all tasks with current instance as owner func (m *taskManager) updateHeartBeat(ctx context.Context, se session.Session, now time.Time) error { for _, task := range m.runningTasks { @@ -427,6 +450,7 @@ func (m *taskManager) reportTaskFinished(se session.Session, now time.Time, task if err != nil { return err } + task.Status = cache.TaskStatusFinished timeoutCtx, cancel := context.WithTimeout(m.ctx, ttlInternalSQLTimeout) _, err = se.ExecuteSQL(timeoutCtx, sql, args...) @@ -474,6 +498,10 @@ func (m *taskManager) checkInvalidTask(se session.Session) { m.runningTasks = ownRunningTask } +func (m *taskManager) reportMetrics() { + metrics.RunningTaskCnt.Set(float64(len(m.runningTasks))) +} + type runningScanTask struct { *ttlScanTask cancel func() diff --git a/ttl/ttlworker/task_manager_integration_test.go b/ttl/ttlworker/task_manager_integration_test.go index 8b7d0df5257b0..9e3bad19b2acd 100644 --- a/ttl/ttlworker/task_manager_integration_test.go +++ b/ttl/ttlworker/task_manager_integration_test.go @@ -26,8 +26,10 @@ import ( "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/ttl/cache" + "github.com/pingcap/tidb/ttl/metrics" "github.com/pingcap/tidb/ttl/ttlworker" "github.com/pingcap/tidb/util/logutil" + dto "github.com/prometheus/client_model/go" "github.com/stretchr/testify/require" "go.uber.org/atomic" "go.uber.org/zap" @@ -185,3 +187,35 @@ func TestTaskScheduleExpireHeartBeat(t *testing.T) { m2.RescheduleTasks(sessionFactory(), now.Add(time.Hour)) tk.MustQuery("select status,owner_id from mysql.tidb_ttl_task").Check(testkit.Rows("running task-manager-2")) } + +func TestTaskMetrics(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + waitAndStopTTLManager(t, dom) + tk := testkit.NewTestKit(t, store) + sessionFactory := sessionFactory(t, store) + + // create table and scan task + tk.MustExec("create table test.t(id int, created_at datetime) ttl=created_at + interval 1 day") + table, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + sql := fmt.Sprintf("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW(), NOW())", table.Meta().ID, 1) + tk.MustExec(sql) + + // update the infoschema cache + isc := cache.NewInfoSchemaCache(time.Second) + require.NoError(t, isc.Update(sessionFactory())) + now := time.Now() + + // schedule in a task manager + scanWorker := ttlworker.NewMockScanWorker(t) + scanWorker.Start() + m := ttlworker.NewTaskManager(context.Background(), nil, isc, "task-manager-1") + m.SetScanWorkers4Test([]ttlworker.Worker{scanWorker}) + m.RescheduleTasks(sessionFactory(), now) + tk.MustQuery("select status,owner_id from mysql.tidb_ttl_task").Check(testkit.Rows("running task-manager-1")) + + m.ReportMetrics() + out := &dto.Metric{} + require.NoError(t, metrics.RunningTaskCnt.Write(out)) + require.Equal(t, float64(1), out.GetGauge().GetValue()) +} diff --git a/ttl/ttlworker/task_manager_test.go b/ttl/ttlworker/task_manager_test.go index 9241146b719b3..37365cb9757f6 100644 --- a/ttl/ttlworker/task_manager_test.go +++ b/ttl/ttlworker/task_manager_test.go @@ -49,6 +49,11 @@ func (m *taskManager) RescheduleTasks(se session.Session, now time.Time) { m.rescheduleTasks(se, now) } +// ReportMetrics is an exported version of reportMetrics +func (m *taskManager) ReportMetrics() { + m.reportMetrics() +} + func TestResizeWorkers(t *testing.T) { tbl := newMockTTLTbl(t, "t1")