Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

localbackend: fix resource leak when err on new local backend (#53664) #54618

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 52 additions & 21 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,40 @@ func NewBackend(
}
}()
config.adjust()
pdCtl, err := pdutil.NewPdController(ctx, config.PDAddr, tls.TLSConfig(), tls.ToPDSecurityOption())
var (
pdCtl *pdutil.PdController
spkv *tikvclient.EtcdSafePointKV
pdCliForTiKV *tikvclient.CodecPDClient
rpcCli tikvclient.Client
tikvCli *tikvclient.KVStore
importClientFactory *importClientFactoryImpl
multiIngestSupported bool
)
defer func() {
if err == nil {
return
}
if importClientFactory != nil {
importClientFactory.Close()
}
if tikvCli != nil {
// tikvCli uses pdCliForTiKV(which wraps pdCli) , spkv and rpcCli, so
// close tikvCli will close all of them.
_ = tikvCli.Close()
} else {
if rpcCli != nil {
_ = rpcCli.Close()
}
if spkv != nil {
_ = spkv.Close()
}
// pdCliForTiKV wraps pdCli, so we only need close pdCtl
if pdCtl != nil {
pdCtl.Close()
}
}
}()
pdCtl, err = pdutil.NewPdController(ctx, config.PDAddr, tls.TLSConfig(), tls.ToPDSecurityOption())
if err != nil {
return nil, common.NormalizeOrWrapErr(common.ErrCreatePDClient, err)
}
Expand Down Expand Up @@ -580,12 +613,11 @@ func NewBackend(
}

// The following copies tikv.NewTxnClient without creating yet another pdClient.
spkv, err := tikvclient.NewEtcdSafePointKV(strings.Split(config.PDAddr, ","), tls.TLSConfig())
spkv, err = tikvclient.NewEtcdSafePointKV(strings.Split(config.PDAddr, ","), tls.TLSConfig())
if err != nil {
return nil, common.ErrCreateKVClient.Wrap(err).GenWithStackByArgs()
}

