Skip to content

Commit

Permalink
cherry pick pingcap#15727 to release-3.0
Browse files Browse the repository at this point in the history
Signed-off-by: sre-bot <sre-bot@pingcap.com>
  • Loading branch information
XuHuaiyu authored and sre-bot committed Mar 27, 2020
1 parent c6d91a1 commit 390eb7e
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 45 deletions.
64 changes: 26 additions & 38 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2179,65 +2179,53 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context

// buildKvRangesForIndexJoin builds kv ranges for index join when the inner plan is index scan plan.
func buildKvRangesForIndexJoin(ctx sessionctx.Context, tableID, indexID int64, lookUpContents []*indexJoinLookUpContent,
ranges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager) ([]kv.KeyRange, error) {
ranges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager) (_ []kv.KeyRange, err error) {
kvRanges := make([]kv.KeyRange, 0, len(ranges)*len(lookUpContents))
lastPos := len(ranges[0].LowVal) - 1
sc := ctx.GetSessionVars().StmtCtx
tmpDatumRanges := make([]*ranger.Range, 0, len(lookUpContents))
for _, content := range lookUpContents {
for _, ran := range ranges {
for keyOff, idxOff := range keyOff2IdxOff {
ran.LowVal[idxOff] = content.keys[keyOff]
ran.HighVal[idxOff] = content.keys[keyOff]
}
}
if cwc != nil {
nextColRanges, err := cwc.BuildRangesByRow(ctx, content.row)
if cwc == nil {
tmpKvRanges, err := distsql.IndexRangesToKVRanges(sc, tableID, indexID, ranges, nil)
if err != nil {
return nil, err
}
for _, nextColRan := range nextColRanges {
for _, ran := range ranges {
ran.LowVal[lastPos] = nextColRan.LowVal[0]
ran.HighVal[lastPos] = nextColRan.HighVal[0]
ran.LowExclude = nextColRan.LowExclude
ran.HighExclude = nextColRan.HighExclude
}
tmpKvRanges, err := distsql.IndexRangesToKVRanges(sc, tableID, indexID, ranges, nil)
if err != nil {
return nil, errors.Trace(err)
}
kvRanges = append(kvRanges, tmpKvRanges...)
}
kvRanges = append(kvRanges, tmpKvRanges...)
continue
}

tmpKvRanges, err := distsql.IndexRangesToKVRanges(sc, tableID, indexID, ranges, nil)
nextColRanges, err := cwc.BuildRangesByRow(ctx, content.row)
if err != nil {
return nil, err
}
kvRanges = append(kvRanges, tmpKvRanges...)
}
// Sort and merge the overlapped ranges.
sort.Slice(kvRanges, func(i, j int) bool {
return bytes.Compare(kvRanges[i].StartKey, kvRanges[j].StartKey) < 0
})
if cwc != nil {
// If cwc is not nil, we need to merge the overlapped ranges here.
mergedKeyRanges := make([]kv.KeyRange, 0, len(kvRanges))
for i := range kvRanges {
if len(mergedKeyRanges) == 0 {
mergedKeyRanges = append(mergedKeyRanges, kvRanges[i])
continue
}
if bytes.Compare(kvRanges[i].StartKey, mergedKeyRanges[len(mergedKeyRanges)-1].EndKey) <= 0 {
mergedKeyRanges[len(mergedKeyRanges)-1].EndKey = kvRanges[i].EndKey
} else {
mergedKeyRanges = append(mergedKeyRanges, kvRanges[i])
for _, nextColRan := range nextColRanges {
for _, ran := range ranges {
ran.LowVal[lastPos] = nextColRan.LowVal[0]
ran.HighVal[lastPos] = nextColRan.HighVal[0]
ran.LowExclude = nextColRan.LowExclude
ran.HighExclude = nextColRan.HighExclude
tmpDatumRanges = append(tmpDatumRanges, ran.Clone())
}
}
return mergedKeyRanges, nil
}
return kvRanges, nil

if cwc == nil {
sort.Slice(kvRanges, func(i, j int) bool {
return bytes.Compare(kvRanges[i].StartKey, kvRanges[j].StartKey) < 0
})
return kvRanges, nil
}

tmpDatumRanges, err = ranger.UnionRanges(ctx.GetSessionVars().StmtCtx, tmpDatumRanges)
if err != nil {
return nil, err
}
return distsql.IndexRangesToKVRanges(ctx.GetSessionVars().StmtCtx, tableID, indexID, tmpDatumRanges, nil)
}

func (b *executorBuilder) buildWindow(v *plannercore.PhysicalWindow) *WindowExec {
Expand Down
154 changes: 154 additions & 0 deletions executor/join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -905,7 +905,82 @@ func (s *testSuite2) TestIndexLookupJoin(c *C) {
tk.MustQuery("select /*+ TIDB_INLJ(s) */ count(*) from t join s use index(idx) on s.a = t.a and s.b < t.b").Check(testkit.Rows("64"))
}

<<<<<<< HEAD
func (s *testSuite2) TestMergejoinOrder(c *C) {
=======
func (s *testSuiteJoin1) TestIndexNestedLoopHashJoin(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("set @@tidb_init_chunk_size=2")
tk.MustExec("set @@tidb_index_join_batch_size=10")
tk.MustExec("DROP TABLE IF EXISTS t, s")
tk.MustExec("create table t(pk int primary key, a int)")
for i := 0; i < 100; i++ {
tk.MustExec(fmt.Sprintf("insert into t values(%d, %d)", i, i))
}
tk.MustExec("create table s(a int primary key)")
for i := 0; i < 100; i++ {
if rand.Float32() < 0.3 {
tk.MustExec(fmt.Sprintf("insert into s values(%d)", i))
} else {
tk.MustExec(fmt.Sprintf("insert into s values(%d)", i*100))
}
}
tk.MustExec("analyze table t")
tk.MustExec("analyze table s")
// Test IndexNestedLoopHashJoin keepOrder.
tk.MustQuery("explain select /*+ INL_HASH_JOIN(s) */ * from t left join s on t.a=s.a order by t.pk").Check(testkit.Rows(
"IndexHashJoin_28 100.00 root left outer join, inner:TableReader_22, outer key:test.t.a, inner key:test.s.a",
"├─TableReader_30(Build) 100.00 root data:TableFullScan_29",
"│ └─TableFullScan_29 100.00 cop[tikv] table:t keep order:true",
"└─TableReader_22(Probe) 1.00 root data:TableRangeScan_21",
" └─TableRangeScan_21 1.00 cop[tikv] table:s range: decided by [test.t.a], keep order:false",
))
rs := tk.MustQuery("select /*+ INL_HASH_JOIN(s) */ * from t left join s on t.a=s.a order by t.pk")
for i, row := range rs.Rows() {
c.Assert(row[0].(string), Equals, fmt.Sprintf("%d", i))
}
}

func (s *testSuiteJoin3) TestIssue15686(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t, k;")
tk.MustExec("create table k (a int, pk int primary key, index(a));")
tk.MustExec("create table t (a int, pk int primary key, index(a));")
tk.MustExec("insert into k values(0,8),(0,23),(1,21),(1,33),(1,52),(2,17),(2,34),(2,39),(2,40),(2,66),(2,67),(3,9),(3,25),(3,41),(3,48),(4,4),(4,11),(4,15),(4,26),(4,27),(4,31),(4,35),(4,45),(4,47),(4,49);")
tk.MustExec("insert into t values(3,4),(3,5),(3,27),(3,29),(3,57),(3,58),(3,79),(3,84),(3,92),(3,95);")
tk.MustQuery("select /*+ inl_join(t) */ count(*) from k left join t on k.a = t.a and k.pk > t.pk;").Check(testkit.Rows("33"))
tk.MustQuery("select /*+ inl_hash_join(t) */ count(*) from k left join t on k.a = t.a and k.pk > t.pk;").Check(testkit.Rows("33"))
tk.MustQuery("select /*+ inl_merge_join(t) */ count(*) from k left join t on k.a = t.a and k.pk > t.pk;").Check(testkit.Rows("33"))
}

func (s *testSuiteJoin3) TestIssue13449(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t, s;")
tk.MustExec("create table t(a int, index(a));")
tk.MustExec("create table s(a int, index(a));")
for i := 1; i <= 128; i++ {
tk.MustExec(fmt.Sprintf("insert into t values(%d)", i))
}
tk.MustExec("insert into s values(1), (128)")
tk.MustExec("set @@tidb_max_chunk_size=32;")
tk.MustExec("set @@tidb_index_lookup_join_concurrency=1;")
tk.MustExec("set @@tidb_index_join_batch_size=32;")

tk.MustQuery("desc select /*+ INL_HASH_JOIN(s) */ * from t join s on t.a=s.a order by t.a;").Check(testkit.Rows(
"IndexHashJoin_35 12487.50 root inner join, inner:IndexReader_27, outer key:test.t.a, inner key:test.s.a",
"├─IndexReader_37(Build) 9990.00 root index:IndexFullScan_36",
"│ └─IndexFullScan_36 9990.00 cop[tikv] table:t, index:a(a) keep order:true, stats:pseudo",
"└─IndexReader_27(Probe) 1.25 root index:Selection_26",
" └─Selection_26 1.25 cop[tikv] not(isnull(test.s.a))",
" └─IndexRangeScan_25 1.25 cop[tikv] table:s, index:a(a) range: decided by [eq(test.s.a, test.t.a)], keep order:false, stats:pseudo"))
tk.MustQuery("select /*+ INL_HASH_JOIN(s) */ * from t join s on t.a=s.a order by t.a;").Check(testkit.Rows("1 1", "128 128"))
}

func (s *testSuiteJoin3) TestMergejoinOrder(c *C) {
>>>>>>> 15c3b74... util, executor: use UnionRanges build index kv range for INLJ (#15727)
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1, t2;")
Expand Down Expand Up @@ -1428,7 +1503,86 @@ func (s *testSuite2) TestIssue11390(c *C) {
tk.MustQuery("select /*+ TIDB_INLJ(t1, t2) */ * from 11390t t1, 11390t t2 where t1.k2 > 0 and t1.k2 = t2.k2 and t2.k1=1;").Check(testkit.Rows("1 1 1 1"))
}

<<<<<<< HEAD
func (s *testSuite2) TestIssue14514(c *C) {
=======
func (s *testSuiteJoinSerial) TestOuterTableBuildHashTableIsuse13933(c *C) {
plannercore.ForceUseOuterBuild4Test = true
defer func() { plannercore.ForceUseOuterBuild4Test = false }()
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t, s")
tk.MustExec("create table t (a int,b int)")
tk.MustExec("create table s (a int,b int)")
tk.MustExec("insert into t values (11,11),(1,2)")
tk.MustExec("insert into s values (1,2),(2,1),(11,11)")
tk.MustQuery("select * from t left join s on s.a > t.a").Sort().Check(testkit.Rows("1 2 11 11", "1 2 2 1", "11 11 <nil> <nil>"))
tk.MustQuery("explain select * from t left join s on s.a > t.a").Check(testkit.Rows(
"HashJoin_6 99900000.00 root CARTESIAN left outer join, other cond:gt(test.s.a, test.t.a)",
"├─TableReader_8(Build) 10000.00 root data:TableFullScan_7",
"│ └─TableFullScan_7 10000.00 cop[tikv] table:t keep order:false, stats:pseudo",
"└─TableReader_11(Probe) 9990.00 root data:Selection_10",
" └─Selection_10 9990.00 cop[tikv] not(isnull(test.s.a))",
" └─TableFullScan_9 10000.00 cop[tikv] table:s keep order:false, stats:pseudo"))
tk.MustExec("drop table if exists t, s")
tk.MustExec("Create table s (a int, b int, key(b))")
tk.MustExec("Create table t (a int, b int, key(b))")
tk.MustExec("Insert into s values (1,2),(2,1),(11,11)")
tk.MustExec("Insert into t values (11,2),(1,2),(5,2)")
tk.MustQuery("select /*+ INL_HASH_JOIN(s)*/ * from t left join s on s.b=t.b and s.a < t.a;").Sort().Check(testkit.Rows("1 2 <nil> <nil>", "11 2 1 2", "5 2 1 2"))
tk.MustQuery("explain select /*+ INL_HASH_JOIN(s)*/ * from t left join s on s.b=t.b and s.a < t.a;").Check(testkit.Rows(
"IndexHashJoin_22 12475.01 root left outer join, inner:IndexLookUp_11, outer key:test.t.b, inner key:test.s.b, other cond:lt(test.s.a, test.t.a)",
"├─TableReader_24(Build) 10000.00 root data:TableFullScan_23",
"│ └─TableFullScan_23 10000.00 cop[tikv] table:t keep order:false, stats:pseudo",
"└─IndexLookUp_11(Probe) 1.25 root ",
" ├─Selection_9(Build) 1.25 cop[tikv] not(isnull(test.s.b))",
" │ └─IndexRangeScan_7 1.25 cop[tikv] table:s, index:b(b) range: decided by [eq(test.s.b, test.t.b)], keep order:false, stats:pseudo",
" └─Selection_10(Probe) 1.25 cop[tikv] not(isnull(test.s.a))",
" └─TableRowIDScan_8 1.25 cop[tikv] table:s keep order:false, stats:pseudo"))
}

func (s *testSuiteJoin1) TestIssue13177(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1, t2")
tk.MustExec("create table t1(a varchar(20), b int, c int)")
tk.MustExec("create table t2(a varchar(20), b int, c int, primary key(a, b))")
tk.MustExec("insert into t1 values(\"abcd\", 1, 1), (\"bacd\", 2, 2), (\"cbad\", 3, 3)")
tk.MustExec("insert into t2 values(\"bcd\", 1, 1), (\"acd\", 2, 2), (\"bad\", 3, 3)")
tk.MustQuery("select /*+ inl_join(t1, t2) */ * from t1 join t2 on substr(t1.a, 2, 4) = t2.a and t1.b = t2.b where t1.c between 1 and 5").Sort().Check(testkit.Rows(
"abcd 1 1 bcd 1 1",
"bacd 2 2 acd 2 2",
"cbad 3 3 bad 3 3",
))
tk.MustQuery("select /*+ inl_hash_join(t1, t2) */ * from t1 join t2 on substr(t1.a, 2, 4) = t2.a and t1.b = t2.b where t1.c between 1 and 5").Sort().Check(testkit.Rows(
"abcd 1 1 bcd 1 1",
"bacd 2 2 acd 2 2",
"cbad 3 3 bad 3 3",
))
tk.MustQuery("select /*+ inl_merge_join(t1, t2) */ * from t1 join t2 on substr(t1.a, 2, 4) = t2.a and t1.b = t2.b where t1.c between 1 and 5").Sort().Check(testkit.Rows(
"abcd 1 1 bcd 1 1",
"bacd 2 2 acd 2 2",
"cbad 3 3 bad 3 3",
))
tk.MustQuery("select /*+ inl_join(t1, t2) */ t1.* from t1 join t2 on substr(t1.a, 2, 4) = t2.a and t1.b = t2.b where t1.c between 1 and 5").Sort().Check(testkit.Rows(
"abcd 1 1",
"bacd 2 2",
"cbad 3 3",
))
tk.MustQuery("select /*+ inl_hash_join(t1, t2) */ t1.* from t1 join t2 on substr(t1.a, 2, 4) = t2.a and t1.b = t2.b where t1.c between 1 and 5").Sort().Check(testkit.Rows(
"abcd 1 1",
"bacd 2 2",
"cbad 3 3",
))
tk.MustQuery("select /*+ inl_merge_join(t1, t2) */ t1.* from t1 join t2 on substr(t1.a, 2, 4) = t2.a and t1.b = t2.b where t1.c between 1 and 5").Sort().Check(testkit.Rows(
"abcd 1 1",
"bacd 2 2",
"cbad 3 3",
))
}

func (s *testSuiteJoin1) TestIssue14514(c *C) {
>>>>>>> 15c3b74... util, executor: use UnionRanges build index kv range for INLJ (#15727)
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
Expand Down
2 changes: 1 addition & 1 deletion util/ranger/detacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ func detachDNFCondAndBuildRangeForIndex(sctx sessionctx.Context, condition *expr
}
}

totalRanges, err := unionRanges(sc, totalRanges)
totalRanges, err := UnionRanges(sc, totalRanges)
if err != nil {
return nil, nil, false, errors.Trace(err)
}
Expand Down
13 changes: 7 additions & 6 deletions util/ranger/ranger.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func buildColumnRange(accessConditions []expression.Expression, sc *stmtctx.Stat
ran.HighExclude = false
}
}
ranges, err = unionRanges(sc, ranges)
ranges, err = UnionRanges(sc, ranges)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -340,7 +340,7 @@ func buildCNFIndexRange(sc *stmtctx.StatementContext, cols []*expression.Column,
// Take prefix index into consideration.
if hasPrefix(lengths) {
if fixPrefixColRange(ranges, lengths, newTp) {
ranges, err = unionRanges(sc, ranges)
ranges, err = UnionRanges(sc, ranges)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -356,7 +356,11 @@ type sortRange struct {
encodedEnd []byte
}

func unionRanges(sc *stmtctx.StatementContext, ranges []*Range) ([]*Range, error) {
// UnionRanges sorts `ranges`, union adjacent ones if possible.
// For two intervals [a, b], [c, d], we have guaranteed that a <= c. If b >= c. Then two intervals are overlapped.
// And this two can be merged as [a, max(b, d)].
// Otherwise they aren't overlapped.
func UnionRanges(sc *stmtctx.StatementContext, ranges []*Range) ([]*Range, error) {
if len(ranges) == 0 {
return nil, nil
}
Expand Down Expand Up @@ -384,9 +388,6 @@ func unionRanges(sc *stmtctx.StatementContext, ranges []*Range) ([]*Range, error
ranges = ranges[:0]
lastRange := objects[0]
for i := 1; i < len(objects); i++ {
// For two intervals [a, b], [c, d], we have guaranteed that a >= c. If b >= c. Then two intervals are overlapped.
// And this two can be merged as [a, max(b, d)].
// Otherwise they aren't overlapped.
if bytes.Compare(lastRange.encodedEnd, objects[i].encodedStart) >= 0 {
if bytes.Compare(lastRange.encodedEnd, objects[i].encodedEnd) < 0 {
lastRange.encodedEnd = objects[i].encodedEnd
Expand Down

0 comments on commit 390eb7e

Please sign in to comment.