Skip to content

Commit

Permalink
Merge branch 'master' into fix-api
Browse files Browse the repository at this point in the history
  • Loading branch information
rleungx authored Nov 22, 2023
2 parents 4a669a4 + 57fb020 commit 7c823eb
Show file tree
Hide file tree
Showing 13 changed files with 296 additions and 44 deletions.
15 changes: 14 additions & 1 deletion client/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const (
regionsByKey = "/pd/api/v1/regions/key"
RegionsByStoreIDPrefix = "/pd/api/v1/regions/store"
EmptyRegions = "/pd/api/v1/regions/check/empty-region"
accelerateSchedule = "/pd/api/v1/regions/accelerate-schedule"
AccelerateSchedule = "/pd/api/v1/regions/accelerate-schedule"
store = "/pd/api/v1/store"
Stores = "/pd/api/v1/stores"
StatsRegion = "/pd/api/v1/stats/region"
Expand All @@ -45,7 +45,10 @@ const (
PlacementRule = "/pd/api/v1/config/rule"
PlacementRules = "/pd/api/v1/config/rules"
placementRulesByGroup = "/pd/api/v1/config/rules/group"
PlacementRuleBundle = "/pd/api/v1/config/placement-rule"
RegionLabelRule = "/pd/api/v1/config/region-label/rule"
RegionLabelRules = "/pd/api/v1/config/region-label/rules"
RegionLabelRulesByIDs = "/pd/api/v1/config/region-label/rules/ids"
// Scheduler
Schedulers = "/pd/api/v1/schedulers"
scatterRangeScheduler = "/pd/api/v1/schedulers/scatter-range-"
Expand Down Expand Up @@ -123,6 +126,16 @@ func PlacementRuleByGroupAndID(group, id string) string {
return fmt.Sprintf("%s/%s/%s", PlacementRule, group, id)
}

// PlacementRuleBundleByGroup returns the path of PD HTTP API to get placement rule bundle by group.
func PlacementRuleBundleByGroup(group string) string {
return fmt.Sprintf("%s/%s", PlacementRuleBundle, group)
}

// PlacementRuleBundleWithPartialParameter returns the path of PD HTTP API to get placement rule bundle with partial parameter.
func PlacementRuleBundleWithPartialParameter(partial bool) string {
return fmt.Sprintf("%s?partial=%t", PlacementRuleBundle, partial)
}

// SchedulerByName returns the scheduler API with the given scheduler name.
func SchedulerByName(name string) string {
return fmt.Sprintf("%s/%s", Schedulers, name)
Expand Down
153 changes: 137 additions & 16 deletions client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const (

// Client is a PD (Placement Driver) HTTP client.
type Client interface {
/* Meta-related interfaces */
GetRegionByID(context.Context, uint64) (*RegionInfo, error)
GetRegionByKey(context.Context, []byte) (*RegionInfo, error)
GetRegions(context.Context) (*RegionsInfo, error)
Expand All @@ -51,11 +52,28 @@ type Client interface {
GetHotWriteRegions(context.Context) (*StoreHotPeersInfos, error)
GetRegionStatusByKeyRange(context.Context, []byte, []byte) (*RegionStats, error)
GetStores(context.Context) (*StoresInfo, error)
/* Rule-related interfaces */
GetAllPlacementRuleBundles(context.Context) ([]*GroupBundle, error)
GetPlacementRuleBundleByGroup(context.Context, string) (*GroupBundle, error)
GetPlacementRulesByGroup(context.Context, string) ([]*Rule, error)
SetPlacementRule(context.Context, *Rule) error
SetPlacementRuleBundles(context.Context, []*GroupBundle, bool) error
DeletePlacementRule(context.Context, string, string) error
GetMinResolvedTSByStoresIDs(context.Context, []uint64) (uint64, map[uint64]uint64, error)
GetAllRegionLabelRules(context.Context) ([]*LabelRule, error)
GetRegionLabelRulesByIDs(context.Context, []string) ([]*LabelRule, error)
SetRegionLabelRule(context.Context, *LabelRule) error
PatchRegionLabelRules(context.Context, *LabelRulePatch) error
/* Scheduling-related interfaces */
AccelerateSchedule(context.Context, []byte, []byte) error
/* Other interfaces */
GetMinResolvedTSByStoresIDs(context.Context, []uint64) (uint64, map[uint64]uint64, error)

/* Client-related methods */
// WithRespHandler sets and returns a new client with the given HTTP response handler.
// This allows the caller to customize how the response is handled, including error handling logic.
// Additionally, it is important for the caller to handle the content of the response body properly
// in order to ensure that it can be read and marshaled correctly into `res`.
WithRespHandler(func(resp *http.Response, res interface{}) error) Client
Close()
}

Expand All @@ -66,6 +84,8 @@ type client struct {
tlsConf *tls.Config
cli *http.Client

respHandler func(resp *http.Response, res interface{}) error

requestCounter *prometheus.CounterVec
executionDuration *prometheus.HistogramVec
}
Expand Down Expand Up @@ -143,6 +163,15 @@ func (c *client) Close() {
log.Info("[pd] http client closed")
}

// WithRespHandler sets and returns a new client with the given HTTP response handler.
func (c *client) WithRespHandler(
handler func(resp *http.Response, res interface{}) error,
) Client {
newClient := *c
newClient.respHandler = handler
return &newClient
}

func (c *client) reqCounter(name, status string) {
if c.requestCounter == nil {
return
Expand Down Expand Up @@ -204,6 +233,12 @@ func (c *client) request(
}
c.execDuration(name, time.Since(start))
c.reqCounter(name, resp.Status)

// Give away the response handling to the caller if the handler is set.
if c.respHandler != nil {
return c.respHandler(resp, res)
}

defer func() {
err = resp.Body.Close()
if err != nil {
Expand Down Expand Up @@ -345,6 +380,30 @@ func (c *client) GetStores(ctx context.Context) (*StoresInfo, error) {
return &stores, nil
}

// GetAllPlacementRuleBundles gets all placement rules bundles.
func (c *client) GetAllPlacementRuleBundles(ctx context.Context) ([]*GroupBundle, error) {
var bundles []*GroupBundle
err := c.requestWithRetry(ctx,
"GetPlacementRuleBundle", PlacementRuleBundle,
http.MethodGet, nil, &bundles)
if err != nil {
return nil, err
}
return bundles, nil
}

// GetPlacementRuleBundleByGroup gets the placement rules bundle by group.
func (c *client) GetPlacementRuleBundleByGroup(ctx context.Context, group string) (*GroupBundle, error) {
var bundle GroupBundle
err := c.requestWithRetry(ctx,
"GetPlacementRuleBundleByGroup", PlacementRuleBundleByGroup(group),
http.MethodGet, nil, &bundle)
if err != nil {
return nil, err
}
return &bundle, nil
}

// GetPlacementRulesByGroup gets the placement rules by group.
func (c *client) GetPlacementRulesByGroup(ctx context.Context, group string) ([]*Rule, error) {
var rules []*Rule
Expand All @@ -368,13 +427,90 @@ func (c *client) SetPlacementRule(ctx context.Context, rule *Rule) error {
http.MethodPost, bytes.NewBuffer(ruleJSON), nil)
}

// SetPlacementRuleBundles sets the placement rule bundles.
// If `partial` is false, all old configurations will be over-written and dropped.
func (c *client) SetPlacementRuleBundles(ctx context.Context, bundles []*GroupBundle, partial bool) error {
bundlesJSON, err := json.Marshal(bundles)
if err != nil {
return errors.Trace(err)
}
return c.requestWithRetry(ctx,
"SetPlacementRuleBundles", PlacementRuleBundleWithPartialParameter(partial),
http.MethodPost, bytes.NewBuffer(bundlesJSON), nil)
}

// DeletePlacementRule deletes the placement rule.
func (c *client) DeletePlacementRule(ctx context.Context, group, id string) error {
return c.requestWithRetry(ctx,
"DeletePlacementRule", PlacementRuleByGroupAndID(group, id),
http.MethodDelete, nil, nil)
}

// GetAllRegionLabelRules gets all region label rules.
func (c *client) GetAllRegionLabelRules(ctx context.Context) ([]*LabelRule, error) {
var labelRules []*LabelRule
err := c.requestWithRetry(ctx,
"GetAllRegionLabelRules", RegionLabelRules,
http.MethodGet, nil, &labelRules)
if err != nil {
return nil, err
}
return labelRules, nil
}

// GetRegionLabelRulesByIDs gets the region label rules by IDs.
func (c *client) GetRegionLabelRulesByIDs(ctx context.Context, ruleIDs []string) ([]*LabelRule, error) {
idsJSON, err := json.Marshal(ruleIDs)
if err != nil {
return nil, errors.Trace(err)
}
var labelRules []*LabelRule
err = c.requestWithRetry(ctx,
"GetRegionLabelRulesByIDs", RegionLabelRulesByIDs,
http.MethodGet, bytes.NewBuffer(idsJSON), &labelRules)
if err != nil {
return nil, err
}
return labelRules, nil
}

// SetRegionLabelRule sets the region label rule.
func (c *client) SetRegionLabelRule(ctx context.Context, labelRule *LabelRule) error {
labelRuleJSON, err := json.Marshal(labelRule)
if err != nil {
return errors.Trace(err)
}
return c.requestWithRetry(ctx,
"SetRegionLabelRule", RegionLabelRule,
http.MethodPost, bytes.NewBuffer(labelRuleJSON), nil)
}

// PatchRegionLabelRules patches the region label rules.
func (c *client) PatchRegionLabelRules(ctx context.Context, labelRulePatch *LabelRulePatch) error {
labelRulePatchJSON, err := json.Marshal(labelRulePatch)
if err != nil {
return errors.Trace(err)
}
return c.requestWithRetry(ctx,
"PatchRegionLabelRules", RegionLabelRules,
http.MethodPatch, bytes.NewBuffer(labelRulePatchJSON), nil)
}

// AccelerateSchedule accelerates the scheduling of the regions within the given key range.
func (c *client) AccelerateSchedule(ctx context.Context, startKey, endKey []byte) error {
input := map[string]string{
"start_key": url.QueryEscape(string(startKey)),
"end_key": url.QueryEscape(string(endKey)),
}
inputJSON, err := json.Marshal(input)
if err != nil {
return errors.Trace(err)
}
return c.requestWithRetry(ctx,
"AccelerateSchedule", AccelerateSchedule,
http.MethodPost, bytes.NewBuffer(inputJSON), nil)
}

// GetMinResolvedTSByStoresIDs get min-resolved-ts by stores IDs.
func (c *client) GetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []uint64) (uint64, map[uint64]uint64, error) {
uri := MinResolvedTSPrefix
Expand Down Expand Up @@ -406,18 +542,3 @@ func (c *client) GetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []uin
}
return resp.MinResolvedTS, resp.StoresMinResolvedTS, nil
}

// AccelerateSchedule accelerates the scheduling of the regions within the given key range.
func (c *client) AccelerateSchedule(ctx context.Context, startKey, endKey []byte) error {
input := map[string]string{
"start_key": url.QueryEscape(string(startKey)),
"end_key": url.QueryEscape(string(endKey)),
}
inputJSON, err := json.Marshal(input)
if err != nil {
return errors.Trace(err)
}
return c.requestWithRetry(ctx,
"AccelerateSchedule", accelerateSchedule,
http.MethodPost, bytes.NewBuffer(inputJSON), nil)
}
31 changes: 31 additions & 0 deletions client/http/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,3 +246,34 @@ type Rule struct {
Version uint64 `json:"version,omitempty"` // only set at runtime, add 1 each time rules updated, begin from 0.
CreateTimestamp uint64 `json:"create_timestamp,omitempty"` // only set at runtime, recorded rule create timestamp
}

// GroupBundle represents a rule group and all rules belong to the group.
type GroupBundle struct {
ID string `json:"group_id"`
Index int `json:"group_index"`
Override bool `json:"group_override"`
Rules []*Rule `json:"rules"`
}

// RegionLabel is the label of a region.
type RegionLabel struct {
Key string `json:"key"`
Value string `json:"value"`
TTL string `json:"ttl,omitempty"`
StartAt string `json:"start_at,omitempty"`
}

// LabelRule is the rule to assign labels to a region.
type LabelRule struct {
ID string `json:"id"`
Index int `json:"index"`
Labels []RegionLabel `json:"labels"`
RuleType string `json:"rule_type"`
Data interface{} `json:"data"`
}

// LabelRulePatch is the patch to update the label rules.
type LabelRulePatch struct {
SetRules []*LabelRule `json:"sets"`
DeleteRules []string `json:"deletes"`
}
4 changes: 3 additions & 1 deletion client/retry/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@ func (bo *BackOffer) Exec(
fn func() error,
) error {
if err := fn(); err != nil {
after := time.NewTimer(bo.nextInterval())
defer after.Stop()
select {
case <-ctx.Done():
case <-time.After(bo.nextInterval()):
case <-after.C:
failpoint.Inject("backOffExecute", func() {
testBackOffExecuteFlag = true
})
Expand Down
25 changes: 15 additions & 10 deletions pkg/replication/replication_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,15 +442,6 @@ func (m *ModeManager) tickUpdateState() {
canSync := primaryHasVoter && drHasVoter
hasMajority := totalUpVoter*2 > totalVoter

log.Debug("replication store status",
zap.Uint64s("up-primary", storeIDs[primaryUp]),
zap.Uint64s("up-dr", storeIDs[drUp]),
zap.Uint64s("down-primary", storeIDs[primaryDown]),
zap.Uint64s("down-dr", storeIDs[drDown]),
zap.Bool("can-sync", canSync),
zap.Bool("has-majority", hasMajority),
)

/*
+----+ all region sync +------------+
Expand All @@ -469,7 +460,8 @@ func (m *ModeManager) tickUpdateState() {
*/

switch m.drGetState() {
state := m.drGetState()
switch state {
case drStateSync:
// If hasMajority is false, the cluster is always unavailable. Switch to async won't help.
if !canSync && hasMajority {
Expand Down Expand Up @@ -511,6 +503,19 @@ func (m *ModeManager) tickUpdateState() {
}
}
}

logFunc := log.Debug
if state != m.drGetState() {
logFunc = log.Info
}
logFunc("replication store status",
zap.Uint64s("up-primary", storeIDs[primaryUp]),
zap.Uint64s("up-dr", storeIDs[drUp]),
zap.Uint64s("down-primary", storeIDs[primaryDown]),
zap.Uint64s("down-dr", storeIDs[drDown]),
zap.Bool("can-sync", canSync),
zap.Bool("has-majority", hasMajority),
)
}

func (m *ModeManager) tickReplicateStatus() {
Expand Down
3 changes: 1 addition & 2 deletions pkg/schedule/operator/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package operator
import (
"context"
"encoding/json"
"fmt"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -514,7 +513,7 @@ func (suite *operatorTestSuite) TestOpStepTimeout() {
},
}
for i, v := range testData {
fmt.Printf("case:%d\n", i)
suite.T().Logf("case: %d", i)
for _, step := range v.step {
suite.Equal(v.expect, step.Timeout(v.regionSize))
}
Expand Down
Loading

0 comments on commit 7c823eb

Please sign in to comment.