Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into fix_6303
Browse files Browse the repository at this point in the history
  • Loading branch information
spongedu committed Apr 22, 2018
2 parents f918013 + fb8efa5 commit 8ef798a
Show file tree
Hide file tree
Showing 11 changed files with 42 additions and 53 deletions.
3 changes: 0 additions & 3 deletions distsql/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ func (s *testSuite) TestSelectNormal(c *C) {
SetDAGRequest(&tipb.DAGRequest{}).
SetDesc(false).
SetKeepOrder(false).
SetPriority(kv.PriorityNormal).
SetFromSessionVars(variable.NewSessionVars()).
Build()
c.Assert(err, IsNil)
Expand Down Expand Up @@ -85,7 +84,6 @@ func (s *testSuite) TestSelectStreaming(c *C) {
SetDAGRequest(&tipb.DAGRequest{}).
SetDesc(false).
SetKeepOrder(false).
SetPriority(kv.PriorityNormal).
SetFromSessionVars(variable.NewSessionVars()).
SetStreaming(true).
Build()
Expand Down Expand Up @@ -137,7 +135,6 @@ func (s *testSuite) TestAnalyze(c *C) {
request, err := (&RequestBuilder{}).SetKeyRanges(nil).
SetAnalyzeRequest(&tipb.AnalyzeReq{}).
SetKeepOrder(true).
SetPriority(kv.PriorityLow).
Build()
c.Assert(err, IsNil)

Expand Down
20 changes: 14 additions & 6 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/juju/errors"
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
Expand Down Expand Up @@ -138,18 +139,25 @@ func (builder *RequestBuilder) getIsolationLevel(sv *variable.SessionVars) kv.Is
return kv.SI
}

func (builder *RequestBuilder) getKVPriority(sv *variable.SessionVars) int {
switch sv.StmtCtx.Priority {
case mysql.NoPriority, mysql.DelayedPriority:
return kv.PriorityNormal
case mysql.LowPriority:
return kv.PriorityLow
case mysql.HighPriority:
return kv.PriorityHigh
}
return kv.PriorityNormal
}

// SetFromSessionVars sets the following fields for "kv.Request" from session variables:
// "Concurrency", "IsolationLevel", "NotFillCache".
func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *RequestBuilder {
builder.Request.Concurrency = sv.DistSQLScanConcurrency
builder.Request.IsolationLevel = builder.getIsolationLevel(sv)
builder.Request.NotFillCache = sv.StmtCtx.NotFillCache
return builder
}

// SetPriority sets "Priority" for "kv.Request".
func (builder *RequestBuilder) SetPriority(priority int) *RequestBuilder {
builder.Request.Priority = priority
builder.Request.Priority = builder.getKVPriority(sv)
return builder
}

Expand Down
12 changes: 6 additions & 6 deletions distsql/request_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
Expand Down Expand Up @@ -260,7 +261,6 @@ func (s *testSuite) TestRequestBuilder1(c *C) {
SetDAGRequest(&tipb.DAGRequest{}).
SetDesc(false).
SetKeepOrder(false).
SetPriority(kv.PriorityNormal).
SetFromSessionVars(variable.NewSessionVars()).
Build()
c.Assert(err, IsNil)
Expand Down Expand Up @@ -335,7 +335,6 @@ func (s *testSuite) TestRequestBuilder2(c *C) {
SetDAGRequest(&tipb.DAGRequest{}).
SetDesc(false).
SetKeepOrder(false).
SetPriority(kv.PriorityNormal).
SetFromSessionVars(variable.NewSessionVars()).
Build()
c.Assert(err, IsNil)
Expand Down Expand Up @@ -384,7 +383,6 @@ func (s *testSuite) TestRequestBuilder3(c *C) {
SetDAGRequest(&tipb.DAGRequest{}).
SetDesc(false).
SetKeepOrder(false).
SetPriority(kv.PriorityNormal).
SetFromSessionVars(variable.NewSessionVars()).
Build()
c.Assert(err, IsNil)
Expand Down Expand Up @@ -446,7 +444,6 @@ func (s *testSuite) TestRequestBuilder4(c *C) {
SetDAGRequest(&tipb.DAGRequest{}).
SetDesc(false).
SetKeepOrder(false).
SetPriority(kv.PriorityNormal).
SetStreaming(true).
SetFromSessionVars(variable.NewSessionVars()).
Build()
Expand Down Expand Up @@ -488,10 +485,13 @@ func (s *testSuite) TestRequestBuilder5(c *C) {
},
}

sv := variable.NewSessionVars()
sv.StmtCtx.Priority = mysql.LowPriority
sv.StmtCtx.NotFillCache = true
actual, err := (&RequestBuilder{}).SetKeyRanges(keyRanges).
SetAnalyzeRequest(&tipb.AnalyzeReq{}).
SetKeepOrder(true).
SetPriority(kv.PriorityLow).
SetFromSessionVars(sv).
Build()
c.Assert(err, IsNil)
expect := &kv.Request{
Expand All @@ -501,7 +501,7 @@ func (s *testSuite) TestRequestBuilder5(c *C) {
KeyRanges: keyRanges,
KeepOrder: true,
Desc: false,
Concurrency: 0,
Concurrency: 15,
IsolationLevel: 0,
Priority: 1,
NotFillCache: true,
Expand Down
14 changes: 6 additions & 8 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,6 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Co

// buildExecutor build a executor from plan, prepared statement may need additional procedure.
func (a *ExecStmt) buildExecutor(ctx sessionctx.Context) (Executor, error) {
priority := kv.PriorityNormal
if _, ok := a.Plan.(*plan.Execute); !ok {
// Do not sync transaction for Execute statement, because the real optimization work is done in
// "ExecuteExec.Build".
Expand All @@ -289,22 +288,21 @@ func (a *ExecStmt) buildExecutor(ctx sessionctx.Context) (Executor, error) {
return nil, errors.Trace(err)
}

if stmtPri := ctx.GetSessionVars().StmtCtx.Priority; stmtPri != mysql.NoPriority {
priority = int(stmtPri)
} else {
stmtCtx := ctx.GetSessionVars().StmtCtx
if stmtPri := stmtCtx.Priority; stmtPri == mysql.NoPriority {
switch {
case isPointGet:
priority = kv.PriorityHigh
stmtCtx.Priority = kv.PriorityHigh
case a.Expensive:
priority = kv.PriorityLow
stmtCtx.Priority = kv.PriorityLow
}
}
}
if _, ok := a.Plan.(*plan.Analyze); ok && ctx.GetSessionVars().InRestrictedSQL {
priority = kv.PriorityLow
ctx.GetSessionVars().StmtCtx.Priority = kv.PriorityLow
}

b := newExecutorBuilder(ctx, a.InfoSchema, priority)
b := newExecutorBuilder(ctx, a.InfoSchema)
e := b.build(a.Plan)
if b.err != nil {
return nil, errors.Trace(b.err)
Expand Down
2 changes: 0 additions & 2 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,6 @@ func (e *AnalyzeIndexExec) open() error {
kvReq, err := builder.SetIndexRanges(e.ctx.GetSessionVars().StmtCtx, e.tblInfo.ID, e.idxInfo.ID, ranger.FullNewRange()).
SetAnalyzeRequest(e.analyzePB).
SetKeepOrder(true).
SetPriority(e.priority).
Build()
kvReq.Concurrency = e.concurrency
kvReq.IsolationLevel = kv.RC
Expand Down Expand Up @@ -296,7 +295,6 @@ func (e *AnalyzeColumnsExec) buildResp(ranges []*ranger.NewRange) (distsql.Selec
kvReq, err := builder.SetTableRanges(e.tblInfo.ID, ranges, nil).
SetAnalyzeRequest(e.analyzePB).
SetKeepOrder(e.keepOrder).
SetPriority(e.priority).
Build()
kvReq.IsolationLevel = kv.RC
kvReq.Concurrency = e.concurrency
Expand Down
21 changes: 6 additions & 15 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,17 @@ import (
// executorBuilder builds an Executor from a Plan.
// The InfoSchema must not change during execution.
type executorBuilder struct {
ctx sessionctx.Context
is infoschema.InfoSchema
priority int
startTS uint64 // cached when the first time getStartTS() is called
ctx sessionctx.Context
is infoschema.InfoSchema
startTS uint64 // cached when the first time getStartTS() is called
// err is set when there is error happened during Executor building process.
err error
}

func newExecutorBuilder(ctx sessionctx.Context, is infoschema.InfoSchema, priority int) *executorBuilder {
func newExecutorBuilder(ctx sessionctx.Context, is infoschema.InfoSchema) *executorBuilder {
return &executorBuilder{
ctx: ctx,
is: is,
priority: priority,
ctx: ctx,
is: is,
}
}

Expand Down Expand Up @@ -525,7 +523,6 @@ func (b *executorBuilder) buildInsert(v *plan.Insert) Executor {
insert := &InsertExec{
InsertValues: ivs,
OnDuplicate: append(v.OnDuplicate, v.GenCols.OnDuplicates...),
Priority: v.Priority,
IgnoreErr: v.IgnoreErr,
}
return insert
Expand Down Expand Up @@ -1109,7 +1106,6 @@ func (b *executorBuilder) buildAnalyzeIndexPushdown(task plan.AnalyzeIndexTask)
tblInfo: task.TableInfo,
idxInfo: task.IndexInfo,
concurrency: b.ctx.GetSessionVars().IndexSerialScanConcurrency,
priority: b.priority,
analyzePB: &tipb.AnalyzeReq{
Tp: tipb.AnalyzeType_TypeIndex,
StartTs: math.MaxUint64,
Expand Down Expand Up @@ -1143,7 +1139,6 @@ func (b *executorBuilder) buildAnalyzeColumnsPushdown(task plan.AnalyzeColumnsTa
colsInfo: task.ColsInfo,
pkInfo: task.PKInfo,
concurrency: b.ctx.GetSessionVars().DistSQLScanConcurrency,
priority: b.priority,
keepOrder: keepOrder,
analyzePB: &tipb.AnalyzeReq{
Tp: tipb.AnalyzeType_TypeColumn,
Expand Down Expand Up @@ -1307,7 +1302,6 @@ func buildNoRangeTableReader(b *executorBuilder, v *plan.PhysicalTableReader) (*
keepOrder: ts.KeepOrder,
desc: ts.Desc,
columns: ts.Columns,
priority: b.priority,
streaming: streaming,
}
if containsLimit(dagReq.Executors) {
Expand Down Expand Up @@ -1353,7 +1347,6 @@ func buildNoRangeIndexReader(b *executorBuilder, v *plan.PhysicalIndexReader) (*
keepOrder: is.KeepOrder,
desc: is.Desc,
columns: is.Columns,
priority: b.priority,
streaming: streaming,
}
if containsLimit(dagReq.Executors) {
Expand Down Expand Up @@ -1410,7 +1403,6 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plan.PhysicalIndexLook
desc: is.Desc,
tableRequest: tableReq,
columns: is.Columns,
priority: b.priority,
indexStreaming: indexStreaming,
tableStreaming: tableStreaming,
dataReaderBuilder: &dataReaderBuilder{executorBuilder: b},
Expand Down Expand Up @@ -1486,7 +1478,6 @@ func (builder *dataReaderBuilder) buildTableReaderFromHandles(ctx context.Contex
SetDAGRequest(e.dagPB).
SetDesc(e.desc).
SetKeepOrder(e.keepOrder).
SetPriority(e.priority).
SetStreaming(e.streaming).
SetFromSessionVars(e.ctx.GetSessionVars()).
Build()
Expand Down
7 changes: 0 additions & 7 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,6 @@ type TableReaderExecutor struct {
// resultHandler handles the order of the result. Since (MAXInt64, MAXUint64] stores before [0, MaxInt64] physically
// for unsigned int.
resultHandler *tableResultHandler
priority int
streaming bool
feedback *statistics.QueryFeedback
}
Expand Down Expand Up @@ -263,7 +262,6 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ne
SetDAGRequest(e.dagPB).
SetDesc(e.desc).
SetKeepOrder(e.keepOrder).
SetPriority(e.priority).
SetStreaming(e.streaming).
SetFromSessionVars(e.ctx.GetSessionVars()).
Build()
Expand Down Expand Up @@ -344,7 +342,6 @@ type IndexReaderExecutor struct {
result distsql.SelectResult
// columns are only required by union scan.
columns []*model.ColumnInfo
priority int
streaming bool
feedback *statistics.QueryFeedback
}
Expand Down Expand Up @@ -385,7 +382,6 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
SetDAGRequest(e.dagPB).
SetDesc(e.desc).
SetKeepOrder(e.keepOrder).
SetPriority(e.priority).
SetStreaming(e.streaming).
SetFromSessionVars(e.ctx.GetSessionVars()).
Build()
Expand Down Expand Up @@ -418,7 +414,6 @@ type IndexLookUpExecutor struct {
tableRequest *tipb.DAGRequest
// columns are only required by union scan.
columns []*model.ColumnInfo
priority int
indexStreaming bool
tableStreaming bool
*dataReaderBuilder
Expand Down Expand Up @@ -485,7 +480,6 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k
SetDAGRequest(e.dagPB).
SetDesc(e.desc).
SetKeepOrder(e.keepOrder).
SetPriority(e.priority).
SetStreaming(e.indexStreaming).
SetFromSessionVars(e.ctx.GetSessionVars()).
Build()
Expand Down Expand Up @@ -558,7 +552,6 @@ func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, handles []in
table: e.table,
tableID: e.tableID,
dagPB: e.tableRequest,
priority: e.priority,
streaming: e.tableStreaming,
feedback: statistics.NewQueryFeedback(0, nil, 0, false),
}, handles)
Expand Down
2 changes: 1 addition & 1 deletion executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2204,7 +2204,7 @@ func (s *testContextOptionSuite) TestCoprocessorPriority(c *C) {
tk.MustQuery("select count(*) from t")
tk.MustExec("update t set id = 3")
tk.MustExec("delete from t")
tk.MustExec("insert into t values (2)")
tk.MustExec("insert into t select * from t limit 2")
tk.MustExec("delete from t")

// Insert some data to make sure plan build IndexLookup for t.
Expand Down
11 changes: 9 additions & 2 deletions executor/prepared.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/plan"
"github.com/pingcap/tidb/sessionctx"
Expand Down Expand Up @@ -201,7 +201,7 @@ func (e *ExecuteExec) Build() error {
if err != nil {
return errors.Trace(err)
}
b := newExecutorBuilder(e.ctx, e.is, kv.PriorityNormal)
b := newExecutorBuilder(e.ctx, e.is)
stmtExec := b.build(e.plan)
if b.err != nil {
return errors.Trace(b.err)
Expand Down Expand Up @@ -281,19 +281,26 @@ func ResetStmtCtx(ctx sessionctx.Context, s ast.StmtNode) {
sc.InUpdateOrDeleteStmt = true
sc.DividedByZeroAsWarning = stmt.IgnoreErr
sc.IgnoreZeroInDate = !sessVars.StrictSQLMode || stmt.IgnoreErr
if stmt.LowPriority {
sc.Priority = mysql.LowPriority
}
case *ast.DeleteStmt:
sc.IgnoreTruncate = false
sc.OverflowAsWarning = false
sc.TruncateAsWarning = !sessVars.StrictSQLMode || stmt.IgnoreErr
sc.InUpdateOrDeleteStmt = true
sc.DividedByZeroAsWarning = stmt.IgnoreErr
sc.IgnoreZeroInDate = !sessVars.StrictSQLMode || stmt.IgnoreErr
if stmt.LowPriority {
sc.Priority = mysql.LowPriority
}
case *ast.InsertStmt:
sc.IgnoreTruncate = false
sc.TruncateAsWarning = !sessVars.StrictSQLMode || stmt.IgnoreErr
sc.InInsertStmt = true
sc.DividedByZeroAsWarning = stmt.IgnoreErr
sc.IgnoreZeroInDate = !sessVars.StrictSQLMode || stmt.IgnoreErr
sc.Priority = stmt.Priority
case *ast.CreateTableStmt, *ast.AlterTableStmt:
// Make sure the sql_mode is strict when checking column default value.
sc.IgnoreTruncate = false
Expand Down
2 changes: 0 additions & 2 deletions plan/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/util/auth"
Expand Down Expand Up @@ -324,7 +323,6 @@ type Insert struct {
OnDuplicate []*expression.Assignment

IsReplace bool
Priority mysql.PriorityEnum
IgnoreErr bool

// NeedFillDefaultValue is true when expr in value list reference other column.
Expand Down
1 change: 0 additions & 1 deletion plan/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -926,7 +926,6 @@ func (b *planBuilder) buildInsert(insert *ast.InsertStmt) Plan {
Columns: insert.Columns,
tableSchema: schema,
IsReplace: insert.IsReplace,
Priority: insert.Priority,
IgnoreErr: insert.IgnoreErr,
}.init(b.ctx)

Expand Down

0 comments on commit 8ef798a

Please sign in to comment.