diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index 356985692713b..e2c27a362bb9b 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -1574,16 +1574,25 @@ func (p *LogicalJoin) tryToGetBroadCastJoin(prop *property.PhysicalProperty) []P return nil } - if p.JoinType != InnerJoin || len(p.LeftConditions) != 0 || len(p.RightConditions) != 0 || len(p.OtherConditions) != 0 || len(p.EqualConditions) == 0 { + // for left join the global idx must be 1, and for right join the global idx must be 0 + if (p.JoinType != InnerJoin && p.JoinType != LeftOuterJoin && p.JoinType != RightOuterJoin) || len(p.LeftConditions) != 0 || len(p.RightConditions) != 0 || len(p.OtherConditions) != 0 || len(p.EqualConditions) == 0 { return nil } if hasPrefer, idx := p.getPreferredBCJLocalIndex(); hasPrefer { + if (idx == 0 && p.JoinType == RightOuterJoin) || (idx == 1 && p.JoinType == LeftOuterJoin) { + return nil + } return p.tryToGetBroadCastJoinByPreferGlobalIdx(prop, 1-idx) } - results := p.tryToGetBroadCastJoinByPreferGlobalIdx(prop, 0) - results = append(results, p.tryToGetBroadCastJoinByPreferGlobalIdx(prop, 1)...) - return results + if p.JoinType == InnerJoin { + results := p.tryToGetBroadCastJoinByPreferGlobalIdx(prop, 0) + results = append(results, p.tryToGetBroadCastJoinByPreferGlobalIdx(prop, 1)...) + return results + } else if p.JoinType == LeftOuterJoin { + return p.tryToGetBroadCastJoinByPreferGlobalIdx(prop, 1) + } + return p.tryToGetBroadCastJoinByPreferGlobalIdx(prop, 0) } func (p *LogicalJoin) tryToGetBroadCastJoinByPreferGlobalIdx(prop *property.PhysicalProperty, preferredGlobalIndex int) []PhysicalPlan { diff --git a/planner/core/explain.go b/planner/core/explain.go index 33ad947ff3819..e01f6d9244e6d 100644 --- a/planner/core/explain.go +++ b/planner/core/explain.go @@ -602,6 +602,32 @@ func (p *PhysicalMergeJoin) ExplainNormalizedInfo() string { return p.explainInfo(true) } +// ExplainInfo implements Plan interface. +func (p *PhysicalBroadCastJoin) ExplainInfo() string { + return p.explainInfo() +} + +// ExplainNormalizedInfo implements Plan interface. +func (p *PhysicalBroadCastJoin) ExplainNormalizedInfo() string { + return p.explainInfo() +} + +func (p *PhysicalBroadCastJoin) explainInfo() string { + buffer := new(bytes.Buffer) + + buffer.WriteString(p.JoinType.String()) + + if len(p.LeftJoinKeys) > 0 { + fmt.Fprintf(buffer, ", left key:%s", + expression.ExplainColumnList(p.LeftJoinKeys)) + } + if len(p.RightJoinKeys) > 0 { + fmt.Fprintf(buffer, ", right key:%s", + expression.ExplainColumnList(p.RightJoinKeys)) + } + return buffer.String() +} + // ExplainInfo implements Plan interface. func (p *PhysicalTopN) ExplainInfo() string { buffer := bytes.NewBufferString("") diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go index c93102516e534..1900174d04c85 100644 --- a/planner/core/integration_test.go +++ b/planner/core/integration_test.go @@ -404,8 +404,8 @@ func (s *testIntegrationSerialSuite) TestBroadcastJoin(c *C) { res.Check(testkit.Rows(output[i].Plan...)) } - // out join not supported - _, err := tk.Exec("explain select /*+ broadcast_join(fact_t, d1_t) */ count(*) from fact_t left join d1_t on fact_t.d1_k = d1_t.d1_k") + // out table of out join should not be global + _, err := tk.Exec("explain select /*+ broadcast_join(fact_t, d1_t), broadcast_join_local(d1_t) */ count(*) from fact_t left join d1_t on fact_t.d1_k = d1_t.d1_k") c.Assert(err, NotNil) c.Assert(err.Error(), Equals, "[planner:1815]Internal : Can't find a proper physical plan for this query") // join with non-equal condition not supported diff --git a/planner/core/plan_to_pb.go b/planner/core/plan_to_pb.go index 1785275c3ca98..bccaacce3aa22 100644 --- a/planner/core/plan_to_pb.go +++ b/planner/core/plan_to_pb.go @@ -254,8 +254,15 @@ func (p *PhysicalBroadCastJoin) ToPB(ctx sessionctx.Context, storeType kv.StoreT if err != nil { return nil, err } + pbJoinType := tipb.JoinType_TypeInnerJoin + switch p.JoinType { + case LeftOuterJoin: + pbJoinType = tipb.JoinType_TypeLeftOuterJoin + case RightOuterJoin: + pbJoinType = tipb.JoinType_TypeRightOuterJoin + } join := &tipb.Join{ - JoinType: tipb.JoinType_TypeInnerJoin, + JoinType: pbJoinType, JoinExecType: tipb.JoinExecType_TypeHashJoin, InnerIdx: int64(p.InnerChildIdx), LeftJoinKeys: left, diff --git a/planner/core/testdata/integration_serial_suite_in.json b/planner/core/testdata/integration_serial_suite_in.json index eb17fe5e54308..241dd2e3a9f51 100644 --- a/planner/core/testdata/integration_serial_suite_in.json +++ b/planner/core/testdata/integration_serial_suite_in.json @@ -12,7 +12,9 @@ "explain select /*+ broadcast_join(fact_t,d1_t) */ count(*) from fact_t, d1_t where fact_t.d1_k = d1_t.d1_k", "explain select /*+ broadcast_join(fact_t,d1_t,d2_t,d3_t) */ count(*) from fact_t, d1_t, d2_t, d3_t where fact_t.d1_k = d1_t.d1_k and fact_t.d2_k = d2_t.d2_k and fact_t.d3_k = d3_t.d3_k", "explain select /*+ broadcast_join(fact_t,d1_t), broadcast_join_local(d1_t) */ count(*) from fact_t, d1_t where fact_t.d1_k = d1_t.d1_k", - "explain select /*+ broadcast_join(fact_t,d1_t,d2_t,d3_t), broadcast_join_local(d2_t) */ count(*) from fact_t, d1_t, d2_t, d3_t where fact_t.d1_k = d1_t.d1_k and fact_t.d2_k = d2_t.d2_k and fact_t.d3_k = d3_t.d3_k" + "explain select /*+ broadcast_join(fact_t,d1_t,d2_t,d3_t), broadcast_join_local(d2_t) */ count(*) from fact_t, d1_t, d2_t, d3_t where fact_t.d1_k = d1_t.d1_k and fact_t.d2_k = d2_t.d2_k and fact_t.d3_k = d3_t.d3_k", + "explain select /*+ broadcast_join(fact_t,d1_t) */ count(*) from fact_t left join d1_t on fact_t.d1_k = d1_t.d1_k", + "explain select /*+ broadcast_join(fact_t,d1_t) */ count(*) from fact_t right join d1_t on fact_t.d1_k = d1_t.d1_k" ] }, { diff --git a/planner/core/testdata/integration_serial_suite_out.json b/planner/core/testdata/integration_serial_suite_out.json index 5beb702a949dd..b3be7bdeeab4f 100644 --- a/planner/core/testdata/integration_serial_suite_out.json +++ b/planner/core/testdata/integration_serial_suite_out.json @@ -29,7 +29,7 @@ "StreamAgg_32 1.00 root funcs:count(Column#14)->Column#11", "└─TableReader_33 1.00 root data:StreamAgg_13", " └─StreamAgg_13 1.00 cop[tiflash] funcs:count(1)->Column#14", - " └─BroadcastJoin_31 8.00 cop[tiflash] ", + " └─BroadcastJoin_31 8.00 cop[tiflash] inner join, left key:test.fact_t.d1_k, right key:test.d1_t.d1_k", " ├─Selection_23(Build) 2.00 cop[tiflash] not(isnull(test.d1_t.d1_k))", " │ └─TableFullScan_22 2.00 cop[tiflash] table:d1_t keep order:false, global read", " └─Selection_21(Probe) 8.00 cop[tiflash] not(isnull(test.fact_t.d1_k))", @@ -42,13 +42,13 @@ "StreamAgg_52 1.00 root funcs:count(Column#20)->Column#17", "└─TableReader_53 1.00 root data:StreamAgg_17", " └─StreamAgg_17 1.00 cop[tiflash] funcs:count(1)->Column#20", - " └─BroadcastJoin_51 8.00 cop[tiflash] ", + " └─BroadcastJoin_51 8.00 cop[tiflash] inner join, left key:test.fact_t.d3_k, right key:test.d3_t.d3_k", " ├─Selection_43(Build) 2.00 cop[tiflash] not(isnull(test.d3_t.d3_k))", " │ └─TableFullScan_42 2.00 cop[tiflash] table:d3_t keep order:false, global read", - " └─BroadcastJoin_33(Probe) 8.00 cop[tiflash] ", + " └─BroadcastJoin_33(Probe) 8.00 cop[tiflash] inner join, left key:test.fact_t.d2_k, right key:test.d2_t.d2_k", " ├─Selection_29(Build) 2.00 cop[tiflash] not(isnull(test.d2_t.d2_k))", " │ └─TableFullScan_28 2.00 cop[tiflash] table:d2_t keep order:false, global read", - " └─BroadcastJoin_37(Probe) 8.00 cop[tiflash] ", + " └─BroadcastJoin_37(Probe) 8.00 cop[tiflash] inner join, left key:test.fact_t.d1_k, right key:test.d1_t.d1_k", " ├─Selection_27(Build) 2.00 cop[tiflash] not(isnull(test.d1_t.d1_k))", " │ └─TableFullScan_26 2.00 cop[tiflash] table:d1_t keep order:false, global read", " └─Selection_41(Probe) 8.00 cop[tiflash] not(isnull(test.fact_t.d1_k)), not(isnull(test.fact_t.d2_k)), not(isnull(test.fact_t.d3_k))", @@ -61,7 +61,7 @@ "StreamAgg_25 1.00 root funcs:count(Column#14)->Column#11", "└─TableReader_26 1.00 root data:StreamAgg_13", " └─StreamAgg_13 1.00 cop[tiflash] funcs:count(1)->Column#14", - " └─BroadcastJoin_24 8.00 cop[tiflash] ", + " └─BroadcastJoin_24 8.00 cop[tiflash] inner join, left key:test.fact_t.d1_k, right key:test.d1_t.d1_k", " ├─Selection_18(Build) 2.00 cop[tiflash] not(isnull(test.d1_t.d1_k))", " │ └─TableFullScan_17 2.00 cop[tiflash] table:d1_t keep order:false", " └─Selection_16(Probe) 8.00 cop[tiflash] not(isnull(test.fact_t.d1_k))", @@ -74,18 +74,42 @@ "StreamAgg_36 1.00 root funcs:count(Column#20)->Column#17", "└─TableReader_37 1.00 root data:StreamAgg_17", " └─StreamAgg_17 1.00 cop[tiflash] funcs:count(1)->Column#20", - " └─BroadcastJoin_35 8.00 cop[tiflash] ", + " └─BroadcastJoin_35 8.00 cop[tiflash] inner join, left key:test.fact_t.d3_k, right key:test.d3_t.d3_k", " ├─Selection_29(Build) 2.00 cop[tiflash] not(isnull(test.d3_t.d3_k))", " │ └─TableFullScan_28 2.00 cop[tiflash] table:d3_t keep order:false, global read", - " └─BroadcastJoin_19(Probe) 8.00 cop[tiflash] ", + " └─BroadcastJoin_19(Probe) 8.00 cop[tiflash] inner join, left key:test.fact_t.d2_k, right key:test.d2_t.d2_k", " ├─Selection_27(Build) 2.00 cop[tiflash] not(isnull(test.d2_t.d2_k))", " │ └─TableFullScan_26 2.00 cop[tiflash] table:d2_t keep order:false", - " └─BroadcastJoin_20(Probe) 8.00 cop[tiflash] ", + " └─BroadcastJoin_20(Probe) 8.00 cop[tiflash] inner join, left key:test.fact_t.d1_k, right key:test.d1_t.d1_k", " ├─Selection_25(Build) 2.00 cop[tiflash] not(isnull(test.d1_t.d1_k))", " │ └─TableFullScan_24 2.00 cop[tiflash] table:d1_t keep order:false, global read", " └─Selection_23(Probe) 8.00 cop[tiflash] not(isnull(test.fact_t.d1_k)), not(isnull(test.fact_t.d2_k)), not(isnull(test.fact_t.d3_k))", " └─TableFullScan_22 8.00 cop[tiflash] table:fact_t keep order:false, global read" ] + }, + { + "SQL": "explain select /*+ broadcast_join(fact_t,d1_t) */ count(*) from fact_t left join d1_t on fact_t.d1_k = d1_t.d1_k", + "Plan": [ + "StreamAgg_23 1.00 root funcs:count(Column#14)->Column#11", + "└─TableReader_24 1.00 root data:StreamAgg_12", + " └─StreamAgg_12 1.00 cop[tiflash] funcs:count(1)->Column#14", + " └─BroadcastJoin_22 8.00 cop[tiflash] left outer join, left key:test.fact_t.d1_k, right key:test.d1_t.d1_k", + " ├─Selection_16(Build) 2.00 cop[tiflash] not(isnull(test.d1_t.d1_k))", + " │ └─TableFullScan_15 2.00 cop[tiflash] table:d1_t keep order:false, global read", + " └─TableFullScan_14(Probe) 8.00 cop[tiflash] table:fact_t keep order:false" + ] + }, + { + "SQL": "explain select /*+ broadcast_join(fact_t,d1_t) */ count(*) from fact_t right join d1_t on fact_t.d1_k = d1_t.d1_k", + "Plan": [ + "StreamAgg_23 1.00 root funcs:count(Column#14)->Column#11", + "└─TableReader_24 1.00 root data:StreamAgg_12", + " └─StreamAgg_12 1.00 cop[tiflash] funcs:count(1)->Column#14", + " └─BroadcastJoin_22 8.00 cop[tiflash] right outer join, left key:test.fact_t.d1_k, right key:test.d1_t.d1_k", + " ├─TableFullScan_16(Build) 2.00 cop[tiflash] table:d1_t keep order:false", + " └─Selection_15(Probe) 8.00 cop[tiflash] not(isnull(test.fact_t.d1_k))", + " └─TableFullScan_14 8.00 cop[tiflash] table:fact_t keep order:false, global read" + ] } ] },