var pdCliForTiKV *tikvclient.CodecPDClient
if config.KeyspaceName == "" {
pdCliForTiKV = tikvclient.NewCodecPDClient(tikvclient.ModeTxn, pdCtl.GetPDClient())
} else {
Expand All @@ -596,12 +628,16 @@ func NewBackend(
}

tikvCodec := pdCliForTiKV.GetCodec()
rpcCli := tikvclient.NewRPCClient(tikvclient.WithSecurity(tls.ToTiKVSecurityConfig()), tikvclient.WithCodec(tikvCodec))
tikvCli, err := tikvclient.NewKVStore("lightning-local-backend", pdCliForTiKV, spkv, rpcCli)
rpcCli = tikvclient.NewRPCClient(tikvclient.WithSecurity(tls.ToTiKVSecurityConfig()), tikvclient.WithCodec(tikvCodec))
tikvCli, err = tikvclient.NewKVStore("lightning-local-backend", pdCliForTiKV, spkv, rpcCli)
if err != nil {
return nil, common.ErrCreateKVClient.Wrap(err).GenWithStackByArgs()
}
importClientFactory := newImportClientFactoryImpl(splitCli, tls, config.MaxConnPerStore, config.ConnCompressType)
importClientFactory = newImportClientFactoryImpl(splitCli, tls, config.MaxConnPerStore, config.ConnCompressType)
multiIngestSupported, err = checkMultiIngestSupport(ctx, pdCtl, importClientFactory)
if err != nil {
return nil, common.ErrCheckMultiIngest.Wrap(err).GenWithStackByArgs()
}
keyAdapter := common.KeyAdapter(common.NoopKeyAdapter{})
if config.DupeDetectEnabled {
keyAdapter = common.DupDetectKeyAdapter{}
Expand Down Expand Up @@ -629,6 +665,7 @@ func NewBackend(

BackendConfig: config,

supportMultiIngest: multiIngestSupported,
duplicateDB: duplicateDB,
keyAdapter: keyAdapter,
importClientFactory: importClientFactory,
Expand All @@ -639,9 +676,6 @@ func NewBackend(
if m, ok := metric.GetCommonMetric(ctx); ok {
local.metrics = m
}
if err = local.checkMultiIngestSupport(ctx); err != nil {
return nil, common.ErrCheckMultiIngest.Wrap(err).GenWithStackByArgs()
}

return local, nil
}
Expand All @@ -659,10 +693,10 @@ func (local *Backend) TotalMemoryConsume() int64 {
return memConsume + local.bufferPool.TotalSize()
}

func (local *Backend) checkMultiIngestSupport(ctx context.Context) error {
stores, err := local.pdCtl.GetPDClient().GetAllStores(ctx, pd.WithExcludeTombstone())
func checkMultiIngestSupport(ctx context.Context, pdCtl *pdutil.PdController, importClientFactory ImportClientFactory) (bool, error) {
stores, err := pdCtl.GetPDClient().GetAllStores(ctx, pd.WithExcludeTombstone())
if err != nil {
return errors.Trace(err)
return false, errors.Trace(err)
}

hasTiFlash := false
Expand All @@ -684,10 +718,10 @@ func (local *Backend) checkMultiIngestSupport(ctx context.Context) error {
select {
case <-time.After(100 * time.Millisecond):
case <-ctx.Done():
return ctx.Err()
return false, ctx.Err()
}
}
client, err1 := local.getImportClient(ctx, s.Id)
client, err1 := importClientFactory.Create(ctx, s.Id)
if err1 != nil {
err = err1
log.FromContext(ctx).Warn("get import client failed", zap.Error(err), zap.String("store", s.Address))
Expand All @@ -700,8 +734,7 @@ func (local *Backend) checkMultiIngestSupport(ctx context.Context) error {
if st, ok := status.FromError(err); ok {
if st.Code() == codes.Unimplemented {
log.FromContext(ctx).Info("multi ingest not support", zap.Any("unsupported store", s))
local.supportMultiIngest = false
return nil
return false, nil
}
}
log.FromContext(ctx).Warn("check multi ingest support failed", zap.Error(err), zap.String("store", s.Address),
Expand All @@ -711,17 +744,15 @@ func (local *Backend) checkMultiIngestSupport(ctx context.Context) error {
// if the cluster contains no TiFlash store, we don't need the multi-ingest feature,
// so in this condition, downgrade the logic instead of return an error.
if hasTiFlash {
return errors.Trace(err)
return false, errors.Trace(err)
}
log.FromContext(ctx).Warn("check multi failed all retry, fallback to false", log.ShortError(err))
local.supportMultiIngest = false
return nil
return false, nil
}
}

local.supportMultiIngest = true
log.FromContext(ctx).Info("multi ingest support")
return nil
return true, nil
}

// rlock read locks a local file and returns the Engine instance if it exists.
Expand Down
5 changes: 2 additions & 3 deletions br/pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1070,13 +1070,12 @@ func TestMultiIngest(t *testing.T) {
return importCli
},
},
logger: log.L(),
}
err := local.checkMultiIngestSupport(context.Background())
supportMultiIngest, err := checkMultiIngestSupport(context.Background(), local.pdCtl, local.importClientFactory)
if err != nil {
require.Contains(t, err.Error(), testCase.retErr)
} else {
require.Equal(t, testCase.supportMutliIngest, local.supportMultiIngest)
require.Equal(t, testCase.supportMutliIngest, supportMultiIngest)
}
}
}
Expand Down
7 changes: 5 additions & 2 deletions br/pkg/restore/split/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"bytes"
"context"
"encoding/hex"
goerrors "errors"
"time"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -121,7 +122,7 @@ func PaginateScanRegion(
var batch []*RegionInfo
batch, err = client.ScanRegions(ctx, scanStartKey, endKey, limit)
if err != nil {
err = errors.Annotatef(berrors.ErrPDBatchScanRegion, "scan regions from start-key:%s, err: %s",
err = errors.Annotatef(berrors.ErrPDBatchScanRegion.Wrap(err), "scan regions from start-key:%s, err: %s",
redact.Key(scanStartKey), err.Error())
return err
}
Expand Down Expand Up @@ -235,7 +236,9 @@ func NewWaitRegionOnlineBackoffer() utils.Backoffer {

// NextBackoff returns a duration to wait before retrying again
func (b *WaitRegionOnlineBackoffer) NextBackoff(err error) time.Duration {
if berrors.ErrPDBatchScanRegion.Equal(err) {
// TODO(lance6716): why we only backoff when the error is ErrPDBatchScanRegion?
var perr *errors.Error
if goerrors.As(err, &perr) && berrors.ErrPDBatchScanRegion.ID() == perr.ID() {
// it needs more time to wait splitting the regions that contains data in PITR.
// 2s * 150
delayTime := b.Stat.ExponentialBackoff()
Expand Down
6 changes: 5 additions & 1 deletion br/pkg/restore/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ package restore_test
import (
"context"
"encoding/binary"
goerrors "errors"
"fmt"
"math/rand"
"testing"

"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/metapb"
Expand Down Expand Up @@ -353,7 +355,9 @@ func TestPaginateScanRegion(t *testing.T) {
tc.InjectTimes = 5
_, err = split.PaginateScanRegion(ctx, tc, []byte{}, []byte{}, 3)
require.Error(t, err)
require.True(t, berrors.ErrPDBatchScanRegion.Equal(err))
var perr *errors.Error
goerrors.As(err, &perr)
require.EqualValues(t, berrors.ErrPDBatchScanRegion.ID(), perr.ID())

// make the regionMap losing some region, this will cause scan region check fails
// region ID is key+1, so region 4 is deleted
Expand Down
Loading