Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: fix CTE goroutine leak when exceeds mem quota (#50828) #51169

Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 108 additions & 4 deletions executor/cte.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,29 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
<<<<<<< HEAD:executor/cte.go
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/cteutil"
"github.com/pingcap/tidb/util/disk"
"github.com/pingcap/tidb/util/memory"
=======
"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/pingcap/tidb/pkg/util/cteutil"
"github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
"github.com/pingcap/tidb/pkg/util/disk"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/memory"
"go.uber.org/zap"
>>>>>>> fa340f3400a (executor: fix CTE goroutine leak when exceeds mem quota (#50828)):pkg/executor/cte.go
)

var _ Executor = &CTEExec{}
Expand Down Expand Up @@ -83,6 +99,9 @@ func (e *CTEExec) Open(ctx context.Context) (err error) {
if e.producer.isInApply {
e.producer.reset()
}
if e.producer.openErr != nil {
return e.producer.openErr
}
if !e.producer.opened {
if err = e.producer.openProducer(ctx, e); err != nil {
return err
Expand All @@ -103,7 +122,18 @@ func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
return e.producer.getChunk(ctx, e, req)
}

func setFirstErr(firstErr error, newErr error, msg string) error {
if newErr != nil {
logutil.BgLogger().Error("cte got error", zap.Any("err", newErr), zap.Any("extra msg", msg))
if firstErr == nil {
firstErr = newErr
}
}
return firstErr
}

// Close implements the Executor interface.
<<<<<<< HEAD:executor/cte.go
func (e *CTEExec) Close() (err error) {
e.producer.resTbl.Lock()
if !e.producer.closed {
Expand All @@ -114,6 +144,31 @@ func (e *CTEExec) Close() (err error) {
return err
}
return e.baseExecutor.Close()
=======
func (e *CTEExec) Close() (firstErr error) {
func() {
e.producer.resTbl.Lock()
defer e.producer.resTbl.Unlock()
if !e.producer.closed {
failpoint.Inject("mock_cte_exec_panic_avoid_deadlock", func(v failpoint.Value) {
ok := v.(bool)
if ok {
// mock an oom panic, returning ErrMemoryExceedForQuery for error identification in recovery work.
panic(exeerrors.ErrMemoryExceedForQuery)
}
})
// closeProducer() only close seedExec and recursiveExec, will not touch resTbl.
// It means you can still read resTbl after call closeProducer().
// You can even call all three functions(openProducer/produce/closeProducer) in CTEExec.Next().
// Separating these three function calls is only to follow the abstraction of the volcano model.
err := e.producer.closeProducer()
firstErr = setFirstErr(firstErr, err, "close cte producer error")
}
}()
err := e.BaseExecutor.Close()
firstErr = setFirstErr(firstErr, err, "close cte children error")
return
>>>>>>> fa340f3400a (executor: fix CTE goroutine leak when exceeds mem quota (#50828)):pkg/executor/cte.go
}

func (e *CTEExec) reset() {
Expand All @@ -123,10 +178,16 @@ func (e *CTEExec) reset() {
}

type cteProducer struct {
// opened should be false when not open or open fail(a.k.a. openErr != nil)
opened bool
produced bool
closed bool

// cteProducer is shared by multiple operators, so if the first operator tries to open
// and got error, the second should return open error directly instead of open again.
// Otherwise there may be resource leak because Close() only clean resource for the last Open().
openErr error

ctx sessionctx.Context

seedExec Executor
Expand Down Expand Up @@ -161,15 +222,29 @@ type cteProducer struct {
}

func (p *cteProducer) openProducer(ctx context.Context, cteExec *CTEExec) (err error) {
defer func() {
p.openErr = err
if err == nil {
p.opened = true
} else {
p.opened = false
}
}()
if p.seedExec == nil {
return errors.New("seedExec for CTEExec is nil")
}
if err = p.seedExec.Open(ctx); err != nil {
return err
}

<<<<<<< HEAD:executor/cte.go
p.memTracker = memory.NewTracker(cteExec.id, -1)
p.diskTracker = disk.NewTracker(cteExec.id, -1)
=======
p.resetTracker()
p.memTracker = memory.NewTracker(cteExec.ID(), -1)
p.diskTracker = disk.NewTracker(cteExec.ID(), -1)
>>>>>>> fa340f3400a (executor: fix CTE goroutine leak when exceeds mem quota (#50828)):pkg/executor/cte.go
p.memTracker.AttachTo(p.ctx.GetSessionVars().StmtCtx.MemTracker)
p.diskTracker.AttachTo(p.ctx.GetSessionVars().StmtCtx.DiskTracker)

Expand Down Expand Up @@ -198,10 +273,10 @@ func (p *cteProducer) openProducer(ctx context.Context, cteExec *CTEExec) (err e
p.hCtx.keyColIdx[i] = i
}
}
p.opened = true
return nil
}

<<<<<<< HEAD:executor/cte.go
func (p *cteProducer) closeProducer() (err error) {
if err = p.seedExec.Close(); err != nil {
return err
Expand All @@ -210,21 +285,38 @@ func (p *cteProducer) closeProducer() (err error) {
if err = p.recursiveExec.Close(); err != nil {
return err
}
=======
func (p *cteProducer) closeProducer() (firstErr error) {
err := exec.Close(p.seedExec)
firstErr = setFirstErr(firstErr, err, "close seedExec err")

if p.recursiveExec != nil {
err = exec.Close(p.recursiveExec)
firstErr = setFirstErr(firstErr, err, "close recursiveExec err")

>>>>>>> fa340f3400a (executor: fix CTE goroutine leak when exceeds mem quota (#50828)):pkg/executor/cte.go
// `iterInTbl` and `resTbl` are shared by multiple operators,
// so will be closed when the SQL finishes.
if p.iterOutTbl != nil {
if err = p.iterOutTbl.DerefAndClose(); err != nil {
return err
}
err = p.iterOutTbl.DerefAndClose()
firstErr = setFirstErr(firstErr, err, "deref iterOutTbl err")
}
}
// Reset to nil instead of calling Detach(),
// because ExplainExec still needs tracker to get mem usage info.
p.memTracker = nil
p.diskTracker = nil
p.closed = true
<<<<<<< HEAD:executor/cte.go
if p.isInApply {
if err = p.reopenTbls(); err != nil {
return err
}
}
return nil
=======
return
>>>>>>> fa340f3400a (executor: fix CTE goroutine leak when exceeds mem quota (#50828)):pkg/executor/cte.go
}

func (p *cteProducer) getChunk(ctx context.Context, cteExec *CTEExec, req *chunk.Chunk) (err error) {
Expand Down Expand Up @@ -467,10 +559,22 @@ func (p *cteProducer) reset() {
p.hashTbl = nil

p.opened = false
p.openErr = nil
p.produced = false
p.closed = false
}

func (p *cteProducer) resetTracker() {
if p.memTracker != nil {
p.memTracker.Reset()
p.memTracker = nil
}
if p.diskTracker != nil {
p.diskTracker.Reset()
p.diskTracker = nil
}
}

func (p *cteProducer) reopenTbls() (err error) {
if p.isDistinct {
p.hashTbl = newConcurrentMapHashTable()
Expand Down