store/copr: fix build batchCop in disaggregated tiflash mode #40008

Merged — 22 commits merged on Jan 4, 2023
Changes shown are from 8 of the 22 commits.
Commits (22)
727a0a7
store/copr: fix build batchCop in disaggregated tiflash mode
guo-shaoge Dec 17, 2022
281ddbe
update go.mod
guo-shaoge Dec 19, 2022
6405b32
Merge branch 'master' into disaggregated_batch_copr
guo-shaoge Dec 19, 2022
269a002
update bazel
guo-shaoge Dec 19, 2022
1441516
update err msg
guo-shaoge Dec 19, 2022
6e0b747
Merge branch 'disaggregated_batch_copr' of github.com:guo-shaoge/tidb…
guo-shaoge Dec 19, 2022
697b5fe
Merge branch 'master' of github.com:pingcap/tidb into disaggregated_b…
guo-shaoge Dec 26, 2022
e19720c
fix
guo-shaoge Dec 26, 2022
0176ad5
fix
guo-shaoge Dec 26, 2022
d9fb7e3
add failpoint
guo-shaoge Dec 29, 2022
55d3ddf
Merge branch 'master' of github.com:pingcap/tidb into disaggregated_b…
guo-shaoge Dec 29, 2022
058e2b2
update bazel
guo-shaoge Dec 29, 2022
f9bee04
Merge branch 'master' of github.com:pingcap/tidb into disaggregated_b…
guo-shaoge Jan 2, 2023
298cfbb
Merge branch 'master' into disaggregated_batch_copr
guo-shaoge Jan 3, 2023
110af29
trivial fix
guo-shaoge Jan 3, 2023
5fefec7
Merge branch 'disaggregated_batch_copr' of github.com:guo-shaoge/tidb…
guo-shaoge Jan 3, 2023
bd76d19
Merge branch 'master' into disaggregated_batch_copr
guo-shaoge Jan 3, 2023
0b25423
Merge branch 'master' into disaggregated_batch_copr
breezewish Jan 4, 2023
72f26a8
Merge branch 'master' into disaggregated_batch_copr
ti-chi-bot Jan 4, 2023
3e7b41c
Merge branch 'master' into disaggregated_batch_copr
ti-chi-bot Jan 4, 2023
0a4261e
Merge branch 'master' into disaggregated_batch_copr
ti-chi-bot Jan 4, 2023
3ffc655
Merge branch 'master' into disaggregated_batch_copr
ti-chi-bot Jan 4, 2023
12 changes: 10 additions & 2 deletions DEPS.bzl
@@ -3523,12 +3523,20 @@ def go_deps():
sum = "h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ=",
version = "v0.0.0-20181126055449-889f96f722a2",
)
go_repository(
name = "com_github_tiancaiamao_gp",
build_file_proto_mode = "disable",
importpath = "github.com/tiancaiamao/gp",
sum = "h1:iffZXeHZTd35tTOS3nJ2OyMUmn40eNkLHCeQXMs6KYI=",
version = "v0.0.0-20221214071713-abacb15f16f1",
)

go_repository(
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:/glZOHs/K2pkCioDVae+aThUHFYRYQkEgY4NUTgfh+s=",
version = "v2.0.3",
sum = "h1:c5oR15IdOvGTFOKPZrJWKLsmecos8zL9eunEZDysfR4=",
version = "v2.0.4-0.20221219075931-a4f5c00b4667",
)
go_repository(
name = "com_github_tikv_pd_client",
3 changes: 2 additions & 1 deletion go.mod
@@ -89,7 +89,7 @@ require (
github.com/stretchr/testify v1.8.0
github.com/tdakkota/asciicheck v0.1.1
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.3
github.com/tikv/client-go/v2 v2.0.4-0.20221219075931-a4f5c00b4667
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144
github.com/twmb/murmur3 v1.1.3
@@ -219,6 +219,7 @@ require (
github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/tiancaiamao/gp v0.0.0-20221214071713-abacb15f16f1 // indirect
github.com/tklauser/go-sysconf v0.3.10 // indirect
github.com/tklauser/numcpus v0.4.0 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect
6 changes: 4 additions & 2 deletions go.sum
@@ -932,8 +932,10 @@ github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3 h1:f+jULpR
github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3/go.mod h1:ON8b8w4BN/kE1EOhwT0o+d62W65a6aPw1nouo9LMgyY=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
github.com/tikv/client-go/v2 v2.0.3 h1:/glZOHs/K2pkCioDVae+aThUHFYRYQkEgY4NUTgfh+s=
github.com/tikv/client-go/v2 v2.0.3/go.mod h1:MDT4J9LzgS7Bj1DnEq6Gk/puy6mp8TgUC92zGEVVLLg=
github.com/tiancaiamao/gp v0.0.0-20221214071713-abacb15f16f1 h1:iffZXeHZTd35tTOS3nJ2OyMUmn40eNkLHCeQXMs6KYI=
github.com/tiancaiamao/gp v0.0.0-20221214071713-abacb15f16f1/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tikv/client-go/v2 v2.0.4-0.20221219075931-a4f5c00b4667 h1:c5oR15IdOvGTFOKPZrJWKLsmecos8zL9eunEZDysfR4=
github.com/tikv/client-go/v2 v2.0.4-0.20221219075931-a4f5c00b4667/go.mod h1:CUlYic0IhmNy2WU2liHHOEK57Hw+2kQ+SRFcLsnjkPw=
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07 h1:ckPpxKcl75mO2N6a4cJXiZH43hvcHPpqc9dh1TmH1nc=
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07/go.mod h1:CipBxPfxPUME+BImx9MUYXCnAVLS3VJUr3mnSJwh40A=
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 h1:kl4KhGNsJIbDHS9/4U9yQo1UcPQM0kOMJHn29EoH/Ro=
1 change: 0 additions & 1 deletion store/copr/BUILD.bazel
@@ -42,7 +42,6 @@ go_library(
"@com_github_pingcap_kvproto//pkg/mpp",
"@com_github_pingcap_log//:log",
"@com_github_pingcap_tipb//go-tipb",
"@com_github_stathat_consistent//:consistent",
"@com_github_tikv_client_go_v2//config",
"@com_github_tikv_client_go_v2//error",
"@com_github_tikv_client_go_v2//metrics",
190 changes: 121 additions & 69 deletions store/copr/batch_coprocessor.go
@@ -35,7 +35,6 @@ import (
"github.com/pingcap/tidb/store/driver/backoff"
derr "github.com/pingcap/tidb/store/driver/error"
"github.com/pingcap/tidb/util/logutil"
"github.com/stathat/consistent"
"github.com/tikv/client-go/v2/metrics"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
@@ -323,40 +322,15 @@ func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []
storeTaskMap[taskStoreID] = batchTask
}
} else {
logutil.BgLogger().Info("detecting available mpp stores")
// decide the available stores
stores := cache.RegionCache.GetTiFlashStores()
var wg sync.WaitGroup
var mu sync.Mutex
wg.Add(len(stores))
for i := range stores {
go func(idx int) {
defer wg.Done()
s := stores[idx]

// check if store is failed already.
ok := GlobalMPPFailedStoreProber.IsRecovery(ctx, s.GetAddr(), ttl)
if !ok {
return
}

tikvClient := kvStore.GetTiKVClient()
ok = detectMPPStore(ctx, tikvClient, s.GetAddr(), DetectTimeoutLimit)
if !ok {
GlobalMPPFailedStoreProber.Add(ctx, s.GetAddr(), tikvClient)
return
}

mu.Lock()
defer mu.Unlock()
storeTaskMap[s.StoreID()] = &batchCopTask{
storeAddr: s.GetAddr(),
cmdType: originalTasks[0].cmdType,
ctx: &tikv.RPCContext{Addr: s.GetAddr(), Store: s},
}
}(i)
aliveStores := filterAliveStores(ctx, stores, ttl, kvStore)
for _, s := range aliveStores {
storeTaskMap[s.StoreID()] = &batchCopTask{
storeAddr: s.GetAddr(),
cmdType: originalTasks[0].cmdType,
ctx: &tikv.RPCContext{Addr: s.GetAddr(), Store: s},
}
}
wg.Wait()
}

var candidateRegionInfos []RegionInfo
@@ -513,7 +487,7 @@ func buildBatchCopTasksForNonPartitionedTable(bo *backoff.Backoffer,
balanceWithContinuity bool,
balanceContinuousRegionCount int64) ([]*batchCopTask, error) {
if config.GetGlobalConfig().DisaggregatedTiFlash {
return buildBatchCopTasksConsistentHash(bo, store, []*KeyRanges{ranges}, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount)
return buildBatchCopTasksConsistentHash(bo, store, []*KeyRanges{ranges}, storeType, ttl)
}
return buildBatchCopTasksCore(bo, store, []*KeyRanges{ranges}, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount)
}
@@ -528,7 +502,7 @@ func buildBatchCopTasksForPartitionedTable(bo *backoff.Backoffer,
balanceContinuousRegionCount int64,
partitionIDs []int64) (batchTasks []*batchCopTask, err error) {
if config.GetGlobalConfig().DisaggregatedTiFlash {
batchTasks, err = buildBatchCopTasksConsistentHash(bo, store, rangesForEachPhysicalTable, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount)
batchTasks, err = buildBatchCopTasksConsistentHash(bo, store, rangesForEachPhysicalTable, storeType, ttl)
} else {
batchTasks, err = buildBatchCopTasksCore(bo, store, rangesForEachPhysicalTable, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount)
}
@@ -540,49 +514,127 @@ func buildBatchCopTasksForPartitionedTable(bo *backoff.Backoffer,
return batchTasks, nil
}

func buildBatchCopTasksConsistentHash(bo *backoff.Backoffer, store *kvStore, rangesForEachPhysicalTable []*KeyRanges, storeType kv.StoreType, isMPP bool, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64) ([]*batchCopTask, error) {
batchTasks, err := buildBatchCopTasksCore(bo, store, rangesForEachPhysicalTable, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount)
if err != nil {
return nil, err
}
cache := store.GetRegionCache()
stores, err := cache.GetTiFlashComputeStores(bo.TiKVBackoffer())
if err != nil {
return nil, err
}
if len(stores) == 0 {
return nil, errors.New("No available tiflash_compute node")
}
func filterAliveStores(ctx context.Context, stores []*tikv.Store, ttl time.Duration, kvStore *kvStore) []*tikv.Store {
var aliveStores []*tikv.Store
var wg sync.WaitGroup
var mu sync.Mutex
wg.Add(len(stores))
for i := range stores {
go func(idx int) {
defer wg.Done()
s := stores[idx]

// check if store is failed already.
ok := GlobalMPPFailedStoreProber.IsRecovery(ctx, s.GetAddr(), ttl)
if !ok {
return
}

hasher := consistent.New()
for _, store := range stores {
hasher.Add(store.GetAddr())
tikvClient := kvStore.GetTiKVClient()
ok = detectMPPStore(ctx, tikvClient, s.GetAddr(), DetectTimeoutLimit)
if !ok {
GlobalMPPFailedStoreProber.Add(ctx, s.GetAddr(), tikvClient)
return
}

mu.Lock()
defer mu.Unlock()
aliveStores = append(aliveStores, s)
}(i)
}
for _, task := range batchTasks {
addr, err := hasher.Get(task.storeAddr)

logutil.BgLogger().Info("detecting available mpp stores", zap.Any("total", len(stores)), zap.Any("alive", len(aliveStores)))
return aliveStores
}

// 1. Split range by region location to build copTasks.
// 2. For each copTask build its rpcCtx , the target tiflash_compute node will be chosen using consistent hash.
// 3. All copTasks that will be sent to one tiflash_compute node are put in one batchCopTask.
func buildBatchCopTasksConsistentHash(bo *backoff.Backoffer,
kvStore *kvStore,
rangesForEachPhysicalTable []*KeyRanges,
storeType kv.StoreType,
ttl time.Duration) (res []*batchCopTask, err error) {
const cmdType = tikvrpc.CmdBatchCop
var retryNum int
cache := kvStore.GetRegionCache()

for {
retryNum++
var rangesLen int
tasks := make([]*copTask, 0)
regionIDs := make([]tikv.RegionVerID, 0)

for i, ranges := range rangesForEachPhysicalTable {
rangesLen += ranges.Len()
locations, err := cache.SplitKeyRangesByLocations(bo, ranges)
if err != nil {
return nil, errors.Trace(err)
}
for _, lo := range locations {
tasks = append(tasks, &copTask{
region: lo.Location.Region,
ranges: lo.Ranges,
cmdType: cmdType,
storeType: storeType,
partitionIndex: int64(i),
})
regionIDs = append(regionIDs, lo.Location.Region)
}
}

stores, err := cache.GetTiFlashComputeStores(bo.TiKVBackoffer())
if err != nil {
return nil, err
}
var store *tikv.Store
for _, s := range stores {
if s.GetAddr() == addr {
store = s
break
stores = filterAliveStores(bo.GetCtx(), stores, ttl, kvStore)
if len(stores) == 0 {
return nil, errors.New("tiflash_compute node is unavailable")
}

rpcCtxs, err := cache.GetTiFlashComputeRPCContextByConsistentHash(bo.TiKVBackoffer(), regionIDs, stores)
if err != nil {
return nil, err
}
if rpcCtxs == nil {
logutil.BgLogger().Info("buildBatchCopTasksConsistentHash retry because rcpCtx is nil", zap.Int("retryNum", retryNum))
err := bo.Backoff(tikv.BoTiFlashRPC(), errors.New("Cannot find region with TiFlash peer"))
if err != nil {
return nil, errors.Trace(err)
}
continue
}
if store == nil {
return nil, errors.New("cannot find tiflash_compute store: " + addr)
if len(rpcCtxs) != len(tasks) {
return nil, errors.Errorf("length should be equal, len(rpcCtxs): %d, len(tasks): %d", len(rpcCtxs), len(tasks))
}

task.storeAddr = addr
task.ctx.Store = store
task.ctx.Addr = addr
}
logutil.BgLogger().Info("build batchCop tasks for disaggregated tiflash using ConsistentHash done.", zap.Int("len(tasks)", len(batchTasks)))
for _, task := range batchTasks {
logutil.BgLogger().Debug("batchTasks detailed info", zap.String("addr", task.storeAddr), zap.Int("RegionInfo number", len(task.regionInfos)))
taskMap := make(map[string]*batchCopTask)
for i, rpcCtx := range rpcCtxs {
regionInfo := RegionInfo{
// tasks and rpcCtxs correspond to each other.
Region: tasks[i].region,
Meta: rpcCtx.Meta,
Ranges: tasks[i].ranges,
AllStores: []uint64{rpcCtx.Store.StoreID()},
PartitionIndex: tasks[i].partitionIndex,
}
if batchTask, ok := taskMap[rpcCtx.Addr]; ok {
batchTask.regionInfos = append(batchTask.regionInfos, regionInfo)
} else {
batchTask := &batchCopTask{
storeAddr: rpcCtx.Addr,
cmdType: cmdType,
ctx: rpcCtx,
regionInfos: []RegionInfo{regionInfo},
}
taskMap[rpcCtx.Addr] = batchTask
res = append(res, batchTask)
}
}
logutil.BgLogger().Info("buildBatchCopTasksConsistentHash done", zap.Any("len(tasks)", len(taskMap)), zap.Any("len(tiflash_compute)", len(stores)))
break
}
return batchTasks, nil

return res, nil
}

// When `partitionIDs != nil`, it means that buildBatchCopTasksCore is constructing a batch cop tasks for PartitionTableScan.
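
The new buildBatchCopTasksConsistentHash above boils down to three steps: split key ranges into region-level copTasks, let the region cache pick a tiflash_compute node for each region via consistent hashing (GetTiFlashComputeRPCContextByConsistentHash), then fold all copTasks that land on the same node into one batchCopTask. As a rough illustration of the final grouping step only, here is a minimal, self-contained Go sketch; copTask and batchCopTask are trimmed-down stand-ins for the real store/copr types, and the consistent-hash address resolution is assumed to have already happened.

package main

import "fmt"

// copTask and batchCopTask are simplified stand-ins for the store/copr types:
// only the fields needed by the grouping step are kept.
type copTask struct {
	regionID uint64
	addr     string // tiflash_compute address already chosen by consistent hashing
}

type batchCopTask struct {
	storeAddr string
	regionIDs []uint64
}

// groupByStore folds copTasks that resolve to the same tiflash_compute address
// into a single batchCopTask, mirroring the taskMap loop in
// buildBatchCopTasksConsistentHash.
func groupByStore(tasks []copTask) []*batchCopTask {
	taskMap := make(map[string]*batchCopTask)
	res := make([]*batchCopTask, 0, len(tasks))
	for _, t := range tasks {
		if bt, ok := taskMap[t.addr]; ok {
			bt.regionIDs = append(bt.regionIDs, t.regionID)
			continue
		}
		bt := &batchCopTask{storeAddr: t.addr, regionIDs: []uint64{t.regionID}}
		taskMap[t.addr] = bt
		res = append(res, bt)
	}
	return res
}

func main() {
	tasks := []copTask{
		{regionID: 1, addr: "tiflash-compute-0:3930"},
		{regionID: 2, addr: "tiflash-compute-1:3930"},
		{regionID: 3, addr: "tiflash-compute-0:3930"},
	}
	for _, bt := range groupByStore(tasks) {
		fmt.Println(bt.storeAddr, bt.regionIDs)
	}
}

Keying the grouping by store address keeps the number of batch tasks bounded by the number of alive tiflash_compute nodes rather than by the number of regions.
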
30 changes: 12 additions & 18 deletions store/copr/batch_request_sender.go
@@ -21,7 +21,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/config"
tikverr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
@@ -36,7 +35,6 @@ type RegionInfo struct {
Ranges *KeyRanges
AllStores []uint64
PartitionIndex int64 // used by PartitionTableScan, indicates the n-th partition of the partition table
Addr string
}

func (ri *RegionInfo) toCoprocessorRegionInfo() *coprocessor.RegionInfo {
@@ -100,22 +98,18 @@ func (ss *RegionBatchRequestSender) onSendFailForBatchRegions(bo *Backoffer, ctx
return tikverr.ErrTiDBShuttingDown
}

if config.GetGlobalConfig().DisaggregatedTiFlash {
[Review comment from the author: delete this if branch because there will be no batchCop protocol in disaggregated tiflash mode; mpp error handling will be handled in mpp.go.]
ss.GetRegionCache().InvalidateTiFlashComputeStoresIfGRPCError(err)
} else {
// The reload region param is always true. Because that every time we try, we must
// re-build the range then re-create the batch sender. As a result, the len of "failStores"
// will change. If tiflash's replica is more than two, the "reload region" will always be false.
// Now that the batch cop and mpp has a relative low qps, it's reasonable to reload every time
// when meeting io error.
rc := RegionCache{ss.GetRegionCache()}
rc.OnSendFailForBatchRegions(bo, ctx.Store, regionInfos, true, err)
// The reload region param is always true. Because that every time we try, we must
// re-build the range then re-create the batch sender. As a result, the len of "failStores"
// will change. If tiflash's replica is more than two, the "reload region" will always be false.
// Now that the batch cop and mpp has a relative low qps, it's reasonable to reload every time
// when meeting io error.
rc := RegionCache{ss.GetRegionCache()}
rc.OnSendFailForBatchRegions(bo, ctx.Store, regionInfos, true, err)

// Retry on send request failure when it's not canceled.
// When a store is not available, the leader of related region should be elected quickly.
// TODO: the number of retry time should be limited:since region may be unavailable
// when some unrecoverable disaster happened.
err = bo.Backoff(tikv.BoTiFlashRPC(), errors.Errorf("send request error: %v, ctx: %v, regionInfos: %v", err, ctx, regionInfos))
}
// Retry on send request failure when it's not canceled.
// When a store is not available, the leader of related region should be elected quickly.
// TODO: the number of retry time should be limited:since region may be unavailable
// when some unrecoverable disaster happened.
err = bo.Backoff(tikv.BoTiFlashRPC(), errors.Errorf("send request error: %v, ctx: %v, regionInfos: %v", err, ctx, regionInfos))
return errors.Trace(err)
}
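
For reference, the probing logic that was moved out of balanceBatchCopTask into filterAliveStores (in batch_coprocessor.go above) follows a common fan-out pattern: probe every store concurrently, collect the reachable ones under a mutex, and wait on a WaitGroup. Below is a minimal standalone sketch of that pattern under simplified assumptions; the alive callback is a placeholder for the real IsRecovery/detectMPPStore checks, not a client-go API.

package main

import (
	"fmt"
	"sync"
)

// filterAlive probes every address concurrently and keeps the reachable ones,
// following the WaitGroup-plus-mutex pattern used by filterAliveStores.
// The alive callback stands in for the real IsRecovery/detectMPPStore checks.
func filterAlive(addrs []string, alive func(addr string) bool) []string {
	var (
		wg  sync.WaitGroup
		mu  sync.Mutex
		out []string
	)
	wg.Add(len(addrs))
	for i := range addrs {
		go func(idx int) {
			defer wg.Done()
			addr := addrs[idx]
			if !alive(addr) {
				return // drop unreachable stores, as the prober does
			}
			mu.Lock()
			defer mu.Unlock()
			out = append(out, addr)
		}(i)
	}
	wg.Wait()
	return out
}

func main() {
	addrs := []string{"tiflash-compute-0:3930", "tiflash-compute-1:3930"}
	ok := filterAlive(addrs, func(addr string) bool {
		return addr != "tiflash-compute-1:3930" // pretend one node is down
	})
	fmt.Println(ok)
}

Collecting results under a single mutex is acceptable here because the per-store cost is dominated by the network probe, not by the append.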