Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#44501
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
guo-shaoge authored and ti-chi-bot committed Jun 8, 2023
1 parent 62e6501 commit b46327e
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 36 deletions.
66 changes: 61 additions & 5 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,11 @@ func (a *recordSet) NewChunk(alloc chunk.Allocator) *chunk.Chunk {

func (a *recordSet) Close() error {
err := a.executor.Close()
a.stmt.CloseRecordSet(a.txnStartTS, a.lastErr)
return err
err1 := a.stmt.CloseRecordSet(a.txnStartTS, a.lastErr)
if err != nil {
return err
}
return err1
}

// OnFetchReturned implements commandLifeCycle#OnFetchReturned
Expand Down Expand Up @@ -488,12 +491,21 @@ func (a *ExecStmt) handleNoDelay(ctx context.Context, e Executor, isPessimistic
// If the stmt have no rs like `insert`, The session tracker detachment will be directly
// done in the `defer` function. If the rs is not nil, the detachment will be done in
// `rs.Close` in `handleStmt`
<<<<<<< HEAD
if sc != nil && rs == nil {
if sc.MemTracker != nil {
sc.MemTracker.Detach()
}
if sc.DiskTracker != nil {
sc.DiskTracker.Detach()
=======
if handled && sc != nil && rs == nil {
sc.DetachMemDiskTracker()
cteErr := resetCTEStorageMap(a.Ctx)
if err == nil {
// Only overwrite err when it's nil.
err = cteErr
>>>>>>> 437157e7209 (*: fix cte miss cleaning spilled-disk file (#44501))
}
}
}()
Expand Down Expand Up @@ -588,8 +600,7 @@ func (c *chunkRowRecordSet) NewChunk(alloc chunk.Allocator) *chunk.Chunk {
}

func (c *chunkRowRecordSet) Close() error {
c.execStmt.CloseRecordSet(c.execStmt.Ctx.GetSessionVars().TxnCtx.StartTS, nil)
return nil
return c.execStmt.CloseRecordSet(c.execStmt.Ctx.GetSessionVars().TxnCtx.StartTS, nil)
}

func (a *ExecStmt) handlePessimisticSelectForUpdate(ctx context.Context, e Executor) (sqlexec.RecordSet, error) {
Expand Down Expand Up @@ -985,9 +996,18 @@ func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, err error, hasMoreResults boo
}

// CloseRecordSet will finish the execution of current statement and do some record work
func (a *ExecStmt) CloseRecordSet(txnStartTS uint64, lastErr error) {
func (a *ExecStmt) CloseRecordSet(txnStartTS uint64, lastErr error) error {
cteErr := resetCTEStorageMap(a.Ctx)
if cteErr != nil {
logutil.BgLogger().Error("got error when reset cte storage, should check if the spill disk file deleted or not", zap.Error(cteErr))
}
if lastErr == nil {
// Only overwrite err when it's nil.
lastErr = cteErr
}
a.FinishExecuteStmt(txnStartTS, lastErr, false)
a.logAudit()
<<<<<<< HEAD
// Detach the Memory and disk tracker for the previous stmtCtx from GlobalMemoryUsageTracker and GlobalDiskUsageTracker
if stmtCtx := a.Ctx.GetSessionVars().StmtCtx; stmtCtx != nil {
if stmtCtx.DiskTracker != nil {
Expand All @@ -997,6 +1017,42 @@ func (a *ExecStmt) CloseRecordSet(txnStartTS uint64, lastErr error) {
stmtCtx.MemTracker.Detach()
}
}
=======
a.Ctx.GetSessionVars().StmtCtx.DetachMemDiskTracker()
return cteErr
}

// Clean CTE storage shared by different CTEFullScan executor within a SQL stmt.
// Will return err in two situations:
// 1. Got err when remove disk spill file.
// 2. Some logical error like ref count of CTEStorage is less than 0.
func resetCTEStorageMap(se sessionctx.Context) error {
tmp := se.GetSessionVars().StmtCtx.CTEStorageMap
if tmp == nil {
// Close() is already called, so no need to reset. Such as TraceExec.
return nil
}
storageMap, ok := tmp.(map[int]*CTEStorages)
if !ok {
return errors.New("type assertion for CTEStorageMap failed")
}
for _, v := range storageMap {
v.ResTbl.Lock()
err1 := v.ResTbl.DerefAndClose()
// Make sure we do not hold the lock for longer than necessary.
v.ResTbl.Unlock()
// No need to lock IterInTbl.
err2 := v.IterInTbl.DerefAndClose()
if err1 != nil {
return err1
}
if err2 != nil {
return err2
}
}
se.GetSessionVars().StmtCtx.CTEStorageMap = nil
return nil
>>>>>>> 437157e7209 (*: fix cte miss cleaning spilled-disk file (#44501))
}

// LogSlowQuery is used to print the slow query in the log files.
Expand Down
15 changes: 15 additions & 0 deletions executor/cte_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,3 +482,18 @@ func TestCTEPanic(t *testing.T) {
require.Contains(t, err.Error(), fp)
require.NoError(t, failpoint.Disable(fpPathPrefix+fp))
}

func TestCTEDelSpillFile(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")
tk.MustExec("drop table if exists t1, t2;")
tk.MustExec("create table t1(c1 int, c2 int);")
tk.MustExec("create table t2(c1 int);")
tk.MustExec("set @@cte_max_recursion_depth = 1000000;")
tk.MustExec("set global tidb_mem_oom_action = 'log';")
tk.MustExec("set @@tidb_mem_quota_query = 100;")
tk.MustExec("insert into t2 values(1);")
tk.MustExec("insert into t1 (c1, c2) with recursive cte1 as (select c1 from t2 union select cte1.c1 + 1 from cte1 where cte1.c1 < 100000) select cte1.c1, cte1.c1+1 from cte1;")
require.Nil(t, tk.Session().GetSessionVars().StmtCtx.CTEStorageMap)
}
8 changes: 8 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3513,6 +3513,7 @@ func TestUnreasonablyClose(t *testing.T) {
require.NotNil(t, p)

// This for loop level traverses the plan tree to get which operators are covered.
var hasCTE bool
for child := []plannercore.PhysicalPlan{p.(plannercore.PhysicalPlan)}; len(child) != 0; {
newChild := make([]plannercore.PhysicalPlan, 0, len(child))
for _, ch := range child {
Expand All @@ -3529,6 +3530,7 @@ func TestUnreasonablyClose(t *testing.T) {
case *plannercore.PhysicalCTE:
newChild = append(newChild, x.RecurPlan)
newChild = append(newChild, x.SeedPlan)
hasCTE = true
continue
case *plannercore.PhysicalShuffle:
newChild = append(newChild, x.DataSources...)
Expand All @@ -3540,6 +3542,12 @@ func TestUnreasonablyClose(t *testing.T) {
child = newChild
}

if hasCTE {
// Normally CTEStorages will be setup in ResetContextOfStmt.
// But the following case call e.Close() directly, instead of calling session.ExecStmt(), which calls ResetContextOfStmt.
// So need to setup CTEStorages manually.
tk.Session().GetSessionVars().StmtCtx.CTEStorageMap = map[int]*executor.CTEStorages{}
}
e := executorBuilder.Build(p)

func() {
Expand Down
31 changes: 0 additions & 31 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2153,40 +2153,9 @@ func (rs *execStmtResult) Close() error {
if err := rs.RecordSet.Close(); err != nil {
return finishStmt(context.Background(), se, err, rs.sql)
}
if err := resetCTEStorageMap(se); err != nil {
return finishStmt(context.Background(), se, err, rs.sql)
}
return finishStmt(context.Background(), se, nil, rs.sql)
}

func resetCTEStorageMap(se *session) error {
tmp := se.GetSessionVars().StmtCtx.CTEStorageMap
if tmp == nil {
// Close() is already called, so no need to reset. Such as TraceExec.
return nil
}
storageMap, ok := tmp.(map[int]*executor.CTEStorages)
if !ok {
return errors.New("type assertion for CTEStorageMap failed")
}
for _, v := range storageMap {
v.ResTbl.Lock()
err1 := v.ResTbl.DerefAndClose()
// Make sure we do not hold the lock for longer than necessary.
v.ResTbl.Unlock()
// No need to lock IterInTbl.
err2 := v.IterInTbl.DerefAndClose()
if err1 != nil {
return err1
}
if err2 != nil {
return err2
}
}
se.GetSessionVars().StmtCtx.CTEStorageMap = nil
return nil
}

// rollbackOnError makes sure the next statement starts a new transaction with the latest InfoSchema.
func (s *session) rollbackOnError(ctx context.Context) {
if !s.sessionVars.InTxn() {
Expand Down

0 comments on commit b46327e

Please sign in to comment.