From 69e2e90c6b21c5229cf023f3589fafb0d3e999d0 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Thu, 14 Dec 2023 22:50:36 +0800 Subject: [PATCH 1/7] mcs: watch rule change with txn Signed-off-by: lhy1024 --- pkg/core/basic_cluster.go | 21 ++ pkg/keyspace/tso_keyspace_group.go | 3 +- pkg/mcs/scheduling/server/apis/v1/api.go | 2 +- pkg/mcs/scheduling/server/cluster.go | 2 +- pkg/mcs/scheduling/server/config/watcher.go | 36 +-- pkg/mcs/scheduling/server/meta/watcher.go | 10 +- pkg/mcs/scheduling/server/rule/watcher.go | 212 ++++++++++----- pkg/mock/mockcluster/mockcluster.go | 2 +- pkg/schedule/checker/rule_checker.go | 8 +- pkg/schedule/placement/config.go | 32 ++- pkg/schedule/placement/config_test.go | 40 +-- pkg/schedule/placement/rule_manager.go | 192 +++++++------ pkg/schedule/placement/rule_manager_test.go | 21 +- pkg/schedule/schedulers/shuffle_region.go | 7 +- .../schedulers/shuffle_region_config.go | 4 +- pkg/statistics/region_collection_test.go | 5 +- pkg/storage/endpoint/key_path.go | 6 + pkg/storage/endpoint/rule.go | 55 ++-- pkg/tso/keyspace_group_manager.go | 7 +- pkg/utils/etcdutil/etcdutil.go | 35 ++- pkg/utils/etcdutil/etcdutil_test.go | 15 +- server/cluster/cluster.go | 2 +- server/cluster/cluster_test.go | 10 +- server/keyspace_service.go | 4 +- server/server.go | 3 +- tests/server/api/region_test.go | 59 +++- tests/server/api/rule_test.go | 253 ++++++++++++++++-- 27 files changed, 720 insertions(+), 326 deletions(-) diff --git a/pkg/core/basic_cluster.go b/pkg/core/basic_cluster.go index 2258a816324..d70b620db3b 100644 --- a/pkg/core/basic_cluster.go +++ b/pkg/core/basic_cluster.go @@ -309,3 +309,24 @@ func NewKeyRange(startKey, endKey string) KeyRange { EndKey: []byte(endKey), } } + +// KeyRanges is a slice of KeyRange. +type KeyRanges struct { + krs []*KeyRange +} + +// Append appends a KeyRange. +func (rs *KeyRanges) Append(startKey, endKey []byte) { + rs.krs = append(rs.krs, &KeyRange{ + StartKey: startKey, + EndKey: endKey, + }) +} + +// Ranges returns the slice of KeyRange. 
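// Illustration only (not part of the patch): a minimal sketch of how the new
// core.KeyRanges helper above is meant to be used — key ranges are collected
// while a batch of events is handled and then iterated once afterwards.
// Assumes core is imported from github.com/tikv/pd/pkg/core; the literal keys
// are made up.
suspect := &core.KeyRanges{}
suspect.Append([]byte("a"), []byte("b"))
suspect.Append([]byte("c"), []byte("d"))
for _, kr := range suspect.Ranges() { // Ranges is nil-safe on a nil receiver.
	fmt.Printf("suspect range: %q -> %q\n", kr.StartKey, kr.EndKey)
}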
+func (rs *KeyRanges) Ranges() []*KeyRange { + if rs == nil { + return nil + } + return rs.krs +} diff --git a/pkg/keyspace/tso_keyspace_group.go b/pkg/keyspace/tso_keyspace_group.go index c8694c4a7c6..51a53f75cc2 100644 --- a/pkg/keyspace/tso_keyspace_group.go +++ b/pkg/keyspace/tso_keyspace_group.go @@ -245,9 +245,10 @@ func (m *GroupManager) initTSONodesWatcher(client *clientv3.Client, clusterID ui client, "tso-nodes-watcher", tsoServiceKey, + func([]*clientv3.Event) error { return nil }, putFn, deleteFn, - func() error { return nil }, + func([]*clientv3.Event) error { return nil }, clientv3.WithRange(tsoServiceEndKey), ) } diff --git a/pkg/mcs/scheduling/server/apis/v1/api.go b/pkg/mcs/scheduling/server/apis/v1/api.go index b59780b7a61..e6881f2f85c 100644 --- a/pkg/mcs/scheduling/server/apis/v1/api.go +++ b/pkg/mcs/scheduling/server/apis/v1/api.go @@ -1330,5 +1330,5 @@ func checkRegionsReplicated(c *gin.Context) { c.String(http.StatusBadRequest, err.Error()) return } - c.String(http.StatusOK, state) + c.IndentedJSON(http.StatusOK, state) } diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index b24db7ac805..5dd1c9f7fce 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -65,7 +65,7 @@ func NewCluster(parentCtx context.Context, persistConfig *config.PersistConfig, cancel() return nil, err } - ruleManager := placement.NewRuleManager(storage, basicCluster, persistConfig) + ruleManager := placement.NewRuleManager(ctx, storage, basicCluster, persistConfig) c := &Cluster{ ctx: ctx, cancel: cancel, diff --git a/pkg/mcs/scheduling/server/config/watcher.go b/pkg/mcs/scheduling/server/config/watcher.go index 4ded93ceb1b..32028592504 100644 --- a/pkg/mcs/scheduling/server/config/watcher.go +++ b/pkg/mcs/scheduling/server/config/watcher.go @@ -139,14 +139,13 @@ func (cw *Watcher) initializeConfigWatcher() error { deleteFn := func(kv *mvccpb.KeyValue) error { return nil } - postEventFn := func() error { - return nil - } cw.configWatcher = etcdutil.NewLoopWatcher( cw.ctx, &cw.wg, cw.etcdClient, "scheduling-config-watcher", cw.configPath, - putFn, deleteFn, postEventFn, + func([]*clientv3.Event) error { return nil }, + putFn, deleteFn, + func([]*clientv3.Event) error { return nil }, ) cw.configWatcher.StartWatchLoop() return cw.configWatcher.WaitLoad() @@ -154,7 +153,7 @@ func (cw *Watcher) initializeConfigWatcher() error { func (cw *Watcher) initializeTTLConfigWatcher() error { putFn := func(kv *mvccpb.KeyValue) error { - key := string(kv.Key)[len(sc.TTLConfigPrefix)+1:] + key := strings.TrimPrefix(string(kv.Key), sc.TTLConfigPrefix+"/") value := string(kv.Value) leaseID := kv.Lease resp, err := cw.etcdClient.TimeToLive(cw.ctx, clientv3.LeaseID(leaseID)) @@ -166,18 +165,18 @@ func (cw *Watcher) initializeTTLConfigWatcher() error { return nil } deleteFn := func(kv *mvccpb.KeyValue) error { - key := string(kv.Key)[len(sc.TTLConfigPrefix)+1:] + key := strings.TrimPrefix(string(kv.Key), sc.TTLConfigPrefix+"/") cw.ttl.PutWithTTL(key, nil, 0) return nil } - postEventFn := func() error { - return nil - } cw.ttlConfigWatcher = etcdutil.NewLoopWatcher( cw.ctx, &cw.wg, cw.etcdClient, "scheduling-ttl-config-watcher", cw.ttlConfigPrefix, - putFn, deleteFn, postEventFn, clientv3.WithPrefix(), + func([]*clientv3.Event) error { return nil }, + putFn, deleteFn, + func([]*clientv3.Event) error { return nil }, + clientv3.WithPrefix(), ) cw.ttlConfigWatcher.StartWatchLoop() return cw.ttlConfigWatcher.WaitLoad() @@ -186,13 +185,14 @@ 
func (cw *Watcher) initializeTTLConfigWatcher() error { func (cw *Watcher) initializeSchedulerConfigWatcher() error { prefixToTrim := cw.schedulerConfigPathPrefix + "/" putFn := func(kv *mvccpb.KeyValue) error { - name := strings.TrimPrefix(string(kv.Key), prefixToTrim) + key := string(kv.Key) + name := strings.TrimPrefix(key, prefixToTrim) log.Info("update scheduler config", zap.String("name", name), zap.String("value", string(kv.Value))) err := cw.storage.SaveSchedulerConfig(name, kv.Value) if err != nil { log.Warn("failed to save scheduler config", - zap.String("event-kv-key", string(kv.Key)), + zap.String("event-kv-key", key), zap.String("trimmed-key", name), zap.Error(err)) return err @@ -204,19 +204,19 @@ func (cw *Watcher) initializeSchedulerConfigWatcher() error { return nil } deleteFn := func(kv *mvccpb.KeyValue) error { - log.Info("remove scheduler config", zap.String("key", string(kv.Key))) + key := string(kv.Key) + log.Info("remove scheduler config", zap.String("key", key)) return cw.storage.RemoveSchedulerConfig( - strings.TrimPrefix(string(kv.Key), prefixToTrim), + strings.TrimPrefix(key, prefixToTrim), ) } - postEventFn := func() error { - return nil - } cw.schedulerConfigWatcher = etcdutil.NewLoopWatcher( cw.ctx, &cw.wg, cw.etcdClient, "scheduling-scheduler-config-watcher", cw.schedulerConfigPathPrefix, - putFn, deleteFn, postEventFn, + func([]*clientv3.Event) error { return nil }, + putFn, deleteFn, + func([]*clientv3.Event) error { return nil }, clientv3.WithPrefix(), ) cw.schedulerConfigWatcher.StartWatchLoop() diff --git a/pkg/mcs/scheduling/server/meta/watcher.go b/pkg/mcs/scheduling/server/meta/watcher.go index 6fae537eab9..8bbcb8d070d 100644 --- a/pkg/mcs/scheduling/server/meta/watcher.go +++ b/pkg/mcs/scheduling/server/meta/watcher.go @@ -73,9 +73,10 @@ func NewWatcher( func (w *Watcher) initializeStoreWatcher() error { putFn := func(kv *mvccpb.KeyValue) error { store := &metapb.Store{} + key := string(kv.Key) if err := proto.Unmarshal(kv.Value, store); err != nil { log.Warn("failed to unmarshal store entry", - zap.String("event-kv-key", string(kv.Key)), zap.Error(err)) + zap.String("event-kv-key", key), zap.Error(err)) return err } origin := w.basicCluster.GetStore(store.GetId()) @@ -104,14 +105,13 @@ func (w *Watcher) initializeStoreWatcher() error { } return nil } - postEventFn := func() error { - return nil - } w.storeWatcher = etcdutil.NewLoopWatcher( w.ctx, &w.wg, w.etcdClient, "scheduling-store-watcher", w.storePathPrefix, - putFn, deleteFn, postEventFn, + func([]*clientv3.Event) error { return nil }, + putFn, deleteFn, + func([]*clientv3.Event) error { return nil }, clientv3.WithPrefix(), ) w.storeWatcher.StartWatchLoop() diff --git a/pkg/mcs/scheduling/server/rule/watcher.go b/pkg/mcs/scheduling/server/rule/watcher.go index 912fb9c01e5..fd324a98f1b 100644 --- a/pkg/mcs/scheduling/server/rule/watcher.go +++ b/pkg/mcs/scheduling/server/rule/watcher.go @@ -20,6 +20,7 @@ import ( "sync" "github.com/pingcap/log" + "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/schedule/checker" "github.com/tikv/pd/pkg/schedule/labeler" "github.com/tikv/pd/pkg/schedule/placement" @@ -36,6 +37,10 @@ type Watcher struct { cancel context.CancelFunc wg sync.WaitGroup + // ruleCommonPathPrefix: + // - Key: /pd/{cluster_id}/rule + // - Value: placement.Rule or placement.RuleGroup + ruleCommonPathPrefix string // rulesPathPrefix: // - Key: /pd/{cluster_id}/rules/{group_id}-{rule_id} // - Value: placement.Rule @@ -60,8 +65,10 @@ type Watcher struct { regionLabeler 
*labeler.RegionLabeler ruleWatcher *etcdutil.LoopWatcher - groupWatcher *etcdutil.LoopWatcher labelWatcher *etcdutil.LoopWatcher + + // patch is used to cache the placement rule changes. + patch *placement.RuleConfigPatch } // NewWatcher creates a new watcher to watch the Placement Rule change from PD API server. @@ -79,6 +86,7 @@ func NewWatcher( ctx: ctx, cancel: cancel, rulesPathPrefix: endpoint.RulesPathPrefix(clusterID), + ruleCommonPathPrefix: endpoint.RuleCommonPathPrefix(clusterID), ruleGroupPathPrefix: endpoint.RuleGroupPathPrefix(clusterID), regionLabelPathPrefix: endpoint.RegionLabelPathPrefix(clusterID), etcdClient: etcdClient, @@ -91,10 +99,6 @@ func NewWatcher( if err != nil { return nil, err } - err = rw.initializeGroupWatcher() - if err != nil { - return nil, err - } err = rw.initializeRegionLabelWatcher() if err != nil { return nil, err @@ -103,89 +107,148 @@ func NewWatcher( } func (rw *Watcher) initializeRuleWatcher() error { - prefixToTrim := rw.rulesPathPrefix + "/" + var suspectKeyRanges *core.KeyRanges + + preFn := func(events []*clientv3.Event) error { + suspectKeyRanges = &core.KeyRanges{} + if len(events) != 0 { + rw.ruleManager.Lock() + rw.patch = rw.ruleManager.BeginPatch() + } + return nil + } + putFn := func(kv *mvccpb.KeyValue) error { - log.Info("update placement rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value))) - rule, err := placement.NewRuleFromJSON(kv.Value) - if err != nil { + key := string(kv.Key) + if strings.HasPrefix(key, rw.rulesPathPrefix) { + log.Info("update placement rule", zap.String("key", key), zap.String("value", string(kv.Value))) + rule, err := placement.NewRuleFromJSON(kv.Value) + if err != nil { + return err + } + // Try to add the rule to the patch or directly update the rule manager. + err = func() error { + if rw.patch == nil { + return rw.ruleManager.SetRule(rule) + } + if err := rw.ruleManager.AdjustRule(rule, ""); err != nil { + return err + } + rw.patch.SetRule(rule) + return nil + }() + // Update the suspect key ranges + if err == nil { + suspectKeyRanges.Append(rule.StartKey, rule.EndKey) + if oldRule := rw.getRule(rule.GroupID, rule.ID); oldRule != nil { + suspectKeyRanges.Append(oldRule.StartKey, oldRule.EndKey) + } + } return err + } else if strings.HasPrefix(key, rw.ruleGroupPathPrefix) { + log.Info("update placement rule group", zap.String("key", key), zap.String("value", string(kv.Value))) + ruleGroup, err := placement.NewRuleGroupFromJSON(kv.Value) + if err != nil { + return err + } + // Try to add the rule to the patch or directly update the rule manager. + err = func() error { + if rw.patch == nil { + return rw.ruleManager.SetRuleGroup(ruleGroup) + } + rw.patch.SetGroup(ruleGroup) + return nil + }() + // Update the suspect key ranges + if err == nil { + for _, rule := range rw.getRulesByGroup(ruleGroup.ID) { + suspectKeyRanges.Append(rule.StartKey, rule.EndKey) + } + } + return err + } else { + log.Warn("unknown key when update placement rule", zap.String("key", key)) + return nil } - // Update the suspect key ranges in the checker. 
- rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey) - if oldRule := rw.ruleManager.GetRule(rule.GroupID, rule.ID); oldRule != nil { - rw.checkerController.AddSuspectKeyRange(oldRule.StartKey, oldRule.EndKey) - } - return rw.ruleManager.SetRule(rule) } deleteFn := func(kv *mvccpb.KeyValue) error { key := string(kv.Key) - log.Info("delete placement rule", zap.String("key", key)) - ruleJSON, err := rw.ruleStorage.LoadRule(strings.TrimPrefix(key, prefixToTrim)) - if err != nil { + if strings.HasPrefix(key, rw.rulesPathPrefix) { + log.Info("delete placement rule", zap.String("key", key)) + ruleJSON, err := rw.ruleStorage.LoadRule(strings.TrimPrefix(key, rw.rulesPathPrefix+"/")) + if err != nil { + return err + } + rule, err := placement.NewRuleFromJSON([]byte(ruleJSON)) + if err != nil { + return err + } + // Try to add the rule to the patch or directly update the rule manager. + err = func() error { + if rw.patch == nil { + return rw.ruleManager.DeleteRule(rule.GroupID, rule.ID) + } + rw.patch.DeleteRule(rule.GroupID, rule.ID) + return nil + }() + // Update the suspect key ranges + if err == nil { + suspectKeyRanges.Append(rule.StartKey, rule.EndKey) + } return err - } - rule, err := placement.NewRuleFromJSON([]byte(ruleJSON)) - if err != nil { + } else if strings.HasPrefix(key, rw.ruleGroupPathPrefix) { + log.Info("delete placement rule group", zap.String("key", key)) + trimmedKey := strings.TrimPrefix(key, rw.ruleGroupPathPrefix+"/") + // Try to add the rule to the patch or directly update the rule manager. + err := func() error { + if rw.patch == nil { + return rw.ruleManager.DeleteRuleGroup(trimmedKey) + } + rw.patch.DeleteGroup(trimmedKey) + return nil + }() + // Update the suspect key ranges + if err == nil { + for _, rule := range rw.getRulesByGroup(trimmedKey) { + suspectKeyRanges.Append(rule.StartKey, rule.EndKey) + } + } return err + } else { + log.Warn("unknown key when delete placement rule", zap.String("key", key)) + return nil } - rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey) - return rw.ruleManager.DeleteRule(rule.GroupID, rule.ID) } - postEventFn := func() error { + postFn := func(events []*clientv3.Event) error { + if len(events) > 0 { + if err := rw.ruleManager.TryCommitPatch(rw.patch); err != nil { + return err + } + rw.ruleManager.Unlock() + } + for _, kr := range suspectKeyRanges.Ranges() { + rw.checkerController.AddSuspectKeyRange(kr.StartKey, kr.EndKey) + } return nil } rw.ruleWatcher = etcdutil.NewLoopWatcher( rw.ctx, &rw.wg, rw.etcdClient, - "scheduling-rule-watcher", rw.rulesPathPrefix, - putFn, deleteFn, postEventFn, + "scheduling-rule-watcher", rw.ruleCommonPathPrefix, + preFn, + putFn, deleteFn, + postFn, clientv3.WithPrefix(), ) rw.ruleWatcher.StartWatchLoop() return rw.ruleWatcher.WaitLoad() } -func (rw *Watcher) initializeGroupWatcher() error { - prefixToTrim := rw.ruleGroupPathPrefix + "/" - putFn := func(kv *mvccpb.KeyValue) error { - log.Info("update placement rule group", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value))) - ruleGroup, err := placement.NewRuleGroupFromJSON(kv.Value) - if err != nil { - return err - } - // Add all rule key ranges within the group to the suspect key ranges. 
- for _, rule := range rw.ruleManager.GetRulesByGroup(ruleGroup.ID) { - rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey) - } - return rw.ruleManager.SetRuleGroup(ruleGroup) - } - deleteFn := func(kv *mvccpb.KeyValue) error { - key := string(kv.Key) - log.Info("delete placement rule group", zap.String("key", key)) - trimmedKey := strings.TrimPrefix(key, prefixToTrim) - for _, rule := range rw.ruleManager.GetRulesByGroup(trimmedKey) { - rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey) - } - return rw.ruleManager.DeleteRuleGroup(trimmedKey) - } - postEventFn := func() error { - return nil - } - rw.groupWatcher = etcdutil.NewLoopWatcher( - rw.ctx, &rw.wg, - rw.etcdClient, - "scheduling-rule-group-watcher", rw.ruleGroupPathPrefix, - putFn, deleteFn, postEventFn, - clientv3.WithPrefix(), - ) - rw.groupWatcher.StartWatchLoop() - return rw.groupWatcher.WaitLoad() -} - func (rw *Watcher) initializeRegionLabelWatcher() error { prefixToTrim := rw.regionLabelPathPrefix + "/" putFn := func(kv *mvccpb.KeyValue) error { - log.Info("update region label rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value))) + key := string(kv.Key) + log.Info("update region label rule", zap.String("key", key), zap.String("value", string(kv.Value))) rule, err := labeler.NewLabelRuleFromJSON(kv.Value) if err != nil { return err @@ -197,14 +260,13 @@ func (rw *Watcher) initializeRegionLabelWatcher() error { log.Info("delete region label rule", zap.String("key", key)) return rw.regionLabeler.DeleteLabelRule(strings.TrimPrefix(key, prefixToTrim)) } - postEventFn := func() error { - return nil - } rw.labelWatcher = etcdutil.NewLoopWatcher( rw.ctx, &rw.wg, rw.etcdClient, "scheduling-region-label-watcher", rw.regionLabelPathPrefix, - putFn, deleteFn, postEventFn, + func([]*clientv3.Event) error { return nil }, + putFn, deleteFn, + func([]*clientv3.Event) error { return nil }, clientv3.WithPrefix(), ) rw.labelWatcher.StartWatchLoop() @@ -216,3 +278,17 @@ func (rw *Watcher) Close() { rw.cancel() rw.wg.Wait() } + +func (rw *Watcher) getRule(groupID, ruleID string) *placement.Rule { + if rw.patch != nil { // patch is not nil means there are locked. + return rw.ruleManager.GetRuleLocked(groupID, ruleID) + } + return rw.ruleManager.GetRule(groupID, ruleID) +} + +func (rw *Watcher) getRulesByGroup(groupID string) []*placement.Rule { + if rw.patch != nil { // patch is not nil means there are locked. 
+ return rw.ruleManager.GetRulesByGroupLocked(groupID) + } + return rw.ruleManager.GetRulesByGroup(groupID) +} diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index 01282b40534..6cf7ae143df 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -212,7 +212,7 @@ func (mc *Cluster) AllocPeer(storeID uint64) (*metapb.Peer, error) { func (mc *Cluster) initRuleManager() { if mc.RuleManager == nil { - mc.RuleManager = placement.NewRuleManager(mc.GetStorage(), mc, mc.GetSharedConfig()) + mc.RuleManager = placement.NewRuleManager(mc.ctx, mc.GetStorage(), mc, mc.GetSharedConfig()) mc.RuleManager.Initialize(int(mc.GetReplicationConfig().MaxReplicas), mc.GetReplicationConfig().LocationLabels, mc.GetReplicationConfig().IsolationLevel) } } diff --git a/pkg/schedule/checker/rule_checker.go b/pkg/schedule/checker/rule_checker.go index 553ece09e65..95cc77ade5d 100644 --- a/pkg/schedule/checker/rule_checker.go +++ b/pkg/schedule/checker/rule_checker.go @@ -199,7 +199,7 @@ func (c *RuleChecker) fixRulePeer(region *core.RegionInfo, fit *placement.Region if c.isDownPeer(region, peer) { if c.isStoreDownTimeHitMaxDownTime(peer.GetStoreId()) { ruleCheckerReplaceDownCounter.Inc() - return c.replaceUnexpectRulePeer(region, rf, fit, peer, downStatus) + return c.replaceUnexpectedRulePeer(region, rf, fit, peer, downStatus) } // When witness placement rule is enabled, promotes the witness to voter when region has down voter. if c.isWitnessEnabled() && core.IsVoter(peer) { @@ -211,7 +211,7 @@ func (c *RuleChecker) fixRulePeer(region *core.RegionInfo, fit *placement.Region } if c.isOfflinePeer(peer) { ruleCheckerReplaceOfflineCounter.Inc() - return c.replaceUnexpectRulePeer(region, rf, fit, peer, offlineStatus) + return c.replaceUnexpectedRulePeer(region, rf, fit, peer, offlineStatus) } } // fix loose matched peers. @@ -246,7 +246,7 @@ func (c *RuleChecker) addRulePeer(region *core.RegionInfo, fit *placement.Region continue } ruleCheckerNoStoreThenTryReplace.Inc() - op, err := c.replaceUnexpectRulePeer(region, oldPeerRuleFit, fit, p, "swap-fit") + op, err := c.replaceUnexpectedRulePeer(region, oldPeerRuleFit, fit, p, "swap-fit") if err != nil { return nil, err } @@ -267,7 +267,7 @@ func (c *RuleChecker) addRulePeer(region *core.RegionInfo, fit *placement.Region } // The peer's store may in Offline or Down, need to be replace. 
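// Stepping back from the checker: the rewritten rule watcher above relies on
// the batch-oriented LoopWatcher hooks introduced by this patch. An
// illustrative sketch of the shape NewLoopWatcher now expects (the helper
// names and key prefix are assumptions; ctx, wg, and client are presumed to
// exist):
preFn := func(events []*clientv3.Event) error {
	// For a non-empty batch: lock the rule manager and open a RuleConfigPatch.
	return nil
}
putFn := func(kv *mvccpb.KeyValue) error {
	// Decode the rule or rule group and stage it on the patch.
	return nil
}
deleteFn := func(kv *mvccpb.KeyValue) error {
	// Stage the corresponding deletion on the patch.
	return nil
}
postFn := func(events []*clientv3.Event) error {
	// Commit the patch once, unlock, then mark the collected suspect key ranges.
	return nil
}
watcher := etcdutil.NewLoopWatcher(ctx, &wg, client,
	"example-rule-watcher", "/pd/1/rule",
	preFn, putFn, deleteFn, postFn,
	clientv3.WithPrefix(),
)
watcher.StartWatchLoop()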
-func (c *RuleChecker) replaceUnexpectRulePeer(region *core.RegionInfo, rf *placement.RuleFit, fit *placement.RegionFit, peer *metapb.Peer, status string) (*operator.Operator, error) { +func (c *RuleChecker) replaceUnexpectedRulePeer(region *core.RegionInfo, rf *placement.RuleFit, fit *placement.RegionFit, peer *metapb.Peer, status string) (*operator.Operator, error) { var fastFailover bool // If the store to which the original peer belongs is TiFlash, the new peer cannot be set to witness, nor can it perform fast failover if c.isWitnessEnabled() && !c.cluster.GetStore(peer.StoreId).IsTiFlash() { diff --git a/pkg/schedule/placement/config.go b/pkg/schedule/placement/config.go index 878db4b2e0a..00c0f94b94e 100644 --- a/pkg/schedule/placement/config.go +++ b/pkg/schedule/placement/config.go @@ -79,28 +79,30 @@ func (c *ruleConfig) getGroup(id string) *RuleGroup { return &RuleGroup{ID: id} } -func (c *ruleConfig) beginPatch() *ruleConfigPatch { - return &ruleConfigPatch{ +func (c *ruleConfig) beginPatch() *RuleConfigPatch { + return &RuleConfigPatch{ c: c, mut: newRuleConfig(), } } -// A helper data structure to update ruleConfig. -type ruleConfigPatch struct { +// RuleConfigPatch is a helper data structure to update ruleConfig. +type RuleConfigPatch struct { c *ruleConfig // original configuration to be updated mut *ruleConfig // record all to-commit rules and groups } -func (p *ruleConfigPatch) setRule(r *Rule) { +// SetRule sets a rule to the patch. +func (p *RuleConfigPatch) SetRule(r *Rule) { p.mut.rules[r.Key()] = r } -func (p *ruleConfigPatch) deleteRule(group, id string) { +// DeleteRule deletes a rule from the patch. +func (p *RuleConfigPatch) DeleteRule(group, id string) { p.mut.rules[[2]string{group, id}] = nil } -func (p *ruleConfigPatch) getGroup(id string) *RuleGroup { +func (p *RuleConfigPatch) getGroup(id string) *RuleGroup { if g, ok := p.mut.groups[id]; ok { return g } @@ -110,15 +112,17 @@ func (p *ruleConfigPatch) getGroup(id string) *RuleGroup { return &RuleGroup{ID: id} } -func (p *ruleConfigPatch) setGroup(g *RuleGroup) { +// SetGroup sets a group to the patch. +func (p *RuleConfigPatch) SetGroup(g *RuleGroup) { p.mut.groups[g.ID] = g } -func (p *ruleConfigPatch) deleteGroup(id string) { - p.setGroup(&RuleGroup{ID: id}) +// DeleteGroup deletes a group from the patch. +func (p *RuleConfigPatch) DeleteGroup(id string) { + p.SetGroup(&RuleGroup{ID: id}) } -func (p *ruleConfigPatch) iterateRules(f func(*Rule)) { +func (p *RuleConfigPatch) iterateRules(f func(*Rule)) { for _, r := range p.mut.rules { if r != nil { // nil means delete. f(r) @@ -131,13 +135,13 @@ func (p *ruleConfigPatch) iterateRules(f func(*Rule)) { } } -func (p *ruleConfigPatch) adjust() { +func (p *RuleConfigPatch) adjust() { // setup rule.group for `buildRuleList` use. p.iterateRules(func(r *Rule) { r.group = p.getGroup(r.GroupID) }) } // trim unnecessary updates. For example, remove a rule then insert the same rule. -func (p *ruleConfigPatch) trim() { +func (p *RuleConfigPatch) trim() { for key, rule := range p.mut.rules { if jsonEquals(rule, p.c.getRule(key)) { delete(p.mut.rules, key) @@ -151,7 +155,7 @@ func (p *ruleConfigPatch) trim() { } // merge all mutations to ruleConfig. 
-func (p *ruleConfigPatch) commit() { +func (p *RuleConfigPatch) commit() { for key, rule := range p.mut.rules { if rule == nil { delete(p.c.rules, key) diff --git a/pkg/schedule/placement/config_test.go b/pkg/schedule/placement/config_test.go index 8f7161a56d7..ccee8837331 100644 --- a/pkg/schedule/placement/config_test.go +++ b/pkg/schedule/placement/config_test.go @@ -30,40 +30,40 @@ func TestTrim(t *testing.T) { rc.setGroup(&RuleGroup{ID: "g2", Index: 2}) testCases := []struct { - ops func(p *ruleConfigPatch) + ops func(p *RuleConfigPatch) mutRules map[[2]string]*Rule mutGroups map[string]*RuleGroup }{ { - func(p *ruleConfigPatch) { - p.setRule(&Rule{GroupID: "g1", ID: "id1", Index: 100}) - p.setRule(&Rule{GroupID: "g1", ID: "id2"}) - p.setGroup(&RuleGroup{ID: "g1", Index: 100}) - p.setGroup(&RuleGroup{ID: "g2", Index: 2}) + func(p *RuleConfigPatch) { + p.SetRule(&Rule{GroupID: "g1", ID: "id1", Index: 100}) + p.SetRule(&Rule{GroupID: "g1", ID: "id2"}) + p.SetGroup(&RuleGroup{ID: "g1", Index: 100}) + p.SetGroup(&RuleGroup{ID: "g2", Index: 2}) }, map[[2]string]*Rule{{"g1", "id1"}: {GroupID: "g1", ID: "id1", Index: 100}}, map[string]*RuleGroup{"g1": {ID: "g1", Index: 100}}, }, { - func(p *ruleConfigPatch) { - p.deleteRule("g1", "id1") - p.deleteGroup("g2") - p.deleteRule("g3", "id3") - p.deleteGroup("g3") + func(p *RuleConfigPatch) { + p.DeleteRule("g1", "id1") + p.DeleteGroup("g2") + p.DeleteRule("g3", "id3") + p.DeleteGroup("g3") }, map[[2]string]*Rule{{"g1", "id1"}: nil}, map[string]*RuleGroup{"g2": {ID: "g2"}}, }, { - func(p *ruleConfigPatch) { - p.setRule(&Rule{GroupID: "g1", ID: "id2", Index: 200}) - p.setRule(&Rule{GroupID: "g1", ID: "id2"}) - p.setRule(&Rule{GroupID: "g3", ID: "id3"}) - p.deleteRule("g3", "id3") - p.setGroup(&RuleGroup{ID: "g1", Index: 100}) - p.setGroup(&RuleGroup{ID: "g1", Index: 1}) - p.setGroup(&RuleGroup{ID: "g3", Index: 3}) - p.deleteGroup("g3") + func(p *RuleConfigPatch) { + p.SetRule(&Rule{GroupID: "g1", ID: "id2", Index: 200}) + p.SetRule(&Rule{GroupID: "g1", ID: "id2"}) + p.SetRule(&Rule{GroupID: "g3", ID: "id3"}) + p.DeleteRule("g3", "id3") + p.SetGroup(&RuleGroup{ID: "g1", Index: 100}) + p.SetGroup(&RuleGroup{ID: "g1", Index: 1}) + p.SetGroup(&RuleGroup{ID: "g3", Index: 3}) + p.DeleteGroup("g3") }, map[[2]string]*Rule{}, map[string]*RuleGroup{}, diff --git a/pkg/schedule/placement/rule_manager.go b/pkg/schedule/placement/rule_manager.go index e25b8802b45..7acf7919570 100644 --- a/pkg/schedule/placement/rule_manager.go +++ b/pkg/schedule/placement/rule_manager.go @@ -16,6 +16,7 @@ package placement import ( "bytes" + "context" "encoding/hex" "encoding/json" "fmt" @@ -32,6 +33,7 @@ import ( "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/utils/syncutil" "go.uber.org/zap" "golang.org/x/exp/slices" @@ -49,6 +51,7 @@ const ( // RuleManager is responsible for the lifecycle of all placement Rules. // It is thread safe. type RuleManager struct { + ctx context.Context storage endpoint.RuleStorage syncutil.RWMutex initialized bool @@ -63,8 +66,9 @@ type RuleManager struct { } // NewRuleManager creates a RuleManager instance. 
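// With ruleConfigPatch exported as RuleConfigPatch, an external caller such as
// the scheduling rule watcher can stage several changes and commit them in one
// go. A hedged sketch of that usage (the function and variable names are
// illustrative, not from the patch; Lock/Unlock come from the embedded
// syncutil.RWMutex):
func applyBatch(m *placement.RuleManager, rules []*placement.Rule, deletedGroupIDs []string) error {
	m.Lock()
	defer m.Unlock()
	p := m.BeginPatch()
	for _, r := range rules {
		if err := m.AdjustRule(r, ""); err != nil { // validate before staging
			return err
		}
		p.SetRule(r)
	}
	for _, id := range deletedGroupIDs {
		p.DeleteGroup(id)
	}
	return m.TryCommitPatch(p) // persists all staged mutations in a single txn
}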
-func NewRuleManager(storage endpoint.RuleStorage, storeSetInformer core.StoreSetInformer, conf config.SharedConfigProvider) *RuleManager { +func NewRuleManager(ctx context.Context, storage endpoint.RuleStorage, storeSetInformer core.StoreSetInformer, conf config.SharedConfigProvider) *RuleManager { return &RuleManager{ + ctx: ctx, storage: storage, storeSetInformer: storeSetInformer, conf: conf, @@ -125,12 +129,17 @@ func (m *RuleManager) Initialize(maxReplica int, locationLabels []string, isolat IsolationLevel: isolationLevel, }) } - for _, defaultRule := range defaultRules { - if err := m.storage.SaveRule(defaultRule.StoreKey(), defaultRule); err != nil { - // TODO: Need to delete the previously successfully saved Rules? - return err + if err := m.storage.RunInTxn(m.ctx, func(txn kv.Txn) (err error) { + for _, defaultRule := range defaultRules { + if err := m.storage.SaveRule(txn, defaultRule.StoreKey(), defaultRule); err != nil { + // TODO: Need to delete the previously successfully saved Rules? + return err + } + m.ruleConfig.setRule(defaultRule) } - m.ruleConfig.setRule(defaultRule) + return nil + }); err != nil { + return err } } m.ruleConfig.adjust() @@ -155,7 +164,7 @@ func (m *RuleManager) loadRules() error { toDelete = append(toDelete, k) return } - err = m.adjustRule(r, "") + err = m.AdjustRule(r, "") if err != nil { log.Error("rule is in bad format", zap.String("rule-key", k), zap.String("rule-value", v), errs.ZapError(errs.ErrLoadRule, err)) toDelete = append(toDelete, k) @@ -177,17 +186,19 @@ func (m *RuleManager) loadRules() error { if err != nil { return err } - for _, s := range toSave { - if err = m.storage.SaveRule(s.StoreKey(), s); err != nil { - return err + return m.storage.RunInTxn(m.ctx, func(txn kv.Txn) (err error) { + for _, s := range toSave { + if err = m.storage.SaveRule(txn, s.StoreKey(), s); err != nil { + return err + } } - } - for _, d := range toDelete { - if err = m.storage.DeleteRule(d); err != nil { - return err + for _, d := range toDelete { + if err = m.storage.DeleteRule(txn, d); err != nil { + return err + } } - } - return nil + return nil + }) } func (m *RuleManager) loadGroups() error { @@ -201,8 +212,8 @@ func (m *RuleManager) loadGroups() error { }) } -// check and adjust rule from client or storage. -func (m *RuleManager) adjustRule(r *Rule, groupID string) (err error) { +// AdjustRule check and adjust rule from client or storage. +func (m *RuleManager) AdjustRule(r *Rule, groupID string) (err error) { r.StartKey, err = hex.DecodeString(r.StartKeyHex) if err != nil { return errs.ErrHexDecodingString.FastGenByArgs(r.StartKeyHex) @@ -276,6 +287,11 @@ func (m *RuleManager) adjustRule(r *Rule, groupID string) (err error) { func (m *RuleManager) GetRule(group, id string) *Rule { m.RLock() defer m.RUnlock() + return m.GetRuleLocked(group, id) +} + +// GetRuleLocked returns the Rule with the same (group, id). +func (m *RuleManager) GetRuleLocked(group, id string) *Rule { if r := m.ruleConfig.getRule([2]string{group, id}); r != nil { return r.Clone() } @@ -284,14 +300,14 @@ func (m *RuleManager) GetRule(group, id string) *Rule { // SetRule inserts or updates a Rule. 
func (m *RuleManager) SetRule(rule *Rule) error { - if err := m.adjustRule(rule, ""); err != nil { + if err := m.AdjustRule(rule, ""); err != nil { return err } m.Lock() defer m.Unlock() - p := m.beginPatch() - p.setRule(rule) - if err := m.tryCommitPatch(p); err != nil { + p := m.BeginPatch() + p.SetRule(rule) + if err := m.TryCommitPatch(p); err != nil { return err } log.Info("placement rule updated", zap.String("rule", fmt.Sprint(rule))) @@ -302,9 +318,9 @@ func (m *RuleManager) SetRule(rule *Rule) error { func (m *RuleManager) DeleteRule(group, id string) error { m.Lock() defer m.Unlock() - p := m.beginPatch() - p.deleteRule(group, id) - if err := m.tryCommitPatch(p); err != nil { + p := m.BeginPatch() + p.DeleteRule(group, id) + if err := m.TryCommitPatch(p); err != nil { return err } log.Info("placement rule is removed", zap.String("group", group), zap.String("id", id)) @@ -348,6 +364,11 @@ func (m *RuleManager) GetGroupsCount() int { func (m *RuleManager) GetRulesByGroup(group string) []*Rule { m.RLock() defer m.RUnlock() + return m.GetRulesByGroupLocked(group) +} + +// GetRulesByGroupLocked returns sorted rules of a group. +func (m *RuleManager) GetRulesByGroupLocked(group string) []*Rule { var rules []*Rule for _, r := range m.ruleConfig.rules { if r.GroupID == group { @@ -439,11 +460,13 @@ func (m *RuleManager) CheckIsCachedDirectly(regionID uint64) bool { return ok } -func (m *RuleManager) beginPatch() *ruleConfigPatch { +// BeginPatch returns a patch for multiple changes. +func (m *RuleManager) BeginPatch() *RuleConfigPatch { return m.ruleConfig.beginPatch() } -func (m *RuleManager) tryCommitPatch(patch *ruleConfigPatch) error { +// TryCommitPatch tries to commit a patch. +func (m *RuleManager) TryCommitPatch(patch *RuleConfigPatch) error { patch.adjust() ruleList, err := buildRuleList(patch) @@ -466,49 +489,44 @@ func (m *RuleManager) tryCommitPatch(patch *ruleConfigPatch) error { } func (m *RuleManager) savePatch(p *ruleConfig) error { - // TODO: it is not completely safe - // 1. in case that half of rules applied, error.. we have to cancel persisted rules - // but that may fail too, causing memory/disk inconsistency - // either rely a transaction API, or clients to request again until success - // 2. in case that PD is suddenly down in the loop, inconsistency again - // now we can only rely clients to request again - var err error - for key, r := range p.rules { - if r == nil { - r = &Rule{GroupID: key[0], ID: key[1]} - err = m.storage.DeleteRule(r.StoreKey()) - } else { - err = m.storage.SaveRule(r.StoreKey(), r) - } - if err != nil { - return err - } - } - for id, g := range p.groups { - if g.isDefault() { - err = m.storage.DeleteRuleGroup(id) - } else { - err = m.storage.SaveRuleGroup(id, g) + return m.storage.RunInTxn(m.ctx, func(txn kv.Txn) (err error) { + for key, r := range p.rules { + if r == nil { + r = &Rule{GroupID: key[0], ID: key[1]} + err = m.storage.DeleteRule(txn, r.StoreKey()) + } else { + err = m.storage.SaveRule(txn, r.StoreKey(), r) + } + if err != nil { + return err + } } - if err != nil { - return err + for id, g := range p.groups { + if g.isDefault() { + err = m.storage.DeleteRuleGroup(txn, id) + } else { + err = m.storage.SaveRuleGroup(txn, id, g) + } + if err != nil { + return err + } } - } - return nil + return nil + }) } // SetRules inserts or updates lots of Rules at once. 
func (m *RuleManager) SetRules(rules []*Rule) error { m.Lock() defer m.Unlock() - p := m.beginPatch() + p := m.BeginPatch() for _, r := range rules { - if err := m.adjustRule(r, ""); err != nil { + if err := m.AdjustRule(r, ""); err != nil { return err } - p.setRule(r) + p.SetRule(r) } - if err := m.tryCommitPatch(p); err != nil { + if err := m.TryCommitPatch(p); err != nil { return err } @@ -543,7 +561,7 @@ func (r RuleOp) String() string { func (m *RuleManager) Batch(todo []RuleOp) error { for _, t := range todo { if t.Action == RuleOpAdd { - err := m.adjustRule(t.Rule, "") + err := m.AdjustRule(t.Rule, "") if err != nil { return err } @@ -553,25 +571,25 @@ func (m *RuleManager) Batch(todo []RuleOp) error { m.Lock() defer m.Unlock() - patch := m.beginPatch() + patch := m.BeginPatch() for _, t := range todo { switch t.Action { case RuleOpAdd: - patch.setRule(t.Rule) + patch.SetRule(t.Rule) case RuleOpDel: if !t.DeleteByIDPrefix { - patch.deleteRule(t.GroupID, t.ID) + patch.DeleteRule(t.GroupID, t.ID) } else { m.ruleConfig.iterateRules(func(r *Rule) { if r.GroupID == t.GroupID && strings.HasPrefix(r.ID, t.ID) { - patch.deleteRule(r.GroupID, r.ID) + patch.DeleteRule(r.GroupID, r.ID) } }) } } } - if err := m.tryCommitPatch(patch); err != nil { + if err := m.TryCommitPatch(patch); err != nil { return err } @@ -605,9 +623,9 @@ func (m *RuleManager) GetRuleGroups() []*RuleGroup { func (m *RuleManager) SetRuleGroup(group *RuleGroup) error { m.Lock() defer m.Unlock() - p := m.beginPatch() - p.setGroup(group) - if err := m.tryCommitPatch(p); err != nil { + p := m.BeginPatch() + p.SetGroup(group) + if err := m.TryCommitPatch(p); err != nil { return err } log.Info("group config updated", zap.String("group", fmt.Sprint(group))) @@ -618,9 +636,9 @@ func (m *RuleManager) SetRuleGroup(group *RuleGroup) error { func (m *RuleManager) DeleteRuleGroup(id string) error { m.Lock() defer m.Unlock() - p := m.beginPatch() - p.deleteGroup(id) - if err := m.tryCommitPatch(p); err != nil { + p := m.BeginPatch() + p.DeleteGroup(id) + if err := m.TryCommitPatch(p); err != nil { return err } log.Info("group config reset", zap.String("group", id)) @@ -678,7 +696,7 @@ func (m *RuleManager) GetGroupBundle(id string) (b GroupBundle) { func (m *RuleManager) SetAllGroupBundles(groups []GroupBundle, override bool) error { m.Lock() defer m.Unlock() - p := m.beginPatch() + p := m.BeginPatch() matchID := func(a string) bool { for _, g := range groups { if g.ID == a { @@ -689,28 +707,28 @@ func (m *RuleManager) SetAllGroupBundles(groups []GroupBundle, override bool) er } for k := range m.ruleConfig.rules { if override || matchID(k[0]) { - p.deleteRule(k[0], k[1]) + p.DeleteRule(k[0], k[1]) } } for id := range m.ruleConfig.groups { if override || matchID(id) { - p.deleteGroup(id) + p.DeleteGroup(id) } } for _, g := range groups { - p.setGroup(&RuleGroup{ + p.SetGroup(&RuleGroup{ ID: g.ID, Index: g.Index, Override: g.Override, }) for _, r := range g.Rules { - if err := m.adjustRule(r, g.ID); err != nil { + if err := m.AdjustRule(r, g.ID); err != nil { return err } - p.setRule(r) + p.SetRule(r) } } - if err := m.tryCommitPatch(p); err != nil { + if err := m.TryCommitPatch(p); err != nil { return err } log.Info("full config reset", zap.String("config", fmt.Sprint(groups))) @@ -722,26 +740,26 @@ func (m *RuleManager) SetAllGroupBundles(groups []GroupBundle, override bool) er func (m *RuleManager) SetGroupBundle(group GroupBundle) error { m.Lock() defer m.Unlock() - p := m.beginPatch() + p := m.BeginPatch() if _, ok := 
m.ruleConfig.groups[group.ID]; ok { for k := range m.ruleConfig.rules { if k[0] == group.ID { - p.deleteRule(k[0], k[1]) + p.DeleteRule(k[0], k[1]) } } } - p.setGroup(&RuleGroup{ + p.SetGroup(&RuleGroup{ ID: group.ID, Index: group.Index, Override: group.Override, }) for _, r := range group.Rules { - if err := m.adjustRule(r, group.ID); err != nil { + if err := m.AdjustRule(r, group.ID); err != nil { return err } - p.setRule(r) + p.SetRule(r) } - if err := m.tryCommitPatch(p); err != nil { + if err := m.TryCommitPatch(p); err != nil { return err } log.Info("group is reset", zap.String("group", fmt.Sprint(group))) @@ -762,18 +780,18 @@ func (m *RuleManager) DeleteGroupBundle(id string, regex bool) error { matchID = r.MatchString } - p := m.beginPatch() + p := m.BeginPatch() for k := range m.ruleConfig.rules { if matchID(k[0]) { - p.deleteRule(k[0], k[1]) + p.DeleteRule(k[0], k[1]) } } for _, g := range m.ruleConfig.groups { if matchID(g.ID) { - p.deleteGroup(g.ID) + p.DeleteGroup(g.ID) } } - if err := m.tryCommitPatch(p); err != nil { + if err := m.TryCommitPatch(p); err != nil { return err } log.Info("groups are removed", zap.String("id", id), zap.Bool("regexp", regex)) diff --git a/pkg/schedule/placement/rule_manager_test.go b/pkg/schedule/placement/rule_manager_test.go index 68a18b538d4..ea5d7df3640 100644 --- a/pkg/schedule/placement/rule_manager_test.go +++ b/pkg/schedule/placement/rule_manager_test.go @@ -15,6 +15,7 @@ package placement import ( + "context" "encoding/hex" "testing" @@ -32,7 +33,7 @@ func newTestManager(t *testing.T, enableWitness bool) (endpoint.RuleStorage, *Ru re := require.New(t) store := endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil) var err error - manager := NewRuleManager(store, nil, mockconfig.NewTestOptions()) + manager := NewRuleManager(context.Background(), store, nil, mockconfig.NewTestOptions()) manager.conf.SetEnableWitness(enableWitness) err = manager.Initialize(3, []string{"zone", "rack", "host"}, "") re.NoError(err) @@ -90,23 +91,23 @@ func TestAdjustRule(t *testing.T) { {GroupID: "group", ID: "id", StartKeyHex: "123abc", EndKeyHex: "123abf", Role: Voter, Count: -1}, {GroupID: "group", ID: "id", StartKeyHex: "123abc", EndKeyHex: "123abf", Role: Voter, Count: 3, LabelConstraints: []LabelConstraint{{Op: "foo"}}}, } - re.NoError(manager.adjustRule(&rules[0], "group")) + re.NoError(manager.AdjustRule(&rules[0], "group")) re.Equal([]byte{0x12, 0x3a, 0xbc}, rules[0].StartKey) re.Equal([]byte{0x12, 0x3a, 0xbf}, rules[0].EndKey) - re.Error(manager.adjustRule(&rules[1], "")) + re.Error(manager.AdjustRule(&rules[1], "")) for i := 2; i < len(rules); i++ { - re.Error(manager.adjustRule(&rules[i], "group")) + re.Error(manager.AdjustRule(&rules[i], "group")) } manager.SetKeyType(constant.Table.String()) - re.Error(manager.adjustRule(&Rule{GroupID: "group", ID: "id", StartKeyHex: "123abc", EndKeyHex: "123abf", Role: Voter, Count: 3}, "group")) + re.Error(manager.AdjustRule(&Rule{GroupID: "group", ID: "id", StartKeyHex: "123abc", EndKeyHex: "123abf", Role: Voter, Count: 3}, "group")) manager.SetKeyType(constant.Txn.String()) - re.Error(manager.adjustRule(&Rule{GroupID: "group", ID: "id", StartKeyHex: "123abc", EndKeyHex: "123abf", Role: Voter, Count: 3}, "group")) + re.Error(manager.AdjustRule(&Rule{GroupID: "group", ID: "id", StartKeyHex: "123abc", EndKeyHex: "123abf", Role: Voter, Count: 3}, "group")) - re.Error(manager.adjustRule(&Rule{ + re.Error(manager.AdjustRule(&Rule{ GroupID: "group", ID: "id", StartKeyHex: 
hex.EncodeToString(codec.EncodeBytes([]byte{0})), @@ -115,7 +116,7 @@ func TestAdjustRule(t *testing.T) { Count: 3, }, "group")) - re.Error(manager.adjustRule(&Rule{ + re.Error(manager.AdjustRule(&Rule{ GroupID: "tiflash", ID: "id", StartKeyHex: hex.EncodeToString(codec.EncodeBytes([]byte{0})), @@ -156,7 +157,7 @@ func TestSaveLoad(t *testing.T) { re.NoError(manager.SetRule(r.Clone())) } - m2 := NewRuleManager(store, nil, nil) + m2 := NewRuleManager(context.Background(), store, nil, nil) err := m2.Initialize(3, []string{"no", "labels"}, "") re.NoError(err) re.Len(m2.GetAllRules(), 3) @@ -174,7 +175,7 @@ func TestSetAfterGet(t *testing.T) { rule.Count = 1 manager.SetRule(rule) - m2 := NewRuleManager(store, nil, nil) + m2 := NewRuleManager(context.Background(), store, nil, nil) err := m2.Initialize(100, []string{}, "") re.NoError(err) rule = m2.GetRule(DefaultGroupID, DefaultRuleID) diff --git a/pkg/schedule/schedulers/shuffle_region.go b/pkg/schedule/schedulers/shuffle_region.go index f1d35e80925..f9bed18d3fa 100644 --- a/pkg/schedule/schedulers/shuffle_region.go +++ b/pkg/schedule/schedulers/shuffle_region.go @@ -139,18 +139,19 @@ func (s *shuffleRegionScheduler) scheduleRemovePeer(cluster sche.SchedulerCluste pendingFilter := filter.NewRegionPendingFilter() downFilter := filter.NewRegionDownFilter() replicaFilter := filter.NewRegionReplicatedFilter(cluster) + ranges := s.conf.GetRanges() for _, source := range candidates.Stores { var region *core.RegionInfo if s.conf.IsRoleAllow(roleFollower) { - region = filter.SelectOneRegion(cluster.RandFollowerRegions(source.GetID(), s.conf.Ranges), nil, + region = filter.SelectOneRegion(cluster.RandFollowerRegions(source.GetID(), ranges), nil, pendingFilter, downFilter, replicaFilter) } if region == nil && s.conf.IsRoleAllow(roleLeader) { - region = filter.SelectOneRegion(cluster.RandLeaderRegions(source.GetID(), s.conf.Ranges), nil, + region = filter.SelectOneRegion(cluster.RandLeaderRegions(source.GetID(), ranges), nil, pendingFilter, downFilter, replicaFilter) } if region == nil && s.conf.IsRoleAllow(roleLearner) { - region = filter.SelectOneRegion(cluster.RandLearnerRegions(source.GetID(), s.conf.Ranges), nil, + region = filter.SelectOneRegion(cluster.RandLearnerRegions(source.GetID(), ranges), nil, pendingFilter, downFilter, replicaFilter) } if region != nil { diff --git a/pkg/schedule/schedulers/shuffle_region_config.go b/pkg/schedule/schedulers/shuffle_region_config.go index 7d04879c992..552d7ea8bce 100644 --- a/pkg/schedule/schedulers/shuffle_region_config.go +++ b/pkg/schedule/schedulers/shuffle_region_config.go @@ -58,7 +58,9 @@ func (conf *shuffleRegionSchedulerConfig) GetRoles() []string { func (conf *shuffleRegionSchedulerConfig) GetRanges() []core.KeyRange { conf.RLock() defer conf.RUnlock() - return conf.Ranges + ranges := make([]core.KeyRange, len(conf.Ranges)) + copy(ranges, conf.Ranges) + return ranges } func (conf *shuffleRegionSchedulerConfig) IsRoleAllow(role string) bool { diff --git a/pkg/statistics/region_collection_test.go b/pkg/statistics/region_collection_test.go index f0df9ce6e07..cbbf7672bee 100644 --- a/pkg/statistics/region_collection_test.go +++ b/pkg/statistics/region_collection_test.go @@ -15,6 +15,7 @@ package statistics import ( + "context" "testing" "github.com/pingcap/kvproto/pkg/metapb" @@ -29,7 +30,7 @@ import ( func TestRegionStatistics(t *testing.T) { re := require.New(t) store := storage.NewStorageWithMemoryBackend() - manager := placement.NewRuleManager(store, nil, nil) + manager := 
placement.NewRuleManager(context.Background(), store, nil, nil) err := manager.Initialize(3, []string{"zone", "rack", "host"}, "") re.NoError(err) opt := mockconfig.NewTestOptions() @@ -118,7 +119,7 @@ func TestRegionStatistics(t *testing.T) { func TestRegionStatisticsWithPlacementRule(t *testing.T) { re := require.New(t) store := storage.NewStorageWithMemoryBackend() - manager := placement.NewRuleManager(store, nil, nil) + manager := placement.NewRuleManager(context.Background(), store, nil, nil) err := manager.Initialize(3, []string{"zone", "rack", "host"}, "") re.NoError(err) opt := mockconfig.NewTestOptions() diff --git a/pkg/storage/endpoint/key_path.go b/pkg/storage/endpoint/key_path.go index cac40db29c5..69b8d0f2f8e 100644 --- a/pkg/storage/endpoint/key_path.go +++ b/pkg/storage/endpoint/key_path.go @@ -31,6 +31,7 @@ const ( serviceMiddlewarePath = "service_middleware" schedulePath = "schedule" gcPath = "gc" + ruleCommonPath = "rule" rulesPath = "rules" ruleGroupPath = "rule_group" regionLabelPath = "region_label" @@ -102,6 +103,11 @@ func RulesPathPrefix(clusterID uint64) string { return path.Join(PDRootPath(clusterID), rulesPath) } +// RuleCommonPathPrefix returns the path prefix to save the placement rule common config. +func RuleCommonPathPrefix(clusterID uint64) string { + return path.Join(PDRootPath(clusterID), ruleCommonPath) +} + // RuleGroupPathPrefix returns the path prefix to save the placement rule groups. func RuleGroupPathPrefix(clusterID uint64) string { return path.Join(PDRootPath(clusterID), ruleGroupPath) diff --git a/pkg/storage/endpoint/rule.go b/pkg/storage/endpoint/rule.go index 125c5bc31eb..c0eb4cea8fe 100644 --- a/pkg/storage/endpoint/rule.go +++ b/pkg/storage/endpoint/rule.go @@ -15,43 +15,44 @@ package endpoint import ( + "context" + "encoding/json" "strings" + "github.com/tikv/pd/pkg/storage/kv" "go.etcd.io/etcd/clientv3" ) // RuleStorage defines the storage operations on the rule. type RuleStorage interface { + // TODO: shall we support other interfaces about txn? LoadRule(ruleKey string) (string, error) LoadRules(f func(k, v string)) error - SaveRule(ruleKey string, rule interface{}) error - SaveRuleJSON(ruleKey, rule string) error - DeleteRule(ruleKey string) error + SaveRule(txn kv.Txn, ruleKey string, rule interface{}) error + DeleteRule(txn kv.Txn, ruleKey string) error LoadRuleGroups(f func(k, v string)) error - SaveRuleGroup(groupID string, group interface{}) error - SaveRuleGroupJSON(groupID, group string) error - DeleteRuleGroup(groupID string) error + SaveRuleGroup(txn kv.Txn, groupID string, group interface{}) error + DeleteRuleGroup(txn kv.Txn, groupID string) error LoadRegionRules(f func(k, v string)) error SaveRegionRule(ruleKey string, rule interface{}) error - SaveRegionRuleJSON(ruleKey, rule string) error DeleteRegionRule(ruleKey string) error + RunInTxn(ctx context.Context, f func(txn kv.Txn) error) error } var _ RuleStorage = (*StorageEndpoint)(nil) // SaveRule stores a rule cfg to the rulesPath. -func (se *StorageEndpoint) SaveRule(ruleKey string, rule interface{}) error { - return se.saveJSON(ruleKeyPath(ruleKey), rule) -} - -// SaveRuleJSON stores a rule cfg JSON to the rulesPath. 
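// A short sketch of how the txn-aware RuleStorage methods above are meant to
// be driven: several writes grouped through RunInTxn so they are committed
// together rather than one key at a time. The surrounding variables
// (ctx, ruleStorage, rule) are assumptions for illustration.
err := ruleStorage.RunInTxn(ctx, func(txn kv.Txn) error {
	if err := ruleStorage.SaveRule(txn, rule.StoreKey(), rule); err != nil {
		return err
	}
	return ruleStorage.DeleteRuleGroup(txn, "obsolete-group")
})
if err != nil {
	log.Error("atomic placement rule update failed", zap.Error(err))
}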
-func (se *StorageEndpoint) SaveRuleJSON(ruleKey, rule string) error { - return se.Save(ruleKeyPath(ruleKey), rule) +func (se *StorageEndpoint) SaveRule(txn kv.Txn, ruleKey string, rule interface{}) error { + value, err := json.Marshal(rule) + if err != nil { + return err + } + return txn.Save(ruleKeyPath(ruleKey), string(value)) } // DeleteRule removes a rule from storage. -func (se *StorageEndpoint) DeleteRule(ruleKey string) error { - return se.Remove(ruleKeyPath(ruleKey)) +func (se *StorageEndpoint) DeleteRule(txn kv.Txn, ruleKey string) error { + return txn.Remove(ruleKeyPath(ruleKey)) } // LoadRuleGroups loads all rule groups from storage. @@ -60,18 +61,17 @@ func (se *StorageEndpoint) LoadRuleGroups(f func(k, v string)) error { } // SaveRuleGroup stores a rule group config to storage. -func (se *StorageEndpoint) SaveRuleGroup(groupID string, group interface{}) error { - return se.saveJSON(ruleGroupIDPath(groupID), group) -} - -// SaveRuleGroupJSON stores a rule group config JSON to storage. -func (se *StorageEndpoint) SaveRuleGroupJSON(groupID, group string) error { - return se.Save(ruleGroupIDPath(groupID), group) +func (se *StorageEndpoint) SaveRuleGroup(txn kv.Txn, groupID string, group interface{}) error { + value, err := json.Marshal(group) + if err != nil { + return err + } + return txn.Save(ruleGroupIDPath(groupID), string(value)) } // DeleteRuleGroup removes a rule group from storage. -func (se *StorageEndpoint) DeleteRuleGroup(groupID string) error { - return se.Remove(ruleGroupIDPath(groupID)) +func (se *StorageEndpoint) DeleteRuleGroup(txn kv.Txn, groupID string) error { + return txn.Remove(ruleGroupIDPath(groupID)) } // LoadRegionRules loads region rules from storage. @@ -84,11 +84,6 @@ func (se *StorageEndpoint) SaveRegionRule(ruleKey string, rule interface{}) erro return se.saveJSON(regionLabelKeyPath(ruleKey), rule) } -// SaveRegionRuleJSON saves a region rule JSON to the storage. -func (se *StorageEndpoint) SaveRegionRuleJSON(ruleKey, rule string) error { - return se.Save(regionLabelKeyPath(ruleKey), rule) -} - // DeleteRegionRule removes a region rule from storage. func (se *StorageEndpoint) DeleteRegionRule(ruleKey string) error { return se.Remove(regionLabelKeyPath(ruleKey)) diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 58534de1642..eaf673190eb 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -514,9 +514,10 @@ func (kgm *KeyspaceGroupManager) InitializeTSOServerWatchLoop() error { kgm.etcdClient, "tso-nodes-watcher", kgm.tsoServiceKey, + func([]*clientv3.Event) error { return nil }, putFn, deleteFn, - func() error { return nil }, + func([]*clientv3.Event) error { return nil }, clientv3.WithRange(tsoServiceEndKey), ) kgm.tsoNodesWatcher.StartWatchLoop() @@ -558,7 +559,8 @@ func (kgm *KeyspaceGroupManager) InitializeGroupWatchLoop() error { kgm.deleteKeyspaceGroup(groupID) return nil } - postEventFn := func() error { + // TODO: Does it need to check num of events? + postEventFn := func([]*clientv3.Event) error { // Retry the groups that are not initialized successfully before. 
for id, group := range kgm.groupUpdateRetryList { delete(kgm.groupUpdateRetryList, id) @@ -572,6 +574,7 @@ func (kgm *KeyspaceGroupManager) InitializeGroupWatchLoop() error { kgm.etcdClient, "keyspace-watcher", startKey, + func([]*clientv3.Event) error { return nil }, putFn, deleteFn, postEventFn, diff --git a/pkg/utils/etcdutil/etcdutil.go b/pkg/utils/etcdutil/etcdutil.go index 03c2374efc6..0e1b2731474 100644 --- a/pkg/utils/etcdutil/etcdutil.go +++ b/pkg/utils/etcdutil/etcdutil.go @@ -587,8 +587,10 @@ type LoopWatcher struct { putFn func(*mvccpb.KeyValue) error // deleteFn is used to handle the delete event. deleteFn func(*mvccpb.KeyValue) error - // postEventFn is used to call after handling all events. - postEventFn func() error + // postEventsFn is used to call after handling all events. + postEventsFn func([]*clientv3.Event) error + // preEventsFn is used to call before handling all events. + preEventsFn func([]*clientv3.Event) error // forceLoadMu is used to ensure two force loads have minimal interval. forceLoadMu syncutil.RWMutex @@ -613,7 +615,9 @@ func NewLoopWatcher( ctx context.Context, wg *sync.WaitGroup, client *clientv3.Client, name, key string, - putFn, deleteFn func(*mvccpb.KeyValue) error, postEventFn func() error, + preEventsFn func([]*clientv3.Event) error, + putFn, deleteFn func(*mvccpb.KeyValue) error, + postEventsFn func([]*clientv3.Event) error, opts ...clientv3.OpOption, ) *LoopWatcher { return &LoopWatcher{ @@ -627,7 +631,8 @@ func NewLoopWatcher( updateClientCh: make(chan *clientv3.Client, 1), putFn: putFn, deleteFn: deleteFn, - postEventFn: postEventFn, + postEventsFn: postEventsFn, + preEventsFn: preEventsFn, opts: opts, lastTimeForceLoad: time.Now(), loadTimeout: defaultLoadDataFromEtcdTimeout, @@ -813,28 +818,34 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision zap.Int64("revision", revision), zap.String("name", lw.name), zap.String("key", lw.key)) goto watchChanLoop } + if err := lw.preEventsFn(wresp.Events); err != nil { + log.Error("run pre event failed in watch loop", zap.Error(err), + zap.Int64("revision", revision), zap.String("name", lw.name), zap.String("key", lw.key)) + } for _, event := range wresp.Events { switch event.Type { case clientv3.EventTypePut: if err := lw.putFn(event.Kv); err != nil { log.Error("put failed in watch loop", zap.Error(err), - zap.Int64("revision", revision), zap.String("name", lw.name), zap.String("key", lw.key)) + zap.Int64("revision", revision), zap.String("name", lw.name), + zap.String("watch-key", lw.key), zap.ByteString("event-kv-key", event.Kv.Key)) } else { - log.Debug("put in watch loop", zap.String("name", lw.name), + log.Debug("put successfully in watch loop", zap.String("name", lw.name), zap.ByteString("key", event.Kv.Key), zap.ByteString("value", event.Kv.Value)) } case clientv3.EventTypeDelete: if err := lw.deleteFn(event.Kv); err != nil { log.Error("delete failed in watch loop", zap.Error(err), - zap.Int64("revision", revision), zap.String("name", lw.name), zap.String("key", lw.key)) + zap.Int64("revision", revision), zap.String("name", lw.name), + zap.String("watch-key", lw.key), zap.ByteString("event-kv-key", event.Kv.Key)) } else { - log.Debug("delete in watch loop", zap.String("name", lw.name), + log.Debug("delete successfully in watch loop", zap.String("name", lw.name), zap.ByteString("key", event.Kv.Key)) } } } - if err := lw.postEventFn(); err != nil { + if err := lw.postEventsFn(wresp.Events); err != nil { log.Error("run post event failed in watch loop", 
zap.Error(err), zap.Int64("revision", revision), zap.String("name", lw.name), zap.String("key", lw.key)) } @@ -864,6 +875,10 @@ func (lw *LoopWatcher) load(ctx context.Context) (nextRevision int64, err error) zap.String("key", lw.key), zap.Error(err)) return 0, err } + if err := lw.preEventsFn([]*clientv3.Event{}); err != nil { + log.Error("run pre event failed in watch loop", zap.String("name", lw.name), + zap.String("key", lw.key), zap.Error(err)) + } for i, item := range resp.Kvs { if resp.More && i == len(resp.Kvs)-1 { // The last key is the start key of the next batch. @@ -878,7 +893,7 @@ func (lw *LoopWatcher) load(ctx context.Context) (nextRevision int64, err error) } // Note: if there are no keys in etcd, the resp.More is false. It also means the load is finished. if !resp.More { - if err := lw.postEventFn(); err != nil { + if err := lw.postEventsFn([]*clientv3.Event{}); err != nil { log.Error("run post event failed in watch loop", zap.String("name", lw.name), zap.String("key", lw.key), zap.Error(err)) } diff --git a/pkg/utils/etcdutil/etcdutil_test.go b/pkg/utils/etcdutil/etcdutil_test.go index f7fadd3bbf6..861a57cef13 100644 --- a/pkg/utils/etcdutil/etcdutil_test.go +++ b/pkg/utils/etcdutil/etcdutil_test.go @@ -410,6 +410,7 @@ func (suite *loopWatcherTestSuite) TestLoadWithoutKey() { suite.client, "test", "TestLoadWithoutKey", + func([]*clientv3.Event) error { return nil }, func(kv *mvccpb.KeyValue) error { cache.Lock() defer cache.Unlock() @@ -417,7 +418,7 @@ func (suite *loopWatcherTestSuite) TestLoadWithoutKey() { return nil }, func(kv *mvccpb.KeyValue) error { return nil }, - func() error { return nil }, + func([]*clientv3.Event) error { return nil }, ) watcher.StartWatchLoop() err := watcher.WaitLoad() @@ -441,6 +442,7 @@ func (suite *loopWatcherTestSuite) TestCallBack() { suite.client, "test", "TestCallBack", + func([]*clientv3.Event) error { return nil }, func(kv *mvccpb.KeyValue) error { result = append(result, string(kv.Key)) return nil @@ -451,7 +453,7 @@ func (suite *loopWatcherTestSuite) TestCallBack() { delete(cache.data, string(kv.Key)) return nil }, - func() error { + func([]*clientv3.Event) error { cache.Lock() defer cache.Unlock() for _, r := range result { @@ -506,6 +508,7 @@ func (suite *loopWatcherTestSuite) TestWatcherLoadLimit() { suite.client, "test", "TestWatcherLoadLimit", + func([]*clientv3.Event) error { return nil }, func(kv *mvccpb.KeyValue) error { cache.Lock() defer cache.Unlock() @@ -515,7 +518,7 @@ func (suite *loopWatcherTestSuite) TestWatcherLoadLimit() { func(kv *mvccpb.KeyValue) error { return nil }, - func() error { + func([]*clientv3.Event) error { return nil }, clientv3.WithPrefix(), @@ -550,6 +553,7 @@ func (suite *loopWatcherTestSuite) TestWatcherBreak() { suite.client, "test", "TestWatcherBreak", + func([]*clientv3.Event) error { return nil }, func(kv *mvccpb.KeyValue) error { if string(kv.Key) == "TestWatcherBreak" { cache.Lock() @@ -559,7 +563,7 @@ func (suite *loopWatcherTestSuite) TestWatcherBreak() { return nil }, func(kv *mvccpb.KeyValue) error { return nil }, - func() error { return nil }, + func([]*clientv3.Event) error { return nil }, ) watcher.watchChangeRetryInterval = 100 * time.Millisecond watcher.StartWatchLoop() @@ -633,9 +637,10 @@ func (suite *loopWatcherTestSuite) TestWatcherRequestProgress() { suite.client, "test", "TestWatcherChanBlock", + func([]*clientv3.Event) error { return nil }, func(kv *mvccpb.KeyValue) error { return nil }, func(kv *mvccpb.KeyValue) error { return nil }, - func() error { return nil }, + 
func([]*clientv3.Event) error { return nil }, ) suite.wg.Add(1) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 78f6ddd4364..ecbd40e2582 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -269,7 +269,7 @@ func (c *RaftCluster) InitCluster( c.unsafeRecoveryController = unsaferecovery.NewController(c) c.keyspaceGroupManager = keyspaceGroupManager c.hbstreams = hbstreams - c.ruleManager = placement.NewRuleManager(c.storage, c, c.GetOpts()) + c.ruleManager = placement.NewRuleManager(c.ctx, c.storage, c, c.GetOpts()) if c.opt.IsPlacementRulesEnabled() { err := c.ruleManager.Initialize(c.opt.GetMaxReplicas(), c.opt.GetLocationLabels(), c.opt.GetIsolationLevel()) if err != nil { diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index 85edf911779..7094fd6b673 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -241,7 +241,7 @@ func TestSetOfflineStore(t *testing.T) { re.NoError(err) cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend()) cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil) - cluster.ruleManager = placement.NewRuleManager(storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts()) + cluster.ruleManager = placement.NewRuleManager(ctx, storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts()) if opt.IsPlacementRulesEnabled() { err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel()) if err != nil { @@ -438,7 +438,7 @@ func TestUpStore(t *testing.T) { re.NoError(err) cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend()) cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil) - cluster.ruleManager = placement.NewRuleManager(storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts()) + cluster.ruleManager = placement.NewRuleManager(ctx, storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts()) if opt.IsPlacementRulesEnabled() { err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel()) if err != nil { @@ -541,7 +541,7 @@ func TestDeleteStoreUpdatesClusterVersion(t *testing.T) { re.NoError(err) cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend()) cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil) - cluster.ruleManager = placement.NewRuleManager(storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts()) + cluster.ruleManager = placement.NewRuleManager(ctx, storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts()) if opt.IsPlacementRulesEnabled() { err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel()) if err != nil { @@ -1268,7 +1268,7 @@ func TestOfflineAndMerge(t *testing.T) { re.NoError(err) cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend()) cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil) - cluster.ruleManager = placement.NewRuleManager(storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts()) + cluster.ruleManager = placement.NewRuleManager(ctx, storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts()) if opt.IsPlacementRulesEnabled() { err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel()) if err != nil { @@ -2130,7 +2130,7 @@ func newTestRaftCluster( ) 
*RaftCluster { rc := &RaftCluster{serverCtx: ctx, core: core.NewBasicCluster(), storage: s} rc.InitCluster(id, opt, nil, nil) - rc.ruleManager = placement.NewRuleManager(storage.NewStorageWithMemoryBackend(), rc, opt) + rc.ruleManager = placement.NewRuleManager(ctx, storage.NewStorageWithMemoryBackend(), rc, opt) if opt.IsPlacementRulesEnabled() { err := rc.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel()) if err != nil { diff --git a/server/keyspace_service.go b/server/keyspace_service.go index b17239ba0a4..cd874a7636a 100644 --- a/server/keyspace_service.go +++ b/server/keyspace_service.go @@ -89,7 +89,8 @@ func (s *KeyspaceServer) WatchKeyspaces(request *keyspacepb.WatchKeyspacesReques deleteFn := func(kv *mvccpb.KeyValue) error { return nil } - postEventFn := func() error { + // TODO: does it need to check the num of events? + postEventFn := func([]*clientv3.Event) error { defer func() { keyspaces = keyspaces[:0] }() @@ -109,6 +110,7 @@ func (s *KeyspaceServer) WatchKeyspaces(request *keyspacepb.WatchKeyspacesReques s.client, "keyspace-server-watcher", startKey, + func([]*clientv3.Event) error { return nil }, putFn, deleteFn, postEventFn, diff --git a/server/server.go b/server/server.go index 187c30dbf7a..fcf71922a09 100644 --- a/server/server.go +++ b/server/server.go @@ -2017,9 +2017,10 @@ func (s *Server) initServicePrimaryWatcher(serviceName string, primaryKey string s.client, name, primaryKey, + func([]*clientv3.Event) error { return nil }, putFn, deleteFn, - func() error { return nil }, + func([]*clientv3.Event) error { return nil }, ) } diff --git a/tests/server/api/region_test.go b/tests/server/api/region_test.go index 450995a6e5e..452cdb63b39 100644 --- a/tests/server/api/region_test.go +++ b/tests/server/api/region_test.go @@ -24,11 +24,13 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/log" "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/schedule/placement" tu "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/tests" + "go.uber.org/zap" ) type regionTestSuite struct { @@ -248,8 +250,7 @@ func (suite *regionTestSuite) checkScatterRegions(cluster *tests.TestCluster) { } func (suite *regionTestSuite) TestCheckRegionsReplicated() { - // Fixme: after delete+set rule, the key range will be empty, so the test will fail in api mode. 
- suite.env.RunTestInPDMode(suite.checkRegionsReplicated) + suite.env.RunTestInTwoModes(suite.checkRegionsReplicated) } func (suite *regionTestSuite) checkRegionsReplicated(cluster *tests.TestCluster) { @@ -304,6 +305,14 @@ func (suite *regionTestSuite) checkRegionsReplicated(cluster *tests.TestCluster) err = tu.CheckPostJSON(testDialClient, urlPrefix+"/config/placement-rule", data, tu.StatusOK(re)) suite.NoError(err) + tu.Eventually(re, func() bool { + respBundle := make([]placement.GroupBundle, 0) + err = tu.CheckGetJSON(testDialClient, urlPrefix+"/config/placement-rule", nil, + tu.StatusOK(re), tu.ExtractJSON(re, &respBundle)) + suite.NoError(err) + return len(respBundle) == 1 && respBundle[0].ID == "5" + }) + tu.Eventually(re, func() bool { err = tu.ReadGetJSON(re, testDialClient, url, &status) suite.NoError(err) @@ -328,9 +337,20 @@ func (suite *regionTestSuite) checkRegionsReplicated(cluster *tests.TestCluster) err = tu.CheckPostJSON(testDialClient, urlPrefix+"/config/placement-rule", data, tu.StatusOK(re)) suite.NoError(err) - err = tu.ReadGetJSON(re, testDialClient, url, &status) - suite.NoError(err) - suite.Equal("REPLICATED", status) + tu.Eventually(re, func() bool { + respBundle := make([]placement.GroupBundle, 0) + err = tu.CheckGetJSON(testDialClient, urlPrefix+"/config/placement-rule", nil, + tu.StatusOK(re), tu.ExtractJSON(re, &respBundle)) + suite.NoError(err) + log.Info("respBundle", zap.Any("respBundle", respBundle)) + return len(respBundle) == 1 && len(respBundle[0].Rules) == 2 + }) + + tu.Eventually(re, func() bool { + err = tu.ReadGetJSON(re, testDialClient, url, &status) + suite.NoError(err) + return status == "REPLICATED" + }) // test multiple bundles bundle = append(bundle, placement.GroupBundle{ @@ -347,17 +367,34 @@ func (suite *regionTestSuite) checkRegionsReplicated(cluster *tests.TestCluster) err = tu.CheckPostJSON(testDialClient, urlPrefix+"/config/placement-rule", data, tu.StatusOK(re)) suite.NoError(err) - err = tu.ReadGetJSON(re, testDialClient, url, &status) - suite.NoError(err) - suite.Equal("INPROGRESS", status) + tu.Eventually(re, func() bool { + respBundle := make([]placement.GroupBundle, 0) + err = tu.CheckGetJSON(testDialClient, urlPrefix+"/config/placement-rule", nil, + tu.StatusOK(re), tu.ExtractJSON(re, &respBundle)) + suite.NoError(err) + if len(respBundle) != 2 { + return false + } + s1 := respBundle[0].ID == "5" && respBundle[1].ID == "6" + s2 := respBundle[0].ID == "6" && respBundle[1].ID == "5" + return s1 || s2 + }) + + tu.Eventually(re, func() bool { + err = tu.ReadGetJSON(re, testDialClient, url, &status) + suite.NoError(err) + return status == "INPROGRESS" + }) r1 = core.NewTestRegionInfo(2, 1, []byte("a"), []byte("b")) r1.GetMeta().Peers = append(r1.GetMeta().Peers, &metapb.Peer{Id: 5, StoreId: 1}, &metapb.Peer{Id: 6, StoreId: 1}, &metapb.Peer{Id: 7, StoreId: 1}) tests.MustPutRegionInfo(re, cluster, r1) - err = tu.ReadGetJSON(re, testDialClient, url, &status) - suite.NoError(err) - suite.Equal("REPLICATED", status) + tu.Eventually(re, func() bool { + err = tu.ReadGetJSON(re, testDialClient, url, &status) + suite.NoError(err) + return status == "REPLICATED" + }) } func (suite *regionTestSuite) checkRegionCount(cluster *tests.TestCluster, count uint64) { diff --git a/tests/server/api/rule_test.go b/tests/server/api/rule_test.go index 0a0c3f2fb2e..de6fb99a8d0 100644 --- a/tests/server/api/rule_test.go +++ b/tests/server/api/rule_test.go @@ -20,17 +20,23 @@ import ( "fmt" "net/http" "net/url" + "sort" + "strconv" + "sync" "testing" 
"github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/log" "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/schedule/labeler" "github.com/tikv/pd/pkg/schedule/placement" + "github.com/tikv/pd/pkg/utils/syncutil" tu "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/server/config" "github.com/tikv/pd/tests" + "go.uber.org/zap" ) type ruleTestSuite struct { @@ -777,7 +783,7 @@ func (suite *ruleTestSuite) checkBundle(cluster *tests.TestCluster) { err := tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule", &bundles) suite.NoError(err) suite.Len(bundles, 1) - suite.compareBundle(bundles[0], b1) + suite.assertBundleEqual(bundles[0], b1) // Set b2 := placement.GroupBundle{ @@ -797,14 +803,14 @@ func (suite *ruleTestSuite) checkBundle(cluster *tests.TestCluster) { var bundle placement.GroupBundle err = tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule/foo", &bundle) suite.NoError(err) - suite.compareBundle(bundle, b2) + suite.assertBundleEqual(bundle, b2) // GetAll again err = tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule", &bundles) suite.NoError(err) suite.Len(bundles, 2) - suite.compareBundle(bundles[0], b1) - suite.compareBundle(bundles[1], b2) + suite.assertBundleEqual(bundles[0], b1) + suite.assertBundleEqual(bundles[1], b2) // Delete err = tu.CheckDelete(testDialClient, urlPrefix+"/placement-rule/pd", tu.StatusOK(suite.Require())) @@ -814,7 +820,7 @@ func (suite *ruleTestSuite) checkBundle(cluster *tests.TestCluster) { err = tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule", &bundles) suite.NoError(err) suite.Len(bundles, 1) - suite.compareBundle(bundles[0], b2) + suite.assertBundleEqual(bundles[0], b2) // SetAll b2.Rules = append(b2.Rules, &placement.Rule{GroupID: "foo", ID: "baz", Index: 2, Role: placement.Follower, Count: 1}) @@ -829,9 +835,9 @@ func (suite *ruleTestSuite) checkBundle(cluster *tests.TestCluster) { err = tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule", &bundles) suite.NoError(err) suite.Len(bundles, 3) - suite.compareBundle(bundles[0], b2) - suite.compareBundle(bundles[1], b1) - suite.compareBundle(bundles[2], b3) + suite.assertBundleEqual(bundles[0], b2) + suite.assertBundleEqual(bundles[1], b1) + suite.assertBundleEqual(bundles[2], b3) // Delete using regexp err = tu.CheckDelete(testDialClient, urlPrefix+"/placement-rule/"+url.PathEscape("foo.*")+"?regexp", tu.StatusOK(suite.Require())) @@ -841,7 +847,7 @@ func (suite *ruleTestSuite) checkBundle(cluster *tests.TestCluster) { err = tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule", &bundles) suite.NoError(err) suite.Len(bundles, 1) - suite.compareBundle(bundles[0], b1) + suite.assertBundleEqual(bundles[0], b1) // Set id := "rule-without-group-id" @@ -862,14 +868,14 @@ func (suite *ruleTestSuite) checkBundle(cluster *tests.TestCluster) { // Get err = tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule/"+id, &bundle) suite.NoError(err) - suite.compareBundle(bundle, b4) + suite.assertBundleEqual(bundle, b4) // GetAll again err = tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule", &bundles) suite.NoError(err) suite.Len(bundles, 2) - suite.compareBundle(bundles[0], b1) - suite.compareBundle(bundles[1], b4) + suite.assertBundleEqual(bundles[0], b1) + suite.assertBundleEqual(bundles[1], b4) // SetAll b5 := placement.GroupBundle{ @@ -890,9 +896,9 @@ func (suite *ruleTestSuite) checkBundle(cluster *tests.TestCluster) { err = tu.ReadGetJSON(re, 
testDialClient, urlPrefix+"/placement-rule", &bundles) suite.NoError(err) suite.Len(bundles, 3) - suite.compareBundle(bundles[0], b1) - suite.compareBundle(bundles[1], b4) - suite.compareBundle(bundles[2], b5) + suite.assertBundleEqual(bundles[0], b1) + suite.assertBundleEqual(bundles[1], b4) + suite.assertBundleEqual(bundles[2], b5) } func (suite *ruleTestSuite) TestBundleBadRequest() { @@ -925,20 +931,219 @@ func (suite *ruleTestSuite) checkBundleBadRequest(cluster *tests.TestCluster) { } } -func (suite *ruleTestSuite) compareBundle(b1, b2 placement.GroupBundle) { - tu.Eventually(suite.Require(), func() bool { - if b2.ID != b1.ID || b2.Index != b1.Index || b2.Override != b1.Override || len(b2.Rules) != len(b1.Rules) { - return false - } - for i := range b1.Rules { - if !suite.compareRule(b1.Rules[i], b2.Rules[i]) { +func (suite *ruleTestSuite) TestDeleteAndUpdate() { + suite.env.RunTestInTwoModes(suite.checkDeleteAndUpdate) +} + +func (suite *ruleTestSuite) checkDeleteAndUpdate(cluster *tests.TestCluster) { + re := suite.Require() + leaderServer := cluster.GetLeaderServer() + pdAddr := leaderServer.GetAddr() + urlPrefix := fmt.Sprintf("%s%s/api/v1", pdAddr, apiPrefix) + + bundles := [][]placement.GroupBundle{ + // 1 rule group with 1 rule + {{ + ID: "1", + Index: 1, + Rules: []*placement.Rule{ + { + ID: "foo", Index: 1, Role: placement.Voter, Count: 1, GroupID: "1", + }, + }, + }}, + // 2 rule groups with different range rules + {{ + ID: "1", + Index: 1, + Rules: []*placement.Rule{ + { + ID: "foo", Index: 1, Role: placement.Voter, Count: 1, GroupID: "1", + StartKey: []byte("a"), EndKey: []byte("b"), + }, + }, + }, { + ID: "2", + Index: 2, + Rules: []*placement.Rule{ + { + ID: "foo", Index: 2, Role: placement.Voter, Count: 1, GroupID: "2", + StartKey: []byte("b"), EndKey: []byte("c"), + }, + }, + }}, + // 2 rule groups with 1 rule and 2 rules + {{ + ID: "3", + Index: 3, + Rules: []*placement.Rule{ + { + ID: "foo", Index: 3, Role: placement.Voter, Count: 1, GroupID: "3", + }, + }, + }, { + ID: "4", + Index: 4, + Rules: []*placement.Rule{ + { + ID: "foo", Index: 4, Role: placement.Voter, Count: 1, GroupID: "4", + }, + { + ID: "bar", Index: 6, Role: placement.Voter, Count: 1, GroupID: "4", + }, + }, + }}, + // 1 rule group with 2 rules + {{ + ID: "5", + Index: 5, + Rules: []*placement.Rule{ + { + ID: "foo", Index: 5, Role: placement.Voter, Count: 1, GroupID: "5", + }, + { + ID: "bar", Index: 6, Role: placement.Voter, Count: 1, GroupID: "5", + }, + }, + }}, + } + + for _, bundle := range bundles { + data, err := json.Marshal(bundle) + suite.NoError(err) + err = tu.CheckPostJSON(testDialClient, urlPrefix+"/config/placement-rule", data, tu.StatusOK(re)) + suite.NoError(err) + + tu.Eventually(re, func() bool { + respBundle := make([]placement.GroupBundle, 0) + err = tu.CheckGetJSON(testDialClient, urlPrefix+"/config/placement-rule", nil, + tu.StatusOK(re), tu.ExtractJSON(re, &respBundle)) + suite.NoError(err) + if len(respBundle) != len(bundle) { return false } - } - return true + log.Info("respBundle", zap.Any("respBundle", respBundle), zap.Any("bundle", bundle)) + sort.Slice(respBundle, func(i, j int) bool { return respBundle[i].ID < respBundle[j].ID }) + sort.Slice(bundle, func(i, j int) bool { return bundle[i].ID < bundle[j].ID }) + for i := range respBundle { + if !suite.compareBundle(respBundle[i], bundle[i]) { + return false + } + } + return true + }) + } +} + +func (suite *ruleTestSuite) TestConcurrency() { + suite.env.RunTestInTwoModes(suite.checkConcurrency) +} + +func (suite 
*ruleTestSuite) checkConcurrency(cluster *tests.TestCluster) { + // test concurrency of set rule group with different group id + suite.checkConcurrencyWith(cluster, + func(i int) []placement.GroupBundle { + return []placement.GroupBundle{ + { + ID: strconv.Itoa(i), + Index: i, + Rules: []*placement.Rule{ + { + ID: "foo", Index: i, Role: placement.Voter, Count: 1, GroupID: strconv.Itoa(i), + }, + }, + }, + } + }, + func(resp []placement.GroupBundle, i int) bool { + return len(resp) == 1 && resp[0].ID == strconv.Itoa(i) + }, + ) + // test concurrency of set rule with different id + suite.checkConcurrencyWith(cluster, + func(i int) []placement.GroupBundle { + return []placement.GroupBundle{ + { + ID: "pd", + Index: 1, + Rules: []*placement.Rule{ + { + ID: strconv.Itoa(i), Index: i, Role: placement.Voter, Count: 1, GroupID: "pd", + }, + }, + }, + } + }, + func(resp []placement.GroupBundle, i int) bool { + return len(resp) == 1 && resp[0].ID == "pd" && resp[0].Rules[0].ID == strconv.Itoa(i) + }, + ) +} + +func (suite *ruleTestSuite) checkConcurrencyWith(cluster *tests.TestCluster, + genBundle func(int) []placement.GroupBundle, + checkBundle func([]placement.GroupBundle, int) bool) { + re := suite.Require() + leaderServer := cluster.GetLeaderServer() + pdAddr := leaderServer.GetAddr() + urlPrefix := fmt.Sprintf("%s%s/api/v1", pdAddr, apiPrefix) + expectResult := struct { + syncutil.RWMutex + val int + }{} + wg := sync.WaitGroup{} + + for i := 1; i <= 10; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + bundle := genBundle(i) + data, err := json.Marshal(bundle) + suite.NoError(err) + for j := 0; j < 10; j++ { + expectResult.Lock() + err = tu.CheckPostJSON(testDialClient, urlPrefix+"/config/placement-rule", data, tu.StatusOK(re)) + suite.NoError(err) + expectResult.val = i + expectResult.Unlock() + } + }(i) + } + + wg.Wait() + expectResult.RLock() + defer expectResult.RUnlock() + suite.NotZero(expectResult.val) + tu.Eventually(re, func() bool { + respBundle := make([]placement.GroupBundle, 0) + err := tu.CheckGetJSON(testDialClient, urlPrefix+"/config/placement-rule", nil, + tu.StatusOK(re), tu.ExtractJSON(re, &respBundle)) + suite.NoError(err) + suite.Len(respBundle, 1) + return checkBundle(respBundle, expectResult.val) + }) +} + +func (suite *ruleTestSuite) assertBundleEqual(b1, b2 placement.GroupBundle) { + tu.Eventually(suite.Require(), func() bool { + return suite.compareBundle(b1, b2) }) } +func (suite *ruleTestSuite) compareBundle(b1, b2 placement.GroupBundle) bool { + if b2.ID != b1.ID || b2.Index != b1.Index || b2.Override != b1.Override || len(b2.Rules) != len(b1.Rules) { + return false + } + sort.Slice(b1.Rules, func(i, j int) bool { return b1.Rules[i].ID < b1.Rules[j].ID }) + sort.Slice(b2.Rules, func(i, j int) bool { return b2.Rules[i].ID < b2.Rules[j].ID }) + for i := range b1.Rules { + if !suite.compareRule(b1.Rules[i], b2.Rules[i]) { + return false + } + } + return true +} + func (suite *ruleTestSuite) compareRule(r1 *placement.Rule, r2 *placement.Rule) bool { return r2.GroupID == r1.GroupID && r2.ID == r1.ID && From 2b1c62c1b04f5d1cd01bf57de4eaae9ce1bf2b04 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Fri, 15 Dec 2023 14:12:46 +0800 Subject: [PATCH 2/7] refactor endpoint Signed-off-by: lhy1024 --- pkg/mcs/scheduling/server/rule/watcher.go | 4 +- pkg/schedule/placement/rule_manager.go | 73 +++++++++++----------- pkg/storage/endpoint/config.go | 8 +-- pkg/storage/endpoint/gc_safe_point.go | 8 +-- pkg/storage/endpoint/keyspace.go | 5 -- 
pkg/storage/endpoint/replication_status.go | 6 +- pkg/storage/endpoint/rule.go | 51 ++++----------- pkg/storage/endpoint/safepoint_v2.go | 13 +--- pkg/storage/endpoint/service_middleware.go | 6 +- pkg/storage/endpoint/tso_keyspace_group.go | 7 +-- pkg/storage/endpoint/util.go | 32 +++++++++- pkg/tso/keyspace_group_manager.go | 1 - server/keyspace_service.go | 1 - tests/server/api/rule_test.go | 3 - 14 files changed, 90 insertions(+), 128 deletions(-) diff --git a/pkg/mcs/scheduling/server/rule/watcher.go b/pkg/mcs/scheduling/server/rule/watcher.go index fd324a98f1b..52999eeaea8 100644 --- a/pkg/mcs/scheduling/server/rule/watcher.go +++ b/pkg/mcs/scheduling/server/rule/watcher.go @@ -111,7 +111,7 @@ func (rw *Watcher) initializeRuleWatcher() error { preFn := func(events []*clientv3.Event) error { suspectKeyRanges = &core.KeyRanges{} - if len(events) != 0 { + if len(events) > 0 { rw.ruleManager.Lock() rw.patch = rw.ruleManager.BeginPatch() } @@ -220,7 +220,7 @@ func (rw *Watcher) initializeRuleWatcher() error { } } postFn := func(events []*clientv3.Event) error { - if len(events) > 0 { + if len(events) > 0 && rw.patch != nil { if err := rw.ruleManager.TryCommitPatch(rw.patch); err != nil { return err } diff --git a/pkg/schedule/placement/rule_manager.go b/pkg/schedule/placement/rule_manager.go index 7acf7919570..ea85911462b 100644 --- a/pkg/schedule/placement/rule_manager.go +++ b/pkg/schedule/placement/rule_manager.go @@ -157,36 +157,37 @@ func (m *RuleManager) loadRules() error { toSave []*Rule toDelete []string ) - err := m.storage.LoadRules(func(k, v string) { - r, err := NewRuleFromJSON([]byte(v)) - if err != nil { - log.Error("failed to unmarshal rule value", zap.String("rule-key", k), zap.String("rule-value", v), errs.ZapError(errs.ErrLoadRule)) - toDelete = append(toDelete, k) - return - } - err = m.AdjustRule(r, "") + return m.storage.RunInTxn(m.ctx, func(txn kv.Txn) (err error) { + err = m.storage.LoadRules(txn, func(k, v string) { + r, err := NewRuleFromJSON([]byte(v)) + if err != nil { + log.Error("failed to unmarshal rule value", zap.String("rule-key", k), zap.String("rule-value", v), errs.ZapError(errs.ErrLoadRule)) + toDelete = append(toDelete, k) + return + } + err = m.AdjustRule(r, "") + if err != nil { + log.Error("rule is in bad format", zap.String("rule-key", k), zap.String("rule-value", v), errs.ZapError(errs.ErrLoadRule, err)) + toDelete = append(toDelete, k) + return + } + _, ok := m.ruleConfig.rules[r.Key()] + if ok { + log.Error("duplicated rule key", zap.String("rule-key", k), zap.String("rule-value", v), errs.ZapError(errs.ErrLoadRule)) + toDelete = append(toDelete, k) + return + } + if k != r.StoreKey() { + log.Error("mismatch data key, need to restore", zap.String("rule-key", k), zap.String("rule-value", v), errs.ZapError(errs.ErrLoadRule)) + toDelete = append(toDelete, k) + toSave = append(toSave, r) + } + m.ruleConfig.rules[r.Key()] = r + }) if err != nil { - log.Error("rule is in bad format", zap.String("rule-key", k), zap.String("rule-value", v), errs.ZapError(errs.ErrLoadRule, err)) - toDelete = append(toDelete, k) - return - } - _, ok := m.ruleConfig.rules[r.Key()] - if ok { - log.Error("duplicated rule key", zap.String("rule-key", k), zap.String("rule-value", v), errs.ZapError(errs.ErrLoadRule)) - toDelete = append(toDelete, k) - return - } - if k != r.StoreKey() { - log.Error("mismatch data key, need to restore", zap.String("rule-key", k), zap.String("rule-value", v), errs.ZapError(errs.ErrLoadRule)) - toDelete = append(toDelete, k) - toSave = 
append(toSave, r) + return err } - m.ruleConfig.rules[r.Key()] = r - }) - if err != nil { - return err - } - return m.storage.RunInTxn(m.ctx, func(txn kv.Txn) (err error) { + for _, s := range toSave { if err = m.storage.SaveRule(txn, s.StoreKey(), s); err != nil { return err @@ -202,13 +203,15 @@ func (m *RuleManager) loadRules() error { } func (m *RuleManager) loadGroups() error { - return m.storage.LoadRuleGroups(func(k, v string) { - g, err := NewRuleGroupFromJSON([]byte(v)) - if err != nil { - log.Error("failed to unmarshal rule group", zap.String("group-id", k), errs.ZapError(errs.ErrLoadRuleGroup, err)) - return - } - m.ruleConfig.groups[g.ID] = g + return m.storage.RunInTxn(m.ctx, func(txn kv.Txn) (err error) { + return m.storage.LoadRuleGroups(txn, func(k, v string) { + g, err := NewRuleGroupFromJSON([]byte(v)) + if err != nil { + log.Error("failed to unmarshal rule group", zap.String("group-id", k), errs.ZapError(errs.ErrLoadRuleGroup, err)) + return + } + m.ruleConfig.groups[g.ID] = g + }) }) } diff --git a/pkg/storage/endpoint/config.go b/pkg/storage/endpoint/config.go index db5565a4b90..edfdcbca9a3 100644 --- a/pkg/storage/endpoint/config.go +++ b/pkg/storage/endpoint/config.go @@ -51,17 +51,13 @@ func (se *StorageEndpoint) LoadConfig(cfg interface{}) (bool, error) { // SaveConfig stores marshallable cfg to the configPath. func (se *StorageEndpoint) SaveConfig(cfg interface{}) error { - value, err := json.Marshal(cfg) - if err != nil { - return errs.ErrJSONMarshal.Wrap(err).GenWithStackByCause() - } - return se.Save(configPath, string(value)) + return se.saveJSON(configPath, cfg) } // LoadAllSchedulerConfigs loads all schedulers' config. func (se *StorageEndpoint) LoadAllSchedulerConfigs() ([]string, []string, error) { prefix := customSchedulerConfigPath + "/" - keys, values, err := se.LoadRange(prefix, clientv3.GetPrefixRangeEnd(prefix), 1000) + keys, values, err := se.LoadRange(prefix, clientv3.GetPrefixRangeEnd(prefix), MinKVRangeLimit) for i, key := range keys { keys[i] = strings.TrimPrefix(key, prefix) } diff --git a/pkg/storage/endpoint/gc_safe_point.go b/pkg/storage/endpoint/gc_safe_point.go index db5c58205c8..c2f09980651 100644 --- a/pkg/storage/endpoint/gc_safe_point.go +++ b/pkg/storage/endpoint/gc_safe_point.go @@ -169,13 +169,7 @@ func (se *StorageEndpoint) SaveServiceGCSafePoint(ssp *ServiceSafePoint) error { return errors.New("TTL of gc_worker's service safe point must be infinity") } - key := gcSafePointServicePath(ssp.ServiceID) - value, err := json.Marshal(ssp) - if err != nil { - return err - } - - return se.Save(key, string(value)) + return se.saveJSON(gcSafePointServicePath(ssp.ServiceID), ssp) } // RemoveServiceGCSafePoint removes a GC safepoint for the service diff --git a/pkg/storage/endpoint/keyspace.go b/pkg/storage/endpoint/keyspace.go index 09733ad59c1..77c81b2c8d6 100644 --- a/pkg/storage/endpoint/keyspace.go +++ b/pkg/storage/endpoint/keyspace.go @@ -97,11 +97,6 @@ func (se *StorageEndpoint) LoadKeyspaceID(txn kv.Txn, name string) (bool, uint32 return true, uint32(id64), nil } -// RunInTxn runs the given function in a transaction. -func (se *StorageEndpoint) RunInTxn(ctx context.Context, f func(txn kv.Txn) error) error { - return se.Base.RunInTxn(ctx, f) -} - // LoadRangeKeyspace loads keyspaces starting at startID. // limit specifies the limit of loaded keyspaces. 
func (se *StorageEndpoint) LoadRangeKeyspace(txn kv.Txn, startID uint32, limit int) ([]*keyspacepb.KeyspaceMeta, error) { diff --git a/pkg/storage/endpoint/replication_status.go b/pkg/storage/endpoint/replication_status.go index 4bac51071bc..0a14770ff47 100644 --- a/pkg/storage/endpoint/replication_status.go +++ b/pkg/storage/endpoint/replication_status.go @@ -43,9 +43,5 @@ func (se *StorageEndpoint) LoadReplicationStatus(mode string, status interface{} // SaveReplicationStatus stores replication status by mode. func (se *StorageEndpoint) SaveReplicationStatus(mode string, status interface{}) error { - value, err := json.Marshal(status) - if err != nil { - return errs.ErrJSONMarshal.Wrap(err).GenWithStackByArgs() - } - return se.Save(replicationModePath(mode), string(value)) + return se.saveJSON(replicationModePath(mode), status) } diff --git a/pkg/storage/endpoint/rule.go b/pkg/storage/endpoint/rule.go index c0eb4cea8fe..b18360040ea 100644 --- a/pkg/storage/endpoint/rule.go +++ b/pkg/storage/endpoint/rule.go @@ -16,23 +16,21 @@ package endpoint import ( "context" - "encoding/json" - "strings" "github.com/tikv/pd/pkg/storage/kv" - "go.etcd.io/etcd/clientv3" ) // RuleStorage defines the storage operations on the rule. type RuleStorage interface { - // TODO: shall we support other interfaces about txn? - LoadRule(ruleKey string) (string, error) - LoadRules(f func(k, v string)) error + LoadRules(txn kv.Txn, f func(k, v string)) error SaveRule(txn kv.Txn, ruleKey string, rule interface{}) error DeleteRule(txn kv.Txn, ruleKey string) error - LoadRuleGroups(f func(k, v string)) error + LoadRuleGroups(txn kv.Txn, f func(k, v string)) error SaveRuleGroup(txn kv.Txn, groupID string, group interface{}) error DeleteRuleGroup(txn kv.Txn, groupID string) error + // LoadRule is used only in rule watcher. + LoadRule(ruleKey string) (string, error) + LoadRegionRules(f func(k, v string)) error SaveRegionRule(ruleKey string, rule interface{}) error DeleteRegionRule(ruleKey string) error @@ -43,11 +41,7 @@ var _ RuleStorage = (*StorageEndpoint)(nil) // SaveRule stores a rule cfg to the rulesPath. func (se *StorageEndpoint) SaveRule(txn kv.Txn, ruleKey string, rule interface{}) error { - value, err := json.Marshal(rule) - if err != nil { - return err - } - return txn.Save(ruleKeyPath(ruleKey), string(value)) + return saveJSONInTxn(txn, ruleKeyPath(ruleKey), rule) } // DeleteRule removes a rule from storage. @@ -56,17 +50,13 @@ func (se *StorageEndpoint) DeleteRule(txn kv.Txn, ruleKey string) error { } // LoadRuleGroups loads all rule groups from storage. -func (se *StorageEndpoint) LoadRuleGroups(f func(k, v string)) error { - return se.loadRangeByPrefix(ruleGroupPath+"/", f) +func (se *StorageEndpoint) LoadRuleGroups(txn kv.Txn, f func(k, v string)) error { + return loadRangeByPrefixInTxn(txn, ruleGroupPath+"/", f) } // SaveRuleGroup stores a rule group config to storage. func (se *StorageEndpoint) SaveRuleGroup(txn kv.Txn, groupID string, group interface{}) error { - value, err := json.Marshal(group) - if err != nil { - return err - } - return txn.Save(ruleGroupIDPath(groupID), string(value)) + return saveJSONInTxn(txn, ruleGroupIDPath(groupID), group) } // DeleteRuleGroup removes a rule group from storage. @@ -95,25 +85,6 @@ func (se *StorageEndpoint) LoadRule(ruleKey string) (string, error) { } // LoadRules loads placement rules from storage. 
-func (se *StorageEndpoint) LoadRules(f func(k, v string)) error { - return se.loadRangeByPrefix(rulesPath+"/", f) -} - -// loadRangeByPrefix iterates all key-value pairs in the storage that has the prefix. -func (se *StorageEndpoint) loadRangeByPrefix(prefix string, f func(k, v string)) error { - nextKey := prefix - endKey := clientv3.GetPrefixRangeEnd(prefix) - for { - keys, values, err := se.LoadRange(nextKey, endKey, MinKVRangeLimit) - if err != nil { - return err - } - for i := range keys { - f(strings.TrimPrefix(keys[i], prefix), values[i]) - } - if len(keys) < MinKVRangeLimit { - return nil - } - nextKey = keys[len(keys)-1] + "\x00" - } +func (se *StorageEndpoint) LoadRules(txn kv.Txn, f func(k, v string)) error { + return loadRangeByPrefixInTxn(txn, rulesPath+"/", f) } diff --git a/pkg/storage/endpoint/safepoint_v2.go b/pkg/storage/endpoint/safepoint_v2.go index cac2606a470..8d690d07261 100644 --- a/pkg/storage/endpoint/safepoint_v2.go +++ b/pkg/storage/endpoint/safepoint_v2.go @@ -79,12 +79,7 @@ func (se *StorageEndpoint) LoadGCSafePointV2(keyspaceID uint32) (*GCSafePointV2, // SaveGCSafePointV2 saves gc safe point for the given keyspace. func (se *StorageEndpoint) SaveGCSafePointV2(gcSafePoint *GCSafePointV2) error { - key := GCSafePointV2Path(gcSafePoint.KeyspaceID) - value, err := json.Marshal(gcSafePoint) - if err != nil { - return errs.ErrJSONMarshal.Wrap(err).GenWithStackByCause() - } - return se.Save(key, string(value)) + return se.saveJSON(GCSafePointV2Path(gcSafePoint.KeyspaceID), gcSafePoint) } // LoadAllGCSafePoints returns gc safe point for all keyspaces @@ -203,11 +198,7 @@ func (se *StorageEndpoint) SaveServiceSafePointV2(serviceSafePoint *ServiceSafeP } key := ServiceSafePointV2Path(serviceSafePoint.KeyspaceID, serviceSafePoint.ServiceID) - value, err := json.Marshal(serviceSafePoint) - if err != nil { - return errs.ErrJSONMarshal.Wrap(err).GenWithStackByCause() - } - return se.Save(key, string(value)) + return se.saveJSON(key, serviceSafePoint) } // RemoveServiceSafePointV2 removes a service safe point. diff --git a/pkg/storage/endpoint/service_middleware.go b/pkg/storage/endpoint/service_middleware.go index 62cf91c97bf..2becbf3686e 100644 --- a/pkg/storage/endpoint/service_middleware.go +++ b/pkg/storage/endpoint/service_middleware.go @@ -43,9 +43,5 @@ func (se *StorageEndpoint) LoadServiceMiddlewareConfig(cfg interface{}) (bool, e // SaveServiceMiddlewareConfig stores marshallable cfg to the serviceMiddlewarePath. func (se *StorageEndpoint) SaveServiceMiddlewareConfig(cfg interface{}) error { - value, err := json.Marshal(cfg) - if err != nil { - return errs.ErrJSONMarshal.Wrap(err).GenWithStackByCause() - } - return se.Save(serviceMiddlewarePath, string(value)) + return se.saveJSON(serviceMiddlewarePath, cfg) } diff --git a/pkg/storage/endpoint/tso_keyspace_group.go b/pkg/storage/endpoint/tso_keyspace_group.go index 498cd878887..39a08afe937 100644 --- a/pkg/storage/endpoint/tso_keyspace_group.go +++ b/pkg/storage/endpoint/tso_keyspace_group.go @@ -177,12 +177,7 @@ func (se *StorageEndpoint) LoadKeyspaceGroup(txn kv.Txn, id uint32) (*KeyspaceGr // SaveKeyspaceGroup saves the keyspace group. func (se *StorageEndpoint) SaveKeyspaceGroup(txn kv.Txn, kg *KeyspaceGroup) error { - key := KeyspaceGroupIDPath(kg.ID) - value, err := json.Marshal(kg) - if err != nil { - return err - } - return txn.Save(key, string(value)) + return saveJSONInTxn(txn, KeyspaceGroupIDPath(kg.ID), kg) } // DeleteKeyspaceGroup deletes the keyspace group. 
diff --git a/pkg/storage/endpoint/util.go b/pkg/storage/endpoint/util.go index 37f98a55709..3058c059628 100644 --- a/pkg/storage/endpoint/util.go +++ b/pkg/storage/endpoint/util.go @@ -16,9 +16,12 @@ package endpoint import ( "encoding/json" + "strings" "github.com/gogo/protobuf/proto" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/storage/kv" + "go.etcd.io/etcd/clientv3" ) func (se *StorageEndpoint) loadProto(key string, msg proto.Message) (bool, error) { @@ -42,9 +45,36 @@ func (se *StorageEndpoint) saveProto(key string, msg proto.Message) error { } func (se *StorageEndpoint) saveJSON(key string, data interface{}) error { + return saveJSONInTxn(se /* use the same interface */, key, data) +} + +func saveJSONInTxn(txn kv.Txn, key string, data interface{}) error { value, err := json.Marshal(data) if err != nil { return errs.ErrJSONMarshal.Wrap(err).GenWithStackByArgs() } - return se.Save(key, string(value)) + return txn.Save(key, string(value)) +} + +// loadRangeByPrefix iterates all key-value pairs in the storage that has the prefix. +func (se *StorageEndpoint) loadRangeByPrefix(prefix string, f func(k, v string)) error { + return loadRangeByPrefixInTxn(se /* use the same interface */, prefix, f) +} + +func loadRangeByPrefixInTxn(txn kv.Txn, prefix string, f func(k, v string)) error { + nextKey := prefix + endKey := clientv3.GetPrefixRangeEnd(prefix) + for { + keys, values, err := txn.LoadRange(nextKey, endKey, MinKVRangeLimit) + if err != nil { + return err + } + for i := range keys { + f(strings.TrimPrefix(keys[i], prefix), values[i]) + } + if len(keys) < MinKVRangeLimit { + return nil + } + nextKey = keys[len(keys)-1] + "\x00" + } } diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index eaf673190eb..0e69986f255 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -559,7 +559,6 @@ func (kgm *KeyspaceGroupManager) InitializeGroupWatchLoop() error { kgm.deleteKeyspaceGroup(groupID) return nil } - // TODO: Does it need to check num of events? postEventFn := func([]*clientv3.Event) error { // Retry the groups that are not initialized successfully before. for id, group := range kgm.groupUpdateRetryList { diff --git a/server/keyspace_service.go b/server/keyspace_service.go index cd874a7636a..1718108d73b 100644 --- a/server/keyspace_service.go +++ b/server/keyspace_service.go @@ -89,7 +89,6 @@ func (s *KeyspaceServer) WatchKeyspaces(request *keyspacepb.WatchKeyspacesReques deleteFn := func(kv *mvccpb.KeyValue) error { return nil } - // TODO: does it need to check the num of events? 
postEventFn := func([]*clientv3.Event) error { defer func() { keyspaces = keyspaces[:0] diff --git a/tests/server/api/rule_test.go b/tests/server/api/rule_test.go index de6fb99a8d0..46920d8d3ac 100644 --- a/tests/server/api/rule_test.go +++ b/tests/server/api/rule_test.go @@ -26,7 +26,6 @@ import ( "testing" "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/log" "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" @@ -36,7 +35,6 @@ import ( tu "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/server/config" "github.com/tikv/pd/tests" - "go.uber.org/zap" ) type ruleTestSuite struct { @@ -1022,7 +1020,6 @@ func (suite *ruleTestSuite) checkDeleteAndUpdate(cluster *tests.TestCluster) { if len(respBundle) != len(bundle) { return false } - log.Info("respBundle", zap.Any("respBundle", respBundle), zap.Any("bundle", bundle)) sort.Slice(respBundle, func(i, j int) bool { return respBundle[i].ID < respBundle[j].ID }) sort.Slice(bundle, func(i, j int) bool { return bundle[i].ID < bundle[j].ID }) for i := range respBundle { From 1bd2f1dd103d66f76d1c73371a3dbcc74825418e Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Tue, 19 Dec 2023 11:59:38 +0800 Subject: [PATCH 3/7] address comments and remove nopatch Signed-off-by: lhy1024 --- pkg/mcs/scheduling/server/rule/watcher.go | 107 ++++++---------------- tests/server/api/region_test.go | 3 - 2 files changed, 29 insertions(+), 81 deletions(-) diff --git a/pkg/mcs/scheduling/server/rule/watcher.go b/pkg/mcs/scheduling/server/rule/watcher.go index 52999eeaea8..a36ba221e88 100644 --- a/pkg/mcs/scheduling/server/rule/watcher.go +++ b/pkg/mcs/scheduling/server/rule/watcher.go @@ -111,10 +111,8 @@ func (rw *Watcher) initializeRuleWatcher() error { preFn := func(events []*clientv3.Event) error { suspectKeyRanges = &core.KeyRanges{} - if len(events) > 0 { - rw.ruleManager.Lock() - rw.patch = rw.ruleManager.BeginPatch() - } + rw.ruleManager.Lock() + rw.patch = rw.ruleManager.BeginPatch() return nil } @@ -126,46 +124,30 @@ func (rw *Watcher) initializeRuleWatcher() error { if err != nil { return err } - // Try to add the rule to the patch or directly update the rule manager. - err = func() error { - if rw.patch == nil { - return rw.ruleManager.SetRule(rule) - } - if err := rw.ruleManager.AdjustRule(rule, ""); err != nil { - return err - } - rw.patch.SetRule(rule) - return nil - }() - // Update the suspect key ranges - if err == nil { - suspectKeyRanges.Append(rule.StartKey, rule.EndKey) - if oldRule := rw.getRule(rule.GroupID, rule.ID); oldRule != nil { - suspectKeyRanges.Append(oldRule.StartKey, oldRule.EndKey) - } + // Try to add the rule change to the patch. + if err := rw.ruleManager.AdjustRule(rule, ""); err != nil { + return err } - return err + rw.patch.SetRule(rule) + // Update the suspect key ranges in lock. + suspectKeyRanges.Append(rule.StartKey, rule.EndKey) + if oldRule := rw.ruleManager.GetRuleLocked(rule.GroupID, rule.ID); oldRule != nil { + suspectKeyRanges.Append(oldRule.StartKey, oldRule.EndKey) + } + return nil } else if strings.HasPrefix(key, rw.ruleGroupPathPrefix) { log.Info("update placement rule group", zap.String("key", key), zap.String("value", string(kv.Value))) ruleGroup, err := placement.NewRuleGroupFromJSON(kv.Value) if err != nil { return err } - // Try to add the rule to the patch or directly update the rule manager. 
- err = func() error { - if rw.patch == nil { - return rw.ruleManager.SetRuleGroup(ruleGroup) - } - rw.patch.SetGroup(ruleGroup) - return nil - }() + // Try to add the rule group change to the patch. + rw.patch.SetGroup(ruleGroup) // Update the suspect key ranges - if err == nil { - for _, rule := range rw.getRulesByGroup(ruleGroup.ID) { - suspectKeyRanges.Append(rule.StartKey, rule.EndKey) - } + for _, rule := range rw.ruleManager.GetRulesByGroupLocked(ruleGroup.ID) { + suspectKeyRanges.Append(rule.StartKey, rule.EndKey) } - return err + return nil } else { log.Warn("unknown key when update placement rule", zap.String("key", key)) return nil @@ -183,48 +165,31 @@ func (rw *Watcher) initializeRuleWatcher() error { if err != nil { return err } - // Try to add the rule to the patch or directly update the rule manager. - err = func() error { - if rw.patch == nil { - return rw.ruleManager.DeleteRule(rule.GroupID, rule.ID) - } - rw.patch.DeleteRule(rule.GroupID, rule.ID) - return nil - }() + // Try to add the rule change to the patch. + rw.patch.DeleteRule(rule.GroupID, rule.ID) // Update the suspect key ranges - if err == nil { - suspectKeyRanges.Append(rule.StartKey, rule.EndKey) - } + suspectKeyRanges.Append(rule.StartKey, rule.EndKey) return err } else if strings.HasPrefix(key, rw.ruleGroupPathPrefix) { log.Info("delete placement rule group", zap.String("key", key)) trimmedKey := strings.TrimPrefix(key, rw.ruleGroupPathPrefix+"/") - // Try to add the rule to the patch or directly update the rule manager. - err := func() error { - if rw.patch == nil { - return rw.ruleManager.DeleteRuleGroup(trimmedKey) - } - rw.patch.DeleteGroup(trimmedKey) - return nil - }() + // Try to add the rule group change to the patch. + rw.patch.DeleteGroup(trimmedKey) // Update the suspect key ranges - if err == nil { - for _, rule := range rw.getRulesByGroup(trimmedKey) { - suspectKeyRanges.Append(rule.StartKey, rule.EndKey) - } + for _, rule := range rw.ruleManager.GetRulesByGroupLocked(trimmedKey) { + suspectKeyRanges.Append(rule.StartKey, rule.EndKey) } - return err + return nil } else { log.Warn("unknown key when delete placement rule", zap.String("key", key)) return nil } } postFn := func(events []*clientv3.Event) error { - if len(events) > 0 && rw.patch != nil { - if err := rw.ruleManager.TryCommitPatch(rw.patch); err != nil { - return err - } - rw.ruleManager.Unlock() + defer rw.ruleManager.Unlock() + if err := rw.ruleManager.TryCommitPatch(rw.patch); err != nil { + log.Error("failed to commit patch", zap.Error(err)) + return err } for _, kr := range suspectKeyRanges.Ranges() { rw.checkerController.AddSuspectKeyRange(kr.StartKey, kr.EndKey) @@ -278,17 +243,3 @@ func (rw *Watcher) Close() { rw.cancel() rw.wg.Wait() } - -func (rw *Watcher) getRule(groupID, ruleID string) *placement.Rule { - if rw.patch != nil { // patch is not nil means there are locked. - return rw.ruleManager.GetRuleLocked(groupID, ruleID) - } - return rw.ruleManager.GetRule(groupID, ruleID) -} - -func (rw *Watcher) getRulesByGroup(groupID string) []*placement.Rule { - if rw.patch != nil { // patch is not nil means there are locked. 
- return rw.ruleManager.GetRulesByGroupLocked(groupID) - } - return rw.ruleManager.GetRulesByGroup(groupID) -} diff --git a/tests/server/api/region_test.go b/tests/server/api/region_test.go index 452cdb63b39..328c0fcd885 100644 --- a/tests/server/api/region_test.go +++ b/tests/server/api/region_test.go @@ -24,13 +24,11 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/log" "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/schedule/placement" tu "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/tests" - "go.uber.org/zap" ) type regionTestSuite struct { @@ -342,7 +340,6 @@ func (suite *regionTestSuite) checkRegionsReplicated(cluster *tests.TestCluster) err = tu.CheckGetJSON(testDialClient, urlPrefix+"/config/placement-rule", nil, tu.StatusOK(re), tu.ExtractJSON(re, &respBundle)) suite.NoError(err) - log.Info("respBundle", zap.Any("respBundle", respBundle)) return len(respBundle) == 1 && len(respBundle[0].Rules) == 2 }) From 1e778c51cffab4c36339038ca064a9aeb7f7384c Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Tue, 19 Dec 2023 12:20:15 +0800 Subject: [PATCH 4/7] address comments and move post function Signed-off-by: lhy1024 --- pkg/mcs/scheduling/server/meta/watcher.go | 3 +-- pkg/mcs/scheduling/server/rule/watcher.go | 10 +++++----- pkg/utils/etcdutil/etcdutil.go | 18 ++++++++++-------- 3 files changed, 16 insertions(+), 15 deletions(-) diff --git a/pkg/mcs/scheduling/server/meta/watcher.go b/pkg/mcs/scheduling/server/meta/watcher.go index 8bbcb8d070d..808e8fc565e 100644 --- a/pkg/mcs/scheduling/server/meta/watcher.go +++ b/pkg/mcs/scheduling/server/meta/watcher.go @@ -73,10 +73,9 @@ func NewWatcher( func (w *Watcher) initializeStoreWatcher() error { putFn := func(kv *mvccpb.KeyValue) error { store := &metapb.Store{} - key := string(kv.Key) if err := proto.Unmarshal(kv.Value, store); err != nil { log.Warn("failed to unmarshal store entry", - zap.String("event-kv-key", key), zap.Error(err)) + zap.String("event-kv-key", string(kv.Key)), zap.Error(err)) return err } origin := w.basicCluster.GetStore(store.GetId()) diff --git a/pkg/mcs/scheduling/server/rule/watcher.go b/pkg/mcs/scheduling/server/rule/watcher.go index a36ba221e88..9432c8cf3e4 100644 --- a/pkg/mcs/scheduling/server/rule/watcher.go +++ b/pkg/mcs/scheduling/server/rule/watcher.go @@ -110,9 +110,10 @@ func (rw *Watcher) initializeRuleWatcher() error { var suspectKeyRanges *core.KeyRanges preFn := func(events []*clientv3.Event) error { - suspectKeyRanges = &core.KeyRanges{} + // It will be locked until the postFn is finished. 
rw.ruleManager.Lock() rw.patch = rw.ruleManager.BeginPatch() + suspectKeyRanges = &core.KeyRanges{} return nil } @@ -149,7 +150,7 @@ func (rw *Watcher) initializeRuleWatcher() error { } return nil } else { - log.Warn("unknown key when update placement rule", zap.String("key", key)) + log.Warn("unknown key when updating placement rule", zap.String("key", key)) return nil } } @@ -181,7 +182,7 @@ func (rw *Watcher) initializeRuleWatcher() error { } return nil } else { - log.Warn("unknown key when delete placement rule", zap.String("key", key)) + log.Warn("unknown key when deleting placement rule", zap.String("key", key)) return nil } } @@ -212,8 +213,7 @@ func (rw *Watcher) initializeRuleWatcher() error { func (rw *Watcher) initializeRegionLabelWatcher() error { prefixToTrim := rw.regionLabelPathPrefix + "/" putFn := func(kv *mvccpb.KeyValue) error { - key := string(kv.Key) - log.Info("update region label rule", zap.String("key", key), zap.String("value", string(kv.Value))) + log.Info("update region label rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value))) rule, err := labeler.NewLabelRuleFromJSON(kv.Value) if err != nil { return err diff --git a/pkg/utils/etcdutil/etcdutil.go b/pkg/utils/etcdutil/etcdutil.go index 0e1b2731474..796ec5168a9 100644 --- a/pkg/utils/etcdutil/etcdutil.go +++ b/pkg/utils/etcdutil/etcdutil.go @@ -865,6 +865,16 @@ func (lw *LoopWatcher) load(ctx context.Context) (nextRevision int64, err error) if limit != 0 { limit++ } + if err := lw.preEventsFn([]*clientv3.Event{}); err != nil { + log.Error("run pre event failed in watch loop", zap.String("name", lw.name), + zap.String("key", lw.key), zap.Error(err)) + } + defer func() { + if err := lw.postEventsFn([]*clientv3.Event{}); err != nil { + log.Error("run post event failed in watch loop", zap.String("name", lw.name), + zap.String("key", lw.key), zap.Error(err)) + } + }() for { // Sort by key to get the next key and we don't need to worry about the performance, // Because the default sort is just SortByKey and SortAscend @@ -875,10 +885,6 @@ func (lw *LoopWatcher) load(ctx context.Context) (nextRevision int64, err error) zap.String("key", lw.key), zap.Error(err)) return 0, err } - if err := lw.preEventsFn([]*clientv3.Event{}); err != nil { - log.Error("run pre event failed in watch loop", zap.String("name", lw.name), - zap.String("key", lw.key), zap.Error(err)) - } for i, item := range resp.Kvs { if resp.More && i == len(resp.Kvs)-1 { // The last key is the start key of the next batch. @@ -893,10 +899,6 @@ func (lw *LoopWatcher) load(ctx context.Context) (nextRevision int64, err error) } // Note: if there are no keys in etcd, the resp.More is false. It also means the load is finished. 
if !resp.More { - if err := lw.postEventsFn([]*clientv3.Event{}); err != nil { - log.Error("run post event failed in watch loop", zap.String("name", lw.name), - zap.String("key", lw.key), zap.Error(err)) - } return resp.Header.Revision + 1, err } } From d4046e149f915eedbd271421b960d50b04aa9da7 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Tue, 19 Dec 2023 12:59:51 +0800 Subject: [PATCH 5/7] add test Signed-off-by: lhy1024 --- tests/server/api/rule_test.go | 97 +++++++++++++++++++++++++++++++++++ 1 file changed, 97 insertions(+) diff --git a/tests/server/api/rule_test.go b/tests/server/api/rule_test.go index 46920d8d3ac..06c0f6a1288 100644 --- a/tests/server/api/rule_test.go +++ b/tests/server/api/rule_test.go @@ -929,6 +929,103 @@ func (suite *ruleTestSuite) checkBundleBadRequest(cluster *tests.TestCluster) { } } +func (suite *ruleTestSuite) TestLeaderAndVoter() { + suite.env.RunTestInTwoModes(suite.checkLeaderAndVoter) +} + +func (suite *ruleTestSuite) checkLeaderAndVoter(cluster *tests.TestCluster) { + re := suite.Require() + leaderServer := cluster.GetLeaderServer() + pdAddr := leaderServer.GetAddr() + urlPrefix := fmt.Sprintf("%s%s/api/v1", pdAddr, apiPrefix) + + stores := []*metapb.Store{ + { + Id: 1, + Address: "tikv1", + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + Version: "7.5.0", + Labels: []*metapb.StoreLabel{{Key: "zone", Value: "z1"}}, + }, + { + Id: 2, + Address: "tikv2", + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + Version: "7.5.0", + Labels: []*metapb.StoreLabel{{Key: "zone", Value: "z2"}}, + }, + } + + for _, store := range stores { + tests.MustPutStore(re, cluster, store) + } + + bundles := [][]placement.GroupBundle{ + { + { + ID: "1", + Index: 1, + Rules: []*placement.Rule{ + { + ID: "rule_1", Index: 1, Role: placement.Voter, Count: 1, GroupID: "1", + LabelConstraints: []placement.LabelConstraint{ + {Key: "zone", Op: "in", Values: []string{"z1"}}, + }, + }, + { + ID: "rule_2", Index: 2, Role: placement.Leader, Count: 1, GroupID: "1", + LabelConstraints: []placement.LabelConstraint{ + {Key: "zone", Op: "in", Values: []string{"z2"}}, + }, + }, + }, + }, + }, + { + { + ID: "1", + Index: 1, + Rules: []*placement.Rule{ + { + ID: "rule_1", Index: 1, Role: placement.Leader, Count: 1, GroupID: "1", + LabelConstraints: []placement.LabelConstraint{ + {Key: "zone", Op: "in", Values: []string{"z2"}}, + }, + }, + { + ID: "rule_2", Index: 2, Role: placement.Voter, Count: 1, GroupID: "1", + LabelConstraints: []placement.LabelConstraint{ + {Key: "zone", Op: "in", Values: []string{"z1"}}, + }, + }, + }, + }, + }} + for _, bundle := range bundles { + data, err := json.Marshal(bundle) + suite.NoError(err) + err = tu.CheckPostJSON(testDialClient, urlPrefix+"/config/placement-rule", data, tu.StatusOK(re)) + suite.NoError(err) + + tu.Eventually(re, func() bool { + respBundle := make([]placement.GroupBundle, 0) + err := tu.CheckGetJSON(testDialClient, urlPrefix+"/config/placement-rule", nil, + tu.StatusOK(re), tu.ExtractJSON(re, &respBundle)) + suite.NoError(err) + suite.Len(respBundle, 1) + if bundle[0].Rules[0].Role == placement.Leader { + return respBundle[0].Rules[0].Role == placement.Leader + } + if bundle[0].Rules[0].Role == placement.Voter { + return respBundle[0].Rules[0].Role == placement.Voter + } + return false + }) + } +} + func (suite *ruleTestSuite) TestDeleteAndUpdate() { suite.env.RunTestInTwoModes(suite.checkDeleteAndUpdate) } From e1031308242942fbdba57c77fc3553458e3cf548 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Tue, 19 
Dec 2023 13:24:39 +0800 Subject: [PATCH 6/7] address comments Signed-off-by: lhy1024 --- pkg/mcs/scheduling/server/rule/watcher.go | 8 ++++---- pkg/tso/keyspace_group_manager.go | 4 ++-- server/keyspace_service.go | 7 +++++-- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/pkg/mcs/scheduling/server/rule/watcher.go b/pkg/mcs/scheduling/server/rule/watcher.go index 9432c8cf3e4..3e11cf9ff9d 100644 --- a/pkg/mcs/scheduling/server/rule/watcher.go +++ b/pkg/mcs/scheduling/server/rule/watcher.go @@ -109,7 +109,7 @@ func NewWatcher( func (rw *Watcher) initializeRuleWatcher() error { var suspectKeyRanges *core.KeyRanges - preFn := func(events []*clientv3.Event) error { + preEventsFn := func(events []*clientv3.Event) error { // It will be locked until the postFn is finished. rw.ruleManager.Lock() rw.patch = rw.ruleManager.BeginPatch() @@ -186,7 +186,7 @@ func (rw *Watcher) initializeRuleWatcher() error { return nil } } - postFn := func(events []*clientv3.Event) error { + postEventsFn := func(events []*clientv3.Event) error { defer rw.ruleManager.Unlock() if err := rw.ruleManager.TryCommitPatch(rw.patch); err != nil { log.Error("failed to commit patch", zap.Error(err)) @@ -201,9 +201,9 @@ func (rw *Watcher) initializeRuleWatcher() error { rw.ctx, &rw.wg, rw.etcdClient, "scheduling-rule-watcher", rw.ruleCommonPathPrefix, - preFn, + preEventsFn, putFn, deleteFn, - postFn, + postEventsFn, clientv3.WithPrefix(), ) rw.ruleWatcher.StartWatchLoop() diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 0e69986f255..c48c066a2aa 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -559,7 +559,7 @@ func (kgm *KeyspaceGroupManager) InitializeGroupWatchLoop() error { kgm.deleteKeyspaceGroup(groupID) return nil } - postEventFn := func([]*clientv3.Event) error { + postEventsFn := func([]*clientv3.Event) error { // Retry the groups that are not initialized successfully before. 
for id, group := range kgm.groupUpdateRetryList { delete(kgm.groupUpdateRetryList, id) @@ -576,7 +576,7 @@ func (kgm *KeyspaceGroupManager) InitializeGroupWatchLoop() error { func([]*clientv3.Event) error { return nil }, putFn, deleteFn, - postEventFn, + postEventsFn, clientv3.WithRange(endKey), ) if kgm.loadKeyspaceGroupsTimeout > 0 { diff --git a/server/keyspace_service.go b/server/keyspace_service.go index 1718108d73b..11d912a5f54 100644 --- a/server/keyspace_service.go +++ b/server/keyspace_service.go @@ -89,7 +89,10 @@ func (s *KeyspaceServer) WatchKeyspaces(request *keyspacepb.WatchKeyspacesReques deleteFn := func(kv *mvccpb.KeyValue) error { return nil } - postEventFn := func([]*clientv3.Event) error { + postEventsFn := func([]*clientv3.Event) error { + if len(keyspaces) == 0 { + return nil + } defer func() { keyspaces = keyspaces[:0] }() @@ -112,7 +115,7 @@ func (s *KeyspaceServer) WatchKeyspaces(request *keyspacepb.WatchKeyspacesReques func([]*clientv3.Event) error { return nil }, putFn, deleteFn, - postEventFn, + postEventsFn, clientv3.WithRange(clientv3.GetPrefixRangeEnd(startKey)), ) watcher.StartWatchLoop() From 969b737db84501fb696fcd4504088c645c2a815b Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Tue, 19 Dec 2023 19:06:09 +0800 Subject: [PATCH 7/7] add loading log Signed-off-by: lhy1024 --- pkg/utils/etcdutil/etcdutil.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/utils/etcdutil/etcdutil.go b/pkg/utils/etcdutil/etcdutil.go index 796ec5168a9..f6beafee511 100644 --- a/pkg/utils/etcdutil/etcdutil.go +++ b/pkg/utils/etcdutil/etcdutil.go @@ -894,7 +894,11 @@ func (lw *LoopWatcher) load(ctx context.Context) (nextRevision int64, err error) } err = lw.putFn(item) if err != nil { - log.Error("put failed in watch loop when loading", zap.String("name", lw.name), zap.String("key", lw.key), zap.Error(err)) + log.Error("put failed in watch loop when loading", zap.String("name", lw.name), zap.String("watch-key", lw.key), + zap.ByteString("key", item.Key), zap.ByteString("value", item.Value), zap.Error(err)) + } else { + log.Debug("put successfully in watch loop when loading", zap.String("name", lw.name), zap.String("watch-key", lw.key), + zap.ByteString("key", item.Key), zap.ByteString("value", item.Value)) } } // Note: if there are no keys in etcd, the resp.More is false. It also means the load is finished.
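
Taken together, the patches above change LoopWatcher so that a caller supplies four callbacks: preEventsFn runs once before each batch of etcd events (and once, with an empty event slice, before a full load), putFn/deleteFn handle the individual events, and postEventsFn runs once after the batch. The scheduling rule watcher relies on this ordering to take the RuleManager lock, stage changes into a placement patch, and commit them in one step. The following is a minimal, illustrative sketch of wiring a watcher against the new signature; it is not part of the patch series, the cache shape, watcher name, and key prefix are invented for the example, and the etcd import paths may differ depending on the vendored etcd version.

package example

import (
	"context"
	"sync"

	"go.etcd.io/etcd/clientv3"
	"go.etcd.io/etcd/mvcc/mvccpb"

	"github.com/tikv/pd/pkg/utils/etcdutil"
)

// newBatchedWatcher stages every event of a watch response and applies the
// whole batch at once, mirroring the Lock/BeginPatch ... TryCommitPatch/Unlock
// pattern used by the scheduling rule watcher.
func newBatchedWatcher(ctx context.Context, wg *sync.WaitGroup, client *clientv3.Client) (*etcdutil.LoopWatcher, error) {
	var (
		mu      sync.Mutex
		pending = make(map[string]string) // changes staged for the current batch
		cache   = make(map[string]string) // state kept up to date by the watcher
	)
	preEventsFn := func([]*clientv3.Event) error {
		// Runs once per batch (and once per load) before putFn/deleteFn.
		mu.Lock()
		pending = make(map[string]string)
		return nil
	}
	putFn := func(kv *mvccpb.KeyValue) error {
		pending[string(kv.Key)] = string(kv.Value)
		return nil
	}
	deleteFn := func(kv *mvccpb.KeyValue) error {
		pending[string(kv.Key)] = "" // an empty value marks a deletion in this sketch
		return nil
	}
	postEventsFn := func([]*clientv3.Event) error {
		// Runs once after the batch: apply the staged changes, then release the lock.
		defer mu.Unlock()
		for k, v := range pending {
			if v == "" {
				delete(cache, k)
			} else {
				cache[k] = v
			}
		}
		return nil
	}
	watcher := etcdutil.NewLoopWatcher(
		ctx, wg, client,
		"example-batched-watcher", "/example/prefix", // hypothetical name and key
		preEventsFn,
		putFn, deleteFn,
		postEventsFn,
		clientv3.WithPrefix(),
	)
	watcher.StartWatchLoop()
	return watcher, watcher.WaitLoad()
}

The point of the pre/post hooks over per-event handling is that a delete immediately followed by a set of the same rule arrives in one watch response and is applied as one patch, so readers never observe the intermediate empty key range.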