Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#49599
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
lance6716 authored and ti-chi-bot committed Dec 21, 2023
1 parent 27b00d3 commit 670901c
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 79 deletions.
4 changes: 2 additions & 2 deletions br/pkg/lightning/backend/local/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ go_library(
"//pkg/parser/model",
"//pkg/parser/mysql",
"//pkg/sessionctx/variable",
"//pkg/store/pdtypes",
"//pkg/table",
"//pkg/tablecodec",
"//pkg/types",
Expand Down Expand Up @@ -82,6 +81,7 @@ go_library(
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//util",
"@com_github_tikv_pd_client//:client",
"@com_github_tikv_pd_client//http",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//backoff",
"@org_golang_google_grpc//codes",
Expand Down Expand Up @@ -128,7 +128,6 @@ go_test(
"//br/pkg/lightning/mydump",
"//br/pkg/membuf",
"//br/pkg/mock/mocklocal",
"//br/pkg/pdutil",
"//br/pkg/restore/split",
"//br/pkg/storage",
"//br/pkg/utils",
Expand Down Expand Up @@ -168,6 +167,7 @@ go_test(
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_pd_client//:client",
"@com_github_tikv_pd_client//errs",
"@com_github_tikv_pd_client//http",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//encoding",
Expand Down
102 changes: 63 additions & 39 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,14 @@ import (
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/store/pdtypes"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/pingcap/tidb/pkg/util/engine"
"github.com/pingcap/tidb/pkg/util/mathutil"
"github.com/tikv/client-go/v2/oracle"
tikvclient "github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
pdhttp "github.com/tikv/pd/client/http"
"go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -466,7 +466,8 @@ type Backend struct {
engines sync.Map // sync version of map[uuid.UUID]*Engine
externalEngine map[uuid.UUID]common.Engine

pdCtl *pdutil.PdController
pdCli pd.Client
pdHTTPCli pdhttp.Client
splitCli split.SplitClient
tikvCli *tikvclient.KVStore
tls *common.TLS
Expand Down Expand Up @@ -502,11 +503,19 @@ func openDuplicateDB(storeDir string) (*pebble.DB, error) {
return pebble.Open(dbPath, opts)
}

const (
pdCliMaxMsgSize = int(128 * units.MiB) // pd.ScanRegion may return a large response
)

var (
// RunInTest indicates whether the current process is running in test.
RunInTest bool
// LastAlloc is the last ID allocator.
LastAlloc manual.Allocator
LastAlloc manual.Allocator
maxCallMsgSize = []grpc.DialOption{
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(pdCliMaxMsgSize)),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(pdCliMaxMsgSize)),
}
)

// NewBackend creates new connections to tikv.
Expand All @@ -523,11 +532,20 @@ func NewBackend(
}
}()
config.adjust()
pdCtl, err := pdutil.NewPdController(ctx, config.PDAddr, tls.TLSConfig(), tls.ToPDSecurityOption())
pdAddrs := strings.Split(config.PDAddr, ",")
pdCli, err := pd.NewClientWithContext(
ctx, pdAddrs, tls.ToPDSecurityOption(),
pd.WithGRPCDialOptions(maxCallMsgSize...),
// If the time too short, we may scatter a region many times, because
// the interface `ScatterRegions` may time out.
pd.WithCustomTimeoutOption(60*time.Second),
pd.WithMaxErrorRetry(3),
)
if err != nil {
return nil, common.NormalizeOrWrapErr(common.ErrCreatePDClient, err)
}
splitCli := split.NewSplitClient(pdCtl.GetPDClient(), tls.TLSConfig(), false)
pdHTTPCli := pdhttp.NewClient("lightning", pdAddrs, pdhttp.WithTLSConfig(tls.TLSConfig()))
splitCli := split.NewSplitClient(pdCli, tls.TLSConfig(), false)

shouldCreate := true
if config.CheckpointEnabled {
Expand Down Expand Up @@ -562,9 +580,9 @@ func NewBackend(

var pdCliForTiKV *tikvclient.CodecPDClient
if config.KeyspaceName == "" {
pdCliForTiKV = tikvclient.NewCodecPDClient(tikvclient.ModeTxn, pdCtl.GetPDClient())
pdCliForTiKV = tikvclient.NewCodecPDClient(tikvclient.ModeTxn, pdCli)
} else {
pdCliForTiKV, err = tikvclient.NewCodecPDClientWithKeyspace(tikvclient.ModeTxn, pdCtl.GetPDClient(), config.KeyspaceName)
pdCliForTiKV, err = tikvclient.NewCodecPDClientWithKeyspace(tikvclient.ModeTxn, pdCli, config.KeyspaceName)
if err != nil {
return nil, common.ErrCreatePDClient.Wrap(err).GenWithStackByArgs()
}
Expand Down Expand Up @@ -595,7 +613,8 @@ func NewBackend(
local := &Backend{
engines: sync.Map{},
externalEngine: map[uuid.UUID]common.Engine{},
pdCtl: pdCtl,
pdCli: pdCli,
pdHTTPCli: pdHTTPCli,
splitCli: splitCli,
tikvCli: tikvCli,
tls: tls,
Expand Down Expand Up @@ -635,7 +654,7 @@ func (local *Backend) TotalMemoryConsume() int64 {
}

func (local *Backend) checkMultiIngestSupport(ctx context.Context) error {
stores, err := local.pdCtl.GetPDClient().GetAllStores(ctx, pd.WithExcludeTombstone())
stores, err := local.pdCli.GetAllStores(ctx, pd.WithExcludeTombstone())
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -802,7 +821,8 @@ func (local *Backend) Close() {
}
}
_ = local.tikvCli.Close()
local.pdCtl.Close()
local.pdHTTPCli.Close()
local.pdCli.Close()
}

// FlushEngine ensure the written data is saved successfully, to make sure no data lose after restart
Expand Down Expand Up @@ -933,7 +953,7 @@ func (local *Backend) allocateTSIfNotExists(ctx context.Context, engine *Engine)
if engine.TS > 0 {
return nil
}
physical, logical, err := local.pdCtl.GetPDClient().GetTS(ctx)
physical, logical, err := local.pdCli.GetTS(ctx)
if err != nil {
return err
}
Expand All @@ -953,7 +973,7 @@ func (local *Backend) CloseEngine(ctx context.Context, cfg *backend.EngineConfig
if err != nil {
return err
}
physical, logical, err := local.pdCtl.GetPDClient().GetTS(ctx)
physical, logical, err := local.pdCli.GetTS(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -1394,6 +1414,27 @@ func (*Backend) isRetryableImportTiKVError(err error) bool {
return common.IsRetryableError(err)
}

func checkDiskAvail(ctx context.Context, store *pdhttp.StoreInfo) error {
logger := log.FromContext(ctx)
capacity, err := units.RAMInBytes(store.Status.Capacity)
if err != nil {
logger.Warn("failed to parse capacity",
zap.String("capacity", store.Status.Capacity), zap.Error(err))
return nil
}
available, err := units.RAMInBytes(store.Status.Available)
if err != nil {
logger.Warn("failed to parse available",
zap.String("available", store.Status.Available), zap.Error(err))
return nil
}
ratio := available * 100 / capacity
if ratio < 10 {
return errors.Errorf("the remaining storage capacity of TiKV(%s) is less than 10%%; please increase the storage capacity of TiKV and try again", store.Store.Address)
}
return nil
}

// executeJob handles a regionJob and tries to convert it to ingested stage.
// If non-retryable error occurs, it will return the error.
// If retryable error occurs, it will return nil and caller should check the stage
Expand All @@ -1408,26 +1449,14 @@ func (local *Backend) executeJob(
})
if local.ShouldCheckTiKV {
for _, peer := range job.region.Region.GetPeers() {
var (
store *pdtypes.StoreInfo
err error
)
for i := 0; i < maxRetryTimes; i++ {
store, err = local.pdCtl.GetStoreInfo(ctx, peer.StoreId)
if err != nil {
continue
}
if store.Status.Capacity > 0 {
// The available disk percent of TiKV
ratio := store.Status.Available * 100 / store.Status.Capacity
if ratio < 10 {
return errors.Errorf("the remaining storage capacity of TiKV(%s) is less than 10%%; please increase the storage capacity of TiKV and try again", store.Store.Address)
}
}
break
}
store, err := local.pdHTTPCli.GetStore(ctx, peer.StoreId)
if err != nil {
log.FromContext(ctx).Error("failed to get StoreInfo from pd http api", zap.Error(err))
continue
}
err = checkDiskAvail(ctx, store)
if err != nil {
return err
}
}
}
Expand Down Expand Up @@ -1496,7 +1525,7 @@ func (local *Backend) ImportEngine(
log.FromContext(ctx).Info("engine contains no kv, skip import", zap.Stringer("engine", engineUUID))
return nil
}
kvRegionSplitSize, kvRegionSplitKeys, err := GetRegionSplitSizeKeys(ctx, local.pdCtl.GetPDClient(), local.tls)
kvRegionSplitSize, kvRegionSplitKeys, err := GetRegionSplitSizeKeys(ctx, local.pdCli, local.tls)
if err == nil {
if kvRegionSplitSize > regionSplitSize {
regionSplitSize = kvRegionSplitSize
Expand Down Expand Up @@ -1526,7 +1555,7 @@ func (local *Backend) ImportEngine(
if len(regionRanges[len(regionRanges)-1].End) > 0 {
endKey = codec.EncodeBytes(nil, regionRanges[len(regionRanges)-1].End)
}
done, err := local.pdCtl.PauseSchedulersByKeyRange(subCtx, startKey, endKey)
done, err := pdutil.PauseSchedulersByKeyRange(subCtx, local.pdHTTPCli, startKey, endKey)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -1576,7 +1605,7 @@ func (local *Backend) ImportEngine(

// GetRegionSplitSizeKeys gets the region split size and keys from PD.
func (local *Backend) GetRegionSplitSizeKeys(ctx context.Context) (finalSize int64, finalKeys int64, err error) {
return GetRegionSplitSizeKeys(ctx, local.pdCtl.GetPDClient(), local.tls)
return GetRegionSplitSizeKeys(ctx, local.pdCli, local.tls)
}

// expose these variables to unit test.
Expand Down Expand Up @@ -1849,7 +1878,7 @@ func (local *Backend) LocalWriter(_ context.Context, cfg *backend.LocalWriterCon
// This function will spawn a goroutine to keep switch mode periodically until the context is done.
// The return done channel is used to notify the caller that the background goroutine is exited.
func (local *Backend) SwitchModeByKeyRanges(ctx context.Context, ranges []common.Range) (<-chan struct{}, error) {
switcher := NewTiKVModeSwitcher(local.tls, local.pdCtl.GetPDClient(), log.FromContext(ctx).Logger)
switcher := NewTiKVModeSwitcher(local.tls, local.pdCli, log.FromContext(ctx).Logger)
done := make(chan struct{})

keyRanges := make([]*sst.Range, 0, len(ranges))
Expand Down Expand Up @@ -1945,11 +1974,6 @@ func (local *Backend) EngineFileSizes() (res []backend.EngineFileSize) {
return
}

// GetPDClient returns the PD client.
func (local *Backend) GetPDClient() pd.Client {
return local.pdCtl.GetPDClient()
}

var getSplitConfFromStoreFunc = getSplitConfFromStore

// return region split size, region split keys, error
Expand Down
17 changes: 10 additions & 7 deletions br/pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/membuf"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/restore/split"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/utils"
Expand All @@ -60,6 +59,7 @@ import (
"github.com/pingcap/tidb/pkg/util/hack"
"github.com/stretchr/testify/require"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/http"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/encoding"
Expand Down Expand Up @@ -1058,11 +1058,9 @@ func TestMultiIngest(t *testing.T) {
err: testCase.err,
multiIngestCheckFn: testCase.multiIngestSupport,
}
pdCtl := &pdutil.PdController{}
pdCtl.SetPDClient(&mockPdClient{stores: stores})

local := &Backend{
pdCtl: pdCtl,
pdCli: &mockPdClient{stores: stores},
importClientFactory: &mockImportClientFactory{
stores: allStores,
createClientFn: func(store *metapb.Store) sst.ImportSSTClient {
Expand Down Expand Up @@ -2303,16 +2301,14 @@ func TestExternalEngine(t *testing.T) {
TotalKVCount: int64(config.SplitRegionKeys) + 1,
}
engineUUID := uuid.New()
pdCtl := &pdutil.PdController{}
pdCtl.SetPDClient(&mockPdClient{})
local := &Backend{
BackendConfig: BackendConfig{
WorkerConcurrency: 2,
},
splitCli: initTestSplitClient([][]byte{
keys[0], keys[50], endKey,
}, nil),
pdCtl: pdCtl,
pdCli: &mockPdClient{},
externalEngine: map[uuid.UUID]common.Engine{},
keyAdapter: common.NoopKeyAdapter{},
}
Expand Down Expand Up @@ -2379,3 +2375,10 @@ func TestGetExternalEngineKVStatistics(t *testing.T) {
require.Zero(t, size)
require.Zero(t, count)
}

func TestCheckDiskAvail(t *testing.T) {
store := &http.StoreInfo{Status: http.StoreStatus{Capacity: "100 GB", Available: "50 GB"}}
ctx := context.Background()
err := checkDiskAvail(ctx, store)
require.NoError(t, err)
}
Loading

0 comments on commit 670901c

Please sign in to comment.