Skip to content

Commit

Permalink
cherry pick pingcap#32734 to release-5.4
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
sleepymole authored and ti-srebot committed Mar 3, 2022
1 parent 1ee4efe commit 848f484
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 17 deletions.
16 changes: 8 additions & 8 deletions br/pkg/lightning/restore/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,12 +279,8 @@ func newTiKVChecksumManager(client kv.Client, pdClient pd.Client, distSQLScanCon
}
}

func (e *tikvChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpoints.TidbTableInfo) (*RemoteChecksum, error) {
physicalTS, logicalTS, err := e.manager.pdClient.GetTS(ctx)
if err != nil {
return nil, errors.Annotate(err, "fetch tso from pd failed")
}
executor, err := checksum.NewExecutorBuilder(tableInfo.Core, oracle.ComposeTS(physicalTS, logicalTS)).
func (e *tikvChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpoints.TidbTableInfo, ts uint64) (*RemoteChecksum, error) {
executor, err := checksum.NewExecutorBuilder(tableInfo.Core, ts).
SetConcurrency(e.distSQLScanConcurrency).
Build()
if err != nil {
Expand Down Expand Up @@ -327,12 +323,16 @@ func (e *tikvChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpo

func (e *tikvChecksumManager) Checksum(ctx context.Context, tableInfo *checkpoints.TidbTableInfo) (*RemoteChecksum, error) {
tbl := common.UniqueTable(tableInfo.DB, tableInfo.Name)
err := e.manager.addOneJob(ctx, tbl, oracle.ComposeTS(time.Now().Unix()*1000, 0))
physicalTS, logicalTS, err := e.manager.pdClient.GetTS(ctx)
if err != nil {
return nil, errors.Annotate(err, "fetch tso from pd failed")
}
ts := oracle.ComposeTS(physicalTS, logicalTS)
if err := e.manager.addOneJob(ctx, tbl, ts); err != nil {
return nil, errors.Trace(err)
}

return e.checksumDB(ctx, tableInfo)
return e.checksumDB(ctx, tableInfo, ts)
}

type tableChecksumTS struct {
Expand Down
53 changes: 44 additions & 9 deletions br/pkg/lightning/restore/checksum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,21 @@ func (s *checksumSuite) TestDoChecksumWithTikv(c *C) {
for i := 0; i <= maxErrorRetryCount; i++ {
kvClient.maxErrCount = i
kvClient.curErrCount = 0
var checksumTS uint64
kvClient.onSendReq = func(req *kv.Request) {
checksumTS = req.StartTs
}
checksumExec := &tikvChecksumManager{manager: newGCTTLManager(pdClient), client: kvClient}
<<<<<<< HEAD
startTS := oracle.ComposeTS(time.Now().Unix()*1000, 0)
ctx := context.WithValue(context.Background(), &checksumManagerKey, checksumExec)
_, err = DoChecksum(ctx, &TidbTableInfo{DB: "test", Name: "t", Core: tableInfo})
=======
physicalTS, logicalTS, err := pdClient.GetTS(ctx)
require.NoError(t, err)
subCtx := context.WithValue(ctx, &checksumManagerKey, checksumExec)
_, err = DoChecksum(subCtx, &TidbTableInfo{DB: "test", Name: "t", Core: tableInfo})
>>>>>>> 5288efa20... lightning: use pd timestamp to update gc safepoint (#32734)
// with max error retry < maxErrorRetryCount, the checksum can succeed
if i >= maxErrorRetryCount {
c.Assert(err, ErrorMatches, "tikv timeout")
Expand All @@ -184,8 +195,17 @@ func (s *checksumSuite) TestDoChecksumWithTikv(c *C) {
// after checksum, the safepoint should be smaller than the start ts
ts := pdClient.currentSafePoint()
// 1ms for the schedule deviation
<<<<<<< HEAD
c.Assert(ts <= startTS+1, IsTrue)
c.Assert(atomic.LoadUint32(&checksumExec.manager.started) > 0, IsTrue)
=======
startTS := oracle.ComposeTS(physicalTS+1, logicalTS)
require.True(t, ts <= startTS+1)
require.GreaterOrEqual(t, checksumTS, ts)
require.True(t, checksumExec.manager.started.Load())
require.Zero(t, checksumExec.manager.currentTS)
require.Equal(t, 0, len(checksumExec.manager.tableGCSafeTS))
>>>>>>> 5288efa20... lightning: use pd timestamp to update gc safepoint (#32734)
}
}

Expand Down Expand Up @@ -215,31 +235,38 @@ func (s *checksumSuite) TestDoChecksumWithErrorAndLongOriginalLifetime(c *C) {

type safePointTTL struct {
safePoint uint64
// ttl is the last timestamp this safe point is valid
ttl int64
expiredAt int64
}

type testPDClient struct {
sync.Mutex
pd.Client
<<<<<<< HEAD
count int32
gcSafePoint []safePointTTL
=======
count atomic.Int32
gcSafePoint []safePointTTL
logicalTSCounter atomic.Uint64
>>>>>>> 5288efa20... lightning: use pd timestamp to update gc safepoint (#32734)
}

func (c *testPDClient) currentSafePoint() uint64 {
ts := time.Now().Unix()
c.Lock()
defer c.Unlock()
for _, s := range c.gcSafePoint {
if s.ttl > ts {
if s.expiredAt > ts {
return s.safePoint
}
}
return 0
}

func (c *testPDClient) GetTS(ctx context.Context) (int64, int64, error) {
return time.Now().Unix(), 0, nil
physicalTS := time.Now().UnixNano() / 1e6
logicalTS := oracle.ExtractLogical(c.logicalTSCounter.Inc())
return physicalTS, logicalTS, nil
}

func (c *testPDClient) UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
Expand All @@ -253,13 +280,13 @@ func (c *testPDClient) UpdateServiceGCSafePoint(ctx context.Context, serviceID s
})
sp := c.gcSafePoint
ttlEnd := time.Now().Unix() + ttl
spTTL := safePointTTL{safePoint: safePoint, ttl: ttlEnd}
spTTL := safePointTTL{safePoint: safePoint, expiredAt: ttlEnd}
switch {
case idx >= len(sp):
c.gcSafePoint = append(c.gcSafePoint, spTTL)
case sp[idx].safePoint == safePoint:
if ttlEnd > sp[idx].ttl {
sp[idx].ttl = ttlEnd
if ttlEnd > sp[idx].expiredAt {
sp[idx].expiredAt = ttlEnd
}
default:
c.gcSafePoint = append(append(sp[:idx], spTTL), sp[idx:]...)
Expand Down Expand Up @@ -385,15 +412,23 @@ func (r *mockResultSubset) RespTime() time.Duration {

type mockChecksumKVClient struct {
kv.Client
checksum tipb.ChecksumResponse
respDur time.Duration
checksum tipb.ChecksumResponse
respDur time.Duration
onSendReq func(req *kv.Request)
// number of errors to return before returning success
maxErrCount int
curErrCount int
}

// a mock client for checksum request
<<<<<<< HEAD
func (c *mockChecksumKVClient) Send(ctx context.Context, req *kv.Request, vars interface{}, sessionMemTracker *memory.Tracker, enabledRateLimitAction bool, eventCb trxevents.EventCallback) kv.Response {
=======
func (c *mockChecksumKVClient) Send(ctx context.Context, req *kv.Request, vars interface{}, option *kv.ClientSendOption) kv.Response {
if c.onSendReq != nil {
c.onSendReq(req)
}
>>>>>>> 5288efa20... lightning: use pd timestamp to update gc safepoint (#32734)
if c.curErrCount < c.maxErrCount {
c.curErrCount++
return &mockErrorResponse{err: "tikv timeout"}
Expand Down

0 comments on commit 848f484

Please sign in to comment.