*: prevent store config updating periodically #7163

Merged
merged 4 commits on Oct 8, 2023
Changes from all commits
pkg/mcs/scheduling/server/config/watcher.go (3 additions, 0 deletions)
@@ -119,6 +119,7 @@ func (cw *Watcher) initializeConfigWatcher() error {
zap.String("event-kv-key", string(kv.Key)), zap.Error(err))
return err
}
log.Info("update scheduling config", zap.Reflect("new", cfg))
cw.AdjustScheduleCfg(&cfg.Schedule)
cw.SetClusterVersion(&cfg.ClusterVersion)
cw.SetScheduleConfig(&cfg.Schedule)
@@ -146,6 +147,7 @@ func (cw *Watcher) initializeSchedulerConfigWatcher() error {
prefixToTrim := cw.schedulerConfigPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
name := strings.TrimPrefix(string(kv.Key), prefixToTrim)
log.Info("update scheduler config", zap.String("name", string(kv.Value)))
err := cw.storage.SaveSchedulerConfig(name, kv.Value)
if err != nil {
log.Warn("failed to save scheduler config",
@@ -161,6 +163,7 @@ func (cw *Watcher) initializeSchedulerConfigWatcher() error {
return nil
}
deleteFn := func(kv *mvccpb.KeyValue) error {
log.Info("remove scheduler config", zap.String("key", string(kv.Key)))
return cw.storage.RemoveSchedulerConfig(
strings.TrimPrefix(string(kv.Key), prefixToTrim),
)
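These log calls use pingcap/log's zap-backed structured fields. A minimal standalone sketch of the same call shape (the config struct and field values here are illustrative stand-ins, not PD's real types):

```go
package main

import (
	"github.com/pingcap/log"
	"go.uber.org/zap"
)

// scheduleConfig stands in for PD's persisted config; only the call
// shape matters: a message plus strongly typed zap fields.
type scheduleConfig struct {
	LeaderScheduleLimit uint64 `json:"leader-schedule-limit"`
}

func main() {
	cfg := &scheduleConfig{LeaderScheduleLimit: 4}
	// zap.Reflect serializes an arbitrary value into the log entry;
	// zap.String attaches a plain key/value pair.
	log.Info("update scheduling config", zap.Reflect("new", cfg))
	log.Info("update scheduler config", zap.String("name", "balance-leader-scheduler"))
}
```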
pkg/mcs/scheduling/server/rule/watcher.go (8 additions, 0 deletions)
@@ -19,10 +19,12 @@ import (
"strings"
"sync"

"github.com/pingcap/log"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/etcdutil"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.uber.org/zap"
)

// ruleStorage is an in-memory storage for Placement Rules,
@@ -163,12 +165,14 @@ func (rw *Watcher) initializeRuleWatcher() error {
putFn := func(kv *mvccpb.KeyValue) error {
// Since the PD API server will validate the rule before saving it to etcd,
// so we could directly save the string rule in JSON to the storage here.
log.Info("update placement rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
return rw.ruleStore.SaveRule(
strings.TrimPrefix(string(kv.Key), prefixToTrim),
string(kv.Value),
)
}
deleteFn := func(kv *mvccpb.KeyValue) error {
log.Info("delete placement rule", zap.String("key", string(kv.Key)))
Comment on lines 167 to +175

Contributor:

Will it print a lot of logs?

Member Author:

If we frequently add or remove rules, it will, much like the audit log does today. However, we cannot change the log level for microservices yet. Once that is supported, I can change these to the debug level if they are too noisy. Right now, keeping them at info level is more convenient for testing.
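For reference, that later demotion should be a one-line change per call site, since pingcap/log exposes the same signature at the debug level. A hypothetical sketch of the follow-up (not part of this PR):

```go
// Hypothetical follow-up once microservice log levels are adjustable:
// same fields, lower level, so routine rule churn stops flooding the log.
log.Debug("update placement rule",
	zap.String("key", string(kv.Key)),
	zap.String("value", string(kv.Value)))
```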

return rw.ruleStore.DeleteRule(strings.TrimPrefix(string(kv.Key), prefixToTrim))
}
postEventFn := func() error {
@@ -188,12 +192,14 @@ func (rw *Watcher) initializeRuleWatcher() error {
func (rw *Watcher) initializeGroupWatcher() error {
prefixToTrim := rw.ruleGroupPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
log.Info("update placement rule group", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
return rw.ruleStore.SaveRuleGroup(
strings.TrimPrefix(string(kv.Key), prefixToTrim),
string(kv.Value),
)
}
deleteFn := func(kv *mvccpb.KeyValue) error {
log.Info("delete placement rule group", zap.String("key", string(kv.Key)))
return rw.ruleStore.DeleteRuleGroup(strings.TrimPrefix(string(kv.Key), prefixToTrim))
}
postEventFn := func() error {
@@ -213,12 +219,14 @@ func (rw *Watcher) initializeGroupWatcher() error {
func (rw *Watcher) initializeRegionLabelWatcher() error {
prefixToTrim := rw.regionLabelPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
log.Info("update region label rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
return rw.ruleStore.SaveRegionRule(
strings.TrimPrefix(string(kv.Key), prefixToTrim),
string(kv.Value),
)
}
deleteFn := func(kv *mvccpb.KeyValue) error {
log.Info("delete region label rule", zap.String("key", string(kv.Key)))
return rw.ruleStore.DeleteRegionRule(strings.TrimPrefix(string(kv.Key), prefixToTrim))
}
postEventFn := func() error {
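All three watchers in this file share one shape: an etcd prefix watch whose PUT events feed a `putFn` and whose DELETE events feed a `deleteFn`, with a `postEventFn` hook after each batch. A minimal standalone sketch of that dispatch loop using plain clientv3 (PD's `etcdutil` wrapper adds retries and revision tracking on top; the endpoint and prefix here are illustrative):

```go
package main

import (
	"context"
	"fmt"
	"strings"

	"go.etcd.io/etcd/clientv3"
	"go.etcd.io/etcd/mvcc/mvccpb"
)

// watchPrefix dispatches etcd events under prefix to put/delete callbacks,
// the way the scheduling server's watchers do (retry handling omitted).
func watchPrefix(ctx context.Context, cli *clientv3.Client, prefix string,
	putFn, deleteFn func(kv *mvccpb.KeyValue) error) {
	for resp := range cli.Watch(ctx, prefix, clientv3.WithPrefix()) {
		for _, ev := range resp.Events {
			var err error
			switch ev.Type {
			case mvccpb.PUT:
				err = putFn(ev.Kv)
			case mvccpb.DELETE:
				err = deleteFn(ev.Kv)
			}
			if err != nil {
				fmt.Println("failed to handle event:", err)
			}
		}
	}
}

func main() {
	const prefix = "/pd/placement-rules/"
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}})
	if err != nil {
		panic(err)
	}
	defer cli.Close()
	watchPrefix(context.Background(), cli, prefix,
		func(kv *mvccpb.KeyValue) error { // putFn: save the rule
			fmt.Println("save rule", strings.TrimPrefix(string(kv.Key), prefix))
			return nil
		},
		func(kv *mvccpb.KeyValue) error { // deleteFn: drop the rule
			fmt.Println("delete rule", strings.TrimPrefix(string(kv.Key), prefix))
			return nil
		})
}
```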
server/cluster/cluster.go (20 additions, 15 deletions)
@@ -427,15 +427,15 @@ func (c *RaftCluster) runStoreConfigSync() {
defer c.wg.Done()

var (
-synced, switchRaftV2Config bool
-stores = c.GetStores()
+synced, switchRaftV2Config, needPersist bool
+stores                                  = c.GetStores()
)
// Start the ticker with a second-level timer to accelerate
// the bootstrap stage.
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
-synced, switchRaftV2Config = c.syncStoreConfig(stores)
+synced, switchRaftV2Config, needPersist = c.syncStoreConfig(stores)
if switchRaftV2Config {
if err := c.opt.SwitchRaftV2(c.GetStorage()); err != nil {
log.Warn("store config persisted failed", zap.Error(err))
@@ -444,8 +444,11 @@ func (c *RaftCluster) runStoreConfigSync() {
// Update the stores if the synchronization is not completed.
if !synced {
stores = c.GetStores()
-} else if err := c.opt.Persist(c.storage); err != nil {
-log.Warn("store config persisted failed", zap.Error(err))
+}
+if needPersist {
+if err := c.opt.Persist(c.storage); err != nil {
+log.Warn("store config persisted failed", zap.Error(err))
+}
}
select {
case <-c.ctx.Done():
@@ -459,7 +462,8 @@
// syncStoreConfig syncs the store config from TiKV.
// - `synced` is true if sync config from one tikv.
// - `switchRaftV2` is true if the config of tikv engine is change to raft-kv2.
-func (c *RaftCluster) syncStoreConfig(stores []*core.StoreInfo) (synced bool, switchRaftV2 bool) {
+func (c *RaftCluster) syncStoreConfig(stores []*core.StoreInfo) (synced bool, switchRaftV2 bool, needPersist bool) {
+var err error
for index := 0; index < len(stores); index++ {
select {
case <-c.ctx.Done():
@@ -479,7 +483,7 @@
}
// it will try next store if the current store is failed.
address := netutil.ResolveLoopBackAddr(stores[index].GetStatusAddress(), stores[index].GetAddress())
-switchRaftV2, err := c.observeStoreConfig(c.ctx, address)
+switchRaftV2, needPersist, err = c.observeStoreConfig(c.ctx, address)
if err != nil {
// delete the store if it is failed and retry next store.
stores = append(stores[:index], stores[index+1:]...)
@@ -492,34 +496,35 @@
}
storeSyncConfigEvent.WithLabelValues(address, "succ").Inc()

-return true, switchRaftV2
+return true, switchRaftV2, needPersist
}
-return false, false
+return false, false, needPersist
}

// observeStoreConfig is used to observe the store config changes and
// return whether if the new config changes the engine to raft-kv2.
-func (c *RaftCluster) observeStoreConfig(ctx context.Context, address string) (bool, error) {
+func (c *RaftCluster) observeStoreConfig(ctx context.Context, address string) (switchRaftV2 bool, needPersist bool, err error) {
cfg, err := c.fetchStoreConfigFromTiKV(ctx, address)
if err != nil {
-return false, err
+return false, false, err
}
oldCfg := c.opt.GetStoreConfig()
if cfg == nil || oldCfg.Equal(cfg) {
-return false, nil
+return false, false, nil
}
log.Info("sync the store config successful",
zap.String("store-address", address),
zap.String("store-config", cfg.String()),
zap.String("old-config", oldCfg.String()))
-return c.updateStoreConfig(oldCfg, cfg)
+return c.updateStoreConfig(oldCfg, cfg), true, nil
}

// updateStoreConfig updates the store config. This is extracted for testing.
-func (c *RaftCluster) updateStoreConfig(oldCfg, cfg *sc.StoreConfig) (bool, error) {
+func (c *RaftCluster) updateStoreConfig(oldCfg, cfg *sc.StoreConfig) (switchRaftV2 bool) {
cfg.Adjust()
c.opt.SetStoreConfig(cfg)
-return oldCfg.Storage.Engine != sc.RaftstoreV2 && cfg.Storage.Engine == sc.RaftstoreV2, nil
+switchRaftV2 = oldCfg.Storage.Engine != sc.RaftstoreV2 && cfg.Storage.Engine == sc.RaftstoreV2
+return
}

// fetchStoreConfigFromTiKV tries to fetch the config from the TiKV store URL.
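The net effect of the cluster.go changes: `Persist` is no longer called on every successful sync tick, only when `observeStoreConfig` actually saw a config that differs from the cached one. A self-contained sketch of that compare-before-persist guard (stand-in types and field names, not the PD API):

```go
package main

import (
	"fmt"
	"reflect"
)

// StoreConfig stands in for PD's sc.StoreConfig.
type StoreConfig struct {
	RegionMaxSize uint64
	Engine        string
}

// syncOnce mirrors the shape of observeStoreConfig after this change:
// it reports whether the engine switched to raft-kv2 and whether the
// fetched config differs at all (and therefore needs persisting).
func syncOnce(cur, fetched *StoreConfig) (switchRaftV2, needPersist bool) {
	if fetched == nil || reflect.DeepEqual(cur, fetched) {
		return false, false // unchanged: skip the periodic persist
	}
	switchRaftV2 = cur.Engine != "raft-kv2" && fetched.Engine == "raft-kv2"
	*cur = *fetched // apply the new config in memory
	return switchRaftV2, true
}

func main() {
	cur := &StoreConfig{RegionMaxSize: 144, Engine: "raft-kv"}
	_, persist := syncOnce(cur, &StoreConfig{RegionMaxSize: 144, Engine: "raft-kv"})
	fmt.Println(persist) // false: identical config, nothing to write back
	_, persist = syncOnce(cur, &StoreConfig{RegionMaxSize: 10, Engine: "raft-kv2"})
	fmt.Println(persist) // true: changed config triggers one persist
}
```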
server/cluster/cluster_test.go (7 additions, 3 deletions)
@@ -1428,8 +1428,10 @@ func TestSyncConfigContext(t *testing.T) {
// trip schema header
now := time.Now()
stores[0].GetMeta().StatusAddress = server.URL[7:]
-synced, _ := tc.syncStoreConfig(tc.GetStores())
+synced, switchRaftV2, needPersist := tc.syncStoreConfig(tc.GetStores())
re.False(synced)
+re.False(switchRaftV2)
+re.False(needPersist)
re.Less(time.Since(now), clientTimeout*2)
}

@@ -1450,15 +1452,17 @@ func TestStoreConfigSync(t *testing.T) {
re.Equal(uint64(144), tc.GetStoreConfig().GetRegionMaxSize())
re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/mockFetchStoreConfigFromTiKV", `return("10MiB")`))
// switchRaftV2 will be true.
-synced, switchRaftV2 := tc.syncStoreConfig(tc.GetStores())
+synced, switchRaftV2, needPersist := tc.syncStoreConfig(tc.GetStores())
re.True(synced)
re.True(switchRaftV2)
+re.True(needPersist)
re.EqualValues(512, tc.opt.GetMaxMovableHotPeerSize())
re.Equal(uint64(10), tc.GetStoreConfig().GetRegionMaxSize())
// switchRaftV2 will be false this time.
-synced, switchRaftV2 = tc.syncStoreConfig(tc.GetStores())
+synced, switchRaftV2, needPersist = tc.syncStoreConfig(tc.GetStores())
re.True(synced)
re.False(switchRaftV2)
+re.False(needPersist)
re.Equal(uint64(10), tc.GetStoreConfig().GetRegionMaxSize())
re.NoError(opt.Persist(tc.GetStorage()))
re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/mockFetchStoreConfigFromTiKV"))