diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go index d95c4def0d0..f35b5eea01e 100644 --- a/cdc/capture/capture.go +++ b/cdc/capture/capture.go @@ -33,6 +33,7 @@ import ( cerror "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/etcd" "github.com/pingcap/ticdc/pkg/orchestrator" + "github.com/pingcap/ticdc/pkg/pdtime" "github.com/pingcap/ticdc/pkg/version" tidbkv "github.com/pingcap/tidb/kv" pd "github.com/tikv/pd/client" @@ -55,10 +56,11 @@ type Capture struct { session *concurrency.Session election *concurrency.Election - pdClient pd.Client - kvStorage tidbkv.Storage - etcdClient *etcd.CDCEtcdClient - grpcPool kv.GrpcPool + pdClient pd.Client + kvStorage tidbkv.Storage + etcdClient *etcd.CDCEtcdClient + grpcPool kv.GrpcPool + TimeAcquirer pdtime.TimeAcquirer cancel context.CancelFunc @@ -100,6 +102,12 @@ func (c *Capture) reset(ctx context.Context) error { } c.session = sess c.election = concurrency.NewElection(sess, etcd.CaptureOwnerKey) + + if c.TimeAcquirer != nil { + c.TimeAcquirer.Stop() + } + c.TimeAcquirer = pdtime.NewTimeAcquirer(c.pdClient) + if c.grpcPool != nil { c.grpcPool.Close() } @@ -148,11 +156,12 @@ func (c *Capture) Run(ctx context.Context) error { func (c *Capture) run(stdCtx context.Context) error { ctx := cdcContext.NewContext(stdCtx, &cdcContext.GlobalVars{ - PDClient: c.pdClient, - KVStorage: c.kvStorage, - CaptureInfo: c.info, - EtcdClient: c.etcdClient, - GrpcPool: c.grpcPool, + PDClient: c.pdClient, + KVStorage: c.kvStorage, + CaptureInfo: c.info, + EtcdClient: c.etcdClient, + GrpcPool: c.grpcPool, + TimeAcquirer: c.TimeAcquirer, }) err := c.register(ctx) if err != nil { @@ -166,7 +175,7 @@ func (c *Capture) run(stdCtx context.Context) error { cancel() }() wg := new(sync.WaitGroup) - wg.Add(3) + wg.Add(4) var ownerErr, processorErr error go func() { defer wg.Done() @@ -188,6 +197,10 @@ func (c *Capture) run(stdCtx context.Context) error { processorErr = c.runEtcdWorker(ctx, c.processorManager, orchestrator.NewGlobalState(), processorFlushInterval) log.Info("the processor routine has exited", zap.Error(processorErr)) }() + go func() { + defer wg.Done() + c.TimeAcquirer.Run(ctx) + }() go func() { defer wg.Done() c.grpcPool.RecycleConn(ctx) diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index 2f7d2f7e8ed..3bb3dfbb826 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -16,7 +16,6 @@ package owner import ( "context" "sync" - "time" "github.com/pingcap/errors" "github.com/pingcap/failpoint" @@ -184,7 +183,9 @@ func (c *changefeed) tick(ctx cdcContext.Context, state *orchestrator.Changefeed return errors.Trace(err) } if shouldUpdateState { - c.updateStatus(barrierTs) + pdTime, _ := ctx.GlobalVars().TimeAcquirer.CurrentTimeFromCached() + currentTs := oracle.GetPhysical(pdTime) + c.updateStatus(currentTs, barrierTs) } return nil } @@ -464,7 +465,7 @@ func (c *changefeed) asyncExecDDL(ctx cdcContext.Context, job *timodel.Job) (don return done, nil } -func (c *changefeed) updateStatus(barrierTs model.Ts) { +func (c *changefeed) updateStatus(currentTs int64, barrierTs model.Ts) { resolvedTs := barrierTs for _, position := range c.state.TaskPositions { if resolvedTs > position.ResolvedTs { @@ -496,12 +497,10 @@ func (c *changefeed) updateStatus(barrierTs model.Ts) { } return status, changed, nil }) - phyTs := oracle.ExtractPhysical(checkpointTs) + c.metricsChangefeedCheckpointTsGauge.Set(float64(phyTs)) - // It is more accurate to get tso from PD, but in most cases since we have - // deployed NTP service, a little bias is acceptable here. - c.metricsChangefeedCheckpointTsLagGauge.Set(float64(oracle.GetPhysical(time.Now())-phyTs) / 1e3) + c.metricsChangefeedCheckpointTsLagGauge.Set(float64(currentTs-phyTs) / 1e3) } func (c *changefeed) Close(ctx context.Context) { diff --git a/cdc/owner/changefeed_test.go b/cdc/owner/changefeed_test.go index 5c73d580311..e3a630a4340 100644 --- a/cdc/owner/changefeed_test.go +++ b/cdc/owner/changefeed_test.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/ticdc/pkg/config" cdcContext "github.com/pingcap/ticdc/pkg/context" "github.com/pingcap/ticdc/pkg/orchestrator" + "github.com/pingcap/ticdc/pkg/pdtime" "github.com/pingcap/ticdc/pkg/txnutil/gc" "github.com/pingcap/ticdc/pkg/util/testleak" "github.com/pingcap/ticdc/pkg/version" @@ -216,6 +217,7 @@ func (s *changefeedSuite) TestExecDDL(c *check.C) { AdvertiseAddr: "127.0.0.1:0000", Version: version.ReleaseVersion, }, + TimeAcquirer: pdtime.NewTimeAcquirer4Test(), }) ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ ID: "changefeed-id-test", diff --git a/cdc/processor/manager.go b/cdc/processor/manager.go index 6b3782fbd99..968a65ebca4 100644 --- a/cdc/processor/manager.go +++ b/cdc/processor/manager.go @@ -70,6 +70,7 @@ func (m *Manager) Tick(stdCtx context.Context, state orchestrator.ReactorState) if err := m.handleCommand(); err != nil { return state, err } + captureID := ctx.GlobalVars().CaptureInfo.ID var inactiveChangefeedCount int for changefeedID, changefeedState := range globalState.Changefeeds { diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index 1c060d790d6..36787e3d70b 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -159,7 +159,8 @@ func (p *processor) tick(ctx cdcContext.Context, state *orchestrator.ChangefeedR if !p.checkChangefeedNormal() { return nil, cerror.ErrAdminStopProcessor.GenWithStackByArgs() } - if skip := p.checkPosition(); skip { + // we should skip this tick after create a task position + if p.createTaskPosition() { return p.changefeed, nil } if err := p.handleErrorCh(ctx); err != nil { @@ -177,7 +178,11 @@ func (p *processor) tick(ctx cdcContext.Context, state *orchestrator.ChangefeedR if err := p.flushRedoLogMeta(ctx); err != nil { return nil, err } - p.handlePosition() + // it is no need to check the err here, because we will use + // local time when an error return, which is acceptable + pdTime, _ := ctx.GlobalVars().TimeAcquirer.CurrentTimeFromCached() + + p.handlePosition(oracle.GetPhysical(pdTime)) p.pushResolvedTs2Table() p.handleWorkload() p.doGCSchemaStorage(ctx) @@ -195,10 +200,10 @@ func (p *processor) checkChangefeedNormal() bool { return true } -// checkPosition create a new task position, and put it into the etcd state. -// task position maybe be not exist only when the processor is running first time. -func (p *processor) checkPosition() (skipThisTick bool) { - if p.changefeed.TaskPositions[p.captureInfo.ID] != nil { +// createTaskPosition will create a new task position if a task position does not exist. +// task position not exist only when the processor is running first in the first tick. +func (p *processor) createTaskPosition() (skipThisTick bool) { + if _, exist := p.changefeed.TaskPositions[p.captureInfo.ID]; exist { return false } if p.initialized { @@ -559,7 +564,7 @@ func (p *processor) checkTablesNum(ctx cdcContext.Context) error { } // handlePosition calculates the local resolved ts and local checkpoint ts -func (p *processor) handlePosition() { +func (p *processor) handlePosition(currentTs int64) { minResolvedTs := uint64(math.MaxUint64) if p.schemaStorage != nil { minResolvedTs = p.schemaStorage.ResolvedTs() @@ -580,15 +585,11 @@ func (p *processor) handlePosition() { } resolvedPhyTs := oracle.ExtractPhysical(minResolvedTs) - // It is more accurate to get tso from PD, but in most cases we have - // deployed NTP service, a little bias is acceptable here. - p.metricResolvedTsLagGauge.Set(float64(oracle.GetPhysical(time.Now())-resolvedPhyTs) / 1e3) + p.metricResolvedTsLagGauge.Set(float64(currentTs-resolvedPhyTs) / 1e3) p.metricResolvedTsGauge.Set(float64(resolvedPhyTs)) checkpointPhyTs := oracle.ExtractPhysical(minCheckpointTs) - // It is more accurate to get tso from PD, but in most cases we have - // deployed NTP service, a little bias is acceptable here. - p.metricCheckpointTsLagGauge.Set(float64(oracle.GetPhysical(time.Now())-checkpointPhyTs) / 1e3) + p.metricCheckpointTsLagGauge.Set(float64(currentTs-checkpointPhyTs) / 1e3) p.metricCheckpointTsGauge.Set(float64(checkpointPhyTs)) // minResolvedTs and minCheckpointTs may less than global resolved ts and global checkpoint ts when a new table added, the startTs of the new table is less than global checkpoint ts. diff --git a/pkg/context/context.go b/pkg/context/context.go index 9c2445de027..6cf689fa2d8 100644 --- a/pkg/context/context.go +++ b/pkg/context/context.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/ticdc/cdc/model" "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/etcd" + "github.com/pingcap/ticdc/pkg/pdtime" "github.com/pingcap/ticdc/pkg/version" tidbkv "github.com/pingcap/tidb/kv" "github.com/tikv/client-go/v2/oracle" @@ -33,11 +34,12 @@ import ( // the lifecycle of vars in the GlobalVars should be aligned with the ticdc server process. // All field in Vars should be READ-ONLY and THREAD-SAFE type GlobalVars struct { - PDClient pd.Client - KVStorage tidbkv.Storage - CaptureInfo *model.CaptureInfo - EtcdClient *etcd.CDCEtcdClient - GrpcPool kv.GrpcPool + PDClient pd.Client + KVStorage tidbkv.Storage + CaptureInfo *model.CaptureInfo + EtcdClient *etcd.CDCEtcdClient + GrpcPool kv.GrpcPool + TimeAcquirer pdtime.TimeAcquirer } // ChangefeedVars contains some vars which can be used anywhere in a pipeline @@ -184,6 +186,7 @@ func NewBackendContext4Test(withChangefeedVars bool) Context { AdvertiseAddr: "127.0.0.1:0000", Version: version.ReleaseVersion, }, + TimeAcquirer: pdtime.NewTimeAcquirer4Test(), }) if withChangefeedVars { ctx = WithChangefeedVars(ctx, &ChangefeedVars{ diff --git a/pkg/orchestrator/etcd_worker_test.go b/pkg/orchestrator/etcd_worker_test.go index 72cced6004f..ad0711f386d 100644 --- a/pkg/orchestrator/etcd_worker_test.go +++ b/pkg/orchestrator/etcd_worker_test.go @@ -18,6 +18,7 @@ import ( "encoding/json" "regexp" "strconv" + "strings" "sync" "testing" "time" @@ -223,7 +224,6 @@ func (s *etcdWorkerSuite) TestEtcdSum(c *check.C) { defer func() { _ = cli.Unwrap().Close() }() - _, err := cli.Put(ctx, testEtcdKeyPrefix+"/sum", "0") c.Check(err, check.IsNil) @@ -272,7 +272,9 @@ func (s *etcdWorkerSuite) TestEtcdSum(c *check.C) { } err = errg.Wait() - if err != nil && (errors.Cause(err) == context.DeadlineExceeded || errors.Cause(err) == context.Canceled) { + if err != nil && (errors.Cause(err) == context.DeadlineExceeded || + errors.Cause(err) == context.Canceled || + strings.Contains(err.Error(), "etcdserver: request timeout")) { return } c.Check(err, check.IsNil) diff --git a/pkg/pdtime/acquirer.go b/pkg/pdtime/acquirer.go new file mode 100644 index 00000000000..3fae739fa9d --- /dev/null +++ b/pkg/pdtime/acquirer.go @@ -0,0 +1,118 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package pdtime + +import ( + "context" + "sync" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/ticdc/pkg/retry" + "github.com/tikv/client-go/v2/oracle" + pd "github.com/tikv/pd/client" + "go.uber.org/zap" +) + +const pdTimeUpdateInterval = 200 * time.Millisecond + +type TimeAcquirer interface { + // Run run the TimeAcquirer + Run(ctx context.Context) + // CurrentTimeFromCached returns current time from cache + CurrentTimeFromCached() (time.Time, error) + // Stop stops the TimeAcquirer + Stop() +} + +// TimeAcquirerImpl cache time get from PD periodically and cache it +type TimeAcquirerImpl struct { + pdClient pd.Client + timeCache time.Time + mu sync.RWMutex + cancel context.CancelFunc + err error +} + +// NewTimeAcquirer return a new TimeAcquirer +func NewTimeAcquirer(pdClient pd.Client) TimeAcquirer { + return &TimeAcquirerImpl{ + pdClient: pdClient, + } +} + +// Run will get time from pd periodically to cache in pdPhysicalTimeCache +func (c *TimeAcquirerImpl) Run(ctx context.Context) { + ctx, cancel := context.WithCancel(ctx) + c.cancel = cancel + ticker := time.NewTicker(pdTimeUpdateInterval) + for { + select { + // c.Stop() was called or parent ctx was canceled + case <-ctx.Done(): + log.Info("TimeAcquirer exit") + return + case <-ticker.C: + err := retry.Do(ctx, func() error { + physical, _, err := c.pdClient.GetTS(ctx) + if err != nil { + log.Info("get time from pd failed, retry later", zap.Error(err)) + return err + } + c.mu.Lock() + c.timeCache = oracle.GetTimeFromTS(oracle.ComposeTS(physical, 0)) + c.err = nil + c.mu.Unlock() + return nil + }, retry.WithBackoffBaseDelay(200), retry.WithMaxTries(10)) + if err != nil { + log.Warn("get time from pd failed, will use local time as pd time") + c.mu.Lock() + c.timeCache = time.Now() + c.err = err + c.mu.Unlock() + } + } + } +} + +// CurrentTimeFromCached return current time from pd cache +func (c *TimeAcquirerImpl) CurrentTimeFromCached() (time.Time, error) { + c.mu.RLock() + err := c.err + cacheTime := c.timeCache + c.mu.RUnlock() + return cacheTime, errors.Trace(err) +} + +func (c *TimeAcquirerImpl) Stop() { + c.cancel() +} + +type TimeAcquirer4Test struct{} + +func NewTimeAcquirer4Test() TimeAcquirer { + return &TimeAcquirer4Test{} +} + +func (c *TimeAcquirer4Test) CurrentTimeFromCached() (time.Time, error) { + return time.Now(), nil +} + +func (c *TimeAcquirer4Test) Run(ctx context.Context) { +} + +func (c *TimeAcquirer4Test) Stop() { +} diff --git a/pkg/pdtime/acquirer_test.go b/pkg/pdtime/acquirer_test.go new file mode 100644 index 00000000000..55b2950192e --- /dev/null +++ b/pkg/pdtime/acquirer_test.go @@ -0,0 +1,53 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package pdtime + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/oracle" + pd "github.com/tikv/pd/client" +) + +// MockPDClient mocks pd.Client to facilitate unit testing. +type MockPDClient struct { + pd.Client +} + +// GetTS implements pd.Client.GetTS. +func (m *MockPDClient) GetTS(ctx context.Context) (int64, int64, error) { + return oracle.GetPhysical(time.Now()), 0, nil +} + +func TestTimeFromPD(t *testing.T) { + t.Parallel() + mockPDClient := &MockPDClient{} + TimeAcquirer := NewTimeAcquirer(mockPDClient) + go TimeAcquirer.Run(context.Background()) + defer TimeAcquirer.Stop() + time.Sleep(1 * time.Second) + + t1, err := TimeAcquirer.CurrentTimeFromCached() + require.Nil(t, err) + + time.Sleep(400 * time.Millisecond) + // assume that the gc safe point updated one hour ago + t2, err := TimeAcquirer.CurrentTimeFromCached() + require.Nil(t, err) + // should return new time + require.NotEqual(t, t1, t2) +} diff --git a/pkg/pdtime/main_test.go b/pkg/pdtime/main_test.go new file mode 100644 index 00000000000..229d3e567f5 --- /dev/null +++ b/pkg/pdtime/main_test.go @@ -0,0 +1,24 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package pdtime + +import ( + "testing" + + "github.com/pingcap/ticdc/pkg/leakutil" +) + +func TestMain(m *testing.M) { + leakutil.SetUpLeakTest(m) +} diff --git a/pkg/txnutil/gc/gc_manager.go b/pkg/txnutil/gc/gc_manager.go index df997487dbf..1a158fa6f9d 100644 --- a/pkg/txnutil/gc/gc_manager.go +++ b/pkg/txnutil/gc/gc_manager.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/ticdc/cdc/model" "github.com/pingcap/ticdc/pkg/config" + cdcContext "github.com/pingcap/ticdc/pkg/context" cerror "github.com/pingcap/ticdc/pkg/errors" "github.com/tikv/client-go/v2/oracle" pd "github.com/tikv/pd/client" @@ -31,7 +32,6 @@ import ( const ( // CDCServiceSafePointID is the ID of CDC service in pd.UpdateServiceGCSafePoint. CDCServiceSafePointID = "ticdc" - pdTimeUpdateInterval = 10 * time.Minute ) // gcSafepointUpdateInterval is the minimum interval that CDC can update gc safepoint @@ -43,32 +43,27 @@ type Manager interface { // Manager may skip update when it thinks it is too frequent. // Set `forceUpdate` to force Manager update. TryUpdateGCSafePoint(ctx context.Context, checkpointTs model.Ts, forceUpdate bool) error - CurrentTimeFromPDCached(ctx context.Context) (time.Time, error) CheckStaleCheckpointTs(ctx context.Context, changefeedID model.ChangeFeedID, checkpointTs model.Ts) error } type gcManager struct { pdClient pd.Client - - gcTTL int64 + gcTTL int64 lastUpdatedTime time.Time lastSucceededTime time.Time lastSafePointTs uint64 isTiCDCBlockGC bool - - pdPhysicalTimeCache time.Time - lastUpdatedPdTime time.Time } // NewManager creates a new Manager. -func NewManager(pdClint pd.Client) Manager { +func NewManager(pdClient pd.Client) Manager { serverConfig := config.GetGlobalServerConfig() failpoint.Inject("InjectGcSafepointUpdateInterval", func(val failpoint.Value) { gcSafepointUpdateInterval = time.Duration(val.(int) * int(time.Millisecond)) }) return &gcManager{ - pdClient: pdClint, + pdClient: pdClient, lastSucceededTime: time.Now(), gcTTL: serverConfig.GcTTL, } @@ -111,25 +106,17 @@ func (m *gcManager) TryUpdateGCSafePoint( return nil } -func (m *gcManager) CurrentTimeFromPDCached(ctx context.Context) (time.Time, error) { - if time.Since(m.lastUpdatedPdTime) <= pdTimeUpdateInterval { - return m.pdPhysicalTimeCache, nil - } - physical, logical, err := m.pdClient.GetTS(ctx) - if err != nil { - return time.Now(), errors.Trace(err) - } - m.pdPhysicalTimeCache = oracle.GetTimeFromTS(oracle.ComposeTS(physical, logical)) - m.lastUpdatedPdTime = time.Now() - return m.pdPhysicalTimeCache, nil -} - func (m *gcManager) CheckStaleCheckpointTs( ctx context.Context, changefeedID model.ChangeFeedID, checkpointTs model.Ts, ) error { gcSafepointUpperBound := checkpointTs - 1 if m.isTiCDCBlockGC { - pdTime, err := m.CurrentTimeFromPDCached(ctx) + cctx, ok := ctx.(cdcContext.Context) + if !ok { + return cerror.ErrOwnerUnknown.GenWithStack("ctx not an cdcContext.Context, it should be") + } + pdTime, err := cctx.GlobalVars().TimeAcquirer.CurrentTimeFromCached() + // TODO: should we return err here, or just log it? if err != nil { return errors.Trace(err) } diff --git a/pkg/txnutil/gc/gc_manager_test.go b/pkg/txnutil/gc/gc_manager_test.go index 1f535f8cc94..d650aa72443 100644 --- a/pkg/txnutil/gc/gc_manager_test.go +++ b/pkg/txnutil/gc/gc_manager_test.go @@ -18,6 +18,8 @@ import ( "testing" "time" + "github.com/pingcap/ticdc/pkg/pdtime" + "github.com/pingcap/check" "github.com/pingcap/errors" cdcContext "github.com/pingcap/ticdc/pkg/context" @@ -87,49 +89,32 @@ func (s *gcManagerSuite) TestUpdateGCSafePoint(c *check.C) { } } -func (s *gcManagerSuite) TestTimeFromPD(c *check.C) { - defer testleak.AfterTest(c)() - mockPDClient := &MockPDClient{} - gcManager := NewManager(mockPDClient).(*gcManager) - ctx := cdcContext.NewBackendContext4Test(true) - ctx.GlobalVars().PDClient = mockPDClient - t1, err := gcManager.CurrentTimeFromPDCached(ctx) - c.Assert(err, check.IsNil) - c.Assert(t1, check.Equals, gcManager.pdPhysicalTimeCache) - - time.Sleep(50 * time.Millisecond) - // should return cached time - t2, err := gcManager.CurrentTimeFromPDCached(ctx) - c.Assert(err, check.IsNil) - c.Assert(t2, check.Equals, gcManager.pdPhysicalTimeCache) - c.Assert(t2, check.Equals, t1) - - time.Sleep(50 * time.Millisecond) - // assume that the gc safe point updated one hour ago - gcManager.lastUpdatedPdTime = time.Now().Add(-time.Hour) - t3, err := gcManager.CurrentTimeFromPDCached(ctx) - c.Assert(err, check.IsNil) - c.Assert(t3, check.Equals, gcManager.pdPhysicalTimeCache) - // should return new time - c.Assert(t3, check.Not(check.Equals), t2) -} - func (s *gcManagerSuite) TestCheckStaleCheckpointTs(c *check.C) { defer testleak.AfterTest(c)() mockPDClient := &MockPDClient{} gcManager := NewManager(mockPDClient).(*gcManager) gcManager.isTiCDCBlockGC = true ctx := context.Background() - err := gcManager.CheckStaleCheckpointTs(ctx, "cfID", 10) + + TimeAcquirer := pdtime.NewTimeAcquirer(mockPDClient) + go TimeAcquirer.Run(ctx) + time.Sleep(1 * time.Second) + defer TimeAcquirer.Stop() + + cCtx := cdcContext.NewContext(ctx, &cdcContext.GlobalVars{ + TimeAcquirer: TimeAcquirer, + }) + + err := gcManager.CheckStaleCheckpointTs(cCtx, "cfID", 10) c.Assert(cerror.ErrGCTTLExceeded.Equal(errors.Cause(err)), check.IsTrue) c.Assert(cerror.ChangefeedFastFailError(err), check.IsTrue) - err = gcManager.CheckStaleCheckpointTs(ctx, "cfID", oracle.GoTimeToTS(time.Now())) + err = gcManager.CheckStaleCheckpointTs(cCtx, "cfID", oracle.GoTimeToTS(time.Now())) c.Assert(err, check.IsNil) gcManager.isTiCDCBlockGC = false gcManager.lastSafePointTs = 20 - err = gcManager.CheckStaleCheckpointTs(ctx, "cfID", 10) + err = gcManager.CheckStaleCheckpointTs(cCtx, "cfID", 10) c.Assert(cerror.ErrSnapshotLostByGC.Equal(errors.Cause(err)), check.IsTrue) c.Assert(cerror.ChangefeedFastFailError(err), check.IsTrue) }