Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: add explicit partition pruning to index joins (#32007) #32093

Merged
merged 16 commits into from
Mar 9, 2022
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ddl/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1295,7 +1295,7 @@ func TestAlterTableTruncatePartitionByList(t *testing.T) {
tk.MustExec(`insert into t values (1),(3),(5),(null)`)
oldTbl := tk.GetTableByName("test", "t")
tk.MustExec(`alter table t truncate partition p1`)
tk.MustQuery("select * from t").Check(testkit.Rows("1", "5", "<nil>"))
tk.MustQuery("select * from t").Sort().Check(testkit.Rows("1", "5", "<nil>"))
tbl := tk.GetTableByName("test", "t")
require.NotNil(t, tbl.Meta().Partition)
part := tbl.Meta().Partition
Expand Down Expand Up @@ -1328,7 +1328,7 @@ func TestAlterTableTruncatePartitionByListColumns(t *testing.T) {
tk.MustExec(`insert into t values (1,'a'),(3,'a'),(5,'a'),(null,null)`)
oldTbl := tk.GetTableByName("test", "t")
tk.MustExec(`alter table t truncate partition p1`)
tk.MustQuery("select * from t").Check(testkit.Rows("1 a", "5 a", "<nil> <nil>"))
tk.MustQuery("select * from t").Sort().Check(testkit.Rows("1 a", "5 a", "<nil> <nil>"))
tbl := tk.GetTableByName("test", "t")
require.NotNil(t, tbl.Meta().Partition)
part := tbl.Meta().Partition
Expand Down
80 changes: 35 additions & 45 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3896,26 +3896,37 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte
return nil, err
}
tbInfo := e.table.Meta()
if v.IsCommonHandle {
if tbInfo.GetPartitionInfo() == nil || !builder.ctx.GetSessionVars().UseDynamicPartitionPrune() {
if tbInfo.GetPartitionInfo() == nil || !builder.ctx.GetSessionVars().UseDynamicPartitionPrune() {
if v.IsCommonHandle {
kvRanges, err := buildKvRangesForIndexJoin(e.ctx, getPhysicalTableID(e.table), -1, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memTracker, interruptSignal)
if err != nil {
return nil, err
}
return builder.buildTableReaderFromKvRanges(ctx, e, kvRanges)
}

tbl, _ := builder.is.TableByID(tbInfo.ID)
pt := tbl.(table.PartitionedTable)
pe, err := tbl.(interface {
PartitionExpr() (*tables.PartitionExpr, error)
}).PartitionExpr()
if err != nil {
return nil, err
}
var kvRanges []kv.KeyRange
handles, _ := dedupHandles(lookUpContents)
return builder.buildTableReaderFromHandles(ctx, e, handles, canReorderHandles)
}
tbl, _ := builder.is.TableByID(tbInfo.ID)
pt := tbl.(table.PartitionedTable)
pe, err := tbl.(interface {
PartitionExpr() (*tables.PartitionExpr, error)
}).PartitionExpr()
if err != nil {
return nil, err
}
partitionInfo := &v.PartitionInfo
usedPartitionList, err := partitionPruning(e.ctx, pt, partitionInfo.PruningConds, partitionInfo.PartitionNames, partitionInfo.Columns, partitionInfo.ColumnNames)
if err != nil {
return nil, err
}
usedPartitions := make(map[int64]table.PhysicalTable, len(usedPartitionList))
for _, p := range usedPartitionList {
usedPartitions[p.GetPhysicalID()] = p
}
var kvRanges []kv.KeyRange
if v.IsCommonHandle {
if len(lookUpContents) > 0 && keyColumnsIncludeAllPartitionColumns(lookUpContents[0].keyCols, pe) {
// In this case we can use dynamic partition pruning.
locateKey := make([]types.Datum, e.Schema().Len())
kvRanges = make([]kv.KeyRange, 0, len(lookUpContents))
for _, content := range lookUpContents {
Expand All @@ -3927,22 +3938,19 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte
return nil, err
}
pid := p.GetPhysicalID()
if _, ok := usedPartitions[pid]; !ok {
continue
}
tmp, err := buildKvRangesForIndexJoin(e.ctx, pid, -1, []*indexJoinLookUpContent{content}, indexRanges, keyOff2IdxOff, cwc, nil, interruptSignal)
if err != nil {
return nil, err
}
kvRanges = append(kvRanges, tmp...)
}
} else {
partitionInfo := &v.PartitionInfo
partitions, err := partitionPruning(e.ctx, pt, partitionInfo.PruningConds, partitionInfo.PartitionNames, partitionInfo.Columns, partitionInfo.ColumnNames)
if err != nil {
return nil, err
}
kvRanges = make([]kv.KeyRange, 0, len(partitions)*len(lookUpContents))
for _, p := range partitions {
pid := p.GetPhysicalID()
tmp, err := buildKvRangesForIndexJoin(e.ctx, pid, -1, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memTracker, interruptSignal)
kvRanges = make([]kv.KeyRange, 0, len(usedPartitions)*len(lookUpContents))
for _, p := range usedPartitionList {
tmp, err := buildKvRangesForIndexJoin(e.ctx, p.GetPhysicalID(), -1, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memTracker, interruptSignal)
if err != nil {
return nil, err
}
Expand All @@ -3953,22 +3961,7 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte
}

handles, lookUpContents := dedupHandles(lookUpContents)
if tbInfo.GetPartitionInfo() == nil {
return builder.buildTableReaderFromHandles(ctx, e, handles, canReorderHandles)
}
if !builder.ctx.GetSessionVars().UseDynamicPartitionPrune() {
return builder.buildTableReaderFromHandles(ctx, e, handles, canReorderHandles)
}

tbl, _ := builder.is.TableByID(tbInfo.ID)
pt := tbl.(table.PartitionedTable)
pe, err := tbl.(interface {
PartitionExpr() (*tables.PartitionExpr, error)
}).PartitionExpr()
if err != nil {
return nil, err
}
var kvRanges []kv.KeyRange
if len(lookUpContents) > 0 && keyColumnsIncludeAllPartitionColumns(lookUpContents[0].keyCols, pe) {
locateKey := make([]types.Datum, e.Schema().Len())
kvRanges = make([]kv.KeyRange, 0, len(lookUpContents))
Expand All @@ -3981,19 +3974,16 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte
return nil, err
}
pid := p.GetPhysicalID()
if _, ok := usedPartitions[pid]; !ok {
continue
}
handle := kv.IntHandle(content.keys[0].GetInt64())
tmp := distsql.TableHandlesToKVRanges(pid, []kv.Handle{handle})
kvRanges = append(kvRanges, tmp...)
}
} else {
partitionInfo := &v.PartitionInfo
partitions, err := partitionPruning(e.ctx, pt, partitionInfo.PruningConds, partitionInfo.PartitionNames, partitionInfo.Columns, partitionInfo.ColumnNames)
if err != nil {
return nil, err
}
for _, p := range partitions {
pid := p.GetPhysicalID()
tmp := distsql.TableHandlesToKVRanges(pid, handles)
for _, p := range usedPartitionList {
tmp := distsql.TableHandlesToKVRanges(p.GetPhysicalID(), handles)
kvRanges = append(kvRanges, tmp...)
}
}
Expand Down
2 changes: 1 addition & 1 deletion executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ type IndexLookUpExecutor struct {
partitionTableMode bool // if this executor is accessing a partition table
prunedPartitions []table.PhysicalTable // partition tables need to access
partitionRangeMap map[int64][]*ranger.Range
partitionKVRanges [][]kv.KeyRange // kvRanges of each partition table
partitionKVRanges [][]kv.KeyRange // kvRanges of each prunedPartitions

// All fields above are immutable.

Expand Down
1 change: 1 addition & 0 deletions planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -988,6 +988,7 @@ func (p *LogicalJoin) constructInnerTableScanTask(
Columns: ds.TblCols,
ColumnNames: ds.names,
}
ts.PartitionInfo = copTask.partitionInfo
selStats := ts.stats.Scale(selectivity)
ts.addPushedDownSelection(copTask, selStats)
t := copTask.convertToRootTask(ds.ctx)
Expand Down
24 changes: 23 additions & 1 deletion planner/core/partition_pruner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ func TestIssue22898(t *testing.T) {
tk.MustExec("CREATE TABLE NT_RP3763 (COL1 TINYINT(8) SIGNED COMMENT \"NUMERIC NO INDEX\" DEFAULT 41,COL2 VARCHAR(20),COL3 DATETIME,COL4 BIGINT,COL5 FLOAT) PARTITION BY RANGE (COL1 * COL3) (PARTITION P0 VALUES LESS THAN (0),PARTITION P1 VALUES LESS THAN (10),PARTITION P2 VALUES LESS THAN (20),PARTITION P3 VALUES LESS THAN (30),PARTITION P4 VALUES LESS THAN (40),PARTITION P5 VALUES LESS THAN (50),PARTITION PMX VALUES LESS THAN MAXVALUE);")
tk.MustExec("insert into NT_RP3763 (COL1,COL2,COL3,COL4,COL5) values(-82,\"夐齏醕皆磹漋甓崘潮嵙燷渏艂朼洛炷鉢儝鱈肇\",\"5748\\-06\\-26\\ 20:48:49\",-3133527360541070260,-2.624880003397658e+38);")
tk.MustExec("insert into NT_RP3763 (COL1,COL2,COL3,COL4,COL5) values(48,\"簖鹩筈匹眜赖泽騈爷詵赺玡婙Ɇ郝鮙廛賙疼舢\",\"7228\\-12\\-13\\ 02:59:54\",-6181009269190017937,2.7731105531290494e+38);")
tk.MustQuery("select * from `NT_RP3763` where `COL1` in (10, 48, -82);").Check(testkit.Rows("-82 夐齏醕皆磹漋甓崘潮嵙燷渏艂朼洛炷鉢儝鱈肇 5748-06-26 20:48:49 -3133527360541070260 -262488000000000000000000000000000000000", "48 簖鹩筈匹眜赖泽騈爷詵赺玡婙Ɇ郝鮙廛賙疼舢 7228-12-13 02:59:54 -6181009269190017937 277311060000000000000000000000000000000"))
tk.MustQuery("select * from `NT_RP3763` where `COL1` in (10, 48, -82);").Sort().Check(testkit.Rows("-82 夐齏醕皆磹漋甓崘潮嵙燷渏艂朼洛炷鉢儝鱈肇 5748-06-26 20:48:49 -3133527360541070260 -262488000000000000000000000000000000000", "48 簖鹩筈匹眜赖泽騈爷詵赺玡婙Ɇ郝鮙廛賙疼舢 7228-12-13 02:59:54 -6181009269190017937 277311060000000000000000000000000000000"))
tk.MustQuery("select * from `NT_RP3763` where `COL1` in (48);").Check(testkit.Rows("48 簖鹩筈匹眜赖泽騈爷詵赺玡婙Ɇ郝鮙廛賙疼舢 7228-12-13 02:59:54 -6181009269190017937 277311060000000000000000000000000000000"))
}

Expand Down Expand Up @@ -590,3 +590,25 @@ func TestHashPartitionPruning(t *testing.T) {
tk.MustExec("insert into t(col1, col3) values(0, 3522101843073676459);")
tk.MustQuery("SELECT col1, COL3 FROM t WHERE COL1 IN (0,14158354938390,0) AND COL3 IN (3522101843073676459,-2846203247576845955,838395691793635638);").Check(testkit.Rows("0 3522101843073676459"))
}

func TestIssue32007(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("create database Issue32007")
tk.MustExec("USE Issue32007")
tk.MustExec("create table t1 (a int, b tinyint, primary key (a)) partition by range (a) (" +
"partition p0 values less than (5)," +
"partition p1 values less than (20)," +
"partition p2 values less than (30)," +
"partition p3 values less than (40)," +
"partition p4 values less than MAXVALUE)")
tk.MustExec("insert into t1 values (0, 0), (1, 1), (2, 2), (3, 3), (4, 4), (5, 5), (6, 6), (7, 7), (10, 10), (11, 11), (12, 12), (13, 13), (14, 14), (15, 15), (20, 20), (21, 21), (22, 22), (23, 23), (24, 24), (25, 25), (30, 30), (31, 31), (32, 32), (33, 33), (34, 34), (35, 35), (36, 36), (40, 40), (50, 50), (80, 80), (90, 90), (100, 100)")
tk.MustExec("create table t3 (a int, b mediumint, primary key (a))")
tk.MustExec("insert into t3 values (0, 0), (1, 1), (2, 2), (3, 3), (4, 4), (5, 5), (6, 6), (7, 7), (8, 8), (9, 9), (10, 10), (11, 11), (12, 12), (13, 13), (14, 14), (15, 15), (16, 16), (17, 17), (18, 18), (19, 19), (20, 20), (21, 21), (22, 22), (23, 23)")

tk.MustExec("set @@tidb_partition_prune_mode='static'")
tk.MustQuery("select * from t3 where t3.a <> ALL (select t1.a from t1 partition (p0)) order by t3.a").Sort().Check(testkit.Rows("10 10", "11 11", "12 12", "13 13", "14 14", "15 15", "16 16", "17 17", "18 18", "19 19", "20 20", "21 21", "22 22", "23 23", "5 5", "6 6", "7 7", "8 8", "9 9"))
tk.MustExec("set @@tidb_partition_prune_mode='dynamic'")
tk.MustQuery("select * from t3 where t3.a <> ALL (select t1.a from t1 partition (p0)) order by t3.a").Sort().Check(testkit.Rows("10 10", "11 11", "12 12", "13 13", "14 14", "15 15", "16 16", "17 17", "18 18", "19 19", "20 20", "21 21", "22 22", "23 23", "5 5", "6 6", "7 7", "8 8", "9 9"))
}