*: add pre func in etcdutil and refactor for endpoint #7555

Merged (4 commits) on Dec 18, 2023
Changes from all commits
3 changes: 2 additions & 1 deletion pkg/keyspace/tso_keyspace_group.go
@@ -245,9 +245,10 @@ func (m *GroupManager) initTSONodesWatcher(client *clientv3.Client, clusterID ui
client,
"tso-nodes-watcher",
tsoServiceKey,
func([]*clientv3.Event) error { return nil },
putFn,
deleteFn,
func() error { return nil },
func([]*clientv3.Event) error { return nil },
clientv3.WithRange(tsoServiceEndKey),
)
}
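Taken together, the call sites in this PR suggest that etcdutil.NewLoopWatcher now accepts a pre-events callback ahead of putFn and passes the event batch to the post-events callback as well. Below is a minimal caller sketch under that assumed signature; ctx, wg, client, watchKey, putFn and deleteFn are placeholders, and the names preEventsFn/postEventsFn are illustrative rather than taken from the package:

// Sketch only: the parameter order is inferred from the call sites in this diff.
preEventsFn := func(events []*clientv3.Event) error {
	// runs once per watch batch, before putFn/deleteFn handle each event
	return nil
}
postEventsFn := func(events []*clientv3.Event) error {
	// runs once per watch batch, after every event has been handled
	return nil
}
watcher := etcdutil.NewLoopWatcher(
	ctx, &wg,
	client,
	"example-watcher", watchKey,
	preEventsFn,
	putFn, deleteFn,
	postEventsFn,
	clientv3.WithPrefix(),
)
watcher.StartWatchLoop()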
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/cluster.go
@@ -65,7 +65,7 @@ func NewCluster(parentCtx context.Context, persistConfig *config.PersistConfig,
cancel()
return nil, err
}
ruleManager := placement.NewRuleManager(storage, basicCluster, persistConfig)
ruleManager := placement.NewRuleManager(ctx, storage, basicCluster, persistConfig)
c := &Cluster{
ctx: ctx,
cancel: cancel,
36 changes: 18 additions & 18 deletions pkg/mcs/scheduling/server/config/watcher.go
@@ -139,22 +139,21 @@
deleteFn := func(kv *mvccpb.KeyValue) error {
return nil
}
postEventFn := func() error {
return nil
}
cw.configWatcher = etcdutil.NewLoopWatcher(
cw.ctx, &cw.wg,
cw.etcdClient,
"scheduling-config-watcher", cw.configPath,
putFn, deleteFn, postEventFn,
func([]*clientv3.Event) error { return nil },
putFn, deleteFn,
func([]*clientv3.Event) error { return nil },
)
cw.configWatcher.StartWatchLoop()
return cw.configWatcher.WaitLoad()
}

func (cw *Watcher) initializeTTLConfigWatcher() error {
putFn := func(kv *mvccpb.KeyValue) error {
key := string(kv.Key)[len(sc.TTLConfigPrefix)+1:]
key := strings.TrimPrefix(string(kv.Key), sc.TTLConfigPrefix+"/")
value := string(kv.Value)
leaseID := kv.Lease
resp, err := cw.etcdClient.TimeToLive(cw.ctx, clientv3.LeaseID(leaseID))
@@ -166,18 +165,18 @@
return nil
}
deleteFn := func(kv *mvccpb.KeyValue) error {
key := string(kv.Key)[len(sc.TTLConfigPrefix)+1:]
key := strings.TrimPrefix(string(kv.Key), sc.TTLConfigPrefix+"/")
cw.ttl.PutWithTTL(key, nil, 0)
return nil
}
postEventFn := func() error {
return nil
}
cw.ttlConfigWatcher = etcdutil.NewLoopWatcher(
cw.ctx, &cw.wg,
cw.etcdClient,
"scheduling-ttl-config-watcher", cw.ttlConfigPrefix,
putFn, deleteFn, postEventFn, clientv3.WithPrefix(),
func([]*clientv3.Event) error { return nil },
putFn, deleteFn,
func([]*clientv3.Event) error { return nil },
clientv3.WithPrefix(),
)
cw.ttlConfigWatcher.StartWatchLoop()
return cw.ttlConfigWatcher.WaitLoad()
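Replacing the manual slice expression with strings.TrimPrefix makes the key handling tolerant of unexpected input: k[len(prefix)+1:] panics when the key is no longer than the prefix, while TrimPrefix simply returns a non-matching key unchanged. A small self-contained illustration follows; the prefix value is invented for the example and is not the real sc.TTLConfigPrefix:

package main

import (
	"fmt"
	"strings"
)

func main() {
	const prefix = "/config/ttl" // invented stand-in for sc.TTLConfigPrefix
	keys := []string{"/config/ttl/max-merge-region-size", "/config/ttl"}

	for _, k := range keys {
		// Safe for both keys; the second one is returned unchanged because
		// it does not start with prefix+"/".
		fmt.Println(strings.TrimPrefix(k, prefix+"/"))
		// By contrast, k[len(prefix)+1:] would panic on the second key,
		// since the start index would be past the end of the string.
	}
}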
@@ -186,13 +185,14 @@
func (cw *Watcher) initializeSchedulerConfigWatcher() error {
prefixToTrim := cw.schedulerConfigPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
name := strings.TrimPrefix(string(kv.Key), prefixToTrim)
key := string(kv.Key)
name := strings.TrimPrefix(key, prefixToTrim)
log.Info("update scheduler config", zap.String("name", name),
zap.String("value", string(kv.Value)))
err := cw.storage.SaveSchedulerConfig(name, kv.Value)
if err != nil {
log.Warn("failed to save scheduler config",
zap.String("event-kv-key", string(kv.Key)),
zap.String("event-kv-key", key),

zap.String("trimmed-key", name),
zap.Error(err))
return err
@@ -204,19 +204,19 @@
return nil
}
deleteFn := func(kv *mvccpb.KeyValue) error {
log.Info("remove scheduler config", zap.String("key", string(kv.Key)))
key := string(kv.Key)
log.Info("remove scheduler config", zap.String("key", key))
return cw.storage.RemoveSchedulerConfig(
strings.TrimPrefix(string(kv.Key), prefixToTrim),
strings.TrimPrefix(key, prefixToTrim),
)
}
postEventFn := func() error {
return nil
}
cw.schedulerConfigWatcher = etcdutil.NewLoopWatcher(
cw.ctx, &cw.wg,
cw.etcdClient,
"scheduling-scheduler-config-watcher", cw.schedulerConfigPathPrefix,
putFn, deleteFn, postEventFn,
func([]*clientv3.Event) error { return nil },
putFn, deleteFn,
func([]*clientv3.Event) error { return nil },
clientv3.WithPrefix(),
)
cw.schedulerConfigWatcher.StartWatchLoop()
7 changes: 3 additions & 4 deletions pkg/mcs/scheduling/server/meta/watcher.go
@@ -104,14 +104,13 @@ func (w *Watcher) initializeStoreWatcher() error {
}
return nil
}
postEventFn := func() error {
return nil
}
w.storeWatcher = etcdutil.NewLoopWatcher(
w.ctx, &w.wg,
w.etcdClient,
"scheduling-store-watcher", w.storePathPrefix,
putFn, deleteFn, postEventFn,
func([]*clientv3.Event) error { return nil },
putFn, deleteFn,
func([]*clientv3.Event) error { return nil },
clientv3.WithPrefix(),
)
w.storeWatcher.StartWatchLoop()
21 changes: 9 additions & 12 deletions pkg/mcs/scheduling/server/rule/watcher.go
@@ -131,14 +131,13 @@ func (rw *Watcher) initializeRuleWatcher() error {
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
return rw.ruleManager.DeleteRule(rule.GroupID, rule.ID)
}
postEventFn := func() error {
return nil
}
rw.ruleWatcher = etcdutil.NewLoopWatcher(
rw.ctx, &rw.wg,
rw.etcdClient,
"scheduling-rule-watcher", rw.rulesPathPrefix,
putFn, deleteFn, postEventFn,
func([]*clientv3.Event) error { return nil },
putFn, deleteFn,
func([]*clientv3.Event) error { return nil },
clientv3.WithPrefix(),
)
rw.ruleWatcher.StartWatchLoop()
@@ -168,14 +167,13 @@ func (rw *Watcher) initializeGroupWatcher() error {
}
return rw.ruleManager.DeleteRuleGroup(trimmedKey)
}
postEventFn := func() error {
return nil
}
rw.groupWatcher = etcdutil.NewLoopWatcher(
rw.ctx, &rw.wg,
rw.etcdClient,
"scheduling-rule-group-watcher", rw.ruleGroupPathPrefix,
putFn, deleteFn, postEventFn,
func([]*clientv3.Event) error { return nil },
putFn, deleteFn,
func([]*clientv3.Event) error { return nil },
clientv3.WithPrefix(),
)
rw.groupWatcher.StartWatchLoop()
@@ -197,14 +195,13 @@ func (rw *Watcher) initializeRegionLabelWatcher() error {
log.Info("delete region label rule", zap.String("key", key))
return rw.regionLabeler.DeleteLabelRule(strings.TrimPrefix(key, prefixToTrim))
}
postEventFn := func() error {
return nil
}
rw.labelWatcher = etcdutil.NewLoopWatcher(
rw.ctx, &rw.wg,
rw.etcdClient,
"scheduling-region-label-watcher", rw.regionLabelPathPrefix,
putFn, deleteFn, postEventFn,
func([]*clientv3.Event) error { return nil },
putFn, deleteFn,
func([]*clientv3.Event) error { return nil },
clientv3.WithPrefix(),
)
rw.labelWatcher.StartWatchLoop()
2 changes: 1 addition & 1 deletion pkg/mock/mockcluster/mockcluster.go
@@ -212,7 +212,7 @@ func (mc *Cluster) AllocPeer(storeID uint64) (*metapb.Peer, error) {

func (mc *Cluster) initRuleManager() {
if mc.RuleManager == nil {
mc.RuleManager = placement.NewRuleManager(mc.GetStorage(), mc, mc.GetSharedConfig())
mc.RuleManager = placement.NewRuleManager(mc.ctx, mc.GetStorage(), mc, mc.GetSharedConfig())
mc.RuleManager.Initialize(int(mc.GetReplicationConfig().MaxReplicas), mc.GetReplicationConfig().LocationLabels, mc.GetReplicationConfig().IsolationLevel)
}
}
8 changes: 4 additions & 4 deletions pkg/schedule/checker/rule_checker.go
@@ -199,7 +199,7 @@ func (c *RuleChecker) fixRulePeer(region *core.RegionInfo, fit *placement.Region
if c.isDownPeer(region, peer) {
if c.isStoreDownTimeHitMaxDownTime(peer.GetStoreId()) {
ruleCheckerReplaceDownCounter.Inc()
return c.replaceUnexpectRulePeer(region, rf, fit, peer, downStatus)
return c.replaceUnexpectedRulePeer(region, rf, fit, peer, downStatus)
}
// When the witness placement rule is enabled, promote the witness to voter when the region has a down voter.
if c.isWitnessEnabled() && core.IsVoter(peer) {
@@ -211,7 +211,7 @@
}
if c.isOfflinePeer(peer) {
ruleCheckerReplaceOfflineCounter.Inc()
return c.replaceUnexpectRulePeer(region, rf, fit, peer, offlineStatus)
return c.replaceUnexpectedRulePeer(region, rf, fit, peer, offlineStatus)
}
}
// fix loose matched peers.
@@ -246,7 +246,7 @@ func (c *RuleChecker) addRulePeer(region *core.RegionInfo, fit *placement.Region
continue
}
ruleCheckerNoStoreThenTryReplace.Inc()
op, err := c.replaceUnexpectRulePeer(region, oldPeerRuleFit, fit, p, "swap-fit")
op, err := c.replaceUnexpectedRulePeer(region, oldPeerRuleFit, fit, p, "swap-fit")
if err != nil {
return nil, err
}
@@ -267,7 +267,7 @@ func (c *RuleChecker) addRulePeer(region *core.RegionInfo, fit *placement.Region
}

// The peer's store may be in Offline or Down state, so the peer needs to be replaced.
func (c *RuleChecker) replaceUnexpectRulePeer(region *core.RegionInfo, rf *placement.RuleFit, fit *placement.RegionFit, peer *metapb.Peer, status string) (*operator.Operator, error) {
func (c *RuleChecker) replaceUnexpectedRulePeer(region *core.RegionInfo, rf *placement.RuleFit, fit *placement.RegionFit, peer *metapb.Peer, status string) (*operator.Operator, error) {
var fastFailover bool
// If the store to which the original peer belongs is TiFlash, the new peer cannot be set to witness, nor can it perform fast failover
if c.isWitnessEnabled() && !c.cluster.GetStore(peer.StoreId).IsTiFlash() {
5 changes: 4 additions & 1 deletion pkg/schedule/placement/rule_manager.go
@@ -16,6 +16,7 @@ package placement

import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"fmt"
@@ -49,6 +50,7 @@
// RuleManager is responsible for the lifecycle of all placement Rules.
// It is thread safe.
type RuleManager struct {
ctx context.Context
storage endpoint.RuleStorage
syncutil.RWMutex
initialized bool
@@ -63,8 +65,9 @@ type RuleManager struct {
}

// NewRuleManager creates a RuleManager instance.
func NewRuleManager(storage endpoint.RuleStorage, storeSetInformer core.StoreSetInformer, conf config.SharedConfigProvider) *RuleManager {
func NewRuleManager(ctx context.Context, storage endpoint.RuleStorage, storeSetInformer core.StoreSetInformer, conf config.SharedConfigProvider) *RuleManager {
return &RuleManager{
ctx: ctx,
storage: storage,
storeSetInformer: storeSetInformer,
conf: conf,
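NewRuleManager now keeps the caller's context on the manager itself. The diff does not show where m.ctx is consumed, but the usual reason to thread a context into a long-lived component is so that its blocking or retrying work can observe cancellation. A purely hypothetical helper to illustrate the pattern; this is not code from the PR and assumes the "time" package is imported:

// Hypothetical sketch, not part of this PR: a blocking helper on the manager
// returns early once the stored context is cancelled.
func (m *RuleManager) sleepOrCancel(d time.Duration) error {
	select {
	case <-m.ctx.Done():
		return m.ctx.Err()
	case <-time.After(d):
		return nil
	}
}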
7 changes: 4 additions & 3 deletions pkg/schedule/placement/rule_manager_test.go
@@ -15,6 +15,7 @@
package placement

import (
"context"
"encoding/hex"
"testing"

@@ -32,7 +33,7 @@ func newTestManager(t *testing.T, enableWitness bool) (endpoint.RuleStorage, *Ru
re := require.New(t)
store := endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
var err error
manager := NewRuleManager(store, nil, mockconfig.NewTestOptions())
manager := NewRuleManager(context.Background(), store, nil, mockconfig.NewTestOptions())
manager.conf.SetEnableWitness(enableWitness)
err = manager.Initialize(3, []string{"zone", "rack", "host"}, "")
re.NoError(err)
@@ -156,7 +157,7 @@ func TestSaveLoad(t *testing.T) {
re.NoError(manager.SetRule(r.Clone()))
}

m2 := NewRuleManager(store, nil, nil)
m2 := NewRuleManager(context.Background(), store, nil, nil)
err := m2.Initialize(3, []string{"no", "labels"}, "")
re.NoError(err)
re.Len(m2.GetAllRules(), 3)
@@ -174,7 +175,7 @@ func TestSetAfterGet(t *testing.T) {
rule.Count = 1
manager.SetRule(rule)

m2 := NewRuleManager(store, nil, nil)
m2 := NewRuleManager(context.Background(), store, nil, nil)
err := m2.Initialize(100, []string{}, "")
re.NoError(err)
rule = m2.GetRule(DefaultGroupID, DefaultRuleID)
7 changes: 4 additions & 3 deletions pkg/schedule/schedulers/shuffle_region.go
@@ -139,18 +139,19 @@ func (s *shuffleRegionScheduler) scheduleRemovePeer(cluster sche.SchedulerCluste
pendingFilter := filter.NewRegionPendingFilter()
downFilter := filter.NewRegionDownFilter()
replicaFilter := filter.NewRegionReplicatedFilter(cluster)
ranges := s.conf.GetRanges()
for _, source := range candidates.Stores {
var region *core.RegionInfo
if s.conf.IsRoleAllow(roleFollower) {
region = filter.SelectOneRegion(cluster.RandFollowerRegions(source.GetID(), s.conf.Ranges), nil,
region = filter.SelectOneRegion(cluster.RandFollowerRegions(source.GetID(), ranges), nil,
pendingFilter, downFilter, replicaFilter)
}
if region == nil && s.conf.IsRoleAllow(roleLeader) {
region = filter.SelectOneRegion(cluster.RandLeaderRegions(source.GetID(), s.conf.Ranges), nil,
region = filter.SelectOneRegion(cluster.RandLeaderRegions(source.GetID(), ranges), nil,
pendingFilter, downFilter, replicaFilter)
}
if region == nil && s.conf.IsRoleAllow(roleLearner) {
region = filter.SelectOneRegion(cluster.RandLearnerRegions(source.GetID(), s.conf.Ranges), nil,
region = filter.SelectOneRegion(cluster.RandLearnerRegions(source.GetID(), ranges), nil,
pendingFilter, downFilter, replicaFilter)
}
if region != nil {
4 changes: 3 additions & 1 deletion pkg/schedule/schedulers/shuffle_region_config.go
@@ -58,7 +58,9 @@ func (conf *shuffleRegionSchedulerConfig) GetRoles() []string {
func (conf *shuffleRegionSchedulerConfig) GetRanges() []core.KeyRange {
conf.RLock()
defer conf.RUnlock()
return conf.Ranges
ranges := make([]core.KeyRange, len(conf.Ranges))
copy(ranges, conf.Ranges)
return ranges
}

func (conf *shuffleRegionSchedulerConfig) IsRoleAllow(role string) bool {
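GetRanges now copies the slice while holding the read lock instead of returning the internal one, so callers can keep using the result after the lock is released without racing against later updates to conf.Ranges; the scheduler change above correspondingly calls it once per pass and reuses the snapshot inside the loop. A generic sketch of this copy-on-read pattern, using stand-in types rather than the scheduler's real ones:

package example

import "sync"

type keyRange struct{ startKey, endKey []byte } // stand-in for core.KeyRange

type rangesConfig struct {
	mu     sync.RWMutex
	ranges []keyRange
}

// GetRanges returns an independent snapshot, so callers can neither mutate
// nor race with the slice that the RWMutex protects.
func (c *rangesConfig) GetRanges() []keyRange {
	c.mu.RLock()
	defer c.mu.RUnlock()
	out := make([]keyRange, len(c.ranges))
	copy(out, c.ranges)
	return out
}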
5 changes: 3 additions & 2 deletions pkg/statistics/region_collection_test.go
@@ -15,6 +15,7 @@
package statistics

import (
"context"
"testing"

"github.com/pingcap/kvproto/pkg/metapb"
@@ -29,7 +30,7 @@
func TestRegionStatistics(t *testing.T) {
re := require.New(t)
store := storage.NewStorageWithMemoryBackend()
manager := placement.NewRuleManager(store, nil, nil)
manager := placement.NewRuleManager(context.Background(), store, nil, nil)
err := manager.Initialize(3, []string{"zone", "rack", "host"}, "")
re.NoError(err)
opt := mockconfig.NewTestOptions()
@@ -118,7 +119,7 @@ func TestRegionStatisticsWithPlacementRule(t *testing.T) {
func TestRegionStatisticsWithPlacementRule(t *testing.T) {
re := require.New(t)
store := storage.NewStorageWithMemoryBackend()
manager := placement.NewRuleManager(store, nil, nil)
manager := placement.NewRuleManager(context.Background(), store, nil, nil)
err := manager.Initialize(3, []string{"zone", "rack", "host"}, "")
re.NoError(err)
opt := mockconfig.NewTestOptions()
8 changes: 2 additions & 6 deletions pkg/storage/endpoint/config.go
@@ -51,17 +51,13 @@ func (se *StorageEndpoint) LoadConfig(cfg interface{}) (bool, error) {

// SaveConfig stores marshallable cfg to the configPath.
func (se *StorageEndpoint) SaveConfig(cfg interface{}) error {
value, err := json.Marshal(cfg)
if err != nil {
return errs.ErrJSONMarshal.Wrap(err).GenWithStackByCause()
}
return se.Save(configPath, string(value))
return se.saveJSON(configPath, cfg)
}

// LoadAllSchedulerConfigs loads all schedulers' config.
func (se *StorageEndpoint) LoadAllSchedulerConfigs() ([]string, []string, error) {
prefix := customSchedulerConfigPath + "/"
keys, values, err := se.LoadRange(prefix, clientv3.GetPrefixRangeEnd(prefix), 1000)
keys, values, err := se.LoadRange(prefix, clientv3.GetPrefixRangeEnd(prefix), MinKVRangeLimit)
for i, key := range keys {
keys[i] = strings.TrimPrefix(key, prefix)
}
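SaveConfig here and SaveServiceGCSafePoint below now delegate to a saveJSON helper on StorageEndpoint that this diff does not show. Judging from the code removed at the two call sites, the helper plausibly marshals the value and stores it under the given key, roughly as sketched below; treat this as an assumption, since the real helper may differ, for instance in how it wraps marshal errors:

// Assumed shape of the shared helper, reconstructed from the removed call-site
// code above; not copied from the repository.
func (se *StorageEndpoint) saveJSON(key string, data interface{}) error {
	value, err := json.Marshal(data)
	if err != nil {
		return errs.ErrJSONMarshal.Wrap(err).GenWithStackByCause()
	}
	return se.Save(key, string(value))
}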
8 changes: 1 addition & 7 deletions pkg/storage/endpoint/gc_safe_point.go
@@ -169,13 +169,7 @@ func (se *StorageEndpoint) SaveServiceGCSafePoint(ssp *ServiceSafePoint) error {
return errors.New("TTL of gc_worker's service safe point must be infinity")
}

key := gcSafePointServicePath(ssp.ServiceID)
value, err := json.Marshal(ssp)
if err != nil {
return err
}

return se.Save(key, string(value))
return se.saveJSON(gcSafePointServicePath(ssp.ServiceID), ssp)
}

// RemoveServiceGCSafePoint removes a GC safepoint for the service