Add CPUManager policy option to distribute CPUs across NUMA nodes instead of packing them #105631

Merged
349 changes: 348 additions & 1 deletion pkg/kubelet/cm/cpumanager/cpu_assignment.go
@@ -18,6 +18,7 @@ package cpumanager

import (
	"fmt"
	"math"
	"sort"

	"k8s.io/klog/v2"
@@ -26,6 +27,63 @@ import (
	"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
)

type LoopControl int

const (
	Continue LoopControl = iota
	Break
)

type mapIntInt map[int]int

func (m mapIntInt) Clone() mapIntInt {
	cp := make(mapIntInt, len(m))
	for k, v := range m {
		cp[k] = v
	}
	return cp
}

func (m mapIntInt) Keys() []int {
	// Pre-allocate capacity only: appending to a slice created with
	// make([]int, len(m)) would leave len(m) leading zeros in the result.
	keys := make([]int, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	return keys
}

func (m mapIntInt) Values() []int {
	// Same capacity-only pre-allocation as in Keys() above.
	values := make([]int, 0, len(m))
	for _, v := range m {
		values = append(values, v)
	}
	return values
}

func mean(xs []int) float64 {
	var sum float64
	for _, x := range xs {
		sum += float64(x)
	}
	return sum / float64(len(xs))
}

func standardDeviation(xs []int) float64 {
	m := mean(xs)
	var sum float64
	for _, x := range xs {
		sum += (float64(x) - m) * (float64(x) - m)
	}
	return math.Sqrt(sum / float64(len(xs)))
}

func min(x, y int) int {
	if x < y {
		return x
	}
	return y
}
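
// A quick sanity check of the helpers above (a hypothetical snippet, not part
// of this diff): mean() and standardDeviation() compute the population
// statistics, and min() is the usual two-way minimum.
func exampleStatsHelpers() {
	xs := []int{2, 4, 4, 4, 5, 5, 7, 9}
	fmt.Println(mean(xs))              // 5
	fmt.Println(standardDeviation(xs)) // 2 (population standard deviation, i.e. divide by N)
	fmt.Println(min(3, 7))             // 3
}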

type numaOrSocketsFirstFuncs interface {
	takeFullFirstLevel()
	takeFullSecondLevel()
@@ -306,6 +364,31 @@ func (a *cpuAccumulator) takeRemainingCPUs() {
	}
}

func (a *cpuAccumulator) rangeNUMANodesNeededToSatisfy(cpuGroupSize int) (int, int) {
	// Get the total number of NUMA nodes that have CPUs available on them.
	numNUMANodesAvailable := a.details.NUMANodes().Size()

	// Get the total number of CPUs available across all NUMA nodes.
	numCPUsAvailable := a.details.CPUs().Size()

	// Calculate the number of available 'cpuGroups' across all NUMA nodes as
	// well as the number of 'cpuGroups' that need to be allocated (rounding up).
	numCPUGroupsAvailable := (numCPUsAvailable-1)/cpuGroupSize + 1
	numCPUGroupsNeeded := (a.numCPUsNeeded-1)/cpuGroupSize + 1

	// Calculate the number of available 'cpuGroups' per NUMA node (rounding up).
	numCPUGroupsPerNUMANode := (numCPUGroupsAvailable-1)/numNUMANodesAvailable + 1

	// Calculate the minimum number of NUMA nodes required to satisfy the
	// allocation (rounding up).
	minNUMAs := (numCPUGroupsNeeded-1)/numCPUGroupsPerNUMANode + 1

	// Calculate the maximum number of NUMA nodes required to satisfy the allocation.
	maxNUMAs := min(numCPUGroupsNeeded, numNUMANodesAvailable)

	return minNUMAs, maxNUMAs
}
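
// A hypothetical walk-through of rangeNUMANodesNeededToSatisfy() with invented
// numbers: 4 NUMA nodes exposing 32 free CPUs in total, a request for 10 CPUs,
// and cpuGroupSize=2 (both hyperthreads of a core are handed out together).
func exampleRangeNUMANodes() (int, int) {
	numNUMANodesAvailable := 4
	numCPUsAvailable := 32
	numCPUsNeeded := 10
	cpuGroupSize := 2

	numCPUGroupsAvailable := (numCPUsAvailable-1)/cpuGroupSize + 1                 // 16
	numCPUGroupsNeeded := (numCPUsNeeded-1)/cpuGroupSize + 1                       // 5
	numCPUGroupsPerNUMANode := (numCPUGroupsAvailable-1)/numNUMANodesAvailable + 1 // 4

	minNUMAs := (numCPUGroupsNeeded-1)/numCPUGroupsPerNUMANode + 1 // 2
	maxNUMAs := min(numCPUGroupsNeeded, numNUMANodesAvailable)     // 4

	// The search loop in takeByTopologyNUMADistributed() below therefore only
	// needs to try combinations of 2, 3, or 4 NUMA nodes instead of starting at 1.
	return minNUMAs, maxNUMAs
}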

func (a *cpuAccumulator) needs(n int) bool {
	return a.numCPUsNeeded >= n
}
@@ -318,7 +401,33 @@ func (a *cpuAccumulator) isFailed() bool {
	return a.numCPUsNeeded > a.details.CPUs().Size()
}

func takeByTopology(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int) (cpuset.CPUSet, error) {
// iterateCombinations walks through all n-choose-k subsets of size k in n and
// calls function 'f()' on each subset. For example, if n={0,1,2} and k=2,
// then f() will be called on the subsets {0,1}, {0,2}, and {1,2}. If f() ever
// returns 'Break', we break early and exit the loop.
func (a *cpuAccumulator) iterateCombinations(n []int, k int, f func([]int) LoopControl) {
	if k < 1 {
		return
	}

	var helper func(n []int, k int, start int, accum []int, f func([]int) LoopControl) LoopControl
	helper = func(n []int, k int, start int, accum []int, f func([]int) LoopControl) LoopControl {
		if k == 0 {
			return f(accum)
		}
		for i := start; i <= len(n)-k; i++ {
			control := helper(n, k-1, i+1, append(accum, n[i]), f)
			if control == Break {
				return Break
			}
		}
		return Continue
	}

	helper(n, k, 0, []int{}, f)
}
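
// A minimal usage sketch for iterateCombinations (hypothetical, not part of
// this diff): enumerate the 2-element subsets of {0, 1, 2} in order and stop
// as soon as a subset ending in node 2 is seen.
func exampleIterateCombinations(acc *cpuAccumulator) [][]int {
	var visited [][]int
	acc.iterateCombinations([]int{0, 1, 2}, 2, func(combo []int) LoopControl {
		// 'combo' may share backing storage across callbacks, so copy it
		// before retaining it.
		c := make([]int, len(combo))
		copy(c, combo)
		visited = append(visited, c)
		if c[len(c)-1] == 2 {
			return Break
		}
		return Continue
	})
	// visited is now [[0 1] [0 2]]: the subset {1,2} was never generated
	// because the callback returned Break on {0,2}.
	return visited
}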

func takeByTopologyNUMAPacked(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int) (cpuset.CPUSet, error) {
	acc := newCPUAccumulator(topo, availableCPUs, numCPUs)
	if acc.isSatisfied() {
		return acc.result, nil
@@ -358,3 +467,241 @@ func takeByTopology(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, num

return cpuset.NewCPUSet(), fmt.Errorf("failed to allocate cpus")
}

// takeByTopologyNUMADistributed returns a CPUSet of size 'numCPUs'.
//
// It generates this CPUSet by allocating CPUs from 'availableCPUs' according
// to the algorithm outlined in KEP-2902:
//
// https://github.com/kubernetes/enhancements/tree/e7f51ffbe2ee398ffd1fba4a6d854f276bfad9fb/keps/sig-node/2902-cpumanager-distribute-cpus-policy-option
//
// This algorithm evenly distributes CPUs across NUMA nodes in cases where more
// than one NUMA node is required to satisfy the allocation. This is in
// contrast to the takeByTopologyNUMAPacked algorithm, which attempts to 'pack'
// CPUs onto NUMA nodes and fill them up before moving on to the next one.
//
// At a high-level this algorithm can be summarized as:
//
// For each single NUMA node:
// * If all requested CPUs can be allocated from this NUMA node;
// --> Do the allocation by running takeByTopologyNUMAPacked() over the
// available CPUs in that NUMA node and return
//
// Otherwise, for each pair of NUMA nodes:
// * If the set of requested CPUs (modulo 2) can be evenly split across
// the 2 NUMA nodes; AND
// * Any remaining CPUs (after the modulo operation) can be striped across
// some subset of the NUMA nodes;
// --> Do the allocation by running takeByTopologyNUMAPacked() over the
// available CPUs in both NUMA nodes and return
//
// Otherwise, for each 3-tuple of NUMA nodes:
// * If the set of requested CPUs (modulo 3) can be evenly distributed
// across the 3 NUMA nodes; AND
// * Any remaining CPUs (after the modulo operation) can be striped across
// some subset of the NUMA nodes;
// --> Do the allocation by running takeByTopologyNUMAPacked() over the
// available CPUs in all three NUMA nodes and return
//
// ...
//
// Otherwise, for the set of all NUMA nodes:
[Inline review thread]

Contributor: The algorithm looks very clever, but (I didn't do the math myself yet) I wonder how it scales and what its big-O complexity is. It seems multiple NUMA zones on the same socket are becoming a reality, and considering the trend in CPU design, 4-8 NUMA zones (x2 sockets) is not too unlikely in the future.

klueska (Contributor, Author), Oct 16, 2021: There will definitely be a state explosion for large numbers of NUMA nodes. There are some optimizations we can (and should) do to make sure this doesn't happen. I will put something together soon.

Contributor: A ballpark estimate of the complexity and an upper bound on the number of NUMA nodes for which the algorithm is expected to perform (I'm thinking of the 8-node limit for the topology manager) could be a good starting point, and probably good enough for this PR.

klueska (Contributor, Author): I didn't do the calculation yet, but I did write the code for the optimizations. PTAL.

// * If the set of requested CPUs (modulo NUM_NUMA_NODES) can be evenly
// distributed across all NUMA nodes; AND
// * Any remaining CPUs (after the modulo operation) can be striped across
// some subset of the NUMA nodes;
// --> Do the allocation by running takeByTopologyNUMAPacked() over the
// available CPUs in all NUMA nodes and return
//
// If none of the above conditions can be met, then fall back to a
// best-effort packing of CPUs into NUMA nodes by calling
// takeByTopologyNUMAPacked() over all available CPUs.
//
// NOTE: A "balance score" will be calculated to help find the best subset of
// NUMA nodes to allocate any 'remainder' CPUs from (in cases where the total
// number of CPUs to allocate cannot be evenly distributed across the chosen
// set of NUMA nodes). This "balance score" is calculated as the standard
// deviation of how many CPUs will be available on each NUMA node after all
// evenly distributed and remainder CPUs are allocated. The subset with the
// lowest "balance score" will receive the CPUs in order to keep the overall
// allocation of CPUs as "balanced" as possible. A worked example of this
// tie-break follows the function below.
//
// NOTE: This algorithm has been generalized to take an additional
// 'cpuGroupSize' parameter to ensure that CPUs are always allocated in groups
// of size 'cpuGroupSize' according to the algorithm described above. This is
// important, for example, to ensure that all CPUs (i.e. all hyperthreads) from
// a single core are allocated together.
func takeByTopologyNUMADistributed(topo *topology.CPUTopology, availableCPUs cpuset.CPUSet, numCPUs int, cpuGroupSize int) (cpuset.CPUSet, error) {
	acc := newCPUAccumulator(topo, availableCPUs, numCPUs)
	if acc.isSatisfied() {
		return acc.result, nil
	}
	if acc.isFailed() {
		return cpuset.NewCPUSet(), fmt.Errorf("not enough cpus available to satisfy request")
	}

	// Get the list of NUMA nodes represented by the set of CPUs in 'availableCPUs'.
	numas := acc.sortAvailableNUMANodes()

	// Calculate the minimum and maximum possible number of NUMA nodes that
	// could satisfy this request. This is used to optimize how many iterations
	// of the loop we need to go through below.
	minNUMAs, maxNUMAs := acc.rangeNUMANodesNeededToSatisfy(cpuGroupSize)

	// Try combinations of 1,2,3,... NUMA nodes until we find a combination
	// where we can evenly distribute CPUs across them. To optimize things, we
	// don't always start at 1 and end at len(numas). Instead, we use the
	// values of 'minNUMAs' and 'maxNUMAs' calculated above.
	for k := minNUMAs; k <= maxNUMAs; k++ {
		// Iterate through the various n-choose-k NUMA node combinations,
		// looking for the combination of NUMA nodes that can best have CPUs
		// distributed across them.
		var bestBalance float64 = math.MaxFloat64
		var bestRemainder []int = nil
		var bestCombo []int = nil
		acc.iterateCombinations(numas, k, func(combo []int) LoopControl {
			// If we've already found a combo with a balance of 0 in a
			// different iteration, then don't bother checking any others.
			if bestBalance == 0 {
				return Break
			}

			// Check that this combination of NUMA nodes has enough CPUs to
			// satisfy the allocation overall.
			cpus := acc.details.CPUsInNUMANodes(combo...)
			if cpus.Size() < numCPUs {
				return Continue
			}

			// Check that CPUs can be handed out in groups of size
			// 'cpuGroupSize' across the NUMA nodes in this combo.
			numCPUGroups := 0
			for _, numa := range combo {
				numCPUGroups += (acc.details.CPUsInNUMANodes(numa).Size() / cpuGroupSize)
			}
			if (numCPUGroups * cpuGroupSize) < numCPUs {
				return Continue
			}

			// Check that each NUMA node in this combination can allocate an
			// even distribution of CPUs in groups of size 'cpuGroupSize',
			// modulo some remainder.
			distribution := (numCPUs / len(combo) / cpuGroupSize) * cpuGroupSize
			for _, numa := range combo {
				cpus := acc.details.CPUsInNUMANodes(numa)
				if cpus.Size() < distribution {
					return Continue
				}
			}

			// Calculate how many CPUs will be available on each NUMA node in
			// 'combo' after allocating an even distribution of CPU groups of
			// size 'cpuGroupSize' from them. This will be used in the "balance
			// score" calculation to help decide if this combo should
			// ultimately be chosen.
			availableAfterAllocation := make(mapIntInt, len(combo))
			for _, numa := range combo {
				availableAfterAllocation[numa] = acc.details.CPUsInNUMANodes(numa).Size() - distribution
			}

			// Check if there are any remaining CPUs to distribute across the
			// NUMA nodes once CPUs have been evenly distributed in groups of
			// size 'cpuGroupSize'.
			remainder := numCPUs - (distribution * len(combo))

			// Declare a set of local variables to help track the "balance
			// scores" calculated when using different subsets of 'combo' to
			// allocate remainder CPUs from.
			var bestLocalBalance float64 = math.MaxFloat64
			var bestLocalRemainder []int = nil

			// If there aren't any remainder CPUs to allocate, then calculate
			// the "balance score" of this combo as the standard deviation of
			// the values contained in 'availableAfterAllocation'.
			if remainder == 0 {
				bestLocalBalance = standardDeviation(availableAfterAllocation.Values())
				bestLocalRemainder = nil
			}

			// Otherwise, find the best "balance score" when allocating the
			// remainder CPUs across different subsets of NUMA nodes in 'combo'.
			// These remainder CPUs are handed out in groups of size 'cpuGroupSize'.
			acc.iterateCombinations(combo, remainder/cpuGroupSize, func(subset []int) LoopControl {
				// Make a local copy of 'availableAfterAllocation'.
				availableAfterAllocation := availableAfterAllocation.Clone()

				// For all NUMA nodes in 'subset', remove another
				// 'cpuGroupSize' number of CPUs (to account for any remainder
				// CPUs that will be allocated on them).
				for _, numa := range subset {
					availableAfterAllocation[numa] -= cpuGroupSize
				}

				// Calculate the "balance score" as the standard deviation of
				// the number of CPUs available on all NUMA nodes in 'combo'
				// after the remainder CPUs have been allocated across 'subset'
				// in groups of size 'cpuGroupSize'.
				balance := standardDeviation(availableAfterAllocation.Values())
				if balance < bestLocalBalance {
					bestLocalBalance = balance
					bestLocalRemainder = subset
				}

				return Continue
			})

			// If the best "balance score" for this combo is less than the
			// lowest "balance score" of all previous combos, then update this
			// combo (and remainder set) to be the best one found so far.
			if bestLocalBalance < bestBalance {
				bestBalance = bestLocalBalance
				bestRemainder = bestLocalRemainder
				bestCombo = combo
			}

			return Continue
		})

		// If we made it through all of the iterations above without finding a
		// combination of NUMA nodes that can properly balance CPU allocations,
		// then move on to the next larger set of NUMA node combinations.
		if bestCombo == nil {
			continue
		}

		// Otherwise, start allocating CPUs from the NUMA node combination
		// chosen. First allocate an even distribution of CPUs in groups of
		// size 'cpuGroupSize' from 'bestCombo'.
		distribution := (numCPUs / len(bestCombo) / cpuGroupSize) * cpuGroupSize
		for _, numa := range bestCombo {
			cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), distribution)
			acc.take(cpus)
		}

		// Then allocate any remaining CPUs in groups of size 'cpuGroupSize'
		// from each NUMA node in the remainder set.
		for _, numa := range bestRemainder {
			cpus, _ := takeByTopologyNUMAPacked(acc.topo, acc.details.CPUsInNUMANodes(numa), cpuGroupSize)
			acc.take(cpus)
		}

		// If we haven't allocated all of our CPUs at this point, then something
		// went wrong in our accounting and we should error out.
		if acc.numCPUsNeeded > 0 {
			return cpuset.NewCPUSet(), fmt.Errorf("accounting error, not enough CPUs allocated, remaining: %v", acc.numCPUsNeeded)
		}

		// Likewise, if we have allocated too many CPUs at this point, then something
		// went wrong in our accounting and we should error out.
		if acc.numCPUsNeeded < 0 {
			return cpuset.NewCPUSet(), fmt.Errorf("accounting error, too many CPUs allocated, remaining: %v", acc.numCPUsNeeded)
		}

		// Otherwise, return the result.
		return acc.result, nil
	}

	// If we never found a combination of NUMA nodes that we could properly
	// distribute CPUs across, fall back to the packing algorithm.
	return takeByTopologyNUMAPacked(topo, availableCPUs, numCPUs)
}
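
// A hypothetical, self-contained illustration of the "balance score"
// tie-break described in the doc comment above (invented numbers, not part of
// this diff). Suppose combo = {0, 1} with 6 and 8 free CPUs respectively,
// numCPUs = 10, and cpuGroupSize = 2. The even distribution hands out
// (10/2/2)*2 = 4 CPUs per node, leaving availableAfterAllocation = {0: 2, 1: 4}
// and a remainder of one CPU group of size 2.
func exampleBalanceScore() {
	availableAfterAllocation := mapIntInt{0: 2, 1: 4}

	// Try placing the single remainder group on each node and compare the
	// resulting standard deviations.
	for _, numa := range []int{0, 1} {
		after := availableAfterAllocation.Clone()
		after[numa] -= 2
		fmt.Printf("remainder on node %d -> balance score %.1f\n", numa, standardDeviation(after.Values()))
	}

	// Output:
	//   remainder on node 0 -> balance score 2.0
	//   remainder on node 1 -> balance score 0.0
	//
	// Node 1 wins: taking the remainder group from the node with more free
	// CPUs leaves both nodes with 2 free CPUs, a perfectly balanced result.
}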