Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: use pd timestamp to update gc safepoint (#32734) #32799

Merged
merged 3 commits into from
Apr 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions br/pkg/lightning/restore/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,12 +279,8 @@ func newTiKVChecksumManager(client kv.Client, pdClient pd.Client, distSQLScanCon
}
}

func (e *tikvChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpoints.TidbTableInfo) (*RemoteChecksum, error) {
physicalTS, logicalTS, err := e.manager.pdClient.GetTS(ctx)
if err != nil {
return nil, errors.Annotate(err, "fetch tso from pd failed")
}
executor, err := checksum.NewExecutorBuilder(tableInfo.Core, oracle.ComposeTS(physicalTS, logicalTS)).
func (e *tikvChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpoints.TidbTableInfo, ts uint64) (*RemoteChecksum, error) {
executor, err := checksum.NewExecutorBuilder(tableInfo.Core, ts).
SetConcurrency(e.distSQLScanConcurrency).
Build()
if err != nil {
Expand Down Expand Up @@ -327,12 +323,16 @@ func (e *tikvChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpo

func (e *tikvChecksumManager) Checksum(ctx context.Context, tableInfo *checkpoints.TidbTableInfo) (*RemoteChecksum, error) {
tbl := common.UniqueTable(tableInfo.DB, tableInfo.Name)
err := e.manager.addOneJob(ctx, tbl, oracle.ComposeTS(time.Now().Unix()*1000, 0))
physicalTS, logicalTS, err := e.manager.pdClient.GetTS(ctx)
if err != nil {
return nil, errors.Annotate(err, "fetch tso from pd failed")
}
ts := oracle.ComposeTS(physicalTS, logicalTS)
if err := e.manager.addOneJob(ctx, tbl, ts); err != nil {
return nil, errors.Trace(err)
}

return e.checksumDB(ctx, tableInfo)
return e.checksumDB(ctx, tableInfo, ts)
}

type tableChecksumTS struct {
Expand Down
43 changes: 29 additions & 14 deletions br/pkg/lightning/restore/checksum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/tipb/go-tipb"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
uatomic "go.uber.org/atomic"
)

var _ = Suite(&checksumSuite{})
Expand Down Expand Up @@ -169,10 +170,16 @@ func (s *checksumSuite) TestDoChecksumWithTikv(c *C) {
for i := 0; i <= maxErrorRetryCount; i++ {
kvClient.maxErrCount = i
kvClient.curErrCount = 0
var checksumTS uint64
kvClient.onSendReq = func(req *kv.Request) {
checksumTS = req.StartTs
}
checksumExec := &tikvChecksumManager{manager: newGCTTLManager(pdClient), client: kvClient}
startTS := oracle.ComposeTS(time.Now().Unix()*1000, 0)
ctx := context.WithValue(context.Background(), &checksumManagerKey, checksumExec)
_, err = DoChecksum(ctx, &TidbTableInfo{DB: "test", Name: "t", Core: tableInfo})
ctx := context.Background()
physicalTS, logicalTS, err := pdClient.GetTS(ctx)
c.Assert(err, IsNil)
subCtx := context.WithValue(ctx, &checksumManagerKey, checksumExec)
_, err = DoChecksum(subCtx, &TidbTableInfo{DB: "test", Name: "t", Core: tableInfo})
// with max error retry < maxErrorRetryCount, the checksum can succeed
if i >= maxErrorRetryCount {
c.Assert(err, ErrorMatches, "tikv timeout")
Expand All @@ -184,7 +191,9 @@ func (s *checksumSuite) TestDoChecksumWithTikv(c *C) {
// after checksum, the safepoint should be smaller than the start ts
ts := pdClient.currentSafePoint()
// 1ms for the schedule deviation
startTS := oracle.ComposeTS(physicalTS+1, logicalTS)
c.Assert(ts <= startTS+1, IsTrue)
c.Assert(checksumTS >= ts, IsTrue)
c.Assert(atomic.LoadUint32(&checksumExec.manager.started) > 0, IsTrue)
}
}
Expand Down Expand Up @@ -215,31 +224,33 @@ func (s *checksumSuite) TestDoChecksumWithErrorAndLongOriginalLifetime(c *C) {

type safePointTTL struct {
safePoint uint64
// ttl is the last timestamp this safe point is valid
ttl int64
expiredAt int64
}

type testPDClient struct {
sync.Mutex
pd.Client
count int32
gcSafePoint []safePointTTL
count int32
gcSafePoint []safePointTTL
logicalTSCounter uatomic.Uint64
}

func (c *testPDClient) currentSafePoint() uint64 {
ts := time.Now().Unix()
c.Lock()
defer c.Unlock()
for _, s := range c.gcSafePoint {
if s.ttl > ts {
if s.expiredAt > ts {
return s.safePoint
}
}
return 0
}

func (c *testPDClient) GetTS(ctx context.Context) (int64, int64, error) {
return time.Now().Unix(), 0, nil
physicalTS := time.Now().UnixNano() / 1e6
logicalTS := oracle.ExtractLogical(c.logicalTSCounter.Inc())
return physicalTS, logicalTS, nil
}

func (c *testPDClient) UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
Expand All @@ -253,13 +264,13 @@ func (c *testPDClient) UpdateServiceGCSafePoint(ctx context.Context, serviceID s
})
sp := c.gcSafePoint
ttlEnd := time.Now().Unix() + ttl
spTTL := safePointTTL{safePoint: safePoint, ttl: ttlEnd}
spTTL := safePointTTL{safePoint: safePoint, expiredAt: ttlEnd}
switch {
case idx >= len(sp):
c.gcSafePoint = append(c.gcSafePoint, spTTL)
case sp[idx].safePoint == safePoint:
if ttlEnd > sp[idx].ttl {
sp[idx].ttl = ttlEnd
if ttlEnd > sp[idx].expiredAt {
sp[idx].expiredAt = ttlEnd
}
default:
c.gcSafePoint = append(append(sp[:idx], spTTL), sp[idx:]...)
Expand Down Expand Up @@ -385,15 +396,19 @@ func (r *mockResultSubset) RespTime() time.Duration {

type mockChecksumKVClient struct {
kv.Client
checksum tipb.ChecksumResponse
respDur time.Duration
checksum tipb.ChecksumResponse
respDur time.Duration
onSendReq func(req *kv.Request)
// number of errors to return before returning success
maxErrCount int
curErrCount int
}

// a mock client for checksum request
func (c *mockChecksumKVClient) Send(ctx context.Context, req *kv.Request, vars interface{}, sessionMemTracker *memory.Tracker, enabledRateLimitAction bool, eventCb trxevents.EventCallback) kv.Response {
if c.onSendReq != nil {
c.onSendReq(req)
}
if c.curErrCount < c.maxErrCount {
c.curErrCount++
return &mockErrorResponse{err: "tikv timeout"}
Expand Down