Skip to content

Commit

Permalink
address the comment
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed Oct 8, 2023
1 parent 9aa1958 commit cff15dd
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 18 deletions.
32 changes: 17 additions & 15 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,25 +427,25 @@ func (c *RaftCluster) runStoreConfigSync() {
defer c.wg.Done()

var (
synced, switchRaftV2Config, isLastSynced bool
stores = c.GetStores()
synced, switchRaftV2Config, needPersist bool
stores = c.GetStores()
)
// Start the ticker with a second-level timer to accelerate
// the bootstrap stage.
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
synced, switchRaftV2Config = c.syncStoreConfig(stores)
synced, switchRaftV2Config, needPersist = c.syncStoreConfig(stores)
if switchRaftV2Config {
if err := c.opt.SwitchRaftV2(c.GetStorage()); err != nil {
log.Warn("store config persisted failed", zap.Error(err))
}
}
isLastSynced = synced
// Update the stores if the synchronization is not completed.
if !synced {
stores = c.GetStores()
} else if !isLastSynced && synced {
}
if needPersist {
if err := c.opt.Persist(c.storage); err != nil {
log.Warn("store config persisted failed", zap.Error(err))
}
Expand All @@ -462,7 +462,8 @@ func (c *RaftCluster) runStoreConfigSync() {
// syncStoreConfig syncs the store config from TiKV.
// - `synced` is true if sync config from one tikv.
// - `switchRaftV2` is true if the config of tikv engine is change to raft-kv2.
func (c *RaftCluster) syncStoreConfig(stores []*core.StoreInfo) (synced bool, switchRaftV2 bool) {
func (c *RaftCluster) syncStoreConfig(stores []*core.StoreInfo) (synced bool, switchRaftV2 bool, needPersist bool) {
var err error
for index := 0; index < len(stores); index++ {
select {
case <-c.ctx.Done():
Expand All @@ -482,7 +483,7 @@ func (c *RaftCluster) syncStoreConfig(stores []*core.StoreInfo) (synced bool, sw
}
// it will try next store if the current store is failed.
address := netutil.ResolveLoopBackAddr(stores[index].GetStatusAddress(), stores[index].GetAddress())
switchRaftV2, err := c.observeStoreConfig(c.ctx, address)
switchRaftV2, needPersist, err = c.observeStoreConfig(c.ctx, address)
if err != nil {
// delete the store if it is failed and retry next store.
stores = append(stores[:index], stores[index+1:]...)
Expand All @@ -495,34 +496,35 @@ func (c *RaftCluster) syncStoreConfig(stores []*core.StoreInfo) (synced bool, sw
}
storeSyncConfigEvent.WithLabelValues(address, "succ").Inc()

return true, switchRaftV2
return true, switchRaftV2, needPersist
}
return false, false
return false, false, needPersist
}

// observeStoreConfig is used to observe the store config changes and
// return whether if the new config changes the engine to raft-kv2.
func (c *RaftCluster) observeStoreConfig(ctx context.Context, address string) (bool, error) {
func (c *RaftCluster) observeStoreConfig(ctx context.Context, address string) (switchRaftV2 bool, needPersist bool, err error) {
cfg, err := c.fetchStoreConfigFromTiKV(ctx, address)
if err != nil {
return false, err
return false, false, err
}
oldCfg := c.opt.GetStoreConfig()
if cfg == nil || oldCfg.Equal(cfg) {
return false, nil
return false, false, nil
}
log.Info("sync the store config successful",
zap.String("store-address", address),
zap.String("store-config", cfg.String()),
zap.String("old-config", oldCfg.String()))
return c.updateStoreConfig(oldCfg, cfg)
return c.updateStoreConfig(oldCfg, cfg), true, nil
}

// updateStoreConfig updates the store config. This is extracted for testing.
func (c *RaftCluster) updateStoreConfig(oldCfg, cfg *sc.StoreConfig) (bool, error) {
func (c *RaftCluster) updateStoreConfig(oldCfg, cfg *sc.StoreConfig) (switchRaftV2 bool) {
cfg.Adjust()
c.opt.SetStoreConfig(cfg)
return oldCfg.Storage.Engine != sc.RaftstoreV2 && cfg.Storage.Engine == sc.RaftstoreV2, nil
switchRaftV2 = oldCfg.Storage.Engine != sc.RaftstoreV2 && cfg.Storage.Engine == sc.RaftstoreV2
return
}

// fetchStoreConfigFromTiKV tries to fetch the config from the TiKV store URL.
Expand Down
8 changes: 5 additions & 3 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1428,7 +1428,7 @@ func TestSyncConfigContext(t *testing.T) {
// trip schema header
now := time.Now()
stores[0].GetMeta().StatusAddress = server.URL[7:]
synced, _ := tc.syncStoreConfig(tc.GetStores())
synced, _, _ := tc.syncStoreConfig(tc.GetStores())
re.False(synced)
re.Less(time.Since(now), clientTimeout*2)
}
Expand All @@ -1450,15 +1450,17 @@ func TestStoreConfigSync(t *testing.T) {
re.Equal(uint64(144), tc.GetStoreConfig().GetRegionMaxSize())
re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/mockFetchStoreConfigFromTiKV", `return("10MiB")`))
// switchRaftV2 will be true.
synced, switchRaftV2 := tc.syncStoreConfig(tc.GetStores())
synced, switchRaftV2, needPersist := tc.syncStoreConfig(tc.GetStores())
re.True(synced)
re.True(switchRaftV2)
re.True(needPersist)
re.EqualValues(512, tc.opt.GetMaxMovableHotPeerSize())
re.Equal(uint64(10), tc.GetStoreConfig().GetRegionMaxSize())
// switchRaftV2 will be false this time.
synced, switchRaftV2 = tc.syncStoreConfig(tc.GetStores())
synced, switchRaftV2, needPersist = tc.syncStoreConfig(tc.GetStores())
re.True(synced)
re.False(switchRaftV2)
re.False(needPersist)
re.Equal(uint64(10), tc.GetStoreConfig().GetRegionMaxSize())
re.NoError(opt.Persist(tc.GetStorage()))
re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/mockFetchStoreConfigFromTiKV"))
Expand Down

0 comments on commit cff15dd

Please sign in to comment.