Skip to content

Commit

Permalink
resolve conflict
Browse files Browse the repository at this point in the history
Signed-off-by: bufferflies <1045931706@qq.com>
  • Loading branch information
bufferflies committed Aug 14, 2023
1 parent ce7b2df commit 9dbc9bf
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 188 deletions.
90 changes: 6 additions & 84 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package cluster

import (
"bytes"
"context"
"fmt"
"math"
Expand Down Expand Up @@ -308,26 +307,26 @@ func (c *RaftCluster) runSyncConfig() {
defer ticker.Stop()
stores := c.GetStores()

syncConfig(c.storeConfigManager, stores)
syncConfig(c.ctx, c.storeConfigManager, stores)
for {
select {
case <-c.ctx.Done():
log.Info("sync store config job is stopped")
return
case <-ticker.C:
if !syncConfig(c.storeConfigManager, stores) {
if !syncConfig(c.ctx, c.storeConfigManager, stores) {
stores = c.GetStores()
}
}
}
}

func syncConfig(manager *config.StoreConfigManager, stores []*core.StoreInfo) bool {
func syncConfig(ctx context.Context, manager *config.StoreConfigManager, stores []*core.StoreInfo) bool {
for index := 0; index < len(stores); index++ {
select {
case <-c.ctx.Done():
case <-ctx.Done():
log.Info("stop sync store config job due to server shutdown")
return
return false
default:
}
// filter out the stores that are tiflash
Expand All @@ -342,15 +341,9 @@ func syncConfig(manager *config.StoreConfigManager, stores []*core.StoreInfo) bo
}
// it will try next store if the current store is failed.
address := netutil.ResolveLoopBackAddr(stores[index].GetStatusAddress(), stores[index].GetAddress())
<<<<<<< HEAD
if err := manager.ObserveConfig(address); err != nil {
=======
switchRaftV2, err := c.observeStoreConfig(c.ctx, address)
if err != nil {
// delete the store if it is failed and retry next store.
if err := manager.ObserveConfig(ctx, address); err != nil {
stores = append(stores[:index], stores[index+1:]...)
index--
>>>>>>> 38d087fec (config: sync store config in time (#6919))
storeSyncConfigEvent.WithLabelValues(address, "fail").Inc()
log.Debug("sync store config failed, it will try next store", zap.Error(err))
continue
Expand All @@ -359,78 +352,7 @@ func syncConfig(manager *config.StoreConfigManager, stores []*core.StoreInfo) bo
// it will only try one store.
return true
}
<<<<<<< HEAD
return false
=======
return false, false
}

// observeStoreConfig fetches the latest store config from the TiKV at the
// given status address and installs it when it differs from the current one.
// The boolean result reports whether the newly applied config switches the
// storage engine to raft-kv2.
func (c *RaftCluster) observeStoreConfig(ctx context.Context, address string) (bool, error) {
	newCfg, err := c.fetchStoreConfigFromTiKV(ctx, address)
	if err != nil {
		return false, err
	}
	current := c.opt.GetStoreConfig()
	if newCfg == nil || current.Equal(newCfg) {
		// Nothing changed; keep the config we already have.
		return false, nil
	}
	log.Info("sync the store config successful",
		zap.String("store-address", address),
		zap.String("store-config", newCfg.String()),
		zap.String("old-config", current.String()))
	return c.updateStoreConfig(current, newCfg)
}

// updateStoreConfig adjusts cfg and installs it as the cluster's store
// config, reporting whether the change flips the storage engine from a
// non-raft-kv2 engine to raft-kv2. Extracted as its own method for testing.
func (c *RaftCluster) updateStoreConfig(oldCfg, cfg *config.StoreConfig) (bool, error) {
	cfg.Adjust()
	c.opt.SetStoreConfig(cfg)
	switchedToV2 := cfg.Storage.Engine == config.RaftstoreV2 && oldCfg.Storage.Engine != config.RaftstoreV2
	return switchedToV2, nil
}

// fetchStoreConfigFromTiKV tries to fetch the config from the TiKV store URL.
func (c *RaftCluster) fetchStoreConfigFromTiKV(ctx context.Context, statusAddress string) (*config.StoreConfig, error) {
cfg := &config.StoreConfig{}
failpoint.Inject("mockFetchStoreConfigFromTiKV", func(val failpoint.Value) {
if regionMaxSize, ok := val.(string); ok {
cfg.RegionMaxSize = regionMaxSize
cfg.Storage.Engine = config.RaftstoreV2
}
failpoint.Return(cfg, nil)
})
if c.httpClient == nil {
return nil, fmt.Errorf("failed to get store config due to nil client")
}
var url string
if netutil.IsEnableHTTPS(c.httpClient) {
url = fmt.Sprintf("%s://%s/config", "https", statusAddress)
} else {
url = fmt.Sprintf("%s://%s/config", "http", statusAddress)
}
ctx, cancel := context.WithTimeout(ctx, clientTimeout)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, bytes.NewBuffer(nil))
if err != nil {
cancel()
return nil, fmt.Errorf("failed to create store config http request: %w", err)
}
resp, err := c.httpClient.Do(req)
if err != nil {
cancel()
return nil, err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
cancel()
if err != nil {
return nil, err
}
if err := json.Unmarshal(body, cfg); err != nil {
return nil, err
}
return cfg, nil
>>>>>>> 38d087fec (config: sync store config in time (#6919))
}

// LoadClusterInfo loads cluster related info.
Expand Down
118 changes: 23 additions & 95 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package cluster

import (
"context"
"encoding/json"
"fmt"
"math"
"math/rand"
Expand Down Expand Up @@ -1290,10 +1291,7 @@ func TestOfflineAndMerge(t *testing.T) {
}
}

<<<<<<< HEAD
func TestSyncConfig(t *testing.T) {
=======
func TestStoreConfigUpdate(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -1306,60 +1304,26 @@ func TestStoreConfigUpdate(t *testing.T) {
re.NoError(tc.putStoreLocked(s))
}
re.Len(tc.getUpStores(), 5)
// Case1: big region.
{
body := `{ "coprocessor": {
"split-region-on-table": false,
"batch-split-limit": 2,
"region-max-size": "15GiB",
"region-split-size": "10GiB",
"region-max-keys": 144000000,
"region-split-keys": 96000000,
"consistency-check-method": "mvcc",
"perf-level": 2
}}`
var config config.StoreConfig
re.NoError(json.Unmarshal([]byte(body), &config))
tc.updateStoreConfig(opt.GetStoreConfig(), &config)
re.Equal(uint64(144000000), opt.GetRegionMaxKeys())
re.Equal(uint64(96000000), opt.GetRegionSplitKeys())
re.Equal(uint64(15*units.GiB/units.MiB), opt.GetRegionMaxSize())
re.Equal(uint64(10*units.GiB/units.MiB), opt.GetRegionSplitSize())
}
// Case2: empty config.
{
body := `{}`
var config config.StoreConfig
re.NoError(json.Unmarshal([]byte(body), &config))
tc.updateStoreConfig(opt.GetStoreConfig(), &config)
re.Equal(uint64(1440000), opt.GetRegionMaxKeys())
re.Equal(uint64(960000), opt.GetRegionSplitKeys())
re.Equal(uint64(144), opt.GetRegionMaxSize())
re.Equal(uint64(96), opt.GetRegionSplitSize())
}
// Case3: raft-kv2 config.
{
body := `{ "coprocessor": {
"split-region-on-table":false,
"batch-split-limit":10,
"region-max-size":"384MiB",
"region-split-size":"256MiB",
"region-max-keys":3840000,
"region-split-keys":2560000,
"consistency-check-method":"mvcc",
"enable-region-bucket":true,
"region-bucket-size":"96MiB",
"region-size-threshold-for-approximate":"384MiB",
"region-bucket-merge-size-ratio":0.33
},
"storage":{
"engine":"raft-kv2"
}}`
var config config.StoreConfig
re.NoError(json.Unmarshal([]byte(body), &config))
tc.updateStoreConfig(opt.GetStoreConfig(), &config)
re.Equal(uint64(96), opt.GetRegionBucketSize())
re.True(opt.IsRaftKV2())

testdata := []struct {
whiteList []string
maxRegionSize uint64
updated bool
}{{
whiteList: []string{},
maxRegionSize: uint64(144),
updated: false,
}, {
whiteList: []string{"127.0.0.1:5"},
maxRegionSize: uint64(10),
updated: true,
}}

for _, v := range testdata {
tc.storeConfigManager = config.NewTestStoreConfigManager(v.whiteList)
re.Equal(uint64(144), tc.GetStoreConfig().GetRegionMaxSize())
re.Equal(v.updated, syncConfig(tc.ctx, tc.storeConfigManager, tc.GetStores()))
re.Equal(v.maxRegionSize, tc.GetStoreConfig().GetRegionMaxSize())
}
}

Expand All @@ -1371,6 +1335,7 @@ func TestSyncConfigContext(t *testing.T) {
_, opt, err := newTestScheduleConfig()
re.NoError(err)
tc := newTestCluster(ctx, opt)
tc.storeConfigManager = config.NewStoreConfigManager(http.DefaultClient)
tc.httpClient = &http.Client{}

server := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
Expand All @@ -1393,48 +1358,11 @@ func TestSyncConfigContext(t *testing.T) {
// trip schema header
now := time.Now()
stores[0].GetMeta().StatusAddress = server.URL[7:]
synced, _ := tc.syncStoreConfig(tc.GetStores())
synced := syncConfig(tc.ctx, tc.storeConfigManager, stores)
re.False(synced)
re.Less(time.Since(now), clientTimeout*2)
}

func TestStoreConfigSync(t *testing.T) {
>>>>>>> 38d087fec (config: sync store config in time (#6919))
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

_, opt, err := newTestScheduleConfig()
re.NoError(err)
tc := newTestCluster(ctx, opt)
stores := newTestStores(5, "2.0.0")
for _, s := range stores {
re.NoError(tc.putStoreLocked(s))
}
re.Len(tc.getUpStores(), 5)

testdata := []struct {
whiteList []string
maxRegionSize uint64
updated bool
}{{
whiteList: []string{},
maxRegionSize: uint64(144),
updated: false,
}, {
whiteList: []string{"127.0.0.1:5"},
maxRegionSize: uint64(10),
updated: true,
}}

for _, v := range testdata {
tc.storeConfigManager = config.NewTestStoreConfigManager(v.whiteList)
re.Equal(uint64(144), tc.GetStoreConfig().GetRegionMaxSize())
re.Equal(v.updated, syncConfig(tc.storeConfigManager, tc.GetStores()))
re.Equal(v.maxRegionSize, tc.GetStoreConfig().GetRegionMaxSize())
}
}

func TestUpdateStorePendingPeerCount(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
Expand Down
21 changes: 15 additions & 6 deletions server/config/store_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
package config

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"reflect"
"sync/atomic"
"time"

"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
Expand All @@ -41,6 +43,7 @@ var (
defaultRegionMaxKey = uint64(1440000)
// default region split key is 960000
defaultRegionSplitKey = uint64(960000)
clientTimeout = 3 * time.Second
)

// StoreConfig is the config of store like TiKV.
Expand Down Expand Up @@ -191,8 +194,8 @@ func NewTestStoreConfigManager(whiteList []string) *StoreConfigManager {
}

// ObserveConfig is used to observe the config change.
func (m *StoreConfigManager) ObserveConfig(address string) error {
cfg, err := m.source.GetConfig(address)
func (m *StoreConfigManager) ObserveConfig(ctx context.Context, address string) error {
cfg, err := m.source.GetConfig(ctx, address)
if err != nil {
return err
}
Expand Down Expand Up @@ -222,7 +225,7 @@ func (m *StoreConfigManager) GetStoreConfig() *StoreConfig {

// Source is used to get the store config.
type Source interface {
GetConfig(statusAddress string) (*StoreConfig, error)
GetConfig(ctx context.Context, statusAddress string) (*StoreConfig, error)
}

// TiKVConfigSource is used to get the store config from TiKV.
Expand All @@ -239,9 +242,15 @@ func newTiKVConfigSource(schema string, client *http.Client) *TiKVConfigSource {
}

// GetConfig returns the store config from TiKV.
func (s TiKVConfigSource) GetConfig(statusAddress string) (*StoreConfig, error) {
func (s TiKVConfigSource) GetConfig(ctx context.Context, statusAddress string) (*StoreConfig, error) {
url := fmt.Sprintf("%s://%s/config", s.schema, statusAddress)
resp, err := s.client.Get(url)
ctx, cancel := context.WithTimeout(ctx, clientTimeout)
defer cancel()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, fmt.Errorf("failed to create store config http request: %w", err)
}
resp, err := s.client.Do(req)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -269,7 +278,7 @@ func newFakeSource(whiteList []string) *FakeSource {
}

// GetConfig returns the config.
func (f *FakeSource) GetConfig(url string) (*StoreConfig, error) {
func (f *FakeSource) GetConfig(_ context.Context, url string) (*StoreConfig, error) {
if !slice.Contains(f.whiteList, url) {
return nil, fmt.Errorf("[url:%s] is not in white list", url)
}
Expand Down
7 changes: 4 additions & 3 deletions server/config/store_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package config

import (
"context"
"crypto/tls"
"encoding/json"
"net/http"
Expand Down Expand Up @@ -63,13 +64,13 @@ func TestTiKVConfig(t *testing.T) {
func TestUpdateConfig(t *testing.T) {
re := require.New(t)
manager := NewTestStoreConfigManager([]string{"tidb.com"})
manager.ObserveConfig("tikv.com")
manager.ObserveConfig(context.Background(), "tikv.com")
re.Equal(uint64(144), manager.GetStoreConfig().GetRegionMaxSize())
manager.ObserveConfig("tidb.com")
manager.ObserveConfig(context.Background(), "tidb.com")
re.Equal(uint64(10), manager.GetStoreConfig().GetRegionMaxSize())

// case2: the config should not update if config is same expect some ignore field.
c, err := manager.source.GetConfig("tidb.com")
c, err := manager.source.GetConfig(context.Background(), "tidb.com")
re.NoError(err)
re.True(manager.GetStoreConfig().Equal(c))

Expand Down

0 comments on commit 9dbc9bf

Please sign in to comment.