Skip to content

Commit

Permalink
executor: support explain analyze for CTE statement (#25023)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiongjiwei authored Jun 2, 2021
1 parent 6c44ec2 commit 7c3e036
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 6 deletions.
22 changes: 16 additions & 6 deletions executor/cte.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/cteutil"
"github.com/pingcap/tidb/util/disk"
"github.com/pingcap/tidb/util/memory"
)

Expand Down Expand Up @@ -77,6 +78,9 @@ type CTEExec struct {
curIter int
hCtx *hashContext
sel []int

memTracker *memory.Tracker
diskTracker *disk.Tracker
}

// Open implements the Executor interface.
Expand All @@ -93,6 +97,11 @@ func (e *CTEExec) Open(ctx context.Context) (err error) {
return err
}

e.memTracker = memory.NewTracker(e.id, -1)
e.diskTracker = disk.NewTracker(e.id, -1)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
e.diskTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.DiskTracker)

if e.recursiveExec != nil {
if err = e.recursiveExec.Open(ctx); err != nil {
return err
Expand All @@ -103,7 +112,7 @@ func (e *CTEExec) Open(ctx context.Context) (err error) {
return err
}

setupCTEStorageTracker(e.iterOutTbl, e.ctx)
setupCTEStorageTracker(e.iterOutTbl, e.ctx, e.memTracker, e.diskTracker)
}

if e.isDistinct {
Expand All @@ -126,8 +135,8 @@ func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
e.resTbl.Lock()
if !e.resTbl.Done() {
defer e.resTbl.Unlock()
resAction := setupCTEStorageTracker(e.resTbl, e.ctx)
iterInAction := setupCTEStorageTracker(e.iterInTbl, e.ctx)
resAction := setupCTEStorageTracker(e.resTbl, e.ctx, e.memTracker, e.diskTracker)
iterInAction := setupCTEStorageTracker(e.iterInTbl, e.ctx, e.memTracker, e.diskTracker)

failpoint.Inject("testCTEStorageSpill", func(val failpoint.Value) {
if val.(bool) && config.GetGlobalConfig().OOMUseTmpStorage {
Expand Down Expand Up @@ -323,14 +332,15 @@ func (e *CTEExec) reopenTbls() (err error) {
return e.iterInTbl.Reopen()
}

func setupCTEStorageTracker(tbl cteutil.Storage, ctx sessionctx.Context) (actionSpill *chunk.SpillDiskAction) {
func setupCTEStorageTracker(tbl cteutil.Storage, ctx sessionctx.Context, parentMemTracker *memory.Tracker,
parentDiskTracker *disk.Tracker) (actionSpill *chunk.SpillDiskAction) {
memTracker := tbl.GetMemTracker()
memTracker.SetLabel(memory.LabelForCTEStorage)
memTracker.AttachTo(ctx.GetSessionVars().StmtCtx.MemTracker)
memTracker.AttachTo(parentMemTracker)

diskTracker := tbl.GetDiskTracker()
diskTracker.SetLabel(memory.LabelForCTEStorage)
diskTracker.AttachTo(ctx.GetSessionVars().StmtCtx.DiskTracker)
diskTracker.AttachTo(parentDiskTracker)

if config.GetGlobalConfig().OOMUseTmpStorage {
actionSpill = tbl.ActionSpill()
Expand Down
25 changes: 25 additions & 0 deletions executor/explain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ func (s *testSuite2) TestExplainAnalyzeExecutionInfo(c *C) {
s.checkExecutionInfo(c, tk, "explain analyze select * from t")
s.checkExecutionInfo(c, tk, "explain analyze select k from t use index(k)")
s.checkExecutionInfo(c, tk, "explain analyze select * from t use index(k)")
s.checkExecutionInfo(c, tk, "explain analyze with recursive cte(a) as (select 1 union select a + 1 from cte where a < 1000) select * from cte;")

tk.MustExec("CREATE TABLE IF NOT EXISTS nation ( N_NATIONKEY BIGINT NOT NULL,N_NAME CHAR(25) NOT NULL,N_REGIONKEY BIGINT NOT NULL,N_COMMENT VARCHAR(152),PRIMARY KEY (N_NATIONKEY));")
tk.MustExec("CREATE TABLE IF NOT EXISTS part ( P_PARTKEY BIGINT NOT NULL,P_NAME VARCHAR(55) NOT NULL,P_MFGR CHAR(25) NOT NULL,P_BRAND CHAR(10) NOT NULL,P_TYPE VARCHAR(25) NOT NULL,P_SIZE BIGINT NOT NULL,P_CONTAINER CHAR(10) NOT NULL,P_RETAILPRICE DECIMAL(15,2) NOT NULL,P_COMMENT VARCHAR(23) NOT NULL,PRIMARY KEY (P_PARTKEY));")
Expand Down Expand Up @@ -320,9 +321,33 @@ func (s *testSuite1) TestCheckActRowsWithUnistore(c *C) {
sql: "select count(*) from t_unistore_act_rows group by b",
expected: []string{"2", "2", "2", "4"},
},
{
sql: "with cte(a) as (select a from t_unistore_act_rows) select (select 1 from cte limit 1) from cte;",
expected: []string{"4", "4", "4", "4", "4"},
},
}

for _, test := range tests {
checkActRows(c, tk, test.sql, test.expected)
}
}

func (s *testSuite2) TestExplainAnalyzeCTEMemoryAndDiskInfo(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int)")
tk.MustExec("insert into t with recursive cte(a) as (select 1 union select a + 1 from cte where a < 1000) select * from cte;")

rows := tk.MustQuery("explain analyze with recursive cte(a) as (select 1 union select a + 1 from cte where a < 1000)" +
" select * from cte, t;").Rows()

c.Assert(rows[4][7].(string), Not(Equals), "N/A")
c.Assert(rows[4][8].(string), Equals, "0 Bytes")

tk.MustExec("set @@tidb_mem_quota_query=10240;")
rows = tk.MustQuery("explain analyze with recursive cte(a) as (select 1 union select a + 1 from cte where a < 1000)" +
" select * from cte, t;").Rows()

c.Assert(rows[4][7].(string), Not(Equals), "N/A")
c.Assert(rows[4][8].(string), Not(Equals), "N/A")
}

0 comments on commit 7c3e036

Please sign in to comment.