diff --git a/Makefile b/Makefile
index 4e83bcc378..fe123162a6 100644
--- a/Makefile
+++ b/Makefile
@@ -122,7 +122,7 @@ vet: ## Run go vet against code.
 
 .PHONY: test
 test: ## Run go test against code.
-	go test -v -coverprofile=coverage.txt -parallel=16 -p=16 -covermode=atomic -race -coverpkg=./... \
+	go test -v -json -coverprofile=coverage.txt -parallel=16 -p=16 -covermode=atomic -race -coverpkg=./... \
 		`go list ./pkg/... | grep -E -v "pkg/scheduler|pkg/controller/resource-recommend|pkg/util/resource-recommend"`
 
 .PHONY: license
diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_hint_handlers.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_hint_handlers.go
index cebdff5c17..490c4713c9 100644
--- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_hint_handlers.go
+++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_hint_handlers.go
@@ -133,7 +133,7 @@ func (p *DynamicPolicy) dedicatedCoresWithNUMABindingHintHandler(_ context.Conte
 		})
 	}
 
-	reqInt, _, err := util.GetQuantityFromResourceReq(req)
+	reqInt, request, err := util.GetQuantityFromResourceReq(req)
 	if err != nil {
 		return nil, fmt.Errorf("getReqQuantityFromResourceReq failed with error: %v", err)
 	}
