Skip to content

Commit

Permalink
planner: change predicateColumnCollector to columnStatsUsageCollector…
Browse files Browse the repository at this point in the history
… and collect histogram-needed columns (#30671)
  • Loading branch information
xuyifangreeneyes authored Dec 25, 2021
1 parent 9ad0096 commit 76aae0d
Show file tree
Hide file tree
Showing 5 changed files with 425 additions and 252 deletions.
8 changes: 4 additions & 4 deletions expression/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,16 +166,16 @@ func extractColumns(result []*Column, expr Expression, filter func(*Column) bool
return result
}

// ExtractColumnsAndCorColumns extracts columns and correlated columns from `expr` and append them to `result`.
func ExtractColumnsAndCorColumns(result []*Column, expr Expression) []*Column {
// extractColumnsAndCorColumns extracts columns and correlated columns from `expr` and append them to `result`.
func extractColumnsAndCorColumns(result []*Column, expr Expression) []*Column {
switch v := expr.(type) {
case *Column:
result = append(result, v)
case *CorrelatedColumn:
result = append(result, &v.Column)
case *ScalarFunction:
for _, arg := range v.GetArgs() {
result = ExtractColumnsAndCorColumns(result, arg)
result = extractColumnsAndCorColumns(result, arg)
}
}
return result
Expand All @@ -184,7 +184,7 @@ func ExtractColumnsAndCorColumns(result []*Column, expr Expression) []*Column {
// ExtractColumnsAndCorColumnsFromExpressions extracts columns and correlated columns from expressions and append them to `result`.
func ExtractColumnsAndCorColumnsFromExpressions(result []*Column, list []Expression) []*Column {
for _, expr := range list {
result = ExtractColumnsAndCorColumns(result, expr)
result = extractColumnsAndCorColumns(result, expr)
}
return result
}
Expand Down
296 changes: 165 additions & 131 deletions planner/core/collect_column_stats_usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,49 @@ import (
"github.com/pingcap/tidb/parser/model"
)

// predicateColumnCollector collects predicate columns from logical plan. Predicate columns are the columns whose statistics
// are utilized when making query plans, which usually occur in where conditions, join conditions and so on.
type predicateColumnCollector struct {
// colMap maps expression.Column.UniqueID to the table columns whose statistics are utilized to calculate statistics of the column.
colMap map[int64]map[model.TableColumnID]struct{}
const (
collectPredicateColumns uint64 = 1 << iota
collectHistNeededColumns
)

// columnStatsUsageCollector collects predicate columns and/or histogram-needed columns from logical plan.
// Predicate columns are the columns whose statistics are utilized when making query plans, which usually occur in where conditions, join conditions and so on.
// Histogram-needed columns are the columns whose histograms are utilized when making query plans, which usually occur in the conditions pushed down to DataSource.
// The set of histogram-needed columns is the subset of that of predicate columns.
type columnStatsUsageCollector struct {
// collectMode indicates whether to collect predicate columns and/or histogram-needed columns
collectMode uint64
// predicateCols records predicate columns.
predicateCols map[model.TableColumnID]struct{}
// colMap maps expression.Column.UniqueID to the table columns whose statistics may be utilized to calculate statistics of the column.
// It is used for collecting predicate columns.
// For example, in `select count(distinct a, b) as e from t`, the count of column `e` is calculated as `max(ndv(t.a), ndv(t.b))` if
// we don't know `ndv(t.a, t.b)`(see (*LogicalAggregation).DeriveStats and getColsNDV for details). So when calculating the statistics
// of column `e`, we may use the statistics of column `t.a` and `t.b`.
colMap map[int64]map[model.TableColumnID]struct{}
// histNeededCols records histogram-needed columns
histNeededCols map[model.TableColumnID]struct{}
// cols is used to store columns collected from expressions and saves some allocation.
cols []*expression.Column
}

func newPredicateColumnCollector() *predicateColumnCollector {
return &predicateColumnCollector{
colMap: make(map[int64]map[model.TableColumnID]struct{}),
predicateCols: make(map[model.TableColumnID]struct{}),
func newColumnStatsUsageCollector(collectMode uint64) *columnStatsUsageCollector {
collector := &columnStatsUsageCollector{
collectMode: collectMode,
// Pre-allocate a slice to reduce allocation, 8 doesn't have special meaning.
cols: make([]*expression.Column, 0, 8),
}
if collectMode&collectPredicateColumns != 0 {
collector.predicateCols = make(map[model.TableColumnID]struct{})
collector.colMap = make(map[int64]map[model.TableColumnID]struct{})
}
if collectMode&collectHistNeededColumns != 0 {
collector.histNeededCols = make(map[model.TableColumnID]struct{})
}
return collector
}

func (c *predicateColumnCollector) addPredicateColumn(col *expression.Column) {
func (c *columnStatsUsageCollector) addPredicateColumn(col *expression.Column) {
tblColIDs, ok := c.colMap[col.UniqueID]
if !ok {
// It may happen if some leaf of logical plan is LogicalMemTable/LogicalShow/LogicalShowDDLJobs.
Expand All @@ -50,21 +72,14 @@ func (c *predicateColumnCollector) addPredicateColumn(col *expression.Column) {
}
}

func (c *predicateColumnCollector) addPredicateColumnsFromExpression(expr expression.Expression) {
cols := expression.ExtractColumnsAndCorColumns(c.cols[:0], expr)
for _, col := range cols {
c.addPredicateColumn(col)
}
}

func (c *predicateColumnCollector) addPredicateColumnsFromExpressions(list []expression.Expression) {
func (c *columnStatsUsageCollector) addPredicateColumnsFromExpressions(list []expression.Expression) {
cols := expression.ExtractColumnsAndCorColumnsFromExpressions(c.cols[:0], list)
for _, col := range cols {
c.addPredicateColumn(col)
}
}

func (c *predicateColumnCollector) updateColMap(col *expression.Column, relatedCols []*expression.Column) {
func (c *columnStatsUsageCollector) updateColMap(col *expression.Column, relatedCols []*expression.Column) {
if _, ok := c.colMap[col.UniqueID]; !ok {
c.colMap[col.UniqueID] = map[model.TableColumnID]struct{}{}
}
Expand All @@ -80,15 +95,11 @@ func (c *predicateColumnCollector) updateColMap(col *expression.Column, relatedC
}
}

func (c *predicateColumnCollector) updateColMapFromExpression(col *expression.Column, expr expression.Expression) {
c.updateColMap(col, expression.ExtractColumnsAndCorColumns(c.cols[:0], expr))
}

func (c *predicateColumnCollector) updateColMapFromExpressions(col *expression.Column, list []expression.Expression) {
func (c *columnStatsUsageCollector) updateColMapFromExpressions(col *expression.Column, list []expression.Expression) {
c.updateColMap(col, expression.ExtractColumnsAndCorColumnsFromExpressions(c.cols[:0], list))
}

func (ds *DataSource) updateColMapAndAddPredicateColumns(c *predicateColumnCollector) {
func (c *columnStatsUsageCollector) collectPredicateColumnsForDataSource(ds *DataSource) {
tblID := ds.TableInfo().ID
for _, col := range ds.Schema().Columns {
tblColID := model.TableColumnID{TableID: tblID, ColumnID: col.ID}
Expand All @@ -98,7 +109,7 @@ func (ds *DataSource) updateColMapAndAddPredicateColumns(c *predicateColumnColle
c.addPredicateColumnsFromExpressions(ds.pushedDownConds)
}

func (p *LogicalJoin) updateColMapAndAddPredicateColumns(c *predicateColumnCollector) {
func (c *columnStatsUsageCollector) collectPredicateColumnsForJoin(p *LogicalJoin) {
// The only schema change is merging two schemas so there is no new column.
// Assume statistics of all the columns in EqualConditions/LeftConditions/RightConditions/OtherConditions are needed.
exprs := make([]expression.Expression, 0, len(p.EqualConditions)+len(p.LeftConditions)+len(p.RightConditions)+len(p.OtherConditions))
Expand All @@ -117,7 +128,7 @@ func (p *LogicalJoin) updateColMapAndAddPredicateColumns(c *predicateColumnColle
c.addPredicateColumnsFromExpressions(exprs)
}

func (p *LogicalUnionAll) updateColMapAndAddPredicateColumns(c *predicateColumnCollector) {
func (c *columnStatsUsageCollector) collectPredicateColumnsForUnionAll(p *LogicalUnionAll) {
// statistics of the ith column of UnionAll come from statistics of the ith column of each child.
schemas := make([]*expression.Schema, 0, len(p.Children()))
relatedCols := make([]*expression.Column, 0, len(p.Children()))
Expand All @@ -133,120 +144,143 @@ func (p *LogicalUnionAll) updateColMapAndAddPredicateColumns(c *predicateColumnC
}
}

func (c *predicateColumnCollector) collectFromPlan(lp LogicalPlan) {
func (c *columnStatsUsageCollector) addHistNeededColumns(ds *DataSource) {
columns := expression.ExtractColumnsFromExpressions(c.cols[:0], ds.pushedDownConds, nil)
for _, col := range columns {
tblColID := model.TableColumnID{TableID: ds.physicalTableID, ColumnID: col.ID}
c.histNeededCols[tblColID] = struct{}{}
}
}

func (c *columnStatsUsageCollector) collectFromPlan(lp LogicalPlan) {
for _, child := range lp.Children() {
c.collectFromPlan(child)
}
switch x := lp.(type) {
case *DataSource:
x.updateColMapAndAddPredicateColumns(c)
case *LogicalIndexScan:
x.Source.updateColMapAndAddPredicateColumns(c)
// TODO: Is it redundant to add predicate columns from LogicalIndexScan.AccessConds? Is LogicalIndexScan.AccessConds a subset of LogicalIndexScan.Source.pushedDownConds.
c.addPredicateColumnsFromExpressions(x.AccessConds)
case *LogicalTableScan:
x.Source.updateColMapAndAddPredicateColumns(c)
// TODO: Is it redundant to add predicate columns from LogicalTableScan.AccessConds? Is LogicalTableScan.AccessConds a subset of LogicalTableScan.Source.pushedDownConds.
c.addPredicateColumnsFromExpressions(x.AccessConds)
case *TiKVSingleGather:
// TODO: Is it redundant?
x.Source.updateColMapAndAddPredicateColumns(c)
case *LogicalProjection:
// Schema change from children to self.
schema := x.Schema()
for i, expr := range x.Exprs {
c.updateColMapFromExpression(schema.Columns[i], expr)
}
case *LogicalSelection:
// Though the conditions in LogicalSelection are complex conditions which cannot be pushed down to DataSource, we still
// regard statistics of the columns in the conditions as needed.
c.addPredicateColumnsFromExpressions(x.Conditions)
case *LogicalAggregation:
// Just assume statistics of all the columns in GroupByItems are needed.
c.addPredicateColumnsFromExpressions(x.GroupByItems)
// Schema change from children to self.
schema := x.Schema()
for i, aggFunc := range x.AggFuncs {
c.updateColMapFromExpressions(schema.Columns[i], aggFunc.Args)
}
case *LogicalWindow:
// Statistics of the columns in LogicalWindow.PartitionBy are used in optimizeByShuffle4Window.
// It seems that we don't use statistics of the columns in LogicalWindow.OrderBy currently?
for _, item := range x.PartitionBy {
c.addPredicateColumn(item.Col)
}
// Schema change from children to self.
windowColumns := x.GetWindowResultColumns()
for i, col := range windowColumns {
c.updateColMapFromExpressions(col, x.WindowFuncDescs[i].Args)
}
case *LogicalJoin:
x.updateColMapAndAddPredicateColumns(c)
case *LogicalApply:
x.updateColMapAndAddPredicateColumns(c)
// Assume statistics of correlated columns are needed.
// Correlated columns can be found in LogicalApply.Children()[0].Schema(). Since we already visit LogicalApply.Children()[0],
// correlated columns must have existed in predicateColumnCollector.colMap.
for _, corCols := range x.CorCols {
c.addPredicateColumn(&corCols.Column)
}
case *LogicalSort:
// Assume statistics of all the columns in ByItems are needed.
for _, item := range x.ByItems {
c.addPredicateColumnsFromExpression(item.Expr)
}
case *LogicalTopN:
// Assume statistics of all the columns in ByItems are needed.
for _, item := range x.ByItems {
c.addPredicateColumnsFromExpression(item.Expr)
}
case *LogicalUnionAll:
x.updateColMapAndAddPredicateColumns(c)
case *LogicalPartitionUnionAll:
x.updateColMapAndAddPredicateColumns(c)
case *LogicalCTE:
// Visit seedPartLogicalPlan and recursivePartLogicalPlan first.
c.collectFromPlan(x.cte.seedPartLogicalPlan)
if x.cte.recursivePartLogicalPlan != nil {
c.collectFromPlan(x.cte.recursivePartLogicalPlan)
}
// Schema change from seedPlan/recursivePlan to self.
columns := x.Schema().Columns
seedColumns := x.cte.seedPartLogicalPlan.Schema().Columns
var recursiveColumns []*expression.Column
if x.cte.recursivePartLogicalPlan != nil {
recursiveColumns = x.cte.recursivePartLogicalPlan.Schema().Columns
}
relatedCols := make([]*expression.Column, 0, 2)
for i, col := range columns {
relatedCols = append(relatedCols[:0], seedColumns[i])
if recursiveColumns != nil {
relatedCols = append(relatedCols, recursiveColumns[i])
if c.collectMode&collectPredicateColumns != 0 {
switch x := lp.(type) {
case *DataSource:
c.collectPredicateColumnsForDataSource(x)
case *LogicalIndexScan:
c.collectPredicateColumnsForDataSource(x.Source)
c.addPredicateColumnsFromExpressions(x.AccessConds)
case *LogicalTableScan:
c.collectPredicateColumnsForDataSource(x.Source)
c.addPredicateColumnsFromExpressions(x.AccessConds)
case *LogicalProjection:
// Schema change from children to self.
schema := x.Schema()
for i, expr := range x.Exprs {
c.updateColMapFromExpressions(schema.Columns[i], []expression.Expression{expr})
}
c.updateColMap(col, relatedCols)
}
// If IsDistinct is true, then we use getColsNDV to calculate row count(see (*LogicalCTE).DeriveStat). In this case
// statistics of all the columns are needed.
if x.cte.IsDistinct {
for _, col := range columns {
c.addPredicateColumn(col)
case *LogicalSelection:
// Though the conditions in LogicalSelection are complex conditions which cannot be pushed down to DataSource, we still
// regard statistics of the columns in the conditions as needed.
c.addPredicateColumnsFromExpressions(x.Conditions)
case *LogicalAggregation:
// Just assume statistics of all the columns in GroupByItems are needed.
c.addPredicateColumnsFromExpressions(x.GroupByItems)
// Schema change from children to self.
schema := x.Schema()
for i, aggFunc := range x.AggFuncs {
c.updateColMapFromExpressions(schema.Columns[i], aggFunc.Args)
}
case *LogicalWindow:
// Statistics of the columns in LogicalWindow.PartitionBy are used in optimizeByShuffle4Window.
// We don't use statistics of the columns in LogicalWindow.OrderBy currently.
for _, item := range x.PartitionBy {
c.addPredicateColumn(item.Col)
}
// Schema change from children to self.
windowColumns := x.GetWindowResultColumns()
for i, col := range windowColumns {
c.updateColMapFromExpressions(col, x.WindowFuncDescs[i].Args)
}
case *LogicalJoin:
c.collectPredicateColumnsForJoin(x)
case *LogicalApply:
c.collectPredicateColumnsForJoin(&x.LogicalJoin)
// Assume statistics of correlated columns are needed.
// Correlated columns can be found in LogicalApply.Children()[0].Schema(). Since we already visit LogicalApply.Children()[0],
// correlated columns must have existed in columnStatsUsageCollector.colMap.
for _, corCols := range x.CorCols {
c.addPredicateColumn(&corCols.Column)
}
case *LogicalSort:
// Assume statistics of all the columns in ByItems are needed.
for _, item := range x.ByItems {
c.addPredicateColumnsFromExpressions([]expression.Expression{item.Expr})
}
case *LogicalTopN:
// Assume statistics of all the columns in ByItems are needed.
for _, item := range x.ByItems {
c.addPredicateColumnsFromExpressions([]expression.Expression{item.Expr})
}
case *LogicalUnionAll:
c.collectPredicateColumnsForUnionAll(x)
case *LogicalPartitionUnionAll:
c.collectPredicateColumnsForUnionAll(&x.LogicalUnionAll)
case *LogicalCTE:
// Visit seedPartLogicalPlan and recursivePartLogicalPlan first.
c.collectFromPlan(x.cte.seedPartLogicalPlan)
if x.cte.recursivePartLogicalPlan != nil {
c.collectFromPlan(x.cte.recursivePartLogicalPlan)
}
// Schema change from seedPlan/recursivePlan to self.
columns := x.Schema().Columns
seedColumns := x.cte.seedPartLogicalPlan.Schema().Columns
var recursiveColumns []*expression.Column
if x.cte.recursivePartLogicalPlan != nil {
recursiveColumns = x.cte.recursivePartLogicalPlan.Schema().Columns
}
relatedCols := make([]*expression.Column, 0, 2)
for i, col := range columns {
relatedCols = append(relatedCols[:0], seedColumns[i])
if recursiveColumns != nil {
relatedCols = append(relatedCols, recursiveColumns[i])
}
c.updateColMap(col, relatedCols)
}
// If IsDistinct is true, then we use getColsNDV to calculate row count(see (*LogicalCTE).DeriveStat). In this case
// statistics of all the columns are needed.
if x.cte.IsDistinct {
for _, col := range columns {
c.addPredicateColumn(col)
}
}
case *LogicalCTETable:
// Schema change from seedPlan to self.
for i, col := range x.Schema().Columns {
c.updateColMap(col, []*expression.Column{x.seedSchema.Columns[i]})
}
}
case *LogicalCTETable:
// Schema change from seedPlan to self.
for i, col := range x.Schema().Columns {
c.updateColMap(col, []*expression.Column{x.seedSchema.Columns[i]})
}
if c.collectMode&collectHistNeededColumns != 0 {
// Histogram-needed columns are the columns which occur in the conditions pushed down to DataSource.
// We don't consider LogicalCTE because seedLogicalPlan and recursiveLogicalPlan haven't got logical optimization
// yet(seedLogicalPlan and recursiveLogicalPlan are optimized in DeriveStats phase). Without logical optimization,
// there is no condition pushed down to DataSource so no histogram-needed column can be collected.
switch x := lp.(type) {
case *DataSource:
c.addHistNeededColumns(x)
case *LogicalIndexScan:
c.addHistNeededColumns(x.Source)
case *LogicalTableScan:
c.addHistNeededColumns(x.Source)
}
}
}

// CollectPredicateColumnsForTest collects predicate columns from logical plan. It is only for test.
func CollectPredicateColumnsForTest(lp LogicalPlan) []model.TableColumnID {
collector := newPredicateColumnCollector()
// CollectColumnStatsUsage collects column stats usage from logical plan.
// The first return value is predicate columns and the second return value is histogram-needed columns.
func CollectColumnStatsUsage(lp LogicalPlan) ([]model.TableColumnID, []model.TableColumnID) {
collector := newColumnStatsUsageCollector(collectPredicateColumns | collectHistNeededColumns)
collector.collectFromPlan(lp)
tblColIDs := make([]model.TableColumnID, 0, len(collector.predicateCols))
for tblColID := range collector.predicateCols {
tblColIDs = append(tblColIDs, tblColID)
set2slice := func(set map[model.TableColumnID]struct{}) []model.TableColumnID {
ret := make([]model.TableColumnID, 0, len(set))
for tblColID := range set {
ret = append(ret, tblColID)
}
return ret
}
return tblColIDs
return set2slice(collector.predicateCols), set2slice(collector.histNeededCols)
}
Loading

0 comments on commit 76aae0d

Please sign in to comment.