From 727a0a7ca31d7c96083a7b820912945e7a930fd4 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Sat, 17 Dec 2022 16:26:47 +0800 Subject: [PATCH 1/9] store/copr: fix build batchCop in disaggregated tiflash mode Signed-off-by: guo-shaoge --- store/copr/batch_coprocessor.go | 241 ++++++++++++++++++----------- store/copr/batch_request_sender.go | 30 ++-- store/copr/mpp.go | 22 +-- 3 files changed, 172 insertions(+), 121 deletions(-) diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index 801eebc40de9a..703e523f1c041 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -36,7 +36,6 @@ import ( "github.com/pingcap/tidb/store/driver/backoff" derr "github.com/pingcap/tidb/store/driver/error" "github.com/pingcap/tidb/util/logutil" - "github.com/stathat/consistent" "github.com/tikv/client-go/v2/metrics" "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/tikvrpc" @@ -325,64 +324,15 @@ func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks [] storeTaskMap[taskStoreID] = batchTask } } else { - logutil.BgLogger().Info("detecting available mpp stores") - // decide the available stores stores := cache.RegionCache.GetTiFlashStores() - var wg sync.WaitGroup - var mu sync.Mutex - wg.Add(len(stores)) - cur := time.Now() - for i := range stores { - go func(idx int) { - defer wg.Done() - s := stores[idx] - - var lastAny any - var ok bool - mu.Lock() - if lastAny, ok = mppStoreLastFailTime.Load(s.GetAddr()); ok && cur.Sub(lastAny.(time.Time)) < 100*time.Millisecond { - // The interval time is so short that may happen in a same query, so we needn't to check again. - mu.Unlock() - return - } else if !ok { - lastAny = time.Time{} - } - mu.Unlock() - - resp, err := kvStore.GetTiKVClient().SendRequest(ctx, s.GetAddr(), &tikvrpc.Request{ - Type: tikvrpc.CmdMPPAlive, - StoreTp: tikvrpc.TiFlash, - Req: &mpp.IsAliveRequest{}, - Context: kvrpcpb.Context{}, - }, 2*time.Second) - - if err != nil || !resp.Resp.(*mpp.IsAliveResponse).Available { - errMsg := "store not ready to serve" - if err != nil { - errMsg = err.Error() - } - logutil.BgLogger().Warn("Store is not ready", zap.String("store address", s.GetAddr()), zap.String("err message", errMsg)) - mu.Lock() - mppStoreLastFailTime.Store(s.GetAddr(), time.Now()) - mu.Unlock() - return - } - - if cur.Sub(lastAny.(time.Time)) < ttl { - logutil.BgLogger().Warn("Cannot detect store's availability because the current time has not reached MPPStoreLastFailTime + MPPStoreFailTTL", zap.String("store address", s.GetAddr()), zap.Time("last fail time", lastAny.(time.Time))) - return - } - - mu.Lock() - defer mu.Unlock() - storeTaskMap[s.StoreID()] = &batchCopTask{ - storeAddr: s.GetAddr(), - cmdType: originalTasks[0].cmdType, - ctx: &tikv.RPCContext{Addr: s.GetAddr(), Store: s}, - } - }(i) + aliveStores := filterAliveStores(ctx, stores, mppStoreLastFailTime, ttl, kvStore) + for _, s := range aliveStores { + storeTaskMap[s.StoreID()] = &batchCopTask{ + storeAddr: s.GetAddr(), + cmdType: originalTasks[0].cmdType, + ctx: &tikv.RPCContext{Addr: s.GetAddr(), Store: s}, + } } - wg.Wait() } var candidateRegionInfos []RegionInfo @@ -539,7 +489,7 @@ func buildBatchCopTasksForNonPartitionedTable(bo *backoff.Backoffer, balanceWithContinuity bool, balanceContinuousRegionCount int64) ([]*batchCopTask, error) { if config.GetGlobalConfig().DisaggregatedTiFlash { - return buildBatchCopTasksConsistentHash(bo, store, []*KeyRanges{ranges}, storeType, mppStoreLastFailTime, ttl, balanceWithContinuity, 
balanceContinuousRegionCount) + return buildBatchCopTasksConsistentHash(bo, store, []*KeyRanges{ranges}, storeType, mppStoreLastFailTime, ttl) } return buildBatchCopTasksCore(bo, store, []*KeyRanges{ranges}, storeType, mppStoreLastFailTime, ttl, balanceWithContinuity, balanceContinuousRegionCount) } @@ -554,7 +504,7 @@ func buildBatchCopTasksForPartitionedTable(bo *backoff.Backoffer, balanceContinuousRegionCount int64, partitionIDs []int64) (batchTasks []*batchCopTask, err error) { if config.GetGlobalConfig().DisaggregatedTiFlash { - batchTasks, err = buildBatchCopTasksConsistentHash(bo, store, rangesForEachPhysicalTable, storeType, mppStoreLastFailTime, ttl, balanceWithContinuity, balanceContinuousRegionCount) + batchTasks, err = buildBatchCopTasksConsistentHash(bo, store, rangesForEachPhysicalTable, storeType, mppStoreLastFailTime, ttl) } else { batchTasks, err = buildBatchCopTasksCore(bo, store, rangesForEachPhysicalTable, storeType, mppStoreLastFailTime, ttl, balanceWithContinuity, balanceContinuousRegionCount) } @@ -566,49 +516,154 @@ func buildBatchCopTasksForPartitionedTable(bo *backoff.Backoffer, return batchTasks, nil } -func buildBatchCopTasksConsistentHash(bo *backoff.Backoffer, store *kvStore, rangesForEachPhysicalTable []*KeyRanges, storeType kv.StoreType, mppStoreLastFailTime *sync.Map, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64) ([]*batchCopTask, error) { - batchTasks, err := buildBatchCopTasksCore(bo, store, rangesForEachPhysicalTable, storeType, mppStoreLastFailTime, ttl, balanceWithContinuity, balanceContinuousRegionCount) - if err != nil { - return nil, err - } - cache := store.GetRegionCache() - stores, err := cache.GetTiFlashComputeStores(bo.TiKVBackoffer()) - if err != nil { - return nil, err - } - if len(stores) == 0 { - return nil, errors.New("No available tiflash_compute node") - } +func filterAliveStores(ctx context.Context, stores []*tikv.Store, mppStoreLastFailTime *sync.Map, ttl time.Duration, kvStore *kvStore) []*tikv.Store { + var wg sync.WaitGroup + var mu sync.Mutex + wg.Add(len(stores)) + cur := time.Now() + aliveStores := make([]*tikv.Store, 0, len(stores)) + for i := range stores { + go func(idx int) { + defer wg.Done() + s := stores[idx] + + var lastAny any + var ok bool + mu.Lock() + if lastAny, ok = mppStoreLastFailTime.Load(s.GetAddr()); ok && cur.Sub(lastAny.(time.Time)) < 100*time.Millisecond { + // The interval time is so short that may happen in a same query, so we needn't to check again. 
+ mu.Unlock() + return + } else if !ok { + lastAny = time.Time{} + } + mu.Unlock() + + resp, err := kvStore.GetTiKVClient().SendRequest(ctx, s.GetAddr(), &tikvrpc.Request{ + Type: tikvrpc.CmdMPPAlive, + StoreTp: tikvrpc.TiFlash, + Req: &mpp.IsAliveRequest{}, + Context: kvrpcpb.Context{}, + }, 2*time.Second) + + if err != nil || !resp.Resp.(*mpp.IsAliveResponse).Available { + errMsg := "store not ready to serve" + if err != nil { + errMsg = err.Error() + } + logutil.BgLogger().Warn("Store is not ready", zap.String("store address", s.GetAddr()), zap.String("err message", errMsg)) + mu.Lock() + mppStoreLastFailTime.Store(s.GetAddr(), time.Now()) + mu.Unlock() + return + } + + if cur.Sub(lastAny.(time.Time)) < ttl { + logutil.BgLogger().Warn("Cannot detect store's availability because the current time has not reached MPPStoreLastFailTime + MPPStoreFailTTL", + zap.String("store address", s.GetAddr()), zap.Time("last fail time", lastAny.(time.Time))) + return + } - hasher := consistent.New() - for _, store := range stores { - hasher.Add(store.GetAddr()) + mu.Lock() + defer mu.Unlock() + aliveStores = append(aliveStores, s) + }(i) } - for _, task := range batchTasks { - addr, err := hasher.Get(task.storeAddr) + wg.Wait() + + logutil.BgLogger().Info("detecting available mpp stores", zap.Any("total", len(stores)), zap.Any("alive", len(aliveStores))) + return aliveStores +} + +// 1. Split range by region location to build copTasks. +// 2. For each copTask build its rpcCtx , the target tiflash_compute node will be chosen using consistent hash. +// 3. All copTasks that will be sent to one tiflash_compute node are put in one batchCopTask. +func buildBatchCopTasksConsistentHash(bo *backoff.Backoffer, + kvStore *kvStore, + rangesForEachPhysicalTable []*KeyRanges, + storeType kv.StoreType, + mppStoreLastFailTime *sync.Map, + ttl time.Duration) (res []*batchCopTask, err error) { + const cmdType = tikvrpc.CmdBatchCop + var retryNum int + cache := kvStore.GetRegionCache() + + for { + retryNum++ + var rangesLen int + tasks := make([]*copTask, 0) + regionIDs := make([]tikv.RegionVerID, 0) + + for i, ranges := range rangesForEachPhysicalTable { + rangesLen += ranges.Len() + locations, err := cache.SplitKeyRangesByLocations(bo, ranges) + if err != nil { + return nil, errors.Trace(err) + } + for _, lo := range locations { + tasks = append(tasks, &copTask{ + region: lo.Location.Region, + ranges: lo.Ranges, + cmdType: cmdType, + storeType: storeType, + partitionIndex: int64(i), + }) + regionIDs = append(regionIDs, lo.Location.Region) + } + } + + stores, err := cache.GetTiFlashComputeStores(bo.TiKVBackoffer()) if err != nil { return nil, err } - var store *tikv.Store - for _, s := range stores { - if s.GetAddr() == addr { - store = s - break + stores = filterAliveStores(bo.GetCtx(), stores, mppStoreLastFailTime, ttl, kvStore) + if len(stores) == 0 { + return nil, errors.New("tiflash_compute node is unavailable") + } + + rpcCtxs, err := cache.GetTiFlashComputeRPCContextByConsistentHash(bo.TiKVBackoffer(), regionIDs, stores) + if err != nil { + return nil, err + } + if rpcCtxs == nil { + logutil.BgLogger().Info("buildBatchCopTasksConsistentHash retry because rcpCtx is nil", zap.Int("retryNum", retryNum)) + err := bo.Backoff(tikv.BoTiFlashRPC(), errors.New("Cannot find region with TiFlash peer")) + if err != nil { + return nil, errors.Trace(err) } + continue } - if store == nil { - return nil, errors.New("cannot find tiflash_compute store: " + addr) + if len(rpcCtxs) != len(tasks) { + return nil, errors.Errorf("length 
should be equal", zap.Any("len(rpcCtxs)", len(rpcCtxs)), zap.Any("len(tasks)", len(tasks))) } - - task.storeAddr = addr - task.ctx.Store = store - task.ctx.Addr = addr - } - logutil.BgLogger().Info("build batchCop tasks for disaggregated tiflash using ConsistentHash done.", zap.Int("len(tasks)", len(batchTasks))) - for _, task := range batchTasks { - logutil.BgLogger().Debug("batchTasks detailed info", zap.String("addr", task.storeAddr), zap.Int("RegionInfo number", len(task.regionInfos))) + taskMap := make(map[string]*batchCopTask) + for i, rpcCtx := range rpcCtxs { + regionInfo := RegionInfo{ + // tasks and rpcCtxs are correspond to each other. + Region: tasks[i].region, + Meta: rpcCtx.Meta, + Ranges: tasks[i].ranges, + AllStores: []uint64{rpcCtx.Store.StoreID()}, + PartitionIndex: tasks[i].partitionIndex, + } + if batchTask, ok := taskMap[rpcCtx.Addr]; ok { + batchTask.regionInfos = append(batchTask.regionInfos, regionInfo) + } else { + batchTask := &batchCopTask{ + storeAddr: rpcCtx.Addr, + cmdType: cmdType, + ctx: rpcCtx, + regionInfos: []RegionInfo{regionInfo}, + } + taskMap[rpcCtx.Addr] = batchTask + res = append(res, batchTask) + } + } + logutil.BgLogger().Info("buildBatchCopTasksConsistentHash done", zap.Any("len(tasks)", len(taskMap)), zap.Any("len(tiflash_compute)", len(stores))) + break } - return batchTasks, nil + + return res, nil } // When `partitionIDs != nil`, it means that buildBatchCopTasksCore is constructing a batch cop tasks for PartitionTableScan. diff --git a/store/copr/batch_request_sender.go b/store/copr/batch_request_sender.go index 3a3bd339f4e6c..b976d26a59ab3 100644 --- a/store/copr/batch_request_sender.go +++ b/store/copr/batch_request_sender.go @@ -21,7 +21,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/coprocessor" "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/tidb/config" tikverr "github.com/tikv/client-go/v2/error" "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/tikvrpc" @@ -36,7 +35,6 @@ type RegionInfo struct { Ranges *KeyRanges AllStores []uint64 PartitionIndex int64 // used by PartitionTableScan, indicates the n-th partition of the partition table - Addr string } func (ri *RegionInfo) toCoprocessorRegionInfo() *coprocessor.RegionInfo { @@ -100,22 +98,18 @@ func (ss *RegionBatchRequestSender) onSendFailForBatchRegions(bo *Backoffer, ctx return tikverr.ErrTiDBShuttingDown } - if config.GetGlobalConfig().DisaggregatedTiFlash { - ss.GetRegionCache().InvalidateTiFlashComputeStoresIfGRPCError(err) - } else { - // The reload region param is always true. Because that every time we try, we must - // re-build the range then re-create the batch sender. As a result, the len of "failStores" - // will change. If tiflash's replica is more than two, the "reload region" will always be false. - // Now that the batch cop and mpp has a relative low qps, it's reasonable to reload every time - // when meeting io error. - rc := RegionCache{ss.GetRegionCache()} - rc.OnSendFailForBatchRegions(bo, ctx.Store, regionInfos, true, err) + // The reload region param is always true. Because that every time we try, we must + // re-build the range then re-create the batch sender. As a result, the len of "failStores" + // will change. If tiflash's replica is more than two, the "reload region" will always be false. + // Now that the batch cop and mpp has a relative low qps, it's reasonable to reload every time + // when meeting io error. 
+ rc := RegionCache{ss.GetRegionCache()} + rc.OnSendFailForBatchRegions(bo, ctx.Store, regionInfos, true, err) - // Retry on send request failure when it's not canceled. - // When a store is not available, the leader of related region should be elected quickly. - // TODO: the number of retry time should be limited:since region may be unavailable - // when some unrecoverable disaster happened. - err = bo.Backoff(tikv.BoTiFlashRPC(), errors.Errorf("send request error: %v, ctx: %v, regionInfos: %v", err, ctx, regionInfos)) - } + // Retry on send request failure when it's not canceled. + // When a store is not available, the leader of related region should be elected quickly. + // TODO: the number of retry time should be limited:since region may be unavailable + // when some unrecoverable disaster happened. + err = bo.Backoff(tikv.BoTiFlashRPC(), errors.Errorf("send request error: %v, ctx: %v, regionInfos: %v", err, ctx, regionInfos)) return errors.Trace(err) } diff --git a/store/copr/mpp.go b/store/copr/mpp.go index 37d5629ac6f3d..7dad1ce6aa349 100644 --- a/store/copr/mpp.go +++ b/store/copr/mpp.go @@ -252,14 +252,14 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *Backoffer, req if originalTask != nil { sender := NewRegionBatchRequestSender(m.store.GetRegionCache(), m.store.GetTiKVClient(), m.enableCollectExecutionInfo) rpcResp, retry, _, err = sender.SendReqToAddr(bo, originalTask.ctx, originalTask.regionInfos, wrappedReq, tikv.ReadTimeoutMedium) - if err != nil && disaggregatedTiFlash { - m.store.GetRegionCache().InvalidateTiFlashComputeStoresIfGRPCError(err) - } // No matter what the rpc error is, we won't retry the mpp dispatch tasks. // TODO: If we want to retry, we must redo the plan fragment cutting and task scheduling. // That's a hard job but we can try it in the future. 
if sender.GetRPCError() != nil { logutil.BgLogger().Warn("mpp dispatch meet io error", zap.String("error", sender.GetRPCError().Error()), zap.Uint64("timestamp", taskMeta.StartTs), zap.Int64("task", taskMeta.TaskId)) + if disaggregatedTiFlash { + m.store.GetRegionCache().InvalidateTiFlashComputeStores() + } // if needTriggerFallback is true, we return timeout to trigger tikv's fallback if m.needTriggerFallback { err = derr.ErrTiFlashServerTimeout @@ -273,7 +273,7 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *Backoffer, req retry = false } else if err != nil { if disaggregatedTiFlash { - m.store.GetRegionCache().InvalidateTiFlashComputeStoresIfGRPCError(err) + m.store.GetRegionCache().InvalidateTiFlashComputeStores() } if bo.Backoff(tikv.BoTiFlashRPC(), err) == nil { retry = true @@ -353,6 +353,7 @@ func (m *mppIterator) cancelMppTasks() { } // send cancel cmd to all stores where tasks run + gotErr := atomic.Bool{} wg := util.WaitGroupWrapper{} for addr := range usedStoreAddrs { storeAddr := addr @@ -361,13 +362,14 @@ func (m *mppIterator) cancelMppTasks() { logutil.BgLogger().Debug("cancel task", zap.Uint64("query id ", m.startTs), zap.String("on addr", storeAddr)) if err != nil { logutil.BgLogger().Error("cancel task error", zap.Error(err), zap.Uint64("query id", m.startTs), zap.String("on addr", storeAddr)) - if disaggregatedTiFlash { - m.store.GetRegionCache().InvalidateTiFlashComputeStoresIfGRPCError(err) - } + gotErr.CompareAndSwap(false, true) } }) } wg.Wait() + if gotErr.Load() && disaggregatedTiFlash { + m.store.GetRegionCache().InvalidateTiFlashComputeStores() + } } func (m *mppIterator) establishMPPConns(bo *Backoffer, req *kv.MPPDispatchRequest, taskMeta *mpp.TaskMeta) { @@ -391,15 +393,15 @@ func (m *mppIterator) establishMPPConns(bo *Backoffer, req *kv.MPPDispatchReques if err != nil { logutil.BgLogger().Warn("establish mpp connection meet error and cannot retry", zap.String("error", err.Error()), zap.Uint64("timestamp", taskMeta.StartTs), zap.Int64("task", taskMeta.TaskId)) + if disaggregatedTiFlash { + m.store.GetRegionCache().InvalidateTiFlashComputeStores() + } // if needTriggerFallback is true, we return timeout to trigger tikv's fallback if m.needTriggerFallback { m.sendError(derr.ErrTiFlashServerTimeout) } else { m.sendError(err) } - if disaggregatedTiFlash { - m.store.GetRegionCache().InvalidateTiFlashComputeStoresIfGRPCError(err) - } return } From 281ddbe1042a0d59e99151026e5a5b70c3f378ac Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Mon, 19 Dec 2022 16:12:12 +0800 Subject: [PATCH 2/9] update go.mod Signed-off-by: guo-shaoge --- go.mod | 3 ++- go.sum | 6 ++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index df18ccec64b18..223d775072539 100644 --- a/go.mod +++ b/go.mod @@ -89,7 +89,7 @@ require ( github.com/stretchr/testify v1.8.0 github.com/tdakkota/asciicheck v0.1.1 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 - github.com/tikv/client-go/v2 v2.0.3 + github.com/tikv/client-go/v2 v2.0.4-0.20221219075931-a4f5c00b4667 github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07 github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 github.com/twmb/murmur3 v1.1.3 @@ -219,6 +219,7 @@ require ( github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd // indirect github.com/sirupsen/logrus v1.9.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect + github.com/tiancaiamao/gp v0.0.0-20221214071713-abacb15f16f1 // indirect github.com/tklauser/go-sysconf v0.3.10 // indirect 
github.com/tklauser/numcpus v0.4.0 // indirect github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect diff --git a/go.sum b/go.sum index b7de00223b8a3..7b939d5d1109e 100644 --- a/go.sum +++ b/go.sum @@ -932,8 +932,10 @@ github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3 h1:f+jULpR github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3/go.mod h1:ON8b8w4BN/kE1EOhwT0o+d62W65a6aPw1nouo9LMgyY= github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ= github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU= -github.com/tikv/client-go/v2 v2.0.3 h1:/glZOHs/K2pkCioDVae+aThUHFYRYQkEgY4NUTgfh+s= -github.com/tikv/client-go/v2 v2.0.3/go.mod h1:MDT4J9LzgS7Bj1DnEq6Gk/puy6mp8TgUC92zGEVVLLg= +github.com/tiancaiamao/gp v0.0.0-20221214071713-abacb15f16f1 h1:iffZXeHZTd35tTOS3nJ2OyMUmn40eNkLHCeQXMs6KYI= +github.com/tiancaiamao/gp v0.0.0-20221214071713-abacb15f16f1/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM= +github.com/tikv/client-go/v2 v2.0.4-0.20221219075931-a4f5c00b4667 h1:c5oR15IdOvGTFOKPZrJWKLsmecos8zL9eunEZDysfR4= +github.com/tikv/client-go/v2 v2.0.4-0.20221219075931-a4f5c00b4667/go.mod h1:CUlYic0IhmNy2WU2liHHOEK57Hw+2kQ+SRFcLsnjkPw= github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07 h1:ckPpxKcl75mO2N6a4cJXiZH43hvcHPpqc9dh1TmH1nc= github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07/go.mod h1:CipBxPfxPUME+BImx9MUYXCnAVLS3VJUr3mnSJwh40A= github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 h1:kl4KhGNsJIbDHS9/4U9yQo1UcPQM0kOMJHn29EoH/Ro= From 269a00202a03f32c65761cabbb40ffc44f741369 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Mon, 19 Dec 2022 16:30:05 +0800 Subject: [PATCH 3/9] update bazel Signed-off-by: guo-shaoge --- DEPS.bzl | 12 ++++++++++-- store/copr/BUILD.bazel | 1 - 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/DEPS.bzl b/DEPS.bzl index ac0a348ad55fe..621d3782f9714 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -3523,12 +3523,20 @@ def go_deps(): sum = "h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ=", version = "v0.0.0-20181126055449-889f96f722a2", ) + go_repository( + name = "com_github_tiancaiamao_gp", + build_file_proto_mode = "disable", + importpath = "github.com/tiancaiamao/gp", + sum = "h1:iffZXeHZTd35tTOS3nJ2OyMUmn40eNkLHCeQXMs6KYI=", + version = "v0.0.0-20221214071713-abacb15f16f1", + ) + go_repository( name = "com_github_tikv_client_go_v2", build_file_proto_mode = "disable_global", importpath = "github.com/tikv/client-go/v2", - sum = "h1:/glZOHs/K2pkCioDVae+aThUHFYRYQkEgY4NUTgfh+s=", - version = "v2.0.3", + sum = "h1:c5oR15IdOvGTFOKPZrJWKLsmecos8zL9eunEZDysfR4=", + version = "v2.0.4-0.20221219075931-a4f5c00b4667", ) go_repository( name = "com_github_tikv_pd_client", diff --git a/store/copr/BUILD.bazel b/store/copr/BUILD.bazel index eb3eb2f016424..409c2add6f1da 100644 --- a/store/copr/BUILD.bazel +++ b/store/copr/BUILD.bazel @@ -41,7 +41,6 @@ go_library( "@com_github_pingcap_kvproto//pkg/mpp", "@com_github_pingcap_log//:log", "@com_github_pingcap_tipb//go-tipb", - "@com_github_stathat_consistent//:consistent", "@com_github_tikv_client_go_v2//config", "@com_github_tikv_client_go_v2//error", "@com_github_tikv_client_go_v2//metrics", From 1441516e83d13b2180f3fea7ce4b252ce4396ea1 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Mon, 19 Dec 2022 16:41:13 +0800 Subject: [PATCH 4/9] update err msg Signed-off-by: guo-shaoge --- store/copr/batch_coprocessor.go | 2 +- 
1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index 703e523f1c041..2a746c24dcf0d 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -634,7 +634,7 @@ func buildBatchCopTasksConsistentHash(bo *backoff.Backoffer, continue } if len(rpcCtxs) != len(tasks) { - return nil, errors.Errorf("length should be equal", zap.Any("len(rpcCtxs)", len(rpcCtxs)), zap.Any("len(tasks)", len(tasks))) + return nil, errors.Errorf("length should be equal, len(rpcCtxs): %d, len(tasks): %d", len(rpcCtxs), len(tasks)) } taskMap := make(map[string]*batchCopTask) for i, rpcCtx := range rpcCtxs { From e19720cdaa263451405b622cb35cd3931831d1ad Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Mon, 26 Dec 2022 13:20:35 +0800 Subject: [PATCH 5/9] fix Signed-off-by: guo-shaoge --- store/copr/batch_coprocessor.go | 87 +++++++-------------------------- 1 file changed, 19 insertions(+), 68 deletions(-) diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index 47445a7b63577..f4c200eeb29e9 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -323,35 +323,13 @@ func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks [] } } else { stores := cache.RegionCache.GetTiFlashStores() - var wg sync.WaitGroup - var mu sync.Mutex - wg.Add(len(stores)) - for i := range stores { - go func(idx int) { - defer wg.Done() - s := stores[idx] - - // check if store is failed already. - ok := GlobalMPPFailedStoreProber.IsRecovery(ctx, s.GetAddr(), ttl) - if !ok { - return - } - - tikvClient := kvStore.GetTiKVClient() - ok = detectMPPStore(ctx, tikvClient, s.GetAddr(), DetectTimeoutLimit) - if !ok { - GlobalMPPFailedStoreProber.Add(ctx, s.GetAddr(), tikvClient) - return - } - - mu.Lock() - defer mu.Unlock() - storeTaskMap[s.StoreID()] = &batchCopTask{ - storeAddr: s.GetAddr(), - cmdType: originalTasks[0].cmdType, - ctx: &tikv.RPCContext{Addr: s.GetAddr(), Store: s}, - } - }(i) + aliveStores := filterAliveStores(ctx, stores, ttl, kvStore) + for _, s := range aliveStores { + storeTaskMap[s.StoreID()] = &batchCopTask{ + storeAddr: s.GetAddr(), + cmdType: originalTasks[0].cmdType, + ctx: &tikv.RPCContext{Addr: s.GetAddr(), Store: s}, + } } } @@ -509,7 +487,7 @@ func buildBatchCopTasksForNonPartitionedTable(bo *backoff.Backoffer, balanceWithContinuity bool, balanceContinuousRegionCount int64) ([]*batchCopTask, error) { if config.GetGlobalConfig().DisaggregatedTiFlash { - return buildBatchCopTasksConsistentHash(bo, store, []*KeyRanges{ranges}, storeType, isMpp, ttl) + return buildBatchCopTasksConsistentHash(bo, store, []*KeyRanges{ranges}, storeType, ttl) } return buildBatchCopTasksCore(bo, store, []*KeyRanges{ranges}, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount) } @@ -524,7 +502,7 @@ func buildBatchCopTasksForPartitionedTable(bo *backoff.Backoffer, balanceContinuousRegionCount int64, partitionIDs []int64) (batchTasks []*batchCopTask, err error) { if config.GetGlobalConfig().DisaggregatedTiFlash { - batchTasks, err = buildBatchCopTasksConsistentHash(bo, store, rangesForEachPhysicalTable, storeType, isMPP, ttl) + batchTasks, err = buildBatchCopTasksConsistentHash(bo, store, rangesForEachPhysicalTable, storeType, ttl) } else { batchTasks, err = buildBatchCopTasksCore(bo, store, rangesForEachPhysicalTable, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount) } @@ -536,51 +514,26 @@ func 
buildBatchCopTasksForPartitionedTable(bo *backoff.Backoffer, return batchTasks, nil } -func filterAliveStores(ctx context.Context, stores []*tikv.Store, mppStoreLastFailTime *sync.Map, ttl time.Duration, kvStore *kvStore) []*tikv.Store { +func filterAliveStores(ctx context.Context, stores []*tikv.Store, ttl time.Duration, kvStore *kvStore) []*tikv.Store { + var aliveStores []*tikv.Store var wg sync.WaitGroup var mu sync.Mutex wg.Add(len(stores)) - cur := time.Now() - aliveStores := make([]*tikv.Store, 0, len(stores)) for i := range stores { go func(idx int) { defer wg.Done() s := stores[idx] - var lastAny any - var ok bool - mu.Lock() - if lastAny, ok = mppStoreLastFailTime.Load(s.GetAddr()); ok && cur.Sub(lastAny.(time.Time)) < 100*time.Millisecond { - // The interval time is so short that may happen in a same query, so we needn't to check again. - mu.Unlock() - return - } else if !ok { - lastAny = time.Time{} - } - mu.Unlock() - - resp, err := kvStore.GetTiKVClient().SendRequest(ctx, s.GetAddr(), &tikvrpc.Request{ - Type: tikvrpc.CmdMPPAlive, - StoreTp: tikvrpc.TiFlash, - Req: &mpp.IsAliveRequest{}, - Context: kvrpcpb.Context{}, - }, 2*time.Second) - - if err != nil || !resp.Resp.(*mpp.IsAliveResponse).Available { - errMsg := "store not ready to serve" - if err != nil { - errMsg = err.Error() - } - logutil.BgLogger().Warn("Store is not ready", zap.String("store address", s.GetAddr()), zap.String("err message", errMsg)) - mu.Lock() - mppStoreLastFailTime.Store(s.GetAddr(), time.Now()) - mu.Unlock() + // check if store is failed already. + ok := GlobalMPPFailedStoreProber.IsRecovery(ctx, s.GetAddr(), ttl) + if !ok { return } - if cur.Sub(lastAny.(time.Time)) < ttl { - logutil.BgLogger().Warn("Cannot detect store's availability because the current time has not reached MPPStoreLastFailTime + MPPStoreFailTTL", - zap.String("store address", s.GetAddr()), zap.Time("last fail time", lastAny.(time.Time))) + tikvClient := kvStore.GetTiKVClient() + ok = detectMPPStore(ctx, tikvClient, s.GetAddr(), DetectTimeoutLimit) + if !ok { + GlobalMPPFailedStoreProber.Add(ctx, s.GetAddr(), tikvClient) return } @@ -589,7 +542,6 @@ func filterAliveStores(ctx context.Context, stores []*tikv.Store, mppStoreLastFa aliveStores = append(aliveStores, s) }(i) } - wg.Wait() logutil.BgLogger().Info("detecting available mpp stores", zap.Any("total", len(stores)), zap.Any("alive", len(aliveStores))) return aliveStores @@ -602,7 +554,6 @@ func buildBatchCopTasksConsistentHash(bo *backoff.Backoffer, kvStore *kvStore, rangesForEachPhysicalTable []*KeyRanges, storeType kv.StoreType, - mppStoreLastFailTime *sync.Map, ttl time.Duration) (res []*batchCopTask, err error) { const cmdType = tikvrpc.CmdBatchCop var retryNum int @@ -636,7 +587,7 @@ func buildBatchCopTasksConsistentHash(bo *backoff.Backoffer, if err != nil { return nil, err } - stores = filterAliveStores(bo.GetCtx(), stores, mppStoreLastFailTime, ttl, kvStore) + stores = filterAliveStores(bo.GetCtx(), stores, ttl, kvStore) if len(stores) == 0 { return nil, errors.New("tiflash_compute node is unavailable") } From 0176ad56818a385dd556439a24786ddc97a1d65f Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Mon, 26 Dec 2022 13:25:26 +0800 Subject: [PATCH 6/9] fix Signed-off-by: guo-shaoge --- store/copr/batch_coprocessor.go | 1 + 1 file changed, 1 insertion(+) diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index f4c200eeb29e9..05b33a8f5915d 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -542,6 +542,7 @@ 
func filterAliveStores(ctx context.Context, stores []*tikv.Store, ttl time.Durat aliveStores = append(aliveStores, s) }(i) } + wg.Wait() logutil.BgLogger().Info("detecting available mpp stores", zap.Any("total", len(stores)), zap.Any("alive", len(aliveStores))) return aliveStores From d9fb7e388bc2495f858aeb34fd559fdb39b772d5 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Thu, 29 Dec 2022 16:30:30 +0800 Subject: [PATCH 7/9] add failpoint Signed-off-by: guo-shaoge --- store/copr/batch_coprocessor.go | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index 05b33a8f5915d..c795d8d7d03ba 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -21,6 +21,7 @@ import ( "io" "math" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -635,9 +636,35 @@ func buildBatchCopTasksConsistentHash(bo *backoff.Backoffer, break } + failpointCheckForConsistentHash(res) return res, nil } +func failpointCheckForConsistentHash(tasks []*batchCopTask) { + failpoint.Inject("checkOnlyDispatchToTiFlashComputeNodes", func(val failpoint.Value) { + logutil.BgLogger().Debug("in checkOnlyDispatchToTiFlashComputeNodes") + + // This failpoint will be tested in test-infra case, because we needs setup a cluster. + // All tiflash_compute nodes addrs are stored in val, separated by semicolon. + str := val.(string) + addrs := strings.Split(str, ";") + if len(addrs) < 1 { + err := fmt.Sprintf("unexpected length of tiflash_compute node addrs: %v, %s", len(addrs), str) + panic(err) + } + addrMap := make(map[string]struct{}) + for _, addr := range addrs { + addrMap[addr] = struct{}{} + } + for _, batchTask := range tasks { + if _, ok := addrMap[batchTask.storeAddr]; !ok { + err := errors.Errorf("batchCopTask send to node which is not tiflash_compute: %v(tiflash_compute nodes: %s)", batchTask.storeAddr, str) + panic(err) + } + } + }) +} + // When `partitionIDs != nil`, it means that buildBatchCopTasksCore is constructing a batch cop tasks for PartitionTableScan. // At this time, `len(rangesForEachPhysicalTable) == len(partitionIDs)` and `rangesForEachPhysicalTable[i]` is for partition `partitionIDs[i]`. // Otherwise, `rangesForEachPhysicalTable[0]` indicates the range for the single physical table. 
From 058e2b2a63dee7b35edb508cd4bfeb6deab39e23 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Thu, 29 Dec 2022 17:32:28 +0800 Subject: [PATCH 8/9] update bazel Signed-off-by: guo-shaoge --- DEPS.bzl | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/DEPS.bzl b/DEPS.bzl index d52b03cf470bd..686aef1f3c028 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -3527,26 +3527,16 @@ def go_deps(): name = "com_github_tiancaiamao_gp", build_file_proto_mode = "disable", importpath = "github.com/tiancaiamao/gp", -<<<<<<< HEAD - sum = "h1:iffZXeHZTd35tTOS3nJ2OyMUmn40eNkLHCeQXMs6KYI=", - version = "v0.0.0-20221214071713-abacb15f16f1", -======= sum = "h1:4RNtqw1/tW67qP9fFgfQpTVd7DrfkaAWu4vsC18QmBo=", version = "v0.0.0-20221221095600-1a473d1f9b4b", ->>>>>>> cf349414012cafccb5069e08dd49235ec09d9cbf ) go_repository( name = "com_github_tikv_client_go_v2", build_file_proto_mode = "disable_global", importpath = "github.com/tikv/client-go/v2", -<<<<<<< HEAD - sum = "h1:c5oR15IdOvGTFOKPZrJWKLsmecos8zL9eunEZDysfR4=", - version = "v2.0.4-0.20221219075931-a4f5c00b4667", -======= sum = "h1:m6glgBGCIds9QURbk8Mn+8mjLKDcv6nWrNwYh92fydQ=", version = "v2.0.4-0.20221226080148-018c59dbd837", ->>>>>>> cf349414012cafccb5069e08dd49235ec09d9cbf ) go_repository( name = "com_github_tikv_pd_client", From 110af29833d9417da68e7ee6721fb3224bd9d182 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Tue, 3 Jan 2023 14:16:07 +0800 Subject: [PATCH 9/9] trivial fix Signed-off-by: guo-shaoge --- store/copr/batch_coprocessor.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index c795d8d7d03ba..9f9d4ef6fb002 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -525,15 +525,13 @@ func filterAliveStores(ctx context.Context, stores []*tikv.Store, ttl time.Durat defer wg.Done() s := stores[idx] - // check if store is failed already. - ok := GlobalMPPFailedStoreProber.IsRecovery(ctx, s.GetAddr(), ttl) - if !ok { + // Check if store is failed already. + if ok := GlobalMPPFailedStoreProber.IsRecovery(ctx, s.GetAddr(), ttl); !ok { return } tikvClient := kvStore.GetTiKVClient() - ok = detectMPPStore(ctx, tikvClient, s.GetAddr(), DetectTimeoutLimit) - if !ok { + if ok := detectMPPStore(ctx, tikvClient, s.GetAddr(), DetectTimeoutLimit); !ok { GlobalMPPFailedStoreProber.Add(ctx, s.GetAddr(), tikvClient) return }