Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: use PD HTTP client #49599

Merged
merged 5 commits into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions br/pkg/lightning/backend/local/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ go_library(
"//pkg/parser/model",
"//pkg/parser/mysql",
"//pkg/sessionctx/variable",
"//pkg/store/pdtypes",
"//pkg/table",
"//pkg/tablecodec",
"//pkg/types",
Expand Down Expand Up @@ -82,6 +81,7 @@ go_library(
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//util",
"@com_github_tikv_pd_client//:client",
"@com_github_tikv_pd_client//http",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//backoff",
"@org_golang_google_grpc//codes",
Expand Down Expand Up @@ -128,7 +128,6 @@ go_test(
"//br/pkg/lightning/mydump",
"//br/pkg/membuf",
"//br/pkg/mock/mocklocal",
"//br/pkg/pdutil",
"//br/pkg/restore/split",
"//br/pkg/storage",
"//br/pkg/utils",
Expand Down Expand Up @@ -168,6 +167,7 @@ go_test(
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_pd_client//:client",
"@com_github_tikv_pd_client//errs",
"@com_github_tikv_pd_client//http",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//encoding",
Expand Down
102 changes: 63 additions & 39 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,14 @@ import (
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/store/pdtypes"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/pingcap/tidb/pkg/util/engine"
"github.com/tikv/client-go/v2/oracle"
tikvclient "github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
pdhttp "github.com/tikv/pd/client/http"
"go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -466,7 +466,8 @@ type Backend struct {
engines sync.Map // sync version of map[uuid.UUID]*Engine
externalEngine map[uuid.UUID]common.Engine

pdCtl *pdutil.PdController
pdCli pd.Client
pdHTTPCli pdhttp.Client
splitCli split.SplitClient
tikvCli *tikvclient.KVStore
tls *common.TLS
Expand Down Expand Up @@ -502,11 +503,19 @@ func openDuplicateDB(storeDir string) (*pebble.DB, error) {
return pebble.Open(dbPath, opts)
}

const (
pdCliMaxMsgSize = int(128 * units.MiB) // pd.ScanRegion may return a large response
)

var (
// RunInTest indicates whether the current process is running in test.
RunInTest bool
// LastAlloc is the last ID allocator.
LastAlloc manual.Allocator
LastAlloc manual.Allocator
maxCallMsgSize = []grpc.DialOption{
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(pdCliMaxMsgSize)),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(pdCliMaxMsgSize)),
}
)

// NewBackend creates new connections to tikv.
Expand All @@ -523,11 +532,20 @@ func NewBackend(
}
}()
config.adjust()
pdCtl, err := pdutil.NewPdController(ctx, config.PDAddr, tls.TLSConfig(), tls.ToPDSecurityOption())
pdAddrs := strings.Split(config.PDAddr, ",")
pdCli, err := pd.NewClientWithContext(
ctx, pdAddrs, tls.ToPDSecurityOption(),
pd.WithGRPCDialOptions(maxCallMsgSize...),
// If the time too short, we may scatter a region many times, because
// the interface `ScatterRegions` may time out.
pd.WithCustomTimeoutOption(60*time.Second),
pd.WithMaxErrorRetry(3),
)
if err != nil {
return nil, common.NormalizeOrWrapErr(common.ErrCreatePDClient, err)
}
splitCli := split.NewSplitClient(pdCtl.GetPDClient(), tls.TLSConfig(), false)
pdHTTPCli := pdhttp.NewClient("lightning", pdAddrs, pdhttp.WithTLSConfig(tls.TLSConfig()))
splitCli := split.NewSplitClient(pdCli, tls.TLSConfig(), false)

shouldCreate := true
if config.CheckpointEnabled {
Expand Down Expand Up @@ -562,9 +580,9 @@ func NewBackend(

var pdCliForTiKV *tikvclient.CodecPDClient
if config.KeyspaceName == "" {
pdCliForTiKV = tikvclient.NewCodecPDClient(tikvclient.ModeTxn, pdCtl.GetPDClient())
pdCliForTiKV = tikvclient.NewCodecPDClient(tikvclient.ModeTxn, pdCli)
} else {
pdCliForTiKV, err = tikvclient.NewCodecPDClientWithKeyspace(tikvclient.ModeTxn, pdCtl.GetPDClient(), config.KeyspaceName)
pdCliForTiKV, err = tikvclient.NewCodecPDClientWithKeyspace(tikvclient.ModeTxn, pdCli, config.KeyspaceName)
if err != nil {
return nil, common.ErrCreatePDClient.Wrap(err).GenWithStackByArgs()
}
Expand Down Expand Up @@ -595,7 +613,8 @@ func NewBackend(
local := &Backend{
engines: sync.Map{},
externalEngine: map[uuid.UUID]common.Engine{},
pdCtl: pdCtl,
pdCli: pdCli,
pdHTTPCli: pdHTTPCli,
splitCli: splitCli,
tikvCli: tikvCli,
tls: tls,
Expand Down Expand Up @@ -635,7 +654,7 @@ func (local *Backend) TotalMemoryConsume() int64 {
}

func (local *Backend) checkMultiIngestSupport(ctx context.Context) error {
stores, err := local.pdCtl.GetPDClient().GetAllStores(ctx, pd.WithExcludeTombstone())
stores, err := local.pdCli.GetAllStores(ctx, pd.WithExcludeTombstone())
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -802,7 +821,8 @@ func (local *Backend) Close() {
}
}
_ = local.tikvCli.Close()
local.pdCtl.Close()
local.pdHTTPCli.Close()
local.pdCli.Close()
}

// FlushEngine ensure the written data is saved successfully, to make sure no data lose after restart
Expand Down Expand Up @@ -933,7 +953,7 @@ func (local *Backend) allocateTSIfNotExists(ctx context.Context, engine *Engine)
if engine.TS > 0 {
return nil
}
physical, logical, err := local.pdCtl.GetPDClient().GetTS(ctx)
physical, logical, err := local.pdCli.GetTS(ctx)
if err != nil {
return err
}
Expand All @@ -953,7 +973,7 @@ func (local *Backend) CloseEngine(ctx context.Context, cfg *backend.EngineConfig
if err != nil {
return err
}
physical, logical, err := local.pdCtl.GetPDClient().GetTS(ctx)
physical, logical, err := local.pdCli.GetTS(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -1400,6 +1420,27 @@ func (*Backend) isRetryableImportTiKVError(err error) bool {
return common.IsRetryableError(err)
}

func checkDiskAvail(ctx context.Context, store *pdhttp.StoreInfo) error {
Benjamin2037 marked this conversation as resolved.
Show resolved Hide resolved
logger := log.FromContext(ctx)
capacity, err := units.RAMInBytes(store.Status.Capacity)
if err != nil {
logger.Warn("failed to parse capacity",
zap.String("capacity", store.Status.Capacity), zap.Error(err))
return nil
}
available, err := units.RAMInBytes(store.Status.Available)
if err != nil {
logger.Warn("failed to parse available",
zap.String("available", store.Status.Available), zap.Error(err))
return nil
}
ratio := available * 100 / capacity
if ratio < 10 {
return errors.Errorf("the remaining storage capacity of TiKV(%s) is less than 10%%; please increase the storage capacity of TiKV and try again", store.Store.Address)
}
return nil
}

// executeJob handles a regionJob and tries to convert it to ingested stage.
// If non-retryable error occurs, it will return the error.
// If retryable error occurs, it will return nil and caller should check the stage
Expand All @@ -1414,26 +1455,14 @@ func (local *Backend) executeJob(
})
if local.ShouldCheckTiKV {
for _, peer := range job.region.Region.GetPeers() {
var (
store *pdtypes.StoreInfo
err error
)
for i := 0; i < maxRetryTimes; i++ {
store, err = local.pdCtl.GetStoreInfo(ctx, peer.StoreId)
if err != nil {
continue
}
if store.Status.Capacity > 0 {
// The available disk percent of TiKV
ratio := store.Status.Available * 100 / store.Status.Capacity
if ratio < 10 {
return errors.Errorf("the remaining storage capacity of TiKV(%s) is less than 10%%; please increase the storage capacity of TiKV and try again", store.Store.Address)
}
}
break
}
store, err := local.pdHTTPCli.GetStore(ctx, peer.StoreId)
if err != nil {
log.FromContext(ctx).Error("failed to get StoreInfo from pd http api", zap.Error(err))
continue
}
err = checkDiskAvail(ctx, store)
if err != nil {
return err
}
}
}
Expand Down Expand Up @@ -1502,7 +1531,7 @@ func (local *Backend) ImportEngine(
log.FromContext(ctx).Info("engine contains no kv, skip import", zap.Stringer("engine", engineUUID))
return nil
}
kvRegionSplitSize, kvRegionSplitKeys, err := GetRegionSplitSizeKeys(ctx, local.pdCtl.GetPDClient(), local.tls)
kvRegionSplitSize, kvRegionSplitKeys, err := GetRegionSplitSizeKeys(ctx, local.pdCli, local.tls)
if err == nil {
if kvRegionSplitSize > regionSplitSize {
regionSplitSize = kvRegionSplitSize
Expand Down Expand Up @@ -1532,7 +1561,7 @@ func (local *Backend) ImportEngine(
if len(regionRanges[len(regionRanges)-1].End) > 0 {
endKey = codec.EncodeBytes(nil, regionRanges[len(regionRanges)-1].End)
}
done, err := local.pdCtl.PauseSchedulersByKeyRange(subCtx, startKey, endKey)
done, err := pdutil.PauseSchedulersByKeyRange(subCtx, local.pdHTTPCli, startKey, endKey)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -1582,7 +1611,7 @@ func (local *Backend) ImportEngine(

// GetRegionSplitSizeKeys gets the region split size and keys from PD.
func (local *Backend) GetRegionSplitSizeKeys(ctx context.Context) (finalSize int64, finalKeys int64, err error) {
return GetRegionSplitSizeKeys(ctx, local.pdCtl.GetPDClient(), local.tls)
return GetRegionSplitSizeKeys(ctx, local.pdCli, local.tls)
}

// expose these variables to unit test.
Expand Down Expand Up @@ -1865,7 +1894,7 @@ func (local *Backend) LocalWriter(_ context.Context, cfg *backend.LocalWriterCon
// This function will spawn a goroutine to keep switch mode periodically until the context is done.
// The return done channel is used to notify the caller that the background goroutine is exited.
func (local *Backend) SwitchModeByKeyRanges(ctx context.Context, ranges []common.Range) (<-chan struct{}, error) {
switcher := NewTiKVModeSwitcher(local.tls, local.pdCtl.GetPDClient(), log.FromContext(ctx).Logger)
switcher := NewTiKVModeSwitcher(local.tls, local.pdCli, log.FromContext(ctx).Logger)
done := make(chan struct{})

keyRanges := make([]*sst.Range, 0, len(ranges))
Expand Down Expand Up @@ -1961,11 +1990,6 @@ func (local *Backend) EngineFileSizes() (res []backend.EngineFileSize) {
return
}

// GetPDClient returns the PD client.
func (local *Backend) GetPDClient() pd.Client {
return local.pdCtl.GetPDClient()
}

var getSplitConfFromStoreFunc = getSplitConfFromStore

// return region split size, region split keys, error
Expand Down
17 changes: 10 additions & 7 deletions br/pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/membuf"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/restore/split"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/utils"
Expand All @@ -60,6 +59,7 @@ import (
"github.com/pingcap/tidb/pkg/util/hack"
"github.com/stretchr/testify/require"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/http"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/encoding"
Expand Down Expand Up @@ -1058,11 +1058,9 @@ func TestMultiIngest(t *testing.T) {
err: testCase.err,
multiIngestCheckFn: testCase.multiIngestSupport,
}
pdCtl := &pdutil.PdController{}
pdCtl.SetPDClient(&mockPdClient{stores: stores})

local := &Backend{
pdCtl: pdCtl,
pdCli: &mockPdClient{stores: stores},
importClientFactory: &mockImportClientFactory{
stores: allStores,
createClientFn: func(store *metapb.Store) sst.ImportSSTClient {
Expand Down Expand Up @@ -2308,16 +2306,14 @@ func TestExternalEngine(t *testing.T) {
TotalKVCount: int64(config.SplitRegionKeys) + 1,
}
engineUUID := uuid.New()
pdCtl := &pdutil.PdController{}
pdCtl.SetPDClient(&mockPdClient{})
local := &Backend{
BackendConfig: BackendConfig{
WorkerConcurrency: 2,
},
splitCli: initTestSplitClient([][]byte{
keys[0], keys[50], endKey,
}, nil),
pdCtl: pdCtl,
pdCli: &mockPdClient{},
externalEngine: map[uuid.UUID]common.Engine{},
keyAdapter: common.NoopKeyAdapter{},
}
Expand Down Expand Up @@ -2384,3 +2380,10 @@ func TestGetExternalEngineKVStatistics(t *testing.T) {
require.Zero(t, size)
require.Zero(t, count)
}

func TestCheckDiskAvail(t *testing.T) {
store := &http.StoreInfo{Status: http.StoreStatus{Capacity: "100 GB", Available: "50 GB"}}
ctx := context.Background()
err := checkDiskAvail(ctx, store)
require.NoError(t, err)
}
Loading
Loading