Skip to content

Commit

Permalink
br: add GetTSWithRetry func (#38663)
Browse files Browse the repository at this point in the history
ref #36910
  • Loading branch information
MoCuishle28 authored Nov 3, 2022
1 parent 0f5bcd2 commit 7e5086c
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 1 deletion.
31 changes: 30 additions & 1 deletion br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,35 @@ func (rc *Client) GetTS(ctx context.Context) (uint64, error) {
return restoreTS, nil
}

// GetTSWithRetry gets a new timestamp with retry from PD.
func (rc *Client) GetTSWithRetry(ctx context.Context) (uint64, error) {
var (
startTS uint64
getTSErr error
retry uint
)

err := utils.WithRetry(ctx, func() error {
startTS, getTSErr = rc.GetTS(ctx)
failpoint.Inject("get-ts-error", func(val failpoint.Value) {
if val.(bool) && retry < 3 {
getTSErr = errors.Errorf("rpc error: code = Unknown desc = [PD:tso:ErrGenerateTimestamp]generate timestamp failed, requested pd is not leader of cluster")
}
})

retry++
if getTSErr != nil {
log.Warn("failed to get TS, retry it", zap.Uint("retry time", retry), logutil.ShortError(getTSErr))
}
return getTSErr
}, utils.NewPDReqBackoffer())

if err != nil {
log.Error("failed to get TS", zap.Error(err))
}
return startTS, errors.Trace(err)
}

// ResetTS resets the timestamp of PD to a bigger value.
func (rc *Client) ResetTS(ctx context.Context, pdCtrl *pdutil.PdController) error {
restoreTS := rc.backupMeta.GetEndVersion()
Expand Down Expand Up @@ -1350,7 +1379,7 @@ func (rc *Client) execChecksum(
ctx = opentracing.ContextWithSpan(ctx, span1)
}

startTS, err := rc.GetTS(ctx)
startTS, err := rc.GetTSWithRetry(ctx)
if err != nil {
return errors.Trace(err)
}
Expand Down
43 changes: 43 additions & 0 deletions br/pkg/restore/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"testing"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/import_sstpb"
Expand Down Expand Up @@ -330,12 +331,54 @@ func TestPreCheckTableClusterIndex(t *testing.T) {
type fakePDClient struct {
pd.Client
stores []*metapb.Store

notLeader bool
}

var retryTimes int

func (fpdc fakePDClient) GetAllStores(context.Context, ...pd.GetStoreOption) ([]*metapb.Store, error) {
return append([]*metapb.Store{}, fpdc.stores...), nil
}

func (fpdc fakePDClient) GetTS(ctx context.Context) (int64, int64, error) {
retryTimes++
if retryTimes >= 3 { // the mock PD leader switched successfully
fpdc.notLeader = false
}

if fpdc.notLeader {
return 0, 0, errors.Errorf("rpc error: code = Unknown desc = [PD:tso:ErrGenerateTimestamp]generate timestamp failed, requested pd is not leader of cluster")
}
return 1, 1, nil
}

func TestGetTSWithRetry(t *testing.T) {
t.Run("PD leader is healthy:", func(t *testing.T) {
retryTimes = -1000
pDClient := fakePDClient{notLeader: false}
client := restore.NewRestoreClient(pDClient, nil, defaultKeepaliveCfg, false)
_, err := client.GetTSWithRetry(context.Background())
require.NoError(t, err)
})

t.Run("PD leader failure:", func(t *testing.T) {
retryTimes = -1000
pDClient := fakePDClient{notLeader: true}
client := restore.NewRestoreClient(pDClient, nil, defaultKeepaliveCfg, false)
_, err := client.GetTSWithRetry(context.Background())
require.Error(t, err)
})

t.Run("PD leader switch successfully", func(t *testing.T) {
retryTimes = 0
pDClient := fakePDClient{notLeader: true}
client := restore.NewRestoreClient(pDClient, nil, defaultKeepaliveCfg, false)
_, err := client.GetTSWithRetry(context.Background())
require.NoError(t, err)
})
}

func TestPreCheckTableTiFlashReplicas(t *testing.T) {
m := mc
mockStores := []*metapb.Store{
Expand Down

0 comments on commit 7e5086c

Please sign in to comment.