Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ttl: add more metrics for TTL #41155

Merged
merged 6 commits into from
Feb 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
600 changes: 489 additions & 111 deletions metrics/grafana/tidb.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ func RegisterMetrics() {
prometheus.MustRegister(TTLJobStatus)
prometheus.MustRegister(TTLTaskStatus)
prometheus.MustRegister(TTLPhaseTime)
prometheus.MustRegister(TTLInsertRowsCount)

prometheus.MustRegister(EMACPUUsageGauge)
prometheus.MustRegister(PoolConcurrencyCounter)
Expand Down
8 changes: 8 additions & 0 deletions metrics/ttl.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,12 @@ var (
Name: "ttl_phase_time",
Help: "The time spent in each phase",
}, []string{LblType, LblPhase})

TTLInsertRowsCount = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "tidb",
Subsystem: "server",
Name: "ttl_insert_rows",
Help: "The count of TTL rows inserted",
})
)
3 changes: 3 additions & 0 deletions session/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ go_test(
"//expression",
"//kv",
"//meta",
"//metrics",
"//parser/ast",
"//parser/auth",
"//parser/model",
Expand All @@ -162,6 +163,8 @@ go_test(
"//util/sqlexec",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_log//:log",
"@com_github_prometheus_client_golang//prometheus",
"@com_github_prometheus_client_model//go",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//testutils",
"@com_github_tikv_client_go_v2//tikv",
Expand Down
3 changes: 3 additions & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1091,6 +1091,9 @@ func (s *session) CommitTxn(ctx context.Context) error {
s.sessionVars.StmtCtx.MergeExecDetails(nil, commitDetail)
}

// record the TTLInsertRows in the metric
metrics.TTLInsertRowsCount.Add(float64(s.sessionVars.TxnCtx.InsertTTLRowsCount))

failpoint.Inject("keepHistory", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(err)
Expand Down
49 changes: 49 additions & 0 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ import (
"testing"

"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/testkit/external"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -89,3 +92,49 @@ func TestMetaTableRegion(t *testing.T) {

require.NotEqual(t, ddlBackfillTableRegionID, ddlBackfillHistoryTableRegionID)
}

func MustReadCounter(t *testing.T, m prometheus.Counter) float64 {
pb := &dto.Metric{}
require.NoError(t, m.Write(pb))
return pb.GetCounter().GetValue()
}

func TestRecordTTLRows(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("create table t(created_at datetime) TTL = created_at + INTERVAL 1 DAY")
// simple insert should be recorded
tk.MustExec("insert into t values (NOW())")
require.Equal(t, 1.0, MustReadCounter(t, metrics.TTLInsertRowsCount))

// insert in a explicit transaction should be recorded
tk.MustExec("begin")
tk.MustExec("insert into t values (NOW())")
tk.MustExec("commit")
require.Equal(t, 2.0, MustReadCounter(t, metrics.TTLInsertRowsCount))

// insert multiple rows should be the same
tk.MustExec("begin")
tk.MustExec("insert into t values (NOW())")
tk.MustExec("insert into t values (NOW())")
tk.MustExec("commit")
require.Equal(t, 4.0, MustReadCounter(t, metrics.TTLInsertRowsCount))

// rollback will remove all recorded TTL rows
tk.MustExec("begin")
tk.MustExec("insert into t values (NOW())")
tk.MustExec("insert into t values (NOW())")
tk.MustExec("rollback")
require.Equal(t, 6.0, MustReadCounter(t, metrics.TTLInsertRowsCount))

// savepoint will save the recorded TTL rows
tk.MustExec("begin")
tk.MustExec("insert into t values (NOW())")
tk.MustExec("savepoint insert1")
tk.MustExec("insert into t values (NOW())")
tk.MustExec("rollback to insert1")
tk.MustExec("commit")
require.Equal(t, 7.0, MustReadCounter(t, metrics.TTLInsertRowsCount))
}
5 changes: 5 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ type TxnCtxNeedToRestore struct {

// CachedTables is not nil if the transaction write on cached table.
CachedTables map[int64]interface{}

// InsertTTLRowsCount counts how many rows are inserted in this statement
InsertTTLRowsCount int
}

// TxnCtxNoNeedToRestore stores transaction variables which do not need to restored when rolling back to a savepoint.
Expand Down Expand Up @@ -377,6 +380,7 @@ func (tc *TransactionContext) GetCurrentSavepoint() TxnCtxNeedToRestore {
TableDeltaMap: tableDeltaMap,
pessimisticLockCache: pessimisticLockCache,
CachedTables: cachedTables,
InsertTTLRowsCount: tc.InsertTTLRowsCount,
}
}

Expand All @@ -385,6 +389,7 @@ func (tc *TransactionContext) RestoreBySavepoint(savepoint TxnCtxNeedToRestore)
tc.TableDeltaMap = savepoint.TableDeltaMap
tc.pessimisticLockCache = savepoint.pessimisticLockCache
tc.CachedTables = savepoint.CachedTables
tc.InsertTTLRowsCount = savepoint.InsertTTLRowsCount
}

// AddSavepoint adds a new savepoint.
Expand Down
7 changes: 7 additions & 0 deletions table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -924,6 +924,9 @@ func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts .
return nil, err
}
}
if shouldIncreaseTTLMetricCount(t.meta) {
sctx.GetSessionVars().TxnCtx.InsertTTLRowsCount += 1
}
if sessVars.TxnCtx == nil {
return recordID, nil
}
Expand Down Expand Up @@ -1592,6 +1595,10 @@ func shouldWriteBinlog(ctx sessionctx.Context, tblInfo *model.TableInfo) bool {
return !ctx.GetSessionVars().InRestrictedSQL
}

