From beaa70449e664296995dc415681a8cc31e6b5968 Mon Sep 17 00:00:00 2001
From: HaoW30 <2007harf@gmail.com>
Date: Mon, 28 Oct 2024 08:53:25 -0700
Subject: [PATCH 1/2] Plumb the query max execution time to timebox the PD
 GetRegion gRPC calls

---
 pkg/distsql/context/context.go      |  1 +
 pkg/distsql/request_builder.go      |  1 +
 pkg/distsql/request_builder_test.go | 27 +++++++++++++++++++++++++++
 pkg/executor/batch_point_get.go     |  5 +++++
 pkg/executor/point_get.go           |  9 ++++++++-
 pkg/kv/kv.go                        |  2 ++
 pkg/session/session.go              |  1 +
 pkg/store/copr/batch_coprocessor.go |  6 ++++++
 pkg/store/copr/coprocessor.go       |  6 ++++++
 9 files changed, 57 insertions(+), 1 deletion(-)

diff --git a/pkg/distsql/context/context.go b/pkg/distsql/context/context.go
index 9612a30f1048f..f0327326e3854 100644
--- a/pkg/distsql/context/context.go
+++ b/pkg/distsql/context/context.go
@@ -78,6 +78,7 @@ type DistSQLContext struct {
     LoadBasedReplicaReadThreshold time.Duration
     RunawayChecker                resourcegroup.RunawayChecker
     TiKVClientReadTimeout         uint64
+    MaxExecutionTime              uint64
 
     ReplicaClosestReadThreshold int64
     ConnectionID                uint64
diff --git a/pkg/distsql/request_builder.go b/pkg/distsql/request_builder.go
index 672543d329af1..fd06047eec6d3 100644
--- a/pkg/distsql/request_builder.go
+++ b/pkg/distsql/request_builder.go
@@ -341,6 +341,7 @@ func (builder *RequestBuilder) SetFromSessionVars(dctx *distsqlctx.DistSQLContex
     builder.Request.StoreBusyThreshold = dctx.LoadBasedReplicaReadThreshold
     builder.Request.RunawayChecker = dctx.RunawayChecker
     builder.Request.TiKVClientReadTimeout = dctx.TiKVClientReadTimeout
+    builder.Request.MaxExecutionTime = dctx.MaxExecutionTime
     return builder
 }
 
diff --git a/pkg/distsql/request_builder_test.go b/pkg/distsql/request_builder_test.go
index 64c1c232562e6..7dbad068608dc 100644
--- a/pkg/distsql/request_builder_test.go
+++ b/pkg/distsql/request_builder_test.go
@@ -678,6 +678,33 @@ func TestRequestBuilderTiKVClientReadTimeout(t *testing.T) {
     require.Equal(t, expect, actual)
 }
 
+func TestRequestBuilderMaxExecutionTime(t *testing.T) {
+    dctx := NewDistSQLContextForTest()
+    dctx.MaxExecutionTime = 100
+    actual, err := (&RequestBuilder{}).
+        SetFromSessionVars(dctx).
+        Build()
+    require.NoError(t, err)
+    expect := &kv.Request{
+        Tp:                0,
+        StartTs:           0x0,
+        Data:              []uint8(nil),
+        KeyRanges:         kv.NewNonPartitionedKeyRanges(nil),
+        Concurrency:       variable.DefDistSQLScanConcurrency,
+        IsolationLevel:    0,
+        Priority:          0,
+        MemTracker:        (*memory.Tracker)(nil),
+        SchemaVar:         0,
+        ReadReplicaScope:  kv.GlobalReplicaScope,
+        MaxExecutionTime:  100,
+        ResourceGroupName: resourcegroup.DefaultResourceGroupName,
+    }
+    expect.Paging.MinPagingSize = paging.MinPagingSize
+    expect.Paging.MaxPagingSize = paging.MaxPagingSize
+    actual.ResourceGroupTagger = nil
+    require.Equal(t, expect, actual)
+}
+
 func TestTableRangesToKVRangesWithFbs(t *testing.T) {
     ranges := []*ranger.Range{
         {
diff --git a/pkg/executor/batch_point_get.go b/pkg/executor/batch_point_get.go
index 6921135fe9887..4392605e40a24 100644
--- a/pkg/executor/batch_point_get.go
+++ b/pkg/executor/batch_point_get.go
@@ -19,6 +19,7 @@ import (
     "fmt"
     "slices"
     "sync/atomic"
+    "time"
 
     "github.com/pingcap/failpoint"
     "github.com/pingcap/tidb/pkg/executor/internal/exec"
@@ -235,6 +236,10 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
     var indexKeys []kv.Key
     var err error
     batchGetter := e.batchGetter
+    if e.Ctx().GetSessionVars().MaxExecutionTime > 0 {
+        // If MaxExecutionTime is set, we need to set the context deadline for the batch get.
+        ctx, _ = context.WithTimeout(ctx, time.Duration(e.Ctx().GetSessionVars().MaxExecutionTime)*time.Millisecond)
+    }
     rc := e.Ctx().GetSessionVars().IsPessimisticReadConsistency()
     if e.idxInfo != nil && !isCommonHandleRead(e.tblInfo, e.idxInfo) {
         // `SELECT a, b FROM t WHERE (a, b) IN ((1, 2), (1, 2), (2, 1), (1, 2))` should not return duplicated rows
diff --git a/pkg/executor/point_get.go b/pkg/executor/point_get.go
index cea86884a841e..51aa939c335b0 100644
--- a/pkg/executor/point_get.go
+++ b/pkg/executor/point_get.go
@@ -19,6 +19,7 @@ import (
     "fmt"
     "sort"
     "strconv"
+    "time"
 
     "github.com/pingcap/errors"
     "github.com/pingcap/failpoint"
@@ -672,7 +673,13 @@ func (e *PointGetExecutor) get(ctx context.Context, key kv.Key) ([]byte, error)
         }
     }
     // if not read lock or table was unlock then snapshot get
-    return e.snapshot.Get(ctx, key)
+    if e.Ctx().GetSessionVars().MaxExecutionTime > 0 {
+        // if the query has max execution time set, we need to set the context deadline for the get request
+        ctxWithTimeout, _ := context.WithTimeout(ctx, time.Duration(e.Ctx().GetSessionVars().MaxExecutionTime)*time.Millisecond)
+        return e.snapshot.Get(ctxWithTimeout, key)
+    } else {
+        return e.snapshot.Get(ctx, key)
+    }
 }
 
 func (e *PointGetExecutor) verifyTxnScope() error {
diff --git a/pkg/kv/kv.go b/pkg/kv/kv.go
index 785ab7240dd05..26a5ee7a4fccf 100644
--- a/pkg/kv/kv.go
+++ b/pkg/kv/kv.go
@@ -602,6 +602,8 @@ type Request struct {
     StoreBusyThreshold time.Duration
     // TiKVClientReadTimeout is the timeout of kv read request
     TiKVClientReadTimeout uint64
+    // MaxExecutionTime is the timeout of the whole query execution
+    MaxExecutionTime uint64
 
     RunawayChecker resourcegroup.RunawayChecker
 
diff --git a/pkg/session/session.go b/pkg/session/session.go
index 9a427d13bd35b..1850ba2729559 100644
--- a/pkg/session/session.go
+++ b/pkg/session/session.go
@@ -2644,6 +2644,7 @@ func (s *session) GetDistSQLCtx() *distsqlctx.DistSQLContext {
         LoadBasedReplicaReadThreshold: vars.LoadBasedReplicaReadThreshold,
         RunawayChecker:                sc.RunawayChecker,
         TiKVClientReadTimeout:         vars.GetTiKVClientReadTimeout(),
+        MaxExecutionTime:              vars.GetMaxExecutionTime(),
 
         ReplicaClosestReadThreshold: vars.ReplicaClosestReadThreshold,
         ConnectionID:                vars.ConnectionID,
diff --git a/pkg/store/copr/batch_coprocessor.go b/pkg/store/copr/batch_coprocessor.go
index 5a1847e99ae86..57c36883b218f 100644
--- a/pkg/store/copr/batch_coprocessor.go
+++ b/pkg/store/copr/batch_coprocessor.go
@@ -1115,6 +1115,12 @@ func (c *CopClient) sendBatch(ctx context.Context, req *kv.Request, vars *tikv.V
     ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTs)
     bo := backoff.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)
 
+    if req.MaxExecutionTime > 0 {
+        // If MaxExecutionTime is set, we need to set the deadline for the whole batch coprocessor request context.
+        ctxWithTimeout, _ := context.WithTimeout(bo.GetCtx(), time.Duration(req.MaxExecutionTime)*time.Millisecond)
+        bo.TiKVBackoffer().SetCtx(ctxWithTimeout)
+    }
+
     var tasks []*batchCopTask
     var err error
     if req.PartitionIDAndRanges != nil {
diff --git a/pkg/store/copr/coprocessor.go b/pkg/store/copr/coprocessor.go
index ca9866073a371..41de1f8b26efd 100644
--- a/pkg/store/copr/coprocessor.go
+++ b/pkg/store/copr/coprocessor.go
@@ -349,6 +349,12 @@ func buildCopTasks(bo *Backoffer, ranges *KeyRanges, opt *buildCopTaskOpt) ([]*c
         }
     })
 
+    if req.MaxExecutionTime > 0 {
+        // If the request has a MaxExecutionTime, we need to set the deadline of the context.
+        ctxWithTimeout, _ := context.WithTimeout(bo.GetCtx(), time.Duration(req.MaxExecutionTime)*time.Millisecond)
+        bo.TiKVBackoffer().SetCtx(ctxWithTimeout)
+    }
+
     // TODO(youjiali1995): is there any request type that needn't be split by buckets?
     locs, err := cache.SplitKeyRangesByBuckets(bo, ranges)
     if err != nil {

From 8c021ab328e53a3e37f5b9637183d3e289de619f Mon Sep 17 00:00:00 2001
From: HaoW30 <2007harf@gmail.com>
Date: Wed, 20 Nov 2024 10:53:46 -0800
Subject: [PATCH 2/2] Fixes for CI test failures

* Add deferred cancel calls to avoid leaking the timeout contexts
* Fix context.TestContextDetach (pkg/distsql/context/context_test.go)
* Update the BUILD.bazel shard_count for the new test
---
 pkg/distsql/BUILD.bazel             | 2 +-
 pkg/distsql/context/context_test.go | 1 +
 pkg/executor/batch_point_get.go     | 4 +++-
 pkg/executor/point_get.go           | 6 +++---
 pkg/store/copr/batch_coprocessor.go | 3 ++-
 pkg/store/copr/coprocessor.go       | 3 ++-
 6 files changed, 12 insertions(+), 7 deletions(-)

diff --git a/pkg/distsql/BUILD.bazel b/pkg/distsql/BUILD.bazel
index a3be4231a4e8f..d97f39d2ce1dd 100644
--- a/pkg/distsql/BUILD.bazel
+++ b/pkg/distsql/BUILD.bazel
@@ -65,7 +65,7 @@ go_test(
     embed = [":distsql"],
     flaky = True,
     race = "on",
-    shard_count = 28,
+    shard_count = 29,
     deps = [
         "//pkg/distsql/context",
         "//pkg/errctx",
diff --git a/pkg/distsql/context/context_test.go b/pkg/distsql/context/context_test.go
index 5cf8a36a2e2b5..d65f801207e85 100644
--- a/pkg/distsql/context/context_test.go
+++ b/pkg/distsql/context/context_test.go
@@ -84,6 +84,7 @@ func TestContextDetach(t *testing.T) {
         ResourceGroupName:             "c",
         LoadBasedReplicaReadThreshold: time.Second,
         TiKVClientReadTimeout:         1,
+        MaxExecutionTime:              1,
 
         ReplicaClosestReadThreshold: 1,
         ConnectionID:                1,
diff --git a/pkg/executor/batch_point_get.go b/pkg/executor/batch_point_get.go
index 4392605e40a24..3df09cfd4fb18 100644
--- a/pkg/executor/batch_point_get.go
+++ b/pkg/executor/batch_point_get.go
@@ -238,7 +238,9 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
     batchGetter := e.batchGetter
     if e.Ctx().GetSessionVars().MaxExecutionTime > 0 {
         // If MaxExecutionTime is set, we need to set the context deadline for the batch get.
-        ctx, _ = context.WithTimeout(ctx, time.Duration(e.Ctx().GetSessionVars().MaxExecutionTime)*time.Millisecond)
+        var cancel context.CancelFunc
+        ctx, cancel = context.WithTimeout(ctx, time.Duration(e.Ctx().GetSessionVars().MaxExecutionTime)*time.Millisecond)
+        defer cancel()
     }
     rc := e.Ctx().GetSessionVars().IsPessimisticReadConsistency()
     if e.idxInfo != nil && !isCommonHandleRead(e.tblInfo, e.idxInfo) {
diff --git a/pkg/executor/point_get.go b/pkg/executor/point_get.go
index 51aa939c335b0..4add1d108e991 100644
--- a/pkg/executor/point_get.go
+++ b/pkg/executor/point_get.go
@@ -675,11 +675,11 @@ func (e *PointGetExecutor) get(ctx context.Context, key kv.Key) ([]byte, error)
         }
     }
     // if not read lock or table was unlock then snapshot get
     if e.Ctx().GetSessionVars().MaxExecutionTime > 0 {
         // if the query has max execution time set, we need to set the context deadline for the get request
-        ctxWithTimeout, _ := context.WithTimeout(ctx, time.Duration(e.Ctx().GetSessionVars().MaxExecutionTime)*time.Millisecond)
+        ctxWithTimeout, cancel := context.WithTimeout(ctx, time.Duration(e.Ctx().GetSessionVars().MaxExecutionTime)*time.Millisecond)
+        defer cancel()
         return e.snapshot.Get(ctxWithTimeout, key)
-    } else {
-        return e.snapshot.Get(ctx, key)
     }
+    return e.snapshot.Get(ctx, key)
 }
diff --git a/pkg/store/copr/batch_coprocessor.go b/pkg/store/copr/batch_coprocessor.go
index 57c36883b218f..b28d5a812c494 100644
--- a/pkg/store/copr/batch_coprocessor.go
+++ b/pkg/store/copr/batch_coprocessor.go
@@ -1117,7 +1117,8 @@ func (c *CopClient) sendBatch(ctx context.Context, req *kv.Request, vars *tikv.V
 
     if req.MaxExecutionTime > 0 {
         // If MaxExecutionTime is set, we need to set the deadline for the whole batch coprocessor request context.
-        ctxWithTimeout, _ := context.WithTimeout(bo.GetCtx(), time.Duration(req.MaxExecutionTime)*time.Millisecond)
+        ctxWithTimeout, cancel := context.WithTimeout(bo.GetCtx(), time.Duration(req.MaxExecutionTime)*time.Millisecond)
+        defer cancel()
         bo.TiKVBackoffer().SetCtx(ctxWithTimeout)
     }
 
diff --git a/pkg/store/copr/coprocessor.go b/pkg/store/copr/coprocessor.go
index 41de1f8b26efd..b99499f1b4cf3 100644
--- a/pkg/store/copr/coprocessor.go
+++ b/pkg/store/copr/coprocessor.go
@@ -351,7 +351,8 @@ func buildCopTasks(bo *Backoffer, ranges *KeyRanges, opt *buildCopTaskOpt) ([]*c
 
     if req.MaxExecutionTime > 0 {
         // If the request has a MaxExecutionTime, we need to set the deadline of the context.
-        ctxWithTimeout, _ := context.WithTimeout(bo.GetCtx(), time.Duration(req.MaxExecutionTime)*time.Millisecond)
+        ctxWithTimeout, cancel := context.WithTimeout(bo.GetCtx(), time.Duration(req.MaxExecutionTime)*time.Millisecond)
+        defer cancel()
         bo.TiKVBackoffer().SetCtx(ctxWithTimeout)
     }
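
Note: both patches apply the same Go pattern at every call site: when the
session's MAX_EXECUTION_TIME is non-zero, derive a child context carrying
that deadline, and (after PATCH 2/2) defer the returned cancel func so the
timeout's resources are released even on early returns. Below is a minimal
standalone sketch of that pattern; the helper name withMaxExecutionTime and
its parameter are illustrative only and do not appear in the patches.

package main

import (
    "context"
    "fmt"
    "time"
)

// withMaxExecutionTime mirrors the patched call sites: a non-zero
// maxExecutionTimeMS (milliseconds, matching the session variable's unit)
// yields a child context with that deadline; zero means "no limit", so the
// parent context is returned unchanged with a no-op cancel func.
func withMaxExecutionTime(ctx context.Context, maxExecutionTimeMS uint64) (context.Context, context.CancelFunc) {
    if maxExecutionTimeMS == 0 {
        return ctx, func() {}
    }
    return context.WithTimeout(ctx, time.Duration(maxExecutionTimeMS)*time.Millisecond)
}

func main() {
    ctx, cancel := withMaxExecutionTime(context.Background(), 100)
    // Discarding the cancel func (ctx, _ = ...) keeps the timeout's timer
    // alive until it fires; deferring it is the leak fix made in PATCH 2/2.
    defer cancel()

    select {
    case <-ctx.Done():
        fmt.Println("timed out:", ctx.Err()) // context.DeadlineExceeded after 100ms
    case <-time.After(time.Second):
        fmt.Println("finished")
    }
}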