Skip to content

Commit

Permalink
fix(region,host,scheduler): isolated device balance with reserved c…
Browse files Browse the repository at this point in the history
…pus (#21614)
  • Loading branch information
wanyaoqi authored Nov 17, 2024
1 parent a6e7ea5 commit 84ecb2e
Show file tree
Hide file tree
Showing 13 changed files with 420 additions and 36 deletions.
3 changes: 2 additions & 1 deletion pkg/apis/compute/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,8 @@ type SHostPingInput struct {
type HostReserveCpusInput struct {
Cpus string
Mems string
DisableSchedLoadBalance *bool `json:"disable_sched_load_balance"`
DisableSchedLoadBalance *bool `json:"disable_sched_load_balance"`
ProcessesPrefix []string `json:"processes_prefix"`
}

type HostAutoMigrateInput struct {
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/compute/host_const.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,4 +185,5 @@ const (

const (
HOSTMETA_RESERVED_CPUS_INFO = "reserved_cpus_info"
HOSTMETA_RESERVED_CPUS_RATE = "reserved_cpus_rate"
)
70 changes: 61 additions & 9 deletions pkg/compute/models/hosts.go
Original file line number Diff line number Diff line change
Expand Up @@ -4750,6 +4750,50 @@ func (hh *SHost) PerformPing(ctx context.Context, userCred mcclient.TokenCredent
return result, nil
}

// getHostNodeReservePercent computes, for every NUMA node described in the
// host's sys_info topology, the fraction of that node's logical CPUs that
// falls inside reservedCpusStr (a cpuset list string such as "0-3,8").
// The result maps the node ID (as a decimal string) to reserved/total.
func (host *SHost) getHostNodeReservePercent(reservedCpusStr string) (map[string]float32, error) {
	reservedCpuset, err := cpuset.Parse(reservedCpusStr)
	if err != nil {
		return nil, errors.Wrap(err, "cpuset parse reserved cpus")
	}

	topoObj, err := host.SysInfo.Get("topology")
	if err != nil {
		return nil, errors.Wrap(err, "get topology from host sys_info")
	}
	info := new(hostapi.HostTopology)
	if err := topoObj.Unmarshal(info); err != nil {
		return nil, errors.Wrap(err, "Unmarshal host topology struct")
	}

	// Per-node totals: all logical processors vs. the reserved subset.
	// A missing map key reads as 0, so no explicit "first hit" branch is needed.
	nodeCpus := map[int]int{}
	nodeReservedCpus := map[int]int{}
	for i := range info.Nodes {
		node := &info.Nodes[i]
		// the cpuset builder deduplicates logical processor ids per node
		cSet := cpuset.NewBuilder()
		for j := range node.Cores {
			for _, lp := range node.Cores[j].LogicalProcessors {
				if reservedCpuset.Contains(lp) {
					nodeReservedCpus[node.ID]++
				}
				cSet.Add(lp)
			}
		}
		nodeCpus[node.ID] = cSet.Result().Size()
	}

	reserveRate := map[string]float32{}
	for nodeId, cnt := range nodeCpus {
		if cnt == 0 {
			// guard a topology node reporting no logical processors:
			// 0/0 would otherwise yield NaN
			reserveRate[strconv.Itoa(nodeId)] = 0
			continue
		}
		reserveRate[strconv.Itoa(nodeId)] = float32(nodeReservedCpus[nodeId]) / float32(cnt)
	}
	return reserveRate, nil
}

func (host *SHost) getHostLogicalCores() ([]int, error) {
cpuObj, err := host.SysInfo.Get("cpu_info")
if err != nil {
Expand Down Expand Up @@ -4805,14 +4849,6 @@ func (hh *SHost) PerformReserveCpus(
return nil, httperrors.NewNotSupportedError("host type %s not support reserve cpus", hh.HostType)
}

cnt, err := hh.GetRunningGuestCount()
if err != nil {
return nil, err
}
if cnt > 0 {
return nil, httperrors.NewBadRequestError("host %s has %d guests, can't update reserve cpus", hh.Id, cnt)
}

if input.Cpus == "" {
return nil, httperrors.NewInputParameterError("missing cpus")
}
Expand Down Expand Up @@ -4847,11 +4883,27 @@ func (hh *SHost) PerformReserveCpus(
}
}

if len(input.Cpus) > 0 {
reservePercent, err := hh.getHostNodeReservePercent(input.Cpus)
if err != nil {
return nil, errors.Errorf("failed getHostNodeReservePercent: %s", err)
}
err = hh.SetMetadata(ctx, api.HOSTMETA_RESERVED_CPUS_RATE, reservePercent, userCred)
if err != nil {
return nil, err
}
} else {
err = hh.RemoveMetadata(ctx, api.HOSTMETA_RESERVED_CPUS_RATE, userCred)
if err != nil {
return nil, err
}
}

err = hh.SetMetadata(ctx, api.HOSTMETA_RESERVED_CPUS_INFO, input, userCred)
if err != nil {
return nil, err
}
if hh.CpuReserved < cs.Size() {
if hh.CpuReserved != cs.Size() {
_, err = db.Update(hh, func() error {
hh.CpuReserved = cs.Size()
return nil
Expand Down
231 changes: 224 additions & 7 deletions pkg/compute/models/isolated_devices.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"math"
"reflect"
"sort"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -652,6 +653,121 @@ func (pq *SorttedGroupDevs) Pop() interface{} {
return item
}

// SNodeIsolateDevicesInfo describes the isolated-device capacity of one NUMA
// node: how many devices of the requested model live on the node, and which
// fraction of the node's CPUs is reserved (see getDevNodesUsedRate).
type SNodeIsolateDevicesInfo struct {
	// TotalDevCount is the total number of matching isolated devices on the node.
	TotalDevCount int
	// ReservedRate is the averaged reserved-CPU ratio attributed to this node.
	ReservedRate float32
}

// getDevNodesUsedRate builds, for every NUMA node that carries at least one
// isolated device matching devConfig, the total device count on that node and
// the effective reserved-CPU rate to account against it.
//
// Nodes with no matching device are attributed to the nearest device-carrying
// node (smallest topology distance), and the reserved rates of a device node
// and all nodes attributed to it are averaged into that node's rate.
func (manager *SIsolatedDeviceManager) getDevNodesUsedRate(
	ctx context.Context, host *SHost, devConfig *api.IsolatedDeviceConfig, topo *hostapi.HostTopology,
) (map[string]SNodeIsolateDevicesInfo, error) {
	devs, err := manager.findHostDevsByDevConfig(devConfig.Model, devConfig.DevType, host.Id, devConfig.WireId)
	if err != nil {
		// keep the underlying cause instead of collapsing it into "not found"
		return nil, errors.Wrapf(err, "find devs of model %s on host %s", devConfig.Model, host.Id)
	}
	if len(devs) == 0 {
		return nil, fmt.Errorf("Can't found model %s on host %s", devConfig.Model, host.Id)
	}

	// 1. group devices by device_path; append to a nil slice is valid, so no
	// explicit "first element" branch is needed.
	mapDevs := map[string][]SIsolatedDevice{}
	for i := range devs {
		mapDevs[devs[i].DevicePath] = append(mapDevs[devs[i].DevicePath], devs[i])
	}

	// 2. regroup the device-path groups by the NUMA node they are attached to.
	nodesGroupDevs := map[string]SorttedGroupDevs{}
	for devPath, mappedDevs := range mapDevs {
		numaNode := strconv.Itoa(int(mappedDevs[0].NumaNode))
		nodesGroupDevs[numaNode] = append(nodesGroupDevs[numaNode], &GroupDevs{
			DevPath: devPath,
			Devs:    mappedDevs,
		})
	}

	// 3. load the per-node reserved-CPU rates recorded by PerformReserveCpus.
	// Hosts configured before the rate metadata was introduced may not have
	// it yet; treat an empty value as "no reservation" instead of failing.
	reserveRate := map[string]float32{}
	reserveRateStr := host.GetMetadata(ctx, api.HOSTMETA_RESERVED_CPUS_RATE, nil)
	if len(reserveRateStr) > 0 {
		reserveRateJ, err := jsonutils.ParseString(reserveRateStr)
		if err != nil {
			return nil, errors.Wrap(err, "parse reserveRateStr")
		}
		if err := reserveRateJ.Unmarshal(&reserveRate); err != nil {
			return nil, errors.Wrap(err, "unmarshal reserveRateStr")
		}
	}

	// 4. collect the nodes that carry no matching device at all.
	nodeNoDevIds := map[int]int{}
	for i := range topo.Nodes {
		if _, ok := nodesGroupDevs[strconv.Itoa(topo.Nodes[i].ID)]; !ok {
			nodeNoDevIds[topo.Nodes[i].ID] = -1
		}
	}

	// 5. attribute each device-less node to its closest device-carrying node.
	reserveNodes := map[string][]string{}
	for i := range topo.Nodes {
		if _, ok := nodeNoDevIds[topo.Nodes[i].ID]; !ok {
			continue
		}
		minDistance := int(math.MaxInt16)
		selectNodeId := ""
		for nodeId := range nodesGroupDevs {
			nodeInt, _ := strconv.Atoi(nodeId)
			if topo.Nodes[i].Distances[nodeInt] < minDistance {
				selectNodeId = nodeId
				minDistance = topo.Nodes[i].Distances[nodeInt]
			}
		}
		noDevNodeId := strconv.Itoa(topo.Nodes[i].ID)
		log.Debugf("node %s select node %s", noDevNodeId, selectNodeId)
		reserveNodes[selectNodeId] = append(reserveNodes[selectNodeId], noDevNodeId)
	}

	// 6. average the reserved rate over each device node plus its attributed
	// neighbours, and count the devices living on the node.
	reserveRates := map[string]SNodeIsolateDevicesInfo{}
	for nodeId, devGroups := range nodesGroupDevs {
		nodeCnt := 1
		nodeReserveRate := reserveRate[nodeId]
		for _, neighbourId := range reserveNodes[nodeId] {
			nodeReserveRate += reserveRate[neighbourId]
			nodeCnt++
		}
		nodeReserveRate = nodeReserveRate / float32(nodeCnt)
		devCnt := 0
		for i := range devGroups {
			devCnt += len(devGroups[i].Devs)
		}
		reserveRates[nodeId] = SNodeIsolateDevicesInfo{
			TotalDevCount: devCnt,
			ReservedRate:  nodeReserveRate,
		}
		log.Debugf("node %v nodeCnt %v nodeReserveRate %v", nodeId, nodeCnt, nodeReserveRate)
	}
	return reserveRates, nil
}

func (manager *SIsolatedDeviceManager) attachHostDeviceToGuestByModel(
ctx context.Context, guest *SGuest, host *SHost, devConfig *api.IsolatedDeviceConfig,
userCred mcclient.TokenCredential, usedDevMap map[string]*SIsolatedDevice, preferNumaNodes []int,
Expand All @@ -664,8 +780,8 @@ func (manager *SIsolatedDeviceManager) attachHostDeviceToGuestByModel(
if err != nil || len(devs) == 0 {
return fmt.Errorf("Can't found model %s on host %s", devConfig.Model, host.Id)
}
// 1. group devices by device_path
groupDevs := make(SorttedGroupDevs, 0)
// 1. group devices by device_path and numa nodes
//groupDevs := make(SorttedGroupDevs, 0)
mapDevs := map[string][]SIsolatedDevice{}
for i := range devs {
dev := devs[i]
Expand All @@ -680,11 +796,89 @@ func (manager *SIsolatedDeviceManager) attachHostDeviceToGuestByModel(
}
mapDevs[devPath] = gdevs
}
for devPath, mappedDevs := range mapDevs {
groupDevs = append(groupDevs, &GroupDevs{
DevPath: devPath,
Devs: mappedDevs,
})

var groupDevs SorttedGroupDevs
if len(preferNumaNodes) > 0 {
groupDevs = make(SorttedGroupDevs, 0)
for devPath, mappedDevs := range mapDevs {
groupDevs = append(groupDevs, &GroupDevs{
DevPath: devPath,
Devs: mappedDevs,
})
}
} else {
nodesGroupDevs := map[int8]SorttedGroupDevs{}
for devPath, mappedDevs := range mapDevs {
numaNode := mappedDevs[0].NumaNode
if _, ok := nodesGroupDevs[numaNode]; ok {
nodesGroupDevs[numaNode] = append(nodesGroupDevs[numaNode], &GroupDevs{
DevPath: devPath,
Devs: mappedDevs,
})
} else {
groupDevs := make(SorttedGroupDevs, 0)
nodesGroupDevs[numaNode] = append(groupDevs, &GroupDevs{
DevPath: devPath,
Devs: mappedDevs,
})
}
}

var selectedNode int8 = -1
if len(nodesGroupDevs) == 1 {
for nodeId := range nodesGroupDevs {
selectedNode = nodeId
}
} else {
reservedCpusStr := host.GetMetadata(ctx, api.HOSTMETA_RESERVED_CPUS_INFO, nil)
if len(reservedCpusStr) > 0 {
topoObj, err := host.SysInfo.Get("topology")
if err != nil {
return errors.Wrap(err, "get topology from host sys_info")
}
topo := new(hostapi.HostTopology)
if err := topoObj.Unmarshal(topo); err != nil {
return errors.Wrap(err, "Unmarshal host topology struct")
}
nodesReserveRate, err := manager.getDevNodesUsedRate(ctx, host, devConfig, topo)
if err != nil {
return err
}
var selectedNodeUtil float32 = 1.0
for nodeId, gds := range nodesGroupDevs {
freeDevCnt := 0
for i := range gds {
freeDevCnt += len(gds[i].Devs)
}

nodeTotalCnt := nodesReserveRate[strconv.Itoa(int(nodeId))].TotalDevCount
usedDevCnt := nodeTotalCnt - freeDevCnt

nodeReserveRate := nodesReserveRate[strconv.Itoa(int(nodeId))].ReservedRate
nodeCnt := (1 - nodeReserveRate) * float32(nodeTotalCnt)
nodeutil := float32(usedDevCnt) / nodeCnt
log.Debugf("selectedNodeUtil node %v util %v usedDevCnt %v totalDevCnt %v", nodeId, nodeutil, usedDevCnt, nodeCnt)
if nodeutil < selectedNodeUtil {
selectedNodeUtil = nodeutil
selectedNode = nodeId
}
}
} else {
var selectedNodeDevCnt = 0
for nodeId, gds := range nodesGroupDevs {
devCnt := 0
for i := range gds {
devCnt += len(gds[i].Devs)
}
if devCnt > selectedNodeDevCnt {
selectedNodeDevCnt = devCnt
selectedNode = nodeId
}
}
}
}
log.Debugf("selectedNodeUtil node %v", selectedNode)
groupDevs = nodesGroupDevs[selectedNode]
}
sort.Sort(groupDevs)

Expand Down Expand Up @@ -845,6 +1039,29 @@ func (manager *SIsolatedDeviceManager) findHostUnusedByDevConfig(model, devType,
return manager.findHostUnusedByDevAttr(model, "dev_type", devType, hostId, wireId)
}

// findHostDevsByDevConfig lists every isolated device (used or free) on host
// hostId matching the given model and dev_type, optionally restricted to a
// wire when that wire belongs to the default VPC.
func (manager *SIsolatedDeviceManager) findHostDevsByDevConfig(model, devType, hostId, wireId string) ([]SIsolatedDevice, error) {
	return manager.findHostDevsByDevAttr(model, "dev_type", devType, hostId, wireId)
}
// findHostDevsByDevAttr queries all isolated devices on host hostId whose
// model matches and whose column attrKey equals attrVal (the attribute filter
// is skipped when attrVal is empty). When wireId refers to a wire on the
// default VPC, the result is additionally restricted to that wire.
func (manager *SIsolatedDeviceManager) findHostDevsByDevAttr(model, attrKey, attrVal, hostId, wireId string) ([]SIsolatedDevice, error) {
	devs := make([]SIsolatedDevice, 0)
	q := manager.Query()
	q = q.Equals("model", model).Equals("host_id", hostId)
	if attrVal != "" {
		// keep the returned query: Equals is used in assignment form at every
		// other call site in this file
		q = q.Equals(attrKey, attrVal)
	}
	if wireId != "" {
		wire := WireManager.FetchWireById(wireId)
		// FetchWireById may return nil for a stale/unknown wire id; guard the
		// dereference instead of panicking
		if wire != nil && wire.VpcId == api.DEFAULT_VPC_ID {
			q = q.Equals("wire_id", wireId)
		}
	}
	if err := db.FetchModelObjects(manager, q, &devs); err != nil {
		return nil, err
	}
	return devs, nil
}

func (manager *SIsolatedDeviceManager) findHostUnusedByDevAttr(model, attrKey, attrVal, hostId, wireId string) ([]SIsolatedDevice, error) {
devs := make([]SIsolatedDevice, 0)
q := manager.findUnusedQuery()
Expand Down
Loading

0 comments on commit 84ecb2e

Please sign in to comment.