From 1a351901412baa46050e49a09a90dfe33d522d54 Mon Sep 17 00:00:00 2001
From: Ti Chi Robot
Date: Sun, 24 Apr 2022 19:22:50 +0800
Subject: [PATCH] owner(cdc): fix two metrics problems (#4703) (#4729)

close pingcap/tiflow#4714
---
 cdc/capture/capture.go               | 18 +++++++-----
 cdc/owner/changefeed.go              | 40 +++++++++++++++++---------
 cdc/owner/owner.go                   | 33 +++++++++++++++++++++
 cdc/owner/scheduler_test.go          |  6 ++++
 cdc/scheduler/info_provider.go       | 40 ++++++++++++++++++++++++++
 cdc/scheduler/info_provider_test.go  | 15 ++++++++++
 cdc/scheduler/util/table_set.go      | 43 ++++++++++++++++++++++++----
 cdc/scheduler/util/table_set_test.go | 15 ++++++++++
 pkg/pdtime/acquirer.go               | 13 +++++++--
 pkg/pdtime/acquirer_test.go          |  4 ++-
 pkg/txnutil/gc/gc_manager_test.go    | 13 +++++----
 11 files changed, 205 insertions(+), 35 deletions(-)

diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go
index bbc93dbb5f2..40cc46d3f43 100644
--- a/cdc/capture/capture.go
+++ b/cdc/capture/capture.go
@@ -25,6 +25,13 @@ import (
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/log"
 	tidbkv "github.com/pingcap/tidb/kv"
+	"github.com/tikv/client-go/v2/tikv"
+	pd "github.com/tikv/pd/client"
+	"go.etcd.io/etcd/clientv3/concurrency"
+	"go.etcd.io/etcd/mvcc"
+	"go.uber.org/zap"
+	"golang.org/x/time/rate"
+
 	"github.com/pingcap/tiflow/cdc/kv"
 	"github.com/pingcap/tiflow/cdc/model"
 	"github.com/pingcap/tiflow/cdc/owner"
@@ -39,12 +46,6 @@ import (
 	"github.com/pingcap/tiflow/pkg/p2p"
 	"github.com/pingcap/tiflow/pkg/pdtime"
 	"github.com/pingcap/tiflow/pkg/version"
-	"github.com/tikv/client-go/v2/tikv"
-	pd "github.com/tikv/pd/client"
-	"go.etcd.io/etcd/clientv3/concurrency"
-	"go.etcd.io/etcd/mvcc"
-	"go.uber.org/zap"
-	"golang.org/x/time/rate"
 )
 
 // Capture represents a Capture server, it monitors the changefeed information in etcd and schedules Task on it.
@@ -140,7 +141,10 @@ func (c *Capture) reset(ctx context.Context) error {
 	if c.TimeAcquirer != nil {
 		c.TimeAcquirer.Stop()
 	}
-	c.TimeAcquirer = pdtime.NewTimeAcquirer(c.pdClient)
+	c.TimeAcquirer, err = pdtime.NewTimeAcquirer(ctx, c.pdClient)
+	if err != nil {
+		return errors.Trace(err)
+	}
 
 	if c.tableActorSystem != nil {
 		err := c.tableActorSystem.Stop()
diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go
index 046fe045e56..f2f4553525b 100644
--- a/cdc/owner/changefeed.go
+++ b/cdc/owner/changefeed.go
@@ -24,6 +24,10 @@ import (
 	"github.com/pingcap/tidb/parser"
 	"github.com/pingcap/tidb/parser/format"
 	timodel "github.com/pingcap/tidb/parser/model"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/tikv/client-go/v2/oracle"
+	"go.uber.org/zap"
+
 	"github.com/pingcap/tiflow/cdc/model"
 	"github.com/pingcap/tiflow/cdc/redo"
 	schedulerv2 "github.com/pingcap/tiflow/cdc/scheduler"
@@ -32,9 +36,6 @@ import (
 	"github.com/pingcap/tiflow/pkg/orchestrator"
 	"github.com/pingcap/tiflow/pkg/txnutil/gc"
 	"github.com/pingcap/tiflow/pkg/util"
-	"github.com/prometheus/client_golang/prometheus"
-	"github.com/tikv/client-go/v2/oracle"
-	"go.uber.org/zap"
 )
 
 type changefeed struct {
@@ -189,18 +190,25 @@ func (c *changefeed) tick(ctx cdcContext.Context, state *orchestrator.Changefeed
 	if err != nil {
 		return errors.Trace(err)
 	}
+
+	pdTime, _ := ctx.GlobalVars().TimeAcquirer.CurrentTimeFromCached()
+	currentTs := oracle.GetPhysical(pdTime)
+
 	// CheckpointCannotProceed implies that not all tables are being replicated normally,
 	// so in that case there is no need to advance the global watermarks.
 	if newCheckpointTs != schedulerv2.CheckpointCannotProceed {
-		pdTime, _ := ctx.GlobalVars().TimeAcquirer.CurrentTimeFromCached()
-		currentTs := oracle.GetPhysical(pdTime)
 		if newResolvedTs > barrierTs {
 			newResolvedTs = barrierTs
 		}
 		if newCheckpointTs > barrierTs {
 			newCheckpointTs = barrierTs
 		}
-		c.updateStatus(currentTs, newCheckpointTs, newResolvedTs)
+		c.updateStatus(newCheckpointTs, newResolvedTs)
+		c.updateMetrics(currentTs, newCheckpointTs, newResolvedTs)
+	} else if c.state.Status != nil {
+		// We should keep the metrics updated even if the scheduler cannot
+		// advance the watermarks for now.
+		c.updateMetrics(currentTs, c.state.Status.CheckpointTs, c.state.Status.ResolvedTs)
 	}
 	return nil
 }
@@ -522,7 +530,17 @@ func (c *changefeed) asyncExecDDL(ctx cdcContext.Context, job *timodel.Job) (don
 	return done, nil
 }
 
-func (c *changefeed) updateStatus(currentTs int64, checkpointTs, resolvedTs model.Ts) {
+func (c *changefeed) updateMetrics(currentTs int64, checkpointTs, resolvedTs model.Ts) {
+	phyCkpTs := oracle.ExtractPhysical(checkpointTs)
+	c.metricsChangefeedCheckpointTsGauge.Set(float64(phyCkpTs))
+	c.metricsChangefeedCheckpointTsLagGauge.Set(float64(currentTs-phyCkpTs) / 1e3)
+
+	phyRTs := oracle.ExtractPhysical(resolvedTs)
+	c.metricsChangefeedResolvedTsGauge.Set(float64(phyRTs))
+	c.metricsChangefeedResolvedTsLagGauge.Set(float64(currentTs-phyRTs) / 1e3)
+}
+
+func (c *changefeed) updateStatus(checkpointTs, resolvedTs model.Ts) {
 	c.state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
 		changed := false
 		if status == nil {
@@ -538,19 +556,13 @@ func (c *changefeed) updateStatus(currentTs int64, checkpointTs, resolvedTs mode
 		}
 		return status, changed, nil
 	})
-	phyCkpTs := oracle.ExtractPhysical(checkpointTs)
-	c.metricsChangefeedCheckpointTsGauge.Set(float64(phyCkpTs))
-	c.metricsChangefeedCheckpointTsLagGauge.Set(float64(currentTs-phyCkpTs) / 1e3)
-
-	phyRTs := oracle.ExtractPhysical(resolvedTs)
-	c.metricsChangefeedResolvedTsGauge.Set(float64(phyRTs))
-	c.metricsChangefeedResolvedTsLagGauge.Set(float64(currentTs-phyRTs) / 1e3)
 }
 
 func (c *changefeed) Close(ctx cdcContext.Context) {
 	c.releaseResources(ctx)
 }
 
+// GetInfoProvider returns an InfoProvider if one is available.
 func (c *changefeed) GetInfoProvider() schedulerv2.InfoProvider {
 	if provider, ok := c.scheduler.(schedulerv2.InfoProvider); ok {
 		return provider
diff --git a/cdc/owner/owner.go b/cdc/owner/owner.go
index 6064fa3e1fd..f3b0e843fd9 100644
--- a/cdc/owner/owner.go
+++ b/cdc/owner/owner.go
@@ -26,6 +26,7 @@ import (
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/log"
 	"github.com/pingcap/tiflow/cdc/model"
+	"github.com/pingcap/tiflow/pkg/config"
 	cdcContext "github.com/pingcap/tiflow/pkg/context"
 	cerror "github.com/pingcap/tiflow/pkg/errors"
 	"github.com/pingcap/tiflow/pkg/orchestrator"
@@ -306,6 +307,38 @@ func (o *Owner) updateMetrics(state *orchestrator.GlobalReactorState) {
 	ownerMaintainTableNumGauge.Reset()
 	changefeedStatusGauge.Reset()
+
+	conf := config.GetGlobalServerConfig()
+
+	// TODO refactor this piece of code when the new scheduler is stabilized,
+	// and the old scheduler is removed.
+	if conf.Debug != nil && conf.Debug.EnableNewScheduler {
+		for cfID, cf := range o.changefeeds {
+			if cf.state != nil && cf.state.Info != nil {
+				changefeedStatusGauge.WithLabelValues(cfID).Set(float64(cf.state.Info.State.ToInt()))
+			}
+
+			// The InfoProvider is a proxy object returning information
+			// from the scheduler.
+			infoProvider := cf.GetInfoProvider()
+			if infoProvider == nil {
+				// The scheduler has not been initialized yet.
+				continue
+			}
+
+			totalCounts := infoProvider.GetTotalTableCounts()
+			pendingCounts := infoProvider.GetPendingTableCounts()
+
+			for captureID, info := range o.captures {
+				ownerMaintainTableNumGauge.WithLabelValues(
+					cfID, info.AdvertiseAddr, maintainTableTypeTotal).Set(float64(totalCounts[captureID]))
+				ownerMaintainTableNumGauge.WithLabelValues(
+					cfID, info.AdvertiseAddr, maintainTableTypeWip).Set(float64(pendingCounts[captureID]))
+			}
+		}
+		return
+	}
+
 	for changefeedID, changefeedState := range state.Changefeeds {
 		for captureID, captureInfo := range state.Captures {
 			taskStatus, exist := changefeedState.TaskStatuses[captureID]
diff --git a/cdc/owner/scheduler_test.go b/cdc/owner/scheduler_test.go
index c02d8845183..45889aace6b 100644
--- a/cdc/owner/scheduler_test.go
+++ b/cdc/owner/scheduler_test.go
@@ -261,6 +261,12 @@ func TestSchedulerNoPeer(t *testing.T) {
 	mockCluster.Close()
 }
 
+func TestInfoProvider(t *testing.T) {
+	sched := scheduler(new(schedulerV2))
+	_, ok := sched.(pscheduler.InfoProvider)
+	require.True(t, ok)
+}
+
 func receiveToChannels(
 	ctx context.Context,
 	t *testing.T,
diff --git a/cdc/scheduler/info_provider.go b/cdc/scheduler/info_provider.go
index 7c4bd3d285a..de1a9f88520 100644
--- a/cdc/scheduler/info_provider.go
+++ b/cdc/scheduler/info_provider.go
@@ -26,9 +26,20 @@ type InfoProvider interface {
 
 	// GetTaskPositions returns the task positions.
 	GetTaskPositions() (map[model.CaptureID]*model.TaskPosition, error)
+
+	// GetTotalTableCounts returns the number of tables associated
+	// with each capture.
+	GetTotalTableCounts() map[model.CaptureID]int
+
+	// GetPendingTableCounts returns the number of tables in a non-ready
+	// status (Adding & Removing) associated with each capture.
+	GetPendingTableCounts() map[model.CaptureID]int
 }
 
 // GetTaskStatuses implements InfoProvider for BaseScheduleDispatcher.
+// Complexity Note: This function has O(#tables) cost. USE WITH CARE.
+// Functions with cost O(#tables) are NOT recommended for regular metrics
+// collection.
 func (s *BaseScheduleDispatcher) GetTaskStatuses() (map[model.CaptureID]*model.TaskStatus, error) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
@@ -67,6 +78,7 @@ func (s *BaseScheduleDispatcher) GetTaskStatuses() (map[model.CaptureID]*model.T
 }
 
 // GetTaskPositions implements InfoProvider for BaseScheduleDispatcher.
+// Complexity Note: This function has O(#captures) cost.
 func (s *BaseScheduleDispatcher) GetTaskPositions() (map[model.CaptureID]*model.TaskPosition, error) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
@@ -81,3 +93,31 @@ func (s *BaseScheduleDispatcher) GetTaskPositions() (map[model.CaptureID]*model.
 
 	return ret, nil
 }
+
+// GetTotalTableCounts implements InfoProvider for BaseScheduleDispatcher.
+// Complexity Note: This function has O(#captures) cost.
+func (s *BaseScheduleDispatcher) GetTotalTableCounts() map[model.CaptureID]int {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	ret := make(map[model.CaptureID]int, len(s.captureStatus))
+	for captureID := range s.captureStatus {
+		ret[captureID] = s.tables.CountTableByCaptureID(captureID)
+	}
+	return ret
+}
+
+// GetPendingTableCounts implements InfoProvider for BaseScheduleDispatcher.
+// Complexity Note: This function has O(#captures) cost.
+func (s *BaseScheduleDispatcher) GetPendingTableCounts() map[model.CaptureID]int {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	ret := make(map[model.CaptureID]int, len(s.captureStatus))
+	for captureID := range s.captureStatus {
+		addCount := s.tables.CountTableByCaptureIDAndStatus(captureID, util.AddingTable)
+		removeCount := s.tables.CountTableByCaptureIDAndStatus(captureID, util.RemovingTable)
+		ret[captureID] = addCount + removeCount
+	}
+	return ret
+}
diff --git a/cdc/scheduler/info_provider_test.go b/cdc/scheduler/info_provider_test.go
index 8261f936655..2e0c6db29e9 100644
--- a/cdc/scheduler/info_provider_test.go
+++ b/cdc/scheduler/info_provider_test.go
@@ -116,3 +116,18 @@ func TestInfoProviderTaskPosition(t *testing.T) {
 		},
 	}, taskPosition)
 }
+
+func TestInfoProviderTableCounts(t *testing.T) {
+	dispatcher := NewBaseScheduleDispatcher("cf-1", nil, 1300)
+	injectSchedulerStateForInfoProviderTest(dispatcher)
+
+	require.Equal(t, map[model.CaptureID]int{
+		"capture-1": 3,
+		"capture-2": 2,
+	}, dispatcher.GetTotalTableCounts())
+
+	require.Equal(t, map[model.CaptureID]int{
+		"capture-1": 1,
+		"capture-2": 1,
+	}, dispatcher.GetPendingTableCounts())
+}
diff --git a/cdc/scheduler/util/table_set.go b/cdc/scheduler/util/table_set.go
index 55b06f40655..5c83b897c30 100644
--- a/cdc/scheduler/util/table_set.go
+++ b/cdc/scheduler/util/table_set.go
@@ -28,6 +28,12 @@ type TableSet struct {
 	// a non-unique index to facilitate looking up tables
 	// assigned to a given capture.
 	captureIndex map[model.CaptureID]map[model.TableID]*TableRecord
+
+	// caches the number of tables in each status associated with
+	// the given capture.
+	// This is used to accelerate scheduler decision and metrics
+	// collection when the number of tables is very high.
+	statusCounts map[model.CaptureID]map[TableStatus]int
 }
 
 // TableRecord is a record to be inserted into TableSet.
@@ -62,6 +68,7 @@ func NewTableSet() *TableSet {
 	return &TableSet{
 		tableIDMap:   map[model.TableID]*TableRecord{},
 		captureIndex: map[model.CaptureID]map[model.TableID]*TableRecord{},
+		statusCounts: map[model.CaptureID]map[TableStatus]int{},
 	}
 }
@@ -80,8 +87,15 @@ func (s *TableSet) AddTableRecord(record *TableRecord) (successful bool) {
 		captureIndexEntry = make(map[model.TableID]*TableRecord)
 		s.captureIndex[record.CaptureID] = captureIndexEntry
 	}
-
 	captureIndexEntry[record.TableID] = recordCloned
+
+	statusCountEntry := s.statusCounts[record.CaptureID]
+	if statusCountEntry == nil {
+		statusCountEntry = make(map[TableStatus]int)
+		s.statusCounts[record.CaptureID] = statusCountEntry
+	}
+	statusCountEntry[record.Status]++
+
 	return true
 }
@@ -100,6 +114,9 @@ func (s *TableSet) UpdateTableRecord(record *TableRecord) (successful bool) {
 	recordCloned := record.Clone()
 	s.tableIDMap[record.TableID] = recordCloned
 	s.captureIndex[record.CaptureID][record.TableID] = recordCloned
+
+	s.statusCounts[record.CaptureID][oldRecord.Status]--
+	s.statusCounts[record.CaptureID][record.Status]++
 	return true
 }
@@ -142,6 +159,13 @@ func (s *TableSet) RemoveTableRecord(tableID model.TableID) bool {
 	if len(captureIndexEntry) == 0 {
 		delete(s.captureIndex, record.CaptureID)
 	}
+
+	statusCountEntry, ok := s.statusCounts[record.CaptureID]
+	if !ok {
+		log.Panic("unreachable", zap.Int64("tableID", tableID))
+	}
+	statusCountEntry[record.Status]--
+
 	return true
 }
@@ -161,6 +185,7 @@ func (s *TableSet) RemoveTableRecordByCaptureID(captureID model.CaptureID) []*Ta
 		ret = append(ret, record)
 	}
 	delete(s.captureIndex, captureID)
+	delete(s.statusCounts, captureID)
 	return ret
 }
@@ -169,6 +194,16 @@ func (s *TableSet) CountTableByCaptureID(captureID model.CaptureID) int {
 	return len(s.captureIndex[captureID])
 }
 
+// CountTableByCaptureIDAndStatus counts the number of tables associated with the given captureID
+// with the specified status.
+func (s *TableSet) CountTableByCaptureIDAndStatus(captureID model.CaptureID, status TableStatus) int {
+	statusCountEntry, ok := s.statusCounts[captureID]
+	if !ok {
+		return 0
+	}
+	return statusCountEntry[status]
+}
+
 // GetDistinctCaptures counts distinct captures with tables.
 func (s *TableSet) GetDistinctCaptures() []model.CaptureID {
 	var ret []model.CaptureID
@@ -202,10 +237,8 @@ func (s *TableSet) GetAllTablesGroupedByCaptures() map[model.CaptureID]map[model
 
 // CountTableByStatus counts the number of tables with the given status.
 func (s *TableSet) CountTableByStatus(status TableStatus) (count int) {
-	for _, record := range s.tableIDMap {
-		if record.Status == status {
-			count++
-		}
+	for _, statusEntryCount := range s.statusCounts {
+		count += statusEntryCount[status]
 	}
 	return
 }
diff --git a/cdc/scheduler/util/table_set_test.go b/cdc/scheduler/util/table_set_test.go
index 0b2e8723ccb..cf39122a617 100644
--- a/cdc/scheduler/util/table_set_test.go
+++ b/cdc/scheduler/util/table_set_test.go
@@ -44,6 +44,9 @@ func TestTableSetBasics(t *testing.T) {
 		CaptureID: "capture-1",
 		Status:    AddingTable,
 	}, record)
+	require.Equal(t, 1, ts.CountTableByStatus(AddingTable))
+	require.Equal(t, 1, ts.CountTableByCaptureIDAndStatus("capture-1", AddingTable))
+	require.Equal(t, 0, ts.CountTableByCaptureIDAndStatus("capture-2", AddingTable))
 
 	ok = ts.RemoveTableRecord(1)
 	require.True(t, ok)
@@ -93,6 +96,10 @@ func TestTableSetCaptures(t *testing.T) {
 	require.Equal(t, 2, ts.CountTableByCaptureID("capture-2"))
 	require.Equal(t, 1, ts.CountTableByCaptureID("capture-3"))
 
+	require.Equal(t, 2, ts.CountTableByCaptureIDAndStatus("capture-1", AddingTable))
+	require.Equal(t, 2, ts.CountTableByCaptureIDAndStatus("capture-2", AddingTable))
+	require.Equal(t, 1, ts.CountTableByCaptureIDAndStatus("capture-3", AddingTable))
+
 	ok = ts.AddTableRecord(&TableRecord{
 		TableID:   6,
 		CaptureID: "capture-3",
@@ -100,6 +107,7 @@ func TestTableSetCaptures(t *testing.T) {
 	})
 	require.True(t, ok)
 	require.Equal(t, 2, ts.CountTableByCaptureID("capture-3"))
+	require.Equal(t, 2, ts.CountTableByCaptureIDAndStatus("capture-3", AddingTable))
 
 	captures := ts.GetDistinctCaptures()
 	require.Len(t, captures, 3)
@@ -112,6 +120,8 @@ func TestTableSetCaptures(t *testing.T) {
 	ok = ts.RemoveTableRecord(4)
 	require.True(t, ok)
 
+	require.Equal(t, 0, ts.CountTableByCaptureIDAndStatus("capture-2", AddingTable))
+
 	captures = ts.GetDistinctCaptures()
 	require.Len(t, captures, 2)
 	require.Contains(t, captures, "capture-1")
@@ -226,6 +236,7 @@ func TestCountTableByStatus(t *testing.T) {
 	require.Equal(t, 2, ts.CountTableByStatus(AddingTable))
 	require.Equal(t, 2, ts.CountTableByStatus(RunningTable))
 	require.Equal(t, 1, ts.CountTableByStatus(RemovingTable))
+	require.Equal(t, 1, ts.CountTableByCaptureIDAndStatus("capture-3", RunningTable))
 }
 
 func TestUpdateTableRecord(t *testing.T) {
@@ -243,6 +254,7 @@ func TestUpdateTableRecord(t *testing.T) {
 		Status:    AddingTable,
 	})
 	require.True(t, ok)
+	require.Equal(t, 0, ts.CountTableByCaptureIDAndStatus("capture-3", RunningTable))
 
 	ok = ts.UpdateTableRecord(&TableRecord{
 		TableID:   5,
@@ -255,6 +267,7 @@ func TestUpdateTableRecord(t *testing.T) {
 	require.True(t, ok)
 	require.Equal(t, RunningTable, rec.Status)
 	require.Equal(t, RunningTable, ts.GetAllTablesGroupedByCaptures()["capture-3"][5].Status)
+	require.Equal(t, 1, ts.CountTableByCaptureIDAndStatus("capture-3", RunningTable))
 
 	ok = ts.UpdateTableRecord(&TableRecord{
 		TableID:   4,
@@ -267,4 +280,6 @@ func TestUpdateTableRecord(t *testing.T) {
 	require.Equal(t, RunningTable, rec.Status)
 	require.Equal(t, "capture-3", rec.CaptureID)
 	require.Equal(t, RunningTable, ts.GetAllTablesGroupedByCaptures()["capture-3"][4].Status)
+	require.Equal(t, 2, ts.CountTableByCaptureIDAndStatus("capture-3", RunningTable))
+	require.Equal(t, 0, ts.CountTableByCaptureIDAndStatus("capture-3", AddingTable))
 }
diff --git a/pkg/pdtime/acquirer.go b/pkg/pdtime/acquirer.go
index 4f9a3ad93ad..37eebcec6b7 100644
--- a/pkg/pdtime/acquirer.go
+++ b/pkg/pdtime/acquirer.go
@@ -20,10 +20,11 @@ import (
 	"github.com/pingcap/errors"
"github.com/pingcap/log" - "github.com/pingcap/tiflow/pkg/retry" "github.com/tikv/client-go/v2/oracle" pd "github.com/tikv/pd/client" "go.uber.org/zap" + + "github.com/pingcap/tiflow/pkg/retry" ) const pdTimeUpdateInterval = 200 * time.Millisecond @@ -47,10 +48,16 @@ type TimeAcquirerImpl struct { } // NewTimeAcquirer return a new TimeAcquirer -func NewTimeAcquirer(pdClient pd.Client) TimeAcquirer { - return &TimeAcquirerImpl{ +func NewTimeAcquirer(ctx context.Context, pdClient pd.Client) (TimeAcquirer, error) { + ret := &TimeAcquirerImpl{ pdClient: pdClient, } + physical, _, err := pdClient.GetTS(ctx) + if err != nil { + return nil, errors.Trace(err) + } + ret.timeCache = oracle.GetTimeFromTS(oracle.ComposeTS(physical, 0)) + return ret, nil } // Run will get time from pd periodically to cache in pdPhysicalTimeCache diff --git a/pkg/pdtime/acquirer_test.go b/pkg/pdtime/acquirer_test.go index 55b2950192e..2fcda8e4d9a 100644 --- a/pkg/pdtime/acquirer_test.go +++ b/pkg/pdtime/acquirer_test.go @@ -36,7 +36,9 @@ func (m *MockPDClient) GetTS(ctx context.Context) (int64, int64, error) { func TestTimeFromPD(t *testing.T) { t.Parallel() mockPDClient := &MockPDClient{} - TimeAcquirer := NewTimeAcquirer(mockPDClient) + TimeAcquirer, err := NewTimeAcquirer(context.Background(), mockPDClient) + require.NoError(t, err) + go TimeAcquirer.Run(context.Background()) defer TimeAcquirer.Stop() time.Sleep(1 * time.Second) diff --git a/pkg/txnutil/gc/gc_manager_test.go b/pkg/txnutil/gc/gc_manager_test.go index 368f879cd39..92e3d37dfa0 100644 --- a/pkg/txnutil/gc/gc_manager_test.go +++ b/pkg/txnutil/gc/gc_manager_test.go @@ -18,14 +18,14 @@ import ( "testing" "time" - "github.com/pingcap/tiflow/pkg/pdtime" - "github.com/pingcap/check" "github.com/pingcap/errors" + "github.com/tikv/client-go/v2/oracle" + cdcContext "github.com/pingcap/tiflow/pkg/context" cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/pdtime" "github.com/pingcap/tiflow/pkg/util/testleak" - "github.com/tikv/client-go/v2/oracle" ) func Test(t *testing.T) { @@ -96,8 +96,11 @@ func (s *gcManagerSuite) TestCheckStaleCheckpointTs(c *check.C) { gcManager.isTiCDCBlockGC = true ctx := context.Background() - TimeAcquirer := pdtime.NewTimeAcquirer(mockPDClient) + TimeAcquirer, err := pdtime.NewTimeAcquirer(ctx, mockPDClient) + c.Assert(err, check.IsNil) + go TimeAcquirer.Run(ctx) + time.Sleep(1 * time.Second) defer TimeAcquirer.Stop() @@ -105,7 +108,7 @@ func (s *gcManagerSuite) TestCheckStaleCheckpointTs(c *check.C) { TimeAcquirer: TimeAcquirer, }) - err := gcManager.CheckStaleCheckpointTs(cCtx, "cfID", 10) + err = gcManager.CheckStaleCheckpointTs(cCtx, "cfID", 10) c.Assert(cerror.ErrGCTTLExceeded.Equal(errors.Cause(err)), check.IsTrue) c.Assert(cerror.ChangefeedFastFailError(err), check.IsTrue)