Skip to content

Commit

Permalink
*: Check sqlkiller status during split region process (#56155)
Browse files Browse the repository at this point in the history
close #55957
  • Loading branch information
yibin authored Sep 19, 2024
1 parent e0bef39 commit af46a3f
Show file tree
Hide file tree
Showing 13 changed files with 61 additions and 9 deletions.
1 change: 1 addition & 0 deletions pkg/distsql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ go_library(
"//pkg/util/logutil",
"//pkg/util/memory",
"//pkg/util/ranger",
"//pkg/util/sqlkiller",
"//pkg/util/topsql/stmtstats",
"//pkg/util/tracing",
"//pkg/util/trxevents",
Expand Down
7 changes: 7 additions & 0 deletions pkg/distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/pingcap/tidb/pkg/util/collate"
"github.com/pingcap/tidb/pkg/util/memory"
"github.com/pingcap/tidb/pkg/util/ranger"
"github.com/pingcap/tidb/pkg/util/sqlkiller"
"github.com/pingcap/tipb/go-tipb"
"github.com/tikv/client-go/v2/tikvrpc"
)
Expand Down Expand Up @@ -437,6 +438,12 @@ func (builder *RequestBuilder) SetConnIDAndConnAlias(connID uint64, connAlias st
return builder
}

// SetSQLKiller sets sqlkiller for the builder.
func (builder *RequestBuilder) SetSQLKiller(killer *sqlkiller.SQLKiller) *RequestBuilder {
builder.SQLKiller = killer
return builder
}

// TableHandleRangesToKVRanges convert table handle ranges to "KeyRanges" for multiple tables.
func TableHandleRangesToKVRanges(dctx *distsqlctx.DistSQLContext, tid []int64, isCommonHandle bool, ranges []*ranger.Range) (*kv.KeyRanges, error) {
if !isCommonHandle {
Expand Down
1 change: 1 addition & 0 deletions pkg/executor/analyze_col.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ func (e *AnalyzeColumnsExec) buildResp(ranges []*ranger.Range) (distsql.SelectRe
SetMemTracker(e.memTracker).
SetResourceGroupName(e.ctx.GetSessionVars().StmtCtx.ResourceGroupName).
SetExplicitRequestSourceType(e.ctx.GetSessionVars().ExplicitRequestSourceType).
SetSQLKiller(&e.ctx.GetSessionVars().SQLKiller).
Build()
if err != nil {
return nil, err
Expand Down
6 changes: 4 additions & 2 deletions pkg/executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,8 @@ func (e *IndexReaderExecutor) buildKVReq(r []kv.KeyRange) (*kv.Request, error) {
SetFromInfoSchema(e.Ctx().GetInfoSchema()).
SetMemTracker(e.memTracker).
SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.Ctx().GetDistSQLCtx(), &builder.Request, e.netDataSize)).
SetConnIDAndConnAlias(e.Ctx().GetSessionVars().ConnectionID, e.Ctx().GetSessionVars().SessionAlias)
SetConnIDAndConnAlias(e.Ctx().GetSessionVars().ConnectionID, e.Ctx().GetSessionVars().SessionAlias).
SetSQLKiller(&e.Ctx().GetSessionVars().SQLKiller)
kvReq, err := builder.Build()
return kvReq, err
}
Expand Down Expand Up @@ -721,7 +722,8 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan<
SetFromInfoSchema(e.Ctx().GetInfoSchema()).
SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.Ctx().GetDistSQLCtx(), &builder.Request, e.idxNetDataSize/float64(len(kvRanges)))).
SetMemTracker(tracker).
SetConnIDAndConnAlias(e.Ctx().GetSessionVars().ConnectionID, e.Ctx().GetSessionVars().SessionAlias)
SetConnIDAndConnAlias(e.Ctx().GetSessionVars().ConnectionID, e.Ctx().GetSessionVars().SessionAlias).
SetSQLKiller(&e.Ctx().GetSessionVars().SQLKiller)

results := make([]distsql.SelectResult, 0, len(kvRanges))
for _, kvRange := range kvRanges {
Expand Down
3 changes: 2 additions & 1 deletion pkg/executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,8 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
SetPaging(e.paging).
SetFromInfoSchema(e.Ctx().GetInfoSchema()).
SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.Ctx().GetDistSQLCtx(), &builder.Request, e.partialNetDataSizes[workID])).
SetConnIDAndConnAlias(e.Ctx().GetSessionVars().ConnectionID, e.Ctx().GetSessionVars().SessionAlias)
SetConnIDAndConnAlias(e.Ctx().GetSessionVars().ConnectionID, e.Ctx().GetSessionVars().SessionAlias).
SetSQLKiller(&e.Ctx().GetSessionVars().SQLKiller)

