Skip to content

Commit

Permalink
executor: fix CTE goroutine leak when exceeds mem quota (#50828) (#51168
Browse files Browse the repository at this point in the history
)

close #50337
  • Loading branch information
ti-chi-bot authored Feb 22, 2024
1 parent 1f0ca5f commit cbd44d4
Showing 1 changed file with 76 additions and 26 deletions.
102 changes: 76 additions & 26 deletions executor/cte.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ import (
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/cteutil"
"github.com/pingcap/tidb/util/disk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"go.uber.org/zap"
)

var _ Executor = &CTEExec{}
Expand Down Expand Up @@ -88,6 +90,9 @@ func (e *CTEExec) Open(ctx context.Context) (err error) {
return err
}
}
if e.producer.openErr != nil {
return e.producer.openErr
}
if !e.producer.opened {
if err = e.producer.openProducer(ctx, e); err != nil {
return err
Expand All @@ -108,21 +113,38 @@ func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
return e.producer.getChunk(ctx, e, req)
}

// Close implements the Executor interface.
func (e *CTEExec) Close() (err error) {
e.producer.resTbl.Lock()
if !e.producer.closed {
// closeProducer() only close seedExec and recursiveExec, will not touch resTbl.
// It means you can still read resTbl after call closeProducer().
// You can even call all three functions(openProducer/produce/closeProducer) in CTEExec.Next().
// Separating these three function calls is only to follow the abstraction of the volcano model.
err = e.producer.closeProducer()
}
e.producer.resTbl.Unlock()
if err != nil {
return err
func setFirstErr(firstErr error, newErr error, msg string) error {
if newErr != nil {
logutil.BgLogger().Error("cte got error", zap.Any("err", newErr), zap.Any("extra msg", msg))
if firstErr == nil {
firstErr = newErr
}
}
return e.baseExecutor.Close()
return firstErr
}

// Close implements the Executor interface.
func (e *CTEExec) Close() (firstErr error) {
func() {
e.producer.resTbl.Lock()
defer e.producer.resTbl.Unlock()
if !e.producer.closed {
failpoint.Inject("mock_cte_exec_panic_avoid_deadlock", func(v failpoint.Value) {
if ok := v.(bool); ok {
panic("mock mem oom panic")
}
})
// closeProducer() only close seedExec and recursiveExec, will not touch resTbl.
// It means you can still read resTbl after call closeProducer().
// You can even call all three functions(openProducer/produce/closeProducer) in CTEExec.Next().
// Separating these three function calls is only to follow the abstraction of the volcano model.
err := e.producer.closeProducer()
firstErr = setFirstErr(firstErr, err, "close cte producer error")
}
}()
err := e.baseExecutor.Close()
firstErr = setFirstErr(firstErr, err, "close cte children error")
return
}

func (e *CTEExec) reset() {
Expand All @@ -132,10 +154,16 @@ func (e *CTEExec) reset() {
}

type cteProducer struct {
// opened should be false when not open or open fail(a.k.a. openErr != nil)
opened bool
produced bool
closed bool

// cteProducer is shared by multiple operators, so if the first operator tries to open
// and got error, the second should return open error directly instead of open again.
// Otherwise there may be resource leak because Close() only clean resource for the last Open().
openErr error

ctx sessionctx.Context

seedExec Executor
Expand Down Expand Up @@ -169,13 +197,22 @@ type cteProducer struct {
}

func (p *cteProducer) openProducer(ctx context.Context, cteExec *CTEExec) (err error) {
defer func() {
p.openErr = err
if err == nil {
p.opened = true
} else {
p.opened = false
}
}()
if p.seedExec == nil {
return errors.New("seedExec for CTEExec is nil")
}
if err = p.seedExec.Open(ctx); err != nil {
return err
}

p.resetTracker()
p.memTracker = memory.NewTracker(cteExec.id, -1)
p.diskTracker = disk.NewTracker(cteExec.id, -1)
p.memTracker.AttachTo(p.ctx.GetSessionVars().StmtCtx.MemTracker)
Expand Down Expand Up @@ -206,28 +243,29 @@ func (p *cteProducer) openProducer(ctx context.Context, cteExec *CTEExec) (err e
p.hCtx.keyColIdx[i] = i
}
}
p.opened = true
return nil
}

func (p *cteProducer) closeProducer() (err error) {
if err = p.seedExec.Close(); err != nil {
return err
}
func (p *cteProducer) closeProducer() (firstErr error) {
err := p.seedExec.Close()
firstErr = setFirstErr(firstErr, err, "close seedExec err")
if p.recursiveExec != nil {
if err = p.recursiveExec.Close(); err != nil {
return err
}
err = p.recursiveExec.Close()
firstErr = setFirstErr(firstErr, err, "close recursiveExec err")

// `iterInTbl` and `resTbl` are shared by multiple operators,
// so will be closed when the SQL finishes.
if p.iterOutTbl != nil {
if err = p.iterOutTbl.DerefAndClose(); err != nil {
return err
}
err = p.iterOutTbl.DerefAndClose()
firstErr = setFirstErr(firstErr, err, "deref iterOutTbl err")
}
}
// Reset to nil instead of calling Detach(),
// because ExplainExec still needs tracker to get mem usage info.
p.memTracker = nil
p.diskTracker = nil
p.closed = true
return nil
return
}

func (p *cteProducer) getChunk(ctx context.Context, cteExec *CTEExec, req *chunk.Chunk) (err error) {
Expand Down Expand Up @@ -470,10 +508,22 @@ func (p *cteProducer) reset() {
p.hashTbl = nil

p.opened = false
p.openErr = nil
p.produced = false
p.closed = false
}

func (p *cteProducer) resetTracker() {
if p.memTracker != nil {
p.memTracker.Detach()
p.memTracker = nil
}
if p.diskTracker != nil {
p.diskTracker.Detach()
p.diskTracker = nil
}
}

func (p *cteProducer) reopenTbls() (err error) {
if p.isDistinct {
p.hashTbl = newConcurrentMapHashTable()
Expand Down

0 comments on commit cbd44d4

Please sign in to comment.