From 261e689597d129a342e87d7fbd75de5d8665c9cc Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Fri, 15 Dec 2023 15:40:50 +0800 Subject: [PATCH 1/2] *: add pre func in etcdutil and refactor for endpoint Signed-off-by: lhy1024 --- pkg/keyspace/tso_keyspace_group.go | 3 +- pkg/mcs/scheduling/server/cluster.go | 2 +- pkg/mcs/scheduling/server/config/watcher.go | 36 +++++++++---------- pkg/mcs/scheduling/server/meta/watcher.go | 7 ++-- pkg/mcs/scheduling/server/rule/watcher.go | 21 +++++------ pkg/mock/mockcluster/mockcluster.go | 2 +- pkg/schedule/checker/rule_checker.go | 8 ++--- pkg/schedule/placement/rule_manager.go | 5 ++- pkg/schedule/schedulers/shuffle_region.go | 7 ++-- .../schedulers/shuffle_region_config.go | 4 ++- pkg/statistics/region_collection_test.go | 5 +-- pkg/storage/endpoint/config.go | 8 ++--- pkg/storage/endpoint/gc_safe_point.go | 8 +---- pkg/storage/endpoint/key_path.go | 6 ++++ pkg/storage/endpoint/keyspace.go | 5 --- pkg/storage/endpoint/replication_status.go | 6 +--- pkg/storage/endpoint/rule.go | 25 ------------- pkg/storage/endpoint/safepoint_v2.go | 13 ++----- pkg/storage/endpoint/service_middleware.go | 6 +--- pkg/storage/endpoint/tso_keyspace_group.go | 7 +--- pkg/storage/endpoint/util.go | 32 ++++++++++++++++- pkg/tso/keyspace_group_manager.go | 6 ++-- pkg/utils/etcdutil/etcdutil.go | 35 ++++++++++++------ pkg/utils/etcdutil/etcdutil_test.go | 15 +++++--- server/cluster/cluster.go | 2 +- server/cluster/cluster_test.go | 10 +++--- server/keyspace_service.go | 3 +- server/server.go | 3 +- 28 files changed, 146 insertions(+), 144 deletions(-) diff --git a/pkg/keyspace/tso_keyspace_group.go b/pkg/keyspace/tso_keyspace_group.go index c8694c4a7c6..51a53f75cc2 100644 --- a/pkg/keyspace/tso_keyspace_group.go +++ b/pkg/keyspace/tso_keyspace_group.go @@ -245,9 +245,10 @@ func (m *GroupManager) initTSONodesWatcher(client *clientv3.Client, clusterID ui client, "tso-nodes-watcher", tsoServiceKey, + func([]*clientv3.Event) error { return nil }, putFn, deleteFn, - func() error { return nil }, + func([]*clientv3.Event) error { return nil }, clientv3.WithRange(tsoServiceEndKey), ) } diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index b24db7ac805..5dd1c9f7fce 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -65,7 +65,7 @@ func NewCluster(parentCtx context.Context, persistConfig *config.PersistConfig, cancel() return nil, err } - ruleManager := placement.NewRuleManager(storage, basicCluster, persistConfig) + ruleManager := placement.NewRuleManager(ctx, storage, basicCluster, persistConfig) c := &Cluster{ ctx: ctx, cancel: cancel, diff --git a/pkg/mcs/scheduling/server/config/watcher.go b/pkg/mcs/scheduling/server/config/watcher.go index 4ded93ceb1b..32028592504 100644 --- a/pkg/mcs/scheduling/server/config/watcher.go +++ b/pkg/mcs/scheduling/server/config/watcher.go @@ -139,14 +139,13 @@ func (cw *Watcher) initializeConfigWatcher() error { deleteFn := func(kv *mvccpb.KeyValue) error { return nil } - postEventFn := func() error { - return nil - } cw.configWatcher = etcdutil.NewLoopWatcher( cw.ctx, &cw.wg, cw.etcdClient, "scheduling-config-watcher", cw.configPath, - putFn, deleteFn, postEventFn, + func([]*clientv3.Event) error { return nil }, + putFn, deleteFn, + func([]*clientv3.Event) error { return nil }, ) cw.configWatcher.StartWatchLoop() return cw.configWatcher.WaitLoad() @@ -154,7 +153,7 @@ func (cw *Watcher) initializeConfigWatcher() error { func (cw *Watcher) initializeTTLConfigWatcher() 
error { putFn := func(kv *mvccpb.KeyValue) error { - key := string(kv.Key)[len(sc.TTLConfigPrefix)+1:] + key := strings.TrimPrefix(string(kv.Key), sc.TTLConfigPrefix+"/") value := string(kv.Value) leaseID := kv.Lease resp, err := cw.etcdClient.TimeToLive(cw.ctx, clientv3.LeaseID(leaseID)) @@ -166,18 +165,18 @@ func (cw *Watcher) initializeTTLConfigWatcher() error { return nil } deleteFn := func(kv *mvccpb.KeyValue) error { - key := string(kv.Key)[len(sc.TTLConfigPrefix)+1:] + key := strings.TrimPrefix(string(kv.Key), sc.TTLConfigPrefix+"/") cw.ttl.PutWithTTL(key, nil, 0) return nil } - postEventFn := func() error { - return nil - } cw.ttlConfigWatcher = etcdutil.NewLoopWatcher( cw.ctx, &cw.wg, cw.etcdClient, "scheduling-ttl-config-watcher", cw.ttlConfigPrefix, - putFn, deleteFn, postEventFn, clientv3.WithPrefix(), + func([]*clientv3.Event) error { return nil }, + putFn, deleteFn, + func([]*clientv3.Event) error { return nil }, + clientv3.WithPrefix(), ) cw.ttlConfigWatcher.StartWatchLoop() return cw.ttlConfigWatcher.WaitLoad() @@ -186,13 +185,14 @@ func (cw *Watcher) initializeTTLConfigWatcher() error { func (cw *Watcher) initializeSchedulerConfigWatcher() error { prefixToTrim := cw.schedulerConfigPathPrefix + "/" putFn := func(kv *mvccpb.KeyValue) error { - name := strings.TrimPrefix(string(kv.Key), prefixToTrim) + key := string(kv.Key) + name := strings.TrimPrefix(key, prefixToTrim) log.Info("update scheduler config", zap.String("name", name), zap.String("value", string(kv.Value))) err := cw.storage.SaveSchedulerConfig(name, kv.Value) if err != nil { log.Warn("failed to save scheduler config", - zap.String("event-kv-key", string(kv.Key)), + zap.String("event-kv-key", key), zap.String("trimmed-key", name), zap.Error(err)) return err @@ -204,19 +204,19 @@ func (cw *Watcher) initializeSchedulerConfigWatcher() error { return nil } deleteFn := func(kv *mvccpb.KeyValue) error { - log.Info("remove scheduler config", zap.String("key", string(kv.Key))) + key := string(kv.Key) + log.Info("remove scheduler config", zap.String("key", key)) return cw.storage.RemoveSchedulerConfig( - strings.TrimPrefix(string(kv.Key), prefixToTrim), + strings.TrimPrefix(key, prefixToTrim), ) } - postEventFn := func() error { - return nil - } cw.schedulerConfigWatcher = etcdutil.NewLoopWatcher( cw.ctx, &cw.wg, cw.etcdClient, "scheduling-scheduler-config-watcher", cw.schedulerConfigPathPrefix, - putFn, deleteFn, postEventFn, + func([]*clientv3.Event) error { return nil }, + putFn, deleteFn, + func([]*clientv3.Event) error { return nil }, clientv3.WithPrefix(), ) cw.schedulerConfigWatcher.StartWatchLoop() diff --git a/pkg/mcs/scheduling/server/meta/watcher.go b/pkg/mcs/scheduling/server/meta/watcher.go index 6fae537eab9..808e8fc565e 100644 --- a/pkg/mcs/scheduling/server/meta/watcher.go +++ b/pkg/mcs/scheduling/server/meta/watcher.go @@ -104,14 +104,13 @@ func (w *Watcher) initializeStoreWatcher() error { } return nil } - postEventFn := func() error { - return nil - } w.storeWatcher = etcdutil.NewLoopWatcher( w.ctx, &w.wg, w.etcdClient, "scheduling-store-watcher", w.storePathPrefix, - putFn, deleteFn, postEventFn, + func([]*clientv3.Event) error { return nil }, + putFn, deleteFn, + func([]*clientv3.Event) error { return nil }, clientv3.WithPrefix(), ) w.storeWatcher.StartWatchLoop() diff --git a/pkg/mcs/scheduling/server/rule/watcher.go b/pkg/mcs/scheduling/server/rule/watcher.go index 912fb9c01e5..96e19cf5002 100644 --- a/pkg/mcs/scheduling/server/rule/watcher.go +++ b/pkg/mcs/scheduling/server/rule/watcher.go @@ 
-131,14 +131,13 @@ func (rw *Watcher) initializeRuleWatcher() error { rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey) return rw.ruleManager.DeleteRule(rule.GroupID, rule.ID) } - postEventFn := func() error { - return nil - } rw.ruleWatcher = etcdutil.NewLoopWatcher( rw.ctx, &rw.wg, rw.etcdClient, "scheduling-rule-watcher", rw.rulesPathPrefix, - putFn, deleteFn, postEventFn, + func([]*clientv3.Event) error { return nil }, + putFn, deleteFn, + func([]*clientv3.Event) error { return nil }, clientv3.WithPrefix(), ) rw.ruleWatcher.StartWatchLoop() @@ -168,14 +167,13 @@ func (rw *Watcher) initializeGroupWatcher() error { } return rw.ruleManager.DeleteRuleGroup(trimmedKey) } - postEventFn := func() error { - return nil - } rw.groupWatcher = etcdutil.NewLoopWatcher( rw.ctx, &rw.wg, rw.etcdClient, "scheduling-rule-group-watcher", rw.ruleGroupPathPrefix, - putFn, deleteFn, postEventFn, + func([]*clientv3.Event) error { return nil }, + putFn, deleteFn, + func([]*clientv3.Event) error { return nil }, clientv3.WithPrefix(), ) rw.groupWatcher.StartWatchLoop() @@ -197,14 +195,13 @@ func (rw *Watcher) initializeRegionLabelWatcher() error { log.Info("delete region label rule", zap.String("key", key)) return rw.regionLabeler.DeleteLabelRule(strings.TrimPrefix(key, prefixToTrim)) } - postEventFn := func() error { - return nil - } rw.labelWatcher = etcdutil.NewLoopWatcher( rw.ctx, &rw.wg, rw.etcdClient, "scheduling-region-label-watcher", rw.regionLabelPathPrefix, - putFn, deleteFn, postEventFn, + func([]*clientv3.Event) error { return nil }, + putFn, deleteFn, + func([]*clientv3.Event) error { return nil }, clientv3.WithPrefix(), ) rw.labelWatcher.StartWatchLoop() diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index 01282b40534..6cf7ae143df 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -212,7 +212,7 @@ func (mc *Cluster) AllocPeer(storeID uint64) (*metapb.Peer, error) { func (mc *Cluster) initRuleManager() { if mc.RuleManager == nil { - mc.RuleManager = placement.NewRuleManager(mc.GetStorage(), mc, mc.GetSharedConfig()) + mc.RuleManager = placement.NewRuleManager(mc.ctx, mc.GetStorage(), mc, mc.GetSharedConfig()) mc.RuleManager.Initialize(int(mc.GetReplicationConfig().MaxReplicas), mc.GetReplicationConfig().LocationLabels, mc.GetReplicationConfig().IsolationLevel) } } diff --git a/pkg/schedule/checker/rule_checker.go b/pkg/schedule/checker/rule_checker.go index 553ece09e65..95cc77ade5d 100644 --- a/pkg/schedule/checker/rule_checker.go +++ b/pkg/schedule/checker/rule_checker.go @@ -199,7 +199,7 @@ func (c *RuleChecker) fixRulePeer(region *core.RegionInfo, fit *placement.Region if c.isDownPeer(region, peer) { if c.isStoreDownTimeHitMaxDownTime(peer.GetStoreId()) { ruleCheckerReplaceDownCounter.Inc() - return c.replaceUnexpectRulePeer(region, rf, fit, peer, downStatus) + return c.replaceUnexpectedRulePeer(region, rf, fit, peer, downStatus) } // When witness placement rule is enabled, promotes the witness to voter when region has down voter. if c.isWitnessEnabled() && core.IsVoter(peer) { @@ -211,7 +211,7 @@ func (c *RuleChecker) fixRulePeer(region *core.RegionInfo, fit *placement.Region } if c.isOfflinePeer(peer) { ruleCheckerReplaceOfflineCounter.Inc() - return c.replaceUnexpectRulePeer(region, rf, fit, peer, offlineStatus) + return c.replaceUnexpectedRulePeer(region, rf, fit, peer, offlineStatus) } } // fix loose matched peers. 
@@ -246,7 +246,7 @@ func (c *RuleChecker) addRulePeer(region *core.RegionInfo, fit *placement.Region continue } ruleCheckerNoStoreThenTryReplace.Inc() - op, err := c.replaceUnexpectRulePeer(region, oldPeerRuleFit, fit, p, "swap-fit") + op, err := c.replaceUnexpectedRulePeer(region, oldPeerRuleFit, fit, p, "swap-fit") if err != nil { return nil, err } @@ -267,7 +267,7 @@ func (c *RuleChecker) addRulePeer(region *core.RegionInfo, fit *placement.Region } // The peer's store may in Offline or Down, need to be replace. -func (c *RuleChecker) replaceUnexpectRulePeer(region *core.RegionInfo, rf *placement.RuleFit, fit *placement.RegionFit, peer *metapb.Peer, status string) (*operator.Operator, error) { +func (c *RuleChecker) replaceUnexpectedRulePeer(region *core.RegionInfo, rf *placement.RuleFit, fit *placement.RegionFit, peer *metapb.Peer, status string) (*operator.Operator, error) { var fastFailover bool // If the store to which the original peer belongs is TiFlash, the new peer cannot be set to witness, nor can it perform fast failover if c.isWitnessEnabled() && !c.cluster.GetStore(peer.StoreId).IsTiFlash() { diff --git a/pkg/schedule/placement/rule_manager.go b/pkg/schedule/placement/rule_manager.go index e25b8802b45..621c52d738e 100644 --- a/pkg/schedule/placement/rule_manager.go +++ b/pkg/schedule/placement/rule_manager.go @@ -16,6 +16,7 @@ package placement import ( "bytes" + "context" "encoding/hex" "encoding/json" "fmt" @@ -49,6 +50,7 @@ const ( // RuleManager is responsible for the lifecycle of all placement Rules. // It is thread safe. type RuleManager struct { + ctx context.Context storage endpoint.RuleStorage syncutil.RWMutex initialized bool @@ -63,8 +65,9 @@ type RuleManager struct { } // NewRuleManager creates a RuleManager instance. -func NewRuleManager(storage endpoint.RuleStorage, storeSetInformer core.StoreSetInformer, conf config.SharedConfigProvider) *RuleManager { +func NewRuleManager(ctx context.Context, storage endpoint.RuleStorage, storeSetInformer core.StoreSetInformer, conf config.SharedConfigProvider) *RuleManager { return &RuleManager{ + ctx: ctx, storage: storage, storeSetInformer: storeSetInformer, conf: conf, diff --git a/pkg/schedule/schedulers/shuffle_region.go b/pkg/schedule/schedulers/shuffle_region.go index f1d35e80925..f9bed18d3fa 100644 --- a/pkg/schedule/schedulers/shuffle_region.go +++ b/pkg/schedule/schedulers/shuffle_region.go @@ -139,18 +139,19 @@ func (s *shuffleRegionScheduler) scheduleRemovePeer(cluster sche.SchedulerCluste pendingFilter := filter.NewRegionPendingFilter() downFilter := filter.NewRegionDownFilter() replicaFilter := filter.NewRegionReplicatedFilter(cluster) + ranges := s.conf.GetRanges() for _, source := range candidates.Stores { var region *core.RegionInfo if s.conf.IsRoleAllow(roleFollower) { - region = filter.SelectOneRegion(cluster.RandFollowerRegions(source.GetID(), s.conf.Ranges), nil, + region = filter.SelectOneRegion(cluster.RandFollowerRegions(source.GetID(), ranges), nil, pendingFilter, downFilter, replicaFilter) } if region == nil && s.conf.IsRoleAllow(roleLeader) { - region = filter.SelectOneRegion(cluster.RandLeaderRegions(source.GetID(), s.conf.Ranges), nil, + region = filter.SelectOneRegion(cluster.RandLeaderRegions(source.GetID(), ranges), nil, pendingFilter, downFilter, replicaFilter) } if region == nil && s.conf.IsRoleAllow(roleLearner) { - region = filter.SelectOneRegion(cluster.RandLearnerRegions(source.GetID(), s.conf.Ranges), nil, + region = filter.SelectOneRegion(cluster.RandLearnerRegions(source.GetID(), 
ranges), nil, pendingFilter, downFilter, replicaFilter) } if region != nil { diff --git a/pkg/schedule/schedulers/shuffle_region_config.go b/pkg/schedule/schedulers/shuffle_region_config.go index 7d04879c992..552d7ea8bce 100644 --- a/pkg/schedule/schedulers/shuffle_region_config.go +++ b/pkg/schedule/schedulers/shuffle_region_config.go @@ -58,7 +58,9 @@ func (conf *shuffleRegionSchedulerConfig) GetRoles() []string { func (conf *shuffleRegionSchedulerConfig) GetRanges() []core.KeyRange { conf.RLock() defer conf.RUnlock() - return conf.Ranges + ranges := make([]core.KeyRange, len(conf.Ranges)) + copy(ranges, conf.Ranges) + return ranges } func (conf *shuffleRegionSchedulerConfig) IsRoleAllow(role string) bool { diff --git a/pkg/statistics/region_collection_test.go b/pkg/statistics/region_collection_test.go index f0df9ce6e07..cbbf7672bee 100644 --- a/pkg/statistics/region_collection_test.go +++ b/pkg/statistics/region_collection_test.go @@ -15,6 +15,7 @@ package statistics import ( + "context" "testing" "github.com/pingcap/kvproto/pkg/metapb" @@ -29,7 +30,7 @@ import ( func TestRegionStatistics(t *testing.T) { re := require.New(t) store := storage.NewStorageWithMemoryBackend() - manager := placement.NewRuleManager(store, nil, nil) + manager := placement.NewRuleManager(context.Background(), store, nil, nil) err := manager.Initialize(3, []string{"zone", "rack", "host"}, "") re.NoError(err) opt := mockconfig.NewTestOptions() @@ -118,7 +119,7 @@ func TestRegionStatistics(t *testing.T) { func TestRegionStatisticsWithPlacementRule(t *testing.T) { re := require.New(t) store := storage.NewStorageWithMemoryBackend() - manager := placement.NewRuleManager(store, nil, nil) + manager := placement.NewRuleManager(context.Background(), store, nil, nil) err := manager.Initialize(3, []string{"zone", "rack", "host"}, "") re.NoError(err) opt := mockconfig.NewTestOptions() diff --git a/pkg/storage/endpoint/config.go b/pkg/storage/endpoint/config.go index db5565a4b90..edfdcbca9a3 100644 --- a/pkg/storage/endpoint/config.go +++ b/pkg/storage/endpoint/config.go @@ -51,17 +51,13 @@ func (se *StorageEndpoint) LoadConfig(cfg interface{}) (bool, error) { // SaveConfig stores marshallable cfg to the configPath. func (se *StorageEndpoint) SaveConfig(cfg interface{}) error { - value, err := json.Marshal(cfg) - if err != nil { - return errs.ErrJSONMarshal.Wrap(err).GenWithStackByCause() - } - return se.Save(configPath, string(value)) + return se.saveJSON(configPath, cfg) } // LoadAllSchedulerConfigs loads all schedulers' config. 
func (se *StorageEndpoint) LoadAllSchedulerConfigs() ([]string, []string, error) { prefix := customSchedulerConfigPath + "/" - keys, values, err := se.LoadRange(prefix, clientv3.GetPrefixRangeEnd(prefix), 1000) + keys, values, err := se.LoadRange(prefix, clientv3.GetPrefixRangeEnd(prefix), MinKVRangeLimit) for i, key := range keys { keys[i] = strings.TrimPrefix(key, prefix) } diff --git a/pkg/storage/endpoint/gc_safe_point.go b/pkg/storage/endpoint/gc_safe_point.go index db5c58205c8..c2f09980651 100644 --- a/pkg/storage/endpoint/gc_safe_point.go +++ b/pkg/storage/endpoint/gc_safe_point.go @@ -169,13 +169,7 @@ func (se *StorageEndpoint) SaveServiceGCSafePoint(ssp *ServiceSafePoint) error { return errors.New("TTL of gc_worker's service safe point must be infinity") } - key := gcSafePointServicePath(ssp.ServiceID) - value, err := json.Marshal(ssp) - if err != nil { - return err - } - - return se.Save(key, string(value)) + return se.saveJSON(gcSafePointServicePath(ssp.ServiceID), ssp) } // RemoveServiceGCSafePoint removes a GC safepoint for the service diff --git a/pkg/storage/endpoint/key_path.go b/pkg/storage/endpoint/key_path.go index cac40db29c5..69b8d0f2f8e 100644 --- a/pkg/storage/endpoint/key_path.go +++ b/pkg/storage/endpoint/key_path.go @@ -31,6 +31,7 @@ const ( serviceMiddlewarePath = "service_middleware" schedulePath = "schedule" gcPath = "gc" + ruleCommonPath = "rule" rulesPath = "rules" ruleGroupPath = "rule_group" regionLabelPath = "region_label" @@ -102,6 +103,11 @@ func RulesPathPrefix(clusterID uint64) string { return path.Join(PDRootPath(clusterID), rulesPath) } +// RuleCommonPathPrefix returns the path prefix to save the placement rule common config. +func RuleCommonPathPrefix(clusterID uint64) string { + return path.Join(PDRootPath(clusterID), ruleCommonPath) +} + // RuleGroupPathPrefix returns the path prefix to save the placement rule groups. func RuleGroupPathPrefix(clusterID uint64) string { return path.Join(PDRootPath(clusterID), ruleGroupPath) diff --git a/pkg/storage/endpoint/keyspace.go b/pkg/storage/endpoint/keyspace.go index 09733ad59c1..77c81b2c8d6 100644 --- a/pkg/storage/endpoint/keyspace.go +++ b/pkg/storage/endpoint/keyspace.go @@ -97,11 +97,6 @@ func (se *StorageEndpoint) LoadKeyspaceID(txn kv.Txn, name string) (bool, uint32 return true, uint32(id64), nil } -// RunInTxn runs the given function in a transaction. -func (se *StorageEndpoint) RunInTxn(ctx context.Context, f func(txn kv.Txn) error) error { - return se.Base.RunInTxn(ctx, f) -} - // LoadRangeKeyspace loads keyspaces starting at startID. // limit specifies the limit of loaded keyspaces. func (se *StorageEndpoint) LoadRangeKeyspace(txn kv.Txn, startID uint32, limit int) ([]*keyspacepb.KeyspaceMeta, error) { diff --git a/pkg/storage/endpoint/replication_status.go b/pkg/storage/endpoint/replication_status.go index 4bac51071bc..0a14770ff47 100644 --- a/pkg/storage/endpoint/replication_status.go +++ b/pkg/storage/endpoint/replication_status.go @@ -43,9 +43,5 @@ func (se *StorageEndpoint) LoadReplicationStatus(mode string, status interface{} // SaveReplicationStatus stores replication status by mode. 
func (se *StorageEndpoint) SaveReplicationStatus(mode string, status interface{}) error { - value, err := json.Marshal(status) - if err != nil { - return errs.ErrJSONMarshal.Wrap(err).GenWithStackByArgs() - } - return se.Save(replicationModePath(mode), string(value)) + return se.saveJSON(replicationModePath(mode), status) } diff --git a/pkg/storage/endpoint/rule.go b/pkg/storage/endpoint/rule.go index 125c5bc31eb..80b6fc7c0ff 100644 --- a/pkg/storage/endpoint/rule.go +++ b/pkg/storage/endpoint/rule.go @@ -14,12 +14,6 @@ package endpoint -import ( - "strings" - - "go.etcd.io/etcd/clientv3" -) - // RuleStorage defines the storage operations on the rule. type RuleStorage interface { LoadRule(ruleKey string) (string, error) @@ -103,22 +97,3 @@ func (se *StorageEndpoint) LoadRule(ruleKey string) (string, error) { func (se *StorageEndpoint) LoadRules(f func(k, v string)) error { return se.loadRangeByPrefix(rulesPath+"/", f) } - -// loadRangeByPrefix iterates all key-value pairs in the storage that has the prefix. -func (se *StorageEndpoint) loadRangeByPrefix(prefix string, f func(k, v string)) error { - nextKey := prefix - endKey := clientv3.GetPrefixRangeEnd(prefix) - for { - keys, values, err := se.LoadRange(nextKey, endKey, MinKVRangeLimit) - if err != nil { - return err - } - for i := range keys { - f(strings.TrimPrefix(keys[i], prefix), values[i]) - } - if len(keys) < MinKVRangeLimit { - return nil - } - nextKey = keys[len(keys)-1] + "\x00" - } -} diff --git a/pkg/storage/endpoint/safepoint_v2.go b/pkg/storage/endpoint/safepoint_v2.go index cac2606a470..8d690d07261 100644 --- a/pkg/storage/endpoint/safepoint_v2.go +++ b/pkg/storage/endpoint/safepoint_v2.go @@ -79,12 +79,7 @@ func (se *StorageEndpoint) LoadGCSafePointV2(keyspaceID uint32) (*GCSafePointV2, // SaveGCSafePointV2 saves gc safe point for the given keyspace. func (se *StorageEndpoint) SaveGCSafePointV2(gcSafePoint *GCSafePointV2) error { - key := GCSafePointV2Path(gcSafePoint.KeyspaceID) - value, err := json.Marshal(gcSafePoint) - if err != nil { - return errs.ErrJSONMarshal.Wrap(err).GenWithStackByCause() - } - return se.Save(key, string(value)) + return se.saveJSON(GCSafePointV2Path(gcSafePoint.KeyspaceID), gcSafePoint) } // LoadAllGCSafePoints returns gc safe point for all keyspaces @@ -203,11 +198,7 @@ func (se *StorageEndpoint) SaveServiceSafePointV2(serviceSafePoint *ServiceSafeP } key := ServiceSafePointV2Path(serviceSafePoint.KeyspaceID, serviceSafePoint.ServiceID) - value, err := json.Marshal(serviceSafePoint) - if err != nil { - return errs.ErrJSONMarshal.Wrap(err).GenWithStackByCause() - } - return se.Save(key, string(value)) + return se.saveJSON(key, serviceSafePoint) } // RemoveServiceSafePointV2 removes a service safe point. diff --git a/pkg/storage/endpoint/service_middleware.go b/pkg/storage/endpoint/service_middleware.go index 62cf91c97bf..2becbf3686e 100644 --- a/pkg/storage/endpoint/service_middleware.go +++ b/pkg/storage/endpoint/service_middleware.go @@ -43,9 +43,5 @@ func (se *StorageEndpoint) LoadServiceMiddlewareConfig(cfg interface{}) (bool, e // SaveServiceMiddlewareConfig stores marshallable cfg to the serviceMiddlewarePath. 
func (se *StorageEndpoint) SaveServiceMiddlewareConfig(cfg interface{}) error { - value, err := json.Marshal(cfg) - if err != nil { - return errs.ErrJSONMarshal.Wrap(err).GenWithStackByCause() - } - return se.Save(serviceMiddlewarePath, string(value)) + return se.saveJSON(serviceMiddlewarePath, cfg) } diff --git a/pkg/storage/endpoint/tso_keyspace_group.go b/pkg/storage/endpoint/tso_keyspace_group.go index 498cd878887..39a08afe937 100644 --- a/pkg/storage/endpoint/tso_keyspace_group.go +++ b/pkg/storage/endpoint/tso_keyspace_group.go @@ -177,12 +177,7 @@ func (se *StorageEndpoint) LoadKeyspaceGroup(txn kv.Txn, id uint32) (*KeyspaceGr // SaveKeyspaceGroup saves the keyspace group. func (se *StorageEndpoint) SaveKeyspaceGroup(txn kv.Txn, kg *KeyspaceGroup) error { - key := KeyspaceGroupIDPath(kg.ID) - value, err := json.Marshal(kg) - if err != nil { - return err - } - return txn.Save(key, string(value)) + return saveJSONInTxn(txn, KeyspaceGroupIDPath(kg.ID), kg) } // DeleteKeyspaceGroup deletes the keyspace group. diff --git a/pkg/storage/endpoint/util.go b/pkg/storage/endpoint/util.go index 37f98a55709..3058c059628 100644 --- a/pkg/storage/endpoint/util.go +++ b/pkg/storage/endpoint/util.go @@ -16,9 +16,12 @@ package endpoint import ( "encoding/json" + "strings" "github.com/gogo/protobuf/proto" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/storage/kv" + "go.etcd.io/etcd/clientv3" ) func (se *StorageEndpoint) loadProto(key string, msg proto.Message) (bool, error) { @@ -42,9 +45,36 @@ func (se *StorageEndpoint) saveProto(key string, msg proto.Message) error { } func (se *StorageEndpoint) saveJSON(key string, data interface{}) error { + return saveJSONInTxn(se /* use the same interface */, key, data) +} + +func saveJSONInTxn(txn kv.Txn, key string, data interface{}) error { value, err := json.Marshal(data) if err != nil { return errs.ErrJSONMarshal.Wrap(err).GenWithStackByArgs() } - return se.Save(key, string(value)) + return txn.Save(key, string(value)) +} + +// loadRangeByPrefix iterates all key-value pairs in the storage that has the prefix. +func (se *StorageEndpoint) loadRangeByPrefix(prefix string, f func(k, v string)) error { + return loadRangeByPrefixInTxn(se /* use the same interface */, prefix, f) +} + +func loadRangeByPrefixInTxn(txn kv.Txn, prefix string, f func(k, v string)) error { + nextKey := prefix + endKey := clientv3.GetPrefixRangeEnd(prefix) + for { + keys, values, err := txn.LoadRange(nextKey, endKey, MinKVRangeLimit) + if err != nil { + return err + } + for i := range keys { + f(strings.TrimPrefix(keys[i], prefix), values[i]) + } + if len(keys) < MinKVRangeLimit { + return nil + } + nextKey = keys[len(keys)-1] + "\x00" + } } diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 58534de1642..0e69986f255 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -514,9 +514,10 @@ func (kgm *KeyspaceGroupManager) InitializeTSOServerWatchLoop() error { kgm.etcdClient, "tso-nodes-watcher", kgm.tsoServiceKey, + func([]*clientv3.Event) error { return nil }, putFn, deleteFn, - func() error { return nil }, + func([]*clientv3.Event) error { return nil }, clientv3.WithRange(tsoServiceEndKey), ) kgm.tsoNodesWatcher.StartWatchLoop() @@ -558,7 +559,7 @@ func (kgm *KeyspaceGroupManager) InitializeGroupWatchLoop() error { kgm.deleteKeyspaceGroup(groupID) return nil } - postEventFn := func() error { + postEventFn := func([]*clientv3.Event) error { // Retry the groups that are not initialized successfully before. 
for id, group := range kgm.groupUpdateRetryList { delete(kgm.groupUpdateRetryList, id) @@ -572,6 +573,7 @@ func (kgm *KeyspaceGroupManager) InitializeGroupWatchLoop() error { kgm.etcdClient, "keyspace-watcher", startKey, + func([]*clientv3.Event) error { return nil }, putFn, deleteFn, postEventFn, diff --git a/pkg/utils/etcdutil/etcdutil.go b/pkg/utils/etcdutil/etcdutil.go index 03c2374efc6..0e1b2731474 100644 --- a/pkg/utils/etcdutil/etcdutil.go +++ b/pkg/utils/etcdutil/etcdutil.go @@ -587,8 +587,10 @@ type LoopWatcher struct { putFn func(*mvccpb.KeyValue) error // deleteFn is used to handle the delete event. deleteFn func(*mvccpb.KeyValue) error - // postEventFn is used to call after handling all events. - postEventFn func() error + // postEventsFn is used to call after handling all events. + postEventsFn func([]*clientv3.Event) error + // preEventsFn is used to call before handling all events. + preEventsFn func([]*clientv3.Event) error // forceLoadMu is used to ensure two force loads have minimal interval. forceLoadMu syncutil.RWMutex @@ -613,7 +615,9 @@ func NewLoopWatcher( ctx context.Context, wg *sync.WaitGroup, client *clientv3.Client, name, key string, - putFn, deleteFn func(*mvccpb.KeyValue) error, postEventFn func() error, + preEventsFn func([]*clientv3.Event) error, + putFn, deleteFn func(*mvccpb.KeyValue) error, + postEventsFn func([]*clientv3.Event) error, opts ...clientv3.OpOption, ) *LoopWatcher { return &LoopWatcher{ @@ -627,7 +631,8 @@ func NewLoopWatcher( updateClientCh: make(chan *clientv3.Client, 1), putFn: putFn, deleteFn: deleteFn, - postEventFn: postEventFn, + postEventsFn: postEventsFn, + preEventsFn: preEventsFn, opts: opts, lastTimeForceLoad: time.Now(), loadTimeout: defaultLoadDataFromEtcdTimeout, @@ -813,28 +818,34 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision zap.Int64("revision", revision), zap.String("name", lw.name), zap.String("key", lw.key)) goto watchChanLoop } + if err := lw.preEventsFn(wresp.Events); err != nil { + log.Error("run pre event failed in watch loop", zap.Error(err), + zap.Int64("revision", revision), zap.String("name", lw.name), zap.String("key", lw.key)) + } for _, event := range wresp.Events { switch event.Type { case clientv3.EventTypePut: if err := lw.putFn(event.Kv); err != nil { log.Error("put failed in watch loop", zap.Error(err), - zap.Int64("revision", revision), zap.String("name", lw.name), zap.String("key", lw.key)) + zap.Int64("revision", revision), zap.String("name", lw.name), + zap.String("watch-key", lw.key), zap.ByteString("event-kv-key", event.Kv.Key)) } else { - log.Debug("put in watch loop", zap.String("name", lw.name), + log.Debug("put successfully in watch loop", zap.String("name", lw.name), zap.ByteString("key", event.Kv.Key), zap.ByteString("value", event.Kv.Value)) } case clientv3.EventTypeDelete: if err := lw.deleteFn(event.Kv); err != nil { log.Error("delete failed in watch loop", zap.Error(err), - zap.Int64("revision", revision), zap.String("name", lw.name), zap.String("key", lw.key)) + zap.Int64("revision", revision), zap.String("name", lw.name), + zap.String("watch-key", lw.key), zap.ByteString("event-kv-key", event.Kv.Key)) } else { - log.Debug("delete in watch loop", zap.String("name", lw.name), + log.Debug("delete successfully in watch loop", zap.String("name", lw.name), zap.ByteString("key", event.Kv.Key)) } } } - if err := lw.postEventFn(); err != nil { + if err := lw.postEventsFn(wresp.Events); err != nil { log.Error("run post event failed in watch loop", 
zap.Error(err), zap.Int64("revision", revision), zap.String("name", lw.name), zap.String("key", lw.key)) } @@ -864,6 +875,10 @@ func (lw *LoopWatcher) load(ctx context.Context) (nextRevision int64, err error) zap.String("key", lw.key), zap.Error(err)) return 0, err } + if err := lw.preEventsFn([]*clientv3.Event{}); err != nil { + log.Error("run pre event failed in watch loop", zap.String("name", lw.name), + zap.String("key", lw.key), zap.Error(err)) + } for i, item := range resp.Kvs { if resp.More && i == len(resp.Kvs)-1 { // The last key is the start key of the next batch. @@ -878,7 +893,7 @@ func (lw *LoopWatcher) load(ctx context.Context) (nextRevision int64, err error) } // Note: if there are no keys in etcd, the resp.More is false. It also means the load is finished. if !resp.More { - if err := lw.postEventFn(); err != nil { + if err := lw.postEventsFn([]*clientv3.Event{}); err != nil { log.Error("run post event failed in watch loop", zap.String("name", lw.name), zap.String("key", lw.key), zap.Error(err)) } diff --git a/pkg/utils/etcdutil/etcdutil_test.go b/pkg/utils/etcdutil/etcdutil_test.go index f7fadd3bbf6..861a57cef13 100644 --- a/pkg/utils/etcdutil/etcdutil_test.go +++ b/pkg/utils/etcdutil/etcdutil_test.go @@ -410,6 +410,7 @@ func (suite *loopWatcherTestSuite) TestLoadWithoutKey() { suite.client, "test", "TestLoadWithoutKey", + func([]*clientv3.Event) error { return nil }, func(kv *mvccpb.KeyValue) error { cache.Lock() defer cache.Unlock() @@ -417,7 +418,7 @@ func (suite *loopWatcherTestSuite) TestLoadWithoutKey() { return nil }, func(kv *mvccpb.KeyValue) error { return nil }, - func() error { return nil }, + func([]*clientv3.Event) error { return nil }, ) watcher.StartWatchLoop() err := watcher.WaitLoad() @@ -441,6 +442,7 @@ func (suite *loopWatcherTestSuite) TestCallBack() { suite.client, "test", "TestCallBack", + func([]*clientv3.Event) error { return nil }, func(kv *mvccpb.KeyValue) error { result = append(result, string(kv.Key)) return nil @@ -451,7 +453,7 @@ func (suite *loopWatcherTestSuite) TestCallBack() { delete(cache.data, string(kv.Key)) return nil }, - func() error { + func([]*clientv3.Event) error { cache.Lock() defer cache.Unlock() for _, r := range result { @@ -506,6 +508,7 @@ func (suite *loopWatcherTestSuite) TestWatcherLoadLimit() { suite.client, "test", "TestWatcherLoadLimit", + func([]*clientv3.Event) error { return nil }, func(kv *mvccpb.KeyValue) error { cache.Lock() defer cache.Unlock() @@ -515,7 +518,7 @@ func (suite *loopWatcherTestSuite) TestWatcherLoadLimit() { func(kv *mvccpb.KeyValue) error { return nil }, - func() error { + func([]*clientv3.Event) error { return nil }, clientv3.WithPrefix(), @@ -550,6 +553,7 @@ func (suite *loopWatcherTestSuite) TestWatcherBreak() { suite.client, "test", "TestWatcherBreak", + func([]*clientv3.Event) error { return nil }, func(kv *mvccpb.KeyValue) error { if string(kv.Key) == "TestWatcherBreak" { cache.Lock() @@ -559,7 +563,7 @@ func (suite *loopWatcherTestSuite) TestWatcherBreak() { return nil }, func(kv *mvccpb.KeyValue) error { return nil }, - func() error { return nil }, + func([]*clientv3.Event) error { return nil }, ) watcher.watchChangeRetryInterval = 100 * time.Millisecond watcher.StartWatchLoop() @@ -633,9 +637,10 @@ func (suite *loopWatcherTestSuite) TestWatcherRequestProgress() { suite.client, "test", "TestWatcherChanBlock", + func([]*clientv3.Event) error { return nil }, func(kv *mvccpb.KeyValue) error { return nil }, func(kv *mvccpb.KeyValue) error { return nil }, - func() error { return nil }, + 
func([]*clientv3.Event) error { return nil }, ) suite.wg.Add(1) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 78f6ddd4364..ecbd40e2582 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -269,7 +269,7 @@ func (c *RaftCluster) InitCluster( c.unsafeRecoveryController = unsaferecovery.NewController(c) c.keyspaceGroupManager = keyspaceGroupManager c.hbstreams = hbstreams - c.ruleManager = placement.NewRuleManager(c.storage, c, c.GetOpts()) + c.ruleManager = placement.NewRuleManager(c.ctx, c.storage, c, c.GetOpts()) if c.opt.IsPlacementRulesEnabled() { err := c.ruleManager.Initialize(c.opt.GetMaxReplicas(), c.opt.GetLocationLabels(), c.opt.GetIsolationLevel()) if err != nil { diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index 85edf911779..7094fd6b673 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -241,7 +241,7 @@ func TestSetOfflineStore(t *testing.T) { re.NoError(err) cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend()) cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil) - cluster.ruleManager = placement.NewRuleManager(storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts()) + cluster.ruleManager = placement.NewRuleManager(ctx, storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts()) if opt.IsPlacementRulesEnabled() { err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel()) if err != nil { @@ -438,7 +438,7 @@ func TestUpStore(t *testing.T) { re.NoError(err) cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend()) cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil) - cluster.ruleManager = placement.NewRuleManager(storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts()) + cluster.ruleManager = placement.NewRuleManager(ctx, storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts()) if opt.IsPlacementRulesEnabled() { err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel()) if err != nil { @@ -541,7 +541,7 @@ func TestDeleteStoreUpdatesClusterVersion(t *testing.T) { re.NoError(err) cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend()) cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil) - cluster.ruleManager = placement.NewRuleManager(storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts()) + cluster.ruleManager = placement.NewRuleManager(ctx, storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts()) if opt.IsPlacementRulesEnabled() { err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel()) if err != nil { @@ -1268,7 +1268,7 @@ func TestOfflineAndMerge(t *testing.T) { re.NoError(err) cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend()) cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil) - cluster.ruleManager = placement.NewRuleManager(storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts()) + cluster.ruleManager = placement.NewRuleManager(ctx, storage.NewStorageWithMemoryBackend(), cluster, cluster.GetOpts()) if opt.IsPlacementRulesEnabled() { err := cluster.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel()) if err != nil { @@ -2130,7 +2130,7 @@ func newTestRaftCluster( ) 
*RaftCluster { rc := &RaftCluster{serverCtx: ctx, core: core.NewBasicCluster(), storage: s} rc.InitCluster(id, opt, nil, nil) - rc.ruleManager = placement.NewRuleManager(storage.NewStorageWithMemoryBackend(), rc, opt) + rc.ruleManager = placement.NewRuleManager(ctx, storage.NewStorageWithMemoryBackend(), rc, opt) if opt.IsPlacementRulesEnabled() { err := rc.ruleManager.Initialize(opt.GetMaxReplicas(), opt.GetLocationLabels(), opt.GetIsolationLevel()) if err != nil { diff --git a/server/keyspace_service.go b/server/keyspace_service.go index b17239ba0a4..1718108d73b 100644 --- a/server/keyspace_service.go +++ b/server/keyspace_service.go @@ -89,7 +89,7 @@ func (s *KeyspaceServer) WatchKeyspaces(request *keyspacepb.WatchKeyspacesReques deleteFn := func(kv *mvccpb.KeyValue) error { return nil } - postEventFn := func() error { + postEventFn := func([]*clientv3.Event) error { defer func() { keyspaces = keyspaces[:0] }() @@ -109,6 +109,7 @@ func (s *KeyspaceServer) WatchKeyspaces(request *keyspacepb.WatchKeyspacesReques s.client, "keyspace-server-watcher", startKey, + func([]*clientv3.Event) error { return nil }, putFn, deleteFn, postEventFn, diff --git a/server/server.go b/server/server.go index 187c30dbf7a..fcf71922a09 100644 --- a/server/server.go +++ b/server/server.go @@ -2017,9 +2017,10 @@ func (s *Server) initServicePrimaryWatcher(serviceName string, primaryKey string s.client, name, primaryKey, + func([]*clientv3.Event) error { return nil }, putFn, deleteFn, - func() error { return nil }, + func([]*clientv3.Event) error { return nil }, ) } From ac1bc1f9c2bb083a4f84216701d50557696e4bc2 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Fri, 15 Dec 2023 16:03:44 +0800 Subject: [PATCH 2/2] fix test Signed-off-by: lhy1024 --- pkg/schedule/placement/rule_manager_test.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/schedule/placement/rule_manager_test.go b/pkg/schedule/placement/rule_manager_test.go index 68a18b538d4..c0987f6dd33 100644 --- a/pkg/schedule/placement/rule_manager_test.go +++ b/pkg/schedule/placement/rule_manager_test.go @@ -15,6 +15,7 @@ package placement import ( + "context" "encoding/hex" "testing" @@ -32,7 +33,7 @@ func newTestManager(t *testing.T, enableWitness bool) (endpoint.RuleStorage, *Ru re := require.New(t) store := endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil) var err error - manager := NewRuleManager(store, nil, mockconfig.NewTestOptions()) + manager := NewRuleManager(context.Background(), store, nil, mockconfig.NewTestOptions()) manager.conf.SetEnableWitness(enableWitness) err = manager.Initialize(3, []string{"zone", "rack", "host"}, "") re.NoError(err) @@ -156,7 +157,7 @@ func TestSaveLoad(t *testing.T) { re.NoError(manager.SetRule(r.Clone())) } - m2 := NewRuleManager(store, nil, nil) + m2 := NewRuleManager(context.Background(), store, nil, nil) err := m2.Initialize(3, []string{"no", "labels"}, "") re.NoError(err) re.Len(m2.GetAllRules(), 3) @@ -174,7 +175,7 @@ func TestSetAfterGet(t *testing.T) { rule.Count = 1 manager.SetRule(rule) - m2 := NewRuleManager(store, nil, nil) + m2 := NewRuleManager(context.Background(), store, nil, nil) err := m2.Initialize(100, []string{}, "") re.NoError(err) rule = m2.GetRule(DefaultGroupID, DefaultRuleID)
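
For reference, a minimal caller-side sketch of the reworked LoopWatcher hooks introduced by patch 1/2 (the watcher name, key prefix, and variable names below are illustrative placeholders, not taken from the patch): the constructor now accepts a pre-events hook before the put/delete handlers and a post-events hook after them, and both hooks receive the batch of events as []*clientv3.Event instead of taking no arguments.

    // Sketch only; assumes an existing ctx, wg, and etcd client are in scope.
    watcher := etcdutil.NewLoopWatcher(
        ctx, &wg, client,
        "example-watcher", "/example/prefix",
        func([]*clientv3.Event) error { return nil },   // preEventsFn: runs before each batch is handled
        func(kv *mvccpb.KeyValue) error { return nil }, // putFn: apply a single put event
        func(kv *mvccpb.KeyValue) error { return nil }, // deleteFn: apply a single delete event
        func([]*clientv3.Event) error { return nil },   // postEventsFn: runs after the whole batch
        clientv3.WithPrefix(),
    )
    watcher.StartWatchLoop()
    if err := watcher.WaitLoad(); err != nil {
        // handle initial-load failure
    }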