tps := worker.getRetTpsForIndexScan(e.handleCols)
results := make([]distsql.SelectResult, 0, len(keyRanges))
Expand Down
5 changes: 4 additions & 1 deletion pkg/executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,7 @@ func (e *TableReaderExecutor) buildKVReqSeparately(ctx context.Context, ranges [
SetAllowBatchCop(e.batchCop).
SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.dctx, &reqBuilder.Request, e.netDataSize)).
SetConnIDAndConnAlias(e.dctx.ConnectionID, e.dctx.SessionAlias).
SetSQLKiller(e.dctx.SQLKiller).
Build()
if err != nil {
return nil, err
Expand Down Expand Up @@ -484,6 +485,7 @@ func (e *TableReaderExecutor) buildKVReqForPartitionTableScan(ctx context.Contex
SetAllowBatchCop(e.batchCop).
SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.dctx, &reqBuilder.Request, e.netDataSize)).
SetConnIDAndConnAlias(e.dctx.ConnectionID, e.dctx.SessionAlias).
SetSQLKiller(e.dctx.SQLKiller).
Build()
if err != nil {
return nil, err
Expand Down Expand Up @@ -528,7 +530,8 @@ func (e *TableReaderExecutor) buildKVReq(ctx context.Context, ranges []*ranger.R
SetAllowBatchCop(e.batchCop).
SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.dctx, &reqBuilder.Request, e.netDataSize)).
SetPaging(e.paging).
SetConnIDAndConnAlias(e.dctx.ConnectionID, e.dctx.SessionAlias)
SetConnIDAndConnAlias(e.dctx.ConnectionID, e.dctx.SessionAlias).
SetSQLKiller(e.dctx.SQLKiller)
return reqBuilder.Build()
}

Expand Down
17 changes: 17 additions & 0 deletions pkg/executor/test/executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3010,3 +3010,20 @@ func TestQueryWithKill(t *testing.T) {
}()
wg.Wait()
}

