Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: open the partial order prop push down for LIST PARTITION (#40290) #40316

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 102 additions & 11 deletions executor/partition_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package executor_test
import (
"fmt"
"math/rand"
"strconv"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -356,7 +357,7 @@ func TestPartitionInfoDisable(t *testing.T) {
tk.MustQuery("select * from t_info_null where (date = '2020-10-02' or date = '2020-10-06') and app = 'xxx' and media = '19003006'").Check(testkit.Rows())
}

func TestOrderByandLimit(t *testing.T) {
func TestOrderByAndLimit(t *testing.T) {
store := testkit.CreateMockStore(t)

tk := testkit.NewTestKit(t, store)
Expand Down Expand Up @@ -401,6 +402,52 @@ func TestOrderByandLimit(t *testing.T) {
// regular table with clustered index
tk.MustExec("create table tregular_clustered(a int, b int, primary key(a, b) clustered)")

listVals := make([]int, 0, 2000)

for i := 0; i < 2000; i++ {
listVals = append(listVals, i)
}
rand.Shuffle(len(listVals), func(i, j int) {
listVals[i], listVals[j] = listVals[j], listVals[i]
})

var listVals1, listVals2, listVals3 string

for i := 0; i <= 600; i++ {
listVals1 += strconv.Itoa(listVals[i])
if i != 600 {
listVals1 += ","
}
}
for i := 601; i <= 1200; i++ {
listVals2 += strconv.Itoa(listVals[i])
if i != 1200 {
listVals2 += ","
}
}
for i := 1201; i <= 1999; i++ {
listVals3 += strconv.Itoa(listVals[i])
if i != 1999 {
listVals3 += ","
}
}

tk.MustExec(fmt.Sprintf(`create table tlist_intpk(a int primary key, b int) partition by list(a)(
partition p1 values in (%s),
partition p2 values in (%s),
partition p3 values in (%s)
)`, listVals1, listVals2, listVals3))
tk.MustExec(fmt.Sprintf(`create table tlist(a int, b int, index idx_a(a)) partition by list(a)(
partition p1 values in (%s),
partition p2 values in (%s),
partition p3 values in (%s)
)`, listVals1, listVals2, listVals3))
tk.MustExec(fmt.Sprintf(`create table tlist_clustered(a int, b int, primary key(a, b)) partition by list(a)(
partition p1 values in (%s),
partition p2 values in (%s),
partition p3 values in (%s)
)`, listVals1, listVals2, listVals3))

// generate some random data to be inserted
vals := make([]string, 0, 2000)
for i := 0; i < 2000; i++ {
Expand Down Expand Up @@ -429,15 +476,22 @@ func TestOrderByandLimit(t *testing.T) {
dedupMapAB[val] = struct{}{}
}

tk.MustExec("insert into trange values " + strings.Join(vals, ","))
tk.MustExec("insert into thash values " + strings.Join(vals, ","))
tk.MustExec("insert into tregular values " + strings.Join(vals, ","))
tk.MustExec("insert into trange_intpk values " + strings.Join(dedupValsA, ","))
tk.MustExec("insert into thash_intpk values " + strings.Join(dedupValsA, ","))
tk.MustExec("insert into tregular_intpk values " + strings.Join(dedupValsA, ","))
tk.MustExec("insert into trange_clustered values " + strings.Join(dedupValsAB, ","))
tk.MustExec("insert into thash_clustered values " + strings.Join(dedupValsAB, ","))
tk.MustExec("insert into tregular_clustered values " + strings.Join(dedupValsAB, ","))
valInserted := strings.Join(vals, ",")
valDedupAInserted := strings.Join(dedupValsA, ",")
valDedupABInserted := strings.Join(dedupValsAB, ",")

tk.MustExec("insert into trange values " + valInserted)
tk.MustExec("insert into thash values " + valInserted)
tk.MustExec("insert into tlist values" + valInserted)
tk.MustExec("insert into tregular values " + valInserted)
tk.MustExec("insert into trange_intpk values " + valDedupAInserted)
tk.MustExec("insert into thash_intpk values " + valDedupAInserted)
tk.MustExec("insert into tlist_intpk values " + valDedupAInserted)
tk.MustExec("insert into tregular_intpk values " + valDedupAInserted)
tk.MustExec("insert into trange_clustered values " + valDedupABInserted)
tk.MustExec("insert into thash_clustered values " + valDedupABInserted)
tk.MustExec("insert into tlist_clustered values " + valDedupABInserted)
tk.MustExec("insert into tregular_clustered values " + valDedupABInserted)

tk.MustExec("analyze table trange")
tk.MustExec("analyze table trange_intpk")
Expand All @@ -448,14 +502,17 @@ func TestOrderByandLimit(t *testing.T) {
tk.MustExec("analyze table tregular")
tk.MustExec("analyze table tregular_intpk")
tk.MustExec("analyze table tregular_clustered")
tk.MustExec("analyze table tlist")
tk.MustExec("analyze table tlist_intpk")
tk.MustExec("analyze table tlist_clustered")

// Create virtual tiflash replica info.
dom := domain.GetDomain(tk.Session())
is := dom.InfoSchema()
db, exists := is.SchemaByName(model.NewCIStr("test_orderby_limit"))
require.True(t, exists)
for _, tblInfo := range db.Tables {
if strings.HasPrefix(tblInfo.Name.L, "tr") || strings.HasPrefix(tblInfo.Name.L, "thash") {
if strings.HasPrefix(tblInfo.Name.L, "tr") || strings.HasPrefix(tblInfo.Name.L, "thash") || strings.HasPrefix(tblInfo.Name.L, "tlist") {
tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{
Count: 1,
Available: true,
Expand Down Expand Up @@ -487,16 +544,21 @@ func TestOrderByandLimit(t *testing.T) {
maxEle := tk.MustQuery(fmt.Sprintf("select ifnull(max(a), 1100) from (select * from tregular use index(idx_a) where a > %v order by a limit %v) t", x, y)).Rows()[0][0]
queryRangePartitionWithLimitHint := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from trange use index(idx_a) where a > %v and a < greatest(%v+1, %v) order by a limit %v", x, x+1, maxEle, y)
queryHashPartitionWithLimitHint := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from thash use index(idx_a) where a > %v and a < greatest(%v+1, %v) order by a limit %v", x, x+1, maxEle, y)
queryListPartitionWithLimitHint := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from thash use index(idx_a) where a > %v and a < greatest(%v+1, %v) order by a limit %v", x, x+1, maxEle, y)
queryRegular := fmt.Sprintf("select * from tregular use index(idx_a) where a > %v and a < greatest(%v+1, %v) order by a limit %v;", x, x+1, maxEle, y)
require.True(t, tk.HasPlan(queryRangePartitionWithLimitHint, "Limit"))
require.True(t, tk.HasPlan(queryRangePartitionWithLimitHint, "IndexLookUp"))
require.True(t, tk.HasPlan(queryHashPartitionWithLimitHint, "Limit"))
require.True(t, tk.HasPlan(queryHashPartitionWithLimitHint, "IndexLookUp"))
require.True(t, tk.HasPlan(queryListPartitionWithLimitHint, "Limit"))
require.True(t, tk.HasPlan(queryListPartitionWithLimitHint, "IndexLookUp"))
require.True(t, tk.HasPlan(queryRangePartitionWithLimitHint, "TopN")) // but not fully pushed
require.True(t, tk.HasPlan(queryHashPartitionWithLimitHint, "TopN"))
require.True(t, tk.HasPlan(queryListPartitionWithLimitHint, "TopN"))
regularResult := tk.MustQuery(queryRegular).Sort().Rows()
tk.MustQuery(queryRangePartitionWithLimitHint).Sort().Check(regularResult)
tk.MustQuery(queryHashPartitionWithLimitHint).Sort().Check(regularResult)
tk.MustQuery(queryListPartitionWithLimitHint).Sort().Check(regularResult)
}

// test tableReader
Expand All @@ -519,41 +581,54 @@ func TestOrderByandLimit(t *testing.T) {
y := rand.Intn(2000) + 1
queryRangePartition := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from trange ignore index(idx_a) where a > %v order by a, b limit %v;", x, y)
queryHashPartition := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from thash ignore index(idx_a) where a > %v order by a, b limit %v;", x, y)
queryListPartition := fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from tlist ignore index(idx_a) where a > %v order by a, b limit %v;", x, y)
queryRegular := fmt.Sprintf("select * from tregular ignore index(idx_a) where a > %v order by a, b limit %v;", x, y)
require.True(t, tk.HasPlan(queryRangePartition, "TableReader")) // check if tableReader is used
require.True(t, tk.HasPlan(queryHashPartition, "TableReader"))
require.True(t, tk.HasPlan(queryListPartition, "TableReader"))
require.False(t, tk.HasPlan(queryRangePartition, "Limit")) // check if order property is not pushed
require.False(t, tk.HasPlan(queryHashPartition, "Limit"))
require.False(t, tk.HasPlan(queryListPartition, "Limit"))
regularResult := tk.MustQuery(queryRegular).Sort().Rows()
tk.MustQuery(queryRangePartition).Sort().Check(regularResult)
tk.MustQuery(queryHashPartition).Sort().Check(regularResult)
tk.MustQuery(queryListPartition).Sort().Check(regularResult)

// test int pk
// To be simplified, we only read column a.
queryRangePartition = fmt.Sprintf("select /*+ LIMIT_TO_COP() */ a from trange_intpk use index(primary) where a > %v order by a limit %v", x, y)
queryHashPartition = fmt.Sprintf("select /*+ LIMIT_TO_COP() */ a from thash_intpk use index(primary) where a > %v order by a limit %v", x, y)
queryListPartition = fmt.Sprintf("select /*+ LIMIT_TO_COP() */ a from tlist_intpk use index(primary) where a > %v order by a limit %v", x, y)
queryRegular = fmt.Sprintf("select a from tregular_intpk where a > %v order by a limit %v", x, y)
require.True(t, tk.HasPlan(queryRangePartition, "TableReader"))
require.True(t, tk.HasPlan(queryHashPartition, "TableReader"))
require.True(t, tk.HasPlan(queryListPartition, "TableReader"))
require.True(t, tk.HasPlan(queryRangePartition, "Limit")) // check if order property is not pushed
require.True(t, tk.HasPlan(queryHashPartition, "Limit"))
require.True(t, tk.HasPlan(queryListPartition, "Limit"))
regularResult = tk.MustQuery(queryRegular).Rows()
tk.MustQuery(queryRangePartition).Check(regularResult)
tk.MustQuery(queryHashPartition).Check(regularResult)
tk.MustQuery(queryListPartition).Check(regularResult)

// test clustered index
queryRangePartition = fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from trange_clustered use index(primary) where a > %v order by a, b limit %v;", x, y)
queryHashPartition = fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from thash_clustered use index(primary) where a > %v order by a, b limit %v;", x, y)
queryListPartition = fmt.Sprintf("select /*+ LIMIT_TO_COP() */ * from tlist_clustered use index(primary) where a > %v order by a, b limit %v;", x, y)
queryRegular = fmt.Sprintf("select * from tregular_clustered where a > %v order by a, b limit %v;", x, y)
require.True(t, tk.HasPlan(queryRangePartition, "TableReader")) // check if tableReader is used
require.True(t, tk.HasPlan(queryHashPartition, "TableReader"))
require.True(t, tk.HasPlan(queryListPartition, "TableReader"))
require.True(t, tk.HasPlan(queryRangePartition, "Limit")) // check if order property is pushed
require.True(t, tk.HasPlan(queryHashPartition, "Limit"))
require.True(t, tk.HasPlan(queryListPartition, "Limit"))
require.True(t, tk.HasPlan(queryRangePartition, "TopN")) // but not fully pushed
require.True(t, tk.HasPlan(queryHashPartition, "TopN"))
require.True(t, tk.HasPlan(queryListPartition, "TopN"))
regularResult = tk.MustQuery(queryRegular).Rows()
tk.MustQuery(queryRangePartition).Check(regularResult)
tk.MustQuery(queryHashPartition).Check(regularResult)
tk.MustQuery(queryListPartition).Check(regularResult)

tk.MustExec(" set @@tidb_allow_mpp=1;")
tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash,tikv\"")
Expand Down Expand Up @@ -591,6 +666,22 @@ func TestOrderByandLimit(t *testing.T) {
require.True(t, tk.HasTiFlashPlan(queryPartitionWithTiFlash))
// but order is not pushed
require.False(t, tk.HasPlan(queryPartitionWithTiFlash, "Limit"), fmt.Sprintf("%v", tk.MustQuery("explain "+queryPartitionWithTiFlash).Rows()))
queryPartitionWithTiFlash = fmt.Sprintf("select /*+ read_from_storage(tiflash[tlist_intpk]) */ * from tlist_intpk where a > %v order by a limit %v", x, y)
// check if tiflash is used
require.True(t, tk.HasTiFlashPlan(queryPartitionWithTiFlash), fmt.Sprintf("%v", tk.MustQuery("explain "+queryPartitionWithTiFlash).Rows()))
queryPartitionWithTiFlash = fmt.Sprintf("select /*+ read_from_storage(tiflash[tlist_intpk]) */ /*+ LIMIT_TO_COP() */ * from tlist_intpk where a > %v order by a limit %v", x, y)
// check if tiflash is used
require.True(t, tk.HasTiFlashPlan(queryPartitionWithTiFlash))
// but order is not pushed
require.False(t, tk.HasPlan(queryPartitionWithTiFlash, "Limit"), fmt.Sprintf("%v", tk.MustQuery("explain "+queryPartitionWithTiFlash).Rows()))
queryPartitionWithTiFlash = fmt.Sprintf("select /*+ read_from_storage(tiflash[tlist_clustered]) */ * from tlist_clustered where a > %v order by a limit %v", x, y)
// check if tiflash is used
require.True(t, tk.HasTiFlashPlan(queryPartitionWithTiFlash), fmt.Sprintf("%v", tk.MustQuery("explain "+queryPartitionWithTiFlash).Rows()))
queryPartitionWithTiFlash = fmt.Sprintf("select /*+ read_from_storage(tiflash[tlist_clustered]) */ /*+ LIMIT_TO_COP() */ * from tlist_clustered where a > %v order by a limit %v", x, y)
// check if tiflash is used
require.True(t, tk.HasTiFlashPlan(queryPartitionWithTiFlash))
// but order is not pushed
require.False(t, tk.HasPlan(queryPartitionWithTiFlash, "Limit"), fmt.Sprintf("%v", tk.MustQuery("explain "+queryPartitionWithTiFlash).Rows()))
tk.MustExec(" set @@tidb_allow_mpp=0;")
tk.MustExec("set @@session.tidb_isolation_read_engines=\"tikv\"")
}
Expand Down
3 changes: 0 additions & 3 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -1064,9 +1064,6 @@ func (p *PhysicalTopN) pushTopNDownToDynamicPartition(copTsk *copTask) (task, bo
if pi == nil {
return nil, false
}
if pi.Type == model.PartitionTypeList {
return nil, false
}

if !copTsk.indexPlanFinished {
// If indexPlan side isn't finished, there's no selection on the table side.
Expand Down