Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: support to execute CTE on MPP side #42296

Merged
merged 36 commits into from
May 24, 2023
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
3f91dd3
tmp
winoros Feb 15, 2023
95375d7
fix the mpp prop
winoros Feb 22, 2023
e50927c
Merge branch 'master' into add-sequence-operator
winoros Feb 26, 2023
6285fb0
Merge branch 'master' into add-sequence-operator
winoros Feb 27, 2023
5584c9a
planner: support sending cte mpp task
winoros Mar 1, 2023
6044c08
fix panics
winoros Mar 1, 2023
6c69b8a
some codes updates
winoros Mar 8, 2023
d6c27bf
update the codes
winoros Mar 8, 2023
9133d92
change style and clean
winoros Mar 15, 2023
2dc1de9
Merge branch 'master' into add-sequence-operator
winoros Mar 15, 2023
6c81152
clean the debugging info, make it ready for review
winoros Mar 15, 2023
380b4d1
Merge branch 'master' into add-sequence-operator
winoros Mar 28, 2023
e4010a8
push sequence down
winoros Apr 4, 2023
699e39d
Merge remote-tracking branch 'origin/master' into add-sequence-operator
winoros Apr 4, 2023
521d66d
Apply suggestions from code review
winoros Apr 18, 2023
0b5244c
Merge branch 'master' into add-sequence-operator
winoros Apr 18, 2023
3f2418a
fix tests
winoros Apr 18, 2023
671ef17
address comments && add tests
winoros Apr 23, 2023
863890e
Merge branch 'master' into add-sequence-operator
winoros Apr 23, 2023
7e918fc
fix gofmt
winoros Apr 23, 2023
2341c88
fix check && fix test
winoros Apr 23, 2023
fa4ecaf
fix the cte producer status and add tests
winoros Apr 24, 2023
a924eef
Merge branch 'master' into add-sequence-operator
winoros Apr 25, 2023
f5d4303
address comments
winoros Apr 25, 2023
4657c67
merge the methods
winoros Apr 27, 2023
1c8994f
Merge branch 'master' into add-sequence-operator
winoros May 9, 2023
e5879cd
fix the aggregation's bad case
winoros May 10, 2023
af5c066
Merge branch 'master' into add-sequence-operator
winoros May 10, 2023
98868c0
Merge branch 'master' into add-sequence-operator
winoros May 17, 2023
84e8ac8
address comments
winoros May 17, 2023
9004ef0
fix bazel_prepare
winoros May 17, 2023
9088ad4
remove debug log
winoros May 18, 2023
06f4b3b
address comments
winoros May 23, 2023
f7026cd
Merge branch 'master' into add-sequence-operator
winoros May 23, 2023
51a1178
fix lint
winoros May 23, 2023
34864c8
fix tests
winoros May 24, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -3410,8 +3410,9 @@ def go_deps():
name = "com_github_pingcap_tipb",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/tipb",
sum = "h1:CeeMOq1aHPAhXrw4eYXtQRyWOFlbfqK1+3f9Iop4IfU=",
version = "v0.0.0-20230310043643-5362260ee6f7",
replace = "github.com/pingcap/tipb",
sum = "h1:dWYwAeMc0QQtWjVrL/mNx/3yTm8wcmcFHw4sHm8EXfU=",
version = "v0.0.0-20230418145421-de2cb4e699e9",
)
go_repository(
name = "com_github_pkg_browser",
Expand Down
2 changes: 1 addition & 1 deletion cmd/explaintest/r/explain_cte.result
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ CTE_0 100.00 root Non-Recursive CTE
└─IndexFullScan_31 10000.00 cop[tikv] table:t2, index:c1(c1) keep order:false, stats:pseudo
explain with recursive cte1(c1) as (select c1 from t1 union select c1 from t2 limit 0 offset 0) select * from cte1;
id estRows task access object operator info
CTEFullScan_18 0.00 root CTE:cte1 data:CTE_0
CTEFullScan_19 0.00 root CTE:cte1 data:CTE_0
CTE_0 0.00 root Non-Recursive CTE
└─TableDual_16(Seed Part) 0.00 root rows:0
CREATE TABLE `customer` (
Expand Down
5 changes: 2 additions & 3 deletions distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,12 +572,11 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *copr
}
if hasExecutor {
var recorededPlanIDs = make(map[int]int)
for i, detail := range r.selectResp.GetExecutionSummaries() {
for _, detail := range r.selectResp.GetExecutionSummaries() {
if detail != nil && detail.TimeProcessedNs != nil &&
detail.NumProducedRows != nil && detail.NumIterations != nil {
planID := r.copPlanIDs[i]
recorededPlanIDs[r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.
RecordOneCopTask(planID, r.storeType.Name(), callee, detail)] = 0
RecordOneCopTask(-1, r.storeType.Name(), callee, detail)] = 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why change the old here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's emmm...
I debugged a lot of time but could see the reason why the original codes panicked. But passing -1 will always work.

}
}
num := uint64(0)
Expand Down
1 change: 1 addition & 0 deletions executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ go_library(
"@com_github_pingcap_kvproto//pkg/encryptionpb",
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_kvproto//pkg/mpp",
"@com_github_pingcap_kvproto//pkg/resource_manager",
"@com_github_pingcap_kvproto//pkg/tikvpb",
"@com_github_pingcap_log//:log",
Expand Down
88 changes: 88 additions & 0 deletions executor/mpp_gather.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ package executor

import (
"context"
"fmt"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/mpp"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -99,6 +101,10 @@ func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment) error {
if err != nil {
return errors.Trace(err)
}
err = e.fixTaskForCTEStorageAndReader(dagReq.RootExecutor, mppTask.Meta)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why should we align the address after pb is generated

Copy link
Member Author

@winoros winoros Apr 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's because each task of the CTE producer/consumer needs different upstream/downstream compared with the ones in the same fragment.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider that there're two producer tasks A,B. You know that A and B's upstream consumer task is different.
In current fragments generation. We just use one struct to represent one fragment. So before we send the task. A&B shares the same upstream task address(The whole address list of the upstream consumer fragments).
Hence, we need to set them again here.

if err != nil {
return err
}
pbData, err := dagReq.Marshal()
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -127,6 +133,88 @@ func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment) error {
return nil
}

// fixTaskForCTEStorageAndReader fixes the upstream/downstream tasks for the producers and consumers.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please write some meaningful comments.

// After we split the fragments. A CTE producer in the fragment will holds all the task address of the consumers.
// For example, the producer has two task on node_1 and node_2. As we know that each consumer also has two task on the same nodes(node_1 and node_2)
// We need to prune address of node_2 for producer's task on node_1 since we just want the producer task on the node_1 only send to the consumer tasks on the node_1.
// And the same for the task on the node_2.
// And the same for the consumer task. We need to prune the unnecessary task address of its producer tasks(i.e. the downstream tasks).
func (e *MPPGather) fixTaskForCTEStorageAndReader(exec *tipb.Executor, meta kv.MPPTaskMeta) error {
children := make([]*tipb.Executor, 0, 2)
switch exec.Tp {
case tipb.ExecType_TypeTableScan, tipb.ExecType_TypePartitionTableScan, tipb.ExecType_TypeIndexScan:
case tipb.ExecType_TypeSelection:
children = append(children, exec.Selection.Child)
case tipb.ExecType_TypeAggregation, tipb.ExecType_TypeStreamAgg:
children = append(children, exec.Aggregation.Child)
case tipb.ExecType_TypeTopN:
children = append(children, exec.TopN.Child)
case tipb.ExecType_TypeLimit:
children = append(children, exec.Limit.Child)
case tipb.ExecType_TypeExchangeSender:
children = append(children, exec.ExchangeSender.Child)
if len(exec.ExchangeSender.UpstreamCteTaskMeta) == 0 {
break
}
actualUpStreamTasks := make([][]byte, 0, len(exec.ExchangeSender.UpstreamCteTaskMeta))
actualTIDs := make([]int64, 0, len(exec.ExchangeSender.UpstreamCteTaskMeta))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not used variable

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's used for the debug log.

for _, tasksFromOneConsumer := range exec.ExchangeSender.UpstreamCteTaskMeta {
for _, taskBytes := range tasksFromOneConsumer.EncodedTasks {
taskMeta := &mpp.TaskMeta{}
err := taskMeta.Unmarshal(taskBytes)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really strange and complex code.
Marshal it in constructDistExecForTiFlash then unmarshal it here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, but it's difficult to change since we split the tasks of one fragment just out here.

if err != nil {
return err
}
if taskMeta.Address != meta.GetAddress() {
continue
}
actualUpStreamTasks = append(actualUpStreamTasks, taskBytes)
actualTIDs = append(actualTIDs, taskMeta.TaskId)
}
}
logutil.BgLogger().Warn("refine tunnel for cte producer task", zap.String("the final tunnel", fmt.Sprintf("up stream consumer tasks: %v", actualTIDs)))
exec.ExchangeSender.EncodedTaskMeta = actualUpStreamTasks
case tipb.ExecType_TypeExchangeReceiver:
if len(exec.ExchangeReceiver.OriginalCtePrdocuerTaskMeta) == 0 {
break
}
exec.ExchangeReceiver.EncodedTaskMeta = [][]byte{}
actualTIDs := make([]int64, 0, 4)
for _, taskBytes := range exec.ExchangeReceiver.OriginalCtePrdocuerTaskMeta {
taskMeta := &mpp.TaskMeta{}
err := taskMeta.Unmarshal(taskBytes)
if err != nil {
return err
}
if taskMeta.Address != meta.GetAddress() {
continue
}
exec.ExchangeReceiver.EncodedTaskMeta = append(exec.ExchangeReceiver.EncodedTaskMeta, taskBytes)
actualTIDs = append(actualTIDs, taskMeta.TaskId)
}
logutil.BgLogger().Warn("refine tunnel for cte consumer task", zap.String("the final tunnel", fmt.Sprintf("down stream producer task: %v", actualTIDs)))
case tipb.ExecType_TypeJoin:
children = append(children, exec.Join.Children...)
case tipb.ExecType_TypeProjection:
children = append(children, exec.Projection.Child)
case tipb.ExecType_TypeWindow:
children = append(children, exec.Window.Child)
case tipb.ExecType_TypeSort:
children = append(children, exec.Sort.Child)
case tipb.ExecType_TypeExpand:
children = append(children, exec.Expand.Child)
default:
return errors.Errorf("unknown new tipb protocol %d", exec.Tp)
}
for _, child := range children {
err := e.fixTaskForCTEStorageAndReader(child, meta)
if err != nil {
return err
}
}
return nil
}

func collectPlanIDS(plan plannercore.PhysicalPlan, ids []int) []int {
ids = append(ids, plan.ID())
for _, child := range plan.Children() {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -281,5 +281,6 @@ replace (
// fix potential security issue(CVE-2020-26160) introduced by indirect dependency.
github.com/dgrijalva/jwt-go => github.com/form3tech-oss/jwt-go v3.2.6-0.20210809144907-32ab6a8243d7+incompatible
github.com/pingcap/tidb/parser => ./parser
github.com/pingcap/tipb => github.com/pingcap/tipb v0.0.0-20230418145421-de2cb4e699e9
go.opencensus.io => go.opencensus.io v0.23.1-0.20220331163232-052120675fac
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -783,8 +783,8 @@ github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22 h1:2SOzvGvE8beiC1Y4g
github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I=
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM=
github.com/pingcap/tipb v0.0.0-20230310043643-5362260ee6f7 h1:CeeMOq1aHPAhXrw4eYXtQRyWOFlbfqK1+3f9Iop4IfU=
github.com/pingcap/tipb v0.0.0-20230310043643-5362260ee6f7/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
github.com/pingcap/tipb v0.0.0-20230418145421-de2cb4e699e9 h1:dWYwAeMc0QQtWjVrL/mNx/3yTm8wcmcFHw4sHm8EXfU=
github.com/pingcap/tipb v0.0.0-20230418145421-de2cb4e699e9/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4 h1:49lOXmGaUpV9Fz3gd7TFZY106KVlPVa5jcYD1gaQf98=
github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4/go.mod h1:4OwLy04Bl9Ef3GJJCoec+30X3LQs/0/m4HFRt/2LUSA=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
Expand Down
1 change: 1 addition & 0 deletions planner/core/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ go_library(
"rule_partition_processor.go",
"rule_predicate_push_down.go",
"rule_predicate_simplification.go",
"rule_push_down_sequence.go",
"rule_result_reorder.go",
"rule_semi_join_rewrite.go",
"rule_topn_push_down.go",
Expand Down
6 changes: 5 additions & 1 deletion planner/core/access_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,11 @@ func (p *PhysicalTableReader) accessObject(sctx sessionctx.Context) AccessObject
return DynamicPartitionAccessObjects(nil)
}
if len(p.PartitionInfos) == 0 {
ts := p.TablePlans[0].(*PhysicalTableScan)
ts, ok := p.TablePlans[0].(*PhysicalTableScan)
if !ok {
cte := p.TablePlans[0].(*PhysicalCTE)
return OtherAccessObject(fmt.Sprintf("cte: %v as %v", cte.cteName, cte.cteAsName))
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic seems useless.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we don't have this part, some SQL will panic...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then the type assertion is enough, I think the OtherAccessObject is unneeded.

asName := ""
if ts.TableAsName != nil && len(ts.TableAsName.O) > 0 {
asName = ts.TableAsName.O
Expand Down
48 changes: 48 additions & 0 deletions planner/core/casetest/enforce_mpp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,3 +598,51 @@ func TestMPPNullAwareSemiJoinPushDown(t *testing.T) {
require.Equal(t, output[i].Warn, testdata.ConvertSQLWarnToStrings(tk.Session().GetSessionVars().StmtCtx.GetWarnings()))
}
}

func TestMPPSharedCTEScan(t *testing.T) {
store := testkit.CreateMockStore(t, internal.WithMockTiFlash(2))
tk := testkit.NewTestKit(t, store)

// test table
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("drop table if exists s")
tk.MustExec("create table t(a int, b int, c int)")
tk.MustExec("create table s(a int, b int, c int)")
tk.MustExec("alter table t set tiflash replica 1")
tk.MustExec("alter table s set tiflash replica 1")

tb := external.GetTableByName(t, tk, "test", "t")
err := domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
require.NoError(t, err)

tb = external.GetTableByName(t, tk, "test", "s")
err = domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
require.NoError(t, err)

var input []string
var output []struct {
SQL string
Plan []string
Warn []string
}

tk.MustExec("set @@tidb_enforce_mpp='on'")
tk.MustExec("set @@tidb_opt_enable_mpp_shared_cte_execution='on'")

enforceMPPSuiteData := GetEnforceMPPSuiteData()
enforceMPPSuiteData.LoadTestCases(t, &input, &output)
for i, tt := range input {
testdata.OnRecord(func() {
output[i].SQL = tt
})
testdata.OnRecord(func() {
output[i].SQL = tt
output[i].Plan = testdata.ConvertRowsToStrings(tk.MustQuery(tt).Rows())
output[i].Warn = testdata.ConvertSQLWarnToStrings(tk.Session().GetSessionVars().StmtCtx.GetWarnings())
})
res := tk.MustQuery(tt)
res.Check(testkit.Rows(output[i].Plan...))
require.Equal(t, output[i].Warn, testdata.ConvertSQLWarnToStrings(tk.Session().GetSessionVars().StmtCtx.GetWarnings()))
}
}
2 changes: 2 additions & 0 deletions planner/core/casetest/testdata/binary_plan_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@
"BinaryPlan": {
"main": {
"name": "CTEFullScan_17",
"cost": 0.8982000000000001,
"est_rows": 1.8,
"act_rows": 5,
"task_type": 1,
Expand Down Expand Up @@ -363,6 +364,7 @@
"operator_info": "cast(plus(Column#3, 1), bigint(1) BINARY)->Column#5"
}
],
"cost": 0.8982000000000001,
"est_rows": 1.8,
"act_rows": 5,
"task_type": 1,
Expand Down
26 changes: 26 additions & 0 deletions planner/core/casetest/testdata/enforce_mpp_suite_in.json
Original file line number Diff line number Diff line change
Expand Up @@ -172,5 +172,31 @@
"EXPLAIN select *, (t.a, t.b) not in (select s.a, s.b from s) from t; -- 7. left anti semi join, two join key",
"EXPLAIN select *, (t.a, t.b) not in (select s.a, s.b from s where t.c < s.c) from t; -- 8. left anti semi join, two join key + other condition"
]
},
{
"name": "TestMPPSharedCTEScan",
"cases": [
// The most simple case.
"explain format = 'brief' with c1 as (select * from t) select * from c1, c1 c2 where c1.a=c2.b ",
"explain format = 'brief' with c1 as (select /*+ read_from_storage(tikv[t]) */ * from t) select * from c1, c1 c2 where c1.a=c2.b ",
"explain format = 'brief' with c1 as (select * from t) select c1.* from c1, c1 c2 where c1.b=c2.c",
// Can work when there's global limit/topn
"explain format = 'brief' with c1 as (select * from t) select * from c1, c1 c2 where c1.a=c2.b limit 10",
"explain format = 'brief' with c1 as (select * from t) select * from c1, c1 c2 where c1.a=c2.b order by c1.a limit 10",
// The c2 references c1, c1 can mpp, and then c2 can mpp, so the main query can mpp.
"explain format = 'brief' with c1 as (select * from t), c2 as (select c1.* from c1, c1 c2 where c1.b=c2.c) select * from c2 c1, c2 where c1.a=c2.b",
// The same SQL, c1 forces to read tikv. So c2 cannot MPP, then the whole SQL.
"explain format = 'brief' with c1 as (select /*+ read_from_storage(tikv[t]) */ * from t), c2 as (select c1.* from c1, c1 c2 where c1.b=c2.c) select * from c2 c1, c2 where c1.a=c2.b",
// The two WITH satement can all be MPP.
"explain format = 'brief' with c1 as (select * from t), c2 as (select c1.* from c1, c1 c2 where c1.b=c2.c) select * from c2 c1, c2, (with c3 as (select * from t) select c3.* from c3, c3 c4 where c3.c=c4.b) c3 where c1.a=c2.b and c1.a=c3.a",
// The outer one will fail to use MPP. But the inner WITH statement can. But we haven't implemented the least common ancestor to detect the best position of the Sequence. So the whole SQL cannot MPP.
"explain format = 'brief' with c1 as (select /*+ read_from_storage(tikv[t]) */ * from t), c2 as (select c1.* from c1, c1 c2 where c1.b=c2.c) select * from c2 c1, c2, (with c3 as (select * from t) select c3.* from c3, c3 c4 where c3.c=c4.b) c3 where c1.a=c2.b and c1.a=c3.a",
// The inner one will fail. So the whole SQL cannot MPP.
"explain format = 'brief' with c1 as (select * from t), c2 as (select c1.* from c1, c1 c2 where c1.b=c2.c) select * from c2 c1, c2, (with c3 as (select /*+ read_from_storage(tikv[t]) */ * from t) select c3.* from c3, c3 c4 where c3.c=c4.b) c3 where c1.a=c2.b and c1.a=c3.a",
// A little change that the inner WITH statement references the outer's c1.
"explain format = 'brief' with c1 as (select * from t), c2 as (select c1.* from c1, c1 c2 where c1.b=c2.c) select * from c2 c1, c2, (with c3 as (select * from c1) select c3.* from c3, c3 c4 where c3.c=c4.b) c3 where c1.a=c2.b and c1.a=c3.a",
// The outer one will fail to use MPP. Since the inner one is references the outer one, the whole SQL cannot MPP.
"explain format = 'brief' with c1 as (select /*+ read_from_storage(tikv[t]) */ * from t), c2 as (select c1.* from c1, c1 c2 where c1.b=c2.c) select * from c2 c1, c2, (with c3 as (select * from c1) select c3.* from c3, c3 c4 where c3.c=c4.b) c3 where c1.a=c2.b and c1.a=c3.a"
]
}
]
Loading