fix(qrm): fix cpu admit failed caused by float ceil
nightmeng committed Oct 8, 2024
1 parent f6623f4 commit 30f2294
Showing 4 changed files with 44 additions and 43 deletions.
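The failure mode behind this commit: both the container request and the per-NUMA availability were rounded up to integers before being compared, so a fractional request could be refused although it actually fit. A minimal standalone sketch of the double-ceiling effect, with hypothetical numbers (not taken from the patch):

```go
package main

import (
	"fmt"
	"math"
)

func main() {
	allocatable := 8.0 // CPUs on one NUMA node (hypothetical)
	allocated := 3.5   // precise sum of container requests
	request := 4.2     // incoming container request

	// Old path: both sides are ceiled before comparing.
	availableInt := int(allocatable) - int(math.Ceil(allocated)) // 8 - 4 = 4
	reqInt := int(math.Ceil(request))                            // 5
	fmt.Println("old check admits:", reqInt <= availableInt)     // false: rejected

	// New path: compare the precise float64 quantities.
	availableFloat := allocatable - allocated                    // 4.5
	fmt.Println("new check admits:", request <= availableFloat)  // true: admitted
}
```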
70 changes: 35 additions & 35 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_hint_handlers.go
@@ -133,7 +133,7 @@ func (p *DynamicPolicy) dedicatedCoresWithNUMABindingHintHandler(_ context.Conte
})
}

- reqInt, _, err := util.GetQuantityFromResourceReq(req)
+ reqInt, request, err := util.GetQuantityFromResourceReq(req)
if err != nil {
return nil, fmt.Errorf("getReqQuantityFromResourceReq failed with error: %v", err)
}
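Judging from this hunk, util.GetQuantityFromResourceReq already returned a precise quantity as its second value; the fix starts consuming it instead of discarding it. A hypothetical stand-in showing that dual-return shape (the milli-CPU conversion is an assumption, not the helper's actual implementation):

```go
package main

import (
	"fmt"
	"math"
)

// getQuantity mimics the assumed (ceiled int, precise float64) return pattern.
func getQuantity(milliCPU int64) (int, float64) {
	precise := float64(milliCPU) / 1000.0 // e.g. 4200m -> 4.2
	return int(math.Ceil(precise)), precise
}

func main() {
	ceiled, precise := getQuantity(4200)
	fmt.Println(ceiled, precise) // 5 4.2
}
```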
@@ -179,7 +179,7 @@ func (p *DynamicPolicy) dedicatedCoresWithNUMABindingHintHandler(_ context.Conte
if hints == nil {
var calculateErr error
// calculate hint for container without allocated cpus
- hints, calculateErr = p.calculateHints(reqInt, machineState, req)
+ hints, calculateErr = p.calculateHints(request, machineState, req)
if calculateErr != nil {
return nil, fmt.Errorf("calculateHints failed with error: %v", calculateErr)
}
@@ -197,7 +197,7 @@ func (p *DynamicPolicy) dedicatedCoresWithoutNUMABindingHintHandler(_ context.Co

// calculateHints is a helper function to calculate the topology hints
// with the given container requests.
- func (p *DynamicPolicy) calculateHints(reqInt int,
+ func (p *DynamicPolicy) calculateHints(request float64,
machineState state.NUMANodeMap,
req *pluginapi.ResourceRequest,
) (map[string]*pluginapi.ListOfTopologyHints, error) {
@@ -211,7 +211,7 @@ func (p *DynamicPolicy) calculateHints(reqInt int,
}
sort.Ints(numaNodes)

- minNUMAsCountNeeded, _, err := util.GetNUMANodesCountToFitCPUReq(reqInt, p.machineInfo.CPUTopology)
+ minNUMAsCountNeeded, _, err := util.GetNUMANodesCountToFitCPUReq(request, p.machineInfo.CPUTopology)
if err != nil {
return nil, fmt.Errorf("GetNUMANodesCountToFitCPUReq failed with error: %v", err)
}
@@ -248,8 +248,8 @@ func (p *DynamicPolicy) calculateHints(reqInt int,
}
}

general.Infof("calculate hints with req: %d, numaToAvailableCPUCount: %+v",
reqInt, numaToAvailableCPUCount)
general.Infof("calculate hints with req: %.3f, numaToAvailableCPUCount: %+v",
request, numaToAvailableCPUCount)

numaBound := len(numaNodes)
if numaBound > machine.LargeNUMAsPoint {
@@ -277,7 +277,7 @@ func (p *DynamicPolicy) calculateHints(reqInt int,
allAvailableCPUsCountInMask += numaToAvailableCPUCount[nodeID]
}

- if allAvailableCPUsCountInMask < reqInt {
+ if float64(allAvailableCPUsCountInMask) < request {
return
}

@@ -321,7 +321,7 @@ func (p *DynamicPolicy) calculateHints(reqInt int,
_ = p.emitter.StoreInt64(util.MetricNameGetNUMAAllocatedMemBWFailed, 1, metrics.MetricTypeNameRaw)
} else {
p.updatePreferredCPUHintsByMemBW(preferredHintIndexes, availableNumaHints,
- 	reqInt, numaAllocatedMemBW, req, numaExclusive)
+ 	request, numaAllocatedMemBW, req, numaExclusive)
}
}

@@ -403,7 +403,7 @@ func getNUMAAllocatedMemBW(machineState state.NUMANodeMap, metaServer *metaserve
return numaAllocatedMemBW, nil
}

- func (p *DynamicPolicy) updatePreferredCPUHintsByMemBW(preferredHintIndexes []int, cpuHints []*pluginapi.TopologyHint, reqInt int,
+ func (p *DynamicPolicy) updatePreferredCPUHintsByMemBW(preferredHintIndexes []int, cpuHints []*pluginapi.TopologyHint, request float64,
numaAllocatedMemBW map[int]int, req *pluginapi.ResourceRequest, numaExclusive bool,
) {
if len(preferredHintIndexes) == 0 {
@@ -421,7 +421,7 @@ func (p *DynamicPolicy) updatePreferredCPUHintsByMemBW(preferredHintIndexes []in
Name: req.PodName,
Labels: req.Labels,
Annotations: req.Annotations,
- }, reqInt)
+ }, int(math.Ceil(request)))
if err != nil {
general.Errorf("GetContainerMemoryBandwidthRequest failed with error: %v", err)
return
@@ -591,7 +591,7 @@ func (p *DynamicPolicy) sharedCoresWithNUMABindingHintHandler(_ context.Context,
}

// calc the hints with the pod aggregated request
- reqInt, _, err := util.GetPodAggregatedRequestResource(req)
+ reqInt, request, err := util.GetPodAggregatedRequestResource(req)
if err != nil {
return nil, fmt.Errorf("getReqQuantityFromResourceReq failed with error: %v", err)
}
@@ -632,11 +632,11 @@ func (p *DynamicPolicy) sharedCoresWithNUMABindingHintHandler(_ context.Context,
return nil, fmt.Errorf("snb port not support cross numa")
}
nodeID := numaset.ToSliceInt()[0]
- availableCPUQuantity := machineState[nodeID].GetAvailableCPUQuantity(p.reservedCPUs)
+ _, availableCPUQuantity := machineState[nodeID].GetAvailableCPUQuantity(p.reservedCPUs)

general.Infof("pod: %s/%s, container: %s request cpu inplace update resize on numa %d (available: %d, request: %d)",
req.PodNamespace, req.PodName, req.ContainerName, nodeID, availableCPUQuantity, reqInt)
if reqInt > availableCPUQuantity { // no left resource to scale out
general.Infof("pod: %s/%s, container: %s request cpu inplace update resize on numa %d (available: %.3f, request: %.3f)",
req.PodNamespace, req.PodName, req.ContainerName, nodeID, availableCPUQuantity, request)
if request > availableCPUQuantity { // no left resource to scale out
general.Infof("pod: %s/%s, container: %s request cpu inplace update resize, but no enough resource for it in current NUMA, checking migratable",
req.PodNamespace, req.PodName, req.ContainerName)
// TODO move this var to config
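Note the strict `>` in the new check above: a request exactly equal to the precise remaining capacity still fits in place, and only a strictly larger request falls through to the migration logic. A tiny sketch with assumed values:

```go
package main

import "fmt"

func main() {
	available, request := 4.5, 4.5 // hypothetical precise quantities
	if request > available {
		fmt.Println("no room: try migrating to another NUMA")
	} else {
		fmt.Println("fits in place") // this branch runs: equality is admissible
	}
}
```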
@@ -645,7 +645,7 @@ func (p *DynamicPolicy) sharedCoresWithNUMABindingHintHandler(_ context.Context,
general.Infof("pod: %s/%s, container: %s request inplace update resize and no enough resource in current NUMA, try to migrate it to new NUMA",
req.PodNamespace, req.PodName, req.ContainerName)
var calculateErr error
- hints, calculateErr = p.calculateHintsForNUMABindingSharedCores(reqInt, podEntries, machineState, req)
+ hints, calculateErr = p.calculateHintsForNUMABindingSharedCores(request, podEntries, machineState, req)
if calculateErr != nil {
general.Errorf("pod: %s/%s, container: %s request inplace update resize and no enough resource in current NUMA, failed to migrate it to new NUMA",
req.PodNamespace, req.PodName, req.ContainerName)
@@ -662,7 +662,7 @@ func (p *DynamicPolicy) sharedCoresWithNUMABindingHintHandler(_ context.Context,
}
} else if hints == nil {
var calculateErr error
- hints, calculateErr = p.calculateHintsForNUMABindingSharedCores(reqInt, podEntries, machineState, req)
+ hints, calculateErr = p.calculateHintsForNUMABindingSharedCores(request, podEntries, machineState, req)
if calculateErr != nil {
return nil, fmt.Errorf("calculateHintsForNUMABindingSharedCores failed with error: %v", calculateErr)
}
@@ -687,26 +687,26 @@ func (p *DynamicPolicy) clearContainerAndRegenerateMachineState(podEntries state
}

func (p *DynamicPolicy) populateHintsByPreferPolicy(numaNodes []int, preferPolicy string,
- 	hints map[string]*pluginapi.ListOfTopologyHints, machineState state.NUMANodeMap, reqInt int,
+ 	hints map[string]*pluginapi.ListOfTopologyHints, machineState state.NUMANodeMap, request float64,
) {
- preferIndexes, maxLeft, minLeft := []int{}, -1, math.MaxInt
+ preferIndexes, maxLeft, minLeft := []int{}, float64(-1), math.MaxFloat64

for _, nodeID := range numaNodes {
- availableCPUQuantity := machineState[nodeID].GetAvailableCPUQuantity(p.reservedCPUs)
+ _, availableCPUQuantity := machineState[nodeID].GetAvailableCPUQuantity(p.reservedCPUs)

- if availableCPUQuantity < reqInt {
- 	general.Warningf("numa_binding shared_cores container skip NUMA: %d available: %d",
- 		nodeID, availableCPUQuantity)
+ if availableCPUQuantity < request {
+ 	general.Warningf("numa_binding shared_cores container skip NUMA: %d available: %.3f request: %.3f",
+ 		nodeID, availableCPUQuantity, request)
continue
}

hints[string(v1.ResourceCPU)].Hints = append(hints[string(v1.ResourceCPU)].Hints, &pluginapi.TopologyHint{
Nodes: []uint64{uint64(nodeID)},
})

- curLeft := availableCPUQuantity - reqInt
+ curLeft := availableCPUQuantity - request

general.Infof("NUMA: %d, left cpu quantity: %d", nodeID, curLeft)
general.Infof("NUMA: %d, left cpu quantity: %.3f", nodeID, curLeft)

if preferPolicy == cpuconsts.CPUNUMAHintPreferPolicyPacking {
if curLeft < minLeft {
Expand All @@ -732,14 +732,14 @@ func (p *DynamicPolicy) populateHintsByPreferPolicy(numaNodes []int, preferPolic
}
}

- func (p *DynamicPolicy) filterNUMANodesByHintPreferLowThreshold(reqInt int,
+ func (p *DynamicPolicy) filterNUMANodesByHintPreferLowThreshold(request float64,
machineState state.NUMANodeMap, numaNodes []int,
) ([]int, []int) {
filteredNUMANodes := make([]int, 0, len(numaNodes))
filteredOutNUMANodes := make([]int, 0, len(numaNodes))

for _, nodeID := range numaNodes {
- availableCPUQuantity := machineState[nodeID].GetAvailableCPUQuantity(p.reservedCPUs)
+ availableCPUQuantity, _ := machineState[nodeID].GetAvailableCPUQuantity(p.reservedCPUs)
allocatableCPUQuantity := machineState[nodeID].GetFilteredDefaultCPUSet(nil, nil).Difference(p.reservedCPUs).Size()

if allocatableCPUQuantity == 0 {
@@ -749,7 +749,7 @@ func (p *DynamicPolicy) filterNUMANodesByHintPreferLowThreshold(reqInt int,

availableRatio := float64(availableCPUQuantity) / float64(allocatableCPUQuantity)

general.Infof("NUMA: %d, availableCPUQuantity: %d, allocatableCPUQuantity: %d, availableRatio: %.2f, cpuNUMAHintPreferLowThreshold:%.2f",
general.Infof("NUMA: %d, availableCPUQuantity: %d, allocatableCPUQuantity: %d, availableRatio: %.3f, cpuNUMAHintPreferLowThreshold:%.3f",
nodeID, availableCPUQuantity, allocatableCPUQuantity, availableRatio, p.cpuNUMAHintPreferLowThreshold)

if availableRatio >= p.cpuNUMAHintPreferLowThreshold {
@@ -790,7 +790,7 @@ func (p *DynamicPolicy) filterNUMANodesByNonBindingSharedRequestedQuantity(nonBi
return filteredNUMANodes
}

- func (p *DynamicPolicy) calculateHintsForNUMABindingSharedCores(reqInt int, podEntries state.PodEntries,
+ func (p *DynamicPolicy) calculateHintsForNUMABindingSharedCores(request float64, podEntries state.PodEntries,
machineState state.NUMANodeMap,
req *pluginapi.ResourceRequest,
) (map[string]*pluginapi.ListOfTopologyHints, error) {
@@ -809,7 +809,7 @@ func (p *DynamicPolicy) calculateHintsForNUMABindingSharedCores(reqInt int, podE
},
}

- minNUMAsCountNeeded, _, err := util.GetNUMANodesCountToFitCPUReq(reqInt, p.machineInfo.CPUTopology)
+ minNUMAsCountNeeded, _, err := util.GetNUMANodesCountToFitCPUReq(request, p.machineInfo.CPUTopology)
if err != nil {
return nil, fmt.Errorf("GetNUMANodesCountToFitCPUReq failed with error: %v", err)
}
@@ -822,21 +822,21 @@ func (p *DynamicPolicy) calculateHintsForNUMABindingSharedCores(reqInt int, podE
switch p.cpuNUMAHintPreferPolicy {
case cpuconsts.CPUNUMAHintPreferPolicyPacking, cpuconsts.CPUNUMAHintPreferPolicySpreading:
general.Infof("apply %s policy on NUMAs: %+v", p.cpuNUMAHintPreferPolicy, numaNodes)
- p.populateHintsByPreferPolicy(numaNodes, p.cpuNUMAHintPreferPolicy, hints, machineState, reqInt)
+ p.populateHintsByPreferPolicy(numaNodes, p.cpuNUMAHintPreferPolicy, hints, machineState, request)
case cpuconsts.CPUNUMAHintPreferPolicyDynamicPacking:
- filteredNUMANodes, filteredOutNUMANodes := p.filterNUMANodesByHintPreferLowThreshold(reqInt, machineState, numaNodes)
+ filteredNUMANodes, filteredOutNUMANodes := p.filterNUMANodesByHintPreferLowThreshold(request, machineState, numaNodes)

if len(filteredNUMANodes) > 0 {
general.Infof("dynamically apply packing policy on NUMAs: %+v", filteredNUMANodes)
- p.populateHintsByPreferPolicy(filteredNUMANodes, cpuconsts.CPUNUMAHintPreferPolicyPacking, hints, machineState, reqInt)
+ p.populateHintsByPreferPolicy(filteredNUMANodes, cpuconsts.CPUNUMAHintPreferPolicyPacking, hints, machineState, request)
p.populateNotPreferredHintsByAvailableNUMANodes(filteredOutNUMANodes, hints)
} else {
general.Infof("empty filteredNUMANodes, dynamically apply spreading policy on NUMAs: %+v", numaNodes)
- p.populateHintsByPreferPolicy(numaNodes, cpuconsts.CPUNUMAHintPreferPolicySpreading, hints, machineState, reqInt)
+ p.populateHintsByPreferPolicy(numaNodes, cpuconsts.CPUNUMAHintPreferPolicySpreading, hints, machineState, request)
}
default:
general.Infof("unknown policy: %s, apply default spreading policy on NUMAs: %+v", p.cpuNUMAHintPreferPolicy, numaNodes)
- p.populateHintsByPreferPolicy(numaNodes, cpuconsts.CPUNUMAHintPreferPolicySpreading, hints, machineState, reqInt)
+ p.populateHintsByPreferPolicy(numaNodes, cpuconsts.CPUNUMAHintPreferPolicySpreading, hints, machineState, request)
}

// NOTE: because grpc is inability to distinguish between an empty array and nil,
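Taken together, this file now ranks NUMA nodes by float64 leftovers: packing prefers the smallest `curLeft`, spreading the largest. A simplified sketch of that selection (names and tie handling condensed from the code above; the leftover values are hypothetical):

```go
package main

import (
	"fmt"
	"math"
)

func main() {
	// NUMA id -> available - request, hypothetical precise leftovers.
	leftovers := []float64{0.3, 2.8, 0.3}
	minLeft, maxLeft := math.MaxFloat64, -1.0
	var packing, spreading []int
	for id, left := range leftovers {
		if left < minLeft { // packing: tightest fit wins
			minLeft, packing = left, []int{id}
		} else if left == minLeft {
			packing = append(packing, id)
		}
		if left > maxLeft { // spreading: roomiest node wins
			maxLeft, spreading = left, []int{id}
		} else if left == maxLeft {
			spreading = append(spreading, id)
		}
	}
	fmt.Println(packing, spreading) // [0 2] [1]
}
```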
6 changes: 3 additions & 3 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state.go
@@ -562,9 +562,9 @@ func (ns *NUMANodeState) GetAvailableCPUSet(reservedCPUs machine.CPUSet) machine
// It's used when allocating CPUs for shared_cores with numa_binding containers,
// since pool size may be adjusted, and DefaultCPUSet & AllocatedCPUSet are calculated by pool size,
// we should use allocationInfo.RequestQuantity to calculate available cpu quantity for candidate shared_cores with numa_binding container.
- func (ns *NUMANodeState) GetAvailableCPUQuantity(reservedCPUs machine.CPUSet) int {
+ func (ns *NUMANodeState) GetAvailableCPUQuantity(reservedCPUs machine.CPUSet) (int, float64) {
if ns == nil {
- 	return 0
+ 	return 0, 0
}

allocatableQuantity := ns.GetFilteredDefaultCPUSet(nil, nil).Difference(reservedCPUs).Size()
@@ -599,7 +599,7 @@ func (ns *NUMANodeState) GetAvailableCPUQuantity(reservedCPUs machine.CPUSet) in
}

allocatedQuantity := int(math.Ceil(preciseAllocatedQuantity))
- return general.Max(allocatableQuantity-allocatedQuantity, 0)
+ return general.Max(allocatableQuantity-allocatedQuantity, 0), general.MaxFloat64(float64(allocatableQuantity)-preciseAllocatedQuantity, 0)
}

// GetFilteredDefaultCPUSet returns default cpuset in this numa, along with the filter functions
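This hunk is the heart of the fix: GetAvailableCPUQuantity keeps its historical ceiled-integer return and adds the precise float64 availability alongside it. With hypothetical numbers:

```go
package main

import (
	"fmt"
	"math"
)

func main() {
	allocatable := 16        // allocatable CPUs on the node (hypothetical)
	preciseAllocated := 10.2 // precise sum of allocated requests

	intAvailable := allocatable - int(math.Ceil(preciseAllocated)) // 16 - 11 = 5
	floatAvailable := float64(allocatable) - preciseAllocated      // ~5.8

	// A 5.5-CPU request was rejected before (ceil(5.5) = 6 > 5) even though
	// it fits within the precise 5.8; callers comparing against the float64
	// return value now admit it.
	fmt.Printf("%d %.1f\n", intAvailable, floatAvailable) // 5 5.8
}
```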
3 changes: 2 additions & 1 deletion pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state_test.go
@@ -3271,5 +3271,6 @@ func TestGetAvailableCPUQuantity(t *testing.T) {
},
},
}
- require.Equal(t, 15, nodeState.GetAvailableCPUQuantity(machine.NewCPUSet()))
+ cpuQuantity, _ := nodeState.GetAvailableCPUQuantity(machine.NewCPUSet())
+ require.Equal(t, 15, cpuQuantity)
}
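A hedged extension of this test could also pin down the new float64 return. The 15.5 below is a placeholder, since the real expectation depends on the fixture's precise per-container request quantities; InDelta is used because the value is built by summing floats:

```go
// Sketch only: 15.5 is a placeholder, not the fixture's actual value.
intQuantity, floatQuantity := nodeState.GetAvailableCPUQuantity(machine.NewCPUSet())
require.Equal(t, 15, intQuantity)
require.InDelta(t, 15.5, floatQuantity, 0.001)
```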
8 changes: 4 additions & 4 deletions pkg/agent/qrm-plugins/util/util.go
@@ -195,7 +195,7 @@ func PackResourceHintsResponse(req *pluginapi.ResourceRequest, resourceName stri
// GetNUMANodesCountToFitCPUReq is used to calculate the amount of numa nodes
// we need if we try to allocate cpu cores among them, assuming that all numa nodes
// contain the same cpu capacity
- func GetNUMANodesCountToFitCPUReq(cpuReq int, cpuTopology *machine.CPUTopology) (int, int, error) {
+ func GetNUMANodesCountToFitCPUReq(cpuReq float64, cpuTopology *machine.CPUTopology) (int, int, error) {
if cpuTopology == nil {
return 0, 0, fmt.Errorf("GetNumaNodesToFitCPUReq got nil cpuTopology")
}
@@ -210,14 +210,14 @@ func GetNUMANodesCountToFitCPUReq(cpuReq int, cpuTopology *machine.CPUTopology)
}

cpusPerNUMA := cpuTopology.NumCPUs / numaCount
- numaCountNeeded := int(math.Ceil(float64(cpuReq) / float64(cpusPerNUMA)))
+ numaCountNeeded := int(math.Ceil(cpuReq / float64(cpusPerNUMA)))
if numaCountNeeded == 0 {
return 0, 0, fmt.Errorf("zero numaCountNeeded")
} else if numaCountNeeded > numaCount {
return 0, 0, fmt.Errorf("invalid cpu req: %d in topology with NUMAs count: %d and CPUs count: %d", cpuReq, numaCount, cpuTopology.NumCPUs)
return 0, 0, fmt.Errorf("invalid cpu req: %.3f in topology with NUMAs count: %d and CPUs count: %d", cpuReq, numaCount, cpuTopology.NumCPUs)
}

- cpusCountNeededPerNUMA := int(math.Ceil(float64(cpuReq) / float64(numaCountNeeded)))
+ cpusCountNeededPerNUMA := int(math.Ceil(cpuReq / float64(numaCountNeeded)))
return numaCountNeeded, cpusCountNeededPerNUMA, nil
}
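A worked example of the float-aware math above, assuming a topology of 96 CPUs across 4 NUMA nodes (so 24 CPUs per node); `topology` stands in for a *machine.CPUTopology built elsewhere:

```go
// request = 25.5 CPUs:
//   numaCountNeeded        = ceil(25.5 / 24) = 2
//   cpusCountNeededPerNUMA = ceil(25.5 / 2)  = 13
count, perNUMA, err := GetNUMANodesCountToFitCPUReq(25.5, topology)
if err != nil {
	panic(err)
}
fmt.Println(count, perNUMA) // 2 13
```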
