diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go
index 7a2c04285c09f..590f95a1b5889 100644
--- a/br/pkg/restore/client.go
+++ b/br/pkg/restore/client.go
@@ -443,6 +443,35 @@ func (rc *Client) GetTS(ctx context.Context) (uint64, error) {
 	return restoreTS, nil
 }
 
+// GetTSWithRetry gets a new timestamp from PD, retrying on failure.
+func (rc *Client) GetTSWithRetry(ctx context.Context) (uint64, error) {
+	var (
+		startTS  uint64
+		getTSErr error
+		retry    uint
+	)
+
+	err := utils.WithRetry(ctx, func() error {
+		startTS, getTSErr = rc.GetTS(ctx)
+		failpoint.Inject("get-ts-error", func(val failpoint.Value) {
+			if val.(bool) && retry < 3 {
+				getTSErr = errors.Errorf("rpc error: code = Unknown desc = [PD:tso:ErrGenerateTimestamp]generate timestamp failed, requested pd is not leader of cluster")
+			}
+		})
+
+		retry++
+		if getTSErr != nil {
+			log.Warn("failed to get TS, retry it", zap.Uint("retry time", retry), logutil.ShortError(getTSErr))
+		}
+		return getTSErr
+	}, utils.NewPDReqBackoffer())
+
+	if err != nil {
+		log.Error("failed to get TS", zap.Error(err))
+	}
+	return startTS, errors.Trace(err)
+}
+
 // ResetTS resets the timestamp of PD to a bigger value.
 func (rc *Client) ResetTS(ctx context.Context, pdAddrs []string) error {
 	restoreTS := rc.backupMeta.GetEndVersion()
@@ -1174,7 +1203,7 @@ func (rc *Client) execChecksum(
 		ctx = opentracing.ContextWithSpan(ctx, span1)
 	}
 
-	startTS, err := rc.GetTS(ctx)
+	startTS, err := rc.GetTSWithRetry(ctx)
 	if err != nil {
 		return errors.Trace(err)
 	}
diff --git a/br/pkg/restore/client_test.go b/br/pkg/restore/client_test.go
index 334ac67387f5d..4791a7d252e20 100644
--- a/br/pkg/restore/client_test.go
+++ b/br/pkg/restore/client_test.go
@@ -9,6 +9,10 @@ import (
 	"testing"
 	"time"
 
+	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
+	backuppb "github.com/pingcap/kvproto/pkg/brpb"
+	"github.com/pingcap/kvproto/pkg/import_sstpb"
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/tidb/br/pkg/gluetidb"
 	"github.com/pingcap/tidb/br/pkg/metautil"
@@ -168,12 +172,54 @@ func TestPreCheckTableClusterIndex(t *testing.T) {
 type fakePDClient struct {
 	pd.Client
 	stores []*metapb.Store
+
+	notLeader bool
 }
 
+var retryTimes int
+
 func (fpdc fakePDClient) GetAllStores(context.Context, ...pd.GetStoreOption) ([]*metapb.Store, error) {
 	return append([]*metapb.Store{}, fpdc.stores...), nil
 }
 
+func (fpdc fakePDClient) GetTS(ctx context.Context) (int64, int64, error) {
+	retryTimes++
+	if retryTimes >= 3 { // the mock PD leader has switched successfully
+		fpdc.notLeader = false
+	}
+
+	if fpdc.notLeader {
+		return 0, 0, errors.Errorf("rpc error: code = Unknown desc = [PD:tso:ErrGenerateTimestamp]generate timestamp failed, requested pd is not leader of cluster")
+	}
+	return 1, 1, nil
+}
+
+func TestGetTSWithRetry(t *testing.T) {
+	t.Run("PD leader is healthy", func(t *testing.T) {
+		retryTimes = -1000
+		pDClient := fakePDClient{notLeader: false}
+		client := restore.NewRestoreClient(pDClient, nil, defaultKeepaliveCfg, false)
+		_, err := client.GetTSWithRetry(context.Background())
+		require.NoError(t, err)
+	})
+
+	t.Run("PD leader failure", func(t *testing.T) {
+		retryTimes = -1000
+		pDClient := fakePDClient{notLeader: true}
+		client := restore.NewRestoreClient(pDClient, nil, defaultKeepaliveCfg, false)
+		_, err := client.GetTSWithRetry(context.Background())
+		require.Error(t, err)
+	})
+
+	t.Run("PD leader switches successfully", func(t *testing.T) {
+		retryTimes = 0
+		pDClient := fakePDClient{notLeader: true}
+		client := restore.NewRestoreClient(pDClient, nil, defaultKeepaliveCfg, false)
+		_, err := client.GetTSWithRetry(context.Background())
+		require.NoError(t, err)
+	})
+}
+
 func TestPreCheckTableTiFlashReplicas(t *testing.T) {
 	m := mc
 	mockStores := []*metapb.Store{
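
Reviewer note, not part of the patch: TestGetTSWithRetry drives the retry path through fakePDClient rather than through the get-ts-error failpoint injected in client.go. Below is a minimal sketch of how that failpoint could be exercised directly; TestGetTSWithRetryFailpoint is a hypothetical test, the failpoint path assumes the usual convention (package import path plus failpoint name), it reuses the fixtures above (fakePDClient, defaultKeepaliveCfg, retryTimes), and it only fires in a failpoint-enabled build (make failpoint-enable).

func TestGetTSWithRetryFailpoint(t *testing.T) {
	fp := "github.com/pingcap/tidb/br/pkg/restore/get-ts-error"
	require.NoError(t, failpoint.Enable(fp, "return(true)"))
	defer func() {
		require.NoError(t, failpoint.Disable(fp))
	}()

	// The failpoint forces the first three attempts inside GetTSWithRetry to
	// fail (retry < 3); NewPDReqBackoffer allows enough attempts that a later
	// one should succeed against a healthy fake PD.
	retryTimes = 0
	client := restore.NewRestoreClient(fakePDClient{notLeader: false}, nil, defaultKeepaliveCfg, false)
	_, err := client.GetTSWithRetry(context.Background())
	require.NoError(t, err)
}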