Skip to content

Commit

Permalink
fix two bugs
Browse files Browse the repository at this point in the history
1. Replace the manual unlock() call with a deferred unlock.
2. Reopen CTEStorage when an error occurs while it is being filled.

Signed-off-by: guo-shaoge <shaoge1994@163.com>
  • Loading branch information
guo-shaoge committed May 11, 2021
1 parent 2f2c162 commit f492570
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 81 deletions.
186 changes: 109 additions & 77 deletions executor/cte.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,88 +108,29 @@ func (e *CTEExec) Open(ctx context.Context) (err error) {
func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
req.Reset()
e.resTbl.Lock()
defer func() {
e.resTbl.Unlock()
}()
if !e.resTbl.Done() {
// e.resTbl and e.iterInTbl is shared by different CTEExec, so only setup once.
setupCTEStorageTracker(e.resTbl, e.ctx)
setupCTEStorageTracker(e.iterInTbl, e.ctx)

// Compute seed part.
e.curIter = 0
e.iterInTbl.SetIter(e.curIter)
if e.curIter >= e.ctx.GetSessionVars().CTEMaxRecursionDepth {
return ErrCTEMaxRecursionDepth.GenWithStackByArgs(e.curIter + 1)
if err = e.computeSeedPart(ctx); err != nil {
// Don't put it in defer.
// Because it should be called only when the filling is not completed.
if err1 := e.reopenTbls(); err1 != nil {
return err1
}
return err
}
for {
chk := newFirstChunk(e.seedExec)
if err = Next(ctx, e.seedExec, chk); err != nil {
return err
}
if chk.NumRows() == 0 {
break
}
if chk, err = e.iterInTbl.Add(chk); err != nil {
return err
}
if _, err = e.resTbl.Add(chk); err != nil {
return err
}
}

// TODO: too tricky. This means iterInTbl fill done
e.curIter++
close(e.iterInTbl.GetBegCh())

if e.recursiveExec != nil && e.iterInTbl.NumChunks() != 0 {
// Start to compute recursive part. Iteration 1 begins.
for {
chk := newFirstChunk(e.recursiveExec)
if err = Next(ctx, e.recursiveExec, chk); err != nil {
return err
}
if chk.NumRows() == 0 {
e.iterInTbl.ResetData()
for i := 0; i < e.iterOutTbl.NumChunks(); i++ {
if chk, err = e.iterOutTbl.GetChunk(i); err != nil {
return err
}
if chk, err = e.resTbl.Add(chk); err != nil {
return err
}
if _, err = e.iterInTbl.Add(chk); err != nil {
return err
}
}
if err = e.iterOutTbl.ResetData(); err != nil {
return err
}
if e.iterInTbl.NumChunks() == 0 {
break
} else {
if e.curIter >= e.ctx.GetSessionVars().CTEMaxRecursionDepth {
return ErrCTEMaxRecursionDepth.GenWithStackByArgs(e.curIter + 1)
}
// Next iteration begins. Need use iterOutTbl as input of next iteration.
e.curIter++
e.iterInTbl.SetIter(e.curIter)
// Make sure iterInTbl is setup before Close/Open,
// because some executors will read iterInTbl in Open() (like IndexLookupJoin).
if err = e.recursiveExec.Close(); err != nil {
return err
}
if err = e.recursiveExec.Open(ctx); err != nil {
return err
}
}
} else {
if _, err = e.iterOutTbl.Add(chk); err != nil {
return err
}
}
}
}
e.resTbl.SetDone()
}
e.resTbl.Unlock()
if err = e.computeRecursivePart(ctx); err != nil {
if err1 := e.reopenTbls(); err1 != nil {
return err1
}
return err
}
}
e.resTbl.SetDone()

if e.chkIdx < e.resTbl.NumChunks() {
res, err := e.resTbl.GetChunk(e.chkIdx)
Expand Down Expand Up @@ -226,11 +167,102 @@ func (e *CTEExec) Close() (err error) {
return e.baseExecutor.Close()
}

// computeSeedPart runs iteration 0 of the CTE: it drains e.seedExec and stores
// every non-empty chunk into e.iterInTbl and then into e.resTbl.
//
// It returns ErrCTEMaxRecursionDepth when the session's CTEMaxRecursionDepth
// does not allow even the first iteration, or the first error reported by the
// seed executor or by either storage. On success e.curIter is advanced to 1
// and the begin channel of e.iterInTbl is closed.
func (e *CTEExec) computeSeedPart(ctx context.Context) (err error) {
	e.curIter = 0
	e.iterInTbl.SetIter(e.curIter)
	if maxDepth := e.ctx.GetSessionVars().CTEMaxRecursionDepth; e.curIter >= maxDepth {
		return ErrCTEMaxRecursionDepth.GenWithStackByArgs(e.curIter + 1)
	}

	for {
		seedChk := newFirstChunk(e.seedExec)
		if err = Next(ctx, e.seedExec, seedChk); err != nil {
			return err
		}
		// An empty chunk means the seed executor is exhausted.
		if seedChk.NumRows() == 0 {
			break
		}
		// The chunk handed back by iterInTbl.Add is the one forwarded to resTbl.
		if seedChk, err = e.iterInTbl.Add(seedChk); err != nil {
			return err
		}
		if _, err = e.resTbl.Add(seedChk); err != nil {
			return err
		}
	}
	e.curIter++

	// TODO: closing the begin channel is how "iterInTbl is filled" is signalled; too tricky.
	close(e.iterInTbl.GetBegCh())
	return nil
}

// computeRecursivePart runs the recursive part of the CTE, starting at
// iteration 1, until an iteration produces no new chunks.
//
// It is a no-op when there is no recursive executor or when e.iterInTbl holds
// no chunks. Rows produced by e.recursiveExec during an iteration are buffered
// in e.iterOutTbl. When the executor is exhausted, e.iterInTbl is cleared and
// every buffered chunk is added to e.resTbl and then to e.iterInTbl, which
// becomes the input of the next iteration.
//
// It returns ErrCTEMaxRecursionDepth when another iteration would be needed
// but e.curIter has reached the session's CTEMaxRecursionDepth, or the first
// error reported by the executor or by any of the storages.
func (e *CTEExec) computeRecursivePart(ctx context.Context) (err error) {
	if e.recursiveExec == nil || e.iterInTbl.NumChunks() == 0 {
		return nil
	}

	for {
		chk := newFirstChunk(e.recursiveExec)
		if err = Next(ctx, e.recursiveExec, chk); err != nil {
			return err
		}
		if chk.NumRows() != 0 {
			// Still inside the current iteration: just buffer the output.
			if _, err = e.iterOutTbl.Add(chk); err != nil {
				return err
			}
			continue
		}

		// The current iteration is finished. Drop its input; the error must be
		// checked because ResetData can fail.
		if err = e.iterInTbl.ResetData(); err != nil {
			return err
		}
		for i := 0; i < e.iterOutTbl.NumChunks(); i++ {
			if chk, err = e.iterOutTbl.GetChunk(i); err != nil {
				return err
			}
			// The chunk handed back by resTbl.Add is the one fed to iterInTbl.
			if chk, err = e.resTbl.Add(chk); err != nil {
				return err
			}
			if _, err = e.iterInTbl.Add(chk); err != nil {
				return err
			}
		}
		if err = e.iterOutTbl.ResetData(); err != nil {
			return err
		}
		// Nothing new was produced, so the recursion has converged.
		if e.iterInTbl.NumChunks() == 0 {
			break
		}
		if e.curIter >= e.ctx.GetSessionVars().CTEMaxRecursionDepth {
			return ErrCTEMaxRecursionDepth.GenWithStackByArgs(e.curIter + 1)
		}
		// Next iteration begins. Need use iterOutTbl as input of next iteration.
		e.curIter++
		e.iterInTbl.SetIter(e.curIter)
		// Make sure iterInTbl is setup before Close/Open,
		// because some executors will read iterInTbl in Open() (like IndexLookupJoin).
		if err = e.recursiveExec.Close(); err != nil {
			return err
		}
		if err = e.recursiveExec.Open(ctx); err != nil {
			return err
		}
	}
	return nil
}

// reset sets the iteration counter and the result-chunk cursor of this
// executor back to zero.
func (e *CTEExec) reset() {
	e.curIter, e.chkIdx = 0, 0
}

// reopenTbls reopens e.resTbl and then e.iterInTbl. It stops at, and returns,
// the first error; e.iterInTbl is not touched when reopening e.resTbl fails.
func (e *CTEExec) reopenTbls() (err error) {
	if err = e.resTbl.Reopen(); err != nil {
		return err
	}
	return e.iterInTbl.Reopen()
}

func setupCTEStorageTracker(tbl CTEStorage, ctx sessionctx.Context) {
memTracker := tbl.GetMemTracker()
memTracker.SetLabel(memory.LabelForCTEStorage)
Expand Down
29 changes: 25 additions & 4 deletions executor/cte_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type CTEStorage interface {
ForceClose() error

ResetData() error
Reopen() error
NumChunks() int

GetMemTracker() *memory.Tracker
Expand All @@ -84,15 +85,18 @@ type CTEStorage interface {
type CTEStorageRC struct {
// meta info
mu sync.Mutex
begCh chan struct{}
refCnt int
done bool
iter int
filterDup bool
sc *stmtctx.StatementContext
tp []*types.FieldType
chkSize int

// data info
tp []*types.FieldType
begCh chan struct{}
done bool
iter int

// data
rc *chunk.RowContainer
// TODO: also track mem usage of ht
ht baseHashTable
Expand All @@ -109,6 +113,7 @@ func (s *CTEStorageRC) OpenAndRef(fieldType []*types.FieldType, chkSize int) (er
return errors.Trace(errors.New("chunk field types are nil"))
}
s.tp = fieldType
s.chkSize = chkSize
s.rc = chunk.NewRowContainer(fieldType, chkSize)
s.refCnt = 1
s.begCh = make(chan struct{})
Expand Down Expand Up @@ -222,6 +227,22 @@ func (s *CTEStorageRC) ResetData() error {
return s.rc.Reset()
}

// Reopen puts the storage back into a freshly opened, empty state. It replaces
// the hash table when duplicate filtering is enabled, resets the current row
// container, and then clears the iteration number, the done flag and the
// begin channel. It returns the error from resetting the row container, in
// which case the remaining fields are left untouched.
func (s *CTEStorageRC) Reopen() (err error) {
	if s.filterDup {
		s.ht = newConcurrentMapHashTable()
	}
	if err = s.rc.Reset(); err != nil {
		return err
	}
	// Swap in a brand-new RowContainer instead of reusing the old one: Reset
	// does not clear all of its meta info (memTracker, actionSpill, etc.).
	s.rc = chunk.NewRowContainer(s.tp, s.chkSize)
	s.begCh = make(chan struct{})
	s.iter, s.done = 0, false
	return nil
}

// NumChunks returns the number of chunks reported by the underlying row
// container s.rc.
func (s *CTEStorageRC) NumChunks() int {
return s.rc.NumChunks()
}
Expand Down

0 comments on commit f492570

Please sign in to comment.