From 34319420fb61a200924284f4ea933be16078b41a Mon Sep 17 00:00:00 2001 From: foreyes Date: Wed, 21 Aug 2019 17:36:02 +0800 Subject: [PATCH 1/8] add hints --- session/session.go | 134 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 134 insertions(+) diff --git a/session/session.go b/session/session.go index a21aad4186ee2..b091e952d8537 100644 --- a/session/session.go +++ b/session/session.go @@ -1045,6 +1045,128 @@ func (s *session) Execute(ctx context.Context, sql string) (recordSets []sqlexec return } +type sessionVarsRevertInfo struct { + varName string + datum types.Datum +} + +func getHintDatumFromBool(flag bool) types.Datum { + if flag { + return types.NewStringDatum("on") + } + return types.NewStringDatum("off") +} + +func (s *session) setSessionVarsByHints(stmtNode ast.StmtNode) (revertInfos []sessionVarsRevertInfo, warns []error) { + var hints []*ast.TableOptimizerHint + switch x := stmtNode.(type) { + case *ast.SelectStmt: + hints = x.TableHints + case *ast.UpdateStmt: + hints = x.TableHints + case *ast.DeleteStmt: + hints = x.TableHints + // TODO: support hint for InsertStmt + default: + return + } + var memoryQuotaHintList, noIndexMergeHintList, useToJAHintList, readReplicaHintList []*ast.TableOptimizerHint + for _, hint := range hints { + switch hint.HintName.L { + case "memory_quota": + memoryQuotaHintList = append(memoryQuotaHintList, hint) + case "no_index_merge": + noIndexMergeHintList = append(noIndexMergeHintList, hint) + case "use_toja": + useToJAHintList = append(useToJAHintList, hint) + case "read_consistent_replica": + readReplicaHintList = append(readReplicaHintList, hint) + } + } + // Handle MEMORY_QUOTA + if len(memoryQuotaHintList) > 1 { + warn := errors.New("There are multiple MEMORY_QUOTA hints, only the last one will take effect") + warns = append(warns, warn) + } + if len(memoryQuotaHintList) != 0 { + hint := memoryQuotaHintList[len(memoryQuotaHintList)-1] + // Executor use MemoryQuota < 0 to indicate no memory limit, here use it to handle hint syntax error. + if hint.MemoryQuota < 0 { + warn := errors.New("There are some syntax error in MEMORY_QUOTA hint, correct usage: MEMORY_QUOTA(10 M) or MEMORY_QUOTA(10 G)") + warns = append(warns, warn) + } else { + revertInfos = append(revertInfos, sessionVarsRevertInfo{ + varName: variable.TIDBMemQuotaQuery, + datum: types.NewIntDatum(s.sessionVars.MemQuotaQuery), + }) + err := variable.SetSessionSystemVar(s.sessionVars, variable.TIDBMemQuotaQuery, types.NewIntDatum(int64(hint.MemoryQuota))) + if err != nil { + // SQL hints should not trigger error. + warns = append(warns, err) + } + } + } + // Handle USE_TOJA + if len(useToJAHintList) > 1 { + warn := errors.New("There are multiple USE_TOJA hints, only the last one will take effect") + warns = append(warns, warn) + } + if len(useToJAHintList) != 0 { + revertInfos = append(revertInfos, sessionVarsRevertInfo{ + varName: variable.TiDBOptInSubqToJoinAndAgg, + datum: getHintDatumFromBool(s.sessionVars.AllowInSubqToJoinAndAgg), + }) + hint := useToJAHintList[len(useToJAHintList)-1] + err := variable.SetSessionSystemVar(s.sessionVars, variable.TiDBOptInSubqToJoinAndAgg, getHintDatumFromBool(hint.HintFlag)) + if err != nil { + // SQL hints should not trigger error. + warns = append(warns, err) + } + } + // Handle NO_INDEX_MERGE + if len(noIndexMergeHintList) > 1 { + warn := errors.New("There are multiple NO_INDEX_MERGE hints, only the last one will take effect") + warns = append(warns, warn) + } + if len(noIndexMergeHintList) != 0 { + revertInfos = append(revertInfos, sessionVarsRevertInfo{ + varName: variable.TiDBEnableIndexMerge, + datum: getHintDatumFromBool(s.sessionVars.EnableIndexMerge), + }) + err := variable.SetSessionSystemVar(s.sessionVars, variable.TiDBEnableIndexMerge, getHintDatumFromBool(false)) + if err != nil { + // SQL hints should not trigger error. + warns = append(warns, err) + } + } + // Handle READ_CONSISTENT_REPLICA + if len(readReplicaHintList) > 1 { + warn := errors.New("There are multiple READ_CONSISTENT_REPLICA hints, only the last one will take effect") + warns = append(warns, warn) + } + if len(readReplicaHintList) != 0 { + var datum types.Datum + switch s.sessionVars.ReplicaRead { + case kv.ReplicaReadLeader: + datum := types.NewStringDatum("leader") + case kv.ReplicaReadFollower: + datum := types.NewStringDatum("follower") + default: + // Read from learner not implement yet + } + revertInfos = append(revertInfos, sessionVarsRevertInfo{ + varName: variable.TiDBReplicaRead, + datum: datum, + }) + err := variable.SetSessionSystemVar(s.sessionVars, variable.TiDBReplicaRead, types.NewStringDatum("follower")) + if err != nil { + // SQL hints should not trigger error. + warns = append(warns, err) + } + } + return +} + func (s *session) execute(ctx context.Context, sql string) (recordSets []sqlexec.RecordSet, err error) { s.PrepareTxnCtx(ctx) connID := s.sessionVars.ConnectionID @@ -1080,6 +1202,7 @@ func (s *session) execute(ctx context.Context, sql string) (recordSets []sqlexec multiQuery := len(stmtNodes) > 1 for idx, stmtNode := range stmtNodes { s.PrepareTxnCtx(ctx) + revertInfos, hintWarns := s.setSessionVarsByHints(stmtNode) // Step2: Transform abstract syntax tree to a physical plan(stored in executor.ExecStmt). startTS = time.Now() @@ -1087,6 +1210,9 @@ func (s *session) execute(ctx context.Context, sql string) (recordSets []sqlexec if err := executor.ResetContextOfStmt(s, stmtNode); err != nil { return nil, err } + for _, warn := range hintWarns { + s.sessionVars.StmtCtx.AppendWarning(warn) + } stmt, err := compiler.Compile(ctx, stmtNode) if err != nil { if tempStmtNodes == nil { @@ -1119,6 +1245,14 @@ func (s *session) execute(ctx context.Context, sql string) (recordSets []sqlexec if recordSets, err = s.executeStatement(ctx, connID, stmtNode, stmt, recordSets, multiQuery); err != nil { return nil, err } + + for _, revertInfo := range revertInfos { + err := variable.SetSessionSystemVar(s.sessionVars, revertInfo.varName, revertInfo.datum) + if err != nil { + // SQL hints should not trigger error. + s.sessionVars.StmtCtx.AppendWarning(err) + } + } } if s.sessionVars.ClientCapability&mysql.ClientMultiResults == 0 && len(recordSets) > 1 { From 4152caa0b49368c4342a7280a990e7cd07704da4 Mon Sep 17 00:00:00 2001 From: foreyes Date: Mon, 9 Sep 2019 11:20:28 +0800 Subject: [PATCH 2/8] implement change --- executor/executor.go | 84 ++++++++++++++++++++- session/session.go | 134 ---------------------------------- sessionctx/stmtctx/stmtctx.go | 17 +++++ 3 files changed, 100 insertions(+), 135 deletions(-) diff --git a/executor/executor.go b/executor/executor.go index 0663770c30914..2866576886fc9 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -1366,13 +1366,92 @@ func (e *UnionExec) Close() error { return e.baseExecutor.Close() } +func extractStmtHintsFromStmtNode(stmtNode ast.StmtNode) (stmtHints stmtctx.StmtHints, warns []error) { + var hints []*ast.TableOptimizerHint + switch x := stmtNode.(type) { + case *ast.SelectStmt: + hints = x.TableHints + case *ast.UpdateStmt: + hints = x.TableHints + case *ast.DeleteStmt: + hints = x.TableHints + // TODO: support hint for InsertStmt + default: + return + } + var memoryQuotaHintList, noIndexMergeHintList, useToJAHintList, readReplicaHintList []*ast.TableOptimizerHint + for _, hint := range hints { + switch hint.HintName.L { + case "memory_quota": + memoryQuotaHintList = append(memoryQuotaHintList, hint) + case "no_index_merge": + noIndexMergeHintList = append(noIndexMergeHintList, hint) + case "use_toja": + useToJAHintList = append(useToJAHintList, hint) + case "read_consistent_replica": + readReplicaHintList = append(readReplicaHintList, hint) + } + } + // Handle MEMORY_QUOTA + if len(memoryQuotaHintList) != 0 { + if len(memoryQuotaHintList) > 1 { + warn := errors.New("There are multiple MEMORY_QUOTA hints, only the last one will take effect") + warns = append(warns, warn) + } + hint := memoryQuotaHintList[len(memoryQuotaHintList)-1] + // Executor use MemoryQuota < 0 to indicate no memory limit, here use it to handle hint syntax error. + if hint.MemoryQuota < 0 { + warn := errors.New("There are some syntax error in MEMORY_QUOTA hint, correct usage: MEMORY_QUOTA(10 M) or MEMORY_QUOTA(10 G)") + warns = append(warns, warn) + } else { + stmtHints.HasMemQuotaHint = true + stmtHints.MemQuotaQuery = int64(hint.MemoryQuota) + } + } + // Handle USE_TOJA + if len(useToJAHintList) != 0 { + if len(useToJAHintList) > 1 { + warn := errors.New("There are multiple USE_TOJA hints, only the last one will take effect") + warns = append(warns, warn) + } + hint := useToJAHintList[len(useToJAHintList)-1] + stmtHints.HasAllowInSubqToJoinAndAggHint = true + stmtHints.AllowInSubqToJoinAndAgg = hint.HintFlag + } + // Handle NO_INDEX_MERGE + if len(noIndexMergeHintList) != 0 { + if len(noIndexMergeHintList) > 1 { + warn := errors.New("There are multiple NO_INDEX_MERGE hints, only the last one will take effect") + warns = append(warns, warn) + } + stmtHints.HasEnableIndexMergeHint = true + stmtHints.EnableIndexMerge = false + } + // Handle READ_CONSISTENT_REPLICA + if len(readReplicaHintList) != 0 { + if len(readReplicaHintList) > 1 { + warn := errors.New("There are multiple READ_CONSISTENT_REPLICA hints, only the last one will take effect") + warns = append(warns, warn) + } + stmtHints.HasReplicaReadHint = true + stmtHints.ReplicaRead = kv.ReplicaReadFollower + } + return +} + // ResetContextOfStmt resets the StmtContext and session variables. // Before every execution, we must clear statement context. func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) { + stmtHints, hintWarns := extractStmtHintsFromStmtNode(s) vars := ctx.GetSessionVars() + memQuota := vars.MemQuotaQuery + if stmtHints.HasMemQuotaHint { + memQuota = stmtHints.MemQuotaQuery + } sc := &stmtctx.StatementContext{ + StmtHints: stmtHints, TimeZone: vars.Location(), - MemTracker: memory.NewTracker(stringutil.MemoizeStr(s.Text), vars.MemQuotaQuery), + MemTracker: memory.NewTracker(stringutil.MemoizeStr(s.Text), memQuota), } switch config.GetGlobalConfig().OOMAction { case config.OOMActionCancel: @@ -1503,5 +1582,8 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) { // execute missed stmtID uses empty sql sc.OriginalSQL = s.Text() vars.StmtCtx = sc + for _, warn := range hintWarns { + vars.StmtCtx.AppendWarning(warn) + } return } diff --git a/session/session.go b/session/session.go index b091e952d8537..a21aad4186ee2 100644 --- a/session/session.go +++ b/session/session.go @@ -1045,128 +1045,6 @@ func (s *session) Execute(ctx context.Context, sql string) (recordSets []sqlexec return } -type sessionVarsRevertInfo struct { - varName string - datum types.Datum -} - -func getHintDatumFromBool(flag bool) types.Datum { - if flag { - return types.NewStringDatum("on") - } - return types.NewStringDatum("off") -} - -func (s *session) setSessionVarsByHints(stmtNode ast.StmtNode) (revertInfos []sessionVarsRevertInfo, warns []error) { - var hints []*ast.TableOptimizerHint - switch x := stmtNode.(type) { - case *ast.SelectStmt: - hints = x.TableHints - case *ast.UpdateStmt: - hints = x.TableHints - case *ast.DeleteStmt: - hints = x.TableHints - // TODO: support hint for InsertStmt - default: - return - } - var memoryQuotaHintList, noIndexMergeHintList, useToJAHintList, readReplicaHintList []*ast.TableOptimizerHint - for _, hint := range hints { - switch hint.HintName.L { - case "memory_quota": - memoryQuotaHintList = append(memoryQuotaHintList, hint) - case "no_index_merge": - noIndexMergeHintList = append(noIndexMergeHintList, hint) - case "use_toja": - useToJAHintList = append(useToJAHintList, hint) - case "read_consistent_replica": - readReplicaHintList = append(readReplicaHintList, hint) - } - } - // Handle MEMORY_QUOTA - if len(memoryQuotaHintList) > 1 { - warn := errors.New("There are multiple MEMORY_QUOTA hints, only the last one will take effect") - warns = append(warns, warn) - } - if len(memoryQuotaHintList) != 0 { - hint := memoryQuotaHintList[len(memoryQuotaHintList)-1] - // Executor use MemoryQuota < 0 to indicate no memory limit, here use it to handle hint syntax error. - if hint.MemoryQuota < 0 { - warn := errors.New("There are some syntax error in MEMORY_QUOTA hint, correct usage: MEMORY_QUOTA(10 M) or MEMORY_QUOTA(10 G)") - warns = append(warns, warn) - } else { - revertInfos = append(revertInfos, sessionVarsRevertInfo{ - varName: variable.TIDBMemQuotaQuery, - datum: types.NewIntDatum(s.sessionVars.MemQuotaQuery), - }) - err := variable.SetSessionSystemVar(s.sessionVars, variable.TIDBMemQuotaQuery, types.NewIntDatum(int64(hint.MemoryQuota))) - if err != nil { - // SQL hints should not trigger error. - warns = append(warns, err) - } - } - } - // Handle USE_TOJA - if len(useToJAHintList) > 1 { - warn := errors.New("There are multiple USE_TOJA hints, only the last one will take effect") - warns = append(warns, warn) - } - if len(useToJAHintList) != 0 { - revertInfos = append(revertInfos, sessionVarsRevertInfo{ - varName: variable.TiDBOptInSubqToJoinAndAgg, - datum: getHintDatumFromBool(s.sessionVars.AllowInSubqToJoinAndAgg), - }) - hint := useToJAHintList[len(useToJAHintList)-1] - err := variable.SetSessionSystemVar(s.sessionVars, variable.TiDBOptInSubqToJoinAndAgg, getHintDatumFromBool(hint.HintFlag)) - if err != nil { - // SQL hints should not trigger error. - warns = append(warns, err) - } - } - // Handle NO_INDEX_MERGE - if len(noIndexMergeHintList) > 1 { - warn := errors.New("There are multiple NO_INDEX_MERGE hints, only the last one will take effect") - warns = append(warns, warn) - } - if len(noIndexMergeHintList) != 0 { - revertInfos = append(revertInfos, sessionVarsRevertInfo{ - varName: variable.TiDBEnableIndexMerge, - datum: getHintDatumFromBool(s.sessionVars.EnableIndexMerge), - }) - err := variable.SetSessionSystemVar(s.sessionVars, variable.TiDBEnableIndexMerge, getHintDatumFromBool(false)) - if err != nil { - // SQL hints should not trigger error. - warns = append(warns, err) - } - } - // Handle READ_CONSISTENT_REPLICA - if len(readReplicaHintList) > 1 { - warn := errors.New("There are multiple READ_CONSISTENT_REPLICA hints, only the last one will take effect") - warns = append(warns, warn) - } - if len(readReplicaHintList) != 0 { - var datum types.Datum - switch s.sessionVars.ReplicaRead { - case kv.ReplicaReadLeader: - datum := types.NewStringDatum("leader") - case kv.ReplicaReadFollower: - datum := types.NewStringDatum("follower") - default: - // Read from learner not implement yet - } - revertInfos = append(revertInfos, sessionVarsRevertInfo{ - varName: variable.TiDBReplicaRead, - datum: datum, - }) - err := variable.SetSessionSystemVar(s.sessionVars, variable.TiDBReplicaRead, types.NewStringDatum("follower")) - if err != nil { - // SQL hints should not trigger error. - warns = append(warns, err) - } - } - return -} - func (s *session) execute(ctx context.Context, sql string) (recordSets []sqlexec.RecordSet, err error) { s.PrepareTxnCtx(ctx) connID := s.sessionVars.ConnectionID @@ -1202,7 +1080,6 @@ func (s *session) execute(ctx context.Context, sql string) (recordSets []sqlexec multiQuery := len(stmtNodes) > 1 for idx, stmtNode := range stmtNodes { s.PrepareTxnCtx(ctx) - revertInfos, hintWarns := s.setSessionVarsByHints(stmtNode) // Step2: Transform abstract syntax tree to a physical plan(stored in executor.ExecStmt). startTS = time.Now() @@ -1210,9 +1087,6 @@ func (s *session) execute(ctx context.Context, sql string) (recordSets []sqlexec if err := executor.ResetContextOfStmt(s, stmtNode); err != nil { return nil, err } - for _, warn := range hintWarns { - s.sessionVars.StmtCtx.AppendWarning(warn) - } stmt, err := compiler.Compile(ctx, stmtNode) if err != nil { if tempStmtNodes == nil { @@ -1245,14 +1119,6 @@ func (s *session) execute(ctx context.Context, sql string) (recordSets []sqlexec if recordSets, err = s.executeStatement(ctx, connID, stmtNode, stmt, recordSets, multiQuery); err != nil { return nil, err } - - for _, revertInfo := range revertInfos { - err := variable.SetSessionSystemVar(s.sessionVars, revertInfo.varName, revertInfo.datum) - if err != nil { - // SQL hints should not trigger error. - s.sessionVars.StmtCtx.AppendWarning(err) - } - } } if s.sessionVars.ClientCapability&mysql.ClientMultiResults == 0 && len(recordSets) > 1 { diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index 6ffbe1831edc5..25ca524e0d744 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/parser" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/memory" "go.uber.org/zap" @@ -47,6 +48,7 @@ type SQLWarn struct { // It should be reset before executing a statement. type StatementContext struct { // Set the following variables before execution + StmtHints // IsDDLJobInQueue is used to mark whether the DDL job is put into the queue. // If IsDDLJobInQueue is true, it means the DDL job is in the queue of storage, and it can be handled by the DDL worker. @@ -144,6 +146,21 @@ type StatementContext struct { Tables []TableEntry } +// StmtHints are SessionVars related sql hints. +type StmtHints struct { + // Hint flags + HasAllowInSubqToJoinAndAggHint bool + HasEnableIndexMergeHint bool + HasMemQuotaHint bool + HasReplicaReadHint bool + + // Hint Infomation + AllowInSubqToJoinAndAgg bool + EnableIndexMerge bool + MemQuotaQuery int64 + ReplicaRead kv.ReplicaReadType +} + // GetNowTsCached getter for nowTs, if not set get now time and cache it func (sc *StatementContext) GetNowTsCached() time.Time { if !sc.stmtTimeCached { From d8bc63c3125e9ddb55e76480ec87e5ecfe67d8f6 Mon Sep 17 00:00:00 2001 From: foreyes Date: Mon, 9 Sep 2019 18:11:17 +0800 Subject: [PATCH 3/8] change session vars fields --- distsql/request_builder.go | 2 +- distsql/request_builder_test.go | 2 +- executor/analyze.go | 6 +-- executor/analyze_test.go | 2 +- executor/point_get.go | 2 +- planner/core/expression_rewriter.go | 2 +- planner/core/indexmerge_test.go | 2 +- planner/core/stats.go | 2 +- session/session.go | 4 +- session/session_test.go | 6 +-- sessionctx/variable/session.go | 71 ++++++++++++++++++++++------ sessionctx/variable/varsutil_test.go | 4 +- util/admin/admin.go | 2 +- 13 files changed, 74 insertions(+), 33 deletions(-) diff --git a/distsql/request_builder.go b/distsql/request_builder.go index f1ed9f77ad637..96f97d9193565 100644 --- a/distsql/request_builder.go +++ b/distsql/request_builder.go @@ -154,7 +154,7 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req builder.Request.IsolationLevel = builder.getIsolationLevel() builder.Request.NotFillCache = sv.StmtCtx.NotFillCache builder.Request.Priority = builder.getKVPriority(sv) - builder.Request.ReplicaRead = sv.ReplicaRead + builder.Request.ReplicaRead = sv.GetReplicaRead() return builder } diff --git a/distsql/request_builder_test.go b/distsql/request_builder_test.go index c73f775096193..2274f8003cf57 100644 --- a/distsql/request_builder_test.go +++ b/distsql/request_builder_test.go @@ -564,7 +564,7 @@ func (s *testSuite) TestRequestBuilder6(c *C) { func (s *testSuite) TestRequestBuilder7(c *C) { vars := variable.NewSessionVars() - vars.ReplicaRead = kv.ReplicaReadFollower + vars.SetReplicaRead(kv.ReplicaReadFollower) concurrency := 10 diff --git a/executor/analyze.go b/executor/analyze.go index a155684377cf8..8212ea11cd3f9 100755 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -624,7 +624,7 @@ func (e *AnalyzeFastExec) getSampRegionsRowCount(bo *tikv.Backoffer, needRebuild var resp *tikvrpc.Response var rpcCtx *tikv.RPCContext // we always use the first follower when follower read is enabled - rpcCtx, *err = e.cache.GetRPCContext(bo, loc.Region, e.ctx.GetSessionVars().ReplicaRead, 0) + rpcCtx, *err = e.cache.GetRPCContext(bo, loc.Region, e.ctx.GetSessionVars().GetReplicaRead(), 0) if *err != nil { return } @@ -925,7 +925,7 @@ func (e *AnalyzeFastExec) handleScanTasks(bo *tikv.Backoffer) (keysSize int, err if err != nil { return 0, err } - if e.ctx.GetSessionVars().ReplicaRead.IsFollowerRead() { + if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() { snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) } for _, t := range e.scanTasks { @@ -949,7 +949,7 @@ func (e *AnalyzeFastExec) handleSampTasks(bo *tikv.Backoffer, workID int, err *e if *err != nil { return } - if e.ctx.GetSessionVars().ReplicaRead.IsFollowerRead() { + if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() { snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) } rander := rand.New(rand.NewSource(e.randSeed + int64(workID))) diff --git a/executor/analyze_test.go b/executor/analyze_test.go index 70f5617849f3f..71b3ecb47c1d1 100644 --- a/executor/analyze_test.go +++ b/executor/analyze_test.go @@ -112,7 +112,7 @@ func (s *testSuite1) TestAnalyzeReplicaReadFollower(c *C) { tk.MustExec("drop table if exists t") tk.MustExec("create table t(a int)") ctx := tk.Se.(sessionctx.Context) - ctx.GetSessionVars().ReplicaRead = kv.ReplicaReadFollower + ctx.GetSessionVars().SetReplicaRead(kv.ReplicaReadFollower) tk.MustExec("analyze table t") } diff --git a/executor/point_get.go b/executor/point_get.go index 743661297610d..7101542db379b 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -91,7 +91,7 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error { if err != nil { return err } - if e.ctx.GetSessionVars().ReplicaRead.IsFollowerRead() { + if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() { e.snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) } if e.idxInfo != nil { diff --git a/planner/core/expression_rewriter.go b/planner/core/expression_rewriter.go index a2b810c504397..429c68cdd3c9b 100644 --- a/planner/core/expression_rewriter.go +++ b/planner/core/expression_rewriter.go @@ -737,7 +737,7 @@ func (er *expressionRewriter) handleInSubquery(ctx context.Context, v *ast.Patte // and has no correlated column from the current level plan(if the correlated column is from upper level, // we can treat it as constant, because the upper LogicalApply cannot be eliminated since current node is a join node), // and don't need to append a scalar value, we can rewrite it to inner join. - if er.sctx.GetSessionVars().AllowInSubqToJoinAndAgg && !v.Not && !asScalar && len(extractCorColumnsBySchema(np, er.p.Schema())) == 0 { + if er.sctx.GetSessionVars().GetAllowInSubqToJoinAndAgg() && !v.Not && !asScalar && len(extractCorColumnsBySchema(np, er.p.Schema())) == 0 { // We need to try to eliminate the agg and the projection produced by this operation. er.b.optFlag |= flagEliminateAgg er.b.optFlag |= flagEliminateProjection diff --git a/planner/core/indexmerge_test.go b/planner/core/indexmerge_test.go index d6ac524c28fa5..3a03ecf4d03a0 100644 --- a/planner/core/indexmerge_test.go +++ b/planner/core/indexmerge_test.go @@ -130,7 +130,7 @@ func (s *testIndexMergeSuite) TestIndexMergePathGenerateion(c *C) { lp = lp.Children()[0] } } - ds.ctx.GetSessionVars().EnableIndexMerge = true + ds.ctx.GetSessionVars().SetEnableIndexMerge(true) idxMergeStartIndex := len(ds.possibleAccessPaths) _, err = lp.recursiveDeriveStats() c.Assert(err, IsNil) diff --git a/planner/core/stats.go b/planner/core/stats.go index 25d30fbd1fc83..d7f9d78b12856 100644 --- a/planner/core/stats.go +++ b/planner/core/stats.go @@ -149,7 +149,7 @@ func (ds *DataSource) DeriveStats(childStats []*property.StatsInfo) (*property.S } } // Consider the IndexMergePath. Now, we just generate `IndexMergePath` in DNF case. - if len(ds.pushedDownConds) > 0 && len(ds.possibleAccessPaths) > 1 && ds.ctx.GetSessionVars().EnableIndexMerge { + if len(ds.pushedDownConds) > 0 && len(ds.possibleAccessPaths) > 1 && ds.ctx.GetSessionVars().GetEnableIndexMerge() { needConsiderIndexMerge := true for i := 1; i < len(ds.possibleAccessPaths); i++ { if len(ds.possibleAccessPaths[i].accessConds) != 0 { diff --git a/session/session.go b/session/session.go index a21aad4186ee2..8ec887c1c9021 100644 --- a/session/session.go +++ b/session/session.go @@ -1264,7 +1264,7 @@ func (s *session) Txn(active bool) (kv.Transaction, error) { s.sessionVars.SetStatusFlag(mysql.ServerStatusInTrans, true) } s.sessionVars.TxnCtx.CouldRetry = s.isTxnRetryable() - if s.sessionVars.ReplicaRead.IsFollowerRead() { + if s.sessionVars.GetReplicaRead().IsFollowerRead() { s.txn.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) } } @@ -1327,7 +1327,7 @@ func (s *session) NewTxn(ctx context.Context) error { } txn.SetCap(s.getMembufCap()) txn.SetVars(s.sessionVars.KVVars) - if s.GetSessionVars().ReplicaRead.IsFollowerRead() { + if s.GetSessionVars().GetReplicaRead().IsFollowerRead() { txn.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) } s.txn.changeInvalidToValid(txn) diff --git a/session/session_test.go b/session/session_test.go index 3ca8974b56afe..1eaa910b09dc0 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -2802,9 +2802,9 @@ func (s *testSessionSuite) TestReplicaRead(c *C) { tk := testkit.NewTestKit(c, s.store) tk.Se, err = session.CreateSession4Test(s.store) c.Assert(err, IsNil) - c.Assert(tk.Se.GetSessionVars().ReplicaRead, Equals, kv.ReplicaReadLeader) + c.Assert(tk.Se.GetSessionVars().GetReplicaRead(), Equals, kv.ReplicaReadLeader) tk.MustExec("set @@tidb_replica_read = 'follower';") - c.Assert(tk.Se.GetSessionVars().ReplicaRead, Equals, kv.ReplicaReadFollower) + c.Assert(tk.Se.GetSessionVars().GetReplicaRead(), Equals, kv.ReplicaReadFollower) tk.MustExec("set @@tidb_replica_read = 'leader';") - c.Assert(tk.Se.GetSessionVars().ReplicaRead, Equals, kv.ReplicaReadLeader) + c.Assert(tk.Se.GetSessionVars().GetReplicaRead(), Equals, kv.ReplicaReadLeader) } diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index d30b11e03b49e..b90701d9c71fc 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -287,9 +287,6 @@ type SessionVars struct { // This variable is currently not recommended to be turned on. AllowWriteRowID bool - // AllowInSubqToJoinAndAgg can be set to false to forbid rewriting the semi join to inner join with agg. - AllowInSubqToJoinAndAgg bool - // CorrelationThreshold is the guard to enable row count estimation using column order correlation. CorrelationThreshold float64 @@ -339,9 +336,6 @@ type SessionVars struct { // EnableWindowFunction enables the window function. EnableWindowFunction bool - // EnableIndexMerge enables the generation of IndexMergePath. - EnableIndexMerge bool - // DDLReorgPriority is the operation priority of adding indices. DDLReorgPriority int @@ -400,8 +394,16 @@ type SessionVars struct { // use noop funcs or not EnableNoopFuncs bool - // ReplicaRead is used for reading data from replicas, only follower is supported at this time. - ReplicaRead kv.ReplicaReadType + // Unexported fields should be accessed and set through interfaces like GetReplicaRead() and SetReplicaRead(). + + // allowInSubqToJoinAndAgg can be set to false to forbid rewriting the semi join to inner join with agg. + allowInSubqToJoinAndAgg bool + + // EnableIndexMerge enables the generation of IndexMergePath. + enableIndexMerge bool + + // replicaRead is used for reading data from replicas, only follower is supported at this time. + replicaRead kv.ReplicaReadType } // ConnectionInfo present connection used by audit. @@ -444,7 +446,7 @@ func NewSessionVars() *SessionVars { RetryLimit: DefTiDBRetryLimit, DisableTxnAutoRetry: DefTiDBDisableTxnAutoRetry, DDLReorgPriority: kv.PriorityLow, - AllowInSubqToJoinAndAgg: DefOptInSubqToJoinAndAgg, + allowInSubqToJoinAndAgg: DefOptInSubqToJoinAndAgg, CorrelationThreshold: DefOptCorrelationThreshold, CorrelationExpFactor: DefOptCorrelationExpFactor, EnableRadixJoin: false, @@ -454,9 +456,9 @@ func NewSessionVars() *SessionVars { SlowQueryFile: config.GetGlobalConfig().Log.SlowQueryFile, WaitSplitRegionFinish: DefTiDBWaitSplitRegionFinish, WaitSplitRegionTimeout: DefWaitSplitRegionTimeout, - EnableIndexMerge: false, + enableIndexMerge: false, EnableNoopFuncs: DefTiDBEnableNoopFuncs, - ReplicaRead: kv.ReplicaReadLeader, + replicaRead: kv.ReplicaReadLeader, } vars.Concurrency = Concurrency{ IndexLookupConcurrency: DefIndexLookupConcurrency, @@ -496,6 +498,45 @@ func NewSessionVars() *SessionVars { return vars } +// GetAllowInSubqToJoinAndAgg get AllowInSubqToJoinAndAgg from sql hints and SessionVars.allowInSubqToJoinAndAgg. +func (s *SessionVars) GetAllowInSubqToJoinAndAgg() bool { + if s.StmtCtx.HasAllowInSubqToJoinAndAggHint { + return s.StmtCtx.AllowInSubqToJoinAndAgg + } + return s.allowInSubqToJoinAndAgg +} + +// SetAllowInSubqToJoinAndAgg set SessionVars.allowInSubqToJoinAndAgg. +func (s *SessionVars) SetAllowInSubqToJoinAndAgg(val bool) { + s.allowInSubqToJoinAndAgg = val +} + +// GetEnableIndexMerge get EnableIndexMerge from sql hints and SessionVars.enableIndexMerge. +func (s *SessionVars) GetEnableIndexMerge() bool { + if s.StmtCtx.HasEnableIndexMergeHint { + return s.StmtCtx.EnableIndexMerge + } + return s.enableIndexMerge +} + +// SetEnableIndexMerge set SessionVars.enableIndexMerge. +func (s *SessionVars) SetEnableIndexMerge(val bool) { + s.enableIndexMerge = val +} + +// GetReplicaRead get ReplicaRead from sql hints and SessionVars.replicaRead. +func (s *SessionVars) GetReplicaRead() kv.ReplicaReadType { + if s.StmtCtx.HasReplicaReadHint { + return s.StmtCtx.ReplicaRead + } + return s.replicaRead +} + +// SetReplicaRead set SessionVars.replicaRead. +func (s *SessionVars) SetReplicaRead(val kv.ReplicaReadType) { + s.replicaRead = val +} + // GetWriteStmtBufs get pointer of SessionVars.writeStmtBufs. func (s *SessionVars) GetWriteStmtBufs() *WriteStmtBufs { return &s.writeStmtBufs @@ -722,7 +763,7 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { case TiDBOptWriteRowID: s.AllowWriteRowID = TiDBOptOn(val) case TiDBOptInSubqToJoinAndAgg: - s.AllowInSubqToJoinAndAgg = TiDBOptOn(val) + s.allowInSubqToJoinAndAgg = TiDBOptOn(val) case TiDBOptCorrelationThreshold: s.CorrelationThreshold = tidbOptFloat64(val, DefOptCorrelationThreshold) case TiDBOptCorrelationExpFactor: @@ -832,14 +873,14 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { case TiDBLowResolutionTSO: s.LowResolutionTSO = TiDBOptOn(val) case TiDBEnableIndexMerge: - s.EnableIndexMerge = TiDBOptOn(val) + s.enableIndexMerge = TiDBOptOn(val) case TiDBEnableNoopFuncs: s.EnableNoopFuncs = TiDBOptOn(val) case TiDBReplicaRead: if strings.EqualFold(val, "follower") { - s.ReplicaRead = kv.ReplicaReadFollower + s.replicaRead = kv.ReplicaReadFollower } else if strings.EqualFold(val, "leader") || len(val) == 0 { - s.ReplicaRead = kv.ReplicaReadLeader + s.replicaRead = kv.ReplicaReadLeader } } s.systems[name] = val diff --git a/sessionctx/variable/varsutil_test.go b/sessionctx/variable/varsutil_test.go index d36d9143380ab..98a9de7450d19 100644 --- a/sessionctx/variable/varsutil_test.go +++ b/sessionctx/variable/varsutil_test.go @@ -300,12 +300,12 @@ func (s *testVarsutilSuite) TestVarsutil(c *C) { val, err = GetSessionSystemVar(v, TiDBReplicaRead) c.Assert(err, IsNil) c.Assert(val, Equals, "follower") - c.Assert(v.ReplicaRead, Equals, kv.ReplicaReadFollower) + c.Assert(v.GetReplicaRead(), Equals, kv.ReplicaReadFollower) SetSessionSystemVar(v, TiDBReplicaRead, types.NewStringDatum("leader")) val, err = GetSessionSystemVar(v, TiDBReplicaRead) c.Assert(err, IsNil) c.Assert(val, Equals, "leader") - c.Assert(v.ReplicaRead, Equals, kv.ReplicaReadLeader) + c.Assert(v.GetReplicaRead(), Equals, kv.ReplicaReadLeader) } func (s *testVarsutilSuite) TestSetOverflowBehave(c *C) { diff --git a/util/admin/admin.go b/util/admin/admin.go index a949ec69b17ec..e62cec471cec6 100644 --- a/util/admin/admin.go +++ b/util/admin/admin.go @@ -555,7 +555,7 @@ func ScanSnapshotTableRecord(sessCtx sessionctx.Context, store kv.Storage, ver k return nil, 0, errors.Trace(err) } - if sessCtx.GetSessionVars().ReplicaRead.IsFollowerRead() { + if sessCtx.GetSessionVars().GetReplicaRead().IsFollowerRead() { snap.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) } From ad56592c77ad4ad357dbde8e4ac31fcd07c03b8d Mon Sep 17 00:00:00 2001 From: foreyes Date: Mon, 9 Sep 2019 18:55:11 +0800 Subject: [PATCH 4/8] resolve import cycle --- executor/executor.go | 2 +- sessionctx/stmtctx/stmtctx.go | 3 +-- sessionctx/variable/session.go | 2 +- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/executor/executor.go b/executor/executor.go index 5bb4fb3d25b80..86d7321143252 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -1434,7 +1434,7 @@ func extractStmtHintsFromStmtNode(stmtNode ast.StmtNode) (stmtHints stmtctx.Stmt warns = append(warns, warn) } stmtHints.HasReplicaReadHint = true - stmtHints.ReplicaRead = kv.ReplicaReadFollower + stmtHints.ReplicaRead = byte(kv.ReplicaReadFollower) } return } diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index 896edf23b6e16..6205c305ce1ce 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -23,7 +23,6 @@ import ( "github.com/pingcap/parser" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" - "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/memory" "go.uber.org/zap" @@ -151,7 +150,7 @@ type StmtHints struct { AllowInSubqToJoinAndAgg bool EnableIndexMerge bool MemQuotaQuery int64 - ReplicaRead kv.ReplicaReadType + ReplicaRead byte } // GetNowTsCached getter for nowTs, if not set get now time and cache it diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 1f14744770df8..4cfc9683ce0db 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -543,7 +543,7 @@ func (s *SessionVars) SetEnableIndexMerge(val bool) { // GetReplicaRead get ReplicaRead from sql hints and SessionVars.replicaRead. func (s *SessionVars) GetReplicaRead() kv.ReplicaReadType { if s.StmtCtx.HasReplicaReadHint { - return s.StmtCtx.ReplicaRead + return kv.ReplicaReadType(s.StmtCtx.ReplicaRead) } return s.replicaRead } From c057ae46428419522bc8cef8c16157afea48c336 Mon Sep 17 00:00:00 2001 From: foreyes Date: Mon, 9 Sep 2019 18:58:52 +0800 Subject: [PATCH 5/8] fix --- sessionctx/stmtctx/stmtctx.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index 6205c305ce1ce..80fc128505978 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -146,7 +146,7 @@ type StmtHints struct { HasMemQuotaHint bool HasReplicaReadHint bool - // Hint Infomation + // Hint Information AllowInSubqToJoinAndAgg bool EnableIndexMerge bool MemQuotaQuery int64 From a2de44c050e5e96ee0e4bbaead360e510c77e7ef Mon Sep 17 00:00:00 2001 From: foreyes Date: Wed, 11 Sep 2019 10:00:31 +0800 Subject: [PATCH 6/8] address comment --- sessionctx/variable/session.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 0f6d5075d4dba..a5f18ac2440b3 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -783,7 +783,7 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { case TiDBOptWriteRowID: s.AllowWriteRowID = TiDBOptOn(val) case TiDBOptInSubqToJoinAndAgg: - s.allowInSubqToJoinAndAgg = TiDBOptOn(val) + s.SetAllowInSubqToJoinAndAgg(TiDBOptOn(val)) case TiDBOptCorrelationThreshold: s.CorrelationThreshold = tidbOptFloat64(val, DefOptCorrelationThreshold) case TiDBOptCorrelationExpFactor: @@ -893,14 +893,14 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { case TiDBLowResolutionTSO: s.LowResolutionTSO = TiDBOptOn(val) case TiDBEnableIndexMerge: - s.enableIndexMerge = TiDBOptOn(val) + s.SetEnableIndexMerge(TiDBOptOn(val)) case TiDBEnableNoopFuncs: s.EnableNoopFuncs = TiDBOptOn(val) case TiDBReplicaRead: if strings.EqualFold(val, "follower") { - s.replicaRead = kv.ReplicaReadFollower + s.SetReplicaRead(kv.ReplicaReadFollower) } else if strings.EqualFold(val, "leader") || len(val) == 0 { - s.replicaRead = kv.ReplicaReadLeader + s.SetReplicaRead(kv.ReplicaReadLeader) } case TiDBAllowRemoveAutoInc: s.AllowRemoveAutoInc = TiDBOptOn(val) From c0d59029d175669404fe480a140dac1c7730e9ab Mon Sep 17 00:00:00 2001 From: foreyes Date: Wed, 11 Sep 2019 11:24:46 +0800 Subject: [PATCH 7/8] fix & add test --- executor/executor.go | 23 +++++++++++++-------- session/session_test.go | 46 +++++++++++++++++++++++++++++++++++++++++ util/memory/tracker.go | 5 +++++ 3 files changed, 65 insertions(+), 9 deletions(-) diff --git a/executor/executor.go b/executor/executor.go index 86d7321143252..3102cef0eb037 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -1366,19 +1366,23 @@ func (e *UnionExec) Close() error { return e.baseExecutor.Close() } -func extractStmtHintsFromStmtNode(stmtNode ast.StmtNode) (stmtHints stmtctx.StmtHints, warns []error) { - var hints []*ast.TableOptimizerHint +func extractStmtHintsFromStmtNode(stmtNode ast.StmtNode) []*ast.TableOptimizerHint { switch x := stmtNode.(type) { case *ast.SelectStmt: - hints = x.TableHints + return x.TableHints case *ast.UpdateStmt: - hints = x.TableHints + return x.TableHints case *ast.DeleteStmt: - hints = x.TableHints + return x.TableHints // TODO: support hint for InsertStmt + case *ast.ExplainStmt: + return extractStmtHintsFromStmtNode(x.Stmt) default: - return + return nil } +} + +func handleStmtHints(hints []*ast.TableOptimizerHint) (stmtHints stmtctx.StmtHints, warns []error) { var memoryQuotaHintList, noIndexMergeHintList, useToJAHintList, readReplicaHintList []*ast.TableOptimizerHint for _, hint := range hints { switch hint.HintName.L { @@ -1401,11 +1405,11 @@ func extractStmtHintsFromStmtNode(stmtNode ast.StmtNode) (stmtHints stmtctx.Stmt hint := memoryQuotaHintList[len(memoryQuotaHintList)-1] // Executor use MemoryQuota < 0 to indicate no memory limit, here use it to handle hint syntax error. if hint.MemoryQuota < 0 { - warn := errors.New("There are some syntax error in MEMORY_QUOTA hint, correct usage: MEMORY_QUOTA(10 M) or MEMORY_QUOTA(10 G)") + warn := errors.New("There are some syntax error in MEMORY_QUOTA hint, correct usage: MEMORY_QUOTA(10 MB) or MEMORY_QUOTA(10 GB)") warns = append(warns, warn) } else { stmtHints.HasMemQuotaHint = true - stmtHints.MemQuotaQuery = int64(hint.MemoryQuota) + stmtHints.MemQuotaQuery = hint.MemoryQuota } } // Handle USE_TOJA @@ -1442,7 +1446,8 @@ func extractStmtHintsFromStmtNode(stmtNode ast.StmtNode) (stmtHints stmtctx.Stmt // ResetContextOfStmt resets the StmtContext and session variables. // Before every execution, we must clear statement context. func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) { - stmtHints, hintWarns := extractStmtHintsFromStmtNode(s) + hints := extractStmtHintsFromStmtNode(s) + stmtHints, hintWarns := handleStmtHints(hints) vars := ctx.GetSessionVars() memQuota := vars.MemQuotaQuery if stmtHints.HasMemQuotaHint { diff --git a/session/session_test.go b/session/session_test.go index 87ac6c878229b..1457e570cf6eb 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -2808,3 +2808,49 @@ func (s *testSessionSuite) TestReplicaRead(c *C) { tk.MustExec("set @@tidb_replica_read = 'leader';") c.Assert(tk.Se.GetSessionVars().GetReplicaRead(), Equals, kv.ReplicaReadLeader) } + +func (s *testSessionSuite) TestStmtHints(c *C) { + var err error + tk := testkit.NewTestKit(c, s.store) + tk.Se, err = session.CreateSession4Test(s.store) + c.Assert(err, IsNil) + + // Test MEMORY_QUOTA hint + tk.MustExec("select /*+ MEMORY_QUOTA(1 MB) */ 1;") + val := int64(1) * 1024 * 1024 + c.Assert(tk.Se.GetSessionVars().StmtCtx.MemTracker.CheckBytesLimit(val), IsTrue) + tk.MustExec("select /*+ MEMORY_QUOTA(1 GB) */ 1;") + val = int64(1) * 1024 * 1024 * 1024 + c.Assert(tk.Se.GetSessionVars().StmtCtx.MemTracker.CheckBytesLimit(val), IsTrue) + tk.MustExec("select /*+ MEMORY_QUOTA(1 GB), MEMORY_QUOTA(1 MB) */ 1;") + val = int64(1) * 1024 * 1024 + c.Assert(tk.Se.GetSessionVars().StmtCtx.GetWarnings(), HasLen, 1) + c.Assert(tk.Se.GetSessionVars().StmtCtx.MemTracker.CheckBytesLimit(val), IsTrue) + + // Test NO_INDEX_MERGE hint + tk.Se.GetSessionVars().SetEnableIndexMerge(true) + tk.MustExec("select /*+ NO_INDEX_MERGE() */ 1;") + c.Assert(tk.Se.GetSessionVars().GetEnableIndexMerge(), IsFalse) + tk.MustExec("select /*+ NO_INDEX_MERGE(), NO_INDEX_MERGE() */ 1;") + c.Assert(tk.Se.GetSessionVars().StmtCtx.GetWarnings(), HasLen, 1) + c.Assert(tk.Se.GetSessionVars().GetEnableIndexMerge(), IsFalse) + + // Test USE_TOJA hint + tk.Se.GetSessionVars().SetAllowInSubqToJoinAndAgg(true) + tk.MustExec("select /*+ USE_TOJA(false) */ 1;") + c.Assert(tk.Se.GetSessionVars().GetAllowInSubqToJoinAndAgg(), IsFalse) + tk.Se.GetSessionVars().SetAllowInSubqToJoinAndAgg(false) + tk.MustExec("select /*+ USE_TOJA(true) */ 1;") + c.Assert(tk.Se.GetSessionVars().GetAllowInSubqToJoinAndAgg(), IsTrue) + tk.MustExec("select /*+ USE_TOJA(false), USE_TOJA(true) */ 1;") + c.Assert(tk.Se.GetSessionVars().StmtCtx.GetWarnings(), HasLen, 1) + c.Assert(tk.Se.GetSessionVars().GetAllowInSubqToJoinAndAgg(), IsTrue) + + // Test READ_CONSISTENT_REPLICA hint + tk.Se.GetSessionVars().SetReplicaRead(kv.ReplicaReadLeader) + tk.MustExec("select /*+ READ_CONSISTENT_REPLICA() */ 1;") + c.Assert(tk.Se.GetSessionVars().GetReplicaRead(), Equals, kv.ReplicaReadFollower) + tk.MustExec("select /*+ READ_CONSISTENT_REPLICA(), READ_CONSISTENT_REPLICA() */ 1;") + c.Assert(tk.Se.GetSessionVars().StmtCtx.GetWarnings(), HasLen, 1) + c.Assert(tk.Se.GetSessionVars().GetReplicaRead(), Equals, kv.ReplicaReadFollower) +} diff --git a/util/memory/tracker.go b/util/memory/tracker.go index 3b935360f0fce..0a546080a3385 100644 --- a/util/memory/tracker.go +++ b/util/memory/tracker.go @@ -62,6 +62,11 @@ func NewTracker(label fmt.Stringer, bytesLimit int64) *Tracker { } } +// CheckBytesLimit check whether the bytes limit of the tracker is equal to a value. +func (t *Tracker) CheckBytesLimit(val int64) bool { + return t.bytesLimit == val +} + // SetActionOnExceed sets the action when memory usage is out of memory quota. func (t *Tracker) SetActionOnExceed(a ActionOnExceed) { t.actionOnExceed = a From 2e4f64904616aa7dc1a4a6ee7e9935c06ba4686e Mon Sep 17 00:00:00 2001 From: foreyes Date: Tue, 17 Sep 2019 11:40:33 +0800 Subject: [PATCH 8/8] address comment & add test --- executor/executor.go | 8 ++++++-- session/session_test.go | 4 ++++ util/memory/tracker.go | 1 + 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/executor/executor.go b/executor/executor.go index 3102cef0eb037..7be328e873ea8 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -1403,13 +1403,17 @@ func handleStmtHints(hints []*ast.TableOptimizerHint) (stmtHints stmtctx.StmtHin warns = append(warns, warn) } hint := memoryQuotaHintList[len(memoryQuotaHintList)-1] - // Executor use MemoryQuota < 0 to indicate no memory limit, here use it to handle hint syntax error. + // Executor use MemoryQuota <= 0 to indicate no memory limit, here use < 0 to handle hint syntax error. if hint.MemoryQuota < 0 { - warn := errors.New("There are some syntax error in MEMORY_QUOTA hint, correct usage: MEMORY_QUOTA(10 MB) or MEMORY_QUOTA(10 GB)") + warn := errors.New("The use of MEMORY_QUOTA hint is invalid, valid usage: MEMORY_QUOTA(10 MB) or MEMORY_QUOTA(10 GB)") warns = append(warns, warn) } else { stmtHints.HasMemQuotaHint = true stmtHints.MemQuotaQuery = hint.MemoryQuota + if hint.MemoryQuota == 0 { + warn := errors.New("Setting the MEMORY_QUOTA to 0 means no memory limit") + warns = append(warns, warn) + } } } // Handle USE_TOJA diff --git a/session/session_test.go b/session/session_test.go index e7f2ede391ab5..1ed66bc984dae 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -2832,6 +2832,10 @@ func (s *testSessionSuite) TestStmtHints(c *C) { val = int64(1) * 1024 * 1024 c.Assert(tk.Se.GetSessionVars().StmtCtx.GetWarnings(), HasLen, 1) c.Assert(tk.Se.GetSessionVars().StmtCtx.MemTracker.CheckBytesLimit(val), IsTrue) + tk.MustExec("select /*+ MEMORY_QUOTA(0 GB) */ 1;") + val = int64(0) + c.Assert(tk.Se.GetSessionVars().StmtCtx.GetWarnings(), HasLen, 1) + c.Assert(tk.Se.GetSessionVars().StmtCtx.MemTracker.CheckBytesLimit(val), IsTrue) // Test NO_INDEX_MERGE hint tk.Se.GetSessionVars().SetEnableIndexMerge(true) diff --git a/util/memory/tracker.go b/util/memory/tracker.go index b797c1dbdb678..9ffda11790391 100644 --- a/util/memory/tracker.go +++ b/util/memory/tracker.go @@ -63,6 +63,7 @@ func NewTracker(label fmt.Stringer, bytesLimit int64) *Tracker { } // CheckBytesLimit check whether the bytes limit of the tracker is equal to a value. +// Only used in test. func (t *Tracker) CheckBytesLimit(val int64) bool { return t.bytesLimit == val }