diff --git a/ddl/ttl.go b/ddl/ttl.go index 357481f3eb32f..f7c4f3d6c73bb 100644 --- a/ddl/ttl.go +++ b/ddl/ttl.go @@ -63,7 +63,7 @@ func onTTLInfoChange(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err er if ttlInfo != nil { // if the TTL_ENABLE is not set explicitly, use the original value - if ttlInfoEnable == nil { + if ttlInfoEnable == nil && tblInfo.TTLInfo != nil { ttlInfo.Enable = tblInfo.TTLInfo.Enable } tblInfo.TTLInfo = ttlInfo diff --git a/domain/BUILD.bazel b/domain/BUILD.bazel index 97fced3ad05d4..56e3bffedde31 100644 --- a/domain/BUILD.bazel +++ b/domain/BUILD.bazel @@ -48,6 +48,7 @@ go_library( "//statistics", "//statistics/handle", "//telemetry", + "//ttl/ttlworker", "//types", "//util", "//util/chunk", diff --git a/domain/domain.go b/domain/domain.go index 0f0425178f06f..de448c530c801 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -59,6 +59,7 @@ import ( "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/statistics/handle" "github.com/pingcap/tidb/telemetry" + "github.com/pingcap/tidb/ttl/ttlworker" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/dbterror" @@ -120,6 +121,7 @@ type Domain struct { expiredTimeStamp4PC types.Time logBackupAdvancer *daemon.OwnerDaemon historicalStatsWorker *HistoricalStatsWorker + ttlJobManager *ttlworker.JobManager serverID uint64 serverIDSession *concurrency.Session @@ -1058,6 +1060,10 @@ func (do *Domain) Init( return err } + do.wg.Run(func() { + do.runTTLJobManager(ctx) + }) + return nil } @@ -2374,6 +2380,29 @@ func (do *Domain) serverIDKeeper() { } } +func (do *Domain) runTTLJobManager(ctx context.Context) { + ttlJobManager := ttlworker.NewJobManager(do.ddl.GetID(), do.sysSessionPool, do.store) + ttlJobManager.Start() + do.ttlJobManager = ttlJobManager + + // TODO: read the worker count from `do.sysVarCache` and resize the workers + ttlworker.ScanWorkersCount.Store(4) + ttlworker.DeleteWorkerCount.Store(4) + + <-do.exit + + ttlJobManager.Stop() + err := ttlJobManager.WaitStopped(ctx, 30*time.Second) + if err != nil { + logutil.BgLogger().Warn("fail to wait until the ttl job manager stop", zap.Error(err)) + } +} + +// TTLJobManager returns the ttl job manager on this domain +func (do *Domain) TTLJobManager() *ttlworker.JobManager { + return do.ttlJobManager +} + func init() { initByLDFlagsForGlobalKill() telemetry.GetDomainInfoSchema = func(ctx sessionctx.Context) infoschema.InfoSchema { diff --git a/ttl/cache/BUILD.bazel b/ttl/cache/BUILD.bazel index 32772c0f86de0..f051716ced40b 100644 --- a/ttl/cache/BUILD.bazel +++ b/ttl/cache/BUILD.bazel @@ -11,7 +11,6 @@ go_library( importpath = "github.com/pingcap/tidb/ttl/cache", visibility = ["//visibility:public"], deps = [ - "//infoschema", "//kv", "//parser/ast", "//parser/model", diff --git a/ttl/cache/base.go b/ttl/cache/base.go index cc2ece5c9bdc1..bf23dd8e14bb1 100644 --- a/ttl/cache/base.go +++ b/ttl/cache/base.go @@ -39,3 +39,7 @@ func (bc *baseCache) ShouldUpdate() bool { func (bc *baseCache) SetInterval(interval time.Duration) { bc.interval = interval } + +func (bc *baseCache) GetInterval() time.Duration { + return bc.interval +} diff --git a/ttl/cache/infoschema.go b/ttl/cache/infoschema.go index afff0ea725847..5fdf8d2081dc6 100644 --- a/ttl/cache/infoschema.go +++ b/ttl/cache/infoschema.go @@ -17,10 +17,8 @@ package cache import ( "time" - "github.com/pingcap/errors" - "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/ttl/session" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" ) @@ -41,18 +39,10 @@ func NewInfoSchemaCache(updateInterval time.Duration) *InfoSchemaCache { } // Update updates the info schema cache -func (isc *InfoSchemaCache) Update(sctx sessionctx.Context) error { - is, ok := sctx.GetDomainInfoSchema().(infoschema.InfoSchema) - if !ok { - return errors.New("fail to get domain info schema from session") - } - - ext, ok := is.(*infoschema.SessionExtendedInfoSchema) - if !ok { - return errors.New("fail to get extended info schema") - } +func (isc *InfoSchemaCache) Update(se session.Session) error { + is := se.SessionInfoSchema() - if isc.schemaVer == ext.SchemaMetaVersion() { + if isc.schemaVer == is.SchemaMetaVersion() { return nil } @@ -60,7 +50,7 @@ func (isc *InfoSchemaCache) Update(sctx sessionctx.Context) error { for _, db := range is.AllSchemas() { for _, tbl := range is.SchemaTables(db.Name) { tblInfo := tbl.Meta() - if tblInfo.TTLInfo == nil || tblInfo.State != model.StatePublic { + if tblInfo.TTLInfo == nil || !tblInfo.TTLInfo.Enable || tblInfo.State != model.StatePublic { continue } diff --git a/ttl/cache/infoschema_test.go b/ttl/cache/infoschema_test.go index 5ba99a2b69703..7e811050b4601 100644 --- a/ttl/cache/infoschema_test.go +++ b/ttl/cache/infoschema_test.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/tidb/server" "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/ttl/cache" + "github.com/pingcap/tidb/ttl/session" "github.com/stretchr/testify/assert" ) @@ -36,18 +37,19 @@ func TestInfoSchemaCache(t *testing.T) { conn := server.CreateMockConn(t, sv) sctx := conn.Context().Session tk := testkit.NewTestKitWithSession(t, store, sctx) + se := session.NewSession(sctx, sctx, func() {}) isc := cache.NewInfoSchemaCache(time.Hour) // test should update assert.True(t, isc.ShouldUpdate()) - assert.NoError(t, isc.Update(sctx)) + assert.NoError(t, isc.Update(se)) assert.False(t, isc.ShouldUpdate()) // test new tables are synced assert.Equal(t, 0, len(isc.Tables)) tk.MustExec("create table test.t(created_at datetime) ttl = created_at + INTERVAL 5 YEAR") - assert.NoError(t, isc.Update(sctx)) + assert.NoError(t, isc.Update(se)) assert.Equal(t, 1, len(isc.Tables)) for _, table := range isc.Tables { assert.Equal(t, "t", table.TableInfo.Name.L) @@ -62,7 +64,7 @@ func TestInfoSchemaCache(t *testing.T) { partition p1 values less than (2000) ) `) - assert.NoError(t, isc.Update(sctx)) + assert.NoError(t, isc.Update(se)) assert.Equal(t, 2, len(isc.Tables)) partitions := []string{} for id, table := range isc.Tables { diff --git a/ttl/cache/table.go b/ttl/cache/table.go index ec64cade3cf5e..0cf50d092e437 100644 --- a/ttl/cache/table.go +++ b/ttl/cache/table.go @@ -176,7 +176,7 @@ func (t *PhysicalTable) ValidateKey(key []types.Datum) error { // EvalExpireTime returns the expired time func (t *PhysicalTable) EvalExpireTime(ctx context.Context, se session.Session, now time.Time) (expire time.Time, err error) { - tz := se.GetSessionVars().TimeZone + tz := se.GetSessionVars().Location() expireExpr := t.TTLInfo.IntervalExprStr unit := ast.TimeUnitType(t.TTLInfo.IntervalTimeUnit) diff --git a/ttl/cache/ttlstatus.go b/ttl/cache/ttlstatus.go index 5222ba3025433..cb1b8ef5942fe 100644 --- a/ttl/cache/ttlstatus.go +++ b/ttl/cache/ttlstatus.go @@ -16,6 +16,7 @@ package cache import ( "context" + "fmt" "time" "github.com/pingcap/tidb/sessionctx" @@ -35,12 +36,17 @@ const ( JobStatusCancelling = "cancelling" // JobStatusCancelled means this job has been canceled successfully JobStatusCancelled = "cancelled" - // JobStatusError means this job is in error status - JobStatusError = "error" + // JobStatusTimeout means this job has timeout + JobStatusTimeout = "timeout" ) const selectFromTTLTableStatus = "SELECT table_id,parent_table_id,table_statistics,last_job_id,last_job_start_time,last_job_finish_time,last_job_ttl_expire,last_job_summary,current_job_id,current_job_owner_id,current_job_owner_addr,current_job_owner_hb_time,current_job_start_time,current_job_ttl_expire,current_job_state,current_job_status,current_job_status_update_time FROM mysql.tidb_ttl_table_status" +// SelectFromTTLTableStatusWithID returns an SQL statement to get the table status from table id +func SelectFromTTLTableStatusWithID(tableID int64) string { + return selectFromTTLTableStatus + fmt.Sprintf(" WHERE table_id = %d", tableID) +} + // TableStatus contains the corresponding information in the system table `mysql.tidb_ttl_table_status` type TableStatus struct { TableID int64 @@ -89,7 +95,7 @@ func (tsc *TableStatusCache) Update(ctx context.Context, se session.Session) err newTables := make(map[int64]*TableStatus, len(rows)) for _, row := range rows { - status, err := rowToTableStatus(se, row) + status, err := RowToTableStatus(se, row) if err != nil { return err } @@ -101,9 +107,10 @@ func (tsc *TableStatusCache) Update(ctx context.Context, se session.Session) err return nil } -func rowToTableStatus(sctx sessionctx.Context, row chunk.Row) (*TableStatus, error) { +// RowToTableStatus converts a row to table status +func RowToTableStatus(sctx sessionctx.Context, row chunk.Row) (*TableStatus, error) { var err error - timeZone := sctx.GetSessionVars().TimeZone + timeZone := sctx.GetSessionVars().Location() status := &TableStatus{ TableID: row.GetInt64(0), diff --git a/ttl/session/session.go b/ttl/session/session.go index ea48bcd1f34dd..927b5f570bc92 100644 --- a/ttl/session/session.go +++ b/ttl/session/session.go @@ -16,6 +16,7 @@ package session import ( "context" + "time" "github.com/pingcap/errors" "github.com/pingcap/tidb/infoschema" @@ -41,6 +42,8 @@ type Session interface { ResetWithGlobalTimeZone(ctx context.Context) error // Close closes the session Close() + // Now returns the current time in location specified by session var + Now() time.Time } type session struct { @@ -145,3 +148,8 @@ func (s *session) Close() { s.closeFn = nil } } + +// Now returns the current time in the location of time_zone session var +func (s *session) Now() time.Time { + return time.Now().In(s.Context.GetSessionVars().Location()) +} diff --git a/ttl/ttlworker/BUILD.bazel b/ttl/ttlworker/BUILD.bazel index a278b9cac754f..a06a4af2d27c0 100644 --- a/ttl/ttlworker/BUILD.bazel +++ b/ttl/ttlworker/BUILD.bazel @@ -3,7 +3,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "ttlworker", srcs = [ + "config.go", "del.go", + "job.go", + "job_manager.go", "scan.go", "session.go", "worker.go", @@ -11,6 +14,7 @@ go_library( importpath = "github.com/pingcap/tidb/ttl/ttlworker", visibility = ["//visibility:public"], deps = [ + "//kv", "//parser/terror", "//sessionctx", "//sessionctx/variable", @@ -22,9 +26,12 @@ go_library( "//util/chunk", "//util/logutil", "//util/sqlexec", + "//util/timeutil", "@com_github_ngaut_pools//:pools", "@com_github_pingcap_errors//:errors", "@org_golang_x_time//rate", + "@org_uber_go_atomic//:atomic", + "@org_uber_go_multierr//:multierr", "@org_uber_go_zap//:zap", ], ) @@ -33,10 +40,13 @@ go_test( name = "ttlworker_test", srcs = [ "del_test.go", + "job_manager_test.go", + "job_test.go", "scan_test.go", "session_test.go", ], embed = [":ttlworker"], + flaky = True, deps = [ "//infoschema", "//parser/ast", @@ -49,6 +59,7 @@ go_test( "//util/chunk", "@com_github_ngaut_pools//:pools", "@com_github_pingcap_errors//:errors", + "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@org_golang_x_time//rate", ], diff --git a/ttl/ttlworker/config.go b/ttl/ttlworker/config.go new file mode 100644 index 0000000000000..e7a8e344c3e16 --- /dev/null +++ b/ttl/ttlworker/config.go @@ -0,0 +1,50 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ttlworker + +import ( + "time" + + "go.uber.org/atomic" +) + +// TODO: the following functions should be put in the variable pkg to avoid cyclic dependency after adding variables for the TTL +// some of them are only used in test + +const jobManagerLoopTickerInterval = 10 * time.Second + +const updateInfoSchemaCacheInterval = time.Minute +const updateTTLTableStatusCacheInterval = 10 * time.Minute + +const ttlInternalSQLTimeout = 30 * time.Second +const ttlJobTimeout = 6 * time.Hour + +// TODO: add this variable to the sysvar +const ttlJobInterval = time.Hour + +// TODO: add these variables to the sysvar +var ttlJobScheduleWindowStartTime, _ = time.Parse(timeFormat, "2006-01-02 00:00:00") +var ttlJobScheduleWindowEndTime, _ = time.Parse(timeFormat, "2006-01-02 23:59:00") + +// TODO: migrate these two count to sysvar + +// ScanWorkersCount defines the count of scan worker +var ScanWorkersCount = atomic.NewUint64(0) + +// DeleteWorkerCount defines the count of delete worker +var DeleteWorkerCount = atomic.NewUint64(0) + +const resizeWorkersInterval = 30 * time.Second +const splitScanCount = 64 diff --git a/ttl/ttlworker/del_test.go b/ttl/ttlworker/del_test.go index 62e05dbeb4c14..524b45c9c8806 100644 --- a/ttl/ttlworker/del_test.go +++ b/ttl/ttlworker/del_test.go @@ -263,7 +263,7 @@ func TestTTLDeleteTaskDoDelete(t *testing.T) { for _, c := range cases { invokes = 0 - retryRows := c.task.doDelete(context.TODO(), s) + retryRows := c.task.doDelete(context.Background(), s) require.Equal(t, 4, invokes) if c.retryRows == nil { require.Nil(t, retryRows) diff --git a/ttl/ttlworker/job.go b/ttl/ttlworker/job.go new file mode 100644 index 0000000000000..0d9d65bf54fe9 --- /dev/null +++ b/ttl/ttlworker/job.go @@ -0,0 +1,166 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ttlworker + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/ttl/cache" + "github.com/pingcap/tidb/ttl/session" + "github.com/pingcap/tidb/util/logutil" + "go.uber.org/zap" +) + +const updateJobCurrentStatusTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_status = '%s' WHERE table_id = %d AND current_job_status = '%s' AND current_job_id = '%s'" +const finishJobTemplate = "UPDATE mysql.tidb_ttl_table_status SET last_job_id = current_job_id, last_job_start_time = current_job_start_time, last_job_finish_time = '%s', last_job_ttl_expire = current_job_ttl_expire, last_job_summary = '%s', current_job_id = NULL, current_job_owner_id = NULL, current_job_owner_hb_time = NULL, current_job_start_time = NULL, current_job_ttl_expire = NULL, current_job_state = NULL, current_job_status = NULL, current_job_status_update_time = NULL WHERE table_id = %d AND current_job_id = '%s'" +const updateJobStateTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_state = '%s' WHERE table_id = %d AND current_job_id = '%s' AND current_job_owner_id = '%s'" + +func updateJobCurrentStatusSQL(tableID int64, oldStatus cache.JobStatus, newStatus cache.JobStatus, jobID string) string { + return fmt.Sprintf(updateJobCurrentStatusTemplate, newStatus, tableID, oldStatus, jobID) +} + +func finishJobSQL(tableID int64, finishTime time.Time, summary string, jobID string) string { + return fmt.Sprintf(finishJobTemplate, finishTime.Format(timeFormat), summary, tableID, jobID) +} + +func updateJobState(tableID int64, currentJobID string, currentJobState string, currentJobOwnerID string) string { + return fmt.Sprintf(updateJobStateTemplate, currentJobState, tableID, currentJobID, currentJobOwnerID) +} + +type ttlJob struct { + id string + ownerID string + + ctx context.Context + cancel func() + + createTime time.Time + + tbl *cache.PhysicalTable + + tasks []*ttlScanTask + taskIter int + finishedScanTaskCounter int + scanTaskErr error + + // status is the only field which should be protected by a mutex, as `Cancel` may be called at any time, and will + // change the status + statusMutex sync.Mutex + status cache.JobStatus + + statistics *ttlStatistics +} + +// changeStatus updates the state of this job +func (job *ttlJob) changeStatus(ctx context.Context, se session.Session, status cache.JobStatus) error { + job.statusMutex.Lock() + oldStatus := job.status + job.status = status + job.statusMutex.Unlock() + + _, err := se.ExecuteSQL(ctx, updateJobCurrentStatusSQL(job.tbl.ID, oldStatus, status, job.id)) + if err != nil { + return errors.Trace(err) + } + + return nil +} + +func (job *ttlJob) updateState(ctx context.Context, se session.Session) error { + _, err := se.ExecuteSQL(ctx, updateJobState(job.tbl.ID, job.id, job.statistics.String(), job.ownerID)) + if err != nil { + return errors.Trace(err) + } + + return nil +} + +// peekScanTask returns the next scan task, but doesn't promote the iterator +func (job *ttlJob) peekScanTask() (*ttlScanTask, error) { + return job.tasks[job.taskIter], nil +} + +// nextScanTask promotes the iterator +func (job *ttlJob) nextScanTask() { + job.taskIter += 1 +} + +// finish turns current job into last job, and update the error message and statistics summary +func (job *ttlJob) finish(se session.Session, now time.Time) { + summary := job.statistics.String() + if job.scanTaskErr != nil { + summary = fmt.Sprintf("Scan Error: %s, Statistics: %s", job.scanTaskErr.Error(), summary) + } + // at this time, the job.ctx may have been canceled (to cancel this job) + // even when it's canceled, we'll need to update the states, so use another context + _, err := se.ExecuteSQL(context.TODO(), finishJobSQL(job.tbl.ID, now, summary, job.id)) + if err != nil { + logutil.Logger(job.ctx).Error("fail to finish a ttl job", zap.Error(err), zap.Int64("tableID", job.tbl.ID), zap.String("jobID", job.id)) + } +} + +// AllSpawned returns whether all scan tasks have been dumped out +// **This function will be called concurrently, in many workers' goroutine** +func (job *ttlJob) AllSpawned() bool { + return job.taskIter == len(job.tasks) && len(job.tasks) != 0 +} + +// Timeout will return whether the job has timeout, if it is, it will be killed +func (job *ttlJob) Timeout(ctx context.Context, se session.Session, now time.Time) bool { + if !job.createTime.Add(ttlJobTimeout).Before(now) { + return false + } + + err := job.changeStatus(ctx, se, cache.JobStatusTimeout) + if err != nil { + logutil.BgLogger().Info("fail to update status of ttl job", zap.String("jobID", job.id), zap.Error(err)) + } + + return true +} + +// Finished returns whether the job is finished +func (job *ttlJob) Finished() bool { + job.statusMutex.Lock() + defer job.statusMutex.Unlock() + // in three condition, a job is considered finished: + // 1. It's cancelled manually + // 2. All scan tasks have been finished, and all selected rows succeed or in error state + // 3. The job is created one hour ago. It's a timeout. + return job.status == cache.JobStatusCancelled || (job.AllSpawned() && job.finishedScanTaskCounter == len(job.tasks) && job.statistics.TotalRows.Load() == job.statistics.ErrorRows.Load()+job.statistics.SuccessRows.Load()) +} + +// Cancel cancels the job context +func (job *ttlJob) Cancel(ctx context.Context, se session.Session) error { + if job.cancel != nil { + job.cancel() + } + // TODO: wait until all tasks have been finished + return job.changeStatus(ctx, se, cache.JobStatusCancelled) +} + +func findJobWithTableID(jobs []*ttlJob, id int64) *ttlJob { + for _, j := range jobs { + if j.tbl.ID == id { + return j + } + } + + return nil +} diff --git a/ttl/ttlworker/job_manager.go b/ttl/ttlworker/job_manager.go new file mode 100644 index 0000000000000..a141cfc046fb3 --- /dev/null +++ b/ttl/ttlworker/job_manager.go @@ -0,0 +1,576 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ttlworker + +import ( + "context" + "fmt" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/ttl/cache" + "github.com/pingcap/tidb/ttl/session" + "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/timeutil" + "go.uber.org/multierr" + "go.uber.org/zap" +) + +const insertNewTableIntoStatusTemplate = "INSERT INTO mysql.tidb_ttl_table_status (table_id,parent_table_id) VALUES (%d, %d)" +const setTableStatusOwnerTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_id = UUID(), current_job_owner_id = '%s',current_job_start_time = '%s',current_job_status = 'waiting',current_job_status_update_time = '%s',current_job_ttl_expire = '%s',current_job_owner_hb_time = '%s' WHERE (current_job_owner_id IS NULL OR current_job_owner_hb_time < '%s') AND table_id = %d" +const updateHeartBeatTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_owner_hb_time = '%s' WHERE table_id = %d AND current_job_owner_id = '%s'" + +const timeFormat = "2006-01-02 15:04:05" + +func insertNewTableIntoStatusSQL(tableID int64, parentTableID int64) string { + return fmt.Sprintf(insertNewTableIntoStatusTemplate, tableID, parentTableID) +} + +func setTableStatusOwnerSQL(tableID int64, now time.Time, currentJobTTLExpire time.Time, maxHBTime time.Time, id string) string { + return fmt.Sprintf(setTableStatusOwnerTemplate, id, now.Format(timeFormat), now.Format(timeFormat), currentJobTTLExpire.Format(timeFormat), now.Format(timeFormat), maxHBTime.Format(timeFormat), tableID) +} + +func updateHeartBeatSQL(tableID int64, now time.Time, id string) string { + return fmt.Sprintf(updateHeartBeatTemplate, now.Format(timeFormat), tableID, id) +} + +// JobManager schedules and manages the ttl jobs on this instance +type JobManager struct { + // the `runningJobs`, `scanWorkers` and `delWorkers` should be protected by mutex: + // `runningJobs` will be shared in the state loop and schedule loop + // `scanWorkers` and `delWorkers` can be modified by setting variables at any time + baseWorker + + sessPool sessionPool + + // id is the ddl id of this instance + id string + + store kv.Storage + + // the workers are shared between the loop goroutine and other sessions (e.g. manually resize workers through + // setting variables) + scanWorkers []worker + delWorkers []worker + + // infoSchemaCache and tableStatusCache are a cache stores the information from info schema and the tidb_ttl_table_status + // table. They don't need to be protected by mutex, because they are only used in job loop goroutine. + infoSchemaCache *cache.InfoSchemaCache + tableStatusCache *cache.TableStatusCache + + // runningJobs record all ttlJob waiting in local + // when a job for a table is created, it could spawn several scan tasks. If there are too many scan tasks, and they cannot + // be fully consumed by local scan workers, their states should be recorded in the runningJobs, so that we could continue + // to poll scan tasks from the job in the future when there are scan workers in idle. + runningJobs []*ttlJob + + delCh chan *ttlDeleteTask + notifyStateCh chan interface{} +} + +// NewJobManager creates a new ttl job manager +func NewJobManager(id string, sessPool sessionPool, store kv.Storage) (manager *JobManager) { + manager = &JobManager{} + manager.id = id + manager.store = store + manager.sessPool = sessPool + manager.delCh = make(chan *ttlDeleteTask) + + manager.init(manager.jobLoop) + manager.ctx = logutil.WithKeyValue(manager.ctx, "ttl-worker", "manager") + + manager.infoSchemaCache = cache.NewInfoSchemaCache(updateInfoSchemaCacheInterval) + manager.tableStatusCache = cache.NewTableStatusCache(updateTTLTableStatusCacheInterval) + + return +} + +func (m *JobManager) jobLoop() error { + se, err := getSession(m.sessPool) + if err != nil { + return err + } + + defer func() { + err = multierr.Combine(err, multierr.Combine(m.resizeScanWorkers(0), m.resizeDelWorkers(0))) + se.Close() + }() + + scheduleTicker := time.Tick(jobManagerLoopTickerInterval) + updateHeartBeatTicker := time.Tick(jobManagerLoopTickerInterval) + jobCheckTicker := time.Tick(jobManagerLoopTickerInterval) + updateScanTaskStateTicker := time.Tick(jobManagerLoopTickerInterval) + infoSchemaCacheUpdateTicker := time.Tick(m.infoSchemaCache.GetInterval()) + tableStatusCacheUpdateTicker := time.Tick(m.tableStatusCache.GetInterval()) + resizeWorkersTicker := time.Tick(resizeWorkersInterval) + for { + now := se.Now() + + select { + case <-m.ctx.Done(): + return nil + case <-infoSchemaCacheUpdateTicker: + err := m.updateInfoSchemaCache(se) + if err != nil { + logutil.Logger(m.ctx).Warn("fail to update info schema cache", zap.Error(err)) + } + case <-tableStatusCacheUpdateTicker: + err := m.updateTableStatusCache(se) + if err != nil { + logutil.Logger(m.ctx).Warn("fail to update table status cache", zap.Error(err)) + } + case <-updateHeartBeatTicker: + updateHeartBeatCtx, cancel := context.WithTimeout(m.ctx, ttlInternalSQLTimeout) + err = m.updateHeartBeat(updateHeartBeatCtx, se) + if err != nil { + logutil.Logger(m.ctx).Warn("fail to update heart beat", zap.Error(err)) + } + cancel() + case <-updateScanTaskStateTicker: + m.updateTaskState() + case <-m.notifyStateCh: + m.updateTaskState() + case <-jobCheckTicker: + m.checkFinishedJob(se, now) + m.checkNotOwnJob() + case <-resizeWorkersTicker: + err := m.resizeScanWorkers(int(ScanWorkersCount.Load())) + if err != nil { + logutil.Logger(m.ctx).Warn("fail to resize scan workers", zap.Error(err)) + } + err = m.resizeDelWorkers(int(DeleteWorkerCount.Load())) + if err != nil { + logutil.Logger(m.ctx).Warn("fail to resize delete workers", zap.Error(err)) + } + case <-scheduleTicker: + m.rescheduleJobs(se, now) + } + } +} + +func (m *JobManager) resizeScanWorkers(count int) error { + var err error + m.scanWorkers, err = m.resizeWorkers(m.scanWorkers, count, func() worker { + return newScanWorker(m.delCh, m.notifyStateCh, m.sessPool) + }) + return err +} + +func (m *JobManager) resizeDelWorkers(count int) error { + var err error + m.delWorkers, err = m.resizeWorkers(m.delWorkers, count, func() worker { + return newDeleteWorker(m.delCh, m.sessPool) + }) + return err +} + +func (m *JobManager) resizeWorkers(workers []worker, count int, factory func() worker) ([]worker, error) { + if count < len(workers) { + logutil.Logger(m.ctx).Info("shrink ttl worker", zap.Int("originalCount", len(workers)), zap.Int("newCount", count)) + + for _, w := range workers[count:] { + w.Stop() + } + var errs error + for _, w := range workers[count:] { + err := w.WaitStopped(m.ctx, 30*time.Second) + if err != nil { + logutil.Logger(m.ctx).Warn("fail to stop ttl worker", zap.Error(err)) + errs = multierr.Append(errs, err) + } + } + + // remove the existing workers, and keep the left workers + workers = workers[:count] + return workers, errs + } + + if count > len(workers) { + logutil.Logger(m.ctx).Info("scale ttl worker", zap.Int("originalCount", len(workers)), zap.Int("newCount", count)) + + for i := len(workers); i < count; i++ { + w := factory() + w.Start() + workers = append(workers, w) + } + return workers, nil + } + + return workers, nil +} + +func (m *JobManager) updateTaskState() { + results := m.pollScanWorkerResults() + for _, result := range results { + job := findJobWithTableID(m.runningJobs, result.task.tbl.ID) + if job != nil { + logutil.Logger(m.ctx).Debug("scan task state changed", zap.String("jobID", job.id)) + + job.finishedScanTaskCounter += 1 + job.scanTaskErr = multierr.Append(job.scanTaskErr, result.err) + } + } +} + +func (m *JobManager) pollScanWorkerResults() []*ttlScanTaskExecResult { + results := make([]*ttlScanTaskExecResult, 0, len(m.scanWorkers)) + for _, w := range m.scanWorkers { + worker := w.(*ttlScanWorker) + result := worker.PollTaskResult() + if result != nil { + results = append(results, result) + } + } + + return results +} + +// checkNotOwnJob removes the job whose current job owner is not yourself +func (m *JobManager) checkNotOwnJob() { + for _, job := range m.runningJobs { + tableStatus := m.tableStatusCache.Tables[job.tbl.ID] + if tableStatus == nil || tableStatus.CurrentJobOwnerID != m.id { + logutil.Logger(m.ctx).Info("job has been taken over by another node", zap.String("jobID", job.id), zap.String("statistics", job.statistics.String())) + m.removeJob(job) + job.cancel() + } + } +} + +func (m *JobManager) checkFinishedJob(se session.Session, now time.Time) { + for _, job := range m.runningJobs { + timeoutJobCtx, cancel := context.WithTimeout(m.ctx, ttlInternalSQLTimeout) + if job.Finished() { + logutil.Logger(m.ctx).Info("job has finished", zap.String("jobID", job.id), zap.String("statistics", job.statistics.String())) + m.removeJob(job) + job.finish(se, se.Now()) + } else if job.Timeout(timeoutJobCtx, se, now) { + logutil.Logger(m.ctx).Info("job is timeout", zap.String("jobID", job.id), zap.String("statistics", job.statistics.String())) + m.removeJob(job) + err := job.Cancel(timeoutJobCtx, se) + if err != nil { + logutil.Logger(m.ctx).Warn("fail to cancel job", zap.Error(err)) + } + job.finish(se, se.Now()) + } + cancel() + } +} + +func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) { + if !timeutil.WithinDayTimePeriod(ttlJobScheduleWindowStartTime, ttlJobScheduleWindowEndTime, now) { + // Local jobs will also not run, but as the server is still sending heartbeat, + // and keep the job in memory, it could start the left task in the next window. + return + } + + idleScanWorkers := m.idleScanWorkers() + if len(idleScanWorkers) == 0 { + return + } + + localJobs := m.localJobs() + newJobTables := m.readyForNewJobTables(now) + // TODO: also consider to resume tables, but it's fine to left them there, as other nodes will take this job + // when the heart beat is not sent + for len(idleScanWorkers) > 0 && (len(newJobTables) > 0 || len(localJobs) > 0) { + var job *ttlJob + var err error + + switch { + case len(localJobs) > 0: + job = localJobs[0] + localJobs = localJobs[1:] + case len(newJobTables) > 0: + table := newJobTables[0] + newJobTables = newJobTables[1:] + logutil.Logger(m.ctx).Debug("try lock new job", zap.Int64("tableID", table.ID)) + job, err = m.lockNewJob(m.ctx, se, table, now) + if job != nil { + m.appendJob(job) + } + } + if err != nil { + logutil.Logger(m.ctx).Warn("fail to create new job", zap.Error(err)) + } + if job == nil { + continue + } + + for !job.AllSpawned() { + task, err := job.peekScanTask() + if err != nil { + logutil.Logger(m.ctx).Warn("fail to generate scan task", zap.Error(err)) + break + } + + for len(idleScanWorkers) > 0 { + idleWorker := idleScanWorkers[0] + idleScanWorkers = idleScanWorkers[1:] + + err := idleWorker.Schedule(task) + if err != nil { + logutil.Logger(m.ctx).Info("fail to schedule task", zap.Error(err)) + continue + } + + ctx, cancel := context.WithTimeout(m.ctx, ttlInternalSQLTimeout) + err = job.changeStatus(ctx, se, cache.JobStatusRunning) + if err != nil { + // not a big problem, current logic doesn't depend on the job status to promote + // the routine, so we could just print a log here + logutil.Logger(m.ctx).Error("change ttl job status", zap.Error(err), zap.String("id", job.id)) + } + cancel() + + logArgs := []zap.Field{zap.String("table", task.tbl.TableInfo.Name.L)} + if task.tbl.PartitionDef != nil { + logArgs = append(logArgs, zap.String("partition", task.tbl.PartitionDef.Name.L)) + } + logutil.Logger(m.ctx).Debug("schedule ttl task", + logArgs...) + + job.nextScanTask() + break + } + + if len(idleScanWorkers) == 0 { + break + } + } + } +} + +func (m *JobManager) idleScanWorkers() []scanWorker { + workers := make([]scanWorker, 0, len(m.scanWorkers)) + for _, w := range m.scanWorkers { + if w.(scanWorker).Idle() { + workers = append(workers, w.(scanWorker)) + } + } + return workers +} + +func (m *JobManager) localJobs() []*ttlJob { + for _, job := range m.runningJobs { + status := m.tableStatusCache.Tables[job.tbl.ID] + if status == nil || status.CurrentJobOwnerID != m.id { + m.removeJob(job) + continue + } + } + return m.runningJobs +} + +// readyForNewJobTables returns all tables which should spawn a TTL job according to cache +func (m *JobManager) readyForNewJobTables(now time.Time) []*cache.PhysicalTable { + tables := make([]*cache.PhysicalTable, 0, len(m.infoSchemaCache.Tables)) + for _, table := range m.infoSchemaCache.Tables { + status := m.tableStatusCache.Tables[table.ID] + ok := m.couldTrySchedule(status, now) + if ok { + tables = append(tables, table) + } + } + + return tables +} + +// couldTrySchedule returns whether a table should be tried to run TTL +func (m *JobManager) couldTrySchedule(table *cache.TableStatus, now time.Time) bool { + if table == nil { + // if the table status hasn't been created, return true + return true + } + if table.CurrentJobOwnerID != "" { + // see whether it's heart beat time is expired + hbTime := table.CurrentJobOwnerHBTime + // a more concrete value is `2 * max(updateTTLTableStatusCacheInterval, jobManagerLoopTickerInterval)`, but the + // `updateTTLTableStatusCacheInterval` is greater than `jobManagerLoopTickerInterval` in most cases. + if hbTime.Add(2 * updateTTLTableStatusCacheInterval).Before(now) { + logutil.Logger(m.ctx).Info("task heartbeat has stopped", zap.Int64("tableID", table.TableID), zap.Time("hbTime", hbTime), zap.Time("now", now)) + return true + } + return false + } + + if table.LastJobFinishTime.IsZero() { + return true + } + + finishTime := table.LastJobFinishTime + + return finishTime.Add(ttlJobInterval).Before(now) +} + +// occupyNewJob tries to occupy a new job in the ttl_table_status table. If it locks successfully, it will create a new +// localJob and return it. +// It could be nil, nil, if the table query doesn't return error but the job has been locked by other instances. +func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *cache.PhysicalTable, now time.Time) (*ttlJob, error) { + maxHBTime := now.Add(-2 * jobManagerLoopTickerInterval) + var expireTime time.Time + + err := se.RunInTxn(ctx, func() error { + rows, err := se.ExecuteSQL(ctx, cache.SelectFromTTLTableStatusWithID(table.TableInfo.ID)) + if err != nil { + return err + } + if len(rows) == 0 { + // cannot find the row, insert the status row + _, err = se.ExecuteSQL(ctx, insertNewTableIntoStatusSQL(table.ID, table.TableInfo.ID)) + if err != nil { + return err + } + rows, err = se.ExecuteSQL(ctx, cache.SelectFromTTLTableStatusWithID(table.TableInfo.ID)) + if err != nil { + return err + } + if len(rows) == 0 { + return errors.New("table status row still doesn't exist after insertion") + } + } + tableStatus, err := cache.RowToTableStatus(se, rows[0]) + if err != nil { + return err + } + if !m.couldTrySchedule(tableStatus, now) { + return errors.New("couldn't schedule ttl job") + } + + expireTime, err = table.EvalExpireTime(m.ctx, se, now) + if err != nil { + return err + } + + _, err = se.ExecuteSQL(ctx, setTableStatusOwnerSQL(table.ID, now, expireTime, maxHBTime, m.id)) + + return err + }) + if err != nil { + return nil, err + } + + // successfully update the table status, will need to refresh the cache. + err = m.updateInfoSchemaCache(se) + if err != nil { + return nil, err + } + err = m.updateTableStatusCache(se) + if err != nil { + return nil, err + } + return m.createNewJob(expireTime, now, table) +} + +func (m *JobManager) createNewJob(expireTime time.Time, now time.Time, table *cache.PhysicalTable) (*ttlJob, error) { + id := m.tableStatusCache.Tables[table.ID].CurrentJobID + + statistics := &ttlStatistics{} + + ranges, err := table.SplitScanRanges(m.ctx, m.store, splitScanCount) + if err != nil { + return nil, err + } + + jobCtx, cancel := context.WithCancel(m.ctx) + + scanTasks := make([]*ttlScanTask, 0, len(ranges)) + for _, r := range ranges { + scanTasks = append(scanTasks, &ttlScanTask{ + ctx: jobCtx, + tbl: table, + expire: expireTime, + scanRange: r, + statistics: statistics, + }) + } + + return &ttlJob{ + id: id, + ownerID: m.id, + + ctx: jobCtx, + cancel: cancel, + + createTime: now, + // at least, the info schema cache and table status cache are consistent in table id, so it's safe to get table + // information from schema cache directly + tbl: table, + tasks: scanTasks, + + status: cache.JobStatusWaiting, + statistics: statistics, + }, nil +} + +// updateHeartBeat updates the heartbeat for all task with current instance as owner +func (m *JobManager) updateHeartBeat(ctx context.Context, se session.Session) error { + now := se.Now() + for _, job := range m.localJobs() { + _, err := se.ExecuteSQL(ctx, updateHeartBeatSQL(job.tbl.ID, now, m.id)) + if err != nil { + return errors.Trace(err) + } + } + return nil +} + +// updateInfoSchemaCache updates the cache of information schema +func (m *JobManager) updateInfoSchemaCache(se session.Session) error { + return m.infoSchemaCache.Update(se) +} + +// updateTableStatusCache updates the cache of table status +func (m *JobManager) updateTableStatusCache(se session.Session) error { + cacheUpdateCtx, cancel := context.WithTimeout(m.ctx, ttlInternalSQLTimeout) + defer cancel() + return m.tableStatusCache.Update(cacheUpdateCtx, se) +} + +func (m *JobManager) removeJob(finishedJob *ttlJob) { + for idx, job := range m.runningJobs { + if job.id == finishedJob.id { + if idx+1 < len(m.runningJobs) { + m.runningJobs = append(m.runningJobs[0:idx], m.runningJobs[idx+1:]...) + } else { + m.runningJobs = m.runningJobs[0:idx] + } + return + } + } +} + +func (m *JobManager) appendJob(job *ttlJob) { + m.runningJobs = append(m.runningJobs, job) +} + +// CancelJob cancels a job +// TODO: the delete task is not controlled by the context now (but controlled by the worker context), so cancel +// doesn't work for delete tasks. +func (m *JobManager) CancelJob(ctx context.Context, jobID string) error { + se, err := getSession(m.sessPool) + if err != nil { + return err + } + + for _, job := range m.runningJobs { + if job.id == jobID { + return job.Cancel(ctx, se) + } + } + + return errors.Errorf("cannot find the job with id: %s", jobID) +} diff --git a/ttl/ttlworker/job_manager_test.go b/ttl/ttlworker/job_manager_test.go new file mode 100644 index 0000000000000..7261eb2edf8f7 --- /dev/null +++ b/ttl/ttlworker/job_manager_test.go @@ -0,0 +1,494 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ttlworker + +import ( + "context" + "testing" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tidb/ttl/cache" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/chunk" + "github.com/stretchr/testify/assert" +) + +func newTTLTableStatusRows(status ...*cache.TableStatus) []chunk.Row { + c := chunk.NewChunkWithCapacity([]*types.FieldType{ + types.NewFieldType(mysql.TypeLonglong), // table_id + types.NewFieldType(mysql.TypeLonglong), // parent_table_id + types.NewFieldType(mysql.TypeString), // table_statistics + types.NewFieldType(mysql.TypeString), // last_job_id + types.NewFieldType(mysql.TypeDatetime), // last_job_start_time + types.NewFieldType(mysql.TypeDatetime), // last_job_finish_time + types.NewFieldType(mysql.TypeDatetime), // last_job_ttl_expire + types.NewFieldType(mysql.TypeString), // last_job_summary + types.NewFieldType(mysql.TypeString), // current_job_id + types.NewFieldType(mysql.TypeString), // current_job_owner_id + types.NewFieldType(mysql.TypeString), // current_job_owner_addr + types.NewFieldType(mysql.TypeDatetime), // current_job_hb_time + types.NewFieldType(mysql.TypeDatetime), // current_job_start_time + types.NewFieldType(mysql.TypeDatetime), // current_job_ttl_expire + types.NewFieldType(mysql.TypeString), // current_job_state + types.NewFieldType(mysql.TypeString), // current_job_status + types.NewFieldType(mysql.TypeDatetime), // current_job_status_update_time + }, len(status)) + var rows []chunk.Row + + for _, s := range status { + tableID := types.NewDatum(s.TableID) + c.AppendDatum(0, &tableID) + parentTableID := types.NewDatum(s.ParentTableID) + c.AppendDatum(1, &parentTableID) + if s.TableStatistics == "" { + c.AppendNull(2) + } else { + tableStatistics := types.NewDatum(s.TableStatistics) + c.AppendDatum(2, &tableStatistics) + } + + if s.LastJobID == "" { + c.AppendNull(3) + } else { + lastJobID := types.NewDatum(s.LastJobID) + c.AppendDatum(3, &lastJobID) + } + + lastJobStartTime := types.NewDatum(types.NewTime(types.FromGoTime(s.LastJobStartTime), mysql.TypeDatetime, types.MaxFsp)) + c.AppendDatum(4, &lastJobStartTime) + lastJobFinishTime := types.NewDatum(types.NewTime(types.FromGoTime(s.LastJobFinishTime), mysql.TypeDatetime, types.MaxFsp)) + c.AppendDatum(5, &lastJobFinishTime) + lastJobTTLExpire := types.NewDatum(types.NewTime(types.FromGoTime(s.LastJobTTLExpire), mysql.TypeDatetime, types.MaxFsp)) + c.AppendDatum(6, &lastJobTTLExpire) + + if s.LastJobSummary == "" { + c.AppendNull(7) + } else { + lastJobSummary := types.NewDatum(s.LastJobSummary) + c.AppendDatum(7, &lastJobSummary) + } + if s.CurrentJobID == "" { + c.AppendNull(8) + } else { + currentJobID := types.NewDatum(s.CurrentJobID) + c.AppendDatum(8, ¤tJobID) + } + if s.CurrentJobOwnerID == "" { + c.AppendNull(9) + } else { + currentJobOwnerID := types.NewDatum(s.CurrentJobOwnerID) + c.AppendDatum(9, ¤tJobOwnerID) + } + if s.CurrentJobOwnerAddr == "" { + c.AppendNull(10) + } else { + currentJobOwnerAddr := types.NewDatum(s.CurrentJobOwnerAddr) + c.AppendDatum(10, ¤tJobOwnerAddr) + } + + currentJobOwnerHBTime := types.NewDatum(types.NewTime(types.FromGoTime(s.CurrentJobOwnerHBTime), mysql.TypeDatetime, types.MaxFsp)) + c.AppendDatum(11, ¤tJobOwnerHBTime) + currentJobStartTime := types.NewDatum(types.NewTime(types.FromGoTime(s.CurrentJobStartTime), mysql.TypeDatetime, types.MaxFsp)) + c.AppendDatum(12, ¤tJobStartTime) + currentJobTTLExpire := types.NewDatum(types.NewTime(types.FromGoTime(s.CurrentJobTTLExpire), mysql.TypeDatetime, types.MaxFsp)) + c.AppendDatum(13, ¤tJobTTLExpire) + + if s.CurrentJobState == "" { + c.AppendNull(14) + } else { + currentJobState := types.NewDatum(s.CurrentJobState) + c.AppendDatum(14, ¤tJobState) + } + if s.CurrentJobStatus == "" { + c.AppendNull(15) + } else { + currentJobStatus := types.NewDatum(s.CurrentJobStatus) + c.AppendDatum(15, ¤tJobStatus) + } + + currentJobStatusUpdateTime := types.NewDatum(types.NewTime(types.FromGoTime(s.CurrentJobStatusUpdateTime), mysql.TypeDatetime, types.MaxFsp)) + c.AppendDatum(16, ¤tJobStatusUpdateTime) + } + + iter := chunk.NewIterator4Chunk(c) + for row := iter.Begin(); row != iter.End(); row = iter.Next() { + rows = append(rows, row) + } + return rows +} + +var updateStatusSQL = "SELECT table_id,parent_table_id,table_statistics,last_job_id,last_job_start_time,last_job_finish_time,last_job_ttl_expire,last_job_summary,current_job_id,current_job_owner_id,current_job_owner_addr,current_job_owner_hb_time,current_job_start_time,current_job_ttl_expire,current_job_state,current_job_status,current_job_status_update_time FROM mysql.tidb_ttl_table_status" + +func (m *JobManager) SetScanWorkers4Test(workers []worker) { + m.scanWorkers = workers +} + +func newMockTTLJob(tbl *cache.PhysicalTable, status cache.JobStatus) *ttlJob { + statistics := &ttlStatistics{} + return &ttlJob{tbl: tbl, ctx: context.Background(), statistics: statistics, status: status, tasks: []*ttlScanTask{{ctx: context.Background(), tbl: tbl, statistics: statistics}}} +} + +func TestReadyForNewJobTables(t *testing.T) { + tbl := newMockTTLTbl(t, "t1") + m := NewJobManager("test-id", newMockSessionPool(t, tbl), nil) + se := newMockSession(t, tbl) + + cases := []struct { + name string + infoSchemaTables []*cache.PhysicalTable + tableStatus []*cache.TableStatus + shouldSchedule bool + }{ + // for a newly inserted table, it'll always be scheduled + {"newly created", []*cache.PhysicalTable{tbl}, []*cache.TableStatus{{TableID: tbl.ID, ParentTableID: tbl.ID}}, true}, + // table only in the table status cache will not be scheduled + {"proper subset", []*cache.PhysicalTable{}, []*cache.TableStatus{{TableID: tbl.ID, ParentTableID: tbl.ID}}, false}, + // table whose current job owner id is not empty, and heart beat time is long enough will not be scheduled + {"current job not empty", []*cache.PhysicalTable{tbl}, []*cache.TableStatus{{TableID: tbl.ID, ParentTableID: tbl.ID, CurrentJobOwnerID: "test-another-id", CurrentJobOwnerHBTime: time.Now()}}, false}, + // table whose current job owner id is not empty, but heart beat time is expired will be scheduled + {"hb time expired", []*cache.PhysicalTable{tbl}, []*cache.TableStatus{{TableID: tbl.ID, ParentTableID: tbl.ID, CurrentJobOwnerID: "test-another-id", CurrentJobOwnerHBTime: time.Now().Add(-time.Hour)}}, true}, + // if the last finished time is too near, it will also not be scheduled + {"last finished time too near", []*cache.PhysicalTable{tbl}, []*cache.TableStatus{{TableID: tbl.ID, ParentTableID: tbl.ID, LastJobFinishTime: time.Now()}}, false}, + // if the last finished time is expired, it will be scheduled + {"last finished time expired", []*cache.PhysicalTable{tbl}, []*cache.TableStatus{{TableID: tbl.ID, ParentTableID: tbl.ID, LastJobFinishTime: time.Now().Add(time.Hour * 2)}}, false}, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + m.infoSchemaCache.Tables = make(map[int64]*cache.PhysicalTable) + for _, ist := range c.infoSchemaTables { + m.infoSchemaCache.Tables[ist.ID] = ist + } + m.tableStatusCache.Tables = make(map[int64]*cache.TableStatus) + for _, st := range c.tableStatus { + m.tableStatusCache.Tables[st.TableID] = st + } + + tables := m.readyForNewJobTables(se.Now()) + if c.shouldSchedule { + assert.Len(t, tables, 1) + assert.Equal(t, int64(0), tables[0].ID) + assert.Equal(t, int64(0), tables[0].TableInfo.ID) + } else { + assert.Len(t, tables, 0) + } + }) + } +} + +func TestLockNewTable(t *testing.T) { + now, err := time.Parse(timeFormat, "2022-12-05 17:13:05") + assert.NoError(t, err) + maxHBTime := now.Add(-2 * jobManagerLoopTickerInterval) + expireTime := now + + testPhysicalTable := &cache.PhysicalTable{ID: 1, TableInfo: &model.TableInfo{ID: 1, TTLInfo: &model.TTLInfo{ColumnName: model.NewCIStr("test"), IntervalExprStr: "5 Year"}}} + + type sqlExecute struct { + sql string + + rows []chunk.Row + err error + } + cases := []struct { + name string + table *cache.PhysicalTable + sqls []sqlExecute + hasJob bool + hasError bool + }{ + {"normal lock table", testPhysicalTable, []sqlExecute{ + { + cache.SelectFromTTLTableStatusWithID(1), + newTTLTableStatusRows(&cache.TableStatus{TableID: 1}), nil, + }, + { + setTableStatusOwnerSQL(1, now, expireTime, maxHBTime, "test-id"), + nil, nil, + }, + { + updateStatusSQL, + newTTLTableStatusRows(&cache.TableStatus{TableID: 1}), nil, + }, + }, true, false}, + {"select nothing", testPhysicalTable, []sqlExecute{ + { + cache.SelectFromTTLTableStatusWithID(1), + nil, nil, + }, + { + insertNewTableIntoStatusSQL(1, 1), + nil, nil, + }, + { + cache.SelectFromTTLTableStatusWithID(1), + newTTLTableStatusRows(&cache.TableStatus{TableID: 1}), nil, + }, + { + setTableStatusOwnerSQL(1, now, expireTime, maxHBTime, "test-id"), + nil, nil, + }, + { + updateStatusSQL, + newTTLTableStatusRows(&cache.TableStatus{TableID: 1}), nil, + }, + }, true, false}, + {"return error", testPhysicalTable, []sqlExecute{ + { + cache.SelectFromTTLTableStatusWithID(1), + newTTLTableStatusRows(&cache.TableStatus{TableID: 1}), nil, + }, + { + setTableStatusOwnerSQL(1, now, expireTime, maxHBTime, "test-id"), + nil, errors.New("test error message"), + }, + }, false, true}, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + tbl := newMockTTLTbl(t, "t1") + + m := NewJobManager("test-id", newMockSessionPool(t, tbl), nil) + sqlCounter := 0 + se := newMockSession(t, tbl) + se.executeSQL = func(ctx context.Context, sql string, args ...interface{}) (rows []chunk.Row, err error) { + assert.Less(t, sqlCounter, len(c.sqls)) + assert.Equal(t, sql, c.sqls[sqlCounter].sql) + + rows = c.sqls[sqlCounter].rows + err = c.sqls[sqlCounter].err + sqlCounter += 1 + return + } + se.evalExpire = now + + job, err := m.lockNewJob(context.Background(), se, c.table, now) + if c.hasJob { + assert.NotNil(t, job) + } else { + assert.Nil(t, job) + } + if c.hasError { + assert.NotNil(t, err) + } else { + assert.Nil(t, err) + } + }) + } +} + +func TestResizeWorkers(t *testing.T) { + tbl := newMockTTLTbl(t, "t1") + + // scale workers + scanWorker1 := newMockScanWorker(t) + scanWorker1.Start() + scanWorker2 := newMockScanWorker(t) + + m := NewJobManager("test-id", newMockSessionPool(t, tbl), nil) + m.SetScanWorkers4Test([]worker{ + scanWorker1, + }) + newWorkers, err := m.resizeWorkers(m.scanWorkers, 2, func() worker { + return scanWorker2 + }) + assert.NoError(t, err) + assert.Len(t, newWorkers, 2) + scanWorker1.checkWorkerStatus(workerStatusRunning, true, nil) + scanWorker2.checkWorkerStatus(workerStatusRunning, true, nil) + + // shrink scan workers + scanWorker1 = newMockScanWorker(t) + scanWorker1.Start() + scanWorker2 = newMockScanWorker(t) + scanWorker2.Start() + + m = NewJobManager("test-id", newMockSessionPool(t, tbl), nil) + m.SetScanWorkers4Test([]worker{ + scanWorker1, + scanWorker2, + }) + + assert.NoError(t, m.resizeScanWorkers(1)) + scanWorker2.checkWorkerStatus(workerStatusStopped, false, nil) +} + +func TestLocalJobs(t *testing.T) { + tbl1 := newMockTTLTbl(t, "t1") + tbl1.ID = 1 + tbl2 := newMockTTLTbl(t, "t2") + tbl2.ID = 2 + m := NewJobManager("test-id", newMockSessionPool(t, tbl1, tbl2), nil) + + m.runningJobs = []*ttlJob{{tbl: tbl1, id: "1", ctx: context.Background()}, {tbl: tbl2, id: "2", ctx: context.Background()}} + m.tableStatusCache.Tables = map[int64]*cache.TableStatus{ + tbl1.ID: { + CurrentJobOwnerID: m.id, + }, + tbl2.ID: { + CurrentJobOwnerID: "another-id", + }, + } + assert.Len(t, m.localJobs(), 1) + assert.Equal(t, m.localJobs()[0].id, "1") +} + +func TestRescheduleJobs(t *testing.T) { + tbl := newMockTTLTbl(t, "t1") + se := newMockSession(t, tbl) + + scanWorker1 := newMockScanWorker(t) + scanWorker1.Start() + scanWorker1.setOneRowResult(tbl, 2022) + scanWorker2 := newMockScanWorker(t) + scanWorker2.Start() + scanWorker2.setOneRowResult(tbl, 2022) + + m := NewJobManager("test-id", newMockSessionPool(t, tbl), nil) + m.SetScanWorkers4Test([]worker{ + scanWorker1, + scanWorker2, + }) + + // schedule local running job + m.tableStatusCache.Tables = map[int64]*cache.TableStatus{ + tbl.ID: { + CurrentJobOwnerID: m.id, + }, + } + m.runningJobs = []*ttlJob{newMockTTLJob(tbl, cache.JobStatusWaiting)} + m.rescheduleJobs(se, se.Now()) + scanWorker1.checkWorkerStatus(workerStatusRunning, false, m.runningJobs[0].tasks[0]) + scanWorker1.checkPollResult(false, "") + scanWorker2.checkWorkerStatus(workerStatusRunning, true, nil) + scanWorker2.checkPollResult(false, "") + + // then run reschedule multiple times, no job will be scheduled + m.rescheduleJobs(se, se.Now()) + m.rescheduleJobs(se, se.Now()) + scanWorker1.checkWorkerStatus(workerStatusRunning, false, m.runningJobs[0].tasks[0]) + scanWorker1.checkPollResult(false, "") + scanWorker2.checkWorkerStatus(workerStatusRunning, true, nil) + scanWorker2.checkPollResult(false, "") + + del := scanWorker1.pollDelTask() + assert.Equal(t, 1, len(del.rows)) + assert.Equal(t, 1, len(del.rows[0])) + assert.Equal(t, int64(2022), del.rows[0][0].GetInt64()) + + // then the task ends + msg := scanWorker1.waitNotifyScanTaskEnd() + assert.Same(t, m.runningJobs[0].tasks[0], msg.result.task) + assert.NoError(t, msg.result.err) + scanWorker1.checkWorkerStatus(workerStatusRunning, false, m.runningJobs[0].tasks[0]) + scanWorker1.checkPollResult(true, "") + scanWorker2.checkWorkerStatus(workerStatusRunning, true, nil) + scanWorker2.checkPollResult(false, "") +} + +func TestRescheduleJobsOutOfWindow(t *testing.T) { + tbl := newMockTTLTbl(t, "t1") + se := newMockSession(t, tbl) + + scanWorker1 := newMockScanWorker(t) + scanWorker1.Start() + scanWorker1.setOneRowResult(tbl, 2022) + scanWorker2 := newMockScanWorker(t) + scanWorker2.Start() + scanWorker2.setOneRowResult(tbl, 2022) + + m := NewJobManager("test-id", newMockSessionPool(t, tbl), nil) + m.SetScanWorkers4Test([]worker{ + scanWorker1, + scanWorker2, + }) + + // jobs will not be scheduled + m.tableStatusCache.Tables = map[int64]*cache.TableStatus{ + tbl.ID: { + CurrentJobOwnerID: m.id, + }, + } + m.runningJobs = []*ttlJob{newMockTTLJob(tbl, cache.JobStatusWaiting)} + savedttlJobScheduleWindowStartTime := ttlJobScheduleWindowStartTime + savedttlJobScheduleWindowEndTime := ttlJobScheduleWindowEndTime + ttlJobScheduleWindowStartTime, _ = time.Parse(timeFormat, "2022-12-06 12:00:00") + ttlJobScheduleWindowEndTime, _ = time.Parse(timeFormat, "2022-12-06 12:05:00") + defer func() { + ttlJobScheduleWindowStartTime = savedttlJobScheduleWindowStartTime + ttlJobScheduleWindowEndTime = savedttlJobScheduleWindowEndTime + }() + + now, _ := time.Parse(timeFormat, "2022-12-06 12:06:00") + m.rescheduleJobs(se, now) + scanWorker1.checkWorkerStatus(workerStatusRunning, true, nil) + scanWorker1.checkPollResult(false, "") + scanWorker2.checkWorkerStatus(workerStatusRunning, true, nil) + scanWorker2.checkPollResult(false, "") + + // jobs will be scheduled within the time window + now, _ = time.Parse(timeFormat, "2022-12-06 12:02:00") + m.rescheduleJobs(se, now) + scanWorker1.checkWorkerStatus(workerStatusRunning, false, m.runningJobs[0].tasks[0]) + scanWorker1.checkPollResult(false, "") + scanWorker2.checkWorkerStatus(workerStatusRunning, true, nil) + scanWorker2.checkPollResult(false, "") +} + +func TestCheckFinishedJob(t *testing.T) { + tbl := newMockTTLTbl(t, "t1") + se := newMockSession(t, tbl) + + // cancelled job will be regarded as finished + m := NewJobManager("test-id", newMockSessionPool(t, tbl), nil) + m.runningJobs = []*ttlJob{newMockTTLJob(tbl, cache.JobStatusCancelled)} + m.checkFinishedJob(se, se.Now()) + assert.Len(t, m.runningJobs, 0) + + // a real finished job + finishedStatistics := &ttlStatistics{} + finishedStatistics.TotalRows.Store(1) + finishedStatistics.SuccessRows.Store(1) + m = NewJobManager("test-id", newMockSessionPool(t, tbl), nil) + m.runningJobs = []*ttlJob{newMockTTLJob(tbl, cache.JobStatusRunning)} + m.runningJobs[0].statistics = finishedStatistics + m.runningJobs[0].tasks[0].statistics = finishedStatistics + m.runningJobs[0].taskIter = 1 + m.runningJobs[0].finishedScanTaskCounter = 1 + + m.checkFinishedJob(se, se.Now()) + assert.Len(t, m.runningJobs, 0) + + // check timeout job + now := se.Now() + createTime := now.Add(-20 * time.Hour) + m = NewJobManager("test-id", newMockSessionPool(t, tbl), nil) + m.runningJobs = []*ttlJob{ + { + ctx: context.Background(), + tbl: tbl, + status: cache.JobStatusRunning, + statistics: &ttlStatistics{}, + + createTime: createTime, + }, + } + m.checkFinishedJob(se, now) + assert.Len(t, m.runningJobs, 0) +} diff --git a/ttl/ttlworker/job_test.go b/ttl/ttlworker/job_test.go new file mode 100644 index 0000000000000..5af5d0316eed2 --- /dev/null +++ b/ttl/ttlworker/job_test.go @@ -0,0 +1,37 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ttlworker + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestIterScanTask(t *testing.T) { + tbl := newMockTTLTbl(t, "t1") + + job := &ttlJob{ + tbl: tbl, + tasks: []*ttlScanTask{{}}, + } + scanTask, err := job.peekScanTask() + assert.NoError(t, err) + assert.NotNil(t, scanTask) + assert.Len(t, job.tasks, 1) + + job.nextScanTask() + assert.True(t, job.AllSpawned()) +} diff --git a/ttl/ttlworker/scan.go b/ttl/ttlworker/scan.go index 6cb18a2a9e346..538d3aeefc070 100644 --- a/ttl/ttlworker/scan.go +++ b/ttl/ttlworker/scan.go @@ -16,6 +16,7 @@ package ttlworker import ( "context" + "fmt" "strconv" "sync/atomic" "time" @@ -62,7 +63,13 @@ func (s *ttlStatistics) Reset() { s.TotalRows.Store(0) } +func (s *ttlStatistics) String() string { + return fmt.Sprintf("Total Rows: %d, Success Rows: %d, Error Rows: %d", s.TotalRows.Load(), s.SuccessRows.Load(), s.ErrorRows.Load()) +} + type ttlScanTask struct { + ctx context.Context + tbl *cache.PhysicalTable expire time.Time scanRange cache.ScanRange @@ -87,6 +94,11 @@ func (t *ttlScanTask) getDatumRows(rows []chunk.Row) [][]types.Datum { } func (t *ttlScanTask) doScan(ctx context.Context, delCh chan<- *ttlDeleteTask, sessPool sessionPool) *ttlScanTaskExecResult { + // TODO: merge the ctx and the taskCtx in ttl scan task, to allow both "cancel" and gracefully stop workers + // now, the taskCtx is only check at the beginning of every loop + + taskCtx := t.ctx + rawSess, err := getSession(sessPool) if err != nil { return t.result(err) @@ -113,6 +125,9 @@ func (t *ttlScanTask) doScan(ctx context.Context, delCh chan<- *ttlDeleteTask, s retryTimes := 0 var lastResult [][]types.Datum for { + if err = taskCtx.Err(); err != nil { + return t.result(err) + } if err = ctx.Err(); err != nil { return t.result(err) } @@ -239,15 +254,15 @@ func (w *ttlScanWorker) CurrentTask() *ttlScanTask { return w.curTask } -func (w *ttlScanWorker) PollTaskResult() (*ttlScanTaskExecResult, bool) { +func (w *ttlScanWorker) PollTaskResult() *ttlScanTaskExecResult { w.Lock() defer w.Unlock() if r := w.curTaskResult; r != nil { w.curTask = nil w.curTaskResult = nil - return r, true + return r } - return nil, false + return nil } func (w *ttlScanWorker) loop() error { @@ -262,7 +277,7 @@ func (w *ttlScanWorker) loop() error { } switch task := msg.(type) { case *ttlScanTask: - w.handleScanTask(ctx, task) + w.handleScanTask(task) default: logutil.BgLogger().Warn("unrecognized message for ttlScanWorker", zap.Any("msg", msg)) } @@ -271,8 +286,8 @@ func (w *ttlScanWorker) loop() error { return nil } -func (w *ttlScanWorker) handleScanTask(ctx context.Context, task *ttlScanTask) { - result := task.doScan(ctx, w.delCh, w.sessionPool) +func (w *ttlScanWorker) handleScanTask(task *ttlScanTask) { + result := task.doScan(w.ctx, w.delCh, w.sessionPool) if result == nil { result = task.result(nil) } @@ -288,3 +303,10 @@ func (w *ttlScanWorker) handleScanTask(ctx context.Context, task *ttlScanTask) { } } } + +type scanWorker interface { + worker + + Idle() bool + Schedule(*ttlScanTask) error +} diff --git a/ttl/ttlworker/scan_test.go b/ttl/ttlworker/scan_test.go index 96d103d061876..66582084b18f3 100644 --- a/ttl/ttlworker/scan_test.go +++ b/ttl/ttlworker/scan_test.go @@ -47,8 +47,7 @@ func newMockScanWorker(t *testing.T) *mockScanWorker { w.ttlScanWorker = newScanWorker(w.delCh, w.notifyCh, w.sessPoll) require.Equal(t, workerStatusCreated, w.Status()) require.False(t, w.Idle()) - result, ok := w.PollTaskResult() - require.False(t, ok) + result := w.PollTaskResult() require.Nil(t, result) return w } @@ -61,8 +60,8 @@ func (w *mockScanWorker) checkWorkerStatus(status workerStatus, idle bool, curTa func (w *mockScanWorker) checkPollResult(exist bool, err string) { curTask := w.CurrentTask() - r, ok := w.PollTaskResult() - require.Equal(w.t, exist, ok) + r := w.PollTaskResult() + require.Equal(w.t, exist, r != nil) if !exist { require.Nil(w.t, r) } else { @@ -135,6 +134,7 @@ func TestScanWorkerSchedule(t *testing.T) { defer w.stopWithWait() task := &ttlScanTask{ + ctx: context.Background(), tbl: tbl, expire: time.UnixMilli(0), statistics: &ttlStatistics{}, @@ -181,6 +181,7 @@ func TestScanWorkerScheduleWithFailedTask(t *testing.T) { defer w.stopWithWait() task := &ttlScanTask{ + ctx: context.Background(), tbl: tbl, expire: time.UnixMilli(0), statistics: &ttlStatistics{}, @@ -220,6 +221,7 @@ func newMockScanTask(t *testing.T, sqlCnt int) *mockScanTask { task := &mockScanTask{ t: t, ttlScanTask: &ttlScanTask{ + ctx: context.Background(), tbl: tbl, expire: time.UnixMilli(0), scanRange: cache.ScanRange{ diff --git a/ttl/ttlworker/session.go b/ttl/ttlworker/session.go index 6326fff5f11d7..a8109e4c2e863 100644 --- a/ttl/ttlworker/session.go +++ b/ttl/ttlworker/session.go @@ -155,7 +155,7 @@ func validateTTLWork(ctx context.Context, s session.Session, tbl *cache.Physical if newTblInfo.TTLInfo.IntervalExprStr != tbl.TTLInfo.IntervalExprStr || newTblInfo.TTLInfo.IntervalTimeUnit != tbl.TTLInfo.IntervalTimeUnit { - newExpireTime, err := newTTLTbl.EvalExpireTime(ctx, s, time.Now()) + newExpireTime, err := newTTLTbl.EvalExpireTime(ctx, s, s.Now()) if err != nil { return err } diff --git a/ttl/ttlworker/session_test.go b/ttl/ttlworker/session_test.go index b12d47dd9f2bc..8cceaed7ac72b 100644 --- a/ttl/ttlworker/session_test.go +++ b/ttl/ttlworker/session_test.go @@ -195,6 +195,14 @@ func (s *mockSession) Close() { s.closed = true } +func (s *mockSession) Now() time.Time { + tz := s.sessionVars.TimeZone + if tz != nil { + tz = time.UTC + } + return time.Now().In(tz) +} + func TestExecuteSQLWithCheck(t *testing.T) { ctx := context.TODO() tbl := newMockTTLTbl(t, "t1") diff --git a/ttl/ttlworker/worker.go b/ttl/ttlworker/worker.go index d03a747d2855e..a04110373cdbf 100644 --- a/ttl/ttlworker/worker.go +++ b/ttl/ttlworker/worker.go @@ -109,6 +109,10 @@ func (w *baseWorker) WaitStopped(ctx context.Context, timeout time.Duration) err return nil } +func (w *baseWorker) Send() chan<- interface{} { + return w.ch +} + func (w *baseWorker) loop() { var err error defer func() {