@@ -179,7 +179,7 @@ func (p *DynamicPolicy) dedicatedCoresWithNUMABindingHintHandler(_ context.Conte
 	if hints == nil {
 		var calculateErr error
 		// calculate hint for container without allocated cpus
-		hints, calculateErr = p.calculateHints(reqInt, machineState, req)
+		hints, calculateErr = p.calculateHints(request, machineState, req)
 		if calculateErr != nil {
 			return nil, fmt.Errorf("calculateHints failed with error: %v", calculateErr)
 		}
@@ -197,7 +197,7 @@ func (p *DynamicPolicy) dedicatedCoresWithoutNUMABindingHintHandler(_ context.Co
 
 // calculateHints is a helper function to calculate the topology hints
 // with the given container requests.
-func (p *DynamicPolicy) calculateHints(reqInt int,
+func (p *DynamicPolicy) calculateHints(request float64,
 	machineState state.NUMANodeMap,
 	req *pluginapi.ResourceRequest,
 ) (map[string]*pluginapi.ListOfTopologyHints, error) {
@@ -211,7 +211,7 @@ func (p *DynamicPolicy) calculateHints(reqInt int,
 	}
 	sort.Ints(numaNodes)
 
-	minNUMAsCountNeeded, _, err := util.GetNUMANodesCountToFitCPUReq(reqInt, p.machineInfo.CPUTopology)
+	minNUMAsCountNeeded, _, err := util.GetNUMANodesCountToFitCPUReq(request, p.machineInfo.CPUTopology)
 	if err != nil {
 		return nil, fmt.Errorf("GetNUMANodesCountToFitCPUReq failed with error: %v", err)
 	}
@@ -248,8 +248,8 @@ func (p *DynamicPolicy) calculateHints(reqInt int,
 		}
 	}
 
-	general.Infof("calculate hints with req: %d, numaToAvailableCPUCount: %+v",
-		reqInt, numaToAvailableCPUCount)
+	general.Infof("calculate hints with req: %.3f, numaToAvailableCPUCount: %+v",
+		request, numaToAvailableCPUCount)
 
 	numaBound := len(numaNodes)
 	if numaBound > machine.LargeNUMAsPoint {
@@ -277,7 +277,7 @@ func (p *DynamicPolicy) calculateHints(reqInt int,
 			allAvailableCPUsCountInMask += numaToAvailableCPUCount[nodeID]
 		}
 
-		if allAvailableCPUsCountInMask < reqInt {
+		if float64(allAvailableCPUsCountInMask) < request {
 			return
 		}
@@ -321,7 +321,7 @@ func (p *DynamicPolicy) calculateHints(reqInt int,
 			_ = p.emitter.StoreInt64(util.MetricNameGetNUMAAllocatedMemBWFailed, 1, metrics.MetricTypeNameRaw)
 		} else {
 			p.updatePreferredCPUHintsByMemBW(preferredHintIndexes, availableNumaHints,
-				reqInt, numaAllocatedMemBW, req, numaExclusive)
+				request, numaAllocatedMemBW, req, numaExclusive)
 		}
 	}
@@ -403,7 +403,7 @@ func getNUMAAllocatedMemBW(machineState state.NUMANodeMap, metaServer *metaserve
 	return numaAllocatedMemBW, nil
 }
 
-func (p *DynamicPolicy) updatePreferredCPUHintsByMemBW(preferredHintIndexes []int, cpuHints []*pluginapi.TopologyHint, reqInt int,
+func (p *DynamicPolicy) updatePreferredCPUHintsByMemBW(preferredHintIndexes []int, cpuHints []*pluginapi.TopologyHint, request float64,
 	numaAllocatedMemBW map[int]int, req *pluginapi.ResourceRequest, numaExclusive bool,
 ) {
 	if len(preferredHintIndexes) == 0 {
@@ -421,7 +421,7 @@ func (p *DynamicPolicy) updatePreferredCPUHintsByMemBW(preferredHintIndexes []in
 			Name:        req.PodName,
 			Labels:      req.Labels,
 			Annotations: req.Annotations,
-		}, reqInt)
+		}, int(math.Ceil(request)))
 		if err != nil {
 			general.Errorf("GetContainerMemoryBandwidthRequest failed with error: %v", err)
 			return
@@ -591,7 +591,7 @@ func (p *DynamicPolicy) sharedCoresWithNUMABindingHintHandler(_ context.Context,
 	}
 
 	// calc the hints with the pod aggregated request
-	reqInt, _, err := util.GetPodAggregatedRequestResource(req)
+	reqInt, request, err := util.GetPodAggregatedRequestResource(req)
 	if err != nil {
 		return nil, fmt.Errorf("getReqQuantityFromResourceReq failed with error: %v", err)
 	}
@@ -632,11 +632,11 @@ func (p *DynamicPolicy) sharedCoresWithNUMABindingHintHandler(_ context.Context,
 			return nil, fmt.Errorf("snb port not support cross numa")
 		}
 		nodeID := numaset.ToSliceInt()[0]
-		availableCPUQuantity := machineState[nodeID].GetAvailableCPUQuantity(p.reservedCPUs)
+		_, availableCPUQuantity := machineState[nodeID].GetAvailableCPUQuantity(p.reservedCPUs)
 
-		general.Infof("pod: %s/%s, container: %s request cpu inplace update resize on numa %d (available: %d, request: %d)",
-			req.PodNamespace, req.PodName, req.ContainerName, nodeID, availableCPUQuantity, reqInt)
-		if reqInt > availableCPUQuantity { // no left resource to scale out
+		general.Infof("pod: %s/%s, container: %s request cpu inplace update resize on numa %d (available: %.3f, request: %.3f)",
+			req.PodNamespace, req.PodName, req.ContainerName, nodeID, availableCPUQuantity, request)
+		if request > availableCPUQuantity { // no left resource to scale out
 			general.Infof("pod: %s/%s, container: %s request cpu inplace update resize, but no enough resource for it in current NUMA, checking migratable",
 				req.PodNamespace, req.PodName, req.ContainerName)
 			// TODO move this var to config
@@ -645,7 +645,7 @@ func (p *DynamicPolicy) sharedCoresWithNUMABindingHintHandler(_ context.Context,
 			general.Infof("pod: %s/%s, container: %s request inplace update resize and no enough resource in current NUMA, try to migrate it to new NUMA",
 				req.PodNamespace, req.PodName, req.ContainerName)
 			var calculateErr error
-			hints, calculateErr = p.calculateHintsForNUMABindingSharedCores(reqInt, podEntries, machineState, req)
+			hints, calculateErr = p.calculateHintsForNUMABindingSharedCores(request, podEntries, machineState, req)
 			if calculateErr != nil {
 				general.Errorf("pod: %s/%s, container: %s request inplace update resize and no enough resource in current NUMA, failed to migrate it to new NUMA",
 					req.PodNamespace, req.PodName, req.ContainerName)
@@ -662,7 +662,7 @@ func (p *DynamicPolicy) sharedCoresWithNUMABindingHintHandler(_ context.Context,
 		}
 	} else if hints == nil {
 		var calculateErr error
-		hints, calculateErr = p.calculateHintsForNUMABindingSharedCores(reqInt, podEntries, machineState, req)
+		hints, calculateErr = p.calculateHintsForNUMABindingSharedCores(request, podEntries, machineState, req)
 		if calculateErr != nil {
 			return nil, fmt.Errorf("calculateHintsForNUMABindingSharedCores failed with error: %v", calculateErr)
 		}
@@ -687,16 +687,16 @@ func (p *DynamicPolicy) clearContainerAndRegenerateMachineState(podEntries state
 }
 
 func (p *DynamicPolicy) populateHintsByPreferPolicy(numaNodes []int, preferPolicy string,
-	hints map[string]*pluginapi.ListOfTopologyHints, machineState state.NUMANodeMap, reqInt int,
+	hints map[string]*pluginapi.ListOfTopologyHints, machineState state.NUMANodeMap, request float64,
 ) {
-	preferIndexes, maxLeft, minLeft := []int{}, -1, math.MaxInt
+	preferIndexes, maxLeft, minLeft := []int{}, float64(-1), math.MaxFloat64
 	for _, nodeID := range numaNodes {
-		availableCPUQuantity := machineState[nodeID].GetAvailableCPUQuantity(p.reservedCPUs)
+		_, availableCPUQuantity := machineState[nodeID].GetAvailableCPUQuantity(p.reservedCPUs)
 
-		if availableCPUQuantity < reqInt {
-			general.Warningf("numa_binding shared_cores container skip NUMA: %d available: %d",
-				nodeID, availableCPUQuantity)
+		if availableCPUQuantity < request {
+			general.Warningf("numa_binding shared_cores container skip NUMA: %d available: %.3f request: %.3f",
+				nodeID, availableCPUQuantity, request)
 			continue
 		}
@@ -704,9 +704,9 @@ func (p *DynamicPolicy) populateHintsByPreferPolic
 			Nodes: []uint64{uint64(nodeID)},
 		})
 
-		curLeft := availableCPUQuantity - reqInt
+		curLeft := availableCPUQuantity - request
 
-		general.Infof("NUMA: %d, left cpu quantity: %d", nodeID, curLeft)
+		general.Infof("NUMA: %d, left cpu quantity: %.3f", nodeID, curLeft)
 
 		if preferPolicy == cpuconsts.CPUNUMAHintPreferPolicyPacking {
 			if curLeft < minLeft {
@@ -732,14 +732,14 @@ func (p *DynamicPolicy) populateHintsByPreferPolic
 	}
 }
 
-func (p *DynamicPolicy) filterNUMANodesByHintPreferLowThreshold(reqInt int,
+func (p *DynamicPolicy) filterNUMANodesByHintPreferLowThreshold(request float64,
 	machineState state.NUMANodeMap, numaNodes []int,
 ) ([]int, []int) {
 	filteredNUMANodes := make([]int, 0, len(numaNodes))
 	filteredOutNUMANodes := make([]int, 0, len(numaNodes))
 
 	for _, nodeID := range numaNodes {
-		availableCPUQuantity := machineState[nodeID].GetAvailableCPUQuantity(p.reservedCPUs)
+		availableCPUQuantity, _ := machineState[nodeID].GetAvailableCPUQuantity(p.reservedCPUs)
 		allocatableCPUQuantity := machineState[nodeID].GetFilteredDefaultCPUSet(nil, nil).Difference(p.reservedCPUs).Size()
 
 		if allocatableCPUQuantity == 0 {
@@ -749,7 +749,7 @@ func (p *DynamicPolicy) filterNUMANodesByHintPreferLowThreshold(reqInt int,
 
 		availableRatio := float64(availableCPUQuantity) / float64(allocatableCPUQuantity)
 
-		general.Infof("NUMA: %d, availableCPUQuantity: %d, allocatableCPUQuantity: %d, availableRatio: %.2f, cpuNUMAHintPreferLowThreshold:%.2f",
+		general.Infof("NUMA: %d, availableCPUQuantity: %d, allocatableCPUQuantity: %d, availableRatio: %.3f, cpuNUMAHintPreferLowThreshold:%.3f",
 			nodeID, availableCPUQuantity, allocatableCPUQuantity, availableRatio, p.cpuNUMAHintPreferLowThreshold)
 
 		if availableRatio >= p.cpuNUMAHintPreferLowThreshold {
@@ -790,7 +790,7 @@ func (p *DynamicPolicy) filterNUMANodesByNonBindingSharedRequestedQuantity(nonBi
 	return filteredNUMANodes
 }
 
-func (p *DynamicPolicy) calculateHintsForNUMABindingSharedCores(reqInt int, podEntries state.PodEntries,
+func (p *DynamicPolicy) calculateHintsForNUMABindingSharedCores(request float64, podEntries state.PodEntries,
 	machineState state.NUMANodeMap, req *pluginapi.ResourceRequest,
 ) (map[string]*pluginapi.ListOfTopologyHints, error) {
@@ -809,7 +809,7 @@ func (p *DynamicPolicy) calculateHintsForNUMABindingSharedCores(reqInt int, podE
 		},
 	}
 
-	minNUMAsCountNeeded, _, err := util.GetNUMANodesCountToFitCPUReq(reqInt, p.machineInfo.CPUTopology)
+	minNUMAsCountNeeded, _, err := util.GetNUMANodesCountToFitCPUReq(request, p.machineInfo.CPUTopology)
 	if err != nil {
 		return nil, fmt.Errorf("GetNUMANodesCountToFitCPUReq failed with error: %v", err)
 	}
@@ -822,21 +822,21 @@ func (p *DynamicPolicy) calculateHintsForNUMABindingSharedCores(reqInt int, podE
 	switch p.cpuNUMAHintPreferPolicy {
 	case cpuconsts.CPUNUMAHintPreferPolicyPacking, cpuconsts.CPUNUMAHintPreferPolicySpreading:
 		general.Infof("apply %s policy on NUMAs: %+v", p.cpuNUMAHintPreferPolicy, numaNodes)
-		p.populateHintsByPreferPolicy(numaNodes, p.cpuNUMAHintPreferPolicy, hints, machineState, reqInt)
+		p.populateHintsByPreferPolicy(numaNodes, p.cpuNUMAHintPreferPolicy, hints, machineState, request)
 	case cpuconsts.CPUNUMAHintPreferPolicyDynamicPacking:
-		filteredNUMANodes, filteredOutNUMANodes := p.filterNUMANodesByHintPreferLowThreshold(reqInt, machineState, numaNodes)
+		filteredNUMANodes, filteredOutNUMANodes := p.filterNUMANodesByHintPreferLowThreshold(request, machineState, numaNodes)
 		if len(filteredNUMANodes) > 0 {
 			general.Infof("dynamically apply packing policy on NUMAs: %+v", filteredNUMANodes)
-			p.populateHintsByPreferPolicy(filteredNUMANodes, cpuconsts.CPUNUMAHintPreferPolicyPacking, hints, machineState, reqInt)
+			p.populateHintsByPreferPolicy(filteredNUMANodes, cpuconsts.CPUNUMAHintPreferPolicyPacking, hints, machineState, request)
 			p.populateNotPreferredHintsByAvailableNUMANodes(filteredOutNUMANodes, hints)
 		} else {
 			general.Infof("empty filteredNUMANodes, dynamically apply spreading policy on NUMAs: %+v", numaNodes)
-			p.populateHintsByPreferPolicy(numaNodes, cpuconsts.CPUNUMAHintPreferPolicySpreading, hints, machineState, reqInt)
+			p.populateHintsByPreferPolicy(numaNodes, cpuconsts.CPUNUMAHintPreferPolicySpreading, hints, machineState, request)
 		}
 	default:
 		general.Infof("unknown policy: %s, apply default spreading policy on NUMAs: %+v", p.cpuNUMAHintPreferPolicy, numaNodes)
-		p.populateHintsByPreferPolicy(numaNodes, cpuconsts.CPUNUMAHintPreferPolicySpreading, hints, machineState, reqInt)
+		p.populateHintsByPreferPolicy(numaNodes, cpuconsts.CPUNUMAHintPreferPolicySpreading, hints, machineState, request)
 	}
 
 	// NOTE: because grpc is inability to distinguish between an empty array and nil,
diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go
index 8d255e2f4a..f2c7751d3a 100644
--- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go
+++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go
@@ -5290,7 +5290,7 @@ func TestSNBAdmitWithSidecarReallocate(t *testing.T) {
 	t.Parallel()
 	as := require.New(t)
 
-	tmpDir, err := ioutil.TempDir("", "checkpoint-TestSNBAdmit")
+	tmpDir, err := ioutil.TempDir("", "checkpoint-TestSNBAdmitWithSidecarReallocate")
 	as.Nil(err)
 	defer func() { _ = os.RemoveAll(tmpDir) }()
 
@@ -5409,3 +5409,88 @@ func TestSNBAdmitWithSidecarReallocate(t *testing.T) {
 	sidecarAllocation = dynamicPolicy.state.GetAllocationInfo(podUID, sidecarName)
 	as.NotNil(sidecarAllocation)
 }
+
+func TestSNBCpuRequestWithFloat(t *testing.T) {
+	t.Parallel()
+	as := require.New(t)
+
+	tmpDir, err := ioutil.TempDir("", "checkpoint-TestSNBCpuRequestWithFloat")
+	as.Nil(err)
+	defer func() { _ = os.RemoveAll(tmpDir) }()
+
+	cpuTopology, err := machine.GenerateDummyCPUTopology(12, 1, 1)
+	as.Nil(err)
+
+	dynamicPolicy, err := getTestDynamicPolicyWithInitialization(cpuTopology, tmpDir)
+	as.Nil(err)
+
+	dynamicPolicy.podAnnotationKeptKeys = []string{
+		consts.PodAnnotationMemoryEnhancementNumaBinding,
+		consts.PodAnnotationInplaceUpdateResizingKey,
+		consts.PodAnnotationAggregatedRequestsKey,
+	}
+
+	testName := "test"
+	podUID := string(uuid.NewUUID())
+
+	req := &pluginapi.ResourceRequest{
+		PodUid:         podUID,
+		PodNamespace:   testName,
+		PodName:        testName,
+		ContainerName:  "test1",
+		ContainerType:  pluginapi.ContainerType_MAIN,
+		ContainerIndex: 0,
+		ResourceName:   string(v1.ResourceCPU),
+		ResourceRequests: map[string]float64{
+			string(v1.ResourceCPU): 9.5,
+		},
+		Labels: map[string]string{
+			consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelSharedCores,
+		},
+		Annotations: map[string]string{
+			consts.PodAnnotationQoSLevelKey:           consts.PodAnnotationQoSLevelSharedCores,
+			consts.PodAnnotationMemoryEnhancementKey:  `{"numa_binding": "true", "numa_exclusive": "false"}`,
+			consts.PodAnnotationAggregatedRequestsKey: `{"cpu": 9.5}`,
+		},
+	}
+
+	res, err := dynamicPolicy.GetTopologyHints(context.Background(), req)
+	as.Nil(err)
+	hints := res.ResourceHints[string(v1.ResourceCPU)].Hints
+	as.NotZero(len(hints))
+	req.Hint = hints[0]
+
+	_, err = dynamicPolicy.Allocate(context.Background(), req)
+	as.Nil(err)
+
+	// admit another pod with 0.5c
+	req2 := &pluginapi.ResourceRequest{
+		PodUid:         podUID,
+		PodNamespace:   testName,
+		PodName:        testName,
+		ContainerName:  "test2",
+		ContainerType:  pluginapi.ContainerType_MAIN,
+		ContainerIndex: 0,
+		ResourceName:   string(v1.ResourceCPU),
+		ResourceRequests: map[string]float64{
+			string(v1.ResourceCPU): 0.5,
+		},
+		Labels: map[string]string{
+			consts.PodAnnotationQoSLevelKey: consts.PodAnnotationQoSLevelSharedCores,
+		},
+		Annotations: map[string]string{
+			consts.PodAnnotationQoSLevelKey:           consts.PodAnnotationQoSLevelSharedCores,
+			consts.PodAnnotationMemoryEnhancementKey:  `{"numa_binding": "true", "numa_exclusive": "false"}`,
+			consts.PodAnnotationAggregatedRequestsKey: `{"cpu": 0.5}`,
+		},
+	}
+
+	res2, err := dynamicPolicy.GetTopologyHints(context.Background(), req2)
+	as.Nil(err)
+	hints2 := res2.ResourceHints[string(v1.ResourceCPU)].Hints
+	as.NotZero(len(hints2))
+	req2.Hint = hints2[0]
+
+	_, err = dynamicPolicy.Allocate(context.Background(), req2)
+	as.Nil(err)
+}
diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state.go
index 28ee7e5fa3..319af1b9f2 100644
--- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state.go
+++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state.go
@@ -562,9 +562,9 @@ func (ns *NUMANodeState) GetAvailableCPUSet(reservedCPUs machine.CPUSet) machine
 // It's used when allocating CPUs for shared_cores with numa_binding containers,
 // since pool size may be adjusted, and DefaultCPUSet & AllocatedCPUSet are calculated by pool size,
 // we should use allocationInfo.RequestQuantity to calculate available cpu quantity for candidate shared_cores with numa_binding container.
-func (ns *NUMANodeState) GetAvailableCPUQuantity(reservedCPUs machine.CPUSet) int {
+func (ns *NUMANodeState) GetAvailableCPUQuantity(reservedCPUs machine.CPUSet) (int, float64) {
 	if ns == nil {
-		return 0
+		return 0, 0
 	}
 
 	allocatableQuantity := ns.GetFilteredDefaultCPUSet(nil, nil).Difference(reservedCPUs).Size()
@@ -599,7 +599,7 @@ func (ns *NUMANodeState) GetAvailableCPUQuantity(reservedCPUs machine.CPUSet) in
 	}
 
 	allocatedQuantity := int(math.Ceil(preciseAllocatedQuantity))
-	return general.Max(allocatableQuantity-allocatedQuantity, 0)
+	return general.Max(allocatableQuantity-allocatedQuantity, 0), general.MaxFloat64(float64(allocatableQuantity)-preciseAllocatedQuantity, 0)
 }
 
 // GetFilteredDefaultCPUSet returns default cpuset in this numa, along with the filter functions
diff --git a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state_test.go b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state_test.go
index 95acb90ce9..57bc76e0dc 100644
--- a/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state_test.go
+++ b/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state_test.go
@@ -3271,5 +3271,6 @@ func TestGetAvailableCPUQuantity(t *testing.T) {
 			},
 		},
 	}
-	require.Equal(t, 15, nodeState.GetAvailableCPUQuantity(machine.NewCPUSet()))
+	cpuQuantity, _ := nodeState.GetAvailableCPUQuantity(machine.NewCPUSet())
+	require.Equal(t, 15, cpuQuantity)
 }
diff --git a/pkg/agent/qrm-plugins/util/util.go b/pkg/agent/qrm-plugins/util/util.go
index d7bd1348e0..fb6552ac86 100644
--- a/pkg/agent/qrm-plugins/util/util.go
+++ b/pkg/agent/qrm-plugins/util/util.go
@@ -195,7 +195,7 @@ func PackResourceHintsResponse(req *pluginapi.ResourceRequest, resourceName stri
 // GetNUMANodesCountToFitCPUReq is used to calculate the amount of numa nodes
 // we need if we try to allocate cpu cores among them, assuming that all numa nodes
 // contain the same cpu capacity
-func GetNUMANodesCountToFitCPUReq(cpuReq int, cpuTopology *machine.CPUTopology) (int, int, error) {
+func GetNUMANodesCountToFitCPUReq(cpuReq float64, cpuTopology *machine.CPUTopology) (int, int, error) {
 	if cpuTopology == nil {
 		return 0, 0, fmt.Errorf("GetNumaNodesToFitCPUReq got nil cpuTopology")
 	}
@@ -210,14 +210,14 @@ func GetNUMANodesCountToFitCPUReq(cpuReq int, cpuTopology *machine.CPUTopology)
 	}
 
 	cpusPerNUMA := cpuTopology.NumCPUs / numaCount
-	numaCountNeeded := int(math.Ceil(float64(cpuReq) / float64(cpusPerNUMA)))
+	numaCountNeeded := int(math.Ceil(cpuReq / float64(cpusPerNUMA)))
 	if numaCountNeeded == 0 {
 		return 0, 0, fmt.Errorf("zero numaCountNeeded")
 	} else if numaCountNeeded > numaCount {
-		return 0, 0, fmt.Errorf("invalid cpu req: %d in topology with NUMAs count: %d and CPUs count: %d", cpuReq, numaCount, cpuTopology.NumCPUs)
+		return 0, 0, fmt.Errorf("invalid cpu req: %.3f in topology with NUMAs count: %d and CPUs count: %d", cpuReq, numaCount, cpuTopology.NumCPUs)
 	}
 
-	cpusCountNeededPerNUMA := int(math.Ceil(float64(cpuReq) / float64(numaCountNeeded)))
+	cpusCountNeededPerNUMA := int(math.Ceil(cpuReq / float64(numaCountNeeded)))
 	return numaCountNeeded, cpusCountNeededPerNUMA, nil
 }