Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: refine explain analyze #7888

Merged
merged 7 commits into from
Oct 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions executor/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,9 +502,9 @@ func (w *HashAggFinalWorker) run(ctx sessionctx.Context, waitGroup *sync.WaitGro

// Next implements the Executor Next interface.
func (e *HashAggExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
if e.isUnparallelExec {
Expand Down Expand Up @@ -761,9 +761,9 @@ func (e *StreamAggExec) Close() error {

// Next implements the Executor Next interface.
func (e *StreamAggExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
for !e.executed && chk.NumRows() < e.maxChunkSize {
Expand Down
38 changes: 6 additions & 32 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,41 +659,15 @@ func (b *executorBuilder) buildTrace(v *plannercore.Trace) Executor {

// buildExplain builds a explain executor. `e.rows` collects final result to `ExplainExec`.
func (b *executorBuilder) buildExplain(v *plannercore.Explain) Executor {
if v.Analyze {
stmt := &ExecStmt{
InfoSchema: GetInfoSchema(b.ctx),
Plan: v.ExecPlan,
StmtNode: v.ExecStmt,
Ctx: b.ctx,
}
b.ctx.GetSessionVars().StmtCtx.RuntimeStats = execdetails.NewRuntimeStats()
ctx := context.Background()
rs, err := stmt.Exec(ctx)
if err != nil {
return nil
}
if rs != nil {
chk := rs.NewChunk()
for {
err := rs.Next(ctx, chk)
if err != nil {
return nil
}
if chk.NumRows() == 0 {
break
}
}
}
}
v.PrepareRows()
e := &ExplainExec{
explainExec := &ExplainExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
explain: v,
}
e.rows = make([][]string, 0, len(v.Rows))
for _, row := range v.Rows {
e.rows = append(e.rows, row)
if v.Analyze {
b.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl = execdetails.NewRuntimeStatsColl()
explainExec.analyzeExec = b.build(v.ExecPlan)
}
return e
return explainExec
}

func (b *executorBuilder) buildUnionScanExec(v *plannercore.PhysicalUnionScan) Executor {
Expand Down
17 changes: 10 additions & 7 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,9 @@ func (e *IndexReaderExecutor) Close() error {

// Next implements the Executor Next interface.
func (e *IndexReaderExecutor) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
err := e.result.Next(ctx, chk)
if err != nil {
Expand Down Expand Up @@ -458,7 +458,6 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k
func (e *IndexLookUpExecutor) startTableWorker(ctx context.Context, workCh <-chan *lookupTableTask) {
lookupConcurrencyLimit := e.ctx.GetSessionVars().IndexLookupConcurrency
e.tblWorkerWg.Add(lookupConcurrencyLimit)
e.baseExecutor.ctx.GetSessionVars().StmtCtx.RuntimeStats.GetRuntimeStat(e.id + "_tableReader")
for i := 0; i < lookupConcurrencyLimit; i++ {
worker := &tableWorker{
workCh: workCh,
Expand All @@ -480,7 +479,7 @@ func (e *IndexLookUpExecutor) startTableWorker(ctx context.Context, workCh <-cha
}

func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, handles []int64) (Executor, error) {
tableReader, err := e.dataReaderBuilder.buildTableReaderFromHandles(ctx, &TableReaderExecutor{
tableReaderExec := &TableReaderExecutor{
baseExecutor: newBaseExecutor(e.ctx, e.schema, e.id+"_tableReader"),
table: e.table,
physicalTableID: e.physicalTableID,
Expand All @@ -489,7 +488,11 @@ func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, handles []in
feedback: statistics.NewQueryFeedback(0, nil, 0, false),
corColInFilter: e.corColInTblSide,
plans: e.tblPlans,
}, handles)
}
// We assign `nil` to `runtimeStats` to forbidden `TableWorker` driven `IndexLookupExecutor`'s runtime stats collecting,
// because TableWorker information isn't showing in explain result now.
tableReaderExec.runtimeStats = nil
lysu marked this conversation as resolved.
Show resolved Hide resolved
tableReader, err := e.dataReaderBuilder.buildTableReaderFromHandles(ctx, tableReaderExec, handles)
if err != nil {
log.Error(err)
return nil, errors.Trace(err)
Expand Down Expand Up @@ -518,9 +521,9 @@ func (e *IndexLookUpExecutor) Close() error {

// Next implements Exec Next interface.
func (e *IndexLookUpExecutor) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
for {
Expand Down
34 changes: 18 additions & 16 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type baseExecutor struct {
maxChunkSize int
children []Executor
retFieldTypes []*types.FieldType
runtimeStat *execdetails.RuntimeStat
runtimeStats *execdetails.RuntimeStats
}

// Open initializes children recursively and "childrenResults" according to children's schemas.
Expand Down Expand Up @@ -134,7 +134,9 @@ func newBaseExecutor(ctx sessionctx.Context, schema *expression.Schema, id strin
schema: schema,
initCap: ctx.GetSessionVars().MaxChunkSize,
maxChunkSize: ctx.GetSessionVars().MaxChunkSize,
runtimeStat: ctx.GetSessionVars().StmtCtx.RuntimeStats.GetRuntimeStat(id),
}
if ctx.GetSessionVars().StmtCtx.RuntimeStatsColl != nil {
e.runtimeStats = e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.Get(e.id)
}
if schema != nil {
cols := schema.Columns
Expand Down Expand Up @@ -177,9 +179,9 @@ type CancelDDLJobsExec struct {

// Next implements the Executor Next interface.
func (e *CancelDDLJobsExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.GrowAndReset(e.maxChunkSize)
if e.cursor >= len(e.jobIDs) {
Expand Down Expand Up @@ -623,9 +625,9 @@ type LimitExec struct {

// Next implements the Executor Next interface.
func (e *LimitExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
if e.cursor >= e.end {
Expand Down Expand Up @@ -746,9 +748,9 @@ func (e *TableDualExec) Open(ctx context.Context) error {

// Next implements the Executor Next interface.
func (e *TableDualExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
if e.numReturned >= e.numDualRows {
Expand Down Expand Up @@ -801,9 +803,9 @@ func (e *SelectionExec) Close() error {

// Next implements the Executor Next interface.
func (e *SelectionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.GrowAndReset(e.maxChunkSize)

Expand Down Expand Up @@ -880,9 +882,9 @@ type TableScanExec struct {

// Next implements the Executor Next interface.
func (e *TableScanExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.GrowAndReset(e.maxChunkSize)
if e.isVirtualTable {
Expand Down Expand Up @@ -984,9 +986,9 @@ func (e *MaxOneRowExec) Open(ctx context.Context) error {

// Next implements the Executor Next interface.
func (e *MaxOneRowExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
if e.evaluated {
Expand Down Expand Up @@ -1130,9 +1132,9 @@ func (e *UnionExec) resultPuller(ctx context.Context, childID int) {

// Next implements the Executor Next interface.
func (e *UnionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.GrowAndReset(e.maxChunkSize)
if !e.initialized {
Expand Down
46 changes: 44 additions & 2 deletions executor/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package executor

import (
"github.com/cznic/mathutil"
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/util/chunk"
"golang.org/x/net/context"
)
Expand All @@ -23,18 +24,39 @@ import (
type ExplainExec struct {
baseExecutor

rows [][]string
cursor int
explain *core.Explain
analyzeExec Executor
rows [][]string
cursor int
}

// Open implements the Executor Open interface.
func (e *ExplainExec) Open(ctx context.Context) error {
if e.analyzeExec != nil {
return e.analyzeExec.Open(ctx)
}
return nil
}

// Close implements the Executor Close interface.
func (e *ExplainExec) Close() error {
if e.analyzeExec != nil {
e.analyzeExec.Close()
}
e.rows = nil
return nil
}

// Next implements the Executor Next interface.
func (e *ExplainExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.rows == nil {
var err error
e.rows, err = e.generateExplainInfo(ctx)
if err != nil {
return err
}
}

chk.GrowAndReset(e.maxChunkSize)
if e.cursor >= len(e.rows) {
return nil
Expand All @@ -49,3 +71,23 @@ func (e *ExplainExec) Next(ctx context.Context, chk *chunk.Chunk) error {
e.cursor += numCurRows
return nil
}

func (e *ExplainExec) generateExplainInfo(ctx context.Context) ([][]string, error) {
if e.analyzeExec != nil {
chk := e.analyzeExec.newFirstChunk()
for {
err := e.analyzeExec.Next(ctx, chk)
if err != nil {
return nil, err
}
if chk.NumRows() == 0 {
break
}
}
}
e.explain.RenderResult()
if e.analyzeExec != nil {
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl = nil
}
return e.explain.Rows, nil
}
4 changes: 2 additions & 2 deletions executor/index_lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,9 @@ func (e *IndexLookUpJoin) newInnerWorker(taskCh chan *lookUpJoinTask) *innerWork

// Next implements the Executor interface.
func (e *IndexLookUpJoin) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
e.joinResult.Reset()
Expand Down
8 changes: 4 additions & 4 deletions executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,9 +509,9 @@ func (e *HashJoinExec) join2Chunk(workerID uint, outerChk *chunk.Chunk, joinResu
// step 1. fetch data from inner child and build a hash table;
// step 2. fetch data from outer child in a background goroutine and probe the hash table in multiple join workers.
func (e *HashJoinExec) Next(ctx context.Context, chk *chunk.Chunk) (err error) {
if e.runtimeStat != nil {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
if !e.prepared {
e.innerFinished = make(chan error, 1)
Expand Down Expand Up @@ -726,9 +726,9 @@ func (e *NestedLoopApplyExec) fetchAllInners(ctx context.Context) error {

// Next implements the Executor interface.
func (e *NestedLoopApplyExec) Next(ctx context.Context, chk *chunk.Chunk) (err error) {
if e.runtimeStat != nil {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
for {
Expand Down
4 changes: 2 additions & 2 deletions executor/merge_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,9 @@ func (e *MergeJoinExec) prepare(ctx context.Context, chk *chunk.Chunk) error {

// Next implements the Executor Next interface.
func (e *MergeJoinExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
if !e.prepared {
Expand Down
4 changes: 2 additions & 2 deletions executor/projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,9 @@ func (e *ProjectionExec) Open(ctx context.Context) error {
// +------------------------------+ +----------------------+
//
func (e *ProjectionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.GrowAndReset(e.maxChunkSize)
if e.isUnparallelExec() {
Expand Down
8 changes: 4 additions & 4 deletions executor/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ func (e *SortExec) Open(ctx context.Context) error {

// Next implements the Executor Next interface.
func (e *SortExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
if !e.fetched {
Expand Down Expand Up @@ -301,9 +301,9 @@ func (e *TopNExec) Open(ctx context.Context) error {

// Next implements the Executor Next interface.
func (e *TopNExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
if !e.fetched {
Expand Down
4 changes: 2 additions & 2 deletions executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error {
// Next fills data into the chunk passed by its caller.
// The task was actually done by tableReaderHandler.
func (e *TableReaderExecutor) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStat != nil {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStat.Record(time.Now().Sub(start), chk.NumRows()) }()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
if err := e.resultHandler.nextChunk(ctx, chk); err != nil {
e.feedback.Invalidate()
Expand Down
Loading