Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: introduce new cost formula for MPPJoins #35643

Merged
merged 8 commits into from
Jun 22, 2022
28 changes: 19 additions & 9 deletions planner/core/plan_cost.go
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,7 @@ func (p *PhysicalApply) GetPlanCost(taskType property.TaskType, costFlag uint64)
}

// GetCost computes cost of merge join operator itself.
func (p *PhysicalMergeJoin) GetCost(lCnt, rCnt float64) float64 {
func (p *PhysicalMergeJoin) GetCost(lCnt, rCnt float64, costFlag uint64) float64 {
outerCnt := lCnt
innerCnt := rCnt
innerKeys := p.RightJoinKeys
Expand Down Expand Up @@ -766,6 +766,9 @@ func (p *PhysicalMergeJoin) GetCost(lCnt, rCnt float64) float64 {
numPairs = 0
}
}
if hasCostFlag(costFlag, CostFlagUseTrueCardinality) {
numPairs = getOperatorActRows(p)
}
sessVars := p.ctx.GetSessionVars()
probeCost := numPairs * sessVars.GetCPUFactor()
// Cost of evaluating outer filters.
Expand Down Expand Up @@ -795,13 +798,13 @@ func (p *PhysicalMergeJoin) GetPlanCost(taskType property.TaskType, costFlag uin
}
p.planCost += childCost
}
p.planCost += p.GetCost(getCardinality(p.children[0], costFlag), getCardinality(p.children[1], costFlag))
p.planCost += p.GetCost(getCardinality(p.children[0], costFlag), getCardinality(p.children[1], costFlag), costFlag)
p.planCostInit = true
return p.planCost, nil
}

