diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index b979c6bd655ea..57bfcb2d1cf54 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -651,10 +651,11 @@ func (p *LogicalJoin) constructInnerTableScanTask( ds.stats.Cardinality[i] = 1 } rowSize := ds.TblColHists.GetAvgRowSize(ds.TblCols, false) + sessVars := ds.ctx.GetSessionVars() copTask := &copTask{ tablePlan: ts, indexPlanFinished: true, - cst: ScanFactor * rowSize * ts.stats.RowCount, + cst: sessVars.ScanFactor * rowSize * ts.stats.RowCount, tblColHists: ds.TblColHists, keepOrder: ts.KeepOrder, } @@ -719,7 +720,8 @@ func (p *LogicalJoin) constructInnerIndexScanTask( } is.initSchema(ds.id, path.index, path.fullIdxCols, cop.tablePlan != nil) rowSize := is.indexScanRowSize(path.index, ds) - cop.cst = rowCount * rowSize * ScanFactor + sessVars := ds.ctx.GetSessionVars() + cop.cst = rowCount * rowSize * sessVars.ScanFactor indexConds, tblConds := splitIndexFilterConditions(filterConds, path.fullIdxCols, path.fullIdxColLens, ds.tableInfo) tmpPath := &accessPath{ indexFilters: indexConds, diff --git a/planner/core/find_best_task.go b/planner/core/find_best_task.go index b9e96acb17bff..ca317f950cab8 100644 --- a/planner/core/find_best_task.go +++ b/planner/core/find_best_task.go @@ -34,19 +34,6 @@ import ( ) const ( - // NetworkFactor is the network cost of transferring 1 byte data. - NetworkFactor = 1.0 - // CPUFactor is the CPU cost of processing one expression for one row. - CPUFactor = 3 * NetworkFactor - // CopCPUFactor is the CPU cost of processing one expression for one row in coprocessor. - CopCPUFactor = 3 * NetworkFactor - // ScanFactor is the IO cost of scanning 1 byte data on TiKV. - ScanFactor = 1.5 * NetworkFactor - // DescScanFactor is the IO cost of scanning 1 byte data on TiKV in desc order. 
- DescScanFactor = 2 * ScanFactor - memoryFactor = 0.001 - concurrencyFactor = 3.0 - selectionFactor = 0.8 distinctFactor = 0.8 ) @@ -516,9 +503,10 @@ func (ds *DataSource) convertToPartialIndexScan(prop *property.PhysicalProperty, rowSize := is.indexScanRowSize(idx, ds) isCovered = isCoveringIndex(ds.schema.Columns, path.fullIdxCols, path.fullIdxColLens, ds.tableInfo.PKIsHandle) indexConds := path.indexFilters + sessVars := ds.ctx.GetSessionVars() if indexConds != nil { var selectivity float64 - partialCost += rowCount * CopCPUFactor + partialCost += rowCount * sessVars.CopCPUFactor if path.countAfterAccess > 0 { selectivity = path.countAfterIndex / path.countAfterAccess } @@ -530,10 +518,10 @@ func (ds *DataSource) convertToPartialIndexScan(prop *property.PhysicalProperty, } indexPlan := PhysicalSelection{Conditions: indexConds}.Init(is.ctx, stats, ds.blockOffset) indexPlan.SetChildren(is) - partialCost += rowCount * rowSize * NetworkFactor + partialCost += rowCount * rowSize * sessVars.NetworkFactor return indexPlan, partialCost, rowCount, isCovered } - partialCost += rowCount * rowSize * NetworkFactor + partialCost += rowCount * rowSize * sessVars.NetworkFactor indexPlan = is return indexPlan, partialCost, rowCount, isCovered } @@ -545,6 +533,7 @@ func (ds *DataSource) convertToPartialTableScan(prop *property.PhysicalProperty, isCovered bool) { ts, partialCost, rowCount := ds.getOriginalPhysicalTableScan(prop, path, false) rowSize := ds.TblColHists.GetAvgRowSize(ds.TblCols, false) + sessVars := ds.ctx.GetSessionVars() if len(ts.filterCondition) > 0 { selectivity, _, err := ds.tableStats.HistColl.Selectivity(ds.ctx, ts.filterCondition) if err != nil { @@ -553,17 +542,18 @@ func (ds *DataSource) convertToPartialTableScan(prop *property.PhysicalProperty, } tablePlan = PhysicalSelection{Conditions: ts.filterCondition}.Init(ts.ctx, ts.stats.ScaleByExpectCnt(selectivity*rowCount), ds.blockOffset) tablePlan.SetChildren(ts) - partialCost += rowCount * CopCPUFactor - partialCost += selectivity * rowCount * rowSize * NetworkFactor + partialCost += rowCount * sessVars.CopCPUFactor + partialCost += selectivity * rowCount * rowSize * sessVars.NetworkFactor return tablePlan, partialCost, rowCount, true } - partialCost += rowCount * rowSize * NetworkFactor + partialCost += rowCount * rowSize * sessVars.NetworkFactor tablePlan = ts return tablePlan, partialCost, rowCount, true } func (ds *DataSource) buildIndexMergeTableScan(prop *property.PhysicalProperty, tableFilters []expression.Expression, totalRowCount float64) (PhysicalPlan, float64) { var partialCost float64 + sessVars := ds.ctx.GetSessionVars() ts := PhysicalTableScan{ Table: ds.tableInfo, Columns: ds.Columns, @@ -581,13 +571,13 @@ func (ds *DataSource) buildIndexMergeTableScan(prop *property.PhysicalProperty, } } rowSize := ds.TblColHists.GetAvgRowSize(ds.TblCols, false) - partialCost += totalRowCount * rowSize * ScanFactor + partialCost += totalRowCount * rowSize * sessVars.ScanFactor ts.stats = ds.tableStats.ScaleByExpectCnt(totalRowCount) if ds.statisticTable.Pseudo { ts.stats.StatsVersion = statistics.PseudoVersion } if len(tableFilters) > 0 { - partialCost += totalRowCount * CopCPUFactor + partialCost += totalRowCount * sessVars.CopCPUFactor selectivity, _, err := ds.tableStats.HistColl.Selectivity(ds.ctx, tableFilters) if err != nil { logutil.BgLogger().Debug("calculate selectivity failed, use selection factor", zap.Error(err)) @@ -743,8 +733,9 @@ func (is *PhysicalIndexScan) initSchema(id int, idx *model.IndexInfo, idxExprCol func (is 
*PhysicalIndexScan) addPushedDownSelection(copTask *copTask, p *DataSource, path *accessPath, finalStats *property.StatsInfo) { // Add filter condition to table plan now. indexConds, tableConds := path.indexFilters, path.tableFilters + sessVars := is.ctx.GetSessionVars() if indexConds != nil { - copTask.cst += copTask.count() * CopCPUFactor + copTask.cst += copTask.count() * sessVars.CopCPUFactor var selectivity float64 if path.countAfterAccess > 0 { selectivity = path.countAfterIndex / path.countAfterAccess @@ -757,7 +748,7 @@ func (is *PhysicalIndexScan) addPushedDownSelection(copTask *copTask, p *DataSou } if tableConds != nil { copTask.finishIndexPlan() - copTask.cst += copTask.count() * CopCPUFactor + copTask.cst += copTask.count() * sessVars.CopCPUFactor tableSel := PhysicalSelection{Conditions: tableConds}.Init(is.ctx, finalStats, is.blockOffset) tableSel.SetChildren(copTask.tablePlan) copTask.tablePlan = tableSel @@ -995,8 +986,9 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid func (ts *PhysicalTableScan) addPushedDownSelection(copTask *copTask, stats *property.StatsInfo) { // Add filter condition to table plan now. + sessVars := ts.ctx.GetSessionVars() if len(ts.filterCondition) > 0 { - copTask.cst += copTask.count() * CopCPUFactor + copTask.cst += copTask.count() * sessVars.CopCPUFactor sel := PhysicalSelection{Conditions: ts.filterCondition}.Init(ts.ctx, stats, ts.blockOffset) sel.SetChildren(ts) copTask.tablePlan = sel @@ -1048,11 +1040,12 @@ func (ds *DataSource) getOriginalPhysicalTableScan(prop *property.PhysicalProper // for all columns now, as we do in `deriveStatsByFilter`. ts.stats = ds.tableStats.ScaleByExpectCnt(rowCount) rowSize := ds.TblColHists.GetAvgRowSize(ds.TblCols, false) - cost := rowCount * rowSize * ScanFactor + sessVars := ds.ctx.GetSessionVars() + cost := rowCount * rowSize * sessVars.ScanFactor if isMatchProp { if prop.Items[0].Desc { ts.Desc = true - cost = rowCount * rowSize * DescScanFactor + cost = rowCount * rowSize * sessVars.DescScanFactor } ts.KeepOrder = true } @@ -1090,11 +1083,12 @@ func (ds *DataSource) getOriginalPhysicalIndexScan(prop *property.PhysicalProper } is.stats = ds.tableStats.ScaleByExpectCnt(rowCount) rowSize := is.indexScanRowSize(idx, ds) - cost := rowCount * rowSize * ScanFactor + sessVars := ds.ctx.GetSessionVars() + cost := rowCount * rowSize * sessVars.ScanFactor if isMatchProp { if prop.Items[0].Desc { is.Desc = true - cost = rowCount * rowSize * DescScanFactor + cost = rowCount * rowSize * sessVars.DescScanFactor } is.KeepOrder = true } diff --git a/planner/core/task.go b/planner/core/task.go index 2ae87eae85110..de19ff1524abc 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -122,15 +122,16 @@ func (t *copTask) finishIndexPlan() { } cnt := t.count() t.indexPlanFinished = true + sessVars := t.indexPlan.SCtx().GetSessionVars() // Network cost of transferring rows of index scan to TiDB. - t.cst += cnt * NetworkFactor * t.tblColHists.GetAvgRowSize(t.indexPlan.Schema().Columns, true) + t.cst += cnt * sessVars.NetworkFactor * t.tblColHists.GetAvgRowSize(t.indexPlan.Schema().Columns, true) if t.tablePlan == nil { return } // Calculate the IO cost of table scan here because we cannot know its stats until we finish index plan. 
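// Illustrative arithmetic only (row counts and widths are assumed, not from
// the patch): with cnt = 1000 rows, an average index-row width of 16 bytes
// and an average table-row width of 64 bytes, the network term above adds
// 1000 * NetworkFactor * 16 = 16000 and the scan term below adds
// 1000 * ScanFactor * 64 = 96000 to t.cst, using the defaults this patch
// introduces (NetworkFactor = 1.0, ScanFactor = 1.5).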
t.tablePlan.(*PhysicalTableScan).stats = t.indexPlan.statsInfo() rowSize := t.tblColHists.GetAvgRowSize(t.tblCols, false) - t.cst += cnt * rowSize * ScanFactor + t.cst += cnt * rowSize * sessVars.ScanFactor } func (p *basePhysicalPlan) attach2Task(tasks ...task) task { @@ -145,17 +146,18 @@ func (p *PhysicalApply) attach2Task(tasks ...task) task { p.schema = buildPhysicalJoinSchema(p.JoinType, p) var cpuCost float64 lCount := lTask.count() + sessVars := p.ctx.GetSessionVars() if len(p.LeftConditions) > 0 { - cpuCost += lCount * CPUFactor + cpuCost += lCount * sessVars.CPUFactor lCount *= selectionFactor } rCount := rTask.count() if len(p.RightConditions) > 0 { - cpuCost += lCount * rCount * CPUFactor + cpuCost += lCount * rCount * sessVars.CPUFactor rCount *= selectionFactor } if len(p.EqualConditions)+len(p.OtherConditions) > 0 { - cpuCost += lCount * rCount * CPUFactor + cpuCost += lCount * rCount * sessVars.CPUFactor } return &rootTask{ p: p, @@ -182,15 +184,16 @@ func (p *PhysicalIndexMergeJoin) attach2Task(tasks ...task) task { func (p *PhysicalIndexMergeJoin) GetCost(outerTask, innerTask task) float64 { var cpuCost float64 outerCnt, innerCnt := outerTask.count(), innerTask.count() + sessVars := p.ctx.GetSessionVars() // Add the cost of evaluating outer filter, since inner filter of index join // is always empty, we can simply tell whether outer filter is empty using the // summed length of left/right conditions. if len(p.LeftConditions)+len(p.RightConditions) > 0 { - cpuCost += CPUFactor * outerCnt + cpuCost += sessVars.CPUFactor * outerCnt outerCnt *= selectionFactor } // Cost of extracting lookup keys. - innerCPUCost := CPUFactor * outerCnt + innerCPUCost := sessVars.CPUFactor * outerCnt // Cost of sorting and removing duplicate lookup keys: // (outerCnt / batchSize) * (sortFactor + 1.0) * batchSize * cpuFactor // If `p.NeedOuterSort` is true, the sortFactor is batchSize * Log2(batchSize). @@ -201,12 +204,12 @@ func (p *PhysicalIndexMergeJoin) GetCost(outerTask, innerTask task) float64 { sortFactor = math.Log2(float64(batchSize)) } if batchSize > 2 { - innerCPUCost += outerCnt * (sortFactor + 1.0) * CPUFactor + innerCPUCost += outerCnt * (sortFactor + 1.0) * sessVars.CPUFactor } // Add cost of building inner executors. CPU cost of building copTasks: // (outerCnt / batchSize) * (batchSize * distinctFactor) * cpuFactor // Since we don't know the number of copTasks built, ignore these network cost now. - innerCPUCost += outerCnt * distinctFactor * CPUFactor + innerCPUCost += outerCnt * distinctFactor * sessVars.CPUFactor innerConcurrency := float64(p.ctx.GetSessionVars().IndexLookupJoinConcurrency) cpuCost += innerCPUCost / innerConcurrency // Cost of merge join in inner worker. @@ -226,15 +229,15 @@ func (p *PhysicalIndexMergeJoin) GetCost(outerTask, innerTask task) float64 { // linear execution. In a word, the merge join only run in parallel for the first // `innerConcurrency` number of inner tasks. if outerCnt/batchSize >= innerConcurrency { - probeCost = (numPairs - batchSize*avgProbeCnt*(innerConcurrency-1)) * CPUFactor + probeCost = (numPairs - batchSize*avgProbeCnt*(innerConcurrency-1)) * sessVars.CPUFactor } else { - probeCost = batchSize * avgProbeCnt * CPUFactor + probeCost = batchSize * avgProbeCnt * sessVars.CPUFactor } - cpuCost += probeCost + (innerConcurrency+1.0)*concurrencyFactor + cpuCost += probeCost + (innerConcurrency+1.0)*sessVars.ConcurrencyFactor // Index merge join save the join results in inner worker. 
// So the memory cost consider the results size for each batch. - memoryCost := innerConcurrency * (batchSize * avgProbeCnt) * memoryFactor + memoryCost := innerConcurrency * (batchSize * avgProbeCnt) * sessVars.MemoryFactor innerPlanCost := outerCnt * innerTask.cost() return outerTask.cost() + innerPlanCost + cpuCost + memoryCost @@ -259,30 +262,31 @@ func (p *PhysicalIndexHashJoin) attach2Task(tasks ...task) task { func (p *PhysicalIndexHashJoin) GetCost(outerTask, innerTask task) float64 { var cpuCost float64 outerCnt, innerCnt := outerTask.count(), innerTask.count() + sessVars := p.ctx.GetSessionVars() // Add the cost of evaluating outer filter, since inner filter of index join // is always empty, we can simply tell whether outer filter is empty using the // summed length of left/right conditions. if len(p.LeftConditions)+len(p.RightConditions) > 0 { - cpuCost += CPUFactor * outerCnt + cpuCost += sessVars.CPUFactor * outerCnt outerCnt *= selectionFactor } // Cost of extracting lookup keys. - innerCPUCost := CPUFactor * outerCnt + innerCPUCost := sessVars.CPUFactor * outerCnt // Cost of sorting and removing duplicate lookup keys: // (outerCnt / batchSize) * (batchSize * Log2(batchSize) + batchSize) * CPUFactor - batchSize := math.Min(float64(p.ctx.GetSessionVars().IndexJoinBatchSize), outerCnt) + batchSize := math.Min(float64(sessVars.IndexJoinBatchSize), outerCnt) if batchSize > 2 { - innerCPUCost += outerCnt * (math.Log2(batchSize) + 1) * CPUFactor + innerCPUCost += outerCnt * (math.Log2(batchSize) + 1) * sessVars.CPUFactor } // Add cost of building inner executors. CPU cost of building copTasks: // (outerCnt / batchSize) * (batchSize * distinctFactor) * CPUFactor // Since we don't know the number of copTasks built, ignore these network cost now. - innerCPUCost += outerCnt * distinctFactor * CPUFactor - concurrency := float64(p.ctx.GetSessionVars().IndexLookupJoinConcurrency) + innerCPUCost += outerCnt * distinctFactor * sessVars.CPUFactor + concurrency := float64(sessVars.IndexLookupJoinConcurrency) cpuCost += innerCPUCost / concurrency // CPU cost of building hash table for outer results concurrently. // (outerCnt / batchSize) * (batchSize * CPUFactor) - outerCPUCost := outerCnt * CPUFactor + outerCPUCost := outerCnt * sessVars.CPUFactor cpuCost += outerCPUCost / concurrency // Cost of probing hash table concurrently. numPairs := outerCnt * innerCnt @@ -300,16 +304,16 @@ func (p *PhysicalIndexHashJoin) GetCost(outerTask, innerTask task) float64 { // in parallel for the first `innerConcurrency` number of inner tasks. var probeCost float64 if outerCnt/batchSize >= concurrency { - probeCost = (numPairs - batchSize*innerCnt*(concurrency-1)) * CPUFactor + probeCost = (numPairs - batchSize*innerCnt*(concurrency-1)) * sessVars.CPUFactor } else { - probeCost = batchSize * innerCnt * CPUFactor + probeCost = batchSize * innerCnt * sessVars.CPUFactor } cpuCost += probeCost // Cost of additional concurrent goroutines. - cpuCost += (concurrency + 1.0) * concurrencyFactor + cpuCost += (concurrency + 1.0) * sessVars.ConcurrencyFactor // Memory cost of hash tables for outer rows. The computed result is the upper bound, // since the executor is pipelined and not all workers are always in full load. - memoryCost := concurrency * (batchSize * distinctFactor) * innerCnt * memoryFactor + memoryCost := concurrency * (batchSize * distinctFactor) * innerCnt * sessVars.MemoryFactor // Cost of inner child plan, i.e, mainly I/O and network cost. 
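// Rough sketch with assumed numbers: innerTask.cost() is charged once per
// outer row in the line below, so outerCnt = 10000 lookups at a per-lookup
// inner cost of 2.5 contribute 10000 * 2.5 = 25000, which is added to the
// CPU and memory terms derived from the session-level factors above.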
innerPlanCost := outerCnt * innerTask.cost() return outerTask.cost() + innerPlanCost + cpuCost + memoryCost @@ -334,28 +338,29 @@ func (p *PhysicalIndexJoin) attach2Task(tasks ...task) task { func (p *PhysicalIndexJoin) GetCost(outerTask, innerTask task) float64 { var cpuCost float64 outerCnt, innerCnt := outerTask.count(), innerTask.count() + sessVars := p.ctx.GetSessionVars() // Add the cost of evaluating outer filter, since inner filter of index join // is always empty, we can simply tell whether outer filter is empty using the // summed length of left/right conditions. if len(p.LeftConditions)+len(p.RightConditions) > 0 { - cpuCost += CPUFactor * outerCnt + cpuCost += sessVars.CPUFactor * outerCnt outerCnt *= selectionFactor } // Cost of extracting lookup keys. - innerCPUCost := CPUFactor * outerCnt + innerCPUCost := sessVars.CPUFactor * outerCnt // Cost of sorting and removing duplicate lookup keys: // (outerCnt / batchSize) * (batchSize * Log2(batchSize) + batchSize) * CPUFactor batchSize := math.Min(float64(p.ctx.GetSessionVars().IndexJoinBatchSize), outerCnt) if batchSize > 2 { - innerCPUCost += outerCnt * (math.Log2(batchSize) + 1) * CPUFactor + innerCPUCost += outerCnt * (math.Log2(batchSize) + 1) * sessVars.CPUFactor } // Add cost of building inner executors. CPU cost of building copTasks: // (outerCnt / batchSize) * (batchSize * distinctFactor) * CPUFactor // Since we don't know the number of copTasks built, ignore these network cost now. - innerCPUCost += outerCnt * distinctFactor * CPUFactor + innerCPUCost += outerCnt * distinctFactor * sessVars.CPUFactor // CPU cost of building hash table for inner results: // (outerCnt / batchSize) * (batchSize * distinctFactor) * innerCnt * CPUFactor - innerCPUCost += outerCnt * distinctFactor * innerCnt * CPUFactor + innerCPUCost += outerCnt * distinctFactor * innerCnt * sessVars.CPUFactor innerConcurrency := float64(p.ctx.GetSessionVars().IndexLookupJoinConcurrency) cpuCost += innerCPUCost / innerConcurrency // Cost of probing hash table in main thread. @@ -368,12 +373,12 @@ func (p *PhysicalIndexJoin) GetCost(outerTask, innerTask task) float64 { numPairs = 0 } } - probeCost := numPairs * CPUFactor + probeCost := numPairs * sessVars.CPUFactor // Cost of additional concurrent goroutines. - cpuCost += probeCost + (innerConcurrency+1.0)*concurrencyFactor + cpuCost += probeCost + (innerConcurrency+1.0)*sessVars.ConcurrencyFactor // Memory cost of hash tables for inner rows. The computed result is the upper bound, // since the executor is pipelined and not all workers are always in full load. - memoryCost := innerConcurrency * (batchSize * distinctFactor) * innerCnt * memoryFactor + memoryCost := innerConcurrency * (batchSize * distinctFactor) * innerCnt * sessVars.MemoryFactor // Cost of inner child plan, i.e, mainly I/O and network cost. innerPlanCost := outerCnt * innerTask.cost() return outerTask.cost() + innerPlanCost + cpuCost + memoryCost @@ -385,9 +390,10 @@ func (p *PhysicalHashJoin) GetCost(lCnt, rCnt float64) float64 { if p.InnerChildIdx == 1 { innerCnt, outerCnt = rCnt, lCnt } + sessVars := p.ctx.GetSessionVars() // Cost of building hash table. - cpuCost := innerCnt * CPUFactor - memoryCost := innerCnt * memoryFactor + cpuCost := innerCnt * sessVars.CPUFactor + memoryCost := innerCnt * sessVars.MemoryFactor // Number of matched row pairs regarding the equal join conditions. 
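// Hedged illustration (cardinalities assumed): if numPairs is estimated at
// 100000 below, probing costs roughly 100000 * CPUFactor = 300000 with the
// default CPUFactor of 3.0, before it is divided by p.Concurrency; building
// the hash table above already charged innerCnt * CPUFactor for CPU and
// innerCnt * MemoryFactor for memory.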
helper := &fullJoinRowCountHelper{ cartesian: false, @@ -417,16 +423,16 @@ func (p *PhysicalHashJoin) GetCost(lCnt, rCnt float64) float64 { } // Cost of quering hash table is cheap actually, so we just compute the cost of // evaluating `OtherConditions` and joining row pairs. - probeCost := numPairs * CPUFactor + probeCost := numPairs * sessVars.CPUFactor // Cost of evaluating outer filter. if len(p.LeftConditions)+len(p.RightConditions) > 0 { // Input outer count for the above compution should be adjusted by selectionFactor. probeCost *= selectionFactor - probeCost += outerCnt * CPUFactor + probeCost += outerCnt * sessVars.CPUFactor } probeCost /= float64(p.Concurrency) // Cost of additional concurrent goroutines. - cpuCost += probeCost + float64(p.Concurrency+1)*concurrencyFactor + cpuCost += probeCost + float64(p.Concurrency+1)*sessVars.ConcurrencyFactor return cpuCost + memoryCost } @@ -471,18 +477,19 @@ func (p *PhysicalMergeJoin) GetCost(lCnt, rCnt float64) float64 { numPairs = 0 } } - probeCost := numPairs * CPUFactor + sessVars := p.ctx.GetSessionVars() + probeCost := numPairs * sessVars.CPUFactor // Cost of evaluating outer filters. var cpuCost float64 if len(p.LeftConditions)+len(p.RightConditions) > 0 { probeCost *= selectionFactor - cpuCost += outerCnt * CPUFactor + cpuCost += outerCnt * sessVars.CPUFactor } cpuCost += probeCost // For merge join, only one group of rows with same join key(not null) are cached, // we compute averge memory cost using estimated group size. NDV := getCardinality(innerKeys, innerSchema, innerStats) - memoryCost := (innerStats.RowCount / NDV) * memoryFactor + memoryCost := (innerStats.RowCount / NDV) * sessVars.MemoryFactor return cpuCost + memoryCost } @@ -535,6 +542,7 @@ func finishCopTask(ctx sessionctx.Context, task task) task { if !ok { return task } + sessVars := ctx.GetSessionVars() // copTasks are run in parallel, to make the estimated cost closer to execution time, we amortize // the cost to cop iterator workers. According to `CopClient::Send`, the concurrency // is Min(DistSQLScanConcurrency, numRegionsInvolvedInScan), since we cannot infer @@ -543,7 +551,7 @@ func finishCopTask(ctx sessionctx.Context, task task) task { t.finishIndexPlan() // Network cost of transferring rows of table scan to TiDB. if t.tablePlan != nil { - t.cst += t.count() * NetworkFactor * t.tblColHists.GetAvgRowSize(t.tablePlan.Schema().Columns, false) + t.cst += t.count() * sessVars.NetworkFactor * t.tblColHists.GetAvgRowSize(t.tablePlan.Schema().Columns, false) } t.cst /= copIterWorkers newTask := &rootTask{ @@ -566,17 +574,17 @@ func finishCopTask(ctx sessionctx.Context, task task) task { // (indexRows / batchSize) * batchSize * CPUFactor // Since we don't know the number of copTasks built, ignore these network cost now. indexRows := t.indexPlan.statsInfo().RowCount - newTask.cst += indexRows * CPUFactor + newTask.cst += indexRows * sessVars.CPUFactor // Add cost of worker goroutines in index lookup. - numTblWorkers := float64(t.indexPlan.SCtx().GetSessionVars().IndexLookupConcurrency) - newTask.cst += (numTblWorkers + 1) * concurrencyFactor + numTblWorkers := float64(sessVars.IndexLookupConcurrency) + newTask.cst += (numTblWorkers + 1) * sessVars.ConcurrencyFactor // When building table reader executor for each batch, we would sort the handles. 
CPU // cost of sort is: // CPUFactor * batchSize * Log2(batchSize) * (indexRows / batchSize) - indexLookupSize := float64(t.indexPlan.SCtx().GetSessionVars().IndexLookupSize) + indexLookupSize := float64(sessVars.IndexLookupSize) batchSize := math.Min(indexLookupSize, indexRows) if batchSize > 2 { - sortCPUCost := (indexRows * math.Log2(batchSize) * CPUFactor) / numTblWorkers + sortCPUCost := (indexRows * math.Log2(batchSize) * sessVars.CPUFactor) / numTblWorkers newTask.cst += sortCPUCost } // Also, we need to sort the retrieved rows if index lookup reader is expected to return @@ -586,7 +594,7 @@ func finishCopTask(ctx sessionctx.Context, task task) task { selectivity := tableRows / indexRows batchSize = math.Min(indexLookupSize*selectivity, tableRows) if t.keepOrder && batchSize > 2 { - sortCPUCost := (tableRows * math.Log2(batchSize) * CPUFactor) / numTblWorkers + sortCPUCost := (tableRows * math.Log2(batchSize) * sessVars.CPUFactor) / numTblWorkers newTask.cst += sortCPUCost } if t.doubleReadNeedProj { @@ -701,6 +709,7 @@ func (p *PhysicalTopN) GetCost(count float64, isRoot bool) float64 { if heapSize < 2.0 { heapSize = 2.0 } + sessVars := p.ctx.GetSessionVars() // Ignore the cost of `doCompaction` in current implementation of `TopNExec`, since it is the // special side-effect of our Chunk format in TiDB layer, which may not exist in coprocessor's // implementation, or may be removed in the future if we change data format. @@ -709,11 +718,11 @@ func (p *PhysicalTopN) GetCost(count float64, isRoot bool) float64 { // row. var cpuCost float64 if isRoot { - cpuCost = count * math.Log2(heapSize) * CPUFactor + cpuCost = count * math.Log2(heapSize) * sessVars.CPUFactor } else { - cpuCost = count * math.Log2(heapSize) * CopCPUFactor + cpuCost = count * math.Log2(heapSize) * sessVars.CopCPUFactor } - memoryCost := heapSize * memoryFactor + memoryCost := heapSize * sessVars.MemoryFactor return cpuCost + memoryCost } @@ -740,7 +749,8 @@ func (p *PhysicalSort) GetCost(count float64) float64 { if count < 2.0 { count = 2.0 } - return count*math.Log2(count)*CPUFactor + count*memoryFactor + sessVars := p.ctx.GetSessionVars() + return count*math.Log2(count)*sessVars.CPUFactor + count*sessVars.MemoryFactor } func (p *PhysicalSort) attach2Task(tasks ...task) task { @@ -797,13 +807,14 @@ func (p *PhysicalTopN) attach2Task(tasks ...task) task { // GetCost computes the cost of projection operator itself. func (p *PhysicalProjection) GetCost(count float64) float64 { - cpuCost := count * CPUFactor - concurrency := float64(p.ctx.GetSessionVars().ProjectionConcurrency) + sessVars := p.ctx.GetSessionVars() + cpuCost := count * sessVars.CPUFactor + concurrency := float64(sessVars.ProjectionConcurrency) if concurrency <= 0 { return cpuCost } cpuCost /= concurrency - concurrencyCost := (1 + concurrency) * concurrencyFactor + concurrencyCost := (1 + concurrency) * sessVars.ConcurrencyFactor return cpuCost + concurrencyCost } @@ -831,14 +842,16 @@ func (p *PhysicalUnionAll) attach2Task(tasks ...task) task { childPlans = append(childPlans, task.plan()) } p.SetChildren(childPlans...) + sessVars := p.ctx.GetSessionVars() // Children of UnionExec are executed in parallel. 
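// For example (figures assumed): a UnionAll with 4 child tasks whose most
// expensive child costs 5000 is costed below as
// childMaxCost + (1 + 4) * ConcurrencyFactor = 5000 + 5 * 3.0 = 5015,
// using the default ConcurrencyFactor added by this patch.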
- t.cst = childMaxCost + float64(1+len(tasks))*concurrencyFactor + t.cst = childMaxCost + float64(1+len(tasks))*sessVars.ConcurrencyFactor return t } func (sel *PhysicalSelection) attach2Task(tasks ...task) task { + sessVars := sel.ctx.GetSessionVars() t := finishCopTask(sel.ctx, tasks[0].copy()) - t.addCost(t.count() * CPUFactor) + t.addCost(t.count() * sessVars.CPUFactor) t = attachPlan2Task(sel, t) return t } @@ -994,13 +1007,14 @@ func (p *PhysicalStreamAgg) attach2Task(tasks ...task) task { func (p *PhysicalStreamAgg) GetCost(inputRows float64, isRoot bool) float64 { aggFuncFactor := p.getAggFuncCostFactor() var cpuCost float64 + sessVars := p.ctx.GetSessionVars() if isRoot { - cpuCost = inputRows * CPUFactor * aggFuncFactor + cpuCost = inputRows * sessVars.CPUFactor * aggFuncFactor } else { - cpuCost = inputRows * CopCPUFactor * aggFuncFactor + cpuCost = inputRows * sessVars.CopCPUFactor * aggFuncFactor } rowsPerGroup := inputRows / p.statsInfo().RowCount - memoryCost := rowsPerGroup * distinctFactor * memoryFactor * float64(p.numDistinctFunc()) + memoryCost := rowsPerGroup * distinctFactor * sessVars.MemoryFactor * float64(p.numDistinctFunc()) return cpuCost + memoryCost } @@ -1072,20 +1086,21 @@ func (p *PhysicalHashAgg) GetCost(inputRows float64, isRoot bool) float64 { numDistinctFunc := p.numDistinctFunc() aggFuncFactor := p.getAggFuncCostFactor() var cpuCost float64 + sessVars := p.ctx.GetSessionVars() if isRoot { - cpuCost = inputRows * CPUFactor * aggFuncFactor + cpuCost = inputRows * sessVars.CPUFactor * aggFuncFactor divisor, con := p.cpuCostDivisor(numDistinctFunc > 0) if divisor > 0 { cpuCost /= divisor // Cost of additional goroutines. - cpuCost += (con + 1) * concurrencyFactor + cpuCost += (con + 1) * sessVars.ConcurrencyFactor } } else { - cpuCost = inputRows * CopCPUFactor * aggFuncFactor + cpuCost = inputRows * sessVars.CopCPUFactor * aggFuncFactor } - memoryCost := cardinality * memoryFactor * float64(len(p.AggFuncs)) + memoryCost := cardinality * sessVars.MemoryFactor * float64(len(p.AggFuncs)) // When aggregation has distinct flag, we would allocate a map for each group to // check duplication. - memoryCost += inputRows * distinctFactor * memoryFactor * float64(numDistinctFunc) + memoryCost += inputRows * distinctFactor * sessVars.MemoryFactor * float64(numDistinctFunc) return cpuCost + memoryCost } diff --git a/planner/implementation/datasource.go b/planner/implementation/datasource.go index 708504838a81e..45b7feeacd7f5 100644 --- a/planner/implementation/datasource.go +++ b/planner/implementation/datasource.go @@ -55,12 +55,13 @@ func NewTableReaderImpl(reader *plannercore.PhysicalTableReader, hists *statisti func (impl *TableReaderImpl) CalcCost(outCount float64, childCosts []float64, children ...*memo.Group) float64 { reader := impl.plan.(*plannercore.PhysicalTableReader) width := impl.tblColHists.GetAvgRowSize(reader.Schema().Columns, false) - networkCost := outCount * plannercore.NetworkFactor * width + sessVars := reader.SCtx().GetSessionVars() + networkCost := outCount * sessVars.NetworkFactor * width // copTasks are run in parallel, to make the estimated cost closer to execution time, we amortize // the cost to cop iterator workers. According to `CopClient::Send`, the concurrency // is Min(DistSQLScanConcurrency, numRegionsInvolvedInScan), since we cannot infer // the number of regions involved, we simply use DistSQLScanConcurrency. 
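// Quick sketch (outCount and width assumed): reading 100000 rows with an
// average width of 32 bytes gives networkCost = 100000 * NetworkFactor * 32
// = 3200000 with the default NetworkFactor of 1.0; that, plus the child's
// cost, is then amortized over DistSQLScanConcurrency cop workers below.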
- copIterWorkers := float64(reader.SCtx().GetSessionVars().DistSQLScanConcurrency) + copIterWorkers := float64(sessVars.DistSQLScanConcurrency) impl.cost = (networkCost + childCosts[0]) / copIterWorkers return impl.cost } @@ -87,9 +88,10 @@ func NewTableScanImpl(ts *plannercore.PhysicalTableScan, cols []*expression.Colu func (impl *TableScanImpl) CalcCost(outCount float64, childCosts []float64, children ...*memo.Group) float64 { ts := impl.plan.(*plannercore.PhysicalTableScan) width := impl.tblColHists.GetAvgRowSize(impl.tblCols, false) - impl.cost = outCount * plannercore.ScanFactor * width + sessVars := ts.SCtx().GetSessionVars() + impl.cost = outCount * sessVars.ScanFactor * width if ts.Desc { - impl.cost = outCount * plannercore.DescScanFactor * width + impl.cost = outCount * sessVars.DescScanFactor * width } return impl.cost } diff --git a/session/session.go b/session/session.go index 3ce811ab673a9..92d51ac75e6a6 100644 --- a/session/session.go +++ b/session/session.go @@ -1781,6 +1781,13 @@ var builtinGlobalVariable = []string{ variable.TiDBOptInSubqToJoinAndAgg, variable.TiDBOptCorrelationThreshold, variable.TiDBOptCorrelationExpFactor, + variable.TiDBOptCPUFactor, + variable.TiDBOptCopCPUFactor, + variable.TiDBOptNetworkFactor, + variable.TiDBOptScanFactor, + variable.TiDBOptDescScanFactor, + variable.TiDBOptMemoryFactor, + variable.TiDBOptConcurrencyFactor, variable.TiDBDistSQLScanConcurrency, variable.TiDBInitChunkSize, variable.TiDBMaxChunkSize, diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 8680493809d32..fc907c73f4c9a 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -293,6 +293,21 @@ type SessionVars struct { // CorrelationExpFactor is used to control the heuristic approach of row count estimation when CorrelationThreshold is not met. CorrelationExpFactor int + // CPUFactor is the CPU cost of processing one expression for one row. + CPUFactor float64 + // CopCPUFactor is the CPU cost of processing one expression for one row in coprocessor. + CopCPUFactor float64 + // NetworkFactor is the network cost of transferring 1 byte data. + NetworkFactor float64 + // ScanFactor is the IO cost of scanning 1 byte data on TiKV. + ScanFactor float64 + // DescScanFactor is the IO cost of scanning 1 byte data on TiKV in desc order. + DescScanFactor float64 + // MemoryFactor is the memory cost of storing one tuple. + MemoryFactor float64 + // ConcurrencyFactor is the CPU cost of additional one goroutine. + ConcurrencyFactor float64 + // CurrInsertValues is used to record current ValuesExpr's values. 
// See http://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_values CurrInsertValues chunk.Row @@ -477,6 +492,13 @@ func NewSessionVars() *SessionVars { allowInSubqToJoinAndAgg: DefOptInSubqToJoinAndAgg, CorrelationThreshold: DefOptCorrelationThreshold, CorrelationExpFactor: DefOptCorrelationExpFactor, + CPUFactor: DefOptCPUFactor, + CopCPUFactor: DefOptCopCPUFactor, + NetworkFactor: DefOptNetworkFactor, + ScanFactor: DefOptScanFactor, + DescScanFactor: DefOptDescScanFactor, + MemoryFactor: DefOptMemoryFactor, + ConcurrencyFactor: DefOptConcurrencyFactor, EnableRadixJoin: false, EnableVectorizedExpression: DefEnableVectorizedExpression, L2CacheSize: cpuid.CPU.Cache.L2, @@ -778,6 +800,20 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { s.CorrelationThreshold = tidbOptFloat64(val, DefOptCorrelationThreshold) case TiDBOptCorrelationExpFactor: s.CorrelationExpFactor = int(tidbOptInt64(val, DefOptCorrelationExpFactor)) + case TiDBOptCPUFactor: + s.CPUFactor = tidbOptFloat64(val, DefOptCPUFactor) + case TiDBOptCopCPUFactor: + s.CopCPUFactor = tidbOptFloat64(val, DefOptCopCPUFactor) + case TiDBOptNetworkFactor: + s.NetworkFactor = tidbOptFloat64(val, DefOptNetworkFactor) + case TiDBOptScanFactor: + s.ScanFactor = tidbOptFloat64(val, DefOptScanFactor) + case TiDBOptDescScanFactor: + s.DescScanFactor = tidbOptFloat64(val, DefOptDescScanFactor) + case TiDBOptMemoryFactor: + s.MemoryFactor = tidbOptFloat64(val, DefOptMemoryFactor) + case TiDBOptConcurrencyFactor: + s.ConcurrencyFactor = tidbOptFloat64(val, DefOptConcurrencyFactor) case TiDBIndexLookupConcurrency: s.IndexLookupConcurrency = tidbOptPositiveInt32(val, DefIndexLookupConcurrency) case TiDBIndexLookupJoinConcurrency: diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 5b97fa5e7d168..d9db98d454858 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -651,6 +651,13 @@ var defaultSysVars = []*SysVar{ {ScopeGlobal | ScopeSession, TiDBOptInSubqToJoinAndAgg, BoolToIntStr(DefOptInSubqToJoinAndAgg)}, {ScopeGlobal | ScopeSession, TiDBOptCorrelationThreshold, strconv.FormatFloat(DefOptCorrelationThreshold, 'f', -1, 64)}, {ScopeGlobal | ScopeSession, TiDBOptCorrelationExpFactor, strconv.Itoa(DefOptCorrelationExpFactor)}, + {ScopeGlobal | ScopeSession, TiDBOptCPUFactor, strconv.FormatFloat(DefOptCPUFactor, 'f', -1, 64)}, + {ScopeGlobal | ScopeSession, TiDBOptCopCPUFactor, strconv.FormatFloat(DefOptCopCPUFactor, 'f', -1, 64)}, + {ScopeGlobal | ScopeSession, TiDBOptNetworkFactor, strconv.FormatFloat(DefOptNetworkFactor, 'f', -1, 64)}, + {ScopeGlobal | ScopeSession, TiDBOptScanFactor, strconv.FormatFloat(DefOptScanFactor, 'f', -1, 64)}, + {ScopeGlobal | ScopeSession, TiDBOptDescScanFactor, strconv.FormatFloat(DefOptDescScanFactor, 'f', -1, 64)}, + {ScopeGlobal | ScopeSession, TiDBOptMemoryFactor, strconv.FormatFloat(DefOptMemoryFactor, 'f', -1, 64)}, + {ScopeGlobal | ScopeSession, TiDBOptConcurrencyFactor, strconv.FormatFloat(DefOptConcurrencyFactor, 'f', -1, 64)}, {ScopeGlobal | ScopeSession, TiDBIndexJoinBatchSize, strconv.Itoa(DefIndexJoinBatchSize)}, {ScopeGlobal | ScopeSession, TiDBIndexLookupSize, strconv.Itoa(DefIndexLookupSize)}, {ScopeGlobal | ScopeSession, TiDBIndexLookupConcurrency, strconv.Itoa(DefIndexLookupConcurrency)}, diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index b489bc56face2..8e3aefacd4965 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -172,6 +172,21 @@ 
const ( // tidb_opt_correlation_exp_factor is an exponential factor to control heuristic approach when tidb_opt_correlation_threshold is not satisfied. TiDBOptCorrelationExpFactor = "tidb_opt_correlation_exp_factor" + // tidb_opt_cpu_factor is the CPU cost of processing one expression for one row. + TiDBOptCPUFactor = "tidb_opt_cpu_factor" + // tidb_opt_copcpu_factor is the CPU cost of processing one expression for one row in coprocessor. + TiDBOptCopCPUFactor = "tidb_opt_copcpu_factor" + // tidb_opt_network_factor is the network cost of transferring 1 byte data. + TiDBOptNetworkFactor = "tidb_opt_network_factor" + // tidb_opt_scan_factor is the IO cost of scanning 1 byte data on TiKV. + TiDBOptScanFactor = "tidb_opt_scan_factor" + // tidb_opt_desc_factor is the IO cost of scanning 1 byte data on TiKV in desc order. + TiDBOptDescScanFactor = "tidb_opt_desc_factor" + // tidb_opt_memory_factor is the memory cost of storing one tuple. + TiDBOptMemoryFactor = "tidb_opt_memory_factor" + // tidb_opt_concurrency_factor is the CPU cost of additional one goroutine. + TiDBOptConcurrencyFactor = "tidb_opt_concurrency_factor" + // tidb_index_join_batch_size is used to set the batch size of a index lookup join. // The index lookup join fetches batches of data from outer executor and constructs ranges for inner executor. // This value controls how much of data in a batch to do the index join. @@ -321,6 +336,13 @@ const ( DefOptWriteRowID = false DefOptCorrelationThreshold = 0.9 DefOptCorrelationExpFactor = 1 + DefOptCPUFactor = 3.0 + DefOptCopCPUFactor = 3.0 + DefOptNetworkFactor = 1.0 + DefOptScanFactor = 1.5 + DefOptDescScanFactor = 3.0 + DefOptMemoryFactor = 0.001 + DefOptConcurrencyFactor = 3.0 DefOptInSubqToJoinAndAgg = true DefBatchInsert = false DefBatchDelete = false diff --git a/sessionctx/variable/varsutil.go b/sessionctx/variable/varsutil.go index e02efe3ed51d5..711dab9e4f3b1 100644 --- a/sessionctx/variable/varsutil.go +++ b/sessionctx/variable/varsutil.go @@ -476,6 +476,21 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string, return value, ErrWrongValueForVar.GenWithStackByArgs(name, value) } return value, nil + case TiDBOptCPUFactor, + TiDBOptCopCPUFactor, + TiDBOptNetworkFactor, + TiDBOptScanFactor, + TiDBOptDescScanFactor, + TiDBOptMemoryFactor, + TiDBOptConcurrencyFactor: + v, err := strconv.ParseFloat(value, 64) + if err != nil { + return value, ErrWrongTypeForVar.GenWithStackByArgs(name) + } + if v < 0 { + return value, ErrWrongValueForVar.GenWithStackByArgs(name, value) + } + return value, nil case TiDBProjectionConcurrency, TIDBMemQuotaQuery, TIDBMemQuotaHashJoin, diff --git a/sessionctx/variable/varsutil_test.go b/sessionctx/variable/varsutil_test.go index ed0183db5ad12..216104f6f360c 100644 --- a/sessionctx/variable/varsutil_test.go +++ b/sessionctx/variable/varsutil_test.go @@ -296,6 +296,62 @@ func (s *testVarsutilSuite) TestVarsutil(c *C) { c.Assert(val, Equals, "0") c.Assert(v.CorrelationThreshold, Equals, float64(0)) + c.Assert(v.CPUFactor, Equals, 3.0) + err = SetSessionSystemVar(v, TiDBOptCPUFactor, types.NewStringDatum("5.0")) + c.Assert(err, IsNil) + val, err = GetSessionSystemVar(v, TiDBOptCPUFactor) + c.Assert(err, IsNil) + c.Assert(val, Equals, "5.0") + c.Assert(v.CPUFactor, Equals, 5.0) + + c.Assert(v.CopCPUFactor, Equals, 3.0) + err = SetSessionSystemVar(v, TiDBOptCopCPUFactor, types.NewStringDatum("5.0")) + c.Assert(err, IsNil) + val, err = GetSessionSystemVar(v, TiDBOptCopCPUFactor) + c.Assert(err, IsNil) + c.Assert(val, Equals, "5.0") 
+ c.Assert(v.CopCPUFactor, Equals, 5.0) + + c.Assert(v.NetworkFactor, Equals, 1.0) + err = SetSessionSystemVar(v, TiDBOptNetworkFactor, types.NewStringDatum("3.0")) + c.Assert(err, IsNil) + val, err = GetSessionSystemVar(v, TiDBOptNetworkFactor) + c.Assert(err, IsNil) + c.Assert(val, Equals, "3.0") + c.Assert(v.NetworkFactor, Equals, 3.0) + + c.Assert(v.ScanFactor, Equals, 1.5) + err = SetSessionSystemVar(v, TiDBOptScanFactor, types.NewStringDatum("3.0")) + c.Assert(err, IsNil) + val, err = GetSessionSystemVar(v, TiDBOptScanFactor) + c.Assert(err, IsNil) + c.Assert(val, Equals, "3.0") + c.Assert(v.ScanFactor, Equals, 3.0) + + c.Assert(v.DescScanFactor, Equals, 3.0) + err = SetSessionSystemVar(v, TiDBOptDescScanFactor, types.NewStringDatum("5.0")) + c.Assert(err, IsNil) + val, err = GetSessionSystemVar(v, TiDBOptDescScanFactor) + c.Assert(err, IsNil) + c.Assert(val, Equals, "5.0") + c.Assert(v.DescScanFactor, Equals, 5.0) + + c.Assert(v.MemoryFactor, Equals, 0.001) + err = SetSessionSystemVar(v, TiDBOptMemoryFactor, types.NewStringDatum("1.0")) + c.Assert(err, IsNil) + val, err = GetSessionSystemVar(v, TiDBOptMemoryFactor) + c.Assert(err, IsNil) + c.Assert(val, Equals, "1.0") + c.Assert(v.MemoryFactor, Equals, 1.0) + + c.Assert(v.ConcurrencyFactor, Equals, 3.0) + err = SetSessionSystemVar(v, TiDBOptConcurrencyFactor, types.NewStringDatum("5.0")) + c.Assert(err, IsNil) + val, err = GetSessionSystemVar(v, TiDBOptConcurrencyFactor) + c.Assert(err, IsNil) + c.Assert(val, Equals, "5.0") + c.Assert(v.ConcurrencyFactor, Equals, 5.0) + SetSessionSystemVar(v, TiDBReplicaRead, types.NewStringDatum("follower")) val, err = GetSessionSystemVar(v, TiDBReplicaRead) c.Assert(err, IsNil) @@ -367,6 +423,20 @@ func (s *testVarsutilSuite) TestValidate(c *C) { {TiDBOptCorrelationExpFactor, "-10", true}, {TiDBOptCorrelationThreshold, "a", true}, {TiDBOptCorrelationThreshold, "-2", true}, + {TiDBOptCPUFactor, "a", true}, + {TiDBOptCPUFactor, "-2", true}, + {TiDBOptCopCPUFactor, "a", true}, + {TiDBOptCopCPUFactor, "-2", true}, + {TiDBOptNetworkFactor, "a", true}, + {TiDBOptNetworkFactor, "-2", true}, + {TiDBOptScanFactor, "a", true}, + {TiDBOptScanFactor, "-2", true}, + {TiDBOptDescScanFactor, "a", true}, + {TiDBOptDescScanFactor, "-2", true}, + {TiDBOptMemoryFactor, "a", true}, + {TiDBOptMemoryFactor, "-2", true}, + {TiDBOptConcurrencyFactor, "a", true}, + {TiDBOptConcurrencyFactor, "-2", true}, {TxnIsolation, "READ-UNCOMMITTED", true}, {TiDBInitChunkSize, "a", true}, {TiDBInitChunkSize, "-1", true},