Skip to content

Commit

Permalink
feat(qrm): allocate overlapping reclaim cpus reversely
Browse files Browse the repository at this point in the history
  • Loading branch information
csfldf committed Jul 2, 2024
1 parent 7629e6a commit 8025608
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,24 @@ func (a *cpuAccumulator) freeCPUsInNUMANode(numaID int) []int {
return a.cpuDetails.CPUsInNUMANodes(numaID).ToSliceInt()
}

// freeCPUsInNUMANodeReversely returns the free cpu ids in the specified NUMA node;
// the returned slice is sorted in descending (reverse) order.
func (a *cpuAccumulator) freeCPUsInNUMANodeReversely(numaID int) []int {
	return a.cpuDetails.CPUsInNUMANodes(numaID).ToSliceIntReversely()
}

// freeCoresInNUMANode returns the free core ids in the specified NUMA node;
// the returned slice is sorted in ascending order.
func (a *cpuAccumulator) freeCoresInNUMANode(numaID int) []int {
	coresInNUMA := a.cpuDetails.CoresInNUMANodes(numaID)
	return coresInNUMA.Filter(a.isCoreFree).ToSliceInt()
}

// freeCoresInNUMANodeReversely returns the free core ids in the specified NUMA node;
// the returned slice is sorted in descending (reverse) order.
func (a *cpuAccumulator) freeCoresInNUMANodeReversely(numaID int) []int {
	return a.cpuDetails.CoresInNUMANodes(numaID).Filter(a.isCoreFree).ToSliceIntReversely()
}

// isSocketFree returns true if the supplied socket is fully available
func (a *cpuAccumulator) isSocketFree(socketID int) bool {
return a.cpuDetails.CPUsInSockets(socketID).Size() == a.getTopology().CPUsPerSocket()
Expand Down Expand Up @@ -301,7 +314,7 @@ successful:
}

// TakeHTByNUMABalance tries to make the allocated cpu spread on different
// sockets, and it uses cpu HT as the basic allocation unit
// NUMAs, and it uses cpu HT as the basic allocation unit
func TakeHTByNUMABalance(info *machine.KatalystMachineInfo, availableCPUs machine.CPUSet,
cpuRequirement int,
) (machine.CPUSet, machine.CPUSet, error) {
Expand Down Expand Up @@ -338,3 +351,55 @@ failed:
successful:
return acc.result.Clone(), availableCPUs.Difference(acc.result), nil
}

