Devices: CPU pinning fix (allow explicit pinning on isolated CPUs for VM instances) (canonical#14817)

Fixes: canonical#14709
tomponline authored Jan 22, 2025
2 parents 4741e27 + daa8e1a commit 5b47fd7
Showing 1 changed file with 168 additions and 104 deletions.
272 changes: 168 additions & 104 deletions lxd/devices.go
@@ -25,9 +25,10 @@ import (
)

type deviceTaskCPU struct {
id int64
strID string
count *int
id int64
strID string
count *int
isolated bool
}

type deviceTaskCPUs []deviceTaskCPU
@@ -36,6 +37,19 @@ func (c deviceTaskCPUs) Len() int { return len(c) }
func (c deviceTaskCPUs) Less(i, j int) bool { return *c[i].count < *c[j].count }
func (c deviceTaskCPUs) Swap(i, j int) { c[i], c[j] = c[j], c[i] }

type instanceCPUPinningMap map[instance.Instance][]string

func (pinning instanceCPUPinningMap) add(inst instance.Instance, cpu deviceTaskCPU) {
id := cpu.strID
_, ok := pinning[inst]
if ok {
pinning[inst] = append(pinning[inst], id)
} else {
pinning[inst] = []string{id}
}
*cpu.count += 1
}

func deviceNetlinkListener() (chan []string, chan []string, chan device.USBEvent, chan device.UnixHotplugEvent, error) {
NETLINK_KOBJECT_UEVENT := 15 //nolint:revive
UEVENT_BUFFER_SIZE := 2048 //nolint:revive
@@ -298,7 +312,7 @@ func deviceNetlinkListener() (chan []string, chan []string, chan device.USBEvent
* The `loadBalancing` flag indicates whether the CPU pinning should be load balanced or not (e.g., NUMA placement when `limits.cpu` is a single number which means
* a required number of vCPUs per instance that can be chosen within a CPU pool).
*/
func fillFixedInstances(fixedInstances map[int64][]instance.Instance, inst instance.Instance, effectiveCpus []int64, targetCPUPool []int64, targetCPUNum int, loadBalancing bool) {
func fillFixedInstances(fixedInstances map[int64][]instance.Instance, inst instance.Instance, usableCpus []int64, targetCPUPool []int64, targetCPUNum int, loadBalancing bool) {
if len(targetCPUPool) < targetCPUNum {
diffCount := len(targetCPUPool) - targetCPUNum
logger.Warn("Insufficient CPUs in the target pool for required pinning: reducing required CPUs to match available CPUs", logger.Ctx{"available": len(targetCPUPool), "required": targetCPUNum, "difference": -diffCount})
@@ -308,7 +322,8 @@ func fillFixedInstances(fixedInstances map[int64][]instance.Instance, inst insta
// If the `targetCPUPool` has been manually specified (explicit CPU IDs/ranges specified with `limits.cpu`)
if len(targetCPUPool) == targetCPUNum && !loadBalancing {
for _, nr := range targetCPUPool {
if !shared.ValueInSlice(nr, effectiveCpus) {
if !shared.ValueInSlice(nr, usableCpus) {
logger.Warn("Instance using unavailable cpu", logger.Ctx{"project": inst.Project().Name, "instance": inst.Name(), "cpu": nr})
continue
}

@@ -366,6 +381,118 @@ func fillFixedInstances(fixedInstances map[int64][]instance.Instance, inst insta
}
}

// getCPULists returns lists of CPUs:
// cpus: list of CPUs which are available for instances to be automatically pinned on,
// isolCPUs: list of CPUs listed in "isolcpus" kernel parameter (not available for automatic pinning).
func getCPULists() (cpus []int64, isolCPUs []int64, err error) {
// Get effective cpus list - those are all guaranteed to be online
cg, err := cgroup.NewFileReadWriter(1, true)
if err != nil {
logger.Error("Unable to load cgroup writer", logger.Ctx{"err": err})
return nil, nil, err
}

effectiveCPUs, err := cg.GetEffectiveCpuset()
if err != nil {
// Older kernel - use cpuset.cpus
effectiveCPUs, err = cg.GetCpuset()
if err != nil {
logger.Error("Error reading host's cpuset.cpus", logger.Ctx{"err": err, "cpuset.cpus": effectiveCPUs})
return nil, nil, err
}
}

effectiveCPUsInt, err := resources.ParseCpuset(effectiveCPUs)
if err != nil {
logger.Error("Error parsing effective CPU set", logger.Ctx{"err": err, "cpuset.cpus": effectiveCPUs})
return nil, nil, err
}

isolCPUs = resources.GetCPUIsolated()
effectiveCPUsSlice := []string{}
for _, id := range effectiveCPUsInt {
if shared.ValueInSlice(id, isolCPUs) {
continue
}

effectiveCPUsSlice = append(effectiveCPUsSlice, fmt.Sprintf("%d", id))
}

effectiveCPUs = strings.Join(effectiveCPUsSlice, ",")
cpus, err = resources.ParseCpuset(effectiveCPUs)
if err != nil {
logger.Error("Error parsing host's cpu set", logger.Ctx{"cpuset": effectiveCPUs, "err": err})
return nil, nil, err
}

return cpus, isolCPUs, nil
}

// getNumaNodeToCPUMap returns map of NUMA node to CPU threads IDs.
func getNumaNodeToCPUMap() (numaNodeToCPU map[int64][]int64, err error) {
// Get CPU topology.
cpusTopology, err := resources.GetCPU()
if err != nil {
logger.Error("Unable to load system CPUs information", logger.Ctx{"err": err})
return nil, err
}

// Build a map of NUMA node to CPU threads.
numaNodeToCPU = make(map[int64][]int64)
for _, cpu := range cpusTopology.Sockets {
for _, core := range cpu.Cores {
for _, thread := range core.Threads {
numaNodeToCPU[int64(thread.NUMANode)] = append(numaNodeToCPU[int64(thread.NUMANode)], thread.ID)
}
}
}

return numaNodeToCPU, err
}

func makeCPUUsageMap(cpus, isolCPUs []int64) map[int64]deviceTaskCPU {
usage := make(map[int64]deviceTaskCPU, len(cpus)+len(isolCPUs))

// Helper function to create deviceTaskCPU
createCPU := func(id int64, isolated bool) deviceTaskCPU {
count := 0
return deviceTaskCPU{
id: id,
strID: strconv.FormatInt(id, 10),
count: &count,
isolated: isolated,
}
}

// Process regular CPUs
for _, id := range cpus {
usage[id] = createCPU(id, false)
}

// Process isolated CPUs
for _, id := range isolCPUs {
usage[id] = createCPU(id, true)
}

return usage
}

func getNumaCPUs(numaNodeToCPU map[int64][]int64, cpuNodes string) ([]int64, error) {
var numaCpus []int64
if cpuNodes != "" {
numaNodeSet, err := resources.ParseNumaNodeSet(cpuNodes)
if err != nil {
return nil, err
}

for _, numaNode := range numaNodeSet {
numaCpus = append(numaCpus, numaNodeToCPU[numaNode]...)
}
}

return numaCpus, nil
}

// deviceTaskBalance is used to balance the CPU load across instances running on a host.
// It first checks if CGroup support is available and returns if it isn't.
// It then retrieves the effective CPU list (the CPUs that are guaranteed to be online) and isolates any isolated CPUs.
@@ -392,43 +519,8 @@ func deviceTaskBalance(s *state.State) {
return
}

// Get effective cpus list - those are all guaranteed to be online
cg, err := cgroup.NewFileReadWriter(1, true)
if err != nil {
logger.Error("Unable to load cgroup writer", logger.Ctx{"err": err})
return
}

effectiveCpus, err := cg.GetEffectiveCpuset()
if err != nil {
// Older kernel - use cpuset.cpus
effectiveCpus, err = cg.GetCpuset()
if err != nil {
logger.Error("Error reading host's cpuset.cpus", logger.Ctx{"err": err, "cpuset.cpus": effectiveCpus})
return
}
}

effectiveCpusInt, err := resources.ParseCpuset(effectiveCpus)
if err != nil {
logger.Error("Error parsing effective CPU set", logger.Ctx{"err": err, "cpuset.cpus": effectiveCpus})
return
}

isolatedCpusInt := resources.GetCPUIsolated()
effectiveCpusSlice := []string{}
for _, id := range effectiveCpusInt {
if shared.ValueInSlice(id, isolatedCpusInt) {
continue
}

effectiveCpusSlice = append(effectiveCpusSlice, fmt.Sprintf("%d", id))
}

effectiveCpus = strings.Join(effectiveCpusSlice, ",")
cpus, err := resources.ParseCpuset(effectiveCpus)
cpus, isolCPUs, err := getCPULists()
if err != nil {
logger.Error("Error parsing host's cpu set", logger.Ctx{"cpuset": effectiveCpus, "err": err})
return
}

@@ -439,39 +531,20 @@ func deviceTaskBalance(s *state.State) {
return
}

// Get CPU topology.
cpusTopology, err := resources.GetCPU()
numaNodeToCPU, err := getNumaNodeToCPUMap()
if err != nil {
logger.Error("Unable to load system CPUs information", logger.Ctx{"err": err})
return
}

// Build a map of NUMA node to CPU threads.
numaNodeToCPU := make(map[int64][]int64)
for _, cpu := range cpusTopology.Sockets {
for _, core := range cpu.Cores {
for _, thread := range core.Threads {
numaNodeToCPU[int64(thread.NUMANode)] = append(numaNodeToCPU[int64(thread.NUMANode)], thread.ID)
}
}
}

fixedInstances := map[int64][]instance.Instance{}
balancedInstances := map[instance.Instance]int{}
for _, c := range instances {
conf := c.ExpandedConfig()
cpuNodes := conf["limits.cpu.nodes"]
var numaCpus []int64
if cpuNodes != "" {
numaNodeSet, err := resources.ParseNumaNodeSet(cpuNodes)
if err != nil {
logger.Error("Error parsing numa node set", logger.Ctx{"numaNodes": cpuNodes, "err": err})
return
}

for _, numaNode := range numaNodeSet {
numaCpus = append(numaCpus, numaNodeToCPU[numaNode]...)
}
numaCpus, err := getNumaCPUs(numaNodeToCPU, cpuNodes)
if err != nil {
logger.Error("Error parsing numa node set", logger.Ctx{"numaNodes": cpuNodes, "err": err})
return
}

cpulimit, ok := conf["limits.cpu"]
@@ -481,7 +554,7 @@ func deviceTaskBalance(s *state.State) {
if c.Type() == instancetype.VM {
cpulimit = "1"
} else {
cpulimit = effectiveCpus
cpulimit = strconv.Itoa(len(cpus))
}
}

@@ -522,65 +595,56 @@ func deviceTaskBalance(s *state.State) {
logger.Warn("The pinned CPUs override the NUMA configuration CPUs", logger.Ctx{"pinnedCPUs": instanceCpus, "numaCPUs": numaCpus})
}

fillFixedInstances(fixedInstances, c, cpus, instanceCpus, len(instanceCpus), false)
usableCpus := cpus
//
// For VM instances, we have historically allowed explicit pinning to CPUs
// listed in the "isolcpus" kernel parameter. This was never allowed for
// containers, and that behaviour is kept unchanged.
//
if c.Type() == instancetype.VM {
usableCpus = append(usableCpus, isolCPUs...)
}

fillFixedInstances(fixedInstances, c, usableCpus, instanceCpus, len(instanceCpus), false)
}
}

// Balance things
pinning := map[instance.Instance][]string{}
usage := map[int64]deviceTaskCPU{}
pinning := instanceCPUPinningMap{}
usage := makeCPUUsageMap(cpus, isolCPUs)

for _, id := range cpus {
cpu := deviceTaskCPU{}
cpu.id = id
cpu.strID = fmt.Sprintf("%d", id)
count := 0
cpu.count = &count

usage[id] = cpu
}

for cpu, ctns := range fixedInstances {
// Handle instances with explicit CPU pinnings
for cpu, instances := range fixedInstances {
c, ok := usage[cpu]
if !ok {
logger.Error("Internal error: instance using unavailable cpu")
logger.Error("Internal error: instance using unavailable cpu", logger.Ctx{"cpu": cpu})
continue
}

id := c.strID
for _, ctn := range ctns {
_, ok := pinning[ctn]
if ok {
pinning[ctn] = append(pinning[ctn], id)
} else {
pinning[ctn] = []string{id}
}
*c.count += 1
for _, inst := range instances {
pinning.add(inst, c)
}
}

sortedUsage := make(deviceTaskCPUs, 0)
for _, value := range usage {
sortedUsage = append(sortedUsage, value)
for _, cpu := range usage {
if cpu.isolated {
continue
}

sortedUsage = append(sortedUsage, cpu)
}

for ctn, count := range balancedInstances {
// Handle instances where automatic CPU pinning is needed
for inst, cpusToPin := range balancedInstances {
sort.Sort(sortedUsage)
for _, cpu := range sortedUsage {
if count == 0 {
if cpusToPin == 0 {
break
}

count -= 1

id := cpu.strID
_, ok := pinning[ctn]
if ok {
pinning[ctn] = append(pinning[ctn], id)
} else {
pinning[ctn] = []string{id}
}
*cpu.count += 1
cpusToPin -= 1
pinning.add(inst, cpu)
}
}
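
As a rough, self-contained sketch of the pinning rule the diff above introduces (hypothetical CPU IDs; the helpers usableCPUsFor and honouredPins are invented for illustration and are not part of LXD), the following Go program shows why an explicit pin request onto isolated CPUs is now honoured for a VM while the same request from a container is still ignored:

package main

import (
	"fmt"
	"slices"
)

// usableCPUsFor mirrors the idea in deviceTaskBalance: containers may only be
// pinned onto shared (non-isolated) CPUs, while VM instances may also be
// pinned onto CPUs listed in the "isolcpus" kernel parameter.
func usableCPUsFor(isVM bool, sharedCPUs, isolCPUs []int64) []int64 {
	usable := slices.Clone(sharedCPUs)
	if isVM {
		usable = append(usable, isolCPUs...)
	}

	return usable
}

// honouredPins keeps only the requested CPUs that are usable, mirroring the
// availability check in fillFixedInstances.
func honouredPins(requested, usable []int64) []int64 {
	var out []int64
	for _, cpu := range requested {
		if slices.Contains(usable, cpu) {
			out = append(out, cpu)
		}
	}

	return out
}

func main() {
	sharedCPUs := []int64{0, 1, 2, 3} // online CPUs not listed in isolcpus
	isolCPUs := []int64{4, 5, 6, 7}   // e.g. a host booted with "isolcpus=4-7" (hypothetical)
	requested := []int64{4, 5, 6, 7}  // e.g. an instance with "limits.cpu: 4-7"

	// A VM may now be pinned onto the isolated CPUs.
	fmt.Println(honouredPins(requested, usableCPUsFor(true, sharedCPUs, isolCPUs))) // [4 5 6 7]

	// For a container the same request is still filtered out.
	fmt.Println(honouredPins(requested, usableCPUsFor(false, sharedCPUs, isolCPUs))) // []
}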
