Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ttl, domain: setup a customized session pool with stats collector (#40171) #40233

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions br/pkg/restore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ go_test(
"//tablecodec",
"//testkit",
"//testkit/testsetup",
"//ttl/ttlworker",
"//types",
"//util/codec",
"@com_github_fsouza_fake_gcs_server//fakestorage",
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/restore/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/types"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/ttl/ttlworker"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
Expand Down Expand Up @@ -368,9 +369,12 @@ func TestFilterDDLJobByRules(t *testing.T) {
}

func TestGetExistedUserDBs(t *testing.T) {
ttlworker.SkipTTLJobManager4Test = true

m, err := mock.NewCluster()
require.Nil(t, err)
defer m.Stop()

dom := m.Domain

dbs := restore.GetExistedUserDBs(dom)
Expand Down
27 changes: 13 additions & 14 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1060,10 +1060,6 @@ func (do *Domain) Init(
return err
}

do.wg.Run(func() {
do.runTTLJobManager(ctx)
})

return nil
}

Expand Down Expand Up @@ -2380,18 +2376,21 @@ func (do *Domain) serverIDKeeper() {
}
}

func (do *Domain) runTTLJobManager(ctx context.Context) {
ttlJobManager := ttlworker.NewJobManager(do.ddl.GetID(), do.sysSessionPool, do.store)
ttlJobManager.Start()
do.ttlJobManager = ttlJobManager
// StartTTLJobManager creates and starts the ttl job manager
func (do *Domain) StartTTLJobManager() {
do.wg.Run(func() {
ttlJobManager := ttlworker.NewJobManager(do.ddl.GetID(), do.sysSessionPool, do.store)
do.ttlJobManager = ttlJobManager
ttlJobManager.Start()

<-do.exit
<-do.exit

ttlJobManager.Stop()
err := ttlJobManager.WaitStopped(ctx, 30*time.Second)
if err != nil {
logutil.BgLogger().Warn("fail to wait until the ttl job manager stop", zap.Error(err))
}
ttlJobManager.Stop()
err := ttlJobManager.WaitStopped(context.Background(), 30*time.Second)
if err != nil {
logutil.BgLogger().Warn("fail to wait until the ttl job manager stop", zap.Error(err))
}
})
}

// TTLJobManager returns the ttl job manager on this domain
Expand Down
1 change: 1 addition & 0 deletions session/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ go_library(
"//table/temptable",
"//tablecodec",
"//telemetry",
"//ttl/ttlworker",
"//types",
"//types/parser_driver",
"//util",
Expand Down
37 changes: 37 additions & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ import (
"github.com/pingcap/tidb/table/temptable"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/telemetry"
"github.com/pingcap/tidb/ttl/ttlworker"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/collate"
Expand Down Expand Up @@ -3369,6 +3370,22 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) {
return nil, err
}

// start TTL job manager after setup stats collector
// because TTL could modify a lot of columns, and need to trigger auto analyze
ttlworker.AttachStatsCollector = func(s sqlexec.SQLExecutor) sqlexec.SQLExecutor {
if s, ok := s.(*session); ok {
return attachStatsCollector(s, dom)
}
return s
}
ttlworker.DetachStatsCollector = func(s sqlexec.SQLExecutor) sqlexec.SQLExecutor {
if s, ok := s.(*session); ok {
return detachStatsCollector(s)
}
return s
}
dom.StartTTLJobManager()

analyzeCtxs, err := createSessions(store, analyzeConcurrencyQuota)
if err != nil {
return nil, err
Expand Down Expand Up @@ -3474,6 +3491,26 @@ func createSessionWithOpt(store kv.Storage, opt *Opt) (*session, error) {
return s, nil
}

// attachStatsCollector attaches the stats collector in the dom for the session
func attachStatsCollector(s *session, dom *domain.Domain) *session {
if dom.StatsHandle() != nil && dom.StatsUpdating() {
s.statsCollector = dom.StatsHandle().NewSessionStatsCollector()
if GetIndexUsageSyncLease() > 0 {
s.idxUsageCollector = dom.StatsHandle().NewSessionIndexUsageCollector()
}
}

return s
}

// detachStatsCollector removes the stats collector in the session
func detachStatsCollector(s *session) *session {
s.statsCollector = nil
s.idxUsageCollector = nil

return s
}

// CreateSessionWithDomain creates a new Session and binds it with a Domain.
// We need this because when we start DDL in Domain, the DDL need a session
// to change some system tables. But at that time, we have been already in
Expand Down
3 changes: 3 additions & 0 deletions ttl/ttlworker/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ go_library(
"//util/timeutil",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@org_golang_x_time//rate",
"@org_uber_go_multierr//:multierr",
"@org_uber_go_zap//:zap",
Expand Down Expand Up @@ -58,6 +59,7 @@ go_test(
"//session",
"//sessionctx",
"//sessionctx/variable",
"//statistics/handle",
"//testkit",
"//ttl/cache",
"//ttl/session",
Expand All @@ -66,6 +68,7 @@ go_test(
"//util/logutil",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_golang_x_time//rate",
Expand Down
23 changes: 23 additions & 0 deletions ttl/ttlworker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package ttlworker

import (
"time"

"github.com/pingcap/failpoint"
)

const jobManagerLoopTickerInterval = 10 * time.Second
Expand All @@ -27,3 +29,24 @@ const ttlInternalSQLTimeout = 30 * time.Second
const resizeWorkersInterval = 30 * time.Second
const splitScanCount = 64
const ttlJobTimeout = 6 * time.Hour

func getUpdateInfoSchemaCacheInterval() time.Duration {
failpoint.Inject("update-info-schema-cache-interval", func(val failpoint.Value) time.Duration {
return time.Duration(val.(int))
})
return updateInfoSchemaCacheInterval
}

func getUpdateTTLTableStatusCacheInterval() time.Duration {
failpoint.Inject("update-status-table-cache-interval", func(val failpoint.Value) time.Duration {
return time.Duration(val.(int))
})
return updateTTLTableStatusCacheInterval
}

func getResizeWorkersInterval() time.Duration {
failpoint.Inject("resize-workers-interval", func(val failpoint.Value) time.Duration {
return time.Duration(val.(int))
})
return resizeWorkersInterval
}
18 changes: 14 additions & 4 deletions ttl/ttlworker/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ const updateHeartBeatTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_

const timeFormat = "2006-01-02 15:04:05"

// SkipTTLJobManager4Test skips the bootstrap of TTLJobManager if it's true. It's used to avoid data race in test.
var SkipTTLJobManager4Test bool

func insertNewTableIntoStatusSQL(tableID int64, parentTableID int64) (string, []interface{}) {
return insertNewTableIntoStatusTemplate, []interface{}{tableID, parentTableID}
}
Expand Down Expand Up @@ -99,11 +102,18 @@ func NewJobManager(id string, sessPool sessionPool, store kv.Storage) (manager *
manager.delCh = make(chan *ttlDeleteTask)
manager.notifyStateCh = make(chan interface{}, 1)

if SkipTTLJobManager4Test {
manager.init(func() error {
return nil
})
return
}

manager.init(manager.jobLoop)
manager.ctx = logutil.WithKeyValue(manager.ctx, "ttl-worker", "manager")

manager.infoSchemaCache = cache.NewInfoSchemaCache(updateInfoSchemaCacheInterval)
manager.tableStatusCache = cache.NewTableStatusCache(updateTTLTableStatusCacheInterval)
manager.infoSchemaCache = cache.NewInfoSchemaCache(getUpdateInfoSchemaCacheInterval())
manager.tableStatusCache = cache.NewTableStatusCache(getUpdateTTLTableStatusCacheInterval())

return
}
Expand All @@ -125,7 +135,7 @@ func (m *JobManager) jobLoop() error {
updateScanTaskStateTicker := time.Tick(jobManagerLoopTickerInterval)
infoSchemaCacheUpdateTicker := time.Tick(m.infoSchemaCache.GetInterval())
tableStatusCacheUpdateTicker := time.Tick(m.tableStatusCache.GetInterval())
resizeWorkersTicker := time.Tick(resizeWorkersInterval)
resizeWorkersTicker := time.Tick(getResizeWorkersInterval())
for {
m.reportMetrics()
now := se.Now()
Expand Down Expand Up @@ -487,7 +497,7 @@ func (m *JobManager) couldTrySchedule(table *cache.TableStatus, now time.Time) b
hbTime := table.CurrentJobOwnerHBTime
// a more concrete value is `2 * max(updateTTLTableStatusCacheInterval, jobManagerLoopTickerInterval)`, but the
// `updateTTLTableStatusCacheInterval` is greater than `jobManagerLoopTickerInterval` in most cases.
if hbTime.Add(2 * updateTTLTableStatusCacheInterval).Before(now) {
if hbTime.Add(2 * getUpdateTTLTableStatusCacheInterval()).Before(now) {
logutil.Logger(m.ctx).Info("task heartbeat has stopped", zap.Int64("tableID", table.TableID), zap.Time("hbTime", hbTime), zap.Time("now", now))
return true
}
Expand Down
57 changes: 57 additions & 0 deletions ttl/ttlworker/job_manager_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
dbsession "github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/ttl/cache"
"github.com/pingcap/tidb/ttl/session"
Expand Down Expand Up @@ -123,3 +125,58 @@ func TestFinishJob(t *testing.T) {

tk.MustQuery("select table_id, last_job_summary from mysql.tidb_ttl_table_status").Check(testkit.Rows("2 {\"total_rows\":0,\"success_rows\":0,\"error_rows\":0,\"total_scan_task\":1,\"scheduled_scan_task\":0,\"finished_scan_task\":0,\"scan_task_err\":\"\\\"'an error message contains both single and double quote'\\\"\"}"))
}

func TestTTLAutoAnalyze(t *testing.T) {
failpoint.Enable("github.com/pingcap/tidb/ttl/ttlworker/update-info-schema-cache-interval", fmt.Sprintf("return(%d)", time.Second))
failpoint.Enable("github.com/pingcap/tidb/ttl/ttlworker/update-status-table-cache-interval", fmt.Sprintf("return(%d)", time.Second))
failpoint.Enable("github.com/pingcap/tidb/ttl/ttlworker/resize-workers-interval", fmt.Sprintf("return(%d)", time.Second))
originAutoAnalyzeMinCnt := handle.AutoAnalyzeMinCnt
handle.AutoAnalyzeMinCnt = 0
defer func() {
handle.AutoAnalyzeMinCnt = originAutoAnalyzeMinCnt
}()

store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("create table t (id int, created_at datetime) ttl = `created_at` + interval 1 day")

// insert ten rows, the 2,3,4,6,9,10 of them are expired
for i := 1; i <= 10; i++ {
t := time.Now()
if i%2 == 0 || i%3 == 0 {
t = t.Add(-time.Hour * 48)
}

tk.MustExec("insert into t values(?, ?)", i, t.Format(time.RFC3339))
}
// TODO: use a better way to pause and restart ttl worker after analyze the table to make it more stable
// but as the ttl worker takes several seconds to start, it's not too serious.
tk.MustExec("analyze table t")
rows := tk.MustQuery("show stats_meta").Rows()
require.Equal(t, rows[0][4], "0")
require.Equal(t, rows[0][5], "10")

retryTime := 15
retryInterval := time.Second * 2
deleted := false
for retryTime >= 0 {
retryTime--
time.Sleep(retryInterval)

rows := tk.MustQuery("select count(*) from t").Rows()
count := rows[0][0].(string)
if count == "3" {
deleted = true
break
}
}
require.True(t, deleted, "ttl should remove expired rows")

h := dom.StatsHandle()
is := dom.InfoSchema()
require.NoError(t, h.DumpStatsDeltaToKV(handle.DumpAll))
require.NoError(t, h.Update(is))
require.True(t, h.HandleAutoAnalyze(is))
}
Loading