// GetCost computes cost of hash join operator itself.
func (p *PhysicalHashJoin) GetCost(lCnt, rCnt float64) float64 {
func (p *PhysicalHashJoin) GetCost(lCnt, rCnt float64, isMPP bool, costFlag uint64) float64 {
buildCnt, probeCnt := lCnt, rCnt
build := p.children[0]
// Taking the right as the inner for right join or using the outer to build a hash table.
Expand All @@ -815,7 +818,11 @@ func (p *PhysicalHashJoin) GetCost(lCnt, rCnt float64) float64 {
rowSize := getAvgRowSize(build.statsInfo(), build.Schema())
spill := oomUseTmpStorage && memQuota > 0 && rowSize*buildCnt > float64(memQuota) && p.storeTp != kv.TiFlash
// Cost of building hash table.
cpuCost := buildCnt * sessVars.GetCPUFactor()
cpuFactor := sessVars.GetCPUFactor()
if isMPP && p.ctx.GetSessionVars().CostModelVersion == modelVer2 {
cpuFactor = sessVars.GetTiFlashCPUFactor() // use the dedicated TiFlash CPU Factor on modelVer2
}
cpuCost := buildCnt * cpuFactor
memoryCost := buildCnt * sessVars.GetMemoryFactor()
diskCost := buildCnt * sessVars.GetDiskFactor() * rowSize
// Number of matched row pairs regarding the equal join conditions.
Expand Down Expand Up @@ -845,16 +852,19 @@ func (p *PhysicalHashJoin) GetCost(lCnt, rCnt float64) float64 {
numPairs = 0
}
}
if hasCostFlag(costFlag, CostFlagUseTrueCardinality) {
numPairs = getOperatorActRows(p)
}
// Cost of querying hash table is cheap actually, so we just compute the cost of
// evaluating `OtherConditions` and joining row pairs.
probeCost := numPairs * sessVars.GetCPUFactor()
probeCost := numPairs * cpuFactor
probeDiskCost := numPairs * sessVars.GetDiskFactor() * rowSize
// Cost of evaluating outer filter.
if len(p.LeftConditions)+len(p.RightConditions) > 0 {
// Input outer count for the above compution should be adjusted by SelectionFactor.
probeCost *= SelectionFactor
probeDiskCost *= SelectionFactor
probeCost += probeCnt * sessVars.GetCPUFactor()
probeCost += probeCnt * cpuFactor
}
diskCost += probeDiskCost
probeCost /= float64(p.Concurrency)
Expand All @@ -864,9 +874,9 @@ func (p *PhysicalHashJoin) GetCost(lCnt, rCnt float64) float64 {
if p.UseOuterToBuild {
if spill {
// It runs in sequence when build data is on disk. See handleUnmatchedRowsFromHashTableInDisk
cpuCost += buildCnt * sessVars.GetCPUFactor()
cpuCost += buildCnt * cpuFactor
} else {
cpuCost += buildCnt * sessVars.GetCPUFactor() / float64(p.Concurrency)
cpuCost += buildCnt * cpuFactor / float64(p.Concurrency)
}
diskCost += buildCnt * sessVars.GetDiskFactor() * rowSize
}
Expand All @@ -892,7 +902,7 @@ func (p *PhysicalHashJoin) GetPlanCost(taskType property.TaskType, costFlag uint
}
p.planCost += childCost
}
p.planCost += p.GetCost(getCardinality(p.children[0], costFlag), getCardinality(p.children[1], costFlag))
p.planCost += p.GetCost(getCardinality(p.children[0], costFlag), getCardinality(p.children[1], costFlag), taskType == property.MppTaskType, costFlag)
p.planCostInit = true
return p.planCost, nil
}
Expand Down
8 changes: 4 additions & 4 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func (p *PhysicalHashJoin) attach2Task(tasks ...task) task {
p.SetChildren(lTask.plan(), rTask.plan())
task := &rootTask{
p: p,
cst: lTask.cost() + rTask.cost() + p.GetCost(lTask.count(), rTask.count()),
cst: lTask.cost() + rTask.cost() + p.GetCost(lTask.count(), rTask.count(), false, 0),
}
p.cost = task.cost()
return task
Expand Down Expand Up @@ -547,7 +547,7 @@ func (p *PhysicalHashJoin) attach2TaskForMpp(tasks ...task) task {
outerTask = rTask
}
task := &mppTask{
cst: lCost + rCost + p.GetCost(lTask.count(), rTask.count()),
cst: lCost + rCost + p.GetCost(lTask.count(), rTask.count(), false, 0),
p: p,
partTp: outerTask.partTp,
hashCols: outerTask.hashCols,
Expand Down Expand Up @@ -578,7 +578,7 @@ func (p *PhysicalHashJoin) attach2TaskForTiFlash(tasks ...task) task {
tblColHists: rTask.tblColHists,
indexPlanFinished: true,
tablePlan: p,
cst: lCost + rCost + p.GetCost(lTask.count(), rTask.count()),
cst: lCost + rCost + p.GetCost(lTask.count(), rTask.count(), false, 0),
}
p.cost = task.cst
return task
Expand All @@ -590,7 +590,7 @@ func (p *PhysicalMergeJoin) attach2Task(tasks ...task) task {
p.SetChildren(lTask.plan(), rTask.plan())
t := &rootTask{
p: p,
cst: lTask.cost() + rTask.cost() + p.GetCost(lTask.count(), rTask.count()),
cst: lTask.cost() + rTask.cost() + p.GetCost(lTask.count(), rTask.count(), 0),
}
p.cost = t.cost()
return t
Expand Down
4 changes: 2 additions & 2 deletions planner/implementation/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (impl *HashJoinImpl) CalcCost(outCount float64, children ...memo.Implementa
hashJoin := impl.plan.(*plannercore.PhysicalHashJoin)
// The children here are only used to calculate the cost.
hashJoin.SetChildren(children[0].GetPlan(), children[1].GetPlan())
selfCost := hashJoin.GetCost(children[0].GetPlan().StatsCount(), children[1].GetPlan().StatsCount())
selfCost := hashJoin.GetCost(children[0].GetPlan().StatsCount(), children[1].GetPlan().StatsCount(), false, 0)
impl.cost = selfCost + children[0].GetCost() + children[1].GetCost()
return impl.cost
}
Expand All @@ -56,7 +56,7 @@ func (impl *MergeJoinImpl) CalcCost(outCount float64, children ...memo.Implement
mergeJoin := impl.plan.(*plannercore.PhysicalMergeJoin)
// The children here are only used to calculate the cost.
mergeJoin.SetChildren(children[0].GetPlan(), children[1].GetPlan())
selfCost := mergeJoin.GetCost(children[0].GetPlan().StatsCount(), children[1].GetPlan().StatsCount())
selfCost := mergeJoin.GetCost(children[0].GetPlan().StatsCount(), children[1].GetPlan().StatsCount(), 0)
impl.cost = selfCost + children[0].GetCost() + children[1].GetCost()
return impl.cost
}
Expand Down