Skip to content

Commit

Permalink
*: fix cte miss cleaning spilled-disk file (#44501) (#44528)
Browse files Browse the repository at this point in the history
close #44477
  • Loading branch information
ti-chi-bot committed Jun 30, 2023
1 parent 96f648d commit 410df75
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 36 deletions.
56 changes: 51 additions & 5 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,11 @@ func (a *recordSet) NewChunk(alloc chunk.Allocator) *chunk.Chunk {

func (a *recordSet) Close() error {
err := a.executor.Close()
a.stmt.CloseRecordSet(a.txnStartTS, a.lastErr)
return err
err1 := a.stmt.CloseRecordSet(a.txnStartTS, a.lastErr)
if err != nil {
return err
}
return err1
}

// OnFetchReturned implements commandLifeCycle#OnFetchReturned
Expand Down Expand Up @@ -496,6 +499,13 @@ func (a *ExecStmt) handleNoDelay(ctx context.Context, e Executor, isPessimistic
if sc.DiskTracker != nil {
sc.DiskTracker.Detach()
}
if handled {
cteErr := resetCTEStorageMap(a.Ctx)
if err == nil {
// Only overwrite err when it's nil.
err = cteErr
}
}
}
}()

Expand Down Expand Up @@ -589,8 +599,7 @@ func (c *chunkRowRecordSet) NewChunk(alloc chunk.Allocator) *chunk.Chunk {
}

func (c *chunkRowRecordSet) Close() error {
c.execStmt.CloseRecordSet(c.execStmt.Ctx.GetSessionVars().TxnCtx.StartTS, nil)
return nil
return c.execStmt.CloseRecordSet(c.execStmt.Ctx.GetSessionVars().TxnCtx.StartTS, nil)
}

func (a *ExecStmt) handlePessimisticSelectForUpdate(ctx context.Context, e Executor) (sqlexec.RecordSet, error) {
Expand Down Expand Up @@ -990,7 +999,11 @@ func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, err error, hasMoreResults boo
}

// CloseRecordSet will finish the execution of current statement and do some record work
func (a *ExecStmt) CloseRecordSet(txnStartTS uint64, lastErr error) {
func (a *ExecStmt) CloseRecordSet(txnStartTS uint64, lastErr error) error {
cteErr := resetCTEStorageMap(a.Ctx)
if cteErr != nil {
logutil.BgLogger().Error("got error when reset cte storage, should check if the spill disk file deleted or not", zap.Error(cteErr))
}
a.FinishExecuteStmt(txnStartTS, lastErr, false)
a.logAudit()
// Detach the Memory and disk tracker for the previous stmtCtx from GlobalMemoryUsageTracker and GlobalDiskUsageTracker
Expand All @@ -1002,6 +1015,39 @@ func (a *ExecStmt) CloseRecordSet(txnStartTS uint64, lastErr error) {
stmtCtx.MemTracker.Detach()
}
}
return cteErr
}

// Clean CTE storage shared by different CTEFullScan executor within a SQL stmt.
// Will return err in two situations:
// 1. Got err when remove disk spill file.
// 2. Some logical error like ref count of CTEStorage is less than 0.
func resetCTEStorageMap(se sessionctx.Context) error {
tmp := se.GetSessionVars().StmtCtx.CTEStorageMap
if tmp == nil {
// Close() is already called, so no need to reset. Such as TraceExec.
return nil
}
storageMap, ok := tmp.(map[int]*CTEStorages)
if !ok {
return errors.New("type assertion for CTEStorageMap failed")
}
for _, v := range storageMap {
v.ResTbl.Lock()
err1 := v.ResTbl.DerefAndClose()
// Make sure we do not hold the lock for longer than necessary.
v.ResTbl.Unlock()
// No need to lock IterInTbl.
err2 := v.IterInTbl.DerefAndClose()
if err1 != nil {
return err1
}
if err2 != nil {
return err2
}
}
se.GetSessionVars().StmtCtx.CTEStorageMap = nil
return nil
}

// LogSlowQuery is used to print the slow query in the log files.
Expand Down
16 changes: 16 additions & 0 deletions executor/cte_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,3 +482,19 @@ func TestCTEPanic(t *testing.T) {
require.Contains(t, err.Error(), fp)
require.NoError(t, failpoint.Disable(fpPathPrefix+fp))
}

func TestCTEDelSpillFile(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")
tk.MustExec("drop table if exists t1, t2;")
tk.MustExec("create table t1(c1 int, c2 int);")
tk.MustExec("create table t2(c1 int);")
tk.MustExec("set @@cte_max_recursion_depth = 1000000;")
tk.MustExec("set global tidb_mem_oom_action = 'log';")
tk.MustExec("set @@tidb_mem_quota_query = 100;")
tk.MustExec("insert into t2 values(1);")
tk.MustExec("insert into t1 (c1, c2) with recursive cte1 as (select c1 from t2 union select cte1.c1 + 1 from cte1 where cte1.c1 < 100000) select cte1.c1, cte1.c1+1 from cte1;")
require.Nil(t, tk.Session().GetSessionVars().StmtCtx.CTEStorageMap)
}
8 changes: 8 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3514,6 +3514,7 @@ func TestUnreasonablyClose(t *testing.T) {
require.NotNil(t, p)

// This for loop level traverses the plan tree to get which operators are covered.
var hasCTE bool
for child := []plannercore.PhysicalPlan{p.(plannercore.PhysicalPlan)}; len(child) != 0; {
newChild := make([]plannercore.PhysicalPlan, 0, len(child))
for _, ch := range child {
Expand All @@ -3530,6 +3531,7 @@ func TestUnreasonablyClose(t *testing.T) {
case *plannercore.PhysicalCTE:
newChild = append(newChild, x.RecurPlan)
newChild = append(newChild, x.SeedPlan)
hasCTE = true
continue
case *plannercore.PhysicalShuffle:
newChild = append(newChild, x.DataSources...)
Expand All @@ -3541,6 +3543,12 @@ func TestUnreasonablyClose(t *testing.T) {
child = newChild
}

if hasCTE {
// Normally CTEStorages will be setup in ResetContextOfStmt.
// But the following case call e.Close() directly, instead of calling session.ExecStmt(), which calls ResetContextOfStmt.
// So need to setup CTEStorages manually.
tk.Session().GetSessionVars().StmtCtx.CTEStorageMap = map[int]*executor.CTEStorages{}
}
e := executorBuilder.Build(p)

func() {
Expand Down
1 change: 1 addition & 0 deletions executor/stale_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -945,6 +945,7 @@ func TestSetTransactionInfoSchema(t *testing.T) {
defer tk.MustExec("drop table if exists t")
tk.MustExec("create table t (id int primary key);")

time.Sleep(100 * time.Millisecond)
schemaVer1 := tk.Session().GetInfoSchema().SchemaMetaVersion()
time1 := time.Now()
time.Sleep(100 * time.Millisecond)
Expand Down
31 changes: 0 additions & 31 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2170,40 +2170,9 @@ func (rs *execStmtResult) Close() error {
if err := rs.RecordSet.Close(); err != nil {
return finishStmt(context.Background(), se, err, rs.sql)
}
if err := resetCTEStorageMap(se); err != nil {
return finishStmt(context.Background(), se, err, rs.sql)
}
return finishStmt(context.Background(), se, nil, rs.sql)
}

func resetCTEStorageMap(se *session) error {
tmp := se.GetSessionVars().StmtCtx.CTEStorageMap
if tmp == nil {
// Close() is already called, so no need to reset. Such as TraceExec.
return nil
}
storageMap, ok := tmp.(map[int]*executor.CTEStorages)
if !ok {
return errors.New("type assertion for CTEStorageMap failed")
}
for _, v := range storageMap {
v.ResTbl.Lock()
err1 := v.ResTbl.DerefAndClose()
// Make sure we do not hold the lock for longer than necessary.
v.ResTbl.Unlock()
// No need to lock IterInTbl.
err2 := v.IterInTbl.DerefAndClose()
if err1 != nil {
return err1
}
if err2 != nil {
return err2
}
}
se.GetSessionVars().StmtCtx.CTEStorageMap = nil
return nil
}

// rollbackOnError makes sure the next statement starts a new transaction with the latest InfoSchema.
func (s *session) rollbackOnError(ctx context.Context) {
if !s.sessionVars.InTxn() {
Expand Down

0 comments on commit 410df75

Please sign in to comment.