Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#50828
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
guo-shaoge authored and ti-chi-bot committed Feb 20, 2024
1 parent 24213f2 commit 7d59a18
Showing 1 changed file with 88 additions and 7 deletions.
95 changes: 88 additions & 7 deletions executor/cte.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
<<<<<<< HEAD:executor/cte.go
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
Expand All @@ -28,6 +29,21 @@ import (
"github.com/pingcap/tidb/util/cteutil"
"github.com/pingcap/tidb/util/disk"
"github.com/pingcap/tidb/util/memory"
=======
"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/pingcap/tidb/pkg/util/cteutil"
"github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
"github.com/pingcap/tidb/pkg/util/disk"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/memory"
"go.uber.org/zap"
>>>>>>> fa340f3400a (executor: fix CTE goroutine leak when exceeds mem quota (#50828)):pkg/executor/cte.go
)

var _ Executor = &CTEExec{}
Expand Down Expand Up @@ -88,6 +104,9 @@ func (e *CTEExec) Open(ctx context.Context) (err error) {
return err
}
}
if e.producer.openErr != nil {
return e.producer.openErr
}
if !e.producer.opened {
if err = e.producer.openProducer(ctx, e); err != nil {
return err
Expand All @@ -108,8 +127,18 @@ func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
return e.producer.getChunk(ctx, e, req)
}

func setFirstErr(firstErr error, newErr error, msg string) error {
if newErr != nil {
logutil.BgLogger().Error("cte got error", zap.Any("err", newErr), zap.Any("extra msg", msg))
if firstErr == nil {
firstErr = newErr
}
}
return firstErr
}

// Close implements the Executor interface.
func (e *CTEExec) Close() (err error) {
func (e *CTEExec) Close() (firstErr error) {
func() {
e.producer.resTbl.Lock()
defer e.producer.resTbl.Unlock()
Expand All @@ -125,13 +154,20 @@ func (e *CTEExec) Close() (err error) {
// It means you can still read resTbl after call closeProducer().
// You can even call all three functions(openProducer/produce/closeProducer) in CTEExec.Next().
// Separating these three function calls is only to follow the abstraction of the volcano model.
err = e.producer.closeProducer()
err := e.producer.closeProducer()
firstErr = setFirstErr(firstErr, err, "close cte producer error")
}
}()
<<<<<<< HEAD:executor/cte.go
if err != nil {
return err
}
return e.baseExecutor.Close()
=======
err := e.BaseExecutor.Close()
firstErr = setFirstErr(firstErr, err, "close cte children error")
return
>>>>>>> fa340f3400a (executor: fix CTE goroutine leak when exceeds mem quota (#50828)):pkg/executor/cte.go
}

func (e *CTEExec) reset() {
Expand All @@ -141,10 +177,16 @@ func (e *CTEExec) reset() {
}

type cteProducer struct {
// opened should be false when not open or open fail(a.k.a. openErr != nil)
opened bool
produced bool
closed bool

// cteProducer is shared by multiple operators, so if the first operator tries to open
// and got error, the second should return open error directly instead of open again.
// Otherwise there may be resource leak because Close() only clean resource for the last Open().
openErr error

ctx sessionctx.Context

seedExec Executor
Expand Down Expand Up @@ -178,19 +220,33 @@ type cteProducer struct {
}

func (p *cteProducer) openProducer(ctx context.Context, cteExec *CTEExec) (err error) {
defer func() {
p.openErr = err
if err == nil {
p.opened = true
} else {
p.opened = false
}
}()
if p.seedExec == nil {
return errors.New("seedExec for CTEExec is nil")
}
if err = p.seedExec.Open(ctx); err != nil {
return err
}

<<<<<<< HEAD:executor/cte.go
if p.memTracker != nil {
p.memTracker.Reset()
} else {
p.memTracker = memory.NewTracker(cteExec.id, -1)
}
p.diskTracker = disk.NewTracker(cteExec.id, -1)
=======
p.resetTracker()
p.memTracker = memory.NewTracker(cteExec.ID(), -1)
p.diskTracker = disk.NewTracker(cteExec.ID(), -1)
>>>>>>> fa340f3400a (executor: fix CTE goroutine leak when exceeds mem quota (#50828)):pkg/executor/cte.go
p.memTracker.AttachTo(p.ctx.GetSessionVars().StmtCtx.MemTracker)
p.diskTracker.AttachTo(p.ctx.GetSessionVars().StmtCtx.DiskTracker)

Expand Down Expand Up @@ -219,10 +275,10 @@ func (p *cteProducer) openProducer(ctx context.Context, cteExec *CTEExec) (err e
p.hCtx.keyColIdx[i] = i
}
}
p.opened = true
return nil
}

<<<<<<< HEAD:executor/cte.go
func (p *cteProducer) closeProducer() (err error) {
if err = p.seedExec.Close(); err != nil {
return err
Expand All @@ -231,16 +287,29 @@ func (p *cteProducer) closeProducer() (err error) {
if err = p.recursiveExec.Close(); err != nil {
return err
}
=======
func (p *cteProducer) closeProducer() (firstErr error) {
err := exec.Close(p.seedExec)
firstErr = setFirstErr(firstErr, err, "close seedExec err")

if p.recursiveExec != nil {
err = exec.Close(p.recursiveExec)
firstErr = setFirstErr(firstErr, err, "close recursiveExec err")

>>>>>>> fa340f3400a (executor: fix CTE goroutine leak when exceeds mem quota (#50828)):pkg/executor/cte.go
// `iterInTbl` and `resTbl` are shared by multiple operators,
// so will be closed when the SQL finishes.
if p.iterOutTbl != nil {
if err = p.iterOutTbl.DerefAndClose(); err != nil {
return err
}
err = p.iterOutTbl.DerefAndClose()
firstErr = setFirstErr(firstErr, err, "deref iterOutTbl err")
}
}
// Reset to nil instead of calling Detach(),
// because ExplainExec still needs tracker to get mem usage info.
p.memTracker = nil
p.diskTracker = nil
p.closed = true
return nil
return
}

func (p *cteProducer) getChunk(ctx context.Context, cteExec *CTEExec, req *chunk.Chunk) (err error) {
Expand Down Expand Up @@ -483,10 +552,22 @@ func (p *cteProducer) reset() {
p.hashTbl = nil

p.opened = false
p.openErr = nil
p.produced = false
p.closed = false
}

func (p *cteProducer) resetTracker() {
if p.memTracker != nil {
p.memTracker.Reset()
p.memTracker = nil
}
if p.diskTracker != nil {
p.diskTracker.Reset()
p.diskTracker = nil
}
}

func (p *cteProducer) reopenTbls() (err error) {
if p.isDistinct {
p.hashTbl = newConcurrentMapHashTable()
Expand Down

0 comments on commit 7d59a18

Please sign in to comment.