diff --git a/br/pkg/lightning/restore/checksum.go b/br/pkg/lightning/restore/checksum.go index 5b7a5b1501af4..e9519b65170cf 100644 --- a/br/pkg/lightning/restore/checksum.go +++ b/br/pkg/lightning/restore/checksum.go @@ -279,12 +279,8 @@ func newTiKVChecksumManager(client kv.Client, pdClient pd.Client, distSQLScanCon } } -func (e *tikvChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpoints.TidbTableInfo) (*RemoteChecksum, error) { - physicalTS, logicalTS, err := e.manager.pdClient.GetTS(ctx) - if err != nil { - return nil, errors.Annotate(err, "fetch tso from pd failed") - } - executor, err := checksum.NewExecutorBuilder(tableInfo.Core, oracle.ComposeTS(physicalTS, logicalTS)). +func (e *tikvChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpoints.TidbTableInfo, ts uint64) (*RemoteChecksum, error) { + executor, err := checksum.NewExecutorBuilder(tableInfo.Core, ts). SetConcurrency(e.distSQLScanConcurrency). Build() if err != nil { @@ -327,12 +323,16 @@ func (e *tikvChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpo func (e *tikvChecksumManager) Checksum(ctx context.Context, tableInfo *checkpoints.TidbTableInfo) (*RemoteChecksum, error) { tbl := common.UniqueTable(tableInfo.DB, tableInfo.Name) - err := e.manager.addOneJob(ctx, tbl, oracle.ComposeTS(time.Now().Unix()*1000, 0)) + physicalTS, logicalTS, err := e.manager.pdClient.GetTS(ctx) if err != nil { + return nil, errors.Annotate(err, "fetch tso from pd failed") + } + ts := oracle.ComposeTS(physicalTS, logicalTS) + if err := e.manager.addOneJob(ctx, tbl, ts); err != nil { return nil, errors.Trace(err) } - return e.checksumDB(ctx, tableInfo) + return e.checksumDB(ctx, tableInfo, ts) } type tableChecksumTS struct { diff --git a/br/pkg/lightning/restore/checksum_test.go b/br/pkg/lightning/restore/checksum_test.go index 6a9f334b31f9a..b0addcd04f15d 100644 --- a/br/pkg/lightning/restore/checksum_test.go +++ b/br/pkg/lightning/restore/checksum_test.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/tipb/go-tipb" "github.com/tikv/client-go/v2/oracle" pd "github.com/tikv/pd/client" + uatomic "go.uber.org/atomic" ) var _ = Suite(&checksumSuite{}) @@ -169,10 +170,16 @@ func (s *checksumSuite) TestDoChecksumWithTikv(c *C) { for i := 0; i <= maxErrorRetryCount; i++ { kvClient.maxErrCount = i kvClient.curErrCount = 0 + var checksumTS uint64 + kvClient.onSendReq = func(req *kv.Request) { + checksumTS = req.StartTs + } checksumExec := &tikvChecksumManager{manager: newGCTTLManager(pdClient), client: kvClient} - startTS := oracle.ComposeTS(time.Now().Unix()*1000, 0) - ctx := context.WithValue(context.Background(), &checksumManagerKey, checksumExec) - _, err = DoChecksum(ctx, &TidbTableInfo{DB: "test", Name: "t", Core: tableInfo}) + ctx := context.Background() + physicalTS, logicalTS, err := pdClient.GetTS(ctx) + c.Assert(err, IsNil) + subCtx := context.WithValue(ctx, &checksumManagerKey, checksumExec) + _, err = DoChecksum(subCtx, &TidbTableInfo{DB: "test", Name: "t", Core: tableInfo}) // with max error retry < maxErrorRetryCount, the checksum can success if i >= maxErrorRetryCount { c.Assert(err, ErrorMatches, "tikv timeout") @@ -184,7 +191,9 @@ func (s *checksumSuite) TestDoChecksumWithTikv(c *C) { // after checksum, safepint should be small than start ts ts := pdClient.currentSafePoint() // 1ms for the schedule deviation + startTS := oracle.ComposeTS(physicalTS+1, logicalTS) c.Assert(ts <= startTS+1, IsTrue) + c.Assert(checksumTS >= ts, IsTrue) c.Assert(atomic.LoadUint32(&checksumExec.manager.started) > 0, IsTrue) } } @@ -215,15 +224,15 @@ func (s *checksumSuite) TestDoChecksumWithErrorAndLongOriginalLifetime(c *C) { type safePointTTL struct { safePoint uint64 - // ttl is the last timestamp this safe point is valid - ttl int64 + expiredAt int64 } type testPDClient struct { sync.Mutex pd.Client - count int32 - gcSafePoint []safePointTTL + count int32 + gcSafePoint []safePointTTL + logicalTSCounter uatomic.Uint64 } func (c *testPDClient) currentSafePoint() uint64 { @@ -231,7 +240,7 @@ func (c *testPDClient) currentSafePoint() uint64 { c.Lock() defer c.Unlock() for _, s := range c.gcSafePoint { - if s.ttl > ts { + if s.expiredAt > ts { return s.safePoint } } @@ -239,7 +248,9 @@ func (c *testPDClient) currentSafePoint() uint64 { } func (c *testPDClient) GetTS(ctx context.Context) (int64, int64, error) { - return time.Now().Unix(), 0, nil + physicalTS := time.Now().UnixNano() / 1e6 + logicalTS := oracle.ExtractLogical(c.logicalTSCounter.Inc()) + return physicalTS, logicalTS, nil } func (c *testPDClient) UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) { @@ -253,13 +264,13 @@ func (c *testPDClient) UpdateServiceGCSafePoint(ctx context.Context, serviceID s }) sp := c.gcSafePoint ttlEnd := time.Now().Unix() + ttl - spTTL := safePointTTL{safePoint: safePoint, ttl: ttlEnd} + spTTL := safePointTTL{safePoint: safePoint, expiredAt: ttlEnd} switch { case idx >= len(sp): c.gcSafePoint = append(c.gcSafePoint, spTTL) case sp[idx].safePoint == safePoint: - if ttlEnd > sp[idx].ttl { - sp[idx].ttl = ttlEnd + if ttlEnd > sp[idx].expiredAt { + sp[idx].expiredAt = ttlEnd } default: c.gcSafePoint = append(append(sp[:idx], spTTL), sp[idx:]...) @@ -385,8 +396,9 @@ func (r *mockResultSubset) RespTime() time.Duration { type mockChecksumKVClient struct { kv.Client - checksum tipb.ChecksumResponse - respDur time.Duration + checksum tipb.ChecksumResponse + respDur time.Duration + onSendReq func(req *kv.Request) // return error count before return success maxErrCount int curErrCount int @@ -394,6 +406,9 @@ type mockChecksumKVClient struct { // a mock client for checksum request func (c *mockChecksumKVClient) Send(ctx context.Context, req *kv.Request, vars interface{}, sessionMemTracker *memory.Tracker, enabledRateLimitAction bool, eventCb trxevents.EventCallback) kv.Response { + if c.onSendReq != nil { + c.onSendReq(req) + } if c.curErrCount < c.maxErrCount { c.curErrCount++ return &mockErrorResponse{err: "tikv timeout"}