Skip to content

Commit

Permalink
planner: open the partial order prop push down for LIST PARTITION (#4…
Browse files Browse the repository at this point in the history
  • Loading branch information
winoros authored Jan 4, 2023
1 parent 706c3fa commit 26d4764
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 14 deletions.
113 changes: 102 additions & 11 deletions executor/partition_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package executor_test
import (
"fmt"
"math/rand"
"strconv"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -356,7 +357,7 @@ func TestPartitionInfoDisable(t *testing.T) {
tk.MustQuery("select * from t_info_null where (date = '2020-10-02' or date = '2020-10-06') and app = 'xxx' and media = '19003006'").Check(testkit.Rows())
}

func TestOrderByandLimit(t *testing.T) {
func TestOrderByAndLimit(t *testing.T) {
store := testkit.CreateMockStore(t)

tk := testkit.NewTestKit(t, store)
Expand Down Expand Up @@ -401,6 +402,52 @@ func TestOrderByandLimit(t *testing.T) {
// regular table with clustered index
tk.MustExec("create table tregular_clustered(a int, b int, primary key(a, b) clustered)")

listVals := make([]int, 0, 2000)

for i := 0; i < 2000; i++ {
listVals = append(listVals, i)
}
rand.Shuffle(len(listVals), func(i, j int) {
listVals[i], listVals[j] = listVals[j], listVals[i]
})

var listVals1, listVals2, listVals3 string

for i := 0; i <= 600; i++ {
listVals1 += strconv.Itoa(listVals[i])
if i != 600 {
listVals1 += ","
}
}
for i := 601; i <= 1200; i++ {
listVals2 += strconv.Itoa(listVals[i])
if i != 1200 {
listVals2 += ","
}
}
for i := 1201; i <= 1999; i++ {
listVals3 += strconv.Itoa(listVals[i])
if i != 1999 {
listVals3 += ","
}
}

tk.MustExec(fmt.Sprintf(`create table tlist_intpk(a int primary key, b int) partition by list(a)(
partition p1 values in (%s),
partition p2 values in (%s),
partition p3 values in (%s)
)`, listVals1, listVals2, listVals3))
tk.MustExec(fmt.Sprintf(`create table tlist(a int, b int, index idx_a(a)) partition by list(a)(
partition p1 values in (%s),
partition p2 values in (%s),
partition p3 values in (%s)
)`, listVals1, listVals2, listVals3))
tk.MustExec(fmt.Sprintf(`create table tlist_clustered(a int, b int, primary key(a, b)) partition by list(a)(
partition p1 values in (%s),
partition p2 values in (%s),
partition p3 values in (%s)
)`, listVals1, listVals2, listVals3))

// generate some random data to be inserted
vals := make([]string, 0, 2000)
for i := 0; i < 2000; i++ {
Expand Down Expand Up @@ -429,15 +476,22 @@ func TestOrderByandLimit(t *testing.T) {
dedupMapAB[val] = struct{}{}
}

tk.MustExec("insert into trange values " + strings.Join(vals, ","))
tk.MustExec("insert into thash values " + strings.Join(vals, ","))
tk.MustExec("insert into tregular values " + strings.Join(vals, ","))
tk.MustExec("insert into trange_intpk values " + strings.Join(dedupValsA, ","))
tk.MustExec("insert into thash_intpk values " + strings.Join(dedupValsA, ","))
tk.MustExec("insert into tregular_intpk values " + strings.Join(dedupValsA, ","))
tk.MustExec("insert into trange_clustered values " + strings.Join(dedupValsAB, ","))
tk.MustExec("insert into thash_clustered values " + strings.Join(dedupValsAB, ","))
tk.MustExec("insert into tregular_clustered values " + strings.Join(dedupValsAB, ","))
valInserted := strings.Join(vals, ",")
valDedupAInserted := strings.Join(dedupValsA, ",")
valDedupABInserted := strings.Join(dedupValsAB, ",")

tk.MustExec("insert into trange values " + valInserted)
tk.MustExec("insert into thash values " + valInserted)
tk.MustExec("insert into tlist values" + valInserted)
tk.MustExec("insert into tregular values " + valInserted)
tk.MustExec("insert into trange_intpk values " + valDedupAInserted)
tk.MustExec("insert into thash_intpk values " + valDedupAInserted)
tk.MustExec("insert into tlist_intpk values " + valDedupAInserted)
tk.MustExec("insert into tregular_intpk values " + valDedupAInserted)
tk.MustExec("insert into trange_clustered values " + valDedupABInserted)
tk.MustExec("insert into thash_clustered values " + valDedupABInserted)
tk.MustExec("insert into tlist_clustered values " + valDedupABInserted)
tk.MustExec("insert into tregular_clustered values " + valDedupABInserted)

tk.MustExec("analyze table trange")
tk.MustExec("analyze table trange_intpk")
Expand All @@ -448,14 +502,17 @@ func TestOrderByandLimit(t *testing.T) {
tk.MustExec("analyze table tregular")
tk.MustExec("analyze table tregular_intpk")
tk.MustExec("analyze table tregular_clustered")
tk.MustExec("analyze table tlist")
tk.MustExec("analyze table tlist_intpk")
tk.MustExec("analyze table tlist_clustered")

// Create virtual tiflash replica info.
dom := domain.GetDomain(tk.Session())
is := dom.InfoSchema()
db, exists := is.SchemaByName(model.NewCIStr("test_orderby_limit"))
require.True(t, exists)
for _, tblInfo := range db.Tables {
if strings.HasPrefix(tblInfo.Name.L, "tr") || strings.HasPrefix(tblInfo.Name.L, "thash") {
if strings.HasPrefix(tblInfo.Name.L, "tr") || strings.HasPrefix(tblInfo.Name.L, "thash") || strings.HasPrefix(tblInfo.Name.L, "tlist") {
tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{
Count: 1,
Available: true,
Expand Down Expand Up @@ -487,16 +544,21 @@ func TestOrderByandLimit(t *testing.T) {
maxEle := tk.MustQuery(fmt.Sprintf("select ifnull(max(a), 1100) from (select * from tregular use index(idx_a) where a > %v order by a limit %v) t", x, y)).Rows()[0][0]
queryRangePartitionWithLimitHint := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from trange use index(idx_a) where a > %v and a < greatest(%v+1, %v) order by a limit %v", x, x+1, maxEle, y)
queryHashPartitionWithLimitHint := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from thash use index(idx_a) where a > %v and a < greatest(%v+1, %v) order by a limit %v", x, x+1, maxEle, y)
queryListPartitionWithLimitHint := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from thash use index(idx_a) where a > %v and a < greatest(%v+1, %v) order by a limit %v", x, x+1, maxEle, y)
queryRegular := fmt.Sprintf("select * from tregular use index(idx_a) where a > %v and a < greatest(%v+1, %v) order by a limit %v;", x, x+1, maxEle, y)
require.True(t, tk.HasPlan(queryRangePartitionWithLimitHint, "Limit"))
require.True(t, tk.HasPlan(queryRangePartitionWithLimitHint, "IndexLookUp"))
require.True(t, tk.HasPlan(queryHashPartitionWithLimitHint, "Limit"))
require.True(t, tk.HasPlan(queryHashPartitionWithLimitHint, "IndexLookUp"))
require.True(t, tk.HasPlan(queryListPartitionWithLimitHint, "Limit"))
require.True(t, tk.HasPlan(queryListPartitionWithLimitHint, "IndexLookUp"))
require.True(t, tk.HasPlan(queryRangePartitionWithLimitHint, "TopN")) // but not fully pushed
require.True(t, tk.HasPlan(queryHashPartitionWithLimitHint, "TopN"))
require.True(t, tk.HasPlan(queryListPartitionWithLimitHint, "TopN"))
regularResult := tk.MustQuery(queryRegular).Sort().Rows()
tk.MustQuery(queryRangePartitionWithLimitHint).Sort().Check(regularResult)
tk.MustQuery(queryHashPartitionWithLimitHint).Sort().Check(regularResult)
tk.MustQuery(queryListPartitionWithLimitHint).Sort().Check(regularResult)
}

// test tableReader
Expand All @@ -519,41 +581,54 @@ func TestOrderByandLimit(t *testing.T) {
y := rand.Intn(2000) + 1
queryRangePartition := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from trange ignore index(idx_a) where a > %v order by a, b limit %v;", x, y)
queryHashPartition := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from thash ignore index(idx_a) where a > %v order by a, b limit %v;", x, y)
queryListPartition := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from tlist ignore index(idx_a) where a > %v order by a, b limit %v;", x, y)
queryRegular := fmt.Sprintf("select * from tregular ignore index(idx_a) where a > %v order by a, b limit %v;", x, y)
require.True(t, tk.HasPlan(queryRangePartition, "TableReader")) // check if tableReader is used
require.True(t, tk.HasPlan(queryHashPartition, "TableReader"))
require.True(t, tk.HasPlan(queryListPartition, "TableReader"))
require.False(t, tk.HasPlan(queryRangePartition, "Limit")) // check if order property is not pushed
require.False(t, tk.HasPlan(queryHashPartition, "Limit"))
require.False(t, tk.HasPlan(queryListPartition, "Limit"))
regularResult := tk.MustQuery(queryRegular).Sort().Rows()
tk.MustQuery(queryRangePartition).Sort().Check(regularResult)
tk.MustQuery(queryHashPartition).Sort().Check(regularResult)
tk.MustQuery(queryListPartition).Sort().Check(regularResult)

// test int pk
// To be simplified, we only read column a.
queryRangePartition = fmt.Sprintf("select /*+ LIMIT_TO_COP() */ a from trange_intpk use index(primary) where a > %v order by a limit %v", x, y)
queryHashPartition = fmt.Sprintf("select /*+ LIMIT_TO_COP() */ a from thash_intpk use index(primary) where a > %v order by a limit %v", x, y)
queryListPartition = fmt.Sprintf("select /*+ LIMIT_TO_COP() */ a from tlist_intpk use index(primary) where a > %v order by a limit %v", x, y)
queryRegular = fmt.Sprintf("select a from tregular_intpk where a > %v order by a limit %v", x, y)
require.True(t, tk.HasPlan(queryRangePartition, "TableReader"))
require.True(t, tk.HasPlan(queryHashPartition, "TableReader"))
require.True(t, tk.HasPlan(queryListPartition, "TableReader"))
require.True(t, tk.HasPlan(queryRangePartition, "Limit")) // check if order property is not pushed
require.True(t, tk.HasPlan(queryHashPartition, "Limit"))
require.True(t, tk.HasPlan(queryListPartition, "Limit"))
regularResult = tk.MustQuery(queryRegular).Rows()
tk.MustQuery(queryRangePartition).Check(regularResult)
tk.MustQuery(queryHashPartition).Check(regularResult)
tk.MustQuery(queryListPartition).Check(regularResult)

// test clustered index
queryRangePartition = fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from trange_clustered use index(primary) where a > %v order by a, b limit %v;", x, y)
queryHashPartition = fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from thash_clustered use index(primary) where a > %v order by a, b limit %v;", x, y)
queryListPartition = fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from tlist_clustered use index(primary) where a > %v order by a, b limit %v;", x, y)
queryRegular = fmt.Sprintf("select * from tregular_clustered where a > %v order by a, b limit %v;", x, y)
require.True(t, tk.HasPlan(queryRangePartition, "TableReader")) // check if tableReader is used
require.True(t, tk.HasPlan(queryHashPartition, "TableReader"))
require.True(t, tk.HasPlan(queryListPartition, "TableReader"))
require.True(t, tk.HasPlan(queryRangePartition, "Limit")) // check if order property is pushed
require.True(t, tk.HasPlan(queryHashPartition, "Limit"))
require.True(t, tk.HasPlan(queryListPartition, "Limit"))
require.True(t, tk.HasPlan(queryRangePartition, "TopN")) // but not fully pushed
require.True(t, tk.HasPlan(queryHashPartition, "TopN"))
require.True(t, tk.HasPlan(queryListPartition, "TopN"))
regularResult = tk.MustQuery(queryRegular).Rows()
tk.MustQuery(queryRangePartition).Check(regularResult)
tk.MustQuery(queryHashPartition).Check(regularResult)
tk.MustQuery(queryListPartition).Check(regularResult)

tk.MustExec(" set @@tidb_allow_mpp=1;")
tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash,tikv\"")
Expand Down Expand Up @@ -591,6 +666,22 @@ func TestOrderByandLimit(t *testing.T) {
require.True(t, tk.HasTiFlashPlan(queryPartitionWithTiFlash))
// but order is not pushed
require.False(t, tk.HasPlan(queryPartitionWithTiFlash, "Limit"), fmt.Sprintf("%v", tk.MustQuery("explain "+queryPartitionWithTiFlash).Rows()))
queryPartitionWithTiFlash = fmt.Sprintf("select /*+ read_from_storage(tiflash[tlist_intpk]) */ * from tlist_intpk where a > %v order by a limit %v", x, y)
// check if tiflash is used
require.True(t, tk.HasTiFlashPlan(queryPartitionWithTiFlash), fmt.Sprintf("%v", tk.MustQuery("explain "+queryPartitionWithTiFlash).Rows()))
queryPartitionWithTiFlash = fmt.Sprintf("select /*+ read_from_storage(tiflash[tlist_intpk]) */ /*+ LIMIT_TO_COP() */ * from tlist_intpk where a > %v order by a limit %v", x, y)
// check if tiflash is used
require.True(t, tk.HasTiFlashPlan(queryPartitionWithTiFlash))
// but order is not pushed
require.False(t, tk.HasPlan(queryPartitionWithTiFlash, "Limit"), fmt.Sprintf("%v", tk.MustQuery("explain "+queryPartitionWithTiFlash).Rows()))
queryPartitionWithTiFlash = fmt.Sprintf("select /*+ read_from_storage(tiflash[tlist_clustered]) */ * from tlist_clustered where a > %v order by a limit %v", x, y)
// check if tiflash is used
require.True(t, tk.HasTiFlashPlan(queryPartitionWithTiFlash), fmt.Sprintf("%v", tk.MustQuery("explain "+queryPartitionWithTiFlash).Rows()))
queryPartitionWithTiFlash = fmt.Sprintf("select /*+ read_from_storage(tiflash[tlist_clustered]) */ /*+ LIMIT_TO_COP() */ * from tlist_clustered where a > %v order by a limit %v", x, y)
// check if tiflash is used
require.True(t, tk.HasTiFlashPlan(queryPartitionWithTiFlash))
// but order is not pushed
require.False(t, tk.HasPlan(queryPartitionWithTiFlash, "Limit"), fmt.Sprintf("%v", tk.MustQuery("explain "+queryPartitionWithTiFlash).Rows()))
tk.MustExec(" set @@tidb_allow_mpp=0;")
tk.MustExec("set @@session.tidb_isolation_read_engines=\"tikv\"")
}
Expand Down
3 changes: 0 additions & 3 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -1064,9 +1064,6 @@ func (p *PhysicalTopN) pushTopNDownToDynamicPartition(copTsk *copTask) (task, bo
if pi == nil {
return nil, false
}
if pi.Type == model.PartitionTypeList {
return nil, false
}

if !copTsk.indexPlanFinished {
// If indexPlan side isn't finished, there's no selection on the table side.
Expand Down

0 comments on commit 26d4764

Please sign in to comment.