Skip to content

Commit

Permalink
client/http: implement more HTTP APIs (tikv#7371)
Browse files Browse the repository at this point in the history
ref tikv#7300

- Implement more HTTP APIs.
- Use consts more in `Rule` structure.

Signed-off-by: JmPotato <ghzpotato@gmail.com>
  • Loading branch information
JmPotato authored Nov 17, 2023
1 parent f2eaf23 commit dda748a
Show file tree
Hide file tree
Showing 29 changed files with 616 additions and 312 deletions.
112 changes: 101 additions & 11 deletions client/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,58 @@ package http
import (
"fmt"
"net/url"
"time"
)

// The following constants are the paths of PD HTTP APIs.
const (
HotRead = "/pd/api/v1/hotspot/regions/read"
HotWrite = "/pd/api/v1/hotspot/regions/write"
Regions = "/pd/api/v1/regions"
regionByID = "/pd/api/v1/region/id"
regionByKey = "/pd/api/v1/region/key"
regionsByKey = "/pd/api/v1/regions/key"
regionsByStoreID = "/pd/api/v1/regions/store"
Stores = "/pd/api/v1/stores"
// Metadata
HotRead = "/pd/api/v1/hotspot/regions/read"
HotWrite = "/pd/api/v1/hotspot/regions/write"
HotHistory = "/pd/api/v1/hotspot/regions/history"
RegionByIDPrefix = "/pd/api/v1/region/id"
regionByKey = "/pd/api/v1/region/key"
Regions = "/pd/api/v1/regions"
regionsByKey = "/pd/api/v1/regions/key"
RegionsByStoreIDPrefix = "/pd/api/v1/regions/store"
EmptyRegions = "/pd/api/v1/regions/check/empty-region"
accelerateSchedule = "/pd/api/v1/regions/accelerate-schedule"
store = "/pd/api/v1/store"
Stores = "/pd/api/v1/stores"
StatsRegion = "/pd/api/v1/stats/region"
// Config
Config = "/pd/api/v1/config"
ClusterVersion = "/pd/api/v1/config/cluster-version"
ScheduleConfig = "/pd/api/v1/config/schedule"
ReplicateConfig = "/pd/api/v1/config/replicate"
// Rule
PlacementRule = "/pd/api/v1/config/rule"
PlacementRules = "/pd/api/v1/config/rules"
placementRulesByGroup = "/pd/api/v1/config/rules/group"
RegionLabelRule = "/pd/api/v1/config/region-label/rule"
// Scheduler
Schedulers = "/pd/api/v1/schedulers"
scatterRangeScheduler = "/pd/api/v1/schedulers/scatter-range-"
// Admin
ResetTS = "/pd/api/v1/admin/reset-ts"
BaseAllocID = "/pd/api/v1/admin/base-alloc-id"
SnapshotRecoveringMark = "/pd/api/v1/admin/cluster/markers/snapshot-recovering"
// Debug
PProfProfile = "/pd/api/v1/debug/pprof/profile"
PProfHeap = "/pd/api/v1/debug/pprof/heap"
PProfMutex = "/pd/api/v1/debug/pprof/mutex"
PProfAllocs = "/pd/api/v1/debug/pprof/allocs"
PProfBlock = "/pd/api/v1/debug/pprof/block"
PProfGoroutine = "/pd/api/v1/debug/pprof/goroutine"
// Others
MinResolvedTSPrefix = "/pd/api/v1/min-resolved-ts"
Status = "/pd/api/v1/status"
Version = "/pd/api/v1/version"
)

// RegionByID returns the path of PD HTTP API to get region by ID.
func RegionByID(regionID uint64) string {
return fmt.Sprintf("%s/%d", regionByID, regionID)
return fmt.Sprintf("%s/%d", RegionByIDPrefix, regionID)
}

// RegionByKey returns the path of PD HTTP API to get region by key.
Expand All @@ -45,10 +79,66 @@ func RegionByKey(key []byte) string {
// RegionsByKey returns the path of PD HTTP API to scan regions with given start key, end key and limit parameters.
func RegionsByKey(startKey, endKey []byte, limit int) string {
return fmt.Sprintf("%s?start_key=%s&end_key=%s&limit=%d",
regionsByKey, url.QueryEscape(string(startKey)), url.QueryEscape(string(endKey)), limit)
regionsByKey,
url.QueryEscape(string(startKey)),
url.QueryEscape(string(endKey)),
limit)
}

// RegionsByStoreID returns the path of PD HTTP API to get regions by store ID.
func RegionsByStoreID(storeID uint64) string {
return fmt.Sprintf("%s/%d", regionsByStoreID, storeID)
return fmt.Sprintf("%s/%d", RegionsByStoreIDPrefix, storeID)
}

// RegionStatsByKeyRange returns the path of PD HTTP API to get region stats by start key and end key.
func RegionStatsByKeyRange(startKey, endKey []byte) string {
return fmt.Sprintf("%s?start_key=%s&end_key=%s",
StatsRegion,
url.QueryEscape(string(startKey)),
url.QueryEscape(string(endKey)))
}

// StoreByID returns the store API with store ID parameter.
func StoreByID(id uint64) string {
return fmt.Sprintf("%s/%d", store, id)
}

// StoreLabelByID returns the store label API with store ID parameter.
func StoreLabelByID(id uint64) string {
return fmt.Sprintf("%s/%d/label", store, id)
}

// ConfigWithTTLSeconds returns the config API with the TTL seconds parameter.
func ConfigWithTTLSeconds(ttlSeconds float64) string {
return fmt.Sprintf("%s?ttlSecond=%.0f", Config, ttlSeconds)
}

// PlacementRulesByGroup returns the path of PD HTTP API to get placement rules by group.
func PlacementRulesByGroup(group string) string {
return fmt.Sprintf("%s/%s", placementRulesByGroup, group)
}

// PlacementRuleByGroupAndID returns the path of PD HTTP API to get placement rule by group and ID.
func PlacementRuleByGroupAndID(group, id string) string {
return fmt.Sprintf("%s/%s/%s", PlacementRule, group, id)
}

// SchedulerByName returns the scheduler API with the given scheduler name.
func SchedulerByName(name string) string {
return fmt.Sprintf("%s/%s", Schedulers, name)
}

// ScatterRangeSchedulerWithName returns the scatter range scheduler API with name parameter.
func ScatterRangeSchedulerWithName(name string) string {
return fmt.Sprintf("%s%s", scatterRangeScheduler, name)
}

// PProfProfileAPIWithInterval returns the pprof profile API with interval parameter.
func PProfProfileAPIWithInterval(interval time.Duration) string {
return fmt.Sprintf("%s?seconds=%d", PProfProfile, interval/time.Second)
}

// PProfGoroutineWithDebugLevel returns the pprof goroutine API with debug level parameter.
func PProfGoroutineWithDebugLevel(level int) string {
return fmt.Sprintf("%s?debug=%d", PProfGoroutine, level)
}
126 changes: 106 additions & 20 deletions client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
package http

import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"

Expand All @@ -43,12 +45,17 @@ type Client interface {
GetRegionByID(context.Context, uint64) (*RegionInfo, error)
GetRegionByKey(context.Context, []byte) (*RegionInfo, error)
GetRegions(context.Context) (*RegionsInfo, error)
GetRegionsByKey(context.Context, []byte, []byte, int) (*RegionsInfo, error)
GetRegionsByKeyRange(context.Context, []byte, []byte, int) (*RegionsInfo, error)
GetRegionsByStoreID(context.Context, uint64) (*RegionsInfo, error)
GetHotReadRegions(context.Context) (*StoreHotPeersInfos, error)
GetHotWriteRegions(context.Context) (*StoreHotPeersInfos, error)
GetRegionStatusByKeyRange(context.Context, []byte, []byte) (*RegionStats, error)
GetStores(context.Context) (*StoresInfo, error)
GetPlacementRulesByGroup(context.Context, string) ([]*Rule, error)
SetPlacementRule(context.Context, *Rule) error
DeletePlacementRule(context.Context, string, string) error
GetMinResolvedTSByStoresIDs(context.Context, []uint64) (uint64, map[uint64]uint64, error)
AccelerateSchedule(context.Context, []byte, []byte) error
Close()
}

Expand Down Expand Up @@ -154,16 +161,16 @@ func (c *client) execDuration(name string, duration time.Duration) {
// it consistent with the current implementation of some clients (e.g. TiDB).
func (c *client) requestWithRetry(
ctx context.Context,
name, uri string,
res interface{},
name, uri, method string,
body io.Reader, res interface{},
) error {
var (
err error
addr string
)
for idx := 0; idx < len(c.pdAddrs); idx++ {
addr = c.pdAddrs[idx]
err = c.request(ctx, name, addr, uri, res)
err = c.request(ctx, name, fmt.Sprintf("%s%s", addr, uri), method, body, res)
if err == nil {
break
}
Expand All @@ -175,16 +182,15 @@ func (c *client) requestWithRetry(

func (c *client) request(
ctx context.Context,
name, addr, uri string,
res interface{},
name, url, method string,
body io.Reader, res interface{},
) error {
reqURL := fmt.Sprintf("%s%s", addr, uri)
logFields := []zap.Field{
zap.String("name", name),
zap.String("url", reqURL),
zap.String("url", url),
}
log.Debug("[pd] request the http url", logFields...)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqURL, nil)
req, err := http.NewRequestWithContext(ctx, method, url, body)
if err != nil {
log.Error("[pd] create http request failed", append(logFields, zap.Error(err))...)
return errors.Trace(err)
Expand Down Expand Up @@ -219,6 +225,10 @@ func (c *client) request(
return errors.Errorf("request pd http api failed with status: '%s'", resp.Status)
}

if res == nil {
return nil
}

err = json.NewDecoder(resp.Body).Decode(res)
if err != nil {
return errors.Trace(err)
Expand All @@ -229,7 +239,9 @@ func (c *client) request(
// GetRegionByID gets the region info by ID.
func (c *client) GetRegionByID(ctx context.Context, regionID uint64) (*RegionInfo, error) {
var region RegionInfo
err := c.requestWithRetry(ctx, "GetRegionByID", RegionByID(regionID), &region)
err := c.requestWithRetry(ctx,
"GetRegionByID", RegionByID(regionID),
http.MethodGet, nil, &region)
if err != nil {
return nil, err
}
Expand All @@ -239,7 +251,9 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64) (*RegionInf
// GetRegionByKey gets the region info by key.
func (c *client) GetRegionByKey(ctx context.Context, key []byte) (*RegionInfo, error) {
var region RegionInfo
err := c.requestWithRetry(ctx, "GetRegionByKey", RegionByKey(key), &region)
err := c.requestWithRetry(ctx,
"GetRegionByKey", RegionByKey(key),
http.MethodGet, nil, &region)
if err != nil {
return nil, err
}
Expand All @@ -249,17 +263,21 @@ func (c *client) GetRegionByKey(ctx context.Context, key []byte) (*RegionInfo, e
// GetRegions gets the regions info.
func (c *client) GetRegions(ctx context.Context) (*RegionsInfo, error) {
var regions RegionsInfo
err := c.requestWithRetry(ctx, "GetRegions", Regions, &regions)
err := c.requestWithRetry(ctx,
"GetRegions", Regions,
http.MethodGet, nil, &regions)
if err != nil {
return nil, err
}
return &regions, nil
}

// GetRegionsByKey gets the regions info by key range. If the limit is -1, it will return all regions within the range.
func (c *client) GetRegionsByKey(ctx context.Context, startKey, endKey []byte, limit int) (*RegionsInfo, error) {
// GetRegionsByKeyRange gets the regions info by key range. If the limit is -1, it will return all regions within the range.
func (c *client) GetRegionsByKeyRange(ctx context.Context, startKey, endKey []byte, limit int) (*RegionsInfo, error) {
var regions RegionsInfo
err := c.requestWithRetry(ctx, "GetRegionsByKey", RegionsByKey(startKey, endKey, limit), &regions)
err := c.requestWithRetry(ctx,
"GetRegionsByKeyRange", RegionsByKey(startKey, endKey, limit),
http.MethodGet, nil, &regions)
if err != nil {
return nil, err
}
Expand All @@ -269,7 +287,9 @@ func (c *client) GetRegionsByKey(ctx context.Context, startKey, endKey []byte, l
// GetRegionsByStoreID gets the regions info by store ID.
func (c *client) GetRegionsByStoreID(ctx context.Context, storeID uint64) (*RegionsInfo, error) {
var regions RegionsInfo
err := c.requestWithRetry(ctx, "GetRegionsByStoreID", RegionsByStoreID(storeID), &regions)
err := c.requestWithRetry(ctx,
"GetRegionsByStoreID", RegionsByStoreID(storeID),
http.MethodGet, nil, &regions)
if err != nil {
return nil, err
}
Expand All @@ -279,7 +299,9 @@ func (c *client) GetRegionsByStoreID(ctx context.Context, storeID uint64) (*Regi
// GetHotReadRegions gets the hot read region statistics info.
func (c *client) GetHotReadRegions(ctx context.Context) (*StoreHotPeersInfos, error) {
var hotReadRegions StoreHotPeersInfos
err := c.requestWithRetry(ctx, "GetHotReadRegions", HotRead, &hotReadRegions)
err := c.requestWithRetry(ctx,
"GetHotReadRegions", HotRead,
http.MethodGet, nil, &hotReadRegions)
if err != nil {
return nil, err
}
Expand All @@ -289,23 +311,70 @@ func (c *client) GetHotReadRegions(ctx context.Context) (*StoreHotPeersInfos, er
// GetHotWriteRegions gets the hot write region statistics info.
func (c *client) GetHotWriteRegions(ctx context.Context) (*StoreHotPeersInfos, error) {
var hotWriteRegions StoreHotPeersInfos
err := c.requestWithRetry(ctx, "GetHotWriteRegions", HotWrite, &hotWriteRegions)
err := c.requestWithRetry(ctx,
"GetHotWriteRegions", HotWrite,
http.MethodGet, nil, &hotWriteRegions)
if err != nil {
return nil, err
}
return &hotWriteRegions, nil
}

// GetRegionStatusByKeyRange gets the region status by key range.
func (c *client) GetRegionStatusByKeyRange(ctx context.Context, startKey, endKey []byte) (*RegionStats, error) {
var regionStats RegionStats
err := c.requestWithRetry(ctx,
"GetRegionStatusByKeyRange", RegionStatsByKeyRange(startKey, endKey),
http.MethodGet, nil, &regionStats,
)
if err != nil {
return nil, err
}
return &regionStats, nil
}

// GetStores gets the stores info.
func (c *client) GetStores(ctx context.Context) (*StoresInfo, error) {
var stores StoresInfo
err := c.requestWithRetry(ctx, "GetStores", Stores, &stores)
err := c.requestWithRetry(ctx,
"GetStores", Stores,
http.MethodGet, nil, &stores)
if err != nil {
return nil, err
}
return &stores, nil
}

// GetPlacementRulesByGroup gets the placement rules by group.
func (c *client) GetPlacementRulesByGroup(ctx context.Context, group string) ([]*Rule, error) {
var rules []*Rule
err := c.requestWithRetry(ctx,
"GetPlacementRulesByGroup", PlacementRulesByGroup(group),
http.MethodGet, nil, &rules)
if err != nil {
return nil, err
}
return rules, nil
}

// SetPlacementRule sets the placement rule.
func (c *client) SetPlacementRule(ctx context.Context, rule *Rule) error {
ruleJSON, err := json.Marshal(rule)
if err != nil {
return errors.Trace(err)
}
return c.requestWithRetry(ctx,
"SetPlacementRule", PlacementRule,
http.MethodPost, bytes.NewBuffer(ruleJSON), nil)
}

// DeletePlacementRule deletes the placement rule.
func (c *client) DeletePlacementRule(ctx context.Context, group, id string) error {
return c.requestWithRetry(ctx,
"DeletePlacementRule", PlacementRuleByGroupAndID(group, id),
http.MethodDelete, nil, nil)
}

// GetMinResolvedTSByStoresIDs get min-resolved-ts by stores IDs.
func (c *client) GetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []uint64) (uint64, map[uint64]uint64, error) {
uri := MinResolvedTSPrefix
Expand All @@ -326,7 +395,9 @@ func (c *client) GetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []uin
IsRealTime bool `json:"is_real_time,omitempty"`
StoresMinResolvedTS map[uint64]uint64 `json:"stores_min_resolved_ts"`
}{}
err := c.requestWithRetry(ctx, "GetMinResolvedTSByStoresIDs", uri, &resp)
err := c.requestWithRetry(ctx,
"GetMinResolvedTSByStoresIDs", uri,
http.MethodGet, nil, &resp)
if err != nil {
return 0, nil, err
}
Expand All @@ -335,3 +406,18 @@ func (c *client) GetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []uin
}
return resp.MinResolvedTS, resp.StoresMinResolvedTS, nil
}

// AccelerateSchedule accelerates the scheduling of the regions within the given key range.
func (c *client) AccelerateSchedule(ctx context.Context, startKey, endKey []byte) error {
input := map[string]string{
"start_key": url.QueryEscape(string(startKey)),
"end_key": url.QueryEscape(string(endKey)),
}
inputJSON, err := json.Marshal(input)
if err != nil {
return errors.Trace(err)
}
return c.requestWithRetry(ctx,
"AccelerateSchedule", accelerateSchedule,
http.MethodPost, bytes.NewBuffer(inputJSON), nil)
}
Loading

0 comments on commit dda748a

Please sign in to comment.