executor: remove childrenResult from baseExecutor (#7076)
Author: lysu, Jul 18, 2018
Commit: 4f16bdd (parent: 5697826)
Showing 4 changed files with 97 additions and 48 deletions.
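
The pattern applied throughout this commit: baseExecutor no longer pre-allocates a result chunk for every child. Instead, each executor that actually buffers a child's output declares its own childResult field, allocates it in Open via children[i].newChunk(), and drops the reference in Close, presumably so executors that never read a child chunk this way (for example, the parallel hash-aggregation path) do not pay for unused allocations. Below is a minimal sketch of the new ownership, using simplified stand-in types rather than the real TiDB Executor and chunk.Chunk:

package executor

// Chunk and Executor are simplified stand-ins for chunk.Chunk and TiDB's
// Executor interface; they only illustrate who owns the child buffer.
type Chunk struct{}

type Executor interface {
	Open() error
	Next(dst *Chunk) error
	Close() error
	newChunk() *Chunk
}

// baseExecutor after this commit: it opens and closes its children but no
// longer builds a childrenResults slice on their behalf.
type baseExecutor struct {
	children []Executor
}

func (e *baseExecutor) Open() error {
	for _, child := range e.children {
		if err := child.Open(); err != nil {
			return err
		}
	}
	return nil
}

func (e *baseExecutor) Close() error {
	for _, child := range e.children {
		if err := child.Close(); err != nil {
			return err
		}
	}
	return nil
}

// limitLikeExec stands in for the single-child executors touched here
// (LimitExec, SelectionExec, ExistsExec, StreamAggExec, and the unparallel
// path of HashAggExec): each owns exactly one buffer for the child it reads.
type limitLikeExec struct {
	baseExecutor
	childResult *Chunk
}

func (e *limitLikeExec) Open() error {
	if err := e.baseExecutor.Open(); err != nil {
		return err
	}
	// Allocate the buffer only where it is actually consumed.
	e.childResult = e.children[0].newChunk()
	return nil
}

func (e *limitLikeExec) Close() error {
	// Drop the reference so the chunk can be reclaimed promptly.
	e.childResult = nil
	return e.baseExecutor.Close()
}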
executor/aggregate.go (26 changes: 19 additions & 7 deletions)
@@ -159,6 +159,8 @@ type HashAggExec struct {
 	defaultVal *chunk.Chunk
 	// isChildReturnEmpty indicates whether the child executor only returns an empty input.
 	isChildReturnEmpty bool
+
+	childResult *chunk.Chunk
 }
 
 // HashAggInput indicates the input of hash agg exec.
@@ -201,6 +203,7 @@ func (d *HashAggIntermData) ToRows(sc *stmtctx.StatementContext, rows []types.Da
 // Close implements the Executor Close interface.
 func (e *HashAggExec) Close() error {
 	if e.isUnparallelExec {
+		e.childResult = nil
 		e.groupMap = nil
 		e.groupIterator = nil
 		e.aggCtxsMap = nil
@@ -247,6 +250,7 @@ func (e *HashAggExec) initForUnparallelExec() {
 	e.rowBuffer = make([]types.Datum, 0, e.Schema().Len())
 	e.groupKey = make([]byte, 0, 8)
 	e.groupVals = make([][]byte, 0, 8)
+	e.childResult = e.children[0].newChunk()
 }
 
 func (e *HashAggExec) initForParallelExec() {
@@ -676,14 +680,14 @@ func (e *HashAggExec) unparallelExec(ctx context.Context, chk *chunk.Chunk) erro
 
 // execute fetches Chunks from src and update each aggregate function for each row in Chunk.
 func (e *HashAggExec) execute(ctx context.Context) (err error) {
-	inputIter := chunk.NewIterator4Chunk(e.childrenResults[0])
+	inputIter := chunk.NewIterator4Chunk(e.childResult)
 	for {
-		err := e.children[0].Next(ctx, e.childrenResults[0])
+		err := e.children[0].Next(ctx, e.childResult)
 		if err != nil {
 			return errors.Trace(err)
 		}
 		// no more data.
-		if e.childrenResults[0].NumRows() == 0 {
+		if e.childResult.NumRows() == 0 {
 			return nil
 		}
 		for row := inputIter.Begin(); row != inputIter.End(); row = inputIter.Next() {
@@ -765,17 +769,19 @@ type StreamAggExec struct {
 	newAggFuncs []aggfuncs.AggFunc
 	partialResults []aggfuncs.PartialResult
 	groupRows []chunk.Row
 
+	childResult *chunk.Chunk
 }
 
 // Open implements the Executor Open interface.
 func (e *StreamAggExec) Open(ctx context.Context) error {
 	if err := e.baseExecutor.Open(ctx); err != nil {
 		return errors.Trace(err)
 	}
 
+	e.childResult = e.children[0].newChunk()
 	e.executed = false
 	e.isChildReturnEmpty = true
-	e.inputIter = chunk.NewIterator4Chunk(e.childrenResults[0])
+	e.inputIter = chunk.NewIterator4Chunk(e.childResult)
 	e.inputRow = e.inputIter.End()
 	e.mutableRow = chunk.MutRowFromTypes(e.retTypes())
 	e.rowBuffer = make([]types.Datum, 0, e.Schema().Len())
@@ -795,6 +801,12 @@ func (e *StreamAggExec) Open(ctx context.Context) error {
 	return nil
 }
 
+// Close implements the Executor Close interface.
+func (e *StreamAggExec) Close() error {
+	e.childResult = nil
+	return errors.Trace(e.baseExecutor.Close())
+}
+
 // Next implements the Executor Next interface.
 func (e *StreamAggExec) Next(ctx context.Context, chk *chunk.Chunk) error {
 	chk.Reset()
@@ -876,13 +888,13 @@ func (e *StreamAggExec) fetchChildIfNecessary(ctx context.Context, chk *chunk.Ch
 		}
 	}
 
-	err = e.children[0].Next(ctx, e.childrenResults[0])
+	err = e.children[0].Next(ctx, e.childResult)
 	if err != nil {
 		return errors.Trace(err)
 	}
 
 	// No more data.
-	if e.childrenResults[0].NumRows() == 0 {
+	if e.childResult.NumRows() == 0 {
 		if !e.isChildReturnEmpty {
 			err = e.appendResult2Chunk(chk)
 		} else if e.defaultVal != nil {
executor/executor.go (81 changes: 49 additions & 32 deletions)
@@ -63,13 +63,12 @@ var (
 )
 
 type baseExecutor struct {
-	ctx             sessionctx.Context
-	id              string
-	schema          *expression.Schema
-	maxChunkSize    int
-	children        []Executor
-	childrenResults []*chunk.Chunk
-	retFieldTypes   []*types.FieldType
+	ctx           sessionctx.Context
+	id            string
+	schema        *expression.Schema
+	maxChunkSize  int
+	children      []Executor
+	retFieldTypes []*types.FieldType
 }
 
 // Open initializes children recursively and "childrenResults" according to children's schemas.
@@ -80,10 +79,6 @@ func (e *baseExecutor) Open(ctx context.Context) error {
 			return errors.Trace(err)
 		}
 	}
-	e.childrenResults = make([]*chunk.Chunk, 0, len(e.children))
-	for _, child := range e.children {
-		e.childrenResults = append(e.childrenResults, child.newChunk())
-	}
 	return nil
 }
 
@@ -95,7 +90,6 @@ func (e *baseExecutor) Close() error {
 			return errors.Trace(err)
 		}
 	}
-	e.childrenResults = nil
 	return nil
 }
 
@@ -512,6 +506,8 @@ type LimitExec struct {
 
 	// meetFirstBatch represents whether we have met the first valid Chunk from child.
 	meetFirstBatch bool
+
+	childResult *chunk.Chunk
 }
 
 // Next implements the Executor Next interface.
@@ -521,11 +517,11 @@ func (e *LimitExec) Next(ctx context.Context, chk *chunk.Chunk) error {
 		return nil
 	}
 	for !e.meetFirstBatch {
-		err := e.children[0].Next(ctx, e.childrenResults[0])
+		err := e.children[0].Next(ctx, e.childResult)
 		if err != nil {
 			return errors.Trace(err)
 		}
-		batchSize := uint64(e.childrenResults[0].NumRows())
+		batchSize := uint64(e.childResult.NumRows())
 		// no more data.
 		if batchSize == 0 {
 			return nil
@@ -540,7 +536,7 @@ func (e *LimitExec) Next(ctx context.Context, chk *chunk.Chunk) error {
 		if begin == end {
 			break
 		}
-		chk.Append(e.childrenResults[0], int(begin), int(end))
+		chk.Append(e.childResult, int(begin), int(end))
 		return nil
 	}
 	e.cursor += batchSize
@@ -567,11 +563,18 @@ func (e *LimitExec) Open(ctx context.Context) error {
 	if err := e.baseExecutor.Open(ctx); err != nil {
 		return errors.Trace(err)
 	}
+	e.childResult = e.children[0].newChunk()
 	e.cursor = 0
 	e.meetFirstBatch = e.begin == 0
 	return nil
 }
 
+// Close implements the Executor Close interface.
+func (e *LimitExec) Close() error {
+	e.childResult = nil
+	return errors.Trace(e.baseExecutor.Close())
+}
+
 func init() {
 	// While doing optimization in the plan package, we need to execute uncorrelated subquery,
 	// but the plan package cannot import the executor package because of the dependency cycle.
@@ -646,34 +649,34 @@ func (e *TableDualExec) Next(ctx context.Context, chk *chunk.Chunk) error {
 type SelectionExec struct {
 	baseExecutor
 
-	batched   bool
-	filters   []expression.Expression
-	selected  []bool
-	inputIter *chunk.Iterator4Chunk
-	inputRow  chunk.Row
+	batched     bool
+	filters     []expression.Expression
+	selected    []bool
+	inputIter   *chunk.Iterator4Chunk
+	inputRow    chunk.Row
+	childResult *chunk.Chunk
 }
 
 // Open implements the Executor Open interface.
 func (e *SelectionExec) Open(ctx context.Context) error {
 	if err := e.baseExecutor.Open(ctx); err != nil {
 		return errors.Trace(err)
 	}
+	e.childResult = e.children[0].newChunk()
 	e.batched = expression.Vectorizable(e.filters)
 	if e.batched {
 		e.selected = make([]bool, 0, chunk.InitialCapacity)
 	}
-	e.inputIter = chunk.NewIterator4Chunk(e.childrenResults[0])
+	e.inputIter = chunk.NewIterator4Chunk(e.childResult)
 	e.inputRow = e.inputIter.End()
 	return nil
 }
 
 // Close implements plan.Plan Close interface.
 func (e *SelectionExec) Close() error {
-	if err := e.baseExecutor.Close(); err != nil {
-		return errors.Trace(err)
-	}
+	e.childResult = nil
 	e.selected = nil
-	return nil
+	return errors.Trace(e.baseExecutor.Close())
 }
 
 // Next implements the Executor Next interface.
@@ -694,12 +697,12 @@ func (e *SelectionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
 			}
 			chk.AppendRow(e.inputRow)
 		}
-		err := e.children[0].Next(ctx, e.childrenResults[0])
+		err := e.children[0].Next(ctx, e.childResult)
 		if err != nil {
 			return errors.Trace(err)
 		}
 		// no more data.
-		if e.childrenResults[0].NumRows() == 0 {
+		if e.childResult.NumRows() == 0 {
 			return nil
 		}
 		e.selected, err = expression.VectorizedFilter(e.ctx, e.filters, e.inputIter, e.selected)
@@ -726,13 +729,13 @@ func (e *SelectionExec) unBatchedNext(ctx context.Context, chk *chunk.Chunk) err
 				return nil
 			}
 		}
-		err := e.children[0].Next(ctx, e.childrenResults[0])
+		err := e.children[0].Next(ctx, e.childResult)
 		if err != nil {
 			return errors.Trace(err)
 		}
 		e.inputRow = e.inputIter.Begin()
 		// no more data.
-		if e.childrenResults[0].NumRows() == 0 {
+		if e.childResult.NumRows() == 0 {
 			return nil
 		}
 	}
@@ -838,14 +841,16 @@ func (e *TableScanExec) Open(ctx context.Context) error {
 type ExistsExec struct {
 	baseExecutor
 
-	evaluated bool
+	evaluated   bool
+	childResult *chunk.Chunk
 }
 
 // Open implements the Executor Open interface.
 func (e *ExistsExec) Open(ctx context.Context) error {
 	if err := e.baseExecutor.Open(ctx); err != nil {
 		return errors.Trace(err)
 	}
+	e.childResult = e.children[0].newChunk()
 	e.evaluated = false
 	return nil
 }
@@ -855,11 +860,11 @@ func (e *ExistsExec) Next(ctx context.Context, chk *chunk.Chunk) error {
 	chk.Reset()
 	if !e.evaluated {
 		e.evaluated = true
-		err := e.children[0].Next(ctx, e.childrenResults[0])
+		err := e.children[0].Next(ctx, e.childResult)
 		if err != nil {
 			return errors.Trace(err)
 		}
-		if e.childrenResults[0].NumRows() > 0 {
+		if e.childResult.NumRows() > 0 {
 			chk.AppendInt64(0, 1)
 		} else {
 			chk.AppendInt64(0, 0)
@@ -868,6 +873,12 @@ func (e *ExistsExec) Next(ctx context.Context, chk *chunk.Chunk) error {
 	return nil
 }
 
+// Close implements the Executor Close interface.
+func (e *ExistsExec) Close() error {
+	e.childResult = nil
+	return errors.Trace(e.baseExecutor.Close())
+}
+
 // MaxOneRowExec checks if the number of rows that a query returns is at maximum one.
 // It's built from subquery expression.
 type MaxOneRowExec struct {
@@ -937,6 +948,8 @@ type UnionExec struct {
 	resourcePools []chan *chunk.Chunk
 	resultPool chan *unionWorkerResult
 	initialized bool
+
+	childrenResults []*chunk.Chunk
 }
 
 // unionWorkerResult stores the result for a union worker.
@@ -959,6 +972,9 @@ func (e *UnionExec) Open(ctx context.Context) error {
 	if err := e.baseExecutor.Open(ctx); err != nil {
 		return errors.Trace(err)
 	}
+	for _, child := range e.children {
+		e.childrenResults = append(e.childrenResults, child.newChunk())
+	}
 	e.stopFetchData.Store(false)
 	e.initialized = false
 	e.finished = make(chan struct{})
@@ -1039,6 +1055,7 @@ func (e *UnionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
 // Close implements the Executor Close interface.
 func (e *UnionExec) Close() error {
 	close(e.finished)
+	e.childrenResults = nil
 	if e.resultPool != nil {
 		for range e.resultPool {
 		}
executor/merge_join.go (8 changes: 8 additions & 0 deletions)
@@ -45,6 +45,8 @@ type MergeJoinExec struct {
 	innerRows []chunk.Row
 	innerIter4Row chunk.Iterator
 
+	childrenResults []*chunk.Chunk
+
 	memTracker *memory.Tracker
 }
 
@@ -177,6 +179,7 @@ func (t *mergeJoinInnerTable) reallocReaderResult() {
 // Close implements the Executor Close interface.
 func (e *MergeJoinExec) Close() error {
 	e.memTracker.Detach()
+	e.childrenResults = nil
 	e.memTracker = nil
 
 	return errors.Trace(e.baseExecutor.Close())
@@ -192,6 +195,11 @@ func (e *MergeJoinExec) Open(ctx context.Context) error {
 	e.memTracker = memory.NewTracker(e.id, e.ctx.GetSessionVars().MemQuotaMergeJoin)
 	e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
 
+	e.childrenResults = make([]*chunk.Chunk, 0, len(e.children))
+	for _, child := range e.children {
+		e.childrenResults = append(e.childrenResults, child.newChunk())
+	}
+
 	e.innerTable.memTracker = memory.NewTracker("innerTable", -1)
 	e.innerTable.memTracker.AttachTo(e.memTracker)
 
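
UnionExec and MergeJoinExec read from several children, so instead of a single childResult they keep their own childrenResults slice, built in their Open and released in their Close. The sketch below shows that variant; it reuses the stand-in Chunk, Executor, and baseExecutor types from the sketch after the commit summary, and is an illustration rather than the real TiDB API:

// unionLikeExec stands in for the multi-child executors in this commit
// (UnionExec, MergeJoinExec): the per-child buffers that baseExecutor.Open
// used to build for every executor now live in the executor that needs them.
type unionLikeExec struct {
	baseExecutor
	childrenResults []*Chunk
}

func (e *unionLikeExec) Open() error {
	if err := e.baseExecutor.Open(); err != nil {
		return err
	}
	// One buffer per child, allocated only by executors that read all children.
	e.childrenResults = make([]*Chunk, 0, len(e.children))
	for _, child := range e.children {
		e.childrenResults = append(e.childrenResults, child.newChunk())
	}
	return nil
}

func (e *unionLikeExec) Close() error {
	e.childrenResults = nil
	return e.baseExecutor.Close()
}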
(Diff for the fourth changed file is not shown.)