diff --git a/lxd/devices.go b/lxd/devices.go
index 124330402be1..5b30b4fe17b0 100644
--- a/lxd/devices.go
+++ b/lxd/devices.go
@@ -25,9 +25,10 @@ import (
 )
 
 type deviceTaskCPU struct {
-	id    int64
-	strID string
-	count *int
+	id       int64
+	strID    string
+	count    *int
+	isolated bool
 }
 
 type deviceTaskCPUs []deviceTaskCPU
@@ -36,6 +37,19 @@ func (c deviceTaskCPUs) Len() int { return len(c) }
 func (c deviceTaskCPUs) Less(i, j int) bool { return *c[i].count < *c[j].count }
 func (c deviceTaskCPUs) Swap(i, j int)      { c[i], c[j] = c[j], c[i] }
 
+type instanceCPUPinningMap map[instance.Instance][]string
+
+func (pinning instanceCPUPinningMap) add(inst instance.Instance, cpu deviceTaskCPU) {
+	id := cpu.strID
+	_, ok := pinning[inst]
+	if ok {
+		pinning[inst] = append(pinning[inst], id)
+	} else {
+		pinning[inst] = []string{id}
+	}
+	*cpu.count += 1
+}
+
 func deviceNetlinkListener() (chan []string, chan []string, chan device.USBEvent, chan device.UnixHotplugEvent, error) {
 	NETLINK_KOBJECT_UEVENT := 15 //nolint:revive
 	UEVENT_BUFFER_SIZE := 2048   //nolint:revive
@@ -298,7 +312,7 @@ func deviceNetlinkListener() (chan []string, chan []string, chan device.USBEven
 * The `loadBalancing` flag indicates whether the CPU pinning should be load balanced or not (e.g, NUMA placement when `limits.cpu` is a single number which means
 * a required number of vCPUs per instance that can be chosen within a CPU pool).
 */
-func fillFixedInstances(fixedInstances map[int64][]instance.Instance, inst instance.Instance, effectiveCpus []int64, targetCPUPool []int64, targetCPUNum int, loadBalancing bool) {
+func fillFixedInstances(fixedInstances map[int64][]instance.Instance, inst instance.Instance, usableCpus []int64, targetCPUPool []int64, targetCPUNum int, loadBalancing bool) {
 	if len(targetCPUPool) < targetCPUNum {
 		diffCount := len(targetCPUPool) - targetCPUNum
 		logger.Warn("Insufficient CPUs in the target pool for required pinning: reducing required CPUs to match available CPUs", logger.Ctx{"available": len(targetCPUPool), "required": targetCPUNum, "difference": -diffCount})
@@ -308,7 +322,8 @@ func fillFixedInstances(fixedInstances map[int64][]instance.Instance, inst insta
 	// If the `targetCPUPool` has been manually specified (explicit CPU IDs/ranges specified with `limits.cpu`)
 	if len(targetCPUPool) == targetCPUNum && !loadBalancing {
 		for _, nr := range targetCPUPool {
-			if !shared.ValueInSlice(nr, effectiveCpus) {
+			if !shared.ValueInSlice(nr, usableCpus) {
+				logger.Warn("Instance using unavailable cpu", logger.Ctx{"project": inst.Project().Name, "instance": inst.Name(), "cpu": nr})
 				continue
 			}
 
@@ -366,6 +381,118 @@ func fillFixedInstances(fixedInstances map[int64][]instance.Instance, inst insta
 	}
 }
 
+// getCPULists returns two lists of CPUs:
+// cpus: CPUs that are available for instances to be automatically pinned on,
+// isolCPUs: CPUs listed in the "isolcpus" kernel parameter (not available for automatic pinning).
+func getCPULists() (cpus []int64, isolCPUs []int64, err error) {
+	// Get effective cpus list - those are all guaranteed to be online
+	cg, err := cgroup.NewFileReadWriter(1, true)
+	if err != nil {
+		logger.Error("Unable to load cgroup writer", logger.Ctx{"err": err})
+		return nil, nil, err
+	}
+
+	effectiveCPUs, err := cg.GetEffectiveCpuset()
+	if err != nil {
+		// Older kernel - use cpuset.cpus
+		effectiveCPUs, err = cg.GetCpuset()
+		if err != nil {
+			logger.Error("Error reading host's cpuset.cpus", logger.Ctx{"err": err, "cpuset.cpus": effectiveCPUs})
+			return nil, nil, err
+		}
+	}
+
+	effectiveCPUsInt, err := resources.ParseCpuset(effectiveCPUs)
+	if err != nil {
+		logger.Error("Error parsing effective CPU set", logger.Ctx{"err": err, "cpuset.cpus": effectiveCPUs})
+		return nil, nil, err
+	}
+
+	isolCPUs = resources.GetCPUIsolated()
+	effectiveCPUsSlice := []string{}
+	for _, id := range effectiveCPUsInt {
+		if shared.ValueInSlice(id, isolCPUs) {
+			continue
+		}
+
+		effectiveCPUsSlice = append(effectiveCPUsSlice, fmt.Sprintf("%d", id))
+	}
+
+	effectiveCPUs = strings.Join(effectiveCPUsSlice, ",")
+	cpus, err = resources.ParseCpuset(effectiveCPUs)
+	if err != nil {
+		logger.Error("Error parsing host's cpu set", logger.Ctx{"cpuset": effectiveCPUs, "err": err})
+		return nil, nil, err
+	}
+
+	return cpus, isolCPUs, nil
+}
+
+// getNumaNodeToCPUMap returns a map of NUMA node to CPU thread IDs.
+func getNumaNodeToCPUMap() (numaNodeToCPU map[int64][]int64, err error) {
+	// Get CPU topology.
+	cpusTopology, err := resources.GetCPU()
+	if err != nil {
+		logger.Error("Unable to load system CPUs information", logger.Ctx{"err": err})
+		return nil, err
+	}
+
+	// Build a map of NUMA node to CPU threads.
+	numaNodeToCPU = make(map[int64][]int64)
+	for _, cpu := range cpusTopology.Sockets {
+		for _, core := range cpu.Cores {
+			for _, thread := range core.Threads {
+				numaNodeToCPU[int64(thread.NUMANode)] = append(numaNodeToCPU[int64(thread.NUMANode)], thread.ID)
+			}
+		}
+	}
+
+	return numaNodeToCPU, err
+}
+
+func makeCPUUsageMap(cpus, isolCPUs []int64) map[int64]deviceTaskCPU {
+	usage := make(map[int64]deviceTaskCPU, len(cpus)+len(isolCPUs))
+
+	// Helper function to create deviceTaskCPU
+	createCPU := func(id int64, isolated bool) deviceTaskCPU {
+		count := 0
+		return deviceTaskCPU{
+			id:       id,
+			strID:    strconv.FormatInt(id, 10),
+			count:    &count,
+			isolated: isolated,
+		}
+	}
+
+	// Process regular CPUs
+	for _, id := range cpus {
+		usage[id] = createCPU(id, false)
+	}
+
+	// Process isolated CPUs
+	for _, id := range isolCPUs {
+		usage[id] = createCPU(id, true)
+	}
+
+	return usage
+}
+
+func getNumaCPUs(numaNodeToCPU map[int64][]int64, cpuNodes string) ([]int64, error) {
+	var numaCpus []int64
+	if cpuNodes != "" {
+		numaNodeSet, err := resources.ParseNumaNodeSet(cpuNodes)
+		if err != nil {
+			return nil, err
+		}
+
+		for _, numaNode := range numaNodeSet {
+			numaCpus = append(numaCpus, numaNodeToCPU[numaNode]...)
+		}
+	}
+
+	return numaCpus, nil
+}
+
 // deviceTaskBalance is used to balance the CPU load across instances running on a host.
 // It first checks if CGroup support is available and returns if it isn't.
 // It then retrieves the effective CPU list (the CPUs that are guaranteed to be online) and isolates any isolated CPUs.
@@ -392,43 +519,8 @@ func deviceTaskBalance(s *state.State) {
 		return
 	}
 
-	// Get effective cpus list - those are all guaranteed to be online
-	cg, err := cgroup.NewFileReadWriter(1, true)
-	if err != nil {
-		logger.Error("Unable to load cgroup writer", logger.Ctx{"err": err})
-		return
-	}
-
-	effectiveCpus, err := cg.GetEffectiveCpuset()
-	if err != nil {
-		// Older kernel - use cpuset.cpus
-		effectiveCpus, err = cg.GetCpuset()
-		if err != nil {
-			logger.Error("Error reading host's cpuset.cpus", logger.Ctx{"err": err, "cpuset.cpus": effectiveCpus})
-			return
-		}
-	}
-
-	effectiveCpusInt, err := resources.ParseCpuset(effectiveCpus)
-	if err != nil {
-		logger.Error("Error parsing effective CPU set", logger.Ctx{"err": err, "cpuset.cpus": effectiveCpus})
-		return
-	}
-
-	isolatedCpusInt := resources.GetCPUIsolated()
-	effectiveCpusSlice := []string{}
-	for _, id := range effectiveCpusInt {
-		if shared.ValueInSlice(id, isolatedCpusInt) {
-			continue
-		}
-
-		effectiveCpusSlice = append(effectiveCpusSlice, fmt.Sprintf("%d", id))
-	}
-
-	effectiveCpus = strings.Join(effectiveCpusSlice, ",")
-	cpus, err := resources.ParseCpuset(effectiveCpus)
+	cpus, isolCPUs, err := getCPULists()
 	if err != nil {
-		logger.Error("Error parsing host's cpu set", logger.Ctx{"cpuset": effectiveCpus, "err": err})
 		return
 	}
 
@@ -439,39 +531,20 @@ func deviceTaskBalance(s *state.State) {
 		return
 	}
 
-	// Get CPU topology.
-	cpusTopology, err := resources.GetCPU()
+	numaNodeToCPU, err := getNumaNodeToCPUMap()
 	if err != nil {
-		logger.Error("Unable to load system CPUs information", logger.Ctx{"err": err})
 		return
 	}
 
-	// Build a map of NUMA node to CPU threads.
-	numaNodeToCPU := make(map[int64][]int64)
-	for _, cpu := range cpusTopology.Sockets {
-		for _, core := range cpu.Cores {
-			for _, thread := range core.Threads {
-				numaNodeToCPU[int64(thread.NUMANode)] = append(numaNodeToCPU[int64(thread.NUMANode)], thread.ID)
-			}
-		}
-	}
-
 	fixedInstances := map[int64][]instance.Instance{}
 	balancedInstances := map[instance.Instance]int{}
 	for _, c := range instances {
 		conf := c.ExpandedConfig()
 		cpuNodes := conf["limits.cpu.nodes"]
-		var numaCpus []int64
-		if cpuNodes != "" {
-			numaNodeSet, err := resources.ParseNumaNodeSet(cpuNodes)
-			if err != nil {
-				logger.Error("Error parsing numa node set", logger.Ctx{"numaNodes": cpuNodes, "err": err})
-				return
-			}
-
-			for _, numaNode := range numaNodeSet {
-				numaCpus = append(numaCpus, numaNodeToCPU[numaNode]...)
-			}
+		numaCpus, err := getNumaCPUs(numaNodeToCPU, cpuNodes)
+		if err != nil {
+			logger.Error("Error parsing numa node set", logger.Ctx{"numaNodes": cpuNodes, "err": err})
+			return
 		}
 
 		cpulimit, ok := conf["limits.cpu"]
@@ -481,7 +554,7 @@ func deviceTaskBalance(s *state.State) {
 			if c.Type() == instancetype.VM {
 				cpulimit = "1"
 			} else {
-				cpulimit = effectiveCpus
+				cpulimit = strconv.Itoa(len(cpus))
 			}
 		}
 
@@ -522,65 +595,56 @@ func deviceTaskBalance(s *state.State) {
 			logger.Warn("The pinned CPUs override the NUMA configuration CPUs", logger.Ctx{"pinnedCPUs": instanceCpus, "numaCPUs": numaCpus})
 		}
 
-		fillFixedInstances(fixedInstances, c, cpus, instanceCpus, len(instanceCpus), false)
+		usableCpus := cpus
+		//
+		// Historically, VM instances may be explicitly pinned to CPUs listed
+		// in the "isolcpus" kernel parameter, while for containers this was
+		// never allowed; keep it that way.
+		//
+		if c.Type() == instancetype.VM {
+			usableCpus = append(usableCpus, isolCPUs...)
+		}
+
+		fillFixedInstances(fixedInstances, c, usableCpus, instanceCpus, len(instanceCpus), false)
 		}
 	}
 
 	// Balance things
-	pinning := map[instance.Instance][]string{}
-	usage := map[int64]deviceTaskCPU{}
+	pinning := instanceCPUPinningMap{}
+	usage := makeCPUUsageMap(cpus, isolCPUs)
 
-	for _, id := range cpus {
-		cpu := deviceTaskCPU{}
-		cpu.id = id
-		cpu.strID = fmt.Sprintf("%d", id)
-		count := 0
-		cpu.count = &count
-
-		usage[id] = cpu
-	}
-
-	for cpu, ctns := range fixedInstances {
+	// Handle instances with explicit CPU pinning
+	for cpu, instances := range fixedInstances {
 		c, ok := usage[cpu]
 		if !ok {
-			logger.Error("Internal error: instance using unavailable cpu")
+			logger.Error("Internal error: instance using unavailable cpu", logger.Ctx{"cpu": cpu})
 			continue
 		}
 
-		id := c.strID
-		for _, ctn := range ctns {
-			_, ok := pinning[ctn]
-			if ok {
-				pinning[ctn] = append(pinning[ctn], id)
-			} else {
-				pinning[ctn] = []string{id}
-			}
-			*c.count += 1
+		for _, inst := range instances {
+			pinning.add(inst, c)
 		}
 	}
 
 	sortedUsage := make(deviceTaskCPUs, 0)
-	for _, value := range usage {
-		sortedUsage = append(sortedUsage, value)
+	for _, cpu := range usage {
+		if cpu.isolated {
+			continue
+		}
+
+		sortedUsage = append(sortedUsage, cpu)
 	}
 
-	for ctn, count := range balancedInstances {
+	// Handle instances where automatic CPU pinning is needed
+	for inst, cpusToPin := range balancedInstances {
 		sort.Sort(sortedUsage)
 		for _, cpu := range sortedUsage {
-			if count == 0 {
+			if cpusToPin == 0 {
 				break
 			}
 
-			count -= 1
-
-			id := cpu.strID
-			_, ok := pinning[ctn]
-			if ok {
-				pinning[ctn] = append(pinning[ctn], id)
-			} else {
-				pinning[ctn] = []string{id}
-			}
-			*cpu.count += 1
+			cpusToPin -= 1
+			pinning.add(inst, cpu)
		}
	}
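
Note for reviewers: the heart of `getCPULists` is a set difference between the host's effective cpuset and the isolcpus list. Below is a minimal standalone sketch of that step, assuming the cpuset strings have already been parsed into ID slices; `splitByIsolation` and the sample values are invented for illustration, while the real function takes its inputs from the cgroup files and `resources.GetCPUIsolated`, and reuses `resources.ParseCpuset` for the final conversion.

```go
package main

import (
	"fmt"
	"slices"
)

// splitByIsolation mimics the filtering step in getCPULists: every effective
// CPU that also appears in the isolcpus list is excluded from the pool used
// for automatic pinning.
func splitByIsolation(effective, isolated []int64) []int64 {
	cpus := make([]int64, 0, len(effective))
	for _, id := range effective {
		if slices.Contains(isolated, id) {
			continue
		}

		cpus = append(cpus, id)
	}

	return cpus
}

func main() {
	effective := []int64{0, 1, 2, 3, 4, 5} // e.g. parsed from cpuset.cpus.effective "0-5"
	isolated := []int64{4, 5}              // e.g. host booted with isolcpus=4,5

	fmt.Println(splitByIsolation(effective, isolated)) // [0 1 2 3]
}
```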
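The behavioural core of the diff shows up in the balancing section: isolated CPUs stay in the usage map, so explicit pinnings to them are still counted, but they never enter the candidate list for automatic placement. The following self-contained sketch replays that flow with the types copied from the diff; the scenario in main (four regular CPUs, two isolated ones, one explicit pinning, one instance needing two auto-pinned CPUs) is made up for demonstration.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
)

type deviceTaskCPU struct {
	id       int64
	strID    string
	count    *int
	isolated bool
}

type deviceTaskCPUs []deviceTaskCPU

func (c deviceTaskCPUs) Len() int           { return len(c) }
func (c deviceTaskCPUs) Less(i, j int) bool { return *c[i].count < *c[j].count }
func (c deviceTaskCPUs) Swap(i, j int)      { c[i], c[j] = c[j], c[i] }

// makeCPUUsageMap mirrors the helper from the diff: one entry per CPU, with
// isolated CPUs flagged so they can be excluded from automatic placement.
func makeCPUUsageMap(cpus, isolCPUs []int64) map[int64]deviceTaskCPU {
	usage := make(map[int64]deviceTaskCPU, len(cpus)+len(isolCPUs))

	createCPU := func(id int64, isolated bool) deviceTaskCPU {
		count := 0
		return deviceTaskCPU{id: id, strID: strconv.FormatInt(id, 10), count: &count, isolated: isolated}
	}

	for _, id := range cpus {
		usage[id] = createCPU(id, false)
	}

	for _, id := range isolCPUs {
		usage[id] = createCPU(id, true)
	}

	return usage
}

func main() {
	usage := makeCPUUsageMap([]int64{0, 1, 2, 3}, []int64{4, 5})

	// An explicit (VM) pinning to isolated CPU 4: allowed, and still counted.
	*usage[4].count += 1

	// Candidate list for automatic placement: isolated CPUs never enter it.
	sortedUsage := make(deviceTaskCPUs, 0)
	for _, cpu := range usage {
		if cpu.isolated {
			continue
		}

		sortedUsage = append(sortedUsage, cpu)
	}

	// Auto-pin a hypothetical instance to its two least-loaded candidates.
	cpusToPin := 2
	pinned := []string{}
	sort.Sort(sortedUsage)
	for _, cpu := range sortedUsage {
		if cpusToPin == 0 {
			break
		}

		cpusToPin -= 1
		pinned = append(pinned, cpu.strID)
		*cpu.count += 1
	}

	fmt.Println("pinned:", pinned) // two of 0-3; never the isolated 4 or 5
}
```

Because deviceTaskCPU carries its usage counter as a *int, the copies held by the usage map, the sorted slice, and pinning.add all observe the same count; that shared pointer is what keeps the by-value plumbing in the diff correct.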