Skip to content

Commit

Permalink
executor: avoid ProjectoinExec's goroutine leak (#14127) (#14226)
Browse files Browse the repository at this point in the history
  • Loading branch information
fzhedu authored and sre-bot committed Dec 26, 2019
1 parent da1427a commit 4eb6c8e
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 8 deletions.
33 changes: 25 additions & 8 deletions executor/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"

"github.com/cznic/mathutil"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/util/chunk"
)
Expand Down Expand Up @@ -70,23 +71,39 @@ func (e *ExplainExec) Next(ctx context.Context, req *chunk.Chunk) error {
return nil
}

func (e *ExplainExec) generateExplainInfo(ctx context.Context) ([][]string, error) {
func (e *ExplainExec) generateExplainInfo(ctx context.Context) (rows [][]string, err error) {
closed := false
defer func() {
if !closed && e.analyzeExec != nil {
err = e.analyzeExec.Close()
closed = true
}
}()
if e.analyzeExec != nil {
chk := newFirstChunk(e.analyzeExec)
var nextErr, closeErr error
for {
err := e.analyzeExec.Next(ctx, chk)
if err != nil {
return nil, err
}
if chk.NumRows() == 0 {
nextErr = Next(ctx, e.analyzeExec, chk)
if nextErr != nil || chk.NumRows() == 0 {
break
}
}
if err := e.analyzeExec.Close(); err != nil {
closeErr = e.analyzeExec.Close()
closed = true
if nextErr != nil {
if closeErr != nil {
err = errors.New(nextErr.Error() + ", " + closeErr.Error())
} else {
err = nextErr
}
} else if closeErr != nil {
err = closeErr
}
if err != nil {
return nil, err
}
}
if err := e.explain.RenderResult(); err != nil {
if err = e.explain.RenderResult(); err != nil {
return nil, err
}
if e.analyzeExec != nil {
Expand Down
92 changes: 92 additions & 0 deletions executor/explain_unit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
"context"
"errors"
"testing"

"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/mock"
)

var (
_ Executor = &mockErrorOperator{}
)

type mockErrorOperator struct {
baseExecutor
toPanic bool
closed bool
}

func (e *mockErrorOperator) Open(ctx context.Context) error {
return nil
}

func (e *mockErrorOperator) Next(ctx context.Context, req *chunk.Chunk) error {
if e.toPanic {
panic("next panic")
} else {
return errors.New("next error")
}
}

func (e *mockErrorOperator) Close() error {
e.closed = true
return errors.New("close error")
}

func getColumns() []*expression.Column {
return []*expression.Column{
{Index: 1, RetType: types.NewFieldType(mysql.TypeLonglong)},
}
}

// close() must be called after next() to avoid goroutines leak
func TestExplainAnalyzeInvokeNextAndClose(t *testing.T) {
ctx := mock.NewContext()
ctx.GetSessionVars().InitChunkSize = variable.DefInitChunkSize
ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize
schema := expression.NewSchema(getColumns()...)
baseExec := newBaseExecutor(ctx, schema, nil)
explainExec := &ExplainExec{
baseExecutor: baseExec,
explain: nil,
}
// mockErrorOperator returns errors
mockOper := mockErrorOperator{baseExec, false, false}
explainExec.analyzeExec = &mockOper
tmpCtx := context.Background()
_, err := explainExec.generateExplainInfo(tmpCtx)

expectedStr := "next error, close error"
if err.Error() != expectedStr || !mockOper.closed {
t.Errorf(err.Error())
}
// mockErrorOperator panic
mockOper = mockErrorOperator{baseExec, true, false}
explainExec.analyzeExec = &mockOper
defer func() {
if panicErr := recover(); panicErr == nil || !mockOper.closed {
t.Errorf("panic test failed: without panic or close() is not called")
}
}()
_, err = explainExec.generateExplainInfo(tmpCtx)
}

0 comments on commit 4eb6c8e

Please sign in to comment.