Skip to content

Commit

Permalink
executor: fix CTE goroutine leak when exceeds mem quota (#50828) (#51167
Browse files Browse the repository at this point in the history
)

close #50337
  • Loading branch information
ti-chi-bot authored Feb 23, 2024
1 parent 885e22f commit debaf17
Showing 1 changed file with 62 additions and 23 deletions.
85 changes: 62 additions & 23 deletions executor/cte.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ import (
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/cteutil"
"github.com/pingcap/tidb/util/disk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"go.uber.org/zap"
)

var _ Executor = &CTEExec{}
Expand Down Expand Up @@ -88,6 +90,9 @@ func (e *CTEExec) Open(ctx context.Context) (err error) {
return err
}
}
if e.producer.openErr != nil {
return e.producer.openErr
}
if !e.producer.opened {
if err = e.producer.openProducer(ctx, e); err != nil {
return err
Expand All @@ -108,8 +113,18 @@ func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
return e.producer.getChunk(ctx, e, req)
}

func setFirstErr(firstErr error, newErr error, msg string) error {
if newErr != nil {
logutil.BgLogger().Error("cte got error", zap.Any("err", newErr), zap.Any("extra msg", msg))
if firstErr == nil {
firstErr = newErr
}
}
return firstErr
}

// Close implements the Executor interface.
func (e *CTEExec) Close() (err error) {
func (e *CTEExec) Close() (firstErr error) {
func() {
e.producer.resTbl.Lock()
defer e.producer.resTbl.Unlock()
Expand All @@ -125,13 +140,13 @@ func (e *CTEExec) Close() (err error) {
// It means you can still read resTbl after call closeProducer().
// You can even call all three functions(openProducer/produce/closeProducer) in CTEExec.Next().
// Separating these three function calls is only to follow the abstraction of the volcano model.
err = e.producer.closeProducer()
err := e.producer.closeProducer()
firstErr = setFirstErr(firstErr, err, "close cte producer error")
}
}()
if err != nil {
return err
}
return e.baseExecutor.Close()
err := e.baseExecutor.Close()
firstErr = setFirstErr(firstErr, err, "close cte children error")
return
}

func (e *CTEExec) reset() {
Expand All @@ -141,10 +156,16 @@ func (e *CTEExec) reset() {
}

type cteProducer struct {
// opened should be false when not open or open fail(a.k.a. openErr != nil)
opened bool
produced bool
closed bool

// cteProducer is shared by multiple operators, so if the first operator tries to open
// and got error, the second should return open error directly instead of open again.
// Otherwise there may be resource leak because Close() only clean resource for the last Open().
openErr error

ctx sessionctx.Context

seedExec Executor
Expand Down Expand Up @@ -178,18 +199,23 @@ type cteProducer struct {
}

func (p *cteProducer) openProducer(ctx context.Context, cteExec *CTEExec) (err error) {
defer func() {
p.openErr = err
if err == nil {
p.opened = true
} else {
p.opened = false
}
}()
if p.seedExec == nil {
return errors.New("seedExec for CTEExec is nil")
}
if err = p.seedExec.Open(ctx); err != nil {
return err
}

if p.memTracker != nil {
p.memTracker.Reset()
} else {
p.memTracker = memory.NewTracker(cteExec.id, -1)
}
p.resetTracker()
p.memTracker = memory.NewTracker(cteExec.id, -1)
p.diskTracker = disk.NewTracker(cteExec.id, -1)
p.memTracker.AttachTo(p.ctx.GetSessionVars().StmtCtx.MemTracker)
p.diskTracker.AttachTo(p.ctx.GetSessionVars().StmtCtx.DiskTracker)
Expand Down Expand Up @@ -219,28 +245,29 @@ func (p *cteProducer) openProducer(ctx context.Context, cteExec *CTEExec) (err e
p.hCtx.keyColIdx[i] = i
}
}
p.opened = true
return nil
}

func (p *cteProducer) closeProducer() (err error) {
if err = p.seedExec.Close(); err != nil {
return err
}
func (p *cteProducer) closeProducer() (firstErr error) {
err := p.seedExec.Close()
firstErr = setFirstErr(firstErr, err, "close seedExec err")
if p.recursiveExec != nil {
if err = p.recursiveExec.Close(); err != nil {
return err
}
err = p.recursiveExec.Close()
firstErr = setFirstErr(firstErr, err, "close recursiveExec err")

// `iterInTbl` and `resTbl` are shared by multiple operators,
// so will be closed when the SQL finishes.
if p.iterOutTbl != nil {
if err = p.iterOutTbl.DerefAndClose(); err != nil {
return err
}
err = p.iterOutTbl.DerefAndClose()
firstErr = setFirstErr(firstErr, err, "deref iterOutTbl err")
}
}
// Reset to nil instead of calling Detach(),
// because ExplainExec still needs tracker to get mem usage info.
p.memTracker = nil
p.diskTracker = nil
p.closed = true
return nil
return
}

func (p *cteProducer) getChunk(ctx context.Context, cteExec *CTEExec, req *chunk.Chunk) (err error) {
Expand Down Expand Up @@ -483,10 +510,22 @@ func (p *cteProducer) reset() {
p.hashTbl = nil

p.opened = false
p.openErr = nil
p.produced = false
p.closed = false
}

func (p *cteProducer) resetTracker() {
if p.memTracker != nil {
p.memTracker.Reset()
p.memTracker = nil
}
if p.diskTracker != nil {
p.diskTracker.Reset()
p.diskTracker = nil
}
}

func (p *cteProducer) reopenTbls() (err error) {
if p.isDistinct {
p.hashTbl = newConcurrentMapHashTable()
Expand Down

0 comments on commit debaf17

Please sign in to comment.