Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#53664
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
D3Hunter authored and ti-chi-bot committed Jul 15, 2024
1 parent ed1e010 commit 3daa761
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 18 deletions.
136 changes: 122 additions & 14 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,7 @@ func NewBackend(
config BackendConfig,
regionSizeGetter TableRegionSizeGetter,
) (b *Backend, err error) {
<<<<<<< HEAD:br/pkg/lightning/backend/local/local.go
var duplicateDB *pebble.DB
defer func() {
if err != nil && duplicateDB != nil {
Expand All @@ -549,6 +550,61 @@ func NewBackend(
}()
config.adjust()
pdCtl, err := pdutil.NewPdController(ctx, config.PDAddr, tls.TLSConfig(), tls.ToPDSecurityOption())
=======
var (
pdCli pd.Client
spkv *tikvclient.EtcdSafePointKV
pdCliForTiKV *tikvclient.CodecPDClient
rpcCli tikvclient.Client
tikvCli *tikvclient.KVStore
pdHTTPCli pdhttp.Client
importClientFactory *importClientFactoryImpl
multiIngestSupported bool
)
defer func() {
if err == nil {
return
}
if importClientFactory != nil {
importClientFactory.Close()
}
if pdHTTPCli != nil {
pdHTTPCli.Close()
}
if tikvCli != nil {
// tikvCli uses pdCliForTiKV(which wraps pdCli) , spkv and rpcCli, so
// close tikvCli will close all of them.
_ = tikvCli.Close()
} else {
if rpcCli != nil {
_ = rpcCli.Close()
}
if spkv != nil {
_ = spkv.Close()
}
// pdCliForTiKV wraps pdCli, so we only need close pdCli
if pdCli != nil {
pdCli.Close()
}
}
}()
config.adjust()
var pdAddrs []string
if pdSvcDiscovery != nil {
pdAddrs = pdSvcDiscovery.GetServiceURLs()
// TODO(lance6716): if PD client can support creating a client with external
// service discovery, we can directly pass pdSvcDiscovery.
} else {
pdAddrs = strings.Split(config.PDAddr, ",")
}
pdCli, err = pd.NewClientWithContext(
ctx, pdAddrs, tls.ToPDSecurityOption(),
pd.WithGRPCDialOptions(maxCallMsgSize...),
// If the time too short, we may scatter a region many times, because
// the interface `ScatterRegions` may time out.
pd.WithCustomTimeoutOption(60*time.Second),
)
>>>>>>> 29bf0083a6b (localbackend: fix resource leak when err on new local backend (#53664)):pkg/lightning/backend/local/local.go
if err != nil {
return nil, common.NormalizeOrWrapErr(common.ErrCreatePDClient, err)
}
Expand Down Expand Up @@ -580,12 +636,11 @@ func NewBackend(
}

// The following copies tikv.NewTxnClient without creating yet another pdClient.
spkv, err := tikvclient.NewEtcdSafePointKV(strings.Split(config.PDAddr, ","), tls.TLSConfig())
spkv, err = tikvclient.NewEtcdSafePointKV(strings.Split(config.PDAddr, ","), tls.TLSConfig())
if err != nil {
return nil, common.ErrCreateKVClient.Wrap(err).GenWithStackByArgs()
}

var pdCliForTiKV *tikvclient.CodecPDClient
if config.KeyspaceName == "" {
pdCliForTiKV = tikvclient.NewCodecPDClient(tikvclient.ModeTxn, pdCtl.GetPDClient())
} else {
Expand All @@ -596,16 +651,32 @@ func NewBackend(
}

tikvCodec := pdCliForTiKV.GetCodec()
rpcCli := tikvclient.NewRPCClient(tikvclient.WithSecurity(tls.ToTiKVSecurityConfig()), tikvclient.WithCodec(tikvCodec))
tikvCli, err := tikvclient.NewKVStore("lightning-local-backend", pdCliForTiKV, spkv, rpcCli)
rpcCli = tikvclient.NewRPCClient(tikvclient.WithSecurity(tls.ToTiKVSecurityConfig()), tikvclient.WithCodec(tikvCodec))
tikvCli, err = tikvclient.NewKVStore("lightning-local-backend", pdCliForTiKV, spkv, rpcCli)
if err != nil {
return nil, common.ErrCreateKVClient.Wrap(err).GenWithStackByArgs()
}
<<<<<<< HEAD:br/pkg/lightning/backend/local/local.go
importClientFactory := newImportClientFactoryImpl(splitCli, tls, config.MaxConnPerStore, config.ConnCompressType)
keyAdapter := common.KeyAdapter(common.NoopKeyAdapter{})
if config.DupeDetectEnabled {
keyAdapter = common.DupDetectKeyAdapter{}
}
=======
pdHTTPCli = pdhttp.NewClientWithServiceDiscovery(
"lightning",
pdCli.GetServiceDiscovery(),
pdhttp.WithTLSConfig(tls.TLSConfig()),
).WithBackoffer(retry.InitialBackoffer(time.Second, time.Second, pdutil.PDRequestRetryTime*time.Second))
splitCli := split.NewClient(pdCli, pdHTTPCli, tls.TLSConfig(), config.RegionSplitBatchSize, config.RegionSplitConcurrency)
importClientFactory = newImportClientFactoryImpl(splitCli, tls, config.MaxConnPerStore, config.ConnCompressType)

multiIngestSupported, err = checkMultiIngestSupport(ctx, pdCli, importClientFactory)
if err != nil {
return nil, common.ErrCheckMultiIngest.Wrap(err).GenWithStackByArgs()
}

>>>>>>> 29bf0083a6b (localbackend: fix resource leak when err on new local backend (#53664)):pkg/lightning/backend/local/local.go
var writeLimiter StoreWriteLimiter
if config.StoreWriteBWLimit > 0 {
writeLimiter = newStoreWriteLimiter(config.StoreWriteBWLimit)
Expand All @@ -629,18 +700,53 @@ func NewBackend(

BackendConfig: config,

<<<<<<< HEAD:br/pkg/lightning/backend/local/local.go
duplicateDB: duplicateDB,
keyAdapter: keyAdapter,
=======
supportMultiIngest: multiIngestSupported,
>>>>>>> 29bf0083a6b (localbackend: fix resource leak when err on new local backend (#53664)):pkg/lightning/backend/local/local.go
importClientFactory: importClientFactory,
bufferPool: membuf.NewPool(membuf.WithAllocator(alloc)),
writeLimiter: writeLimiter,
logger: log.FromContext(ctx),
}
<<<<<<< HEAD:br/pkg/lightning/backend/local/local.go
if m, ok := metric.GetCommonMetric(ctx); ok {
local.metrics = m
}
if err = local.checkMultiIngestSupport(ctx); err != nil {
return nil, common.ErrCheckMultiIngest.Wrap(err).GenWithStackByArgs()
=======
local.engineMgr, err = newEngineManager(config, local, local.logger)
if err != nil {
return nil, err
}
if m, ok := metric.GetCommonMetric(ctx); ok {
local.metrics = m
}
local.tikvSideCheckFreeSpace(ctx)

return local, nil
}

// NewBackendForTest creates a new Backend for test.
func NewBackendForTest(ctx context.Context, config BackendConfig, storeHelper StoreHelper) (*Backend, error) {
config.adjust()

logger := log.FromContext(ctx)
engineMgr, err := newEngineManager(config, storeHelper, logger)
if err != nil {
return nil, err
}
local := &Backend{
BackendConfig: config,
logger: logger,
engineMgr: engineMgr,
}
if m, ok := metric.GetCommonMetric(ctx); ok {
local.metrics = m
>>>>>>> 29bf0083a6b (localbackend: fix resource leak when err on new local backend (#53664)):pkg/lightning/backend/local/local.go
}

return local, nil
Expand All @@ -659,10 +765,15 @@ func (local *Backend) TotalMemoryConsume() int64 {
return memConsume + local.bufferPool.TotalSize()
}

<<<<<<< HEAD:br/pkg/lightning/backend/local/local.go
func (local *Backend) checkMultiIngestSupport(ctx context.Context) error {
stores, err := local.pdCtl.GetPDClient().GetAllStores(ctx, pd.WithExcludeTombstone())
=======
func checkMultiIngestSupport(ctx context.Context, pdCli pd.Client, importClientFactory ImportClientFactory) (bool, error) {
stores, err := pdCli.GetAllStores(ctx, pd.WithExcludeTombstone())
>>>>>>> 29bf0083a6b (localbackend: fix resource leak when err on new local backend (#53664)):pkg/lightning/backend/local/local.go
if err != nil {
return errors.Trace(err)
return false, errors.Trace(err)
}

hasTiFlash := false
Expand All @@ -684,10 +795,10 @@ func (local *Backend) checkMultiIngestSupport(ctx context.Context) error {
select {
case <-time.After(100 * time.Millisecond):
case <-ctx.Done():
return ctx.Err()
return false, ctx.Err()
}
}
client, err1 := local.getImportClient(ctx, s.Id)
client, err1 := importClientFactory.Create(ctx, s.Id)
if err1 != nil {
err = err1
log.FromContext(ctx).Warn("get import client failed", zap.Error(err), zap.String("store", s.Address))
Expand All @@ -700,8 +811,7 @@ func (local *Backend) checkMultiIngestSupport(ctx context.Context) error {
if st, ok := status.FromError(err); ok {
if st.Code() == codes.Unimplemented {
log.FromContext(ctx).Info("multi ingest not support", zap.Any("unsupported store", s))
local.supportMultiIngest = false
return nil
return false, nil
}
}
log.FromContext(ctx).Warn("check multi ingest support failed", zap.Error(err), zap.String("store", s.Address),
Expand All @@ -711,17 +821,15 @@ func (local *Backend) checkMultiIngestSupport(ctx context.Context) error {
// if the cluster contains no TiFlash store, we don't need the multi-ingest feature,
// so in this condition, downgrade the logic instead of return an error.
if hasTiFlash {
return errors.Trace(err)
return false, errors.Trace(err)
}
log.FromContext(ctx).Warn("check multi failed all retry, fallback to false", log.ShortError(err))
local.supportMultiIngest = false
return nil
return false, nil
}
}

local.supportMultiIngest = true
log.FromContext(ctx).Info("multi ingest support")
return nil
return true, nil
}

// rlock read locks a local file and returns the Engine instance if it exists.
Expand Down
5 changes: 2 additions & 3 deletions br/pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1070,13 +1070,12 @@ func TestMultiIngest(t *testing.T) {
return importCli
},
},
logger: log.L(),
}
err := local.checkMultiIngestSupport(context.Background())
supportMultiIngest, err := checkMultiIngestSupport(context.Background(), local.pdCli, local.importClientFactory)
if err != nil {
require.Contains(t, err.Error(), testCase.retErr)
} else {
require.Equal(t, testCase.supportMutliIngest, local.supportMultiIngest)
require.Equal(t, testCase.supportMutliIngest, supportMultiIngest)
}
}
}
Expand Down
9 changes: 8 additions & 1 deletion br/pkg/restore/split/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"bytes"
"context"
"encoding/hex"
goerrors "errors"
"time"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -121,7 +122,7 @@ func PaginateScanRegion(
var batch []*RegionInfo
batch, err = client.ScanRegions(ctx, scanStartKey, endKey, limit)
if err != nil {
err = errors.Annotatef(berrors.ErrPDBatchScanRegion, "scan regions from start-key:%s, err: %s",
err = errors.Annotatef(berrors.ErrPDBatchScanRegion.Wrap(err), "scan regions from start-key:%s, err: %s",
redact.Key(scanStartKey), err.Error())
return err
}
Expand Down Expand Up @@ -235,7 +236,13 @@ func NewWaitRegionOnlineBackoffer() utils.Backoffer {

// NextBackoff returns a duration to wait before retrying again
func (b *WaitRegionOnlineBackoffer) NextBackoff(err error) time.Duration {
<<<<<<< HEAD
if berrors.ErrPDBatchScanRegion.Equal(err) {
=======
// TODO(lance6716): why we only backoff when the error is ErrPDBatchScanRegion?
var perr *errors.Error
if goerrors.As(err, &perr) && berrors.ErrPDBatchScanRegion.ID() == perr.ID() {
>>>>>>> 29bf0083a6b (localbackend: fix resource leak when err on new local backend (#53664))
// it needs more time to wait splitting the regions that contains data in PITR.
// 2s * 150
delayTime := b.Stat.ExponentialBackoff()
Expand Down

0 comments on commit 3daa761

Please sign in to comment.