Skip to content

Commit

Permalink
Merge branch 'mx/preventDataAddAndSubStringXXXPushDownToTiflash' of g…
Browse files Browse the repository at this point in the history
…ithub.com:mengxin9014/tidb into mx/preventDataAddAndSubStringXXXPushDownToTiflash
  • Loading branch information
mengxin9014 committed Nov 25, 2021
2 parents ca9a024 + ddc9e90 commit 5977312
Show file tree
Hide file tree
Showing 16 changed files with 170 additions and 68 deletions.
11 changes: 6 additions & 5 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -975,12 +975,13 @@ func (b *executorBuilder) buildDDL(v *plannercore.DDL) Executor {
// at build().
func (b *executorBuilder) buildTrace(v *plannercore.Trace) Executor {
t := &TraceExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
stmtNode: v.StmtNode,
builder: b,
format: v.Format,
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
stmtNode: v.StmtNode,
builder: b,
format: v.Format,
optimizerTrace: v.OptimizerTrace,
}
if t.format == plannercore.TraceFormatLog {
if t.format == plannercore.TraceFormatLog && !t.optimizerTrace {
return &SortExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID(), t),
ByItems: []*plannerutil.ByItems{
Expand Down
1 change: 0 additions & 1 deletion executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1689,7 +1689,6 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
sc.CTEStorageMap = map[int]*CTEStorages{}
sc.IsStaleness = false
sc.LockTableIDs = make(map[int64]struct{})
sc.EnableOptimizeTrace = false
sc.LogicalOptimizeTrace = nil

sc.InitMemTracker(memory.LabelForSQLText, vars.MemQuotaQuery)
Expand Down
78 changes: 78 additions & 0 deletions executor/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,16 @@
package executor

import (
"archive/zip"
"context"
"crypto/rand"
"encoding/base64"
"encoding/json"
"fmt"
"os"
"path/filepath"
"sort"
"strconv"
"time"

"github.com/opentracing/basictracer-go"
Expand Down Expand Up @@ -51,6 +57,8 @@ type TraceExec struct {

builder *executorBuilder
format string
// optimizerTrace indicates 'trace plan statement'
optimizerTrace bool
}

// Next executes real query and collects span later.
Expand All @@ -71,6 +79,10 @@ func (e *TraceExec) Next(ctx context.Context, req *chunk.Chunk) error {
e.ctx.GetSessionVars().StmtCtx = stmtCtx
}()

if e.optimizerTrace {
return e.nextOptimizerPlanTrace(ctx, e.ctx, req)
}

switch e.format {
case core.TraceFormatLog:
return e.nextTraceLog(ctx, se, req)
Expand All @@ -79,6 +91,40 @@ func (e *TraceExec) Next(ctx context.Context, req *chunk.Chunk) error {
}
}

// nextOptimizerPlanTrace executes the traced statement through executeChild,
// serializes the statement context's LogicalOptimizeTrace as JSON into a
// "trace.json" entry of a freshly created zip file, and appends the zip file
// name to req as the single result value.
//
// The error result is named so that the deferred cleanup can report a failure
// to finalize the archive. zip.Writer.Close is what writes the central
// directory, so if it (or closing the underlying file) fails, the archive is
// not usable and the caller must not be told the dump succeeded.
func (e *TraceExec) nextOptimizerPlanTrace(ctx context.Context, se sessionctx.Context, req *chunk.Chunk) (err error) {
	zf, fileName, err := generateOptimizerTraceFile()
	if err != nil {
		return err
	}
	zw := zip.NewWriter(zf)
	defer func() {
		// Close the zip writer before the file: closing the writer flushes
		// the remaining archive data into zf.
		if closeErr := zw.Close(); closeErr != nil {
			if err == nil {
				err = errors.AddStack(closeErr)
			} else {
				// An earlier error is already being returned; keep it and
				// only record the secondary failure.
				logutil.BgLogger().Warn("Closing zip writer failed", zap.Error(closeErr))
			}
		}
		if closeErr := zf.Close(); closeErr != nil {
			if err == nil {
				err = errors.AddStack(closeErr)
			} else {
				logutil.BgLogger().Warn("Closing zip file failed", zap.Error(closeErr))
			}
		}
	}()
	traceZW, err := zw.Create("trace.json")
	if err != nil {
		return errors.AddStack(err)
	}
	// Run the traced statement; the trace read below is taken from the
	// statement context after this call returns.
	e.executeChild(ctx, se.(sqlexec.SQLExecutor))
	res, err := json.Marshal(se.GetSessionVars().StmtCtx.LogicalOptimizeTrace)
	if err != nil {
		return errors.AddStack(err)
	}
	_, err = traceZW.Write(res)
	if err != nil {
		return errors.AddStack(err)
	}
	req.AppendString(0, fileName)
	e.exhausted = true
	return nil
}

func (e *TraceExec) nextTraceLog(ctx context.Context, se sqlexec.SQLExecutor, req *chunk.Chunk) error {
recorder := basictracer.NewInMemoryRecorder()
tracer := basictracer.New(recorder)
Expand Down Expand Up @@ -142,8 +188,11 @@ func (e *TraceExec) executeChild(ctx context.Context, se sqlexec.SQLExecutor) {
vars := e.ctx.GetSessionVars()
origin := vars.InRestrictedSQL
vars.InRestrictedSQL = true
originOptimizeTrace := vars.EnableStmtOptimizeTrace
vars.EnableStmtOptimizeTrace = e.optimizerTrace
defer func() {
vars.InRestrictedSQL = origin
vars.EnableStmtOptimizeTrace = originOptimizeTrace
}()
rs, err := se.ExecuteStmt(ctx, e.stmtNode)
if err != nil {
Expand Down Expand Up @@ -252,3 +301,32 @@ func generateLogResult(allSpans []basictracer.RawSpan, chk *chunk.Chunk) {
}
}
}

// generateOptimizerTraceFile creates the zip file that an optimizer trace is
// dumped into. The file lives under the directory returned by
// getOptimizerTraceDirName (created on demand) and is named
// "optimizer_trace_<key>_<timestamp>.zip", where <key> is 16 random bytes in
// URL-safe base64 and <timestamp> is the current Unix time in nanoseconds.
// It returns the open file together with its base name.
func generateOptimizerTraceFile() (*os.File, string, error) {
	dir := getOptimizerTraceDirName()
	// Make sure the dump directory exists before creating anything in it.
	if err := os.MkdirAll(dir, os.ModePerm); err != nil {
		return nil, "", errors.AddStack(err)
	}

	// The file name combines a random key with the creation timestamp.
	nowNano := time.Now().UnixNano()
	randBytes := make([]byte, 16)
	if _, err := rand.Read(randBytes); err != nil {
		return nil, "", errors.AddStack(err)
	}
	randKey := base64.URLEncoding.EncodeToString(randBytes)
	fileName := fmt.Sprintf("optimizer_trace_%v_%v.zip", randKey, nowNano)

	file, err := os.Create(filepath.Join(dir, fileName))
	if err != nil {
		return nil, "", errors.AddStack(err)
	}
	return file, fileName, nil
}

// getOptimizerTraceDirName builds the directory path used for optimizer trace
// dumps: "<temp dir>/optimizer_trace/<pid>", where <pid> is the id of the
// current process.
func getOptimizerTraceDirName() string {
	pid := strconv.Itoa(os.Getpid())
	return filepath.Join(os.TempDir(), "optimizer_trace", pid)
}
10 changes: 10 additions & 0 deletions executor/trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,13 @@ func rowsOrdered(rows [][]interface{}) bool {
}
return true
}

// TestTracePlanStmt checks that `trace plan <statement>` returns exactly one
// row with one column, and that the value is the name of the generated
// optimizer trace zip file.
func (s *testSuite1) TestTracePlanStmt(c *C) {
	tk := testkit.NewTestKit(c, s.store)
	tk.MustExec("use test")
	// Drop any leftover table first so the test does not fail on a rerun
	// against a store that already contains it.
	tk.MustExec("drop table if exists tp123")
	tk.MustExec("create table tp123(id int);")
	rows := tk.MustQuery("trace plan select * from tp123").Rows()
	c.Assert(rows, HasLen, 1)
	c.Assert(rows[0], HasLen, 1)
	// The name is produced as "optimizer_trace_<key>_<timestamp>.zip"; match
	// that shape instead of any string that merely ends in "zip".
	c.Assert(rows[0][0].(string), Matches, `optimizer_trace_.*\.zip`)
}
5 changes: 4 additions & 1 deletion expression/builtin_compare.go
Original file line number Diff line number Diff line change
Expand Up @@ -1461,11 +1461,14 @@ func (c *compareFunctionClass) refineArgs(ctx sessionctx.Context, args []Express
// To keep the result compatible with MySQL, refine `int non-constant <cmp> str constant`
// here and skip this refine operation in all other cases for safety.
if (arg0IsInt && !arg0IsCon && arg1IsString && arg1IsCon) || (arg1IsInt && !arg1IsCon && arg0IsString && arg0IsCon) {
ctx.GetSessionVars().StmtCtx.MaybeOverOptimized4PlanCache = true
ctx.GetSessionVars().StmtCtx.SkipPlanCache = true
RemoveMutableConst(ctx, args)
} else {
return args
}
} else if ctx.GetSessionVars().StmtCtx.SkipPlanCache {
// We should remove the mutable constant for correctness, because its value may be changed.
RemoveMutableConst(ctx, args)
}
// int non-constant [cmp] non-int constant
if arg0IsInt && !arg0IsCon && !arg1IsInt && arg1IsCon {
Expand Down
7 changes: 1 addition & 6 deletions expression/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -920,12 +920,7 @@ func ContainCorrelatedColumn(exprs []Expression) bool {
// TODO: Do more careful check here.
func MaybeOverOptimized4PlanCache(ctx sessionctx.Context, exprs []Expression) bool {
// If we do not enable plan cache, all the optimization can work correctly.
if !ctx.GetSessionVars().StmtCtx.UseCache {
return false
}
if ctx.GetSessionVars().StmtCtx.MaybeOverOptimized4PlanCache {
// If the current statement can not be cached. We should remove the mutable constant.
RemoveMutableConst(ctx, exprs)
if !ctx.GetSessionVars().StmtCtx.UseCache || ctx.GetSessionVars().StmtCtx.SkipPlanCache {
return false
}
return containMutableConst(ctx, exprs)
Expand Down
4 changes: 2 additions & 2 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ REBUILD:
e.names = names
e.Plan = p
_, isTableDual := p.(*PhysicalTableDual)
if !isTableDual && prepared.UseCache && !stmtCtx.MaybeOverOptimized4PlanCache {
if !isTableDual && prepared.UseCache && !stmtCtx.SkipPlanCache {
// rebuild key to exclude kv.TiFlash when stmt is not read only
if _, isolationReadContainTiFlash := sessVars.IsolationReadEngines[kv.TiFlash]; isolationReadContainTiFlash && !IsReadOnly(stmt, sessVars) {
delete(sessVars.IsolationReadEngines, kv.TiFlash)
Expand Down Expand Up @@ -531,7 +531,7 @@ REBUILD:
// short paths for these executions, currently "point select" and "point update"
func (e *Execute) tryCachePointPlan(ctx context.Context, sctx sessionctx.Context,
preparedStmt *CachedPrepareStmt, is infoschema.InfoSchema, p Plan) error {
if sctx.GetSessionVars().StmtCtx.MaybeOverOptimized4PlanCache {
if sctx.GetSessionVars().StmtCtx.SkipPlanCache {
return nil
}
var (
Expand Down
5 changes: 4 additions & 1 deletion planner/core/expression_rewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -1454,11 +1454,14 @@ func (er *expressionRewriter) inToExpression(lLen int, not bool, tp *types.Field
if c.GetType().EvalType() == types.ETString {
// To keep the result compatible with MySQL, refine `int non-constant <cmp> str constant`
// here and skip this refine operation in all other cases for safety.
er.sctx.GetSessionVars().StmtCtx.MaybeOverOptimized4PlanCache = true
er.sctx.GetSessionVars().StmtCtx.SkipPlanCache = true
expression.RemoveMutableConst(er.sctx, []expression.Expression{c})
} else {
continue
}
} else if er.sctx.GetSessionVars().StmtCtx.SkipPlanCache {
// We should remove the mutable constant for correctness, because its value may be changed.
expression.RemoveMutableConst(er.sctx, []expression.Expression{c})
}
args[i], isExceptional = expression.RefineComparedConstant(er.sctx, *leftFt, c, opcode.EQ)
if isExceptional {
Expand Down
2 changes: 1 addition & 1 deletion planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -806,7 +806,7 @@ func (ds *DataSource) findBestTask(prop *property.PhysicalProperty, planCounter
if len(path.Ranges) == 0 {
// We should uncache the tableDual plan.
if expression.MaybeOverOptimized4PlanCache(ds.ctx, path.AccessConds) {
ds.ctx.GetSessionVars().StmtCtx.MaybeOverOptimized4PlanCache = true
ds.ctx.GetSessionVars().StmtCtx.SkipPlanCache = true
}
dual := PhysicalTableDual{}.Init(ds.ctx, ds.stats, ds.blockOffset)
dual.SetSchema(ds.schema)
Expand Down
2 changes: 1 addition & 1 deletion planner/core/logical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2107,7 +2107,7 @@ func (s *testPlanSuite) TestLogicalOptimizeWithTraceEnabled(c *C) {
err = Preprocess(s.ctx, stmt, WithPreprocessorReturn(&PreprocessorReturn{InfoSchema: s.is}))
c.Assert(err, IsNil, comment)
sctx := MockContext()
sctx.GetSessionVars().StmtCtx.EnableOptimizeTrace = true
sctx.GetSessionVars().EnableStmtOptimizeTrace = true
builder, _ := NewPlanBuilder().Init(sctx, s.is, &hint.BlockHintProcessor{})
domain.GetDomain(sctx).MockInfoCacheAndLoadInfoSchema(s.is)
ctx := context.TODO()
Expand Down
14 changes: 7 additions & 7 deletions planner/core/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,11 @@ func (op *logicalOptimizeOp) withEnableOptimizeTracer(tracer *tracing.LogicalOpt
return op
}

func (op *logicalOptimizeOp) appendBeforeRuleOptimize(name string, before LogicalPlan) {
func (op *logicalOptimizeOp) appendBeforeRuleOptimize(index int, name string, before LogicalPlan) {
if op.tracer == nil {
return
}
op.tracer.AppendRuleTracerBeforeRuleOptimize(name, before.buildLogicalPlanTrace())
op.tracer.AppendRuleTracerBeforeRuleOptimize(index, name, before.buildLogicalPlanTrace())
}

func (op *logicalOptimizeOp) appendStepToCurrent(id int, tp, reason, action string) {
Expand Down Expand Up @@ -321,7 +321,7 @@ func postOptimize(sctx sessionctx.Context, plan PhysicalPlan) PhysicalPlan {
// Todo: make more careful check here.
func checkPlanCacheable(sctx sessionctx.Context, plan PhysicalPlan) {
if sctx.GetSessionVars().StmtCtx.UseCache && useTiFlash(plan) {
sctx.GetSessionVars().StmtCtx.MaybeOverOptimized4PlanCache = true
sctx.GetSessionVars().StmtCtx.SkipPlanCache = true
}
}

Expand Down Expand Up @@ -377,12 +377,12 @@ func enableParallelApply(sctx sessionctx.Context, plan PhysicalPlan) PhysicalPla

func logicalOptimize(ctx context.Context, flag uint64, logic LogicalPlan) (LogicalPlan, error) {
opt := defaultLogicalOptimizeOption()
stmtCtx := logic.SCtx().GetSessionVars().StmtCtx
if stmtCtx.EnableOptimizeTrace {
vars := logic.SCtx().GetSessionVars()
if vars.EnableStmtOptimizeTrace {
tracer := &tracing.LogicalOptimizeTracer{Steps: make([]*tracing.LogicalRuleOptimizeTracer, 0)}
opt = opt.withEnableOptimizeTracer(tracer)
defer func() {
stmtCtx.LogicalOptimizeTrace = tracer
vars.StmtCtx.LogicalOptimizeTrace = tracer
}()
}
var err error
Expand All @@ -393,7 +393,7 @@ func logicalOptimize(ctx context.Context, flag uint64, logic LogicalPlan) (Logic
if flag&(1<<uint(i)) == 0 || isLogicalRuleDisabled(rule) {
continue
}
opt.appendBeforeRuleOptimize(rule.name(), logic)
opt.appendBeforeRuleOptimize(i, rule.name(), logic)
logic, err = rule.optimize(ctx, logic, opt)
if err != nil {
return nil, err
Expand Down
12 changes: 10 additions & 2 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4048,7 +4048,15 @@ const (
// underlying query and then constructs a schema, which will be used to constructs
// rows result.
func (b *PlanBuilder) buildTrace(trace *ast.TraceStmt) (Plan, error) {
p := &Trace{StmtNode: trace.Stmt, Format: trace.Format}
p := &Trace{StmtNode: trace.Stmt, Format: trace.Format, OptimizerTrace: trace.TracePlan}
// TODO: forbid trace plan if the statement isn't select read-only statement
if trace.TracePlan {
schema := newColumnsWithNames(1)
schema.Append(buildColumnWithName("", "Dump_link", mysql.TypeVarchar, 128))
p.SetSchema(schema.col2Schema())
p.names = schema.names
return p, nil
}
switch trace.Format {
case TraceFormatRow:
schema := newColumnsWithNames(3)
Expand Down Expand Up @@ -4380,7 +4388,7 @@ func buildShowSchema(s *ast.ShowStmt, isView bool, isSequence bool) (schema *exp
func (b *PlanBuilder) buildPlanReplayer(pc *ast.PlanReplayerStmt) Plan {
p := &PlanReplayer{ExecStmt: pc.Stmt, Analyze: pc.Analyze, Load: pc.Load, File: pc.File}
schema := newColumnsWithNames(1)
schema.Append(buildColumnWithName("", "Dump_link", mysql.TypeVarchar, 128))
schema.Append(buildColumnWithName("", "File_token", mysql.TypeVarchar, 128))
p.SetSchema(schema.col2Schema())
p.names = schema.names
return p
Expand Down
3 changes: 3 additions & 0 deletions planner/core/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,7 @@ type Trace struct {

StmtNode ast.StmtNode
Format string

// OptimizerTrace indicates `trace plan <statement>` case
OptimizerTrace bool
}
48 changes: 23 additions & 25 deletions sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,29 +67,29 @@ type StatementContext struct {

// IsDDLJobInQueue is used to mark whether the DDL job is put into the queue.
// If IsDDLJobInQueue is true, it means the DDL job is in the queue of storage, and it can be handled by the DDL worker.
IsDDLJobInQueue bool
InInsertStmt bool
InUpdateStmt bool
InDeleteStmt bool
InSelectStmt bool
InLoadDataStmt bool
InExplainStmt bool
InCreateOrAlterStmt bool
IgnoreTruncate bool
IgnoreZeroInDate bool
DupKeyAsWarning bool
BadNullAsWarning bool
DividedByZeroAsWarning bool
TruncateAsWarning bool
OverflowAsWarning bool
InShowWarning bool
UseCache bool
BatchCheck bool
InNullRejectCheck bool
AllowInvalidDate bool
IgnoreNoPartition bool
MaybeOverOptimized4PlanCache bool
IgnoreExplainIDSuffix bool
IsDDLJobInQueue bool
InInsertStmt bool
InUpdateStmt bool
InDeleteStmt bool
InSelectStmt bool
InLoadDataStmt bool
InExplainStmt bool
InCreateOrAlterStmt bool
IgnoreTruncate bool
IgnoreZeroInDate bool
DupKeyAsWarning bool
BadNullAsWarning bool
DividedByZeroAsWarning bool
TruncateAsWarning bool
OverflowAsWarning bool
InShowWarning bool
UseCache bool
BatchCheck bool
InNullRejectCheck bool
AllowInvalidDate bool
IgnoreNoPartition bool
SkipPlanCache bool
IgnoreExplainIDSuffix bool
// If the select statement is like 'select * from t as of timestamp ...', is in a stale read transaction,
// or is affected by the tidb_read_staleness session variable, then the statement will be marked as isStaleness
// in stmtCtx
Expand Down Expand Up @@ -199,8 +199,6 @@ type StatementContext struct {
// InVerboseExplain indicates the statement is "explain format='verbose' ...".
InVerboseExplain bool

// EnableOptimizeTrace indicates whether the statement is enable optimize trace
EnableOptimizeTrace bool
// LogicalOptimizeTrace indicates the trace for optimize
LogicalOptimizeTrace *tracing.LogicalOptimizeTracer
}
Expand Down
2 changes: 2 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -958,6 +958,8 @@ type SessionVars struct {
data [2]stmtctx.StatementContext
}

// EnableStmtOptimizeTrace indicates whether the optimizer trace is enabled by a 'trace plan <statement>' statement
EnableStmtOptimizeTrace bool
// Rng stores the rand_seed1 and rand_seed2 for Rand() function
Rng *utilMath.MysqlRng
}
Expand Down
Loading

0 comments on commit 5977312

Please sign in to comment.