This is an automated cherry-pick of tikv#6919
close tikv#6918

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
bufferflies authored and ti-chi-bot committed Aug 10, 2023
1 parent 3794a27 commit b973f44
Showing 2 changed files with 135 additions and 0 deletions.
84 changes: 84 additions & 0 deletions server/cluster/cluster.go
@@ -15,6 +15,7 @@
package cluster

import (
"bytes"
"context"
"fmt"
"math"
@@ -430,6 +431,12 @@ func (c *RaftCluster) runSyncConfig() {
// switchRaftV2 is true if the TiKV engine config has changed and the engine is raft-kv2.
func syncConfig(manager *config.StoreConfigManager, stores []*core.StoreInfo) (synced bool, switchRaftV2 bool) {
for index := 0; index < len(stores); index++ {
select {
case <-c.ctx.Done():
log.Info("stop sync store config job due to server shutdown")
return
default:
}
// filter out the stores that are tiflash
store := stores[index]
if store.IsTiFlash() {
@@ -442,8 +449,15 @@ func syncConfig(manager *config.StoreConfigManager, stores []*core.StoreInfo) (s
}
// it will try the next store if the current store fails.
address := netutil.ResolveLoopBackAddr(stores[index].GetStatusAddress(), stores[index].GetAddress())
<<<<<<< HEAD
switchRaftV2, err := manager.ObserveConfig(address)
=======
switchRaftV2, err := c.observeStoreConfig(c.ctx, address)
>>>>>>> 38d087fec (config: sync store config in time (#6919))
if err != nil {
// delete the store if it fails and retry the next store.
stores = append(stores[:index], stores[index+1:]...)
index--
storeSyncConfigEvent.WithLabelValues(address, "fail").Inc()
log.Debug("sync store config failed, it will try next store", zap.Error(err))
continue
@@ -457,6 +471,76 @@ func syncConfig(manager *config.StoreConfigManager, stores []*core.StoreInfo) (s
return false, false
}

<<<<<<< HEAD
=======
// observeStoreConfig is used to observe the store config changes and
// return whether the new config changes the engine to raft-kv2.
func (c *RaftCluster) observeStoreConfig(ctx context.Context, address string) (bool, error) {
cfg, err := c.fetchStoreConfigFromTiKV(ctx, address)
if err != nil {
return false, err
}
oldCfg := c.opt.GetStoreConfig()
if cfg == nil || oldCfg.Equal(cfg) {
return false, nil
}
log.Info("sync the store config successful",
zap.String("store-address", address),
zap.String("store-config", cfg.String()),
zap.String("old-config", oldCfg.String()))
return c.updateStoreConfig(oldCfg, cfg)
}

// updateStoreConfig updates the store config. This is extracted for testing.
func (c *RaftCluster) updateStoreConfig(oldCfg, cfg *config.StoreConfig) (bool, error) {
cfg.Adjust()
c.opt.SetStoreConfig(cfg)
return oldCfg.Storage.Engine != config.RaftstoreV2 && cfg.Storage.Engine == config.RaftstoreV2, nil
}

// fetchStoreConfigFromTiKV tries to fetch the config from the TiKV store URL.
func (c *RaftCluster) fetchStoreConfigFromTiKV(ctx context.Context, statusAddress string) (*config.StoreConfig, error) {
cfg := &config.StoreConfig{}
failpoint.Inject("mockFetchStoreConfigFromTiKV", func(val failpoint.Value) {
if regionMaxSize, ok := val.(string); ok {
cfg.RegionMaxSize = regionMaxSize
cfg.Storage.Engine = config.RaftstoreV2
}
failpoint.Return(cfg, nil)
})
if c.httpClient == nil {
return nil, fmt.Errorf("failed to get store config due to nil client")
}
var url string
if netutil.IsEnableHTTPS(c.httpClient) {
url = fmt.Sprintf("%s://%s/config", "https", statusAddress)
} else {
url = fmt.Sprintf("%s://%s/config", "http", statusAddress)
}
ctx, cancel := context.WithTimeout(ctx, clientTimeout)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, bytes.NewBuffer(nil))
if err != nil {
cancel()
return nil, fmt.Errorf("failed to create store config http request: %w", err)
}
resp, err := c.httpClient.Do(req)
if err != nil {
cancel()
return nil, err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
cancel()
if err != nil {
return nil, err
}
if err := json.Unmarshal(body, cfg); err != nil {
return nil, err
}
return cfg, nil
}

>>>>>>> 38d087fec (config: sync store config in time (#6919))
// LoadClusterInfo loads cluster related info.
func (c *RaftCluster) LoadClusterInfo() (*RaftCluster, error) {
c.meta = &metapb.Cluster{}
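Because this automated cherry-pick still carries conflict markers, the hunk above does not compile as committed. For orientation only, here is a minimal sketch of how the conflicted loop would likely read once resolved in favor of the upstream change, assuming (as in upstream #6919) that the sync loop becomes a *RaftCluster method so that c.ctx and c.observeStoreConfig are in scope; the receiver, the method name syncStoreConfig, and the success-path return are assumptions, not part of this commit:

func (c *RaftCluster) syncStoreConfig(stores []*core.StoreInfo) (synced bool, switchRaftV2 bool) {
	for index := 0; index < len(stores); index++ {
		// Check for server shutdown on every iteration so the job stops promptly.
		select {
		case <-c.ctx.Done():
			log.Info("stop sync store config job due to server shutdown")
			return
		default:
		}
		// Skip TiFlash stores; they do not serve a TiKV store config.
		store := stores[index]
		if store.IsTiFlash() {
			continue
		}
		address := netutil.ResolveLoopBackAddr(store.GetStatusAddress(), store.GetAddress())
		// Upstream side of the conflict: fetch and apply the config for this store.
		v2, err := c.observeStoreConfig(c.ctx, address)
		if err != nil {
			// Drop the failed store and retry with the next one.
			stores = append(stores[:index], stores[index+1:]...)
			index--
			storeSyncConfigEvent.WithLabelValues(address, "fail").Inc()
			log.Debug("sync store config failed, it will try next store", zap.Error(err))
			continue
		}
		// Success path (elided in the diff above): report the synced config.
		return true, v2
	}
	return false, false
}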
51 changes: 51 additions & 0 deletions server/cluster/cluster_test.go
@@ -19,6 +19,8 @@ import (
"fmt"
"math"
"math/rand"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"
@@ -1329,6 +1331,7 @@ func TestSyncConfig(t *testing.T) {
},
}

<<<<<<< HEAD
for _, v := range testdata {
tc.storeConfigManager = config.NewTestStoreConfigManager(v.whiteList)
re.Equal(uint64(144), tc.GetStoreConfig().GetRegionMaxSize())
@@ -1344,6 +1347,54 @@
re.False(switchRaftV2)
}
re.Equal(v.maxRegionSize, tc.GetStoreConfig().GetRegionMaxSize())
=======
func TestSyncConfigContext(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

_, opt, err := newTestScheduleConfig()
re.NoError(err)
tc := newTestCluster(ctx, opt)
tc.httpClient = &http.Client{}

server := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
time.Sleep(time.Second * 100)
cfg := &config.StoreConfig{}
b, err := json.Marshal(cfg)
if err != nil {
res.WriteHeader(http.StatusInternalServerError)
res.Write([]byte(fmt.Sprintf("failed setting up test server: %s", err)))
return
}

res.WriteHeader(http.StatusOK)
res.Write(b)
}))
stores := newTestStores(1, "2.0.0")
for _, s := range stores {
re.NoError(tc.putStoreLocked(s))
}
// strip the schema header ("http://") from the test server URL
now := time.Now()
stores[0].GetMeta().StatusAddress = server.URL[7:]
synced, _ := tc.syncStoreConfig(tc.GetStores())
re.False(synced)
re.Less(time.Since(now), clientTimeout*2)
}

func TestStoreConfigSync(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

_, opt, err := newTestScheduleConfig()
re.NoError(err)
tc := newTestCluster(ctx, opt)
stores := newTestStores(5, "2.0.0")
for _, s := range stores {
re.NoError(tc.putStoreLocked(s))
>>>>>>> 38d087fec (config: sync store config in time (#6919))
}
}

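The mockFetchStoreConfigFromTiKV failpoint added in cluster.go lets a test bypass the HTTP fetch entirely. Below is a hedged sketch of how a test in this package could arm it via pingcap/failpoint; the test name and the failpoint path (derived from the package location) are assumptions, and the snippet presumes the failpoint and config imports are available in this test package:

func TestFetchStoreConfigFailpointSketch(t *testing.T) {
	re := require.New(t)
	// Assumed failpoint path; adjust to the actual import path of server/cluster.
	fpName := "github.com/tikv/pd/server/cluster/mockFetchStoreConfigFromTiKV"
	re.NoError(failpoint.Enable(fpName, `return("10GiB")`))
	defer func() {
		re.NoError(failpoint.Disable(fpName))
	}()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	_, opt, err := newTestScheduleConfig()
	re.NoError(err)
	tc := newTestCluster(ctx, opt)

	// With the failpoint armed, fetchStoreConfigFromTiKV returns the injected
	// config (raft-kv2 engine, 10GiB region max size) without any HTTP call.
	cfg, err := tc.fetchStoreConfigFromTiKV(ctx, "127.0.0.1:0")
	re.NoError(err)
	re.Equal("10GiB", cfg.RegionMaxSize)
	re.Equal(config.RaftstoreV2, cfg.Storage.Engine)
}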
