Merge branch 'master' into fips
ti-chi-bot[bot] authored Dec 20, 2023
2 parents 9f7526c + ebd2fc0 commit 9f14ea2
Showing 29 changed files with 771 additions and 315 deletions.
3 changes: 2 additions & 1 deletion Makefile
@@ -261,7 +261,7 @@ test-tso-consistency: install-tools
CGO_ENABLED=1 go test -race -tags without_dashboard,tso_consistency_test,deadlock $(TSO_INTEGRATION_TEST_PKGS) || { $(FAILPOINT_DISABLE); exit 1; }
@$(FAILPOINT_DISABLE)

REAL_CLUSTER_TEST_PATH := $(ROOT_PATH)/tests/integrations/realtiup
REAL_CLUSTER_TEST_PATH := $(ROOT_PATH)/tests/integrations/realcluster

test-real-cluster:
# testing with the real cluster...
@@ -305,6 +305,7 @@ clean-test:
clean-build:
# Cleaning building files...
rm -rf .dashboard_download_cache/
rm -rf .dashboard_build_temp/
rm -rf $(BUILD_BIN_PATH)
rm -rf $(GO_TOOLS_BIN_PATH)

2 changes: 1 addition & 1 deletion pd.code-workspace
@@ -22,7 +22,7 @@
},
{
"name": "real-cluster-tests",
"path": "tests/integrations/realtiup"
"path": "tests/integrations/realcluster"
},
{
"name": "pd-tso-bench",
21 changes: 21 additions & 0 deletions pkg/core/basic_cluster.go
@@ -309,3 +309,24 @@ func NewKeyRange(startKey, endKey string) KeyRange {
EndKey: []byte(endKey),
}
}

// KeyRanges wraps a slice of KeyRange.
type KeyRanges struct {
krs []*KeyRange
}

// Append appends a KeyRange.
func (rs *KeyRanges) Append(startKey, endKey []byte) {
rs.krs = append(rs.krs, &KeyRange{
StartKey: startKey,
EndKey: endKey,
})
}

// Ranges returns the slice of KeyRange.
func (rs *KeyRanges) Ranges() []*KeyRange {
if rs == nil {
return nil
}
return rs.krs
}
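
A minimal usage sketch of the new core.KeyRanges helper defined above (illustrative keys, not part of the commit):

// collectSuspects shows the intended pattern: append ranges while handling
// a batch of events, then read them back in one pass. Ranges() is nil-safe,
// so a nil *KeyRanges simply yields no ranges.
func collectSuspects() []*core.KeyRange {
	suspect := &core.KeyRanges{}
	suspect.Append([]byte("a"), []byte("b")) // illustrative keys
	suspect.Append([]byte("c"), []byte("d"))
	return suspect.Ranges()
}
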
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/apis/v1/api.go
@@ -1330,5 +1330,5 @@ func checkRegionsReplicated(c *gin.Context) {
c.String(http.StatusBadRequest, err.Error())
return
}
c.String(http.StatusOK, state)
c.IndentedJSON(http.StatusOK, state)
}
154 changes: 92 additions & 62 deletions pkg/mcs/scheduling/server/rule/watcher.go
@@ -20,6 +20,7 @@ import (
"sync"

"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/schedule/checker"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/placement"
@@ -36,6 +37,10 @@ type Watcher struct {
cancel context.CancelFunc
wg sync.WaitGroup

// ruleCommonPathPrefix:
// - Key: /pd/{cluster_id}/rule
// - Value: placement.Rule or placement.RuleGroup
ruleCommonPathPrefix string
// rulesPathPrefix:
// - Key: /pd/{cluster_id}/rules/{group_id}-{rule_id}
// - Value: placement.Rule
@@ -60,8 +65,10 @@ type Watcher struct {
regionLabeler *labeler.RegionLabeler

ruleWatcher *etcdutil.LoopWatcher
groupWatcher *etcdutil.LoopWatcher
labelWatcher *etcdutil.LoopWatcher

// patch is used to cache the placement rule changes.
patch *placement.RuleConfigPatch
}

// NewWatcher creates a new watcher to watch the Placement Rule change from PD API server.
@@ -79,6 +86,7 @@ func NewWatcher(
ctx: ctx,
cancel: cancel,
rulesPathPrefix: endpoint.RulesPathPrefix(clusterID),
ruleCommonPathPrefix: endpoint.RuleCommonPathPrefix(clusterID),
ruleGroupPathPrefix: endpoint.RuleGroupPathPrefix(clusterID),
regionLabelPathPrefix: endpoint.RegionLabelPathPrefix(clusterID),
etcdClient: etcdClient,
@@ -91,10 +99,6 @@ func NewWatcher(
if err != nil {
return nil, err
}
err = rw.initializeGroupWatcher()
if err != nil {
return nil, err
}
err = rw.initializeRegionLabelWatcher()
if err != nil {
return nil, err
@@ -103,83 +107,109 @@
}

func (rw *Watcher) initializeRuleWatcher() error {
prefixToTrim := rw.rulesPathPrefix + "/"
var suspectKeyRanges *core.KeyRanges

preEventsFn := func(events []*clientv3.Event) error {
// The lock is held until postEventsFn finishes.
rw.ruleManager.Lock()
rw.patch = rw.ruleManager.BeginPatch()
suspectKeyRanges = &core.KeyRanges{}
return nil
}

putFn := func(kv *mvccpb.KeyValue) error {
log.Info("update placement rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
rule, err := placement.NewRuleFromJSON(kv.Value)
if err != nil {
return err
}
// Update the suspect key ranges in the checker.
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
if oldRule := rw.ruleManager.GetRule(rule.GroupID, rule.ID); oldRule != nil {
rw.checkerController.AddSuspectKeyRange(oldRule.StartKey, oldRule.EndKey)
key := string(kv.Key)
if strings.HasPrefix(key, rw.rulesPathPrefix) {
log.Info("update placement rule", zap.String("key", key), zap.String("value", string(kv.Value)))
rule, err := placement.NewRuleFromJSON(kv.Value)
if err != nil {
return err
}
// Try to add the rule change to the patch.
if err := rw.ruleManager.AdjustRule(rule, ""); err != nil {
return err
}
rw.patch.SetRule(rule)
// Update the suspect key ranges while holding the lock.
suspectKeyRanges.Append(rule.StartKey, rule.EndKey)
if oldRule := rw.ruleManager.GetRuleLocked(rule.GroupID, rule.ID); oldRule != nil {
suspectKeyRanges.Append(oldRule.StartKey, oldRule.EndKey)
}
return nil
} else if strings.HasPrefix(key, rw.ruleGroupPathPrefix) {
log.Info("update placement rule group", zap.String("key", key), zap.String("value", string(kv.Value)))
ruleGroup, err := placement.NewRuleGroupFromJSON(kv.Value)
if err != nil {
return err
}
// Try to add the rule group change to the patch.
rw.patch.SetGroup(ruleGroup)
// Update the suspect key ranges
for _, rule := range rw.ruleManager.GetRulesByGroupLocked(ruleGroup.ID) {
suspectKeyRanges.Append(rule.StartKey, rule.EndKey)
}
return nil
} else {
log.Warn("unknown key when updating placement rule", zap.String("key", key))
return nil
}
return rw.ruleManager.SetRule(rule)
}
deleteFn := func(kv *mvccpb.KeyValue) error {
key := string(kv.Key)
log.Info("delete placement rule", zap.String("key", key))
ruleJSON, err := rw.ruleStorage.LoadRule(strings.TrimPrefix(key, prefixToTrim))
if err != nil {
if strings.HasPrefix(key, rw.rulesPathPrefix) {
log.Info("delete placement rule", zap.String("key", key))
ruleJSON, err := rw.ruleStorage.LoadRule(strings.TrimPrefix(key, rw.rulesPathPrefix+"/"))
if err != nil {
return err
}
rule, err := placement.NewRuleFromJSON([]byte(ruleJSON))
if err != nil {
return err
}
// Try to add the rule change to the patch.
rw.patch.DeleteRule(rule.GroupID, rule.ID)
// Update the suspect key ranges
suspectKeyRanges.Append(rule.StartKey, rule.EndKey)
return err
} else if strings.HasPrefix(key, rw.ruleGroupPathPrefix) {
log.Info("delete placement rule group", zap.String("key", key))
trimmedKey := strings.TrimPrefix(key, rw.ruleGroupPathPrefix+"/")
// Try to add the rule group change to the patch.
rw.patch.DeleteGroup(trimmedKey)
// Update the suspect key ranges
for _, rule := range rw.ruleManager.GetRulesByGroupLocked(trimmedKey) {
suspectKeyRanges.Append(rule.StartKey, rule.EndKey)
}
return nil
} else {
log.Warn("unknown key when deleting placement rule", zap.String("key", key))
return nil
}
rule, err := placement.NewRuleFromJSON([]byte(ruleJSON))
if err != nil {
}
postEventsFn := func(events []*clientv3.Event) error {
defer rw.ruleManager.Unlock()
if err := rw.ruleManager.TryCommitPatch(rw.patch); err != nil {
log.Error("failed to commit patch", zap.Error(err))
return err
}
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
return rw.ruleManager.DeleteRule(rule.GroupID, rule.ID)
for _, kr := range suspectKeyRanges.Ranges() {
rw.checkerController.AddSuspectKeyRange(kr.StartKey, kr.EndKey)
}
return nil
}
rw.ruleWatcher = etcdutil.NewLoopWatcher(
rw.ctx, &rw.wg,
rw.etcdClient,
"scheduling-rule-watcher", rw.rulesPathPrefix,
func([]*clientv3.Event) error { return nil },
"scheduling-rule-watcher", rw.ruleCommonPathPrefix,
preEventsFn,
putFn, deleteFn,
func([]*clientv3.Event) error { return nil },
postEventsFn,
clientv3.WithPrefix(),
)
rw.ruleWatcher.StartWatchLoop()
return rw.ruleWatcher.WaitLoad()
}

func (rw *Watcher) initializeGroupWatcher() error {
prefixToTrim := rw.ruleGroupPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
log.Info("update placement rule group", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
ruleGroup, err := placement.NewRuleGroupFromJSON(kv.Value)
if err != nil {
return err
}
// Add all rule key ranges within the group to the suspect key ranges.
for _, rule := range rw.ruleManager.GetRulesByGroup(ruleGroup.ID) {
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
}
return rw.ruleManager.SetRuleGroup(ruleGroup)
}
deleteFn := func(kv *mvccpb.KeyValue) error {
key := string(kv.Key)
log.Info("delete placement rule group", zap.String("key", key))
trimmedKey := strings.TrimPrefix(key, prefixToTrim)
for _, rule := range rw.ruleManager.GetRulesByGroup(trimmedKey) {
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
}
return rw.ruleManager.DeleteRuleGroup(trimmedKey)
}
rw.groupWatcher = etcdutil.NewLoopWatcher(
rw.ctx, &rw.wg,
rw.etcdClient,
"scheduling-rule-group-watcher", rw.ruleGroupPathPrefix,
func([]*clientv3.Event) error { return nil },
putFn, deleteFn,
func([]*clientv3.Event) error { return nil },
clientv3.WithPrefix(),
)
rw.groupWatcher.StartWatchLoop()
return rw.groupWatcher.WaitLoad()
}

func (rw *Watcher) initializeRegionLabelWatcher() error {
prefixToTrim := rw.regionLabelPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
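
A condensed sketch of the batching flow that the merged rule/group watcher now follows (identifiers match the diff above; abbreviated, not a verbatim excerpt): a batch of etcd events is bracketed by preEventsFn and postEventsFn so that rule and rule-group changes are staged into a single patch and committed atomically, and the checker only receives the suspect key ranges after the commit succeeds.

preEventsFn := func([]*clientv3.Event) error {
	rw.ruleManager.Lock()                  // held until postEventsFn finishes
	rw.patch = rw.ruleManager.BeginPatch() // stage every change from this batch
	suspectKeyRanges = &core.KeyRanges{}   // collect affected key ranges
	return nil
}
// putFn and deleteFn dispatch on the key prefix (rules vs. rule groups),
// stage each change via rw.patch.SetRule / SetGroup / DeleteRule / DeleteGroup,
// and append the affected key ranges to suspectKeyRanges.
postEventsFn := func([]*clientv3.Event) error {
	defer rw.ruleManager.Unlock()
	if err := rw.ruleManager.TryCommitPatch(rw.patch); err != nil {
		return err // nothing is applied if the commit fails
	}
	for _, kr := range suspectKeyRanges.Ranges() {
		rw.checkerController.AddSuspectKeyRange(kr.StartKey, kr.EndKey)
	}
	return nil
}
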
4 changes: 3 additions & 1 deletion pkg/schedule/checker/rule_checker.go
@@ -560,6 +560,7 @@ func (c *RuleChecker) fixOrphanPeers(region *core.RegionInfo, fit *placement.Reg
}
}

extra := fit.ExtraCount()
// If hasUnhealthyFit is true, try to remove unhealthy orphan peers only if the number of OrphanPeers is >= 2.
// Ref https://github.com/tikv/pd/issues/4045
if len(fit.OrphanPeers) >= 2 {
@@ -576,7 +577,8 @@ func (c *RuleChecker) fixOrphanPeers(region *core.RegionInfo, fit *placement.Reg
ruleCheckerRemoveOrphanPeerCounter.Inc()
return operator.CreateRemovePeerOperator("remove-unhealthy-orphan-peer", c.cluster, 0, region, orphanPeer.StoreId)
}
if hasHealthPeer {
// A healthy orphan peer can be removed while keeping high availability only if the peer count is greater than the rule requirement.
if hasHealthPeer && extra > 0 {
// there already exists a healthy orphan peer, so we can remove other orphan Peers.
ruleCheckerRemoveOrphanPeerCounter.Inc()
// if there exists a disconnected orphan peer, we will pick it to remove firstly.
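
A short sketch of the new guard, with assumed values that mirror the test added below: a healthy orphan peer is only removed when the region already has more peers than its rules require.

// Hedged sketch, not a verbatim excerpt from fixOrphanPeers.
extra := fit.ExtraCount() // peers beyond the total Count demanded by the rules
if hasHealthPeer && extra > 0 {
	// A healthy orphan exists and the region is over-replicated, so dropping
	// an orphan peer cannot leave any rule unsatisfied.
	ruleCheckerRemoveOrphanPeerCounter.Inc()
	return operator.CreateRemovePeerOperator("remove-orphan-peer", c.cluster, 0, region, orphanPeer.StoreId)
}
// Otherwise (extra == 0) the checker first schedules an "add-rule-peer" operator.
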
36 changes: 36 additions & 0 deletions pkg/schedule/checker/rule_checker_test.go
@@ -2029,3 +2029,39 @@ func (suite *ruleCheckerTestAdvancedSuite) TestReplaceAnExistingPeerCases() {
suite.ruleManager.DeleteGroupBundle(groupName, false)
}
}

func (suite *ruleCheckerTestSuite) TestRemoveOrphanPeer() {
suite.cluster.AddLabelsStore(1, 1, map[string]string{"zone": "z1", "host": "h1"})
suite.cluster.AddLabelsStore(2, 1, map[string]string{"zone": "z1", "host": "h1"})
suite.cluster.AddLabelsStore(3, 1, map[string]string{"zone": "z1", "host": "h1"})
suite.cluster.AddLabelsStore(4, 1, map[string]string{"zone": "z2", "host": "h1"})
suite.cluster.AddLabelsStore(5, 1, map[string]string{"zone": "z2", "host": "h2"})
suite.cluster.AddLabelsStore(6, 1, map[string]string{"zone": "z2", "host": "h2"})
rule := &placement.Rule{
GroupID: "pd",
ID: "test2",
Role: placement.Voter,
Count: 3,
LabelConstraints: []placement.LabelConstraint{
{
Key: "zone",
Op: placement.In,
Values: []string{"z2"},
},
},
}
suite.ruleManager.SetRule(rule)
suite.ruleManager.DeleteRule("pd", "default")

// case1: regionA has 3 peers but no extra peer can be removed, so it needs to add a peer first
suite.cluster.AddLeaderRegionWithRange(1, "200", "300", 1, 2, 3)
op := suite.rc.Check(suite.cluster.GetRegion(1))
suite.NotNil(op)
suite.Equal("add-rule-peer", op.Desc())

// case2: regionB has 4 peers and one extra peer can be removed, so it needs to remove the extra peer first
suite.cluster.AddLeaderRegionWithRange(2, "300", "400", 1, 2, 3, 4)
op = suite.rc.Check(suite.cluster.GetRegion(2))
suite.NotNil(op)
suite.Equal("remove-orphan-peer", op.Desc())
}