Skip to content

Commit

Permalink
sync config add context
Browse files Browse the repository at this point in the history
Signed-off-by: bufferflies <1045931706@qq.com>
  • Loading branch information
bufferflies committed Aug 9, 2023
1 parent d4138c8 commit bea52c9
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 5 deletions.
28 changes: 23 additions & 5 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package cluster

import (
"bytes"
"context"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -450,6 +451,12 @@ func (c *RaftCluster) runStoreConfigSync() {
// - `switchRaftV2` is true if the config of tikv engine is change to raft-kv2.
func (c *RaftCluster) syncStoreConfig(stores []*core.StoreInfo) (synced bool, switchRaftV2 bool) {
for index := 0; index < len(stores); index++ {
select {
case <-c.ctx.Done():
log.Info("sync store config job is stopped")
return
default:
}
// filter out the stores that are tiflash
store := stores[index]
if store.IsTiFlash() {
Expand All @@ -462,8 +469,11 @@ func (c *RaftCluster) syncStoreConfig(stores []*core.StoreInfo) (synced bool, sw
}
// it will try next store if the current store is failed.
address := netutil.ResolveLoopBackAddr(stores[index].GetStatusAddress(), stores[index].GetAddress())
switchRaftV2, err := c.observeStoreConfig(address)
switchRaftV2, err := c.observeStoreConfig(c.ctx, address)
if err != nil {
// delete the store if it is failed and retry next store.
stores = append(stores[:index], stores[index+1:]...)
index--
storeSyncConfigEvent.WithLabelValues(address, "fail").Inc()
log.Debug("sync store config failed, it will try next store", zap.Error(err))
continue
Expand All @@ -479,8 +489,8 @@ func (c *RaftCluster) syncStoreConfig(stores []*core.StoreInfo) (synced bool, sw

// observeStoreConfig is used to observe the store config changes and
// return whether if the new config changes the engine to raft-kv2.
func (c *RaftCluster) observeStoreConfig(address string) (bool, error) {
cfg, err := c.fetchStoreConfigFromTiKV(address)
func (c *RaftCluster) observeStoreConfig(ctx context.Context, address string) (bool, error) {
cfg, err := c.fetchStoreConfigFromTiKV(ctx, address)
if err != nil {
return false, err
}
Expand All @@ -503,7 +513,7 @@ func (c *RaftCluster) updateStoreConfig(oldCfg, cfg *config.StoreConfig) (bool,
}

// fetchStoreConfigFromTiKV tries to fetch the config from the TiKV store URL.
func (c *RaftCluster) fetchStoreConfigFromTiKV(statusAddress string) (*config.StoreConfig, error) {
func (c *RaftCluster) fetchStoreConfigFromTiKV(ctx context.Context, statusAddress string) (*config.StoreConfig, error) {
cfg := &config.StoreConfig{}
failpoint.Inject("mockFetchStoreConfigFromTiKV", func(val failpoint.Value) {
if regionMaxSize, ok := val.(string); ok {
Expand All @@ -521,12 +531,20 @@ func (c *RaftCluster) fetchStoreConfigFromTiKV(statusAddress string) (*config.St
} else {
url = fmt.Sprintf("%s://%s/config", "http", statusAddress)
}
resp, err := c.httpClient.Get(url)
ctx, cancel := context.WithTimeout(ctx, clientTimeout)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, bytes.NewBuffer(nil))
if err != nil {
cancel()
return nil, fmt.Errorf("failed to create http request: %w", err)
}
resp, err := c.httpClient.Do(req)
if err != nil {
cancel()
return nil, err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
cancel()
if err != nil {
return nil, err
}
Expand Down
45 changes: 45 additions & 0 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"fmt"
"math"
"math/rand"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -1386,6 +1388,49 @@ func TestStoreConfigUpdate(t *testing.T) {
}
}

func TestRemoveUnhealthyStore(t *testing.T) {
arr := []int{1, 2, 3}
re := require.New(t)
index := 0
arr = append(arr[:index], arr[index+1:]...)
re.Len(arr, 2)
}

func TestSyncConfigContext(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

_, opt, err := newTestScheduleConfig()
re.NoError(err)
tc := newTestCluster(ctx, opt)
tc.httpClient = &http.Client{}

server := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
time.Sleep(time.Second * 100)
cfg := &config.StoreConfig{}
b, err := json.Marshal(cfg)
if err != nil {
res.WriteHeader(http.StatusInternalServerError)
res.Write([]byte(fmt.Sprintf("failed setting up test server: %s", err)))
return
}

res.WriteHeader(http.StatusOK)
res.Write(b)
}))
stores := newTestStores(1, "2.0.0")
for _, s := range stores {
re.NoError(tc.putStoreLocked(s))
}
// trip schema header
now := time.Now()
stores[0].GetMeta().StatusAddress = server.URL[7:]
synced, _ := tc.syncStoreConfig(tc.GetStores())
re.False(synced)
re.Less(time.Since(now), clientTimeout*2)
}

func TestStoreConfigSync(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
Expand Down

0 comments on commit bea52c9

Please sign in to comment.