Skip to content

Commit

Permalink
ttl: fix the timezone issue and panic in the caller of getSession (#58166) (#58709)
Browse files Browse the repository at this point in the history

close #58107, close #58109
  • Loading branch information
ti-chi-bot authored Jan 7, 2025
1 parent fe0bc60 commit d9c7e43
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 48 deletions.
3 changes: 2 additions & 1 deletion pkg/ttl/ttlworker/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ go_test(
"job_manager_integration_test.go",
"job_manager_test.go",
"scan_test.go",
"session_integration_test.go",
"session_test.go",
"task_manager_integration_test.go",
"task_manager_test.go",
Expand All @@ -83,7 +84,6 @@ go_test(
"//pkg/parser/ast",
"//pkg/parser/model",
"//pkg/parser/mysql",
"//pkg/session",
"//pkg/sessionctx",
"//pkg/sessionctx/variable",
"//pkg/statistics",
Expand All @@ -101,6 +101,7 @@ go_test(
"//pkg/util/chunk",
"//pkg/util/logutil",
"//pkg/util/mock",
"//pkg/util/sqlexec",
"//pkg/util/timeutil",
"@com_github_google_uuid//:uuid",
"@com_github_ngaut_pools//:pools",
Expand Down
15 changes: 1 addition & 14 deletions pkg/ttl/ttlworker/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,13 +370,7 @@ func (m *JobManager) triggerTTLJob(requestID string, cmd *client.TriggerNewTTLJo
return
}

tz, err := se.GlobalTimeZone(m.ctx)
if err != nil {
responseErr(err)
return
}

if !timeutil.WithinDayTimePeriod(variable.TTLJobScheduleWindowStartTime.Load(), variable.TTLJobScheduleWindowEndTime.Load(), se.Now().In(tz)) {
if !timeutil.WithinDayTimePeriod(variable.TTLJobScheduleWindowStartTime.Load(), variable.TTLJobScheduleWindowEndTime.Load(), se.Now()) {
responseErr(errors.New("not in TTL job window"))
return
}
Expand Down Expand Up @@ -579,13 +573,6 @@ j:
}

func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) {
tz, err := se.GlobalTimeZone(m.ctx)
if err != nil {
terror.Log(err)
} else {
now = now.In(tz)
}

// Try to lock jobs whose heartbeat (HB) has timed out. Otherwise, when `tidb_ttl_job_enable = 'OFF'`,
// a job with a timed-out heartbeat would never be cancelled.
jobTables := m.readyForLockHBTimeoutJobTables(now)
Expand Down
52 changes: 28 additions & 24 deletions pkg/ttl/ttlworker/job_manager_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/ast"
pmodel "github.com/pingcap/tidb/pkg/parser/model"
dbsession "github.com/pingcap/tidb/pkg/session"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/testkit"
Expand All @@ -53,15 +52,11 @@ import (
"go.uber.org/zap"
)

func sessionFactory(t *testing.T, store kv.Storage) func() session.Session {
return func() session.Session {
dbSession, err := dbsession.CreateSession4Test(store)
require.NoError(t, err)
se := session.NewSession(dbSession, dbSession, nil)
func sessionFactory(t *testing.T, dom *domain.Domain) func() session.Session {
pool := dom.SysSessionPool()

_, err = se.ExecuteSQL(context.Background(), "ROLLBACK")
require.NoError(t, err)
_, err = se.ExecuteSQL(context.Background(), "set tidb_retry_limit=0")
return func() session.Session {
se, err := ttlworker.GetSessionForTest(pool)
require.NoError(t, err)

return se
Expand Down Expand Up @@ -112,7 +107,7 @@ func TestParallelLockNewJob(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
waitAndStopTTLManager(t, dom)

sessionFactory := sessionFactory(t, store)
sessionFactory := sessionFactory(t, dom)

testTable := &cache.PhysicalTable{ID: 2, TableInfo: &model.TableInfo{ID: 1, TTLInfo: &model.TTLInfo{IntervalExprStr: "1", IntervalTimeUnit: int(ast.TimeUnitDay), JobInterval: "1h"}}}
// simply lock a new job
Expand Down Expand Up @@ -166,7 +161,7 @@ func TestFinishJob(t *testing.T) {
waitAndStopTTLManager(t, dom)
tk := testkit.NewTestKit(t, store)

sessionFactory := sessionFactory(t, store)
sessionFactory := sessionFactory(t, dom)

testTable := &cache.PhysicalTable{ID: 2, Schema: pmodel.NewCIStr("db1"), TableInfo: &model.TableInfo{ID: 1, Name: pmodel.NewCIStr("t1"), TTLInfo: &model.TTLInfo{IntervalExprStr: "1", IntervalTimeUnit: int(ast.TimeUnitDay)}}}

Expand Down Expand Up @@ -426,7 +421,7 @@ func TestTTLJobDisable(t *testing.T) {
func TestSubmitJob(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
sessionFactory := sessionFactory(t, store)
sessionFactory := sessionFactory(t, dom)

waitAndStopTTLManager(t, dom)

Expand Down Expand Up @@ -505,18 +500,18 @@ func TestSubmitJob(t *testing.T) {
func TestRescheduleJobs(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
sessionFactory := sessionFactory(t, store)
sessionFactory := sessionFactory(t, dom)

waitAndStopTTLManager(t, dom)

now := time.Now()
tk.MustExec("create table test.t (id int, created_at datetime) ttl = `created_at` + interval 1 minute ttl_job_interval = '1m'")
table, err := dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t"))
require.NoError(t, err)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnTTL)

se := sessionFactory()
m := ttlworker.NewJobManager("manager-1", nil, store, nil, func() bool {
now := se.Now()
m := ttlworker.NewJobManager("manager-1", dom.SysSessionPool(), store, nil, func() bool {
return true
})
m.TaskManager().ResizeWorkersWithSysVar()
Expand Down Expand Up @@ -556,18 +551,21 @@ func TestRescheduleJobs(t *testing.T) {
// if the time leaves the time window, it'll finish the job
tk.MustExec("set global tidb_ttl_job_schedule_window_start_time='23:58'")
tk.MustExec("set global tidb_ttl_job_schedule_window_end_time='23:59'")
anotherManager.RescheduleJobs(se, time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, now.Nanosecond(), now.Location()))
rescheduleTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, now.Nanosecond(), now.Location())
anotherManager.RescheduleJobs(se, rescheduleTime)
tkTZ := tk.Session().GetSessionVars().Location()
tk.MustQuery("select last_job_summary->>'$.scan_task_err' from mysql.tidb_ttl_table_status").Check(testkit.Rows("out of TTL job schedule window"))
tk.MustQuery("select last_job_finish_time from mysql.tidb_ttl_table_status").Check(testkit.Rows(rescheduleTime.In(tkTZ).Format(time.DateTime)))
}

func TestRescheduleJobsAfterTableDropped(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
sessionFactory := sessionFactory(t, store)
sessionFactory := sessionFactory(t, dom)

waitAndStopTTLManager(t, dom)

now := time.Now()
now := time.Now().In(time.UTC)
createTableSQL := "create table test.t (id int, created_at datetime) ttl = `created_at` + interval 1 minute ttl_job_interval = '1m'"
tk.MustExec("create table test.t (id int, created_at datetime) ttl = `created_at` + interval 1 minute ttl_job_interval = '1m'")
table, err := dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t"))
Expand Down Expand Up @@ -619,7 +617,7 @@ func TestRescheduleJobsAfterTableDropped(t *testing.T) {
func TestJobTimeout(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
sessionFactory := sessionFactory(t, store)
sessionFactory := sessionFactory(t, dom)

waitAndStopTTLManager(t, dom)

Expand Down Expand Up @@ -675,6 +673,8 @@ func TestJobTimeout(t *testing.T) {
require.Equal(t, tableStatus.CurrentJobStartTime, newTableStatus.CurrentJobStartTime)
require.Equal(t, tableStatus.CurrentJobTTLExpire, newTableStatus.CurrentJobTTLExpire)
require.Equal(t, now.Unix(), newTableStatus.CurrentJobOwnerHBTime.Unix())
// `CurrentJobStatusUpdateTime` only has second precision, so compare using a format that has second precision and includes the time zone (RFC3339).
require.Equal(t, now.Format(time.RFC3339), newTableStatus.CurrentJobStatusUpdateTime.Format(time.RFC3339))

// the timeout will be checked while updating heartbeat
require.NoError(t, m2.UpdateHeartBeat(ctx, se, now.Add(7*time.Hour)))
Expand All @@ -685,7 +685,7 @@ func TestJobTimeout(t *testing.T) {
func TestTriggerScanTask(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
sessionFactory := sessionFactory(t, store)
sessionFactory := sessionFactory(t, dom)
se := sessionFactory()

waitAndStopTTLManager(t, dom)
Expand Down Expand Up @@ -862,7 +862,7 @@ func TestGCTTLHistory(t *testing.T) {
func TestJobMetrics(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
sessionFactory := sessionFactory(t, store)
sessionFactory := sessionFactory(t, dom)

waitAndStopTTLManager(t, dom)

Expand Down Expand Up @@ -1304,7 +1304,7 @@ func TestFinishAndUpdateOwnerAtSameTime(t *testing.T) {
waitAndStopTTLManager(t, dom)
tk := testkit.NewTestKit(t, store)

sessionFactory := sessionFactory(t, store)
sessionFactory := sessionFactory(t, dom)
se := sessionFactory()

tk.MustExec("use test")
Expand Down Expand Up @@ -1349,7 +1349,7 @@ func TestFinishError(t *testing.T) {
waitAndStopTTLManager(t, dom)
tk := testkit.NewTestKit(t, store)

sessionFactory := sessionFactory(t, store)
sessionFactory := sessionFactory(t, dom)
se := sessionFactory()

tk.MustExec("use test")
Expand Down Expand Up @@ -1447,6 +1447,8 @@ func boostJobScheduleForTest(t *testing.T) func() {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ttl/ttlworker/update-info-schema-cache-interval", fmt.Sprintf("return(%d)", 100*time.Millisecond)))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ttl/ttlworker/task-manager-loop-interval", fmt.Sprintf("return(%d)", 100*time.Millisecond)))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ttl/ttlworker/check-task-interval", fmt.Sprintf("return(%d)", 100*time.Millisecond)))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ttl/ttlworker/gc-interval", fmt.Sprintf("return(%d)", 100*time.Millisecond)))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/meta/model/overwrite-ttl-job-interval", fmt.Sprintf("return(%d)", 100*time.Millisecond)))

return func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ttl/ttlworker/check-job-triggered-interval"))
Expand All @@ -1457,6 +1459,8 @@ func boostJobScheduleForTest(t *testing.T) func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ttl/ttlworker/update-info-schema-cache-interval"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ttl/ttlworker/task-manager-loop-interval"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ttl/ttlworker/check-task-interval"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ttl/ttlworker/gc-interval"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/meta/model/overwrite-ttl-job-interval"))
}
}

Expand All @@ -1465,7 +1469,7 @@ func TestDisableTTLAfterLoseHeartbeat(t *testing.T) {
waitAndStopTTLManager(t, dom)
tk := testkit.NewTestKit(t, store)

sessionFactory := sessionFactory(t, store)
sessionFactory := sessionFactory(t, dom)
se := sessionFactory()

tk.MustExec("use test")
Expand Down
2 changes: 1 addition & 1 deletion pkg/ttl/ttlworker/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func getSession(pool util.SessionPool) (session.Session, error) {
originalIsolationReadEngines, restoreIsolationReadEngines := "", false

se := session.NewSession(sctx, exec, func(se session.Session) {
_, err = se.ExecuteSQL(context.Background(), fmt.Sprintf("set tidb_retry_limit=%d", originalRetryLimit))
_, err := se.ExecuteSQL(context.Background(), fmt.Sprintf("set tidb_retry_limit=%d", originalRetryLimit))
if err != nil {
intest.AssertNoError(err)
logutil.BgLogger().Error("fail to reset tidb_retry_limit", zap.Int64("originalRetryLimit", originalRetryLimit), zap.Error(err))
Expand Down
Loading

0 comments on commit d9c7e43

Please sign in to comment.