diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go
index 93fc7c672ed36..328476270c884 100644
--- a/br/pkg/lightning/backend/local/local.go
+++ b/br/pkg/lightning/backend/local/local.go
@@ -548,7 +548,40 @@ func NewBackend(
 		}
 	}()
 	config.adjust()
-	pdCtl, err := pdutil.NewPdController(ctx, config.PDAddr, tls.TLSConfig(), tls.ToPDSecurityOption())
+	var (
+		pdCtl                *pdutil.PdController
+		spkv                 *tikvclient.EtcdSafePointKV
+		pdCliForTiKV         *tikvclient.CodecPDClient
+		rpcCli               tikvclient.Client
+		tikvCli              *tikvclient.KVStore
+		importClientFactory  *importClientFactoryImpl
+		multiIngestSupported bool
+	)
+	defer func() {
+		if err == nil {
+			return
+		}
+		if importClientFactory != nil {
+			importClientFactory.Close()
+		}
+		if tikvCli != nil {
+			// tikvCli uses pdCliForTiKV (which wraps pdCli), spkv and rpcCli, so
+			// closing tikvCli will close all of them.
+			_ = tikvCli.Close()
+		} else {
+			if rpcCli != nil {
+				_ = rpcCli.Close()
+			}
+			if spkv != nil {
+				_ = spkv.Close()
+			}
+			// pdCliForTiKV wraps pdCli, so we only need to close pdCtl.
+			if pdCtl != nil {
+				pdCtl.Close()
+			}
+		}
+	}()
+	pdCtl, err = pdutil.NewPdController(ctx, config.PDAddr, tls.TLSConfig(), tls.ToPDSecurityOption())
 	if err != nil {
 		return nil, common.NormalizeOrWrapErr(common.ErrCreatePDClient, err)
 	}
@@ -580,12 +613,11 @@ func NewBackend(
 	}
 
 	// The following copies tikv.NewTxnClient without creating yet another pdClient.
-	spkv, err := tikvclient.NewEtcdSafePointKV(strings.Split(config.PDAddr, ","), tls.TLSConfig())
+	spkv, err = tikvclient.NewEtcdSafePointKV(strings.Split(config.PDAddr, ","), tls.TLSConfig())
 	if err != nil {
 		return nil, common.ErrCreateKVClient.Wrap(err).GenWithStackByArgs()
 	}
 
-	var pdCliForTiKV *tikvclient.CodecPDClient
 	if config.KeyspaceName == "" {
 		pdCliForTiKV = tikvclient.NewCodecPDClient(tikvclient.ModeTxn, pdCtl.GetPDClient())
 	} else {
@@ -596,12 +628,16 @@ func NewBackend(
 	}
 
 	tikvCodec := pdCliForTiKV.GetCodec()
-	rpcCli := tikvclient.NewRPCClient(tikvclient.WithSecurity(tls.ToTiKVSecurityConfig()), tikvclient.WithCodec(tikvCodec))
-	tikvCli, err := tikvclient.NewKVStore("lightning-local-backend", pdCliForTiKV, spkv, rpcCli)
+	rpcCli = tikvclient.NewRPCClient(tikvclient.WithSecurity(tls.ToTiKVSecurityConfig()), tikvclient.WithCodec(tikvCodec))
+	tikvCli, err = tikvclient.NewKVStore("lightning-local-backend", pdCliForTiKV, spkv, rpcCli)
 	if err != nil {
 		return nil, common.ErrCreateKVClient.Wrap(err).GenWithStackByArgs()
 	}
-	importClientFactory := newImportClientFactoryImpl(splitCli, tls, config.MaxConnPerStore, config.ConnCompressType)
+	importClientFactory = newImportClientFactoryImpl(splitCli, tls, config.MaxConnPerStore, config.ConnCompressType)
+	multiIngestSupported, err = checkMultiIngestSupport(ctx, pdCtl, importClientFactory)
+	if err != nil {
+		return nil, common.ErrCheckMultiIngest.Wrap(err).GenWithStackByArgs()
+	}
 	keyAdapter := common.KeyAdapter(common.NoopKeyAdapter{})
 	if config.DupeDetectEnabled {
 		keyAdapter = common.DupDetectKeyAdapter{}
@@ -629,6 +665,7 @@ func NewBackend(
 
 		BackendConfig: config,
 
+		supportMultiIngest:   multiIngestSupported,
 		duplicateDB:          duplicateDB,
 		keyAdapter:           keyAdapter,
 		importClientFactory:  importClientFactory,
@@ -639,9 +676,6 @@ func NewBackend(
 	if m, ok := metric.GetCommonMetric(ctx); ok {
 		local.metrics = m
 	}
-	if err = local.checkMultiIngestSupport(ctx); err != nil {
-		return nil, common.ErrCheckMultiIngest.Wrap(err).GenWithStackByArgs()
-	}
 
 	return local, nil
 }
@@ -659,10 +693,10 @@ func (local *Backend) TotalMemoryConsume() int64 {
 	return memConsume + local.bufferPool.TotalSize()
 }
 
-func (local *Backend) checkMultiIngestSupport(ctx context.Context) error {
-	stores, err := local.pdCtl.GetPDClient().GetAllStores(ctx, pd.WithExcludeTombstone())
+func checkMultiIngestSupport(ctx context.Context, pdCtl *pdutil.PdController, importClientFactory ImportClientFactory) (bool, error) {
+	stores, err := pdCtl.GetPDClient().GetAllStores(ctx, pd.WithExcludeTombstone())
 	if err != nil {
-		return errors.Trace(err)
+		return false, errors.Trace(err)
 	}
 
 	hasTiFlash := false
@@ -684,10 +718,10 @@ func (local *Backend) checkMultiIngestSupport(ctx context.Context) error {
 			select {
 			case <-time.After(100 * time.Millisecond):
 			case <-ctx.Done():
-				return ctx.Err()
+				return false, ctx.Err()
 			}
 		}
-		client, err1 := local.getImportClient(ctx, s.Id)
+		client, err1 := importClientFactory.Create(ctx, s.Id)
 		if err1 != nil {
 			err = err1
 			log.FromContext(ctx).Warn("get import client failed", zap.Error(err), zap.String("store", s.Address))
@@ -700,8 +734,7 @@ func (local *Backend) checkMultiIngestSupport(ctx context.Context) error {
 			if st, ok := status.FromError(err); ok {
 				if st.Code() == codes.Unimplemented {
 					log.FromContext(ctx).Info("multi ingest not support", zap.Any("unsupported store", s))
-					local.supportMultiIngest = false
-					return nil
+					return false, nil
 				}
 			}
 			log.FromContext(ctx).Warn("check multi ingest support failed", zap.Error(err), zap.String("store", s.Address),
@@ -711,17 +744,15 @@ func (local *Backend) checkMultiIngestSupport(ctx context.Context) error {
 			// if the cluster contains no TiFlash store, we don't need the multi-ingest feature,
 			// so in this condition, downgrade the logic instead of return an error.
 			if hasTiFlash {
-				return errors.Trace(err)
+				return false, errors.Trace(err)
 			}
 			log.FromContext(ctx).Warn("check multi failed all retry, fallback to false", log.ShortError(err))
-			local.supportMultiIngest = false
-			return nil
+			return false, nil
 		}
 	}
 
-	local.supportMultiIngest = true
 	log.FromContext(ctx).Info("multi ingest support")
-	return nil
+	return true, nil
 }
 
 // rlock read locks a local file and returns the Engine instance if it exists.
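A note on the cleanup pattern introduced above: it only works because `err` is NewBackend's named return value and every resource is assigned with `=` rather than `:=`, so the single deferred closure observes the variables' final values and can release exactly what was already acquired. The following is a minimal, runnable sketch of the same construct-or-release shape; the function name and the temp-file "resource" are invented for illustration and are not part of this patch.

```go
package main

import (
	"fmt"
	"os"
)

// newResource mimics NewBackend's strategy: declare the resources before the
// defer, register one deferred block that inspects the named return err, and
// release whatever was already built if any later step fails.
func newResource(dir string) (f *os.File, err error) {
	var tmp *os.File
	defer func() {
		if err == nil {
			return // success: ownership passes to the caller
		}
		if tmp != nil {
			_ = tmp.Close() // failure: release the partially built state
		}
	}()

	tmp, err = os.CreateTemp(dir, "sketch-*") // note `=`, not `:=`
	if err != nil {
		return nil, err
	}
	// Further fallible initialization steps would go here; any error return
	// triggers the deferred cleanup because err is a named return value.
	return tmp, nil
}

func main() {
	f, err := newResource("")
	if err != nil {
		fmt.Println("init failed:", err)
		return
	}
	defer f.Close()
	fmt.Println("created", f.Name())
}
```

In the patch itself the same role is played by the deferred block that closes importClientFactory and tikvCli, or rpcCli, spkv and pdCtl individually when tikvCli was never constructed.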
diff --git a/br/pkg/lightning/backend/local/local_test.go b/br/pkg/lightning/backend/local/local_test.go
index 93afa912de596..e9e639406138f 100644
--- a/br/pkg/lightning/backend/local/local_test.go
+++ b/br/pkg/lightning/backend/local/local_test.go
@@ -1070,13 +1070,12 @@ func TestMultiIngest(t *testing.T) {
 					return importCli
 				},
 			},
-			logger: log.L(),
 		}
-		err := local.checkMultiIngestSupport(context.Background())
+		supportMultiIngest, err := checkMultiIngestSupport(context.Background(), local.pdCtl, local.importClientFactory)
 		if err != nil {
 			require.Contains(t, err.Error(), testCase.retErr)
 		} else {
-			require.Equal(t, testCase.supportMutliIngest, local.supportMultiIngest)
+			require.Equal(t, testCase.supportMutliIngest, supportMultiIngest)
 		}
 	}
 }
diff --git a/br/pkg/restore/split/split.go b/br/pkg/restore/split/split.go
index b2c47a40fec7a..9a701d0af4728 100644
--- a/br/pkg/restore/split/split.go
+++ b/br/pkg/restore/split/split.go
@@ -6,6 +6,7 @@ import (
 	"bytes"
 	"context"
 	"encoding/hex"
+	goerrors "errors"
 	"time"
 
 	"github.com/pingcap/errors"
@@ -121,7 +122,7 @@ func PaginateScanRegion(
 		var batch []*RegionInfo
 		batch, err = client.ScanRegions(ctx, scanStartKey, endKey, limit)
 		if err != nil {
-			err = errors.Annotatef(berrors.ErrPDBatchScanRegion, "scan regions from start-key:%s, err: %s",
+			err = errors.Annotatef(berrors.ErrPDBatchScanRegion.Wrap(err), "scan regions from start-key:%s, err: %s",
 				redact.Key(scanStartKey), err.Error())
 			return err
 		}
@@ -235,7 +236,9 @@ func NewWaitRegionOnlineBackoffer() utils.Backoffer {
 
 // NextBackoff returns a duration to wait before retrying again
 func (b *WaitRegionOnlineBackoffer) NextBackoff(err error) time.Duration {
-	if berrors.ErrPDBatchScanRegion.Equal(err) {
+	// TODO(lance6716): why do we only back off when the error is ErrPDBatchScanRegion?
+	var perr *errors.Error
+	if goerrors.As(err, &perr) && berrors.ErrPDBatchScanRegion.ID() == perr.ID() {
 		// it needs more time to wait splitting the regions that contains data in PITR.
 		// 2s * 150
 		delayTime := b.Stat.ExponentialBackoff()
diff --git a/br/pkg/restore/util_test.go b/br/pkg/restore/util_test.go
index 42f0cebaddcbf..ccd9a04f50876 100644
--- a/br/pkg/restore/util_test.go
+++ b/br/pkg/restore/util_test.go
@@ -5,10 +5,12 @@ package restore_test
 import (
 	"context"
 	"encoding/binary"
+	goerrors "errors"
 	"fmt"
 	"math/rand"
 	"testing"
 
+	"github.com/pingcap/errors"
 	backuppb "github.com/pingcap/kvproto/pkg/brpb"
 	"github.com/pingcap/kvproto/pkg/import_sstpb"
 	"github.com/pingcap/kvproto/pkg/metapb"
@@ -353,7 +355,9 @@ func TestPaginateScanRegion(t *testing.T) {
 		tc.InjectTimes = 5
 		_, err = split.PaginateScanRegion(ctx, tc, []byte{}, []byte{}, 3)
 		require.Error(t, err)
-		require.True(t, berrors.ErrPDBatchScanRegion.Equal(err))
+		var perr *errors.Error
+		goerrors.As(err, &perr)
+		require.EqualValues(t, berrors.ErrPDBatchScanRegion.ID(), perr.ID())
 
 		// make the regionMap losing some region, this will cause scan region check fails
 		// region ID is key+1, so region 4 is deleted
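The split.go hunks change how ErrPDBatchScanRegion is recognized: since PaginateScanRegion now wraps the scan error via ErrPDBatchScanRegion.Wrap(err), the old (*Error).Equal check, which apparently resolves the chain to its root cause, would land on the underlying scan error rather than the *errors.Error, so NextBackoff and the test instead walk the chain with the standard library's errors.As and compare error IDs. Below is a minimal sketch of that matching technique using a locally defined error; the message and RFC code text are invented, and only the As/ID comparison mirrors the patch.

```go
package main

import (
	goerrors "errors"
	"fmt"

	"github.com/pingcap/errors"
)

// errBatchScan stands in for berrors.ErrPDBatchScanRegion; the RFC code text
// here is invented for this sketch.
var errBatchScan = errors.Normalize("batch scan region",
	errors.RFCCodeText("Sketch:PD:ErrBatchScanRegion"))

// isBatchScanErr matches by error ID through a wrapped chain, the way the new
// NextBackoff does: errors.As stops at the first *errors.Error it finds, even
// when annotations and a wrapped cause sit around it.
func isBatchScanErr(err error) bool {
	var perr *errors.Error
	return goerrors.As(err, &perr) && errBatchScan.ID() == perr.ID()
}

func main() {
	cause := goerrors.New("connection reset by peer")
	// Mirror PaginateScanRegion's error path: wrap the cause, then annotate.
	err := errors.Annotatef(errBatchScan.Wrap(cause), "scan regions from start-key:%s", "7480")
	fmt.Println(isBatchScanErr(err))   // true: *errors.Error found in the chain
	fmt.Println(isBatchScanErr(cause)) // false: a plain error carries no ID
}
```

The test uses require.EqualValues for the same ID comparison after extracting the *errors.Error with goerrors.As.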