ttl, domain: setup a customized session pool with stats collector #40171

Merged: 1 commit, Dec 29, 2022
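In short, this PR moves TTL job-manager startup out of `Domain.Init` and into session bootstrap, so the session package can install stats-collector hooks on the sessions TTL workers use before the manager starts; the rows TTL jobs delete then count toward the stats deltas that drive auto analyze. A minimal sketch of the hook shape the session.go changes below assume (the ttlworker-side declarations themselves are not part of this excerpt):

```go
// Sketch only: overridable package-level hooks, as implied by the assignments
// in BootstrapSession further down. They default to no-ops; the session
// package swaps in real implementations during bootstrap.
package ttlworker

import "github.com/pingcap/tidb/util/sqlexec"

// AttachStatsCollector wraps a session executor so the row changes it makes
// are recorded by the domain's stats collector.
var AttachStatsCollector = func(s sqlexec.SQLExecutor) sqlexec.SQLExecutor { return s }

// DetachStatsCollector undoes the wrapping before the session is reused.
var DetachStatsCollector = func(s sqlexec.SQLExecutor) sqlexec.SQLExecutor { return s }
```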
27 changes: 13 additions & 14 deletions domain/domain.go
@@ -1062,10 +1062,6 @@ func (do *Domain) Init(
return err
}

do.wg.Run(func() {
do.runTTLJobManager(ctx)
})

return nil
}

@@ -2457,18 +2453,21 @@ func (do *Domain) serverIDKeeper() {
}
}

func (do *Domain) runTTLJobManager(ctx context.Context) {
ttlJobManager := ttlworker.NewJobManager(do.ddl.GetID(), do.sysSessionPool, do.store)
ttlJobManager.Start()
do.ttlJobManager = ttlJobManager
// StartTTLJobManager creates and starts the ttl job manager
func (do *Domain) StartTTLJobManager() {
do.wg.Run(func() {
ttlJobManager := ttlworker.NewJobManager(do.ddl.GetID(), do.sysSessionPool, do.store)
do.ttlJobManager = ttlJobManager
ttlJobManager.Start()

<-do.exit
<-do.exit

ttlJobManager.Stop()
err := ttlJobManager.WaitStopped(ctx, 30*time.Second)
if err != nil {
logutil.BgLogger().Warn("fail to wait until the ttl job manager stop", zap.Error(err))
}
ttlJobManager.Stop()
err := ttlJobManager.WaitStopped(context.Background(), 30*time.Second)
if err != nil {
logutil.BgLogger().Warn("fail to wait until the ttl job manager stop", zap.Error(err))
}
})
}

// TTLJobManager returns the ttl job manager on this domain
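Exporting `StartTTLJobManager` instead of starting the manager inside `Domain.Init` lets the caller pick the startup moment; in this PR, `BootstrapSession` (in the session.go change below) installs the stats-collector hooks first and only then calls it. The goroutine itself keeps the usual Domain background-service lifecycle. A stripped-down sketch of that lifecycle, with hypothetical stand-ins for `do.wg.Run`, `do.exit`, and the job manager:

```go
package sketch

import (
	"context"
	"time"

	"github.com/pingcap/tidb/util/logutil"
	"go.uber.org/zap"
)

// stoppable is a hypothetical stand-in for the TTL job manager.
type stoppable interface {
	Start()
	Stop()
	WaitStopped(ctx context.Context, timeout time.Duration) error
}

// runUntilExit mirrors the body of StartTTLJobManager: start the service,
// park until the exit channel closes, then stop with a bounded wait so a
// stuck manager cannot hang domain shutdown forever.
func runUntilExit(runInWG func(func()), exit <-chan struct{}, svc stoppable) {
	runInWG(func() {
		svc.Start()
		<-exit
		svc.Stop()
		if err := svc.WaitStopped(context.Background(), 30*time.Second); err != nil {
			logutil.BgLogger().Warn("service did not stop in time", zap.Error(err))
		}
	})
}
```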
1 change: 1 addition & 0 deletions session/BUILD.bazel
@@ -65,6 +65,7 @@ go_library(
"//table/temptable",
"//tablecodec",
"//telemetry",
"//ttl/ttlworker",
"//types",
"//types/parser_driver",
"//util",
37 changes: 37 additions & 0 deletions session/session.go
@@ -86,6 +86,7 @@ import (
"github.com/pingcap/tidb/table/temptable"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/telemetry"
"github.com/pingcap/tidb/ttl/ttlworker"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/collate"
@@ -3418,6 +3419,22 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) {
return nil, err
}

// start the TTL job manager after the stats collector is set up,
// because TTL jobs can modify a lot of rows and need to trigger auto analyze
ttlworker.AttachStatsCollector = func(s sqlexec.SQLExecutor) sqlexec.SQLExecutor {
if s, ok := s.(*session); ok {
return attachStatsCollector(s, dom)
}
return s
}
ttlworker.DetachStatsCollector = func(s sqlexec.SQLExecutor) sqlexec.SQLExecutor {
if s, ok := s.(*session); ok {
return detachStatsCollector(s)
}
return s
}
dom.StartTTLJobManager()

analyzeCtxs, err := createSessions(store, analyzeConcurrencyQuota)
if err != nil {
return nil, err
@@ -3523,6 +3540,26 @@ func createSessionWithOpt(store kv.Storage, opt *Opt) (*session, error) {
return s, nil
}

// attachStatsCollector attaches the stats collector in the dom for the session
func attachStatsCollector(s *session, dom *domain.Domain) *session {
if dom.StatsHandle() != nil && dom.StatsUpdating() {
s.statsCollector = dom.StatsHandle().NewSessionStatsCollector()
if GetIndexUsageSyncLease() > 0 {
s.idxUsageCollector = dom.StatsHandle().NewSessionIndexUsageCollector()
}
}

return s
}

// detachStatsCollector removes the stats collector in the session
func detachStatsCollector(s *session) *session {
s.statsCollector = nil
s.idxUsageCollector = nil

return s
}

// CreateSessionWithDomain creates a new Session and binds it with a Domain.
// We need this because when we start DDL in Domain, the DDL need a session
// to change some system tables. But at that time, we have been already in
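The two hooks registered in `BootstrapSession` are meant to bracket the SQL a TTL job runs: attach the collector when a worker borrows a session, detach it before the session goes back to the pool, so the deleted rows show up as stats deltas and can later trigger auto analyze. A sketch of that presumed usage; `borrowSession`/`returnSession` are hypothetical placeholders for pool plumbing not shown in this excerpt:

```go
package sketch

import (
	"context"

	"github.com/pingcap/tidb/ttl/ttlworker"
	"github.com/pingcap/tidb/util/sqlexec"
)

// borrowSession and returnSession stand in for the real session-pool code.
var (
	borrowSession func() sqlexec.SQLExecutor
	returnSession func(sqlexec.SQLExecutor)
)

// runTTLStatement shows the intended bracketing around one TTL statement.
func runTTLStatement(ctx context.Context, sql string) error {
	se := borrowSession()
	exec := ttlworker.AttachStatsCollector(se) // deletes now count toward stats deltas
	defer func() {
		ttlworker.DetachStatsCollector(exec) // stop collecting before the session is reused
		returnSession(se)
	}()
	_, err := exec.ExecuteInternal(ctx, sql) // result-set handling omitted in this sketch
	return err
}
```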
3 changes: 3 additions & 0 deletions ttl/ttlworker/BUILD.bazel
@@ -31,6 +31,7 @@ go_library(
"//util/timeutil",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@org_golang_x_time//rate",
"@org_uber_go_multierr//:multierr",
"@org_uber_go_zap//:zap",
@@ -58,6 +59,7 @@ go_test(
"//session",
"//sessionctx",
"//sessionctx/variable",
"//statistics/handle",
"//testkit",
"//ttl/cache",
"//ttl/session",
@@ -66,6 +68,7 @@
"//util/logutil",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_golang_x_time//rate",
23 changes: 23 additions & 0 deletions ttl/ttlworker/config.go
@@ -16,6 +16,8 @@ package ttlworker

import (
"time"

"github.com/pingcap/failpoint"
)

const jobManagerLoopTickerInterval = 10 * time.Second
@@ -27,3 +29,24 @@ const ttlInternalSQLTimeout = 30 * time.Second
const resizeWorkersInterval = 30 * time.Second
const splitScanCount = 64
const ttlJobTimeout = 6 * time.Hour

func getUpdateInfoSchemaCacheInterval() time.Duration {
failpoint.Inject("update-info-schema-cache-interval", func(val failpoint.Value) time.Duration {
return time.Duration(val.(int))
})
return updateInfoSchemaCacheInterval
}

func getUpdateTTLTableStatusCacheInterval() time.Duration {
failpoint.Inject("update-status-table-cache-interval", func(val failpoint.Value) time.Duration {
return time.Duration(val.(int))
})
return updateTTLTableStatusCacheInterval
}

func getResizeWorkersInterval() time.Duration {
failpoint.Inject("resize-workers-interval", func(val failpoint.Value) time.Duration {
return time.Duration(val.(int))
})
return resizeWorkersInterval
}
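These getters exist so tests can shrink the intervals through failpoints instead of waiting minutes for a cache refresh; the integration test below enables all three this way. A minimal, hypothetical test showing the enable/disable pattern (the test name and body are placeholders):

```go
package ttlworker_test

import (
	"fmt"
	"testing"
	"time"

	"github.com/pingcap/failpoint"
	"github.com/stretchr/testify/require"
)

func TestWithFastCacheRefresh(t *testing.T) {
	fp := "github.com/pingcap/tidb/ttl/ttlworker/update-info-schema-cache-interval"
	// return(1000000000) makes getUpdateInfoSchemaCacheInterval report 1s.
	require.NoError(t, failpoint.Enable(fp, fmt.Sprintf("return(%d)", time.Second)))
	defer func() {
		require.NoError(t, failpoint.Disable(fp))
	}()
	// ... exercise code that should observe the 1s cache refresh interval ...
}
```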
8 changes: 4 additions & 4 deletions ttl/ttlworker/job_manager.go
@@ -102,8 +102,8 @@ func NewJobManager(id string, sessPool sessionPool, store kv.Storage) (manager *
manager.init(manager.jobLoop)
manager.ctx = logutil.WithKeyValue(manager.ctx, "ttl-worker", "manager")

manager.infoSchemaCache = cache.NewInfoSchemaCache(updateInfoSchemaCacheInterval)
manager.tableStatusCache = cache.NewTableStatusCache(updateTTLTableStatusCacheInterval)
manager.infoSchemaCache = cache.NewInfoSchemaCache(getUpdateInfoSchemaCacheInterval())
manager.tableStatusCache = cache.NewTableStatusCache(getUpdateTTLTableStatusCacheInterval())

return
}
@@ -125,7 +125,7 @@ func (m *JobManager) jobLoop() error {
updateScanTaskStateTicker := time.Tick(jobManagerLoopTickerInterval)
infoSchemaCacheUpdateTicker := time.Tick(m.infoSchemaCache.GetInterval())
tableStatusCacheUpdateTicker := time.Tick(m.tableStatusCache.GetInterval())
resizeWorkersTicker := time.Tick(resizeWorkersInterval)
resizeWorkersTicker := time.Tick(getResizeWorkersInterval())
for {
m.reportMetrics()
now := se.Now()
@@ -487,7 +487,7 @@ func (m *JobManager) couldTrySchedule(table *cache.TableStatus, now time.Time) b
hbTime := table.CurrentJobOwnerHBTime
// a more concrete value is `2 * max(updateTTLTableStatusCacheInterval, jobManagerLoopTickerInterval)`, but the
// `updateTTLTableStatusCacheInterval` is greater than `jobManagerLoopTickerInterval` in most cases.
if hbTime.Add(2 * updateTTLTableStatusCacheInterval).Before(now) {
if hbTime.Add(2 * getUpdateTTLTableStatusCacheInterval()).Before(now) {
logutil.Logger(m.ctx).Info("task heartbeat has stopped", zap.Int64("tableID", table.TableID), zap.Time("hbTime", hbTime), zap.Time("now", now))
return true
}
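The heartbeat check in `couldTrySchedule` above, isolated: a job owner is presumed gone once its heartbeat is older than twice the status-cache refresh interval, because by then every other node has refreshed its view at least once without seeing a newer heartbeat.

```go
package sketch

import "time"

// Illustration only: the takeover condition used by couldTrySchedule.
func heartbeatExpired(hbTime, now time.Time, statusCacheInterval time.Duration) bool {
	return hbTime.Add(2 * statusCacheInterval).Before(now)
}
```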
57 changes: 57 additions & 0 deletions ttl/ttlworker/job_manager_integration_test.go
@@ -22,11 +22,13 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
dbsession "github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/ttl/cache"
"github.com/pingcap/tidb/ttl/session"
@@ -123,3 +125,58 @@ func TestFinishJob(t *testing.T) {

tk.MustQuery("select table_id, last_job_summary from mysql.tidb_ttl_table_status").Check(testkit.Rows("2 {\"total_rows\":0,\"success_rows\":0,\"error_rows\":0,\"total_scan_task\":1,\"scheduled_scan_task\":0,\"finished_scan_task\":0,\"scan_task_err\":\"\\\"'an error message contains both single and double quote'\\\"\"}"))
}

func TestTTLAutoAnalyze(t *testing.T) {
failpoint.Enable("github.com/pingcap/tidb/ttl/ttlworker/update-info-schema-cache-interval", fmt.Sprintf("return(%d)", time.Second))
failpoint.Enable("github.com/pingcap/tidb/ttl/ttlworker/update-status-table-cache-interval", fmt.Sprintf("return(%d)", time.Second))
failpoint.Enable("github.com/pingcap/tidb/ttl/ttlworker/resize-workers-interval", fmt.Sprintf("return(%d)", time.Second))
originAutoAnalyzeMinCnt := handle.AutoAnalyzeMinCnt
handle.AutoAnalyzeMinCnt = 0
defer func() {
handle.AutoAnalyzeMinCnt = originAutoAnalyzeMinCnt
}()

store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("create table t (id int, created_at datetime) ttl = `created_at` + interval 1 day")

// insert ten rows; rows 2, 3, 4, 6, 8, 9 and 10 are expired
for i := 1; i <= 10; i++ {
t := time.Now()
if i%2 == 0 || i%3 == 0 {
t = t.Add(-time.Hour * 48)
}

tk.MustExec("insert into t values(?, ?)", i, t.Format(time.RFC3339))
}
// TODO: use a better way to pause and restart the ttl worker after analyzing the table, to make this test more stable;
// but as the ttl worker takes several seconds to start, it's not too serious.
tk.MustExec("analyze table t")
rows := tk.MustQuery("show stats_meta").Rows()
require.Equal(t, rows[0][4], "0")
require.Equal(t, rows[0][5], "10")

retryTime := 15
retryInterval := time.Second * 2
deleted := false
for retryTime >= 0 {
retryTime--
time.Sleep(retryInterval)

rows := tk.MustQuery("select count(*) from t").Rows()
count := rows[0][0].(string)
if count == "3" {
deleted = true
break
}
}
require.True(t, deleted, "ttl should remove expired rows")

h := dom.StatsHandle()
is := dom.InfoSchema()
require.NoError(t, h.DumpStatsDeltaToKV(handle.DumpAll))
require.NoError(t, h.Update(is))
require.True(t, h.HandleAutoAnalyze(is))
}
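The tail of the test replays what the stats owner would do in the background: dump the session-collected deltas to the stats tables, refresh the handle's cache, and run one auto-analyze round. A simplified sketch of the condition being exercised, assuming the default tidb_auto_analyze_ratio of 0.5 (this is not TiDB's actual auto-analyze code):

```go
package sketch

// shouldAutoAnalyze is a simplified model of the trigger the test relies on.
// With handle.AutoAnalyzeMinCnt forced to 0, the only remaining gate is the
// modified-row ratio; deleting 7 of the 10 rows pushes it well past 0.5.
func shouldAutoAnalyze(modifyCount, rowCount int64, autoAnalyzeRatio float64, minCnt int64) bool {
	if rowCount == 0 || rowCount < minCnt {
		return false
	}
	return float64(modifyCount)/float64(rowCount) > autoAnalyzeRatio
}
```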
33 changes: 22 additions & 11 deletions ttl/ttlworker/job_manager_test.go
@@ -167,7 +167,8 @@ func newMockTTLJob(tbl *cache.PhysicalTable, status cache.JobStatus) *ttlJob {

func TestReadyForNewJobTables(t *testing.T) {
tbl := newMockTTLTbl(t, "t1")
m := NewJobManager("test-id", newMockSessionPool(t, tbl), nil)
m := NewJobManager("test-id", nil, nil)
m.sessPool = newMockSessionPool(t, tbl)
se := newMockSession(t, tbl)

cases := []struct {
@@ -295,7 +296,8 @@ func TestLockNewTable(t *testing.T) {
t.Run(c.name, func(t *testing.T) {
tbl := newMockTTLTbl(t, "t1")

m := NewJobManager("test-id", newMockSessionPool(t, tbl), nil)
m := NewJobManager("test-id", nil, nil)
m.sessPool = newMockSessionPool(t, tbl)
sqlCounter := 0
se := newMockSession(t, tbl)
se.executeSQL = func(ctx context.Context, sql string, args ...interface{}) (rows []chunk.Row, err error) {
@@ -333,7 +335,8 @@ func TestResizeWorkers(t *testing.T) {
scanWorker1.Start()
scanWorker2 := newMockScanWorker(t)

m := NewJobManager("test-id", newMockSessionPool(t, tbl), nil)
m := NewJobManager("test-id", nil, nil)
m.sessPool = newMockSessionPool(t, tbl)
m.SetScanWorkers4Test([]worker{
scanWorker1,
})
@@ -351,7 +354,8 @@
scanWorker2 = newMockScanWorker(t)
scanWorker2.Start()

m = NewJobManager("test-id", newMockSessionPool(t, tbl), nil)
m = NewJobManager("test-id", nil, nil)
m.sessPool = newMockSessionPool(t, tbl)
m.SetScanWorkers4Test([]worker{
scanWorker1,
scanWorker2,
@@ -366,7 +370,8 @@
scanWorker2 = newMockScanWorker(t)
scanWorker2.Start()

m = NewJobManager("test-id", newMockSessionPool(t, tbl), nil)
m = NewJobManager("test-id", nil, nil)
m.sessPool = newMockSessionPool(t, tbl)
m.SetScanWorkers4Test([]worker{
scanWorker1,
scanWorker2,
@@ -384,7 +389,8 @@ func TestLocalJobs(t *testing.T) {
tbl1.ID = 1
tbl2 := newMockTTLTbl(t, "t2")
tbl2.ID = 2
m := NewJobManager("test-id", newMockSessionPool(t, tbl1, tbl2), nil)
m := NewJobManager("test-id", nil, nil)
m.sessPool = newMockSessionPool(t, tbl1, tbl2)

m.runningJobs = []*ttlJob{{tbl: tbl1, id: "1", ctx: context.Background()}, {tbl: tbl2, id: "2", ctx: context.Background()}}
m.tableStatusCache.Tables = map[int64]*cache.TableStatus{
@@ -410,7 +416,8 @@ func TestRescheduleJobs(t *testing.T) {
scanWorker2.Start()
scanWorker2.setOneRowResult(tbl, 2022)

m := NewJobManager("test-id", newMockSessionPool(t, tbl), nil)
m := NewJobManager("test-id", nil, nil)
m.sessPool = newMockSessionPool(t, tbl)
m.SetScanWorkers4Test([]worker{
scanWorker1,
scanWorker2,
@@ -463,7 +470,8 @@ func TestRescheduleJobsOutOfWindow(t *testing.T) {
scanWorker2.Start()
scanWorker2.setOneRowResult(tbl, 2022)

m := NewJobManager("test-id", newMockSessionPool(t, tbl), nil)
m := NewJobManager("test-id", nil, nil)
m.sessPool = newMockSessionPool(t, tbl)
m.SetScanWorkers4Test([]worker{
scanWorker1,
scanWorker2,
@@ -508,7 +516,8 @@ func TestCheckFinishedJob(t *testing.T) {
se := newMockSession(t, tbl)

// cancelled job will be regarded as finished
m := NewJobManager("test-id", newMockSessionPool(t, tbl), nil)
m := NewJobManager("test-id", nil, nil)
m.sessPool = newMockSessionPool(t, tbl)
m.runningJobs = []*ttlJob{newMockTTLJob(tbl, cache.JobStatusCancelled)}
m.checkFinishedJob(se, se.Now())
assert.Len(t, m.runningJobs, 0)
@@ -517,7 +526,8 @@
finishedStatistics := &ttlStatistics{}
finishedStatistics.TotalRows.Store(1)
finishedStatistics.SuccessRows.Store(1)
m = NewJobManager("test-id", newMockSessionPool(t, tbl), nil)
m = NewJobManager("test-id", nil, nil)
m.sessPool = newMockSessionPool(t, tbl)
m.runningJobs = []*ttlJob{newMockTTLJob(tbl, cache.JobStatusRunning)}
m.runningJobs[0].statistics = finishedStatistics
m.runningJobs[0].tasks[0].statistics = finishedStatistics
@@ -545,7 +555,8 @@ func TestCheckFinishedJob(t *testing.T) {
// check timeout job
now = se.Now()
createTime := now.Add(-20 * time.Hour)
m = NewJobManager("test-id", newMockSessionPool(t, tbl), nil)
m = NewJobManager("test-id", nil, nil)
m.sessPool = newMockSessionPool(t, tbl)
m.runningJobs = []*ttlJob{
{
ctx: context.Background(),