Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: setup mem tracker for CTE correctly (#54208) #54319

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 41 additions & 8 deletions executor/cte.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,11 @@
e.producer.resTbl.Lock()
defer e.producer.resTbl.Unlock()
if !e.producer.resTbl.Done() {
if err = e.producer.produce(ctx, e); err != nil {
if err = e.producer.produce(ctx); err != nil {
return err
}
}
return e.producer.getChunk(ctx, e, req)
return e.producer.getChunk(e, req)
}

func setFirstErr(firstErr error, newErr error, msg string) error {
Expand Down Expand Up @@ -271,7 +271,7 @@
return
}

func (p *cteProducer) getChunk(ctx context.Context, cteExec *CTEExec, req *chunk.Chunk) (err error) {
func (p *cteProducer) getChunk(cteExec *CTEExec, req *chunk.Chunk) (err error) {
req.Reset()
if p.hasLimit {
return p.nextChunkLimit(cteExec, req)
Expand Down Expand Up @@ -334,15 +334,15 @@
return nil
}

func (p *cteProducer) produce(ctx context.Context, cteExec *CTEExec) (err error) {
func (p *cteProducer) produce(ctx context.Context) (err error) {
if p.resTbl.Error() != nil {
return p.resTbl.Error()
}
resAction := setupCTEStorageTracker(p.resTbl, cteExec.ctx, p.memTracker, p.diskTracker)
iterInAction := setupCTEStorageTracker(p.iterInTbl, cteExec.ctx, p.memTracker, p.diskTracker)
resAction := setupCTEStorageTracker(p.resTbl, p.ctx, p.memTracker, p.diskTracker)
iterInAction := setupCTEStorageTracker(p.iterInTbl, p.ctx, p.memTracker, p.diskTracker)
var iterOutAction *chunk.SpillDiskAction
if p.iterOutTbl != nil {
iterOutAction = setupCTEStorageTracker(p.iterOutTbl, cteExec.ctx, p.memTracker, p.diskTracker)
iterOutAction = setupCTEStorageTracker(p.iterOutTbl, p.ctx, p.memTracker, p.diskTracker)
}

failpoint.Inject("testCTEStorageSpill", func(val failpoint.Value) {
Expand Down Expand Up @@ -425,12 +425,29 @@
return
}

var iterNum uint64
for {
chk := tryNewCacheChunk(p.recursiveExec)
if err = Next(ctx, p.recursiveExec, chk); err != nil {
return
}
if chk.NumRows() == 0 {
if iterNum%1000 == 0 {
// To avoid too many logs.
p.logTbls(ctx, err, iterNum)
}
iterNum++
failpoint.Inject("assertIterTableSpillToDisk", func(maxIter failpoint.Value) {
if iterNum > 0 && iterNum < uint64(maxIter.(int)) && err == nil {
if p.iterInTbl.GetMemBytes() != 0 || p.iterInTbl.GetDiskBytes() == 0 ||
p.iterOutTbl.GetMemBytes() != 0 || p.iterOutTbl.GetDiskBytes() == 0 ||
p.resTbl.GetMemBytes() != 0 || p.resTbl.GetDiskBytes() == 0 {
p.logTbls(ctx, err, iterNum)
panic("assert row container spill disk failed")

Check warning on line 446 in executor/cte.go

View check run for this annotation

Codecov / codecov/patch

executor/cte.go#L441-L446

Added lines #L441 - L446 were not covered by tests
}
}
})

if err = p.setupTblsForNewIteration(); err != nil {
return
}
Expand Down Expand Up @@ -489,6 +506,8 @@
if err = p.iterInTbl.Reopen(); err != nil {
return err
}
setupCTEStorageTracker(p.iterInTbl, p.ctx, p.memTracker, p.diskTracker)

if p.isDistinct {
// Already deduplicated by resTbl, adding directly is ok.
for _, chk := range chks {
Expand All @@ -503,7 +522,11 @@
}

// Clear data in iterOutTbl.
return p.iterOutTbl.Reopen()
if err = p.iterOutTbl.Reopen(); err != nil {
return err
}

Check warning on line 527 in executor/cte.go

View check run for this annotation

Codecov / codecov/patch

executor/cte.go#L526-L527

Added lines #L526 - L527 were not covered by tests
setupCTEStorageTracker(p.iterOutTbl, p.ctx, p.memTracker, p.diskTracker)
return nil
}

func (p *cteProducer) reset() {
Expand Down Expand Up @@ -531,6 +554,8 @@
if p.isDistinct {
p.hashTbl = newConcurrentMapHashTable()
}
// Normally we need to setup tracker after calling Reopen(),
// But reopen resTbl means we need to call produce() again, it will setup tracker.
if err := p.resTbl.Reopen(); err != nil {
return err
}
Expand Down Expand Up @@ -735,3 +760,11 @@
}
return changed
}

func (p *cteProducer) logTbls(ctx context.Context, err error, iterNum uint64) {
logutil.Logger(ctx).Debug("cte iteration info",
zap.Any("iterInTbl mem usage", p.iterInTbl.GetMemBytes()), zap.Any("iterInTbl disk usage", p.iterInTbl.GetDiskBytes()),
zap.Any("iterOutTbl mem usage", p.iterOutTbl.GetMemBytes()), zap.Any("iterOutTbl disk usage", p.iterOutTbl.GetDiskBytes()),
zap.Any("resTbl mem usage", p.resTbl.GetMemBytes()), zap.Any("resTbl disk usage", p.resTbl.GetDiskBytes()),
zap.Any("resTbl rows", p.resTbl.NumRows()), zap.Any("iteration num", iterNum), zap.Error(err))
}
32 changes: 32 additions & 0 deletions executor/cte_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,3 +572,35 @@ func TestIssue46522(t *testing.T) {

tk.MustExec("commit;")
}

func TestCTEIterationMemTracker(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

insertStr := "insert into t1 values(0)"
rowNum := 1000
vals := make([]int, rowNum)
vals[0] = 0
for i := 1; i < rowNum; i++ {
v := rand.Intn(100)
vals[i] = v
insertStr += fmt.Sprintf(", (%d)", v)
}
tk.MustExec("use test;")
tk.MustExec("drop table if exists t1;")
tk.MustExec("create table t1(c1 int);")
tk.MustExec(insertStr)

tk.MustExec("set @@cte_max_recursion_depth=1000000")
tk.MustExec("set global tidb_mem_oom_action = 'log';")
defer func() {
tk.MustExec("set global tidb_mem_oom_action = default;")
}()
tk.MustExec("set @@tidb_mem_quota_query=10;")
maxIter := 5000
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/assertIterTableSpillToDisk", fmt.Sprintf("return(%d)", maxIter)))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/executor/assertIterTableSpillToDisk"))
}()
tk.MustQuery(fmt.Sprintf("explain analyze with recursive cte1 as (select c1 from t1 union all select c1 + 1 c1 from cte1 where c1 < %d) select * from cte1", maxIter))
}
13 changes: 13 additions & 0 deletions util/cteutil/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ type Storage interface {
GetMemTracker() *memory.Tracker
GetDiskTracker() *disk.Tracker
ActionSpill() *chunk.SpillDiskAction

GetMemBytes() int64
GetDiskBytes() int64
}

// StorageRC implements Storage interface using RowContainer.
Expand Down Expand Up @@ -269,3 +272,13 @@ func (s *StorageRC) ActionSpillForTest() *chunk.SpillDiskAction {
func (s *StorageRC) valid() bool {
return s.refCnt > 0 && s.rc != nil
}

// GetMemBytes returns memory bytes used by row container.
func (s *StorageRC) GetMemBytes() int64 {
return s.rc.GetMemTracker().BytesConsumed()
}

// GetDiskBytes returns disk bytes used by row container.
func (s *StorageRC) GetDiskBytes() int64 {
return s.rc.GetDiskTracker().BytesConsumed()
}