Skip to content

Commit

Permalink
*: fix cte miss cleaning spilled-disk file (#44501)
Browse files Browse the repository at this point in the history
close #44477
  • Loading branch information
guo-shaoge authored Jun 8, 2023
1 parent 29dffb7 commit 437157e
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 36 deletions.
58 changes: 53 additions & 5 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,11 @@ func (a *recordSet) NewChunk(alloc chunk.Allocator) *chunk.Chunk {

func (a *recordSet) Close() error {
err := a.executor.Close()
a.stmt.CloseRecordSet(a.txnStartTS, a.lastErr)
return err
err1 := a.stmt.CloseRecordSet(a.txnStartTS, a.lastErr)
if err != nil {
return err
}
return err1
}

// OnFetchReturned implements commandLifeCycle#OnFetchReturned
Expand Down Expand Up @@ -742,6 +745,11 @@ func (a *ExecStmt) handleNoDelay(ctx context.Context, e Executor, isPessimistic
// `rs.Close` in `handleStmt`
if handled && sc != nil && rs == nil {
sc.DetachMemDiskTracker()
cteErr := resetCTEStorageMap(a.Ctx)
if err == nil {
// Only overwrite err when it's nil.
err = cteErr
}
}
}()

Expand Down Expand Up @@ -839,8 +847,7 @@ func (c *chunkRowRecordSet) NewChunk(alloc chunk.Allocator) *chunk.Chunk {
}

func (c *chunkRowRecordSet) Close() error {
c.execStmt.CloseRecordSet(c.execStmt.Ctx.GetSessionVars().TxnCtx.StartTS, nil)
return nil
return c.execStmt.CloseRecordSet(c.execStmt.Ctx.GetSessionVars().TxnCtx.StartTS, nil)
}

func (a *ExecStmt) handlePessimisticSelectForUpdate(ctx context.Context, e Executor) (_ sqlexec.RecordSet, retErr error) {
Expand Down Expand Up @@ -1412,10 +1419,51 @@ func (a *ExecStmt) checkPlanReplayerCapture(txnTS uint64) {
}

// CloseRecordSet will finish the execution of current statement and do some record work
func (a *ExecStmt) CloseRecordSet(txnStartTS uint64, lastErr error) {
func (a *ExecStmt) CloseRecordSet(txnStartTS uint64, lastErr error) error {
cteErr := resetCTEStorageMap(a.Ctx)
if cteErr != nil {
logutil.BgLogger().Error("got error when reset cte storage, should check if the spill disk file deleted or not", zap.Error(cteErr))
}
if lastErr == nil {
// Only overwrite err when it's nil.
lastErr = cteErr
}
a.FinishExecuteStmt(txnStartTS, lastErr, false)
a.logAudit()
a.Ctx.GetSessionVars().StmtCtx.DetachMemDiskTracker()
return cteErr
}

// Clean CTE storage shared by different CTEFullScan executor within a SQL stmt.
// Will return err in two situations:
// 1. Got err when remove disk spill file.
// 2. Some logical error like ref count of CTEStorage is less than 0.
func resetCTEStorageMap(se sessionctx.Context) error {
tmp := se.GetSessionVars().StmtCtx.CTEStorageMap
if tmp == nil {
// Close() is already called, so no need to reset. Such as TraceExec.
return nil
}
storageMap, ok := tmp.(map[int]*CTEStorages)
if !ok {
return errors.New("type assertion for CTEStorageMap failed")
}
for _, v := range storageMap {
v.ResTbl.Lock()
err1 := v.ResTbl.DerefAndClose()
// Make sure we do not hold the lock for longer than necessary.
v.ResTbl.Unlock()
// No need to lock IterInTbl.
err2 := v.IterInTbl.DerefAndClose()
if err1 != nil {
return err1
}
if err2 != nil {
return err2
}
}
se.GetSessionVars().StmtCtx.CTEStorageMap = nil
return nil
}

// LogSlowQuery is used to print the slow query in the log files.
Expand Down
15 changes: 15 additions & 0 deletions executor/cte_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,3 +470,18 @@ func TestCTEPanic(t *testing.T) {
require.Contains(t, err.Error(), fp)
require.NoError(t, failpoint.Disable(fpPathPrefix+fp))
}

func TestCTEDelSpillFile(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")
tk.MustExec("drop table if exists t1, t2;")
tk.MustExec("create table t1(c1 int, c2 int);")
tk.MustExec("create table t2(c1 int);")
tk.MustExec("set @@cte_max_recursion_depth = 1000000;")
tk.MustExec("set global tidb_mem_oom_action = 'log';")
tk.MustExec("set @@tidb_mem_quota_query = 100;")
tk.MustExec("insert into t2 values(1);")
tk.MustExec("insert into t1 (c1, c2) with recursive cte1 as (select c1 from t2 union select cte1.c1 + 1 from cte1 where cte1.c1 < 100000) select cte1.c1, cte1.c1+1 from cte1;")
require.Nil(t, tk.Session().GetSessionVars().StmtCtx.CTEStorageMap)
}
8 changes: 8 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3502,6 +3502,7 @@ func TestUnreasonablyClose(t *testing.T) {
require.NotNil(t, p)

// This for loop level traverses the plan tree to get which operators are covered.
var hasCTE bool
for child := []plannercore.PhysicalPlan{p.(plannercore.PhysicalPlan)}; len(child) != 0; {
newChild := make([]plannercore.PhysicalPlan, 0, len(child))
for _, ch := range child {
Expand All @@ -3518,6 +3519,7 @@ func TestUnreasonablyClose(t *testing.T) {
case *plannercore.PhysicalCTE:
newChild = append(newChild, x.RecurPlan)
newChild = append(newChild, x.SeedPlan)
hasCTE = true
continue
case *plannercore.PhysicalShuffle:
newChild = append(newChild, x.DataSources...)
Expand All @@ -3529,6 +3531,12 @@ func TestUnreasonablyClose(t *testing.T) {
child = newChild
}

if hasCTE {
// Normally CTEStorages will be setup in ResetContextOfStmt.
// But the following case call e.Close() directly, instead of calling session.ExecStmt(), which calls ResetContextOfStmt.
// So need to setup CTEStorages manually.
tk.Session().GetSessionVars().StmtCtx.CTEStorageMap = map[int]*executor.CTEStorages{}
}
e := executorBuilder.Build(p)

func() {
Expand Down
31 changes: 0 additions & 31 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2455,40 +2455,9 @@ func (rs *execStmtResult) Close() error {
if err := rs.RecordSet.Close(); err != nil {
return finishStmt(context.Background(), se, err, rs.sql)
}
if err := resetCTEStorageMap(se); err != nil {
return finishStmt(context.Background(), se, err, rs.sql)
}
return finishStmt(context.Background(), se, nil, rs.sql)
}

func resetCTEStorageMap(se *session) error {
tmp := se.GetSessionVars().StmtCtx.CTEStorageMap
if tmp == nil {
// Close() is already called, so no need to reset. Such as TraceExec.
return nil
}
storageMap, ok := tmp.(map[int]*executor.CTEStorages)
if !ok {
return errors.New("type assertion for CTEStorageMap failed")
}
for _, v := range storageMap {
v.ResTbl.Lock()
err1 := v.ResTbl.DerefAndClose()
// Make sure we do not hold the lock for longer than necessary.
v.ResTbl.Unlock()
// No need to lock IterInTbl.
err2 := v.IterInTbl.DerefAndClose()
if err1 != nil {
return err1
}
if err2 != nil {
return err2
}
}
se.GetSessionVars().StmtCtx.CTEStorageMap = nil
return nil
}

// rollbackOnError makes sure the next statement starts a new transaction with the latest InfoSchema.
func (s *session) rollbackOnError(ctx context.Context) {
if !s.sessionVars.InTxn() {
Expand Down

0 comments on commit 437157e

Please sign in to comment.