executor, store: Plumb the query max execution time to timebox the PD GetRegion grpc calls #56923

Merged 2 commits on Nov 20, 2024
2 changes: 1 addition & 1 deletion pkg/distsql/BUILD.bazel
@@ -65,7 +65,7 @@ go_test(
embed = [":distsql"],
flaky = True,
race = "on",
shard_count = 28,
shard_count = 29,
deps = [
"//pkg/distsql/context",
"//pkg/errctx",
1 change: 1 addition & 0 deletions pkg/distsql/context/context.go
@@ -78,6 +78,7 @@ type DistSQLContext struct {
LoadBasedReplicaReadThreshold time.Duration
RunawayChecker resourcegroup.RunawayChecker
TiKVClientReadTimeout uint64
MaxExecutionTime uint64

ReplicaClosestReadThreshold int64
ConnectionID uint64
1 change: 1 addition & 0 deletions pkg/distsql/context/context_test.go
@@ -84,6 +84,7 @@ func TestContextDetach(t *testing.T) {
ResourceGroupName: "c",
LoadBasedReplicaReadThreshold: time.Second,
TiKVClientReadTimeout: 1,
MaxExecutionTime: 1,

ReplicaClosestReadThreshold: 1,
ConnectionID: 1,
1 change: 1 addition & 0 deletions pkg/distsql/request_builder.go
@@ -341,6 +341,7 @@ func (builder *RequestBuilder) SetFromSessionVars(dctx *distsqlctx.DistSQLContex
builder.Request.StoreBusyThreshold = dctx.LoadBasedReplicaReadThreshold
builder.Request.RunawayChecker = dctx.RunawayChecker
builder.Request.TiKVClientReadTimeout = dctx.TiKVClientReadTimeout
builder.Request.MaxExecutionTime = dctx.MaxExecutionTime
Contributor comment:
IMO adding a timeout to the context should be the final step. If we design with the expectation that the entire execution path checks ctx.Done(), perhaps we should review and document in the GitHub issue which TiDB paths lack ctx.Done() checks, and list the related TODO sub-tasks.

/cc @bb7133

return builder
}

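The review comment above asks which execution paths actually honor context cancellation. As a minimal illustration (hypothetical code, not from this PR), a long-running step only benefits from a MaxExecutionTime-derived deadline if it polls ctx.Done() along the way:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// processRegions is a hypothetical long-running step on an execution path.
// A deadline derived from MaxExecutionTime only helps if loops like this
// one actually check ctx.Done() between units of work.
func processRegions(ctx context.Context, regions []int) error {
	for range regions {
		select {
		case <-ctx.Done():
			return ctx.Err() // stop promptly: deadline exceeded or cancelled
		default:
		}
		time.Sleep(10 * time.Millisecond) // stand-in for real per-region work
	}
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 25*time.Millisecond)
	defer cancel()
	fmt.Println(processRegions(ctx, make([]int, 100))) // context deadline exceeded
}
```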
27 changes: 27 additions & 0 deletions pkg/distsql/request_builder_test.go
@@ -678,6 +678,33 @@ func TestRequestBuilderTiKVClientReadTimeout(t *testing.T) {
require.Equal(t, expect, actual)
}

func TestRequestBuilderMaxExecutionTime(t *testing.T) {
dctx := NewDistSQLContextForTest()
dctx.MaxExecutionTime = 100
actual, err := (&RequestBuilder{}).
SetFromSessionVars(dctx).
Build()
require.NoError(t, err)
expect := &kv.Request{
Tp: 0,
StartTs: 0x0,
Data: []uint8(nil),
KeyRanges: kv.NewNonPartitionedKeyRanges(nil),
Concurrency: variable.DefDistSQLScanConcurrency,
IsolationLevel: 0,
Priority: 0,
MemTracker: (*memory.Tracker)(nil),
SchemaVar: 0,
ReadReplicaScope: kv.GlobalReplicaScope,
MaxExecutionTime: 100,
ResourceGroupName: resourcegroup.DefaultResourceGroupName,
}
expect.Paging.MinPagingSize = paging.MinPagingSize
expect.Paging.MaxPagingSize = paging.MaxPagingSize
actual.ResourceGroupTagger = nil
require.Equal(t, expect, actual)
}

func TestTableRangesToKVRangesWithFbs(t *testing.T) {
ranges := []*ranger.Range{
{
7 changes: 7 additions & 0 deletions pkg/executor/batch_point_get.go
@@ -19,6 +19,7 @@ import (
"fmt"
"slices"
"sync/atomic"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/executor/internal/exec"
@@ -235,6 +236,12 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
var indexKeys []kv.Key
var err error
batchGetter := e.batchGetter
if e.Ctx().GetSessionVars().MaxExecutionTime > 0 {
// If MaxExecutionTime is set, we need to set the context deadline for the batch get.
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Duration(e.Ctx().GetSessionVars().MaxExecutionTime)*time.Millisecond)
defer cancel()
}
rc := e.Ctx().GetSessionVars().IsPessimisticReadConsistency()
if e.idxInfo != nil && !isCommonHandleRead(e.tblInfo, e.idxInfo) {
// `SELECT a, b FROM t WHERE (a, b) IN ((1, 2), (1, 2), (2, 1), (1, 2))` should not return duplicated rows
7 changes: 7 additions & 0 deletions pkg/executor/point_get.go
@@ -19,6 +19,7 @@ import (
"fmt"
"sort"
"strconv"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
@@ -672,6 +673,12 @@ func (e *PointGetExecutor) get(ctx context.Context, key kv.Key) ([]byte, error)
}
}
// if not read lock or table was unlock then snapshot get
if e.Ctx().GetSessionVars().MaxExecutionTime > 0 {
// If the query has MaxExecutionTime set, set the context deadline for the get request.
ctxWithTimeout, cancel := context.WithTimeout(ctx, time.Duration(e.Ctx().GetSessionVars().MaxExecutionTime)*time.Millisecond)
defer cancel()
return e.snapshot.Get(ctxWithTimeout, key)
}
return e.snapshot.Get(ctx, key)
}

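For reference, a self-contained sketch of the timeout pattern both executors above use. slowGet is a stand-in for snapshot.Get, and the uint64-milliseconds conversion mirrors the PR's code; everything else is illustrative:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// slowGet stands in for snapshot.Get: it returns a value after a fixed
// delay unless the context is done first.
func slowGet(ctx context.Context, key string) ([]byte, error) {
	select {
	case <-time.After(50 * time.Millisecond): // pretend the KV read took 50ms
		return []byte("value-for-" + key), nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

func main() {
	// Like the session variable, the timeout is a uint64 in milliseconds.
	var maxExecutionTime uint64 = 10

	ctx := context.Background()
	if maxExecutionTime > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx,
			time.Duration(maxExecutionTime)*time.Millisecond)
		defer cancel()
	}

	_, err := slowGet(ctx, "k1")
	fmt.Println(errors.Is(err, context.DeadlineExceeded)) // true: the get was timeboxed
}
```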
2 changes: 2 additions & 0 deletions pkg/kv/kv.go
@@ -602,6 +602,8 @@ type Request struct {
StoreBusyThreshold time.Duration
// TiKVClientReadTimeout is the timeout of kv read request
TiKVClientReadTimeout uint64
// MaxExecutionTime is the timeout of the whole query execution, in milliseconds
MaxExecutionTime uint64

RunawayChecker resourcegroup.RunawayChecker

1 change: 1 addition & 0 deletions pkg/session/session.go
@@ -2644,6 +2644,7 @@ func (s *session) GetDistSQLCtx() *distsqlctx.DistSQLContext {
LoadBasedReplicaReadThreshold: vars.LoadBasedReplicaReadThreshold,
RunawayChecker: sc.RunawayChecker,
TiKVClientReadTimeout: vars.GetTiKVClientReadTimeout(),
MaxExecutionTime: vars.GetMaxExecutionTime(),

ReplicaClosestReadThreshold: vars.ReplicaClosestReadThreshold,
ConnectionID: vars.ConnectionID,
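Taken together, the PR threads the value through several layers before it becomes a deadline. A toy sketch of that plumbing (struct names mirror the PR; every other detail is elided):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Toy versions of the structs touched by this PR, reduced to the one field
// being plumbed through.
type sessionVars struct{ MaxExecutionTime uint64 }    // milliseconds
type distSQLContext struct{ MaxExecutionTime uint64 } // pkg/distsql/context
type request struct{ MaxExecutionTime uint64 }        // pkg/kv

func main() {
	vars := sessionVars{MaxExecutionTime: 500}

	// session.GetDistSQLCtx: copy from the session variables.
	dctx := distSQLContext{MaxExecutionTime: vars.MaxExecutionTime}

	// RequestBuilder.SetFromSessionVars: copy onto the kv.Request.
	req := request{MaxExecutionTime: dctx.MaxExecutionTime}

	// copr: finally turned into a context deadline before the PD/TiKV calls.
	ctx, cancel := context.WithTimeout(context.Background(),
		time.Duration(req.MaxExecutionTime)*time.Millisecond)
	defer cancel()

	deadline, _ := ctx.Deadline()
	fmt.Println("deadline in", time.Until(deadline).Round(time.Millisecond))
}
```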
7 changes: 7 additions & 0 deletions pkg/store/copr/batch_coprocessor.go
@@ -1115,6 +1115,13 @@ func (c *CopClient) sendBatch(ctx context.Context, req *kv.Request, vars *tikv.V
ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTs)
bo := backoff.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)

if req.MaxExecutionTime > 0 {
// If MaxExecutionTime is set, we need to set the deadline for the whole batch coprocessor request context.
ctxWithTimeout, cancel := context.WithTimeout(bo.GetCtx(), time.Duration(req.MaxExecutionTime)*time.Millisecond)
defer cancel()
bo.TiKVBackoffer().SetCtx(ctxWithTimeout)
}

var tasks []*batchCopTask
var err error
if req.PartitionIDAndRanges != nil {
7 changes: 7 additions & 0 deletions pkg/store/copr/coprocessor.go
@@ -349,6 +349,13 @@ func buildCopTasks(bo *Backoffer, ranges *KeyRanges, opt *buildCopTaskOpt) ([]*c
}
})

if req.MaxExecutionTime > 0 {
// If the request has a MaxExecutionTime, we need to set the deadline of the context.
ctxWithTimeout, cancel := context.WithTimeout(bo.GetCtx(), time.Duration(req.MaxExecutionTime)*time.Millisecond)
defer cancel()
bo.TiKVBackoffer().SetCtx(ctxWithTimeout)
}

// TODO(youjiali1995): is there any request type that needn't be split by buckets?
locs, err := cache.SplitKeyRangesByBuckets(bo, ranges)
if err != nil {
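Setting the backoffer's context is what timeboxes the downstream PD GetRegion calls: every retry attempt derives from that context, so one deadline bounds the total time across attempts. A simplified retry loop (not the real Backoffer) shows the mechanism:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// backoffRetry is a simplified stand-in for the Backoffer: each attempt
// re-checks the shared context, so a deadline set once up front bounds
// the whole sequence of retries (e.g. repeated PD GetRegion calls).
func backoffRetry(ctx context.Context, attempt func(context.Context) error) error {
	for backoff := 5 * time.Millisecond; ; backoff *= 2 {
		if err := attempt(ctx); err == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err() // deadline from MaxExecutionTime reached
		case <-time.After(backoff):
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Millisecond)
	defer cancel()

	err := backoffRetry(ctx, func(context.Context) error {
		return fmt.Errorf("region cache miss") // pretend PD keeps failing retryably
	})
	fmt.Println(err) // context deadline exceeded
}
```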