planner: change the stage of count star rewriter rule from logical to post (#39555)

close #37165
elsa0520 committed Dec 2, 2022
1 parent eed9ceb commit 6b4739f
Showing 15 changed files with 188 additions and 234 deletions.
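The explain-test updates below all follow the same shape: because the count-star rewrite now runs in postOptimize and only fires for TiFlash table scans, TiKV cop plans keep funcs:count(1) instead of being rewritten to count(<column>). A minimal testkit-style sketch of that check — not part of this commit; the package, table definition, test name, and assertion style are illustrative only:

package core_test

import (
    "fmt"
    "strings"
    "testing"

    "github.com/pingcap/tidb/testkit"
    "github.com/stretchr/testify/require"
)

func TestCountStarStaysCountOneOnTiKV(t *testing.T) {
    store := testkit.CreateMockStore(t)
    tk := testkit.NewTestKit(t, store)
    tk.MustExec("use test")
    tk.MustExec("create table t(a int not null, b varchar(10))")
    // Without a TiFlash replica the post-optimization rewrite should not fire,
    // so the cop-level aggregation keeps count(1).
    found := false
    for _, row := range tk.MustQuery("explain format = 'brief' select count(*) from t").Rows() {
        line := fmt.Sprintf("%v", row)
        if strings.Contains(line, "cop[tikv]") && strings.Contains(line, "count(1)") {
            found = true
        }
    }
    require.True(t, found)
}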
2 changes: 1 addition & 1 deletion cmd/explaintest/r/access_path_selection.result
@@ -41,5 +41,5 @@ explain format = 'brief' select count(1) from access_path_selection;
id estRows task access object operator info
StreamAgg 1.00 root funcs:count(Column#18)->Column#4
└─TableReader 1.00 root data:StreamAgg
└─StreamAgg 1.00 cop[tikv] funcs:count(test.access_path_selection._tidb_rowid)->Column#18
└─StreamAgg 1.00 cop[tikv] funcs:count(1)->Column#18
└─TableFullScan 10000.00 cop[tikv] table:access_path_selection keep order:false, stats:pseudo
16 changes: 8 additions & 8 deletions cmd/explaintest/r/clustered_index.result
@@ -30,25 +30,25 @@ explain select count(*) from with_cluster_index.tbl_0 where col_0 < 5429 ;
id estRows task access object operator info
StreamAgg_17 1.00 root funcs:count(Column#8)->Column#6
└─IndexReader_18 1.00 root index:StreamAgg_9
└─StreamAgg_9 1.00 cop[tikv] funcs:count(with_cluster_index.tbl_0.col_0)->Column#8
└─StreamAgg_9 1.00 cop[tikv] funcs:count(1)->Column#8
└─IndexRangeScan_16 798.90 cop[tikv] table:tbl_0, index:idx_3(col_0) range:[-inf,5429), keep order:false
explain select count(*) from wout_cluster_index.tbl_0 where col_0 < 5429 ;
id estRows task access object operator info
StreamAgg_17 1.00 root funcs:count(Column#9)->Column#7
└─IndexReader_18 1.00 root index:StreamAgg_9
└─StreamAgg_9 1.00 cop[tikv] funcs:count(wout_cluster_index.tbl_0.col_0)->Column#9
└─StreamAgg_9 1.00 cop[tikv] funcs:count(1)->Column#9
└─IndexRangeScan_16 798.90 cop[tikv] table:tbl_0, index:idx_3(col_0) range:[-inf,5429), keep order:false
explain select count(*) from with_cluster_index.tbl_0 where col_0 < 41 ;
id estRows task access object operator info
StreamAgg_17 1.00 root funcs:count(Column#8)->Column#6
└─IndexReader_18 1.00 root index:StreamAgg_9
└─StreamAgg_9 1.00 cop[tikv] funcs:count(with_cluster_index.tbl_0.col_0)->Column#8
└─StreamAgg_9 1.00 cop[tikv] funcs:count(1)->Column#8
└─IndexRangeScan_16 41.00 cop[tikv] table:tbl_0, index:idx_3(col_0) range:[-inf,41), keep order:false
explain select count(*) from wout_cluster_index.tbl_0 where col_0 < 41 ;
id estRows task access object operator info
StreamAgg_17 1.00 root funcs:count(Column#9)->Column#7
└─IndexReader_18 1.00 root index:StreamAgg_9
└─StreamAgg_9 1.00 cop[tikv] funcs:count(wout_cluster_index.tbl_0.col_0)->Column#9
└─StreamAgg_9 1.00 cop[tikv] funcs:count(1)->Column#9
└─IndexRangeScan_16 41.00 cop[tikv] table:tbl_0, index:idx_3(col_0) range:[-inf,41), keep order:false
explain select col_14 from with_cluster_index.tbl_2 where col_11 <> '2013-11-01' ;
id estRows task access object operator info
@@ -109,24 +109,24 @@ explain select count(*) from with_cluster_index.tbl_0 where col_0 <= 0 ;
id estRows task access object operator info
StreamAgg_16 1.00 root funcs:count(Column#8)->Column#6
└─IndexReader_17 1.00 root index:StreamAgg_9
└─StreamAgg_9 1.00 cop[tikv] funcs:count(with_cluster_index.tbl_0.col_0)->Column#8
└─StreamAgg_9 1.00 cop[tikv] funcs:count(1)->Column#8
└─IndexRangeScan_11 1.00 cop[tikv] table:tbl_0, index:idx_3(col_0) range:[-inf,0], keep order:false
explain select count(*) from wout_cluster_index.tbl_0 where col_0 <= 0 ;
id estRows task access object operator info
StreamAgg_16 1.00 root funcs:count(Column#9)->Column#7
└─IndexReader_17 1.00 root index:StreamAgg_9
└─StreamAgg_9 1.00 cop[tikv] funcs:count(wout_cluster_index.tbl_0.col_0)->Column#9
└─StreamAgg_9 1.00 cop[tikv] funcs:count(1)->Column#9
└─IndexRangeScan_11 1.00 cop[tikv] table:tbl_0, index:idx_3(col_0) range:[-inf,0], keep order:false
explain select count(*) from with_cluster_index.tbl_0 where col_0 >= 803163 ;
id estRows task access object operator info
StreamAgg_17 1.00 root funcs:count(Column#8)->Column#6
└─IndexReader_18 1.00 root index:StreamAgg_9
└─StreamAgg_9 1.00 cop[tikv] funcs:count(with_cluster_index.tbl_0.col_0)->Column#8
└─StreamAgg_9 1.00 cop[tikv] funcs:count(1)->Column#8
└─IndexRangeScan_16 109.70 cop[tikv] table:tbl_0, index:idx_3(col_0) range:[803163,+inf], keep order:false
explain select count(*) from wout_cluster_index.tbl_0 where col_0 >= 803163 ;
id estRows task access object operator info
StreamAgg_17 1.00 root funcs:count(Column#9)->Column#7
└─IndexReader_18 1.00 root index:StreamAgg_9
└─StreamAgg_9 1.00 cop[tikv] funcs:count(wout_cluster_index.tbl_0.col_0)->Column#9
└─StreamAgg_9 1.00 cop[tikv] funcs:count(1)->Column#9
└─IndexRangeScan_16 109.70 cop[tikv] table:tbl_0, index:idx_3(col_0) range:[803163,+inf], keep order:false
set @@tidb_enable_outer_join_reorder=false;
4 changes: 2 additions & 2 deletions cmd/explaintest/r/explain_easy.result
@@ -218,14 +218,14 @@ id estRows task access object operator info
Projection 1.00 root 1->Column#6
└─StreamAgg 1.00 root funcs:count(Column#14)->Column#9
└─TableReader 1.00 root data:StreamAgg
└─StreamAgg 1.00 cop[tikv] funcs:count(test.t1.c1)->Column#14
└─StreamAgg 1.00 cop[tikv] funcs:count(1)->Column#14
└─TableFullScan 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo
explain format = 'brief' select count(1) from (select max(c2), count(c3) as m from t1) k;
id estRows task access object operator info
StreamAgg 1.00 root funcs:count(1)->Column#6
└─StreamAgg 1.00 root funcs:count(Column#13)->Column#8
└─TableReader 1.00 root data:StreamAgg
└─StreamAgg 1.00 cop[tikv] funcs:count(test.t1.c1)->Column#13
└─StreamAgg 1.00 cop[tikv] funcs:count(1)->Column#13
└─TableFullScan 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo
explain format = 'brief' select count(1) from (select count(c2) from t1 group by c3) k;
id estRows task access object operator info
8 changes: 4 additions & 4 deletions cmd/explaintest/r/index_merge.result
@@ -126,7 +126,7 @@ Sort_11 5098.44 root test.t1.c1
├─StreamAgg_23(Build) 1.00 root funcs:min(Column#9)->Column#10, funcs:sum(0)->Column#11, funcs:count(1)->Column#12
│ └─StreamAgg_43 1.00 root funcs:count(Column#25)->Column#9
│ └─IndexReader_44 1.00 root index:StreamAgg_27
│ └─StreamAgg_27 1.00 cop[tikv] funcs:count(test.t2._tidb_rowid)->Column#25
│ └─StreamAgg_27 1.00 cop[tikv] funcs:count(1)->Column#25
│ └─IndexFullScan_41 10000.00 cop[tikv] table:t2, index:c1(c1) keep order:false, stats:pseudo
└─IndexMerge_21(Probe) 2825.66 root type: union
├─IndexRangeScan_17(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo
@@ -148,7 +148,7 @@ Sort_11 5098.44 root test.t1.c1
├─StreamAgg_23(Build) 1.00 root funcs:min(Column#9)->Column#10, funcs:sum(0)->Column#11, funcs:count(1)->Column#12
│ └─StreamAgg_43 1.00 root funcs:count(Column#25)->Column#9
│ └─IndexReader_44 1.00 root index:StreamAgg_27
│ └─StreamAgg_27 1.00 cop[tikv] funcs:count(test.t2._tidb_rowid)->Column#25
│ └─StreamAgg_27 1.00 cop[tikv] funcs:count(1)->Column#25
│ └─IndexFullScan_41 10000.00 cop[tikv] table:t2, index:c1(c1) keep order:false, stats:pseudo
└─IndexMerge_21(Probe) 2825.66 root type: union
├─IndexRangeScan_17(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo
@@ -170,7 +170,7 @@ Sort_11 5542.21 root test.t1.c1
├─StreamAgg_22(Build) 1.00 root funcs:max(Column#9)->Column#10, funcs:sum(0)->Column#11, funcs:count(1)->Column#12
│ └─StreamAgg_42 1.00 root funcs:count(Column#25)->Column#9
│ └─IndexReader_43 1.00 root index:StreamAgg_26
│ └─StreamAgg_26 1.00 cop[tikv] funcs:count(test.t2._tidb_rowid)->Column#25
│ └─StreamAgg_26 1.00 cop[tikv] funcs:count(1)->Column#25
│ └─IndexFullScan_40 10000.00 cop[tikv] table:t2, index:c1(c1) keep order:false, stats:pseudo
└─IndexMerge_20(Probe) 5542.21 root type: union
├─IndexRangeScan_17(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo
@@ -192,7 +192,7 @@ Sort_39 5542.21 root test.t1.c1
├─StreamAgg_50(Build) 1.00 root funcs:max(Column#13)->Column#14, funcs:sum(0)->Column#15, funcs:count(1)->Column#16
│ └─StreamAgg_70 1.00 root funcs:count(Column#38)->Column#13
│ └─IndexReader_71 1.00 root index:StreamAgg_54
│ └─StreamAgg_54 1.00 cop[tikv] funcs:count(test.t2._tidb_rowid)->Column#38
│ └─StreamAgg_54 1.00 cop[tikv] funcs:count(1)->Column#38
│ └─IndexFullScan_68 10000.00 cop[tikv] table:t2, index:c1(c1) keep order:false, stats:pseudo
└─IndexMerge_48(Probe) 5542.21 root type: union
├─IndexRangeScan_45(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo
1 change: 0 additions & 1 deletion planner/core/BUILD.bazel
@@ -48,7 +48,6 @@ go_library(
"rule_aggregation_skew_rewrite.go",
"rule_build_key_info.go",
"rule_column_pruning.go",
"rule_count_star_rewriter.go",
"rule_decorrelate.go",
"rule_eliminate_projection.go",
"rule_generate_column_substitute.go",
1 change: 0 additions & 1 deletion planner/core/logical_plan_builder.go
@@ -204,7 +204,6 @@ func (b *PlanBuilder) buildAggregation(ctx context.Context, p LogicalPlan, aggFu
b.optFlag |= flagPredicatePushDown
b.optFlag |= flagEliminateAgg
b.optFlag |= flagEliminateProjection
b.optFlag |= flagCountStarRewriter

if b.ctx.GetSessionVars().EnableSkewDistinctAgg {
b.optFlag |= flagSkewDistinctAgg
119 changes: 117 additions & 2 deletions planner/core/optimizer.go
@@ -16,11 +16,13 @@ package core

import (
"context"
"fmt"
"math"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/expression/aggregation"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/lock"
@@ -72,7 +74,6 @@ const (
flagSyncWaitStatsLoadPoint
flagJoinReOrder
flagPrunColumnsAgain
flagCountStarRewriter
)

var optRuleList = []logicalOptRule{
Expand All @@ -95,7 +96,6 @@ var optRuleList = []logicalOptRule{
&syncWaitStatsLoadPoint{},
&joinReOrderSolver{},
&columnPruner{}, // column pruning again at last, note it will mess up the results of buildKeySolver
&countStarRewriter{},
}

type logicalOptimizeOp struct {
@@ -392,6 +392,7 @@ func postOptimize(sctx sessionctx.Context, plan PhysicalPlan) (PhysicalPlan, err
handleFineGrainedShuffle(sctx, plan)
checkPlanCacheable(sctx, plan)
propagateProbeParents(plan, nil)
countStarRewrite(plan)
return plan, nil
}

@@ -533,6 +534,120 @@ func prunePhysicalColumnsInternal(sctx sessionctx.Context, plan PhysicalPlan) er
return nil
}

/*
The countStarRewriter rewrites

    count(*) -> count(not null column)

**Only for TiFlash.**

Attention:
Since count(*) is translated into count(1) during parsing,
the pattern matched here is actually count(constant).

Pattern:
    PhysicalAggregation: count(constant)
        |
    TableFullScan: TiFlash

Example:
    Table
        <k1 bool not null, k2 int null, k3 bigint not null>
    Query: select count(*) from table
    ColumnPruningRule: the datasource picks row_id
    countStarRewrite:  the datasource picks k1 instead of row_id,
                       and count(*) is rewritten to count(k1)
    Rewritten query:   select count(k1) from table
*/
func countStarRewrite(plan PhysicalPlan) {
    countStarRewriteInternal(plan)
    if tableReader, ok := plan.(*PhysicalTableReader); ok {
        countStarRewrite(tableReader.tablePlan)
    } else {
        for _, child := range plan.Children() {
            countStarRewrite(child)
        }
    }
}

func countStarRewriteInternal(plan PhysicalPlan) {
    // match pattern any agg(count(constant)) -> tablefullscan(tiflash)
    var physicalAgg *basePhysicalAgg
    switch x := plan.(type) {
    case *PhysicalHashAgg:
        physicalAgg = x.getPointer()
    case *PhysicalStreamAgg:
        physicalAgg = x.getPointer()
    default:
        return
    }
    if len(physicalAgg.GroupByItems) > 0 || len(physicalAgg.children) != 1 {
        return
    }
    for _, aggFunc := range physicalAgg.AggFuncs {
        if aggFunc.Name != "count" || len(aggFunc.Args) != 1 || aggFunc.HasDistinct {
            return
        }
        if _, ok := aggFunc.Args[0].(*expression.Constant); !ok {
            return
        }
    }
    physicalTableScan, ok := physicalAgg.Children()[0].(*PhysicalTableScan)
    if !ok || !physicalTableScan.isFullScan() || physicalTableScan.StoreType != kv.TiFlash || len(physicalTableScan.schema.Columns) != 1 {
        return
    }
    // rewrite datasource and agg args
    rewriteTableScanAndAggArgs(physicalTableScan, physicalAgg.AggFuncs)
}

// rewriteTableScanAndAggArgs picks the narrowest NOT NULL column from the table.
// If the datasource has no such column, the row_id or pk column is retained.
func rewriteTableScanAndAggArgs(physicalTableScan *PhysicalTableScan, aggFuncs []*aggregation.AggFuncDesc) {
    var resultColumnInfo *model.ColumnInfo
    var resultColumn *expression.Column

    resultColumnInfo = physicalTableScan.Columns[0]
    resultColumn = physicalTableScan.schema.Columns[0]
    // Prefer a NOT NULL column from the table.
    for _, columnInfo := range physicalTableScan.Table.Columns {
        if columnInfo.FieldType.IsVarLengthType() {
            continue
        }
        if mysql.HasNotNullFlag(columnInfo.GetFlag()) {
            if columnInfo.GetFlen() < resultColumnInfo.GetFlen() {
                resultColumnInfo = columnInfo
                resultColumn = &expression.Column{
                    UniqueID: physicalTableScan.ctx.GetSessionVars().AllocPlanColumnID(),
                    ID:       resultColumnInfo.ID,
                    RetType:  resultColumnInfo.FieldType.Clone(),
                    OrigName: fmt.Sprintf("%s.%s.%s", physicalTableScan.DBName.L, physicalTableScan.Table.Name.L, resultColumnInfo.Name),
                }
            }
        }
    }
    // table scan (row_id) -> (not null column)
    physicalTableScan.Columns[0] = resultColumnInfo
    physicalTableScan.schema.Columns[0] = resultColumn
    // agg arg count(1) -> count(not null column)
    arg := resultColumn.Clone()
    for _, aggFunc := range aggFuncs {
        constExpr, ok := aggFunc.Args[0].(*expression.Constant)
        if !ok {
            return
        }
        // count(null) shouldn't be rewritten
        if constExpr.Value.IsNull() {
            continue
        }
        aggFunc.Args[0] = arg
    }
}

// Only for MPP(Window<-[Sort]<-ExchangeReceiver<-ExchangeSender).
// TiFlashFineGrainedShuffleStreamCount:
// < 0: fine grained shuffle is disabled.
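The comment block and rewriteTableScanAndAggArgs above boil down to one heuristic: among fixed-length NOT NULL columns, pick the one with the smallest flen, otherwise keep the handle/row_id column the pruner already retained. A self-contained sketch of just that heuristic, using hypothetical types rather than the planner's own structs:

package main

import "fmt"

// colInfo is a stand-in for model.ColumnInfo with only the fields the heuristic needs.
type colInfo struct {
    name    string
    notNull bool
    varLen  bool // variable-length types (varchar, blob, ...) are skipped
    flen    int  // storage width; narrower columns mean less data read from TiFlash
}

// pickCountStarColumn mirrors the selection loop in rewriteTableScanAndAggArgs:
// start from the column the pruner kept (row_id or pk) and replace it with any
// narrower fixed-length NOT NULL table column.
func pickCountStarColumn(kept colInfo, tableCols []colInfo) colInfo {
    best := kept
    for _, c := range tableCols {
        if c.varLen || !c.notNull {
            continue
        }
        if c.flen < best.flen {
            best = c
        }
    }
    return best
}

func main() {
    cols := []colInfo{
        {name: "k1", notNull: true, flen: 1},      // bool not null
        {name: "k2", notNull: false, flen: 11},    // int null      -> skipped
        {name: "k3", notNull: true, flen: 20},     // bigint not null
        {name: "k4", notNull: true, varLen: true}, // varchar       -> skipped
    }
    rowID := colInfo{name: "_tidb_rowid", notNull: true, flen: 20}
    fmt.Println(pickCountStarColumn(rowID, cols).name) // prints "k1", so count(*) becomes count(k1)
}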
4 changes: 2 additions & 2 deletions planner/core/partition_pruner_test.go
@@ -91,12 +91,12 @@ func TestRangeColumnPartitionPruningForIn(t *testing.T) {
"└─PartitionUnion 2.00 root ",
" ├─HashAgg 1.00 root funcs:count(Column#7)->Column#5",
" │ └─IndexReader 1.00 root index:HashAgg",
" │ └─HashAgg 1.00 cop[tikv] funcs:count(test_range_col_in.t1.dt)->Column#7",
" │ └─HashAgg 1.00 cop[tikv] funcs:count(1)->Column#7",
" │ └─Selection 20.00 cop[tikv] in(test_range_col_in.t1.dt, 2020-11-27 00:00:00.000000, 2020-11-28 00:00:00.000000)",
" │ └─IndexFullScan 10000.00 cop[tikv] table:t1, partition:p20201127, index:PRIMARY(id, dt) keep order:false, stats:pseudo",
" └─HashAgg 1.00 root funcs:count(Column#10)->Column#5",
" └─IndexReader 1.00 root index:HashAgg",
" └─HashAgg 1.00 cop[tikv] funcs:count(test_range_col_in.t1.dt)->Column#10",
" └─HashAgg 1.00 cop[tikv] funcs:count(1)->Column#10",
" └─Selection 20.00 cop[tikv] in(test_range_col_in.t1.dt, 2020-11-27 00:00:00.000000, 2020-11-28 00:00:00.000000)",
" └─IndexFullScan 10000.00 cop[tikv] table:t1, partition:p20201128, index:PRIMARY(id, dt) keep order:false, stats:pseudo"))

8 changes: 8 additions & 0 deletions planner/core/physical_plans.go
@@ -1753,6 +1753,10 @@ type PhysicalHashAgg struct {
basePhysicalAgg
}

func (p *PhysicalHashAgg) getPointer() *basePhysicalAgg {
    return &p.basePhysicalAgg
}

// Clone implements PhysicalPlan interface.
func (p *PhysicalHashAgg) Clone() (PhysicalPlan, error) {
cloned := new(PhysicalHashAgg)
@@ -1787,6 +1791,10 @@ type PhysicalStreamAgg struct {
basePhysicalAgg
}

func (p *PhysicalStreamAgg) getPointer() *basePhysicalAgg {
    return &p.basePhysicalAgg
}

// Clone implements PhysicalPlan interface.
func (p *PhysicalStreamAgg) Clone() (PhysicalPlan, error) {
cloned := new(PhysicalStreamAgg)
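The two getPointer methods above exist so the post-optimization pass can handle PhysicalHashAgg and PhysicalStreamAgg uniformly: both embed basePhysicalAgg, and the rewrite only needs the shared fields (AggFuncs, GroupByItems, children). A minimal sketch of the same embedded-struct accessor pattern, with hypothetical types rather than the planner's:

package main

import "fmt"

type baseAgg struct{ aggFuncs []string }

type hashAgg struct{ baseAgg }
type streamAgg struct{ baseAgg }

// Each concrete agg exposes a pointer to its embedded base, so callers can
// mutate the shared fields without caring which concrete type they hold.
func (a *hashAgg) getPointer() *baseAgg   { return &a.baseAgg }
func (a *streamAgg) getPointer() *baseAgg { return &a.baseAgg }

func rewrite(plan interface{}) {
    var agg *baseAgg
    switch x := plan.(type) {
    case *hashAgg:
        agg = x.getPointer()
    case *streamAgg:
        agg = x.getPointer()
    default:
        return
    }
    agg.aggFuncs = []string{"count(k1)"} // writes through to the concrete value
}

func main() {
    h := &hashAgg{baseAgg{aggFuncs: []string{"count(1)"}}}
    rewrite(h)
    fmt.Println(h.aggFuncs) // [count(k1)]
}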
48 changes: 1 addition & 47 deletions planner/core/rule_column_pruning.go
@@ -336,24 +336,10 @@ func (ds *DataSource) PruneColumns(parentUsedCols []*expression.Column, opt *log
appendColumnPruneTraceStep(ds, prunedColumns, opt)
// For SQL like `select 1 from t`, tikv's response will be empty if no column is in schema.
// So we'll force to push one if schema doesn't have any column.
// There are two situations
// case 1: tiflash. Select a non-empty and narrowest column.
// The main reason is that tiflash is a column storage structure,
// and choosing the narrowest column can reduce the amount of data read as much as possible,
// making the reading efficiency the best.
// case 2: tikv. Select row_id or pk column.
// The main reason is that tikv is a kv store.
// Select the key column for the best read efficiency.
if ds.schema.Len() == 0 {
var handleCol *expression.Column
var handleColInfo *model.ColumnInfo
// case 1: tiflash
if ds.tableInfo.TiFlashReplica != nil {
handleCol, handleColInfo = preferNotNullColumnFromTable(ds)
} else {
// case 2: tikv
handleCol, handleColInfo = preferKeyColumnFromTable(ds, originSchemaColumns, originColumns)
}
handleCol, handleColInfo = preferKeyColumnFromTable(ds, originSchemaColumns, originColumns)
ds.Columns = append(ds.Columns, handleColInfo)
ds.schema.Append(handleCol)
}
@@ -661,38 +647,6 @@ func appendItemPruneTraceStep(p LogicalPlan, itemType string, prunedObjects []fm
opt.appendStepToCurrent(p.ID(), p.TP(), reason, action)
}

// pick a not null and narrowest column from table
func preferNotNullColumnFromTable(dataSource *DataSource) (*expression.Column, *model.ColumnInfo) {
var resultColumnInfo *model.ColumnInfo
var resultColumn *expression.Column

if dataSource.handleCols != nil {
resultColumn = dataSource.handleCols.GetCol(0)
resultColumnInfo = resultColumn.ToInfo()
} else {
resultColumn = dataSource.newExtraHandleSchemaCol()
resultColumnInfo = model.NewExtraHandleColInfo()
}

for _, columnInfo := range dataSource.tableInfo.Columns {
if columnInfo.FieldType.IsVarLengthType() {
continue
}
if mysql.HasNotNullFlag(columnInfo.GetFlag()) {
if columnInfo.GetFlen() < resultColumnInfo.GetFlen() {
resultColumnInfo = columnInfo
resultColumn = &expression.Column{
UniqueID: dataSource.ctx.GetSessionVars().AllocPlanColumnID(),
ID: resultColumnInfo.ID,
RetType: resultColumnInfo.FieldType.Clone(),
OrigName: fmt.Sprintf("%s.%s.%s", dataSource.DBName.L, dataSource.tableInfo.Name.L, resultColumnInfo.Name),
}
}
}
}
return resultColumn, resultColumnInfo
}

func preferKeyColumnFromTable(dataSource *DataSource, originColumns []*expression.Column,
originSchemaColumns []*model.ColumnInfo) (*expression.Column, *model.ColumnInfo) {
var resultColumnInfo *model.ColumnInfo
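The PruneColumns comment above ("For SQL like `select 1 from t`, tikv's response will be empty if no column is in schema") is why a key column is still force-pushed even after the TiFlash-specific branch was removed from column pruning. A quick sanity sketch — not part of this commit; the package, table name, and values are illustrative:

package core_test

import (
    "testing"

    "github.com/pingcap/tidb/testkit"
)

func TestSelectConstantStillReturnsOneRowPerTableRow(t *testing.T) {
    store := testkit.CreateMockStore(t)
    tk := testkit.NewTestKit(t, store)
    tk.MustExec("use test")
    tk.MustExec("create table t(a int)")
    tk.MustExec("insert into t values (1), (2), (3)")
    // The query references no table column, but the pruner keeps row_id/pk,
    // so TiKV still returns one row per table row.
    tk.MustQuery("select 1 from t").Check(testkit.Rows("1", "1", "1"))
}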