Skip to content

Commit

Permalink
lightning: use pd timestamp to update gc safepoint
Browse files Browse the repository at this point in the history
  • Loading branch information
sleepymole committed Mar 2, 2022
1 parent 1e1cf22 commit cb245bb
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 20 deletions.
16 changes: 8 additions & 8 deletions br/pkg/lightning/restore/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,12 +279,8 @@ func newTiKVChecksumManager(client kv.Client, pdClient pd.Client, distSQLScanCon
}
}

func (e *tikvChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpoints.TidbTableInfo) (*RemoteChecksum, error) {
physicalTS, logicalTS, err := e.manager.pdClient.GetTS(ctx)
if err != nil {
return nil, errors.Annotate(err, "fetch tso from pd failed")
}
executor, err := checksum.NewExecutorBuilder(tableInfo.Core, oracle.ComposeTS(physicalTS, logicalTS)).
func (e *tikvChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpoints.TidbTableInfo, ts uint64) (*RemoteChecksum, error) {
executor, err := checksum.NewExecutorBuilder(tableInfo.Core, ts).
SetConcurrency(e.distSQLScanConcurrency).
Build()
if err != nil {
Expand Down Expand Up @@ -327,13 +323,17 @@ func (e *tikvChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpo

func (e *tikvChecksumManager) Checksum(ctx context.Context, tableInfo *checkpoints.TidbTableInfo) (*RemoteChecksum, error) {
tbl := common.UniqueTable(tableInfo.DB, tableInfo.Name)
err := e.manager.addOneJob(ctx, tbl, oracle.ComposeTS(time.Now().Unix()*1000, 0))
physicalTS, logicalTS, err := e.manager.pdClient.GetTS(ctx)
if err != nil {
return nil, errors.Annotate(err, "fetch tso from pd failed")
}
ts := oracle.ComposeTS(physicalTS, logicalTS)
if err := e.manager.addOneJob(ctx, tbl, ts); err != nil {
return nil, errors.Trace(err)
}
defer e.manager.removeOneJob(tbl)

return e.checksumDB(ctx, tableInfo)
return e.checksumDB(ctx, tableInfo, ts)
}

type tableChecksumTS struct {
Expand Down
38 changes: 26 additions & 12 deletions br/pkg/lightning/restore/checksum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,13 @@ func TestDoChecksumWithTikv(t *testing.T) {
for i := 0; i <= maxErrorRetryCount; i++ {
kvClient.maxErrCount = i
kvClient.curErrCount = 0
var checksumTS uint64
kvClient.onSendReq = func(req *kv.Request) {
checksumTS = req.StartTs
}
checksumExec := &tikvChecksumManager{manager: newGCTTLManager(pdClient), client: kvClient}
startTS := oracle.ComposeTS(time.Now().Unix()*1000, 0)
physicalTS, logicalTS, err := pdClient.GetTS(ctx)
require.NoError(t, err)
subCtx := context.WithValue(ctx, &checksumManagerKey, checksumExec)
_, err = DoChecksum(subCtx, &TidbTableInfo{DB: "test", Name: "t", Core: tableInfo})
// with max error retry < maxErrorRetryCount, the checksum can success
Expand All @@ -185,8 +190,11 @@ func TestDoChecksumWithTikv(t *testing.T) {
// after checksum, safepint should be small than start ts
ts := pdClient.currentSafePoint()
// 1ms for the schedule deviation
startTS := oracle.ComposeTS(physicalTS+1, logicalTS)
require.True(t, ts <= startTS+1)
require.GreaterOrEqual(t, checksumTS, ts)
require.True(t, checksumExec.manager.started.Load())
require.Zero(t, checksumExec.manager.currentTS)
require.Equal(t, 0, len(checksumExec.manager.tableGCSafeTS))
}
}
Expand Down Expand Up @@ -215,31 +223,33 @@ func TestDoChecksumWithErrorAndLongOriginalLifetime(t *testing.T) {

type safePointTTL struct {
safePoint uint64
// ttl is the last timestamp this safe point is valid
ttl int64
expiredAt int64
}

type testPDClient struct {
sync.Mutex
pd.Client
count atomic.Int32
gcSafePoint []safePointTTL
count atomic.Int32
gcSafePoint []safePointTTL
logicalTSCounter atomic.Uint64
}

func (c *testPDClient) currentSafePoint() uint64 {
ts := time.Now().Unix()
c.Lock()
defer c.Unlock()
for _, s := range c.gcSafePoint {
if s.ttl > ts {
if s.expiredAt > ts {
return s.safePoint
}
}
return 0
}

func (c *testPDClient) GetTS(ctx context.Context) (int64, int64, error) {
return time.Now().Unix(), 0, nil
physicalTS := time.Now().UnixMilli()
logicalTS := oracle.ExtractLogical(c.logicalTSCounter.Inc())
return physicalTS, logicalTS, nil
}

func (c *testPDClient) UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
Expand All @@ -253,13 +263,13 @@ func (c *testPDClient) UpdateServiceGCSafePoint(ctx context.Context, serviceID s
})
sp := c.gcSafePoint
ttlEnd := time.Now().Unix() + ttl
spTTL := safePointTTL{safePoint: safePoint, ttl: ttlEnd}
spTTL := safePointTTL{safePoint: safePoint, expiredAt: ttlEnd}
switch {
case idx >= len(sp):
c.gcSafePoint = append(c.gcSafePoint, spTTL)
case sp[idx].safePoint == safePoint:
if ttlEnd > sp[idx].ttl {
sp[idx].ttl = ttlEnd
if ttlEnd > sp[idx].expiredAt {
sp[idx].expiredAt = ttlEnd
}
default:
c.gcSafePoint = append(append(sp[:idx], spTTL), sp[idx:]...)
Expand Down Expand Up @@ -385,15 +395,19 @@ func (r *mockResultSubset) RespTime() time.Duration {

type mockChecksumKVClient struct {
kv.Client
checksum tipb.ChecksumResponse
respDur time.Duration
checksum tipb.ChecksumResponse
respDur time.Duration
onSendReq func(req *kv.Request)
// return error count before return success
maxErrCount int
curErrCount int
}

// a mock client for checksum request
func (c *mockChecksumKVClient) Send(ctx context.Context, req *kv.Request, vars interface{}, option *kv.ClientSendOption) kv.Response {
if c.onSendReq != nil {
c.onSendReq(req)
}
if c.curErrCount < c.maxErrCount {
c.curErrCount++
return &mockErrorResponse{err: "tikv timeout"}
Expand Down

0 comments on commit cb245bb

Please sign in to comment.