func TestIssue55957(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t(a int)")
for i := 0; i < 20; i++ {
tk.MustExec(fmt.Sprintf("insert into t values(%d)", i))
}
tk.Session().GetSessionVars().ConnectionID = 123456
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/store/copr/SplitRangesHangCausedKill", `return(true)`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/store/copr/SplitRangesHangCausedKill"))
}()
err := tk.QueryToErr("select /*+ MAX_EXECUTION_TIME(1000) */ * from t where a < 30 and a > 3 order by a")
require.True(t, exeerrors.ErrMaxExecTimeExceeded.Equal(err))
}
2 changes: 1 addition & 1 deletion pkg/executor/test/splittest/split_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ func BenchmarkLocateRegion(t *testing.B) {

t.ResetTimer()
for i := 0; i < t.N; i++ {
_, err := cache.SplitKeyRangesByBuckets(bo, ranges)
_, err := cache.SplitKeyRangesByBuckets(bo, ranges, nil)
if err != nil {
t.Fatal(err)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ go_library(
"//pkg/util/memory",
"//pkg/util/set",
"//pkg/util/size",
"//pkg/util/sqlkiller",
"//pkg/util/tiflash",
"//pkg/util/tiflashcompute",
"//pkg/util/trxevents",
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/pkg/domain/resourcegroup"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/util/memory"
"github.com/pingcap/tidb/pkg/util/sqlkiller"
"github.com/pingcap/tidb/pkg/util/tiflash"
"github.com/pingcap/tidb/pkg/util/trxevents"
tikvstore "github.com/tikv/client-go/v2/kv"
Expand Down Expand Up @@ -602,6 +603,8 @@ type Request struct {
ConnID uint64
// ConnAlias stores the session connection alias.
ConnAlias string
// SQLKiller is a flag to indicate that this query is killed.
SQLKiller *sqlkiller.SQLKiller
}

// CoprRequestAdjuster is used to check and adjust a copr request according to specific rules.
Expand Down
1 change: 1 addition & 0 deletions pkg/store/copr/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ go_library(
"//pkg/util/memory",
"//pkg/util/paging",
"//pkg/util/size",
"//pkg/util/sqlkiller",
"//pkg/util/tiflash",
"//pkg/util/tiflashcompute",
"//pkg/util/tracing",
Expand Down
2 changes: 1 addition & 1 deletion pkg/store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ func buildCopTasks(bo *Backoffer, ranges *KeyRanges, opt *buildCopTaskOpt) ([]*c
})

// TODO(youjiali1995): is there any request type that needn't be splitted by buckets?
locs, err := cache.SplitKeyRangesByBuckets(bo, ranges)
locs, err := cache.SplitKeyRangesByBuckets(bo, ranges, req.SQLKiller)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
21 changes: 18 additions & 3 deletions pkg/store/copr/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/kv"
derr "github.com/pingcap/tidb/pkg/store/driver/error"
"github.com/pingcap/tidb/pkg/store/driver/options"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/sqlkiller"
"github.com/tikv/client-go/v2/metrics"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"
Expand Down Expand Up @@ -168,7 +170,7 @@ const UnspecifiedLimit = -1

// SplitKeyRangesByLocationsWithBuckets splits the KeyRanges by logical info in the cache.
// The buckets in the returned LocationKeyRanges are not empty if the region is split by bucket.
func (c *RegionCache) SplitKeyRangesByLocationsWithBuckets(bo *Backoffer, ranges *KeyRanges, limit int) ([]*LocationKeyRanges, error) {
func (c *RegionCache) SplitKeyRangesByLocationsWithBuckets(bo *Backoffer, ranges *KeyRanges, limit int, killer *sqlkiller.SQLKiller) ([]*LocationKeyRanges, error) {
res := make([]*LocationKeyRanges, 0)
for ranges.Len() > 0 {
if limit != UnspecifiedLimit && len(res) >= limit {
Expand All @@ -178,6 +180,19 @@ func (c *RegionCache) SplitKeyRangesByLocationsWithBuckets(bo *Backoffer, ranges
if err != nil {
return res, derr.ToTiDBErr(err)
}
failpoint.Inject("SplitRangesHangCausedKill", func(val failpoint.Value) {
if val.(bool) {
if killer != nil {
killer.SendKillSignal(sqlkiller.MaxExecTimeExceeded)
}
}
})
if killer != nil {
err = killer.HandleSignal()
if err != nil {
return nil, err
}
}

isBreak := false
res, ranges, isBreak = c.splitKeyRangesByLocation(loc, ranges, res)
Expand Down Expand Up @@ -240,8 +255,8 @@ func (c *RegionCache) SplitKeyRangesByLocationsWithoutBuckets(bo *Backoffer, ran
// it's equal to SplitKeyRangesByLocations.
//
// TODO(youjiali1995): Try to do it in one round and reduce allocations if bucket is not enabled.
func (c *RegionCache) SplitKeyRangesByBuckets(bo *Backoffer, ranges *KeyRanges) ([]*LocationKeyRanges, error) {
locs, err := c.SplitKeyRangesByLocationsWithBuckets(bo, ranges, UnspecifiedLimit)
func (c *RegionCache) SplitKeyRangesByBuckets(bo *Backoffer, ranges *KeyRanges, sqlkiller *sqlkiller.SQLKiller) ([]*LocationKeyRanges, error) {
locs, err := c.SplitKeyRangesByLocationsWithBuckets(bo, ranges, UnspecifiedLimit, sqlkiller)
if err != nil {
return nil, derr.ToTiDBErr(err)
}
Expand Down

0 comments on commit af46a3f

Please sign in to comment.