Skip to content

Commit

Permalink
store: remove stores that have no region before balance (#52790)
Browse files Browse the repository at this point in the history
ref #52313
  • Loading branch information
xzhangxian1008 authored May 28, 2024
1 parent b511a45 commit 61e5fa0
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 7 deletions.
3 changes: 2 additions & 1 deletion store/copr/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ go_test(
embed = [":copr"],
flaky = True,
race = "on",
shard_count = 29,
shard_count = 30,
deps = [
"//kv",
"//store/driver/backoff",
Expand All @@ -88,6 +88,7 @@ go_test(
"//util/trxevents",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/coprocessor",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_kvproto//pkg/mpp",
"@com_github_stathat_consistent//:consistent",
"@com_github_stretchr_testify//require",
Expand Down
26 changes: 22 additions & 4 deletions store/copr/batch_coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,19 @@ func balanceBatchCopTaskWithContinuity(storeTaskMap map[uint64]*batchCopTask, ca
return res, score
}

func getUsedStores(cache *RegionCache, usedTiFlashStoresMap map[uint64]struct{}) []*tikv.Store {
// decide the available stores
stores := cache.RegionCache.GetTiFlashStores(tikv.LabelFilterNoTiFlashWriteNode)
usedStores := make([]*tikv.Store, 0)
for _, store := range stores {
_, ok := usedTiFlashStoresMap[store.StoreID()]
if ok {
usedStores = append(usedStores, store)
}
}
return usedStores
}

// balanceBatchCopTask balance the regions between available stores, the basic rule is
// 1. the first region of each original batch cop task belongs to its original store because some
// meta data(like the rpc context) in batchCopTask is related to it
Expand All @@ -300,7 +313,7 @@ func balanceBatchCopTaskWithContinuity(storeTaskMap map[uint64]*batchCopTask, ca
//
// The second balance strategy: Not only consider the region count between TiFlash stores, but also try to make the regions' range continuous(stored in TiFlash closely).
// If balanceWithContinuity is true, the second balance strategy is enable.
func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []*batchCopTask, isMPP bool, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64) []*batchCopTask {
func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, usedTiFlashStoresMap map[uint64]struct{}, originalTasks []*batchCopTask, isMPP bool, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64) []*batchCopTask {
if len(originalTasks) == 0 {
log.Info("Batch cop task balancer got an empty task set.")
return originalTasks
Expand Down Expand Up @@ -329,8 +342,8 @@ func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []
storeTaskMap[taskStoreID] = batchTask
}
} else {
stores := cache.RegionCache.GetTiFlashStores(tikv.LabelFilterNoTiFlashWriteNode)
aliveStores := filterAliveStores(ctx, stores, ttl, kvStore)
usedStores := getUsedStores(cache, usedTiFlashStoresMap)
aliveStores := filterAliveStores(ctx, usedStores, ttl, kvStore)
for _, s := range aliveStores {
storeTaskMap[s.StoreID()] = &batchCopTask{
storeAddr: s.GetAddr(),
Expand Down Expand Up @@ -823,6 +836,7 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach
var batchTasks []*batchCopTask

storeTaskMap := make(map[string]*batchCopTask)
usedTiFlashStoresMap := make(map[uint64]struct{})
needRetry := false
for _, task := range tasks {
rpcCtx, err := cache.GetTiFlashRPCContext(bo.TiKVBackoffer(), task.region, isMPP, tikv.LabelFilterNoTiFlashWriteNode)
Expand Down Expand Up @@ -852,6 +866,10 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach
}
storeTaskMap[rpcCtx.Addr] = batchTask
}

for _, store := range allStores {
usedTiFlashStoresMap[store] = struct{}{}
}
}
if needRetry {
// As mentioned above, nil rpcCtx is always attributed to failed stores.
Expand All @@ -875,7 +893,7 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach
logutil.BgLogger().Debug(msg)
}
balanceStart := time.Now()
batchTasks = balanceBatchCopTask(bo.GetCtx(), store, batchTasks, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount)
batchTasks = balanceBatchCopTask(bo.GetCtx(), store, usedTiFlashStoresMap, batchTasks, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount)
balanceElapsed := time.Since(balanceStart)
if log.GetLevel() <= zap.DebugLevel {
msg := "After region balance:"
Expand Down
40 changes: 38 additions & 2 deletions store/copr/batch_coprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/driver/backoff"
"github.com/pingcap/tidb/util/logutil"
"github.com/stathat/consistent"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/testutils"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -125,13 +128,13 @@ func TestBalanceBatchCopTaskWithContinuity(t *testing.T) {
func TestBalanceBatchCopTaskWithEmptyTaskSet(t *testing.T) {
{
var nilTaskSet []*batchCopTask
nilResult := balanceBatchCopTask(nil, nil, nilTaskSet, false, time.Second, false, 0)
nilResult := balanceBatchCopTask(nil, nil, nil, nilTaskSet, false, time.Second, false, 0)
require.True(t, nilResult == nil)
}

{
emptyTaskSet := make([]*batchCopTask, 0)
emptyResult := balanceBatchCopTask(nil, nil, emptyTaskSet, false, time.Second, false, 0)
emptyResult := balanceBatchCopTask(nil, nil, nil, emptyTaskSet, false, time.Second, false, 0)
require.True(t, emptyResult != nil)
require.True(t, len(emptyResult) == 0)
}
Expand Down Expand Up @@ -282,3 +285,36 @@ func TestTopoFetcherBackoff(t *testing.T) {
require.GreaterOrEqual(t, dura, 30*time.Second)
require.LessOrEqual(t, dura, 50*time.Second)
}

func TestGetUsedStores(t *testing.T) {
mockClient, _, pdClient, err := testutils.NewMockTiKV("", nil)
require.NoError(t, err)
defer func() {
pdClient.Close()
err = mockClient.Close()
require.NoError(t, err)
}()

pdCli := &tikv.CodecPDClient{Client: pdClient}
defer pdCli.Close()

cache := NewRegionCache(tikv.NewRegionCache(pdCli))
defer cache.Close()

label1 := metapb.StoreLabel{Key: tikvrpc.EngineLabelKey, Value: tikvrpc.EngineLabelTiFlash}
label2 := metapb.StoreLabel{Key: tikvrpc.EngineRoleLabelKey, Value: tikvrpc.EngineLabelTiFlashCompute}

cache.SetRegionCacheStore(1, "192.168.1.1", "", tikvrpc.TiFlash, 1, []*metapb.StoreLabel{&label1, &label2})
cache.SetRegionCacheStore(2, "192.168.1.2", "192.168.1.3", tikvrpc.TiFlash, 1, []*metapb.StoreLabel{&label1, &label2})
cache.SetRegionCacheStore(3, "192.168.1.3", "192.168.1.2", tikvrpc.TiFlash, 1, []*metapb.StoreLabel{&label1, &label2})

allUsedTiFlashStoresMap := make(map[uint64]struct{})
allUsedTiFlashStoresMap[2] = struct{}{}
allUsedTiFlashStoresMap[3] = struct{}{}
allUsedTiFlashStores := getUsedStores(cache, allUsedTiFlashStoresMap)
require.Equal(t, len(allUsedTiFlashStoresMap), len(allUsedTiFlashStores))
for _, store := range allUsedTiFlashStores {
_, ok := allUsedTiFlashStoresMap[store.StoreID()]
require.True(t, ok)
}
}

0 comments on commit 61e5fa0

Please sign in to comment.