func shouldIncreaseTTLMetricCount(tblInfo *model.TableInfo) bool {
return tblInfo.TTLInfo != nil
}

func (t *TableCommon) getMutation(ctx sessionctx.Context) *binlog.TableMutation {
return ctx.StmtGetMutation(t.tableID)
}
Expand Down
3 changes: 2 additions & 1 deletion ttl/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ var (
RunningJobsCnt = metrics.TTLJobStatus.With(prometheus.Labels{metrics.LblType: "running"})
CancellingJobsCnt = metrics.TTLJobStatus.With(prometheus.Labels{metrics.LblType: "cancelling"})

RunningTaskCnt = metrics.TTLTaskStatus.With(prometheus.Labels{metrics.LblType: "running"})
ScanningTaskCnt = metrics.TTLTaskStatus.With(prometheus.Labels{metrics.LblType: "scanning"})
DeletingTaskCnt = metrics.TTLTaskStatus.With(prometheus.Labels{metrics.LblType: "deleting"})
)

func initWorkerPhases(workerType string) map[string]prometheus.Counter {
Expand Down
12 changes: 11 additions & 1 deletion ttl/ttlworker/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,17 @@ func (m *taskManager) checkInvalidTask(se session.Session) {
}

func (m *taskManager) reportMetrics() {
metrics.RunningTaskCnt.Set(float64(len(m.runningTasks)))
scanningTaskCnt := 0
deletingTaskCnt := 0
for _, task := range m.runningTasks {
if task.result != nil {
scanningTaskCnt += 1
} else {
deletingTaskCnt += 1
}
}
metrics.ScanningTaskCnt.Set(float64(scanningTaskCnt))
metrics.DeletingTaskCnt.Set(float64(deletingTaskCnt))
}

type runningScanTask struct {
Expand Down
2 changes: 1 addition & 1 deletion ttl/ttlworker/task_manager_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func TestTaskMetrics(t *testing.T) {

m.ReportMetrics()
out := &dto.Metric{}
require.NoError(t, metrics.RunningTaskCnt.Write(out))
require.NoError(t, metrics.DeletingTaskCnt.Write(out))
require.Equal(t, float64(1), out.GetGauge().GetValue())
}

Expand Down