Skip to content

Commit

Permalink
*: integrate PD HTTP client to the store helper (#48276)
Browse files Browse the repository at this point in the history
ref #35319
  • Loading branch information
JmPotato committed Nov 15, 2023
1 parent c3cda23 commit 0f35b6f
Show file tree
Hide file tree
Showing 35 changed files with 295 additions and 497 deletions.
24 changes: 12 additions & 12 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -7119,26 +7119,26 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sha256 = "b689432454a504f8ba1ad166ebf901584155edc64eed4119a30c07ab52e3af8f",
strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20231030120815-1362f1e87566",
sha256 = "285edca3320cc8847aceffb5d5471fe7483c49f66795622f71ed819c72635d00",
strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20231114060955-8fc8a528217e",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231030120815-1362f1e87566.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231030120815-1362f1e87566.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231030120815-1362f1e87566.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231030120815-1362f1e87566.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231114060955-8fc8a528217e.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231114060955-8fc8a528217e.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231114060955-8fc8a528217e.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231114060955-8fc8a528217e.zip",
],
)
go_repository(
name = "com_github_tikv_pd_client",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/pd/client",
sha256 = "cadd6f9cf411690e66fd2e5bad6ef837dd8522d98ddaf79a6a2f9361cbc558c1",
strip_prefix = "github.com/tikv/pd/client@v0.0.0-20231113092444-be31c08186fa",
sha256 = "cb510944ce56555f005fff2d891af3fefa667f37955779b89c35fd40f51deace",
strip_prefix = "github.com/tikv/pd/client@v0.0.0-20231114041114-86831ce71865",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231113092444-be31c08186fa.zip",
"http://ats.apps.svc/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231113092444-be31c08186fa.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231113092444-be31c08186fa.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231113092444-be31c08186fa.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231114041114-86831ce71865.zip",
"http://ats.apps.svc/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231114041114-86831ce71865.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231114041114-86831ce71865.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231114041114-86831ce71865.zip",
],
)
go_repository(
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/restore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ go_library(
"//pkg/parser/mysql",
"//pkg/sessionctx/variable",
"//pkg/statistics/handle",
"//pkg/store/helper",
"//pkg/store/pdtypes",
"//pkg/tablecodec",
"//pkg/util",
Expand Down Expand Up @@ -92,6 +91,7 @@ go_library(
"@com_github_tikv_client_go_v2//txnkv/rangetask",
"@com_github_tikv_client_go_v2//util",
"@com_github_tikv_pd_client//:client",
"@com_github_tikv_pd_client//http",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//backoff",
"@org_golang_google_grpc//codes",
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ import (
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/statistics/handle"
"github.com/pingcap/tidb/pkg/store/helper"
"github.com/pingcap/tidb/pkg/store/pdtypes"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/util/codec"
Expand All @@ -64,6 +63,7 @@ import (
"github.com/tikv/client-go/v2/oracle"
kvutil "github.com/tikv/client-go/v2/util"
pd "github.com/tikv/pd/client"
pdhttp "github.com/tikv/pd/client/http"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -1811,7 +1811,7 @@ func (rc *Client) GoWaitTiFlashReady(ctx context.Context, inCh <-chan *CreatedTa
if err != nil {
errCh <- err
}
tiFlashStores := make(map[int64]helper.StoreStat)
tiFlashStores := make(map[int64]pdhttp.StoreInfo)
for _, store := range tikvStats.Stores {
for _, l := range store.Store.Labels {
if l.Key == "engine" && l.Value == "tiflash" {
Expand Down
1 change: 1 addition & 0 deletions dumpling/export/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ go_library(
"@com_github_soheilhy_cmux//:cmux",
"@com_github_spf13_pflag//:pflag",
"@com_github_tikv_pd_client//:client",
"@com_github_tikv_pd_client//http",
"@io_etcd_go_etcd_client_v3//:client",
"@org_golang_x_sync//errgroup",
"@org_uber_go_atomic//:atomic",
Expand Down
8 changes: 4 additions & 4 deletions dumpling/export/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
dbconfig "github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/errno"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/store/helper"
pd "github.com/tikv/pd/client/http"
"go.uber.org/multierr"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -1465,19 +1465,19 @@ func GetDBInfo(db *sql.Conn, tables map[string]map[string]struct{}) ([]*model.DB

// GetRegionInfos get region info including regionID, start key, end key from database sql interface.
// start key, end key includes information to help split table
func GetRegionInfos(db *sql.Conn) (*helper.RegionsInfo, error) {
func GetRegionInfos(db *sql.Conn) (*pd.RegionsInfo, error) {
const tableRegionSQL = "SELECT REGION_ID,START_KEY,END_KEY FROM INFORMATION_SCHEMA.TIKV_REGION_STATUS ORDER BY START_KEY;"
var (
regionID int64
startKey, endKey string
)
regionsInfo := &helper.RegionsInfo{Regions: make([]helper.RegionInfo, 0)}
regionsInfo := &pd.RegionsInfo{Regions: make([]pd.RegionInfo, 0)}
err := simpleQuery(db, tableRegionSQL, func(rows *sql.Rows) error {
err := rows.Scan(&regionID, &startKey, &endKey)
if err != nil {
return errors.Trace(err)
}
regionsInfo.Regions = append(regionsInfo.Regions, helper.RegionInfo{
regionsInfo.Regions = append(regionsInfo.Regions, pd.RegionInfo{
ID: regionID,
StartKey: startKey,
EndKey: endKey,
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ require (
github.com/stretchr/testify v1.8.4
github.com/tdakkota/asciicheck v0.2.0
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.8-0.20231030120815-1362f1e87566
github.com/tikv/pd/client v0.0.0-20231113092444-be31c08186fa
github.com/tikv/client-go/v2 v2.0.8-0.20231114060955-8fc8a528217e
github.com/tikv/pd/client v0.0.0-20231114041114-86831ce71865
github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966
github.com/twmb/murmur3 v1.1.6
github.com/uber/jaeger-client-go v2.22.1+incompatible
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -991,10 +991,10 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJf
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tikv/client-go/v2 v2.0.8-0.20231030120815-1362f1e87566 h1:ULv8/h2S2daBtNDoovptSBC5fJEBKrx0K7E1K8iVOSw=
github.com/tikv/client-go/v2 v2.0.8-0.20231030120815-1362f1e87566/go.mod h1:XiEHwWZfJqgafxW/VEgi1ltGWB9yjwCJBs2kW1xHMY4=
github.com/tikv/pd/client v0.0.0-20231113092444-be31c08186fa h1:qgbTvsjSzU2A9ItK+NUSUHgtvDTeaWk1mGH2Kjbaf7s=
github.com/tikv/pd/client v0.0.0-20231113092444-be31c08186fa/go.mod h1:cd6zBqRM9aogxf26K8NnFRPVtq9BnRE59tKEpX8IaWQ=
github.com/tikv/client-go/v2 v2.0.8-0.20231114060955-8fc8a528217e h1:kl8+gDOfPfRqkc1VDhhjhezMvsbfRENYsm/FqSIDnwg=
github.com/tikv/client-go/v2 v2.0.8-0.20231114060955-8fc8a528217e/go.mod h1:fEAE7GS/lta+OasPOacdgy6RlJIRaq9/Cyr2WbSYcBE=
github.com/tikv/pd/client v0.0.0-20231114041114-86831ce71865 h1:Gkvo77EevOpBGIdV1c8gwRqPhVbgLPRy82tXNEFpGTc=
github.com/tikv/pd/client v0.0.0-20231114041114-86831ce71865/go.mod h1:cd6zBqRM9aogxf26K8NnFRPVtq9BnRE59tKEpX8IaWQ=
github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966 h1:quvGphlmUVU+nhpFa4gg4yJyTRJ13reZMDHrKwYw53M=
github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966/go.mod h1:27bSVNWSBOHm+qRp1T9qzaIpsWEP6TbUnei/43HK+PQ=
github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs=
Expand Down
1 change: 1 addition & 0 deletions pkg/ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ go_library(
"@com_github_tikv_client_go_v2//tikvrpc",
"@com_github_tikv_client_go_v2//txnkv/rangetask",
"@com_github_tikv_client_go_v2//util",
"@com_github_tikv_pd_client//http",
"@io_etcd_go_etcd_client_v3//:client",
"@org_golang_x_sync//errgroup",
"@org_uber_go_atomic//:atomic",
Expand Down
10 changes: 5 additions & 5 deletions pkg/ddl/ddl_tiflash_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ import (
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/store/helper"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
pd "github.com/tikv/pd/client/http"
atomicutil "go.uber.org/atomic"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -111,7 +111,7 @@ func NewPollTiFlashBackoffContext(minThreshold, maxThreshold TiFlashTick, capaci

// TiFlashManagementContext is the context for TiFlash Replica Management
type TiFlashManagementContext struct {
TiFlashStores map[int64]helper.StoreStat
TiFlashStores map[int64]pd.StoreInfo
PollCounter uint64
Backoff *PollTiFlashBackoffContext
// tables waiting for updating progress after become available.
Expand Down Expand Up @@ -206,7 +206,7 @@ func NewTiFlashManagementContext() (*TiFlashManagementContext, error) {
}
return &TiFlashManagementContext{
PollCounter: 0,
TiFlashStores: make(map[int64]helper.StoreStat),
TiFlashStores: make(map[int64]pd.StoreInfo),
Backoff: c,
UpdatingProgressTables: list.New(),
}, nil
Expand Down Expand Up @@ -293,7 +293,7 @@ func LoadTiFlashReplicaInfo(tblInfo *model.TableInfo, tableList *[]TiFlashReplic
}

// UpdateTiFlashHTTPAddress report TiFlash's StatusAddress's port to Pd's etcd.
func (d *ddl) UpdateTiFlashHTTPAddress(store *helper.StoreStat) error {
func (d *ddl) UpdateTiFlashHTTPAddress(store *pd.StoreInfo) error {
host, _, err := net.SplitHostPort(store.Store.StatusAddress)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -338,7 +338,7 @@ func updateTiFlashStores(pollTiFlashContext *TiFlashManagementContext) error {
if err != nil {
return err
}
pollTiFlashContext.TiFlashStores = make(map[int64]helper.StoreStat)
pollTiFlashContext.TiFlashStores = make(map[int64]pd.StoreInfo)
for _, store := range tikvStats.Stores {
for _, l := range store.Store.Labels {
if l.Key == "engine" && l.Value == "tiflash" {
Expand Down
1 change: 1 addition & 0 deletions pkg/domain/infosync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ go_library(
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_pd_client//:client",
"@com_github_tikv_pd_client//http",
"@io_etcd_go_etcd_client_v3//:client",
"@io_etcd_go_etcd_client_v3//concurrency",
"@org_uber_go_zap//:zap",
Expand Down
22 changes: 6 additions & 16 deletions pkg/domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"net/http"
"os"
"path"
"regexp"
"strconv"
"strings"
"sync"
Expand All @@ -47,7 +46,6 @@ import (
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/sessionctx/binloginfo"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/store/helper"
util2 "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/dbterror"
"github.com/pingcap/tidb/pkg/util/hack"
Expand All @@ -57,6 +55,7 @@ import (
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
pdhttp "github.com/tikv/pd/client/http"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
"go.uber.org/zap"
Expand Down Expand Up @@ -411,7 +410,7 @@ func DeleteTiFlashTableSyncProgress(tableInfo *model.TableInfo) error {
}

// MustGetTiFlashProgress gets tiflash replica progress from tiflashProgressCache, if cache not exist, it calculates progress from PD and TiFlash and inserts progress into cache.
func MustGetTiFlashProgress(tableID int64, replicaCount uint64, tiFlashStores *map[int64]helper.StoreStat) (float64, error) {
func MustGetTiFlashProgress(tableID int64, replicaCount uint64, tiFlashStores *map[int64]pdhttp.StoreInfo) (float64, error) {
is, err := getGlobalInfoSyncer()
if err != nil {
return 0, err
Expand All @@ -428,7 +427,7 @@ func MustGetTiFlashProgress(tableID int64, replicaCount uint64, tiFlashStores *m
if err != nil {
return 0, err
}
stores := make(map[int64]helper.StoreStat)
stores := make(map[int64]pdhttp.StoreInfo)
for _, store := range tikvStats.Stores {
for _, l := range store.Store.Labels {
if l.Key == "engine" && l.Value == "tiflash" {
Expand All @@ -448,6 +447,7 @@ func MustGetTiFlashProgress(tableID int64, replicaCount uint64, tiFlashStores *m
return progress, nil
}

// TODO: replace with the unified PD HTTP client.
func doRequest(ctx context.Context, apiName string, addrs []string, route, method string, body io.Reader) ([]byte, error) {
var err error
var req *http.Request
Expand Down Expand Up @@ -501,16 +501,6 @@ func doRequest(ctx context.Context, apiName string, addrs []string, route, metho
return nil, err
}

func removeVAndHash(v string) string {
if v == "" {
return v
}
versionHash := regexp.MustCompile("-[0-9]+-g[0-9a-f]{7,}(-dev)?")
v = versionHash.ReplaceAllLiteralString(v, "")
v = strings.TrimSuffix(v, "-dirty")
return strings.TrimPrefix(v, "v")
}

func doRequestWithFailpoint(req *http.Request) (resp *http.Response, err error) {
fpEnabled := false
failpoint.Inject("FailPlacement", func(val failpoint.Value) {
Expand Down Expand Up @@ -1107,7 +1097,7 @@ func GetLabelRules(ctx context.Context, ruleIDs []string) (map[string]*label.Rul
}

// CalculateTiFlashProgress calculates TiFlash replica progress
func CalculateTiFlashProgress(tableID int64, replicaCount uint64, TiFlashStores map[int64]helper.StoreStat) (float64, error) {
func CalculateTiFlashProgress(tableID int64, replicaCount uint64, TiFlashStores map[int64]pdhttp.StoreInfo) (float64, error) {
is, err := getGlobalInfoSyncer()
if err != nil {
return 0, errors.Trace(err)
Expand Down Expand Up @@ -1195,7 +1185,7 @@ func GetTiFlashRegionCountFromPD(ctx context.Context, tableID int64, regionCount
}

// GetTiFlashStoresStat gets the TiKV store information by accessing PD's api.
func GetTiFlashStoresStat(ctx context.Context) (*helper.StoresStat, error) {
func GetTiFlashStoresStat(ctx context.Context) (*pdhttp.StoresInfo, error) {
is, err := getGlobalInfoSyncer()
if err != nil {
return nil, errors.Trace(err)
Expand Down
Loading

0 comments on commit 0f35b6f

Please sign in to comment.