ttl: add integration test with random fault session for TTL #58110

Merged
1 change: 1 addition & 0 deletions pkg/meta/model/BUILD.bazel
@@ -29,6 +29,7 @@ go_library(
"//pkg/planner/cascades/base",
"//pkg/util/intest",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_tikv_pd_client//http",
"@org_uber_go_atomic//:atomic",
],
5 changes: 5 additions & 0 deletions pkg/meta/model/table.go
@@ -22,6 +22,7 @@ import (
"time"
"unsafe"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/parser/auth"
"github.com/pingcap/tidb/pkg/parser/duration"
"github.com/pingcap/tidb/pkg/parser/model"
@@ -1351,6 +1352,10 @@ func (t *TTLInfo) Clone() *TTLInfo {
// We didn't set TTL_JOB_INTERVAL during upgrade and bootstrap because setting the default value here is much simpler
// and avoids bugs that could block users from upgrading or bootstrapping the cluster.
func (t *TTLInfo) GetJobInterval() (time.Duration, error) {
failpoint.Inject("overwrite-ttl-job-interval", func(val failpoint.Value) (time.Duration, error) {
return time.Duration(val.(int)), nil
})

if len(t.JobInterval) == 0 {
// This only happens when the table was created in v6.5, where `tidb_job_interval` had not been introduced yet.
// We use `OldDefaultTTLJobInterval` as the return value to ensure a consistent behavior for the
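For context, a test could drive the new failpoint as in the sketch below — a minimal, hypothetical example, not part of this PR. It assumes the binary was rewritten with failpoint-ctl, that the failpoint name carries the usual package-path prefix, and that the payload is the interval in nanoseconds (matching the `time.Duration(val.(int))` cast above).

```go
package model_test

import (
	"fmt"
	"testing"
	"time"

	"github.com/pingcap/failpoint"
	"github.com/stretchr/testify/require"
)

func TestOverwriteTTLJobInterval(t *testing.T) {
	// Hypothetical failpoint path: package import path + marker name.
	fp := "github.com/pingcap/tidb/pkg/meta/model/overwrite-ttl-job-interval"
	// The payload is the desired interval in nanoseconds.
	require.NoError(t, failpoint.Enable(fp, fmt.Sprintf("return(%d)", time.Minute)))
	defer func() {
		require.NoError(t, failpoint.Disable(fp))
	}()
	// GetJobInterval now reports the injected interval for any TTLInfo:
	//   interval, err := ttlInfo.GetJobInterval() // == time.Minute, nil
}
```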
20 changes: 17 additions & 3 deletions pkg/ttl/ttlworker/config.go
@@ -44,6 +44,13 @@ func getCheckJobInterval() time.Duration {
return jobManagerLoopTickerInterval
}

func getHeartbeatInterval() time.Duration {
failpoint.Inject("heartbeat-interval", func(val failpoint.Value) time.Duration {
return time.Duration(val.(int))
})
return jobManagerLoopTickerInterval
}

func getJobManagerLoopSyncTimerInterval() time.Duration {
failpoint.Inject("sync-timer", func(val failpoint.Value) time.Duration {
return time.Duration(val.(int))
@@ -86,11 +93,11 @@ func getTaskManagerLoopTickerInterval() time.Duration {
return taskManagerLoopTickerInterval
}

-func getTaskManagerHeartBeatExpireInterval() time.Duration {
-failpoint.Inject("task-manager-heartbeat-expire-interval", func(val failpoint.Value) time.Duration {
+func getTaskManagerHeartBeatInterval() time.Duration {
+failpoint.Inject("task-manager-heartbeat-interval", func(val failpoint.Value) time.Duration {
return time.Duration(val.(int))
})
-return 2 * ttlTaskHeartBeatTickerInterval
+return ttlTaskHeartBeatTickerInterval
}

func getCheckJobTriggeredInterval() time.Duration {
@@ -100,6 +107,13 @@ func getCheckJobTriggeredInterval() time.Duration {
return 2 * time.Second
}

func getTTLGCInterval() time.Duration {
failpoint.Inject("gc-interval", func(val failpoint.Value) time.Duration {
return time.Duration(val.(int))
})
return ttlGCInterval
}

func getScanSplitCnt(store kv.Storage) int {
tikvStore, ok := store.(tikv.Storage)
if !ok {
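These `Inject` markers appear to return a value from the enclosing function, which only works after failpoint-ctl rewriting. Below is a rough, self-contained sketch of the rewritten shape — an assumption based on failpoint's documented transformation; the real rewrite uses a generated `_curpkg_` helper for the name prefix, and the default interval here is a stand-in.

```go
package ttlworker

import (
	"time"

	"github.com/pingcap/failpoint"
)

// Stand-in for the package-level default interval.
var jobManagerLoopTickerInterval = 10 * time.Second

// Approximate post-rewrite form of getHeartbeatInterval: when the failpoint
// is enabled, Eval succeeds and the injected value wins; otherwise the
// default interval is returned.
func getHeartbeatInterval() time.Duration {
	if val, err := failpoint.Eval("github.com/pingcap/tidb/pkg/ttl/ttlworker/heartbeat-interval"); err == nil {
		return time.Duration(val.(int))
	}
	return jobManagerLoopTickerInterval
}
```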
16 changes: 10 additions & 6 deletions pkg/ttl/ttlworker/job_manager.go
@@ -66,7 +66,7 @@ const taskGCTemplate = `DELETE task FROM
const ttlJobHistoryGCTemplate = `DELETE FROM mysql.tidb_ttl_job_history WHERE create_time < CURDATE() - INTERVAL 90 DAY`
const ttlTableStatusGCWithoutIDTemplate = `DELETE FROM mysql.tidb_ttl_table_status WHERE (current_job_status IS NULL OR current_job_owner_hb_time < %?)`

-const timeFormat = time.DateTime
+var timeFormat = time.DateTime

func insertNewTableIntoStatusSQL(tableID int64, parentTableID int64) (string, []any) {
return insertNewTableIntoStatusTemplate, []any{tableID, parentTableID}
@@ -86,7 +86,7 @@ func gcTTLTableStatusGCSQL(existIDs []int64, now time.Time) (string, []any) {
existIDStrs = append(existIDStrs, strconv.Itoa(int(id)))
}

-hbExpireTime := now.Add(-jobManagerLoopTickerInterval * 2)
+hbExpireTime := now.Add(-getHeartbeatInterval() * 2)
args := []any{hbExpireTime.Format(timeFormat)}
if len(existIDStrs) > 0 {
return ttlTableStatusGCWithoutIDTemplate + fmt.Sprintf(` AND table_id NOT IN (%s)`, strings.Join(existIDStrs, ",")), args
@@ -137,6 +137,10 @@ func NewJobManager(id string, sessPool util.SessionPool, store kv.Storage, etcdC

manager.init(manager.jobLoop)
manager.ctx = logutil.WithKeyValue(manager.ctx, "ttl-worker", "job-manager")
if intest.InTest {
// in the test environment, multiple TTL job managers write to the same log, so tag each context with its id to tell them apart
manager.ctx = logutil.WithKeyValue(manager.ctx, "ttl-worker", id)
}

manager.infoSchemaCache = cache.NewInfoSchemaCache(getUpdateInfoSchemaCacheInterval())
manager.tableStatusCache = cache.NewTableStatusCache(getUpdateTTLTableStatusCacheInterval())
@@ -181,15 +185,15 @@ func (m *JobManager) jobLoop() error {
infoSchemaCacheUpdateTicker := time.Tick(m.infoSchemaCache.GetInterval())
tableStatusCacheUpdateTicker := time.Tick(m.tableStatusCache.GetInterval())
resizeWorkersTicker := time.Tick(getResizeWorkersInterval())
-gcTicker := time.Tick(ttlGCInterval)
+gcTicker := time.Tick(getTTLGCInterval())

scheduleJobTicker := time.Tick(getCheckJobInterval())
jobCheckTicker := time.Tick(getCheckJobInterval())
-updateJobHeartBeatTicker := time.Tick(jobManagerLoopTickerInterval)
+updateJobHeartBeatTicker := time.Tick(getHeartbeatInterval())
timerTicker := time.Tick(getJobManagerLoopSyncTimerInterval())

scheduleTaskTicker := time.Tick(getTaskManagerLoopTickerInterval())
-updateTaskHeartBeatTicker := time.Tick(ttlTaskHeartBeatTickerInterval)
+updateTaskHeartBeatTicker := time.Tick(getTaskManagerHeartBeatInterval())
taskCheckTicker := time.Tick(getTaskManagerLoopCheckTaskInterval())
checkScanTaskFinishedTicker := time.Tick(getTaskManagerLoopTickerInterval())

@@ -732,7 +736,7 @@ func (m *JobManager) couldLockJob(tableStatus *cache.TableStatus, table *cache.P
hbTime := tableStatus.CurrentJobOwnerHBTime
// The heartbeat is refreshed periodically at the heartbeat interval.
// Use twice that interval to detect a heartbeat timeout.
-hbTimeout := jobManagerLoopTickerInterval * 2
+hbTimeout := getHeartbeatInterval() * 2
if interval := getUpdateTTLTableStatusCacheInterval() * 2; interval > hbTimeout {
// tableStatus is read from the cache, which may contain stale data.
// So if the cache update interval > heartbeat timeout, use the cache update interval instead.
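To make the heartbeat-driven GC concrete, here is a self-contained mirror of `gcTTLTableStatusGCSQL` — a sketch, with `demoGCSQL` and its explicit parameters as stand-ins for the PR's internals — showing the statement rendered once the expiry window follows `getHeartbeatInterval()`:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"time"
)

const statusGCTemplate = `DELETE FROM mysql.tidb_ttl_table_status WHERE (current_job_status IS NULL OR current_job_owner_hb_time < %?)`

// demoGCSQL mirrors gcTTLTableStatusGCSQL: status rows whose owner heartbeat
// is older than twice the heartbeat interval are treated as orphaned.
func demoGCSQL(existIDs []int64, now time.Time, heartbeat time.Duration) (string, []any) {
	idStrs := make([]string, 0, len(existIDs))
	for _, id := range existIDs {
		idStrs = append(idStrs, strconv.Itoa(int(id)))
	}
	hbExpireTime := now.Add(-heartbeat * 2)
	args := []any{hbExpireTime.Format(time.DateTime)}
	if len(idStrs) > 0 {
		return statusGCTemplate + fmt.Sprintf(` AND table_id NOT IN (%s)`, strings.Join(idStrs, ",")), args
	}
	return statusGCTemplate, args
}

func main() {
	sql, args := demoGCSQL([]int64{101, 102}, time.Now(), 100*time.Millisecond)
	fmt.Println(sql)  // ... AND table_id NOT IN (101,102)
	fmt.Println(args) // [now minus 200ms, formatted]
}
```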
177 changes: 177 additions & 0 deletions pkg/ttl/ttlworker/job_manager_integration_test.go
@@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"fmt"
"math/rand/v2"
"strconv"
"strings"
"sync"
@@ -1619,3 +1620,179 @@ func TestJobHeartBeatFailNotBlockOthers(t *testing.T) {
fmt.Sprintf("%d %s", testTable1.Meta().ID, now.Add(-2*time.Hour).In(tkTZ).Format(time.DateTime)),
fmt.Sprintf("%d %s", testTable2.Meta().ID, now.In(tkTZ).Format(time.DateTime))))
}

var _ fault = &faultWithProbability{}

type faultWithProbability struct {
percent float64
}

func (f *faultWithProbability) shouldFault(sql string) bool {
return rand.Float64() < f.percent
}

func newFaultWithProbability(percent float64) *faultWithProbability {
return &faultWithProbability{percent: percent}
}
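`faultWithProbability` satisfies the package's `fault` interface, defined elsewhere in this package alongside `faultSessionPool` and `newFaultWithFilter`; judging from the assertion above, it is presumably just:

```go
// Assumed shape of the fault interface, inferred from the
// `var _ fault = &faultWithProbability{}` assertion.
type fault interface {
	shouldFault(sql string) bool
}
```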

func accelerateHeartBeat(t *testing.T, tk *testkit.TestKit) func() {
tk.MustExec("ALTER TABLE mysql.tidb_ttl_table_status MODIFY COLUMN current_job_owner_hb_time TIMESTAMP(6)")
tk.MustExec("ALTER TABLE mysql.tidb_ttl_task MODIFY COLUMN owner_hb_time TIMESTAMP(6)")
ttlworker.SetTimeFormat(time.DateTime + ".999999")

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ttl/ttlworker/heartbeat-interval", fmt.Sprintf("return(%d)", time.Millisecond*100)))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ttl/ttlworker/task-manager-heartbeat-interval", fmt.Sprintf("return(%d)", time.Millisecond*100)))
return func() {
tk.MustExec("ALTER TABLE mysql.tidb_ttl_table_status MODIFY COLUMN current_job_owner_hb_time TIMESTAMP")
tk.MustExec("ALTER TABLE mysql.tidb_ttl_task MODIFY COLUMN owner_hb_time TIMESTAMP")
ttlworker.SetTimeFormat(time.DateTime)

require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ttl/ttlworker/heartbeat-interval"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ttl/ttlworker/task-manager-heartbeat-interval"))
}
}
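The column and format changes matter because the failpoints shrink heartbeats to 100ms, well below the one-second resolution of a plain TIMESTAMP; microsecond columns and the `.999999` suffix keep successive heartbeats distinguishable. The failpoint payload itself is just the interval printed as nanoseconds, e.g.:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// What the Sprintf in accelerateHeartBeat expands to.
	fmt.Printf("return(%d)\n", time.Millisecond*100) // prints: return(100000000)
}
```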

func TestJobManagerWithFault(t *testing.T) {
// TODO: add a flag `-long` to enable this test
t.Skip("skip this test because it'll need to run for a long time")

defer boostJobScheduleForTest(t)()

store, dom := testkit.CreateMockStoreAndDomain(t)
waitAndStopTTLManager(t, dom)

tk := testkit.NewTestKit(t, store)
defer accelerateHeartBeat(t, tk)()
tk.MustExec("set @@global.tidb_ttl_running_tasks=32")

managerCount := 20
testDuration := 10 * time.Minute
faultPercent := 0.5

leader := atomic.NewString("")
isLeaderFactory := func(id string) func() bool {
return func() bool {
return leader.Load() == id
}
}

type managerWithPool struct {
m *ttlworker.JobManager
pool util.SessionPool
}
managers := make([]managerWithPool, 0, managerCount)
for i := 0; i < managerCount; i++ {
pool := wrapPoolForTest(dom.SysSessionPool())
faultPool := newFaultSessionPool(pool)

id := fmt.Sprintf("test-ttl-job-manager-%d", i)
m := ttlworker.NewJobManager(id, faultPool, store, nil, isLeaderFactory(id))
managers = append(managers, managerWithPool{
m: m,
pool: faultPool,
})

m.Start()
}

stopTestCh := make(chan struct{})
wg := &sync.WaitGroup{}
wg.Add(1)

fault := newFaultWithFilter(func(sql string) bool {
// skip some local-only SQL; see `getSession()` in `session.go`
if strings.HasPrefix(sql, "set tidb_") || strings.HasPrefix(sql, "set @@") ||
strings.ToUpper(sql) == "COMMIT" || strings.ToUpper(sql) == "ROLLBACK" {
return false
}

return true
}, newFaultWithProbability(faultPercent))
go func() {
defer wg.Done()

faultTicker := time.NewTicker(time.Second)
for {
select {
case <-stopTestCh:
// Recover all sessions
for _, m := range managers {
m.pool.(*faultSessionPool).setFault(nil)
}

return
case <-faultTicker.C:
// Recover all sessions
for _, m := range managers {
m.pool.(*faultSessionPool).setFault(nil)
}

faultCount := rand.Int() % managerCount
logutil.BgLogger().Info("inject fault", zap.Int("faultCount", faultCount))
rand.Shuffle(managerCount, func(i, j int) {
managers[i], managers[j] = managers[j], managers[i]
})
// the first non-faulty manager becomes the leader
leader.Store(managers[faultCount].m.ID())
logutil.BgLogger().Info("set leader", zap.String("leader", leader.Load()))
for i := 0; i < faultCount; i++ {
m := managers[i]
logutil.BgLogger().Info("inject fault", zap.String("id", m.m.ID()))
m.pool.(*faultSessionPool).setFault(fault)
}
}
}
}()

// run the workload goroutine
testStart := time.Now()
for time.Since(testStart) < testDuration {
// create a new table
tk.MustExec("use test")
tk.MustExec("DROP TABLE if exists t")
tk.MustExec("CREATE TABLE t (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR TTL_ENABLE='OFF'")
tbl, err := dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t"))
require.NoError(t, err)
logutil.BgLogger().Info("create table", zap.Int64("table_id", tbl.Meta().ID))

// insert some data
for i := 0; i < 5; i++ {
tk.MustExec(fmt.Sprintf("INSERT INTO t VALUES (%d, '%s')", i, time.Now().Add(-time.Hour*2).Format(time.DateTime)))
}
for i := 0; i < 5; i++ {
tk.MustExec(fmt.Sprintf("INSERT INTO t VALUES (%d, '%s')", i+5, time.Now().Format(time.DateTime)))
}

tk.MustExec("ALTER TABLE t TTL_ENABLE='ON'")

start := time.Now()
require.Eventually(t, func() bool {
rows := tk.MustQuery("SELECT COUNT(*) FROM t").Rows()
if len(rows) == 1 && rows[0][0].(string) == "5" {
return true
}

logutil.BgLogger().Info("get row count", zap.String("count", rows[0][0].(string)))
return false
}, time.Second*5, time.Millisecond*100)

require.Eventually(t, func() bool {
rows := tk.MustQuery("SELECT current_job_state FROM mysql.tidb_ttl_table_status").Rows()
if len(rows) == 1 && rows[0][0].(string) == "<nil>" {
return true
}

tableStatus := tk.MustQuery("SELECT * FROM mysql.tidb_ttl_table_status").String()
logutil.BgLogger().Info("get job state", zap.String("tidb_ttl_table_status", tableStatus))
return false
}, time.Second*5, time.Millisecond*100)

logutil.BgLogger().Info("finish workload", zap.Duration("duration", time.Since(start)))
}

logutil.BgLogger().Info("test finished")
stopTestCh <- struct{}{}
close(stopTestCh)

wg.Wait()
}
12 changes: 12 additions & 0 deletions pkg/ttl/ttlworker/job_manager_test.go
@@ -210,6 +210,11 @@ func (m *JobManager) ReportMetrics(se session.Session) {
m.reportMetrics(se)
}

// ID returns the id of JobManager
func (m *JobManager) ID() string {
return m.id
}

// CheckFinishedJob is an exported version of checkFinishedJob
func (m *JobManager) CheckFinishedJob(se session.Session) {
m.checkFinishedJob(se)
@@ -695,3 +700,10 @@ func TestSplitCnt(t *testing.T) {
}
}
}

// SetTimeFormat sets the time format used by the tests.
// Some tests require greater precision than the default time format. We don't change it globally to avoid potential compatibility issues,
// and the format for most tests is left unchanged so that they still represent real-world scenarios.
func SetTimeFormat(format string) {
timeFormat = format
}