diff --git a/pkg/executor/cte.go b/pkg/executor/cte.go index dcad923cabc2f..ef8babbc86aab 100644 --- a/pkg/executor/cte.go +++ b/pkg/executor/cte.go @@ -29,7 +29,9 @@ import ( "github.com/pingcap/tidb/pkg/util/cteutil" "github.com/pingcap/tidb/pkg/util/dbterror/exeerrors" "github.com/pingcap/tidb/pkg/util/disk" + "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/memory" + "go.uber.org/zap" ) var _ exec.Executor = &CTEExec{} @@ -90,6 +92,9 @@ func (e *CTEExec) Open(ctx context.Context) (err error) { return err } } + if e.producer.openErr != nil { + return e.producer.openErr + } if !e.producer.opened { if err = e.producer.openProducer(ctx, e); err != nil { return err @@ -110,8 +115,18 @@ func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) { return e.producer.getChunk(e, req) } +func setFirstErr(firstErr error, newErr error, msg string) error { + if newErr != nil { + logutil.BgLogger().Error("cte got error", zap.Any("err", newErr), zap.Any("extra msg", msg)) + if firstErr == nil { + firstErr = newErr + } + } + return firstErr +} + // Close implements the Executor interface. -func (e *CTEExec) Close() (err error) { +func (e *CTEExec) Close() (firstErr error) { func() { e.producer.resTbl.Lock() defer e.producer.resTbl.Unlock() @@ -127,13 +142,13 @@ func (e *CTEExec) Close() (err error) { // It means you can still read resTbl after call closeProducer(). // You can even call all three functions(openProducer/produce/closeProducer) in CTEExec.Next(). // Separating these three function calls is only to follow the abstraction of the volcano model. - err = e.producer.closeProducer() + err := e.producer.closeProducer() + firstErr = setFirstErr(firstErr, err, "close cte producer error") } }() - if err != nil { - return err - } - return e.BaseExecutor.Close() + err := e.BaseExecutor.Close() + firstErr = setFirstErr(firstErr, err, "close cte children error") + return } func (e *CTEExec) reset() { @@ -143,10 +158,16 @@ func (e *CTEExec) reset() { } type cteProducer struct { + // opened should be false when not open or open fail(a.k.a. openErr != nil) opened bool produced bool closed bool + // cteProducer is shared by multiple operators, so if the first operator tries to open + // and got error, the second should return open error directly instead of open again. + // Otherwise there may be resource leak because Close() only clean resource for the last Open(). + openErr error + ctx sessionctx.Context seedExec exec.Executor @@ -180,6 +201,14 @@ type cteProducer struct { } func (p *cteProducer) openProducer(ctx context.Context, cteExec *CTEExec) (err error) { + defer func() { + p.openErr = err + if err == nil { + p.opened = true + } else { + p.opened = false + } + }() if p.seedExec == nil { return errors.New("seedExec for CTEExec is nil") } @@ -187,11 +216,8 @@ func (p *cteProducer) openProducer(ctx context.Context, cteExec *CTEExec) (err e return err } - if p.memTracker != nil { - p.memTracker.Reset() - } else { - p.memTracker = memory.NewTracker(cteExec.ID(), -1) - } + p.resetTracker() + p.memTracker = memory.NewTracker(cteExec.ID(), -1) p.diskTracker = disk.NewTracker(cteExec.ID(), -1) p.memTracker.AttachTo(p.ctx.GetSessionVars().StmtCtx.MemTracker) p.diskTracker.AttachTo(p.ctx.GetSessionVars().StmtCtx.DiskTracker) @@ -221,28 +247,29 @@ func (p *cteProducer) openProducer(ctx context.Context, cteExec *CTEExec) (err e p.hCtx.keyColIdx[i] = i } } - p.opened = true return nil } -func (p *cteProducer) closeProducer() (err error) { - if err = p.seedExec.Close(); err != nil { - return err - } +func (p *cteProducer) closeProducer() (firstErr error) { + err := p.seedExec.Close() + firstErr = setFirstErr(firstErr, err, "close seedExec err") if p.recursiveExec != nil { - if err = p.recursiveExec.Close(); err != nil { - return err - } + err = p.recursiveExec.Close() + firstErr = setFirstErr(firstErr, err, "close recursiveExec err") + // `iterInTbl` and `resTbl` are shared by multiple operators, // so will be closed when the SQL finishes. if p.iterOutTbl != nil { - if err = p.iterOutTbl.DerefAndClose(); err != nil { - return err - } + err = p.iterOutTbl.DerefAndClose() + firstErr = setFirstErr(firstErr, err, "deref iterOutTbl err") } } + // Reset to nil instead of calling Detach(), + // because ExplainExec still needs tracker to get mem usage info. + p.memTracker = nil + p.diskTracker = nil p.closed = true - return nil + return } func (p *cteProducer) getChunk(cteExec *CTEExec, req *chunk.Chunk) (err error) { @@ -485,10 +512,22 @@ func (p *cteProducer) reset() { p.hashTbl = nil p.opened = false + p.openErr = nil p.produced = false p.closed = false } +func (p *cteProducer) resetTracker() { + if p.memTracker != nil { + p.memTracker.Reset() + p.memTracker = nil + } + if p.diskTracker != nil { + p.diskTracker.Reset() + p.diskTracker = nil + } +} + func (p *cteProducer) reopenTbls() (err error) { if p.isDistinct { p.hashTbl = newConcurrentMapHashTable()