diff --git a/executor/cte.go b/executor/cte.go index a5e063e9dc9ee..95921670cb5cf 100644 --- a/executor/cte.go +++ b/executor/cte.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/cteutil" + "github.com/pingcap/tidb/util/disk" "github.com/pingcap/tidb/util/memory" ) @@ -77,6 +78,9 @@ type CTEExec struct { curIter int hCtx *hashContext sel []int + + memTracker *memory.Tracker + diskTracker *disk.Tracker } // Open implements the Executor interface. @@ -93,6 +97,11 @@ func (e *CTEExec) Open(ctx context.Context) (err error) { return err } + e.memTracker = memory.NewTracker(e.id, -1) + e.diskTracker = disk.NewTracker(e.id, -1) + e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) + e.diskTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.DiskTracker) + if e.recursiveExec != nil { if err = e.recursiveExec.Open(ctx); err != nil { return err @@ -103,7 +112,7 @@ func (e *CTEExec) Open(ctx context.Context) (err error) { return err } - setupCTEStorageTracker(e.iterOutTbl, e.ctx) + setupCTEStorageTracker(e.iterOutTbl, e.ctx, e.memTracker, e.diskTracker) } if e.isDistinct { @@ -126,8 +135,8 @@ func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) { e.resTbl.Lock() if !e.resTbl.Done() { defer e.resTbl.Unlock() - resAction := setupCTEStorageTracker(e.resTbl, e.ctx) - iterInAction := setupCTEStorageTracker(e.iterInTbl, e.ctx) + resAction := setupCTEStorageTracker(e.resTbl, e.ctx, e.memTracker, e.diskTracker) + iterInAction := setupCTEStorageTracker(e.iterInTbl, e.ctx, e.memTracker, e.diskTracker) failpoint.Inject("testCTEStorageSpill", func(val failpoint.Value) { if val.(bool) && config.GetGlobalConfig().OOMUseTmpStorage { @@ -323,14 +332,15 @@ func (e *CTEExec) reopenTbls() (err error) { return e.iterInTbl.Reopen() } -func setupCTEStorageTracker(tbl cteutil.Storage, ctx sessionctx.Context) (actionSpill *chunk.SpillDiskAction) { +func setupCTEStorageTracker(tbl cteutil.Storage, ctx sessionctx.Context, parentMemTracker *memory.Tracker, + parentDiskTracker *disk.Tracker) (actionSpill *chunk.SpillDiskAction) { memTracker := tbl.GetMemTracker() memTracker.SetLabel(memory.LabelForCTEStorage) - memTracker.AttachTo(ctx.GetSessionVars().StmtCtx.MemTracker) + memTracker.AttachTo(parentMemTracker) diskTracker := tbl.GetDiskTracker() diskTracker.SetLabel(memory.LabelForCTEStorage) - diskTracker.AttachTo(ctx.GetSessionVars().StmtCtx.DiskTracker) + diskTracker.AttachTo(parentDiskTracker) if config.GetGlobalConfig().OOMUseTmpStorage { actionSpill = tbl.ActionSpill() diff --git a/executor/explain_test.go b/executor/explain_test.go index 8a0b062ef68cb..a0ce1a1eb2e0c 100644 --- a/executor/explain_test.go +++ b/executor/explain_test.go @@ -207,6 +207,7 @@ func (s *testSuite2) TestExplainAnalyzeExecutionInfo(c *C) { s.checkExecutionInfo(c, tk, "explain analyze select * from t") s.checkExecutionInfo(c, tk, "explain analyze select k from t use index(k)") s.checkExecutionInfo(c, tk, "explain analyze select * from t use index(k)") + s.checkExecutionInfo(c, tk, "explain analyze with recursive cte(a) as (select 1 union select a + 1 from cte where a < 1000) select * from cte;") tk.MustExec("CREATE TABLE IF NOT EXISTS nation ( N_NATIONKEY BIGINT NOT NULL,N_NAME CHAR(25) NOT NULL,N_REGIONKEY BIGINT NOT NULL,N_COMMENT VARCHAR(152),PRIMARY KEY (N_NATIONKEY));") tk.MustExec("CREATE TABLE IF NOT EXISTS part ( P_PARTKEY BIGINT NOT NULL,P_NAME VARCHAR(55) NOT NULL,P_MFGR CHAR(25) NOT NULL,P_BRAND CHAR(10) NOT NULL,P_TYPE VARCHAR(25) NOT NULL,P_SIZE BIGINT NOT NULL,P_CONTAINER CHAR(10) NOT NULL,P_RETAILPRICE DECIMAL(15,2) NOT NULL,P_COMMENT VARCHAR(23) NOT NULL,PRIMARY KEY (P_PARTKEY));") @@ -320,9 +321,33 @@ func (s *testSuite1) TestCheckActRowsWithUnistore(c *C) { sql: "select count(*) from t_unistore_act_rows group by b", expected: []string{"2", "2", "2", "4"}, }, + { + sql: "with cte(a) as (select a from t_unistore_act_rows) select (select 1 from cte limit 1) from cte;", + expected: []string{"4", "4", "4", "4", "4"}, + }, } for _, test := range tests { checkActRows(c, tk, test.sql, test.expected) } } + +func (s *testSuite2) TestExplainAnalyzeCTEMemoryAndDiskInfo(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (a int)") + tk.MustExec("insert into t with recursive cte(a) as (select 1 union select a + 1 from cte where a < 1000) select * from cte;") + + rows := tk.MustQuery("explain analyze with recursive cte(a) as (select 1 union select a + 1 from cte where a < 1000)" + + " select * from cte, t;").Rows() + + c.Assert(rows[4][7].(string), Not(Equals), "N/A") + c.Assert(rows[4][8].(string), Equals, "0 Bytes") + + tk.MustExec("set @@tidb_mem_quota_query=10240;") + rows = tk.MustQuery("explain analyze with recursive cte(a) as (select 1 union select a + 1 from cte where a < 1000)" + + " select * from cte, t;").Rows() + + c.Assert(rows[4][7].(string), Not(Equals), "N/A") + c.Assert(rows[4][8].(string), Not(Equals), "N/A") +}