diff --git a/distsql/request_builder.go b/distsql/request_builder.go index f1ed9f77ad637..96f97d9193565 100644 --- a/distsql/request_builder.go +++ b/distsql/request_builder.go @@ -154,7 +154,7 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req builder.Request.IsolationLevel = builder.getIsolationLevel() builder.Request.NotFillCache = sv.StmtCtx.NotFillCache builder.Request.Priority = builder.getKVPriority(sv) - builder.Request.ReplicaRead = sv.ReplicaRead + builder.Request.ReplicaRead = sv.GetReplicaRead() return builder } diff --git a/distsql/request_builder_test.go b/distsql/request_builder_test.go index c73f775096193..2274f8003cf57 100644 --- a/distsql/request_builder_test.go +++ b/distsql/request_builder_test.go @@ -564,7 +564,7 @@ func (s *testSuite) TestRequestBuilder6(c *C) { func (s *testSuite) TestRequestBuilder7(c *C) { vars := variable.NewSessionVars() - vars.ReplicaRead = kv.ReplicaReadFollower + vars.SetReplicaRead(kv.ReplicaReadFollower) concurrency := 10 diff --git a/executor/analyze.go b/executor/analyze.go index 12106f89a59d4..f55f2354fd474 100755 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -624,7 +624,7 @@ func (e *AnalyzeFastExec) getSampRegionsRowCount(bo *tikv.Backoffer, needRebuild var resp *tikvrpc.Response var rpcCtx *tikv.RPCContext // we always use the first follower when follower read is enabled - rpcCtx, *err = e.cache.GetRPCContext(bo, loc.Region, e.ctx.GetSessionVars().ReplicaRead, 0) + rpcCtx, *err = e.cache.GetRPCContext(bo, loc.Region, e.ctx.GetSessionVars().GetReplicaRead(), 0) if *err != nil { return } @@ -925,7 +925,7 @@ func (e *AnalyzeFastExec) handleScanTasks(bo *tikv.Backoffer) (keysSize int, err if err != nil { return 0, err } - if e.ctx.GetSessionVars().ReplicaRead.IsFollowerRead() { + if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() { snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) } for _, t := range e.scanTasks { @@ -949,7 +949,7 @@ func (e *AnalyzeFastExec) handleSampTasks(bo *tikv.Backoffer, workID int, err *e if *err != nil { return } - if e.ctx.GetSessionVars().ReplicaRead.IsFollowerRead() { + if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() { snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) } rander := rand.New(rand.NewSource(e.randSeed + int64(workID))) diff --git a/executor/analyze_test.go b/executor/analyze_test.go index 70f5617849f3f..71b3ecb47c1d1 100644 --- a/executor/analyze_test.go +++ b/executor/analyze_test.go @@ -112,7 +112,7 @@ func (s *testSuite1) TestAnalyzeReplicaReadFollower(c *C) { tk.MustExec("drop table if exists t") tk.MustExec("create table t(a int)") ctx := tk.Se.(sessionctx.Context) - ctx.GetSessionVars().ReplicaRead = kv.ReplicaReadFollower + ctx.GetSessionVars().SetReplicaRead(kv.ReplicaReadFollower) tk.MustExec("analyze table t") } diff --git a/executor/executor.go b/executor/executor.go index 3e2548ec081ef..7be328e873ea8 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -1366,13 +1366,101 @@ func (e *UnionExec) Close() error { return e.baseExecutor.Close() } +func extractStmtHintsFromStmtNode(stmtNode ast.StmtNode) []*ast.TableOptimizerHint { + switch x := stmtNode.(type) { + case *ast.SelectStmt: + return x.TableHints + case *ast.UpdateStmt: + return x.TableHints + case *ast.DeleteStmt: + return x.TableHints + // TODO: support hint for InsertStmt + case *ast.ExplainStmt: + return extractStmtHintsFromStmtNode(x.Stmt) + default: + return nil + } +} + +func handleStmtHints(hints []*ast.TableOptimizerHint) (stmtHints stmtctx.StmtHints, warns []error) { + var memoryQuotaHintList, noIndexMergeHintList, useToJAHintList, readReplicaHintList []*ast.TableOptimizerHint + for _, hint := range hints { + switch hint.HintName.L { + case "memory_quota": + memoryQuotaHintList = append(memoryQuotaHintList, hint) + case "no_index_merge": + noIndexMergeHintList = append(noIndexMergeHintList, hint) + case "use_toja": + useToJAHintList = append(useToJAHintList, hint) + case "read_consistent_replica": + readReplicaHintList = append(readReplicaHintList, hint) + } + } + // Handle MEMORY_QUOTA + if len(memoryQuotaHintList) != 0 { + if len(memoryQuotaHintList) > 1 { + warn := errors.New("There are multiple MEMORY_QUOTA hints, only the last one will take effect") + warns = append(warns, warn) + } + hint := memoryQuotaHintList[len(memoryQuotaHintList)-1] + // Executor use MemoryQuota <= 0 to indicate no memory limit, here use < 0 to handle hint syntax error. + if hint.MemoryQuota < 0 { + warn := errors.New("The use of MEMORY_QUOTA hint is invalid, valid usage: MEMORY_QUOTA(10 MB) or MEMORY_QUOTA(10 GB)") + warns = append(warns, warn) + } else { + stmtHints.HasMemQuotaHint = true + stmtHints.MemQuotaQuery = hint.MemoryQuota + if hint.MemoryQuota == 0 { + warn := errors.New("Setting the MEMORY_QUOTA to 0 means no memory limit") + warns = append(warns, warn) + } + } + } + // Handle USE_TOJA + if len(useToJAHintList) != 0 { + if len(useToJAHintList) > 1 { + warn := errors.New("There are multiple USE_TOJA hints, only the last one will take effect") + warns = append(warns, warn) + } + hint := useToJAHintList[len(useToJAHintList)-1] + stmtHints.HasAllowInSubqToJoinAndAggHint = true + stmtHints.AllowInSubqToJoinAndAgg = hint.HintFlag + } + // Handle NO_INDEX_MERGE + if len(noIndexMergeHintList) != 0 { + if len(noIndexMergeHintList) > 1 { + warn := errors.New("There are multiple NO_INDEX_MERGE hints, only the last one will take effect") + warns = append(warns, warn) + } + stmtHints.HasEnableIndexMergeHint = true + stmtHints.EnableIndexMerge = false + } + // Handle READ_CONSISTENT_REPLICA + if len(readReplicaHintList) != 0 { + if len(readReplicaHintList) > 1 { + warn := errors.New("There are multiple READ_CONSISTENT_REPLICA hints, only the last one will take effect") + warns = append(warns, warn) + } + stmtHints.HasReplicaReadHint = true + stmtHints.ReplicaRead = byte(kv.ReplicaReadFollower) + } + return +} + // ResetContextOfStmt resets the StmtContext and session variables. // Before every execution, we must clear statement context. func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) { + hints := extractStmtHintsFromStmtNode(s) + stmtHints, hintWarns := handleStmtHints(hints) vars := ctx.GetSessionVars() + memQuota := vars.MemQuotaQuery + if stmtHints.HasMemQuotaHint { + memQuota = stmtHints.MemQuotaQuery + } sc := &stmtctx.StatementContext{ + StmtHints: stmtHints, TimeZone: vars.Location(), - MemTracker: memory.NewTracker(stringutil.MemoizeStr(s.Text), vars.MemQuotaQuery), + MemTracker: memory.NewTracker(stringutil.MemoizeStr(s.Text), memQuota), } switch config.GetGlobalConfig().OOMAction { case config.OOMActionCancel: @@ -1504,5 +1592,8 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) { return err } vars.StmtCtx = sc + for _, warn := range hintWarns { + vars.StmtCtx.AppendWarning(warn) + } return } diff --git a/executor/point_get.go b/executor/point_get.go index 743661297610d..7101542db379b 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -91,7 +91,7 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error { if err != nil { return err } - if e.ctx.GetSessionVars().ReplicaRead.IsFollowerRead() { + if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() { e.snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) } if e.idxInfo != nil { diff --git a/planner/core/expression_rewriter.go b/planner/core/expression_rewriter.go index 9a946878c0ae5..229c8c12b1f5a 100644 --- a/planner/core/expression_rewriter.go +++ b/planner/core/expression_rewriter.go @@ -746,7 +746,7 @@ func (er *expressionRewriter) handleInSubquery(ctx context.Context, v *ast.Patte // and has no correlated column from the current level plan(if the correlated column is from upper level, // we can treat it as constant, because the upper LogicalApply cannot be eliminated since current node is a join node), // and don't need to append a scalar value, we can rewrite it to inner join. - if er.sctx.GetSessionVars().AllowInSubqToJoinAndAgg && !v.Not && !asScalar && len(extractCorColumnsBySchema(np, er.p.Schema())) == 0 { + if er.sctx.GetSessionVars().GetAllowInSubqToJoinAndAgg() && !v.Not && !asScalar && len(extractCorColumnsBySchema(np, er.p.Schema())) == 0 { // We need to try to eliminate the agg and the projection produced by this operation. er.b.optFlag |= flagEliminateAgg er.b.optFlag |= flagEliminateProjection diff --git a/planner/core/indexmerge_test.go b/planner/core/indexmerge_test.go index 8c9fc718349d4..5979c3df8436e 100644 --- a/planner/core/indexmerge_test.go +++ b/planner/core/indexmerge_test.go @@ -130,7 +130,7 @@ func (s *testIndexMergeSuite) TestIndexMergePathGenerateion(c *C) { lp = lp.Children()[0] } } - ds.ctx.GetSessionVars().EnableIndexMerge = true + ds.ctx.GetSessionVars().SetEnableIndexMerge(true) idxMergeStartIndex := len(ds.possibleAccessPaths) _, err = lp.recursiveDeriveStats() c.Assert(err, IsNil) diff --git a/planner/core/stats.go b/planner/core/stats.go index cc234fbf30e93..516273a32a92f 100644 --- a/planner/core/stats.go +++ b/planner/core/stats.go @@ -166,7 +166,7 @@ func (ds *DataSource) DeriveStats(childStats []*property.StatsInfo) (*property.S } } // Consider the IndexMergePath. Now, we just generate `IndexMergePath` in DNF case. - if len(ds.pushedDownConds) > 0 && len(ds.possibleAccessPaths) > 1 && ds.ctx.GetSessionVars().EnableIndexMerge { + if len(ds.pushedDownConds) > 0 && len(ds.possibleAccessPaths) > 1 && ds.ctx.GetSessionVars().GetEnableIndexMerge() { needConsiderIndexMerge := true for i := 1; i < len(ds.possibleAccessPaths); i++ { if len(ds.possibleAccessPaths[i].accessConds) != 0 { diff --git a/session/session.go b/session/session.go index 4c25828592d63..34cf8840911f4 100644 --- a/session/session.go +++ b/session/session.go @@ -1212,7 +1212,7 @@ func (s *session) Txn(active bool) (kv.Transaction, error) { s.sessionVars.SetStatusFlag(mysql.ServerStatusInTrans, true) } s.sessionVars.TxnCtx.CouldRetry = s.isTxnRetryable() - if s.sessionVars.ReplicaRead.IsFollowerRead() { + if s.sessionVars.GetReplicaRead().IsFollowerRead() { s.txn.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) } } @@ -1275,7 +1275,7 @@ func (s *session) NewTxn(ctx context.Context) error { } txn.SetCap(s.getMembufCap()) txn.SetVars(s.sessionVars.KVVars) - if s.GetSessionVars().ReplicaRead.IsFollowerRead() { + if s.GetSessionVars().GetReplicaRead().IsFollowerRead() { txn.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) } s.txn.changeInvalidToValid(txn) diff --git a/session/session_test.go b/session/session_test.go index 9ba4eb7415fb1..5580141d2393f 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -2808,9 +2808,59 @@ func (s *testSessionSuite) TestReplicaRead(c *C) { tk := testkit.NewTestKit(c, s.store) tk.Se, err = session.CreateSession4Test(s.store) c.Assert(err, IsNil) - c.Assert(tk.Se.GetSessionVars().ReplicaRead, Equals, kv.ReplicaReadLeader) + c.Assert(tk.Se.GetSessionVars().GetReplicaRead(), Equals, kv.ReplicaReadLeader) tk.MustExec("set @@tidb_replica_read = 'follower';") - c.Assert(tk.Se.GetSessionVars().ReplicaRead, Equals, kv.ReplicaReadFollower) + c.Assert(tk.Se.GetSessionVars().GetReplicaRead(), Equals, kv.ReplicaReadFollower) tk.MustExec("set @@tidb_replica_read = 'leader';") - c.Assert(tk.Se.GetSessionVars().ReplicaRead, Equals, kv.ReplicaReadLeader) + c.Assert(tk.Se.GetSessionVars().GetReplicaRead(), Equals, kv.ReplicaReadLeader) +} + +func (s *testSessionSuite) TestStmtHints(c *C) { + var err error + tk := testkit.NewTestKit(c, s.store) + tk.Se, err = session.CreateSession4Test(s.store) + c.Assert(err, IsNil) + + // Test MEMORY_QUOTA hint + tk.MustExec("select /*+ MEMORY_QUOTA(1 MB) */ 1;") + val := int64(1) * 1024 * 1024 + c.Assert(tk.Se.GetSessionVars().StmtCtx.MemTracker.CheckBytesLimit(val), IsTrue) + tk.MustExec("select /*+ MEMORY_QUOTA(1 GB) */ 1;") + val = int64(1) * 1024 * 1024 * 1024 + c.Assert(tk.Se.GetSessionVars().StmtCtx.MemTracker.CheckBytesLimit(val), IsTrue) + tk.MustExec("select /*+ MEMORY_QUOTA(1 GB), MEMORY_QUOTA(1 MB) */ 1;") + val = int64(1) * 1024 * 1024 + c.Assert(tk.Se.GetSessionVars().StmtCtx.GetWarnings(), HasLen, 1) + c.Assert(tk.Se.GetSessionVars().StmtCtx.MemTracker.CheckBytesLimit(val), IsTrue) + tk.MustExec("select /*+ MEMORY_QUOTA(0 GB) */ 1;") + val = int64(0) + c.Assert(tk.Se.GetSessionVars().StmtCtx.GetWarnings(), HasLen, 1) + c.Assert(tk.Se.GetSessionVars().StmtCtx.MemTracker.CheckBytesLimit(val), IsTrue) + + // Test NO_INDEX_MERGE hint + tk.Se.GetSessionVars().SetEnableIndexMerge(true) + tk.MustExec("select /*+ NO_INDEX_MERGE() */ 1;") + c.Assert(tk.Se.GetSessionVars().GetEnableIndexMerge(), IsFalse) + tk.MustExec("select /*+ NO_INDEX_MERGE(), NO_INDEX_MERGE() */ 1;") + c.Assert(tk.Se.GetSessionVars().StmtCtx.GetWarnings(), HasLen, 1) + c.Assert(tk.Se.GetSessionVars().GetEnableIndexMerge(), IsFalse) + + // Test USE_TOJA hint + tk.Se.GetSessionVars().SetAllowInSubqToJoinAndAgg(true) + tk.MustExec("select /*+ USE_TOJA(false) */ 1;") + c.Assert(tk.Se.GetSessionVars().GetAllowInSubqToJoinAndAgg(), IsFalse) + tk.Se.GetSessionVars().SetAllowInSubqToJoinAndAgg(false) + tk.MustExec("select /*+ USE_TOJA(true) */ 1;") + c.Assert(tk.Se.GetSessionVars().GetAllowInSubqToJoinAndAgg(), IsTrue) + tk.MustExec("select /*+ USE_TOJA(false), USE_TOJA(true) */ 1;") + c.Assert(tk.Se.GetSessionVars().StmtCtx.GetWarnings(), HasLen, 1) + c.Assert(tk.Se.GetSessionVars().GetAllowInSubqToJoinAndAgg(), IsTrue) + + // Test READ_CONSISTENT_REPLICA hint + tk.Se.GetSessionVars().SetReplicaRead(kv.ReplicaReadLeader) + tk.MustExec("select /*+ READ_CONSISTENT_REPLICA() */ 1;") + c.Assert(tk.Se.GetSessionVars().GetReplicaRead(), Equals, kv.ReplicaReadFollower) + tk.MustExec("select /*+ READ_CONSISTENT_REPLICA(), READ_CONSISTENT_REPLICA() */ 1;") + c.Assert(tk.Se.GetSessionVars().StmtCtx.GetWarnings(), HasLen, 1) + c.Assert(tk.Se.GetSessionVars().GetReplicaRead(), Equals, kv.ReplicaReadFollower) } diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index a7ef3c75bd8e3..80fc128505978 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -47,6 +47,7 @@ type SQLWarn struct { // It should be reset before executing a statement. type StatementContext struct { // Set the following variables before execution + StmtHints // IsDDLJobInQueue is used to mark whether the DDL job is put into the queue. // If IsDDLJobInQueue is true, it means the DDL job is in the queue of storage, and it can be handled by the DDL worker. @@ -137,6 +138,21 @@ type StatementContext struct { Tables []TableEntry } +// StmtHints are SessionVars related sql hints. +type StmtHints struct { + // Hint flags + HasAllowInSubqToJoinAndAggHint bool + HasEnableIndexMergeHint bool + HasMemQuotaHint bool + HasReplicaReadHint bool + + // Hint Information + AllowInSubqToJoinAndAgg bool + EnableIndexMerge bool + MemQuotaQuery int64 + ReplicaRead byte +} + // GetNowTsCached getter for nowTs, if not set get now time and cache it func (sc *StatementContext) GetNowTsCached() time.Time { if !sc.stmtTimeCached { diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index e402b060e02cf..6fb291d6d57c7 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -287,9 +287,6 @@ type SessionVars struct { // This variable is currently not recommended to be turned on. AllowWriteRowID bool - // AllowInSubqToJoinAndAgg can be set to false to forbid rewriting the semi join to inner join with agg. - AllowInSubqToJoinAndAgg bool - // CorrelationThreshold is the guard to enable row count estimation using column order correlation. CorrelationThreshold float64 @@ -342,9 +339,6 @@ type SessionVars struct { // EnableVectorizedExpression enables the vectorized expression evaluation. EnableVectorizedExpression bool - // EnableIndexMerge enables the generation of IndexMergePath. - EnableIndexMerge bool - // DDLReorgPriority is the operation priority of adding indices. DDLReorgPriority int @@ -403,9 +397,6 @@ type SessionVars struct { // use noop funcs or not EnableNoopFuncs bool - // ReplicaRead is used for reading data from replicas, only follower is supported at this time. - ReplicaRead kv.ReplicaReadType - // StartTime is the start time of the last query. StartTime time.Time @@ -420,6 +411,17 @@ type SessionVars struct { // AllowRemoveAutoInc indicates whether a user can drop the auto_increment column attribute or not. AllowRemoveAutoInc bool + + // Unexported fields should be accessed and set through interfaces like GetReplicaRead() and SetReplicaRead(). + + // allowInSubqToJoinAndAgg can be set to false to forbid rewriting the semi join to inner join with agg. + allowInSubqToJoinAndAgg bool + + // EnableIndexMerge enables the generation of IndexMergePath. + enableIndexMerge bool + + // replicaRead is used for reading data from replicas, only follower is supported at this time. + replicaRead kv.ReplicaReadType } // ConnectionInfo present connection used by audit. @@ -462,7 +464,7 @@ func NewSessionVars() *SessionVars { RetryLimit: DefTiDBRetryLimit, DisableTxnAutoRetry: DefTiDBDisableTxnAutoRetry, DDLReorgPriority: kv.PriorityLow, - AllowInSubqToJoinAndAgg: DefOptInSubqToJoinAndAgg, + allowInSubqToJoinAndAgg: DefOptInSubqToJoinAndAgg, CorrelationThreshold: DefOptCorrelationThreshold, CorrelationExpFactor: DefOptCorrelationExpFactor, EnableRadixJoin: false, @@ -473,9 +475,9 @@ func NewSessionVars() *SessionVars { SlowQueryFile: config.GetGlobalConfig().Log.SlowQueryFile, WaitSplitRegionFinish: DefTiDBWaitSplitRegionFinish, WaitSplitRegionTimeout: DefWaitSplitRegionTimeout, - EnableIndexMerge: false, + enableIndexMerge: false, EnableNoopFuncs: DefTiDBEnableNoopFuncs, - ReplicaRead: kv.ReplicaReadLeader, + replicaRead: kv.ReplicaReadLeader, AllowRemoveAutoInc: DefTiDBAllowRemoveAutoInc, } vars.Concurrency = Concurrency{ @@ -516,6 +518,45 @@ func NewSessionVars() *SessionVars { return vars } +// GetAllowInSubqToJoinAndAgg get AllowInSubqToJoinAndAgg from sql hints and SessionVars.allowInSubqToJoinAndAgg. +func (s *SessionVars) GetAllowInSubqToJoinAndAgg() bool { + if s.StmtCtx.HasAllowInSubqToJoinAndAggHint { + return s.StmtCtx.AllowInSubqToJoinAndAgg + } + return s.allowInSubqToJoinAndAgg +} + +// SetAllowInSubqToJoinAndAgg set SessionVars.allowInSubqToJoinAndAgg. +func (s *SessionVars) SetAllowInSubqToJoinAndAgg(val bool) { + s.allowInSubqToJoinAndAgg = val +} + +// GetEnableIndexMerge get EnableIndexMerge from sql hints and SessionVars.enableIndexMerge. +func (s *SessionVars) GetEnableIndexMerge() bool { + if s.StmtCtx.HasEnableIndexMergeHint { + return s.StmtCtx.EnableIndexMerge + } + return s.enableIndexMerge +} + +// SetEnableIndexMerge set SessionVars.enableIndexMerge. +func (s *SessionVars) SetEnableIndexMerge(val bool) { + s.enableIndexMerge = val +} + +// GetReplicaRead get ReplicaRead from sql hints and SessionVars.replicaRead. +func (s *SessionVars) GetReplicaRead() kv.ReplicaReadType { + if s.StmtCtx.HasReplicaReadHint { + return kv.ReplicaReadType(s.StmtCtx.ReplicaRead) + } + return s.replicaRead +} + +// SetReplicaRead set SessionVars.replicaRead. +func (s *SessionVars) SetReplicaRead(val kv.ReplicaReadType) { + s.replicaRead = val +} + // GetWriteStmtBufs get pointer of SessionVars.writeStmtBufs. func (s *SessionVars) GetWriteStmtBufs() *WriteStmtBufs { return &s.writeStmtBufs @@ -742,7 +783,7 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { case TiDBOptWriteRowID: s.AllowWriteRowID = TiDBOptOn(val) case TiDBOptInSubqToJoinAndAgg: - s.AllowInSubqToJoinAndAgg = TiDBOptOn(val) + s.SetAllowInSubqToJoinAndAgg(TiDBOptOn(val)) case TiDBOptCorrelationThreshold: s.CorrelationThreshold = tidbOptFloat64(val, DefOptCorrelationThreshold) case TiDBOptCorrelationExpFactor: @@ -852,14 +893,14 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { case TiDBLowResolutionTSO: s.LowResolutionTSO = TiDBOptOn(val) case TiDBEnableIndexMerge: - s.EnableIndexMerge = TiDBOptOn(val) + s.SetEnableIndexMerge(TiDBOptOn(val)) case TiDBEnableNoopFuncs: s.EnableNoopFuncs = TiDBOptOn(val) case TiDBReplicaRead: if strings.EqualFold(val, "follower") { - s.ReplicaRead = kv.ReplicaReadFollower + s.SetReplicaRead(kv.ReplicaReadFollower) } else if strings.EqualFold(val, "leader") || len(val) == 0 { - s.ReplicaRead = kv.ReplicaReadLeader + s.SetReplicaRead(kv.ReplicaReadLeader) } case TiDBAllowRemoveAutoInc: s.AllowRemoveAutoInc = TiDBOptOn(val) diff --git a/sessionctx/variable/varsutil_test.go b/sessionctx/variable/varsutil_test.go index c3227884114f7..9ee35c7bf3ae6 100644 --- a/sessionctx/variable/varsutil_test.go +++ b/sessionctx/variable/varsutil_test.go @@ -300,12 +300,12 @@ func (s *testVarsutilSuite) TestVarsutil(c *C) { val, err = GetSessionSystemVar(v, TiDBReplicaRead) c.Assert(err, IsNil) c.Assert(val, Equals, "follower") - c.Assert(v.ReplicaRead, Equals, kv.ReplicaReadFollower) + c.Assert(v.GetReplicaRead(), Equals, kv.ReplicaReadFollower) SetSessionSystemVar(v, TiDBReplicaRead, types.NewStringDatum("leader")) val, err = GetSessionSystemVar(v, TiDBReplicaRead) c.Assert(err, IsNil) c.Assert(val, Equals, "leader") - c.Assert(v.ReplicaRead, Equals, kv.ReplicaReadLeader) + c.Assert(v.GetReplicaRead(), Equals, kv.ReplicaReadLeader) } func (s *testVarsutilSuite) TestSetOverflowBehave(c *C) { diff --git a/util/admin/admin.go b/util/admin/admin.go index 2d1d74e74d81f..1a65d9e8ed18e 100644 --- a/util/admin/admin.go +++ b/util/admin/admin.go @@ -555,7 +555,7 @@ func ScanSnapshotTableRecord(sessCtx sessionctx.Context, store kv.Storage, ver k return nil, 0, errors.Trace(err) } - if sessCtx.GetSessionVars().ReplicaRead.IsFollowerRead() { + if sessCtx.GetSessionVars().GetReplicaRead().IsFollowerRead() { snap.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) } diff --git a/util/memory/tracker.go b/util/memory/tracker.go index 003a1867e3b9f..9ffda11790391 100644 --- a/util/memory/tracker.go +++ b/util/memory/tracker.go @@ -62,6 +62,12 @@ func NewTracker(label fmt.Stringer, bytesLimit int64) *Tracker { } } +// CheckBytesLimit check whether the bytes limit of the tracker is equal to a value. +// Only used in test. +func (t *Tracker) CheckBytesLimit(val int64) bool { + return t.bytesLimit == val +} + // SetBytesLimit sets the bytes limit for this tracker. // "bytesLimit <= 0" means no limit. func (t *Tracker) SetBytesLimit(bytesLimit int64) {