Skip to content

Commit

Permalink
planner: rewrite count star to count not null column (#39197)
Browse files Browse the repository at this point in the history
close #37165
  • Loading branch information
elsa0520 committed Nov 28, 2022
1 parent 12d1cf6 commit 37bd052
Show file tree
Hide file tree
Showing 17 changed files with 622 additions and 59 deletions.
2 changes: 1 addition & 1 deletion cmd/explaintest/r/access_path_selection.result
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,5 @@ explain format = 'brief' select count(1) from access_path_selection;
id estRows task access object operator info
StreamAgg 1.00 root funcs:count(Column#18)->Column#4
└─TableReader 1.00 root data:StreamAgg
└─StreamAgg 1.00 cop[tikv] funcs:count(1)->Column#18
└─StreamAgg 1.00 cop[tikv] funcs:count(test.access_path_selection._tidb_rowid)->Column#18
└─TableFullScan 10000.00 cop[tikv] table:access_path_selection keep order:false, stats:pseudo
16 changes: 8 additions & 8 deletions cmd/explaintest/r/clustered_index.result
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,25 @@ explain select count(*) from with_cluster_index.tbl_0 where col_0 < 5429 ;
id estRows task access object operator info
StreamAgg_17 1.00 root funcs:count(Column#8)->Column#6
└─IndexReader_18 1.00 root index:StreamAgg_9
└─StreamAgg_9 1.00 cop[tikv] funcs:count(1)->Column#8
└─StreamAgg_9 1.00 cop[tikv] funcs:count(with_cluster_index.tbl_0.col_0)->Column#8
└─IndexRangeScan_16 798.90 cop[tikv] table:tbl_0, index:idx_3(col_0) range:[-inf,5429), keep order:false
explain select count(*) from wout_cluster_index.tbl_0 where col_0 < 5429 ;
id estRows task access object operator info
StreamAgg_17 1.00 root funcs:count(Column#9)->Column#7
└─IndexReader_18 1.00 root index:StreamAgg_9
└─StreamAgg_9 1.00 cop[tikv] funcs:count(1)->Column#9
└─StreamAgg_9 1.00 cop[tikv] funcs:count(wout_cluster_index.tbl_0.col_0)->Column#9
└─IndexRangeScan_16 798.90 cop[tikv] table:tbl_0, index:idx_3(col_0) range:[-inf,5429), keep order:false
explain select count(*) from with_cluster_index.tbl_0 where col_0 < 41 ;
id estRows task access object operator info
StreamAgg_17 1.00 root funcs:count(Column#8)->Column#6
└─IndexReader_18 1.00 root index:StreamAgg_9
└─StreamAgg_9 1.00 cop[tikv] funcs:count(1)->Column#8
└─StreamAgg_9 1.00 cop[tikv] funcs:count(with_cluster_index.tbl_0.col_0)->Column#8
└─IndexRangeScan_16 41.00 cop[tikv] table:tbl_0, index:idx_3(col_0) range:[-inf,41), keep order:false
explain select count(*) from wout_cluster_index.tbl_0 where col_0 < 41 ;
id estRows task access object operator info
StreamAgg_17 1.00 root funcs:count(Column#9)->Column#7
└─IndexReader_18 1.00 root index:StreamAgg_9
└─StreamAgg_9 1.00 cop[tikv] funcs:count(1)->Column#9
└─StreamAgg_9 1.00 cop[tikv] funcs:count(wout_cluster_index.tbl_0.col_0)->Column#9
└─IndexRangeScan_16 41.00 cop[tikv] table:tbl_0, index:idx_3(col_0) range:[-inf,41), keep order:false
explain select col_14 from with_cluster_index.tbl_2 where col_11 <> '2013-11-01' ;
id estRows task access object operator info
Expand Down Expand Up @@ -109,24 +109,24 @@ explain select count(*) from with_cluster_index.tbl_0 where col_0 <= 0 ;
id estRows task access object operator info
StreamAgg_16 1.00 root funcs:count(Column#8)->Column#6
└─IndexReader_17 1.00 root index:StreamAgg_9
└─StreamAgg_9 1.00 cop[tikv] funcs:count(1)->Column#8
└─StreamAgg_9 1.00 cop[tikv] funcs:count(with_cluster_index.tbl_0.col_0)->Column#8
└─IndexRangeScan_11 1.00 cop[tikv] table:tbl_0, index:idx_3(col_0) range:[-inf,0], keep order:false
explain select count(*) from wout_cluster_index.tbl_0 where col_0 <= 0 ;
id estRows task access object operator info
StreamAgg_16 1.00 root funcs:count(Column#9)->Column#7
└─IndexReader_17 1.00 root index:StreamAgg_9
└─StreamAgg_9 1.00 cop[tikv] funcs:count(1)->Column#9
└─StreamAgg_9 1.00 cop[tikv] funcs:count(wout_cluster_index.tbl_0.col_0)->Column#9
└─IndexRangeScan_11 1.00 cop[tikv] table:tbl_0, index:idx_3(col_0) range:[-inf,0], keep order:false
explain select count(*) from with_cluster_index.tbl_0 where col_0 >= 803163 ;
id estRows task access object operator info
StreamAgg_17 1.00 root funcs:count(Column#8)->Column#6
└─IndexReader_18 1.00 root index:StreamAgg_9
└─StreamAgg_9 1.00 cop[tikv] funcs:count(1)->Column#8
└─StreamAgg_9 1.00 cop[tikv] funcs:count(with_cluster_index.tbl_0.col_0)->Column#8
└─IndexRangeScan_16 109.70 cop[tikv] table:tbl_0, index:idx_3(col_0) range:[803163,+inf], keep order:false
explain select count(*) from wout_cluster_index.tbl_0 where col_0 >= 803163 ;
id estRows task access object operator info
StreamAgg_17 1.00 root funcs:count(Column#9)->Column#7
└─IndexReader_18 1.00 root index:StreamAgg_9
└─StreamAgg_9 1.00 cop[tikv] funcs:count(1)->Column#9
└─StreamAgg_9 1.00 cop[tikv] funcs:count(wout_cluster_index.tbl_0.col_0)->Column#9
└─IndexRangeScan_16 109.70 cop[tikv] table:tbl_0, index:idx_3(col_0) range:[803163,+inf], keep order:false
set @@tidb_enable_outer_join_reorder=false;
4 changes: 2 additions & 2 deletions cmd/explaintest/r/explain_easy.result
Original file line number Diff line number Diff line change
Expand Up @@ -218,14 +218,14 @@ id estRows task access object operator info
Projection 1.00 root 1->Column#6
└─StreamAgg 1.00 root funcs:count(Column#14)->Column#9
└─TableReader 1.00 root data:StreamAgg
└─StreamAgg 1.00 cop[tikv] funcs:count(1)->Column#14
└─StreamAgg 1.00 cop[tikv] funcs:count(test.t1.c1)->Column#14
└─TableFullScan 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo
explain format = 'brief' select count(1) from (select max(c2), count(c3) as m from t1) k;
id estRows task access object operator info
StreamAgg 1.00 root funcs:count(1)->Column#6
└─StreamAgg 1.00 root funcs:count(Column#13)->Column#8
└─TableReader 1.00 root data:StreamAgg
└─StreamAgg 1.00 cop[tikv] funcs:count(1)->Column#13
└─StreamAgg 1.00 cop[tikv] funcs:count(test.t1.c1)->Column#13
└─TableFullScan 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo
explain format = 'brief' select count(1) from (select count(c2) from t1 group by c3) k;
id estRows task access object operator info
Expand Down
8 changes: 4 additions & 4 deletions cmd/explaintest/r/index_merge.result
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ Sort_11 5098.44 root test.t1.c1
├─StreamAgg_23(Build) 1.00 root funcs:min(Column#9)->Column#10, funcs:sum(0)->Column#11, funcs:count(1)->Column#12
│ └─StreamAgg_43 1.00 root funcs:count(Column#25)->Column#9
│ └─IndexReader_44 1.00 root index:StreamAgg_27
│ └─StreamAgg_27 1.00 cop[tikv] funcs:count(1)->Column#25
│ └─StreamAgg_27 1.00 cop[tikv] funcs:count(test.t2._tidb_rowid)->Column#25
│ └─IndexFullScan_41 10000.00 cop[tikv] table:t2, index:c1(c1) keep order:false, stats:pseudo
└─IndexMerge_21(Probe) 2825.66 root
├─IndexRangeScan_17(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo
Expand All @@ -148,7 +148,7 @@ Sort_11 5098.44 root test.t1.c1
├─StreamAgg_23(Build) 1.00 root funcs:min(Column#9)->Column#10, funcs:sum(0)->Column#11, funcs:count(1)->Column#12
│ └─StreamAgg_43 1.00 root funcs:count(Column#25)->Column#9
│ └─IndexReader_44 1.00 root index:StreamAgg_27
│ └─StreamAgg_27 1.00 cop[tikv] funcs:count(1)->Column#25
│ └─StreamAgg_27 1.00 cop[tikv] funcs:count(test.t2._tidb_rowid)->Column#25
│ └─IndexFullScan_41 10000.00 cop[tikv] table:t2, index:c1(c1) keep order:false, stats:pseudo
└─IndexMerge_21(Probe) 2825.66 root
├─IndexRangeScan_17(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo
Expand All @@ -170,7 +170,7 @@ Sort_11 5542.21 root test.t1.c1
├─StreamAgg_22(Build) 1.00 root funcs:max(Column#9)->Column#10, funcs:sum(0)->Column#11, funcs:count(1)->Column#12
│ └─StreamAgg_42 1.00 root funcs:count(Column#25)->Column#9
│ └─IndexReader_43 1.00 root index:StreamAgg_26
│ └─StreamAgg_26 1.00 cop[tikv] funcs:count(1)->Column#25
│ └─StreamAgg_26 1.00 cop[tikv] funcs:count(test.t2._tidb_rowid)->Column#25
│ └─IndexFullScan_40 10000.00 cop[tikv] table:t2, index:c1(c1) keep order:false, stats:pseudo
└─IndexMerge_20(Probe) 5542.21 root
├─IndexRangeScan_17(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo
Expand All @@ -192,7 +192,7 @@ Sort_39 5542.21 root test.t1.c1
├─StreamAgg_50(Build) 1.00 root funcs:max(Column#13)->Column#14, funcs:sum(0)->Column#15, funcs:count(1)->Column#16
│ └─StreamAgg_70 1.00 root funcs:count(Column#38)->Column#13
│ └─IndexReader_71 1.00 root index:StreamAgg_54
│ └─StreamAgg_54 1.00 cop[tikv] funcs:count(1)->Column#38
│ └─StreamAgg_54 1.00 cop[tikv] funcs:count(test.t2._tidb_rowid)->Column#38
│ └─IndexFullScan_68 10000.00 cop[tikv] table:t2, index:c1(c1) keep order:false, stats:pseudo
└─IndexMerge_48(Probe) 5542.21 root
├─IndexRangeScan_45(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo
Expand Down
2 changes: 1 addition & 1 deletion parser/mysql/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ const (
TypeLongBlob byte = 0xfb
TypeBlob byte = 0xfc
TypeVarString byte = 0xfd
TypeString byte = 0xfe
TypeString byte = 0xfe /* TypeString is char type */
TypeGeometry byte = 0xff
)

Expand Down
10 changes: 10 additions & 0 deletions parser/types/field_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,16 @@ func (ft *FieldType) IsDecimalValid() bool {
return true
}

// IsVarLengthType Determine whether the column type is a variable-length type
func (ft *FieldType) IsVarLengthType() bool {
switch ft.tp {
case mysql.TypeVarchar, mysql.TypeVarString, mysql.TypeJSON, mysql.TypeBlob, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob:
return true
default:
return false
}
}

// GetType returns the type of the FieldType.
func (ft *FieldType) GetType() byte {
return ft.tp
Expand Down
1 change: 1 addition & 0 deletions planner/core/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ go_library(
"rule_aggregation_skew_rewrite.go",
"rule_build_key_info.go",
"rule_column_pruning.go",
"rule_count_star_rewriter.go",
"rule_decorrelate.go",
"rule_eliminate_projection.go",
"rule_generate_column_substitute.go",
Expand Down
1 change: 1 addition & 0 deletions planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ func (b *PlanBuilder) buildAggregation(ctx context.Context, p LogicalPlan, aggFu
b.optFlag |= flagPredicatePushDown
b.optFlag |= flagEliminateAgg
b.optFlag |= flagEliminateProjection
b.optFlag |= flagCountStarRewriter

if b.ctx.GetSessionVars().EnableSkewDistinctAgg {
b.optFlag |= flagSkewDistinctAgg
Expand Down
2 changes: 2 additions & 0 deletions planner/core/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ const (
flagSyncWaitStatsLoadPoint
flagJoinReOrder
flagPrunColumnsAgain
flagCountStarRewriter
)

var optRuleList = []logicalOptRule{
Expand All @@ -94,6 +95,7 @@ var optRuleList = []logicalOptRule{
&syncWaitStatsLoadPoint{},
&joinReOrderSolver{},
&columnPruner{}, // column pruning again at last, note it will mess up the results of buildKeySolver
&countStarRewriter{},
}

type logicalOptimizeOp struct {
Expand Down
4 changes: 2 additions & 2 deletions planner/core/partition_pruner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,12 @@ func TestRangeColumnPartitionPruningForIn(t *testing.T) {
"└─PartitionUnion 2.00 root ",
" ├─HashAgg 1.00 root funcs:count(Column#7)->Column#5",
" │ └─IndexReader 1.00 root index:HashAgg",
" │ └─HashAgg 1.00 cop[tikv] funcs:count(1)->Column#7",
" │ └─HashAgg 1.00 cop[tikv] funcs:count(test_range_col_in.t1.dt)->Column#7",
" │ └─Selection 20.00 cop[tikv] in(test_range_col_in.t1.dt, 2020-11-27 00:00:00.000000, 2020-11-28 00:00:00.000000)",
" │ └─IndexFullScan 10000.00 cop[tikv] table:t1, partition:p20201127, index:PRIMARY(id, dt) keep order:false, stats:pseudo",
" └─HashAgg 1.00 root funcs:count(Column#10)->Column#5",
" └─IndexReader 1.00 root index:HashAgg",
" └─HashAgg 1.00 cop[tikv] funcs:count(1)->Column#10",
" └─HashAgg 1.00 cop[tikv] funcs:count(test_range_col_in.t1.dt)->Column#10",
" └─Selection 20.00 cop[tikv] in(test_range_col_in.t1.dt, 2020-11-27 00:00:00.000000, 2020-11-28 00:00:00.000000)",
" └─IndexFullScan 10000.00 cop[tikv] table:t1, partition:p20201128, index:PRIMARY(id, dt) keep order:false, stats:pseudo"))

Expand Down
73 changes: 72 additions & 1 deletion planner/core/physical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2412,7 +2412,7 @@ func TestPhysicalPlanMemoryTrace(t *testing.T) {
ls.ByItems = append(ls.ByItems, &util.ByItems{})
require.Greater(t, ls.MemoryUsage(), size)

//PhysicalProperty
// PhysicalProperty
pp := property.PhysicalProperty{}
size = pp.MemoryUsage()
pp.MPPPartitionCols = append(pp.MPPPartitionCols, &property.MPPPartitionColumn{})
Expand Down Expand Up @@ -2460,3 +2460,74 @@ func TestNoDecorrelateHint(t *testing.T) {
tk.MustQuery("show warnings").Check(testkit.Rows(output[i].Warning...))
}
}

func TestCountStarForTikv(t *testing.T) {
var (
input []string
output []struct {
SQL string
Plan []string
Warning []string
}
)
planSuiteData := core.GetPlanSuiteData()
planSuiteData.LoadTestCases(t, &input, &output)
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("create table t (a int(11) not null, b varchar(10) not null, c date not null, d char(1) not null, e bigint not null, f datetime not null, g bool not null, h bool )")
tk.MustExec("create table t_pick_row_id (a char(20) not null)")

// tikv
for i, ts := range input {
testdata.OnRecord(func() {
output[i].SQL = ts
output[i].Plan = testdata.ConvertRowsToStrings(tk.MustQuery("explain format = 'brief' " + ts).Rows())
})
tk.MustQuery("explain format = 'brief' " + ts).Check(testkit.Rows(output[i].Plan...))
}
}

func TestCountStarForTiFlash(t *testing.T) {
var (
input []string
output []struct {
SQL string
Plan []string
Warning []string
}
)
planSuiteData := core.GetPlanSuiteData()
planSuiteData.LoadTestCases(t, &input, &output)
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("create table t (a int(11) not null, b varchar(10) not null, c date not null, d char(1) not null, e bigint not null, f datetime not null, g bool not null, h bool )")
tk.MustExec("create table t_pick_row_id (a char(20) not null)")

// tiflash
dom := domain.GetDomain(tk.Session())
is := dom.InfoSchema()
db, exists := is.SchemaByName(model.NewCIStr("test"))
require.True(t, exists)
for _, tblInfo := range db.Tables {
tableName := tblInfo.Name.L
if tableName == "t" || tableName == "t_pick_row_id" {
tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{
Count: 1,
Available: true,
}
}
}

tk.MustExec("set @@tidb_allow_mpp=1; set @@tidb_enforce_mpp=1;")
for i, ts := range input {
testdata.OnRecord(func() {
output[i].SQL = ts
output[i].Plan = testdata.ConvertRowsToStrings(tk.MustQuery("explain format = 'brief' " + ts).Rows())
})
tk.MustQuery("explain format = 'brief' " + ts).Check(testkit.Rows(output[i].Plan...))
}
}
76 changes: 65 additions & 11 deletions planner/core/rule_column_pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,21 +336,23 @@ func (ds *DataSource) PruneColumns(parentUsedCols []*expression.Column, opt *log
appendColumnPruneTraceStep(ds, prunedColumns, opt)
// For SQL like `select 1 from t`, tikv's response will be empty if no column is in schema.
// So we'll force to push one if schema doesn't have any column.
// There are two situations
// case 1: tiflash. Select a non-empty and narrowest column.
// The main reason is that tiflash is a column storage structure,
// and choosing the narrowest column can reduce the amount of data read as much as possible,
// making the reading efficiency the best.
// case 2: tikv. Select row_id or pk column.
// The main reason is that tikv is a kv store.
// Select the key column for the best read efficiency.
if ds.schema.Len() == 0 {
var handleCol *expression.Column
var handleColInfo *model.ColumnInfo
if ds.table.Type().IsClusterTable() && len(originColumns) > 0 {
// use the first line.
handleCol = originSchemaColumns[0]
handleColInfo = originColumns[0]
// case 1: tiflash
if ds.tableInfo.TiFlashReplica != nil {
handleCol, handleColInfo = preferNotNullColumnFromTable(ds)
} else {
if ds.handleCols != nil {
handleCol = ds.handleCols.GetCol(0)
handleColInfo = handleCol.ToInfo()
} else {
handleCol = ds.newExtraHandleSchemaCol()
handleColInfo = model.NewExtraHandleColInfo()
}
// case 2: tikv
handleCol, handleColInfo = preferKeyColumnFromTable(ds, originSchemaColumns, originColumns)
}
ds.Columns = append(ds.Columns, handleColInfo)
ds.schema.Append(handleCol)
Expand Down Expand Up @@ -658,3 +660,55 @@ func appendItemPruneTraceStep(p LogicalPlan, itemType string, prunedObjects []fm
}
opt.appendStepToCurrent(p.ID(), p.TP(), reason, action)
}

// pick a not null and narrowest column from table
func preferNotNullColumnFromTable(dataSource *DataSource) (*expression.Column, *model.ColumnInfo) {
var resultColumnInfo *model.ColumnInfo
var resultColumn *expression.Column

if dataSource.handleCols != nil {
resultColumn = dataSource.handleCols.GetCol(0)
resultColumnInfo = resultColumn.ToInfo()
} else {
resultColumn = dataSource.newExtraHandleSchemaCol()
resultColumnInfo = model.NewExtraHandleColInfo()
}

for _, columnInfo := range dataSource.tableInfo.Columns {
if columnInfo.FieldType.IsVarLengthType() {
continue
}
if mysql.HasNotNullFlag(columnInfo.GetFlag()) {
if columnInfo.GetFlen() < resultColumnInfo.GetFlen() {
resultColumnInfo = columnInfo
resultColumn = &expression.Column{
UniqueID: dataSource.ctx.GetSessionVars().AllocPlanColumnID(),
ID: resultColumnInfo.ID,
RetType: resultColumnInfo.FieldType.Clone(),
OrigName: fmt.Sprintf("%s.%s.%s", dataSource.DBName.L, dataSource.tableInfo.Name.L, resultColumnInfo.Name),
}
}
}
}
return resultColumn, resultColumnInfo
}

func preferKeyColumnFromTable(dataSource *DataSource, originColumns []*expression.Column,
originSchemaColumns []*model.ColumnInfo) (*expression.Column, *model.ColumnInfo) {
var resultColumnInfo *model.ColumnInfo
var resultColumn *expression.Column
if dataSource.table.Type().IsClusterTable() && len(originColumns) > 0 {
// use the first column.
resultColumnInfo = originSchemaColumns[0]
resultColumn = originColumns[0]
} else {
if dataSource.handleCols != nil {
resultColumn = dataSource.handleCols.GetCol(0)
resultColumnInfo = resultColumn.ToInfo()
} else {
resultColumn = dataSource.newExtraHandleSchemaCol()
resultColumnInfo = model.NewExtraHandleColInfo()
}
}
return resultColumn, resultColumnInfo
}
Loading

0 comments on commit 37bd052

Please sign in to comment.