diff --git a/ttl/cache/infoschema_test.go b/ttl/cache/infoschema_test.go
index 4cec3db563fa7..ef428e4399c25 100644
--- a/ttl/cache/infoschema_test.go
+++ b/ttl/cache/infoschema_test.go
@@ -34,7 +34,7 @@ func TestInfoSchemaCache(t *testing.T) {
 	conn := server.CreateMockConn(t, sv)
 	sctx := conn.Context().Session
 	tk := testkit.NewTestKitWithSession(t, store, sctx)
-	se := session.NewSession(sctx, sctx, func() {})
+	se := session.NewSession(sctx, sctx, func(_ session.Session) {})
 
 	isc := cache.NewInfoSchemaCache(time.Hour)
 
diff --git a/ttl/cache/ttlstatus_test.go b/ttl/cache/ttlstatus_test.go
index 134faf3a201a3..a73d3675d0e6d 100644
--- a/ttl/cache/ttlstatus_test.go
+++ b/ttl/cache/ttlstatus_test.go
@@ -36,7 +36,7 @@ func TestTTLStatusCache(t *testing.T) {
 	conn := server.CreateMockConn(t, sv)
 	sctx := conn.Context().Session
 	tk := testkit.NewTestKitWithSession(t, store, sctx)
-	ttlSession := session.NewSession(sctx, tk.Session(), func() {})
+	ttlSession := session.NewSession(sctx, tk.Session(), func(_ session.Session) {})
 
 	isc := cache.NewTableStatusCache(time.Hour)
 
diff --git a/ttl/session/session.go b/ttl/session/session.go
index d07419651a103..4fa8bc198c3ac 100644
--- a/ttl/session/session.go
+++ b/ttl/session/session.go
@@ -50,11 +50,11 @@ type Session interface {
 type session struct {
 	sessionctx.Context
 	sqlExec sqlexec.SQLExecutor
-	closeFn func()
+	closeFn func(Session)
 }
 
 // NewSession creates a new Session
-func NewSession(sctx sessionctx.Context, sqlExec sqlexec.SQLExecutor, closeFn func()) Session {
+func NewSession(sctx sessionctx.Context, sqlExec sqlexec.SQLExecutor, closeFn func(Session)) Session {
 	return &session{
 		Context: sctx,
 		sqlExec: sqlExec,
@@ -99,7 +99,7 @@ func (s *session) RunInTxn(ctx context.Context, fn func() error) (err error) {
 	defer tracer.EnterPhase(tracer.Phase())
 
 	tracer.EnterPhase(metrics.PhaseBeginTxn)
-	if _, err = s.ExecuteSQL(ctx, "BEGIN"); err != nil {
+	if _, err = s.ExecuteSQL(ctx, "BEGIN OPTIMISTIC"); err != nil {
 		return err
 	}
 	tracer.EnterPhase(metrics.PhaseOther)
@@ -150,7 +150,7 @@ func (s *session) ResetWithGlobalTimeZone(ctx context.Context) error {
 // Close closes the session
 func (s *session) Close() {
 	if s.closeFn != nil {
-		s.closeFn()
+		s.closeFn(s)
 		s.Context = nil
 		s.sqlExec = nil
 		s.closeFn = nil
diff --git a/ttl/ttlworker/BUILD.bazel b/ttl/ttlworker/BUILD.bazel
index 8b7c39270807e..42a654e3b3343 100644
--- a/ttl/ttlworker/BUILD.bazel
+++ b/ttl/ttlworker/BUILD.bazel
@@ -40,6 +40,7 @@ go_test(
     name = "ttlworker_test",
     srcs = [
         "del_test.go",
+        "job_manager_integration_test.go",
         "job_manager_test.go",
         "job_test.go",
         "scan_test.go",
@@ -52,15 +53,21 @@
         "//parser/ast",
         "//parser/model",
         "//parser/mysql",
+        "//session",
         "//sessionctx",
         "//sessionctx/variable",
+        "//testkit",
         "//ttl/cache",
+        "//ttl/session",
         "//types",
         "//util/chunk",
+        "//util/logutil",
         "@com_github_ngaut_pools//:pools",
         "@com_github_pingcap_errors//:errors",
         "@com_github_stretchr_testify//assert",
         "@com_github_stretchr_testify//require",
         "@org_golang_x_time//rate",
+        "@org_uber_go_atomic//:atomic",
+        "@org_uber_go_zap//:zap",
     ],
 )
diff --git a/ttl/ttlworker/job_manager.go b/ttl/ttlworker/job_manager.go
index 5e2b67f8b101e..05f139e6bb599 100644
--- a/ttl/ttlworker/job_manager.go
+++ b/ttl/ttlworker/job_manager.go
@@ -32,7 +32,7 @@ import (
 )
 
 const insertNewTableIntoStatusTemplate = "INSERT INTO mysql.tidb_ttl_table_status (table_id,parent_table_id) VALUES (%d, %d)"
-const setTableStatusOwnerTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_id = UUID(), current_job_owner_id = '%s',current_job_start_time = '%s',current_job_status = 'waiting',current_job_status_update_time = '%s',current_job_ttl_expire = '%s',current_job_owner_hb_time = '%s' WHERE (current_job_owner_id IS NULL OR current_job_owner_hb_time < '%s') AND table_id = %d"
+const setTableStatusOwnerTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_id = UUID(), current_job_owner_id = '%s',current_job_start_time = '%s',current_job_status = 'waiting',current_job_status_update_time = '%s',current_job_ttl_expire = '%s',current_job_owner_hb_time = '%s' WHERE table_id = %d"
 const updateHeartBeatTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_owner_hb_time = '%s' WHERE table_id = %d AND current_job_owner_id = '%s'"
 
 const timeFormat = "2006-01-02 15:04:05"
@@ -41,8 +41,8 @@ func insertNewTableIntoStatusSQL(tableID int64, parentTableID int64) string {
 	return fmt.Sprintf(insertNewTableIntoStatusTemplate, tableID, parentTableID)
 }
 
-func setTableStatusOwnerSQL(tableID int64, now time.Time, currentJobTTLExpire time.Time, maxHBTime time.Time, id string) string {
-	return fmt.Sprintf(setTableStatusOwnerTemplate, id, now.Format(timeFormat), now.Format(timeFormat), currentJobTTLExpire.Format(timeFormat), now.Format(timeFormat), maxHBTime.Format(timeFormat), tableID)
+func setTableStatusOwnerSQL(tableID int64, now time.Time, currentJobTTLExpire time.Time, id string) string {
+	return fmt.Sprintf(setTableStatusOwnerTemplate, id, now.Format(timeFormat), now.Format(timeFormat), currentJobTTLExpire.Format(timeFormat), now.Format(timeFormat), tableID)
 }
 
 func updateHeartBeatSQL(tableID int64, now time.Time, id string) string {
@@ -499,11 +499,10 @@ func (m *JobManager) couldTrySchedule(table *cache.TableStatus, now time.Time) b
 // localJob and return it.
 // It could be nil, nil, if the table query doesn't return error but the job has been locked by other instances.
 func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *cache.PhysicalTable, now time.Time) (*ttlJob, error) {
-	maxHBTime := now.Add(-2 * jobManagerLoopTickerInterval)
 	var expireTime time.Time
 
 	err := se.RunInTxn(ctx, func() error {
-		rows, err := se.ExecuteSQL(ctx, cache.SelectFromTTLTableStatusWithID(table.TableInfo.ID))
+		rows, err := se.ExecuteSQL(ctx, cache.SelectFromTTLTableStatusWithID(table.ID))
 		if err != nil {
 			return err
 		}
@@ -513,7 +512,7 @@
 			if err != nil {
 				return err
 			}
-			rows, err = se.ExecuteSQL(ctx, cache.SelectFromTTLTableStatusWithID(table.TableInfo.ID))
+			rows, err = se.ExecuteSQL(ctx, cache.SelectFromTTLTableStatusWithID(table.ID))
 			if err != nil {
 				return err
 			}
@@ -534,7 +533,7 @@
 			return err
 		}
 
-		_, err = se.ExecuteSQL(ctx, setTableStatusOwnerSQL(table.ID, now, expireTime, maxHBTime, m.id))
+		_, err = se.ExecuteSQL(ctx, setTableStatusOwnerSQL(table.ID, now, expireTime, m.id))
 		return err
 	})
diff --git a/ttl/ttlworker/job_manager_integration_test.go b/ttl/ttlworker/job_manager_integration_test.go
new file mode 100644
index 0000000000000..8c299afcd48de
--- /dev/null
+++ b/ttl/ttlworker/job_manager_integration_test.go
@@ -0,0 +1,98 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ttlworker_test
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/pingcap/tidb/parser/ast"
+	"github.com/pingcap/tidb/parser/model"
+	dbsession "github.com/pingcap/tidb/session"
+	"github.com/pingcap/tidb/sessionctx/variable"
+	"github.com/pingcap/tidb/testkit"
+	"github.com/pingcap/tidb/ttl/cache"
+	"github.com/pingcap/tidb/ttl/session"
+	"github.com/pingcap/tidb/ttl/ttlworker"
+	"github.com/pingcap/tidb/util/logutil"
+	"github.com/stretchr/testify/require"
+	"go.uber.org/atomic"
+	"go.uber.org/zap"
+)
+
+func TestParallelLockNewJob(t *testing.T) {
+	store := testkit.CreateMockStore(t)
+
+	sessionFactory := func() session.Session {
+		dbSession, err := dbsession.CreateSession4Test(store)
+		require.NoError(t, err)
+		se := session.NewSession(dbSession, dbSession, nil)
+
+		_, err = se.ExecuteSQL(context.Background(), "ROLLBACK")
+		require.NoError(t, err)
+		_, err = se.ExecuteSQL(context.Background(), "set tidb_retry_limit=0")
+		require.NoError(t, err)
+
+		return se
+	}
+
+	storedTTLJobRunInterval := variable.TTLJobRunInterval.Load()
+	variable.TTLJobRunInterval.Store(0)
+	defer func() {
+		variable.TTLJobRunInterval.Store(storedTTLJobRunInterval)
+	}()
+
+	testTable := &cache.PhysicalTable{ID: 2, TableInfo: &model.TableInfo{ID: 1, TTLInfo: &model.TTLInfo{IntervalExprStr: "1", IntervalTimeUnit: int(ast.TimeUnitDay)}}}
+	// simply lock a new job
+	m := ttlworker.NewJobManager("test-id", nil, store)
+	se := sessionFactory()
+	job, err := m.LockNewJob(context.Background(), se, testTable, time.Now())
+	require.NoError(t, err)
+	job.Finish(se, time.Now())
+
+	// lock one table in parallel, only one of them should lock successfully
+	testTimes := 100
+	concurrency := 5
+	for i := 0; i < testTimes; i++ {
+		successCounter := atomic.NewUint64(0)
+		successJob := &ttlworker.TTLJob{}
+
+		wg := sync.WaitGroup{}
+		for j := 0; j < concurrency; j++ {
+			jobManagerID := fmt.Sprintf("test-ttl-manager-%d", j)
+			wg.Add(1)
+			go func() {
+				m := ttlworker.NewJobManager(jobManagerID, nil, store)
+
+				se := sessionFactory()
+				job, err := m.LockNewJob(context.Background(), se, testTable, time.Now())
+				if err == nil {
+					successCounter.Add(1)
+					successJob = job
+				} else {
+					logutil.BgLogger().Error("lock new job with error", zap.Error(err))
+				}
+				wg.Done()
+			}()
+		}
+		wg.Wait()
+
+		require.Equal(t, uint64(1), successCounter.Load())
+		successJob.Finish(se, time.Now())
+	}
+}
diff --git a/ttl/ttlworker/job_manager_test.go b/ttl/ttlworker/job_manager_test.go
index 87bc19ca08261..6718c384543fe 100644
--- a/ttl/ttlworker/job_manager_test.go
+++ b/ttl/ttlworker/job_manager_test.go
@@ -24,6 +24,7 @@ import (
 	"github.com/pingcap/tidb/parser/mysql"
 	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/ttl/cache"
+	"github.com/pingcap/tidb/ttl/session"
 	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/util/chunk"
 	"github.com/stretchr/testify/assert"
@@ -139,6 +140,22 @@ func (m *JobManager) SetScanWorkers4Test(workers []worker) {
 	m.scanWorkers = workers
 }
 
+// TTLJob exports the ttlJob for test
+type TTLJob = ttlJob
+
+// LockNewJob is an exported version of lockNewJob for test
+func (m *JobManager) LockNewJob(ctx context.Context, se session.Session, table *cache.PhysicalTable, now time.Time) (*TTLJob, error) {
+	return m.lockNewJob(ctx, se, table, now)
+}
+
+func (j *ttlJob) Finish(se session.Session, now time.Time) {
+	j.finish(se, now)
+}
+
+func (j *ttlJob) ID() string {
+	return j.id
+}
+
 func newMockTTLJob(tbl *cache.PhysicalTable, status cache.JobStatus) *ttlJob {
 	statistics := &ttlStatistics{}
 	return &ttlJob{tbl: tbl, ctx: context.Background(), statistics: statistics, status: status, tasks: []*ttlScanTask{{ctx: context.Background(), tbl: tbl, statistics: statistics}}}
@@ -195,7 +212,6 @@ func TestReadyForNewJobTables(t *testing.T) {
 
 func TestLockNewTable(t *testing.T) {
 	now, err := time.Parse(timeFormat, "2022-12-05 17:13:05")
 	assert.NoError(t, err)
-	maxHBTime := now.Add(-2 * jobManagerLoopTickerInterval)
 	expireTime := now
 	testPhysicalTable := &cache.PhysicalTable{ID: 1, TableInfo: &model.TableInfo{ID: 1, TTLInfo: &model.TTLInfo{ColumnName: model.NewCIStr("test"), IntervalExprStr: "5 Year"}}}
@@ -219,7 +235,7 @@
 			newTTLTableStatusRows(&cache.TableStatus{TableID: 1}),
 			nil,
 		}, {
-			setTableStatusOwnerSQL(1, now, expireTime, maxHBTime, "test-id"),
+			setTableStatusOwnerSQL(1, now, expireTime, "test-id"),
 			nil,
 			nil,
 		}, {
@@ -241,7 +257,7 @@
 			newTTLTableStatusRows(&cache.TableStatus{TableID: 1}),
 			nil,
 		}, {
-			setTableStatusOwnerSQL(1, now, expireTime, maxHBTime, "test-id"),
+			setTableStatusOwnerSQL(1, now, expireTime, "test-id"),
 			nil,
 			nil,
 		}, {
@@ -255,7 +271,7 @@
 			newTTLTableStatusRows(&cache.TableStatus{TableID: 1}),
 			nil,
 		}, {
-			setTableStatusOwnerSQL(1, now, expireTime, maxHBTime, "test-id"),
+			setTableStatusOwnerSQL(1, now, expireTime, "test-id"),
 			nil,
 			errors.New("test error message"),
 		},
 	}, false, true},
diff --git a/ttl/ttlworker/session.go b/ttl/ttlworker/session.go
index b20f436a61859..5bea3a4b8cba6 100644
--- a/ttl/ttlworker/session.go
+++ b/ttl/ttlworker/session.go
@@ -16,6 +16,7 @@ package ttlworker
 
 import (
 	"context"
+	"fmt"
 	"time"
 
 	"github.com/ngaut/pools"
@@ -26,7 +27,9 @@
 	"github.com/pingcap/tidb/ttl/metrics"
 	"github.com/pingcap/tidb/ttl/session"
 	"github.com/pingcap/tidb/util/chunk"
+	"github.com/pingcap/tidb/util/logutil"
 	"github.com/pingcap/tidb/util/sqlexec"
+	"go.uber.org/zap"
 )
 
 type sessionPool interface {
@@ -57,10 +60,23 @@ func getSession(pool sessionPool) (session.Session, error) {
 		return nil, errors.Errorf("%T cannot be casted to sqlexec.SQLExecutor", sctx)
 	}
 
-	se := session.NewSession(sctx, exec, func() {
+	originalRetryLimit := sctx.GetSessionVars().RetryLimit
+	se := session.NewSession(sctx, exec, func(se session.Session) {
+		_, err = se.ExecuteSQL(context.Background(), fmt.Sprintf("set tidb_retry_limit=%d", originalRetryLimit))
+		if err != nil {
+			logutil.BgLogger().Error("fail to reset tidb_retry_limit", zap.Int64("originalRetryLimit", originalRetryLimit), zap.Error(err))
+		}
+
 		pool.Put(resource)
 	})
 
+	// store and set the retry limit to 0
+	_, err = se.ExecuteSQL(context.Background(), "set tidb_retry_limit=0")
+	if err != nil {
+		se.Close()
+		return nil, err
+	}
+
 	// Force rollback the session to guarantee the session is not in any explicit transaction
 	if _, err = se.ExecuteSQL(context.Background(), "ROLLBACK"); err != nil {
 		se.Close()