Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: fix CTE goroutine leak when exceeds mem quota #50828

Merged
merged 7 commits into from
Feb 19, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 63 additions & 23 deletions pkg/executor/cte.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ import (
"github.com/pingcap/tidb/pkg/util/cteutil"
"github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
"github.com/pingcap/tidb/pkg/util/disk"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/memory"
"go.uber.org/zap"
)

var _ exec.Executor = &CTEExec{}
Expand Down Expand Up @@ -91,6 +93,9 @@ func (e *CTEExec) Open(ctx context.Context) (err error) {
return err
}
}
if e.producer.openErr != nil {
return e.producer.openErr
}
if !e.producer.opened {
if err = e.producer.openProducer(ctx, e); err != nil {
return err
Expand All @@ -111,8 +116,18 @@ func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
return e.producer.getChunk(e, req)
}

func setFirstErr(firstErr error, newErr error, msg string) error {
if newErr != nil {
logutil.BgLogger().Error("cte got error", zap.Any("err", newErr), zap.Any("extra msg", msg))
if firstErr == nil {
firstErr = newErr
}
}
return firstErr
}

// Close implements the Executor interface.
func (e *CTEExec) Close() (err error) {
func (e *CTEExec) Close() (firstErr error) {
func() {
e.producer.resTbl.Lock()
defer e.producer.resTbl.Unlock()
Expand All @@ -128,13 +143,13 @@ func (e *CTEExec) Close() (err error) {
// It means you can still read resTbl after call closeProducer().
// You can even call all three functions(openProducer/produce/closeProducer) in CTEExec.Next().
// Separating these three function calls is only to follow the abstraction of the volcano model.
err = e.producer.closeProducer()
err := e.producer.closeProducer()
firstErr = setFirstErr(firstErr, err, "close cte producer error")
}
}()
if err != nil {
return err
}
return e.BaseExecutor.Close()
err := e.BaseExecutor.Close()
firstErr = setFirstErr(firstErr, err, "close cte children error")
return
}

func (e *CTEExec) reset() {
Expand All @@ -144,10 +159,16 @@ func (e *CTEExec) reset() {
}

type cteProducer struct {
// opened should be false when not open or open fail(a.k.a. openErr != nil)
opened bool
produced bool
closed bool

// cteProducer is shared by multiple operators, so if the first operator tries to open
// and got error, the second should return open error directly instead of open again.
// Otherwise there may be resource leak because Close() only clean resource for the last Open().
openErr error

ctx sessionctx.Context

seedExec exec.Executor
Expand Down Expand Up @@ -181,18 +202,23 @@ type cteProducer struct {
}

func (p *cteProducer) openProducer(ctx context.Context, cteExec *CTEExec) (err error) {
defer func() {
p.openErr = err
if err == nil {
p.opened = true
} else {
p.opened = false
}
}()
if p.seedExec == nil {
return errors.New("seedExec for CTEExec is nil")
}
if err = exec.Open(ctx, p.seedExec); err != nil {
return err
}

if p.memTracker != nil {
p.memTracker.Reset()
} else {
p.memTracker = memory.NewTracker(cteExec.ID(), -1)
}
p.resetTracker()
p.memTracker = memory.NewTracker(cteExec.ID(), -1)
p.diskTracker = disk.NewTracker(cteExec.ID(), -1)
p.memTracker.AttachTo(p.ctx.GetSessionVars().StmtCtx.MemTracker)
p.diskTracker.AttachTo(p.ctx.GetSessionVars().StmtCtx.DiskTracker)
Expand Down Expand Up @@ -222,28 +248,30 @@ func (p *cteProducer) openProducer(ctx context.Context, cteExec *CTEExec) (err e
p.hCtx.keyColIdx[i] = i
}
}
p.opened = true
return nil
}

func (p *cteProducer) closeProducer() (err error) {
if err = exec.Close(p.seedExec); err != nil {
return err
}
func (p *cteProducer) closeProducer() (firstErr error) {
err := exec.Close(p.seedExec)
firstErr = setFirstErr(firstErr, err, "close seedExec err")

if p.recursiveExec != nil {
if err = exec.Close(p.recursiveExec); err != nil {
return err
}
err = exec.Close(p.recursiveExec)
firstErr = setFirstErr(firstErr, err, "close recursiveExec err")

// `iterInTbl` and `resTbl` are shared by multiple operators,
// so will be closed when the SQL finishes.
if p.iterOutTbl != nil {
if err = p.iterOutTbl.DerefAndClose(); err != nil {
return err
}
err = p.iterOutTbl.DerefAndClose()
firstErr = setFirstErr(firstErr, err, "deref iterOutTbl err")
}
}
// Reset to nil instead of calling Detach(),
// because ExplainExec still needs tracker to get mem usage info.
p.memTracker = nil
p.diskTracker = nil
p.closed = true
return nil
return
}

func (p *cteProducer) getChunk(cteExec *CTEExec, req *chunk.Chunk) (err error) {
Expand Down Expand Up @@ -486,10 +514,22 @@ func (p *cteProducer) reset() {
p.hashTbl = nil

p.opened = false
p.openErr = nil
p.produced = false
p.closed = false
}

func (p *cteProducer) resetTracker() {
if p.memTracker != nil {
p.memTracker.Reset()
p.memTracker = nil
}
if p.diskTracker != nil {
p.diskTracker.Reset()
p.diskTracker = nil
}
}

func (p *cteProducer) reopenTbls() (err error) {
if p.isDistinct {
p.hashTbl = newConcurrentMapHashTable()
Expand Down