diff --git a/ttl/ttlworker/BUILD.bazel b/ttl/ttlworker/BUILD.bazel index 6e3f77a106d86..4debd916534a0 100644 --- a/ttl/ttlworker/BUILD.bazel +++ b/ttl/ttlworker/BUILD.bazel @@ -10,6 +10,7 @@ go_library( "scan.go", "session.go", "task_manager.go", + "timer.go", "timer_sync.go", "worker.go", ], @@ -24,6 +25,7 @@ go_library( "//sessionctx/variable", "//store/driver/error", "//timer/api", + "//timer/runtime", "//ttl/cache", "//ttl/client", "//ttl/metrics", @@ -60,11 +62,12 @@ go_test( "task_manager_integration_test.go", "task_manager_test.go", "timer_sync_test.go", + "timer_test.go", ], embed = [":ttlworker"], flaky = True, race = "on", - shard_count = 36, + shard_count = 41, deps = [ "//domain", "//infoschema", @@ -93,6 +96,7 @@ go_test( "@com_github_pingcap_failpoint//:failpoint", "@com_github_prometheus_client_model//go", "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//mock", "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//testutils", "@org_golang_x_time//rate", diff --git a/ttl/ttlworker/timer.go b/ttl/ttlworker/timer.go new file mode 100644 index 0000000000000..f4ed5e99adb2d --- /dev/null +++ b/ttl/ttlworker/timer.go @@ -0,0 +1,281 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package ttlworker
+
+import (
+	"context"
+	"encoding/json"
+	"sync"
+	"time"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/tidb/sessionctx/variable"
+	timerapi "github.com/pingcap/tidb/timer/api"
+	timerrt "github.com/pingcap/tidb/timer/runtime"
+	"github.com/pingcap/tidb/util/logutil"
+	"github.com/pingcap/tidb/util/timeutil"
+	"go.uber.org/zap"
+)
+
+const (
+	defaultCheckTTLJobInterval = 10 * time.Second
+)
+
+type ttlTimerSummary struct {
+	LastJobRequestID string      `json:"last_job_request_id,omitempty"`
+	LastJobSummary   *TTLSummary `json:"last_job_summary,omitempty"`
+}
+
+// TTLJobTrace contains the information used to trace a TTL job
+type TTLJobTrace struct {
+	// RequestID is the request ID assigned when the job was submitted; it can be used to trace the job
+	RequestID string
+	// Finished indicates whether the job is finished
+	Finished bool
+	// Summary is the summary of the job
+	Summary *TTLSummary
+}
+
+// TTLJobAdapter is used to submit a TTL job and trace its status
+type TTLJobAdapter interface {
+	// CanSubmitJob returns whether a new job can be created for the specified table
+	CanSubmitJob(tableID, physicalID int64) bool
+	// SubmitJob submits a new job
+	SubmitJob(ctx context.Context, tableID, physicalID int64, requestID string, watermark time.Time) (*TTLJobTrace, error)
+	// GetJob returns the job to trace
+	GetJob(ctx context.Context, tableID, physicalID int64, requestID string) (*TTLJobTrace, error)
+}
+
+type ttlTimerHook struct {
+	adapter             TTLJobAdapter
+	cli                 timerapi.TimerClient
+	ctx                 context.Context
+	cancel              func()
+	wg                  sync.WaitGroup
+	nowFunc             func() time.Time
+	checkTTLJobInterval time.Duration
+	// waitJobLoopCounter is only used in tests
+	waitJobLoopCounter int64
+}
+
+func newTTLTimerHook(adapter TTLJobAdapter, cli timerapi.TimerClient) *ttlTimerHook {
+	ctx, cancel := context.WithCancel(context.Background())
+	return &ttlTimerHook{
+		adapter:             adapter,
+		cli:                 cli,
+		ctx:                 ctx,
+		cancel:              cancel,
+		nowFunc:             time.Now,
+		checkTTLJobInterval: defaultCheckTTLJobInterval,
+	}
+}
+
+func (t *ttlTimerHook) Start() {}
+
+func (t *ttlTimerHook) Stop() {
+	t.cancel()
+	t.wg.Wait()
+}
+
+func (t *ttlTimerHook) OnPreSchedEvent(_ context.Context, event timerapi.TimerShedEvent) (r timerapi.PreSchedEventResult, err error) {
+	if !variable.EnableTTLJob.Load() {
+		r.Delay = time.Minute
+		return
+	}
+
+	windowStart, windowEnd := variable.TTLJobScheduleWindowStartTime.Load(), variable.TTLJobScheduleWindowEndTime.Load()
+	if !timeutil.WithinDayTimePeriod(windowStart, windowEnd, t.nowFunc()) {
+		r.Delay = time.Minute
+		return
+	}
+
+	timer := event.Timer()
+	var data TTLTimerData
+	if err = json.Unmarshal(event.Timer().Data, &data); err != nil {
+		logutil.BgLogger().Error("invalid TTL timer data",
+			zap.String("timerID", timer.ID),
+			zap.String("timerKey", timer.Key),
+			zap.ByteString("data", timer.Data),
+		)
+		r.Delay = time.Minute
+		return
+	}
+
+	if !t.adapter.CanSubmitJob(data.TableID, data.PhysicalID) {
+		r.Delay = time.Minute
+		return
+	}
+
+	return
+}
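+
+// Editorial sketch (illustrative only, not referenced by the runtime): a
+// minimal TTLJobAdapter that refuses every submission, showing the contract
+// the hook relies on. The name noopAdapter is hypothetical.
+//
+//	type noopAdapter struct{}
+//
+//	func (noopAdapter) CanSubmitJob(tableID, physicalID int64) bool { return false }
+//
+//	func (noopAdapter) SubmitJob(ctx context.Context, tableID, physicalID int64, requestID string, watermark time.Time) (*TTLJobTrace, error) {
+//		return nil, errors.New("TTL job submission is disabled")
+//	}
+//
+//	func (noopAdapter) GetJob(ctx context.Context, tableID, physicalID int64, requestID string) (*TTLJobTrace, error) {
+//		return nil, nil // no job is ever submitted, so there is never one to trace
+//	}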
+
+func (t *ttlTimerHook) OnSchedEvent(ctx context.Context, event timerapi.TimerShedEvent) error {
+	timer := event.Timer()
+	eventID := event.EventID()
+	logger := logutil.BgLogger().With(
+		zap.String("key", timer.Key),
+		zap.String("eventID", eventID),
+		zap.Time("eventStart", timer.EventStart),
+		zap.Strings("tags", timer.Tags),
+	)
+
+	logger.Info("timer triggered to run TTL job", zap.String("manualRequest", timer.EventManualRequestID))
+	if err := t.ctx.Err(); err != nil {
+		return err
+	}
+
+	var data TTLTimerData
+	if err := json.Unmarshal(timer.Data, &data); err != nil {
+		logger.Error("invalid TTL timer data", zap.ByteString("data", timer.Data))
+		return err
+	}
+
+	job, err := t.adapter.GetJob(ctx, data.TableID, data.PhysicalID, eventID)
+	if err != nil {
+		return err
+	}
+
+	if job == nil {
+		cancel := false
+		if !timer.Enable || !t.adapter.CanSubmitJob(data.TableID, data.PhysicalID) {
+			cancel = true
+			logger.Warn("cancel the current TTL timer event because the table's TTL is not enabled")
+		}
+
+		if t.nowFunc().Sub(timer.EventStart) > 10*time.Minute {
+			cancel = true
+			logger.Warn("cancel the current TTL timer event because the job has not been submitted for a long time")
+		}
+
+		if cancel {
+			return t.cli.CloseTimerEvent(ctx, timer.ID, eventID, timerapi.WithSetWatermark(timer.Watermark))
+		}
+
+		logger.Info("submit TTL job for the current timer event")
+		if job, err = t.adapter.SubmitJob(ctx, data.TableID, data.PhysicalID, eventID, timer.EventStart); err != nil {
+			return err
+		}
+	}
+
+	logger = logger.With(zap.String("jobRequestID", job.RequestID))
+	logger.Info("start waiting for the TTL job")
+	t.wg.Add(1)
+	t.waitJobLoopCounter++
+	go t.waitJobFinished(logger, &data, timer.ID, eventID, timer.EventStart)
+	return nil
+}
+
+func (t *ttlTimerHook) waitJobFinished(logger *zap.Logger, data *TTLTimerData, timerID string, eventID string, eventStart time.Time) {
+	defer func() {
+		t.wg.Done()
+		logger.Info("stop waiting for the job")
+	}()
+
+	ticker := time.NewTicker(t.checkTTLJobInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-t.ctx.Done():
+			logger.Info("stop waiting for the TTL job because the context is cancelled")
+			return
+		case <-ticker.C:
+		}
+
+		timer, err := t.cli.GetTimerByID(t.ctx, timerID)
+		if err != nil {
+			if errors.ErrorEqual(timerapi.ErrTimerNotExist, err) {
+				logger.Warn("stop waiting for the TTL job because the timer is deleted")
+				return
+			}
+
+			logger.Error("GetTimerByID failed", zap.Error(err))
+			continue
+		}
+
+		if timer.EventID != eventID {
+			logger.Warn("stop waiting for the TTL job because the current event ID changed", zap.String("newEventID", timer.EventID))
+			return
+		}
+
+		job, err := t.adapter.GetJob(t.ctx, data.TableID, data.PhysicalID, eventID)
+		if err != nil {
+			logger.Error("GetJob error", zap.Error(err))
+			continue
+		}
+
+		if job != nil && !job.Finished {
+			continue
+		}
+
+		timerSummary := &ttlTimerSummary{
+			LastJobRequestID: eventID,
+		}
+
+		if job != nil {
+			timerSummary.LastJobSummary = job.Summary
+		} else {
+			logger.Warn("job for the current TTL timer event not found")
+		}
+
+		logger.Info("TTL job is finished, closing the current timer event")
+		summaryData, err := json.Marshal(timerSummary)
+		if err != nil {
+			logger.Error("marshal summary error", zap.Error(err))
+			continue
+		}
+
+		if err = t.cli.CloseTimerEvent(t.ctx, timerID, eventID, timerapi.WithSetWatermark(eventStart), timerapi.WithSetSummaryData(summaryData)); err != nil {
+			logger.Error("CloseTimerEvent error", zap.Error(err))
+			continue
+		}
+
+		return
+	}
+}
+
+type ttlTimerRuntime struct {
+	rt      *timerrt.TimerGroupRuntime
+	store   *timerapi.TimerStore
+	adapter TTLJobAdapter
+}
+
+func newTTLTimerRuntime(store *timerapi.TimerStore, adapter TTLJobAdapter) *ttlTimerRuntime {
+	return &ttlTimerRuntime{
+		store:   store,
+		adapter: adapter,
+	}
+}
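+
+// Editorial sketch (illustrative only, not referenced elsewhere): the intended
+// lifecycle of ttlTimerRuntime; the store and adapter values are assumed to
+// come from the caller.
+//
+//	rt := newTTLTimerRuntime(store, adapter)
+//	rt.Resume()      // builds and starts the runtime; a no-op if already running
+//	defer rt.Pause() // stops the runtime and drops it, so a later Resume rebuilds it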
+
+func (r *ttlTimerRuntime) Resume() {
+	if r.rt != nil {
+		return
+	}
+
+	r.rt = timerrt.NewTimerRuntimeBuilder("ttl", r.store).
+		SetCond(&timerapi.TimerCond{Key: timerapi.NewOptionalVal(timerKeyPrefix), KeyPrefix: true}).
+		RegisterHookFactory(timerHookClass, func(hookClass string, cli timerapi.TimerClient) timerapi.Hook {
+			return newTTLTimerHook(r.adapter, cli)
+		}).
+		Build()
+	r.rt.Start()
+}
+
+func (r *ttlTimerRuntime) Pause() {
+	if rt := r.rt; rt != nil {
+		r.rt = nil
+		rt.Stop()
+	}
+}
diff --git a/ttl/ttlworker/timer_sync.go b/ttl/ttlworker/timer_sync.go
index 8a23f57e87394..030d0b9b89754 100644
--- a/ttl/ttlworker/timer_sync.go
+++ b/ttl/ttlworker/timer_sync.go
@@ -70,6 +70,60 @@ func (g *TTLTimersSyncer) SetDelayDeleteInterval(interval time.Duration) {
 	g.delayDelete = interval
 }
 
+// ManualTriggerTTLTimer triggers a TTL job for a physical table and returns a function to wait for the job to finish.
+// The returned function reports, via its bool result, whether the job is finished.
+func (g *TTLTimersSyncer) ManualTriggerTTLTimer(ctx context.Context, tbl *cache.PhysicalTable) (func() (string, bool, error), error) {
+	se, err := getSession(g.pool)
+	if err != nil {
+		return nil, err
+	}
+	defer se.Close()
+
+	timer, err := g.syncOneTimer(ctx, se, tbl.Schema, tbl.TableInfo, tbl.PartitionDef, true)
+	if err != nil {
+		return nil, err
+	}
+
+	reqID, err := g.cli.ManualTriggerEvent(ctx, timer.ID)
+	if err != nil {
+		return nil, err
+	}
+
+	return func() (string, bool, error) {
+		se, err := getSession(g.pool)
+		if err != nil {
+			return "", false, err
+		}
+		defer se.Close()
+
+		if err = ctx.Err(); err != nil {
+			return "", false, err
+		}
+
+		timer, err := g.cli.GetTimerByID(ctx, timer.ID)
+		if err != nil {
+			return "", false, err
+		}
+
+		if timer.ManualRequestID != reqID {
+			return "", false, errors.New("manual request failed to trigger, request not found")
+		}
+
+		if timer.IsManualRequesting() {
+			if timeout := timer.ManualTimeout; timeout > 0 && time.Since(timer.ManualRequestTime) > timeout+5*time.Second {
+				return "", false, errors.New("manual request timeout")
+			}
+			return "", false, nil
+		}
+
+		if timer.ManualEventID == "" {
+			return "", false, errors.New("manual request failed to trigger, request cancelled")
+		}
+
+		return timer.ManualEventID, true, nil
+	}, nil
+}
+
 // SyncTimers syncs timers with TTL tables
 func (g *TTLTimersSyncer) SyncTimers(ctx context.Context, is infoschema.InfoSchema) {
 	if time.Since(g.lastPullTimers) > fullRefreshTimersCacheInterval {
@@ -115,6 +169,19 @@ func (g *TTLTimersSyncer) SyncTimers(ctx context.Context, is infoschema.InfoSche
 		if time.Since(timer.CreateTime) > g.delayDelete {
 			if _, err = g.cli.DeleteTimer(ctx, timer.ID); err != nil {
 				logutil.BgLogger().Error("failed to delete timer", zap.Error(err), zap.String("timerID", timer.ID))
+			} else {
+				delete(g.key2Timers, key)
+			}
+		} else if timer.Enable {
+			if err = g.cli.UpdateTimer(ctx, timer.ID, timerapi.WithSetEnable(false)); err != nil {
+				logutil.BgLogger().Error("failed to disable timer", zap.Error(err), zap.String("timerID", timer.ID))
+			}
+
+			timer, err = g.cli.GetTimerByID(ctx, timer.ID)
+			if err != nil {
+				logutil.BgLogger().Error("failed to get timer", zap.Error(err), zap.String("timerID", timer.ID))
+			} else {
+				g.key2Timers[key] = timer
 			}
 		}
 	}
@@ -122,36 +189,57 @@
 
 func (g *TTLTimersSyncer) syncTimersForTable(ctx context.Context, se session.Session, schema model.CIStr, tblInfo *model.TableInfo) []string {
 	if tblInfo.Partition == nil {
-		return []string{g.syncOneTimer(ctx, se, schema, tblInfo, nil)}
+		key := buildTimerKey(tblInfo, nil)
+		if _, err := g.syncOneTimer(ctx, se, schema, tblInfo, nil, false); err != nil {
+
logutil.BgLogger().Error("failed to syncOneTimer", zap.Error(err), zap.String("key", key)) + } + return []string{key} } defs := tblInfo.Partition.Definitions keys := make([]string, 0, len(defs)) for i := range defs { - keys = append( - keys, - g.syncOneTimer(ctx, se, schema, tblInfo, &defs[i]), - ) + partition := &defs[i] + key := buildTimerKey(tblInfo, partition) + keys = append(keys, key) + if _, err := g.syncOneTimer(ctx, se, schema, tblInfo, partition, false); err != nil { + logutil.BgLogger().Error("failed to syncOneTimer", zap.Error(err), zap.String("key", key)) + } } return keys } -func (g *TTLTimersSyncer) syncOneTimer(ctx context.Context, se session.Session, schema model.CIStr, tblInfo *model.TableInfo, partition *model.PartitionDefinition) (key string) { - key = buildTimerKey(tblInfo, partition) +func (g *TTLTimersSyncer) shouldSyncTimer(timer *timerapi.TimerRecord, schema model.CIStr, tblInfo *model.TableInfo, partition *model.PartitionDefinition) bool { + if timer == nil { + return true + } + tags := getTimerTags(schema, tblInfo, partition) ttlInfo := tblInfo.TTLInfo - existTimer, ok := g.key2Timers[key] - if ok && slices.Equal(existTimer.Tags, tags) && existTimer.Enable == ttlInfo.Enable && existTimer.SchedPolicyExpr == ttlInfo.JobInterval { - return + return !slices.Equal(timer.Tags, tags) || + timer.Enable != ttlInfo.Enable || + timer.SchedPolicyExpr != ttlInfo.JobInterval +} + +func (g *TTLTimersSyncer) syncOneTimer(ctx context.Context, se session.Session, schema model.CIStr, tblInfo *model.TableInfo, partition *model.PartitionDefinition, skipCache bool) (*timerapi.TimerRecord, error) { + key := buildTimerKey(tblInfo, partition) + tags := getTimerTags(schema, tblInfo, partition) + ttlInfo := tblInfo.TTLInfo + + if !skipCache { + timer, ok := g.key2Timers[key] + if ok && !g.shouldSyncTimer(timer, schema, tblInfo, partition) { + return timer, nil + } } timer, err := g.cli.GetTimerByKey(ctx, key) if err != nil && !errors.ErrorEqual(err, timerapi.ErrTimerNotExist) { - logutil.BgLogger().Error("failed to get timer for TTL table", zap.Error(err), zap.String("key", key)) - return + return nil, err } if errors.ErrorEqual(err, timerapi.ErrTimerNotExist) { + delete(g.key2Timers, key) var watermark time.Time ttlTableStatus, err := getTTLTableStatus(ctx, se, tblInfo, partition) if err != nil { @@ -159,7 +247,11 @@ func (g *TTLTimersSyncer) syncOneTimer(ctx context.Context, se session.Session, } if ttlTableStatus != nil { - watermark = ttlTableStatus.LastJobStartTime + if ttlTableStatus.CurrentJobID != "" { + watermark = ttlTableStatus.CurrentJobStartTime + } else { + watermark = ttlTableStatus.LastJobStartTime + } } dataObj := &TTLTimerData{ @@ -173,8 +265,7 @@ func (g *TTLTimersSyncer) syncOneTimer(ctx context.Context, se session.Session, data, err := json.Marshal(dataObj) if err != nil { - logutil.BgLogger().Error("failed to marshal TTL data object", zap.Error(err)) - return + return nil, err } timer, err = g.cli.CreateTimer(ctx, timerapi.TimerSpec{ @@ -188,15 +279,15 @@ func (g *TTLTimersSyncer) syncOneTimer(ctx context.Context, se session.Session, Enable: ttlInfo.Enable, }) if err != nil { - logutil.BgLogger().Error("failed to create new timer", - zap.Error(err), - zap.String("key", key), - zap.Strings("tags", tags), - ) - return + return nil, err } g.key2Timers[key] = timer - return + return timer, nil + } + + g.key2Timers[key] = timer + if !g.shouldSyncTimer(timer, schema, tblInfo, partition) { + return timer, nil } err = g.cli.UpdateTimer(ctx, timer.ID, @@ -212,20 +303,16 @@ 
func (g *TTLTimersSyncer) syncOneTimer(ctx context.Context, se session.Session, zap.String("key", key), zap.Strings("tags", tags), ) - return + return nil, err } timer, err = g.cli.GetTimerByID(ctx, timer.ID) if err != nil { - logutil.BgLogger().Error("failed to get timer", - zap.Error(err), - zap.String("timerID", timer.ID), - ) - return + return nil, err } g.key2Timers[timer.Key] = timer - return + return timer, nil } func getTimerTags(schema model.CIStr, tblInfo *model.TableInfo, partition *model.PartitionDefinition) []string { @@ -246,7 +333,11 @@ func buildTimerKey(tblInfo *model.TableInfo, partition *model.PartitionDefinitio if partition != nil { physicalID = partition.ID } - return fmt.Sprintf("%s%d/%d", timerKeyPrefix, tblInfo.ID, physicalID) + return buildTimerKeyWithID(tblInfo.ID, physicalID) +} + +func buildTimerKeyWithID(tblID, physicalID int64) string { + return fmt.Sprintf("%s%d/%d", timerKeyPrefix, tblID, physicalID) } func getTTLTableStatus(ctx context.Context, se session.Session, tblInfo *model.TableInfo, partition *model.PartitionDefinition) (*cache.TableStatus, error) { diff --git a/ttl/ttlworker/timer_sync_test.go b/ttl/ttlworker/timer_sync_test.go index eb08f681e878a..8dc4a1f8db2cd 100644 --- a/ttl/ttlworker/timer_sync_test.go +++ b/ttl/ttlworker/timer_sync_test.go @@ -22,15 +22,145 @@ import ( "time" "github.com/google/uuid" + "github.com/pingcap/errors" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/testkit" timerapi "github.com/pingcap/tidb/timer/api" "github.com/pingcap/tidb/timer/tablestore" + "github.com/pingcap/tidb/ttl/cache" "github.com/pingcap/tidb/ttl/ttlworker" "github.com/stretchr/testify/require" ) +func TestTTLManualTriggerOneTimer(t *testing.T) { + store, do := testkit.CreateMockStoreAndDomain(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec(tablestore.CreateTimerTableSQL("test", "test_timers")) + timerStore := tablestore.NewTableTimerStore(1, do.SysSessionPool(), "test", "test_timers", nil) + defer timerStore.Close() + var zeroWatermark time.Time + cli := timerapi.NewDefaultTimerClient(timerStore) + sync := ttlworker.NewTTLTimerSyncer(do.SysSessionPool(), cli) + + tk.MustExec("set @@global.tidb_ttl_job_enable=0") + tk.MustExec("create table tp1(a int, t timestamp) TTL=`t`+interval 1 HOUR ttl_job_interval='3h' partition by range(a) (" + + "partition p0 values less than (10)," + + "partition p1 values less than (100)," + + "partition p2 values less than (1000)" + + ")") + + key, physical := getPhysicalTableInfo(t, do, "test", "tp1", "p0") + _, err := cli.GetTimerByKey(context.TODO(), key) + require.True(t, errors.ErrorEqual(err, timerapi.ErrTimerNotExist)) + + startTrigger := func(ctx context.Context, expectErr string) (func() (string, bool, error), timerapi.ManualRequest) { + timer, err := cli.GetTimerByKey(context.TODO(), key) + if !errors.ErrorEqual(timerapi.ErrTimerNotExist, err) { + require.NoError(t, err) + require.False(t, timer.IsManualRequesting()) + } + + _, physical = getPhysicalTableInfo(t, do, "test", "tp1", "p0") + check, err := sync.ManualTriggerTTLTimer(ctx, physical) + timer = checkTimerWithTableMeta(t, do, cli, "test", "tp1", "p0", zeroWatermark) + if expectErr != "" { + require.EqualError(t, err, expectErr) + require.Nil(t, check) + require.False(t, timer.IsManualRequesting()) + return nil, timer.ManualRequest + } + + require.NoError(t, err) + require.True(t, timer.IsManualRequesting()) + return check, timer.ManualRequest + } + + testCheckFunc := 
func(check func() (string, bool, error), expectJobID string, expectErr string) { + jobID, ok, err := check() + if expectErr != "" { + require.Empty(t, expectJobID) + require.Empty(t, jobID) + require.False(t, ok) + require.EqualError(t, err, expectErr) + } else { + require.NoError(t, err) + require.Equal(t, ok, jobID != "") + require.Equal(t, expectJobID, jobID) + } + } + + // start trigger -> not finished -> finished + check, manual := startTrigger(context.TODO(), "") + testCheckFunc(check, "", "") + timer, err := cli.GetTimerByKey(context.TODO(), key) + require.NoError(t, err) + manual.ManualEventID = "event123" + manual.ManualProcessed = true + require.NoError(t, timerStore.Update(context.TODO(), timer.ID, &timerapi.TimerUpdate{ + ManualRequest: timerapi.NewOptionalVal(manual), + })) + testCheckFunc(check, "event123", "") + + // start trigger -> trigger done but no event id + check, manual = startTrigger(context.TODO(), "") + manual.ManualProcessed = true + require.NoError(t, timerStore.Update(context.TODO(), timer.ID, &timerapi.TimerUpdate{ + ManualRequest: timerapi.NewOptionalVal(manual), + })) + testCheckFunc(check, "", "manual request failed to trigger, request cancelled") + + // start trigger -> manual requestID not match + check, manual = startTrigger(context.TODO(), "") + manual.ManualRequestID = "anotherreqid" + require.NoError(t, timerStore.Update(context.TODO(), timer.ID, &timerapi.TimerUpdate{ + ManualRequest: timerapi.NewOptionalVal(manual), + })) + testCheckFunc(check, "", "manual request failed to trigger, request not found") + manual.ManualRequestID = "anotherreqid" + manual.ManualProcessed = true + require.NoError(t, timerStore.Update(context.TODO(), timer.ID, &timerapi.TimerUpdate{ + ManualRequest: timerapi.NewOptionalVal(manual), + })) + testCheckFunc(check, "", "manual request failed to trigger, request not found") + require.NoError(t, timerStore.Update(context.TODO(), timer.ID, &timerapi.TimerUpdate{ + ManualRequest: timerapi.NewOptionalVal(timerapi.ManualRequest{}), + })) + testCheckFunc(check, "", "manual request failed to trigger, request not found") + + // start trigger -> trigger not done but timeout + check, manual = startTrigger(context.TODO(), "") + manual.ManualRequestTime = time.Now().Add(-time.Minute) + manual.ManualTimeout = 50 * time.Second + require.NoError(t, timerStore.Update(context.TODO(), timer.ID, &timerapi.TimerUpdate{ + ManualRequest: timerapi.NewOptionalVal(manual), + })) + testCheckFunc(check, "", "manual request timeout") + + // disable ttl + require.NoError(t, timerStore.Update(context.TODO(), timer.ID, &timerapi.TimerUpdate{ + ManualRequest: timerapi.NewOptionalVal(timerapi.ManualRequest{}), + })) + tk.MustExec("alter table tp1 ttl_enable='OFF'") + _, physical = getPhysicalTableInfo(t, do, "test", "tp1", "p0") + startTrigger(context.TODO(), "manual trigger is not allowed when timer is disabled") + tk.MustExec("alter table tp1 ttl_enable='ON'") + + // start trigger -> timer deleted + check, _ = startTrigger(context.TODO(), "") + _, err = cli.DeleteTimer(context.TODO(), timer.ID) + require.NoError(t, err) + testCheckFunc(check, "", "timer not exist") + + // ctx timeout + ctx, cancel := context.WithCancel(context.TODO()) + check, _ = startTrigger(ctx, "") + cancel() + testCheckFunc(check, "", ctx.Err().Error()) +} + func TestTTLTimerSync(t *testing.T) { store, do := testkit.CreateMockStoreAndDomain(t) @@ -53,10 +183,10 @@ func TestTTLTimerSync(t *testing.T) { var zeroTime time.Time wm1 := time.Unix(3600*24*12, 0) wm2 := time.Unix(3600*24*24, 
0) - insertTTLTableStatusWatermark(t, do, tk, "test", "t1", "", zeroTime) - insertTTLTableStatusWatermark(t, do, tk, "test", "t2", "", wm1) - insertTTLTableStatusWatermark(t, do, tk, "test", "tp1", "p0", zeroTime) - insertTTLTableStatusWatermark(t, do, tk, "test", "tp1", "p1", wm2) + insertTTLTableStatusWatermark(t, do, tk, "test", "t1", "", zeroTime, false) + insertTTLTableStatusWatermark(t, do, tk, "test", "t2", "", wm1, false) + insertTTLTableStatusWatermark(t, do, tk, "test", "tp1", "p0", zeroTime, false) + insertTTLTableStatusWatermark(t, do, tk, "test", "tp1", "p1", wm2, true) cli := timerapi.NewDefaultTimerClient(timerStore) sync := ttlworker.NewTTLTimerSyncer(do.SysSessionPool(), cli) @@ -129,7 +259,11 @@ func TestTTLTimerSync(t *testing.T) { require.NotEqual(t, oldTimerP20.ID, timerP20.ID) timerP21 = checkTimerWithTableMeta(t, do, cli, "test", "tp2", "p1", zeroTime) require.NotEqual(t, oldTimerP21.ID, timerP21.ID) - checkTimersNotChange(t, cli, timer1, oldTimer2, timer3, timer4, timer5, timerP10, oldTimerP11, timerP12, timerP13, oldTimerP20, oldTimerP21) + oldTimer2 = checkTimerOnlyDisabled(t, cli, oldTimer2) + oldTimerP11 = checkTimerOnlyDisabled(t, cli, oldTimerP11) + oldTimerP20 = checkTimerOnlyDisabled(t, cli, oldTimerP20) + oldTimerP21 = checkTimerOnlyDisabled(t, cli, oldTimerP21) + checkTimersNotChange(t, cli, timer1, timer3, timer4, timer5, timerP10, timerP12, timerP13) // drop table/partition tk.MustExec("drop table t1a") @@ -137,8 +271,12 @@ func TestTTLTimerSync(t *testing.T) { tk.MustExec("drop table tp2") sync.SyncTimers(context.TODO(), do.InfoSchema()) checkTimerCnt(t, cli, 15) + checkTimerOnlyDisabled(t, cli, timer1) + checkTimerOnlyDisabled(t, cli, timerP13) + checkTimerOnlyDisabled(t, cli, timerP20) + checkTimerOnlyDisabled(t, cli, timerP21) checkTimersNotChange(t, cli, oldTimer2, oldTimerP11, oldTimerP20, oldTimerP21) - checkTimersNotChange(t, cli, timer1, timer2, timer3, timer4, timer5, timerP10, timerP11, timerP12, timerP13, timerP20, timerP21) + checkTimersNotChange(t, cli, timer2, timer3, timer4, timer5, timerP10, timerP11, timerP12) // clear deleted tables sync.SetDelayDeleteInterval(time.Millisecond) @@ -148,31 +286,26 @@ func TestTTLTimerSync(t *testing.T) { checkTimersNotChange(t, cli, timer2, timer3, timer4, timer5, timerP10, timerP11, timerP12) } -func insertTTLTableStatusWatermark(t *testing.T, do *domain.Domain, tk *testkit.TestKit, db, table, partition string, watermark time.Time) { - tbl, err := do.InfoSchema().TableByName(model.NewCIStr(db), model.NewCIStr(table)) - require.NoError(t, err) - tblInfo := tbl.Meta() - physicalID := tblInfo.ID - var par model.PartitionDefinition - if partition != "" { - for _, def := range tblInfo.Partition.Definitions { - if def.Name.L == model.NewCIStr(partition).L { - par = def - } - } - require.NotNil(t, par) - physicalID = par.ID - } - +func insertTTLTableStatusWatermark(t *testing.T, do *domain.Domain, tk *testkit.TestKit, db, table, partition string, watermark time.Time, jobRunning bool) { + _, physical := getPhysicalTableInfo(t, do, db, table, partition) if watermark.IsZero() { - tk.MustExec("insert into mysql.tidb_ttl_table_status (table_id, parent_table_id) values (?, ?)", physicalID, tblInfo.ID) + tk.MustExec("insert into mysql.tidb_ttl_table_status (table_id, parent_table_id) values (?, ?)", physical.ID, physical.TableInfo.ID) return } - tk.MustExec( - "insert into mysql.tidb_ttl_table_status (table_id, parent_table_id, last_job_id, last_job_start_time, last_job_finish_time, last_job_ttl_expire) values(?, ?, 
?, FROM_UNIXTIME(?), FROM_UNIXTIME(?), FROM_UNIXTIME(?))", - physicalID, tblInfo.ID, uuid.NewString(), watermark.Unix(), watermark.Add(time.Minute).Unix(), watermark.Add(-time.Minute).Unix(), - ) + if jobRunning { + tk.MustExec( + "insert into mysql.tidb_ttl_table_status (table_id, parent_table_id, last_job_id, last_job_start_time, last_job_finish_time, last_job_ttl_expire, current_job_id, current_job_start_time) values(?, ?, ?, FROM_UNIXTIME(?), FROM_UNIXTIME(?), FROM_UNIXTIME(?), ?, FROM_UNIXTIME(?))", + physical.ID, physical.TableInfo.ID, uuid.NewString(), watermark.Add(-10*time.Minute).Unix(), watermark.Add(-time.Minute).Unix(), watermark.Add(-20*time.Minute).Unix(), + uuid.NewString(), + watermark.Unix(), + ) + } else { + tk.MustExec( + "insert into mysql.tidb_ttl_table_status (table_id, parent_table_id, last_job_id, last_job_start_time, last_job_finish_time, last_job_ttl_expire) values(?, ?, ?, FROM_UNIXTIME(?), FROM_UNIXTIME(?), FROM_UNIXTIME(?))", + physical.ID, physical.TableInfo.ID, uuid.NewString(), watermark.Unix(), watermark.Add(time.Minute).Unix(), watermark.Add(-time.Minute).Unix(), + ) + } } func checkTimerCnt(t *testing.T, cli timerapi.TimerClient, cnt int) { @@ -181,6 +314,23 @@ func checkTimerCnt(t *testing.T, cli timerapi.TimerClient, cnt int) { require.Equal(t, cnt, len(timers)) } +func checkTimerOnlyDisabled(t *testing.T, cli timerapi.TimerClient, timer *timerapi.TimerRecord) *timerapi.TimerRecord { + tm, err := cli.GetTimerByID(context.TODO(), timer.ID) + require.NoError(t, err) + if !timer.Enable { + require.Equal(t, *timer, *tm) + return timer + } + + require.False(t, tm.Enable) + require.Greater(t, tm.Version, timer.Version) + tm2 := timer.Clone() + tm2.Enable = tm.Enable + tm2.Version = tm.Version + require.Equal(t, *tm, *tm2) + return tm +} + func checkTimersNotChange(t *testing.T, cli timerapi.TimerClient, timers ...*timerapi.TimerRecord) { for i, timer := range timers { tm, err := cli.GetTimerByID(context.TODO(), timer.ID) @@ -189,50 +339,46 @@ func checkTimersNotChange(t *testing.T, cli timerapi.TimerClient, timers ...*tim } } -func checkTimerWithTableMeta(t *testing.T, do *domain.Domain, cli timerapi.TimerClient, db, table, partition string, watermark time.Time) *timerapi.TimerRecord { +func getPhysicalTableInfo(t *testing.T, do *domain.Domain, db, table, partition string) (string, *cache.PhysicalTable) { is := do.InfoSchema() - dbInfo, ok := is.SchemaByName(model.NewCIStr(db)) - require.True(t, ok) tbl, err := is.TableByName(model.NewCIStr(db), model.NewCIStr(table)) require.NoError(t, err) tblInfo := tbl.Meta() - physicalID := tblInfo.ID - var par model.PartitionDefinition - if partition != "" { - for _, def := range tblInfo.Partition.Definitions { - if def.Name.L == model.NewCIStr(partition).L { - par = def - } - } - require.NotNil(t, par) - physicalID = par.ID - } + physical, err := cache.NewPhysicalTable(model.NewCIStr(db), tblInfo, model.NewCIStr(partition)) + require.NoError(t, err) + return fmt.Sprintf("/tidb/ttl/physical_table/%d/%d", tblInfo.ID, physical.ID), physical +} + +func checkTimerWithTableMeta(t *testing.T, do *domain.Domain, cli timerapi.TimerClient, db, table, partition string, watermark time.Time) *timerapi.TimerRecord { + is := do.InfoSchema() + dbInfo, ok := is.SchemaByName(model.NewCIStr(db)) + require.True(t, ok) - key := fmt.Sprintf("/tidb/ttl/physical_table/%d/%d", tblInfo.ID, physicalID) + key, physical := getPhysicalTableInfo(t, do, db, table, partition) timer, err := cli.GetTimerByKey(context.TODO(), key) require.NoError(t, 
err) - require.Equal(t, tblInfo.TTLInfo.Enable, timer.Enable) + require.Equal(t, physical.TTLInfo.Enable, timer.Enable) require.Equal(t, timerapi.SchedEventInterval, timer.SchedPolicyType) - require.Equal(t, tblInfo.TTLInfo.JobInterval, timer.SchedPolicyExpr) + require.Equal(t, physical.TTLInfo.JobInterval, timer.SchedPolicyExpr) if partition == "" { require.Equal(t, []string{ fmt.Sprintf("db=%s", dbInfo.Name.O), - fmt.Sprintf("table=%s", tblInfo.Name.O), + fmt.Sprintf("table=%s", physical.Name.O), }, timer.Tags) } else { require.Equal(t, []string{ fmt.Sprintf("db=%s", dbInfo.Name.O), - fmt.Sprintf("table=%s", tblInfo.Name.O), - fmt.Sprintf("partition=%s", par.Name.O), + fmt.Sprintf("table=%s", physical.Name.O), + fmt.Sprintf("partition=%s", physical.Partition.O), }, timer.Tags) } require.NotNil(t, timer.Data) var timerData ttlworker.TTLTimerData require.NoError(t, json.Unmarshal(timer.Data, &timerData)) - require.Equal(t, tblInfo.ID, timerData.TableID) - require.Equal(t, physicalID, timerData.PhysicalID) + require.Equal(t, physical.TableInfo.ID, timerData.TableID) + require.Equal(t, physical.ID, timerData.PhysicalID) require.Equal(t, watermark.Unix(), timer.Watermark.Unix()) return timer } diff --git a/ttl/ttlworker/timer_test.go b/ttl/ttlworker/timer_test.go new file mode 100644 index 0000000000000..3268de5c0f94d --- /dev/null +++ b/ttl/ttlworker/timer_test.go @@ -0,0 +1,556 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package ttlworker + +import ( + "context" + "encoding/json" + "fmt" + "testing" + "time" + + "github.com/google/uuid" + "github.com/pingcap/errors" + "github.com/pingcap/tidb/sessionctx/variable" + timerapi "github.com/pingcap/tidb/timer/api" + "github.com/pingcap/tidb/util/logutil" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +type mockJobAdapter struct { + mock.Mock +} + +func (a *mockJobAdapter) CanSubmitJob(tableID, physicalID int64) bool { + args := a.Called(tableID, physicalID) + return args.Bool(0) +} + +func (a *mockJobAdapter) SubmitJob(ctx context.Context, tableID, physicalID int64, requestID string, watermark time.Time) (job *TTLJobTrace, _ error) { + args := a.Called(ctx, tableID, physicalID, requestID, watermark) + if obj := args.Get(0); obj != nil { + job = obj.(*TTLJobTrace) + } + return job, args.Error(1) +} + +func (a *mockJobAdapter) GetJob(ctx context.Context, tableID, physicalID int64, requestID string) (job *TTLJobTrace, _ error) { + args := a.Called(ctx, tableID, physicalID, requestID) + if obj := args.Get(0); obj != nil { + job = obj.(*TTLJobTrace) + } + return job, args.Error(1) +} + +type mockTimerCli struct { + mock.Mock + timerapi.TimerClient +} + +func (c *mockTimerCli) GetTimerByID(ctx context.Context, timerID string) (r *timerapi.TimerRecord, _ error) { + args := c.Called(ctx, timerID) + if obj := args.Get(0); obj != nil { + r = obj.(*timerapi.TimerRecord) + } + return r, args.Error(1) +} + +func (c *mockTimerCli) CloseTimerEvent(ctx context.Context, timerID string, eventID string, opts ...timerapi.UpdateTimerOption) error { + args := c.Called(ctx, timerID, eventID, opts) + if err := args.Error(0); err != nil { + return err + } + return c.TimerClient.CloseTimerEvent(ctx, timerID, eventID, opts...) 
+} + +type mockTimerSchedEvent struct { + eventID string + timer *timerapi.TimerRecord +} + +func (e *mockTimerSchedEvent) EventID() string { + return e.eventID +} + +func (e *mockTimerSchedEvent) Timer() *timerapi.TimerRecord { + return e.timer +} + +func createTestTimer(t *testing.T, cli timerapi.TimerClient) (*timerapi.TimerRecord, *TTLTimerData) { + data := &TTLTimerData{ + TableID: 1, + PhysicalID: 2, + } + + bs, err := json.Marshal(data) + require.NoError(t, err) + + timer, err := cli.CreateTimer(context.TODO(), timerapi.TimerSpec{ + Key: fmt.Sprintf("%s%d/%d", timerKeyPrefix, data.TableID, data.PhysicalID), + HookClass: timerHookClass, + Tags: []string{"db=test", "table=t1", "partition=p0"}, + Data: bs, + SchedPolicyType: timerapi.SchedEventInterval, + SchedPolicyExpr: "1h", + Watermark: time.Unix(3600*128, 0), + Enable: true, + }) + require.NoError(t, err) + return timer, data +} + +func triggerTestTimer(t *testing.T, store *timerapi.TimerStore, timerID string) *timerapi.TimerRecord { + timer, err := store.GetByID(context.TODO(), timerID) + require.NoError(t, err) + require.NotNil(t, timer) + require.Empty(t, timer.EventID) + eventID := uuid.NewString() + err = store.Update(context.TODO(), timer.ID, &timerapi.TimerUpdate{ + EventStatus: timerapi.NewOptionalVal(timerapi.SchedEventTrigger), + EventID: timerapi.NewOptionalVal(eventID), + EventStart: timerapi.NewOptionalVal(time.Now().Add(-2 * time.Second)), + EventExtra: timerapi.NewOptionalVal(timerapi.EventExtra{ + EventWatermark: timer.Watermark, + }), + }) + require.NoError(t, err) + timer, err = store.GetByID(context.TODO(), timerID) + require.NoError(t, err) + require.Equal(t, timerapi.SchedEventTrigger, timer.EventStatus) + require.Equal(t, eventID, timer.EventID) + return timer +} + +func clearTTLWindowAndEnable() { + variable.EnableTTLJob.Store(true) + variable.TTLJobScheduleWindowStartTime.Store(time.Date(0, 0, 0, 0, 0, 0, 0, time.UTC)) + variable.TTLJobScheduleWindowEndTime.Store(time.Date(0, 0, 0, 23, 59, 0, 0, time.UTC)) +} + +func makeTTLSummary(t *testing.T, requestID string) (*ttlTimerSummary, []byte) { + summary := &ttlTimerSummary{ + LastJobRequestID: requestID, + LastJobSummary: &TTLSummary{ + TotalRows: 100, + SuccessRows: 98, + ErrorRows: 2, + + TotalScanTask: 10, + ScheduledScanTask: 9, + FinishedScanTask: 1, + + ScanTaskErr: "err1", + SummaryText: "summary1", + }, + } + + bs, err := json.Marshal(summary) + require.NoError(t, err) + return summary, bs +} + +func checkTTLTimerNotChange(t *testing.T, cli timerapi.TimerClient, r *timerapi.TimerRecord) { + tm, err := cli.GetTimerByID(context.TODO(), r.ID) + require.NoError(t, err) + require.Equal(t, *r, *tm) +} + +func TestTTLTimerHookPrepare(t *testing.T) { + defer clearTTLWindowAndEnable() + clearTTLWindowAndEnable() + + adapter := &mockJobAdapter{} + store := timerapi.NewMemoryTimerStore() + defer store.Close() + + cli := timerapi.NewDefaultTimerClient(store) + timer, data := createTestTimer(t, cli) + + hook := newTTLTimerHook(adapter, cli) + + // normal + adapter.On("CanSubmitJob", data.TableID, data.PhysicalID).Return(true).Once() + r, err := hook.OnPreSchedEvent(context.TODO(), &mockTimerSchedEvent{eventID: "event1", timer: timer}) + require.NoError(t, err) + require.Equal(t, timerapi.PreSchedEventResult{}, r) + adapter.AssertExpectations(t) + + // global ttl job disabled + variable.EnableTTLJob.Store(false) + r, err = hook.OnPreSchedEvent(context.TODO(), &mockTimerSchedEvent{eventID: "event1", timer: timer}) + require.NoError(t, err) + require.Equal(t, 
timerapi.PreSchedEventResult{Delay: time.Minute}, r) + adapter.AssertExpectations(t) + + // not in window + now := time.Date(2023, 1, 1, 15, 10, 0, 0, time.UTC) + hook.nowFunc = func() time.Time { + return now + } + clearTTLWindowAndEnable() + variable.TTLJobScheduleWindowStartTime.Store(time.Date(0, 0, 0, 15, 11, 0, 0, time.UTC)) + r, err = hook.OnPreSchedEvent(context.TODO(), &mockTimerSchedEvent{eventID: "event1", timer: timer}) + require.NoError(t, err) + require.Equal(t, timerapi.PreSchedEventResult{Delay: time.Minute}, r) + adapter.AssertExpectations(t) + + clearTTLWindowAndEnable() + variable.TTLJobScheduleWindowEndTime.Store(time.Date(0, 0, 0, 15, 9, 0, 0, time.UTC)) + r, err = hook.OnPreSchedEvent(context.TODO(), &mockTimerSchedEvent{eventID: "event1", timer: timer}) + require.NoError(t, err) + require.Equal(t, timerapi.PreSchedEventResult{Delay: time.Minute}, r) + adapter.AssertExpectations(t) + + // in window + clearTTLWindowAndEnable() + adapter.On("CanSubmitJob", data.TableID, data.PhysicalID).Return(true).Once() + variable.TTLJobScheduleWindowStartTime.Store(time.Date(0, 0, 0, 15, 9, 0, 0, time.UTC)) + variable.TTLJobScheduleWindowEndTime.Store(time.Date(0, 0, 0, 15, 11, 0, 0, time.UTC)) + r, err = hook.OnPreSchedEvent(context.TODO(), &mockTimerSchedEvent{eventID: "event1", timer: timer}) + require.NoError(t, err) + require.Equal(t, timerapi.PreSchedEventResult{}, r) + adapter.AssertExpectations(t) + + // CanSubmitJob returns false + clearTTLWindowAndEnable() + adapter.On("CanSubmitJob", data.TableID, data.PhysicalID).Return(false).Once() + r, err = hook.OnPreSchedEvent(context.TODO(), &mockTimerSchedEvent{eventID: "event1", timer: timer}) + require.NoError(t, err) + require.Equal(t, timerapi.PreSchedEventResult{Delay: time.Minute}, r) + adapter.AssertExpectations(t) +} + +func TestTTLTimerHookOnEvent(t *testing.T) { + ctx := context.Background() + adapter := &mockJobAdapter{} + store := timerapi.NewMemoryTimerStore() + defer store.Close() + + cli := timerapi.NewDefaultTimerClient(store) + timer, data := createTestTimer(t, cli) + timer = triggerTestTimer(t, store, timer.ID) + + hook := newTTLTimerHook(adapter, cli) + hook.Start() + defer hook.Stop() + + // get job error + adapter.On("GetJob", ctx, data.TableID, data.PhysicalID, timer.EventID). + Return(nil, errors.New("mockErr")). + Once() + err := hook.OnSchedEvent(ctx, &mockTimerSchedEvent{eventID: timer.EventID, timer: timer}) + require.EqualError(t, err, "mockErr") + adapter.AssertExpectations(t) + checkTTLTimerNotChange(t, cli, timer) + require.Equal(t, int64(0), hook.waitJobLoopCounter) + + // submit job error + adapter.On("GetJob", ctx, data.TableID, data.PhysicalID, timer.EventID). + Return(nil, nil). + Once() + adapter.On("CanSubmitJob", data.TableID, data.PhysicalID). + Return(true). + Once() + adapter.On("SubmitJob", ctx, data.TableID, data.PhysicalID, timer.EventID, timer.EventStart). + Return(nil, errors.New("mockSubmitErr")). 
+ Once() + err = hook.OnSchedEvent(ctx, &mockTimerSchedEvent{eventID: timer.EventID, timer: timer}) + require.EqualError(t, err, "mockSubmitErr") + adapter.AssertExpectations(t) + checkTTLTimerNotChange(t, cli, timer) + require.Equal(t, int64(0), hook.waitJobLoopCounter) + + waitJobDone := func(timerID string) *timerapi.TimerRecord { + start := time.Now() + for { + if time.Since(start) > time.Minute { + require.FailNow(t, "timeout") + } + + tm, err := cli.GetTimerByID(ctx, timerID) + require.NoError(t, err) + if tm.EventStatus != timerapi.SchedEventTrigger { + return tm + } + time.Sleep(100 * time.Millisecond) + } + } + + // submit job and wait done + hook.checkTTLJobInterval = time.Millisecond + summary, summaryData := makeTTLSummary(t, timer.EventID) + eventStart := timer.EventStart + adapter.On("GetJob", ctx, data.TableID, data.PhysicalID, timer.EventID). + Return(nil, nil). + Once() + adapter.On("CanSubmitJob", data.TableID, data.PhysicalID). + Return(true). + Once() + adapter.On("SubmitJob", ctx, data.TableID, data.PhysicalID, timer.EventID, timer.EventStart). + Return(&TTLJobTrace{RequestID: timer.EventID}, nil). + Once() + adapter.On("GetJob", hook.ctx, data.TableID, data.PhysicalID, timer.EventID). + Return(&TTLJobTrace{RequestID: timer.EventID, Finished: true, Summary: summary.LastJobSummary}, nil). + Once() + err = hook.OnSchedEvent(ctx, &mockTimerSchedEvent{eventID: timer.EventID, timer: timer}) + require.NoError(t, err) + require.Equal(t, int64(1), hook.waitJobLoopCounter) + timer = waitJobDone(timer.ID) + require.Equal(t, timerapi.SchedEventIdle, timer.EventStatus) + require.Empty(t, timer.EventID) + require.Empty(t, timer.EventStart) + require.Equal(t, timer.Watermark, eventStart) + require.Equal(t, summaryData, timer.SummaryData) + adapter.AssertExpectations(t) + + // wait exist job + timer = triggerTestTimer(t, store, timer.ID) + summary, summaryData = makeTTLSummary(t, timer.EventID) + eventStart = timer.EventStart + adapter.On("GetJob", ctx, data.TableID, data.PhysicalID, timer.EventID). + Return(&TTLJobTrace{RequestID: timer.EventID}, nil). + Once() + adapter.On("GetJob", hook.ctx, data.TableID, data.PhysicalID, timer.EventID). + Return(&TTLJobTrace{RequestID: timer.EventID, Finished: true, Summary: summary.LastJobSummary}, nil). + Once() + err = hook.OnSchedEvent(ctx, &mockTimerSchedEvent{eventID: timer.EventID, timer: timer}) + require.NoError(t, err) + require.Equal(t, int64(2), hook.waitJobLoopCounter) + timer = waitJobDone(timer.ID) + require.Equal(t, timerapi.SchedEventIdle, timer.EventStatus) + require.Empty(t, timer.EventID) + require.Empty(t, timer.EventStart) + require.Equal(t, timer.Watermark, eventStart) + require.Equal(t, summaryData, timer.SummaryData) + adapter.AssertExpectations(t) + + // job not exists but table ttl not enabled + watermark := time.Unix(3600*123, 0) + require.NoError(t, cli.UpdateTimer(ctx, timer.ID, timerapi.WithSetWatermark(watermark))) + timer = triggerTestTimer(t, store, timer.ID) + adapter.On("GetJob", ctx, data.TableID, data.PhysicalID, timer.EventID). + Return(nil, nil). + Once() + adapter.On("CanSubmitJob", data.TableID, data.PhysicalID). + Return(false). 
+ Once() + err = hook.OnSchedEvent(ctx, &mockTimerSchedEvent{eventID: timer.EventID, timer: timer}) + require.NoError(t, err) + adapter.AssertExpectations(t) + oldSummary := timer.SummaryData + timer, err = cli.GetTimerByID(ctx, timer.ID) + require.NoError(t, err) + require.Equal(t, timerapi.SchedEventIdle, timer.EventStatus) + require.Empty(t, timer.EventID) + require.Equal(t, watermark, timer.Watermark) + require.Equal(t, oldSummary, timer.SummaryData) + + // job not exists but timer disabled + watermark = time.Unix(3600*456, 0) + require.NoError(t, cli.UpdateTimer(ctx, timer.ID, timerapi.WithSetWatermark(watermark), timerapi.WithSetEnable(false))) + timer = triggerTestTimer(t, store, timer.ID) + adapter.On("GetJob", ctx, data.TableID, data.PhysicalID, timer.EventID). + Return(nil, nil). + Once() + require.False(t, timer.Enable) + err = hook.OnSchedEvent(ctx, &mockTimerSchedEvent{eventID: timer.EventID, timer: timer}) + require.NoError(t, err) + adapter.AssertExpectations(t) + oldSummary = timer.SummaryData + timer, err = cli.GetTimerByID(ctx, timer.ID) + require.NoError(t, err) + require.Equal(t, timerapi.SchedEventIdle, timer.EventStatus) + require.Empty(t, timer.EventID) + require.Equal(t, watermark, timer.Watermark) + require.Equal(t, oldSummary, timer.SummaryData) + require.NoError(t, cli.UpdateTimer(ctx, timer.ID, timerapi.WithSetEnable(true))) + + // job not exists but event start too early + watermark = time.Unix(3600*789, 0) + require.NoError(t, cli.UpdateTimer(ctx, timer.ID, timerapi.WithSetWatermark(watermark))) + timer = triggerTestTimer(t, store, timer.ID) + hook.nowFunc = func() time.Time { + return timer.EventStart.Add(11 * time.Minute) + } + adapter.On("GetJob", ctx, data.TableID, data.PhysicalID, timer.EventID). + Return(nil, nil). + Once() + adapter.On("CanSubmitJob", data.TableID, data.PhysicalID). + Return(true). 
+ Once() + err = hook.OnSchedEvent(ctx, &mockTimerSchedEvent{eventID: timer.EventID, timer: timer}) + require.NoError(t, err) + adapter.AssertExpectations(t) + oldSummary = timer.SummaryData + timer, err = cli.GetTimerByID(ctx, timer.ID) + require.NoError(t, err) + require.Equal(t, timerapi.SchedEventIdle, timer.EventStatus) + require.Empty(t, timer.EventID) + require.Equal(t, watermark, timer.Watermark) + require.Equal(t, oldSummary, timer.SummaryData) + + hook.Stop() +} + +func TestWaitTTLJobFinish(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + adapter := &mockJobAdapter{} + store := timerapi.NewMemoryTimerStore() + defer store.Close() + + cli := timerapi.NewDefaultTimerClient(store) + timer, data := createTestTimer(t, cli) + timer = triggerTestTimer(t, store, timer.ID) + + hook := newTTLTimerHook(adapter, cli) + hook.checkTTLJobInterval = time.Millisecond + + // timer closed + hook.ctx, hook.cancel = context.WithTimeout(ctx, time.Minute) + eventID, eventStart := timer.EventID, timer.EventStart + require.NoError(t, cli.CloseTimerEvent(ctx, timer.ID, timer.EventID)) + timer, err := cli.GetTimerByID(ctx, timer.ID) + require.NoError(t, err) + require.Empty(t, timer.EventID) + hook.wg.Add(1) + hook.waitJobFinished(logutil.BgLogger(), data, timer.ID, eventID, eventStart) + require.NoError(t, hook.ctx.Err()) + checkTTLTimerNotChange(t, cli, timer) + adapter.AssertExpectations(t) + + // different event id + timer = triggerTestTimer(t, store, timer.ID) + hook.ctx, hook.cancel = context.WithTimeout(ctx, time.Minute) + hook.wg.Add(1) + hook.waitJobFinished(logutil.BgLogger(), data, timer.ID, eventID, eventStart) + require.NoError(t, hook.ctx.Err()) + checkTTLTimerNotChange(t, cli, timer) + adapter.AssertExpectations(t) + + // timer deleted + hook.ctx, hook.cancel = context.WithTimeout(ctx, time.Minute) + _, err = cli.DeleteTimer(ctx, timer.ID) + require.NoError(t, err) + hook.wg.Add(1) + hook.waitJobFinished(logutil.BgLogger(), data, timer.ID, timer.EventID, timer.EventStart) + require.NoError(t, hook.ctx.Err()) + timers, err := cli.GetTimers(ctx) + require.NoError(t, err) + require.Equal(t, 0, len(timers)) + + // job deleted + timer, data = createTestTimer(t, cli) + timer = triggerTestTimer(t, store, timer.ID) + eventID, eventStart = timer.EventID, timer.EventStart + hook.ctx, hook.cancel = context.WithTimeout(ctx, time.Minute) + adapter.On("GetJob", hook.ctx, data.TableID, data.PhysicalID, timer.EventID). + Return(nil, nil). + Once() + hook.wg.Add(1) + hook.waitJobFinished(logutil.BgLogger(), data, timer.ID, eventID, eventStart) + require.NoError(t, hook.ctx.Err()) + timer, err = cli.GetTimerByID(ctx, timer.ID) + require.NoError(t, err) + require.Equal(t, timerapi.SchedEventIdle, timer.EventStatus) + require.Empty(t, timer.EventID) + require.Equal(t, eventStart, timer.Watermark) + summaryData, err := json.Marshal(ttlTimerSummary{LastJobRequestID: eventID}) + require.NoError(t, err) + require.Equal(t, summaryData, timer.SummaryData) + adapter.AssertExpectations(t) + + // retry until success + timer = triggerTestTimer(t, store, timer.ID) + summary, summaryData := makeTTLSummary(t, timer.EventID) + mockCli := &mockTimerCli{TimerClient: cli} + hook.ctx, hook.cancel = context.WithTimeout(ctx, time.Minute) + hook.cli = mockCli + mockCli.On("GetTimerByID", hook.ctx, timer.ID). + Return(nil, errors.New("mockTimerErr")). + Times(5) + mockCli.On("GetTimerByID", hook.ctx, timer.ID). + Return(timer, nil). 
+ Times(10) + adapter.On("GetJob", hook.ctx, data.TableID, data.PhysicalID, timer.EventID). + Return(nil, errors.New("mockJobErr")). + Times(5) + adapter.On("GetJob", hook.ctx, data.TableID, data.PhysicalID, timer.EventID). + Return(&TTLJobTrace{RequestID: timer.EventID}, errors.New("mockJobErr")). + Times(2) + adapter.On("GetJob", hook.ctx, data.TableID, data.PhysicalID, timer.EventID). + Return(&TTLJobTrace{RequestID: timer.EventID, Finished: true, Summary: summary.LastJobSummary}, nil). + Times(3) + mockCli.On("CloseTimerEvent", hook.ctx, timer.ID, timer.EventID, mock.Anything). + Return(errors.New("mockCloseErr")). + Times(2) + mockCli.On("CloseTimerEvent", hook.ctx, timer.ID, timer.EventID, mock.Anything). + Return(nil). + Once() + eventID, eventStart = timer.EventID, timer.EventStart + hook.wg.Add(1) + hook.waitJobFinished(logutil.BgLogger(), data, timer.ID, eventID, eventStart) + require.NoError(t, hook.ctx.Err()) + adapter.AssertExpectations(t) + mockCli.AssertExpectations(t) + timer, err = cli.GetTimerByID(ctx, timer.ID) + require.NoError(t, err) + require.Equal(t, timerapi.SchedEventIdle, timer.EventStatus) + require.Empty(t, timer.EventID) + require.Equal(t, eventStart, timer.Watermark) + require.Equal(t, summaryData, timer.SummaryData) + + // retry until context done + timer = triggerTestTimer(t, store, timer.ID) + eventID, eventStart = timer.EventID, timer.EventStart + hook.ctx, hook.cancel = context.WithTimeout(ctx, time.Minute) + hook.cli = cli + adapter.On("GetJob", hook.ctx, data.TableID, data.PhysicalID, timer.EventID). + Return(nil, errors.New("mockErr")). + Times(10) + adapter.On("GetJob", hook.ctx, data.TableID, data.PhysicalID, timer.EventID). + Return(nil, errors.New("mockErr")). + Once(). + Run(func(_ mock.Arguments) { hook.cancel() }) + hook.wg.Add(1) + hook.waitJobFinished(logutil.BgLogger(), data, timer.ID, eventID, eventStart) + adapter.AssertExpectations(t) + checkTTLTimerNotChange(t, cli, timer) +} + +func TestTTLTimerRuntime(t *testing.T) { + adapter := &mockJobAdapter{} + store := timerapi.NewMemoryTimerStore() + defer store.Close() + + r := newTTLTimerRuntime(store, adapter) + r.Resume() + rt := r.rt + require.NotNil(t, rt) + r.Resume() + require.Same(t, rt, r.rt) + + r.Pause() + require.Nil(t, r.rt) + r.Pause() + require.Nil(t, r.rt) + + r.Resume() + require.NotNil(t, r.rt) + r.Pause() + require.Nil(t, r.rt) +}
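
Editorial note: the tests above exercise ManualTriggerTTLTimer through the closure it returns, which callers are expected to poll. A hedged sketch of that calling pattern (the waitManualTrigger name, the syncer, and the one-second polling interval are assumptions for illustration, not part of this change):

	// waitManualTrigger polls the closure returned by ManualTriggerTTLTimer until
	// the manual request is processed, then returns the submitted job's request ID.
	func waitManualTrigger(ctx context.Context, syncer *ttlworker.TTLTimersSyncer, tbl *cache.PhysicalTable) (string, error) {
		check, err := syncer.ManualTriggerTTLTimer(ctx, tbl)
		if err != nil {
			return "", err
		}
		for {
			jobID, finished, err := check()
			if err != nil {
				return "", err // e.g. "manual request timeout" or "timer not exist"
			}
			if finished {
				return jobID, nil // the event ID used as the TTL job's request ID
			}
			time.Sleep(time.Second) // request accepted but not processed yet; poll again
		}
	}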