diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index 54085ef533ba..33ed5711ac5c 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -427,25 +427,25 @@ func (c *RaftCluster) runStoreConfigSync() {
 	defer c.wg.Done()
 
 	var (
-		synced, switchRaftV2Config, isLastSynced bool
-		stores                                   = c.GetStores()
+		synced, switchRaftV2Config, needPersist bool
+		stores                                  = c.GetStores()
 	)
 	// Start the ticker with a second-level timer to accelerate
 	// the bootstrap stage.
 	ticker := time.NewTicker(time.Minute)
 	defer ticker.Stop()
 	for {
-		synced, switchRaftV2Config = c.syncStoreConfig(stores)
+		synced, switchRaftV2Config, needPersist = c.syncStoreConfig(stores)
 		if switchRaftV2Config {
 			if err := c.opt.SwitchRaftV2(c.GetStorage()); err != nil {
 				log.Warn("store config persisted failed", zap.Error(err))
 			}
 		}
-		isLastSynced = synced
 		// Update the stores if the synchronization is not completed.
 		if !synced {
 			stores = c.GetStores()
-		} else if !isLastSynced && synced {
+		}
+		if needPersist {
 			if err := c.opt.Persist(c.storage); err != nil {
 				log.Warn("store config persisted failed", zap.Error(err))
 			}
@@ -462,7 +462,8 @@ func (c *RaftCluster) runStoreConfigSync() {
 // syncStoreConfig syncs the store config from TiKV.
 // - `synced` is true if sync config from one tikv.
 // - `switchRaftV2` is true if the config of tikv engine is change to raft-kv2.
-func (c *RaftCluster) syncStoreConfig(stores []*core.StoreInfo) (synced bool, switchRaftV2 bool) {
+func (c *RaftCluster) syncStoreConfig(stores []*core.StoreInfo) (synced bool, switchRaftV2 bool, needPersist bool) {
+	var err error
 	for index := 0; index < len(stores); index++ {
 		select {
 		case <-c.ctx.Done():
@@ -482,7 +483,7 @@ func (c *RaftCluster) syncStoreConfig(stores []*core.StoreInfo) (synced bool, sw
 		}
 		// it will try next store if the current store is failed.
 		address := netutil.ResolveLoopBackAddr(stores[index].GetStatusAddress(), stores[index].GetAddress())
-		switchRaftV2, err := c.observeStoreConfig(c.ctx, address)
+		switchRaftV2, needPersist, err = c.observeStoreConfig(c.ctx, address)
 		if err != nil {
 			// delete the store if it is failed and retry next store.
 			stores = append(stores[:index], stores[index+1:]...)
@@ -495,34 +496,35 @@ func (c *RaftCluster) syncStoreConfig(stores []*core.StoreInfo) (synced bool, sw
 		}
 		storeSyncConfigEvent.WithLabelValues(address, "succ").Inc()
 
-		return true, switchRaftV2
+		return true, switchRaftV2, needPersist
 	}
-	return false, false
+	return false, false, needPersist
 }
 
 // observeStoreConfig is used to observe the store config changes and
 // return whether if the new config changes the engine to raft-kv2.
-func (c *RaftCluster) observeStoreConfig(ctx context.Context, address string) (bool, error) {
+func (c *RaftCluster) observeStoreConfig(ctx context.Context, address string) (switchRaftV2 bool, needPersist bool, err error) {
 	cfg, err := c.fetchStoreConfigFromTiKV(ctx, address)
 	if err != nil {
-		return false, err
+		return false, false, err
 	}
 	oldCfg := c.opt.GetStoreConfig()
 	if cfg == nil || oldCfg.Equal(cfg) {
-		return false, nil
+		return false, false, nil
 	}
 	log.Info("sync the store config successful",
 		zap.String("store-address", address),
 		zap.String("store-config", cfg.String()),
 		zap.String("old-config", oldCfg.String()))
-	return c.updateStoreConfig(oldCfg, cfg)
+	return c.updateStoreConfig(oldCfg, cfg), true, nil
 }
 
 // updateStoreConfig updates the store config. This is extracted for testing.
-func (c *RaftCluster) updateStoreConfig(oldCfg, cfg *sc.StoreConfig) (bool, error) {
+func (c *RaftCluster) updateStoreConfig(oldCfg, cfg *sc.StoreConfig) (switchRaftV2 bool) {
 	cfg.Adjust()
 	c.opt.SetStoreConfig(cfg)
-	return oldCfg.Storage.Engine != sc.RaftstoreV2 && cfg.Storage.Engine == sc.RaftstoreV2, nil
+	switchRaftV2 = oldCfg.Storage.Engine != sc.RaftstoreV2 && cfg.Storage.Engine == sc.RaftstoreV2
+	return
 }
 
 // fetchStoreConfigFromTiKV tries to fetch the config from the TiKV store URL.
diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go
index aa826e344069..5cfec7d42302 100644
--- a/server/cluster/cluster_test.go
+++ b/server/cluster/cluster_test.go
@@ -1428,7 +1428,7 @@ func TestSyncConfigContext(t *testing.T) {
 	// trip schema header
 	now := time.Now()
 	stores[0].GetMeta().StatusAddress = server.URL[7:]
-	synced, _ := tc.syncStoreConfig(tc.GetStores())
+	synced, _, _ := tc.syncStoreConfig(tc.GetStores())
 	re.False(synced)
 	re.Less(time.Since(now), clientTimeout*2)
 }
@@ -1450,15 +1450,17 @@ func TestStoreConfigSync(t *testing.T) {
 	re.Equal(uint64(144), tc.GetStoreConfig().GetRegionMaxSize())
 	re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/mockFetchStoreConfigFromTiKV", `return("10MiB")`))
 	// switchRaftV2 will be true.
-	synced, switchRaftV2 := tc.syncStoreConfig(tc.GetStores())
+	synced, switchRaftV2, needPersist := tc.syncStoreConfig(tc.GetStores())
 	re.True(synced)
 	re.True(switchRaftV2)
+	re.True(needPersist)
 	re.EqualValues(512, tc.opt.GetMaxMovableHotPeerSize())
 	re.Equal(uint64(10), tc.GetStoreConfig().GetRegionMaxSize())
 	// switchRaftV2 will be false this time.
-	synced, switchRaftV2 = tc.syncStoreConfig(tc.GetStores())
+	synced, switchRaftV2, needPersist = tc.syncStoreConfig(tc.GetStores())
 	re.True(synced)
 	re.False(switchRaftV2)
+	re.False(needPersist)
 	re.Equal(uint64(10), tc.GetStoreConfig().GetRegionMaxSize())
 	re.NoError(opt.Persist(tc.GetStorage()))
 	re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/mockFetchStoreConfigFromTiKV"))
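Note (not part of the patch): the removed guard `} else if !isLastSynced && synced {` could never fire, because `isLastSynced = synced` runs immediately before the check, so a freshly synced store config was never persisted. The refactor instead has `observeStoreConfig` report whether the fetched config actually differs (`needPersist`) and lets the sync loop persist it when it does. The minimal, self-contained Go sketch below illustrates only that decision; all types and method names in it are hypothetical stand-ins, not PD's real API.

// Sketch of the persist/switch decision the refactor centralizes: a fetched
// config is persisted only when it differs from the current one, and
// switchRaftV2 fires only on a transition into raft-kv2.
package main

import "fmt"

type storeConfig struct {
	regionMaxSize string
	engine        string
}

type options struct{ current storeConfig }

// setStoreConfig applies a freshly fetched config and reports whether the
// engine switched to raft-kv2 and whether anything changed at all.
func (o *options) setStoreConfig(cfg storeConfig) (switchRaftV2, needPersist bool) {
	if cfg == o.current {
		return false, false // identical config: nothing to persist
	}
	switchRaftV2 = o.current.engine != "raft-kv2" && cfg.engine == "raft-kv2"
	o.current = cfg
	return switchRaftV2, true
}

func main() {
	opt := &options{current: storeConfig{regionMaxSize: "144MiB", engine: "raft-kv"}}

	// First sync: the config differs, so it must be persisted and the engine switch is observed.
	fetched := storeConfig{regionMaxSize: "10MiB", engine: "raft-kv2"}
	switchRaftV2, needPersist := opt.setStoreConfig(fetched)
	fmt.Println(switchRaftV2, needPersist) // true true

	// Second sync with an identical config: nothing changed, nothing to persist.
	switchRaftV2, needPersist = opt.setStoreConfig(fetched)
	fmt.Println(switchRaftV2, needPersist) // false false
}

This mirrors the assertions in TestStoreConfigSync above: the first sync yields switchRaftV2 and needPersist both true, the second yields both false.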