From 95759f44bd56e6039533587aec4afd61c366b4c1 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Mon, 20 Nov 2023 12:20:42 +0800 Subject: [PATCH] *: further adaptation to PD HTTP client (#48606) ref pingcap/tidb#35319 --- DEPS.bzl | 24 +- br/pkg/lightning/common/BUILD.bazel | 2 +- br/pkg/lightning/common/security_test.go | 6 +- br/pkg/lightning/importer/BUILD.bazel | 4 +- br/pkg/lightning/importer/get_pre_info.go | 8 +- .../lightning/importer/table_import_test.go | 6 +- br/pkg/lightning/tikv/BUILD.bazel | 4 +- br/pkg/lightning/tikv/tikv.go | 4 +- br/pkg/lightning/tikv/tikv_test.go | 6 +- br/pkg/pdutil/BUILD.bazel | 4 +- br/pkg/pdutil/pd.go | 44 ++-- br/pkg/pdutil/pd_serial_test.go | 6 +- br/pkg/pdutil/utils.go | 4 +- br/pkg/restore/split/BUILD.bazel | 2 +- br/pkg/restore/split/client.go | 12 +- go.mod | 4 +- go.sum | 8 +- pkg/domain/infosync/BUILD.bazel | 1 - pkg/domain/infosync/info.go | 5 +- pkg/domain/infosync/label_manager.go | 10 +- pkg/domain/infosync/placement_manager.go | 9 +- pkg/domain/infosync/region.go | 4 +- pkg/domain/infosync/schedule_manager.go | 6 +- pkg/domain/infosync/tiflash_manager.go | 30 +-- pkg/executor/BUILD.bazel | 2 - pkg/executor/builder.go | 2 +- .../hot_regions_history_table_test.go | 6 +- pkg/executor/infoschema_cluster_table_test.go | 13 +- pkg/executor/infoschema_reader.go | 36 ++- pkg/executor/internal/pdhelper/pd.go | 20 +- pkg/executor/internal/pdhelper/pd_test.go | 22 +- pkg/executor/memtable_reader.go | 15 +- pkg/executor/memtable_reader_test.go | 4 +- pkg/executor/set_config.go | 4 +- pkg/executor/split.go | 6 +- pkg/executor/tikv_regions_peers_table_test.go | 7 +- pkg/infoschema/BUILD.bazel | 2 +- pkg/infoschema/perfschema/BUILD.bazel | 4 +- pkg/infoschema/perfschema/tables.go | 14 +- pkg/infoschema/perfschema/tables_test.go | 14 +- pkg/infoschema/tables.go | 4 +- .../test/clustertablestest/BUILD.bazel | 1 - .../clustertablestest/cluster_tables_test.go | 7 +- pkg/server/handler/tikvhandler/BUILD.bazel | 2 +- .../handler/tikvhandler/tikv_handler.go | 10 +- pkg/store/helper/BUILD.bazel | 3 - pkg/store/helper/helper.go | 218 ++---------------- pkg/store/helper/helper_test.go | 16 +- pkg/util/pdapi/BUILD.bazel | 8 - pkg/util/pdapi/const.go | 96 -------- 50 files changed, 238 insertions(+), 511 deletions(-) delete mode 100644 pkg/util/pdapi/BUILD.bazel delete mode 100644 pkg/util/pdapi/const.go diff --git a/DEPS.bzl b/DEPS.bzl index 2bd2c06208b6a..ce20a653810d4 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -7119,26 +7119,26 @@ def go_deps(): name = "com_github_tikv_client_go_v2", build_file_proto_mode = "disable_global", importpath = "github.com/tikv/client-go/v2", - sha256 = "285edca3320cc8847aceffb5d5471fe7483c49f66795622f71ed819c72635d00", - strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20231114060955-8fc8a528217e", + sha256 = "548df2ca5c27559e3318b97b4cb91703d5c253410e7f9fa0eb926e2d3aa28b59", + strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20231116051730-1c2351c28173", urls = [ - "http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231114060955-8fc8a528217e.zip", - "http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231114060955-8fc8a528217e.zip", - "https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231114060955-8fc8a528217e.zip", - "https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231114060955-8fc8a528217e.zip", + "http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231116051730-1c2351c28173.zip", + "http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231116051730-1c2351c28173.zip", + "https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231116051730-1c2351c28173.zip", + "https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231116051730-1c2351c28173.zip", ], ) go_repository( name = "com_github_tikv_pd_client", build_file_proto_mode = "disable_global", importpath = "github.com/tikv/pd/client", - sha256 = "cb510944ce56555f005fff2d891af3fefa667f37955779b89c35fd40f51deace", - strip_prefix = "github.com/tikv/pd/client@v0.0.0-20231114041114-86831ce71865", + sha256 = "014bb8796797b8b5cecc22866a1aab8491e3718c540168ac91257cf7f220cc84", + strip_prefix = "github.com/tikv/pd/client@v0.0.0-20231117041718-dda748abe55d", urls = [ - "http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231114041114-86831ce71865.zip", - "http://ats.apps.svc/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231114041114-86831ce71865.zip", - "https://cache.hawkingrei.com/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231114041114-86831ce71865.zip", - "https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231114041114-86831ce71865.zip", + "http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231117041718-dda748abe55d.zip", + "http://ats.apps.svc/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231117041718-dda748abe55d.zip", + "https://cache.hawkingrei.com/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231117041718-dda748abe55d.zip", + "https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231117041718-dda748abe55d.zip", ], ) go_repository( diff --git a/br/pkg/lightning/common/BUILD.bazel b/br/pkg/lightning/common/BUILD.bazel index 8174dba42b872..77c214ef13dfd 100644 --- a/br/pkg/lightning/common/BUILD.bazel +++ b/br/pkg/lightning/common/BUILD.bazel @@ -128,13 +128,13 @@ go_test( "//pkg/testkit/testsetup", "//pkg/util/dbutil", "//pkg/util/mock", - "//pkg/util/pdapi", "@com_github_data_dog_go_sqlmock//:go-sqlmock", "@com_github_go_sql_driver_mysql//:mysql", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", + "@com_github_tikv_pd_client//http", "@io_etcd_go_etcd_client_v3//:client", "@org_golang_google_grpc//codes", "@org_golang_google_grpc//status", diff --git a/br/pkg/lightning/common/security_test.go b/br/pkg/lightning/common/security_test.go index 4b4e86c54006d..e9f415e927464 100644 --- a/br/pkg/lightning/common/security_test.go +++ b/br/pkg/lightning/common/security_test.go @@ -26,8 +26,8 @@ import ( "testing" "github.com/pingcap/tidb/br/pkg/lightning/common" - "github.com/pingcap/tidb/pkg/util/pdapi" "github.com/stretchr/testify/require" + pd "github.com/tikv/pd/client/http" ) func respondPathHandler(w http.ResponseWriter, req *http.Request) { @@ -94,8 +94,8 @@ func TestWithHost(t *testing.T) { false, }, { - fmt.Sprintf("http://127.0.0.1:2379%s", pdapi.Stores), - fmt.Sprintf("127.0.0.1:2379%s", pdapi.Stores), + fmt.Sprintf("http://127.0.0.1:2379%s", pd.Stores), + fmt.Sprintf("127.0.0.1:2379%s", pd.Stores), false, }, { diff --git a/br/pkg/lightning/importer/BUILD.bazel b/br/pkg/lightning/importer/BUILD.bazel index 6132a4e4cc399..3c7a9ce68ab38 100644 --- a/br/pkg/lightning/importer/BUILD.bazel +++ b/br/pkg/lightning/importer/BUILD.bazel @@ -74,7 +74,6 @@ go_library( "//pkg/util/etcd", "//pkg/util/extsort", "//pkg/util/mock", - "//pkg/util/pdapi", "//pkg/util/regexpr-router", "//pkg/util/set", "@com_github_coreos_go_semver//semver", @@ -90,6 +89,7 @@ go_library( "@com_github_tikv_client_go_v2//config", "@com_github_tikv_client_go_v2//util", "@com_github_tikv_pd_client//:client", + "@com_github_tikv_pd_client//http", "@io_etcd_go_etcd_client_v3//:client", "@org_golang_google_grpc//:grpc", "@org_golang_google_grpc//codes", @@ -163,7 +163,6 @@ go_test( "//pkg/util/dbutil", "//pkg/util/extsort", "//pkg/util/mock", - "//pkg/util/pdapi", "//pkg/util/promutil", "//pkg/util/table-filter", "//pkg/util/table-router", @@ -179,6 +178,7 @@ go_test( "@com_github_tikv_client_go_v2//config", "@com_github_tikv_client_go_v2//testutils", "@com_github_tikv_pd_client//:client", + "@com_github_tikv_pd_client//http", "@com_github_xitongsys_parquet_go//writer", "@com_github_xitongsys_parquet_go_source//buffer", "@io_etcd_go_etcd_client_v3//:client", diff --git a/br/pkg/lightning/importer/get_pre_info.go b/br/pkg/lightning/importer/get_pre_info.go index 2e1f3cb980f4e..b7f0ac04cb8e1 100644 --- a/br/pkg/lightning/importer/get_pre_info.go +++ b/br/pkg/lightning/importer/get_pre_info.go @@ -50,8 +50,8 @@ import ( "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util/dbterror" "github.com/pingcap/tidb/pkg/util/mock" - "github.com/pingcap/tidb/pkg/util/pdapi" pd "github.com/tikv/pd/client" + pdhttp "github.com/tikv/pd/client/http" "go.uber.org/zap" "golang.org/x/exp/maps" ) @@ -237,7 +237,7 @@ func (g *TargetInfoGetterImpl) GetTargetSysVariablesForImport(ctx context.Contex // It uses the PD interface through TLS to get the information. func (g *TargetInfoGetterImpl) GetReplicationConfig(ctx context.Context) (*pdtypes.ReplicationConfig, error) { result := new(pdtypes.ReplicationConfig) - if err := g.tls.WithHost(g.pdCli.GetLeaderAddr()).GetJSON(ctx, pdapi.ReplicateConfig, &result); err != nil { + if err := g.tls.WithHost(g.pdCli.GetLeaderAddr()).GetJSON(ctx, pdhttp.ReplicateConfig, &result); err != nil { return nil, errors.Trace(err) } return result, nil @@ -248,7 +248,7 @@ func (g *TargetInfoGetterImpl) GetReplicationConfig(ctx context.Context) (*pdtyp // It uses the PD interface through TLS to get the information. func (g *TargetInfoGetterImpl) GetStorageInfo(ctx context.Context) (*pdtypes.StoresInfo, error) { result := new(pdtypes.StoresInfo) - if err := g.tls.WithHost(g.pdCli.GetLeaderAddr()).GetJSON(ctx, pdapi.Stores, result); err != nil { + if err := g.tls.WithHost(g.pdCli.GetLeaderAddr()).GetJSON(ctx, pdhttp.Stores, result); err != nil { return nil, errors.Trace(err) } return result, nil @@ -259,7 +259,7 @@ func (g *TargetInfoGetterImpl) GetStorageInfo(ctx context.Context) (*pdtypes.Sto // It uses the PD interface through TLS to get the information. func (g *TargetInfoGetterImpl) GetEmptyRegionsInfo(ctx context.Context) (*pdtypes.RegionsInfo, error) { result := new(pdtypes.RegionsInfo) - if err := g.tls.WithHost(g.pdCli.GetLeaderAddr()).GetJSON(ctx, pdapi.EmptyRegions, &result); err != nil { + if err := g.tls.WithHost(g.pdCli.GetLeaderAddr()).GetJSON(ctx, pdhttp.EmptyRegions, &result); err != nil { return nil, errors.Trace(err) } return result, nil diff --git a/br/pkg/lightning/importer/table_import_test.go b/br/pkg/lightning/importer/table_import_test.go index c4fc26190f9ee..3891dcde52e30 100644 --- a/br/pkg/lightning/importer/table_import_test.go +++ b/br/pkg/lightning/importer/table_import_test.go @@ -64,13 +64,13 @@ import ( "github.com/pingcap/tidb/pkg/table/tables" "github.com/pingcap/tidb/pkg/types" tmock "github.com/pingcap/tidb/pkg/util/mock" - "github.com/pingcap/tidb/pkg/util/pdapi" "github.com/pingcap/tidb/pkg/util/promutil" filter "github.com/pingcap/tidb/pkg/util/table-filter" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/tikv/client-go/v2/testutils" pd "github.com/tikv/pd/client" + pdhttp "github.com/tikv/pd/client/http" "go.uber.org/mock/gomock" ) @@ -1324,9 +1324,9 @@ func (s *tableRestoreSuite) TestCheckClusterRegion() { for i, ca := range testCases { server := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { var err error - if req.URL.Path == pdapi.Stores { + if req.URL.Path == pdhttp.Stores { _, err = w.Write(mustMarshal(ca.stores)) - } else if req.URL.Path == pdapi.EmptyRegions { + } else if req.URL.Path == pdhttp.EmptyRegions { _, err = w.Write(mustMarshal(ca.emptyRegions)) } else { w.WriteHeader(http.StatusNotFound) diff --git a/br/pkg/lightning/tikv/BUILD.bazel b/br/pkg/lightning/tikv/BUILD.bazel index 457e04b66f79d..f13cdd3301b26 100644 --- a/br/pkg/lightning/tikv/BUILD.bazel +++ b/br/pkg/lightning/tikv/BUILD.bazel @@ -13,13 +13,13 @@ go_library( "//br/pkg/version", "//pkg/kv", "//pkg/parser/model", - "//pkg/util/pdapi", "@com_github_coreos_go_semver//semver", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_kvproto//pkg/debugpb", "@com_github_pingcap_kvproto//pkg/import_sstpb", "@com_github_pingcap_kvproto//pkg/kvrpcpb", "@com_github_tikv_client_go_v2//util", + "@com_github_tikv_pd_client//http", "@org_golang_google_grpc//:grpc", "@org_golang_google_grpc//codes", "@org_golang_google_grpc//status", @@ -37,9 +37,9 @@ go_test( deps = [ ":tikv", "//br/pkg/lightning/common", - "//pkg/util/pdapi", "@com_github_coreos_go_semver//semver", "@com_github_pingcap_kvproto//pkg/import_sstpb", "@com_github_stretchr_testify//require", + "@com_github_tikv_pd_client//http", ], ) diff --git a/br/pkg/lightning/tikv/tikv.go b/br/pkg/lightning/tikv/tikv.go index 9a1e674ac992e..397e69795cb9c 100644 --- a/br/pkg/lightning/tikv/tikv.go +++ b/br/pkg/lightning/tikv/tikv.go @@ -32,8 +32,8 @@ import ( "github.com/pingcap/tidb/br/pkg/version" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/parser/model" - "github.com/pingcap/tidb/pkg/util/pdapi" "github.com/tikv/client-go/v2/util" + pd "github.com/tikv/pd/client/http" "go.uber.org/zap" "golang.org/x/sync/errgroup" "google.golang.org/grpc" @@ -125,7 +125,7 @@ func ForAllStores( Store Store } } - err := tls.GetJSON(ctx, pdapi.Stores, &stores) + err := tls.GetJSON(ctx, pd.Stores, &stores) if err != nil { return err } diff --git a/br/pkg/lightning/tikv/tikv_test.go b/br/pkg/lightning/tikv/tikv_test.go index a4b192595b4b4..ff6c6eb5677a8 100644 --- a/br/pkg/lightning/tikv/tikv_test.go +++ b/br/pkg/lightning/tikv/tikv_test.go @@ -29,8 +29,8 @@ import ( "github.com/pingcap/kvproto/pkg/import_sstpb" "github.com/pingcap/tidb/br/pkg/lightning/common" kv "github.com/pingcap/tidb/br/pkg/lightning/tikv" - "github.com/pingcap/tidb/pkg/util/pdapi" "github.com/stretchr/testify/require" + pd "github.com/tikv/pd/client/http" ) var ( @@ -176,7 +176,7 @@ func TestCheckPDVersion(t *testing.T) { ctx := context.Background() mockServer := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - require.Equal(t, pdapi.Version, req.URL.Path) + require.Equal(t, pd.Version, req.URL.Path) w.WriteHeader(http.StatusOK) _, err := w.Write([]byte(version)) require.NoError(t, err) @@ -230,7 +230,7 @@ func TestCheckTiKVVersion(t *testing.T) { ctx := context.Background() mockServer := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - require.Equal(t, pdapi.Stores, req.URL.Path) + require.Equal(t, pd.Stores, req.URL.Path) w.WriteHeader(http.StatusOK) stores := make([]map[string]interface{}, 0, len(versions)) diff --git a/br/pkg/pdutil/BUILD.bazel b/br/pkg/pdutil/BUILD.bazel index 24ce8ea2e809f..b9b42e7d534b0 100644 --- a/br/pkg/pdutil/BUILD.bazel +++ b/br/pkg/pdutil/BUILD.bazel @@ -15,7 +15,6 @@ go_library( "//pkg/store/pdtypes", "//pkg/tablecodec", "//pkg/util/codec", - "//pkg/util/pdapi", "@com_github_coreos_go_semver//semver", "@com_github_docker_go_units//:go-units", "@com_github_google_uuid//:uuid", @@ -24,6 +23,7 @@ go_library( "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_log//:log", "@com_github_tikv_pd_client//:client", + "@com_github_tikv_pd_client//http", "@org_golang_google_grpc//:grpc", "@org_uber_go_zap//:zap", ], @@ -43,11 +43,11 @@ go_test( "//pkg/store/pdtypes", "//pkg/testkit/testsetup", "//pkg/util/codec", - "//pkg/util/pdapi", "@com_github_coreos_go_semver//semver", "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/metapb", "@com_github_stretchr_testify//require", + "@com_github_tikv_pd_client//http", "@org_uber_go_goleak//:goleak", ], ) diff --git a/br/pkg/pdutil/pd.go b/br/pkg/pdutil/pd.go index 8f88fa73249b0..779d3044de11d 100644 --- a/br/pkg/pdutil/pd.go +++ b/br/pkg/pdutil/pd.go @@ -31,8 +31,8 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/pkg/store/pdtypes" "github.com/pingcap/tidb/pkg/util/codec" - "github.com/pingcap/tidb/pkg/util/pdapi" pd "github.com/tikv/pd/client" + pdhttp "github.com/tikv/pd/client/http" "go.uber.org/zap" "google.golang.org/grpc" ) @@ -265,7 +265,7 @@ func NewPdController( } } processedAddrs = append(processedAddrs, addr) - versionBytes, failure = pdRequest(ctx, addr, pdapi.ClusterVersion, cli, http.MethodGet, nil) + versionBytes, failure = pdRequest(ctx, addr, pdhttp.ClusterVersion, cli, http.MethodGet, nil) if failure == nil { break } @@ -362,7 +362,7 @@ func (p *PdController) GetClusterVersion(ctx context.Context) (string, error) { func (p *PdController) getClusterVersionWith(ctx context.Context, get pdHTTPRequest) (string, error) { var err error for _, addr := range p.getAllPDAddrs() { - v, e := get(ctx, addr, pdapi.ClusterVersion, p.cli, http.MethodGet, nil) + v, e := get(ctx, addr, pdhttp.ClusterVersion, p.cli, http.MethodGet, nil) if e != nil { err = e continue @@ -382,14 +382,14 @@ func (p *PdController) getRegionCountWith( ctx context.Context, get pdHTTPRequest, startKey, endKey []byte, ) (int, error) { // TiKV reports region start/end keys to PD in memcomparable-format. - var start, end string - start = url.QueryEscape(string(codec.EncodeBytes(nil, startKey))) + var start, end []byte + start = codec.EncodeBytes(nil, startKey) if len(endKey) != 0 { // Empty end key means the max. - end = url.QueryEscape(string(codec.EncodeBytes(nil, endKey))) + end = codec.EncodeBytes(nil, endKey) } var err error for _, addr := range p.getAllPDAddrs() { - v, e := get(ctx, addr, pdapi.RegionStatsByStartEndKey(start, end), p.cli, http.MethodGet, nil) + v, e := get(ctx, addr, pdhttp.RegionStatsByKeyRange(start, end), p.cli, http.MethodGet, nil) if e != nil { err = e continue @@ -413,7 +413,7 @@ func (p *PdController) getStoreInfoWith( ctx context.Context, get pdHTTPRequest, storeID uint64) (*pdtypes.StoreInfo, error) { var err error for _, addr := range p.getAllPDAddrs() { - v, e := get(ctx, addr, pdapi.StoreByID(storeID), p.cli, http.MethodGet, nil) + v, e := get(ctx, addr, pdhttp.StoreByID(storeID), p.cli, http.MethodGet, nil) if e != nil { err = e continue @@ -439,7 +439,7 @@ func (p *PdController) doPauseSchedulers(ctx context.Context, removedSchedulers := make([]string, 0, len(schedulers)) for _, scheduler := range schedulers { for _, addr := range p.getAllPDAddrs() { - _, err = post(ctx, addr, pdapi.SchedulerByName(scheduler), p.cli, http.MethodPost, body) + _, err = post(ctx, addr, pdhttp.SchedulerByName(scheduler), p.cli, http.MethodPost, body) if err == nil { removedSchedulers = append(removedSchedulers, scheduler) break @@ -521,7 +521,7 @@ func (p *PdController) resumeSchedulerWith(ctx context.Context, schedulers []str } for _, scheduler := range schedulers { for _, addr := range p.getAllPDAddrs() { - _, err = post(ctx, addr, pdapi.SchedulerByName(scheduler), p.cli, http.MethodPost, body) + _, err = post(ctx, addr, pdhttp.SchedulerByName(scheduler), p.cli, http.MethodPost, body) if err == nil { break } @@ -545,7 +545,7 @@ func (p *PdController) ListSchedulers(ctx context.Context) ([]string, error) { func (p *PdController) listSchedulersWith(ctx context.Context, get pdHTTPRequest) ([]string, error) { var err error for _, addr := range p.getAllPDAddrs() { - v, e := get(ctx, addr, pdapi.Schedulers, p.cli, http.MethodGet, nil) + v, e := get(ctx, addr, pdhttp.Schedulers, p.cli, http.MethodGet, nil) if e != nil { err = e continue @@ -568,7 +568,7 @@ func (p *PdController) GetPDScheduleConfig( var err error for _, addr := range p.getAllPDAddrs() { v, e := pdRequest( - ctx, addr, pdapi.ScheduleConfig, p.cli, http.MethodGet, nil) + ctx, addr, pdhttp.ScheduleConfig, p.cli, http.MethodGet, nil) if e != nil { err = e continue @@ -592,7 +592,7 @@ func (p *PdController) UpdatePDScheduleConfig(ctx context.Context) error { func (p *PdController) doUpdatePDScheduleConfig( ctx context.Context, cfg map[string]interface{}, post pdHTTPRequest, prefixs ...string, ) error { - prefix := pdapi.Config + prefix := pdhttp.Config if len(prefixs) != 0 { prefix = prefixs[0] } @@ -621,7 +621,7 @@ func (p *PdController) doUpdatePDScheduleConfig( func (p *PdController) doPauseConfigs(ctx context.Context, cfg map[string]interface{}, post pdHTTPRequest) error { // pause this scheduler with 300 seconds - return p.doUpdatePDScheduleConfig(ctx, cfg, post, pdapi.ConfigWithTTLSeconds(pauseTimeout.Seconds())) + return p.doUpdatePDScheduleConfig(ctx, cfg, post, pdhttp.ConfigWithTTLSeconds(pauseTimeout.Seconds())) } func restoreSchedulers(ctx context.Context, pd *PdController, clusterCfg ClusterConfig, @@ -643,7 +643,7 @@ func restoreSchedulers(ctx context.Context, pd *PdController, clusterCfg Cluster prefix := make([]string, 0, 1) if pd.isPauseConfigEnabled() { // set config's ttl to zero, make temporary config invalid immediately. - prefix = append(prefix, pdapi.ConfigWithTTLSeconds(0)) + prefix = append(prefix, pdhttp.ConfigWithTTLSeconds(0)) } // reset config with previous value. if err := pd.doUpdatePDScheduleConfig(ctx, mergeCfg, pdRequest, prefix...); err != nil { @@ -832,7 +832,7 @@ func (p *PdController) doRemoveSchedulersWith( func (p *PdController) GetMinResolvedTS(ctx context.Context) (uint64, error) { var err error for _, addr := range p.getAllPDAddrs() { - v, e := pdRequest(ctx, addr, pdapi.MinResolvedTS, p.cli, http.MethodGet, nil) + v, e := pdRequest(ctx, addr, pdhttp.MinResolvedTSPrefix, p.cli, http.MethodGet, nil) if e != nil { log.Warn("failed to get min resolved ts", zap.String("addr", addr), zap.Error(e)) err = e @@ -866,7 +866,7 @@ func (p *PdController) RecoverBaseAllocID(ctx context.Context, id uint64) error }) var err error for _, addr := range p.getAllPDAddrs() { - _, e := pdRequest(ctx, addr, pdapi.BaseAllocID, p.cli, http.MethodPost, reqData) + _, e := pdRequest(ctx, addr, pdhttp.BaseAllocID, p.cli, http.MethodPost, reqData) if e != nil { log.Warn("failed to recover base alloc id", zap.String("addr", addr), zap.Error(e)) err = e @@ -890,7 +890,7 @@ func (p *PdController) ResetTS(ctx context.Context, ts uint64) error { }) var err error for _, addr := range p.getAllPDAddrs() { - code, _, e := pdRequestWithCode(ctx, addr, pdapi.ResetTS, p.cli, http.MethodPost, reqData) + code, _, e := pdRequestWithCode(ctx, addr, pdhttp.ResetTS, p.cli, http.MethodPost, reqData) if e != nil { // for pd version <= 6.2, if the given ts < current ts of pd, pd returns StatusForbidden. // it's not an error for br @@ -920,7 +920,7 @@ func (p *PdController) UnmarkRecovering(ctx context.Context) error { func (p *PdController) operateRecoveringMark(ctx context.Context, method string) error { var err error for _, addr := range p.getAllPDAddrs() { - _, e := pdRequest(ctx, addr, pdapi.SnapshotRecoveringMark, p.cli, method, nil) + _, e := pdRequest(ctx, addr, pdhttp.SnapshotRecoveringMark, p.cli, method, nil) if e != nil { log.Warn("failed to operate recovering mark", zap.String("method", method), zap.String("addr", addr), zap.Error(e)) @@ -966,7 +966,7 @@ func (p *PdController) CreateOrUpdateRegionLabelRule(ctx context.Context, rule L var lastErr error addrs := p.getAllPDAddrs() for i, addr := range addrs { - _, lastErr = pdRequest(ctx, addr, pdapi.RegionLabelRule, + _, lastErr = pdRequest(ctx, addr, pdhttp.RegionLabelRule, p.cli, http.MethodPost, reqData) if lastErr == nil { return nil @@ -988,7 +988,7 @@ func (p *PdController) DeleteRegionLabelRule(ctx context.Context, ruleID string) var lastErr error addrs := p.getAllPDAddrs() for i, addr := range addrs { - _, lastErr = pdRequest(ctx, addr, fmt.Sprintf("%s/%s", pdapi.RegionLabelRule, ruleID), + _, lastErr = pdRequest(ctx, addr, fmt.Sprintf("%s/%s", pdhttp.RegionLabelRule, ruleID), p.cli, http.MethodDelete, nil) if lastErr == nil { return nil @@ -1092,7 +1092,7 @@ func FetchPDVersion(ctx context.Context, tls *common.TLS, pdAddr string) (*semve var rawVersion struct { Version string `json:"version"` } - err := tls.WithHost(pdAddr).GetJSON(ctx, pdapi.Version, &rawVersion) + err := tls.WithHost(pdAddr).GetJSON(ctx, pdhttp.Version, &rawVersion) if err != nil { return nil, errors.Trace(err) } diff --git a/br/pkg/pdutil/pd_serial_test.go b/br/pkg/pdutil/pd_serial_test.go index 5dc91f7192a46..f76f61f6dddb9 100644 --- a/br/pkg/pdutil/pd_serial_test.go +++ b/br/pkg/pdutil/pd_serial_test.go @@ -22,8 +22,8 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/pkg/store/pdtypes" "github.com/pingcap/tidb/pkg/util/codec" - "github.com/pingcap/tidb/pkg/util/pdapi" "github.com/stretchr/testify/require" + pd "github.com/tikv/pd/client/http" ) func TestScheduler(t *testing.T) { @@ -273,7 +273,7 @@ func TestStoreInfo(t *testing.T) { _ context.Context, addr string, prefix string, _ *http.Client, _ string, _ []byte, ) ([]byte, error) { require.Equal(t, - fmt.Sprintf("http://mock%s", pdapi.StoreByID(1)), + fmt.Sprintf("http://mock%s", pd.StoreByID(1)), fmt.Sprintf("%s%s", addr, prefix)) ret, err := json.Marshal(storeInfo) require.NoError(t, err) @@ -307,7 +307,7 @@ func TestPauseSchedulersByKeyRange(t *testing.T) { return } if r.Method == http.MethodDelete { - ruleID := strings.TrimPrefix(r.URL.Path, pdapi.RegionLabelRule+"/") + ruleID := strings.TrimPrefix(r.URL.Path, pd.RegionLabelRule+"/") delete(labelExpires, ruleID) deleted = true return diff --git a/br/pkg/pdutil/utils.go b/br/pkg/pdutil/utils.go index 548a155b3362a..2b1e0e3fed0c9 100644 --- a/br/pkg/pdutil/utils.go +++ b/br/pkg/pdutil/utils.go @@ -17,7 +17,7 @@ import ( "github.com/pingcap/tidb/pkg/store/pdtypes" "github.com/pingcap/tidb/pkg/tablecodec" "github.com/pingcap/tidb/pkg/util/codec" - "github.com/pingcap/tidb/pkg/util/pdapi" + pd "github.com/tikv/pd/client/http" ) // UndoFunc is a 'undo' operation of some undoable command. @@ -34,7 +34,7 @@ func GetPlacementRules(ctx context.Context, pdAddr string, tlsConf *tls.Config) if tlsConf != nil { prefix = "https://" } - reqURL := fmt.Sprintf("%s%s%s", prefix, pdAddr, pdapi.PlacementRules) + reqURL := fmt.Sprintf("%s%s%s", prefix, pdAddr, pd.PlacementRules) req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil) if err != nil { return nil, errors.Trace(err) diff --git a/br/pkg/restore/split/BUILD.bazel b/br/pkg/restore/split/BUILD.bazel index 3a24fca4275af..b8a595f5239fc 100644 --- a/br/pkg/restore/split/BUILD.bazel +++ b/br/pkg/restore/split/BUILD.bazel @@ -20,7 +20,6 @@ go_library( "//br/pkg/utils", "//pkg/kv", "//pkg/store/pdtypes", - "//pkg/util/pdapi", "@com_github_google_btree//:btree", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", @@ -31,6 +30,7 @@ go_library( "@com_github_pingcap_kvproto//pkg/tikvpb", "@com_github_pingcap_log//:log", "@com_github_tikv_pd_client//:client", + "@com_github_tikv_pd_client//http", "@org_golang_google_grpc//:grpc", "@org_golang_google_grpc//codes", "@org_golang_google_grpc//credentials", diff --git a/br/pkg/restore/split/client.go b/br/pkg/restore/split/client.go index 99dfe88bd39fd..c558e410ea632 100644 --- a/br/pkg/restore/split/client.go +++ b/br/pkg/restore/split/client.go @@ -29,8 +29,8 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/config" "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/pkg/store/pdtypes" - "github.com/pingcap/tidb/pkg/util/pdapi" pd "github.com/tikv/pd/client" + pdhttp "github.com/tikv/pd/client/http" "go.uber.org/multierr" "go.uber.org/zap" "google.golang.org/grpc" @@ -463,7 +463,7 @@ func (c *pdClient) getStoreCount(ctx context.Context) (int, error) { func (c *pdClient) getMaxReplica(ctx context.Context) (int, error) { api := c.getPDAPIAddr() - req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("%s%s", api, pdapi.ReplicateConfig), nil) + req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("%s%s", api, pdhttp.ReplicateConfig), nil) if err != nil { return 0, errors.Trace(err) } @@ -540,7 +540,7 @@ func (c *pdClient) GetPlacementRule(ctx context.Context, groupID, ruleID string) return rule, errors.Annotate(berrors.ErrRestoreSplitFailed, "failed to add stores labels: no leader") } req, err := http.NewRequestWithContext(ctx, "GET", - addr+path.Join(pdapi.PlacementRule, groupID, ruleID), nil) + addr+path.Join(pdhttp.PlacementRule, groupID, ruleID), nil) if err != nil { return rule, errors.Trace(err) } @@ -571,7 +571,7 @@ func (c *pdClient) SetPlacementRule(ctx context.Context, rule pdtypes.Rule) erro } m, _ := json.Marshal(rule) req, err := http.NewRequestWithContext(ctx, "POST", - addr+path.Join(pdapi.PlacementRule), bytes.NewReader(m)) + addr+path.Join(pdhttp.PlacementRule), bytes.NewReader(m)) if err != nil { return errors.Trace(err) } @@ -587,7 +587,7 @@ func (c *pdClient) DeletePlacementRule(ctx context.Context, groupID, ruleID stri if addr == "" { return errors.Annotate(berrors.ErrPDLeaderNotFound, "failed to add stores labels") } - req, err := http.NewRequestWithContext(ctx, "DELETE", addr+path.Join(pdapi.PlacementRule, groupID, ruleID), nil) + req, err := http.NewRequestWithContext(ctx, "DELETE", addr+path.Join(pdhttp.PlacementRule, groupID, ruleID), nil) if err != nil { return errors.Trace(err) } @@ -610,7 +610,7 @@ func (c *pdClient) SetStoresLabel( for _, id := range stores { req, err := http.NewRequestWithContext( ctx, "POST", - addr+pdapi.StoreLabelByID(id), + addr+pdhttp.StoreLabelByID(id), bytes.NewReader(b), ) if err != nil { diff --git a/go.mod b/go.mod index b33af90197511..7f070aa84ee51 100644 --- a/go.mod +++ b/go.mod @@ -102,8 +102,8 @@ require ( github.com/stretchr/testify v1.8.4 github.com/tdakkota/asciicheck v0.2.0 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 - github.com/tikv/client-go/v2 v2.0.8-0.20231114060955-8fc8a528217e - github.com/tikv/pd/client v0.0.0-20231114041114-86831ce71865 + github.com/tikv/client-go/v2 v2.0.8-0.20231116051730-1c2351c28173 + github.com/tikv/pd/client v0.0.0-20231117041718-dda748abe55d github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966 github.com/twmb/murmur3 v1.1.6 github.com/uber/jaeger-client-go v2.22.1+incompatible diff --git a/go.sum b/go.sum index 2c365b3838e2e..7e3565060643b 100644 --- a/go.sum +++ b/go.sum @@ -991,10 +991,10 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJf github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU= github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4= github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM= -github.com/tikv/client-go/v2 v2.0.8-0.20231114060955-8fc8a528217e h1:kl8+gDOfPfRqkc1VDhhjhezMvsbfRENYsm/FqSIDnwg= -github.com/tikv/client-go/v2 v2.0.8-0.20231114060955-8fc8a528217e/go.mod h1:fEAE7GS/lta+OasPOacdgy6RlJIRaq9/Cyr2WbSYcBE= -github.com/tikv/pd/client v0.0.0-20231114041114-86831ce71865 h1:Gkvo77EevOpBGIdV1c8gwRqPhVbgLPRy82tXNEFpGTc= -github.com/tikv/pd/client v0.0.0-20231114041114-86831ce71865/go.mod h1:cd6zBqRM9aogxf26K8NnFRPVtq9BnRE59tKEpX8IaWQ= +github.com/tikv/client-go/v2 v2.0.8-0.20231116051730-1c2351c28173 h1:lmJzX0kqrV7kO21wrZPbtjkidzwbDCfXeQrhDWEi5dE= +github.com/tikv/client-go/v2 v2.0.8-0.20231116051730-1c2351c28173/go.mod h1:BOGTSZtbMHEnGC4HOpbONdnTQF+E9nb2Io7c3P9sb7g= +github.com/tikv/pd/client v0.0.0-20231117041718-dda748abe55d h1:6isljjnUH8zzkJx2X8MUGh+5AlMv+pCEhCy5MSyuhSM= +github.com/tikv/pd/client v0.0.0-20231117041718-dda748abe55d/go.mod h1:cd6zBqRM9aogxf26K8NnFRPVtq9BnRE59tKEpX8IaWQ= github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966 h1:quvGphlmUVU+nhpFa4gg4yJyTRJ13reZMDHrKwYw53M= github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966/go.mod h1:27bSVNWSBOHm+qRp1T9qzaIpsWEP6TbUnei/43HK+PQ= github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs= diff --git a/pkg/domain/infosync/BUILD.bazel b/pkg/domain/infosync/BUILD.bazel index a6d571336bfc5..2680731b34312 100644 --- a/pkg/domain/infosync/BUILD.bazel +++ b/pkg/domain/infosync/BUILD.bazel @@ -36,7 +36,6 @@ go_library( "//pkg/util/dbterror", "//pkg/util/hack", "//pkg/util/logutil", - "//pkg/util/pdapi", "//pkg/util/syncutil", "//pkg/util/versioninfo", "@com_github_golang_protobuf//proto", diff --git a/pkg/domain/infosync/info.go b/pkg/domain/infosync/info.go index 79631640843ba..1278fd2469da7 100644 --- a/pkg/domain/infosync/info.go +++ b/pkg/domain/infosync/info.go @@ -50,7 +50,6 @@ import ( "github.com/pingcap/tidb/pkg/util/dbterror" "github.com/pingcap/tidb/pkg/util/hack" "github.com/pingcap/tidb/pkg/util/logutil" - "github.com/pingcap/tidb/pkg/util/pdapi" "github.com/pingcap/tidb/pkg/util/versioninfo" "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" @@ -918,8 +917,8 @@ func (is *InfoSyncer) getPrometheusAddr() (string, error) { if !clientAvailable || len(pdAddrs) == 0 { return "", errors.Errorf("pd unavailable") } - // Get prometheus address from pdApi. - url := util2.ComposeURL(pdAddrs[0], pdapi.Config) + // Get prometheus address from pdhttp. + url := util2.ComposeURL(pdAddrs[0], pdhttp.Config) resp, err := util2.InternalHTTPClient().Get(url) if err != nil { return "", err diff --git a/pkg/domain/infosync/label_manager.go b/pkg/domain/infosync/label_manager.go index 30560346244e3..a6b95ce964991 100644 --- a/pkg/domain/infosync/label_manager.go +++ b/pkg/domain/infosync/label_manager.go @@ -22,7 +22,7 @@ import ( "sync" "github.com/pingcap/tidb/pkg/ddl/label" - "github.com/pingcap/tidb/pkg/util/pdapi" + pd "github.com/tikv/pd/client/http" clientv3 "go.etcd.io/etcd/client/v3" ) @@ -45,7 +45,7 @@ func (lm *PDLabelManager) PutLabelRule(ctx context.Context, rule *label.Rule) er if err != nil { return err } - _, err = doRequest(ctx, "PutLabelRule", lm.etcdCli.Endpoints(), path.Join(pdapi.Config, "region-label", "rule"), "POST", bytes.NewReader(r)) + _, err = doRequest(ctx, "PutLabelRule", lm.etcdCli.Endpoints(), path.Join(pd.Config, "region-label", "rule"), "POST", bytes.NewReader(r)) return err } @@ -56,14 +56,14 @@ func (lm *PDLabelManager) UpdateLabelRules(ctx context.Context, patch *label.Rul return err } - _, err = doRequest(ctx, "UpdateLabelRules", lm.etcdCli.Endpoints(), path.Join(pdapi.Config, "region-label", "rules"), "PATCH", bytes.NewReader(r)) + _, err = doRequest(ctx, "UpdateLabelRules", lm.etcdCli.Endpoints(), path.Join(pd.Config, "region-label", "rules"), "PATCH", bytes.NewReader(r)) return err } // GetAllLabelRules implements GetAllLabelRules func (lm *PDLabelManager) GetAllLabelRules(ctx context.Context) ([]*label.Rule, error) { var rules []*label.Rule - res, err := doRequest(ctx, "GetAllLabelRules", lm.etcdCli.Endpoints(), path.Join(pdapi.Config, "region-label", "rules"), "GET", nil) + res, err := doRequest(ctx, "GetAllLabelRules", lm.etcdCli.Endpoints(), path.Join(pd.Config, "region-label", "rules"), "GET", nil) if err == nil && res != nil { err = json.Unmarshal(res, &rules) @@ -79,7 +79,7 @@ func (lm *PDLabelManager) GetLabelRules(ctx context.Context, ruleIDs []string) ( } rules := []*label.Rule{} - res, err := doRequest(ctx, "GetLabelRules", lm.etcdCli.Endpoints(), path.Join(pdapi.Config, "region-label", "rules", "ids"), "GET", bytes.NewReader(ids)) + res, err := doRequest(ctx, "GetLabelRules", lm.etcdCli.Endpoints(), path.Join(pd.Config, "region-label", "rules", "ids"), "GET", bytes.NewReader(ids)) if err == nil && res != nil { err = json.Unmarshal(res, &rules) diff --git a/pkg/domain/infosync/placement_manager.go b/pkg/domain/infosync/placement_manager.go index 3368869a4e38d..2d1597bf8f0e3 100644 --- a/pkg/domain/infosync/placement_manager.go +++ b/pkg/domain/infosync/placement_manager.go @@ -18,12 +18,13 @@ import ( "bytes" "context" "encoding/json" + "net/http" "path" "sync" "github.com/pingcap/log" "github.com/pingcap/tidb/pkg/ddl/placement" - "github.com/pingcap/tidb/pkg/util/pdapi" + pd "github.com/tikv/pd/client/http" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" ) @@ -46,7 +47,7 @@ type PDPlacementManager struct { // GetRuleBundle is used to get one specific rule bundle from PD. func (m *PDPlacementManager) GetRuleBundle(ctx context.Context, name string) (*placement.Bundle, error) { bundle := &placement.Bundle{ID: name} - res, err := doRequest(ctx, "GetPlacementRule", m.etcdCli.Endpoints(), path.Join(pdapi.Config, "placement-rule", name), "GET", nil) + res, err := doRequest(ctx, "GetPlacementRule", m.etcdCli.Endpoints(), path.Join(pd.Config, "placement-rule", name), http.MethodGet, nil) if err == nil && res != nil { err = json.Unmarshal(res, bundle) } @@ -56,7 +57,7 @@ func (m *PDPlacementManager) GetRuleBundle(ctx context.Context, name string) (*p // GetAllRuleBundles is used to get all rule bundles from PD. It is used to load full rules from PD while fullload infoschema. func (m *PDPlacementManager) GetAllRuleBundles(ctx context.Context) ([]*placement.Bundle, error) { var bundles []*placement.Bundle - res, err := doRequest(ctx, "GetAllPlacementRules", m.etcdCli.Endpoints(), path.Join(pdapi.Config, "placement-rule"), "GET", nil) + res, err := doRequest(ctx, "GetAllPlacementRules", m.etcdCli.Endpoints(), path.Join(pd.Config, "placement-rule"), http.MethodGet, nil) if err == nil && res != nil { err = json.Unmarshal(res, &bundles) } @@ -75,7 +76,7 @@ func (m *PDPlacementManager) PutRuleBundles(ctx context.Context, bundles []*plac } log.Debug("Put placement rule bundles", zap.String("rules", string(b))) - _, err = doRequest(ctx, "PutPlacementRules", m.etcdCli.Endpoints(), path.Join(pdapi.Config, "placement-rule")+"?partial=true", "POST", bytes.NewReader(b)) + _, err = doRequest(ctx, "PutPlacementRules", m.etcdCli.Endpoints(), path.Join(pd.Config, "placement-rule")+"?partial=true", http.MethodPost, bytes.NewReader(b)) return err } diff --git a/pkg/domain/infosync/region.go b/pkg/domain/infosync/region.go index 76134557a8595..17208ecb6e153 100644 --- a/pkg/domain/infosync/region.go +++ b/pkg/domain/infosync/region.go @@ -21,7 +21,7 @@ import ( "fmt" "github.com/pingcap/errors" - "github.com/pingcap/tidb/pkg/util/pdapi" + pd "github.com/tikv/pd/client/http" ) // PlacementScheduleState is the returned third-valued state from GetReplicationState(). For convenience, the string of PD is deserialized into an enum first. @@ -66,7 +66,7 @@ func GetReplicationState(ctx context.Context, startKey []byte, endKey []byte) (P return PlacementScheduleStatePending, errors.Errorf("pd unavailable") } - res, err := doRequest(ctx, "GetReplicationState", addrs, fmt.Sprintf("%s/replicated?startKey=%s&endKey=%s", pdapi.Regions, hex.EncodeToString(startKey), hex.EncodeToString(endKey)), "GET", nil) + res, err := doRequest(ctx, "GetReplicationState", addrs, fmt.Sprintf("%s/replicated?startKey=%s&endKey=%s", pd.Regions, hex.EncodeToString(startKey), hex.EncodeToString(endKey)), "GET", nil) if err == nil && res != nil { st := PlacementScheduleStatePending // it should not fail diff --git a/pkg/domain/infosync/schedule_manager.go b/pkg/domain/infosync/schedule_manager.go index 7b71a8423edf7..1578007deb4d7 100644 --- a/pkg/domain/infosync/schedule_manager.go +++ b/pkg/domain/infosync/schedule_manager.go @@ -22,7 +22,7 @@ import ( "sync" "github.com/pingcap/errors" - "github.com/pingcap/tidb/pkg/util/pdapi" + pd "github.com/tikv/pd/client/http" clientv3 "go.etcd.io/etcd/client/v3" ) @@ -39,7 +39,7 @@ type PDScheduleManager struct { // GetPDScheduleConfig get schedule config from pd func (sm *PDScheduleManager) GetPDScheduleConfig(ctx context.Context) (map[string]interface{}, error) { - ret, err := doRequest(ctx, "GetPDSchedule", sm.etcdCli.Endpoints(), path.Join(pdapi.Config, "schedule"), "GET", nil) + ret, err := doRequest(ctx, "GetPDSchedule", sm.etcdCli.Endpoints(), path.Join(pd.Config, "schedule"), "GET", nil) if err != nil { return nil, errors.Trace(err) } @@ -59,7 +59,7 @@ func (sm *PDScheduleManager) SetPDScheduleConfig(ctx context.Context, config map return err } - _, err = doRequest(ctx, "SetPDSchedule", sm.etcdCli.Endpoints(), path.Join(pdapi.Config, "schedule"), "POST", bytes.NewReader(configJSON)) + _, err = doRequest(ctx, "SetPDSchedule", sm.etcdCli.Endpoints(), path.Join(pd.Config, "schedule"), "POST", bytes.NewReader(configJSON)) if err != nil { return errors.Trace(err) } diff --git a/pkg/domain/infosync/tiflash_manager.go b/pkg/domain/infosync/tiflash_manager.go index 3b892c5378904..f1d93c024e39e 100644 --- a/pkg/domain/infosync/tiflash_manager.go +++ b/pkg/domain/infosync/tiflash_manager.go @@ -22,7 +22,6 @@ import ( "fmt" "net/http" "net/http/httptest" - "net/url" "path" "strconv" "strings" @@ -38,7 +37,6 @@ import ( "github.com/pingcap/tidb/pkg/tablecodec" "github.com/pingcap/tidb/pkg/util/codec" "github.com/pingcap/tidb/pkg/util/logutil" - "github.com/pingcap/tidb/pkg/util/pdapi" "github.com/pingcap/tidb/pkg/util/syncutil" "github.com/tikv/client-go/v2/tikv" pd "github.com/tikv/pd/client/http" @@ -209,7 +207,7 @@ func (m *TiFlashReplicaManagerCtx) SetTiFlashGroupConfig(ctx context.Context) er res, err := doRequest(ctx, "GetRuleGroupConfig", m.etcdCli.Endpoints(), - path.Join(pdapi.Config, "rule_group", placement.TiFlashRuleGroupID), + path.Join(pd.Config, "rule_group", placement.TiFlashRuleGroupID), "GET", nil, ) @@ -243,7 +241,7 @@ func (m *TiFlashReplicaManagerCtx) SetTiFlashGroupConfig(ctx context.Context) er _, err = doRequest(ctx, "SetRuleGroupConfig", m.etcdCli.Endpoints(), - path.Join(pdapi.Config, "rule_group"), + path.Join(pd.Config, "rule_group"), "POST", bytes.NewBuffer(body), ) @@ -275,7 +273,7 @@ func (m *TiFlashReplicaManagerCtx) doSetPlacementRule(ctx context.Context, rule return errors.Trace(err) } buf := bytes.NewBuffer(j) - res, err := doRequest(ctx, "SetPlacementRule", m.etcdCli.Endpoints(), path.Join(pdapi.Config, "rule"), "POST", buf) + res, err := doRequest(ctx, "SetPlacementRule", m.etcdCli.Endpoints(), path.Join(pd.Config, "rule"), "POST", buf) if err != nil { return errors.Trace(err) } @@ -415,7 +413,7 @@ func (m *TiFlashReplicaManagerCtx) doSetPlacementRuleBatch(ctx context.Context, return errors.Trace(err) } buf := bytes.NewBuffer(j) - res, err := doRequest(ctx, "SetPlacementRuleBatch", m.etcdCli.Endpoints(), path.Join(pdapi.Config, "rules", "batch"), "POST", buf) + res, err := doRequest(ctx, "SetPlacementRuleBatch", m.etcdCli.Endpoints(), path.Join(pd.Config, "rules", "batch"), "POST", buf) if err != nil { return errors.Trace(err) } @@ -432,7 +430,7 @@ func (m *TiFlashReplicaManagerCtx) DeletePlacementRule(ctx context.Context, grou } func (m *TiFlashReplicaManagerCtx) doDeletePlacementRule(ctx context.Context, group string, ruleID string) error { - res, err := doRequest(ctx, "DeletePlacementRule", m.etcdCli.Endpoints(), path.Join(pdapi.Config, "rule", group, ruleID), "DELETE", nil) + res, err := doRequest(ctx, "DeletePlacementRule", m.etcdCli.Endpoints(), path.Join(pd.Config, "rule", group, ruleID), "DELETE", nil) if err != nil { return errors.Trace(err) } @@ -444,7 +442,7 @@ func (m *TiFlashReplicaManagerCtx) doDeletePlacementRule(ctx context.Context, gr // GetGroupRules to get all placement rule in a certain group. func (m *TiFlashReplicaManagerCtx) GetGroupRules(ctx context.Context, group string) ([]placement.TiFlashRule, error) { - res, err := doRequest(ctx, "GetGroupRules", m.etcdCli.Endpoints(), path.Join(pdapi.Config, "rules", "group", group), "GET", nil) + res, err := doRequest(ctx, "GetGroupRules", m.etcdCli.Endpoints(), path.Join(pd.Config, "rules", "group", group), "GET", nil) if err != nil { return nil, errors.Trace(err) } @@ -481,7 +479,7 @@ func (m *TiFlashReplicaManagerCtx) PostAccelerateScheduleBatch(ctx context.Conte return errors.Trace(err) } buf := bytes.NewBuffer(j) - res, err := doRequest(ctx, "PostAccelerateScheduleBatch", m.etcdCli.Endpoints(), path.Join(pdapi.Regions, "accelerate-schedule", "batch"), "POST", buf) + res, err := doRequest(ctx, "PostAccelerateScheduleBatch", m.etcdCli.Endpoints(), path.Join(pd.Regions, "accelerate-schedule", "batch"), http.MethodPost, buf) if err != nil { return errors.Trace(err) } @@ -497,11 +495,7 @@ func (m *TiFlashReplicaManagerCtx) GetRegionCountFromPD(ctx context.Context, tab endKey := tablecodec.EncodeTablePrefix(tableID + 1) startKey, endKey = m.codec.EncodeRegionRange(startKey, endKey) - p := fmt.Sprintf("%s&count", - pdapi.RegionStatsByStartEndKey( - url.QueryEscape(string(startKey)), - url.QueryEscape(string(endKey)), - )) + p := fmt.Sprintf("%s&count", pd.RegionStatsByKeyRange(startKey, endKey)) res, err := doRequest(ctx, "GetPDRegionStats", m.etcdCli.Endpoints(), p, "GET", nil) if err != nil { return errors.Trace(err) @@ -509,7 +503,7 @@ func (m *TiFlashReplicaManagerCtx) GetRegionCountFromPD(ctx context.Context, tab if res == nil { return fmt.Errorf("TiFlashReplicaManagerCtx returns error in GetRegionCountFromPD") } - var stats helper.PDRegionStats + var stats pd.RegionStats err = json.Unmarshal(res, &stats) if err != nil { return errors.Trace(err) @@ -521,7 +515,7 @@ func (m *TiFlashReplicaManagerCtx) GetRegionCountFromPD(ctx context.Context, tab // GetStoresStat gets the TiKV store information by accessing PD's api. func (m *TiFlashReplicaManagerCtx) GetStoresStat(ctx context.Context) (*pd.StoresInfo, error) { var storesStat pd.StoresInfo - res, err := doRequest(ctx, "GetStoresStat", m.etcdCli.Endpoints(), pdapi.Stores, "GET", nil) + res, err := doRequest(ctx, "GetStoresStat", m.etcdCli.Endpoints(), pd.Stores, "GET", nil) if err != nil { return nil, errors.Trace(err) } @@ -795,8 +789,8 @@ func (tiflash *MockTiFlash) HandlePostAccelerateSchedule(endKey string) error { // HandleGetPDRegionRecordStats is mock function for GetRegionCountFromPD. // It currently always returns 1 Region for convenience. -func (tiflash *MockTiFlash) HandleGetPDRegionRecordStats(_ int64) helper.PDRegionStats { - return helper.PDRegionStats{ +func (tiflash *MockTiFlash) HandleGetPDRegionRecordStats(_ int64) pd.RegionStats { + return pd.RegionStats{ Count: 1, } } diff --git a/pkg/executor/BUILD.bazel b/pkg/executor/BUILD.bazel index 2e1f6b0316e1b..76277ce2336e0 100644 --- a/pkg/executor/BUILD.bazel +++ b/pkg/executor/BUILD.bazel @@ -209,7 +209,6 @@ go_library( "//pkg/util/memory", "//pkg/util/mvmap", "//pkg/util/password-validation", - "//pkg/util/pdapi", "//pkg/util/plancodec", "//pkg/util/printer", "//pkg/util/ranger", @@ -440,7 +439,6 @@ go_test( "//pkg/util/memory", "//pkg/util/mock", "//pkg/util/paging", - "//pkg/util/pdapi", "//pkg/util/ranger", "//pkg/util/sem", "//pkg/util/set", diff --git a/pkg/executor/builder.go b/pkg/executor/builder.go index d2a9e98e9a392..51208d675dbf4 100644 --- a/pkg/executor/builder.go +++ b/pkg/executor/builder.go @@ -2796,7 +2796,7 @@ func (b *executorBuilder) getAdjustedSampleRate(task plannercore.AnalyzeColumnsT } func (b *executorBuilder) getApproximateTableCountFromStorage(tid int64, task plannercore.AnalyzeColumnsTask) (float64, bool) { - return pdhelper.GlobalPDHelper.GetApproximateTableCountFromStorage(b.ctx, tid, task.DBName, task.TableName, task.PartitionName) + return pdhelper.GlobalPDHelper.GetApproximateTableCountFromStorage(context.Background(), b.ctx, tid, task.DBName, task.TableName, task.PartitionName) } func (b *executorBuilder) buildAnalyzeColumnsPushdown( diff --git a/pkg/executor/hot_regions_history_table_test.go b/pkg/executor/hot_regions_history_table_test.go index ce6138458eda3..31b5d74a6645e 100644 --- a/pkg/executor/hot_regions_history_table_test.go +++ b/pkg/executor/hot_regions_history_table_test.go @@ -37,8 +37,8 @@ import ( "github.com/pingcap/tidb/pkg/store/helper" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/testkit/external" - "github.com/pingcap/tidb/pkg/util/pdapi" "github.com/stretchr/testify/require" + pd "github.com/tikv/pd/client/http" ) type mockStoreWithMultiPD struct { @@ -140,7 +140,7 @@ func (s *hotRegionsHistoryTableSuite) setUpMockPDHTTPServer() (*httptest.Server, server := httptest.NewServer(router) mockAddr := strings.TrimPrefix(server.URL, "http://") // mock PD API - router.Handle(pdapi.Status, fn.Wrap(func() (interface{}, error) { + router.Handle(pd.Status, fn.Wrap(func() (interface{}, error) { return struct { Version string `json:"version"` GitHash string `json:"git_hash"` @@ -152,7 +152,7 @@ func (s *hotRegionsHistoryTableSuite) setUpMockPDHTTPServer() (*httptest.Server, }, nil })) // mock history hot regions response - router.HandleFunc(pdapi.HotHistory, hisHotRegionsHandler) + router.HandleFunc(pd.HotHistory, hisHotRegionsHandler) return server, mockAddr } diff --git a/pkg/executor/infoschema_cluster_table_test.go b/pkg/executor/infoschema_cluster_table_test.go index b8187b8b38cfb..5ac6d7e147236 100644 --- a/pkg/executor/infoschema_cluster_table_test.go +++ b/pkg/executor/infoschema_cluster_table_test.go @@ -37,7 +37,6 @@ import ( "github.com/pingcap/tidb/pkg/store/mockstore" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/util" - "github.com/pingcap/tidb/pkg/util/pdapi" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/tikv" pd "github.com/tikv/pd/client/http" @@ -107,7 +106,7 @@ func (s *infosSchemaClusterTableSuite) setUpMockPDHTTPServer() (*httptest.Server srv := httptest.NewServer(router) // mock store stats stat mockAddr := strings.TrimPrefix(srv.URL, "http://") - router.Handle(pdapi.Stores, fn.Wrap(func() (*pd.StoresInfo, error) { + router.Handle(pd.Stores, fn.Wrap(func() (*pd.StoresInfo, error) { return &pd.StoresInfo{ Count: 1, Stores: []pd.StoreInfo{ @@ -127,7 +126,7 @@ func (s *infosSchemaClusterTableSuite) setUpMockPDHTTPServer() (*httptest.Server }, nil })) // mock regions - router.Handle(pdapi.Regions, fn.Wrap(func() (*pd.RegionsInfo, error) { + router.Handle(pd.Regions, fn.Wrap(func() (*pd.RegionsInfo, error) { return &pd.RegionsInfo{ Count: 1, Regions: []pd.RegionInfo{ @@ -148,7 +147,7 @@ func (s *infosSchemaClusterTableSuite) setUpMockPDHTTPServer() (*httptest.Server }, nil })) // mock PD API - router.Handle(pdapi.Status, fn.Wrap(func() (interface{}, error) { + router.Handle(pd.Status, fn.Wrap(func() (interface{}, error) { return struct { Version string `json:"version"` GitHash string `json:"git_hash"` @@ -178,12 +177,12 @@ func (s *infosSchemaClusterTableSuite) setUpMockPDHTTPServer() (*httptest.Server return configuration, nil } // PD config. - router.Handle(pdapi.Config, fn.Wrap(mockConfig)) + router.Handle(pd.Config, fn.Wrap(mockConfig)) // TiDB/TiKV config. router.Handle("/config", fn.Wrap(mockConfig)) // PD region. - router.Handle(pdapi.RegionStats, fn.Wrap(func() (*helper.PDRegionStats, error) { - return &helper.PDRegionStats{ + router.Handle(pd.StatsRegion, fn.Wrap(func() (*pd.RegionStats, error) { + return &pd.RegionStats{ Count: 1, EmptyCount: 1, StorageSize: 1, diff --git a/pkg/executor/infoschema_reader.go b/pkg/executor/infoschema_reader.go index ad01e17d00a8d..9fabb6cfbb8b5 100644 --- a/pkg/executor/infoschema_reader.go +++ b/pkg/executor/infoschema_reader.go @@ -1217,7 +1217,11 @@ func (e *memtableRetriever) dataForTiKVStoreStatus(ctx context.Context, sctx ses Store: tikvStore, RegionCache: tikvStore.GetRegionCache(), } - storesStat, err := tikvHelper.PDHTTPClient().GetStores(ctx) + pdCli, err := tikvHelper.TryGetPDHTTPClient() + if err != nil { + return err + } + storesStat, err := pdCli.GetStores(ctx) if err != nil { return err } @@ -1648,7 +1652,11 @@ func (e *memtableRetriever) setDataForTiKVRegionStatus(ctx context.Context, sctx } } if !requestByTableRange { - allRegionsInfo, err = tikvHelper.PDHTTPClient().GetRegions(ctx) + pdCli, err := tikvHelper.TryGetPDHTTPClient() + if err != nil { + return err + } + allRegionsInfo, err = pdCli.GetRegions(ctx) if err != nil { return err } @@ -1701,12 +1709,16 @@ func (e *memtableRetriever) getRegionsInfoForTable(ctx context.Context, h *helpe } func (*memtableRetriever) getRegionsInfoForSingleTable(ctx context.Context, helper *helper.Helper, tableID int64) (*pd.RegionsInfo, error) { + pdCli, err := helper.TryGetPDHTTPClient() + if err != nil { + return nil, err + } sk, ek := tablecodec.GetTableHandleKeyRange(tableID) - sRegion, err := helper.PDHTTPClient().GetRegionByKey(ctx, codec.EncodeBytes(nil, sk)) + sRegion, err := pdCli.GetRegionByKey(ctx, codec.EncodeBytes(nil, sk)) if err != nil { return nil, err } - eRegion, err := helper.PDHTTPClient().GetRegionByKey(ctx, codec.EncodeBytes(nil, ek)) + eRegion, err := pdCli.GetRegionByKey(ctx, codec.EncodeBytes(nil, ek)) if err != nil { return nil, err } @@ -1718,7 +1730,7 @@ func (*memtableRetriever) getRegionsInfoForSingleTable(ctx context.Context, help if err != nil { return nil, err } - return helper.PDHTTPClient().GetRegionsByKey(ctx, sk, ek, -1) + return pdCli.GetRegionsByKeyRange(ctx, sk, ek, -1) } func (e *memtableRetriever) setNewTiKVRegionStatusCol(region *pd.RegionInfo, table *helper.TableInfo) { @@ -1891,10 +1903,10 @@ type tableStorageStatsRetriever struct { initialTables []*initialTable curTable int helper *helper.Helper - stats helper.PDRegionStats + stats *pd.RegionStats } -func (e *tableStorageStatsRetriever) retrieve(_ context.Context, sctx sessionctx.Context) ([][]types.Datum, error) { +func (e *tableStorageStatsRetriever) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) { if e.retrieved { return nil, nil } @@ -1909,7 +1921,7 @@ func (e *tableStorageStatsRetriever) retrieve(_ context.Context, sctx sessionctx return nil, nil } - rows, err := e.setDataForTableStorageStats() + rows, err := e.setDataForTableStorageStats(ctx) if err != nil { return nil, err } @@ -1997,7 +2009,7 @@ func (e *tableStorageStatsRetriever) initialize(sctx sessionctx.Context) error { return nil } -func (e *tableStorageStatsRetriever) setDataForTableStorageStats() ([][]types.Datum, error) { +func (e *tableStorageStatsRetriever) setDataForTableStorageStats(ctx context.Context) ([][]types.Datum, error) { rows := make([][]types.Datum, 0, 1024) count := 0 for e.curTable < len(e.initialTables) && count < 1024 { @@ -2009,9 +2021,9 @@ func (e *tableStorageStatsRetriever) setDataForTableStorageStats() ([][]types.Da tblIDs = append(tblIDs, partDef.ID) } } - + var err error for _, tableID := range tblIDs { - err := e.helper.GetPDRegionStats(tableID, &e.stats, false) + e.stats, err = e.helper.GetPDRegionStats(ctx, tableID, false) if err != nil { return nil, err } @@ -2167,7 +2179,7 @@ func getRemainDurationForAnalyzeStatusHelper( } } if tid > 0 && totalCnt == 0 { - totalCnt, _ = pdhelper.GlobalPDHelper.GetApproximateTableCountFromStorage(sctx, tid, dbName, tableName, partitionName) + totalCnt, _ = pdhelper.GlobalPDHelper.GetApproximateTableCountFromStorage(ctx, sctx, tid, dbName, tableName, partitionName) } remainingDuration, percentage = calRemainInfoForAnalyzeStatus(ctx, int64(totalCnt), processedRows, duration) } diff --git a/pkg/executor/internal/pdhelper/pd.go b/pkg/executor/internal/pdhelper/pd.go index 2dfd04a9c558b..1a24e29016f3f 100644 --- a/pkg/executor/internal/pdhelper/pd.go +++ b/pkg/executor/internal/pdhelper/pd.go @@ -39,7 +39,7 @@ var globalPDHelperOnce sync.Once type PDHelper struct { cacheForApproximateTableCountFromStorage *ttlcache.Cache[string, float64] - getApproximateTableCountFromStorageFunc func(sctx sessionctx.Context, tid int64, dbName, tableName, partitionName string) (float64, bool) + getApproximateTableCountFromStorageFunc func(ctx context.Context, sctx sessionctx.Context, tid int64, dbName, tableName, partitionName string) (float64, bool) wg util.WaitGroupWrapper } @@ -72,24 +72,28 @@ func approximateTableCountKey(tid int64, dbName, tableName, partitionName string } // GetApproximateTableCountFromStorage gets the approximate count of the table. -func (p *PDHelper) GetApproximateTableCountFromStorage(sctx sessionctx.Context, tid int64, dbName, tableName, partitionName string) (float64, bool) { +func (p *PDHelper) GetApproximateTableCountFromStorage( + ctx context.Context, sctx sessionctx.Context, + tid int64, dbName, tableName, partitionName string, +) (float64, bool) { key := approximateTableCountKey(tid, dbName, tableName, partitionName) if item := p.cacheForApproximateTableCountFromStorage.Get(key); item != nil { return item.Value(), true } - result, hasPD := p.getApproximateTableCountFromStorageFunc(sctx, tid, dbName, tableName, partitionName) + result, hasPD := p.getApproximateTableCountFromStorageFunc(ctx, sctx, tid, dbName, tableName, partitionName) p.cacheForApproximateTableCountFromStorage.Set(key, result, ttlcache.DefaultTTL) return result, hasPD } -func getApproximateTableCountFromStorage(sctx sessionctx.Context, tid int64, dbName, tableName, partitionName string) (float64, bool) { +func getApproximateTableCountFromStorage( + ctx context.Context, sctx sessionctx.Context, + tid int64, dbName, tableName, partitionName string, +) (float64, bool) { tikvStore, ok := sctx.GetStore().(helper.Storage) if !ok { return 0, false } - regionStats := &helper.PDRegionStats{} - pdHelper := helper.NewHelper(tikvStore) - err := pdHelper.GetPDRegionStats(tid, regionStats, true) + regionStats, err := helper.NewHelper(tikvStore).GetPDRegionStats(ctx, tid, true) failpoint.Inject("calcSampleRateByStorageCount", func() { // Force the TiDB thinking that there's PD and the count of region is small. err = nil @@ -112,7 +116,7 @@ func getApproximateTableCountFromStorage(sctx sessionctx.Context, tid int64, dbN if partitionName != "" { sqlescape.MustFormatSQL(sql, " partition(%n)", partitionName) } - ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) + ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnStats) rows, _, err := sctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(ctx, nil, sql.String()) if err != nil { return 0, false diff --git a/pkg/executor/internal/pdhelper/pd_test.go b/pkg/executor/internal/pdhelper/pd_test.go index 134cbdced5786..f35c9c9e5a93f 100644 --- a/pkg/executor/internal/pdhelper/pd_test.go +++ b/pkg/executor/internal/pdhelper/pd_test.go @@ -15,6 +15,7 @@ package pdhelper import ( + "context" "testing" "time" @@ -33,7 +34,7 @@ func (m *mockClient) getMissCnt() int { return m.missCnt } -func (m *mockClient) getFakeApproximateTableCountFromStorage(_ sessionctx.Context, _ int64, _, _, _ string) (float64, bool) { +func (m *mockClient) getFakeApproximateTableCountFromStorage(_ context.Context, _ sessionctx.Context, _ int64, _, _, _ string) (float64, bool) { m.missCnt++ return 1.0, true } @@ -47,21 +48,22 @@ func TestTTLCache(t *testing.T) { cacheForApproximateTableCountFromStorage: cache, getApproximateTableCountFromStorageFunc: globalMockClient.getFakeApproximateTableCountFromStorage, } - helper.GetApproximateTableCountFromStorage(nil, 1, "db", "table", "partition") // Miss + ctx := context.Background() + helper.GetApproximateTableCountFromStorage(ctx, nil, 1, "db", "table", "partition") // Miss require.Equal(t, 1, globalMockClient.getMissCnt()) - helper.GetApproximateTableCountFromStorage(nil, 1, "db", "table", "partition") // Hit + helper.GetApproximateTableCountFromStorage(ctx, nil, 1, "db", "table", "partition") // Hit require.Equal(t, 1, globalMockClient.getMissCnt()) - helper.GetApproximateTableCountFromStorage(nil, 2, "db1", "table1", "partition") // Miss + helper.GetApproximateTableCountFromStorage(ctx, nil, 2, "db1", "table1", "partition") // Miss require.Equal(t, 2, globalMockClient.getMissCnt()) - helper.GetApproximateTableCountFromStorage(nil, 3, "db2", "table2", "partition") // Miss - helper.GetApproximateTableCountFromStorage(nil, 1, "db", "table", "partition") // Miss + helper.GetApproximateTableCountFromStorage(ctx, nil, 3, "db2", "table2", "partition") // Miss + helper.GetApproximateTableCountFromStorage(ctx, nil, 1, "db", "table", "partition") // Miss require.Equal(t, 4, globalMockClient.getMissCnt()) - helper.GetApproximateTableCountFromStorage(nil, 3, "db2", "table2", "partition") // Hit + helper.GetApproximateTableCountFromStorage(ctx, nil, 3, "db2", "table2", "partition") // Hit require.Equal(t, 4, globalMockClient.getMissCnt()) time.Sleep(200 * time.Millisecond) // All is miss. - helper.GetApproximateTableCountFromStorage(nil, 1, "db", "table", "partition") - helper.GetApproximateTableCountFromStorage(nil, 2, "db1", "table1", "partition") - helper.GetApproximateTableCountFromStorage(nil, 3, "db2", "table2", "partition") + helper.GetApproximateTableCountFromStorage(ctx, nil, 1, "db", "table", "partition") + helper.GetApproximateTableCountFromStorage(ctx, nil, 2, "db1", "table1", "partition") + helper.GetApproximateTableCountFromStorage(ctx, nil, 3, "db2", "table2", "partition") require.Equal(t, 7, globalMockClient.getMissCnt()) } diff --git a/pkg/executor/memtable_reader.go b/pkg/executor/memtable_reader.go index 00f340b921318..2093a8c137be3 100644 --- a/pkg/executor/memtable_reader.go +++ b/pkg/executor/memtable_reader.go @@ -47,7 +47,6 @@ import ( "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/chunk" "github.com/pingcap/tidb/pkg/util/execdetails" - "github.com/pingcap/tidb/pkg/util/pdapi" "github.com/pingcap/tidb/pkg/util/set" pd "github.com/tikv/pd/client/http" "google.golang.org/grpc" @@ -199,7 +198,7 @@ func fetchClusterConfig(sctx sessionctx.Context, nodeTypes, nodeAddrs set.String var url string switch typ { case "pd": - url = fmt.Sprintf("%s://%s%s", util.InternalHTTPSchema(), statusAddr, pdapi.Config) + url = fmt.Sprintf("%s://%s%s", util.InternalHTTPSchema(), statusAddr, pd.Config) case "tikv", "tidb", "tiflash": url = fmt.Sprintf("%s://%s/config", util.InternalHTTPSchema(), statusAddr) case "tiproxy": @@ -715,7 +714,7 @@ func (e *hotRegionsHistoryRetriver) startRetrieving( go func(ch chan hotRegionsResult, address string, body *bytes.Buffer) { util.WithRecovery(func() { defer close(ch) - url := fmt.Sprintf("%s://%s%s", util.InternalHTTPSchema(), address, pdapi.HotHistory) + url := fmt.Sprintf("%s://%s%s", util.InternalHTTPSchema(), address, pd.HotHistory) req, err := http.NewRequest(http.MethodGet, url, body) if err != nil { ch <- hotRegionsResult{err: errors.Trace(err)} @@ -886,13 +885,17 @@ func (e *tikvRegionPeersRetriever) retrieve(ctx context.Context, sctx sessionctx Store: tikvStore, RegionCache: tikvStore.GetRegionCache(), } + pdCli, err := tikvHelper.TryGetPDHTTPClient() + if err != nil { + return nil, err + } var regionsInfo, regionsInfoByStoreID []pd.RegionInfo regionMap := make(map[int64]*pd.RegionInfo) storeMap := make(map[int64]struct{}) if len(e.extractor.StoreIDs) == 0 && len(e.extractor.RegionIDs) == 0 { - regionsInfo, err := tikvHelper.PDHTTPClient().GetRegions(ctx) + regionsInfo, err := pdCli.GetRegions(ctx) if err != nil { return nil, err } @@ -903,7 +906,7 @@ func (e *tikvRegionPeersRetriever) retrieve(ctx context.Context, sctx sessionctx // if a region_id located in 1, 4, 7 store we will get all of them when request any store_id, // storeMap is used to filter peers on unexpected stores. storeMap[int64(storeID)] = struct{}{} - storeRegionsInfo, err := tikvHelper.PDHTTPClient().GetRegionsByStoreID(ctx, storeID) + storeRegionsInfo, err := pdCli.GetRegionsByStoreID(ctx, storeID) if err != nil { return nil, err } @@ -926,7 +929,7 @@ func (e *tikvRegionPeersRetriever) retrieve(ctx context.Context, sctx sessionctx // if there is storeIDs, target region_id is fetched by storeIDs, // otherwise we need to fetch it from PD. if len(e.extractor.StoreIDs) == 0 { - regionInfo, err := tikvHelper.PDHTTPClient().GetRegionByID(ctx, regionID) + regionInfo, err := pdCli.GetRegionByID(ctx, regionID) if err != nil { return nil, err } diff --git a/pkg/executor/memtable_reader_test.go b/pkg/executor/memtable_reader_test.go index e0f3cae8dde1d..75451234920c1 100644 --- a/pkg/executor/memtable_reader_test.go +++ b/pkg/executor/memtable_reader_test.go @@ -31,9 +31,9 @@ import ( "github.com/pingcap/sysutil" "github.com/pingcap/tidb/pkg/executor" "github.com/pingcap/tidb/pkg/testkit" - "github.com/pingcap/tidb/pkg/util/pdapi" pmodel "github.com/prometheus/common/model" "github.com/stretchr/testify/require" + pd "github.com/tikv/pd/client/http" "google.golang.org/grpc" ) @@ -149,7 +149,7 @@ func TestTiDBClusterConfig(t *testing.T) { } // pd config - router.Handle(pdapi.Config, fn.Wrap(mockConfig)) + router.Handle(pd.Config, fn.Wrap(mockConfig)) // TiDB/TiKV config router.Handle("/config", fn.Wrap(mockConfig)) // Tiproxy config diff --git a/pkg/executor/set_config.go b/pkg/executor/set_config.go index ca9cebd4dd949..c9a47cd4e4ef3 100644 --- a/pkg/executor/set_config.go +++ b/pkg/executor/set_config.go @@ -33,9 +33,9 @@ import ( "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/chunk" - "github.com/pingcap/tidb/pkg/util/pdapi" "github.com/pingcap/tidb/pkg/util/set" "github.com/pingcap/tidb/pkg/util/stringutil" + pd "github.com/tikv/pd/client/http" ) // SetConfigExec executes 'SET CONFIG' statement. @@ -112,7 +112,7 @@ func (s *SetConfigExec) Next(_ context.Context, req *chunk.Chunk) error { var url string switch serverInfo.ServerType { case "pd": - url = fmt.Sprintf("%s://%s%s", util.InternalHTTPSchema(), serverInfo.StatusAddr, pdapi.Config) + url = fmt.Sprintf("%s://%s%s", util.InternalHTTPSchema(), serverInfo.StatusAddr, pd.Config) case "tikv": url = fmt.Sprintf("%s://%s/config", util.InternalHTTPSchema(), serverInfo.StatusAddr) case "tiflash": diff --git a/pkg/executor/split.go b/pkg/executor/split.go index d56981ef8a363..04c38b9588e15 100644 --- a/pkg/executor/split.go +++ b/pkg/executor/split.go @@ -823,8 +823,12 @@ func getRegionInfo(store helper.Storage, regions []regionMeta) ([]regionMeta, er Store: store, RegionCache: store.GetRegionCache(), } + pdCli, err := tikvHelper.TryGetPDHTTPClient() + if err != nil { + return regions, err + } for i := range regions { - regionInfo, err := tikvHelper.PDHTTPClient().GetRegionByID(context.TODO(), regions[i].region.Id) + regionInfo, err := pdCli.GetRegionByID(context.TODO(), regions[i].region.Id) if err != nil { return nil, err } diff --git a/pkg/executor/tikv_regions_peers_table_test.go b/pkg/executor/tikv_regions_peers_table_test.go index b716cee80fb69..dd8ffb5757cca 100644 --- a/pkg/executor/tikv_regions_peers_table_test.go +++ b/pkg/executor/tikv_regions_peers_table_test.go @@ -28,7 +28,6 @@ import ( "github.com/pingcap/tidb/pkg/store/helper" "github.com/pingcap/tidb/pkg/store/mockstore" "github.com/pingcap/tidb/pkg/testkit" - "github.com/pingcap/tidb/pkg/util/pdapi" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/tikv" pd "github.com/tikv/pd/client/http" @@ -95,7 +94,7 @@ func TestTikvRegionPeers(t *testing.T) { // mock store stats stat mockAddr := strings.TrimPrefix(server.URL, "http://") // mock PD API - router.Handle(pdapi.Status, fn.Wrap(func() (interface{}, error) { + router.Handle(pd.Status, fn.Wrap(func() (interface{}, error) { return struct { Version string `json:"version"` GitHash string `json:"git_hash"` @@ -107,9 +106,9 @@ func TestTikvRegionPeers(t *testing.T) { }, nil })) // mock get regionsInfo by store id - router.HandleFunc(pdapi.StoreRegions+"/"+"{id}", storesRegionsInfoHandler) + router.HandleFunc(pd.RegionsByStoreIDPrefix+"/"+"{id}", storesRegionsInfoHandler) // mock get regionInfo by region id - router.HandleFunc(pdapi.RegionByID+"/"+"{id}", regionsInfoHandler) + router.HandleFunc(pd.RegionByIDPrefix+"/"+"{id}", regionsInfoHandler) defer server.Close() store := testkit.CreateMockStore(t, diff --git a/pkg/infoschema/BUILD.bazel b/pkg/infoschema/BUILD.bazel index 0e62ee3953ca7..c83b8841a2451 100644 --- a/pkg/infoschema/BUILD.bazel +++ b/pkg/infoschema/BUILD.bazel @@ -43,7 +43,6 @@ go_library( "//pkg/util/execdetails", "//pkg/util/logutil", "//pkg/util/mock", - "//pkg/util/pdapi", "//pkg/util/sem", "//pkg/util/set", "//pkg/util/sqlexec", @@ -55,6 +54,7 @@ go_library( "@com_github_pingcap_kvproto//pkg/metapb", "@com_github_pingcap_log//:log", "@com_github_tikv_client_go_v2//tikv", + "@com_github_tikv_pd_client//http", "@org_golang_google_grpc//:grpc", "@org_golang_google_grpc//credentials", "@org_golang_google_grpc//credentials/insecure", diff --git a/pkg/infoschema/perfschema/BUILD.bazel b/pkg/infoschema/perfschema/BUILD.bazel index 8291fbcab1bc9..0e300561f33f5 100644 --- a/pkg/infoschema/perfschema/BUILD.bazel +++ b/pkg/infoschema/perfschema/BUILD.bazel @@ -25,10 +25,10 @@ go_library( "//pkg/table/tables", "//pkg/types", "//pkg/util", - "//pkg/util/pdapi", "//pkg/util/profile", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", + "@com_github_tikv_pd_client//http", ], ) @@ -50,9 +50,9 @@ go_test( "//pkg/store/mockstore", "//pkg/testkit", "//pkg/testkit/testsetup", - "//pkg/util/pdapi", "@com_github_pingcap_failpoint//:failpoint", "@com_github_stretchr_testify//require", + "@com_github_tikv_pd_client//http", "@io_opencensus_go//stats/view", "@org_uber_go_goleak//:goleak", ], diff --git a/pkg/infoschema/perfschema/tables.go b/pkg/infoschema/perfschema/tables.go index efc5aefe0f297..22ba0f1e07310 100644 --- a/pkg/infoschema/perfschema/tables.go +++ b/pkg/infoschema/perfschema/tables.go @@ -36,8 +36,8 @@ import ( "github.com/pingcap/tidb/pkg/table/tables" "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util" - "github.com/pingcap/tidb/pkg/util/pdapi" "github.com/pingcap/tidb/pkg/util/profile" + pd "github.com/tikv/pd/client/http" ) const ( @@ -242,17 +242,17 @@ func (vt *perfSchemaTable) getRows(ctx context.Context, sctx sessionctx.Context, interval := fmt.Sprintf("%d", profile.CPUProfileInterval/time.Second) fullRows, err = dataForRemoteProfile(sctx, "tikv", "/debug/pprof/profile?seconds="+interval, false) case tableNamePDProfileCPU: - fullRows, err = dataForRemoteProfile(sctx, "pd", pdapi.PProfProfileAPIWithInterval(profile.CPUProfileInterval), false) + fullRows, err = dataForRemoteProfile(sctx, "pd", pd.PProfProfileAPIWithInterval(profile.CPUProfileInterval), false) case tableNamePDProfileMemory: - fullRows, err = dataForRemoteProfile(sctx, "pd", pdapi.PProfHeap, false) + fullRows, err = dataForRemoteProfile(sctx, "pd", pd.PProfHeap, false) case tableNamePDProfileMutex: - fullRows, err = dataForRemoteProfile(sctx, "pd", pdapi.PProfMutex, false) + fullRows, err = dataForRemoteProfile(sctx, "pd", pd.PProfMutex, false) case tableNamePDProfileAllocs: - fullRows, err = dataForRemoteProfile(sctx, "pd", pdapi.PProfAllocs, false) + fullRows, err = dataForRemoteProfile(sctx, "pd", pd.PProfAllocs, false) case tableNamePDProfileBlock: - fullRows, err = dataForRemoteProfile(sctx, "pd", pdapi.PProfBlock, false) + fullRows, err = dataForRemoteProfile(sctx, "pd", pd.PProfBlock, false) case tableNamePDProfileGoroutines: - fullRows, err = dataForRemoteProfile(sctx, "pd", pdapi.PProfGoroutineWithDebugLevel(2), true) + fullRows, err = dataForRemoteProfile(sctx, "pd", pd.PProfGoroutineWithDebugLevel(2), true) case tableNameSessionVariables: fullRows, err = infoschema.GetDataFromSessionVariables(ctx, sctx) case tableNameSessionConnectAttrs: diff --git a/pkg/infoschema/perfschema/tables_test.go b/pkg/infoschema/perfschema/tables_test.go index b68fcd525aa18..cb78f06533738 100644 --- a/pkg/infoschema/perfschema/tables_test.go +++ b/pkg/infoschema/perfschema/tables_test.go @@ -31,8 +31,8 @@ import ( "github.com/pingcap/tidb/pkg/session" "github.com/pingcap/tidb/pkg/store/mockstore" "github.com/pingcap/tidb/pkg/testkit" - "github.com/pingcap/tidb/pkg/util/pdapi" "github.com/stretchr/testify/require" + pd "github.com/tikv/pd/client/http" "go.opencensus.io/stats/view" ) @@ -152,12 +152,12 @@ func TestTiKVProfileCPU(t *testing.T) { } // mock PD profile - router.HandleFunc(pdapi.PProfProfile, copyHandler("testdata/test.pprof")) - router.HandleFunc(pdapi.PProfHeap, handlerFactory("heap")) - router.HandleFunc(pdapi.PProfMutex, handlerFactory("mutex")) - router.HandleFunc(pdapi.PProfAllocs, handlerFactory("allocs")) - router.HandleFunc(pdapi.PProfBlock, handlerFactory("block")) - router.HandleFunc(pdapi.PProfGoroutine, handlerFactory("goroutine", 2)) + router.HandleFunc(pd.PProfProfile, copyHandler("testdata/test.pprof")) + router.HandleFunc(pd.PProfHeap, handlerFactory("heap")) + router.HandleFunc(pd.PProfMutex, handlerFactory("mutex")) + router.HandleFunc(pd.PProfAllocs, handlerFactory("allocs")) + router.HandleFunc(pd.PProfBlock, handlerFactory("block")) + router.HandleFunc(pd.PProfGoroutine, handlerFactory("goroutine", 2)) tk.MustQuery("select * from pd_profile_cpu where depth < 3") warnings = tk.Session().GetSessionVars().StmtCtx.GetWarnings() diff --git a/pkg/infoschema/tables.go b/pkg/infoschema/tables.go index 44acfd9bafb54..d2992118bd2e0 100644 --- a/pkg/infoschema/tables.go +++ b/pkg/infoschema/tables.go @@ -54,11 +54,11 @@ import ( "github.com/pingcap/tidb/pkg/util/deadlockhistory" "github.com/pingcap/tidb/pkg/util/execdetails" "github.com/pingcap/tidb/pkg/util/logutil" - "github.com/pingcap/tidb/pkg/util/pdapi" "github.com/pingcap/tidb/pkg/util/sem" "github.com/pingcap/tidb/pkg/util/set" "github.com/pingcap/tidb/pkg/util/stmtsummary" "github.com/tikv/client-go/v2/tikv" + pd "github.com/tikv/pd/client/http" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -1853,7 +1853,7 @@ func GetPDServerInfo(ctx sessionctx.Context) ([]ServerInfo, error) { // Try on each member until one succeeds or all fail. for _, addr := range members { // Get PD version, git_hash - url := fmt.Sprintf("%s://%s%s", util.InternalHTTPSchema(), addr, pdapi.Status) + url := fmt.Sprintf("%s://%s%s", util.InternalHTTPSchema(), addr, pd.Status) req, err := http.NewRequest(http.MethodGet, url, nil) if err != nil { ctx.GetSessionVars().StmtCtx.AppendWarning(err) diff --git a/pkg/infoschema/test/clustertablestest/BUILD.bazel b/pkg/infoschema/test/clustertablestest/BUILD.bazel index 61977e7ff7ce3..e0e053c35698a 100644 --- a/pkg/infoschema/test/clustertablestest/BUILD.bazel +++ b/pkg/infoschema/test/clustertablestest/BUILD.bazel @@ -40,7 +40,6 @@ go_test( "//pkg/util/dbterror/exeerrors", "//pkg/util/gctuner", "//pkg/util/memory", - "//pkg/util/pdapi", "//pkg/util/resourcegrouptag", "//pkg/util/set", "//pkg/util/stmtsummary", diff --git a/pkg/infoschema/test/clustertablestest/cluster_tables_test.go b/pkg/infoschema/test/clustertablestest/cluster_tables_test.go index 301c677e793e8..ffd50fb8e798d 100644 --- a/pkg/infoschema/test/clustertablestest/cluster_tables_test.go +++ b/pkg/infoschema/test/clustertablestest/cluster_tables_test.go @@ -49,7 +49,6 @@ import ( "github.com/pingcap/tidb/pkg/testkit/external" "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/dbterror/exeerrors" - "github.com/pingcap/tidb/pkg/util/pdapi" "github.com/pingcap/tidb/pkg/util/resourcegrouptag" "github.com/pingcap/tidb/pkg/util/set" "github.com/pingcap/tidb/pkg/util/stmtsummary" @@ -768,7 +767,7 @@ func (s *clusterTablesSuite) setUpMockPDHTTPServer() (*httptest.Server, string) srv := httptest.NewServer(router) // mock store stats stat mockAddr := strings.TrimPrefix(srv.URL, "http://") - router.Handle(pdapi.Stores, fn.Wrap(func() (*pd.StoresInfo, error) { + router.Handle(pd.Stores, fn.Wrap(func() (*pd.StoresInfo, error) { return &pd.StoresInfo{ Count: 1, Stores: []pd.StoreInfo{ @@ -788,7 +787,7 @@ func (s *clusterTablesSuite) setUpMockPDHTTPServer() (*httptest.Server, string) }, nil })) // mock PD API - router.Handle(pdapi.Status, fn.Wrap(func() (interface{}, error) { + router.Handle(pd.Status, fn.Wrap(func() (interface{}, error) { return struct { Version string `json:"version"` GitHash string `json:"git_hash"` @@ -818,7 +817,7 @@ func (s *clusterTablesSuite) setUpMockPDHTTPServer() (*httptest.Server, string) return configuration, nil } // pd config - router.Handle(pdapi.Config, fn.Wrap(mockConfig)) + router.Handle(pd.Config, fn.Wrap(mockConfig)) // TiDB/TiKV config router.Handle("/config", fn.Wrap(mockConfig)) return srv, mockAddr diff --git a/pkg/server/handler/tikvhandler/BUILD.bazel b/pkg/server/handler/tikvhandler/BUILD.bazel index b5dffea18f4e5..e4ba4a2ae109a 100644 --- a/pkg/server/handler/tikvhandler/BUILD.bazel +++ b/pkg/server/handler/tikvhandler/BUILD.bazel @@ -33,12 +33,12 @@ go_library( "//pkg/util/gcutil", "//pkg/util/hack", "//pkg/util/logutil", - "//pkg/util/pdapi", "//pkg/util/sqlexec", "@com_github_gorilla_mux//:mux", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_log//:log", "@com_github_tikv_client_go_v2//tikv", + "@com_github_tikv_pd_client//http", "@org_uber_go_zap//:zap", ], ) diff --git a/pkg/server/handler/tikvhandler/tikv_handler.go b/pkg/server/handler/tikvhandler/tikv_handler.go index 5b1ef2a3fee16..4f5863a62d817 100644 --- a/pkg/server/handler/tikvhandler/tikv_handler.go +++ b/pkg/server/handler/tikvhandler/tikv_handler.go @@ -60,9 +60,9 @@ import ( "github.com/pingcap/tidb/pkg/util/gcutil" "github.com/pingcap/tidb/pkg/util/hack" "github.com/pingcap/tidb/pkg/util/logutil" - "github.com/pingcap/tidb/pkg/util/pdapi" "github.com/pingcap/tidb/pkg/util/sqlexec" "github.com/tikv/client-go/v2/tikv" + pd "github.com/tikv/pd/client/http" "go.uber.org/zap" ) @@ -1201,7 +1201,7 @@ func (h *TableHandler) addScatterSchedule(startKey, endKey []byte, name string) if err != nil { return err } - scheduleURL := fmt.Sprintf("%s://%s%s", util.InternalHTTPSchema(), pdAddrs[0], pdapi.Schedulers) + scheduleURL := fmt.Sprintf("%s://%s%s", util.InternalHTTPSchema(), pdAddrs[0], pd.Schedulers) resp, err := util.InternalHTTPClient().Post(scheduleURL, "application/json", bytes.NewBuffer(v)) if err != nil { return err @@ -1217,7 +1217,7 @@ func (h *TableHandler) deleteScatterSchedule(name string) error { if err != nil { return err } - scheduleURL := fmt.Sprintf("%s://%s%s", util.InternalHTTPSchema(), pdAddrs[0], pdapi.ScatterRangeSchedulerWithName(name)) + scheduleURL := fmt.Sprintf("%s://%s%s", util.InternalHTTPSchema(), pdAddrs[0], pd.ScatterRangeSchedulerWithName(name)) req, err := http.NewRequest(http.MethodDelete, scheduleURL, nil) if err != nil { return err @@ -1403,9 +1403,7 @@ func (h *TableHandler) getRegionsByID(tbl table.Table, id int64, name string) (* } func (h *TableHandler) handleDiskUsageRequest(tbl table.Table, w http.ResponseWriter) { - tableID := tbl.Meta().ID - var stats helper.PDRegionStats - err := h.GetPDRegionStats(tableID, &stats, false) + stats, err := h.GetPDRegionStats(context.Background(), tbl.Meta().ID, false) if err != nil { writeError(w, err) return diff --git a/pkg/store/helper/BUILD.bazel b/pkg/store/helper/BUILD.bazel index 4e46ec6b32ff0..30acad79fb205 100644 --- a/pkg/store/helper/BUILD.bazel +++ b/pkg/store/helper/BUILD.bazel @@ -6,7 +6,6 @@ go_library( importpath = "github.com/pingcap/tidb/pkg/store/helper", visibility = ["//visibility:public"], deps = [ - "//pkg/ddl/placement", "//pkg/kv", "//pkg/parser/model", "//pkg/store/driver/error", @@ -14,7 +13,6 @@ go_library( "//pkg/util", "//pkg/util/codec", "//pkg/util/logutil", - "//pkg/util/pdapi", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_kvproto//pkg/deadlock", "@com_github_pingcap_kvproto//pkg/kvrpcpb", @@ -42,7 +40,6 @@ go_test( "//pkg/store/mockstore", "//pkg/tablecodec", "//pkg/testkit/testsetup", - "//pkg/util/pdapi", "@com_github_gorilla_mux//:mux", "@com_github_pingcap_log//:log", "@com_github_stretchr_testify//require", diff --git a/pkg/store/helper/helper.go b/pkg/store/helper/helper.go index 78d0004acd6f0..acc65afd5b03e 100644 --- a/pkg/store/helper/helper.go +++ b/pkg/store/helper/helper.go @@ -20,12 +20,8 @@ import ( "cmp" "context" "encoding/hex" - "encoding/json" "fmt" - "io" "math" - "net/http" - "net/url" "slices" "strconv" "strings" @@ -34,7 +30,6 @@ import ( "github.com/pingcap/errors" deadlockpb "github.com/pingcap/kvproto/pkg/deadlock" "github.com/pingcap/kvproto/pkg/kvrpcpb" - "github.com/pingcap/tidb/pkg/ddl/placement" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/parser/model" derr "github.com/pingcap/tidb/pkg/store/driver/error" @@ -42,7 +37,6 @@ import ( "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/codec" "github.com/pingcap/tidb/pkg/util/logutil" - "github.com/pingcap/tidb/pkg/util/pdapi" "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/tikvrpc" @@ -97,9 +91,13 @@ func NewHelper(store Storage) *Helper { } } -// PDHTTPClient returns the PD HTTP client. -func (h *Helper) PDHTTPClient() pd.Client { - return h.Store.GetPDHTTPClient() +// TryGetPDHTTPClient tries to get a PD HTTP client if it's available. +func (h *Helper) TryGetPDHTTPClient() (pd.Client, error) { + cli := h.Store.GetPDHTTPClient() + if cli == nil { + return nil, errors.New("pd http client unavailable") + } + return cli, nil } // MaxBackoffTimeoutForMvccGet is a derived value from previous implementation possible experiencing value 5000ms. @@ -306,15 +304,16 @@ func (h *Helper) ScrapeHotInfo(ctx context.Context, rw string, allSchemas []*mod // FetchHotRegion fetches the hot region information from PD's http api. func (h *Helper) FetchHotRegion(ctx context.Context, rw string) (map[uint64]RegionMetric, error) { - var ( - regionResp *pd.StoreHotPeersInfos - err error - ) + pdCli, err := h.TryGetPDHTTPClient() + if err != nil { + return nil, err + } + var regionResp *pd.StoreHotPeersInfos switch rw { case HotRead: - regionResp, err = h.PDHTTPClient().GetHotReadRegions(ctx) + regionResp, err = pdCli.GetHotReadRegions(ctx) case HotWrite: - regionResp, err = h.PDHTTPClient().GetHotWriteRegions(ctx) + regionResp, err = pdCli.GetHotWriteRegions(ctx) } if err != nil { return nil, err @@ -789,21 +788,11 @@ func (h *Helper) GetPDAddr() ([]string, error) { return pdAddrs, nil } -// PDRegionStats is the json response from PD. -type PDRegionStats struct { - Count int `json:"count"` - EmptyCount int `json:"empty_count"` - StorageSize int64 `json:"storage_size"` - StorageKeys int64 `json:"storage_keys"` - StoreLeaderCount map[uint64]int `json:"store_leader_count"` - StorePeerCount map[uint64]int `json:"store_peer_count"` -} - -// GetPDRegionStats get the RegionStats by tableID. -func (h *Helper) GetPDRegionStats(tableID int64, stats *PDRegionStats, noIndexStats bool) error { - pdAddrs, err := h.GetPDAddr() +// GetPDRegionStats get the RegionStats by tableID from PD by HTTP API. +func (h *Helper) GetPDRegionStats(ctx context.Context, tableID int64, noIndexStats bool) (*pd.RegionStats, error) { + pdCli, err := h.TryGetPDHTTPClient() if err != nil { - return errors.Trace(err) + return nil, err } var startKey, endKey []byte @@ -817,176 +806,7 @@ func (h *Helper) GetPDRegionStats(tableID int64, stats *PDRegionStats, noIndexSt startKey = codec.EncodeBytes([]byte{}, startKey) endKey = codec.EncodeBytes([]byte{}, endKey) - statURL := fmt.Sprintf("%s://%s%s", - util.InternalHTTPSchema(), - pdAddrs[0], - pdapi.RegionStatsByStartEndKey( - url.QueryEscape(string(startKey)), - url.QueryEscape(string(endKey)), - )) - - resp, err := util.InternalHTTPClient().Get(statURL) - if err != nil { - return errors.Trace(err) - } - defer func() { - if err = resp.Body.Close(); err != nil { - logutil.BgLogger().Error("err", zap.Error(err)) - } - }() - if resp.StatusCode != http.StatusOK { - body, err := io.ReadAll(resp.Body) - if err != nil { - return errors.Errorf("GetPDRegionStats %d: %s", resp.StatusCode, err) - } - return errors.Errorf("GetPDRegionStats %d: %s", resp.StatusCode, string(body)) - } - dec := json.NewDecoder(resp.Body) - - return dec.Decode(stats) -} - -// DeletePlacementRule is to delete placement rule for certain group. -func (h *Helper) DeletePlacementRule(group string, ruleID string) error { - pdAddrs, err := h.GetPDAddr() - if err != nil { - return errors.Trace(err) - } - - deleteURL := fmt.Sprintf("%s://%s%s/%v/%v", - util.InternalHTTPSchema(), - pdAddrs[0], - pdapi.PlacementRule, - group, - ruleID, - ) - - req, err := http.NewRequest("DELETE", deleteURL, nil) - if err != nil { - return errors.Trace(err) - } - - resp, err := util.InternalHTTPClient().Do(req) - if err != nil { - return errors.Trace(err) - } - defer func() { - if err = resp.Body.Close(); err != nil { - logutil.BgLogger().Error("err", zap.Error(err)) - } - }() - if resp.StatusCode != http.StatusOK { - return errors.New("DeletePlacementRule returns error") - } - return nil -} - -// SetPlacementRule is a helper function to set placement rule. -func (h *Helper) SetPlacementRule(rule placement.Rule) error { - pdAddrs, err := h.GetPDAddr() - if err != nil { - return errors.Trace(err) - } - m, _ := json.Marshal(rule) - - postURL := fmt.Sprintf("%s://%s%s", - util.InternalHTTPSchema(), - pdAddrs[0], - pdapi.PlacementRule, - ) - buf := bytes.NewBuffer(m) - resp, err := util.InternalHTTPClient().Post(postURL, "application/json", buf) - if err != nil { - return errors.Trace(err) - } - defer func() { - if err = resp.Body.Close(); err != nil { - logutil.BgLogger().Error("err", zap.Error(err)) - } - }() - if resp.StatusCode != http.StatusOK { - return errors.New("SetPlacementRule returns error") - } - return nil -} - -// GetGroupRules to get all placement rule in a certain group. -func (h *Helper) GetGroupRules(group string) ([]placement.Rule, error) { - pdAddrs, err := h.GetPDAddr() - if err != nil { - return nil, errors.Trace(err) - } - - getURL := fmt.Sprintf("%s://%s%s/%s", - util.InternalHTTPSchema(), - pdAddrs[0], - pdapi.PlacementRulesGroup, - group, - ) - - resp, err := util.InternalHTTPClient().Get(getURL) - if err != nil { - return nil, errors.Trace(err) - } - defer func() { - if err = resp.Body.Close(); err != nil { - logutil.BgLogger().Error("err", zap.Error(err)) - } - }() - - if resp.StatusCode != http.StatusOK { - return nil, errors.New("GetGroupRules returns error") - } - - buf := new(bytes.Buffer) - _, err = buf.ReadFrom(resp.Body) - if err != nil { - return nil, errors.Trace(err) - } - - var rules []placement.Rule - err = json.Unmarshal(buf.Bytes(), &rules) - if err != nil { - return nil, errors.Trace(err) - } - - return rules, nil -} - -// PostAccelerateSchedule sends `regions/accelerate-schedule` request. -func (h *Helper) PostAccelerateSchedule(tableID int64) error { - pdAddrs, err := h.GetPDAddr() - if err != nil { - return errors.Trace(err) - } - startKey := tablecodec.GenTableRecordPrefix(tableID) - endKey := tablecodec.EncodeTablePrefix(tableID + 1) - startKey = codec.EncodeBytes([]byte{}, startKey) - endKey = codec.EncodeBytes([]byte{}, endKey) - - postURL := fmt.Sprintf("%s://%s%s", - util.InternalHTTPSchema(), - pdAddrs[0], - pdapi.AccelerateSchedule) - - input := map[string]string{ - "start_key": url.QueryEscape(string(startKey)), - "end_key": url.QueryEscape(string(endKey)), - } - v, err := json.Marshal(input) - if err != nil { - return errors.Trace(err) - } - resp, err := util.InternalHTTPClient().Post(postURL, "application/json", bytes.NewBuffer(v)) - if err != nil { - return errors.Trace(err) - } - defer func() { - if err = resp.Body.Close(); err != nil { - logutil.BgLogger().Error("err", zap.Error(err)) - } - }() - return nil + return pdCli.GetRegionStatusByKeyRange(ctx, startKey, endKey) } // GetTiFlashTableIDFromEndKey computes tableID from pd rule's endKey. diff --git a/pkg/store/helper/helper_test.go b/pkg/store/helper/helper_test.go index d4ee767f6ecd7..21774cfc13ebe 100644 --- a/pkg/store/helper/helper_test.go +++ b/pkg/store/helper/helper_test.go @@ -32,7 +32,6 @@ import ( "github.com/pingcap/tidb/pkg/store/helper" "github.com/pingcap/tidb/pkg/store/mockstore" "github.com/pingcap/tidb/pkg/tablecodec" - "github.com/pingcap/tidb/pkg/util/pdapi" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/testutils" "github.com/tikv/client-go/v2/tikv" @@ -92,7 +91,9 @@ func TestTiKVRegionsInfo(t *testing.T) { Store: store, RegionCache: store.GetRegionCache(), } - regionsInfo, err := h.PDHTTPClient().GetRegions(context.Background()) + pdCli, err := h.TryGetPDHTTPClient() + require.NoError(t, err) + regionsInfo, err := pdCli.GetRegions(context.Background()) require.NoError(t, err) require.Equal(t, getMockTiKVRegionsInfo(), regionsInfo) } @@ -105,7 +106,10 @@ func TestTiKVStoresStat(t *testing.T) { RegionCache: store.GetRegionCache(), } - stat, err := h.PDHTTPClient().GetStores(context.Background()) + pdCli, err := h.TryGetPDHTTPClient() + require.NoError(t, err) + + stat, err := pdCli.GetStores(context.Background()) require.NoError(t, err) data, err := json.Marshal(stat) @@ -168,9 +172,9 @@ func createMockStore(t *testing.T) (store helper.Storage) { func mockPDHTTPServer() *httptest.Server { router := mux.NewRouter() - router.HandleFunc(pdapi.HotRead, mockHotRegionResponse) - router.HandleFunc(pdapi.Regions, mockTiKVRegionsInfoResponse) - router.HandleFunc(pdapi.Stores, mockStoreStatResponse) + router.HandleFunc(pd.HotRead, mockHotRegionResponse) + router.HandleFunc(pd.Regions, mockTiKVRegionsInfoResponse) + router.HandleFunc(pd.Stores, mockStoreStatResponse) serverMux := http.NewServeMux() serverMux.Handle("/", router) return httptest.NewServer(serverMux) diff --git a/pkg/util/pdapi/BUILD.bazel b/pkg/util/pdapi/BUILD.bazel deleted file mode 100644 index 64859603979a5..0000000000000 --- a/pkg/util/pdapi/BUILD.bazel +++ /dev/null @@ -1,8 +0,0 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library") - -go_library( - name = "pdapi", - srcs = ["const.go"], - importpath = "github.com/pingcap/tidb/pkg/util/pdapi", - visibility = ["//visibility:public"], -) diff --git a/pkg/util/pdapi/const.go b/pkg/util/pdapi/const.go deleted file mode 100644 index be4d201caada3..0000000000000 --- a/pkg/util/pdapi/const.go +++ /dev/null @@ -1,96 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package pdapi - -import ( - "fmt" - "time" -) - -// The following constants are the APIs of PD server. -const ( - HotRead = "/pd/api/v1/hotspot/regions/read" - HotHistory = "/pd/api/v1/hotspot/regions/history" - Regions = "/pd/api/v1/regions" - StoreRegions = "/pd/api/v1/regions/store" - EmptyRegions = "/pd/api/v1/regions/check/empty-region" - AccelerateSchedule = "/pd/api/v1/regions/accelerate-schedule" - RegionByID = "/pd/api/v1/region/id" - store = "/pd/api/v1/store" - Stores = "/pd/api/v1/stores" - Status = "/pd/api/v1/status" - RegionStats = "/pd/api/v1/stats/region" - Version = "/pd/api/v1/version" - Config = "/pd/api/v1/config" - ClusterVersion = "/pd/api/v1/config/cluster-version" - ScheduleConfig = "/pd/api/v1/config/schedule" - ReplicateConfig = "/pd/api/v1/config/replicate" - PlacementRule = "/pd/api/v1/config/rule" - PlacementRules = "/pd/api/v1/config/rules" - PlacementRulesGroup = "/pd/api/v1/config/rules/group" - RegionLabelRule = "/pd/api/v1/config/region-label/rule" - Schedulers = "/pd/api/v1/schedulers" - scatterRangeScheduler = "/pd/api/v1/schedulers/scatter-range-" - ResetTS = "/pd/api/v1/admin/reset-ts" - BaseAllocID = "/pd/api/v1/admin/base-alloc-id" - SnapshotRecoveringMark = "/pd/api/v1/admin/cluster/markers/snapshot-recovering" - MinResolvedTS = "/pd/api/v1/min-resolved-ts" - PProfProfile = "/pd/api/v1/debug/pprof/profile" - PProfHeap = "/pd/api/v1/debug/pprof/heap" - PProfMutex = "/pd/api/v1/debug/pprof/mutex" - PProfAllocs = "/pd/api/v1/debug/pprof/allocs" - PProfBlock = "/pd/api/v1/debug/pprof/block" - PProfGoroutine = "/pd/api/v1/debug/pprof/goroutine" -) - -// ConfigWithTTLSeconds returns the config API with the TTL seconds parameter. -func ConfigWithTTLSeconds(ttlSeconds float64) string { - return fmt.Sprintf("%s?ttlSecond=%.0f", Config, ttlSeconds) -} - -// StoreByID returns the store API with store ID parameter. -func StoreByID(id uint64) string { - return fmt.Sprintf("%s/%d", store, id) -} - -// StoreLabelByID returns the store label API with store ID parameter. -func StoreLabelByID(id uint64) string { - return fmt.Sprintf("%s/%d/label", store, id) -} - -// RegionStatsByStartEndKey returns the region stats API with start key and end key parameters. -func RegionStatsByStartEndKey(startKey, endKey string) string { - return fmt.Sprintf("%s?start_key=%s&end_key=%s", RegionStats, startKey, endKey) -} - -// SchedulerByName returns the scheduler API with the given scheduler name. -func SchedulerByName(name string) string { - return fmt.Sprintf("%s/%s", Schedulers, name) -} - -// ScatterRangeSchedulerWithName returns the scatter range scheduler API with name parameter. -func ScatterRangeSchedulerWithName(name string) string { - return fmt.Sprintf("%s%s", scatterRangeScheduler, name) -} - -// PProfProfileAPIWithInterval returns the pprof profile API with interval parameter. -func PProfProfileAPIWithInterval(interval time.Duration) string { - return fmt.Sprintf("%s?seconds=%d", PProfProfile, interval/time.Second) -} - -// PProfGoroutineWithDebugLevel returns the pprof goroutine API with debug level parameter. -func PProfGoroutineWithDebugLevel(level int) string { - return fmt.Sprintf("%s?debug=%d", PProfGoroutine, level) -}