
Commit

add integration test with fault injection for TTL
Signed-off-by: Yang Keao <yangkeao@chunibyo.icu>
YangKeao committed Dec 9, 2024
1 parent 0ffac36 commit c943f3b
Showing 7 changed files with 251 additions and 6 deletions.
1 change: 1 addition & 0 deletions pkg/meta/model/BUILD.bazel
@@ -29,6 +29,7 @@ go_library(
"//pkg/planner/cascades/base",
"//pkg/util/intest",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_tikv_pd_client//http",
"@org_uber_go_atomic//:atomic",
],
5 changes: 5 additions & 0 deletions pkg/meta/model/table.go
@@ -22,6 +22,7 @@ import (
"time"
"unsafe"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/parser/auth"
"github.com/pingcap/tidb/pkg/parser/duration"
"github.com/pingcap/tidb/pkg/parser/model"
@@ -1351,6 +1352,10 @@ func (t *TTLInfo) Clone() *TTLInfo {
// Didn't set TTL_JOB_INTERVAL during upgrade and bootstrap because setting default value here is much simpler
// and could avoid bugs blocking users from upgrading or bootstrapping the cluster.
func (t *TTLInfo) GetJobInterval() (time.Duration, error) {
failpoint.Inject("overwrite-ttl-job-interval", func(val failpoint.Value) (time.Duration, error) {
return time.Duration(val.(int)), nil
})

if len(t.JobInterval) == 0 {
// This only happens when the table is created from 6.5 in which the `tidb_job_interval` is not introduced yet.
// We use `OldDefaultTTLJobInterval` as the return value to ensure a consistent behavior for the
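For context, this new failpoint is driven from tests the same way as the existing TTL failpoints (see boostJobScheduleForTest further down in this commit). A minimal, hedged sketch — the 100ms value and the surrounding test are illustrative, and the snippet assumes the usual failpoint/require/fmt/time imports:

// Make GetJobInterval return 100ms regardless of the table's TTL_JOB_INTERVAL;
// with failpoints disabled, the failpoint.Inject call above is a no-op.
require.NoError(t, failpoint.Enable(
	"github.com/pingcap/tidb/pkg/meta/model/overwrite-ttl-job-interval",
	fmt.Sprintf("return(%d)", 100*time.Millisecond)))
defer func() {
	require.NoError(t, failpoint.Disable(
		"github.com/pingcap/tidb/pkg/meta/model/overwrite-ttl-job-interval"))
}()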
1 change: 1 addition & 0 deletions pkg/ttl/ttlworker/BUILD.bazel
@@ -101,6 +101,7 @@ go_test(
"//pkg/util/chunk",
"//pkg/util/logutil",
"//pkg/util/mock",
"//pkg/util/sqlexec",
"//pkg/util/timeutil",
"@com_github_google_uuid//:uuid",
"@com_github_ngaut_pools//:pools",
7 changes: 7 additions & 0 deletions pkg/ttl/ttlworker/config.go
@@ -100,6 +100,13 @@ func getCheckJobTriggeredInterval() time.Duration {
return 2 * time.Second
}

func getTTLGCInterval() time.Duration {
failpoint.Inject("gc-interval", func(val failpoint.Value) time.Duration {
return time.Duration(val.(int))
})
return ttlGCInterval
}

func getScanSplitCnt(store kv.Storage) int {
tikvStore, ok := store.(tikv.Storage)
if !ok {
12 changes: 7 additions & 5 deletions pkg/ttl/ttlworker/job_manager.go
@@ -181,7 +181,7 @@ func (m *JobManager) jobLoop() error {
infoSchemaCacheUpdateTicker := time.Tick(m.infoSchemaCache.GetInterval())
tableStatusCacheUpdateTicker := time.Tick(m.tableStatusCache.GetInterval())
resizeWorkersTicker := time.Tick(getResizeWorkersInterval())
gcTicker := time.Tick(ttlGCInterval)
gcTicker := time.Tick(getTTLGCInterval())

scheduleJobTicker := time.Tick(getCheckJobInterval())
jobCheckTicker := time.Tick(getCheckJobInterval())
@@ -572,12 +572,14 @@ j:
}
}

func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) {
func (m *JobManager) rescheduleJobs(se session.Session, nowInSessionTz time.Time) {
var now time.Time
tz, err := se.GlobalTimeZone(m.ctx)
if err != nil {
terror.Log(err)
now = nowInSessionTz
} else {
now = now.In(tz)
now = nowInSessionTz.In(tz)
}

// Try to lock HB timeout jobs, to avoid the case that when the `tidb_ttl_job_enable = 'OFF'`, the HB timeout job will
@@ -612,7 +614,7 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) {
if err != nil {
logutil.Logger(m.ctx).Warn("fail to summarize job", zap.Error(err))
}
err = job.finish(se, now, summary)
err = job.finish(se, nowInSessionTz, summary)
if err != nil {
logutil.Logger(m.ctx).Warn("fail to finish job", zap.Error(err))
continue
@@ -636,7 +638,7 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) {
if err != nil {
logutil.Logger(m.ctx).Warn("fail to summarize job", zap.Error(err))
}
err = job.finish(se, now, summary)
err = job.finish(se, nowInSessionTz, summary)
if err != nil {
logutil.Logger(m.ctx).Warn("fail to finish job", zap.Error(err))
continue
229 changes: 229 additions & 0 deletions pkg/ttl/ttlworker/job_manager_integration_test.go
@@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"fmt"
"math/rand/v2"
"strconv"
"strings"
"sync"
@@ -34,6 +35,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/ast"
pmodel "github.com/pingcap/tidb/pkg/parser/model"
dbsession "github.com/pingcap/tidb/pkg/session"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/testkit"
@@ -47,6 +49,7 @@ import (
"github.com/pingcap/tidb/pkg/ttl/ttlworker"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/sqlexec"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
@@ -1453,6 +1456,8 @@ func boostJobScheduleForTest(t *testing.T) func() {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ttl/ttlworker/update-info-schema-cache-interval", fmt.Sprintf("return(%d)", 100*time.Millisecond)))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ttl/ttlworker/task-manager-loop-interval", fmt.Sprintf("return(%d)", 100*time.Millisecond)))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ttl/ttlworker/check-task-interval", fmt.Sprintf("return(%d)", 100*time.Millisecond)))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ttl/ttlworker/gc-interval", fmt.Sprintf("return(%d)", 100*time.Millisecond)))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/meta/model/overwrite-ttl-job-interval", fmt.Sprintf("return(%d)", 100*time.Millisecond)))

return func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ttl/ttlworker/check-job-triggered-interval"))
@@ -1463,6 +1468,8 @@ func boostJobScheduleForTest(t *testing.T) func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ttl/ttlworker/update-info-schema-cache-interval"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ttl/ttlworker/task-manager-loop-interval"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ttl/ttlworker/check-task-interval"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ttl/ttlworker/gc-interval"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/meta/model/overwrite-ttl-job-interval"))
}
}

@@ -1614,3 +1621,225 @@ func TestJobHeartBeatFailNotBlockOthers(t *testing.T) {
fmt.Sprintf("%d %s", testTable1.Meta().ID, now.Add(-2*time.Hour).Format(time.DateTime)),
fmt.Sprintf("%d %s", testTable2.Meta().ID, now.Format(time.DateTime))))
}

var _ sqlexec.SQLExecutor = &sessionWithFault{}

type sessionWithFault struct {
sessionctx.Context

fault *atomic.Bool
faultPercent float64
}

// Close implements pools.Resource
func (s *sessionWithFault) Close() {
s.Context.(pools.Resource).Close()
}

// GetSQLExecutor implements sessionctx.Context.
func (s *sessionWithFault) GetSQLExecutor() sqlexec.SQLExecutor {
return s
}

// Execute implements sqlexec.SQLExecutor.
func (s *sessionWithFault) Execute(ctx context.Context, sql string) ([]sqlexec.RecordSet, error) {
if s.fault.Load() && s.shouldFault(sql) {
return nil, errors.New("fault in test")
}
return s.Context.GetSQLExecutor().Execute(ctx, sql)
}

// ExecuteStmt implements sqlexec.SQLExecutor.
func (s *sessionWithFault) ExecuteStmt(ctx context.Context, stmtNode ast.StmtNode) (sqlexec.RecordSet, error) {
// TODO: decide whether to fault based on the `stmtNode`. ExecuteStmt is not used by TTL, so it's fine to leave this as-is for now.
if s.fault.Load() && s.shouldFault(stmtNode.Text()) {
return nil, errors.New("fault in test")
}
return s.Context.GetSQLExecutor().ExecuteStmt(ctx, stmtNode)
}

// ExecuteInternal implements sqlexec.SQLExecutor.
func (s *sessionWithFault) ExecuteInternal(ctx context.Context, sql string, args ...any) (sqlexec.RecordSet, error) {
if s.fault.Load() && s.shouldFault(sql) {
return nil, errors.New("fault in test")
}
return s.Context.GetSQLExecutor().ExecuteInternal(ctx, sql, args...)
}

func (s *sessionWithFault) shouldFault(sql string) bool {
// Skip some local-only SQL statements; see `getSession()` in `session.go`.
if strings.HasPrefix(sql, "set tidb_") || strings.HasPrefix(sql, "set @@") {
return false
}

return rand.Float64() < s.faultPercent
}

var _ util.SessionPool = &faultSessionPool{}

type faultSessionPool struct {
util.SessionPool

fault *atomic.Bool
faultPercent float64
}

func newFaultSessionPool(sp util.SessionPool, percent float64) *faultSessionPool {
return &faultSessionPool{
SessionPool: sp,
fault: atomic.NewBool(false),
faultPercent: percent,
}
}

// Get implements util.SessionPool.
func (f *faultSessionPool) Get() (pools.Resource, error) {
resource, err := f.SessionPool.Get()
if err != nil {
return nil, err
}

return &sessionWithFault{
Context: resource.(sessionctx.Context),
fault: f.fault,
faultPercent: f.faultPercent,
}, nil
}

// Put implements util.SessionPool.
func (f *faultSessionPool) Put(se pools.Resource) {
f.SessionPool.Put(se.(*sessionWithFault).Context.(pools.Resource))
}

// SetFault enables or disables fault injection.
func (f *faultSessionPool) SetFault(on bool) {
f.fault.Store(on)
}

func TestGetSessionWithFaultSession(t *testing.T) {
_, dom := testkit.CreateMockStoreAndDomain(t)
waitAndStopTTLManager(t, dom)

pool := wrapPoolForTest(dom.SysSessionPool())
// defer pool.AssertNoSessionInUse(t)
faultPool := newFaultSessionPool(pool, 1)

se, err := ttlworker.GetSessionForTest(faultPool)
require.True(t, se != nil || err != nil)

faultPool.SetFault(true)
se, err = ttlworker.GetSessionForTest(faultPool)
require.True(t, se != nil || err != nil)
}

func TestJobManagerWithFault(t *testing.T) {
defer boostJobScheduleForTest(t)()

store, dom := testkit.CreateMockStoreAndDomain(t)
waitAndStopTTLManager(t, dom)
tk := testkit.NewTestKit(t, store)

managerCount := 5
testDuration := 10 * time.Second
faultPercent := 0.5

leader := atomic.NewInt32(0)
isLeaderFactory := func(id int) func() bool {
return func() bool {
return leader.Load() == int32(id)
}
}

type managerWithPool struct {
m *ttlworker.JobManager
pool util.SessionPool
}
managers := make([]managerWithPool, 0, managerCount)
for i := 0; i < managerCount; i++ {
pool := wrapPoolForTest(dom.SysSessionPool())
// defer pool.AssertNoSessionInUse(t)
faultPool := newFaultSessionPool(pool, faultPercent)

m := ttlworker.NewJobManager(fmt.Sprintf("test-ttl-job-manager-%d", i), faultPool, store, nil, isLeaderFactory(i))
managers = append(managers, managerWithPool{
m: m,
pool: faultPool,
})

m.Start()
}

stopTestCh := make(chan struct{})
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()

faultTicker := time.NewTicker(time.Millisecond * 200)
for {
select {
case <-stopTestCh:
return
case <-faultTicker.C:
// Recover all sessions
for _, m := range managers {
m.pool.(*faultSessionPool).SetFault(false)
}

faultCount := rand.Int() % managerCount
logutil.BgLogger().Info("inject fault", zap.Int("faultCount", faultCount))
rand.Shuffle(managerCount, func(i, j int) {
managers[i], managers[j] = managers[j], managers[i]
})
// the first non-fault manager is the leader
leader.Store(int32(faultCount))
for i := 0; i < faultCount; i++ {
m := managers[i]
m.pool.(*faultSessionPool).SetFault(true)
}
}
}
}()

// run the workload goroutine
timer := time.After(testDuration)
outerLoop:
for {
select {
case <-timer:
break outerLoop
default:
// create a new table
tk.MustExec("use test")
tk.MustExec("DROP TABLE if exists t")
tk.MustExec("CREATE TABLE t (id INT PRIMARY KEY, created_at DATETIME) TTL = created_at + INTERVAL 1 HOUR TTL_ENABLE='OFF'")

// insert some data
for i := 0; i < 500; i++ {
tk.MustExec(fmt.Sprintf("INSERT INTO t VALUES (%d, '%s')", i, time.Now().Add(-time.Hour*2).Format(time.DateTime)))
}
for i := 0; i < 500; i++ {
tk.MustExec(fmt.Sprintf("INSERT INTO t VALUES (%d, '%s')", i+500, time.Now().Format(time.DateTime)))
}

tk.MustExec("ALTER TABLE t TTL_ENABLE='ON'")

start := time.Now()
require.Eventually(t, func() bool {
rows := tk.MustQuery("SELECT COUNT(*) FROM t").Rows()
return len(rows) == 1 && rows[0][0].(string) == "500"
}, time.Second*2, time.Millisecond)

require.Eventually(t, func() bool {
rows := tk.MustQuery("SELECT current_job_state FROM mysql.tidb_ttl_table_status").Rows()
return len(rows) == 1 && rows[0][0].(string) == "<nil>"
}, time.Second*2, time.Millisecond)

logutil.BgLogger().Info("finish workload", zap.Duration("duration", time.Since(start)))
}
}

close(stopTestCh)

wg.Wait()
}
2 changes: 1 addition & 1 deletion pkg/ttl/ttlworker/session.go
@@ -87,7 +87,7 @@ func getSession(pool util.SessionPool) (session.Session, error) {
originalIsolationReadEngines, restoreIsolationReadEngines := "", false

se := session.NewSession(sctx, exec, func(se session.Session) {
_, err = se.ExecuteSQL(context.Background(), fmt.Sprintf("set tidb_retry_limit=%d", originalRetryLimit))
_, err := se.ExecuteSQL(context.Background(), fmt.Sprintf("set tidb_retry_limit=%d", originalRetryLimit))
if err != nil {
intest.AssertNoError(err)
logutil.BgLogger().Error("fail to reset tidb_retry_limit", zap.Int64("originalRetryLimit", originalRetryLimit), zap.Error(err))
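The one-line session.go change above turns `=` into `:=` inside the cleanup closure, presumably so that a late failure while resetting tidb_retry_limit (the closure runs when the session is handed back) can no longer write into the err captured from the enclosing getSession. A small illustrative sketch of the difference — hypothetical names, not TiDB code:

package main

import "log"

// resetVariable is a stand-in for the "set tidb_retry_limit" call.
func resetVariable() error { return nil }

func main() {
	var err error // outer error, analogous to the one getSession works with
	cleanup := func() {
		// With "err = resetVariable()" this write would clobber the outer err even
		// though cleanup runs much later; ":=" keeps the failure local to the closure.
		err := resetVariable()
		if err != nil {
			log.Println("failed to reset variable:", err)
		}
	}
	cleanup()
	_ = err
}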
