diff --git a/pkg/ttl/ttlworker/BUILD.bazel b/pkg/ttl/ttlworker/BUILD.bazel
index 59aa1033e5e5c..9e2eed2ac0a79 100644
--- a/pkg/ttl/ttlworker/BUILD.bazel
+++ b/pkg/ttl/ttlworker/BUILD.bazel
@@ -62,6 +62,7 @@ go_test(
         "job_manager_integration_test.go",
         "job_manager_test.go",
         "scan_test.go",
+        "session_integration_test.go",
         "session_test.go",
         "task_manager_integration_test.go",
         "task_manager_test.go",
@@ -83,7 +84,6 @@ go_test(
         "//pkg/parser/ast",
         "//pkg/parser/model",
         "//pkg/parser/mysql",
-        "//pkg/session",
         "//pkg/sessionctx",
         "//pkg/sessionctx/variable",
         "//pkg/statistics",
@@ -101,6 +101,7 @@ go_test(
         "//pkg/util/chunk",
         "//pkg/util/logutil",
         "//pkg/util/mock",
+        "//pkg/util/sqlexec",
         "//pkg/util/timeutil",
         "@com_github_google_uuid//:uuid",
         "@com_github_ngaut_pools//:pools",
diff --git a/pkg/ttl/ttlworker/job_manager.go b/pkg/ttl/ttlworker/job_manager.go
index f9b6d1e1c355d..2054b717bee4f 100644
--- a/pkg/ttl/ttlworker/job_manager.go
+++ b/pkg/ttl/ttlworker/job_manager.go
@@ -370,13 +370,7 @@ func (m *JobManager) triggerTTLJob(requestID string, cmd *client.TriggerNewTTLJo
         return
     }
 
-    tz, err := se.GlobalTimeZone(m.ctx)
-    if err != nil {
-        responseErr(err)
-        return
-    }
-
-    if !timeutil.WithinDayTimePeriod(variable.TTLJobScheduleWindowStartTime.Load(), variable.TTLJobScheduleWindowEndTime.Load(), se.Now().In(tz)) {
+    if !timeutil.WithinDayTimePeriod(variable.TTLJobScheduleWindowStartTime.Load(), variable.TTLJobScheduleWindowEndTime.Load(), se.Now()) {
         responseErr(errors.New("not in TTL job window"))
         return
     }
@@ -579,13 +573,6 @@ j:
 }
 
 func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) {
-    tz, err := se.GlobalTimeZone(m.ctx)
-    if err != nil {
-        terror.Log(err)
-    } else {
-        now = now.In(tz)
-    }
-
     // Try to lock HB timeout jobs, to avoid the case that when the `tidb_ttl_job_enable = 'OFF'`, the HB timeout job will
     // never be cancelled.
     jobTables := m.readyForLockHBTimeoutJobTables(now)
diff --git a/pkg/ttl/ttlworker/job_manager_integration_test.go b/pkg/ttl/ttlworker/job_manager_integration_test.go
index 0bd260077122b..90c2297dd320c 100644
--- a/pkg/ttl/ttlworker/job_manager_integration_test.go
+++ b/pkg/ttl/ttlworker/job_manager_integration_test.go
@@ -33,7 +33,6 @@ import (
     "github.com/pingcap/tidb/pkg/meta/model"
     "github.com/pingcap/tidb/pkg/parser/ast"
     pmodel "github.com/pingcap/tidb/pkg/parser/model"
-    dbsession "github.com/pingcap/tidb/pkg/session"
     "github.com/pingcap/tidb/pkg/sessionctx/variable"
     "github.com/pingcap/tidb/pkg/statistics"
     "github.com/pingcap/tidb/pkg/testkit"
@@ -53,15 +52,11 @@ import (
     "go.uber.org/zap"
 )
 
-func sessionFactory(t *testing.T, store kv.Storage) func() session.Session {
-    return func() session.Session {
-        dbSession, err := dbsession.CreateSession4Test(store)
-        require.NoError(t, err)
-        se := session.NewSession(dbSession, dbSession, nil)
+func sessionFactory(t *testing.T, dom *domain.Domain) func() session.Session {
+    pool := dom.SysSessionPool()
 
-        _, err = se.ExecuteSQL(context.Background(), "ROLLBACK")
-        require.NoError(t, err)
-        _, err = se.ExecuteSQL(context.Background(), "set tidb_retry_limit=0")
+    return func() session.Session {
+        se, err := ttlworker.GetSessionForTest(pool)
         require.NoError(t, err)
 
         return se
@@ -112,7 +107,7 @@ func TestParallelLockNewJob(t *testing.T) {
     store, dom := testkit.CreateMockStoreAndDomain(t)
     waitAndStopTTLManager(t, dom)
 
-    sessionFactory := sessionFactory(t, store)
+    sessionFactory := sessionFactory(t, dom)
     testTable := &cache.PhysicalTable{ID: 2, TableInfo: &model.TableInfo{ID: 1, TTLInfo: &model.TTLInfo{IntervalExprStr: "1", IntervalTimeUnit: int(ast.TimeUnitDay), JobInterval: "1h"}}}
 
     // simply lock a new job
@@ -166,7 +161,7 @@ func TestFinishJob(t *testing.T) {
     waitAndStopTTLManager(t, dom)
 
     tk := testkit.NewTestKit(t, store)
-    sessionFactory := sessionFactory(t, store)
+    sessionFactory := sessionFactory(t, dom)
 
     testTable := &cache.PhysicalTable{ID: 2, Schema: pmodel.NewCIStr("db1"), TableInfo: &model.TableInfo{ID: 1, Name: pmodel.NewCIStr("t1"), TTLInfo: &model.TTLInfo{IntervalExprStr: "1", IntervalTimeUnit: int(ast.TimeUnitDay)}}}
 
@@ -426,7 +421,7 @@ func TestTTLJobDisable(t *testing.T) {
 func TestSubmitJob(t *testing.T) {
     store, dom := testkit.CreateMockStoreAndDomain(t)
     tk := testkit.NewTestKit(t, store)
-    sessionFactory := sessionFactory(t, store)
+    sessionFactory := sessionFactory(t, dom)
 
     waitAndStopTTLManager(t, dom)
 
@@ -505,18 +500,18 @@ func TestSubmitJob(t *testing.T) {
 func TestRescheduleJobs(t *testing.T) {
     store, dom := testkit.CreateMockStoreAndDomain(t)
     tk := testkit.NewTestKit(t, store)
-    sessionFactory := sessionFactory(t, store)
+    sessionFactory := sessionFactory(t, dom)
 
     waitAndStopTTLManager(t, dom)
 
-    now := time.Now()
     tk.MustExec("create table test.t (id int, created_at datetime) ttl = `created_at` + interval 1 minute ttl_job_interval = '1m'")
     table, err := dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t"))
     require.NoError(t, err)
     ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnTTL)
 
     se := sessionFactory()
-    m := ttlworker.NewJobManager("manager-1", nil, store, nil, func() bool {
+    now := se.Now()
+    m := ttlworker.NewJobManager("manager-1", dom.SysSessionPool(), store, nil, func() bool {
         return true
     })
     m.TaskManager().ResizeWorkersWithSysVar()
@@ -556,18 +551,21 @@ func TestRescheduleJobs(t *testing.T) {
     // if the time leaves the time window, it'll finish the job
tk.MustExec("set global tidb_ttl_job_schedule_window_start_time='23:58'") tk.MustExec("set global tidb_ttl_job_schedule_window_end_time='23:59'") - anotherManager.RescheduleJobs(se, time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, now.Nanosecond(), now.Location())) + rescheduleTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, now.Nanosecond(), now.Location()) + anotherManager.RescheduleJobs(se, rescheduleTime) + tkTZ := tk.Session().GetSessionVars().Location() tk.MustQuery("select last_job_summary->>'$.scan_task_err' from mysql.tidb_ttl_table_status").Check(testkit.Rows("out of TTL job schedule window")) + tk.MustQuery("select last_job_finish_time from mysql.tidb_ttl_table_status").Check(testkit.Rows(rescheduleTime.In(tkTZ).Format(time.DateTime))) } func TestRescheduleJobsAfterTableDropped(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) - sessionFactory := sessionFactory(t, store) + sessionFactory := sessionFactory(t, dom) waitAndStopTTLManager(t, dom) - now := time.Now() + now := time.Now().In(time.UTC) createTableSQL := "create table test.t (id int, created_at datetime) ttl = `created_at` + interval 1 minute ttl_job_interval = '1m'" tk.MustExec("create table test.t (id int, created_at datetime) ttl = `created_at` + interval 1 minute ttl_job_interval = '1m'") table, err := dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t")) @@ -619,7 +617,7 @@ func TestRescheduleJobsAfterTableDropped(t *testing.T) { func TestJobTimeout(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) - sessionFactory := sessionFactory(t, store) + sessionFactory := sessionFactory(t, dom) waitAndStopTTLManager(t, dom) @@ -675,6 +673,8 @@ func TestJobTimeout(t *testing.T) { require.Equal(t, tableStatus.CurrentJobStartTime, newTableStatus.CurrentJobStartTime) require.Equal(t, tableStatus.CurrentJobTTLExpire, newTableStatus.CurrentJobTTLExpire) require.Equal(t, now.Unix(), newTableStatus.CurrentJobOwnerHBTime.Unix()) + // the `CurrentJobStatusUpdateTime` only has `s` precision, so use any format with only `s` precision and a TZ to compare. 
+    require.Equal(t, now.Format(time.RFC3339), newTableStatus.CurrentJobStatusUpdateTime.Format(time.RFC3339))
 
     // the timeout will be checked while updating heartbeat
     require.NoError(t, m2.UpdateHeartBeat(ctx, se, now.Add(7*time.Hour)))
@@ -685,7 +685,7 @@ func TestJobTimeout(t *testing.T) {
 func TestTriggerScanTask(t *testing.T) {
     store, dom := testkit.CreateMockStoreAndDomain(t)
     tk := testkit.NewTestKit(t, store)
-    sessionFactory := sessionFactory(t, store)
+    sessionFactory := sessionFactory(t, dom)
     se := sessionFactory()
 
     waitAndStopTTLManager(t, dom)
@@ -862,7 +862,7 @@ func TestGCTTLHistory(t *testing.T) {
 func TestJobMetrics(t *testing.T) {
     store, dom := testkit.CreateMockStoreAndDomain(t)
     tk := testkit.NewTestKit(t, store)
-    sessionFactory := sessionFactory(t, store)
+    sessionFactory := sessionFactory(t, dom)
 
     waitAndStopTTLManager(t, dom)
 
@@ -1304,7 +1304,7 @@ func TestFinishAndUpdateOwnerAtSameTime(t *testing.T) {
     waitAndStopTTLManager(t, dom)
 
     tk := testkit.NewTestKit(t, store)
-    sessionFactory := sessionFactory(t, store)
+    sessionFactory := sessionFactory(t, dom)
     se := sessionFactory()
 
     tk.MustExec("use test")
@@ -1349,7 +1349,7 @@ func TestFinishError(t *testing.T) {
     waitAndStopTTLManager(t, dom)
 
     tk := testkit.NewTestKit(t, store)
-    sessionFactory := sessionFactory(t, store)
+    sessionFactory := sessionFactory(t, dom)
     se := sessionFactory()
 
     tk.MustExec("use test")
@@ -1447,6 +1447,8 @@ func boostJobScheduleForTest(t *testing.T) func() {
     require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ttl/ttlworker/update-info-schema-cache-interval", fmt.Sprintf("return(%d)", 100*time.Millisecond)))
     require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ttl/ttlworker/task-manager-loop-interval", fmt.Sprintf("return(%d)", 100*time.Millisecond)))
     require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ttl/ttlworker/check-task-interval", fmt.Sprintf("return(%d)", 100*time.Millisecond)))
+    require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ttl/ttlworker/gc-interval", fmt.Sprintf("return(%d)", 100*time.Millisecond)))
+    require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/meta/model/overwrite-ttl-job-interval", fmt.Sprintf("return(%d)", 100*time.Millisecond)))
 
     return func() {
         require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ttl/ttlworker/check-job-triggered-interval"))
@@ -1457,6 +1459,8 @@ func boostJobScheduleForTest(t *testing.T) func() {
         require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ttl/ttlworker/update-info-schema-cache-interval"))
         require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ttl/ttlworker/task-manager-loop-interval"))
         require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ttl/ttlworker/check-task-interval"))
+        require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ttl/ttlworker/gc-interval"))
+        require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/meta/model/overwrite-ttl-job-interval"))
     }
 }
 
@@ -1465,7 +1469,7 @@ func TestDisableTTLAfterLoseHeartbeat(t *testing.T) {
     waitAndStopTTLManager(t, dom)
 
     tk := testkit.NewTestKit(t, store)
-    sessionFactory := sessionFactory(t, store)
+    sessionFactory := sessionFactory(t, dom)
     se := sessionFactory()
 
     tk.MustExec("use test")
diff --git a/pkg/ttl/ttlworker/session.go b/pkg/ttl/ttlworker/session.go
index 24396d91bc32d..9b294193df8ec 100644
--- a/pkg/ttl/ttlworker/session.go
+++ b/pkg/ttl/ttlworker/session.go
@@ -87,7 +87,7 @@ func getSession(pool util.SessionPool) (session.Session, error) {
     originalIsolationReadEngines, restoreIsolationReadEngines := "", false
 
     se := session.NewSession(sctx, exec, func(se session.Session) {
-        _, err = se.ExecuteSQL(context.Background(), fmt.Sprintf("set tidb_retry_limit=%d", originalRetryLimit))
+        _, err := se.ExecuteSQL(context.Background(), fmt.Sprintf("set tidb_retry_limit=%d", originalRetryLimit))
         if err != nil {
             intest.AssertNoError(err)
             logutil.BgLogger().Error("fail to reset tidb_retry_limit", zap.Int64("originalRetryLimit", originalRetryLimit), zap.Error(err))
diff --git a/pkg/ttl/ttlworker/session_integration_test.go b/pkg/ttl/ttlworker/session_integration_test.go
new file mode 100644
index 0000000000000..c2842e303a17b
--- /dev/null
+++ b/pkg/ttl/ttlworker/session_integration_test.go
@@ -0,0 +1,161 @@
+// Copyright 2024 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ttlworker_test
+
+import (
+    "context"
+    "errors"
+    "strings"
+    "sync/atomic"
+    "testing"
+
+    "github.com/ngaut/pools"
+    "github.com/pingcap/tidb/pkg/parser/ast"
+    "github.com/pingcap/tidb/pkg/sessionctx"
+    "github.com/pingcap/tidb/pkg/testkit"
+    "github.com/pingcap/tidb/pkg/ttl/ttlworker"
+    "github.com/pingcap/tidb/pkg/util"
+    "github.com/pingcap/tidb/pkg/util/logutil"
+    "github.com/pingcap/tidb/pkg/util/sqlexec"
+    "github.com/stretchr/testify/require"
+    "go.uber.org/zap"
+)
+
+type fault interface {
+    // shouldFault returns whether the session should fault this time.
+    shouldFault() bool
+}
+
+var _ fault = &faultAfterCount{}
+
+type faultAfterCount struct {
+    faultCount int
+
+    currentCount int
+}
+
+func (f *faultAfterCount) shouldFault() bool {
+    if f.currentCount >= f.faultCount {
+        return true
+    }
+
+    f.currentCount++
+    return false
+}
+
+// sessionWithFault is a session which will fail to execute SQL after successfully executing several SQLs. It's designed
+// to trigger every possible branch of returning error from `Execute`
+type sessionWithFault struct {
+    sessionctx.Context
+
+    fault *atomic.Pointer[fault]
+}
+
+// Close implements pools.Resource
+func (s *sessionWithFault) Close() {
+    s.Context.(pools.Resource).Close()
+}
+
+// GetSQLExecutor implements sessionctx.Context.
+func (s *sessionWithFault) GetSQLExecutor() sqlexec.SQLExecutor {
+    return s
+}
+
+// Execute implements sqlexec.SQLExecutor.
+func (s *sessionWithFault) Execute(ctx context.Context, sql string) ([]sqlexec.RecordSet, error) {
+    if s.shouldFault(sql) {
+        return nil, errors.New("fault in test")
+    }
+    return s.Context.GetSQLExecutor().Execute(ctx, sql)
+}
+
+// ExecuteStmt implements sqlexec.SQLExecutor.
+func (s *sessionWithFault) ExecuteStmt(ctx context.Context, stmtNode ast.StmtNode) (sqlexec.RecordSet, error) {
+    if s.shouldFault(stmtNode.Text()) {
+        return nil, errors.New("fault in test")
+    }
+    return s.Context.GetSQLExecutor().ExecuteStmt(ctx, stmtNode)
+}
+
+func (s *sessionWithFault) ExecuteInternal(ctx context.Context, sql string, args ...any) (sqlexec.RecordSet, error) {
+    if s.shouldFault(sql) {
+        return nil, errors.New("fault in test")
+    }
+    return s.Context.GetSQLExecutor().ExecuteInternal(ctx, sql, args...)
+}
+
+func (s *sessionWithFault) shouldFault(sql string) bool {
+    if s.fault.Load() == nil {
+        return false
+    }
+
+    // as a fault implementation may have side-effect, we should always call it before checking the SQL.
+    shouldFault := (*s.fault.Load()).shouldFault()
+
+    // skip some local only sql, ref `getSession()` in `session.go`
+    if strings.HasPrefix(sql, "set tidb_") || strings.HasPrefix(sql, "set @@") {
+        return false
+    }
+
+    return shouldFault
+}
+
+type faultSessionPool struct {
+    util.SessionPool
+
+    fault *atomic.Pointer[fault]
+}
+
+func newFaultSessionPool(sp util.SessionPool) *faultSessionPool {
+    return &faultSessionPool{
+        SessionPool: sp,
+        fault:       &atomic.Pointer[fault]{},
+    }
+}
+
+// Get implements util.SessionPool.
+func (f *faultSessionPool) Get() (pools.Resource, error) {
+    resource, err := f.SessionPool.Get()
+    if err != nil {
+        return nil, err
+    }
+
+    return &sessionWithFault{
+        Context: resource.(sessionctx.Context),
+        fault:   f.fault,
+    }, nil
+}
+
+// Put implements util.SessionPool.
+func (f *faultSessionPool) Put(se pools.Resource) {
+    f.SessionPool.Put(se.(*sessionWithFault).Context.(pools.Resource))
+}
+
+func (f *faultSessionPool) setFault(ft fault) {
+    f.fault.Store(&ft)
+}
+
+func TestGetSessionWithFault(t *testing.T) {
+    _, dom := testkit.CreateMockStoreAndDomain(t)
+
+    pool := newFaultSessionPool(dom.SysSessionPool())
+
+    for i := 0; i < 50; i++ {
+        pool.setFault(&faultAfterCount{faultCount: i})
+        se, err := ttlworker.GetSessionForTest(pool)
+        logutil.BgLogger().Info("get session", zap.Int("error after count", i), zap.Bool("session is nil", se == nil), zap.Bool("error is nil", err == nil))
+        require.True(t, se != nil || err != nil)
+    }
+}
diff --git a/pkg/ttl/ttlworker/task_manager_integration_test.go b/pkg/ttl/ttlworker/task_manager_integration_test.go
index e88332198187e..645a2f103adbf 100644
--- a/pkg/ttl/ttlworker/task_manager_integration_test.go
+++ b/pkg/ttl/ttlworker/task_manager_integration_test.go
@@ -38,7 +38,7 @@ import (
 )
 
 func TestParallelLockNewTask(t *testing.T) {
-    store := testkit.CreateMockStore(t)
+    store, dom := testkit.CreateMockStoreAndDomain(t)
     tk := testkit.NewTestKit(t, store)
     tk.MustExec("set global tidb_ttl_running_tasks = 1000")
     ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnTTL)
@@ -46,7 +46,7 @@ func TestParallelLockNewTask(t *testing.T) {
     testTable, err := tk.Session().GetDomainInfoSchema().(infoschema.InfoSchema).TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t"))
     require.NoError(t, err)
 
-    sessionFactory := sessionFactory(t, store)
+    sessionFactory := sessionFactory(t, dom)
     se := sessionFactory()
     now := se.Now()
 
@@ -117,7 +117,7 @@ func TestParallelSchedule(t *testing.T) {
     waitAndStopTTLManager(t, dom)
     tk := testkit.NewTestKit(t, store)
     tk.MustExec("set global tidb_ttl_running_tasks = 1000")
-    sessionFactory := sessionFactory(t, store)
+    sessionFactory := sessionFactory(t, dom)
 
     tk.MustExec("create table test.t(id int, created_at datetime) ttl=created_at + interval 1 day")
     table, err := dom.InfoSchema().TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t"))
@@ -175,7 +175,7 @@ func TestTaskScheduleExpireHeartBeat(t *testing.T) {
     waitAndStopTTLManager(t, dom)
     tk := testkit.NewTestKit(t, store)
     tk.MustExec("set global tidb_ttl_running_tasks = 1000")
-    sessionFactory := sessionFactory(t, store)
+    sessionFactory := sessionFactory(t, dom)
 
     // create table and scan task
     tk.MustExec("create table test.t(id int, created_at datetime) ttl=created_at + interval 1 day")
@@ -223,7 +223,7 @@ func TestTaskMetrics(t *testing.T) {
     waitAndStopTTLManager(t, dom)
     tk := testkit.NewTestKit(t, store)
     tk.MustExec("set global tidb_ttl_running_tasks = 1000")
-    sessionFactory := sessionFactory(t, store)
+    sessionFactory := sessionFactory(t, dom)
 
     // create table and scan task
     tk.MustExec("create table test.t(id int, created_at datetime) ttl=created_at + interval 1 day")
@@ -258,7 +258,7 @@ func TestRescheduleWithError(t *testing.T) {
     tk := testkit.NewTestKit(t, store)
     tk.MustExec("set global tidb_ttl_running_tasks = 1000")
-    sessionFactory := sessionFactory(t, store)
+    sessionFactory := sessionFactory(t, dom)
 
     // insert a wrong scan task with random table id
     sql := fmt.Sprintf("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW(), NOW())", 613, 1)
     tk.MustExec(sql)
@@ -293,7 +293,7 @@ func TestTTLRunningTasksLimitation(t *testing.T) {
     store, dom := testkit.CreateMockStoreAndDomain(t)
     waitAndStopTTLManager(t, dom)
     tk := testkit.NewTestKit(t, store)
-    sessionFactory := sessionFactory(t, store)
+    sessionFactory := sessionFactory(t, dom)
 
     tk.MustExec("set global tidb_ttl_running_tasks = 32")
     tk.MustExec("create table test.t(id int, created_at datetime) ttl=created_at + interval 1 day")
@@ -363,7 +363,7 @@ func TestShrinkScanWorkerTimeout(t *testing.T) {
     defer pool.AssertNoSessionInUse(t)
     waitAndStopTTLManager(t, dom)
     tk := testkit.NewTestKit(t, store)
-    sessionFactory := sessionFactory(t, store)
+    sessionFactory := sessionFactory(t, dom)
 
     tk.MustExec("set global tidb_ttl_running_tasks = 32")