Skip to content

Commit

Permalink
prevent store config updating periodically
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed Sep 27, 2023
1 parent eac55a7 commit 9aa1958
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 4 deletions.
3 changes: 3 additions & 0 deletions pkg/mcs/scheduling/server/config/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ func (cw *Watcher) initializeConfigWatcher() error {
zap.String("event-kv-key", string(kv.Key)), zap.Error(err))
return err
}
log.Info("update scheduling config", zap.Reflect("new", cfg))
cw.AdjustScheduleCfg(&cfg.Schedule)
cw.SetClusterVersion(&cfg.ClusterVersion)
cw.SetScheduleConfig(&cfg.Schedule)
Expand Down Expand Up @@ -146,6 +147,7 @@ func (cw *Watcher) initializeSchedulerConfigWatcher() error {
prefixToTrim := cw.schedulerConfigPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
name := strings.TrimPrefix(string(kv.Key), prefixToTrim)
log.Info("update scheduler config", zap.Reflect("name", string(kv.Value)))
err := cw.storage.SaveSchedulerConfig(name, kv.Value)
if err != nil {
log.Warn("failed to save scheduler config",
Expand All @@ -161,6 +163,7 @@ func (cw *Watcher) initializeSchedulerConfigWatcher() error {
return nil
}
deleteFn := func(kv *mvccpb.KeyValue) error {
log.Info("remove scheduler config", zap.Reflect("key", string(kv.Key)))
return cw.storage.RemoveSchedulerConfig(
strings.TrimPrefix(string(kv.Key), prefixToTrim),
)
Expand Down
8 changes: 8 additions & 0 deletions pkg/mcs/scheduling/server/rule/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ import (
"strings"
"sync"

"github.com/pingcap/log"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/etcdutil"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.uber.org/zap"
)

// ruleStorage is an in-memory storage for Placement Rules,
Expand Down Expand Up @@ -163,12 +165,14 @@ func (rw *Watcher) initializeRuleWatcher() error {
putFn := func(kv *mvccpb.KeyValue) error {
// Since the PD API server will validate the rule before saving it to etcd,
// so we could directly save the string rule in JSON to the storage here.
log.Info("update placement rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
return rw.ruleStore.SaveRule(
strings.TrimPrefix(string(kv.Key), prefixToTrim),
string(kv.Value),
)
}
deleteFn := func(kv *mvccpb.KeyValue) error {
log.Info("delete placement rule", zap.String("key", string(kv.Key)))
return rw.ruleStore.DeleteRule(strings.TrimPrefix(string(kv.Key), prefixToTrim))
}
postEventFn := func() error {
Expand All @@ -188,12 +192,14 @@ func (rw *Watcher) initializeRuleWatcher() error {
func (rw *Watcher) initializeGroupWatcher() error {
prefixToTrim := rw.ruleGroupPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
log.Info("update placement rule group", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
return rw.ruleStore.SaveRuleGroup(
strings.TrimPrefix(string(kv.Key), prefixToTrim),
string(kv.Value),
)
}
deleteFn := func(kv *mvccpb.KeyValue) error {
log.Info("delete placement rule group", zap.String("key", string(kv.Key)))
return rw.ruleStore.DeleteRuleGroup(strings.TrimPrefix(string(kv.Key), prefixToTrim))
}
postEventFn := func() error {
Expand All @@ -213,12 +219,14 @@ func (rw *Watcher) initializeGroupWatcher() error {
func (rw *Watcher) initializeRegionLabelWatcher() error {
prefixToTrim := rw.regionLabelPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
log.Info("update region label rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
return rw.ruleStore.SaveRegionRule(
strings.TrimPrefix(string(kv.Key), prefixToTrim),
string(kv.Value),
)
}
deleteFn := func(kv *mvccpb.KeyValue) error {
log.Info("delete region label rule", zap.String("key", string(kv.Key)))
return rw.ruleStore.DeleteRegionRule(strings.TrimPrefix(string(kv.Key), prefixToTrim))
}
postEventFn := func() error {
Expand Down
11 changes: 7 additions & 4 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,8 +427,8 @@ func (c *RaftCluster) runStoreConfigSync() {
defer c.wg.Done()

var (
synced, switchRaftV2Config bool
stores = c.GetStores()
synced, switchRaftV2Config, isLastSynced bool
stores = c.GetStores()
)
// Start the ticker with a second-level timer to accelerate
// the bootstrap stage.
Expand All @@ -441,11 +441,14 @@ func (c *RaftCluster) runStoreConfigSync() {
log.Warn("store config persisted failed", zap.Error(err))
}
}
isLastSynced = synced
// Update the stores if the synchronization is not completed.
if !synced {
stores = c.GetStores()
} else if err := c.opt.Persist(c.storage); err != nil {
log.Warn("store config persisted failed", zap.Error(err))
} else if !isLastSynced && synced {
if err := c.opt.Persist(c.storage); err != nil {
log.Warn("store config persisted failed", zap.Error(err))
}
}
select {
case <-c.ctx.Done():
Expand Down

0 comments on commit 9aa1958

Please sign in to comment.