Devices: CPU pinning fix (allow explicit pinning on isolated CPUs for VM instances) #14817

Merged on Jan 22, 2025 (4 commits)

Changes from all commits

272 changes: 168 additions & 104 deletions lxd/devices.go
@@ -25,9 +25,10 @@ import (
)

type deviceTaskCPU struct {
id int64
strID string
count *int
id int64
strID string
count *int
isolated bool
}

type deviceTaskCPUs []deviceTaskCPU
@@ -36,6 +37,19 @@ func (c deviceTaskCPUs) Len() int { return len(c) }
func (c deviceTaskCPUs) Less(i, j int) bool { return *c[i].count < *c[j].count }
func (c deviceTaskCPUs) Swap(i, j int) { c[i], c[j] = c[j], c[i] }

type instanceCPUPinningMap map[instance.Instance][]string

func (pinning instanceCPUPinningMap) add(inst instance.Instance, cpu deviceTaskCPU) {
id := cpu.strID
_, ok := pinning[inst]
if ok {
pinning[inst] = append(pinning[inst], id)
} else {
pinning[inst] = []string{id}
}
*cpu.count += 1
}
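
For illustration, a minimal standalone sketch of the append-and-count pattern that `instanceCPUPinningMap.add` wraps. It is keyed by a plain instance name and uses a separate counter map instead of the real `instance.Instance` interface and `deviceTaskCPU` counters, so the names and types below are simplified and not part of the PR:

```go
package main

import "fmt"

// pinningMap mirrors the shape of instanceCPUPinningMap, but keyed by a plain
// instance name so the sketch stays self-contained (illustrative only).
type pinningMap map[string][]string

// cpuCounts stands in for the per-CPU *int usage counters held by deviceTaskCPU.
type cpuCounts map[string]int

// add records that the instance is pinned to the CPU and bumps that CPU's
// usage counter, which is the bookkeeping the PR's helper centralises.
func (p pinningMap) add(counts cpuCounts, instName, cpuID string) {
	p[instName] = append(p[instName], cpuID) // append also covers the first CPU for an instance
	counts[cpuID]++
}

func main() {
	pinning := pinningMap{}
	counts := cpuCounts{}

	pinning.add(counts, "vm1", "3")
	pinning.add(counts, "vm1", "5")
	pinning.add(counts, "ct1", "3")

	fmt.Println(pinning) // map[ct1:[3] vm1:[3 5]]
	fmt.Println(counts)  // map[3:2 5:1]
}
```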

func deviceNetlinkListener() (chan []string, chan []string, chan device.USBEvent, chan device.UnixHotplugEvent, error) {
NETLINK_KOBJECT_UEVENT := 15 //nolint:revive
UEVENT_BUFFER_SIZE := 2048 //nolint:revive
@@ -298,7 +312,7 @@ func deviceNetlinkListener() (chan []string, chan []string, chan device.USBEvent
* The `loadBalancing` flag indicates whether the CPU pinning should be load balanced or not (e.g., NUMA placement when `limits.cpu` is a single number, which means
* a required number of vCPUs per instance that can be chosen within a CPU pool).
*/
func fillFixedInstances(fixedInstances map[int64][]instance.Instance, inst instance.Instance, effectiveCpus []int64, targetCPUPool []int64, targetCPUNum int, loadBalancing bool) {
func fillFixedInstances(fixedInstances map[int64][]instance.Instance, inst instance.Instance, usableCpus []int64, targetCPUPool []int64, targetCPUNum int, loadBalancing bool) {
if len(targetCPUPool) < targetCPUNum {
diffCount := len(targetCPUPool) - targetCPUNum
logger.Warn("Insufficient CPUs in the target pool for required pinning: reducing required CPUs to match available CPUs", logger.Ctx{"available": len(targetCPUPool), "required": targetCPUNum, "difference": -diffCount})
@@ -308,7 +322,8 @@ func fillFixedInstances(fixedInstances map[int64][]instance.Instance, inst insta
// If the `targetCPUPool` has been manually specified (explicit CPU IDs/ranges specified with `limits.cpu`)
if len(targetCPUPool) == targetCPUNum && !loadBalancing {
for _, nr := range targetCPUPool {
if !shared.ValueInSlice(nr, effectiveCpus) {
if !shared.ValueInSlice(nr, usableCpus) {
logger.Warn("Instance using unavailable cpu", logger.Ctx{"project": inst.Project().Name, "instance": inst.Name(), "cpu": nr})
continue
}

@@ -366,6 +381,118 @@ func fillFixedInstances(fixedInstances map[int64][]instance.Instance, inst insta
}
}
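
As a rough illustration of the explicit-pinning branch described in the comment above (requested CPUs equal the required count and load balancing is off), the following standalone sketch assigns an instance to every requested CPU that is actually usable and warns about the rest. The helper name and simplified types are illustrative, not LXD's:

```go
package main

import (
	"fmt"
	"slices"
)

// assignExplicit is a simplified stand-in for the explicit-pinning branch of
// fillFixedInstances: every requested CPU that is usable gets the instance
// recorded against it, others are skipped with a warning.
func assignExplicit(fixed map[int64][]string, instName string, usable, requested []int64) {
	for _, nr := range requested {
		if !slices.Contains(usable, nr) {
			fmt.Printf("warning: %s requests unavailable CPU %d\n", instName, nr)
			continue
		}

		fixed[nr] = append(fixed[nr], instName)
	}
}

func main() {
	fixed := map[int64][]string{}
	usable := []int64{0, 1, 2, 3, 8, 9} // e.g. shared CPUs plus isolcpus for a VM

	assignExplicit(fixed, "vm1", usable, []int64{8, 9, 42})
	fmt.Println(fixed) // map[8:[vm1] 9:[vm1]] plus a warning for CPU 42
}
```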

// getCPULists returns lists of CPUs:
// cpus: list of CPUs which are available for instances to be automatically pinned on,
// isolCPUs: list of CPUs listed in "isolcpus" kernel parameter (not available for automatic pinning).
func getCPULists() (cpus []int64, isolCPUs []int64, err error) {
// Get effective cpus list - those are all guaranteed to be online
cg, err := cgroup.NewFileReadWriter(1, true)
if err != nil {
logger.Error("Unable to load cgroup writer", logger.Ctx{"err": err})
return nil, nil, err
}

effectiveCPUs, err := cg.GetEffectiveCpuset()
if err != nil {
// Older kernel - use cpuset.cpus
effectiveCPUs, err = cg.GetCpuset()
if err != nil {
logger.Error("Error reading host's cpuset.cpus", logger.Ctx{"err": err, "cpuset.cpus": effectiveCPUs})
return nil, nil, err
}
}

effectiveCPUsInt, err := resources.ParseCpuset(effectiveCPUs)
if err != nil {
logger.Error("Error parsing effective CPU set", logger.Ctx{"err": err, "cpuset.cpus": effectiveCPUs})
return nil, nil, err
}

isolCPUs = resources.GetCPUIsolated()
effectiveCPUsSlice := []string{}
for _, id := range effectiveCPUsInt {
if shared.ValueInSlice(id, isolCPUs) {
continue
}

effectiveCPUsSlice = append(effectiveCPUsSlice, fmt.Sprintf("%d", id))
}

effectiveCPUs = strings.Join(effectiveCPUsSlice, ",")
cpus, err = resources.ParseCpuset(effectiveCPUs)
if err != nil {
logger.Error("Error parsing host's cpu set", logger.Ctx{"cpuset": effectiveCPUs, "err": err})
return nil, nil, err
}

return cpus, isolCPUs, nil
}
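
getCPULists relies on resources.ParseCpuset and resources.GetCPUIsolated. As a rough, self-contained illustration of what a cpuset string such as "0-5" expands to and how isolated CPUs are removed from the auto-pinning pool, here is a simplified stand-in parser (not LXD's actual implementation):

```go
package main

import (
	"fmt"
	"slices"
	"strconv"
	"strings"
)

// parseCpuset expands a kernel cpuset string such as "0-3,8" into CPU IDs.
// It is a simplified stand-in for resources.ParseCpuset, for illustration only.
func parseCpuset(set string) ([]int64, error) {
	var ids []int64
	for _, chunk := range strings.Split(set, ",") {
		if lo, hi, ok := strings.Cut(chunk, "-"); ok {
			start, err := strconv.ParseInt(lo, 10, 64)
			if err != nil {
				return nil, err
			}

			end, err := strconv.ParseInt(hi, 10, 64)
			if err != nil {
				return nil, err
			}

			for id := start; id <= end; id++ {
				ids = append(ids, id)
			}

			continue
		}

		id, err := strconv.ParseInt(chunk, 10, 64)
		if err != nil {
			return nil, err
		}

		ids = append(ids, id)
	}

	return ids, nil
}

func main() {
	online, _ := parseCpuset("0-5")
	isolated, _ := parseCpuset("4-5") // e.g. isolcpus=4-5 on the kernel command line

	// Mirror getCPULists: the auto-pinning pool is the online CPUs minus isolcpus.
	var pool []int64
	for _, id := range online {
		if !slices.Contains(isolated, id) {
			pool = append(pool, id)
		}
	}

	fmt.Println(pool, isolated) // [0 1 2 3] [4 5]
}
```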

// getNumaNodeToCPUMap returns a map of NUMA node to CPU thread IDs.
func getNumaNodeToCPUMap() (numaNodeToCPU map[int64][]int64, err error) {
// Get CPU topology.
cpusTopology, err := resources.GetCPU()
if err != nil {
logger.Error("Unable to load system CPUs information", logger.Ctx{"err": err})
return nil, err
}

// Build a map of NUMA node to CPU threads.
numaNodeToCPU = make(map[int64][]int64)
for _, cpu := range cpusTopology.Sockets {
for _, core := range cpu.Cores {
for _, thread := range core.Threads {
numaNodeToCPU[int64(thread.NUMANode)] = append(numaNodeToCPU[int64(thread.NUMANode)], thread.ID)
}
}
}

return numaNodeToCPU, err
}

func makeCPUUsageMap(cpus, isolCPUs []int64) map[int64]deviceTaskCPU {
usage := make(map[int64]deviceTaskCPU, len(cpus)+len(isolCPUs))

// Helper function to create deviceTaskCPU
createCPU := func(id int64, isolated bool) deviceTaskCPU {
count := 0
return deviceTaskCPU{
id: id,
strID: strconv.FormatInt(id, 10),
count: &count,
isolated: isolated,
}
}

// Process regular CPUs
for _, id := range cpus {
usage[id] = createCPU(id, false)
}

// Process isolated CPUs
for _, id := range isolCPUs {
usage[id] = createCPU(id, true)
}

return usage
}
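
The point of storing each usage count behind a pointer is that the same counter is visible both from the usage map and from the sortable deviceTaskCPUs slice built later, so incrementing it in one place reorders the other. A condensed sketch of that mechanism, with simplified types of my own:

```go
package main

import (
	"fmt"
	"sort"
)

// cpu mirrors deviceTaskCPU's counter-by-pointer layout (simplified).
type cpu struct {
	id    int64
	count *int
}

type byLoad []cpu

func (c byLoad) Len() int           { return len(c) }
func (c byLoad) Less(i, j int) bool { return *c[i].count < *c[j].count }
func (c byLoad) Swap(i, j int)      { c[i], c[j] = c[j], c[i] }

func main() {
	// Build a usage map the way makeCPUUsageMap does: one counter per CPU.
	usage := map[int64]cpu{}
	for _, id := range []int64{0, 1, 2} {
		n := 0
		usage[id] = cpu{id: id, count: &n}
	}

	// Pretend CPU 1 already carries two pinned instances.
	*usage[1].count += 2

	sorted := byLoad{}
	for _, c := range usage {
		sorted = append(sorted, c)
	}

	sort.Sort(sorted)
	fmt.Println(sorted[0].id) // a least-loaded CPU (0 or 2) comes first
}
```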

func getNumaCPUs(numaNodeToCPU map[int64][]int64, cpuNodes string) ([]int64, error) {
Review comment (Member): missing doc block on this function. It would be useful to the caller and reviewer to understand what the returned values contain and what format the cpuNodes string is in.

var numaCpus []int64
if cpuNodes != "" {
numaNodeSet, err := resources.ParseNumaNodeSet(cpuNodes)
if err != nil {
return nil, err
}

for _, numaNode := range numaNodeSet {
numaCpus = append(numaCpus, numaNodeToCPU[numaNode]...)
}
}

return numaCpus, nil
}
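
Relating to the review comment above: limits.cpu.nodes carries a NUMA node set (for example "0,1"), and getNumaCPUs flattens it into the CPU IDs belonging to those nodes using the topology map. A small illustrative example with made-up topology values:

```go
package main

import "fmt"

func main() {
	// NUMA topology as built by getNumaNodeToCPUMap: node -> CPU thread IDs.
	numaNodeToCPU := map[int64][]int64{
		0: {0, 1, 2, 3},
		1: {4, 5, 6, 7},
	}

	// limits.cpu.nodes = "0,1" would parse into this node set.
	nodeSet := []int64{0, 1}

	var numaCpus []int64
	for _, node := range nodeSet {
		numaCpus = append(numaCpus, numaNodeToCPU[node]...)
	}

	fmt.Println(numaCpus) // [0 1 2 3 4 5 6 7]
}
```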

// deviceTaskBalance is used to balance the CPU load across instances running on a host.
// It first checks if CGroup support is available and returns if it isn't.
// It then retrieves the effective CPU list (the CPUs that are guaranteed to be online) and isolates any isolated CPUs.
@@ -392,43 +519,8 @@ func deviceTaskBalance(s *state.State) {
return
}

// Get effective cpus list - those are all guaranteed to be online
cg, err := cgroup.NewFileReadWriter(1, true)
if err != nil {
logger.Error("Unable to load cgroup writer", logger.Ctx{"err": err})
return
}

effectiveCpus, err := cg.GetEffectiveCpuset()
if err != nil {
// Older kernel - use cpuset.cpus
effectiveCpus, err = cg.GetCpuset()
if err != nil {
logger.Error("Error reading host's cpuset.cpus", logger.Ctx{"err": err, "cpuset.cpus": effectiveCpus})
return
}
}

effectiveCpusInt, err := resources.ParseCpuset(effectiveCpus)
if err != nil {
logger.Error("Error parsing effective CPU set", logger.Ctx{"err": err, "cpuset.cpus": effectiveCpus})
return
}

isolatedCpusInt := resources.GetCPUIsolated()
effectiveCpusSlice := []string{}
for _, id := range effectiveCpusInt {
if shared.ValueInSlice(id, isolatedCpusInt) {
continue
}

effectiveCpusSlice = append(effectiveCpusSlice, fmt.Sprintf("%d", id))
}

effectiveCpus = strings.Join(effectiveCpusSlice, ",")
cpus, err := resources.ParseCpuset(effectiveCpus)
cpus, isolCPUs, err := getCPULists()
if err != nil {
logger.Error("Error parsing host's cpu set", logger.Ctx{"cpuset": effectiveCpus, "err": err})
return
}

@@ -439,39 +531,20 @@ func deviceTaskBalance(s *state.State) {
return
}

// Get CPU topology.
cpusTopology, err := resources.GetCPU()
numaNodeToCPU, err := getNumaNodeToCPUMap()
if err != nil {
logger.Error("Unable to load system CPUs information", logger.Ctx{"err": err})
return
}

// Build a map of NUMA node to CPU threads.
numaNodeToCPU := make(map[int64][]int64)
for _, cpu := range cpusTopology.Sockets {
for _, core := range cpu.Cores {
for _, thread := range core.Threads {
numaNodeToCPU[int64(thread.NUMANode)] = append(numaNodeToCPU[int64(thread.NUMANode)], thread.ID)
}
}
}

fixedInstances := map[int64][]instance.Instance{}
balancedInstances := map[instance.Instance]int{}
for _, c := range instances {
conf := c.ExpandedConfig()
cpuNodes := conf["limits.cpu.nodes"]
var numaCpus []int64
if cpuNodes != "" {
numaNodeSet, err := resources.ParseNumaNodeSet(cpuNodes)
if err != nil {
logger.Error("Error parsing numa node set", logger.Ctx{"numaNodes": cpuNodes, "err": err})
return
}

for _, numaNode := range numaNodeSet {
numaCpus = append(numaCpus, numaNodeToCPU[numaNode]...)
}
numaCpus, err := getNumaCPUs(numaNodeToCPU, cpuNodes)
if err != nil {
logger.Error("Error parsing numa node set", logger.Ctx{"numaNodes": cpuNodes, "err": err})
return
}

cpulimit, ok := conf["limits.cpu"]
@@ -481,7 +554,7 @@ func deviceTaskBalance(s *state.State) {
if c.Type() == instancetype.VM {
cpulimit = "1"
} else {
cpulimit = effectiveCpus
cpulimit = strconv.Itoa(len(cpus))
}
}

@@ -522,65 +595,56 @@ func deviceTaskBalance(s *state.State) {
logger.Warn("The pinned CPUs override the NUMA configuration CPUs", logger.Ctx{"pinnedCPUs": instanceCpus, "numaCPUs": numaCpus})
}

fillFixedInstances(fixedInstances, c, cpus, instanceCpus, len(instanceCpus), false)
usableCpus := cpus
//
// For VM instances, historically, we allow explicit pinning to CPUs
// listed in the "isolcpus" kernel parameter. For containers this was
// never allowed, and we keep it that way.
//
if c.Type() == instancetype.VM {
usableCpus = append(usableCpus, isolCPUs...)
}

fillFixedInstances(fixedInstances, c, usableCpus, instanceCpus, len(instanceCpus), false)
}
}
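
The behavioural change of this PR is concentrated in the block above: containers can only be pinned within the shared (non-isolated) CPU pool, while VM instances may additionally be pinned explicitly to CPUs listed in isolcpus. A condensed sketch of that selection with simplified types (the real code uses instancetype.VM and the slices built by getCPULists):

```go
package main

import "fmt"

type instanceType int

const (
	container instanceType = iota
	vm
)

// usableCPUsFor mirrors the selection done in deviceTaskBalance: containers
// are restricted to the shared pool, VMs may also target isolated CPUs.
func usableCPUsFor(t instanceType, cpus, isolCPUs []int64) []int64 {
	usable := append([]int64{}, cpus...)
	if t == vm {
		usable = append(usable, isolCPUs...)
	}

	return usable
}

func main() {
	cpus := []int64{0, 1, 2, 3}
	isolCPUs := []int64{4, 5}

	fmt.Println(usableCPUsFor(container, cpus, isolCPUs)) // [0 1 2 3]
	fmt.Println(usableCPUsFor(vm, cpus, isolCPUs))        // [0 1 2 3 4 5]
}
```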

// Balance things
pinning := map[instance.Instance][]string{}
usage := map[int64]deviceTaskCPU{}
pinning := instanceCPUPinningMap{}
usage := makeCPUUsageMap(cpus, isolCPUs)

for _, id := range cpus {
cpu := deviceTaskCPU{}
cpu.id = id
cpu.strID = fmt.Sprintf("%d", id)
count := 0
cpu.count = &count

usage[id] = cpu
}

for cpu, ctns := range fixedInstances {
// Handle instances with explicit CPU pinnings
for cpu, instances := range fixedInstances {
c, ok := usage[cpu]
if !ok {
logger.Error("Internal error: instance using unavailable cpu")
logger.Error("Internal error: instance using unavailable cpu", logger.Ctx{"cpu": cpu})
continue
}

id := c.strID
for _, ctn := range ctns {
_, ok := pinning[ctn]
if ok {
pinning[ctn] = append(pinning[ctn], id)
} else {
pinning[ctn] = []string{id}
}
*c.count += 1
for _, inst := range instances {
pinning.add(inst, c)
}
}

sortedUsage := make(deviceTaskCPUs, 0)
for _, value := range usage {
sortedUsage = append(sortedUsage, value)
for _, cpu := range usage {
if cpu.isolated {
continue
}

sortedUsage = append(sortedUsage, cpu)
}

for ctn, count := range balancedInstances {
// Handle instances where automatic CPU pinning is needed
for inst, cpusToPin := range balancedInstances {
sort.Sort(sortedUsage)
for _, cpu := range sortedUsage {
if count == 0 {
if cpusToPin == 0 {
break
}

count -= 1

id := cpu.strID
_, ok := pinning[ctn]
if ok {
pinning[ctn] = append(pinning[ctn], id)
} else {
pinning[ctn] = []string{id}
}
*cpu.count += 1
cpusToPin -= 1
pinning.add(inst, cpu)
}
}
