diff --git a/store/copr/BUILD.bazel b/store/copr/BUILD.bazel
index f6cbe57efa2d7..9ea8467d01dfa 100644
--- a/store/copr/BUILD.bazel
+++ b/store/copr/BUILD.bazel
@@ -42,7 +42,6 @@ go_library(
         "@com_github_pingcap_kvproto//pkg/mpp",
         "@com_github_pingcap_log//:log",
         "@com_github_pingcap_tipb//go-tipb",
-        "@com_github_stathat_consistent//:consistent",
         "@com_github_tikv_client_go_v2//config",
         "@com_github_tikv_client_go_v2//error",
         "@com_github_tikv_client_go_v2//metrics",
diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go
index b316d10acaf6e..9f9d4ef6fb002 100644
--- a/store/copr/batch_coprocessor.go
+++ b/store/copr/batch_coprocessor.go
@@ -21,6 +21,7 @@ import (
     "io"
     "math"
     "strconv"
+    "strings"
     "sync"
     "sync/atomic"
     "time"
@@ -35,7 +36,6 @@ import (
     "github.com/pingcap/tidb/store/driver/backoff"
     derr "github.com/pingcap/tidb/store/driver/error"
     "github.com/pingcap/tidb/util/logutil"
-    "github.com/stathat/consistent"
     "github.com/tikv/client-go/v2/metrics"
     "github.com/tikv/client-go/v2/tikv"
     "github.com/tikv/client-go/v2/tikvrpc"
@@ -323,40 +323,15 @@ func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []
             storeTaskMap[taskStoreID] = batchTask
         }
     } else {
-        logutil.BgLogger().Info("detecting available mpp stores")
-        // decide the available stores
         stores := cache.RegionCache.GetTiFlashStores()
-        var wg sync.WaitGroup
-        var mu sync.Mutex
-        wg.Add(len(stores))
-        for i := range stores {
-            go func(idx int) {
-                defer wg.Done()
-                s := stores[idx]
-
-                // check if store is failed already.
-                ok := GlobalMPPFailedStoreProber.IsRecovery(ctx, s.GetAddr(), ttl)
-                if !ok {
-                    return
-                }
-
-                tikvClient := kvStore.GetTiKVClient()
-                ok = detectMPPStore(ctx, tikvClient, s.GetAddr(), DetectTimeoutLimit)
-                if !ok {
-                    GlobalMPPFailedStoreProber.Add(ctx, s.GetAddr(), tikvClient)
-                    return
-                }
-
-                mu.Lock()
-                defer mu.Unlock()
-                storeTaskMap[s.StoreID()] = &batchCopTask{
-                    storeAddr: s.GetAddr(),
-                    cmdType:   originalTasks[0].cmdType,
-                    ctx:       &tikv.RPCContext{Addr: s.GetAddr(), Store: s},
-                }
-            }(i)
+        aliveStores := filterAliveStores(ctx, stores, ttl, kvStore)
+        for _, s := range aliveStores {
+            storeTaskMap[s.StoreID()] = &batchCopTask{
+                storeAddr: s.GetAddr(),
+                cmdType:   originalTasks[0].cmdType,
+                ctx:       &tikv.RPCContext{Addr: s.GetAddr(), Store: s},
+            }
         }
-        wg.Wait()
     }

     var candidateRegionInfos []RegionInfo
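
The hunk above swaps the inline goroutine fan-out for the shared filterAliveStores helper introduced later in this diff. For readers unfamiliar with the idiom, a minimal, self-contained sketch of the same probe-and-collect pattern; the probe func here is a hypothetical stand-in for detectMPPStore, not the PR's code:

package main

import (
	"fmt"
	"sync"
)

// probeAll fans out one goroutine per address and collects the addresses
// that pass the probe. The mutex guards the shared result slice, which
// would otherwise be appended to concurrently and race.
func probeAll(addrs []string, probe func(string) bool) []string {
	var (
		alive []string
		wg    sync.WaitGroup
		mu    sync.Mutex
	)
	wg.Add(len(addrs))
	for i := range addrs {
		go func(idx int) {
			defer wg.Done()
			if !probe(addrs[idx]) {
				return
			}
			mu.Lock()
			defer mu.Unlock()
			alive = append(alive, addrs[idx])
		}(i)
	}
	wg.Wait()
	return alive
}

func main() {
	up := func(addr string) bool { return addr != "tiflash-1:3930" } // pretend tiflash-1 is down
	fmt.Println(probeAll([]string{"tiflash-0:3930", "tiflash-1:3930"}, up))
}
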
@@ -513,7 +488,7 @@ func buildBatchCopTasksForNonPartitionedTable(bo *backoff.Backoffer,
     balanceWithContinuity bool, balanceContinuousRegionCount int64) ([]*batchCopTask, error) {
     if config.GetGlobalConfig().DisaggregatedTiFlash {
-        return buildBatchCopTasksConsistentHash(bo, store, []*KeyRanges{ranges}, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount)
+        return buildBatchCopTasksConsistentHash(bo, store, []*KeyRanges{ranges}, storeType, ttl)
     }
     return buildBatchCopTasksCore(bo, store, []*KeyRanges{ranges}, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount)
 }
@@ -528,7 +503,7 @@ func buildBatchCopTasksForPartitionedTable(bo *backoff.Backoffer,
     balanceContinuousRegionCount int64,
     partitionIDs []int64) (batchTasks []*batchCopTask, err error) {
     if config.GetGlobalConfig().DisaggregatedTiFlash {
-        batchTasks, err = buildBatchCopTasksConsistentHash(bo, store, rangesForEachPhysicalTable, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount)
+        batchTasks, err = buildBatchCopTasksConsistentHash(bo, store, rangesForEachPhysicalTable, storeType, ttl)
     } else {
         batchTasks, err = buildBatchCopTasksCore(bo, store, rangesForEachPhysicalTable, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount)
     }
@@ -540,49 +515,152 @@ func buildBatchCopTasksForPartitionedTable(bo *backoff.Backoffer,
     return batchTasks, nil
 }

-func buildBatchCopTasksConsistentHash(bo *backoff.Backoffer, store *kvStore, rangesForEachPhysicalTable []*KeyRanges, storeType kv.StoreType, isMPP bool, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64) ([]*batchCopTask, error) {
-    batchTasks, err := buildBatchCopTasksCore(bo, store, rangesForEachPhysicalTable, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount)
-    if err != nil {
-        return nil, err
-    }
-    cache := store.GetRegionCache()
-    stores, err := cache.GetTiFlashComputeStores(bo.TiKVBackoffer())
-    if err != nil {
-        return nil, err
-    }
-    if len(stores) == 0 {
-        return nil, errors.New("No available tiflash_compute node")
-    }
+func filterAliveStores(ctx context.Context, stores []*tikv.Store, ttl time.Duration, kvStore *kvStore) []*tikv.Store {
+    var aliveStores []*tikv.Store
+    var wg sync.WaitGroup
+    var mu sync.Mutex
+    wg.Add(len(stores))
+    for i := range stores {
+        go func(idx int) {
+            defer wg.Done()
+            s := stores[idx]
+
+            // Check if store is failed already.
+            if ok := GlobalMPPFailedStoreProber.IsRecovery(ctx, s.GetAddr(), ttl); !ok {
+                return
+            }
+
+            tikvClient := kvStore.GetTiKVClient()
+            if ok := detectMPPStore(ctx, tikvClient, s.GetAddr(), DetectTimeoutLimit); !ok {
+                GlobalMPPFailedStoreProber.Add(ctx, s.GetAddr(), tikvClient)
+                return
+            }

-    hasher := consistent.New()
-    for _, store := range stores {
-        hasher.Add(store.GetAddr())
+            mu.Lock()
+            defer mu.Unlock()
+            aliveStores = append(aliveStores, s)
+        }(i)
     }
-    for _, task := range batchTasks {
-        addr, err := hasher.Get(task.storeAddr)
+    wg.Wait()
+
+    logutil.BgLogger().Info("detecting available mpp stores", zap.Any("total", len(stores)), zap.Any("alive", len(aliveStores)))
+    return aliveStores
+}
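
With stathat/consistent removed, node selection moves behind the region cache's GetTiFlashComputeRPCContextByConsistentHash. As a toy illustration of the property this relies on (a key keeps hashing to the same node until membership changes), here is a minimal hash ring; it assumes nothing about client-go's actual implementation:

package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// ring is a toy consistent-hash ring: nodes hash onto a circle, and a key
// is owned by the first node clockwise from the key's hash.
type ring struct {
	points []uint32
	owner  map[uint32]string
}

func hash32(s string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(s))
	return h.Sum32()
}

func newRing(nodes []string) *ring {
	r := &ring{owner: make(map[uint32]string)}
	for _, n := range nodes {
		p := hash32(n)
		r.points = append(r.points, p)
		r.owner[p] = n
	}
	sort.Slice(r.points, func(i, j int) bool { return r.points[i] < r.points[j] })
	return r
}

func (r *ring) get(key string) string {
	h := hash32(key)
	for _, p := range r.points {
		if p >= h {
			return r.owner[p]
		}
	}
	return r.owner[r.points[0]] // wrap around the circle
}

func main() {
	r := newRing([]string{"tiflash-compute-0", "tiflash-compute-1"})
	// The same region maps to the same node on every call, until nodes change.
	fmt.Println(r.get("region-42"))
}
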
+
+// buildBatchCopTasksConsistentHash:
+// 1. Splits the key ranges by region location to build copTasks.
+// 2. For each copTask, builds its rpcCtx; the target tiflash_compute node is chosen by consistent hash.
+// 3. Puts all copTasks that will be sent to the same tiflash_compute node into one batchCopTask.
+func buildBatchCopTasksConsistentHash(bo *backoff.Backoffer,
+    kvStore *kvStore,
+    rangesForEachPhysicalTable []*KeyRanges,
+    storeType kv.StoreType,
+    ttl time.Duration) (res []*batchCopTask, err error) {
+    const cmdType = tikvrpc.CmdBatchCop
+    var retryNum int
+    cache := kvStore.GetRegionCache()
+
+    for {
+        retryNum++
+        var rangesLen int
+        tasks := make([]*copTask, 0)
+        regionIDs := make([]tikv.RegionVerID, 0)
+
+        for i, ranges := range rangesForEachPhysicalTable {
+            rangesLen += ranges.Len()
+            locations, err := cache.SplitKeyRangesByLocations(bo, ranges)
+            if err != nil {
+                return nil, errors.Trace(err)
+            }
+            for _, lo := range locations {
+                tasks = append(tasks, &copTask{
+                    region:         lo.Location.Region,
+                    ranges:         lo.Ranges,
+                    cmdType:        cmdType,
+                    storeType:      storeType,
+                    partitionIndex: int64(i),
+                })
+                regionIDs = append(regionIDs, lo.Location.Region)
+            }
+        }
+
+        stores, err := cache.GetTiFlashComputeStores(bo.TiKVBackoffer())
         if err != nil {
             return nil, err
         }
-        var store *tikv.Store
-        for _, s := range stores {
-            if s.GetAddr() == addr {
-                store = s
-                break
+        stores = filterAliveStores(bo.GetCtx(), stores, ttl, kvStore)
+        if len(stores) == 0 {
+            return nil, errors.New("tiflash_compute node is unavailable")
+        }
+
+        rpcCtxs, err := cache.GetTiFlashComputeRPCContextByConsistentHash(bo.TiKVBackoffer(), regionIDs, stores)
+        if err != nil {
+            return nil, err
+        }
+        if rpcCtxs == nil {
+            logutil.BgLogger().Info("buildBatchCopTasksConsistentHash retry because rpcCtxs is nil", zap.Int("retryNum", retryNum))
+            err := bo.Backoff(tikv.BoTiFlashRPC(), errors.New("Cannot find region with TiFlash peer"))
+            if err != nil {
+                return nil, errors.Trace(err)
             }
+            continue
         }
-        if store == nil {
-            return nil, errors.New("cannot find tiflash_compute store: " + addr)
+        if len(rpcCtxs) != len(tasks) {
+            return nil, errors.Errorf("length should be equal, len(rpcCtxs): %d, len(tasks): %d", len(rpcCtxs), len(tasks))
         }
-
-        task.storeAddr = addr
-        task.ctx.Store = store
-        task.ctx.Addr = addr
-    }
-    logutil.BgLogger().Info("build batchCop tasks for disaggregated tiflash using ConsistentHash done.", zap.Int("len(tasks)", len(batchTasks)))
-    for _, task := range batchTasks {
-        logutil.BgLogger().Debug("batchTasks detailed info", zap.String("addr", task.storeAddr), zap.Int("RegionInfo number", len(task.regionInfos)))
+        taskMap := make(map[string]*batchCopTask)
+        for i, rpcCtx := range rpcCtxs {
+            // tasks and rpcCtxs correspond to each other by index.
+            regionInfo := RegionInfo{
+                Region:         tasks[i].region,
+                Meta:           rpcCtx.Meta,
+                Ranges:         tasks[i].ranges,
+                AllStores:      []uint64{rpcCtx.Store.StoreID()},
+                PartitionIndex: tasks[i].partitionIndex,
+            }
+            if batchTask, ok := taskMap[rpcCtx.Addr]; ok {
+                batchTask.regionInfos = append(batchTask.regionInfos, regionInfo)
+            } else {
+                batchTask := &batchCopTask{
+                    storeAddr:   rpcCtx.Addr,
+                    cmdType:     cmdType,
+                    ctx:         rpcCtx,
+                    regionInfos: []RegionInfo{regionInfo},
+                }
+                taskMap[rpcCtx.Addr] = batchTask
+                res = append(res, batchTask)
+            }
+        }
+        logutil.BgLogger().Info("buildBatchCopTasksConsistentHash done", zap.Any("len(tasks)", len(taskMap)), zap.Any("len(tiflash_compute)", len(stores)))
+        break
     }
-    return batchTasks, nil
+
+    failpointCheckForConsistentHash(res)
+    return res, nil
+}
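
The retry shape above (loop forever, charge the backoffer, continue on a nil result) reduces to the following sketch; fetch and backoff are hypothetical stand-ins for the region-cache lookup and bo.Backoff, not the PR's code:

package main

import (
	"errors"
	"fmt"
)

// buildWithRetry retries fetch until it yields a result or the backoff
// budget is exhausted, mirroring the for/continue control flow above.
func buildWithRetry(fetch func() []string, backoff func() error) ([]string, error) {
	for retryNum := 1; ; retryNum++ {
		if res := fetch(); res != nil {
			return res, nil
		}
		fmt.Println("retrying, attempt", retryNum)
		if err := backoff(); err != nil {
			return nil, err // backoff budget exhausted, give up
		}
	}
}

func main() {
	calls := 0
	fetch := func() []string {
		calls++
		if calls < 3 {
			return nil // simulate "rpcCtxs == nil"
		}
		return []string{"task"}
	}
	budget := 5
	backoff := func() error {
		budget--
		if budget == 0 {
			return errors.New("backoff budget exhausted")
		}
		return nil
	}
	fmt.Println(buildWithRetry(fetch, backoff))
}
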
+
+func failpointCheckForConsistentHash(tasks []*batchCopTask) {
+    failpoint.Inject("checkOnlyDispatchToTiFlashComputeNodes", func(val failpoint.Value) {
+        logutil.BgLogger().Debug("in checkOnlyDispatchToTiFlashComputeNodes")
+
+        // This failpoint is tested by a test-infra case, because it needs a real cluster setup.
+        // All tiflash_compute node addrs are stored in val, separated by semicolons.
+        str := val.(string)
+        addrs := strings.Split(str, ";")
+        if len(addrs) < 1 {
+            err := fmt.Sprintf("unexpected length of tiflash_compute node addrs: %v, %s", len(addrs), str)
+            panic(err)
+        }
+        addrMap := make(map[string]struct{})
+        for _, addr := range addrs {
+            addrMap[addr] = struct{}{}
+        }
+        for _, batchTask := range tasks {
+            if _, ok := addrMap[batchTask.storeAddr]; !ok {
+                err := errors.Errorf("batchCopTask was sent to a node which is not tiflash_compute: %v (tiflash_compute nodes: %s)", batchTask.storeAddr, str)
+                panic(err)
+            }
+        }
+    })
 }
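
A test would enable this failpoint with the addr list it expects, roughly as below. The failpoint path is assumed from the package location, and the semicolon-separated format matches the Inject handler above; this is a sketch, not an existing test:

package copr_test

import (
	"testing"

	"github.com/pingcap/failpoint"
)

// TestDispatchOnlyToComputeNodes sketches how the failpoint could be driven.
func TestDispatchOnlyToComputeNodes(t *testing.T) {
	fp := "github.com/pingcap/tidb/store/copr/checkOnlyDispatchToTiFlashComputeNodes"
	// val becomes the string "tc-0:3930;tc-1:3930" inside failpoint.Inject.
	if err := failpoint.Enable(fp, `return("tc-0:3930;tc-1:3930")`); err != nil {
		t.Fatal(err)
	}
	defer func() {
		if err := failpoint.Disable(fp); err != nil {
			t.Fatal(err)
		}
	}()
	// ... run a query against a disaggregated cluster here; any batchCopTask
	// dispatched to a non-compute node would panic via the check above.
}
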

 // When `partitionIDs != nil`, it means that buildBatchCopTasksCore is constructing a batch cop tasks for PartitionTableScan.
diff --git a/store/copr/batch_request_sender.go b/store/copr/batch_request_sender.go
index 3a3bd339f4e6c..b976d26a59ab3 100644
--- a/store/copr/batch_request_sender.go
+++ b/store/copr/batch_request_sender.go
@@ -21,7 +21,6 @@ import (
     "github.com/pingcap/errors"
     "github.com/pingcap/kvproto/pkg/coprocessor"
     "github.com/pingcap/kvproto/pkg/metapb"
-    "github.com/pingcap/tidb/config"
     tikverr "github.com/tikv/client-go/v2/error"
     "github.com/tikv/client-go/v2/tikv"
     "github.com/tikv/client-go/v2/tikvrpc"
@@ -36,7 +35,6 @@ type RegionInfo struct {
     Ranges         *KeyRanges
     AllStores      []uint64
     PartitionIndex int64 // used by PartitionTableScan, indicates the n-th partition of the partition table
-    Addr           string
 }

 func (ri *RegionInfo) toCoprocessorRegionInfo() *coprocessor.RegionInfo {
@@ -100,22 +98,18 @@ func (ss *RegionBatchRequestSender) onSendFailForBatchRegions(bo *Backoffer, ctx
         return tikverr.ErrTiDBShuttingDown
     }

-    if config.GetGlobalConfig().DisaggregatedTiFlash {
-        ss.GetRegionCache().InvalidateTiFlashComputeStoresIfGRPCError(err)
-    } else {
-        // The reload region param is always true. Because that every time we try, we must
-        // re-build the range then re-create the batch sender. As a result, the len of "failStores"
-        // will change. If tiflash's replica is more than two, the "reload region" will always be false.
-        // Now that the batch cop and mpp has a relative low qps, it's reasonable to reload every time
-        // when meeting io error.
-        rc := RegionCache{ss.GetRegionCache()}
-        rc.OnSendFailForBatchRegions(bo, ctx.Store, regionInfos, true, err)
+    // The reload region param is always true, because every time we retry we must
+    // re-build the ranges and re-create the batch sender, so the len of "failStores"
+    // will change. If tiflash has more than two replicas, "reload region" will always be false.
+    // Since batch cop and mpp have relatively low QPS, it's reasonable to reload every time
+    // an io error is met.
+    rc := RegionCache{ss.GetRegionCache()}
+    rc.OnSendFailForBatchRegions(bo, ctx.Store, regionInfos, true, err)

-        // Retry on send request failure when it's not canceled.
-        // When a store is not available, the leader of related region should be elected quickly.
-        // TODO: the number of retry time should be limited:since region may be unavailable
-        // when some unrecoverable disaster happened.
-        err = bo.Backoff(tikv.BoTiFlashRPC(), errors.Errorf("send request error: %v, ctx: %v, regionInfos: %v", err, ctx, regionInfos))
-    }
+    // Retry on send request failure when it's not canceled.
+    // When a store is not available, the leader of the related region should be elected quickly.
+    // TODO: the number of retries should be limited, since the region may stay unavailable
+    // after some unrecoverable disaster.
+    err = bo.Backoff(tikv.BoTiFlashRPC(), errors.Errorf("send request error: %v, ctx: %v, regionInfos: %v", err, ctx, regionInfos))
     return errors.Trace(err)
 }
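
With the DisaggregatedTiFlash branch gone, every send failure now takes one path: mark the failed regions for reload, then charge the backoffer. Reduced to a sketch with hypothetical markForReload/backoff stand-ins for the region-cache and backoffer calls:

package main

import "fmt"

// onSendFail mirrors the unified flow above: always mark the failed
// regions for reload, then back off so retries do not hot-loop.
func onSendFail(regions []string, markForReload func([]string), backoff func(error) error) error {
	markForReload(regions)
	return backoff(fmt.Errorf("send request error, regionInfos: %v", regions))
}

func main() {
	mark := func(rs []string) { fmt.Println("reload regions:", rs) }
	bo := func(err error) error {
		fmt.Println("backing off after:", err)
		return nil // nil: budget remains, caller may retry
	}
	fmt.Println(onSendFail([]string{"r1", "r2"}, mark, bo))
}
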
diff --git a/store/copr/mpp.go b/store/copr/mpp.go
index c3225c40d1455..39ab058c223e5 100644
--- a/store/copr/mpp.go
+++ b/store/copr/mpp.go
@@ -254,14 +254,14 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *Backoffer, req
     if originalTask != nil {
         sender := NewRegionBatchRequestSender(m.store.GetRegionCache(), m.store.GetTiKVClient(), m.enableCollectExecutionInfo)
         rpcResp, retry, _, err = sender.SendReqToAddr(bo, originalTask.ctx, originalTask.regionInfos, wrappedReq, tikv.ReadTimeoutMedium)
-        if err != nil && disaggregatedTiFlash {
-            m.store.GetRegionCache().InvalidateTiFlashComputeStoresIfGRPCError(err)
-        }
         // No matter what the rpc error is, we won't retry the mpp dispatch tasks.
         // TODO: If we want to retry, we must redo the plan fragment cutting and task scheduling.
         // That's a hard job but we can try it in the future.
         if sender.GetRPCError() != nil {
             logutil.BgLogger().Warn("mpp dispatch meet io error", zap.String("error", sender.GetRPCError().Error()), zap.Uint64("timestamp", taskMeta.StartTs), zap.Int64("task", taskMeta.TaskId))
+            if disaggregatedTiFlash {
+                m.store.GetRegionCache().InvalidateTiFlashComputeStores()
+            }
             // if needTriggerFallback is true, we return timeout to trigger tikv's fallback
             if m.needTriggerFallback {
                 err = derr.ErrTiFlashServerTimeout
@@ -275,7 +275,7 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *Backoffer, req
         retry = false
     } else if err != nil {
         if disaggregatedTiFlash {
-            m.store.GetRegionCache().InvalidateTiFlashComputeStoresIfGRPCError(err)
+            m.store.GetRegionCache().InvalidateTiFlashComputeStores()
         }
         if bo.Backoff(tikv.BoTiFlashRPC(), err) == nil {
             retry = true
@@ -355,6 +355,7 @@ func (m *mppIterator) cancelMppTasks() {
     }

     // send cancel cmd to all stores where tasks run
+    gotErr := atomic.Bool{}
     wg := util.WaitGroupWrapper{}
     for addr := range usedStoreAddrs {
         storeAddr := addr
@@ -363,13 +364,14 @@
             logutil.BgLogger().Debug("cancel task", zap.Uint64("query id ", m.startTs), zap.String("on addr", storeAddr))
             if err != nil {
                 logutil.BgLogger().Error("cancel task error", zap.Error(err), zap.Uint64("query id", m.startTs), zap.String("on addr", storeAddr))
-                if disaggregatedTiFlash {
-                    m.store.GetRegionCache().InvalidateTiFlashComputeStoresIfGRPCError(err)
-                }
+                gotErr.CompareAndSwap(false, true)
             }
         })
     }
     wg.Wait()
+    if gotErr.Load() && disaggregatedTiFlash {
+        m.store.GetRegionCache().InvalidateTiFlashComputeStores()
+    }
 }

 func (m *mppIterator) establishMPPConns(bo *Backoffer, req *kv.MPPDispatchRequest, taskMeta *mpp.TaskMeta) {
@@ -396,15 +398,15 @@ func (m *mppIterator) establishMPPConns(bo *Backoffer, req *kv.MPPDispatchReques
     if err != nil {
         logutil.BgLogger().Warn("establish mpp connection meet error and cannot retry", zap.String("error", err.Error()), zap.Uint64("timestamp", taskMeta.StartTs), zap.Int64("task", taskMeta.TaskId))
+        if disaggregatedTiFlash {
+            m.store.GetRegionCache().InvalidateTiFlashComputeStores()
+        }
         // if needTriggerFallback is true, we return timeout to trigger tikv's fallback
         if m.needTriggerFallback {
             m.sendError(derr.ErrTiFlashServerTimeout)
         } else {
             m.sendError(err)
         }
-        if disaggregatedTiFlash {
-            m.store.GetRegionCache().InvalidateTiFlashComputeStoresIfGRPCError(err)
-        }
         return
     }
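
The cancelMppTasks change above replaces per-goroutine cache invalidation with a single post-Wait invalidation, flagged through atomic.Bool (available since Go 1.19), so no mutex is needed for the flag. A minimal sketch of that pattern:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var gotErr atomic.Bool
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		i := i
		wg.Add(1)
		go func() {
			defer wg.Done()
			if i%2 == 1 { // pretend odd workers fail
				gotErr.CompareAndSwap(false, true)
			}
		}()
	}
	wg.Wait()
	// Invalidate once, after all cancels have finished, rather than once per failure.
	if gotErr.Load() {
		fmt.Println("invalidate tiflash_compute store cache")
	}
}
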