diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 5a24444a1ad..56c5e07be98 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -293,22 +293,28 @@ func (c *RaftCluster) runSyncConfig() { defer ticker.Stop() stores := c.GetStores() - syncConfig(c.storeConfigManager, stores) + syncConfig(c.ctx, c.storeConfigManager, stores) for { select { case <-c.ctx.Done(): log.Info("sync store config job is stopped") return case <-ticker.C: - if !syncConfig(c.storeConfigManager, stores) { + if !syncConfig(c.ctx, c.storeConfigManager, stores) { stores = c.GetStores() } } } } -func syncConfig(manager *config.StoreConfigManager, stores []*core.StoreInfo) bool { +func syncConfig(ctx context.Context, manager *config.StoreConfigManager, stores []*core.StoreInfo) bool { for index := 0; index < len(stores); index++ { + select { + case <-ctx.Done(): + log.Info("stop sync store config job due to raft cluster exited") + return false + default: + } // filter out the stores that are tiflash store := stores[index] if core.IsStoreContainLabel(store.GetMeta(), core.EngineKey, core.EngineTiFlash) { @@ -321,7 +327,10 @@ func syncConfig(manager *config.StoreConfigManager, stores []*core.StoreInfo) bo } // it will try next store if the current store is failed. address := netutil.ResolveLoopBackAddr(stores[index].GetStatusAddress(), stores[index].GetAddress()) - if err := manager.ObserveConfig(address); err != nil { + if err := manager.ObserveConfig(ctx, address); err != nil { + // delete the store if it is failed and retry next store. + stores = append(stores[:index], stores[index+1:]...) + index-- storeSyncConfigEvent.WithLabelValues(address, "fail").Inc() log.Debug("sync store config failed, it will try next store", zap.Error(err)) continue diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index 66882d95ed5..06cd7778ba0 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -16,9 +16,12 @@ package cluster import ( "context" + "encoding/json" "fmt" "math" "math/rand" + "net/http" + "net/http/httptest" "strings" "sync" "testing" @@ -1134,6 +1137,40 @@ func (s *testClusterInfoSuite) TestOfflineAndMerge(c *C) { } } +func (s *testClusterInfoSuite) TestSyncConfigContext(c *C) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + _, opt, _ := newTestScheduleConfig() + tc := newTestCluster(ctx, opt) + tc.storeConfigManager = config.NewStoreConfigManager(http.DefaultClient) + tc.httpClient = &http.Client{} + + server := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { + time.Sleep(time.Second * 100) + cfg := &config.StoreConfig{} + b, err := json.Marshal(cfg) + if err != nil { + res.WriteHeader(http.StatusInternalServerError) + res.Write([]byte(fmt.Sprintf("failed setting up test server: %s", err))) + return + } + + res.WriteHeader(http.StatusOK) + res.Write(b) + })) + stores := newTestStores(1, "2.0.0") + for _, s := range stores { + tc.putStoreLocked(s) + } + // trip schema header + now := time.Now() + stores[0].GetMeta().StatusAddress = server.URL[7:] + synced := syncConfig(tc.ctx, tc.storeConfigManager, stores) + c.Assert(synced, IsFalse) + c.Assert(time.Since(now), Less, clientTimeout*2) +} + func (s *testClusterInfoSuite) TestSyncConfig(c *C) { _, opt, err := newTestScheduleConfig() c.Assert(err, IsNil) @@ -1161,7 +1198,7 @@ func (s *testClusterInfoSuite) TestSyncConfig(c *C) { for _, v := range testdata { tc.storeConfigManager = config.NewTestStoreConfigManager(v.whiteList) c.Assert(tc.GetStoreConfig().GetRegionMaxSize(), Equals, uint64(144)) - c.Assert(syncConfig(tc.storeConfigManager, tc.GetStores()), Equals, v.updated) + c.Assert(syncConfig(context.Background(), tc.storeConfigManager, tc.GetStores()), Equals, v.updated) c.Assert(tc.GetStoreConfig().GetRegionMaxSize(), Equals, v.maxRegionSize) } } diff --git a/server/config/store_config.go b/server/config/store_config.go index 27fc456dd08..bd7d7465776 100644 --- a/server/config/store_config.go +++ b/server/config/store_config.go @@ -15,12 +15,14 @@ package config import ( + "context" "encoding/json" "fmt" - "io/ioutil" + "io" "net/http" "reflect" "sync/atomic" + "time" "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" @@ -41,6 +43,8 @@ var ( defaultRegionMaxKey = uint64(1440000) // default region split key is 960000 defaultRegionSplitKey = uint64(960000) + + clientTimeout = 3 * time.Second ) // StoreConfig is the config of store like TiKV. @@ -182,8 +186,8 @@ func NewTestStoreConfigManager(whiteList []string) *StoreConfigManager { } // ObserveConfig is used to observe the config change. -func (m *StoreConfigManager) ObserveConfig(address string) error { - cfg, err := m.source.GetConfig(address) +func (m *StoreConfigManager) ObserveConfig(ctx context.Context, address string) error { + cfg, err := m.source.GetConfig(ctx, address) if err != nil { return err } @@ -206,7 +210,7 @@ func (m *StoreConfigManager) GetStoreConfig() *StoreConfig { // Source is used to get the store config. type Source interface { - GetConfig(statusAddress string) (*StoreConfig, error) + GetConfig(ctx context.Context, statusAddress string) (*StoreConfig, error) } // TiKVConfigSource is used to get the store config from TiKV. @@ -223,14 +227,20 @@ func newTiKVConfigSource(schema string, client *http.Client) *TiKVConfigSource { } // GetConfig returns the store config from TiKV. -func (s TiKVConfigSource) GetConfig(statusAddress string) (*StoreConfig, error) { +func (s TiKVConfigSource) GetConfig(ctx context.Context, statusAddress string) (*StoreConfig, error) { url := fmt.Sprintf("%s://%s/config", s.schema, statusAddress) - resp, err := s.client.Get(url) + ctx, cancel := context.WithTimeout(ctx, clientTimeout) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + defer cancel() + if err != nil { + return nil, fmt.Errorf("failed to create store config http request: %w", err) + } + resp, err := s.client.Do(req) if err != nil { return nil, err } defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) + body, err := io.ReadAll(resp.Body) if err != nil { return nil, err } @@ -253,7 +263,7 @@ func newFakeSource(whiteList []string) *FakeSource { } // GetConfig returns the config. -func (f *FakeSource) GetConfig(url string) (*StoreConfig, error) { +func (f *FakeSource) GetConfig(_ context.Context, url string) (*StoreConfig, error) { if !slice.Contains(f.whiteList, url) { return nil, fmt.Errorf("[url:%s] is not in white list", url) } diff --git a/server/config/store_config_test.go b/server/config/store_config_test.go index 478e1ebb3d7..bf8f3cc7644 100644 --- a/server/config/store_config_test.go +++ b/server/config/store_config_test.go @@ -15,6 +15,7 @@ package config import ( + "context" "crypto/tls" "encoding/json" "net/http" @@ -62,9 +63,9 @@ func (t *testTiKVConfigSuite) TestTiKVConfig(c *C) { func (t *testTiKVConfigSuite) TestUpdateConfig(c *C) { manager := NewTestStoreConfigManager([]string{"tidb.com"}) - manager.ObserveConfig("tikv.com") + manager.ObserveConfig(context.Background(), "tikv.com") c.Assert(manager.GetStoreConfig().GetRegionMaxSize(), Equals, uint64(144)) - manager.ObserveConfig("tidb.com") + manager.ObserveConfig(context.Background(), "tidb.com") c.Assert(manager.GetStoreConfig().GetRegionMaxSize(), Equals, uint64(10)) client := &http.Client{