Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: wrong execution when CTE meets non-correlated subquery #44054

Merged
merged 5 commits into from
May 22, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 12 additions & 13 deletions cmd/explaintest/r/cte.result
Original file line number Diff line number Diff line change
Expand Up @@ -564,19 +564,18 @@ create table tpk1(c1 int primary key);
insert into tpk1 values(1), (2), (3);
explain with cte1 as (select c1 from tpk) select /*+ merge_join(dt1, dt2) */ * from tpk1 dt1 inner join cte1 dt2 inner join cte1 dt3 on dt1.c1 = dt2.c1 and dt2.c1 = dt3.c1;
id estRows task access object operator info
HashJoin_20 10000.00 root inner join, equal:[eq(test.tpk.c1, test.tpk.c1)]
├─Selection_29(Build) 6400.00 root not(isnull(test.tpk.c1))
│ └─CTEFullScan_30 8000.00 root CTE:cte1 AS dt3 data:CTE_0
└─MergeJoin_22(Probe) 8000.00 root inner join, left key:test.tpk1.c1, right key:test.tpk.c1
├─Sort_28(Build) 6400.00 root test.tpk.c1
│ └─Selection_26 6400.00 root not(isnull(test.tpk.c1))
│ └─CTEFullScan_27 8000.00 root CTE:cte1 AS dt2 data:CTE_0
└─TableReader_24(Probe) 10000.00 root data:TableFullScan_23
└─TableFullScan_23 10000.00 cop[tikv] table:dt1 keep order:true, stats:pseudo
CTE_0 8000.00 root Non-Recursive CTE
└─Selection_13(Seed Part) 8000.00 root or(not(isnull(test.tpk.c1)), not(isnull(test.tpk.c1)))
└─TableReader_16 10000.00 root data:TableFullScan_15
└─TableFullScan_15 10000.00 cop[tikv] table:tpk keep order:false, stats:pseudo
HashJoin_19 12500.00 root inner join, equal:[eq(test.tpk.c1, test.tpk.c1)]
├─Selection_28(Build) 8000.00 root not(isnull(test.tpk.c1))
│ └─CTEFullScan_29 10000.00 root CTE:cte1 AS dt3 data:CTE_0
└─MergeJoin_21(Probe) 10000.00 root inner join, left key:test.tpk1.c1, right key:test.tpk.c1
├─Sort_27(Build) 8000.00 root test.tpk.c1
│ └─Selection_25 8000.00 root not(isnull(test.tpk.c1))
│ └─CTEFullScan_26 10000.00 root CTE:cte1 AS dt2 data:CTE_0
└─TableReader_23(Probe) 10000.00 root data:TableFullScan_22
└─TableFullScan_22 10000.00 cop[tikv] table:dt1 keep order:true, stats:pseudo
CTE_0 10000.00 root Non-Recursive CTE
└─TableReader_15(Seed Part) 10000.00 root data:TableFullScan_14
└─TableFullScan_14 10000.00 cop[tikv] table:tpk keep order:false, stats:pseudo
with cte1 as (select c1 from tpk) select /*+ merge_join(dt1, dt2) */ * from tpk1 dt1 inner join cte1 dt2 inner join cte1 dt3 on dt1.c1 = dt2.c1 and dt2.c1 = dt3.c1;
c1 c1 c1
1 1 1
Expand Down
43 changes: 20 additions & 23 deletions cmd/explaintest/r/explain_cte.result
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,11 @@ CTE_0 2.00 root Recursive CTE
└─CTETable_17 1.00 root Scan on CTE_0
explain with cte(a) as (with recursive cte1(a) as (select 1 union select a + 1 from cte1 where a < 10) select * from cte1) select * from cte t1, cte t2;
id estRows task access object operator info
HashJoin_26 2.56 root CARTESIAN inner join
├─CTEFullScan_29(Build) 1.60 root CTE:cte AS t2 data:CTE_0
└─CTEFullScan_28(Probe) 1.60 root CTE:cte AS t1 data:CTE_0
CTE_0 1.60 root Non-Recursive CTE
└─Selection_21(Seed Part) 1.60 root 1
└─CTEFullScan_23 2.00 root CTE:cte1 data:CTE_1
HashJoin_25 4.00 root CARTESIAN inner join
├─CTEFullScan_28(Build) 2.00 root CTE:cte AS t2 data:CTE_0
└─CTEFullScan_27(Probe) 2.00 root CTE:cte AS t1 data:CTE_0
CTE_0 2.00 root Non-Recursive CTE
└─CTEFullScan_22(Seed Part) 2.00 root CTE:cte1 data:CTE_1
CTE_1 2.00 root Recursive CTE
├─Projection_16(Seed Part) 1.00 root 1->Column#2
│ └─TableDual_17 1.00 root rows:1
Expand All @@ -91,15 +90,13 @@ CTE_1 8001.00 root Recursive CTE
└─CTETable_33 10000.00 root Scan on CTE_1
explain with q(a,b) as (select * from t1) select /*+ merge(q) no_merge(q1) */ * from q, q q1 where q.a=1 and q1.a=2;
id estRows task access object operator info
HashJoin_19 40960000.00 root CARTESIAN inner join
├─Selection_23(Build) 6400.00 root eq(test.t1.c1, 2)
│ └─CTEFullScan_24 8000.00 root CTE:q AS q1 data:CTE_0
└─Selection_21(Probe) 6400.00 root eq(test.t1.c1, 1)
└─CTEFullScan_22 8000.00 root CTE:q data:CTE_0
CTE_0 8000.00 root Non-Recursive CTE
└─Selection_11(Seed Part) 8000.00 root or(eq(test.t1.c1, 1), eq(test.t1.c1, 2))
└─TableReader_14 10000.00 root data:TableFullScan_13
└─TableFullScan_13 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo
HashJoin_15 2.56 root CARTESIAN inner join
├─Selection_19(Build) 1.60 root eq(test.t1.c1, 2)
│ └─CTEFullScan_20 2.00 root CTE:q AS q1 data:CTE_0
└─Selection_17(Probe) 1.60 root eq(test.t1.c1, 1)
└─CTEFullScan_18 2.00 root CTE:q data:CTE_0
CTE_0 2.00 root Non-Recursive CTE
└─Batch_Point_Get_12(Seed Part) 2.00 root table:t1 handle:[1 2], keep order:false, desc:false
explain with recursive cte(a,b) as (select 1, concat('a', 1) union select a+1, concat(b, 1) from cte where a < 5) select * from cte;
id estRows task access object operator info
CTEFullScan_17 2.00 root CTE:cte data:CTE_0
Expand Down Expand Up @@ -458,14 +455,14 @@ select v1.tps v1_tps,v2.tps v2_tps
from version1 v1, version2 v2
where v1.bench_type =v2.bench_type;
id estRows task access object operator info
HashJoin 8000.00 root inner join, equal:[eq(test.t1.bench_type, test.t1.bench_type)]
├─Selection(Build) 6400.00 root eq(test.t1.version, "6.0.0"), not(isnull(test.t1.bench_type))
│ └─CTEFullScan 8000.00 root CTE:all_data data:CTE_0
└─Selection(Probe) 6400.00 root eq(test.t1.version, "5.4.0"), not(isnull(test.t1.bench_type))
└─CTEFullScan 8000.00 root CTE:all_data data:CTE_0
CTE_0 8000.00 root Non-Recursive CTE
└─Selection(Seed Part) 8000.00 root or(and(eq(test.t1.version, "5.4.0"), not(isnull(test.t1.bench_type))), and(eq(test.t1.version, "6.0.0"), not(isnull(test.t1.bench_type))))
└─TableReader 10000.00 root data:TableFullScan
HashJoin 19.97 root inner join, equal:[eq(test.t1.bench_type, test.t1.bench_type)]
├─Selection(Build) 15.98 root eq(test.t1.version, "6.0.0"), not(isnull(test.t1.bench_type))
│ └─CTEFullScan 19.97 root CTE:all_data data:CTE_0
└─Selection(Probe) 15.98 root eq(test.t1.version, "5.4.0"), not(isnull(test.t1.bench_type))
└─CTEFullScan 19.97 root CTE:all_data data:CTE_0
CTE_0 19.97 root Non-Recursive CTE
└─TableReader(Seed Part) 19.97 root data:Selection
└─Selection 19.97 cop[tikv] or(and(eq(test.t1.version, "5.4.0"), not(isnull(test.t1.bench_type))), and(eq(test.t1.version, "6.0.0"), not(isnull(test.t1.bench_type))))
└─TableFullScan 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo
drop table if exists tbl;
create table tbl (id int);
Expand Down
2 changes: 1 addition & 1 deletion executor/explain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ func TestCheckActRowsWithUnistore(t *testing.T) {
},
{
sql: "with cte(a) as (select a from t_unistore_act_rows) select (select 1 from cte limit 1) from cte;",
expected: []string{"4", "4", "4", "4", "4"},
expected: []string{"4", "1", "1", "1", "4", "4", "4", "4", "4"},
},
{
sql: "select a, row_number() over (partition by b) from t_unistore_act_rows;",
Expand Down
14 changes: 13 additions & 1 deletion planner/core/expression_rewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -1053,7 +1053,7 @@ func (er *expressionRewriter) handleScalarSubquery(ctx context.Context, v *ast.S
noDecorrelate = false
}

if er.b.disableSubQueryPreprocessing || len(ExtractCorrelatedCols4LogicalPlan(np)) > 0 {
if er.b.disableSubQueryPreprocessing || len(ExtractCorrelatedCols4LogicalPlan(np)) > 0 || hasCTEConsumerInSubPlan(np) {
er.p = er.b.buildApplyWithJoinType(er.p, np, LeftOuterJoin, noDecorrelate)
if np.Schema().Len() > 1 {
newCols := make([]expression.Expression, 0, np.Schema().Len())
Expand Down Expand Up @@ -1111,6 +1111,18 @@ func (er *expressionRewriter) handleScalarSubquery(ctx context.Context, v *ast.S
return v, true
}

// hasCTEConsumerInSubPlan reports whether the plan tree rooted at p contains
// at least one *LogicalCTE node. The root itself is inspected first, then its
// descendants in pre-order (each node before its children, children from
// first to last).
func hasCTEConsumerInSubPlan(p LogicalPlan) bool {
	// Explicit worklist used as a stack; the node to visit next is at the end.
	pending := []LogicalPlan{p}
	for len(pending) > 0 {
		top := len(pending) - 1
		node := pending[top]
		pending = pending[:top]

		if _, isCTE := node.(*LogicalCTE); isCTE {
			return true
		}

		// Push children in reverse so the first child is popped first,
		// which keeps the visit order identical to a recursive descent.
		kids := node.Children()
		for i := len(kids) - 1; i >= 0; i-- {
			pending = append(pending, kids[i])
		}
	}
	return false
}

func initConstantRepertoire(c *expression.Constant) {
c.SetRepertoire(expression.ASCII)
if c.GetType().EvalType() == types.ETString {
Expand Down
2 changes: 1 addition & 1 deletion planner/core/issuetest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ go_test(
],
flaky = True,
race = "on",
shard_count = 3,
shard_count = 4,
deps = [
"//parser",
"//planner",
Expand Down
14 changes: 14 additions & 0 deletions planner/core/issuetest/planner_issue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,17 @@ func TestIssue43645(t *testing.T) {
rs := tk.MustQuery("WITH tmp AS (SELECT t2.* FROM t2) select (SELECT tmp.col1 FROM tmp WHERE tmp.id=t1.id ) col1, (SELECT tmp.col2 FROM tmp WHERE tmp.id=t1.id ) col2, (SELECT tmp.col3 FROM tmp WHERE tmp.id=t1.id ) col3 from t1;")
rs.Sort().Check(testkit.Rows("a aa aaa", "b bb bbb", "c cc ccc"))
}

// TestIssue44051 runs a query whose WHERE clause compares t1.id against three
// non-correlated scalar subqueries that all read from the same CTE, and checks
// that every row of t1 is returned.
func TestIssue44051(t *testing.T) {
	tk := testkit.NewTestKit(t, testkit.CreateMockStore(t))

	// Schema and data setup, executed in order.
	setup := []string{
		"use test",
		"CREATE TABLE t1(id int,col1 varchar(10),col2 varchar(10),col3 varchar(10));",
		"CREATE TABLE t2(id int,col1 varchar(10),col2 varchar(10),col3 varchar(10));",
		"INSERT INTO t1 values(1,NULL,NULL,null),(2,NULL,NULL,null),(3,NULL,NULL,null);",
		"INSERT INTO t2 values(1,'a','aa','aaa'),(2,'b','bb','bbb'),(3,'c','cc','ccc');",
	}
	for _, stmt := range setup {
		tk.MustExec(stmt)
	}

	const query = "WITH tmp AS (SELECT t2.* FROM t2) SELECT * FROM t1 WHERE t1.id = (select id from tmp where id = 1) or t1.id = (select id from tmp where id = 2) or t1.id = (select id from tmp where id = 3)"
	// Sort the result so the comparison does not depend on row order.
	tk.MustQuery(query).Sort().Check(testkit.Rows("1 <nil> <nil> <nil>", "2 <nil> <nil> <nil>", "3 <nil> <nil> <nil>"))
}
20 changes: 18 additions & 2 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -975,12 +975,28 @@ func (b *PlanBuilder) buildSet(ctx context.Context, v *ast.SetStmt) (Plan, error
char, col := b.ctx.GetSessionVars().GetCharsetInfo()
vars.Value = ast.NewValueExpr(cn.Name.Name.O, char, col)
}
mockTablePlan := LogicalTableDual{}.Init(b.ctx, b.getSelectOffset())
mockTablePlan := LogicalTableDual{RowCount: 1}.Init(b.ctx, b.getSelectOffset())
var err error
assign.Expr, _, err = b.rewrite(ctx, vars.Value, mockTablePlan, nil, true)
var possiblePlan LogicalPlan
assign.Expr, possiblePlan, err = b.rewrite(ctx, vars.Value, mockTablePlan, nil, true)
if err != nil {
return nil, err
}
if _, ok := possiblePlan.(*LogicalTableDual); !ok {
physicalPlan, _, err := DoOptimize(ctx, b.ctx, b.optFlag, possiblePlan)
if err != nil {
return nil, err
}
row, err := EvalSubqueryFirstRow(ctx, physicalPlan, b.is, b.ctx)
if err != nil {
return nil, err
}
constant := &expression.Constant{
Value: row[0],
RetType: assign.Expr.GetType(),
}
assign.Expr = constant
}
} else {
assign.IsDefault = true
}
Expand Down
2 changes: 1 addition & 1 deletion planner/core/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -1109,7 +1109,7 @@ func (p *LogicalCTE) DeriveStats(_ []*property.StatsInfo, selfSchema *expression
newSel.SetChildren(p.cte.seedPartLogicalPlan)
p.cte.seedPartLogicalPlan = newSel
}
p.cte.seedPartPhysicalPlan, _, err = DoOptimize(context.TODO(), p.ctx, p.cte.optFlag, p.cte.seedPartLogicalPlan)
p.cte.seedPartPhysicalPlan, _, err = DoOptimize(context.TODO(), p.ctx, p.cte.optFlag|flagPredicatePushDown, p.cte.seedPartLogicalPlan)
if err != nil {
return nil, err
}
Expand Down