Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: change the stage of count star rewriter rule from logical to post #39555

Merged
merged 13 commits into from
Dec 2, 2022
2 changes: 1 addition & 1 deletion cmd/explaintest/r/access_path_selection.result
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,5 @@ explain format = 'brief' select count(1) from access_path_selection;
id estRows task access object operator info
StreamAgg 1.00 root funcs:count(Column#18)->Column#4
└─TableReader 1.00 root data:StreamAgg
└─StreamAgg 1.00 cop[tikv] funcs:count(test.access_path_selection._tidb_rowid)->Column#18
└─StreamAgg 1.00 cop[tikv] funcs:count(1)->Column#18
└─TableFullScan 10000.00 cop[tikv] table:access_path_selection keep order:false, stats:pseudo
16 changes: 8 additions & 8 deletions cmd/explaintest/r/clustered_index.result
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,25 @@ explain select count(*) from with_cluster_index.tbl_0 where col_0 < 5429 ;
id estRows task access object operator info
StreamAgg_17 1.00 root funcs:count(Column#8)->Column#6
└─IndexReader_18 1.00 root index:StreamAgg_9
└─StreamAgg_9 1.00 cop[tikv] funcs:count(with_cluster_index.tbl_0.col_0)->Column#8
└─StreamAgg_9 1.00 cop[tikv] funcs:count(1)->Column#8
└─IndexRangeScan_16 798.90 cop[tikv] table:tbl_0, index:idx_3(col_0) range:[-inf,5429), keep order:false
explain select count(*) from wout_cluster_index.tbl_0 where col_0 < 5429 ;
id estRows task access object operator info
StreamAgg_17 1.00 root funcs:count(Column#9)->Column#7
└─IndexReader_18 1.00 root index:StreamAgg_9
└─StreamAgg_9 1.00 cop[tikv] funcs:count(wout_cluster_index.tbl_0.col_0)->Column#9
└─StreamAgg_9 1.00 cop[tikv] funcs:count(1)->Column#9
└─IndexRangeScan_16 798.90 cop[tikv] table:tbl_0, index:idx_3(col_0) range:[-inf,5429), keep order:false
explain select count(*) from with_cluster_index.tbl_0 where col_0 < 41 ;
id estRows task access object operator info
StreamAgg_17 1.00 root funcs:count(Column#8)->Column#6
└─IndexReader_18 1.00 root index:StreamAgg_9
└─StreamAgg_9 1.00 cop[tikv] funcs:count(with_cluster_index.tbl_0.col_0)->Column#8
└─StreamAgg_9 1.00 cop[tikv] funcs:count(1)->Column#8
└─IndexRangeScan_16 41.00 cop[tikv] table:tbl_0, index:idx_3(col_0) range:[-inf,41), keep order:false
explain select count(*) from wout_cluster_index.tbl_0 where col_0 < 41 ;
id estRows task access object operator info
StreamAgg_17 1.00 root funcs:count(Column#9)->Column#7
└─IndexReader_18 1.00 root index:StreamAgg_9
└─StreamAgg_9 1.00 cop[tikv] funcs:count(wout_cluster_index.tbl_0.col_0)->Column#9
└─StreamAgg_9 1.00 cop[tikv] funcs:count(1)->Column#9
└─IndexRangeScan_16 41.00 cop[tikv] table:tbl_0, index:idx_3(col_0) range:[-inf,41), keep order:false
explain select col_14 from with_cluster_index.tbl_2 where col_11 <> '2013-11-01' ;
id estRows task access object operator info
Expand Down Expand Up @@ -109,24 +109,24 @@ explain select count(*) from with_cluster_index.tbl_0 where col_0 <= 0 ;
id estRows task access object operator info
StreamAgg_16 1.00 root funcs:count(Column#8)->Column#6
└─IndexReader_17 1.00 root index:StreamAgg_9
└─StreamAgg_9 1.00 cop[tikv] funcs:count(with_cluster_index.tbl_0.col_0)->Column#8
└─StreamAgg_9 1.00 cop[tikv] funcs:count(1)->Column#8
└─IndexRangeScan_11 1.00 cop[tikv] table:tbl_0, index:idx_3(col_0) range:[-inf,0], keep order:false
explain select count(*) from wout_cluster_index.tbl_0 where col_0 <= 0 ;
id estRows task access object operator info
StreamAgg_16 1.00 root funcs:count(Column#9)->Column#7
└─IndexReader_17 1.00 root index:StreamAgg_9
└─StreamAgg_9 1.00 cop[tikv] funcs:count(wout_cluster_index.tbl_0.col_0)->Column#9
└─StreamAgg_9 1.00 cop[tikv] funcs:count(1)->Column#9
└─IndexRangeScan_11 1.00 cop[tikv] table:tbl_0, index:idx_3(col_0) range:[-inf,0], keep order:false
explain select count(*) from with_cluster_index.tbl_0 where col_0 >= 803163 ;
id estRows task access object operator info
StreamAgg_17 1.00 root funcs:count(Column#8)->Column#6
└─IndexReader_18 1.00 root index:StreamAgg_9
└─StreamAgg_9 1.00 cop[tikv] funcs:count(with_cluster_index.tbl_0.col_0)->Column#8
└─StreamAgg_9 1.00 cop[tikv] funcs:count(1)->Column#8
└─IndexRangeScan_16 109.70 cop[tikv] table:tbl_0, index:idx_3(col_0) range:[803163,+inf], keep order:false
explain select count(*) from wout_cluster_index.tbl_0 where col_0 >= 803163 ;
id estRows task access object operator info
StreamAgg_17 1.00 root funcs:count(Column#9)->Column#7
└─IndexReader_18 1.00 root index:StreamAgg_9
└─StreamAgg_9 1.00 cop[tikv] funcs:count(wout_cluster_index.tbl_0.col_0)->Column#9
└─StreamAgg_9 1.00 cop[tikv] funcs:count(1)->Column#9
└─IndexRangeScan_16 109.70 cop[tikv] table:tbl_0, index:idx_3(col_0) range:[803163,+inf], keep order:false
set @@tidb_enable_outer_join_reorder=false;
4 changes: 2 additions & 2 deletions cmd/explaintest/r/explain_easy.result
Original file line number Diff line number Diff line change
Expand Up @@ -218,14 +218,14 @@ id estRows task access object operator info
Projection 1.00 root 1->Column#6
└─StreamAgg 1.00 root funcs:count(Column#14)->Column#9
└─TableReader 1.00 root data:StreamAgg
└─StreamAgg 1.00 cop[tikv] funcs:count(test.t1.c1)->Column#14
└─StreamAgg 1.00 cop[tikv] funcs:count(1)->Column#14
└─TableFullScan 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo
explain format = 'brief' select count(1) from (select max(c2), count(c3) as m from t1) k;
id estRows task access object operator info
StreamAgg 1.00 root funcs:count(1)->Column#6
└─StreamAgg 1.00 root funcs:count(Column#13)->Column#8
└─TableReader 1.00 root data:StreamAgg
└─StreamAgg 1.00 cop[tikv] funcs:count(test.t1.c1)->Column#13
└─StreamAgg 1.00 cop[tikv] funcs:count(1)->Column#13
└─TableFullScan 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo
explain format = 'brief' select count(1) from (select count(c2) from t1 group by c3) k;
id estRows task access object operator info
Expand Down
8 changes: 4 additions & 4 deletions cmd/explaintest/r/index_merge.result
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ Sort_11 5098.44 root test.t1.c1
├─StreamAgg_23(Build) 1.00 root funcs:min(Column#9)->Column#10, funcs:sum(0)->Column#11, funcs:count(1)->Column#12
│ └─StreamAgg_43 1.00 root funcs:count(Column#25)->Column#9
│ └─IndexReader_44 1.00 root index:StreamAgg_27
│ └─StreamAgg_27 1.00 cop[tikv] funcs:count(test.t2._tidb_rowid)->Column#25
│ └─StreamAgg_27 1.00 cop[tikv] funcs:count(1)->Column#25
│ └─IndexFullScan_41 10000.00 cop[tikv] table:t2, index:c1(c1) keep order:false, stats:pseudo
└─IndexMerge_21(Probe) 2825.66 root type: union
├─IndexRangeScan_17(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo
Expand All @@ -148,7 +148,7 @@ Sort_11 5098.44 root test.t1.c1
├─StreamAgg_23(Build) 1.00 root funcs:min(Column#9)->Column#10, funcs:sum(0)->Column#11, funcs:count(1)->Column#12
│ └─StreamAgg_43 1.00 root funcs:count(Column#25)->Column#9
│ └─IndexReader_44 1.00 root index:StreamAgg_27
│ └─StreamAgg_27 1.00 cop[tikv] funcs:count(test.t2._tidb_rowid)->Column#25
│ └─StreamAgg_27 1.00 cop[tikv] funcs:count(1)->Column#25
│ └─IndexFullScan_41 10000.00 cop[tikv] table:t2, index:c1(c1) keep order:false, stats:pseudo
└─IndexMerge_21(Probe) 2825.66 root type: union
├─IndexRangeScan_17(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo
Expand All @@ -170,7 +170,7 @@ Sort_11 5542.21 root test.t1.c1
├─StreamAgg_22(Build) 1.00 root funcs:max(Column#9)->Column#10, funcs:sum(0)->Column#11, funcs:count(1)->Column#12
│ └─StreamAgg_42 1.00 root funcs:count(Column#25)->Column#9
│ └─IndexReader_43 1.00 root index:StreamAgg_26
│ └─StreamAgg_26 1.00 cop[tikv] funcs:count(test.t2._tidb_rowid)->Column#25
│ └─StreamAgg_26 1.00 cop[tikv] funcs:count(1)->Column#25
│ └─IndexFullScan_40 10000.00 cop[tikv] table:t2, index:c1(c1) keep order:false, stats:pseudo
└─IndexMerge_20(Probe) 5542.21 root type: union
├─IndexRangeScan_17(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo
Expand All @@ -192,7 +192,7 @@ Sort_39 5542.21 root test.t1.c1
├─StreamAgg_50(Build) 1.00 root funcs:max(Column#13)->Column#14, funcs:sum(0)->Column#15, funcs:count(1)->Column#16
│ └─StreamAgg_70 1.00 root funcs:count(Column#38)->Column#13
│ └─IndexReader_71 1.00 root index:StreamAgg_54
│ └─StreamAgg_54 1.00 cop[tikv] funcs:count(test.t2._tidb_rowid)->Column#38
│ └─StreamAgg_54 1.00 cop[tikv] funcs:count(1)->Column#38
│ └─IndexFullScan_68 10000.00 cop[tikv] table:t2, index:c1(c1) keep order:false, stats:pseudo
└─IndexMerge_48(Probe) 5542.21 root type: union
├─IndexRangeScan_45(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo
Expand Down
1 change: 0 additions & 1 deletion planner/core/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ go_library(
"rule_aggregation_skew_rewrite.go",
"rule_build_key_info.go",
"rule_column_pruning.go",
"rule_count_star_rewriter.go",
"rule_decorrelate.go",
"rule_eliminate_projection.go",
"rule_generate_column_substitute.go",
Expand Down
1 change: 0 additions & 1 deletion planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,6 @@ func (b *PlanBuilder) buildAggregation(ctx context.Context, p LogicalPlan, aggFu
b.optFlag |= flagPredicatePushDown
b.optFlag |= flagEliminateAgg
b.optFlag |= flagEliminateProjection
b.optFlag |= flagCountStarRewriter

if b.ctx.GetSessionVars().EnableSkewDistinctAgg {
b.optFlag |= flagSkewDistinctAgg
Expand Down
118 changes: 116 additions & 2 deletions planner/core/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ package core

import (
"context"
"fmt"
"math"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/expression/aggregation"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/lock"
Expand Down Expand Up @@ -72,7 +74,6 @@ const (
flagSyncWaitStatsLoadPoint
flagJoinReOrder
flagPrunColumnsAgain
flagCountStarRewriter
)

var optRuleList = []logicalOptRule{
Expand All @@ -95,7 +96,6 @@ var optRuleList = []logicalOptRule{
&syncWaitStatsLoadPoint{},
&joinReOrderSolver{},
&columnPruner{}, // column pruning again at last, note it will mess up the results of buildKeySolver
&countStarRewriter{},
}

type logicalOptimizeOp struct {
Expand Down Expand Up @@ -392,6 +392,7 @@ func postOptimize(sctx sessionctx.Context, plan PhysicalPlan) (PhysicalPlan, err
handleFineGrainedShuffle(sctx, plan)
checkPlanCacheable(sctx, plan)
propagateProbeParents(plan, nil)
countStarRewrite(plan)
return plan, nil
}

Expand Down Expand Up @@ -533,6 +534,119 @@ func prunePhysicalColumnsInternal(sctx sessionctx.Context, plan PhysicalPlan) er
return nil
}

/*
*
The countStarRewriter is used to rewrite

count(*) -> count(not null column)

**Only for TiFlash**
Attention:
Since count(*) is directly translated into count(1) during grammar parsing,
the rewritten pattern actually matches count(constant)

Pattern:
PhysicalAggregation: count(constant)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is there a blank line here? Is it expected?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it's a typo; we can delete it in a later commit.

|
TableFullScan: TiFlash

Optimize:
Table

<k1 bool not null, k2 int null, k3 bigint not null>

Query: select count(*) from table
ColumnPruningRule: the datasource picks row_id
countStarRewrite: the datasource picks k1 instead of row_id

rewrite count(*) -> count(k1)

Rewritten Query: select count(k1) from table
*/
func countStarRewrite(plan PhysicalPlan) {
	// Try the rewrite on this node first, then walk down the tree.
	countStarRewriteInternal(plan)

	// A table reader is descended through its tablePlan field instead of
	// Children(); presumably Children() does not expose that sub-plan.
	if reader, isReader := plan.(*PhysicalTableReader); isReader {
		countStarRewrite(reader.tablePlan)
		return
	}

	for _, sub := range plan.Children() {
		countStarRewrite(sub)
	}
}

// countStarRewriteInternal applies the count(constant) rewrite to a single plan
// node. It only acts when the node is a hash or stream aggregation that
//   - has no group-by items and exactly one child,
//   - contains only non-distinct, single-argument count(constant) functions, and
//   - sits directly on top of a TiFlash full table scan that reads exactly one column.
//
// Any other shape is left untouched.
func countStarRewriteInternal(plan PhysicalPlan) {
	// Both aggregation flavours are handled through their shared base struct.
	var agg *basePhysicalAgg
	switch node := plan.(type) {
	case *PhysicalStreamAgg:
		agg = node.getPointer()
	case *PhysicalHashAgg:
		agg = node.getPointer()
	default:
		return
	}

	if len(agg.GroupByItems) > 0 {
		return
	}
	if len(agg.children) != 1 {
		return
	}

	// Every aggregate function must be a plain count over a constant.
	for _, fn := range agg.AggFuncs {
		isPlainCount := fn.Name == "count" && len(fn.Args) == 1 && !fn.HasDistinct
		if !isPlainCount {
			return
		}
		if _, isConst := fn.Args[0].(*expression.Constant); !isConst {
			return
		}
	}

	// The only child must be a single-column full table scan on TiFlash.
	scan, isScan := agg.Children()[0].(*PhysicalTableScan)
	if !isScan {
		return
	}
	if !scan.isFullScan() || scan.StoreType != kv.TiFlash {
		return
	}
	if len(scan.schema.Columns) != 1 {
		return
	}

	// Pattern matched: rewrite the scanned column and the count arguments.
	rewriteTableScanAndAggArgs(scan, agg.AggFuncs)
}

// rewriteTableScanAndAggArgs Pick the narrowest and not null column from table
// If there is no not null column in Data Source, the row_id or pk column will be retained
func rewriteTableScanAndAggArgs(physicalTableScan *PhysicalTableScan, aggFuncs []*aggregation.AggFuncDesc) {
var resultColumnInfo *model.ColumnInfo
var resultColumn *expression.Column

resultColumnInfo = physicalTableScan.Columns[0]
resultColumn = physicalTableScan.schema.Columns[0]
// prefer not null column from table
for _, columnInfo := range physicalTableScan.Table.Columns {
if columnInfo.FieldType.IsVarLengthType() {
continue
}
if mysql.HasNotNullFlag(columnInfo.GetFlag()) {
if columnInfo.GetFlen() < resultColumnInfo.GetFlen() {
resultColumnInfo = columnInfo
resultColumn = &expression.Column{
UniqueID: physicalTableScan.ctx.GetSessionVars().AllocPlanColumnID(),
ID: resultColumnInfo.ID,
RetType: resultColumnInfo.FieldType.Clone(),
OrigName: fmt.Sprintf("%s.%s.%s", physicalTableScan.DBName.L, physicalTableScan.Table.Name.L, resultColumnInfo.Name),
}
}
}
}
// table scan (row_id) -> (not null column)
physicalTableScan.Columns[0] = resultColumnInfo
physicalTableScan.schema.Columns[0] = resultColumn
// agg args count(1) -> count(not null column)
for _, aggFunc := range aggFuncs {
constExpr, ok := aggFunc.Args[0].(*expression.Constant)
if !ok {
return
}
// count(null) shouldn't be rewritten
if constExpr.Value.IsNull() {
continue
}
aggFunc.Args[0] = resultColumn
winoros marked this conversation as resolved.
Show resolved Hide resolved
}
}

// Only for MPP(Window<-[Sort]<-ExchangeReceiver<-ExchangeSender).
// TiFlashFineGrainedShuffleStreamCount:
// < 0: fine grained shuffle is disabled.
Expand Down
4 changes: 2 additions & 2 deletions planner/core/partition_pruner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,12 @@ func TestRangeColumnPartitionPruningForIn(t *testing.T) {
"└─PartitionUnion 2.00 root ",
" ├─HashAgg 1.00 root funcs:count(Column#7)->Column#5",
" │ └─IndexReader 1.00 root index:HashAgg",
" │ └─HashAgg 1.00 cop[tikv] funcs:count(test_range_col_in.t1.dt)->Column#7",
" │ └─HashAgg 1.00 cop[tikv] funcs:count(1)->Column#7",
" │ └─Selection 20.00 cop[tikv] in(test_range_col_in.t1.dt, 2020-11-27 00:00:00.000000, 2020-11-28 00:00:00.000000)",
" │ └─IndexFullScan 10000.00 cop[tikv] table:t1, partition:p20201127, index:PRIMARY(id, dt) keep order:false, stats:pseudo",
" └─HashAgg 1.00 root funcs:count(Column#10)->Column#5",
" └─IndexReader 1.00 root index:HashAgg",
" └─HashAgg 1.00 cop[tikv] funcs:count(test_range_col_in.t1.dt)->Column#10",
" └─HashAgg 1.00 cop[tikv] funcs:count(1)->Column#10",
" └─Selection 20.00 cop[tikv] in(test_range_col_in.t1.dt, 2020-11-27 00:00:00.000000, 2020-11-28 00:00:00.000000)",
" └─IndexFullScan 10000.00 cop[tikv] table:t1, partition:p20201128, index:PRIMARY(id, dt) keep order:false, stats:pseudo"))

Expand Down
8 changes: 8 additions & 0 deletions planner/core/physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1753,6 +1753,10 @@ type PhysicalHashAgg struct {
basePhysicalAgg
}

// getPointer returns a pointer to the basePhysicalAgg embedded in this
// PhysicalHashAgg, so the shared aggregation fields can be read and modified in place.
func (p *PhysicalHashAgg) getPointer() *basePhysicalAgg {
return &p.basePhysicalAgg
}

// Clone implements PhysicalPlan interface.
func (p *PhysicalHashAgg) Clone() (PhysicalPlan, error) {
cloned := new(PhysicalHashAgg)
Expand Down Expand Up @@ -1787,6 +1791,10 @@ type PhysicalStreamAgg struct {
basePhysicalAgg
}

// getPointer returns a pointer to the basePhysicalAgg embedded in this
// PhysicalStreamAgg, so the shared aggregation fields can be read and modified in place.
func (p *PhysicalStreamAgg) getPointer() *basePhysicalAgg {
return &p.basePhysicalAgg
}

// Clone implements PhysicalPlan interface.
func (p *PhysicalStreamAgg) Clone() (PhysicalPlan, error) {
cloned := new(PhysicalStreamAgg)
Expand Down
48 changes: 1 addition & 47 deletions planner/core/rule_column_pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,24 +336,10 @@ func (ds *DataSource) PruneColumns(parentUsedCols []*expression.Column, opt *log
appendColumnPruneTraceStep(ds, prunedColumns, opt)
// For SQL like `select 1 from t`, tikv's response will be empty if no column is in schema.
// So we'll force to push one if schema doesn't have any column.
// There are two situations
// case 1: tiflash. Select a non-empty and narrowest column.
// The main reason is that tiflash is a column storage structure,
// and choosing the narrowest column can reduce the amount of data read as much as possible,
// making the reading efficiency the best.
// case 2: tikv. Select row_id or pk column.
// The main reason is that tikv is a kv store.
// Select the key column for the best read efficiency.
if ds.schema.Len() == 0 {
var handleCol *expression.Column
var handleColInfo *model.ColumnInfo
// case 1: tiflash
if ds.tableInfo.TiFlashReplica != nil {
handleCol, handleColInfo = preferNotNullColumnFromTable(ds)
} else {
// case 2: tikv
handleCol, handleColInfo = preferKeyColumnFromTable(ds, originSchemaColumns, originColumns)
}
handleCol, handleColInfo = preferKeyColumnFromTable(ds, originSchemaColumns, originColumns)
ds.Columns = append(ds.Columns, handleColInfo)
ds.schema.Append(handleCol)
}
Expand Down Expand Up @@ -661,38 +647,6 @@ func appendItemPruneTraceStep(p LogicalPlan, itemType string, prunedObjects []fm
opt.appendStepToCurrent(p.ID(), p.TP(), reason, action)
}

// preferNotNullColumnFromTable picks the column a data source should read when
// its schema would otherwise be empty. It starts from the handle column (the
// first handle column if the data source has handle columns, otherwise a newly
// created extra-handle column) and switches to any NOT NULL, non-variable-length
// table column whose Flen is strictly smaller.
// It returns the chosen expression column together with its column info.
func preferNotNullColumnFromTable(dataSource *DataSource) (*expression.Column, *model.ColumnInfo) {
	var (
		bestCol  *expression.Column
		bestInfo *model.ColumnInfo
	)

	// Default choice: the handle column.
	if dataSource.handleCols == nil {
		bestCol = dataSource.newExtraHandleSchemaCol()
		bestInfo = model.NewExtraHandleColInfo()
	} else {
		bestCol = dataSource.handleCols.GetCol(0)
		bestInfo = bestCol.ToInfo()
	}

	for _, candidate := range dataSource.tableInfo.Columns {
		// Only fixed-length NOT NULL columns are eligible.
		if candidate.FieldType.IsVarLengthType() || !mysql.HasNotNullFlag(candidate.GetFlag()) {
			continue
		}
		// Keep the current choice unless the candidate is strictly narrower.
		if candidate.GetFlen() >= bestInfo.GetFlen() {
			continue
		}
		bestInfo = candidate
		bestCol = &expression.Column{
			UniqueID: dataSource.ctx.GetSessionVars().AllocPlanColumnID(),
			ID:       candidate.ID,
			RetType:  candidate.FieldType.Clone(),
			OrigName: fmt.Sprintf("%s.%s.%s", dataSource.DBName.L, dataSource.tableInfo.Name.L, candidate.Name),
		}
	}
	return bestCol, bestInfo
}

func preferKeyColumnFromTable(dataSource *DataSource, originColumns []*expression.Column,
originSchemaColumns []*model.ColumnInfo) (*expression.Column, *model.ColumnInfo) {
var resultColumnInfo *model.ColumnInfo
Expand Down
Loading