// TakeByNUMABalanceReversely tries to make the allocated cpus spread reversely on different
// NUMAs, and it uses cpu cores as the basic allocation unit.
//
// On each round it visits every NUMA node and takes at most one allocation unit (a whole
// free core while a full core is still needed, otherwise a single cpu) per node, so the
// result stays balanced across NUMAs. Cpus/cores inside each NUMA are picked in
// descending id order (reversely).
//
// It returns the allocated cpuset and the remaining available cpuset, or the original
// availableCPUs twice plus an error when the request cannot be satisfied.
func TakeByNUMABalanceReversely(info *machine.KatalystMachineInfo, availableCPUs machine.CPUSet,
	cpuRequirement int,
) (machine.CPUSet, machine.CPUSet, error) {
	var err error
	acc := newCPUAccumulator(info, availableCPUs, cpuRequirement)
	if acc.isSatisfied() {
		goto successful
	}

	for {
		if acc.isFailed() {
			err = fmt.Errorf("not enough cpus available to satisfy request")
			goto failed
		}

	numaLoop:
		for _, s := range info.CPUDetails.NUMANodes().ToSliceInt() {
			// prefer whole free cores while at least one full core's worth of cpus is needed
			if acc.needs(acc.getTopology().CPUsPerCore()) && len(acc.freeCores()) > 0 {
				for _, c := range acc.freeCoresInNUMANodeReversely(s) {
					// take one core from this NUMA, then move on to the next NUMA
					// so the allocation stays balanced across nodes
					acc.take(acc.getDetails().CPUsInCores(c))
					if acc.isSatisfied() {
						goto successful
					} else {
						continue numaLoop
					}
				}
				continue
			}

			// fall back to single cpus: take at most one cpu from this NUMA per round
			for _, c := range acc.freeCPUsInNUMANodeReversely(s) {
				if acc.needs(1) {
					acc.take(machine.NewCPUSet(c))
				}
				if acc.isSatisfied() {
					goto successful
				} else {
					break
				}
			}
		}
	}
failed:
	if err == nil {
		err = errors.New("failed to allocate cpus")
	}
	return availableCPUs, availableCPUs, err
successful:
	return acc.result.Clone(), availableCPUs.Difference(acc.result), nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -1241,8 +1241,10 @@ func (p *DynamicPolicy) generatePoolsAndIsolation(poolsQuantityMap map[string]ma

req := int(math.Ceil(float64(cset.Size()) * ratio))

// if p.state.GetAllowSharedCoresOverlapReclaimedCores() == false, we will take cpus for the reclaim pool last;
// otherwise we should take cpus for the reclaim pool reversely, overlapping with the share-type pool, to avoid the cpuset jumping noticeably
var tErr error
overlapCPUs, _, tErr := calculator.TakeByNUMABalance(p.machineInfo, cset, req)
overlapCPUs, _, tErr := calculator.TakeByNUMABalanceReversely(p.machineInfo, cset, req)
if tErr != nil {
err = fmt.Errorf("take overlapCPUs from: %s to %s by ratio: %.4f failed with err: %v",
poolName, state.PoolNameReclaim, ratio, tErr)
Expand Down
23 changes: 22 additions & 1 deletion pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2809,6 +2809,23 @@ func TestGetResourcesAllocation(t *testing.T) {
}

dynamicPolicy.state.SetAllowSharedCoresOverlapReclaimedCores(true)
dynamicPolicy.dynamicConfig.GetDynamicConfiguration().EnableReclaim = true
dynamicPolicy.state.SetAllocationInfo(state.PoolNameReclaim, "", &state.AllocationInfo{
PodUid: state.PoolNameReclaim,
OwnerPoolName: state.PoolNameReclaim,
AllocationResult: machine.MustParse("1,3,4-5"),
OriginalAllocationResult: machine.MustParse("1,3,4-5"),
TopologyAwareAssignments: map[int]machine.CPUSet{
0: machine.NewCPUSet(1),
1: machine.NewCPUSet(3),
2: machine.NewCPUSet(4, 5),
},
OriginalTopologyAwareAssignments: map[int]machine.CPUSet{
0: machine.NewCPUSet(1),
1: machine.NewCPUSet(3),
2: machine.NewCPUSet(4, 5),
},
})
_, err = dynamicPolicy.Allocate(context.Background(), req)
as.Nil(err)

Expand All @@ -2833,8 +2850,12 @@ func TestGetResourcesAllocation(t *testing.T) {
IsNodeResource: false,
IsScalarResource: true,
AllocatedQuantity: 14,
AllocationResult: machine.NewCPUSet(1, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 15).String(),
AllocationResult: machine.NewCPUSet(1, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15).String(),
}, resp3.PodResources[req.PodUid].ContainerResources[testName].ResourceAllocation[string(v1.ResourceCPU)])

reclaimEntry := dynamicPolicy.state.GetAllocationInfo(state.PoolNameReclaim, "")
as.NotNil(reclaimEntry)
as.Equal(6, reclaimEntry.AllocationResult.Size()) // ceil("14 * (4 / 10)") == 6
}

func TestAllocateByQoSAwareServerListAndWatchResp(t *testing.T) {
Expand Down
11 changes: 11 additions & 0 deletions pkg/util/machine/cpuset.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,17 @@ func (s CPUSet) ToSliceInt() []int {
return result
}

// ToSliceIntReversely returns a reversely (descending) ordered slice of int that
// contains all elements from this set.
func (s CPUSet) ToSliceIntReversely() []int {
	// pre-size to avoid repeated growth while collecting the map keys
	result := make([]int, 0, len(s.elems))
	for cpu := range s.elems {
		result = append(result, cpu)
	}
	sort.Sort(sort.Reverse(sort.IntSlice(result)))
	return result
}

// ToSliceInt64 returns an ordered slice of int64 that contains
// all elements from this set
func (s CPUSet) ToSliceInt64() []int64 {
Expand Down

0 comments on commit 8025608

Please sign in to comment.