diff --git a/executor/mpp_gather.go b/executor/mpp_gather.go index 107d3501c6d5f..69a6b0c9ceb79 100644 --- a/executor/mpp_gather.go +++ b/executor/mpp_gather.go @@ -88,7 +88,7 @@ func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment, tasks []*kv.M e.mppReqs = append(e.mppReqs, req) } for _, r := range pf.ExchangeReceivers { - err = e.appendMPPDispatchReq(r.ChildPf, r.Tasks, false) + err = e.appendMPPDispatchReq(r.GetExchangeSender().Fragment, r.Tasks, false) if err != nil { return errors.Trace(err) } diff --git a/executor/tiflash_test.go b/executor/tiflash_test.go index 89b7e48af3bc8..750293c9f2cec 100644 --- a/executor/tiflash_test.go +++ b/executor/tiflash_test.go @@ -183,12 +183,26 @@ func (s *tiflashTestSuite) TestMppExecution(c *C) { tk.MustQuery("select count(*) from t1 , t where t1.a = t.a").Check(testkit.Rows("3")) tk.MustQuery("select count(*) from t1 , t, t2 where t1.a = t.a and t2.a = t.a").Check(testkit.Rows("3")) + // test agg by expression tk.MustExec("insert into t1 values(4,0)") tk.MustQuery("select count(*) k, t2.b from t1 left join t2 on t1.a = t2.a group by t2.b order by k").Check(testkit.Rows("1 ", "3 0")) tk.MustQuery("select count(*) k, t2.b+1 from t1 left join t2 on t1.a = t2.a group by t2.b+1 order by k").Check(testkit.Rows("1 ", "3 1")) tk.MustQuery("select count(*) k, t2.b * t2.a from t2 group by t2.b * t2.a").Check(testkit.Rows("3 0")) tk.MustQuery("select count(*) k, t2.a/2 m from t2 group by t2.a / 2 order by m").Check(testkit.Rows("1 0.5000", "1 1.0000", "1 1.5000")) tk.MustQuery("select count(*) k, t2.a div 2 from t2 group by t2.a div 2 order by k").Check(testkit.Rows("1 0", "2 1")) + + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (c1 decimal(8, 5) not null, c2 decimal(9, 5), c3 decimal(9, 4) , c4 decimal(8, 4) not null)") + tk.MustExec("alter table t set tiflash replica 1") + tb = testGetTableByName(c, tk.Se, "test", "t") + err = domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true) + c.Assert(err, IsNil) + tk.MustExec("insert into t values(1.00000,1.00000,1.0000,1.0000)") + tk.MustExec("insert into t values(1.00010,1.00010,1.0001,1.0001)") + tk.MustExec("insert into t values(1.00001,1.00001,1.0000,1.0002)") + tk.MustQuery("select t1.c1 from t t1 join t t2 on t1.c1 = t2.c1 order by t1.c1").Check(testkit.Rows("1.00000", "1.00001", "1.00010")) + tk.MustQuery("select t1.c1 from t t1 join t t2 on t1.c1 = t2.c3 order by t1.c1").Check(testkit.Rows("1.00000", "1.00000", "1.00010")) + tk.MustQuery("select t1.c4 from t t1 join t t2 on t1.c4 = t2.c3 order by t1.c4").Check(testkit.Rows("1.0000", "1.0000", "1.0001")) } func (s *tiflashTestSuite) TestPartitionTable(c *C) { diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index 330d2ea1ae7e7..bba801ee275e5 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -1817,6 +1817,7 @@ func (p *LogicalJoin) tryToGetMppHashJoin(prop *property.PhysicalProperty, useBC Concurrency: uint(p.ctx.GetSessionVars().CopTiFlashConcurrencyFactor), EqualConditions: p.EqualConditions, storeTp: kv.TiFlash, + mppShuffleJoin: !useBCJ, }.Init(p.ctx, p.stats.ScaleByExpectCnt(prop.ExpectedCnt), p.blockOffset, childrenProps...) return []PhysicalPlan{join} } diff --git a/planner/core/find_best_task.go b/planner/core/find_best_task.go index 555962313c9c2..88b7ed9de805f 100644 --- a/planner/core/find_best_task.go +++ b/planner/core/find_best_task.go @@ -1493,7 +1493,6 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid p: ts, cst: cost, partTp: property.AnyType, - ts: ts, } ts.PartitionInfo = PartitionInfo{ PruningConds: ds.allConds, diff --git a/planner/core/fragment.go b/planner/core/fragment.go index f5bdf781716b9..1ed688a1a63f7 100644 --- a/planner/core/fragment.go +++ b/planner/core/fragment.go @@ -59,7 +59,7 @@ func (e *mppTaskGenerator) generateMPPTasks(s *PhysicalExchangeSender) ([]*kv.MP StartTs: e.startTS, ID: -1, } - rootTasks, err := e.generateMPPTasksForFragment(s.Fragment) + rootTasks, err := e.generateMPPTasksForFragment(s) if err != nil { return nil, errors.Trace(err) } @@ -67,9 +67,39 @@ func (e *mppTaskGenerator) generateMPPTasks(s *PhysicalExchangeSender) ([]*kv.MP return rootTasks, nil } -func (e *mppTaskGenerator) generateMPPTasksForFragment(f *Fragment) (tasks []*kv.MPPTask, err error) { +func (f *Fragment) init(p PhysicalPlan) error { + switch x := p.(type) { + case *PhysicalTableScan: + if f.TableScan != nil { + return errors.New("one task contains at most one table scan") + } + f.TableScan = x + case *PhysicalExchangeReceiver: + f.ExchangeReceivers = append(f.ExchangeReceivers, x) + default: + for _, ch := range p.Children() { + if err := f.init(ch); err != nil { + return errors.Trace(err) + } + } + } + return nil +} + +func newFragment(s *PhysicalExchangeSender) (*Fragment, error) { + f := &Fragment{ExchangeSender: s} + s.Fragment = f + err := f.init(s) + return f, errors.Trace(err) +} + +func (e *mppTaskGenerator) generateMPPTasksForFragment(s *PhysicalExchangeSender) (tasks []*kv.MPPTask, err error) { + f, err := newFragment(s) + if err != nil { + return nil, errors.Trace(err) + } for _, r := range f.ExchangeReceivers { - r.Tasks, err = e.generateMPPTasksForFragment(r.ChildPf) + r.Tasks, err = e.generateMPPTasksForFragment(r.GetExchangeSender()) if err != nil { return nil, errors.Trace(err) } @@ -86,7 +116,7 @@ func (e *mppTaskGenerator) generateMPPTasksForFragment(f *Fragment) (tasks []*kv return nil, errors.New("cannot find mpp task") } for _, r := range f.ExchangeReceivers { - s := r.ChildPf.ExchangeSender + s := r.GetExchangeSender() s.TargetTasks = tasks } f.ExchangeSender.Tasks = tasks diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go index 983dc9cac3130..5b0235cccffb4 100644 --- a/planner/core/integration_test.go +++ b/planner/core/integration_test.go @@ -2807,6 +2807,47 @@ func (s *testIntegrationSerialSuite) TestPushDownAggForMPP(c *C) { } } +func (s *testIntegrationSerialSuite) TestMppJoinDecimal(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (c1 decimal(8, 5), c2 decimal(9, 5), c3 decimal(9, 4) NOT NULL, c4 decimal(8, 4) NOT NULL, c5 decimal(40, 20))") + tk.MustExec("analyze table t") + + // Create virtual tiflash replica info. + dom := domain.GetDomain(tk.Se) + is := dom.InfoSchema() + db, exists := is.SchemaByName(model.NewCIStr("test")) + c.Assert(exists, IsTrue) + for _, tblInfo := range db.Tables { + if tblInfo.Name.L == "t" { + tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{ + Count: 1, + Available: true, + } + } + } + + tk.MustExec("set @@tidb_allow_mpp=1;") + tk.MustExec("set @@session.tidb_broadcast_join_threshold_size = 1") + tk.MustExec("set @@session.tidb_broadcast_join_threshold_count = 1") + + var input []string + var output []struct { + SQL string + Plan []string + } + s.testData.GetTestCases(c, &input, &output) + for i, tt := range input { + s.testData.OnRecord(func() { + output[i].SQL = tt + output[i].Plan = s.testData.ConvertRowsToStrings(tk.MustQuery(tt).Rows()) + }) + res := tk.MustQuery(tt) + res.Check(testkit.Rows(output[i].Plan...)) + } +} + func (s *testIntegrationSerialSuite) TestMppAggWithJoin(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") diff --git a/planner/core/optimizer_test.go b/planner/core/optimizer_test.go index 1a8b91ec682e2..8ceff4056787b 100644 --- a/planner/core/optimizer_test.go +++ b/planner/core/optimizer_test.go @@ -13,6 +13,49 @@ package core +import ( + . "github.com/pingcap/check" + "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb/types" +) + // LogicalOptimize exports the `logicalOptimize` function for test packages and // doesn't affect the normal package and access control of Golang (tricky ^_^) var LogicalOptimize = logicalOptimize + +var _ = Suite(&testPlannerFunctionSuite{}) + +type testPlannerFunctionSuite struct { +} + +func testDecimalConvert(lDec, lLen, rDec, rLen int, lConvert, rConvert bool, cDec, cLen int, c *C) { + lType := types.NewFieldType(mysql.TypeNewDecimal) + lType.Decimal = lDec + lType.Flen = lLen + + rType := types.NewFieldType(mysql.TypeNewDecimal) + rType.Decimal = rDec + rType.Flen = rLen + + cType, lCon, rCon := negotiateCommonType(lType, rType) + c.Assert(cType.Tp, Equals, mysql.TypeNewDecimal) + c.Assert(cType.Decimal, Equals, cDec) + c.Assert(cType.Flen, Equals, cLen) + c.Assert(lConvert, Equals, lCon) + c.Assert(rConvert, Equals, rCon) +} + +func (t *testPlannerFunctionSuite) TestMPPDecimalConvert(c *C) { + testDecimalConvert(5, 9, 5, 8, false, false, 5, 9, c) + testDecimalConvert(5, 8, 5, 9, false, false, 5, 9, c) + testDecimalConvert(0, 8, 0, 11, true, false, 0, 11, c) + testDecimalConvert(0, 16, 0, 11, false, false, 0, 16, c) + testDecimalConvert(5, 9, 4, 9, true, true, 5, 10, c) + testDecimalConvert(5, 8, 4, 9, true, true, 5, 10, c) + testDecimalConvert(5, 9, 4, 8, false, true, 5, 9, c) + testDecimalConvert(10, 16, 0, 11, true, true, 10, 21, c) + testDecimalConvert(5, 19, 0, 20, false, true, 5, 25, c) + testDecimalConvert(20, 20, 0, 60, true, true, 20, 65, c) + testDecimalConvert(20, 40, 0, 60, false, true, 20, 65, c) + testDecimalConvert(0, 40, 0, 60, false, false, 0, 60, c) +} diff --git a/planner/core/physical_plans.go b/planner/core/physical_plans.go index ed8e7ce626af6..137fd54227c4c 100644 --- a/planner/core/physical_plans.go +++ b/planner/core/physical_plans.go @@ -728,6 +728,7 @@ type PhysicalHashJoin struct { // on which store the join executes. storeTp kv.StoreType globalChildIndex int + mppShuffleJoin bool } // Clone implements PhysicalPlan interface. @@ -851,8 +852,12 @@ type PhysicalMergeJoin struct { type PhysicalExchangeReceiver struct { basePhysicalPlan - Tasks []*kv.MPPTask - ChildPf *Fragment + Tasks []*kv.MPPTask +} + +// GetExchangeSender return the connected sender of this receiver. We assume that its child must be a receiver. +func (p *PhysicalExchangeReceiver) GetExchangeSender() *PhysicalExchangeSender { + return p.children[0].(*PhysicalExchangeSender) } // PhysicalExchangeSender dispatches data to upstream tasks. That means push mode processing, diff --git a/planner/core/task.go b/planner/core/task.go index 59dc164239380..15b47b754ea65 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -16,6 +16,7 @@ package core import ( "math" + "github.com/cznic/mathutil" "github.com/pingcap/errors" "github.com/pingcap/parser/ast" "github.com/pingcap/parser/charset" @@ -557,28 +558,185 @@ func (p *PhysicalHashJoin) attach2Task(tasks ...task) task { return task } +// for different decimal scale and precision, tiflash uses different underlying type. +// Here we check the scale and precision to decide whether conversion is a must. +func needDecimalConvert(tp *types.FieldType, rtp *types.FieldType) bool { + if tp.Decimal != rtp.Decimal { + return true + } + if tp.Flen >= 0 && tp.Flen <= 9 && rtp.Flen >= 0 && rtp.Flen <= 9 { + return false + } + if tp.Flen > 9 && tp.Flen <= 18 && rtp.Flen > 9 && rtp.Flen <= 18 { + return false + } + if tp.Flen > 18 && tp.Flen <= 38 && rtp.Flen > 18 && rtp.Flen <= 38 { + return false + } + if tp.Flen > 38 && tp.Flen <= 65 && rtp.Flen > 38 && rtp.Flen <= 65 { + return false + } + return true +} + +func negotiateCommonType(lType, rType *types.FieldType) (*types.FieldType, bool, bool) { + lExtend := 0 + rExtend := 0 + cDec := rType.Decimal + if lType.Decimal < rType.Decimal { + lExtend = rType.Decimal - lType.Decimal + } else if lType.Decimal > rType.Decimal { + rExtend = lType.Decimal - rType.Decimal + cDec = lType.Decimal + } + lLen, rLen := lType.Flen+lExtend, rType.Flen+rExtend + cLen := mathutil.Max(lLen, rLen) + cLen = mathutil.Min(65, cLen) + cType := types.NewFieldType(mysql.TypeNewDecimal) + cType.Decimal = cDec + cType.Flen = cLen + return cType, needDecimalConvert(lType, cType), needDecimalConvert(rType, cType) +} + +func getProj(ctx sessionctx.Context, p PhysicalPlan) *PhysicalProjection { + proj := PhysicalProjection{ + Exprs: make([]expression.Expression, 0, len(p.Schema().Columns)), + }.Init(ctx, p.statsInfo(), p.SelectBlockOffset()) + for _, col := range p.Schema().Columns { + proj.Exprs = append(proj.Exprs, col) + } + proj.SetSchema(p.Schema().Clone()) + proj.SetChildren(p) + return proj +} + +func appendExpr(p *PhysicalProjection, expr expression.Expression) *expression.Column { + p.Exprs = append(p.Exprs, expr) + + col := &expression.Column{ + UniqueID: p.ctx.GetSessionVars().AllocPlanColumnID(), + RetType: expr.GetType(), + } + col.SetCoercibility(expr.Coercibility()) + p.schema.Append(col) + return col +} + +// If the join key's type are decimal and needs conversion, we will add a projection below the join or exchanger if exists. +func (p *PhysicalHashJoin) convertDecimalKeyIfNeed(lTask, rTask *mppTask) (*mppTask, *mppTask) { + lp := lTask.p + if _, ok := lp.(*PhysicalExchangeReceiver); ok { + lp = lp.Children()[0].Children()[0] + } + rp := rTask.p + if _, ok := rp.(*PhysicalExchangeReceiver); ok { + rp = rp.Children()[0].Children()[0] + } + // to mark if any equal cond needs to convert + lMask := make([]bool, len(p.EqualConditions)) + rMask := make([]bool, len(p.EqualConditions)) + cTypes := make([]*types.FieldType, len(p.EqualConditions)) + lChanged := false + rChanged := false + for i, eqFunc := range p.EqualConditions { + lKey := eqFunc.GetArgs()[0].(*expression.Column) + rKey := eqFunc.GetArgs()[1].(*expression.Column) + if lKey.RetType.Tp == mysql.TypeNewDecimal && rKey.RetType.Tp == mysql.TypeNewDecimal { + cType, lConvert, rConvert := negotiateCommonType(lKey.RetType, rKey.RetType) + if lConvert { + lMask[i] = true + cTypes[i] = cType + lChanged = true + } + if rConvert { + rMask[i] = true + cTypes[i] = cType + rChanged = true + } + } + } + if !lChanged && !rChanged { + return lTask, rTask + } + var lProj, rProj *PhysicalProjection + if lChanged { + lProj = getProj(p.ctx, lp) + lp = lProj + } + if rChanged { + rProj = getProj(p.ctx, rp) + rp = rProj + } + lKeys := make([]*expression.Column, 0, len(p.EqualConditions)) + rKeys := make([]*expression.Column, 0, len(p.EqualConditions)) + for i, eqFunc := range p.EqualConditions { + lKey := eqFunc.GetArgs()[0].(*expression.Column) + rKey := eqFunc.GetArgs()[1].(*expression.Column) + if lMask[i] { + cType := cTypes[i].Clone() + cType.Flag = lKey.RetType.Flag + lCast := expression.BuildCastFunction(p.ctx, lKey, cType) + lKey = appendExpr(lProj, lCast) + } + if rMask[i] { + cType := cTypes[i].Clone() + cType.Flag = rKey.RetType.Flag + rCast := expression.BuildCastFunction(p.ctx, rKey, cType) + rKey = appendExpr(rProj, rCast) + } + if lMask[i] || rMask[i] { + eqCond := expression.NewFunctionInternal(p.ctx, ast.EQ, types.NewFieldType(mysql.TypeTiny), lKey, rKey) + p.EqualConditions[i] = eqCond.(*expression.ScalarFunction) + } + lKeys = append(lKeys, lKey) + rKeys = append(rKeys, rKey) + } + // if left or right child changes, we need to add enforcer. + if lChanged { + nlTask := lTask.copy().(*mppTask) + nlTask.p = lProj + nlTask = nlTask.enforceExchangerImpl(&property.PhysicalProperty{ + TaskTp: property.MppTaskType, + PartitionTp: property.HashType, + PartitionCols: lKeys, + }) + nlTask.cst = lTask.cst + lTask = nlTask + } + if rChanged { + nrTask := rTask.copy().(*mppTask) + nrTask.p = rProj + nrTask = nrTask.enforceExchangerImpl(&property.PhysicalProperty{ + TaskTp: property.MppTaskType, + PartitionTp: property.HashType, + PartitionCols: rKeys, + }) + nrTask.cst = rTask.cst + rTask = nrTask + } + return lTask, rTask +} + func (p *PhysicalHashJoin) attach2TaskForMpp(tasks ...task) task { lTask, lok := tasks[0].(*mppTask) rTask, rok := tasks[1].(*mppTask) if !lok || !rok { return invalidTask } + if p.mppShuffleJoin { + lTask, rTask = p.convertDecimalKeyIfNeed(lTask, rTask) + } p.SetChildren(lTask.plan(), rTask.plan()) p.schema = BuildPhysicalJoinSchema(p.JoinType, p) lCost := lTask.cost() rCost := rTask.cost() outerTask := tasks[1-p.InnerChildIdx].(*mppTask) - receivers := make([]*PhysicalExchangeReceiver, 0) - receivers = append(receivers, lTask.receivers...) - receivers = append(receivers, rTask.receivers...) task := &mppTask{ - cst: lCost + rCost + p.GetCost(lTask.count(), rTask.count()), - p: p, - partTp: outerTask.partTp, - hashCols: outerTask.hashCols, - ts: outerTask.ts, - receivers: receivers, + cst: lCost + rCost + p.GetCost(lTask.count(), rTask.count()), + p: p, + partTp: outerTask.partTp, + hashCols: outerTask.hashCols, } return task } @@ -1727,9 +1885,6 @@ type mppTask struct { partTp property.PartitionType hashCols []*expression.Column - - ts *PhysicalTableScan - receivers []*PhysicalExchangeReceiver } func (t *mppTask) count() float64 { @@ -1766,7 +1921,6 @@ func (t *mppTask) convertToRootTaskImpl(ctx sessionctx.Context) *rootTask { ExchangeType: tipb.ExchangeType_PassThrough, }.Init(ctx, t.p.statsInfo()) sender.SetChildren(t.p) - sender.Fragment = &Fragment{ExchangeReceivers: t.receivers, ExchangeSender: sender, TableScan: t.ts} p := PhysicalTableReader{ tablePlan: sender, @@ -1819,18 +1973,13 @@ func (t *mppTask) enforceExchangerImpl(prop *property.PhysicalProperty) *mppTask HashCols: prop.PartitionCols, }.Init(ctx, t.p.statsInfo()) sender.SetChildren(t.p) - f := &Fragment{ExchangeSender: sender, TableScan: t.ts, ExchangeReceivers: t.receivers} - sender.Fragment = f - receiver := PhysicalExchangeReceiver{ - ChildPf: f, - }.Init(ctx, t.p.statsInfo()) + receiver := PhysicalExchangeReceiver{}.Init(ctx, t.p.statsInfo()) receiver.SetChildren(sender) cst := t.cst + t.count()*ctx.GetSessionVars().NetworkFactor return &mppTask{ - p: receiver, - cst: cst, - partTp: prop.PartitionTp, - hashCols: prop.PartitionCols, - receivers: []*PhysicalExchangeReceiver{receiver}, + p: receiver, + cst: cst, + partTp: prop.PartitionTp, + hashCols: prop.PartitionCols, } } diff --git a/planner/core/testdata/integration_serial_suite_in.json b/planner/core/testdata/integration_serial_suite_in.json index ae3d7a5796d70..1c15f4fe184de 100644 --- a/planner/core/testdata/integration_serial_suite_in.json +++ b/planner/core/testdata/integration_serial_suite_in.json @@ -182,6 +182,18 @@ "desc format = 'brief' select id from t as A where not exists (select 1 from t where t.id=A.id)" ] }, + { + "name": "TestMppJoinDecimal", + "cases": [ + "desc format = 'brief' select t1.c1, t1.c2, t2.c1, t2.c2, t2.c3 from t t1 join t t2 on t1.c1 + 1 = t2.c2 - 10 and t1.c1 * 3 = t2.c3 / 2", + "desc format = 'brief' select * from (select c1, c2, c5, count(*) c from t group by c1, c2, c5) t1 join (select c1, c2, c3, count(*) c from t group by c1, c2, c3) t2 on t1.c1 = t2.c2 and t1.c2 = t2.c3 and t1.c5 = t2.c1", + "desc format = 'brief' select * from t t1 join t t2 on t1.c1 = t2.c2 and t1.c2 = t2.c2 and t1.c3 = t2.c3 and t1.c4 = t2.c4 and t1.c5 = t2.c5", + "desc format = 'brief' select * from t t1 join t t2 on t1.c1 = t2.c2 and t1.c2 = t2.c3 and t1.c3 = t2.c1 and t1.c4 = t2.c3 and t1.c1 = t2.c5", + "desc format = 'brief' select * from t t1 join t t2 on t1.c1 + t1.c2 = t2.c2 / t2.c3", + "desc format = 'brief' select * from t t1 where exists (select * from t t2 where t1.c1 = t2.c2 and t1.c2 = t2.c3 and t1.c3 = t2.c1 and t1.c4 = t2.c3 and t1.c1 = t2.c5)", + "desc format = 'brief' select * from t t1 left join t t2 on t1.c1 = t2.c2 join t t3 on t2.c5 = t3.c3 right join t t4 on t3.c3 = t4.c4 " + ] + }, { "name": "TestPushDownAggForMPP", "cases": [ diff --git a/planner/core/testdata/integration_serial_suite_out.json b/planner/core/testdata/integration_serial_suite_out.json index c6e3cb33cbe91..9d94e6e16ae8e 100644 --- a/planner/core/testdata/integration_serial_suite_out.json +++ b/planner/core/testdata/integration_serial_suite_out.json @@ -1499,6 +1499,159 @@ } ] }, + { + "Name": "TestMppJoinDecimal", + "Cases": [ + { + "SQL": "desc format = 'brief' select t1.c1, t1.c2, t2.c1, t2.c2, t2.c3 from t t1 join t t2 on t1.c1 + 1 = t2.c2 - 10 and t1.c1 * 3 = t2.c3 / 2", + "Plan": [ + "Projection 12500.00 root test.t.c1, test.t.c2, test.t.c1, test.t.c2, test.t.c3", + "└─TableReader 12500.00 root data:ExchangeSender", + " └─ExchangeSender 12500.00 cop[tiflash] ExchangeType: PassThrough", + " └─HashJoin 12500.00 cop[tiflash] inner join, equal:[eq(Column#21, Column#22) eq(Column#15, Column#16)]", + " ├─ExchangeReceiver(Build) 10000.00 cop[tiflash] ", + " │ └─ExchangeSender 10000.00 cop[tiflash] ExchangeType: HashPartition, Hash Cols: Column#21, Column#15", + " │ └─Projection 10000.00 cop[tiflash] test.t.c1, test.t.c2, Column#13, Column#15, cast(Column#13, decimal(34,8) BINARY)->Column#21", + " │ └─Projection 10000.00 cop[tiflash] test.t.c1, test.t.c2, mul(test.t.c1, 3)->Column#13, plus(test.t.c1, 1)->Column#15", + " │ └─TableFullScan 10000.00 cop[tiflash] table:t1 keep order:false, stats:pseudo", + " └─ExchangeReceiver(Probe) 10000.00 cop[tiflash] ", + " └─ExchangeSender 10000.00 cop[tiflash] ExchangeType: HashPartition, Hash Cols: Column#22, Column#16", + " └─Projection 10000.00 cop[tiflash] test.t.c1, test.t.c2, test.t.c3, Column#14, Column#16, cast(Column#14, decimal(34,8) BINARY)->Column#22", + " └─Projection 10000.00 cop[tiflash] test.t.c1, test.t.c2, test.t.c3, div(test.t.c3, 2)->Column#14, minus(test.t.c2, 10)->Column#16", + " └─TableFullScan 10000.00 cop[tiflash] table:t2 keep order:false, stats:pseudo" + ] + }, + { + "SQL": "desc format = 'brief' select * from (select c1, c2, c5, count(*) c from t group by c1, c2, c5) t1 join (select c1, c2, c3, count(*) c from t group by c1, c2, c3) t2 on t1.c1 = t2.c2 and t1.c2 = t2.c3 and t1.c5 = t2.c1", + "Plan": [ + "Projection 7976.02 root test.t.c1, test.t.c2, test.t.c5, Column#7, test.t.c1, test.t.c2, test.t.c3, Column#14", + "└─TableReader 7976.02 root data:ExchangeSender", + " └─ExchangeSender 7976.02 batchCop[tiflash] ExchangeType: PassThrough", + " └─HashJoin 7976.02 batchCop[tiflash] inner join, equal:[eq(test.t.c1, test.t.c2) eq(Column#31, Column#32) eq(test.t.c5, Column#33)]", + " ├─ExchangeReceiver(Build) 7976.02 batchCop[tiflash] ", + " │ └─ExchangeSender 7976.02 batchCop[tiflash] ExchangeType: HashPartition, Hash Cols: test.t.c1, Column#31, test.t.c5", + " │ └─Projection 7976.02 batchCop[tiflash] Column#7, test.t.c1, test.t.c2, test.t.c5, cast(test.t.c2, decimal(10,5))->Column#31", + " │ └─Projection 7976.02 batchCop[tiflash] Column#7, test.t.c1, test.t.c2, test.t.c5", + " │ └─HashAgg 7976.02 batchCop[tiflash] group by:test.t.c1, test.t.c2, test.t.c5, funcs:sum(Column#15)->Column#7, funcs:firstrow(test.t.c1)->test.t.c1, funcs:firstrow(test.t.c2)->test.t.c2, funcs:firstrow(test.t.c5)->test.t.c5", + " │ └─ExchangeReceiver 7976.02 batchCop[tiflash] ", + " │ └─ExchangeSender 7976.02 batchCop[tiflash] ExchangeType: HashPartition, Hash Cols: test.t.c1, test.t.c2, test.t.c5", + " │ └─HashAgg 7976.02 batchCop[tiflash] group by:test.t.c1, test.t.c2, test.t.c5, funcs:count(1)->Column#15", + " │ └─Selection 9970.03 batchCop[tiflash] not(isnull(test.t.c1)), not(isnull(test.t.c2)), not(isnull(test.t.c5))", + " │ └─TableFullScan 10000.00 batchCop[tiflash] table:t keep order:false, stats:pseudo", + " └─ExchangeReceiver(Probe) 7984.01 batchCop[tiflash] ", + " └─ExchangeSender 7984.01 batchCop[tiflash] ExchangeType: HashPartition, Hash Cols: test.t.c2, Column#32, Column#33", + " └─Projection 7984.01 batchCop[tiflash] Column#14, test.t.c1, test.t.c2, test.t.c3, cast(test.t.c3, decimal(10,5))->Column#32, cast(test.t.c1, decimal(40,20))->Column#33", + " └─Projection 7984.01 batchCop[tiflash] Column#14, test.t.c1, test.t.c2, test.t.c3", + " └─HashAgg 7984.01 batchCop[tiflash] group by:test.t.c1, test.t.c2, test.t.c3, funcs:sum(Column#23)->Column#14, funcs:firstrow(test.t.c1)->test.t.c1, funcs:firstrow(test.t.c2)->test.t.c2, funcs:firstrow(test.t.c3)->test.t.c3", + " └─ExchangeReceiver 7984.01 batchCop[tiflash] ", + " └─ExchangeSender 7984.01 batchCop[tiflash] ExchangeType: HashPartition, Hash Cols: test.t.c1, test.t.c2, test.t.c3", + " └─HashAgg 7984.01 batchCop[tiflash] group by:test.t.c1, test.t.c2, test.t.c3, funcs:count(1)->Column#23", + " └─Selection 9980.01 batchCop[tiflash] not(isnull(test.t.c1)), not(isnull(test.t.c2))", + " └─TableFullScan 10000.00 batchCop[tiflash] table:t keep order:false, stats:pseudo" + ] + }, + { + "SQL": "desc format = 'brief' select * from t t1 join t t2 on t1.c1 = t2.c2 and t1.c2 = t2.c2 and t1.c3 = t2.c3 and t1.c4 = t2.c4 and t1.c5 = t2.c5", + "Plan": [ + "TableReader 12462.54 root data:ExchangeSender", + "└─ExchangeSender 12462.54 cop[tiflash] ExchangeType: PassThrough", + " └─HashJoin 12462.54 cop[tiflash] inner join, equal:[eq(test.t.c1, test.t.c2) eq(test.t.c2, test.t.c2) eq(test.t.c3, test.t.c3) eq(test.t.c4, test.t.c4) eq(test.t.c5, test.t.c5)]", + " ├─ExchangeReceiver(Build) 9970.03 cop[tiflash] ", + " │ └─ExchangeSender 9970.03 cop[tiflash] ExchangeType: HashPartition, Hash Cols: test.t.c1, test.t.c2, test.t.c3, test.t.c4, test.t.c5", + " │ └─Selection 9970.03 cop[tiflash] not(isnull(test.t.c1)), not(isnull(test.t.c2)), not(isnull(test.t.c5))", + " │ └─TableFullScan 10000.00 cop[tiflash] table:t1 keep order:false, stats:pseudo", + " └─ExchangeReceiver(Probe) 9980.01 cop[tiflash] ", + " └─ExchangeSender 9980.01 cop[tiflash] ExchangeType: HashPartition, Hash Cols: test.t.c2, test.t.c2, test.t.c3, test.t.c4, test.t.c5", + " └─Selection 9980.01 cop[tiflash] not(isnull(test.t.c2)), not(isnull(test.t.c5))", + " └─TableFullScan 10000.00 cop[tiflash] table:t2 keep order:false, stats:pseudo" + ] + }, + { + "SQL": "desc format = 'brief' select * from t t1 join t t2 on t1.c1 = t2.c2 and t1.c2 = t2.c3 and t1.c3 = t2.c1 and t1.c4 = t2.c3 and t1.c1 = t2.c5", + "Plan": [ + "Projection 12462.54 root test.t.c1, test.t.c2, test.t.c3, test.t.c4, test.t.c5, test.t.c1, test.t.c2, test.t.c3, test.t.c4, test.t.c5", + "└─TableReader 12462.54 root data:ExchangeSender", + " └─ExchangeSender 12462.54 cop[tiflash] ExchangeType: PassThrough", + " └─HashJoin 12462.54 cop[tiflash] inner join, equal:[eq(test.t.c2, test.t.c1) eq(Column#13, Column#14) eq(Column#15, Column#16) eq(test.t.c3, test.t.c4) eq(test.t.c5, Column#17)]", + " ├─ExchangeReceiver(Build) 9970.03 cop[tiflash] ", + " │ └─ExchangeSender 9970.03 cop[tiflash] ExchangeType: HashPartition, Hash Cols: test.t.c2, Column#13, Column#15, test.t.c3, test.t.c5", + " │ └─Projection 9970.03 cop[tiflash] test.t.c1, test.t.c2, test.t.c3, test.t.c4, test.t.c5, cast(test.t.c3, decimal(10,5))->Column#13, cast(test.t.c1, decimal(10,5))->Column#15", + " │ └─Selection 9970.03 cop[tiflash] not(isnull(test.t.c1)), not(isnull(test.t.c2)), not(isnull(test.t.c5))", + " │ └─TableFullScan 10000.00 cop[tiflash] table:t2 keep order:false, stats:pseudo", + " └─ExchangeReceiver(Probe) 9980.01 cop[tiflash] ", + " └─ExchangeSender 9980.01 cop[tiflash] ExchangeType: HashPartition, Hash Cols: test.t.c1, Column#14, Column#16, test.t.c4, Column#17", + " └─Projection 9980.01 cop[tiflash] test.t.c1, test.t.c2, test.t.c3, test.t.c4, test.t.c5, cast(test.t.c2, decimal(10,5))->Column#14, cast(test.t.c3, decimal(10,5))->Column#16, cast(test.t.c1, decimal(40,20))->Column#17", + " └─Selection 9980.01 cop[tiflash] not(isnull(test.t.c1)), not(isnull(test.t.c2))", + " └─TableFullScan 10000.00 cop[tiflash] table:t1 keep order:false, stats:pseudo" + ] + }, + { + "SQL": "desc format = 'brief' select * from t t1 join t t2 on t1.c1 + t1.c2 = t2.c2 / t2.c3", + "Plan": [ + "Projection 12500.00 root test.t.c1, test.t.c2, test.t.c3, test.t.c4, test.t.c5, test.t.c1, test.t.c2, test.t.c3, test.t.c4, test.t.c5", + "└─TableReader 12500.00 root data:ExchangeSender", + " └─ExchangeSender 12500.00 cop[tiflash] ExchangeType: PassThrough", + " └─HashJoin 12500.00 cop[tiflash] inner join, equal:[eq(Column#17, Column#14)]", + " ├─ExchangeReceiver(Build) 10000.00 cop[tiflash] ", + " │ └─ExchangeSender 10000.00 cop[tiflash] ExchangeType: HashPartition, Hash Cols: Column#17", + " │ └─Projection 10000.00 cop[tiflash] test.t.c1, test.t.c2, test.t.c3, test.t.c4, test.t.c5, Column#13, cast(Column#13, decimal(17,9) BINARY)->Column#17", + " │ └─Projection 10000.00 cop[tiflash] test.t.c1, test.t.c2, test.t.c3, test.t.c4, test.t.c5, plus(test.t.c1, test.t.c2)->Column#13", + " │ └─TableFullScan 10000.00 cop[tiflash] table:t1 keep order:false, stats:pseudo", + " └─ExchangeReceiver(Probe) 10000.00 cop[tiflash] ", + " └─ExchangeSender 10000.00 cop[tiflash] ExchangeType: HashPartition, Hash Cols: Column#14", + " └─Projection 10000.00 cop[tiflash] test.t.c1, test.t.c2, test.t.c3, test.t.c4, test.t.c5, div(test.t.c2, test.t.c3)->Column#14", + " └─TableFullScan 10000.00 cop[tiflash] table:t2 keep order:false, stats:pseudo" + ] + }, + { + "SQL": "desc format = 'brief' select * from t t1 where exists (select * from t t2 where t1.c1 = t2.c2 and t1.c2 = t2.c3 and t1.c3 = t2.c1 and t1.c4 = t2.c3 and t1.c1 = t2.c5)", + "Plan": [ + "Projection 7984.01 root test.t.c1, test.t.c2, test.t.c3, test.t.c4, test.t.c5", + "└─TableReader 7984.01 root data:ExchangeSender", + " └─ExchangeSender 7984.01 cop[tiflash] ExchangeType: PassThrough", + " └─HashJoin 7984.01 cop[tiflash] semi join, equal:[eq(test.t.c1, test.t.c2) eq(Column#13, Column#14) eq(Column#15, Column#16) eq(test.t.c4, test.t.c3) eq(Column#17, test.t.c5)]", + " ├─ExchangeReceiver(Build) 9970.03 cop[tiflash] ", + " │ └─ExchangeSender 9970.03 cop[tiflash] ExchangeType: HashPartition, Hash Cols: test.t.c2, Column#14, Column#16, test.t.c3, test.t.c5", + " │ └─Projection 9970.03 cop[tiflash] test.t.c1, test.t.c2, test.t.c3, test.t.c5, cast(test.t.c3, decimal(10,5))->Column#14, cast(test.t.c1, decimal(10,5))->Column#16", + " │ └─Selection 9970.03 cop[tiflash] not(isnull(test.t.c1)), not(isnull(test.t.c2)), not(isnull(test.t.c5))", + " │ └─TableFullScan 10000.00 cop[tiflash] table:t2 keep order:false, stats:pseudo", + " └─ExchangeReceiver(Probe) 9980.01 cop[tiflash] ", + " └─ExchangeSender 9980.01 cop[tiflash] ExchangeType: HashPartition, Hash Cols: test.t.c1, Column#13, Column#15, test.t.c4, Column#17", + " └─Projection 9980.01 cop[tiflash] test.t.c1, test.t.c2, test.t.c3, test.t.c4, test.t.c5, cast(test.t.c2, decimal(10,5))->Column#13, cast(test.t.c3, decimal(10,5))->Column#15, cast(test.t.c1, decimal(40,20))->Column#17", + " └─Selection 9980.01 cop[tiflash] not(isnull(test.t.c1)), not(isnull(test.t.c2))", + " └─TableFullScan 10000.00 cop[tiflash] table:t1 keep order:false, stats:pseudo" + ] + }, + { + "SQL": "desc format = 'brief' select * from t t1 left join t t2 on t1.c1 = t2.c2 join t t3 on t2.c5 = t3.c3 right join t t4 on t3.c3 = t4.c4 ", + "Plan": [ + "TableReader 19492.21 root data:ExchangeSender", + "└─ExchangeSender 19492.21 cop[tiflash] ExchangeType: PassThrough", + " └─HashJoin 19492.21 cop[tiflash] right outer join, equal:[eq(test.t.c3, test.t.c4)]", + " ├─Projection(Build) 15593.77 cop[tiflash] test.t.c1, test.t.c2, test.t.c3, test.t.c4, test.t.c5, test.t.c1, test.t.c2, test.t.c3, test.t.c4, test.t.c5, test.t.c1, test.t.c2, test.t.c3, test.t.c4, test.t.c5", + " │ └─HashJoin 15593.77 cop[tiflash] inner join, equal:[eq(test.t.c5, Column#25)]", + " │ ├─ExchangeReceiver(Build) 10000.00 cop[tiflash] ", + " │ │ └─ExchangeSender 10000.00 cop[tiflash] ExchangeType: HashPartition, Hash Cols: Column#25", + " │ │ └─Projection 10000.00 cop[tiflash] test.t.c1, test.t.c2, test.t.c3, test.t.c4, test.t.c5, cast(test.t.c3, decimal(40,20))->Column#25", + " │ │ └─TableFullScan 10000.00 cop[tiflash] table:t3 keep order:false, stats:pseudo", + " │ └─ExchangeReceiver(Probe) 12475.01 cop[tiflash] ", + " │ └─ExchangeSender 12475.01 cop[tiflash] ExchangeType: HashPartition, Hash Cols: test.t.c5", + " │ └─HashJoin 12475.01 cop[tiflash] inner join, equal:[eq(test.t.c2, test.t.c1)]", + " │ ├─ExchangeReceiver(Build) 9980.01 cop[tiflash] ", + " │ │ └─ExchangeSender 9980.01 cop[tiflash] ExchangeType: HashPartition, Hash Cols: test.t.c2", + " │ │ └─Selection 9980.01 cop[tiflash] not(isnull(test.t.c2)), not(isnull(test.t.c5))", + " │ │ └─TableFullScan 10000.00 cop[tiflash] table:t2 keep order:false, stats:pseudo", + " │ └─ExchangeReceiver(Probe) 9990.00 cop[tiflash] ", + " │ └─ExchangeSender 9990.00 cop[tiflash] ExchangeType: HashPartition, Hash Cols: test.t.c1", + " │ └─Selection 9990.00 cop[tiflash] not(isnull(test.t.c1))", + " │ └─TableFullScan 10000.00 cop[tiflash] table:t1 keep order:false, stats:pseudo", + " └─ExchangeReceiver(Probe) 10000.00 cop[tiflash] ", + " └─ExchangeSender 10000.00 cop[tiflash] ExchangeType: HashPartition, Hash Cols: test.t.c4", + " └─TableFullScan 10000.00 cop[tiflash] table:t4 keep order:false, stats:pseudo" + ] + } + ] + }, { "Name": "TestPushDownAggForMPP", "Cases": [